了解 Exchange
之前教學中的 Producer 與 Consumer 都是直接跟 Queue 做溝通,但其實這並不是一個正式的形式。理想的方式應該為 Producer 不直接發送訊息至 Queue,而是發送至 Exchange。Exchange 是一個專門接收 Producer 訊息的中繼站,它負責將訊息傳送至特定或多個 Queue。Exchange 提供了 4 種方式:direct, topic, headers, fanout,本篇將先介紹 fanout 方式。
什麼是 Fanout ?
從字面上看,fanout 像是散佈、散播的概念。當 Exchange 採用 fanout 模式會把訊息傳送至所有的 Queue,並且忽略 routing key 的設定。參考 RabbitMQ 文件的概念圖:
實作 Exchange 機制
實作部分,假設我們需要一個紀錄與顯示 log 的機制,其中一個 Consumer 需要將訊息內容儲存成文字檔,而另一個 Consumer 則需要將內容呈現於螢幕,因此兩邊的訊息必須要完全一模一樣。首先,利用之前的 send_task.py 進行修改,加入 fanout,內容如下:
import sys
import pika
# 建立連線
...
# 宣告要使用的 Exchange,並設定為 fanout 模式
channel.exchange_declare(exchange='myExch',type='fanout')
# 訊息內容
msg = ' '.join(sys.argv[1:]) or 'Push/Subscribe via RabbitMQ!'
# 設定 Exchnage
channel.basic_publish(exchange='myExch', routing_key='', body=msg)
print ' [x] Sent %r' % (msg)
# 關閉連線
connection.close()
針對 Producer 的設定目前已完成。
替 Consumer 加入對應的機制
既然每個 Consumer 都要收到相同的訊息,因此我們不再透過單一個 Queue 來讓多個 Consumer 取得訊息,而是透過 RabbitMQ 自動建立各自獨立的 Queue 來取得。當系統替你自動產生一個 Queue 後,你需要將 Queue 與 Exchange 進行綁定的動作,稱之為 binding。一樣利用之前的 worker.py 修改,程式碼內容參考如下:
import pika
import time
# 建立連線
...
# 宣告要使用的 Exchange,並設定為 fanout 模式
channel.exchange_declare(exchange='myExch',type='fanout')
# 宣告使用的 Queue
# 當使用 channel.queue_declare()未明確指定 Queue 時系統會自動產生一個,
# 加入 exclusive=True 讓 Consumer 連線中斷時,此 Queue 會自動消失。
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 將 Exchange 與 Queue 綁定
channel.queue_bind(exchange='myExch', queue=queue_name)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print ' [x] Received %r' % (body)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
測試
首先開啟兩個 Consumer,另外開啟一個 Producer 發送任務,看看 Consumer 是否都收到相同的訊息,若要將訊息儲存成文字檔,指令如下:
python worker.py > worker.log
我們還可以使用指令來了解 exchage 與 binding 的狀況,指令如下:
# 列出 Exchange
rabbitmqctl list_exchanges
# 列出 Binding
rabbitmqctl list_bindings
Environment :
・ Arch Linux
・ Python 2.7
Reference :
・ RabbitMQ official site
・ Pika docs