這裡介紹如何在 Ubuntu Linux 中安裝 RabbitMQ 這個訊息佇列,並且以範例程式說明如何使用 RabbitMQ。
RabbitMQ 是一個訊息仲介(message broker),它所做的事情就是接收訊息,然後再把訊息發送出去,就好像郵局一樣,發信者將信件交給郵差,透過郵局的郵務系統將信件送給收信人,而 RabbitMQ 跟郵局的不同點只在於它不處理實體的信件,而是處理數位化的資料。
在 Ubuntu Linux 中可使用 apt 安裝 RabbitMQ:
sudo apt-get install rabbitmq-server
如果安裝沒問題,在安裝結束後會自動啟動 rabbitmq-server:
[略]
Setting up rabbitmq-server (2.7.1-0ubuntu4) …
Adding group `rabbitmq’ (GID 128) …
Done.
Adding system user `rabbitmq’ (UID 120) …
Adding new user `rabbitmq’ (UID 120) with group `rabbitmq’ …
Not creating home directory `/var/lib/rabbitmq’.
Starting rabbitmq-server: SUCCESS
rabbitmq-server.
Processing triggers for libc-bin …
ldconfig deferred processing now taking place
這樣就安裝好了。
不過通常 Ubuntu 官方套件庫中的 RabbitMQ 版本比較舊,如果要安裝最新版的,可以使用 RabbitMQ 官方提供的套件庫,將下面這行加入 /etc/apt/sources.list
:
deb http://www.rabbitmq.com/debian/ testing main
這個套件庫適用於所有 Debian 系列的 Linux,所以 Ubuntu 也適用。
如果要避免套件沒有簽署的警告,可以將 RabbitMQ 的金鑰先匯入:
wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc sudo apt-key add rabbitmq-signing-key-public.asc
然後再用 apt 安裝:
sudo apt-get update sudo apt-get install rabbitmq-server
接著編輯 /etc/default/rabbitmq-server
這個設定檔,調整 ulimit
的設定::
ulimit -n 1024
這個是用來限制 RabbitMQ 開啟的檔案數量,避免開啟的檔案太多,影響整個系統,請依照自己的狀況調整。
若要控制 RabbitMQ 系統服務,可以使用 rabbitmqctl
這個指令,例如:
rabbitmqctl stop
就會停止 RabbitMQ 服務。其餘可用的參數可參考 rabbitmqctl
的 man page。
man rabbitmqctl
RabbitMQ 的記錄檔預設會放在 /var/log/rabbitmq/
這個目錄下,若要設定 logrotate
可以修改 /etc/logrotate.d/rabbitmq-server
這個設定檔。
在開始撰寫程式之前,要先了解關於訊息佇列的一些專有名詞,在一個訊息佇列系統中,主要有三種原件,分別為 producer、queue 與 consumer。
指訊息的發送者,以 P 表示。
暫時儲存訊息的地方,它位於 RabbitMQ 內部,在儲存空間足夠的狀況下,它可以儲存任意數量的訊息,多個 producer 可以將訊息發送至同一個 queue 中,而不同的 consumer 也可以從同一個 queue 接收訊息。
指訊息的接收者,以 C 表示。
一個最簡單的 hellow world 範例就是下面這種由單一的 producer、queue 與 consumer 所組成的訊息佇列系統:
要實作這樣的架構,我們會需要撰寫兩個程式,分別為 producer 與 consumer,而 queue 的部分則是由 RabbitMQ 來負責,以下分別示範各種語言的實作方式。
由於 RabbitMQ 是使用 AMQP 這個資料傳輸協定,所以一般的程式若要跟 RabbitMQ 溝通,就要安裝支援 AMQP 的函式庫,目前幾乎任何語言都可以找到支援 AMQP 的函式庫,以 Python 來說可以使用下面這幾種:
這裡我們使用 Pika 來示範。
首先先安裝 pip
與 git-core
這兩個套件,然後再使用 pip
安裝 pika
。在 Ubuntu 中可使用:
sudo apt-get install python-pip git-core sudo pip install pika==0.9.8
Debian 則使用:
sudo apt-get install python-setuptools git-core sudo easy_install pip sudo pip install pika==0.9.8
Windows 則使用:
easy_install pip pip install pika==0.9.8
安裝好 pika
之後,就可以開始撰寫程式了。首先撰寫訊息發送者 producer 的部分,他負責產生訊息並送給 queue。
以下是 send.py
這個 producer 的程式碼:
# 連接到 broker(RabbitMQ 伺服器) connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()
程式一開始要先指定 broker 的位置(也就是執行 RabbitMQ 伺服器的位置),這個例子中是連到 localhost
,如果你的 RabbitMQ 是在不同一台機器上,這裡就換成該伺服器的 IP 位址。
在發送訊息之前,要先確認自己要使用的 queue 是存在的,如果將訊息發送至一個不存在的 queue,RabbitMQ 會直接將該訊息丟棄。這裡我們建立一個 queue,命名為 hello
:
# 建立一個 queue channel.queue_declare(queue='hello')
接著發送一筆訊息到 hello
這個 queue 中,內容為 Hello World!
。
在 RabbitMQ 中所有的訊息在接收到之後,並不會直接放進 queue 中,中間還會經過 exchanges 的步驟來判斷該把訊息放進哪個 queue,而在這裡我們先使用預設的 exchange(空字串),讓訊息直接放進 routing_key
所指定的 queue 中:
# 發送訊息 channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'"
訊息發送完之後,最後要記得關閉連線,以確保緩衝區的資料都有被送出(flush)。
# 關閉連線
connection.close()
如果你是第一次使用 RabbitMQ,發現執行 send.py
後沒有出現訊息已送出的輸出,那就要檢查自己的 RabbitMQ 伺服器是否有正常啟動,要找問題出在哪裡,可以檢查 /var/log/rabbitmq/
下面的記錄檔,另外如果硬碟空間不夠(預設需要 1G 的硬碟空間)也會要造成無法啟動的狀況。
接著還要再寫一個 consumer 來接收 queue 裡面的訊息。
receive.py
這個 consumer 的程式碼一開始在連接 RabbitMQ 的部分,跟 send.py
是一樣的,然後一樣在接收訊息前要先確認 queue 是否存在:
# 建立一個 queue channel.queue_declare(queue='hello')
這個建立 queue 的動作可以執行很多次,但同一個名字的 queue 始終只會存在一個,後來重複建立的指令都會被忽略。要這麼做主要是因為你不曉得 send.py
與 receive.py
哪一個程式會先執行,所以在每次使用 queue 之前都執行這行可以確保不會使用到一個不存在的 queue。
如果你想要看看目前 RabbitMQ 伺服器中有哪些已經被建立的 queue,可以使用 rabbitmqctl
指令:
sudo rabbitmqctl list_queues
輸出為
Listing queues …
hello 0
…done.
在接收訊息的部分會比較複雜一些,這裡會需要定義一個回呼(callback)函數,每當有訊息被接收時,Pika 就會呼叫這個函數,這裡我們讓他直接將接收到的訊息輸出至螢幕上:
# 定義回呼函數 def callback(ch, method, properties, body): print " [x] Received %r" % (body,)
然後告訴 RabbitMQ 使用這個回呼函數來接收 hello
這個 queue 的訊息。
# 接收訊息 channel.basic_consume(callback, queue='hello', no_ack=True)
no_ack
這個參數目前沒有用到,在後面的教學會再解釋。
最後進入一個無窮迴圈,等待訊息。
print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming()
以下是 send.py
的完整程式碼:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" connection.close()
而 receive.py
的完整程式碼則為:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue='hello', no_ack=True) channel.start_consuming()
接下來開始測試,首先執行 send.py
:
python send.py
輸出為
[x] Sent ‘Hello World!’
send.py
再送出一條訊息後就會自動結束,接著執行 receive.py
:
python receive.py
輸出為
[*] Waiting for messages. To exit press CTRL+C
[x] Received ‘Hello World!’
這樣就完成了一個最基本的佇列系統!
在 receive.py
執行時,只要接收到新訊息就會輸出在螢幕上,這時候你可以再執行一次 send.py
測試看看是否會接收到後續的訊息。
receive.py
在執行之後就會一直等待新的訊息,按下 Ctrl + c
就可以離開。