這裡我們使用 RabbitMQ 來實作 publish-subscribe pattern,將訊息一次傳送給多個 consumers。
前一個範例中,我們實作的工作佇列都是假設一個工作只會配送給一個 worker,現在我們要改變一下這個規則,讓一個訊息可以同時傳送給多個 consumers,而這樣的設計模式就稱為 publish/subscribe。
在這裡我們將實作一個記錄檔系統(logging system),整個系統中包含兩個程式,一個是負責產生記錄訊息,另外一個則是負責接收與處理這些訊息。
在這個記錄檔系統中,我們可以使用一個訊息接收程式(receiver)將接收到的訊息寫入硬碟中,另外同時再執行一個訊息接收程式將所接收到的訊息顯示在螢幕上。
基本上在這個記錄檔系統中,所有產生出來的訊息都可以以廣播的方式送給每一個接收程式,讓每一個接收者都收到一份相同的訊息。
Exchanges
在之前的教學範例中,我們都是很簡單的將訊息送進 RabbitMQ 的佇列中,在從佇列中取得訊息,但是整個過程其實不只是這樣,這裡我們將解釋 RabbitMQ 處理訊息的模型與流程。
以下是前面教學範例的一些重點:
- producer:一個使用者的程式,負責發送訊息。
- queue:儲存訊息用的緩衝區。
- consumer:一個使用者的程式,負責發送訊息。
在 RabbitMQ 中,所有的 producer 都不會將任何訊息直接送給 queue,實際上 producer 通常也不知道哪一個訊息該送給哪一個 queue。
在 RabbitMQ 中訊息的配送都是靠 exchange 來處理的,exchange 專門負責從 producers 接收訊息,然後將訊息配送給正確的 queues,exchange 必須清楚每一則訊息的配送規則,例如哪一些訊息要送給哪一個 queue?哪一些訊息要鬼給好幾個 queues?或是哪一些訊息應該被丟棄?而這些規則都是經由 exchange 的類型(type)來定義的。
exchange 的類型分為:direct
、topic
、headers
與 fanout
這幾種,在這裡我們需要使用的類型是 fanout
,首先建立一個 exchange,並命名為 logs
:
channel.exchange_declare(exchange='logs', type='fanout')
fanout
的 exchange 非常簡單,他就是很單純的把所有接收到的訊息配送給每一個 queues,而這個規則也就是我們在記錄檔系統中所需要的。
接著就可以將訊息發佈至這個 exchange 了:
channel.basic_publish(exchange='logs', routing_key='', body=message)
這裡的 exchange
參數可以用來指定 exchange 的名稱。
列出系統的 Exchanges
如果要列出系統中所有的 exchanges,可以使用 rabbitmqctl
指令:
sudo rabbitmqctl list_exchanges
輸出為
Listing exchanges …
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
…done.
輸出的 exchanges 中,有一些名稱為 amq.*
或是空字串的 exchanges,這些是 RabbitMQ 內建的,這些預設的 exchanges 在這裡並不會用到。
沒有名稱的 Exchange
在之前的教學範例中,我們都是直接使用名稱為空字串的 exchange:
channel.basic_publish(exchange='', routing_key='hello', body=message)
名稱為空字串的 exchange 是一個內建的 exchange,它會直接把收到的訊息依據 routing_key
所指定的 queue 名稱來配送。
暫時性的 Queues
在之前的範例中,因為我們需要讓 producers 與 consumers 可以共用一個佇列,將大量的工作分散處理,所以我們藉著建立具名佇列的方式,來讓 producers 與 consumers 都可以明確指定要使用的佇列(如之前的 hello
與 task_queue
)。
但是在這裡的情況跟之前的例子不同,現在我們要讓每一個訊息接收者都可以收到所有的訊息,而且我們只需要接收到最新的訊息即可,過時的訊息對我們來說其實沒有用,所以我們做了兩項改變。
第一個部分就是讓每一個訊息接收者連接至 RabbitMQ 伺服器時,建立一個該訊息接收者專屬的新佇列,因為這個佇列是專門給這個接收者使用,所以也不需要特別指定名稱。
result = channel.queue_declare()
如果在呼叫 queue_declare()
時,沒有以 queue
參數指定佇列名稱,則 RabbitMQ 會自行以隨機的方式為這個佇列指定一個名稱,儲存至 result.method.queue
。
第二個部分就是在訊息接收者中斷與 RabbitMQ 伺服器的連現時,讓剛剛建立的專屬佇列也可以自動刪除,這個動作可以使用 exclusive
參數來處理:
result = channel.queue_declare(exclusive=True)
這樣一來,每一個訊息接收者在連接上 RabbitMQ 伺服器之後,就都會有一個自己的佇列,如此一來不同的接收者就不會互相干擾。
Bindings
建立好新的佇列之後,還要讓之前建立的 fanout
exchange 可以將訊息配送至新建立的佇列中,建立 exchange 與佇列之間的聯結動作就稱作 binding。
channel.queue_bind(exchange='logs',
queue=result.method.queue)
這樣 logs
這個 exchange 就會將訊息配送至 result.method.queue
這個剛建立好的佇列中。
如果要查看系統中所有 binding 的狀態,可以使用
rabbitmqctl list_bindings
完整的記錄檔系統
在這個例子中,producer 的程式碼跟之前的例子都差不多,只是改用 logs
這個 exchange 而已,我們將 producer 的指令稿命名為 emit_log.py
:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()
這裡在使用 basic_publish()
指定使用 logs
這個 exchange 時,routing_key
雖然還是要指定,不過這個參數對於 fanout
exchange 是沒有作用的。
另外 producer 在實作上也要記得宣告 exchange,以避免使用到還沒建立好的 exchange 造成錯誤。
在這個例子中,如果 producer 發送訊息給 exchange 的時候,還沒有任何佇列聯結上該 exchange,那麼該訊息就會被丟棄,這個狀況對於這個範例而言剛好沒有影響,因為我們只希望接收最新的訊息,舊的不用保留,所以沒有關係。
以下是訊息接收者的程式碼,我們將其命名為 receive_logs.py
:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
這樣就完成了,如果想要將記錄檔寫入檔案,可以執行:
python receive_logs.py > logs_from_rabbit.log
如果想要從螢幕上看到及時的記錄檔訊息,可以再開一的終端機,執行:
python receive_logs.py
最後產生記錄訊息:
python emit_log.py
你可以使用 rabbitmqctl
指令來確認 binding 的狀態是否正確:
sudo rabbitmqctl list_bindings
如果有兩個 receive_logs.py
在執行的情況下,輸出應該會類似這樣:
Listing bindings …
exchange amq.gen-FdsvRZh16fRiV60q0MdzVg queue amq.gen-FdsvRZh16fRiV60q0MdzVg []
exchange amq.gen-fo3VHtgITPC4pe4l_PcSYQ queue amq.gen-fo3VHtgITPC4pe4l_PcSYQ []
logs exchange amq.gen-FdsvRZh16fRiV60q0MdzVg queue amq.gen-FdsvRZh16fRiV60q0MdzVg []
logs exchange amq.gen-fo3VHtgITPC4pe4l_PcSYQ queue amq.gen-fo3VHtgITPC4pe4l_PcSYQ []
…done.
關於 rabbitmqctl list_bindings
的輸出欄位說明,可以參考 rabbitmqctl
指令的 man 線上手冊(man rabbitmqctl
)。