rabbit.js 是一個專門為 RabbitMQ 所設計的 JavaScript API 函式庫,可以讓你在 Node.js 中很輕鬆的實作各種類型的訊息佇列(Message Queue)。
rabbit.js 以 amqplib 為基礎,將原本複雜的設定又再簡化,讓一般性的使用者更方便,如果是使用一般常見的模式(pattern),只要幾行程式碼就可以運作了。
如果要在 Node.js 中使用 RabbitMQ,首先就是要將基本的 Node.js 與 RabbitMQ 服務先安裝好,安裝說明請參考:
接著再使用 Node.js 的 npm
套件管理程式安裝 rabbot.js:
npm install rabbit.js
rabbit.js 在使用上比 RabbitMQ 官方所提供的教學範例還要簡略,只要建立 RabbitMQ 的連線與對應類型的 socket 即可,以下是使用的方式教學。
首先以 createContext()
指定 RabbitMQ 的 URL 位址,建立一個新的連線:
var context = require('rabbit.js').createContext('amqp://localhost');
在連線建立之後,context
會送出 'ready'
這個事件(event)。如果連線發生問題時,則會送出 'error'
事件,並且附帶一個 Error
物件,設計者可以過這些事件判斷連線是否正常。
建立連線之後,還要再建立指定類型的 socket,而要使用哪一種類型就要看自己的需求而定。這裡以 Publish/Subscribe 這個模型為範例,在一個 JavaScript 程式中同時實作 publish 與 subscribe:
var pub = context.socket('PUBLISH'); var sub = context.socket('SUBSCRIBE');
這裡建立兩個 socket,一個作為 publish(pub
),另一個作為 subscribe(sub
)。
接下來,讓兩個 socket 連接到同一個 exchange:
pub.connect('alerts'); sub.connect('alerts');
socket 實際上就是 Stream,所以當它的緩衝區有資料可以讀取時,你可以使用 read()
函數或是透過 'data'
事件來讀取,而若要寫入資料,則可使用 write()
函數。
如果傳送的資料都是字串,則可以使用 setEncoding()
來設定字串的編碼:
sub.setEncoding('utf8'); sub.on('data', function(note) { console.log("Alarum! %s", note); });
或是在寫入資料時加上一個指定編碼的參數:
pub.write("Emergency. There's an emergency going on", 'utf8');
Stream 所提供的 pipe()
函數可以很方便的將輸出導向至其他的串流:
sub.pipe(process.stdout);
一個 socket 可以同時連接到多個 exchange,例如:
var sub2 = context.socket('SUBSCRIBE'); sub2.connect('system'); sub2.connect('notifications');
這裡的 sub2
會同時接收來自於 system
與 notifications
這兩個 exchange 的所有訊息。
如果要關閉 socket,可以呼叫 close()
函數,它會清理配置給該 socket 的所有資源,並在完成時送出 'close'
事件。若 socket 的類型是屬於可以寫入資料的,則也可以使用 end([chunk [, encoding]])
函數來寫入最後一筆資料,當資料寫入後,就會自動關閉 socket,如果呼叫 end()
不加上任何參數,它的效果就跟 close()
函數相同。
在使用 Context.socket()
建立 socket 時,第一個參數的作用是指定 socket 的類型,以下是所有支援的類型:
PUBLISH
/ SUBSCRIBE
(PUB
/ SUB
)PUSH
/ PULL
(使用範例請參考 net)REQUEST
/ REPLY
(REQ
/ REP
)(使用範例請參考 ordering)PUSH
/ WORKER
PUB
與 SUB
這兩個類型的 socket 可以依據指定的 topic 來傳送與接收訊息。
PUB
socket 可以使用 setsockopt('topic', string)
來設定 socket 的 topic,或是在發送訊息時,使用 publish(topic, message, [encoding])
直接指定該訊息的 topic。
SUB
socket 則可以在呼叫 connect()
函數時,在第二個參數上指定 topic。
指定完 topic 之後,還要設定 'routing'
的類型,才能讓他運作,可用的 'routing'
類型如下:
'fanout'
:這是預設的選項,不管 topic 是什麼,將所有的訊息配送至所有的 SUB
socket。'direct'
:只配送完全符合 topic 名稱的訊息。'topic'
:依據 AMQP 的萬用字元規則來比對 topic 名稱,比對方法請參考 RabbitMQ 的 Topics。若要設定或更改 socket 的設定,可以透過 Socket.setsockopt()
函數,或是在呼叫 Context.socket()
時,將選項的設定值放在第二個參數上。
以下是一些可用的 socket 選項:
routing
PUB
與 SUB
這兩個類型的 socket,跟 topic
配合之後,可以決定如何配送訊息。routing
在 socket 建立的時候就要指定好,連接至相同位置的 sockets 其 routing
也必須吻合。topic
PUB
socket 所發送訊息的 topic。expiration
PUB
、PUSH
、REQ
、REP
),設定訊息的有效期間,單位為千分之一秒,例如:
pub.setsockopt('expiration', 60 * 1000)
這樣由 pub
所發送的訊息如果在 60 秒內沒有被接收,那麼伺服器就會將此訊息丟棄。訊息有效期間的功能在 RabbitMQ 3.0.0 以後才有被支援。
prefetch
WORKER
與 REP
socket,設定 RabbitMQ 在訊息處理完成之前,只能配送多少筆訊息給 socket。例如:
var worker = ctx.socket('WORKER', {prefetch: 1});
這樣 RabbitMQ 就會一次只配送一個工作給 worker
,等待工作處理完成並呼叫 ack()
之後,才會再繼續配送下一個工作。 如果 prefetch
設定為 0
(預設的選項),RabbitMQ 就會不設定任何限制,將所有的訊息都配送給 socket。
persistent
true
與 false
。在 REQ
/ REP
的狀況下,系統只會保存 requests,而在 PUB
/ SUB
的狀況則沒有作用。參考資料:GitHub