Python CLI จัดการไฟล์ใหญ่ Async ด้วย Pub/Sub
สวัสดีครับ
โปรเจคหลายๆ ตัวของเราเนี่ย บางทีต้องทำงานกับไฟล์ใหญ่ๆ เยอะๆ ใช่ไหมครับ แล้วการที่เราให้ CLI Tool ของเราจัดการประมวลผลไฟล์พวกนั้นตรงๆ เลยเนี่ย ก็อาจจะใช้เวลานานมากๆ ทำเอาเสียเวลาไปเลยนะครับ
วันนี้ผมเลยจะมาแนะนำวิธีจัดการไฟล์ขนาดใหญ่พวกนี้แบบ Async ด้วย Python CLI นะครับ โดยเราจะใช้พลังของ Google Pub/Sub มาช่วยให้งานของเราเดินหน้าไปได้แบบไม่ติดขัด ไม่ต้องรอนานเลยครับ
เตรียมตัวก่อนเริ่ม
ก่อนอื่นนะครับ เราต้องติดตั้ง Library ของ Google Cloud Pub/Sub ก่อน แบบนี้ครับ
pip install google-cloud-pubsub
แล้วก็อย่าลืม Login บัญชี Google Cloud ของเราให้เรียบร้อย ตั้งค่า Project ID ให้พร้อมใช้งานนะครับ
ส่งข้อมูลไฟล์เข้าคิวด้วย Pub/Sub
ขั้นแรกเนี่ย เราก็ต้องมีตัวส่งข้อมูลไฟล์ไปบอก Pub/Sub ก่อนนะครับ ว่ามีไฟล์นี้เข้ามานะ ต้องประมวลผลอะไรแบบนี้ครับ ตัวอย่างโค้ดง่ายๆ นะครับ
from google.cloud import pubsub_v1
import json
import datetime
project_id = \"your-gcp-project-id\" # ตรงนี้อย่าลืมเปลี่ยนนะครับ
topic_id = \"your-pubsub-topic\"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_file_info(file_path):
message_data = {
\"file_path\": file_path,
\"timestamp\": datetime.datetime.now(datetime.timezone.utc).isoformat()
}
data = json.dumps(message_data).encode(\"utf-8\")
future = publisher.publish(topic_path, data)
print(f\"Published message ID: {future.result()}\")
if __name__ == \"__main__\":
# สมมุติว่านี่คือไฟล์ขนาดใหญ่ที่เราอยากให้ระบบประมวลผลทีหลังนะครับ
large_file = \"data/big_sales_report_2023.csv\"
publish_file_info(large_file)
print(f\"ส่งข้อมูลไฟล์ '{large_file}' ไปที่ Pub/Sub เพื่อรอการประมวลผลแล้วครับ\")
โค้ดข้างบนนี้ก็แค่รับ file_path มานะครับ แล้วก็สร้าง Message เป็น JSON ส่งไปให้ Pub/Sub ใน Topic ที่เรากำหนดไว้ ข้อดีคือ CLI ของเราแค่ส่งเฉยๆ แล้วก็จบเลย ไม่ต้องรอ
สร้าง CLI Tool สำหรับส่งงาน
จากนั้นนะครับ เราก็เอาฟังก์ชัน publish_file_info ไปใส่ใน CLI Tool ของเราได้เลย ใช้ argparse ง่ายๆ นะครับ
import argparse
# จากบทความนี้นะครับ ให้เอา publish_file_info จากไฟล์ก่อนหน้ามาใส่ตรงนี้
# from your_publisher_module import publish_file_info
def main():
parser = argparse.ArgumentParser(description=\"CLI Tool สำหรับส่งไฟล์ขนาดใหญ่เข้าคิวประมวลผล Async.\")
parser.add_argument(\"file\", help=\"ระบุ Path ของไฟล์ใหญ่ที่ต้องการประมวลผล.\")
args = parser.parse_args()
print(f\"กำลังส่งข้อมูลไฟล์ '{args.file}' เข้าคิวประมวลผลด้วย Pub/Sub... ไม่ต้องรอนานนะครับ\")
# เรียกใช้ฟังก์ชันส่งข้อมูลไฟล์เข้า Pub/Sub ตรงนี้นะครับ
# publish_file_info(args.file)
print(f\"ส่งข้อมูลไฟล์ '{args.file}' เข้าคิวประมวลผลเรียบร้อยแล้วครับ! ตอนนี้คุณไปทำอย่างอื่นได้เลยครับ\")
if __name__ == \"__main__\":
main()
ทีนี้เวลาเราเรียกใช้ CLI นะครับ ก็แค่ python cli_tool.py data/your_big_file.log แบบนี้ครับ ระบบก็จะส่งข้อมูลไปที่ Pub/Sub แล้ว CLI ก็จะทำงานเสร็จทันทีเลย เร็วปรี๊ดเลยครับ
ฝั่ง Consumer ที่รอรับงานไปประมวลผล
แน่นอนครับ มีคนส่งแล้วก็ต้องมีคนรับใช่ไหมครับ ฝั่งที่จะประมวลผลไฟล์จริงๆ เนี่ย มักจะเป็น Service แยกของเรานะครับ อาจจะเป็น Cloud Function, Compute Engine, หรือ Kubernetes ที่รัน Consumer รอรับข้อความจาก Pub/Sub แบบนี้ครับ
from google.cloud import pubsub_v1
import json
import time
project_id = \"your-gcp-project-id\"
subscription_id = \"your-pubsub-subscription-id\" # ตรงนี้ก็อย่าลืมเปลี่ยนนะครับ
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def process_large_file(file_path):
print(f\"### เริ่มประมวลผลไฟล์: {file_path} แล้วครับ... (ใช้เวลานานหน่อยนะ)\")
# ตรงนี้คือส่วนที่เราจะเขียนโค้ดสำหรับประมวลผลไฟล์จริงๆ นะครับ
# เช่น อ่านไฟล์ CSV, ประมวลผลข้อมูล, บันทึกลง Database
time.sleep(10) # จำลองการประมวลผลที่ใช้เวลานาน 10 วินาที
print(f\"### ประมวลผลไฟล์ {file_path} เสร็จเรียบร้อยแล้วครับ!\")
def callback(message):
print(f\"--- ได้รับข้อความแล้วครับ: {message.data.decode('utf-8')}\")
try:
message_data = json.loads(message.data.decode(\"utf-8\"))
file_to_process = message_data.get(\"file_path\")
if file_to_process:
process_large_file(file_to_process)
message.ack() # สำคัญนะครับ ต้อง ack ด้วย ไม่งั้น Pub/Sub จะส่งซ้ำ
except Exception as e:
print(f\"มีข้อผิดพลาดตอนประมวลผล: {e} ขอ Nack ข้อความนี้ครับ\")
message.nack() # ถ้ามีปัญหา ก็บอก Pub/Sub ว่าประมวลผลไม่สำเร็จ ให้ส่งใหม่
print(f\"กำลังรอข้อความใหม่ที่ Subscription: {subscription_path} อยู่นะครับ กด Ctrl+C เพื่อหยุดได้เลยครับ\")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
with subscriber:
try:
streaming_pull_future.result() # บล็อกการทำงานเพื่อรอข้อความ
except KeyboardInterrupt:
streaming_pull_future.cancel()
streaming_pull_future.result() # รอให้ future หยุดทำงานให้เรียบร้อย
จากตัวอย่าง Consumer นี้ ก็จะเห็นว่าเค้ารอรับข้อความจาก Pub/Sub พอได้ Message มาก็เอา file_path ไปประมวลผลต่อได้เลยครับ ที่สำคัญคือต้อง ack() หรือ nack() ข้อความด้วยนะครับ ไม่งั้น Pub/Sub จะเข้าใจว่ายังไม่ได้ประมวลผล แล้วก็จะส่ง Message ซ้ำไปเรื่อยๆ ครับ
สรุป
การใช้ Python CLI ร่วมกับ Google Pub/Sub แบบนี้ ช่วยให้งานประมวลผลไฟล์ใหญ่ๆ ของเรา Scalable มากขึ้นนะครับ ตัว CLI เองก็ไม่ต้องรอนาน ส่วนการประมวลผลก็สามารถกระจายไปให้ Worker หลายๆ ตัวช่วยกันทำได้ด้วย ทำให้ระบบของเรามีประสิทธิภาพดีขึ้นมากๆ เลยครับ
เพื่อนๆ ลองเอาเทคนิคนี้ไปปรับใช้กับโปรเจคของตัวเองดูได้นะครับ มีข้อสงสัยตรงไหน ถามมาได้เลยครับ ยินดีตอบนะครับ