暸解 RPC 的工作流程
首先簡單地敘述整個工作流程,由 Client 端發送 Request 至 Queue,Queue 將訊息傳送給 Consumer,Consumer 將任務處理完成後將 Response 傳回至 Queue 再回送給 Client。前半段發送訊息至 Consumer 的部分在之前的文章中都實作過了,至於後半段 Consumer 要怎麼回送訊息至當初發送 Request 的 Client 呢?
RabbitMQ 團隊提出的方式很簡單,我們只需要在 Client 發送訊息時加入兩個參數,分別是 reply_to 與 correlation_id。reply_to 指定了 Consumer 回覆時的 Queue,而 correlation_id 則像是這個訊息的 ID,因為若我們只有一個回覆的 Queue,如何得知哪個 Response 要對應哪個 Client,這時後我們就可以依靠 correlation_id 來辨別。來看一下 RabbitMQ 說明文件中的流程圖:
實作 RPC
此次實作範例很簡單,Client 端單純的傳送文字訊息至指定的 Queue: rpc_queue,然後 Reponse 則是將文字訊息再回傳至 Client。我們先建立 Client 端程式,名稱為 rpc_client.py:
import pika
import uuid
# 建立 RPC Client Class
class MyRpcClient(object):
def __init__(self):
# 建立連線
self.connection = pika.BlockingConnection(\
pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
# 宣告 Queue
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
# 設定 Consume 相關參數,加入 Callback
self.channel.basic_consume(self.on_response, no_ack=True,\
queue=self.callback_queue)
# 此函式主要為判斷 correlation_id 是否一致
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
# 此函式主要為發送訊息並等待回傳結果
def call(self, msg):
self.response = None
# 取得 correlation_id
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='', routing_key='rpc_queue',\
properties=pika.BasicProperties(reply_to=self.callback_queue,\
correlation_id=self.corr_id), body=str(msg))
while self.response is None:
self.connection.process_data_events()
return str(self.response)
myRpcClient = MyRpcClient()
print ' [x] Send Request '
# 執行發送訊息
response = myRpcClient.call('Hello RPC!')
print ' [.] Got %s' % (response)
接著來建立 Server 端,檔案名稱為 rpc_server.py,內容如下:
import pika
# 建立連線
connection = pika.BlockingConnection(\
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 宣告 Queue
channel.queue_declare(queue='rpc_queue')
# 此函示將根據 reply_to 回傳訊息
def on_request(ch, method, props, body):
print ' [.] Message: %s' % (body)
response = 'Got a message from the client: ' + body
ch.basic_publish(exchange='', routing_key=props.reply_to,\
properties=pika.BasicProperties(correlation_id=\
props.correlation_id), body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print ' [x] Awaiting RPC requests'
channel.start_consuming()
測試
首先開啟兩個 Consumer,先執行 rpc_server.py ,接著執行 rpc_client.py,看看 Client 端是否接收到剛剛傳出去的訊息。
Environment :
・ Arch Linux
・ Python 2.7
Reference :
・ RabbitMQ official site
・ Pika docs