本篇介紹如何在 Python 中使用 threading 模組,撰寫多執行緒的平行計算程式,利用多顆 CPU 核心加速運算。

現在電腦的 CPU 都有許多的核心,若想要讓程式可以運用多顆 CPU 核心,充分發揮硬體的運算能力,就必須考慮使用多執行緒(multithreading)或多行程(multiprocessing)等平行化的技術,以下介紹 Python 的多執行緒的程式設計方法與技巧,並提供詳細的範例程式碼。

由於 CPython 的 GIL(Global Interpreter Lock)限制,可能會造成大部分的 Python 程式無法以多執行緒發揮多核心 CPU 的效能,若遇到這樣的狀況,可以考慮改用多行程的方式來設計程式。

threading 模組

在 Python 中若要撰寫多執行緒(multithreading)的平行化程式,最基本的方式是使用 threading 這個模組來建立子執行緒。

threading 是 Python 標準函式庫裡面的模組,所以不用特別安裝即可使用,雖然功能不是很多,但是基本多執行緒程式設計常用的功能它都有,對於比較單純的平行化工作來說,還算滿實用了。

建立子執行緒

以下是使用 threading 模組建立子執行緒的範例:

import threading
import time

# 子執行緒的工作函數
def job():
  for i in range(5):
    print("Child thread:", i)
    time.sleep(1)

# 建立一個子執行緒
t = threading.Thread(target = job)

# 執行該子執行緒
t.start()

# 主執行緒繼續執行自己的工作
for i in range(3):
  print("Main thread:", i)
  time.sleep(1)

# 等待 t 這個子執行緒結束
t.join()

print("Done.")

在這個例子中,我們先定義一個要讓子執行緒執行的 job 函數,接著使用 threading.Thread 建立一個新的子執行緒,其 target 參數就指定為要讓子執行緒執行的函數(也就是 job)。

建立好新的執行緒之後,即可呼叫執行緒的 start 函數,讓它開始執行,在子執行緒執行的同時,我們還是可以在主程式中繼續處理其他的工作。

如果有些工作是要等待子執行緒執行完成後才能處理的話,可以使用執行緒的 join 函數,等待該執行緒執行結束,也就是說放在 join 之後的程式碼就會等到子執行緒執行完成後,才會接著執行。

執行後的輸出會類似這樣:

Child thread: 0
Main thread: 0
Main thread: 1
Child thread: 1
Main thread: 2
Child thread: 2
Child thread: 3
Child thread: 4
Done.

這裡的子執行緒會執行 5 秒,但是主程式中的迴圈只要 3 秒就結束了,所以主程式會在 join 的地方等待 2 秒鐘,等到子執行緒結束之後,才會輸出 Done. 這一行訊息。

多個子執行緒與參數

通常我們在撰寫平行化的程式時,都會使用多個子執行緒,並且傳入不同的參數,讓個子執行緒各自負責不同的工作,這時候就可以在建立子執行緒時,使用 args 參數指定要傳數的參數。以下是一個簡單的範例:

import threading
import time

# 子執行緒的工作函數
def job(num):
  print("Thread", num)
  time.sleep(1)

# 建立 5 個子執行緒
threads = []
for i in range(5):
  threads.append(threading.Thread(target = job, args = (i,)))
  threads[i].start()

# 主執行緒繼續執行自己的工作
# ...

# 等待所有子執行緒結束
for i in range(5):
  threads[i].join()

print("Done.")

在這個例子中,我們讓子執行緒執行的 job 函數會接受一個 num 參數,依據這個參數來決定要處理什麼工作,然後在呼叫 threading.Thread 建立子執行緒時,將要傳入的參數放在 args 參數中,這樣就可以把資料傳進子執行緒的 job 函數中了。執行之後的結果如下:

Thread 0
Thread 1
Thread 2
Thread 3
Thread 4
Done.

物件導向

我們也可以使用 Python 物件導向的方式來改寫 threading 的多執行緒程式,以下是一個簡單的範例:

import threading
import time

# 子執行緒類別
class MyThread(threading.Thread):
  def __init__(self, num):
    threading.Thread.__init__(self)
    self.num = num

  def run(self):
    print("Thread", self.num)
    time.sleep(1)

# 建立 5 個子執行緒
threads = []
for i in range(5):
  threads.append(MyThread(i))
  threads[i].start()

# 主執行緒繼續執行自己的工作
# ...

# 等待所有子執行緒結束
for i in range(5):
  threads[i].join()

print("Done.")

這個範例大致上的觀念都跟前面差不多,比較需要注意的地方就是 threading.Thread 在開始執行時,會呼叫它自己的 run 方法函數,這個方法函數預設會呼叫前面我們以 target 參數所指定的函數,在這裡我們在繼承 threading.Thread 類別之後,就直接把 run 覆寫成要執行的函數即可。

這個範例的執行結果跟上一個例子相同:

Thread 0
Thread 1
Thread 2
Thread 3
Thread 4
Done.

佇列(Queue)

如果我們有許多的工作要分給多個 CPU 核心做運算,最簡單的方式就是使用佇列的方式,讓多個 CPU 可從佇列中取得尚未處理的工作來處理:

import time
import threading
import queue

# Worker 類別,負責處理資料
class Worker(threading.Thread):
  def __init__(self, queue, num):
    threading.Thread.__init__(self)
    self.queue = queue
    self.num = num

  def run(self):
    while self.queue.qsize() > 0:
      # 取得新的資料
      msg = self.queue.get()

      # 處理資料
      print("Worker %d: %s" % (self.num, msg))
      time.sleep(1)

# 建立佇列
my_queue = queue.Queue()

# 將資料放入佇列
for i in range(10):
  my_queue.put("Data %d" % i)

# 建立兩個 Worker
my_worker1 = Worker(my_queue, 1)
my_worker2 = Worker(my_queue, 2)

# 讓 Worker 開始處理資料
my_worker1.start()
my_worker2.start()

# 等待所有 Worker 結束
my_worker1.join()
my_worker2.join()

print("Done.")

這裡我們建立兩個 Worker,它們都會從佇列中取得尚未處理的資料,直到佇列清空為止。執行後的結果會像這樣:

Worker 1: Data 0
Worker 2: Data 1
Worker 1: Data 2
Worker 2: Data 3
Worker 2: Data 4
Worker 1: Data 5
Worker 1: Data 6
Worker 2: Data 7
Worker 2: Data 8
Worker 1: Data 9
Done.

鎖定(Lock)

在平行化的多執行緒程式中,每個執行緒都是同時在執行的,若遇到不可以讓多個執行緒同時進行的工作時(例如將資料寫入同一個檔案),就必須使用鎖定(lock)的方式,一次只讓一個執行緒處理這種工作。

在 Python 中,我們可以使用 threading 模組的 Lock 來處理多執行緒的鎖定問題,以下是一個簡單的使用範例:

import time
import threading
import queue

class Worker(threading.Thread):
  def __init__(self, queue, num, lock):
    threading.Thread.__init__(self)
    self.queue = queue
    self.num = num
    self.lock = lock

  def run(self):
    while self.queue.qsize() > 0:
      msg = self.queue.get()

      # 取得 lock
      lock.acquire()
      print("Lock acquired by Worker %d" % self.num)

      # 不能讓多個執行緒同時進的工作
      print("Worker %d: %s" % (self.num, msg))
      time.sleep(1)

      # 釋放 lock
      print("Lock released by Worker %d" % self.num)
      self.lock.release()

my_queue = queue.Queue()
for i in range(5):
  my_queue.put("Data %d" % i)

# 建立 lock
lock = threading.Lock()

my_worker1 = Worker(my_queue, 1, lock)
my_worker2 = Worker(my_queue, 2, lock)

my_worker1.start()
my_worker2.start()

my_worker1.join()
my_worker2.join()

print("Done.")

在這個範例中,我們讓兩個 Worker 都從佇列中取得待處理的工作,但是我們使用一個 Lock 限制一次只允許一個 Worker 來處理工作。

當一個執行緒呼叫了 Lockacquire 時,代表取得了這個 Lock 的使用權,接著它就可以往下執行裡面的工作,若此時又有另外一個執行緒想要呼叫 acquire 取得使用權的話,就必須等待上一個執行緒執行完,並呼叫 release 釋放這個 Lock 之後,才能夠取得這個 Lock 的使用權,接著執行裡面的工作。

在這種狀況下雖然兩個 Worker 是同時執行的,但是由於 Lock 的互斥作用,因此可以確保被 Lockacquirerelease 包起來的這段程式碼不會被兩個執行緒同時執行。

執行的結果如下:

Lock acquired by Worker 1
Worker 1: Data 0
Lock released by Worker 1
Lock acquired by Worker 2
Worker 2: Data 1
Lock released by Worker 2
Lock acquired by Worker 1
Worker 1: Data 2
Lock released by Worker 1
Lock acquired by Worker 2
Worker 2: Data 3
Lock released by Worker 2
Lock acquired by Worker 1
Worker 1: Data 4
Lock released by Worker 1
Done.

旗標(Semaphore)

有時候因為系統資源有限的因素(例如考量 CPU 或記憶體的限制),在處理某些特別耗資源的工作時,僅允許有限個執行緒同時進行,這個狀況跟上面介紹的鎖定(lock)有點類似,但是鎖定的方式是僅允許一個執行緒進行某項工作,而這裡我們是允許多個執行緒同時執行的,但要限制同時執行的執行緒數量上限。

旗標(semaphore)的作用跟鎖定(lock)類似,但是它多了一個計數器的功能,當一個執行緒呼叫了 acquire 時,旗標內部的計數器就會遞減 1,而當執行緒呼叫了 release 時,計數器就會遞增 1,當計數器遞減到 0 的時候,後面來的執行緒就要等待其他執行緒release 後才能繼續。

以下是一個簡單的範例:

import time
import threading
import queue

class Worker(threading.Thread):
  def __init__(self, queue, num, semaphore):
    threading.Thread.__init__(self)
    self.queue = queue
    self.num = num
    self.semaphore = semaphore

  def run(self):
    while self.queue.qsize() > 0:
      msg = self.queue.get()

      # 取得旗標
      semaphore.acquire()
      print("Semaphore acquired by Worker %d" % self.num)

      # 僅允許有限個執行緒同時進的工作
      print("Worker %d: %s" % (self.num, msg))
      time.sleep(1)

      # 釋放旗標
      print("Semaphore released by Worker %d" % self.num)
      self.semaphore.release()

my_queue = queue.Queue()
for i in range(5):
  my_queue.put("Data %d" % i)

# 建立旗標
semaphore = threading.Semaphore(2)

my_worker1 = Worker(my_queue, 1, semaphore)
my_worker2 = Worker(my_queue, 2, semaphore)
my_worker3 = Worker(my_queue, 3, semaphore)

my_worker1.start()
my_worker2.start()
my_worker3.start()

my_worker1.join()
my_worker2.join()
my_worker3.join()

print("Done.")
Semaphore acquired by Worker 1
Worker 1: Data 0
Semaphore acquired by Worker 2
Worker 2: Data 1
Semaphore released by Worker 1
Semaphore acquired by Worker 1
Worker 1: Data 3
Semaphore released by Worker 2
Semaphore acquired by Worker 2
Worker 2: Data 4
Semaphore released by Worker 1
Semaphore released by Worker 2
Semaphore acquired by Worker 3
Worker 3: Data 2
Semaphore released by Worker 3
Done.

重複鎖定(RLock)

RLock 是一個可重複取得使用權的鎖定功能,它跟普通的 Lock 類似,但是它可以允許同一個執行緒重複取得鎖定的使用權。

若以普通的 Lock 來說,如果同一個執行緒呼叫了兩次 acquire,則在呼叫第二次的時候,就會被擋住:

# 建立 Lock
lock = threading.Lock()

# 取得 Lock
lock.acquire()

# 重複取得 Lock 的時候,就被擋住!
lock.acquire()

如果想要讓同一個執行緒可以重複取得鎖定,可以改用有重複鎖定的 RLock

# 建立 RLock
rlock = threading.RLock()

# 取得 rlock
rlock.acquire()

# 不能讓多個執行緒同時進的工作...

# 重複取得 rlock
rlock.acquire()

# 不能讓多個執行緒同時進的工作...

# 釋放 rlock
self.rlock.release()

# 不能讓多個執行緒同時進的工作...

# 再次釋放 rlock
self.rlock.release()

RLock 內部有一個計數器,當執行緒在每次呼叫 RLockacquire 的時候,計數器就會遞增 1,紀錄這個鎖定被取得了幾多少次,如果呼叫了 release 時,該計數器就會遞減 1,當計數器遞減至 0 得時候,才會真正釋放鎖定,讓其他的執行緒使用,而在 RLock 的計數器還處於大於 0 的狀態時,其它的執行緒都無法取得這個鎖定的使用權。

參考資料:Python 官方文件tutorialspointchriskiehl.comPython 官方文件莫煩PythonZhou’s Blog