這裡我們使用 RabbitMQ 來實作 publish-subscribe pattern,將訊息一次傳送給多個 consumers。

前一個範例中,我們實作的工作佇列都是假設一個工作只會配送給一個 worker,現在我們要改變一下這個規則,讓一個訊息可以同時傳送給多個 consumers,而這樣的設計模式就稱為 publish/subscribe


在這裡我們將實作一個記錄檔系統(logging system),整個系統中包含兩個程式,一個是負責產生記錄訊息,另外一個則是負責接收與處理這些訊息。

在這個記錄檔系統中,我們可以使用一個訊息接收程式(receiver)將接收到的訊息寫入硬碟中,另外同時再執行一個訊息接收程式將所接收到的訊息顯示在螢幕上。

基本上在這個記錄檔系統中,所有產生出來的訊息都可以以廣播的方式送給每一個接收程式,讓每一個接收者都收到一份相同的訊息。

Exchanges

在之前的教學範例中,我們都是很簡單的將訊息送進 RabbitMQ 的佇列中,在從佇列中取得訊息,但是整個過程其實不只是這樣,這裡我們將解釋 RabbitMQ 處理訊息的模型與流程。

以下是前面教學範例的一些重點:

  • producer:一個使用者的程式,負責發送訊息。
  • queue:儲存訊息用的緩衝區。
  • consumer:一個使用者的程式,負責發送訊息。

在 RabbitMQ 中,所有的 producer 都不會將任何訊息直接送給 queue,實際上 producer 通常也不知道哪一個訊息該送給哪一個 queue。

在 RabbitMQ 中訊息的配送都是靠 exchange 來處理的,exchange 專門負責從 producers 接收訊息,然後將訊息配送給正確的 queues,exchange 必須清楚每一則訊息的配送規則,例如哪一些訊息要送給哪一個 queue?哪一些訊息要鬼給好幾個 queues?或是哪一些訊息應該被丟棄?而這些規則都是經由 exchange 的類型(type)來定義的。

rabbitmq-exchanges

exchange 的類型分為:directtopicheadersfanout 這幾種,在這裡我們需要使用的類型是 fanout,首先建立一個 exchange,並命名為 logs

channel.exchange_declare(exchange='logs',
                         type='fanout')

fanout 的 exchange 非常簡單,他就是很單純的把所有接收到的訊息配送給每一個 queues,而這個規則也就是我們在記錄檔系統中所需要的。

接著就可以將訊息發佈至這個 exchange 了:

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

這裡的 exchange 參數可以用來指定 exchange 的名稱。

列出系統的 Exchanges

如果要列出系統中所有的 exchanges,可以使用 rabbitmqctl 指令:

sudo rabbitmqctl list_exchanges

輸出為
Listing exchanges …
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
…done.

輸出的 exchanges 中,有一些名稱為 amq.* 或是空字串的 exchanges,這些是 RabbitMQ 內建的,這些預設的 exchanges 在這裡並不會用到。

沒有名稱的 Exchange

在之前的教學範例中,我們都是直接使用名稱為空字串的 exchange:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

名稱為空字串的 exchange 是一個內建的 exchange,它會直接把收到的訊息依據 routing_key 所指定的 queue 名稱來配送。

暫時性的 Queues

在之前的範例中,因為我們需要讓 producers 與 consumers 可以共用一個佇列,將大量的工作分散處理,所以我們藉著建立具名佇列的方式,來讓 producers 與 consumers 都可以明確指定要使用的佇列(如之前的 hellotask_queue)。

但是在這裡的情況跟之前的例子不同,現在我們要讓每一個訊息接收者都可以收到所有的訊息,而且我們只需要接收到最新的訊息即可,過時的訊息對我們來說其實沒有用,所以我們做了兩項改變。

第一個部分就是讓每一個訊息接收者連接至 RabbitMQ 伺服器時,建立一個該訊息接收者專屬的新佇列,因為這個佇列是專門給這個接收者使用,所以也不需要特別指定名稱。

result = channel.queue_declare()

如果在呼叫 queue_declare() 時,沒有以 queue 參數指定佇列名稱,則 RabbitMQ 會自行以隨機的方式為這個佇列指定一個名稱,儲存至 result.method.queue

第二個部分就是在訊息接收者中斷與 RabbitMQ 伺服器的連現時,讓剛剛建立的專屬佇列也可以自動刪除,這個動作可以使用 exclusive 參數來處理:

result = channel.queue_declare(exclusive=True)

這樣一來,每一個訊息接收者在連接上 RabbitMQ 伺服器之後,就都會有一個自己的佇列,如此一來不同的接收者就不會互相干擾。

Bindings

建立好新的佇列之後,還要讓之前建立的 fanout exchange 可以將訊息配送至新建立的佇列中,建立 exchange 與佇列之間的聯結動作就稱作 binding。

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

這樣 logs 這個 exchange 就會將訊息配送至 result.method.queue 這個剛建立好的佇列中。

rabbitmq-bindings

如果要查看系統中所有 binding 的狀態,可以使用

rabbitmqctl list_bindings

完整的記錄檔系統

rabbitmq-python-three-overall

在這個例子中,producer 的程式碼跟之前的例子都差不多,只是改用 logs 這個 exchange 而已,我們將 producer 的指令稿命名為 emit_log.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print " [x] Sent %r" % (message,)
connection.close()

這裡在使用 basic_publish() 指定使用 logs 這個 exchange 時,routing_key 雖然還是要指定,不過這個參數對於 fanout exchange 是沒有作用的。

另外 producer 在實作上也要記得宣告 exchange,以避免使用到還沒建立好的 exchange 造成錯誤。

在這個例子中,如果 producer 發送訊息給 exchange 的時候,還沒有任何佇列聯結上該 exchange,那麼該訊息就會被丟棄,這個狀況對於這個範例而言剛好沒有影響,因為我們只希望接收最新的訊息,舊的不用保留,所以沒有關係。

以下是訊息接收者的程式碼,我們將其命名為 receive_logs.py

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

這樣就完成了,如果想要將記錄檔寫入檔案,可以執行:

python receive_logs.py > logs_from_rabbit.log

如果想要從螢幕上看到及時的記錄檔訊息,可以再開一的終端機,執行:

python receive_logs.py

最後產生記錄訊息:

python emit_log.py

你可以使用 rabbitmqctl 指令來確認 binding 的狀態是否正確:

sudo rabbitmqctl list_bindings

如果有兩個 receive_logs.py 在執行的情況下,輸出應該會類似這樣:

Listing bindings …
exchange amq.gen-FdsvRZh16fRiV60q0MdzVg queue amq.gen-FdsvRZh16fRiV60q0MdzVg []
exchange amq.gen-fo3VHtgITPC4pe4l_PcSYQ queue amq.gen-fo3VHtgITPC4pe4l_PcSYQ []
logs exchange amq.gen-FdsvRZh16fRiV60q0MdzVg queue amq.gen-FdsvRZh16fRiV60q0MdzVg []
logs exchange amq.gen-fo3VHtgITPC4pe4l_PcSYQ queue amq.gen-fo3VHtgITPC4pe4l_PcSYQ []
…done.

關於 rabbitmqctl list_bindings 的輸出欄位說明,可以參考 rabbitmqctl 指令的 man 線上手冊(man rabbitmqctl)。