2015年8月15日

RabbitMQ 教學 (10) - 實作 RPC ( Implement RPC over RabbitMQ using Python )

先前文章都是討論比較單純架構下的情況,若我們需要遠端執行程式於其他的機器上時,我們也可以透過 RabbitMQ 來完成。本篇將討論如何透過 RabbitMQ 執行 Remote Procedure Call。( 其他 RabbitMQ 相關教學可以參考本篇整理 )



暸解 RPC 的工作流程
首先簡單地敘述整個工作流程,由 Client 端發送 Request 至 Queue,Queue 將訊息傳送給 Consumer,Consumer 將任務處理完成後將 Response 傳回至 Queue 再回送給 Client。前半段發送訊息至 Consumer 的部分在之前的文章中都實作過了,至於後半段 Consumer 要怎麼回送訊息至當初發送 Request 的 Client 呢?

RabbitMQ 團隊提出的方式很簡單,我們只需要在 Client 發送訊息時加入兩個參數,分別是 reply_tocorrelation_id。reply_to 指定了 Consumer 回覆時的 Queue,而 correlation_id 則像是這個訊息的 ID,因為若我們只有一個回覆的 Queue,如何得知哪個 Response 要對應哪個 Client,這時後我們就可以依靠 correlation_id 來辨別。來看一下 RabbitMQ 說明文件中的流程圖:




實作 RPC
此次實作範例很簡單,Client 端單純的傳送文字訊息至指定的 Queue: rpc_queue,然後 Reponse 則是將文字訊息再回傳至 Client。我們先建立 Client 端程式,名稱為 rpc_client.py
import pika
import uuid

# 建立 RPC Client Class
class MyRpcClient(object):
    def __init__(self):
        # 建立連線
        self.connection = pika.BlockingConnection(\
                pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        # 宣告 Queue
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        # 設定 Consume 相關參數,加入 Callback
        self.channel.basic_consume(self.on_response, no_ack=True,\
                queue=self.callback_queue)

    # 此函式主要為判斷 correlation_id 是否一致
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    # 此函式主要為發送訊息並等待回傳結果
    def call(self, msg):
        self.response = None
        # 取得 correlation_id
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='', routing_key='rpc_queue',\
            properties=pika.BasicProperties(reply_to=self.callback_queue,\
                correlation_id=self.corr_id), body=str(msg))

        while self.response is None:
            self.connection.process_data_events()
        return str(self.response)

myRpcClient = MyRpcClient()
print ' [x] Send Request '
# 執行發送訊息
response = myRpcClient.call('Hello RPC!')
print ' [.] Got %s' % (response)
接著來建立 Server 端,檔案名稱為 rpc_server.py,內容如下:
import pika

# 建立連線
connection = pika.BlockingConnection(\
        pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 宣告 Queue
channel.queue_declare(queue='rpc_queue')

# 此函示將根據 reply_to 回傳訊息
def on_request(ch, method, props, body):
    print ' [.] Message: %s' % (body)
    response = 'Got a message from the client: ' + body

    ch.basic_publish(exchange='', routing_key=props.reply_to,\
            properties=pika.BasicProperties(correlation_id=\
                props.correlation_id), body=str(response))

    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print ' [x] Awaiting RPC requests'
channel.start_consuming()



測試
首先開啟兩個 Consumer,先執行 rpc_server.py ,接著執行 rpc_client.py,看看 Client 端是否接收到剛剛傳出去的訊息。


Environment :
  ・ Arch Linux
  ・ Python 2.7

Reference :
  ・ RabbitMQ official site
  ・ Pika docs


熱門文章