本篇介紹如何在 Python 中使用 threading
模組,撰寫多執行緒的平行計算程式,利用多顆 CPU 核心加速運算。
現在電腦的 CPU 都有許多的核心,若想要讓程式可以運用多顆 CPU 核心,充分發揮硬體的運算能力,就必須考慮使用多執行緒(multithreading)或多行程(multiprocessing)等平行化的技術,以下介紹 Python 的多執行緒的程式設計方法與技巧,並提供詳細的範例程式碼。
threading
模組
在 Python 中若要撰寫多執行緒(multithreading)的平行化程式,最基本的方式是使用 threading
這個模組來建立子執行緒。
threading
是 Python 標準函式庫裡面的模組,所以不用特別安裝即可使用,雖然功能不是很多,但是基本多執行緒程式設計常用的功能它都有,對於比較單純的平行化工作來說,還算滿實用了。
建立子執行緒
以下是使用 threading
模組建立子執行緒的範例:
import threading import time # 子執行緒的工作函數 def job(): for i in range(5): print("Child thread:", i) time.sleep(1) # 建立一個子執行緒 t = threading.Thread(target = job) # 執行該子執行緒 t.start() # 主執行緒繼續執行自己的工作 for i in range(3): print("Main thread:", i) time.sleep(1) # 等待 t 這個子執行緒結束 t.join() print("Done.")
在這個例子中,我們先定義一個要讓子執行緒執行的 job
函數,接著使用 threading.Thread
建立一個新的子執行緒,其 target
參數就指定為要讓子執行緒執行的函數(也就是 job
)。
建立好新的執行緒之後,即可呼叫執行緒的 start
函數,讓它開始執行,在子執行緒執行的同時,我們還是可以在主程式中繼續處理其他的工作。
如果有些工作是要等待子執行緒執行完成後才能處理的話,可以使用執行緒的 join
函數,等待該執行緒執行結束,也就是說放在 join
之後的程式碼就會等到子執行緒執行完成後,才會接著執行。
執行後的輸出會類似這樣:
Child thread: 0 Main thread: 0 Main thread: 1 Child thread: 1 Main thread: 2 Child thread: 2 Child thread: 3 Child thread: 4 Done.
這裡的子執行緒會執行 5 秒,但是主程式中的迴圈只要 3 秒就結束了,所以主程式會在 join
的地方等待 2 秒鐘,等到子執行緒結束之後,才會輸出 Done.
這一行訊息。
多個子執行緒與參數
通常我們在撰寫平行化的程式時,都會使用多個子執行緒,並且傳入不同的參數,讓個子執行緒各自負責不同的工作,這時候就可以在建立子執行緒時,使用 args
參數指定要傳數的參數。以下是一個簡單的範例:
import threading import time # 子執行緒的工作函數 def job(num): print("Thread", num) time.sleep(1) # 建立 5 個子執行緒 threads = [] for i in range(5): threads.append(threading.Thread(target = job, args = (i,))) threads[i].start() # 主執行緒繼續執行自己的工作 # ... # 等待所有子執行緒結束 for i in range(5): threads[i].join() print("Done.")
在這個例子中,我們讓子執行緒執行的 job
函數會接受一個 num
參數,依據這個參數來決定要處理什麼工作,然後在呼叫 threading.Thread
建立子執行緒時,將要傳入的參數放在 args
參數中,這樣就可以把資料傳進子執行緒的 job
函數中了。執行之後的結果如下:
Thread 0 Thread 1 Thread 2 Thread 3 Thread 4 Done.
物件導向
我們也可以使用 Python 物件導向的方式來改寫 threading
的多執行緒程式,以下是一個簡單的範例:
import threading import time # 子執行緒類別 class MyThread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self): print("Thread", self.num) time.sleep(1) # 建立 5 個子執行緒 threads = [] for i in range(5): threads.append(MyThread(i)) threads[i].start() # 主執行緒繼續執行自己的工作 # ... # 等待所有子執行緒結束 for i in range(5): threads[i].join() print("Done.")
這個範例大致上的觀念都跟前面差不多,比較需要注意的地方就是 threading.Thread
在開始執行時,會呼叫它自己的 run
方法函數,這個方法函數預設會呼叫前面我們以 target
參數所指定的函數,在這裡我們在繼承 threading.Thread
類別之後,就直接把 run
覆寫成要執行的函數即可。
這個範例的執行結果跟上一個例子相同:
Thread 0 Thread 1 Thread 2 Thread 3 Thread 4 Done.
佇列(Queue)
如果我們有許多的工作要分給多個 CPU 核心做運算,最簡單的方式就是使用佇列的方式,讓多個 CPU 可從佇列中取得尚未處理的工作來處理:
import time import threading import queue # Worker 類別,負責處理資料 class Worker(threading.Thread): def __init__(self, queue, num): threading.Thread.__init__(self) self.queue = queue self.num = num def run(self): while self.queue.qsize() > 0: # 取得新的資料 msg = self.queue.get() # 處理資料 print("Worker %d: %s" % (self.num, msg)) time.sleep(1) # 建立佇列 my_queue = queue.Queue() # 將資料放入佇列 for i in range(10): my_queue.put("Data %d" % i) # 建立兩個 Worker my_worker1 = Worker(my_queue, 1) my_worker2 = Worker(my_queue, 2) # 讓 Worker 開始處理資料 my_worker1.start() my_worker2.start() # 等待所有 Worker 結束 my_worker1.join() my_worker2.join() print("Done.")
這裡我們建立兩個 Worker
,它們都會從佇列中取得尚未處理的資料,直到佇列清空為止。執行後的結果會像這樣:
Worker 1: Data 0 Worker 2: Data 1 Worker 1: Data 2 Worker 2: Data 3 Worker 2: Data 4 Worker 1: Data 5 Worker 1: Data 6 Worker 2: Data 7 Worker 2: Data 8 Worker 1: Data 9 Done.
鎖定(Lock)
在平行化的多執行緒程式中,每個執行緒都是同時在執行的,若遇到不可以讓多個執行緒同時進行的工作時(例如將資料寫入同一個檔案),就必須使用鎖定(lock)的方式,一次只讓一個執行緒處理這種工作。
在 Python 中,我們可以使用 threading
模組的 Lock
來處理多執行緒的鎖定問題,以下是一個簡單的使用範例:
import time import threading import queue class Worker(threading.Thread): def __init__(self, queue, num, lock): threading.Thread.__init__(self) self.queue = queue self.num = num self.lock = lock def run(self): while self.queue.qsize() > 0: msg = self.queue.get() # 取得 lock self.lock.acquire() print("Lock acquired by Worker %d" % self.num) # 不能讓多個執行緒同時進的工作 print("Worker %d: %s" % (self.num, msg)) time.sleep(1) # 釋放 lock print("Lock released by Worker %d" % self.num) self.lock.release() my_queue = queue.Queue() for i in range(5): my_queue.put("Data %d" % i) # 建立 lock lock = threading.Lock() my_worker1 = Worker(my_queue, 1, lock) my_worker2 = Worker(my_queue, 2, lock) my_worker1.start() my_worker2.start() my_worker1.join() my_worker2.join() print("Done.")
在這個範例中,我們讓兩個 Worker
都從佇列中取得待處理的工作,但是我們使用一個 Lock
限制一次只允許一個 Worker
來處理工作。
當一個執行緒呼叫了 Lock
的 acquire
時,代表取得了這個 Lock
的使用權,接著它就可以往下執行裡面的工作,若此時又有另外一個執行緒想要呼叫 acquire
取得使用權的話,就必須等待上一個執行緒執行完,並呼叫 release
釋放這個 Lock
之後,才能夠取得這個 Lock
的使用權,接著執行裡面的工作。
在這種狀況下雖然兩個 Worker
是同時執行的,但是由於 Lock
的互斥作用,因此可以確保被 Lock
的 acquire
與 release
包起來的這段程式碼不會被兩個執行緒同時執行。
執行的結果如下:
Lock acquired by Worker 1 Worker 1: Data 0 Lock released by Worker 1 Lock acquired by Worker 2 Worker 2: Data 1 Lock released by Worker 2 Lock acquired by Worker 1 Worker 1: Data 2 Lock released by Worker 1 Lock acquired by Worker 2 Worker 2: Data 3 Lock released by Worker 2 Lock acquired by Worker 1 Worker 1: Data 4 Lock released by Worker 1 Done.
旗標(Semaphore)
有時候因為系統資源有限的因素(例如考量 CPU 或記憶體的限制),在處理某些特別耗資源的工作時,僅允許有限個執行緒同時進行,這個狀況跟上面介紹的鎖定(lock)有點類似,但是鎖定的方式是僅允許一個執行緒進行某項工作,而這裡我們是允許多個執行緒同時執行的,但要限制同時執行的執行緒數量上限。
旗標(semaphore)的作用跟鎖定(lock)類似,但是它多了一個計數器的功能,當一個執行緒呼叫了 acquire
時,旗標內部的計數器就會遞減 1
,而當執行緒呼叫了 release
時,計數器就會遞增 1
,當計數器遞減到 0
的時候,後面來的執行緒就要等待其他執行緒release
後才能繼續。
以下是一個簡單的範例:
import time import threading import queue class Worker(threading.Thread): def __init__(self, queue, num, semaphore): threading.Thread.__init__(self) self.queue = queue self.num = num self.semaphore = semaphore def run(self): while self.queue.qsize() > 0: msg = self.queue.get() # 取得旗標 semaphore.acquire() print("Semaphore acquired by Worker %d" % self.num) # 僅允許有限個執行緒同時進的工作 print("Worker %d: %s" % (self.num, msg)) time.sleep(1) # 釋放旗標 print("Semaphore released by Worker %d" % self.num) self.semaphore.release() my_queue = queue.Queue() for i in range(5): my_queue.put("Data %d" % i) # 建立旗標 semaphore = threading.Semaphore(2) my_worker1 = Worker(my_queue, 1, semaphore) my_worker2 = Worker(my_queue, 2, semaphore) my_worker3 = Worker(my_queue, 3, semaphore) my_worker1.start() my_worker2.start() my_worker3.start() my_worker1.join() my_worker2.join() my_worker3.join() print("Done.")
Semaphore acquired by Worker 1 Worker 1: Data 0 Semaphore acquired by Worker 2 Worker 2: Data 1 Semaphore released by Worker 1 Semaphore acquired by Worker 1 Worker 1: Data 3 Semaphore released by Worker 2 Semaphore acquired by Worker 2 Worker 2: Data 4 Semaphore released by Worker 1 Semaphore released by Worker 2 Semaphore acquired by Worker 3 Worker 3: Data 2 Semaphore released by Worker 3 Done.
重複鎖定(RLock)
RLock
是一個可重複取得使用權的鎖定功能,它跟普通的 Lock
類似,但是它可以允許同一個執行緒重複取得鎖定的使用權。
若以普通的 Lock
來說,如果同一個執行緒呼叫了兩次 acquire
,則在呼叫第二次的時候,就會被擋住:
# 建立 Lock lock = threading.Lock() # 取得 Lock lock.acquire() # 重複取得 Lock 的時候,就被擋住! lock.acquire()
如果想要讓同一個執行緒可以重複取得鎖定,可以改用有重複鎖定的 RLock
:
# 建立 RLock rlock = threading.RLock() # 取得 rlock rlock.acquire() # 不能讓多個執行緒同時進的工作... # 重複取得 rlock rlock.acquire() # 不能讓多個執行緒同時進的工作... # 釋放 rlock self.rlock.release() # 不能讓多個執行緒同時進的工作... # 再次釋放 rlock self.rlock.release()
RLock
內部有一個計數器,當執行緒在每次呼叫 RLock
的 acquire
的時候,計數器就會遞增 1
,紀錄這個鎖定被取得了幾多少次,如果呼叫了 release
時,該計數器就會遞減 1
,當計數器遞減至 0
得時候,才會真正釋放鎖定,讓其他的執行緒使用,而在 RLock
的計數器還處於大於 0
的狀態時,其它的執行緒都無法取得這個鎖定的使用權。
參考資料:Python 官方文件、tutorialspoint、chriskiehl.com、Python 官方文件、莫煩Python、Zhou’s Blog
路人鄉民
感謝分享,很實用的基礎教學!!
燒餅
真的很謝謝GT Wang大大每次都發這麼親切好讀的技術分享文,剛上完學校的作業系統課程,覺得大大的文章寫得非常清楚,很有幫助!
COSCOS
感謝提供簡單易懂的教學
另外介紹Lock的部分有問題想請教
# 取得 lock
lock.acquire()
這邊是不是應該用self.lock.acuire()才對?
不然看起來像是去呼叫全域變數了
G. T. Wang
沒錯,已修正,感謝提醒。
YuLin
不是很懂為什麼
threads.append(threading.Thread(target = job, args = (i,)))
args那邊為什麼i的後面需要多一個逗號啊~~~
heelo
這邊要傳入是 tuple ,如果沒有逗號的話, python 無法判斷(i)是一個 tuple ,而 i 是這個 tuple 第 0 個 index 的東西
F.S.
謝謝分享教學,很有幫助~~