介紹如何在 Python 中使用 OpenCV 搭配多行程(multiprocessing)的方式開啟、顯示、處理與儲存 RTSP 串流影片。
測試用 RTSP 串流
網路上有一些測試用的 RTSP 串流伺服器,可用於程式的開發與測試:
安裝 OpenCV
安裝 Python 的 OpenCV 模組:
# 安裝 Python 的 OpenCV 模組
pip3 install opencv-python
開啟與顯示 RTSP 串流影像
OpenCV 中的 VideoCapture() 函數可以用來開啟各種不同的影片來源,包含影片檔案、網路攝影機、RTSP 串流等,以下是使用 OpenCV 開啟 RTSP 串流影片的範例。
import cv2
if __name__ == '__main__':
# 開啟 RTSP 串流
vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')
# 建立視窗
cv2.namedWindow('image_display', cv2.WINDOW_AUTOSIZE)
while True:
# 從 RTSP 串流讀取一張影像
ret, image = vidCap.read()
if ret:
# 顯示影像
cv2.imshow('image_display', image)
cv2.waitKey(10)
else:
# 若沒有影像跳出迴圈
break
# 釋放資源
vidCap.release()
# 關閉所有 OpenCV 視窗
cv2.destroyAllWindows()
上面這段 Python 指令稿執行之後,就會開啟一個 OpenCV 的視窗,顯示 RTSP 串流的影像:

多行程版本
若在 CPU 處理速度比較慢的機器上,或是遇到影格率(frame rate)較高的影片,可能會出現單一執行緒無法及時處理的問題,這時候就可以考慮使用多行程(multiprocessing)的方式,以一個行程專門收取 RTSP 串流影像,然後將收進來的影像透過佇列(queue)交給另外一個行程來進行顯示,這樣就可以改善整體程式的處理效能,比較不會發生程式處理速度跟不上影片速度的問題。
from multiprocessing import Process, Queue
import cv2
def image_display(taskqueue):
# 建立視窗
cv2.namedWindow('image_display', cv2.WINDOW_AUTOSIZE)
while True:
# 從工作佇列取得影像
image = taskqueue.get()
# 若沒有影像則終止迴圈
if image is None: break
# 顯示影像
cv2.imshow('image_display', image)
cv2.waitKey(10)
if __name__ == '__main__':
# 開啟 RTSP 串流
vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')
# 建立工作佇列
taskqueue = Queue()
# 建立並執行工作行程
proc = Process(target=image_display, args=(taskqueue,))
proc.start()
while True:
# 從 RTSP 串流讀取一張影像
ret, image = vidCap.read()
if ret:
# 將影像放入工作佇列
taskqueue.put(image)
else:
# 若沒有影像跳出迴圈
break
# 傳入 None 終止工作行程
taskqueue.put(None)
# 等待工作行程結束
proc.join()
# 釋放資源
vidCap.release()
# 關閉所有 OpenCV 視窗
cv2.destroyAllWindows()
儲存 RTSP 串流影像
以下是使用 OpenCV 讀取 RTSP 串流影像之後,將其儲存成影片檔案的範例,在儲存影片時維持原影片的解析度與影格率,而編碼則採用 mp4v,靠著計算影格數的方式錄製 10 秒鐘的 RTSP 串流影像,儲存為 output.mp4:
from multiprocessing import Process, Queue
import cv2
def image_save(taskqueue, width, height, fps):
# 指定影片編碼
#fourcc = cv2.VideoWriter_fourcc(*'XVID')
#fourcc = cv2.VideoWriter_fourcc(*'H264')
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
# 建立 VideoWriter 物件
writer = cv2.VideoWriter('output.mp4', fourcc, fps, (width, height))
while True:
# 從工作佇列取得影像
image = taskqueue.get()
# 若沒有影像則終止迴圈
if image is None: break
# 儲存影像
writer.write(image)
# 釋放資源
writer.release()
if __name__ == '__main__':
# 開啟 RTSP 串流
vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')
# 取得影像的尺寸大小
width = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# 取得影格率
fps = vidCap.get(cv2.CAP_PROP_FPS)
# 建立工作佇列
taskqueue = Queue()
# 建立並執行工作行程
proc = Process(target=image_save, args=(taskqueue, width, height, fps))
proc.start()
# 計數器
frame_counter = 0
# 總錄製幀數(10 秒鐘)
total_frames = fps * 10
while frame_counter < total_frames:
# 從 RTSP 串流讀取一張影像
ret, image = vidCap.read()
if ret:
# 將影像放入工作佇列
taskqueue.put(image)
frame_counter += 1
else:
# 若沒有影像跳出迴圈
break
# 傳入 None 終止工作行程
taskqueue.put(None)
# 等待工作行程結束
proc.join()
# 釋放資源
vidCap.release()
儲存長時間串流影片
若需要儲存長時間的 RTSP 串流影片,可以利用以下範例,將串流切成固定長度的影片,依照編號或時間戳記來命名儲存的影片檔案,方便後續處理。
from multiprocessing import Process, Queue
import cv2
from datetime import datetime
def image_save(taskqueue, width, height, fps, frames_per_file):
# 指定影片編碼
#fourcc = cv2.VideoWriter_fourcc(*'XVID')
#fourcc = cv2.VideoWriter_fourcc(*'H264')
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = None
while True:
# 從工作佇列取得影像
image, frame_counter = taskqueue.get()
# 若沒有影像則終止迴圈
if image is None: break
if frame_counter % frames_per_file == 0:
if writer: writer.release()
# 建立 VideoWriter 物件(以數字編號)
# index = int(frame_counter // frames_per_file)
# writer = cv2.VideoWriter(f'output-{index}.mp4', fourcc, fps, (width, height))
# 建立 VideoWriter 物件(以時間命名)
now = datetime.now()
timestamp = now.strftime("%Y-%m-%d-%H-%M-%S")
writer = cv2.VideoWriter(f'output-{timestamp}.mp4', fourcc, fps, (width, height))
# 儲存影像
writer.write(image)
# 釋放資源
writer.release()
if __name__ == '__main__':
# 開啟 RTSP 串流
vidCap = cv2.VideoCapture('rtsp://ipcam.stream:8554/bars')
# 取得影像的尺寸大小
width = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# 取得影格率
fps = vidCap.get(cv2.CAP_PROP_FPS)
# 建立工作佇列
taskqueue = Queue()
# 計數器
frame_counter = 0
# 總錄製幀數(30 秒鐘)
total_frames = fps * 30
# 每個檔案的幀數(10 秒鐘)
frames_per_file = fps * 10
# 建立並執行工作行程
proc = Process(target=image_save, args=(taskqueue, width, height, fps, frames_per_file))
proc.start()
while frame_counter < total_frames:
# 從 RTSP 串流讀取一張影像
ret, image = vidCap.read()
if ret:
# 將影像放入工作佇列
taskqueue.put((image, frame_counter))
frame_counter += 1
else:
# 若沒有影像跳出迴圈
break
# 傳入 None 終止工作行程
taskqueue.put((None, None))
# 等待工作行程結束
proc.join()
# 釋放資源
vidCap.release()
這裡為了方便起見,檔案名稱用的時間戳記是放在工作行程中產生的,所以時間可能會有一些誤差,如果需要非常精確的時間,就要將時間戳記改為接收串流時同時取得時間。
錄製機器作動影片
這裡我的應用場景是將一台網路攝影機架設在工廠內,鏡頭對準特定的機台,然後我希望透過 RTSP 串流影片監看機台的狀況,當機台有動作時自動將作動過程的影片錄製下來,以利後續的 AI 分析,以下是我在開發過程中所用的基本架構範例。
from multiprocessing import Process, Queue
import cv2
from datetime import datetime
import numpy as np
# 開發模式
DEV_MODE = False
def printLogMsg(msg):
now = datetime.now()
timestamp = now.strftime("%Y/%m/%d %H:%M:%S")
print("[{}] {}".format(timestamp, msg))
def image_save(taskqueue, width, height, fps):
# 指定影片編碼
fourcc = cv2.VideoWriter_fourcc(*'XVID')
#fourcc = cv2.VideoWriter_fourcc(*'H264')
#fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = None
while True:
# 從工作佇列取得影像
image, frameCounter = taskqueue.get()
# 若沒有影像則終止迴圈
if image is None: break
if frameCounter == 0:
if writer: writer.release()
# 建立 VideoWriter 物件(以時間命名)
now = datetime.now()
timestamp = now.strftime("%Y%m%d-%H%M%S")
writer = cv2.VideoWriter(f'test-output-{timestamp}.avi', fourcc, fps, (width, height))
# 儲存影像
writer.write(image)
# 釋放資源
if writer: writer.release()
def captureRTSP(src):
# 開啟 RTSP 串流
vidCap = cv2.VideoCapture(src)
# 取得影像的尺寸大小
imgWidth = int(vidCap.get(cv2.CAP_PROP_FRAME_WIDTH))
imgHeight = int(vidCap.get(cv2.CAP_PROP_FRAME_HEIGHT))
printLogMsg(f"Size: {imgWidth} x {imgHeight}")
# 取得影格率
fps = vidCap.get(cv2.CAP_PROP_FPS)
printLogMsg(f"FPS: {fps}")
# 建立工作佇列
taskqueue = Queue()
# 目標影像 FPS
desired_fps = 15
# 建立並執行工作行程
proc = Process(target=image_save, args=(taskqueue, imgWidth, imgHeight, desired_fps))
proc.start()
avgImage = None
avgImageFloat = None
# 計數器
frameCounter = 0
while vidCap.isOpened():
# 從 RTSP 串流讀取一張影像
ret, image = vidCap.read()
if ret:
# 忽略黑白影像(紅外線)
if np.array_equal(image[:,:,], image[:,:,1]) and np.array_equal(image[:,:,], image[:,:,2]):
avgImage = None
avgImageFloat = None
if frameCounter:
printLogMsg(f"Stop recording (frame counter = {frameCounter})")
frameCounter = 0
continue
# 取出中央部分影像,作為變動依據
cropImage = image[(imgHeight//4):(imgHeight//4*3), (imgWidth//8*3):(imgWidth//8*5)]
centerImage = cv2.resize(cropImage, (imgHeight//4, imgWidth//8))
centerArea = imgHeight//4 * imgWidth//8
# 初始化平均影像
if avgImage is None:
avgImage = cv2.blur(centerImage, (4, 4))
avgImageFloat = np.float32(avgImage)
printLogMsg("avgImage initialization")
continue
# 模糊處理
blurImage = cv2.blur(centerImage, (4, 4))
# 計算目前影格與平均影像的差異值
diffImage = cv2.absdiff(avgImage, blurImage)
# 將圖片轉為灰階
grayImage = cv2.cvtColor(diffImage, cv2.COLOR_BGR2GRAY)
# 篩選出變動程度大於門檻值的區域
ret, binImage = cv2.threshold(grayImage, 5, 255, cv2.THRESH_BINARY)
# 使用型態轉換函數去除雜訊
kernel = np.ones((5, 5), np.uint8)
binImage = cv2.morphologyEx(binImage, cv2.MORPH_OPEN, kernel, iterations=2)
binImage = cv2.morphologyEx(binImage, cv2.MORPH_CLOSE, kernel, iterations=2)
# 產生等高線
countours, _ = cv2.findContours(binImage, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
detected = False
for c in countours:
if cv2.contourArea(c) > centerArea / 5:
detected = True
if DEV_MODE:
# 計算等高線的外框範圍
(x, y, w, h) = cv2.boundingRect(c)
# 畫出外框
cv2.rectangle(centerImage, (x, y), (x + w, y + h), (0, 255, 0), 2)
else:
break
if DEV_MODE:
# 顯示偵測結果影像
cv2.imshow('image', centerImage)
cv2.imshow('avgImage', avgImage)
cv2.imshow('blurImage', blurImage)
if cv2.waitKey(1) & 0xFF == ord('q'):
# 傳入 None 終止工作行程
taskqueue.put((None, None))
break
# 更新平均影像
cv2.accumulateWeighted(blurImage, avgImageFloat, 0.1)
avgImage = cv2.convertScaleAbs(avgImageFloat)
# 將影像放入工作佇列
if detected:
if DEV_MODE == False:
taskqueue.put((image, frameCounter))
if frameCounter == 0:
printLogMsg("Start recording")
frameCounter += 1
else:
if frameCounter:
printLogMsg(f"Stop recording (frame counter = {frameCounter})")
frameCounter = 0
continue
else:
# 若沒有影像跳出迴圈
printLogMsg("no frame")
break
# 傳入 None 終止工作行程
taskqueue.put((None, None))
# 等待工作行程結束
proc.join()
# 釋放資源
vidCap.release()
# 關閉所有 OpenCV 視窗
cv2.destroyAllWindows()
if __name__ == '__main__':
captureRTSP('rtsp://ipcam.stream:8554/bars')
在現場的網路攝影機是 24 小時全天運作的,但是機器作動的頻率很低,大部分的時間都是處於閒置狀態,而在機器閒置的時間,現場的燈也都是關閉的。正常開燈的時候網路攝影機使用一般的攝像頭,對應的影格率(FPS)為 15,但當網路攝影機遇到關燈的狀態時,會自動遷換為紅外線攝像頭,影像雖然還是 RGB 格式,但是色彩會轉為灰階,而此時對應的影格率(FPS)就會降為 10。
由於機器只會在開燈的時候作動,所以我在程式中加上判斷影像色彩的條件,若整張影像的 RGB 三個 channels 都完全有相同的值,就判定為紅外線影像,將其直接忽略,減少不必要的計算。
當取得串流影像之後,我們將畫面中央的部分取出來,並降低解析度(節省計算量),參考 Python 與 OpenCV 實作移動偵測程式教學,偵測影像中央是否有變動,當出現影像大幅度變動時,就將串流影片儲存下來。
另外為了方便開發與測試,我們靠著 DEV_MODE 變數設定開發模式,當程式處於開發模式的時候,會開啟監看視窗,顯示幾種關鍵的影像內容,同時開發模式也不會儲存任何影像。
在儲存影片檔案時,會自動將獨立的影片片段分開儲存,檔案名稱則自動以開始錄製的時間來命名,影片的解析度、影格率都維持跟來源影片相同。
