了解 Message Acknowledgment
在完善我們的程式之前,我們需要了解 RabbitMQ 所提供的訊息確認機制稱為 Message Acknowledgment,當接收端收到訊息並處理完後,可以發送通知讓 RabbitMQ 知道該任務結束。反之,接收端意外結束而未回送通知,RabbitMQ 將會在下一次重新發送該任務。
加入 Acknowledgment 機制
利用之前的 worker.py 進行修改,內容如下:
import pika
import time
# 建立連線並宣告使用的 Queue
...
def callback(ch, method, properties, body):
print ' [x] Received %r' % (body,)
time.sleep(body.count('.'))
print ' [x] Done'
# 設定 acknowledgment
ch.basic_ack(delivery_tag = method.delivery_tag)
# 將原本 basic_consume 的參數 no_ack = True 刪除
channel.basic_consume(callback, queue='myRabbitMQ')
channel.start_consuming()
根據下面 pika 的說明文件,可以更詳細了解 basic_ack 提供的確認機制,測試 worker.py
首先開啟兩個 Terminal,一個執行 worker.py,另一個透過執行 send_task.py 來連續發送多個任務。發送任務指令如下:
python send_task.py 1message.
python send_task.py 2message..
python send_task.py 3message...
python send_task.py 4message....
python send_task.py 5message.....
趁所有任務尚未發送出去前,強制將 worker.py 停止。等待 send_task.py 將任務都發送完後,你可以查看有哪些任務等待發送至接收端,指令:
rabbitmqctl list_queues name messages_ready messages_unacknowledged
# 顯示結果應如同下面:
Listing queues ...
myRabbitMQ 3 0
之後再將 worker.py 啟動,就可以看到它繼續處理之前未接收到的任務。使用 Work Queue 的第二部分介紹至此,請接著參考下一篇。
Environment :
・ Arch Linux
・ Python 2.7
Reference :
・ RabbitMQ official site
・ Pika docs