เมื่อต้องรันอะไรเยอะๆ พร้อมกัน Python ก็ช่วยได้ ไม่ต้องกลัวเครื่องค้างอีกต่อไป
สวัสดีค้าบเพื่อนๆ พี่ๆ น้องๆ ทุกคน! วันนี้ผมจะมาเม้าท์มอยเรื่องนึงที่หลายคนน่าจะเคยเจอกันมาบ้าง เวลาที่เราต้องรันโค้ดที่มันใช้เวลานานๆ หรืออยากจะให้หลายๆ งานมันทำงานพร้อมกันเนี่ย ปัญหาคืออะไรเหรอ? ก็คือเครื่องมันค้างบ้างละ กินแรมเยอะบ้างละ จนบางทีเราก็แอบท้อใช่ป่ะครับ
วันนี้ผมมีตัวช่วยดีๆ มาแนะนำ มันคือ ThreadPoolExecutor ครับ! ฟังดูเป็นศัพท์เทคนิคจ๋าเลยใช่ไหมครับ แต่จริงๆ มันใช้ง่ายกว่าที่คิดเยอะเลยนะ ช่วยให้เรารันงานแบบขนานได้สบายๆ แถมยังช่วยให้เราจัดการ resource ของเครื่องได้ดีขึ้นด้วย ไม่ต้องเขียน thread เองให้ปวดหัววว
ThreadPoolExecutor คืออะไร ทำไมต้องใช้?
ThreadPoolExecutor เนี่ย มันเป็นส่วนหนึ่งของโมดูล concurrent.futures ใน Python นะฮะ หลักๆ ก็คือมันสร้างกลุ่มของ "คนงาน" หรือ "เธรด" ขึ้นมาหลายๆ อัน คอยรอรับงานที่เราโยนเข้าไป แล้วมันก็จะจัดการเอาไปรันให้เราเองเลย ข้อดีคือเราไม่ต้องมานั่งสร้างหรือทำลายเธรดเอง มันจัดการให้หมดครับ ยิ่งถ้าใครต้องจัดการเรื่อง resource management หรือการจัดสรรทรัพยากรเครื่องเนี่ย ตัวนี้จะช่วยได้เยอะเลย
ที่เด็ดเลยคือ มันช่วยเรื่อง resource management ได้ดีมากๆ เลยนะ เพราะเราสามารถกำหนดจำนวน worker หรือคนงานที่เราต้องการให้มันทำงานพร้อมกันได้ สมมติเรามี CPU 4 คอร์ เราก็อาจจะกำหนดให้มันรันพร้อมกันซัก 4-8 อันก็ได้นะ เพื่อไม่ให้เครื่องเรามันทำงานหนักเกินไปไงครับ
มาดูตัวอย่างโค้ดกันเลยดีกว่า
1. เริ่มต้นง่ายๆ กับการส่งงานเข้าคิว
มาลองดูตัวอย่างแรกกันเลยดีกว่า ง่ายมากๆ ครับ จะเห็นว่าเราโยนงานเข้าไปสี่งาน แต่กำหนดให้มันรันพร้อมกันแค่สามงาน พออันไหนเสร็จ มันก็จะดึงอันถัดไปมาทำเองเลย เจ๋งป้ะล่ะ!
import time
from concurrent.futures import ThreadPoolExecutor
def do_work(task_id):
print(f"[{task_id}] กำลังเริ่มทำงาน...", flush=True)
time.sleep(2) # สมมติว่าทำงานนาน 2 วินาที
print(f"[{task_id}] ทำงานเสร็จแล้ว! เย้!", flush=True)
return f"ผลลัพธ์จากงาน {task_id}"
if __name__ == "__main__":
print("จะเริ่มโยนงานให้ ThraedPoolExecutor แล้วนะ")
with ThreadPoolExecutor(max_workers=3) as executor: # กำหนดให้ทำงานพร้อมกันสูงสุด 3 งาน
future1 = executor.submit(do_work, "งานที่ 1")
future2 = executor.submit(do_work, "งานที่ 2")
future3 = executor.submit(do_work, "งานที่ 3")
future4 = executor.submit(do_work, "งานที่ 4")
# เราสามารถดึงผลลัพธ์ได้ทีหลัง
print(f">>> ผลลัพธ์ 1: {future1.result()}")
print(f">>> ผลลัพธ์ 2: {future2.result()}")
2. จัดการผลลัพธ์และ Error ด้วย as_completed
บางทีเราอยากได้ผลลัพธ์มาทีละอันที่มันทำงานเสร็จใช่ไหมครับ ไม่ต้องรออันแรกเสร็จหมดก่อนถึงจะได้ อันนี้ก็มีตัวช่วยอีกนะ มันคือ as_completed ครับ ตัวอย่างนี้จะเห็นเลยว่า เราสามารถจัดการกับ error ที่เกิดขึ้นในแต่ละงานได้ด้วยนะ แล้วก็จะได้ผลลัพธ์มาตามลำดับที่มันเสร็จ ไม่ต้องรอครบทุกอัน สุดยอด!
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def do_work_with_error(task_id, should_fail=False):
print(f"[{task_id}] เริ่มต้นทำงาน...", flush=True)
time.sleep(1 + task_id % 2) # บางงานช้า บางงานเร็ว
if should_fail and task_id == 3:
raise ValueError(f"งาน {task_id} มีปัญหาแล้วนะะะ!")
result = f"ทำ {task_id} สำเร็จแล้ว!"
print(f"[{task_id}] จบงาน!", flush=True)
return result
if __name__ == "__main__":
tasks = [1, 2, 3, 4, 5]
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(do_work_with_error, t, t == 3) for t in tasks]
print("\
กำลังรอผลลัพธ์จากงานที่ทำเสร็จก่อนนะ...")
for future in as_completed(futures):
try:
result = future.result()
print(f"--- ได้ผลลัพธ์มาแล้ว: {result}")
except Exception as e:
print(f"--- อ๊าววว งานมีปัญหา: {e}")
3. เคสใช้งานจริง (แนวคิดการดึงข้อมูลจากหลายๆ ที่)
ยกตัวอย่างง่ายๆ ที่ใช้บ่อยๆ เลยนะ เวลาเราไปดึงข้อมูลจากเว็บ (Web Scraping) หรือยิง API หลายๆ ที่เนี่ย ถ้าเรายิงทีละอัน มันจะช้ามากๆ เลยใช่ป่ะครับ แต่ถ้าเราใช้ ThreadPoolExecutor เนี่ย มันจะช่วยให้เราทำหลายๆ อย่างพร้อมกันได้เลย ประหยัดเวลาไปได้เยอะม๊ากกกก!
# อันนี้แค่แนวคิดนะ ไม่ได้รันจริงให้ดู เพราะไม่อยากยิงเว็บอื่นมั่วซั่ว
# import requests # ต้องติดตั้ง requests ก่อนนะ pip install requests
# def fetch_url(url):
# response = requests.get(url)
# return f"ข้อมูลจาก {url} ได้มาแล้ว ขนาด: {len(response.text)} ตัวอักษร"
# if __name__ == "__main__":
# urls_to_fetch = [
# "https://cii3.net/article/1", # สมมตินะ อันนี้ไม่ได้มีอยู่จริง
# "https://cii3.net/article/2",
# "https://cii3.net/article/3",
# "https://cii3.net/article/4",
# ]
# with ThreadPoolExecutor(max_workers=5) as executor:
# results = executor.map(fetch_url, urls_to_fetch)
# for res in results:
# print(res)
ฟังก์ชัน map ของ ThreadPoolExecutor ก็สะดวกมากๆ เลยนะ ถ้างานของเรามันรับ input แค่อันเดียว แล้วอยากให้มันคืนค่ากลับมาเป็นลิสต์เรียงตามที่เราส่งไป ง่ายสุดๆ ไปเลย
ข้อควรระวังเรื่องความปลอดภัยและ PDPA (นิดนึง)
อ้อ! อีกเรื่องที่ต้องระวังนะฮะ เวลาเราทำงานแบบขนานพวกนี้เนี่ย ถ้าเราต้องส่งข้อมูลสำคัญๆ เช่น ข้อมูลส่วนตัวลูกค้า หรืออะไรที่เกี่ยวกับ PDPA เข้าไปในงานเนี่ย ต้องมั่นใจนะว่าโค้ดของเรามันปลอดภัย ไม่ไปหลุด ไม่ไปปนกับข้อมูลอื่น หรือมีใครแอบดักเอาข้อมูลไปได้ง่ายๆ นะฮะะะ
การใช้ ThreadPoolExecutor เองมันไม่ได้ทำให้ข้อมูลปลอดภัยขึ้นโดยตรงนะ แต่เราต้องเขียนโค้ดข้างในให้ดีๆ เช่น ไม่เก็บข้อมูลสำคัญไว้ในตัวแปร global ที่ทุกคนเข้าถึงได้ หรือใช้ล็อกเพื่อป้องกันข้อมูลเสียหายจากการเข้าถึงพร้อมๆ กัน อะไรประมาณนี้ฮับบ
สรุปส่งท้าย
เป็นไงบ้างครับ ThreadPoolExecutor มันไม่ได้ยากอย่างที่คิดเลยใช่ไหมละะะ? ตัวนี้แหละจะช่วยให้เราเขียนโค้ด Python จัดการงานหนักๆ ได้อย่างมีประสิทธิภาพมากขึ้นเยอะเลย ประหยัดเวลา ประหยัดแรงแถมเครื่องก็ไม่ค้างบ่อยๆ ด้วยนะ
ลองเอาไปใช้กันดูนะครับ ถ้าติดตรงไหน หรือมีคำถามก็มาคุยกันได้เลยนะฮะ! ส่วนบทความนี้ก็หวังว่าจะเป็นประโยชน์กับเพื่อนๆ ทุกคนนะคับบ
แหล่งอ้างอิง: - เอกสาร concurrent.futures ใน Python: https://docs.python.org/3/library/concurrent.futures.html