Python CLI จัดการไฟล์ใหญ่ Async ด้วย Pub/Sub

สวัสดีครับ

โปรเจคหลายๆ ตัวของเราเนี่ย บางทีต้องทำงานกับไฟล์ใหญ่ๆ เยอะๆ ใช่ไหมครับ แล้วการที่เราให้ CLI Tool ของเราจัดการประมวลผลไฟล์พวกนั้นตรงๆ เลยเนี่ย ก็อาจจะใช้เวลานานมากๆ ทำเอาเสียเวลาไปเลยนะครับ

วันนี้ผมเลยจะมาแนะนำวิธีจัดการไฟล์ขนาดใหญ่พวกนี้แบบ Async ด้วย Python CLI นะครับ โดยเราจะใช้พลังของ Google Pub/Sub มาช่วยให้งานของเราเดินหน้าไปได้แบบไม่ติดขัด ไม่ต้องรอนานเลยครับ

เตรียมตัวก่อนเริ่ม

ก่อนอื่นนะครับ เราต้องติดตั้ง Library ของ Google Cloud Pub/Sub ก่อน แบบนี้ครับ

pip install google-cloud-pubsub

แล้วก็อย่าลืม Login บัญชี Google Cloud ของเราให้เรียบร้อย ตั้งค่า Project ID ให้พร้อมใช้งานนะครับ

ส่งข้อมูลไฟล์เข้าคิวด้วย Pub/Sub

ขั้นแรกเนี่ย เราก็ต้องมีตัวส่งข้อมูลไฟล์ไปบอก Pub/Sub ก่อนนะครับ ว่ามีไฟล์นี้เข้ามานะ ต้องประมวลผลอะไรแบบนี้ครับ ตัวอย่างโค้ดง่ายๆ นะครับ

from google.cloud import pubsub_v1
import json
import datetime

project_id = \"your-gcp-project-id\" # ตรงนี้อย่าลืมเปลี่ยนนะครับ
topic_id = \"your-pubsub-topic\"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_file_info(file_path):
    message_data = {
        \"file_path\": file_path,
        \"timestamp\": datetime.datetime.now(datetime.timezone.utc).isoformat()
    }
    data = json.dumps(message_data).encode(\"utf-8\")
    future = publisher.publish(topic_path, data)
    print(f\"Published message ID: {future.result()}\")

if __name__ == \"__main__\":
    # สมมุติว่านี่คือไฟล์ขนาดใหญ่ที่เราอยากให้ระบบประมวลผลทีหลังนะครับ
    large_file = \"data/big_sales_report_2023.csv\"
    publish_file_info(large_file)
    print(f\"ส่งข้อมูลไฟล์ '{large_file}' ไปที่ Pub/Sub เพื่อรอการประมวลผลแล้วครับ\")

โค้ดข้างบนนี้ก็แค่รับ file_path มานะครับ แล้วก็สร้าง Message เป็น JSON ส่งไปให้ Pub/Sub ใน Topic ที่เรากำหนดไว้ ข้อดีคือ CLI ของเราแค่ส่งเฉยๆ แล้วก็จบเลย ไม่ต้องรอ

สร้าง CLI Tool สำหรับส่งงาน

จากนั้นนะครับ เราก็เอาฟังก์ชัน publish_file_info ไปใส่ใน CLI Tool ของเราได้เลย ใช้ argparse ง่ายๆ นะครับ

import argparse
# จากบทความนี้นะครับ ให้เอา publish_file_info จากไฟล์ก่อนหน้ามาใส่ตรงนี้
# from your_publisher_module import publish_file_info 

def main():
    parser = argparse.ArgumentParser(description=\"CLI Tool สำหรับส่งไฟล์ขนาดใหญ่เข้าคิวประมวลผล Async.\")
    parser.add_argument(\"file\", help=\"ระบุ Path ของไฟล์ใหญ่ที่ต้องการประมวลผล.\")
    args = parser.parse_args()

    print(f\"กำลังส่งข้อมูลไฟล์ '{args.file}' เข้าคิวประมวลผลด้วย Pub/Sub... ไม่ต้องรอนานนะครับ\")
    # เรียกใช้ฟังก์ชันส่งข้อมูลไฟล์เข้า Pub/Sub ตรงนี้นะครับ
    # publish_file_info(args.file) 
    print(f\"ส่งข้อมูลไฟล์ '{args.file}' เข้าคิวประมวลผลเรียบร้อยแล้วครับ! ตอนนี้คุณไปทำอย่างอื่นได้เลยครับ\")

if __name__ == \"__main__\":
    main()

ทีนี้เวลาเราเรียกใช้ CLI นะครับ ก็แค่ python cli_tool.py data/your_big_file.log แบบนี้ครับ ระบบก็จะส่งข้อมูลไปที่ Pub/Sub แล้ว CLI ก็จะทำงานเสร็จทันทีเลย เร็วปรี๊ดเลยครับ

ฝั่ง Consumer ที่รอรับงานไปประมวลผล

แน่นอนครับ มีคนส่งแล้วก็ต้องมีคนรับใช่ไหมครับ ฝั่งที่จะประมวลผลไฟล์จริงๆ เนี่ย มักจะเป็น Service แยกของเรานะครับ อาจจะเป็น Cloud Function, Compute Engine, หรือ Kubernetes ที่รัน Consumer รอรับข้อความจาก Pub/Sub แบบนี้ครับ

from google.cloud import pubsub_v1
import json
import time

project_id = \"your-gcp-project-id\"
subscription_id = \"your-pubsub-subscription-id\" # ตรงนี้ก็อย่าลืมเปลี่ยนนะครับ

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def process_large_file(file_path):
    print(f\"### เริ่มประมวลผลไฟล์: {file_path} แล้วครับ... (ใช้เวลานานหน่อยนะ)\")
    # ตรงนี้คือส่วนที่เราจะเขียนโค้ดสำหรับประมวลผลไฟล์จริงๆ นะครับ
    # เช่น อ่านไฟล์ CSV, ประมวลผลข้อมูล, บันทึกลง Database
    time.sleep(10) # จำลองการประมวลผลที่ใช้เวลานาน 10 วินาที
    print(f\"### ประมวลผลไฟล์ {file_path} เสร็จเรียบร้อยแล้วครับ!\")

def callback(message):
    print(f\"--- ได้รับข้อความแล้วครับ: {message.data.decode('utf-8')}\")
    try:
        message_data = json.loads(message.data.decode(\"utf-8\"))
        file_to_process = message_data.get(\"file_path\")
        if file_to_process:
            process_large_file(file_to_process)
        message.ack() # สำคัญนะครับ ต้อง ack ด้วย ไม่งั้น Pub/Sub จะส่งซ้ำ
    except Exception as e:
        print(f\"มีข้อผิดพลาดตอนประมวลผล: {e} ขอ Nack ข้อความนี้ครับ\")
        message.nack() # ถ้ามีปัญหา ก็บอก Pub/Sub ว่าประมวลผลไม่สำเร็จ ให้ส่งใหม่

print(f\"กำลังรอข้อความใหม่ที่ Subscription: {subscription_path} อยู่นะครับ กด Ctrl+C เพื่อหยุดได้เลยครับ\")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

with subscriber:
    try:
        streaming_pull_future.result() # บล็อกการทำงานเพื่อรอข้อความ
    except KeyboardInterrupt:
        streaming_pull_future.cancel()
        streaming_pull_future.result() # รอให้ future หยุดทำงานให้เรียบร้อย

จากตัวอย่าง Consumer นี้ ก็จะเห็นว่าเค้ารอรับข้อความจาก Pub/Sub พอได้ Message มาก็เอา file_path ไปประมวลผลต่อได้เลยครับ ที่สำคัญคือต้อง ack() หรือ nack() ข้อความด้วยนะครับ ไม่งั้น Pub/Sub จะเข้าใจว่ายังไม่ได้ประมวลผล แล้วก็จะส่ง Message ซ้ำไปเรื่อยๆ ครับ

สรุป

การใช้ Python CLI ร่วมกับ Google Pub/Sub แบบนี้ ช่วยให้งานประมวลผลไฟล์ใหญ่ๆ ของเรา Scalable มากขึ้นนะครับ ตัว CLI เองก็ไม่ต้องรอนาน ส่วนการประมวลผลก็สามารถกระจายไปให้ Worker หลายๆ ตัวช่วยกันทำได้ด้วย ทำให้ระบบของเรามีประสิทธิภาพดีขึ้นมากๆ เลยครับ

เพื่อนๆ ลองเอาเทคนิคนี้ไปปรับใช้กับโปรเจคของตัวเองดูได้นะครับ มีข้อสงสัยตรงไหน ถามมาได้เลยครับ ยินดีตอบนะครับ

Read more

คิวบา: มนต์เสน่ห์บนเส้นทางแห่งความท้าทาย – วิกฤตพลังงานและแรงกดดันจากสหรัฐฯ

คิวบา: มนต์เสน่ห์บนเส้นทางแห่งความท้าทาย – วิกฤตพลังงานและแรงกดดันจากสหรัฐฯ

เจาะลึกสถานการณ์ล่าสุดของคิวบา ทั้งวิกฤตไฟฟ้าดับครั้งใหญ่จากปัญหาพลังงาน และแรงกดดันจากสหรัฐฯ ภายใต้การนำของทรัมป์ อนาคตของเกาะปฏิวัติแห่งนี้จะเป็นอย่างไร?

By ทีมงาน devdog
ละครไทย: ถอดรหัสเสน่ห์ "พลอยน้ำเพชร" และปรากฏการณ์บันเทิงที่ไม่เคยจางหาย

ละครไทย: ถอดรหัสเสน่ห์ "พลอยน้ำเพชร" และปรากฏการณ์บันเทิงที่ไม่เคยจางหาย

สำรวจความเข้มข้นของละคร "พลอยน้ำเพชร" จากช่องวัน 31 พร้อมเจาะลึกตอนที่ 17-20 และเสน่ห์ของละครไทยที่ครองใจผู้ชมทั่วโลก

By ทีมงาน devdog
ชนนพัฒฐ์ นาคสั้ว: สส.สงขลา กับประเด็นร้อนคดีเว็บพนันออนไลน์ที่ DSI กำลังจับตา

ชนนพัฒฐ์ นาคสั้ว: สส.สงขลา กับประเด็นร้อนคดีเว็บพนันออนไลน์ที่ DSI กำลังจับตา

เจาะลึกประเด็นร้อน ชนนพัฒฐ์ นาคสั้ว สส.สงขลา พรรคกล้าธรรม กับกระแสข่าวพาดพิงถึงเครือข่ายเว็บพนันออนไลน์ที่ DSI กำลังสอบสวน เปิดความท้าทายต่อบทบาทผู้แทนราษฎร

By ทีมงาน devdog
เจาะลึก "ณัฐธิดา เล็กอุดากร" หลานเนวินชิดชอบ สส. อายุน้อยสุด ผู้พร้อมสร้างอนาคตใหม่ให้บุรีรัมย์

เจาะลึก "ณัฐธิดา เล็กอุดากร" หลานเนวินชิดชอบ สส. อายุน้อยสุด ผู้พร้อมสร้างอนาคตใหม่ให้บุรีรัมย์

ทำความรู้จักกับ พลอย ณัฐธิดา เล็กอุดากร สส. บุรีรัมย์ เขต 2 หลานสาว เนวิน ชิดชอบ ผู้สร้างประวัติศาสตร์ สส. อายุน้อยที่สุดในสภา พร้อมบทบาทการเมืองและวิสัยทัศน์คนรุ่นใหม่

By ทีมงาน devdog