[Python] 好用的 concurrent.futures is a good way to speed up your function


Posted by jerryeml on 2021-11-10

前言

有鑑於之前介紹了許多關於多線程跟多進程的東西
Golang-Advance- GO routine
Multi-processing 和 Multi-threading 的 Differenece:


如果喜歡,請幫我按讚訂閱分享並且開啟小鈴鐺(目前還沒有XD)

快速 Recap 一下 Concurrency and Parallelism

順序執行:老師甲先幫學生A輔導,輔導完之後再取給B輔導,最後再去給C輔導。
Concurrency:老師甲先給學生A去講思路,A聽懂了自己書寫過程,這期間甲老師去給B講思路,講完思路,B自己書寫過程,這期間再去給C講思路。這樣老師就沒有空著,一直在做事情。與順序執行不同的是,順序執行,老師講完思路之後學生再寫步驟,這期間老師是空閒的。
Parallelism:直接讓三個老師甲、乙、丙,「同時」給三個學生輔導作業。

有興趣的人可以先回去複習一下XD

Python 中實現 Concurrency 與 Parallelism

根據上圖 我們發現 要實現 Concurrency 一個好的做法就是利用 Threading 或是 Async IO 的方式

剛好concurrent.futures 這個好用的 Python package 可以滿足我們的需求

在我們正式開始之前,先了解關於 Future 模式的相關知識

首先 Future 是什麼?

Future 其實是生產-消費者模型的一種擴充套件,在生產-消費者模型中,生產者不關心消費者什麼時候處理完資料,也不關心消費者處理的結果。比如我們經常寫出如下的程式碼

import multiprocessing, Queue
import os
import time
from multiprocessing import Process
from time import sleep
from random import randint

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            self.queue.put(`one product`)
            print(multiprocessing.current_process().name + str(os.getpid()) + ` produced one product, the no of queue now is: %d` %self.queue.qsize())
            sleep(randint(1, 3))


class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            d = self.queue.get(1)
            if d != None:
                print(multiprocessing.current_process().name + str(os.getpid()) + ` consumed  %s, the no of queue now is: %d` %(d,self.queue.qsize()))
                sleep(randint(1, 4))
                continue
            else:
                break

#create queue
queue = multiprocessing.Queue(40)

if __name__ == "__main__":
    print(`Excited!")
    #create processes    
    processed = []
    for i in range(3):
        processed.append(Producer(queue))
        processed.append(Consumer(queue))

    #start processes        
    for i in range(len(processed)):
        processed[i].start()

    #join processes    
    for i in range(len(processed)):
        processed[i].join()複製程式碼

這就是生產-消費者模型的一個簡單的實現,我們利用一個 multiprocessing 中的 Queue 來作為通訊渠道,我們的生產者負責往佇列中傳入資料,消費者負責從佇列中獲取資料並處理。不過就如同上面所說的一樣,在這種模式中,生產者並不關心消費者何時處理完資料,也不關心處理的結果。

concurrent.futures

Python 3.2 以後, concurrent.futures 是內建的模組,我們可以直接使用

Note: 如果你需要在 Python 2.7 中使用 concurrent.futures , 那麼請用 pip 進行安裝,pip install futures

Python 關於平行處理的模組除了 multiprocessingthreading 之外,其實還提供 1 個更為簡單易用的 concurrent.futures 可以使用。

concurrent.futures 提供了一組高階 API 給使用者操作非同步執行的任務。透過 ThreadPoolExectuor 執行 thread 層級的非同步任務,或是使用 ProcessPoolExecutor 執行 process 層級的非同步任務。兩者的 API 介面都相同,同樣繼承於 Executor
(今天會針對 Thread 做介紹)

ThreadPoolExecutor

ThreadPoolExecutor 如其名,透過 Thread 的方式建立多個 Executors ,用以執行消化多個任務(tasks)

例如以下範例,建立 1 個 ThreadPoolExecutor 以最多不超過 5 個 Threads 的方式平行執行 vision_one_has ,每個 vision_one_has 所需要的參數都是透過呼叫 submit 的方式交給 Executer 處理:

from concurrent.futures import ThreadPoolExecutor

def vision_one_has(team):
    print(team)

teams = ['Elpis', 'Genger', 'Matrix', 'Sorlax', 'Lapras']

with ThreadPoolExecutor(max_workers=5) as executor:
    for n in teams:
        executor.submit(vision_one_has, n)

上述範例執行結果如下:

Elpis
Genger
Matrix
Sorlax
Lapras

Future objects

接著談談 concurrent.futures 模組中相當重要的角色 —— Future

事實上,當呼叫 submit 後,會回傳的並不是在 Thread 執行的程式結果,而是 Future 的實例,而這個實例是一個執行結果的代理 (Proxy) ,所以我們可以透過 done , running , cancelled 等方法詢問 Future實例在 Thread 中執行的程式狀態如何,如果程式已經進入 done 的狀態,則可以呼叫 result 取得結果。Link

不過 Python 也提供更簡單的方法 —— as_completed ,幫忙檢查狀態,所以可以少寫一些程式碼。

因此前述範例可以進一步改成以下形式:

from concurrent.futures import ThreadPoolExecutor, as_completed

def vision_one_has(team):
    return f'Hi, {team}'

teams = ['Elpis', 'Genger', 'Matrix', 'Sorlax', 'Lapras']

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    for n in names:
        future = executor.submit(vision_one_has, n)
        print(type(future))
        futures.append(future)

    for future in as_completed(futures):
        print(future.result())

上述範例在第 11 行取得 future 實例之後,在第 13 行將其放進 futures list 中,接著在第 15 行透過 as_completed(futures) 一個一個取得已經完成執行的 future 實例,並透過 result() 取得其結果後並列印出來。

其執行結果如下:

<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
Hi, Genger
Hi, Matrix
Hi, Elpis
Hi, Lapras
Hi, Sorlax

也由於我們將列印的功能從 Thread 內搬出,所以也解決列印文字可能黏在一起的情況。

除了以 submit() 先取得 Future 實例再逐一檢查狀態並取得結果之外,也可以直接利用 map() 方法直接取得 Thread 的執行結果,例如以下範例:

from concurrent.futures import ThreadPoolExecutor, as_completed


def vision_one_has(team):
    for i in range(100000):
        pass
    return f'Hi, {team}'


teams = ['Elpis', 'Genger', 'Matrix', 'Sorlax', 'Lapras']

with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(vision_one_has, teams)

for r in results:
    print(r)

ProcessPoolExecutor

ProcessPoolExecutor 的使用方法與 ThreadPoolExecutor 一模一樣,基本上視需求選擇使用 ThreadPoolExecutorProcessPoolExecutor 即可。

不過值得注意的是 Python 3.5 之後 map() 方法多了 1 個 chunksize 參數可以使用,而該參數只對 ProcessPoolExecutor 有效,該參數可以提升 ProcessPoolExecutor 在處理大量 iterables 的執行效能。

ThreadPoolExecutor vs ProcessPoolExecutor 該怎麼選!?

Thread 會在同一個 Process 裡面做事,遇到被 I/O waiting hang-up 的時候 (例如 waiting for socket response),會由另外一個 Thread 繼續做事。當任務有很多 I/O 的動作時,就適合使用 ThreadPoolExecutor

Process 則是會開新的 Process 來處理,因此對於高 CPU 計算的工作帶來效益,不同於 socket 的狀況,這些計算並不會把自身 hang-up,而會持續不斷的計算。例如說費氏數列的計算就是這樣。對於這種狀況,使用 Process 就如同開影分身,能夠讓整體的計算更快完成。這時候就適合使用 ProcessPoolExecutor

Reference

https://docs.python.org/3/library/concurrent.futures.html
https://myapollo.com.tw/zh-tw/python-concurrent-futures/
https://iter01.com/52341.html
https://blog.louie.lu/2017/08/01/%E4%BD%A0%E6%89%80%E4%B8%8D%E7%9F%A5%E9%81%93%E7%9A%84-python-%E6%A8%99%E6%BA%96%E5%87%BD%E5%BC%8F%E5%BA%AB%E7%94%A8%E6%B3%95-06-concurrent-futures/
https://www.oulub.com/zh-TW/Python/library.concurrent.futures
https://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python


#Python







Related Posts

<form> Attributes

<form> Attributes

什麼是 Prototype 、繼承與原型鏈 ?

什麼是 Prototype 、繼承與原型鏈 ?

[FE301] React 基礎(Class component 版)先別急著學 React

[FE301] React 基礎(Class component 版)先別急著學 React


Comments