了解 Message Durability
在進一步調整程式之前,我們需要了解 RabbitMQ 所提供的訊息保存機制稱為 Message Durability,當 Server 意外停止時,透過這個機制保存 Queue 與待發送的訊息,如此一來當 Server 重新運作就可以根據這些紀錄,確保訊息會被派送出去。
加入 Durability 機制
利用之前的 worker.py 進行修改,加入保存 Queue 的機制,內容如下:
import pika
import time
# 建立連線
...
# 宣告使用的 Queue 並加入 durable = True 確保此 Queue 會被保存下來
channel.queue_declare(queue='myRabbitMQ', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print ' [x] Received %r' % (body,)
time.sleep(body.count('.'))
print ' [x] Done'
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback, queue='myRabbitMQ')
channel.start_consuming()
一樣利用之前的 send_task.py 進行修改,加入保存機制,內容如下:
import sys
import pika
# 建立連線
...
# 宣告使用的 Queue 並加入 durable = True 確保此 Queue 會被保存下來
channel.queue_declare(queue='myRabbitMQ', durable=True)
# 訊息內容
msg = ' '.join(sys.argv[1:]) or 'Hello RabbitMQ!'
# 加入 properties 參數,設定 delivery_mode = 2,確保訊息會被保存
channel.basic_publish(exchange='', routing_key='myRabbitMQ', body=msg, properties=pika.BasicProperties(delivery_mode = 2))
print ' [x] Sent %r' % (msg,)
# 關閉連線
connection.close()
測試
開啟兩個 Terminal,一個執行 worker.py,另一個透過執行 send_task.py 來連續發送多個任務。發送任務指令如下:
python send_task.py 1message.
python send_task.py 2message..
python send_task.py 3message...
python send_task.py 4message....
python send_task.py 5message.....
趁所有訊息發送執行中,強制將 Server 停止。重新啟動 Server 後,你可以查看有哪些 Queue 與訊息等待發送,指令:
rabbitmqctl list_queues name messages_ready messages_unacknowledged
之後再將 worker.py 啟動,就可以看到它繼續處理之前未接收到的任務。
值得注意的事,RabbitMQ 文件中有提到,當 RabbitMQ 接收到訊息到將訊息保存中間會有間隔一小段時間,那 Server 在訊息被儲存之前中斷的話不就會遺失該訊息?的確,但 RabbitMQ 團隊認為上面提到的保存機制足以應付基本的情況。若你需要更完善的機制,可以參考 Publisher Acknowledgements。使用 Work Queue 的第三部分介紹至此,請接著參考下一篇。
Environment :
・ Arch Linux
・ Python 2.7
Reference :
・ RabbitMQ official site
・ Pika docs