這裡介紹如何在 Ubuntu Linux 中安裝 RabbitMQ 這個訊息佇列,並且以範例程式說明如何使用 RabbitMQ。
RabbitMQ 是一個訊息仲介(message broker),它所做的事情就是接收訊息,然後再把訊息發送出去,就好像郵局一樣,發信者將信件交給郵差,透過郵局的郵務系統將信件送給收信人,而 RabbitMQ 跟郵局的不同點只在於它不處理實體的信件,而是處理數位化的資料。
透過 RabbitMQ 這類的訊息佇列系統,可以很容易將分散的系統整合在一起,讓各種不同的系統協同運作,以下會介紹如何安裝與使用 RabbitMQ。
安裝 RabbitMQ
RabbitMQ 的安裝方式有很多種,可以使用 Docker 直接執行:
# 以 Docker 執行 RabbitMQ 4.x
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4-management
若是 Ubuntu Linux 則可按照 RabbitMQ 官方的文件 的指令稿,快速用 apt 安裝:
#!/bin/sh
sudo apt-get install curl gnupg apt-transport-https -y
## Team RabbitMQ's signing key
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Modern Erlang/OTP releases
##
deb [arch=amd64 signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://deb1.rabbitmq.com/rabbitmq-erlang/ubuntu/noble noble main
deb [arch=amd64 signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://deb2.rabbitmq.com/rabbitmq-erlang/ubuntu/noble noble main
## Latest RabbitMQ releases
##
deb [arch=amd64 signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://deb1.rabbitmq.com/rabbitmq-server/ubuntu/noble noble main
deb [arch=amd64 signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://deb2.rabbitmq.com/rabbitmq-server/ubuntu/noble noble main
EOF
## Update package indices
sudo apt-get update -y
## Install Erlang packages
sudo apt-get install -y erlang-base \
erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
erlang-runtime-tools erlang-snmp erlang-ssl \
erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
## Install rabbitmq-server and its dependencies
sudo apt-get install rabbitmq-server -y --fix-missing
詳細的安裝說明可以參考RabbitMQ 官方文件。
系統資源限制
由於 RabbitMQ 在實際執行時,會需要開啟大量的檔案,這個開啟檔案數量的上限在 Linux 系統上是由 ulimit -n 在設定的,大部分的系統預設值是 1024,而這個值對於 RabbitMQ 來說通常不夠高,依據 RabbitMQ 官方的說明文件建議在開發系統上設定為 4096,而在正式環境則建議設定為 65536。
若要查詢目前 RabbitMQ 系統上的開啟檔案數量上限設定,可以使用 rabbitmq-diagnostics 指令:
# 檢查 RabbitMQ 狀態
rabbitmq-diagnostics status
[略] File Descriptors Total: 0, limit: 32671 [略]
目前大部分的 Linux 發行版系統都是採用 systemd 來控制系統服務,在 systemd 之下我們可以建立 /etc/systemd/system/rabbitmq-server.service.d/limits.conf 這個設定檔,設定檔案開啟數量上限值:
# 建立 /etc/systemd/system/rabbitmq-server.service.d 目錄
sudo mkdir -p /etc/systemd/system/rabbitmq-server.service.d
# 建立 /etc/systemd/system/rabbitmq-server.service.d/limits.conf 設定檔
sudo cat <<EOF >> /etc/systemd/system/rabbitmq-server.service.d/limits.conf
[Service]
LimitNOFILE=64000
EOF
建立好設定檔之後,重新載入設定檔,並重啟 RabbitMQ 即可讓新設定生效:
# 載入 systemd 設定檔
systemctl daemon-reload
# 重新啟動 RabbitMQ
systemctl restart rabbitmq-server
修改完成後,可以使用 rabbitmq-diagnostics 指令再檢查一次系統設定。
開始使用 RabbitMQ
在開始撰寫程式之前,要先了解關於訊息佇列的一些專有名詞,在一個訊息佇列系統中,主要有三種原件,分別為 producer、queue 與 consumer。
Producer
指訊息的發送者,以 P 表示。

Queue
暫時儲存訊息的地方,它位於 RabbitMQ 內部,在儲存空間足夠的狀況下,它可以儲存任意數量的訊息,多個 producer 可以將訊息發送至同一個 queue 中,而不同的 consumer 也可以從同一個 queue 接收訊息。

Consumer
指訊息的接收者,以 C 表示。

一個最簡單的 hellow world 範例就是下面這種由單一的 producer、queue 與 consumer 所組成的訊息佇列系統:

要實作這樣的架構,我們會需要撰寫兩個程式,分別為 producer 與 consumer,而 queue 的部分則是由 RabbitMQ 來負責,以下分別示範各種語言的實作方式。
Python 實作
由於 RabbitMQ 是使用 AMQP 這個資料傳輸協定,所以一般的程式若要跟 RabbitMQ 溝通,就要安裝支援 AMQP 的函式庫,目前幾乎任何語言都可以找到支援 AMQP 的函式庫,以 Python 來說可以使用下面這幾種:
這裡我們使用 Pika 來示範。
Python 的 pika 套件可以透過 pip 來安裝:
pip install pika
安裝好 pika 之後,就可以開始撰寫程式了。首先撰寫訊息發送者 producer 的部分,他負責產生訊息並送給 queue。

以下是 send.py 這個 producer 的程式碼:
# 連接到 broker(RabbitMQ 伺服器)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
程式一開始要先指定 broker 的位置(也就是執行 RabbitMQ 伺服器的位置),這個例子中是連到 localhost,如果你的 RabbitMQ 是在不同一台機器上,這裡就換成該伺服器的 IP 位址。
在發送訊息之前,要先確認自己要使用的 queue 是存在的,如果將訊息發送至一個不存在的 queue,RabbitMQ 會直接將該訊息丟棄。這裡我們建立一個 queue,命名為 hello:
# 建立一個 queue
channel.queue_declare(queue='hello')
接著發送一筆訊息到 hello 這個 queue 中,內容為 Hello World!。
在 RabbitMQ 中所有的訊息在接收到之後,並不會直接放進 queue 中,中間還會經過 exchanges 的步驟來判斷該把訊息放進哪個 queue,而在這裡我們先使用預設的 exchange(空字串),讓訊息直接放進 routing_key 所指定的 queue 中:
# 發送訊息
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
訊息發送完之後,最後要記得關閉連線,以確保緩衝區的資料都有被送出(flush)。
# 關閉連線
connection.close()
如果你是第一次使用 RabbitMQ,發現執行 send.py 後沒有出現訊息已送出的輸出,那就要檢查自己的 RabbitMQ 伺服器是否有正常啟動,要找問題出在哪裡,可以檢查 /var/log/rabbitmq/ 下面的記錄檔,另外如果硬碟空間不夠(預設需要 1G 的硬碟空間)也會要造成無法啟動的狀況。
接著還要再寫一個 consumer 來接收 queue 裡面的訊息。

receive.py 這個 consumer 的程式碼一開始在連接 RabbitMQ 的部分,跟 send.py 是一樣的,然後一樣在接收訊息前要先確認 queue 是否存在:
# 建立一個 queue
channel.queue_declare(queue='hello')
這個建立 queue 的動作可以執行很多次,但同一個名字的 queue 始終只會存在一個,後來重複建立的指令都會被忽略。要這麼做主要是因為你不曉得 send.py 與 receive.py 哪一個程式會先執行,所以在每次使用 queue 之前都執行這行可以確保不會使用到一個不存在的 queue。
如果你想要看看目前 RabbitMQ 伺服器中有哪些已經被建立的 queue,可以使用 rabbitmqctl 指令:
sudo rabbitmqctl list_queues
輸出為
Timeout: 60.0 seconds ... Listing queues for vhost / ... name messages hello 1
在接收訊息的部分會比較複雜一些,這裡會需要定義一個回呼(callback)函數,每當有訊息被接收時,Pika 就會呼叫這個函數,這裡我們讓他直接將接收到的訊息輸出至螢幕上:
# 定義回呼函數
def callback(ch, method, properties, body):
print(" [x] Received %r" % (body,))
然後告訴 RabbitMQ 使用這個回呼函數來接收 hello 這個 queue 的訊息。
# 接收訊息
channel.basic_consume(queue='hello', on_message_callback=callback)
no_ack 這個參數目前沒有用到,在後面的教學會再解釋。
最後進入一個無窮迴圈,等待訊息。
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
以下是 send.py 的完整程式碼:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
而 receive.py 的完整程式碼則為:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % (body,))
channel.basic_consume(queue='hello', on_message_callback=callback)
channel.start_consuming()
接下來開始測試,首先執行 send.py:
python send.py
輸出為
[x] Sent ‘Hello World!’
send.py 再送出一條訊息後就會自動結束,接著執行 receive.py:
python receive.py
輸出為
[*] Waiting for messages. To exit press CTRL+C
[x] Received ‘Hello World!’
這樣就完成了一個最基本的佇列系統!
在 receive.py 執行時,只要接收到新訊息就會輸出在螢幕上,這時候你可以再執行一次 send.py 測試看看是否會接收到後續的訊息。
receive.py 在執行之後就會一直等待新的訊息,按下 Ctrl + c 就可以離開。
