Python CLI จัดการไฟล์ใหญ่ Async ด้วย Pub/Sub

สวัสดีครับ

โปรเจคหลายๆ ตัวของเราเนี่ย บางทีต้องทำงานกับไฟล์ใหญ่ๆ เยอะๆ ใช่ไหมครับ แล้วการที่เราให้ CLI Tool ของเราจัดการประมวลผลไฟล์พวกนั้นตรงๆ เลยเนี่ย ก็อาจจะใช้เวลานานมากๆ ทำเอาเสียเวลาไปเลยนะครับ

วันนี้ผมเลยจะมาแนะนำวิธีจัดการไฟล์ขนาดใหญ่พวกนี้แบบ Async ด้วย Python CLI นะครับ โดยเราจะใช้พลังของ Google Pub/Sub มาช่วยให้งานของเราเดินหน้าไปได้แบบไม่ติดขัด ไม่ต้องรอนานเลยครับ

เตรียมตัวก่อนเริ่ม

ก่อนอื่นนะครับ เราต้องติดตั้ง Library ของ Google Cloud Pub/Sub ก่อน แบบนี้ครับ

pip install google-cloud-pubsub

แล้วก็อย่าลืม Login บัญชี Google Cloud ของเราให้เรียบร้อย ตั้งค่า Project ID ให้พร้อมใช้งานนะครับ

ส่งข้อมูลไฟล์เข้าคิวด้วย Pub/Sub

ขั้นแรกเนี่ย เราก็ต้องมีตัวส่งข้อมูลไฟล์ไปบอก Pub/Sub ก่อนนะครับ ว่ามีไฟล์นี้เข้ามานะ ต้องประมวลผลอะไรแบบนี้ครับ ตัวอย่างโค้ดง่ายๆ นะครับ

from google.cloud import pubsub_v1
import json
import datetime

project_id = \"your-gcp-project-id\" # ตรงนี้อย่าลืมเปลี่ยนนะครับ
topic_id = \"your-pubsub-topic\"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_file_info(file_path):
    message_data = {
        \"file_path\": file_path,
        \"timestamp\": datetime.datetime.now(datetime.timezone.utc).isoformat()
    }
    data = json.dumps(message_data).encode(\"utf-8\")
    future = publisher.publish(topic_path, data)
    print(f\"Published message ID: {future.result()}\")

if __name__ == \"__main__\":
    # สมมุติว่านี่คือไฟล์ขนาดใหญ่ที่เราอยากให้ระบบประมวลผลทีหลังนะครับ
    large_file = \"data/big_sales_report_2023.csv\"
    publish_file_info(large_file)
    print(f\"ส่งข้อมูลไฟล์ '{large_file}' ไปที่ Pub/Sub เพื่อรอการประมวลผลแล้วครับ\")

โค้ดข้างบนนี้ก็แค่รับ file_path มานะครับ แล้วก็สร้าง Message เป็น JSON ส่งไปให้ Pub/Sub ใน Topic ที่เรากำหนดไว้ ข้อดีคือ CLI ของเราแค่ส่งเฉยๆ แล้วก็จบเลย ไม่ต้องรอ

สร้าง CLI Tool สำหรับส่งงาน

จากนั้นนะครับ เราก็เอาฟังก์ชัน publish_file_info ไปใส่ใน CLI Tool ของเราได้เลย ใช้ argparse ง่ายๆ นะครับ

import argparse
# จากบทความนี้นะครับ ให้เอา publish_file_info จากไฟล์ก่อนหน้ามาใส่ตรงนี้
# from your_publisher_module import publish_file_info 

def main():
    parser = argparse.ArgumentParser(description=\"CLI Tool สำหรับส่งไฟล์ขนาดใหญ่เข้าคิวประมวลผล Async.\")
    parser.add_argument(\"file\", help=\"ระบุ Path ของไฟล์ใหญ่ที่ต้องการประมวลผล.\")
    args = parser.parse_args()

    print(f\"กำลังส่งข้อมูลไฟล์ '{args.file}' เข้าคิวประมวลผลด้วย Pub/Sub... ไม่ต้องรอนานนะครับ\")
    # เรียกใช้ฟังก์ชันส่งข้อมูลไฟล์เข้า Pub/Sub ตรงนี้นะครับ
    # publish_file_info(args.file) 
    print(f\"ส่งข้อมูลไฟล์ '{args.file}' เข้าคิวประมวลผลเรียบร้อยแล้วครับ! ตอนนี้คุณไปทำอย่างอื่นได้เลยครับ\")

if __name__ == \"__main__\":
    main()

ทีนี้เวลาเราเรียกใช้ CLI นะครับ ก็แค่ python cli_tool.py data/your_big_file.log แบบนี้ครับ ระบบก็จะส่งข้อมูลไปที่ Pub/Sub แล้ว CLI ก็จะทำงานเสร็จทันทีเลย เร็วปรี๊ดเลยครับ

ฝั่ง Consumer ที่รอรับงานไปประมวลผล

แน่นอนครับ มีคนส่งแล้วก็ต้องมีคนรับใช่ไหมครับ ฝั่งที่จะประมวลผลไฟล์จริงๆ เนี่ย มักจะเป็น Service แยกของเรานะครับ อาจจะเป็น Cloud Function, Compute Engine, หรือ Kubernetes ที่รัน Consumer รอรับข้อความจาก Pub/Sub แบบนี้ครับ

from google.cloud import pubsub_v1
import json
import time

project_id = \"your-gcp-project-id\"
subscription_id = \"your-pubsub-subscription-id\" # ตรงนี้ก็อย่าลืมเปลี่ยนนะครับ

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def process_large_file(file_path):
    print(f\"### เริ่มประมวลผลไฟล์: {file_path} แล้วครับ... (ใช้เวลานานหน่อยนะ)\")
    # ตรงนี้คือส่วนที่เราจะเขียนโค้ดสำหรับประมวลผลไฟล์จริงๆ นะครับ
    # เช่น อ่านไฟล์ CSV, ประมวลผลข้อมูล, บันทึกลง Database
    time.sleep(10) # จำลองการประมวลผลที่ใช้เวลานาน 10 วินาที
    print(f\"### ประมวลผลไฟล์ {file_path} เสร็จเรียบร้อยแล้วครับ!\")

def callback(message):
    print(f\"--- ได้รับข้อความแล้วครับ: {message.data.decode('utf-8')}\")
    try:
        message_data = json.loads(message.data.decode(\"utf-8\"))
        file_to_process = message_data.get(\"file_path\")
        if file_to_process:
            process_large_file(file_to_process)
        message.ack() # สำคัญนะครับ ต้อง ack ด้วย ไม่งั้น Pub/Sub จะส่งซ้ำ
    except Exception as e:
        print(f\"มีข้อผิดพลาดตอนประมวลผล: {e} ขอ Nack ข้อความนี้ครับ\")
        message.nack() # ถ้ามีปัญหา ก็บอก Pub/Sub ว่าประมวลผลไม่สำเร็จ ให้ส่งใหม่

print(f\"กำลังรอข้อความใหม่ที่ Subscription: {subscription_path} อยู่นะครับ กด Ctrl+C เพื่อหยุดได้เลยครับ\")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

with subscriber:
    try:
        streaming_pull_future.result() # บล็อกการทำงานเพื่อรอข้อความ
    except KeyboardInterrupt:
        streaming_pull_future.cancel()
        streaming_pull_future.result() # รอให้ future หยุดทำงานให้เรียบร้อย

จากตัวอย่าง Consumer นี้ ก็จะเห็นว่าเค้ารอรับข้อความจาก Pub/Sub พอได้ Message มาก็เอา file_path ไปประมวลผลต่อได้เลยครับ ที่สำคัญคือต้อง ack() หรือ nack() ข้อความด้วยนะครับ ไม่งั้น Pub/Sub จะเข้าใจว่ายังไม่ได้ประมวลผล แล้วก็จะส่ง Message ซ้ำไปเรื่อยๆ ครับ

สรุป

การใช้ Python CLI ร่วมกับ Google Pub/Sub แบบนี้ ช่วยให้งานประมวลผลไฟล์ใหญ่ๆ ของเรา Scalable มากขึ้นนะครับ ตัว CLI เองก็ไม่ต้องรอนาน ส่วนการประมวลผลก็สามารถกระจายไปให้ Worker หลายๆ ตัวช่วยกันทำได้ด้วย ทำให้ระบบของเรามีประสิทธิภาพดีขึ้นมากๆ เลยครับ

เพื่อนๆ ลองเอาเทคนิคนี้ไปปรับใช้กับโปรเจคของตัวเองดูได้นะครับ มีข้อสงสัยตรงไหน ถามมาได้เลยครับ ยินดีตอบนะครับ

Read more

ไอลีน กู: ตำนานนักสกีฟรีสไตล์ผู้พลิกโฉมวงการและความหมายของชัยชนะ

ไอลีน กู: ตำนานนักสกีฟรีสไตล์ผู้พลิกโฉมวงการและความหมายของชัยชนะ

เจาะลึกเรื่องราวของ Eileen Gu นักสกีฟรีสไตล์ผู้สร้างประวัติศาสตร์ในโอลิมปิก 2026 สถิติที่ไม่เคยมีมาก่อน ประเด็นถกเถียง และความแข็งแกร่งส่วนตัวที่ทำให้เธอก้าวสู่ระดับโลก

By ทีมงาน devdog
วันพระ: คู่มือฉบับสมบูรณ์สำหรับพุทธศาสนิกชนและผู้สนใจยุคใหม่

วันพระ: คู่มือฉบับสมบูรณ์สำหรับพุทธศาสนิกชนและผู้สนใจยุคใหม่

เจาะลึกวันพระและความสำคัญของวันมาฆบูชา 2569 ทั้งวันหยุดราชการ ธนาคาร กิจกรรมเวียนเทียนต้นไม้ และผลกระทบต่อบริการขนส่ง เตรียมตัววางแผนทำบุญและพักผ่อน

By ทีมงาน devdog
ถอดรหัสรักแท้: "บังมัดคลองตันต้นข้าว" เรื่องราวที่สะท้อนการให้อภัยและการเริ่มต้นใหม่

ถอดรหัสรักแท้: "บังมัดคลองตันต้นข้าว" เรื่องราวที่สะท้อนการให้อภัยและการเริ่มต้นใหม่

เจาะลึกงานวิวาห์ "บังมัดคลองตัน" กับ "ต้นข้าว มิสแกรนด์" พร้อมเหตุผลจากใจเจ้าสาวที่เลือกความรักเหนือกาลเวลาและคำวิจารณ์ สู่การเริ่มต้นชีวิตคู่ที่สะท้อนการให้อภัย

By ทีมงาน devdog
ไฮไลท์บอลไทยลีก 2: มหาสารคาม เอสบีที เอฟซี กับฟอร์มร้อนแรงสู่เส้นทางเพลย์ออฟ

ไฮไลท์บอลไทยลีก 2: มหาสารคาม เอสบีที เอฟซี กับฟอร์มร้อนแรงสู่เส้นทางเพลย์ออฟ

เจาะลึกไฮไลท์บอลไทยลีก 2 ของมหาสารคาม เอสบีที เอฟซี กับฟอร์มร้อนแรง ชัยชนะสำคัญจาก ชิตชนก และบทบาทโค้ชดุสิต สู่เส้นทางเพลย์ออฟที่น่าจับตา!

By ทีมงาน devdog