了解 Routing
透過 Exchange 傳送訊息時,我們可以透過增加 Routing 目的地( routing_key ) 來指定某些訊息導向特定的 Queue。當你透過 Exchange 傳送訊息至 Queue 時,我們需要一個將 Exchange 與 Queue 綁定的動作,這個動作稱為 Binding。因此我們會在 Binding 動作中加入 routing_key 的設定。需要注意的是,當 Exchange 為 fanout 模式時,routing_key 則會被忽略。設定方式如下:
channel.queue_bind(exchange='', queue='', routing_key='')
了解 Direct Exchange
上面有提到,Exchange 若使用 fanout 模式則 routing_key 會被忽略。因此,我們將使用另一種方式:direct。這個方式的概念很簡單,訊息會根據你在 channel.queue_bind( ) 時設定的 routing_key 傳送給符合的 Queue。另外,在 direct 方式下,我們也可以 Binding 多個 Queue。
實作 Direct Exchange 並加入 Routing
實作部分延續我們之前的 log 記錄系統,假設今天我們需要 2 個 Consumer,一個記錄所有 log 訊息,另一個只針對 Error 的訊息才記錄。首先,我們先利用之前的 send_task.py:
import sys
import pika
# 建立連線
...
# 宣告 Exchange 與 direct 模式
channel.exchange_declare(exchange='myExch', type='direct')
# 執行時需帶入 log 的類別,讓 Exchange 知道 routing_key
# 如: python send_task.py warning "warning message"
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
# 訊息內容
msg = ' '.join(sys.argv[2:]) or 'Hello RabbitMQ!'
# 發送訊息,並指定 routing_key
channel.basic_publish(exchange='myExch', routing_key=severity, body=msg)
print ' [x] Sent %r:%r' % (severity, msg)
# 關閉連線
connection.close()
接著來修改 worker.py,內容如下:
import sys
import pika
# 建立連線
...
# 宣告 Exchange 與 direct 模式
channel.exchange_declare(exchange='myExch', type='direct')
# 透過系統自動建立獨立的 Queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 執行 worker.py 時也要給予 log 類別參數,讓它知道它需要接收哪種訊息
# 如:pyhton worker.py info error
severities = sys.argv[1:]if not severities:
print >> sys.stderr, "Usages: %s [info] [warning] [error]" % \
(sys.argv[0])
sys.exit(1)
# 根據輸入的參數綁定不同的 Exchange 與 Queue
for severity in severities:
channel.queue_bind(exchange='myExch', queue=queue_name, \
routing_key=severity)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print ' [x] %r:%r' % (method.routing_key, body)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
測試
首先開啟兩個 Consumer,假設一個只接收 error 的訊息,另一個則是所有的訊息。指令如下:
# Terminal 1
python worker.py error
# Terminal 2
python worker.py info warning error
之後,另外開啟一個 Producer 發送任務,指令如下:
python send_task.py info "Info message"
python send_task.py error "Error message"
python send_task.py warning "Warning message"
看看兩個 Consumer 是否收到該接收到的訊息。
Environment :
・ Arch Linux
・ Python 2.7
Reference :
・ RabbitMQ official site
・ Pika docs