本篇介紹如何在 Python 中使用 threading 模組,撰寫多執行緒的平行計算程式,利用多顆 CPU 核心加速運算。
現在電腦的 CPU 都有許多的核心,若想要讓程式可以運用多顆 CPU 核心,充分發揮硬體的運算能力,就必須考慮使用多執行緒(multithreading)或多行程(multiprocessing)等平行化的技術,以下介紹 Python 的多執行緒的程式設計方法與技巧,並提供詳細的範例程式碼。
由於 CPython 的 GIL(Global Interpreter Lock)限制,可能會造成大部分的 Python 程式無法以多執行緒發揮多核心 CPU 的效能,若遇到這樣的狀況,可以考慮改用多行程的方式來設計程式。
threading 模組
在 Python 中若要撰寫多執行緒(multithreading)的平行化程式,最基本的方式是使用 threading 這個模組來建立子執行緒。
threading 是 Python 標準函式庫裡面的模組,所以不用特別安裝即可使用,雖然功能不是很多,但是基本多執行緒程式設計常用的功能它都有,對於比較單純的平行化工作來說,還算滿實用了。
建立子執行緒
以下是使用 threading 模組建立子執行緒的範例:
import threading
import time
# 子執行緒的工作函數
def job():
for i in range(5):
print("Child thread:", i)
time.sleep(1)
# 建立一個子執行緒
t = threading.Thread(target = job)
# 執行該子執行緒
t.start()
# 主執行緒繼續執行自己的工作
for i in range(3):
print("Main thread:", i)
time.sleep(1)
# 等待 t 這個子執行緒結束
t.join()
print("Done.")
在這個例子中,我們先定義一個要讓子執行緒執行的 job 函數,接著使用 threading.Thread 建立一個新的子執行緒,其 target 參數就指定為要讓子執行緒執行的函數(也就是 job)。
建立好新的執行緒之後,即可呼叫執行緒的 start 函數,讓它開始執行,在子執行緒執行的同時,我們還是可以在主程式中繼續處理其他的工作。
如果有些工作是要等待子執行緒執行完成後才能處理的話,可以使用執行緒的 join 函數,等待該執行緒執行結束,也就是說放在 join 之後的程式碼就會等到子執行緒執行完成後,才會接著執行。
執行後的輸出會類似這樣:
Child thread: 0 Main thread: 0 Main thread: 1 Child thread: 1 Main thread: 2 Child thread: 2 Child thread: 3 Child thread: 4 Done.
這裡的子執行緒會執行 5 秒,但是主程式中的迴圈只要 3 秒就結束了,所以主程式會在 join 的地方等待 2 秒鐘,等到子執行緒結束之後,才會輸出 Done. 這一行訊息。
多個子執行緒與參數
通常我們在撰寫平行化的程式時,都會使用多個子執行緒,並且傳入不同的參數,讓個子執行緒各自負責不同的工作,這時候就可以在建立子執行緒時,使用 args 參數指定要傳數的參數。以下是一個簡單的範例:
import threading
import time
# 子執行緒的工作函數
def job(num):
print("Thread", num)
time.sleep(1)
# 建立 5 個子執行緒
threads = []
for i in range(5):
threads.append(threading.Thread(target = job, args = (i,)))
threads[i].start()
# 主執行緒繼續執行自己的工作
# ...
# 等待所有子執行緒結束
for i in range(5):
threads[i].join()
print("Done.")
在這個例子中,我們讓子執行緒執行的 job 函數會接受一個 num 參數,依據這個參數來決定要處理什麼工作,然後在呼叫 threading.Thread 建立子執行緒時,將要傳入的參數放在 args 參數中,這樣就可以把資料傳進子執行緒的 job 函數中了。執行之後的結果如下:
Thread 0 Thread 1 Thread 2 Thread 3 Thread 4 Done.
物件導向
我們也可以使用 Python 物件導向的方式來改寫 threading 的多執行緒程式,以下是一個簡單的範例:
import threading
import time
# 子執行緒類別
class MyThread(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num
def run(self):
print("Thread", self.num)
time.sleep(1)
# 建立 5 個子執行緒
threads = []
for i in range(5):
threads.append(MyThread(i))
threads[i].start()
# 主執行緒繼續執行自己的工作
# ...
# 等待所有子執行緒結束
for i in range(5):
threads[i].join()
print("Done.")
這個範例大致上的觀念都跟前面差不多,比較需要注意的地方就是 threading.Thread 在開始執行時,會呼叫它自己的 run 方法函數,這個方法函數預設會呼叫前面我們以 target 參數所指定的函數,在這裡我們在繼承 threading.Thread 類別之後,就直接把 run 覆寫成要執行的函數即可。
這個範例的執行結果跟上一個例子相同:
Thread 0 Thread 1 Thread 2 Thread 3 Thread 4 Done.
佇列(Queue)
如果我們有許多的工作要分給多個 CPU 核心做運算,最簡單的方式就是使用佇列的方式,讓多個 CPU 可從佇列中取得尚未處理的工作來處理:
import time
import threading
import queue
# Worker 類別,負責處理資料
class Worker(threading.Thread):
def __init__(self, queue, num):
threading.Thread.__init__(self)
self.queue = queue
self.num = num
def run(self):
while self.queue.qsize() > 0:
# 取得新的資料
msg = self.queue.get()
# 處理資料
print("Worker %d: %s" % (self.num, msg))
time.sleep(1)
# 建立佇列
my_queue = queue.Queue()
# 將資料放入佇列
for i in range(10):
my_queue.put("Data %d" % i)
# 建立兩個 Worker
my_worker1 = Worker(my_queue, 1)
my_worker2 = Worker(my_queue, 2)
# 讓 Worker 開始處理資料
my_worker1.start()
my_worker2.start()
# 等待所有 Worker 結束
my_worker1.join()
my_worker2.join()
print("Done.")
這裡我們建立兩個 Worker,它們都會從佇列中取得尚未處理的資料,直到佇列清空為止。執行後的結果會像這樣:
Worker 1: Data 0 Worker 2: Data 1 Worker 1: Data 2 Worker 2: Data 3 Worker 2: Data 4 Worker 1: Data 5 Worker 1: Data 6 Worker 2: Data 7 Worker 2: Data 8 Worker 1: Data 9 Done.
鎖定(Lock)
在平行化的多執行緒程式中,每個執行緒都是同時在執行的,若遇到不可以讓多個執行緒同時進行的工作時(例如將資料寫入同一個檔案),就必須使用鎖定(lock)的方式,一次只讓一個執行緒處理這種工作。
在 Python 中,我們可以使用 threading 模組的 Lock 來處理多執行緒的鎖定問題,以下是一個簡單的使用範例:
import time
import threading
import queue
class Worker(threading.Thread):
def __init__(self, queue, num, lock):
threading.Thread.__init__(self)
self.queue = queue
self.num = num
self.lock = lock
def run(self):
while self.queue.qsize() > 0:
msg = self.queue.get()
# 取得 lock
self.lock.acquire()
print("Lock acquired by Worker %d" % self.num)
# 不能讓多個執行緒同時進的工作
print("Worker %d: %s" % (self.num, msg))
time.sleep(1)
# 釋放 lock
print("Lock released by Worker %d" % self.num)
self.lock.release()
my_queue = queue.Queue()
for i in range(5):
my_queue.put("Data %d" % i)
# 建立 lock
lock = threading.Lock()
my_worker1 = Worker(my_queue, 1, lock)
my_worker2 = Worker(my_queue, 2, lock)
my_worker1.start()
my_worker2.start()
my_worker1.join()
my_worker2.join()
print("Done.")
在這個範例中,我們讓兩個 Worker 都從佇列中取得待處理的工作,但是我們使用一個 Lock 限制一次只允許一個 Worker 來處理工作。
當一個執行緒呼叫了 Lock 的 acquire 時,代表取得了這個 Lock 的使用權,接著它就可以往下執行裡面的工作,若此時又有另外一個執行緒想要呼叫 acquire 取得使用權的話,就必須等待上一個執行緒執行完,並呼叫 release 釋放這個 Lock 之後,才能夠取得這個 Lock 的使用權,接著執行裡面的工作。
在這種狀況下雖然兩個 Worker 是同時執行的,但是由於 Lock 的互斥作用,因此可以確保被 Lock 的 acquire 與 release 包起來的這段程式碼不會被兩個執行緒同時執行。
執行的結果如下:
Lock acquired by Worker 1 Worker 1: Data 0 Lock released by Worker 1 Lock acquired by Worker 2 Worker 2: Data 1 Lock released by Worker 2 Lock acquired by Worker 1 Worker 1: Data 2 Lock released by Worker 1 Lock acquired by Worker 2 Worker 2: Data 3 Lock released by Worker 2 Lock acquired by Worker 1 Worker 1: Data 4 Lock released by Worker 1 Done.
旗標(Semaphore)
有時候因為系統資源有限的因素(例如考量 CPU 或記憶體的限制),在處理某些特別耗資源的工作時,僅允許有限個執行緒同時進行,這個狀況跟上面介紹的鎖定(lock)有點類似,但是鎖定的方式是僅允許一個執行緒進行某項工作,而這裡我們是允許多個執行緒同時執行的,但要限制同時執行的執行緒數量上限。
旗標(semaphore)的作用跟鎖定(lock)類似,但是它多了一個計數器的功能,當一個執行緒呼叫了 acquire 時,旗標內部的計數器就會遞減 1,而當執行緒呼叫了 release 時,計數器就會遞增 1,當計數器遞減到 0 的時候,後面來的執行緒就要等待其他執行緒release 後才能繼續。
以下是一個簡單的範例:
import time
import threading
import queue
class Worker(threading.Thread):
def __init__(self, queue, num, semaphore):
threading.Thread.__init__(self)
self.queue = queue
self.num = num
self.semaphore = semaphore
def run(self):
while self.queue.qsize() > 0:
msg = self.queue.get()
# 取得旗標
semaphore.acquire()
print("Semaphore acquired by Worker %d" % self.num)
# 僅允許有限個執行緒同時進的工作
print("Worker %d: %s" % (self.num, msg))
time.sleep(1)
# 釋放旗標
print("Semaphore released by Worker %d" % self.num)
self.semaphore.release()
my_queue = queue.Queue()
for i in range(5):
my_queue.put("Data %d" % i)
# 建立旗標
semaphore = threading.Semaphore(2)
my_worker1 = Worker(my_queue, 1, semaphore)
my_worker2 = Worker(my_queue, 2, semaphore)
my_worker3 = Worker(my_queue, 3, semaphore)
my_worker1.start()
my_worker2.start()
my_worker3.start()
my_worker1.join()
my_worker2.join()
my_worker3.join()
print("Done.")
Semaphore acquired by Worker 1 Worker 1: Data 0 Semaphore acquired by Worker 2 Worker 2: Data 1 Semaphore released by Worker 1 Semaphore acquired by Worker 1 Worker 1: Data 3 Semaphore released by Worker 2 Semaphore acquired by Worker 2 Worker 2: Data 4 Semaphore released by Worker 1 Semaphore released by Worker 2 Semaphore acquired by Worker 3 Worker 3: Data 2 Semaphore released by Worker 3 Done.
重複鎖定(RLock)
RLock 是一個可重複取得使用權的鎖定功能,它跟普通的 Lock 類似,但是它可以允許同一個執行緒重複取得鎖定的使用權。
若以普通的 Lock 來說,如果同一個執行緒呼叫了兩次 acquire,則在呼叫第二次的時候,就會被擋住:
# 建立 Lock
lock = threading.Lock()
# 取得 Lock
lock.acquire()
# 重複取得 Lock 的時候,就被擋住!
lock.acquire()
如果想要讓同一個執行緒可以重複取得鎖定,可以改用有重複鎖定的 RLock:
# 建立 RLock
rlock = threading.RLock()
# 取得 rlock
rlock.acquire()
# 不能讓多個執行緒同時進的工作...
# 重複取得 rlock
rlock.acquire()
# 不能讓多個執行緒同時進的工作...
# 釋放 rlock
self.rlock.release()
# 不能讓多個執行緒同時進的工作...
# 再次釋放 rlock
self.rlock.release()
RLock 內部有一個計數器,當執行緒在每次呼叫 RLock 的 acquire 的時候,計數器就會遞增 1,紀錄這個鎖定被取得了幾多少次,如果呼叫了 release 時,該計數器就會遞減 1,當計數器遞減至 0 得時候,才會真正釋放鎖定,讓其他的執行緒使用,而在 RLock 的計數器還處於大於 0 的狀態時,其它的執行緒都無法取得這個鎖定的使用權。
