在 Ubuntu Linux 中安裝與使用 RabbitMQ 訊息佇列

這裡介紹如何在 Ubuntu Linux 中安裝 RabbitMQ 這個訊息佇列,並且以範例程式說明如何使用 RabbitMQ。

RabbitMQ 是一個訊息仲介(message broker),它所做的事情就是接收訊息,然後再把訊息發送出去,就好像郵局一樣,發信者將信件交給郵差,透過郵局的郵務系統將信件送給收信人,而 RabbitMQ 跟郵局的不同點只在於它不處理實體的信件,而是處理數位化的資料。


透過 RabbitMQ 這類的訊息佇列系統,可以很容易將分散的系統整合在一起,讓各種不同的系統協同運作,以下會介紹如何安裝與使用 RabbitMQ。

安裝 RabbitMQ

在 Ubuntu Linux 中可使用 apt 安裝 RabbitMQ:

sudo apt-get install rabbitmq-server

如果安裝沒問題,在安裝結束後會自動啟動 rabbitmq-server:

[略]
Setting up rabbitmq-server (2.7.1-0ubuntu4) …
Adding group `rabbitmq’ (GID 128) …
Done.
Adding system user `rabbitmq’ (UID 120) …
Adding new user `rabbitmq’ (UID 120) with group `rabbitmq’ …
Not creating home directory `/var/lib/rabbitmq’.
Starting rabbitmq-server: SUCCESS
rabbitmq-server.
Processing triggers for libc-bin …
ldconfig deferred processing now taking place

這樣就安裝好了。

不過通常 Ubuntu 官方套件庫中的 RabbitMQ 版本比較舊,如果要安裝最新版的,可以使用 RabbitMQ 官方提供的套件庫,將下面這行加入 /etc/apt/sources.list

deb http://www.rabbitmq.com/debian/ testing main

這個套件庫適用於所有 Debian 系列的 Linux,所以 Ubuntu 也適用。

如果要避免套件沒有簽署的警告,可以將 RabbitMQ 的金鑰先匯入:

wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
sudo apt-key add rabbitmq-signing-key-public.asc

然後再用 apt 安裝:

sudo apt-get update
sudo apt-get install rabbitmq-server

接著編輯 /etc/default/rabbitmq-server 這個設定檔,調整 ulimit 的設定::

ulimit -n 1024

這個是用來限制 RabbitMQ 開啟的檔案數量,避免開啟的檔案太多,影響整個系統,請依照自己的狀況調整。

若要控制 RabbitMQ 系統服務,可以使用 rabbitmqctl 這個指令,例如:

rabbitmqctl stop

就會停止 RabbitMQ 服務。其餘可用的參數可參考 rabbitmqctl 的 man page。

man rabbitmqctl

RabbitMQ 的記錄檔預設會放在 /var/log/rabbitmq/ 這個目錄下,若要設定 logrotate 可以修改 /etc/logrotate.d/rabbitmq-server 這個設定檔。

開始使用 RabbitMQ

在開始撰寫程式之前,要先了解關於訊息佇列的一些專有名詞,在一個訊息佇列系統中,主要有三種原件,分別為 producer、queue 與 consumer。

Producer

指訊息的發送者,以 P 表示。

producer

Queue

暫時儲存訊息的地方,它位於 RabbitMQ 內部,在儲存空間足夠的狀況下,它可以儲存任意數量的訊息,多個 producer 可以將訊息發送至同一個 queue 中,而不同的 consumer 也可以從同一個 queue 接收訊息。

queue

consumer

指訊息的接收者,以 C 表示。

consumer

一個最簡單的 hellow world 範例就是下面這種由單一的 producer、queue 與 consumer 所組成的訊息佇列系統:

rabbitmq-hello

要實作這樣的架構,我們會需要撰寫兩個程式,分別為 producer 與 consumer,而 queue 的部分則是由 RabbitMQ 來負責,以下分別示範各種語言的實作方式。

Python

由於 RabbitMQ 是使用 AMQP 這個資料傳輸協定,所以一般的程式若要跟 RabbitMQ 溝通,就要安裝支援 AMQP 的函式庫,目前幾乎任何語言都可以找到支援 AMQP 的函式庫,以 Python 來說可以使用下面這幾種:

這裡我們使用 Pika 來示範。

首先先安裝 pipgit-core 這兩個套件,然後再使用 pip 安裝 pika。在 Ubuntu 中可使用:

sudo apt-get install python-pip git-core
sudo pip install pika==0.9.8

Debian 則使用:

sudo apt-get install python-setuptools git-core
sudo easy_install pip
sudo pip install pika==0.9.8

Windows 則使用:

easy_install pip
pip install pika==0.9.8

安裝好 pika 之後,就可以開始撰寫程式了。首先撰寫訊息發送者 producer 的部分,他負責產生訊息並送給 queue。

sending

以下是 send.py 這個 producer 的程式碼:

# 連接到 broker(RabbitMQ 伺服器)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

程式一開始要先指定 broker 的位置(也就是執行 RabbitMQ 伺服器的位置),這個例子中是連到 localhost,如果你的 RabbitMQ 是在不同一台機器上,這裡就換成該伺服器的 IP 位址。

在發送訊息之前,要先確認自己要使用的 queue 是存在的,如果將訊息發送至一個不存在的 queue,RabbitMQ 會直接將該訊息丟棄。這裡我們建立一個 queue,命名為 hello

# 建立一個 queue
channel.queue_declare(queue='hello')

接著發送一筆訊息到 hello 這個 queue 中,內容為 Hello World!

在 RabbitMQ 中所有的訊息在接收到之後,並不會直接放進 queue 中,中間還會經過 exchanges 的步驟來判斷該把訊息放進哪個 queue,而在這裡我們先使用預設的 exchange(空字串),讓訊息直接放進 routing_key 所指定的 queue 中:

# 發送訊息
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"

訊息發送完之後,最後要記得關閉連線,以確保緩衝區的資料都有被送出(flush)。

# 關閉連線
connection.close()

如果你是第一次使用 RabbitMQ,發現執行 send.py 後沒有出現訊息已送出的輸出,那就要檢查自己的 RabbitMQ 伺服器是否有正常啟動,要找問題出在哪裡,可以檢查 /var/log/rabbitmq/ 下面的記錄檔,另外如果硬碟空間不夠(預設需要 1G 的硬碟空間)也會要造成無法啟動的狀況。

接著還要再寫一個 consumer 來接收 queue 裡面的訊息。

receiving

receive.py 這個 consumer 的程式碼一開始在連接 RabbitMQ 的部分,跟 send.py 是一樣的,然後一樣在接收訊息前要先確認 queue 是否存在:

# 建立一個 queue
channel.queue_declare(queue='hello')

這個建立 queue 的動作可以執行很多次,但同一個名字的 queue 始終只會存在一個,後來重複建立的指令都會被忽略。要這麼做主要是因為你不曉得 send.pyreceive.py 哪一個程式會先執行,所以在每次使用 queue 之前都執行這行可以確保不會使用到一個不存在的 queue。

如果你想要看看目前 RabbitMQ 伺服器中有哪些已經被建立的 queue,可以使用 rabbitmqctl 指令:

sudo rabbitmqctl list_queues

輸出為

Listing queues …
hello    0
…done.

在接收訊息的部分會比較複雜一些,這裡會需要定義一個回呼(callback)函數,每當有訊息被接收時,Pika 就會呼叫這個函數,這裡我們讓他直接將接收到的訊息輸出至螢幕上:

# 定義回呼函數
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

然後告訴 RabbitMQ 使用這個回呼函數來接收 hello 這個 queue 的訊息。

# 接收訊息
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

no_ack 這個參數目前沒有用到,在後面的教學會再解釋。

最後進入一個無窮迴圈,等待訊息。

print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

以下是 send.py 的完整程式碼:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

receive.py 的完整程式碼則為:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

channel.start_consuming()

接下來開始測試,首先執行 send.py

python send.py

輸出為

[x] Sent ‘Hello World!’

send.py 再送出一條訊息後就會自動結束,接著執行 receive.py

python receive.py

輸出為

[*] Waiting for messages. To exit press CTRL+C
[x] Received ‘Hello World!’

這樣就完成了一個最基本的佇列系統!

receive.py 執行時,只要接收到新訊息就會輸出在螢幕上,這時候你可以再執行一次 send.py 測試看看是否會接收到後續的訊息。

receive.py 在執行之後就會一直等待新的訊息,按下 Ctrl + c 就可以離開。

除了 Python 之外,也可以使用 JavaPHPRuby,基本上概念都是一樣的。

繼續閱讀:以 RabbitMQ 實作工作佇列(Work Queues)(Python 版本)

Linux

1 留言

  1. 凌璟騰

    channel.basic_consume(callback,
    queue=’hello’,
    no_ack=True)
    這個 method 的參數位置有變,要改成

    channel.basic_consume(‘hello’, callback)

Comments are Closed