介紹如何使用 Python 的 multiprocessing 模組,開發多核心平行運算程式。
Python 的 multiprocessing 是一個多行程模組,功能類似多執行緒的 threading 模組,可以讓開發者使用多核心的 CPU 進行平行化程式設計,加速程式處理速度。
Pool 資源池
Pool 是由多個工作者行程(worker process)所組成的資源池,可以將工作分配給其內部的各個工作者行程進行平行處理。
map 函數
Pool 最簡單的使用方式就是將資料以列表的形式打包好,透過其 map 函數將實際的工作處理函數與列表資料傳入其中。
假設我們想要以 Pool 的 map 函數平行計算多個數值的平方,就可以這樣寫:
from multiprocessing import Pool
# 計算數值平方的函數
def f(x):
return x*x
# 建立含有 4 個工作者行程的 Pool
with Pool(processes=4) as p:
# 以 map 平行計算數值的平方
print(p.map(f, [1, 2, 3]))
Pool 的 map 函數可以視為 Python 內建 map 函數的平行化版本,呼叫 Pool 的 map 函數可以將資料分割之後,再以 Pool 中的工作者行程平行處理,等到所有資料都處理完成之後,再一次傳回所有的結果。
Pool 的 map 函數有一個 chunksize 參數可以調整資料分割的大小,當資料列表的長度很長的時候,可以將 chunksize 設為較大的值,讓 map 每次切多一點資料給各個工作者行程,這樣可以減少資料傳遞的次數,提升程式執行效能。以下是比較不同 chunksize 對程式執行速度的範例程式:
from multiprocessing import Pool
import time
def f(x):
return x*x
with Pool(processes=4) as p:
start = time.time() # 開始測量執行時間
# 設定 chunksize 為 1
result1 = p.map(f, range(1000000), chunksize=1)
end = time.time() # 結束測量執行時間
print("chunksize 為 1,執行時間為 %f 秒" % (end - start))
start = time.time() # 開始測量執行時間
# 設定 chunksize 為 1000
result2 = p.map(f, range(1000000), chunksize=1000)
end = time.time() # 結束測量執行時間
print("chunksize 為 1000,執行時間為 %f 秒" % (end - start))
if result1 == result2:
print("結果相同")
chunksize 為 1,執行時間為 24.855215 秒 chunksize 為 1000,執行時間為 0.257618 秒 結果相同
調整 chunksize 並不會影響程式結構或計算結果,但是卻對執行時間影響很大。
imap 函數
imap 函數是 map 函數的可迭代(iterable)版本,其參數的用法皆與 map 函數相同:
from multiprocessing import Pool
def f(x):
return x*x
with Pool(processes=4) as p:
# 以 imap 平行計算數值的平方
for i in p.imap(f, [1, 2, 3]):
print(i)
1 4 9
imap 的 chunksize 設定方式與概念皆與 map 相同:
from multiprocessing import Pool
def f(x):
return x*x
with Pool(processes=4) as p:
# 設定 chunksize 為 1000
for i in p.imap(f, range(1000000), chunksize=1000):
print(i)
imap_unordered 函數
imap_unordered 函數跟 imap 函數非常類似,只不過 imap_unordered 所傳回的計算結果不會完全依照輸入資料的順序傳回,而是將先處理完成的資料先傳回,可增進整體程式的執行效率:
from multiprocessing import Pool
def f(x):
return x*x
with Pool(processes=4) as p:
# 以 imap_unordered 平行計算數值的平方
for i in p.imap_unordered(f, [1, 2, 3]):
print(i)
1 9 4
imap_unordered 的 chunksize 設定方式也都相同。
以下是一個用來比較 imap_unordered 與 imap 差異的範例,imap_unordered 會將先處理完成的資料先傳回,而 imap 則是為了要保持原有的資料順序,所以會有潛在的等待時間,這就會造成程式執行效率上的差異:
from multiprocessing import Pool
import time
def f(x):
time.sleep(x)
with Pool(processes=4) as p:
start = time.time() # 開始測量執行時間
# 以 imap 平行處理
for i in p.imap(f, [3, 2, 1]):
time.sleep(1)
end = time.time() # 結束測量執行時間
print("imap 執行時間為 %f 秒" % (end - start))
start = time.time() # 開始測量執行時間
# 以 imap_unordered 平行處理
for i in p.imap_unordered(f, [3, 2, 1]):
time.sleep(1)
end = time.time() # 結束測量執行時間
print("imap_unordered 執行時間為 %f 秒" % (end - start))
imap 執行時間為 6.007628 秒 imap_unordered 執行時間為 4.004911 秒
所以如果我們不在意計算結果的傳回順序,就可以採用效率較好的 imap_unordered 函數。
starmap 函數
starmap 函數可以用來處理多輸入參數的狀況,以下是每次傳遞兩個輸入的數值,平行計算總合的範例。
from multiprocessing import Pool
# 計算總和
def f(x, y):
return x+y
with Pool(processes=4) as p:
# 以 starmap 傳遞多個輸入參數資料
print(p.starmap(f, [(1, 2), (3, 4)]))
[3, 7]
apply_async 函數
apply_async 函數可以將指定的函數放進 Pool 資源池中執行,以下是平行執行多個函數的範例:
from multiprocessing import Pool
import time
def f1(x):
time.sleep(1)
return x*x
def f2(x):
time.sleep(1)
return x*x*x
with Pool(processes=4) as p:
start = time.time() # 開始測量執行時間
# 平行呼叫多個函數
r1 = p.apply_async(f1, (2,))
r2 = p.apply_async(f2, (2,))
# 取得計算結果
print(r1.get(timeout=3))
print(r2.get(timeout=3))
end = time.time() # 結束測量執行時間
print("執行時間:%f 秒" % (end - start))
4 8 執行時間:1.002772 秒
map_async 函數
map_async 函數是 map 函數的非同步版本,可以將工作放進 Pool 資源池中執行,等執行完成後再一次取回:
from multiprocessing import Pool
import time
def f1(x):
time.sleep(1)
return x*x
def f2(x):
time.sleep(1)
return x*x*x
with Pool(processes=4) as p:
start = time.time() # 開始測量執行時間
# 平行呼叫多個函數
r1 = p.map_async(f1, [1, 2, 3])
r2 = p.map_async(f2, [1, 2, 3])
# 取得計算結果
print(r1.get(timeout=3))
print(r2.get(timeout=3))
end = time.time() # 結束測量執行時間
print("執行時間:%f 秒" % (end - start))
[1, 4, 9] [1, 8, 27] 執行時間:2.003540 秒
starmap_async 函數
starmap_async 函數是 starmap 函數的非同步版本,可以將工作放進 Pool 資源池中執行,等執行完成後再一次取回:
from multiprocessing import Pool
import time
def f1(x, y):
time.sleep(1)
return x+y
def f2(x, y):
time.sleep(1)
return x*y
with Pool(processes=4) as p:
start = time.time() # 開始測量執行時間
# 平行呼叫多個函數
r1 = p.starmap_async(f1, [(1, 2), (3, 4)])
r2 = p.starmap_async(f2, [(1, 2), (3, 4)])
# 取得計算結果
print(r1.get(timeout=3))
print(r2.get(timeout=3))
end = time.time() # 結束測量執行時間
print("執行時間:%f 秒" % (end - start))
[3, 7] [2, 12] 執行時間:1.002267 秒
Process 行程
multiprocessing 模組中的 Process 可以用來建立獨立的行程,平行處理不同的工作。
from multiprocessing import Process
# 實際要執行的工作
def f(name):
print('Hello', name)
# 建立獨立行程
p = Process(target=f, args=('Bob',))
# 開始執行
p.start()
# 原行程可處理其他工作...
# 等待獨立行程結束
p.join()
Hello Bob
共享記憶體
如果需要在行程之間共享資料,可以使用 Value 或 Array 來儲存資料,讓多個行程可以同時存取。以下是透過 Value 與 Array 取回計算結果的範例。
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 1.24
for i in range(len(a)):
a[i] = a[i] * 2
# 建立共享記憶體的 Value 與 Array
num = Value('d', 0.0)
arr = Array('i', range(10))
# 以獨立行程執行
p = Process(target=f, args=(num, arr))
p.start()
p.join()
# 取得計算結果
print(num.value)
print(arr[:])
1.24 [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
如果有多個行程都會同時更改到共享記憶體中的資料,就必須搭配 Lock 鎖定記憶體中的資料之後再進行修改,才不會因為競爭條件(race condition)造成資料錯誤:
from multiprocessing import Process, Value
def f(c):
for i in range(1000):
# 取得 Lock 之後更改內容
with counter.get_lock():
c.value += 1
# 建立共享記憶體的 Value
counter = Value('i', )
# 以獨立行程執行
p1 = Process(target=f, args=(counter,))
p2 = Process(target=f, args=(counter,))
p1.start()
p2.start()
p1.join()
p2.join()
# 取得計算結果
print(counter.value)
2000
Queue
Queue 是另外一種在行程之間遞送資料的方法,以下是一個簡單的使用範例。
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
# 建立共享佇列
q = Queue()
p = Process(target=f, args=(q,))
p.start()
# 從佇列取得資料
print(q.get())
p.join()
[42, None, 'hello']
Pipe
Pipe 可以讓兩個行程之間進行雙向的資料傳輸,以下是一個簡單的範例。
from multiprocessing import Process, Pipe
def f(conn):
# 透過 Pipe 傳送資料
conn.send([42, None, 'hello'])
# 關閉 Pipe 連線
conn.close()
# 建立可雙向傳輸的 Pipe
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
# 從 Pipe 取得資料
print(parent_conn.recv())
p.join()
[42, None, 'hello']
在預設的情況下,建立 Pipe 的時候所產生兩個 Connection 物件都具有發送與接收資料的能力(send 與 recv 函數)。
