2015年8月14日

RabbitMQ 教學 (9) - 實作 Topic Exchange ( Implement topic exchange using Python )

從上一篇使用 Direct Exchange 的範例中,我們可以很簡單地指定訊息給特定的 Queue。然而,這個方式還不夠靈活,因此 RabbitMQ 團隊也提出另一個更細緻的方式:Topic Exchange。本篇將介紹及討論如何實作 Topic Exchange。( 其他 RabbitMQ 相關教學可以參考本篇整理 )



了解 Topic Exchange
Topic Exchange 與 Direct Exchange 最大的不同就是在於設定 routing_key 上的不同,前者允許你使用萬用字元於 routing_key 中。舉例來說,我們想得到 rabbit.a.info 與 rabbit.b.info 的訊息,這時候我們就可以設定 routing_key='rabbit.*.info'。除了 * 以外,我們還可以使用 #。請參考文件說明:




實作 Topic Exchange
實作部分仍然延續我們之前的 log 記錄系統,假設今天我們需要 2 個 Consumer,一個記錄所有 log 訊息 ( rabbit.# ),另一個只針對 rabbit.*.info 的訊息才記錄。首先,我們先利用之前的 send_task.py
import sys
import pika

# 建立連線
...

# 宣告 Exchange 並設定模式為 topic
channel.exchange_declare(exchange='myExch', type='topic')

# 執行時也要給予 routing_key 參數
# 如:python send_task.py "rabbit.#"
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'rabbit.all.info'

# 訊息內容
msg = ' '.join(sys.argv[2:]) or 'Hello RabbitMQ!'

# 加入 routing_key 設定
channel.basic_publish(exchange='myExch', \
        routing_key=routing_key, body=msg)

print ' [x] Sent %r:%r' % (routing_key, msg)

# 關閉連線
connection.close()
接著來修改 worker.py,內容如下:
import sys
import pika

# 建立連線
...

# 宣告 Exchange 並設定模式為 topic
channel.exchange_declare(exchange='myExch', type='topic')

# 讓 Rabbit 替我們自動建立 Queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 執行時也要給予 routing_key 參數,讓它知道它需要接收哪種訊息
# 如:python worker.py "rabbit.#"
binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usages: %s [binding_key] " % \
            (sys.argv[0])
    sys.exit(1)

# 根據不同的參數設定不同的 routing_key
for binding_key in binding_keys:
    channel.queue_bind(exchange='myExch', queue=queue_name, \
            routing_key=binding_key)

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print ' [x] %r:%r' % (method.routing_key, body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()



測試
首先開啟兩個 Consumer,指令如下:
# Terminal 1
python worker.py "rabbit.#"
# Terminal 2
python worker.py "rabbit.*.info"
之後,另外開啟一個 Producer 發送任務,指令如下:
python send_task.py rabbit.a.info "a info msg"
python send_task.py rabbit.b.info "b info msg"
python send_task.py rabbit.c.info "c info msg"
python send_task.py rabbit.d.error "d error msg"
看看兩個 Consumer 是否接收到想要的訊息。


Environment :
  ・ Arch Linux
  ・ Python 2.7

Reference :
  ・ RabbitMQ official site
  ・ Pika docs