การใช้งาน Multiprocessing Package ในภาษา Python

Python 10 พ.ย. 2021

Multiprocessing คือตัวช่วยให้ process ที่ใช้เวลานานสามารถทำงานได้เร็วขึ้น หรือจะบอกว่าเป็นการ process หลายๆ งานพร้อมกันในแบบขนานกัน การ process ในแบบ single core จะมีขอบเขตจำกัด ซึ่งสามารถแก้ไขแทนได้ด้วยการทำงานแบบ multiple cores ถ้างานที่ต้องใช้เวลามากควรจะใช้ไปใช้งานในการ run แบบ parallel และอยู่ภายใต้ระบบที่ทำงานแบบ multiple processors/cores ซึ่ง Python ได้ provides interface ที่ใช้งานง่ายในการใช้งาน multiprocessing

ตัวอย่างนี้จะเป็นการ process ด้วย Multiprocessing Pool with starmap แบบเยอะๆ

Multiprocessing vs. Multithreading

การประมวลผลหลายตัวใช้ประโยชน์จากคอร์ของ CPU ทั้งหมด (multiple processes), ในขณะที่ Multithreading จะจับคู่กับ maps multiple threads กับทุกกระบวนการ ใน

กระบวนการ Multithreading แต่ละ processes จะสัมพันธ์กับหน่วยความจำของตัวเอง ซึ่งไม่นำไปสู่ความเสียหายของข้อมูลหรือการหยุดชะงักของงาน ส่วน Threads จะใช้  shared memory, มีการ Lock ทำงานที่ละ Thread

สำหรับงานที่เกี่ยวข้องกับ CPU การประมวลผลแบบ Multiprocessing จะดีกว่า ในขณะที่สำหรับงานที่เกี่ยวข้องกับ I/O (งานที่ผูกกับ IO กับงานที่ผูกกับ CPU) การทำงานแบบ Multithreading นั้นดีกว่า

ในภาษา python, Global Interpreter Lock (GIL) คือ lock ที่อนุญาตให้ควบคุม Python interpreter ด้วย thread เดียวเท่านั้น ในกรณี multithreading ซึ่งใช้เป็นหลักสำหรับงานที่เกี่ยวข้องกับ I/O, GIL จะไม่มีผลกระทบมากนักเนื่องจากมีการแชร์การล็อกระหว่าง thread ในขณะที่กำลังรอ I/O

ในทางกลับกัน การประมวลผลแบบ Multiprocessing จะจัดสรร Python Interpreter และ GIL ให้กับทุกกระบวนการ

การใช้ Process

Process class ใน multiprocessing จะจัดสรรงานทั้งหมดในหน่วยความจำในครั้งเดียว ทุก task ที่สร้างขึ้นและใช้ Processclass ต้องมีการจัดสรรหน่วยความจำแยกต่างหาก

ลองนึกภาพสถานการณ์ที่จะสร้าง 10 parallel processes โดยที่ทุกกระบวนการจะต้องเป็นกระบวนการของระบบที่แยกจากกัน

Process class จะเริ่ม process ตัวเลขที่มีจำนวน 0 ถึง 10 ตัวแปล target จะเป็นตัวกำหนด function ที่จะเรียกใช้ และ args จะเป็นตัวกำหนด argument(s) ที่จะส่งต่อไปยัง function ที่เรียกใช้.  start() method เป็น method ที่บอกให้เริ่มกระบวนการ ใน processes ทั้งหมด จะทำงานใน loop จนกว่า ทุก process จะทำงานเสร็จทั้งหมด ในการตรวจสอบจะใช้ join() method join() จะช่วยให้แน่ใจว่า ตัวคำสั่งต่อไปหลังจาก join() จะทำงาน หลังจาก multiprocessing ทำงานเสร็จเท่านั้น

sleep() method จะช่วยให้เข้าใจกระบวนการที่เกิดขึ้นพร้อมกันทำงานอย่างไร


วิธีการ Implement Pipe

ถ้า 2 process ต้องการที่จะสื่อสารกัน Pipe คือตัวเลือกที่ดีทีในส่วนนี้, Pipe จะมี 2 end-points โดยจะมี method send() และ recv()

ข้อมูลใน pipe จะเกิดความเสียหายได้ ถ้า มีการใช้บันทึกข้อมูลลงใน end-point เดียวกัน

จาก code ด้านบนนี้ cube_sender และ cube_receiver เป็น 2 process ที่สามารถสื่อสารกันได้ด้วย pipe ซึ่งจะมี x_conn, y_conn ที่ถูกสร้างมาจาก Pipe Class

ใน p1 จะมีการรับค่า 19 และ x_conn เข้ามาและส่งต่อไป ที่ cube_sender ภายใน cube_sender x_conn จะเรียกใช้คำสั่ง send เพื่อส่งต่อผลที่ได้จากการคำนวนเข้าไปใน pipe โดย y_conn จะรับข้อมูลนี้ออกมาเป็น output ใน p2


วิธีการ Implement Queue

เพื่อจัดเก็บเอาท์พุตของ multiple processes ใน shared communication channel สามารถใช้ queue ได้ ตัวอย่างเช่น สมมติว่าโจทย์คือการหา cubes ของตัวเลขสิบตัวแรกแล้วตามด้วยการเพิ่ม 1 ในแต่ละตัวเลข

มีการกำหนดสองฟังก์ชัน sum() และ cube() แล้วกำหนด Queue (q) และเรียกใช้ฟังก์ชัน cube() ตามด้วยฟังก์ชัน add()

จาก code จะอธิบายการสื่อสารของ object ในกรณีของเรา q จะอยู่ทั้ง 2 process ตัว method empty() คือตัวตรวจสอบว่า queue ว่างหรือไม่ และ get() จะ return ค่าที่เก็บอยู่ใน queue

โดยลำดับของผลลัพธ์จะไม่ได้กำหนดไว้


วิธีการ Implement Shared Memory

ในการรับคิวจาก Queue, ตัว Shared Memory stores จัดเก็บข้อมูลที่ใช้ร่วมกันระหว่างกระบวนการต่างๆ สามารถเป็นได้สองประเภท: Value หรือ Array.

Value

single value สามารถใช้ร่วมกันระหว่างหลายกระบวนการได้ดังนี้:

หมายเลข 19 ถูกส่งผ่านเป็นอาร์กิวเมนต์ไปยังฟังก์ชัน cube() ที่ value attribute จะดึงค่าที่แท้จริงของ Value,num. ตัวเลขจะมีการถูกแก้ไขเมื่อมีการเรียกใช้ cube() function ในครั้งหลัง สุดท้ายแล้วผลลัพธ์จะแสดงออกมาที่ print statement

Array

list of values สามารถใช้ร่วมกันระหว่าง multiple processes ได้ดังนี้:

Array() จะเริ่มต้นที่อาร์เรย์ว่างที่มีประเภทข้อมูล int ที่มีความยาว 3 อาร์เรย์ถูกวนซ้ำโดยการเพิ่ม 1 ให้กับทุกองค์ประกอบในนั้น

คุณสามารถใช้ arrในกระบวนการอื่นได้ เช่นเดียวกับ Value โดยพื้นฐานแล้วนี่คือแนวคิดของหน่วยความจำที่ใช้ร่วมกัน

หมายเหตุ: 'd' หมายถึงการลอยตัวแบบ double-precision และ 'i' (ใน Array("i", 3)) หมายถึงจำนวน signed integer.


วิธีการ Implement Server Process

server process เป็นกระบวนการหลักที่เริ่มทำงานเมื่อเริ่มโปรแกรม Python, process อื่นสามารถใช้ objects นี้ได้ ตัว manager object ของ class Manager() คือตัวท่ีทำหน้าที่ควบคุม server process ตัว Manager() รองรับ data types หลายแบบเช่น list, dict, Lock, RLock, Semaphore, BoundedSemaphore, Namespace, Condition, Event, Queue, Value, and Array.

เพื่อให้เข้าใจแนวคิดดีขึ้น ให้ดูตัวอย่างนี้:

dictionary และ list ได้รับการ initialized และจัดการโดยใช้ manager object


Shared Memory vs. Server Process:

  1. Manager() รองรับประเภทข้อมูลที่หลากหลายเมื่อเปรียบเทียบกับ shared memory
  2. Processes สามารถแบ่งปัน single manager บนคอมพิวเตอร์หลายเครื่องผ่านเครือข่าย
  3. server process ช้ากว่า shared memory

การใช้ Pool

Pool class ใน multiprocessing สามารถจัดการ process จำนวนมากได้ ช่วยให้คุณสามารถเรียกใช้งานได้หลายงานต่อ process (เนื่องจากสามารถจัดคิวงานได้) หน่วยความจำถูกจัดสรรให้กับกระบวนการดำเนินการเท่านั้น ไม่เหมือนกับคลาส Process ซึ่งจัดสรรหน่วยความจำให้กับกระบวนการทั้งหมด Pool class จะใช้ worker processes ตามที่เกิดขึ้นใน pool และ spawns เข้าไปใน process

การเริ่ม processes จำนวนมากโดยใช้คลาส Process นั้นแทบจะเป็นไปไม่ได้ เนื่องจากอาจทำให้ระบบปฏิบัติการเสียหายได้ แต่ Pool จะจัดการการกระจายงานออกไปและรวบรวมผลลัพธ์จากกระบวนการที่เกิดทั้งหมด โดยมี worker process จำนวนน้อยที่สุด (โดยเฉพาะอย่างยิ่ง จำนวนของ worker process จะเท่ากับ core ของ CPU)

Poolไม่ทำงานใน interactive interpreter และ Python classes มันต้องการการทำงานภายใต้ __main__ module

Pool class จะมี 6 method ที่มีประโยชน์:

apply

apply() method จะทำงานในแต่ละ block ใน primary process จนกว่าจะเสร็จ apply() รองรับหลายอาร์กิวเมนต์ รักษาลำดับของผลลัพธ์ และไม่เกิดการทำงานขึ้นพร้อมกัน

ถ้าจะคำนวณ cube ของตัวเลขสิบตัวแรก จะต้องวนซ้ำตัวเลขและส่งทีละตัวไปยังเมธอด apply() จำนวนกระบวนการนี้จะถูกตั้งค่าเป็น 4; อย่างไรก็ตาม cube() execute ในหนึ่งใน pool worker เท่านั้น

close() method จะยุติ pool และ join() รอให้กระบวนการของ worker หยุดลง


map

map() method รองรับการทำงานพร้อมกัน ไม่ยอมรับหลายอาร์กิวเมนต์และบล็อกโปรแกรมหลักจนกว่ากระบวนการทั้งหมดจะเสร็จสมบูรณ์ นอกจากนี้ยังรักษาลำดับของผลลัพธ์ (แม้ว่าลำดับการคำนวณอาจแตกต่างกันไป!)

ไม่เหมือนกับ apply(), map() ยอมรับ iterator ที่จะส่งผ่านเป็นอาร์กิวเมนต์ไปยังฟังก์ชัน cube()


apply_async

callback function ใน apply_async() สามารถใช้เพื่อคืนค่าทันทีหลังจากดำเนินการเสร็จสิ้น วิธีนี้จะรักษาลำดับของผลลัพธ์และรองรับการทำงานพร้อมกัน

หมายเหตุ: คุณสามารถใช้ wait() เพื่อบล็อกการเรียกใช้แบบ asynchronous


map_async

ไม่เหมือนกับ map(), map_async() ไม่มีการปิดกั้น (และรักษาลำดับของผลลัพธ์)

“HERE” และ “HERE AGAIN” จะถูกเขียนลงในคอนโซลเมื่อ map_async() ทำงาน โดยแสดงให้เห็นลักษณะการไม่ปิดกั้น อย่างไรก็ตาม คุณสามารถใช้ wait() เพื่อบล็อกการเรียกใช้งานแบบ asynchronous ได้


starmap

ไม่เหมือนกับ map(), starmap() ยอมรับหลายอาร์กิวเมนต์ มันรักษาลำดับของผลลัพธ์ พร้อมกัน และบล็อกกระบวนการหลัก


starmap_async

แตกต่างจาก starmap(), starmap_async() ไม่มีการปิดกั้น (และรักษาลำดับของผลลัพธ์)


imap

ไม่เหมือนกับ map(), imap() ไม่รอผลทั้งหมดและส่งคืนตัววนซ้ำ (ไม่ใช่รายการ)


imap_unordered

ไม่เหมือนกับ imap() ลำดับของผลลัพธ์จะไม่คงอยู่เสมอไป


Reference: https://towardsdatascience.com/how-to-use-the-multiprocessing-package-in-python3-a1c808415ec2

แท็ก

Onyx

Just a middle-aged programmer, Can do many things but not the most.