這裡介紹如何在 Ubuntu Linux 中安裝 RabbitMQ 這個訊息佇列,並且以範例程式說明如何使用 RabbitMQ。

RabbitMQ 是一個訊息仲介(message broker),它所做的事情就是接收訊息,然後再把訊息發送出去,就好像郵局一樣,發信者將信件交給郵差,透過郵局的郵務系統將信件送給收信人,而 RabbitMQ 跟郵局的不同點只在於它不處理實體的信件,而是處理數位化的資料。

透過 RabbitMQ 這類的訊息佇列系統,可以很容易將分散的系統整合在一起,讓各種不同的系統協同運作,以下會介紹如何安裝與使用 RabbitMQ。

安裝 RabbitMQ

RabbitMQ 的安裝方式有很多種,可以使用 Docker 直接執行:

# 以 Docker 執行 RabbitMQ 4.x
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4-management

若是 Ubuntu Linux 則可按照 RabbitMQ 官方的文件 的指令稿,快速用 apt 安裝:

#!/bin/sh

sudo apt-get install curl gnupg apt-transport-https -y

## Team RabbitMQ's signing key
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null

## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Modern Erlang/OTP releases
##
deb [arch=amd64 signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://deb1.rabbitmq.com/rabbitmq-erlang/ubuntu/noble noble main
deb [arch=amd64 signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://deb2.rabbitmq.com/rabbitmq-erlang/ubuntu/noble noble main

## Latest RabbitMQ releases
##
deb [arch=amd64 signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://deb1.rabbitmq.com/rabbitmq-server/ubuntu/noble noble main
deb [arch=amd64 signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://deb2.rabbitmq.com/rabbitmq-server/ubuntu/noble noble main
EOF

## Update package indices
sudo apt-get update -y

## Install Erlang packages
sudo apt-get install -y erlang-base \
                        erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
                        erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
                        erlang-runtime-tools erlang-snmp erlang-ssl \
                        erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

## Install rabbitmq-server and its dependencies
sudo apt-get install rabbitmq-server -y --fix-missing

詳細的安裝說明可以參考RabbitMQ 官方文件

系統資源限制

由於 RabbitMQ 在實際執行時,會需要開啟大量的檔案,這個開啟檔案數量的上限在 Linux 系統上是由 ulimit -n 在設定的,大部分的系統預設值是 1024,而這個值對於 RabbitMQ 來說通常不夠高,依據 RabbitMQ 官方的說明文件建議在開發系統上設定為 4096,而在正式環境則建議設定為 65536

若要查詢目前 RabbitMQ 系統上的開啟檔案數量上限設定,可以使用 rabbitmq-diagnostics 指令:

# 檢查 RabbitMQ 狀態
rabbitmq-diagnostics status
[略]
File Descriptors

Total: 0, limit: 32671
[略]

目前大部分的 Linux 發行版系統都是採用 systemd 來控制系統服務,在 systemd 之下我們可以建立 /etc/systemd/system/rabbitmq-server.service.d/limits.conf 這個設定檔,設定檔案開啟數量上限值:

# 建立 /etc/systemd/system/rabbitmq-server.service.d 目錄
sudo mkdir -p /etc/systemd/system/rabbitmq-server.service.d

# 建立 /etc/systemd/system/rabbitmq-server.service.d/limits.conf 設定檔
sudo cat <<EOF >> /etc/systemd/system/rabbitmq-server.service.d/limits.conf
[Service]
LimitNOFILE=64000
EOF

建立好設定檔之後,重新載入設定檔,並重啟 RabbitMQ 即可讓新設定生效:

# 載入 systemd 設定檔
systemctl daemon-reload

# 重新啟動 RabbitMQ
systemctl restart rabbitmq-server

修改完成後,可以使用 rabbitmq-diagnostics 指令再檢查一次系統設定。

開始使用 RabbitMQ

在開始撰寫程式之前,要先了解關於訊息佇列的一些專有名詞,在一個訊息佇列系統中,主要有三種原件,分別為 producer、queue 與 consumer。

Producer

指訊息的發送者,以 P 表示。

producer

Queue

暫時儲存訊息的地方,它位於 RabbitMQ 內部,在儲存空間足夠的狀況下,它可以儲存任意數量的訊息,多個 producer 可以將訊息發送至同一個 queue 中,而不同的 consumer 也可以從同一個 queue 接收訊息。

queue

Consumer

指訊息的接收者,以 C 表示。

consumer

一個最簡單的 hellow world 範例就是下面這種由單一的 producer、queue 與 consumer 所組成的訊息佇列系統:

rabbitmq-hello

要實作這樣的架構,我們會需要撰寫兩個程式,分別為 producer 與 consumer,而 queue 的部分則是由 RabbitMQ 來負責,以下分別示範各種語言的實作方式。

Python 實作

由於 RabbitMQ 是使用 AMQP 這個資料傳輸協定,所以一般的程式若要跟 RabbitMQ 溝通,就要安裝支援 AMQP 的函式庫,目前幾乎任何語言都可以找到支援 AMQP 的函式庫,以 Python 來說可以使用下面這幾種:

這裡我們使用 Pika 來示範。

Python 的 pika 套件可以透過 pip 來安裝:

pip install pika

安裝好 pika 之後,就可以開始撰寫程式了。首先撰寫訊息發送者 producer 的部分,他負責產生訊息並送給 queue。

sending

以下是 send.py 這個 producer 的程式碼:

# 連接到 broker(RabbitMQ 伺服器)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

程式一開始要先指定 broker 的位置(也就是執行 RabbitMQ 伺服器的位置),這個例子中是連到 localhost,如果你的 RabbitMQ 是在不同一台機器上,這裡就換成該伺服器的 IP 位址。

在發送訊息之前,要先確認自己要使用的 queue 是存在的,如果將訊息發送至一個不存在的 queue,RabbitMQ 會直接將該訊息丟棄。這裡我們建立一個 queue,命名為 hello

# 建立一個 queue
channel.queue_declare(queue='hello')

接著發送一筆訊息到 hello 這個 queue 中,內容為 Hello World!

在 RabbitMQ 中所有的訊息在接收到之後,並不會直接放進 queue 中,中間還會經過 exchanges 的步驟來判斷該把訊息放進哪個 queue,而在這裡我們先使用預設的 exchange(空字串),讓訊息直接放進 routing_key 所指定的 queue 中:

# 發送訊息
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

訊息發送完之後,最後要記得關閉連線,以確保緩衝區的資料都有被送出(flush)。

# 關閉連線
connection.close()

如果你是第一次使用 RabbitMQ,發現執行 send.py 後沒有出現訊息已送出的輸出,那就要檢查自己的 RabbitMQ 伺服器是否有正常啟動,要找問題出在哪裡,可以檢查 /var/log/rabbitmq/ 下面的記錄檔,另外如果硬碟空間不夠(預設需要 1G 的硬碟空間)也會要造成無法啟動的狀況。

接著還要再寫一個 consumer 來接收 queue 裡面的訊息。

receiving

receive.py 這個 consumer 的程式碼一開始在連接 RabbitMQ 的部分,跟 send.py 是一樣的,然後一樣在接收訊息前要先確認 queue 是否存在:

# 建立一個 queue
channel.queue_declare(queue='hello')

這個建立 queue 的動作可以執行很多次,但同一個名字的 queue 始終只會存在一個,後來重複建立的指令都會被忽略。要這麼做主要是因為你不曉得 send.pyreceive.py 哪一個程式會先執行,所以在每次使用 queue 之前都執行這行可以確保不會使用到一個不存在的 queue。

如果你想要看看目前 RabbitMQ 伺服器中有哪些已經被建立的 queue,可以使用 rabbitmqctl 指令:

sudo rabbitmqctl list_queues

輸出為

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
hello	1

在接收訊息的部分會比較複雜一些,這裡會需要定義一個回呼(callback)函數,每當有訊息被接收時,Pika 就會呼叫這個函數,這裡我們讓他直接將接收到的訊息輸出至螢幕上:

# 定義回呼函數
def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))

然後告訴 RabbitMQ 使用這個回呼函數來接收 hello 這個 queue 的訊息。

# 接收訊息
channel.basic_consume(queue='hello', on_message_callback=callback)

no_ack 這個參數目前沒有用到,在後面的教學會再解釋。

最後進入一個無窮迴圈,等待訊息。

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

以下是 send.py 的完整程式碼:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

receive.py 的完整程式碼則為:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))

channel.basic_consume(queue='hello', on_message_callback=callback)

channel.start_consuming()

接下來開始測試,首先執行 send.py

python send.py

輸出為

[x] Sent ‘Hello World!’

send.py 再送出一條訊息後就會自動結束,接著執行 receive.py

python receive.py

輸出為

[*] Waiting for messages. To exit press CTRL+C
[x] Received ‘Hello World!’

這樣就完成了一個最基本的佇列系統!

receive.py 執行時,只要接收到新訊息就會輸出在螢幕上,這時候你可以再執行一次 send.py 測試看看是否會接收到後續的訊息。

receive.py 在執行之後就會一直等待新的訊息,按下 Ctrl + c 就可以離開。

除了 Python 之外,也可以使用 JavaPHPRuby,基本上概念都是一樣的。

繼續閱讀:以 RabbitMQ 實作工作佇列(Work Queues)(Python 版本)