本文所有內(nèi)容均個(gè)人從RabbitMQ官網(wǎng)教程中翻譯,若圖片文字的引用有任何侵權(quán)的地方,聯(lián)系我,我會(huì)立馬刪除。
This article was translated from RabbitMQ Official Tutorials by myself,and if this article and the images in this article have any infringement,please contact to me, and i will delete them.
遠(yuǎn)程程序調(diào)用(Remote Procedure Call(RPC))
(使用php-amqplib)
在第二個(gè)教程中我們學(xué)習(xí)了使用工作隊(duì)列(Work Queues)在多個(gè)工作程序中分配耗時(shí)任務(wù)。
但是如果我們需要在遠(yuǎn)程計(jì)算機(jī)上運(yùn)行一個(gè)函數(shù)并等待它的結(jié)果呢?那是另外一回事了。這種模式通常被成為遠(yuǎn)程程序調(diào)用(Remote Procedure Call)或者簡(jiǎn)稱為PRC。
在這一教程我們準(zhǔn)備使用RabbitMQ來建立一個(gè)RPC系統(tǒng):一個(gè)客戶端和一個(gè)可擴(kuò)展的PRC服務(wù)器。由于我們沒有一些值得分發(fā)的耗時(shí)任務(wù),我們準(zhǔn)備建立一個(gè)會(huì)返回斐波那契數(shù)字(Fibonacci)的愚蠢的(dummy?)RPC服務(wù)。
客戶端接口
為了說明一個(gè)RPC服務(wù)能被如何使用,我們準(zhǔn)備創(chuàng)建一個(gè)簡(jiǎn)單的客戶端類。它會(huì)暴露一個(gè)叫做call的方法用于發(fā)送一個(gè)RPC請(qǐng)求,然后一直阻塞直到接收到應(yīng)答:
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";
關(guān)于RPC的一點(diǎn)
盡管RPC是計(jì)算中一個(gè)十分普通的常見的模式,但它常常被批評(píng)。當(dāng)程序員沒有意識(shí)到一個(gè)函數(shù)是本地調(diào)用還是一個(gè)緩慢的RPC(請(qǐng)求)時(shí)候問題就會(huì)出現(xiàn)。這樣的困惑導(dǎo)致了一個(gè)不可預(yù)知的系統(tǒng)以及為調(diào)試增加比必要的復(fù)雜性。和簡(jiǎn)化軟件不同,濫用RPC將會(huì)導(dǎo)致代碼像一團(tuán)意大利面一樣難以維護(hù)。
將下面的建議銘記于心:
- 確保能顯而易見地區(qū)分哪個(gè)函數(shù)是本地調(diào)用哪個(gè)是遠(yuǎn)程。
- 為你的系統(tǒng)書寫文檔。確保組件間的依賴關(guān)系清晰。
- 處理錯(cuò)誤情況。當(dāng)RPC服務(wù)器掛掉了很長(zhǎng)的一段時(shí)間,客戶端應(yīng)該如何做出反應(yīng)?
有疑問時(shí)應(yīng)避免RPC。如果可以,你應(yīng)該使用一個(gè)異步的管道——而不是像會(huì)造成阻塞的這樣的RPC,結(jié)果會(huì)被異步地推送到下一個(gè)計(jì)算階段。
回調(diào)隊(duì)列(Callback Queue)
一般情況下,實(shí)現(xiàn)通過RabbitMQ實(shí)現(xiàn)RPC是很容易的。一個(gè)客戶端發(fā)送一個(gè)請(qǐng)求消息然后一個(gè)服務(wù)端回復(fù)一個(gè)響應(yīng)消息。為了去接收一個(gè)響應(yīng),我們需要隨請(qǐng)求發(fā)送一個(gè)“回調(diào)”隊(duì)列地址。我們使用默認(rèn)Queue(隊(duì)列)。讓我們一起來試試吧:
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$msg = new AMQPMessage(
$payload,
array('reply_to' => $queue_name));
$channel->basic_publish($msg, '', 'rpc_queue');
# ... then code to read a response message from the callback_queue ...
消息屬性
AMQP 0-9-1協(xié)議的消息預(yù)定義了一套十四個(gè)屬性。這些屬性的大部分都很少使用,除了以下的這些:
delivery_mode:標(biāo)記一條消息為持久化的(設(shè)為2時(shí))或者瞬時(shí)性的(設(shè)為1時(shí))。你可能還記得我們?cè)诘诙€(gè)教程中由使用過。content_type:用來描述編碼的mime-type。例如對(duì)于經(jīng)常使用的JSON編碼,它是一個(gè)用來將這個(gè)屬性設(shè)為application/json的很好的一個(gè)實(shí)例。reply_to:通常被用來命名一個(gè)回調(diào)隊(duì)列。correlation_id:對(duì)于RPC的響應(yīng)與請(qǐng)求間的關(guān)聯(lián)很有用。
關(guān)聯(lián)ID(Correlation ID)
上面提及的方法中我們建議為每一個(gè)RPC請(qǐng)求都創(chuàng)建一個(gè)回調(diào)隊(duì)列。這是十分低效率的,但幸運(yùn)的是,這里有一個(gè)更好的方法,讓我們?yōu)槊總€(gè)客戶端創(chuàng)建一個(gè)單一的回調(diào)隊(duì)列。
這引發(fā)了一個(gè)新問題,在一個(gè)隊(duì)列中接收到一個(gè)響應(yīng)后,并不清楚這個(gè)響應(yīng)應(yīng)該屬于哪一個(gè)請(qǐng)求。這正式使用correlation_id屬性的時(shí)候。我們準(zhǔn)備把這個(gè)值對(duì)于每一個(gè)請(qǐng)求來說都是唯一的。稍后,當(dāng)我們從回調(diào)隊(duì)列接收到一個(gè)消息我們會(huì)查看這一屬性,然后根據(jù)它我們將能夠匹配請(qǐng)求到對(duì)應(yīng)的響應(yīng)。如果我們看見了一個(gè)未知的correlation_id值,我們可以安全地丟棄這一消息,因?yàn)樗粚儆谖覀兊娜魏我粋€(gè)請(qǐng)求。
你可能會(huì)問,在回調(diào)隊(duì)列中,為什么我們要忽視未知的消息,而不是拋出異常?因?yàn)檫@可能是服務(wù)器的競(jìng)爭(zhēng)情況。盡管不太可能,這有可能是RPC服務(wù)器在發(fā)送回應(yīng)答給我們后就立馬掛掉了,但這卻是在為請(qǐng)求發(fā)送一個(gè)確認(rèn)消息前。如果這一情況發(fā)生,重啟后的RPC服務(wù)器將會(huì)再次處理請(qǐng)求。這就是為什么在客戶端中我們必須優(yōu)雅地處理重復(fù)響應(yīng),而RPC在理想情況中應(yīng)該是冪等的。
總結(jié)

我們的RPC將會(huì)像這樣子工作:
當(dāng)客戶端Client開始運(yùn)行,讓創(chuàng)建了一個(gè)匿名的獨(dú)占回調(diào)隊(duì)列。
對(duì)于一個(gè)RPC請(qǐng)求,客戶端Client發(fā)送一條帶有兩個(gè)屬性的消息:設(shè)置回調(diào)隊(duì)列的reply_to與對(duì)于每個(gè)請(qǐng)求都是唯一的值的correlation_id。
請(qǐng)求被發(fā)送到一個(gè)rpc_queueQueue(隊(duì)列)。
RPC處理程序(又名服務(wù))將會(huì)等待來自該隊(duì)列的請(qǐng)求。當(dāng)一個(gè)請(qǐng)求出現(xiàn),它會(huì)處理工作并通過reply_to參數(shù)指明的Queue(隊(duì)列)發(fā)送一條帶有結(jié)果的消息回服務(wù)端Client。
客戶端將會(huì)在回調(diào)隊(duì)列等待數(shù)據(jù)。當(dāng)一條消息出現(xiàn),它會(huì)檢查correlation_id屬性。如果它匹配了請(qǐng)求的值,它將會(huì)返回響應(yīng)到應(yīng)用。
將他們一起運(yùn)行
斐波那契(Fibonacci)任務(wù)代碼:
function fib($n) {
if ($n == 0)
return 0;
if ($n == 1)
return 1;
return fib($n-1) + fib($n-2);
}
我們聲明我們的斐波那契函數(shù)。它只接收有效的正整數(shù)輸入。(不要期望這一個(gè)函數(shù)處理很大的數(shù),這可能是最慢的遞歸實(shí)現(xiàn)了。)
我們的RPC服務(wù)rpc_server.php的代碼如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n) {
if ($n == 0)
return 0;
if ($n == 1)
return 1;
return fib($n-1) + fib($n-2);
}
echo " [x] Awaiting RPC requests\n";
$callback = function($req) {
$n = intval($req->body);
echo " [.] fib(", $n, ")\n";
$msg = new AMQPMessage(
(string) fib($n),
array('correlation_id' => $req->get('correlation_id'))
);
$req->delivery_info['channel']->basic_publish(
$msg, '', $req->get('reply_to'));
$req->delivery_info['channel']->basic_ack(
$req->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
服務(wù)端代碼同樣十分容易理解:
像往常一樣我們建立了連接、頻道與定義了隊(duì)列。
我們可能希望運(yùn)行多于一個(gè)的服務(wù)進(jìn)程。為了將負(fù)載均衡地分布在多個(gè)服務(wù),我們需要在$channel.basic_qos方法中設(shè)置prefetch_count設(shè)置參數(shù)。
我們使用bask_consume函數(shù)去訪問隊(duì)列。然后我們進(jìn)入了呆呆請(qǐng)求消息的while循環(huán),處理重做然后發(fā)會(huì)響應(yīng)。
我們的RPC客戶端代碼rpc_client.php如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class FibonacciRpcClient {
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;
public function __construct() {
$this->connection = new AMQPStreamConnection(
'localhost', 5672, 'guest', 'guest');
$this->channel = $this->connection->channel();
list($this->callback_queue, ,) = $this->channel->queue_declare(
"", false, false, true, false);
$this->channel->basic_consume(
$this->callback_queue, '', false, false, false, false,
array($this, 'on_response'));
}
public function on_response($rep) {
if($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}
public function call($n) {
$this->response = null;
$this->corr_id = uniqid();
$msg = new AMQPMessage(
(string) $n,
array('correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
while(!$this->response) {
$this->channel->wait();
}
return intval($this->response);
}
};
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";
?>
現(xiàn)在是時(shí)候看一看完整的示例代碼rpc_client.php與rpc_server.php。
我們的RPC服務(wù)現(xiàn)在已經(jīng)準(zhǔn)備好了。我們可以開啟服務(wù):
php rpc_server.php
# => [x] A waiting PRC requests
為了請(qǐng)求以個(gè)斐波那契數(shù)字,我們運(yùn)行客戶端:
php rpc_client.php
# => [x] Requesting fib(30)
這里介紹的設(shè)計(jì)并不是RPC服務(wù)的唯一實(shí)現(xiàn),但它有一些重要的優(yōu)點(diǎn):
如果RPC服務(wù)非常緩慢,你可以通過運(yùn)行另外一個(gè)服務(wù)來擴(kuò)展。嘗試一下運(yùn)行第二個(gè)rpc_server.php在一個(gè)新的控制臺(tái)。
在客戶端,RPC只需要發(fā)送和接收單一消息。不需要類似queue_declare的異步調(diào)用。因此RPC的每一個(gè)RPC請(qǐng)求都只需要一次網(wǎng)絡(luò)往返。
我們的代碼仍然十分簡(jiǎn)單,且沒有嘗試去解決更復(fù)雜(卻很重要)的問題,例如:
- 如果沒有服務(wù)正在運(yùn)行,客戶端應(yīng)該如何反應(yīng)。
- RPC客戶端應(yīng)該由超時(shí)機(jī)制嗎?
- 如果服務(wù)器發(fā)生故障或者異常,是否將其轉(zhuǎn)發(fā)給客戶端?
- 在處理前防止無效的傳入消息(如檢查邊界、類型)。
如果你想進(jìn)行實(shí)驗(yàn),可能會(huì)發(fā)現(xiàn)一個(gè)UI管理對(duì)于查看隊(duì)列非常有用