2015年8月13日

RabbitMQ 教學 (8) - 應用 Direct Exchange ( Apply direct exchange using Python )

本篇將基於上一篇的 Publish / Subscribe 模式延伸討論,從上一篇文章可以暸解透過廣播的方式傳送訊息,讓所有 Consumer 取得相同的訊息。但是,若我們需要針對不同的訊息類別導向特定的 Queue 時,我們則需要運用 Routing 的概念。( 其他 RabbitMQ 相關教學可以參考本篇整理 )



了解 Routing
透過 Exchange 傳送訊息時,我們可以透過增加 Routing 目的地( routing_key ) 來指定某些訊息導向特定的 Queue。當你透過 Exchange 傳送訊息至 Queue 時,我們需要一個將 Exchange 與 Queue 綁定的動作,這個動作稱為 Binding。因此我們會在 Binding 動作中加入 routing_key 的設定。需要注意的是,當 Exchange 為 fanout 模式時,routing_key 則會被忽略。設定方式如下:
channel.queue_bind(exchange='', queue='', routing_key='')


了解 Direct Exchange
上面有提到,Exchange 若使用 fanout 模式則 routing_key 會被忽略。因此,我們將使用另一種方式:direct。這個方式的概念很簡單,訊息會根據你在 channel.queue_bind( ) 時設定的 routing_key 傳送給符合的 Queue。另外,在 direct 方式下,我們也可以 Binding 多個 Queue。




實作 Direct Exchange 並加入 Routing
實作部分延續我們之前的 log 記錄系統,假設今天我們需要 2 個 Consumer,一個記錄所有 log 訊息,另一個只針對 Error 的訊息才記錄。首先,我們先利用之前的 send_task.py
import sys
import pika

# 建立連線
...

# 宣告 Exchange 與 direct 模式
channel.exchange_declare(exchange='myExch', type='direct')

# 執行時需帶入 log 的類別,讓 Exchange 知道 routing_key
# 如: python send_task.py warning "warning message"
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'

# 訊息內容
msg = ' '.join(sys.argv[2:]) or 'Hello RabbitMQ!'

# 發送訊息,並指定 routing_key
channel.basic_publish(exchange='myExch', routing_key=severity, body=msg)
print ' [x] Sent %r:%r' % (severity, msg)

# 關閉連線
connection.close()
接著來修改 worker.py,內容如下:
import sys
import pika

# 建立連線
...

# 宣告 Exchange 與 direct 模式
channel.exchange_declare(exchange='myExch', type='direct')

# 透過系統自動建立獨立的 Queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 執行 worker.py 時也要給予 log 類別參數,讓它知道它需要接收哪種訊息
# 如:pyhton worker.py info error
severities = sys.argv[1:]if not severities:
    print >> sys.stderr, "Usages: %s [info] [warning] [error]" % \
            (sys.argv[0])
    sys.exit(1)

# 根據輸入的參數綁定不同的 Exchange 與 Queue
for severity in severities:
    channel.queue_bind(exchange='myExch', queue=queue_name, \
            routing_key=severity)

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print ' [x] %r:%r' % (method.routing_key, body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()



測試
首先開啟兩個 Consumer,假設一個只接收 error 的訊息,另一個則是所有的訊息。指令如下:
# Terminal 1
python worker.py error
# Terminal 2
python worker.py info warning error
之後,另外開啟一個 Producer 發送任務,指令如下:
python send_task.py info "Info message"
python send_task.py error "Error message"
python send_task.py warning "Warning message"
看看兩個 Consumer 是否收到該接收到的訊息。


Environment :
  ・ Arch Linux
  ・ Python 2.7

Reference :
  ・ RabbitMQ official site
  ・ Pika docs