2015年8月10日

RabbitMQ 教學 (4) - 建立 Work Queue #2 ( Create a work queue using Python #2 )

前一篇介紹如何應用 Work Queue,明明程式運作起來很正常還有什麼需要討論的呢?如果當接收端 ( Consumer ) 意外地停止運作時,未處理完的訊息又該如何處理呢?RabbitMQ 團隊已經想到這個問題了,本篇將討論如何處理 RabbitMQ 接收端意外停止時的解決方案。( 其他 RabbitMQ 相關教學可以參考本篇整理 )



了解 Message Acknowledgment
在完善我們的程式之前,我們需要了解 RabbitMQ 所提供的訊息確認機制稱為 Message Acknowledgment,當接收端收到訊息並處理完後,可以發送通知讓 RabbitMQ 知道該任務結束。反之,接收端意外結束而未回送通知,RabbitMQ 將會在下一次重新發送該任務。


加入 Acknowledgment 機制
利用之前的 worker.py 進行修改,內容如下:
import pika
import time

# 建立連線並宣告使用的 Queue
...

def callback(ch, method, properties, body):
    print ' [x] Received %r' % (body,)
    time.sleep(body.count('.'))
    print ' [x] Done'
    # 設定 acknowledgment
    ch.basic_ack(delivery_tag = method.delivery_tag)

# 將原本 basic_consume 的參數 no_ack = True 刪除
channel.basic_consume(callback, queue='myRabbitMQ')
channel.start_consuming()
根據下面 pika 的說明文件,可以更詳細了解 basic_ack 提供的確認機制,




測試 worker.py
首先開啟兩個 Terminal,一個執行 worker.py,另一個透過執行 send_task.py 來連續發送多個任務。發送任務指令如下:
python send_task.py 1message.
python send_task.py 2message..
python send_task.py 3message...
python send_task.py 4message....
python send_task.py 5message.....
趁所有任務尚未發送出去前,強制將 worker.py 停止。等待 send_task.py 將任務都發送完後,你可以查看有哪些任務等待發送至接收端,指令:
rabbitmqctl list_queues name messages_ready messages_unacknowledged

# 顯示結果應如同下面:
Listing queues ...
myRabbitMQ 3 0
之後再將 worker.py 啟動,就可以看到它繼續處理之前未接收到的任務。使用 Work Queue 的第二部分介紹至此,請接著參考下一篇。


Environment :
  ・ Arch Linux
  ・ Python 2.7

Reference :
  ・ RabbitMQ official site
  ・ Pika docs


熱門文章