2015年8月10日

RabbitMQ 教學 (5) - 建立 Work Queue #3 ( Create a work queue using Python #3 )

前一篇介紹了 Message Acknowledgment 機制,但該機制只解決接收端 ( Consumer ) 意外地停止時的狀況。若是 RabbitMQ Server 意外停止運作,Queue 本身與存在於 Queue 裡的訊息該怎麼處理呢?本篇將討論處理 RabbitMQ Server 意外停止時的解決方案。( 其他 RabbitMQ 相關教學可以參考本篇整理 )



了解 Message Durability
在進一步調整程式之前,我們需要了解 RabbitMQ 所提供的訊息保存機制稱為 Message Durability,當 Server 意外停止時,透過這個機制保存 Queue 與待發送的訊息,如此一來當 Server 重新運作就可以根據這些紀錄,確保訊息會被派送出去。


加入 Durability 機制
利用之前的 worker.py 進行修改,加入保存 Queue 的機制,內容如下:
import pika
import time

# 建立連線
...

# 宣告使用的 Queue 並加入 durable = True 確保此 Queue 會被保存下來
channel.queue_declare(queue='myRabbitMQ', durable=True)

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print ' [x] Received %r' % (body,)
    time.sleep(body.count('.'))
    print ' [x] Done'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback, queue='myRabbitMQ')
channel.start_consuming()
一樣利用之前的 send_task.py 進行修改,加入保存機制,內容如下:
import sys
import pika

# 建立連線
...

# 宣告使用的 Queue 並加入 durable = True 確保此 Queue 會被保存下來
channel.queue_declare(queue='myRabbitMQ', durable=True)

# 訊息內容
msg = ' '.join(sys.argv[1:]) or 'Hello RabbitMQ!'

# 加入 properties 參數,設定 delivery_mode = 2,確保訊息會被保存
channel.basic_publish(exchange='', routing_key='myRabbitMQ', body=msg, properties=pika.BasicProperties(delivery_mode = 2))
print ' [x] Sent %r' % (msg,)

# 關閉連線
connection.close()


測試
開啟兩個 Terminal,一個執行 worker.py,另一個透過執行 send_task.py 來連續發送多個任務。發送任務指令如下:
python send_task.py 1message.
python send_task.py 2message..
python send_task.py 3message...
python send_task.py 4message....
python send_task.py 5message.....
趁所有訊息發送執行中,強制將 Server 停止。重新啟動 Server 後,你可以查看有哪些 Queue 與訊息等待發送,指令:
rabbitmqctl list_queues name messages_ready messages_unacknowledged
之後再將 worker.py 啟動,就可以看到它繼續處理之前未接收到的任務。

值得注意的事,RabbitMQ 文件中有提到,當 RabbitMQ 接收到訊息到將訊息保存中間會有間隔一小段時間,那 Server 在訊息被儲存之前中斷的話不就會遺失該訊息?的確,但 RabbitMQ 團隊認為上面提到的保存機制足以應付基本的情況。若你需要更完善的機制,可以參考 Publisher Acknowledgements。使用 Work Queue 的第三部分介紹至此,請接著參考下一篇。


Environment :
  ・ Arch Linux
  ・ Python 2.7

Reference :
  ・ RabbitMQ official site
  ・ Pika docs