การใช้งาน Multiprocessing Package ในภาษา Python
Multiprocessing คือตัวช่วยให้ process ที่ใช้เวลานานสามารถทำงานได้เร็วขึ้น หรือจะบอกว่าเป็นการ process หลายๆ งานพร้อมกันในแบบขนานกัน การ process ในแบบ single core จะมีขอบเขตจำกัด ซึ่งสามารถแก้ไขแทนได้ด้วยการทำงานแบบ multiple cores ถ้างานที่ต้องใช้เวลามากควรจะใช้ไปใช้งานในการ run แบบ parallel และอยู่ภายใต้ระบบที่ทำงานแบบ multiple processors/cores ซึ่ง Python ได้ provides interface ที่ใช้งานง่ายในการใช้งาน multiprocessing
ตัวอย่างนี้จะเป็นการ process ด้วย Multiprocessing Pool with starmap แบบเยอะๆ
Multiprocessing vs. Multithreading
การประมวลผลหลายตัวใช้ประโยชน์จากคอร์ของ CPU ทั้งหมด (multiple processes), ในขณะที่ Multithreading จะจับคู่กับ maps multiple threads กับทุกกระบวนการ ใน
กระบวนการ Multithreading แต่ละ processes จะสัมพันธ์กับหน่วยความจำของตัวเอง ซึ่งไม่นำไปสู่ความเสียหายของข้อมูลหรือการหยุดชะงักของงาน ส่วน Threads จะใช้ shared memory, มีการ Lock ทำงานที่ละ Thread
สำหรับงานที่เกี่ยวข้องกับ CPU การประมวลผลแบบ Multiprocessing จะดีกว่า ในขณะที่สำหรับงานที่เกี่ยวข้องกับ I/O (งานที่ผูกกับ IO กับงานที่ผูกกับ CPU) การทำงานแบบ Multithreading นั้นดีกว่า
ในภาษา python, Global Interpreter Lock (GIL) คือ lock ที่อนุญาตให้ควบคุม Python interpreter ด้วย thread เดียวเท่านั้น ในกรณี multithreading ซึ่งใช้เป็นหลักสำหรับงานที่เกี่ยวข้องกับ I/O, GIL จะไม่มีผลกระทบมากนักเนื่องจากมีการแชร์การล็อกระหว่าง thread ในขณะที่กำลังรอ I/O
ในทางกลับกัน การประมวลผลแบบ Multiprocessing จะจัดสรร Python Interpreter และ GIL ให้กับทุกกระบวนการ
การใช้ Process
Process
class ใน multiprocessing จะจัดสรรงานทั้งหมดในหน่วยความจำในครั้งเดียว ทุก task ที่สร้างขึ้นและใช้ Process
class ต้องมีการจัดสรรหน่วยความจำแยกต่างหาก
ลองนึกภาพสถานการณ์ที่จะสร้าง 10 parallel processes โดยที่ทุกกระบวนการจะต้องเป็นกระบวนการของระบบที่แยกจากกัน
Process
class จะเริ่ม process ตัวเลขที่มีจำนวน 0 ถึง 10 ตัวแปล target จะเป็นตัวกำหนด function ที่จะเรียกใช้ และ args จะเป็นตัวกำหนด argument(s) ที่จะส่งต่อไปยัง function ที่เรียกใช้. start()
method เป็น method ที่บอกให้เริ่มกระบวนการ ใน processes ทั้งหมด จะทำงานใน loop จนกว่า ทุก process จะทำงานเสร็จทั้งหมด ในการตรวจสอบจะใช้ join()
method join()
จะช่วยให้แน่ใจว่า ตัวคำสั่งต่อไปหลังจาก join()
จะทำงาน หลังจาก multiprocessing ทำงานเสร็จเท่านั้น
sleep()
method จะช่วยให้เข้าใจกระบวนการที่เกิดขึ้นพร้อมกันทำงานอย่างไร
วิธีการ Implement Pipe
ถ้า 2 process ต้องการที่จะสื่อสารกัน Pipe คือตัวเลือกที่ดีทีในส่วนนี้, Pipe จะมี 2 end-points โดยจะมี method send()
และ recv()
ข้อมูลใน pipe จะเกิดความเสียหายได้ ถ้า มีการใช้บันทึกข้อมูลลงใน end-point เดียวกัน
จาก code ด้านบนนี้ cube_sender
และ cube_receiver
เป็น 2 process ที่สามารถสื่อสารกันได้ด้วย pipe ซึ่งจะมี x_conn, y_conn ที่ถูกสร้างมาจาก Pipe Class
ใน p1 จะมีการรับค่า 19 และ x_conn เข้ามาและส่งต่อไป ที่ cube_sender ภายใน cube_sender x_conn จะเรียกใช้คำสั่ง send เพื่อส่งต่อผลที่ได้จากการคำนวนเข้าไปใน pipe โดย y_conn จะรับข้อมูลนี้ออกมาเป็น output ใน p2
วิธีการ Implement Queue
เพื่อจัดเก็บเอาท์พุตของ multiple processes ใน shared communication channel สามารถใช้ queue ได้ ตัวอย่างเช่น สมมติว่าโจทย์คือการหา cubes ของตัวเลขสิบตัวแรกแล้วตามด้วยการเพิ่ม 1 ในแต่ละตัวเลข
มีการกำหนดสองฟังก์ชัน sum()
และ cube()
แล้วกำหนด Queue (q) และเรียกใช้ฟังก์ชัน cube() ตามด้วยฟังก์ชัน add()
จาก code จะอธิบายการสื่อสารของ object ในกรณีของเรา q จะอยู่ทั้ง 2 process ตัว method empty() คือตัวตรวจสอบว่า queue ว่างหรือไม่ และ get() จะ return ค่าที่เก็บอยู่ใน queue
โดยลำดับของผลลัพธ์จะไม่ได้กำหนดไว้
วิธีการ Implement Shared Memory
ในการรับคิวจาก Queue, ตัว Shared Memory stores จัดเก็บข้อมูลที่ใช้ร่วมกันระหว่างกระบวนการต่างๆ สามารถเป็นได้สองประเภท: Value หรือ Array.
Value
single value สามารถใช้ร่วมกันระหว่างหลายกระบวนการได้ดังนี้:
หมายเลข 19 ถูกส่งผ่านเป็นอาร์กิวเมนต์ไปยังฟังก์ชัน cube() ที่ value
attribute จะดึงค่าที่แท้จริงของ Value
,num
. ตัวเลขจะมีการถูกแก้ไขเมื่อมีการเรียกใช้ cube()
function ในครั้งหลัง สุดท้ายแล้วผลลัพธ์จะแสดงออกมาที่ print statement
Array
list of values สามารถใช้ร่วมกันระหว่าง multiple processes ได้ดังนี้:
Array()
จะเริ่มต้นที่อาร์เรย์ว่างที่มีประเภทข้อมูล int
ที่มีความยาว 3 อาร์เรย์ถูกวนซ้ำโดยการเพิ่ม 1 ให้กับทุกองค์ประกอบในนั้น
คุณสามารถใช้ arr
ในกระบวนการอื่นได้ เช่นเดียวกับ Value โดยพื้นฐานแล้วนี่คือแนวคิดของหน่วยความจำที่ใช้ร่วมกัน
หมายเหตุ: 'd' หมายถึงการลอยตัวแบบ double-precision และ 'i' (ใน Array("i", 3)) หมายถึงจำนวน signed integer.
วิธีการ Implement Server Process
server process เป็นกระบวนการหลักที่เริ่มทำงานเมื่อเริ่มโปรแกรม Python, process อื่นสามารถใช้ objects นี้ได้ ตัว manager object ของ class Manager()
คือตัวท่ีทำหน้าที่ควบคุม server process ตัว Manager() รองรับ data types หลายแบบเช่น list, dict, Lock, RLock, Semaphore, BoundedSemaphore, Namespace, Condition, Event, Queue, Value, and Array.
เพื่อให้เข้าใจแนวคิดดีขึ้น ให้ดูตัวอย่างนี้:
dictionary และ list ได้รับการ initialized และจัดการโดยใช้ manager object
Shared Memory vs. Server Process:
- Manager() รองรับประเภทข้อมูลที่หลากหลายเมื่อเปรียบเทียบกับ shared memory
- Processes สามารถแบ่งปัน single manager บนคอมพิวเตอร์หลายเครื่องผ่านเครือข่าย
- server process ช้ากว่า shared memory
การใช้ Pool
Pool
class ใน multiprocessing สามารถจัดการ process จำนวนมากได้ ช่วยให้คุณสามารถเรียกใช้งานได้หลายงานต่อ process (เนื่องจากสามารถจัดคิวงานได้) หน่วยความจำถูกจัดสรรให้กับกระบวนการดำเนินการเท่านั้น ไม่เหมือนกับคลาส Process ซึ่งจัดสรรหน่วยความจำให้กับกระบวนการทั้งหมด Pool
class จะใช้ worker processes ตามที่เกิดขึ้นใน pool และ spawns เข้าไปใน process
การเริ่ม processes จำนวนมากโดยใช้คลาส Process นั้นแทบจะเป็นไปไม่ได้ เนื่องจากอาจทำให้ระบบปฏิบัติการเสียหายได้ แต่ Pool จะจัดการการกระจายงานออกไปและรวบรวมผลลัพธ์จากกระบวนการที่เกิดทั้งหมด โดยมี worker process จำนวนน้อยที่สุด (โดยเฉพาะอย่างยิ่ง จำนวนของ worker process จะเท่ากับ core ของ CPU)
Pool
ไม่ทำงานใน interactive interpreter และ Python classes มันต้องการการทำงานภายใต้ __main__ module
Pool
class จะมี 6 method ที่มีประโยชน์:
apply
apply()
method จะทำงานในแต่ละ block ใน primary process จนกว่าจะเสร็จ apply()
รองรับหลายอาร์กิวเมนต์ รักษาลำดับของผลลัพธ์ และไม่เกิดการทำงานขึ้นพร้อมกัน
ถ้าจะคำนวณ cube ของตัวเลขสิบตัวแรก จะต้องวนซ้ำตัวเลขและส่งทีละตัวไปยังเมธอด apply() จำนวนกระบวนการนี้จะถูกตั้งค่าเป็น 4; อย่างไรก็ตาม cube() execute ในหนึ่งใน pool worker เท่านั้น
close() method จะยุติ pool และ join() รอให้กระบวนการของ worker หยุดลง
map
map()
method รองรับการทำงานพร้อมกัน ไม่ยอมรับหลายอาร์กิวเมนต์และบล็อกโปรแกรมหลักจนกว่ากระบวนการทั้งหมดจะเสร็จสมบูรณ์ นอกจากนี้ยังรักษาลำดับของผลลัพธ์ (แม้ว่าลำดับการคำนวณอาจแตกต่างกันไป!)
ไม่เหมือนกับ apply(), map() ยอมรับ iterator ที่จะส่งผ่านเป็นอาร์กิวเมนต์ไปยังฟังก์ชัน cube()
apply_async
callback function ใน apply_async()
สามารถใช้เพื่อคืนค่าทันทีหลังจากดำเนินการเสร็จสิ้น วิธีนี้จะรักษาลำดับของผลลัพธ์และรองรับการทำงานพร้อมกัน
หมายเหตุ: คุณสามารถใช้ wait() เพื่อบล็อกการเรียกใช้แบบ asynchronous
map_async
ไม่เหมือนกับ map(), map_async() ไม่มีการปิดกั้น (และรักษาลำดับของผลลัพธ์)
“HERE” และ “HERE AGAIN” จะถูกเขียนลงในคอนโซลเมื่อ map_async() ทำงาน โดยแสดงให้เห็นลักษณะการไม่ปิดกั้น อย่างไรก็ตาม คุณสามารถใช้ wait() เพื่อบล็อกการเรียกใช้งานแบบ asynchronous ได้
starmap
ไม่เหมือนกับ map(), starmap() ยอมรับหลายอาร์กิวเมนต์ มันรักษาลำดับของผลลัพธ์ พร้อมกัน และบล็อกกระบวนการหลัก
starmap_async
แตกต่างจาก starmap(), starmap_async() ไม่มีการปิดกั้น (และรักษาลำดับของผลลัพธ์)
imap
ไม่เหมือนกับ map(), imap() ไม่รอผลทั้งหมดและส่งคืนตัววนซ้ำ (ไม่ใช่รายการ)
imap_unordered
ไม่เหมือนกับ imap() ลำดับของผลลัพธ์จะไม่คงอยู่เสมอไป
Reference: https://towardsdatascience.com/how-to-use-the-multiprocessing-package-in-python3-a1c808415ec2