前言
有鑑於之前介紹了許多關於多線程跟多進程的東西
Golang-Advance- GO routine
Multi-processing 和 Multi-threading 的 Differenece:
如果喜歡,請幫我按讚訂閱分享並且開啟小鈴鐺(目前還沒有XD)
快速 Recap
一下 Concurrency
and Parallelism
順序執行:老師甲先幫學生A輔導,輔導完之後再取給B輔導,最後再去給C輔導。
Concurrency:老師甲先給學生A去講思路,A聽懂了自己書寫過程,這期間甲老師去給B講思路,講完思路,B自己書寫過程,這期間再去給C講思路。這樣老師就沒有空著,一直在做事情。與順序執行不同的是,順序執行,老師講完思路之後學生再寫步驟,這期間老師是空閒的。
Parallelism:直接讓三個老師甲、乙、丙,「同時」給三個學生輔導作業。
有興趣的人可以先回去複習一下XD
Python 中實現 Concurrency 與 Parallelism
根據上圖 我們發現 要實現 Concurrency
一個好的做法就是利用 Threading
或是 Async IO
的方式
剛好concurrent.futures
這個好用的 Python package
可以滿足我們的需求
在我們正式開始之前,先了解關於 Future
模式的相關知識
首先 Future
是什麼?
Future
其實是生產-消費者模型的一種擴充套件,在生產-消費者模型中,生產者不關心消費者什麼時候處理完資料,也不關心消費者處理的結果。比如我們經常寫出如下的程式碼
import multiprocessing, Queue
import os
import time
from multiprocessing import Process
from time import sleep
from random import randint
class Producer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
while True:
self.queue.put(`one product`)
print(multiprocessing.current_process().name + str(os.getpid()) + ` produced one product, the no of queue now is: %d` %self.queue.qsize())
sleep(randint(1, 3))
class Consumer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
while True:
d = self.queue.get(1)
if d != None:
print(multiprocessing.current_process().name + str(os.getpid()) + ` consumed %s, the no of queue now is: %d` %(d,self.queue.qsize()))
sleep(randint(1, 4))
continue
else:
break
#create queue
queue = multiprocessing.Queue(40)
if __name__ == "__main__":
print(`Excited!")
#create processes
processed = []
for i in range(3):
processed.append(Producer(queue))
processed.append(Consumer(queue))
#start processes
for i in range(len(processed)):
processed[i].start()
#join processes
for i in range(len(processed)):
processed[i].join()複製程式碼
這就是生產-消費者模型的一個簡單的實現,我們利用一個 multiprocessing
中的 Queue
來作為通訊渠道,我們的生產者負責往佇列中傳入資料,消費者負責從佇列中獲取資料並處理。不過就如同上面所說的一樣,在這種模式中,生產者並不關心消費者何時處理完資料,也不關心處理的結果。
concurrent.futures
在 Python 3.2
以後, concurrent.futures
是內建的模組,我們可以直接使用
Note: 如果你需要在 Python 2.7 中使用 concurrent.futures , 那麼請用 pip 進行安裝,pip install futures
Python
關於平行處理的模組除了 multiprocessing
與 threading
之外,其實還提供 1 個更為簡單易用的 concurrent.futures
可以使用。
concurrent.futures
提供了一組高階 API
給使用者操作非同步執行的任務。透過 ThreadPoolExectuor
執行 thread
層級的非同步任務,或是使用 ProcessPoolExecutor
執行 process
層級的非同步任務。兩者的 API
介面都相同,同樣繼承於 Executor
。
(今天會針對 Thread
做介紹)
ThreadPoolExecutor
ThreadPoolExecutor
如其名,透過 Thread
的方式建立多個 Executors
,用以執行消化多個任務(tasks)
。
例如以下範例,建立 1 個 ThreadPoolExecutor
以最多不超過 5 個 Threads
的方式平行執行 vision_one_has
,每個 vision_one_has
所需要的參數都是透過呼叫 submit
的方式交給 Executer
處理:
from concurrent.futures import ThreadPoolExecutor
def vision_one_has(team):
print(team)
teams = ['Elpis', 'Genger', 'Matrix', 'Sorlax', 'Lapras']
with ThreadPoolExecutor(max_workers=5) as executor:
for n in teams:
executor.submit(vision_one_has, n)
上述範例執行結果如下:
Elpis
Genger
Matrix
Sorlax
Lapras
Future objects
接著談談 concurrent.futures
模組中相當重要的角色 —— Future
。
事實上,當呼叫 submit
後,會回傳的並不是在 Thread 執行的程式結果,而是 Future
的實例,而這個實例是一個執行結果的代理 (Proxy)
,所以我們可以透過 done
, running
, cancelled
等方法詢問 Future
實例在 Thread
中執行的程式狀態如何,如果程式已經進入 done 的狀態,則可以呼叫 result
取得結果。Link
不過 Python 也提供更簡單的方法 —— as_completed ,幫忙檢查狀態,所以可以少寫一些程式碼。
因此前述範例可以進一步改成以下形式:
from concurrent.futures import ThreadPoolExecutor, as_completed
def vision_one_has(team):
return f'Hi, {team}'
teams = ['Elpis', 'Genger', 'Matrix', 'Sorlax', 'Lapras']
with ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for n in names:
future = executor.submit(vision_one_has, n)
print(type(future))
futures.append(future)
for future in as_completed(futures):
print(future.result())
上述範例在第 11 行取得 future 實例之後,在第 13 行將其放進 futures list
中,接著在第 15 行透過 as_completed(futures)
一個一個取得已經完成執行的 future 實例,並透過 result()
取得其結果後並列印出來。
其執行結果如下:
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
Hi, Genger
Hi, Matrix
Hi, Elpis
Hi, Lapras
Hi, Sorlax
也由於我們將列印的功能從 Thread
內搬出,所以也解決列印文字可能黏在一起的情況。
除了以 submit()
先取得 Future
實例再逐一檢查狀態並取得結果之外,也可以直接利用 map()
方法直接取得 Thread
的執行結果,例如以下範例:
from concurrent.futures import ThreadPoolExecutor, as_completed
def vision_one_has(team):
for i in range(100000):
pass
return f'Hi, {team}'
teams = ['Elpis', 'Genger', 'Matrix', 'Sorlax', 'Lapras']
with ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(vision_one_has, teams)
for r in results:
print(r)
ProcessPoolExecutor
ProcessPoolExecutor
的使用方法與 ThreadPoolExecutor
一模一樣,基本上視需求選擇使用 ThreadPoolExecutor
或 ProcessPoolExecutor
即可。
不過值得注意的是 Python 3.5
之後 map()
方法多了 1 個 chunksize
參數可以使用,而該參數只對 ProcessPoolExecutor
有效,該參數可以提升 ProcessPoolExecutor
在處理大量 iterables
的執行效能。
ThreadPoolExecutor vs ProcessPoolExecutor 該怎麼選!?
Thread 會在同一個 Process
裡面做事,遇到被 I/O waiting hang-up
的時候 (例如 waiting for socket response
),會由另外一個 Thread
繼續做事。當任務有很多 I/O
的動作時,就適合使用 ThreadPoolExecutor
。
Process
則是會開新的 Process
來處理,因此對於高 CPU
計算的工作帶來效益,不同於 socket
的狀況,這些計算並不會把自身 hang-up
,而會持續不斷的計算。例如說費氏數列的計算就是這樣。對於這種狀況,使用 Process
就如同開影分身,能夠讓整體的計算更快完成。這時候就適合使用 ProcessPoolExecutor
。
Reference
https://docs.python.org/3/library/concurrent.futures.html
https://myapollo.com.tw/zh-tw/python-concurrent-futures/
https://iter01.com/52341.html
https://blog.louie.lu/2017/08/01/%E4%BD%A0%E6%89%80%E4%B8%8D%E7%9F%A5%E9%81%93%E7%9A%84-python-%E6%A8%99%E6%BA%96%E5%87%BD%E5%BC%8F%E5%BA%AB%E7%94%A8%E6%B3%95-06-concurrent-futures/
https://www.oulub.com/zh-TW/Python/library.concurrent.futures
https://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python