了解 Work Queue
在開始動手之前,試著想像 Work Queue 的概念有點 "類似" 銀行、郵局服務客戶的方式,客戶先後抽取號碼牌等待就像依序進入 Queue,由多個服務人員分別地來替所有的客戶服務,因此服務人員越多處理需求的時間就可以大幅度縮短。但需注意的是,Work Queue 是運用 Round-robin 方式派送工作給每一個接收端。
發送與接收處理
我們將利用上一篇的發送端 send.py 改寫,但這邊將改名為 send_task.py:
import sys
import pika
# 建立連線與宣告使用的 Queue
...
# 讓 User 輸入要傳送的訊息內容
msg = ' '.join(sys.argv[1:]) or 'Hello RabbitMQ!'
channel.basic_publish(exchange='', routing_key='myRabbitMQ', body=msg)
print ' [x] Sent %r' % (msg,)
# 關閉連線
connection.close()
接著建立處理任務的接收端,一樣也是修改上一篇的 receive.py ,並改名為 worker.py:
import pika
import time
# 建立連線與宣告使用的 Queue
...
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print ' [x] Received %r' % (body,)
time.sleep(body.count('.'))
print ' [x] Done'
channel.basic_consume(callback, queue='myRabbitMQ', no_ack=True)
channel.start_consuming()
你可以看到在 worker.py 中加入 time.sleep( ),這邊是為了模擬真實環境處理任務的時間,根據訊息中有幾個 . 符號就會 sleep 多久,如此一來待會測試時才可以明顯感受到 Work Queue 的工作方式。
測試 send_task.py 與 worker.py
假設我們有 5 個任務由 2 個服務人員來處理,因此我們需要開啟三個視窗,其中兩個都各執行 worker.py,一個透過執行 send_task.py 來發送 5 個任務。發送任務指令如下:
python send_task.py 1message.
python send_task.py 2message..
python send_task.py 3message...
python send_task.py 4message....
python send_task.py 5message.....
接著你就可以看到兩個 worker 分別顯示如下:
worker1
[*] Waiting for messages. To exit press CTRL+C
[x] Received '1message.'
[x] Done
[x] Received '3message...'
[x] Done
[x] Received '5message.....'
[x] Done
worker2
[*] Waiting for messages. To exit press CTRL+C
[x] Received '2message..'
[x] Done
[x] Received '4message....'
[x] Done
使用 Work Queue 的第一個部分介紹至此,請繼續閱讀下一篇。
Environment :
・ Arch Linux
・ Python 2.7
Reference :
・ RabbitMQ official site
・ Pika docs