分類: 網頁開發

rabbit.js:在 Node.JS 中使用 RabbitMQ 實作訊息佇列(Message Queue)

rabbit.js 是一個專門為 RabbitMQ 所設計的 JavaScript API 函式庫,可以讓你在 Node.js 中很輕鬆的實作各種類型的訊息佇列(Message Queue)。

rabbit.js 以 amqplib 為基礎,將原本複雜的設定又再簡化,讓一般性的使用者更方便,如果是使用一般常見的模式(pattern),只要幾行程式碼就可以運作了。

安裝

如果要在 Node.js 中使用 RabbitMQ,首先就是要將基本的 Node.js 與 RabbitMQ 服務先安裝好,安裝說明請參考:

接著再使用 Node.js 的 npm 套件管理程式安裝 rabbot.js:

npm install rabbit.js

使用 rabbit.js

rabbit.js 在使用上比 RabbitMQ 官方所提供的教學範例還要簡略,只要建立 RabbitMQ 的連線與對應類型的 socket 即可,以下是使用的方式教學。

建立連線

首先以 createContext() 指定 RabbitMQ 的 URL 位址,建立一個新的連線:

var context = require('rabbit.js').createContext('amqp://localhost');

在連線建立之後,context 會送出 'ready' 這個事件(event)。如果連線發生問題時,則會送出 'error' 事件,並且附帶一個 Error 物件,設計者可以過這些事件判斷連線是否正常。

建立 Socket

建立連線之後,還要再建立指定類型的 socket,而要使用哪一種類型就要看自己的需求而定。這裡以 Publish/Subscribe 這個模型為範例,在一個 JavaScript 程式中同時實作 publish 與 subscribe:

var pub = context.socket('PUBLISH');
var sub = context.socket('SUBSCRIBE');

這裡建立兩個 socket,一個作為 publish(pub),另一個作為 subscribe(sub)。
接下來,讓兩個 socket 連接到同一個 exchange:

pub.connect('alerts');
sub.connect('alerts');

socket 實際上就是 Stream,所以當它的緩衝區有資料可以讀取時,你可以使用 read() 函數或是透過 'data' 事件來讀取,而若要寫入資料,則可使用 write() 函數。

如果傳送的資料都是字串,則可以使用 setEncoding() 來設定字串的編碼:

sub.setEncoding('utf8');
sub.on('data', function(note) { console.log("Alarum! %s", note); });

或是在寫入資料時加上一個指定編碼的參數:

pub.write("Emergency. There's an emergency going on", 'utf8');

Stream 所提供的 pipe() 函數可以很方便的將輸出導向至其他的串流:

sub.pipe(process.stdout);

一個 socket 可以同時連接到多個 exchange,例如:

var sub2 = context.socket('SUBSCRIBE');
sub2.connect('system');
sub2.connect('notifications');

這裡的 sub2 會同時接收來自於 systemnotifications 這兩個 exchange 的所有訊息。

如果一個 socket 同時連接多個 exchange 時,無法辨識收到的訊息是來自於哪一個 exchange,如果需要區分訊息來源,就要改用多個 socket。

如果要關閉 socket,可以呼叫 close() 函數,它會清理配置給該 socket 的所有資源,並在完成時送出 'close' 事件。若 socket 的類型是屬於可以寫入資料的,則也可以使用 end([chunk [, encoding]]) 函數來寫入最後一筆資料,當資料寫入後,就會自動關閉 socket,如果呼叫 end() 不加上任何參數,它的效果就跟 close() 函數相同。

Socket 類型

在使用 Context.socket() 建立 socket 時,第一個參數的作用是指定 socket 的類型,以下是所有支援的類型:

  • PUBLISH / SUBSCRIBEPUB / SUB
  • PUSH / PULL(使用範例請參考 net
  • REQUEST / REPLYREQ / REP)(使用範例請參考 ordering
  • PUSH / WORKER

Topic

PUBSUB 這兩個類型的 socket 可以依據指定的 topic 來傳送與接收訊息。

PUB socket 可以使用 setsockopt('topic', string) 來設定 socket 的 topic,或是在發送訊息時,使用 publish(topic, message, [encoding]) 直接指定該訊息的 topic。

SUB socket 則可以在呼叫 connect() 函數時,在第二個參數上指定 topic。

指定完 topic 之後,還要設定 'routing' 的類型,才能讓他運作,可用的 'routing' 類型如下:

  • 'fanout':這是預設的選項,不管 topic 是什麼,將所有的訊息配送至所有的 SUB socket。
  • 'direct':只配送完全符合 topic 名稱的訊息。
  • 'topic':依據 AMQP 的萬用字元規則來比對 topic 名稱,比對方法請參考 RabbitMQ 的 Topics

Socket 設定

若要設定或更改 socket 的設定,可以透過 Socket.setsockopt() 函數,或是在呼叫 Context.socket() 時,將選項的設定值放在第二個參數上。

以下是一些可用的 socket 選項:

routing
適用於 PUBSUB 這兩個類型的 socket,跟 topic 配合之後,可以決定如何配送訊息。routing 在 socket 建立的時候就要指定好,連接至相同位置的 sockets 其 routing 也必須吻合。
topic
用於設定 PUB socket 所發送訊息的 topic。
expiration
專門用於可寫入的 socket(如 PUBPUSHREQREP),設定訊息的有效期間,單位為千分之一秒,例如:
pub.setsockopt('expiration', 60 * 1000)

這樣由 pub 所發送的訊息如果在 60 秒內沒有被接收,那麼伺服器就會將此訊息丟棄。訊息有效期間的功能在 RabbitMQ 3.0.0 以後才有被支援。

prefetch
適用於 WORKERREP socket,設定 RabbitMQ 在訊息處理完成之前,只能配送多少筆訊息給 socket。例如:
var worker = ctx.socket('WORKER', {prefetch: 1});

這樣 RabbitMQ 就會一次只配送一個工作給 worker,等待工作處理完成並呼叫 ack() 之後,才會再繼續配送下一個工作。 如果 prefetch 設定為 0(預設的選項),RabbitMQ 就會不設定任何限制,將所有的訊息都配送給 socket。

persistent
設定訊息在 RabbitMQ 伺服器重新啟動之後,是否還要保存,可用的值為 truefalse。在 REQ / REP 的狀況下,系統只會保存 requests,而在 PUB / SUB 的狀況則沒有作用。

參考資料:GitHub

G. T. Wang

個人使用 Linux 經驗長達十餘年,樂於分享各種自由軟體技術與實作文章。

Share
Published by
G. T. Wang

Recent Posts

光陽 KYMCO GP 125 機車接電發動、更換電瓶記錄

本篇記錄我的光陽 KYMCO ...

2 年 ago

[開箱] YubiKey 5C NFC 實體金鑰

本篇是 YubiKey 5C ...

3 年 ago

[DIY] 自製竹火把

本篇記錄我拿竹子加上過期的苦茶...

3 年 ago