了解 Topic Exchange
Topic Exchange 與 Direct Exchange 最大的不同就是在於設定 routing_key 上的不同,前者允許你使用萬用字元於 routing_key 中。舉例來說,我們想得到 rabbit.a.info 與 rabbit.b.info 的訊息,這時候我們就可以設定 routing_key='rabbit.*.info'。除了 * 以外,我們還可以使用 #。請參考文件說明:
實作 Topic Exchange
實作部分仍然延續我們之前的 log 記錄系統,假設今天我們需要 2 個 Consumer,一個記錄所有 log 訊息 ( rabbit.# ),另一個只針對 rabbit.*.info 的訊息才記錄。首先,我們先利用之前的 send_task.py:
import sys
import pika
# 建立連線
...
# 宣告 Exchange 並設定模式為 topic
channel.exchange_declare(exchange='myExch', type='topic')
# 執行時也要給予 routing_key 參數
# 如:python send_task.py "rabbit.#"
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'rabbit.all.info'
# 訊息內容
msg = ' '.join(sys.argv[2:]) or 'Hello RabbitMQ!'
# 加入 routing_key 設定
channel.basic_publish(exchange='myExch', \
routing_key=routing_key, body=msg)
print ' [x] Sent %r:%r' % (routing_key, msg)
# 關閉連線
connection.close()
接著來修改 worker.py,內容如下:
import sys
import pika
# 建立連線
...
# 宣告 Exchange 並設定模式為 topic
channel.exchange_declare(exchange='myExch', type='topic')
# 讓 Rabbit 替我們自動建立 Queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 執行時也要給予 routing_key 參數,讓它知道它需要接收哪種訊息
# 如:python worker.py "rabbit.#"
binding_keys = sys.argv[1:]
if not binding_keys:
print >> sys.stderr, "Usages: %s [binding_key] " % \
(sys.argv[0])
sys.exit(1)
# 根據不同的參數設定不同的 routing_key
for binding_key in binding_keys:
channel.queue_bind(exchange='myExch', queue=queue_name, \
routing_key=binding_key)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print ' [x] %r:%r' % (method.routing_key, body)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
測試
首先開啟兩個 Consumer,指令如下:
# Terminal 1
python worker.py "rabbit.#"
# Terminal 2
python worker.py "rabbit.*.info"
之後,另外開啟一個 Producer 發送任務,指令如下:
python send_task.py rabbit.a.info "a info msg"
python send_task.py rabbit.b.info "b info msg"
python send_task.py rabbit.c.info "c info msg"
python send_task.py rabbit.d.error "d error msg"
看看兩個 Consumer 是否接收到想要的訊息。
Environment :
・ Arch Linux
・ Python 2.7
Reference :
・ RabbitMQ official site
・ Pika docs