4.3萬字詳解PHP+RabbitMQ(AMQP協(xié)議、通訊架構(gòu)、6大模式、交換機(jī)隊(duì)列消息持久化、死信隊(duì)列、延時(shí)隊(duì)列、消息丟失、重復(fù)消費(fèi)、消息應(yīng)答、消息應(yīng)答、發(fā)布確認(rèn)、故障轉(zhuǎn)移、不公平分發(fā)、優(yōu)先級、等)

理論(后半部分有實(shí)操詳解)

哲學(xué)思考

  • 易經(jīng)思維:向各國人講述一種動物叫烏龜,要學(xué)很久的各國語言,但是隨手畫一個(gè)烏龜,全世界的人都能看得懂。
  • 道家思維:努力沒有用(指勞神費(fèi)心的機(jī)械性重復(fù)、肢體受累、刻意行為),要用心(深度思考、去感悟、透過現(xiàn)象看本質(zhì))才有用。
  • 舉例:類似中學(xué)做不出來的幾何題的底層原理:不是不知道xx定理或公式(招式),而是不知道畫輔助線的思路(內(nèi)功)。
  • 總結(jié):萬事萬物、用道家思維思考本質(zhì)與規(guī)律,用易經(jīng)思維從眾多信息中簡化模型練出來的內(nèi)功,叫覺悟(內(nèi)力),而知識僅僅是外功(招式)。

做研發(fā)的,可能距離成功一步之遙,別因?yàn)橐蝗~障目而放棄。

消息隊(duì)列與消息中間件

  • 消息隊(duì)列:一種存儲消息的數(shù)據(jù)結(jié)構(gòu),消息隊(duì)列通常是一種先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu),可以確保消息的順序傳遞。類比Redis的List,先進(jìn)先出類比lPush和rPop。
  • 消息中間件:是管理消息傳遞的一種組件。功能包含消息隊(duì)列。
  • 中間件:中間件是指位于客戶端(或調(diào)用端)和服務(wù)端之間的組件,用于協(xié)調(diào)、存儲、接口、管理組件之間的通信,類比賣家與買家之間的快遞驛站,不是一定要用,但是最好得有。

AMQP協(xié)議

AMQP(Advanced Message Queuing Protocol),是一種用于消息傳遞協(xié)議,類比成HTTP、TCP、UDP這種不同的協(xié)議就行。它定義了消息中間件客戶端與服務(wù)端(買家買家對驛站的溝通)的通信規(guī)則(怎么運(yùn)快遞),包括消息格式(什么類型的快遞)、消息發(fā)布(怎么發(fā)快遞)、消息訂閱(怎么收快遞)、隊(duì)列管理(怎么處理快遞)等。

AMQP高效就高效在把通信(物流階段)能遇到的問題都解決了。

  • 異步通信:AMQP支持異步通信,可以讓消息發(fā)送者和接收者在不同的時(shí)間和速度進(jìn)行消息傳遞。
  • 消息路由:AMQP協(xié)議定義了靈活的消息路由規(guī)則,可以根據(jù)消息內(nèi)容自動將消息路由到指定的接收者。
  • 消息確認(rèn):AMQP支持消息確認(rèn)機(jī)制,確保消息被可靠地接收。
  • 批量處理:AMQP協(xié)議支持批量消息處理,可以同時(shí)發(fā)送和接收多個(gè)消息,減少了網(wǎng)絡(luò)通信的開銷和系統(tǒng)的資源消耗。

RabbitMQ

  • 官方文檔:https://www.rabbitmq.com/docs
  • 極簡概括:使用ErLang語言寫基于AMQP協(xié)議的C/S架構(gòu)的消息中間件,用于不同組件之間高效的傳遞數(shù)據(jù)。
  • 解決問題:
    • 削峰:賣家一家伙運(yùn)了幾十噸的貨(海量請求)、買家沒地方存、扛不住了,那就放驛站緩沖一下。
    • 解耦:沒有驛站中轉(zhuǎn),快遞送過來硬塞給買家,買家不在,這個(gè)流程就走不下去(耦合)。
    • 異步:賣家只要發(fā)貨、流程基本走完,剩下的流程交給物流和驛站(中間件),不影響賣家做其它事(非阻塞),買家也一樣、快遞放驛站、我忙我的。需要時(shí)再?。ò葱栌嗛啠?。
    • 卷:面試加分項(xiàng),工作用到減輕負(fù)擔(dān)。只想卷死各位,或者被各位卷死。
  • 適用場景
    • 流量削峰:當(dāng)系統(tǒng)抗不出海量的請求的時(shí)候,把MQ放置在用戶端與業(yè)務(wù)端之間(強(qiáng)制排隊(duì))削去部分峰值流量(Nginx令牌桶和滴水算法、或基于Redis也能實(shí)現(xiàn)),這個(gè)過程和加鎖的缺點(diǎn)差不多,性能會受影響。
    • 應(yīng)用解耦:小型項(xiàng)目用不上,大型項(xiàng)目中,庫存、訂單、支付、物流等各種模塊,為了防止硬關(guān)聯(lián)耦合度大,一節(jié)點(diǎn)掛掉其余癱瘓,所以用MQ作為通信的橋梁。
    • 異步處理:用的最多,異步發(fā)短信,發(fā)郵件、導(dǎo)出導(dǎo)出文件、延時(shí)任務(wù)、自動取消訂單、推送通知、回調(diào)重試等。
    • 助力跳槽漲薪:只想卷死各位,或者被各位卷死。
    • 賦能成就感:RabbitMQ都用上了,感覺離架構(gòu)師不遠(yuǎn)了(呵呵)。
  • 優(yōu)點(diǎn):
    • 應(yīng)用場景就是優(yōu)點(diǎn),主流。
    • 豐富的客戶端支持,支持PHP、Java、Python、Ruby、C、Golang、NodeJS、.NET、Objective-C、Swift。
  • 缺點(diǎn):
    • 性能下降:流量削峰引起的性能下降。
    • 運(yùn)維成本:多引入一個(gè)組件,就要考慮它的運(yùn)維成本,以及各項(xiàng)配置問題。
    • 大數(shù)據(jù)處理:對于大數(shù)據(jù)、流數(shù)據(jù)、日志分析、更適合Kafka,RabbitMQ性能會下降。
    • 安裝困難:成熟的軟件安裝不方便,Erlang環(huán)境依賴強(qiáng),其次是官網(wǎng)還把CentOS7的安裝去掉了。
  • 同類產(chǎn)品:RocketMQ(僅支持Java和C++)、ActiveMQ(后期停止維護(hù))、Kafka(大數(shù)據(jù)場景)、基于Redis實(shí)現(xiàn)的任務(wù)隊(duì)列(輕量級使用)、編程語言框架提供的支持(Laravel Queue)、AWSMQ(亞馬遜)、ApsaraMQ(阿里巴巴)、PulsarMQ(Apache)。

工作架構(gòu)圖

5ae53f37e4924f50b3ed4cdf2ea3cbb7.png
  • Producer:生產(chǎn)者,類比賣家。
  • Connection:客戶端與服務(wù)端的通信層。
  • Channel:這里叫信道,類比要發(fā)那個(gè)快遞,或從那家快遞取貨。
  • Broker:用于接受和分發(fā)消息的處理,這是MQ Server要干的事,類比驛站。
  • Exchange:交換機(jī)(不是switch),指定規(guī)則路由到那個(gè)隊(duì)列,類比分配到那個(gè)貨架的方法。
  • Consumer:消費(fèi)者,類比買家。
  • 補(bǔ)充:每次訪問RabbitMQ都需要建立一個(gè)連接,海量的請求對于這塊的開銷是絕大的,所以需要在channel與connection內(nèi)部建立邏輯連接,從而減少性能損耗。
  • 多租戶設(shè)計(jì):每個(gè)Block里面可以多個(gè)Vhost,Vhost里面,可以有多個(gè)Exchange(交換機(jī)),每個(gè)Exchange可以有多個(gè)Queue。

四大概念的通俗理解

  • 生產(chǎn)者:顧名思義生產(chǎn)數(shù)據(jù)的角色,類比賣家發(fā)快遞。
  • 消費(fèi)者:生產(chǎn)者產(chǎn)出的數(shù)據(jù),給消費(fèi)者使用,類比買家收快遞。
  • 交換機(jī):,用于接受生產(chǎn)者的消息,通過指定模式(4大模式)和路由鍵(交換機(jī)與隊(duì)列綁定標(biāo)識符)分發(fā)給隊(duì)列,一個(gè)交換機(jī)可綁定多個(gè)隊(duì)列,將消息路由到多個(gè)隊(duì)列,類比快遞驛站的分發(fā)到那個(gè)貨架。
  • 隊(duì)列:一種數(shù)據(jù)結(jié)構(gòu),存放消息隊(duì)列,類比快遞驛站的貨架。
  • 流程:生產(chǎn)者(發(fā)快遞)->交換機(jī)(驛站怎么分類)->隊(duì)列(驛站怎么存)->消費(fèi)者(拿快遞)。

Exchange(交換機(jī))

  • 極簡概括:位于生產(chǎn)者和隊(duì)列之間,用于接受生產(chǎn)者的消息,并分發(fā)給隊(duì)列,一個(gè)交換機(jī)可綁定多個(gè)隊(duì)列,將消息路由到多個(gè)隊(duì)列。
  • 解決問題:生產(chǎn)者發(fā)一條消息,讓所有或指定消費(fèi)者能夠收到(類似廣播)。如果沒有交換機(jī)機(jī)制,只會有一個(gè)消費(fèi)者能收到此消息。
  • 交換機(jī)4大類型:直接(direct)類型、主題(topic)類型、頭(headers)類型、扇出(fanout)類型(下文有詳解)。
  • 補(bǔ)充:在常規(guī)模式,工作隊(duì)列模式(生產(chǎn)者端,basic_publish方法參數(shù)2為空字符串),也有一個(gè)默認(rèn)類型交換機(jī),或稱之為無名Exchange。

Routing Key(路由鍵)

  • 極簡概括:交換機(jī)綁定隊(duì)列的標(biāo)識符,一個(gè)交換機(jī),可以有多個(gè)Routing key。
  • 解決問題:起個(gè)名用于區(qū)分,方便對不同隊(duì)列進(jìn)行不同的操作,就像MySQL表id作用一樣。

死信

無法被消費(fèi)的消息。

死信隊(duì)列

  • 極簡概括:隊(duì)列中的消息無法被消費(fèi),若沒有后續(xù)的處理,就成了死信隊(duì)列。
  • 解決問題:將消費(fèi)異常的數(shù)據(jù)放入死信隊(duì)列,用于存儲消費(fèi)失敗或者異常時(shí)的情況,確保失敗的消息能夠得到適當(dāng)?shù)奶幚恚ㄖ卦嚮蛴砷_發(fā)者調(diào)試查看用)??梢院唵蔚睦斫鉃檎覀€(gè)地方存失敗的消費(fèi)任務(wù)。也可將計(jì)就計(jì),利用某些特性作為延時(shí)隊(duì)列使用。
  • 產(chǎn)生原因:
    • 消息TTL過期。
    • 隊(duì)列滿了。
    • 消息被拒絕(basic reject 或basic nack),并且requeue為false。

有Redis List去實(shí)現(xiàn)消息隊(duì)列,為什么要RabbitMQ?

  • 持久化問題:RabbitMQ支持持久化,Redis雖然也支持持久化,但只要不是每次操作都持久化,那么就有丟失數(shù)據(jù)的風(fēng)險(xiǎn)。
  • 消息應(yīng)答問題:消息處理成功與失敗,Redis用隊(duì)列無法記錄,任務(wù)消息只會取一個(gè)少一個(gè),而RabbitMQ可以。
  • 故障轉(zhuǎn)移問題:Redis哨兵機(jī)制、主從復(fù)制,是針對緩存高可用,做消息中間件有局限性。RabbitMQ支持消息重新入隊(duì)。如果某個(gè)消費(fèi)者由于某些原因失去連接,導(dǎo)致消息未發(fā)送ACK確認(rèn),那么RabbitMQ有讓消息重新排隊(duì)的機(jī)制,如果此時(shí)其它消費(fèi)者可以處理,那就讓其它消費(fèi)者處理。
  • 支持消息優(yōu)先級:RabbitMQ支持消息優(yōu)先級,而Redis不支持。
  • 廣播支持:RabbitMQ支持廣播,等指定隊(duì)列發(fā)送,而Redis不支持。
  • 路由轉(zhuǎn)發(fā):RabbitMQ通過交換機(jī)機(jī)制,支持設(shè)定不同的分發(fā)隊(duì)列規(guī)則,滿足各個(gè)場景,而Redis List需要手動實(shí)現(xiàn)這塊內(nèi)部機(jī)制。

有Redis Sorted Set、或者過期監(jiān)聽去實(shí)現(xiàn)延時(shí)隊(duì)列,為什么要RabbitMQ?

RabbitMQ是推模式還是拉模式?

都有,生產(chǎn)者發(fā)數(shù)據(jù)到MQ是推,消費(fèi)者消費(fèi)消息是拉。

通信方案選擇Push還是Pull?

推或拉是兩種通信的方向選擇,跟MQ無關(guān),但是類似MQ,順便提一下。
個(gè)人認(rèn)為:

  • 看業(yè)務(wù)場景,拋開業(yè)務(wù)場景談架構(gòu)都是耍流氓
    • 要求實(shí)時(shí)性的,就選擇推送,輪詢耗費(fèi)網(wǎng)絡(luò)資源,調(diào)用端或客戶端每次請求,服務(wù)端都得執(zhí)行一次,尤其是并發(fā)量大或者響應(yīng)流程任務(wù)重的場景。
    • 不需要實(shí)時(shí)性的,就拉取,減低耦合,服務(wù)端就純粹的產(chǎn)生服務(wù)端的數(shù)據(jù)就行,客戶端或調(diào)用端誰想拉取讓它自己來。
  • 看改動開銷
    • 保證工程質(zhì)量的前提下,那種方式開銷小,技術(shù)老大說用哪個(gè),或者同事們習(xí)慣那一種,就用哪一種。
  • 看調(diào)用端可信任性。
    • 各種鑒權(quán),驗(yàn)證是一方面,數(shù)據(jù)傳輸也是一方面,對于不信任的平臺,白推送的數(shù)據(jù),與對方直接請求獲取。還是有區(qū)別的。

RabbitMQ可以直連隊(duì)列嗎?

RabbitMQ內(nèi)部不可以直連隊(duì)列,但是操作上可以直連隊(duì)列。
就算是常規(guī)(Hello World)模式,沒有聲明交換機(jī),也會經(jīng)過一個(gè)默認(rèn)交換機(jī)。
不過這樣喪失了交換機(jī)靈活的路由分發(fā)功能,適用于簡單的場景。

實(shí)操

安裝

Docker安裝

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
瀏覽器訪問:http://192.168.xxx.xxx:15672

普通安裝

CentOS7的安裝RabbitMQ的教程,已經(jīng)被官網(wǎng)刪除了,支持CentOS8,CentOS需要借助外力。

安裝Erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum -y install erlang

安裝RabbitMQ
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum install rabbitmq-server

開啟服務(wù),并設(shè)置開機(jī)自啟
systemctl start rabbitmq-server
systemctl enable rabbitmq-server

檢查狀態(tài)
systemctl status rabbitmq-server

啟動網(wǎng)頁端控制臺
rabbitmq-plugins enable rabbitmq_management

開啟防火墻
firewall-cmd --zone=public --add-port=80/tcp --permanent
systemctl restart firewalld

新建網(wǎng)頁端登錄用戶,并配置角色與權(quán)限
rabbitmqctl add_user admin 12345678
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

rabbitmqctl set_permissions -p <virtual_host> <username> <configure_permission> <write_permission> <read_permission>
-p <virtual_host>:指定虛擬主機(jī)的名稱,例如/(默認(rèn)虛擬主機(jī))。
<username>:要為其設(shè)置權(quán)限的用戶名。
<configure_permission>:配置權(quán)限的正則表達(dá)式。允許用戶對隊(duì)列、交換機(jī)等進(jìn)行配置。
<write_permission>:寫權(quán)限的正則表達(dá)式。允許用戶發(fā)布消息。
<read_permission>:讀權(quán)限的正則表達(dá)式。允許用戶獲取消息。

瀏覽器訪問
http://192.168.xxx.xxx:15672
用戶名admin,密碼12345678

命令行常用命令

systemctl start/stop/restart/status/enable rabbitmq-server # RabbitMQ Server開啟、關(guān)停、重啟、狀態(tài)查看、開機(jī)自啟

rabbitmq-plugins enable 插件名          # RabbitMQ Server安裝插件
rabbitmq-plugins list                   # 插件列表

rabbitmqctl version                     # 查看RabbitMQ Server版本
rabbitmqctl list_exchanges              # 查看交換機(jī)列表
rabbitmqctl list_queues                 # 查看隊(duì)列列表
rabbitmqctl list_bindings               # 查看綁定列表

PHP實(shí)現(xiàn)RabbitMQ Client

  • RabbitMQ6大模式官方教程:https://www.rabbitmq.com/tutorials
  • 官方擴(kuò)展(不用):https://pecl.php.net/package/amqp/1.11.0/windows
    這個(gè)擴(kuò)展官方下載地址有最后一版Windows系統(tǒng)的php_amqp.dll的下載地址,(用Windows是為了方便,在CentOS上還需要編譯,改完P(guān)HP代碼每次需要重新上傳,不想費(fèi)事),但是我使用報(bào)錯(cuò),所以廢棄了。
  • 官方推薦:composer require php-amqplib/php-amqplib
    PHP操作RabbitMQ思路并不復(fù)雜,有6種工作模式,翻來覆去就是研究消息怎么發(fā),發(fā)到哪里,怎么處理消息的問題。
  • Laravel框架:可以使用php-amqplib/php-amqplib,去操作,也可以使用現(xiàn)成的laravel-queue-rabbitmq去操作。composer require vladimir-yuldashev/laravel-queue-rabbitmq

消費(fèi)者常駐進(jìn)程偏好

while (count($channel->callbacks)) {
    $channel->wait();
}
//或者
$channel->consume();

常規(guī)模式(Hello World!)

  • 極簡概括:生產(chǎn)者生產(chǎn)消息給消費(fèi)者。
  • 解決問題:跨進(jìn)程,或跨組件、跨網(wǎng)絡(luò)通信,適用與兩個(gè)角色,但是一個(gè)PHP進(jìn)程無法完成的時(shí)候用。
  • 備注:用MySQL、Redis也能實(shí)現(xiàn)。
  • 生產(chǎn)者端代碼:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//初始化連接
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
//初始化信道
$channel = $connection->channel();

/*
參數(shù)1:隊(duì)列名
參數(shù)2:在聲明隊(duì)列時(shí)指定是否啟用passively模式,passively模式用于檢查隊(duì)列是否存在,而不是實(shí)際創(chuàng)建一個(gè)新隊(duì)列。如果隊(duì)列不存在,則會返回一個(gè)通知,而不會創(chuàng)建新隊(duì)列。
參數(shù)3:指定隊(duì)列的持久性。在這里,它是false,表示隊(duì)列不是持久的。如果設(shè)置為true,則隊(duì)列將在服務(wù)器重啟時(shí)或宕機(jī)后保留下來。 
參數(shù)4:指定隊(duì)列的排他性。如果設(shè)置為 true,則該隊(duì)列只能被聲明它的連接使用,一般用于臨時(shí)隊(duì)列。false表示隊(duì)列不是排它的。
參數(shù)5:指定隊(duì)列的自動刪除,如果設(shè)置為 true,則在隊(duì)列不再被使用時(shí)將自動刪除。在這里,它是 false,表示隊(duì)列不會自動刪除。
*/
$channel->queue_declare('hello', false, false, false, false);

//編輯消息
$msg = new AMQPMessage('Hello World!');
//發(fā)送消息,交換機(jī)用不上,所以留空。這方法沒有返回值
$channel->basic_publish($msg, '', 'hello');


//用完了就關(guān)閉,釋放資源
$channel->close();
$connection->close();
  • 消費(fèi)者端代碼
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//初始化連接
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
//初始化信道
$channel = $connection->channel();

/*
參數(shù)1:隊(duì)列名
參數(shù)2:在聲明隊(duì)列時(shí)指定是否啟用passively模式,passively模式用于檢查隊(duì)列是否存在,而不是實(shí)際創(chuàng)建一個(gè)新隊(duì)列。如果隊(duì)列不存在,則會返回一個(gè)通知,而不會創(chuàng)建新隊(duì)列。
參數(shù)3:指定隊(duì)列的持久性。在這里,它是false,表示隊(duì)列不是持久的。如果設(shè)置為true,則隊(duì)列將在服務(wù)器重啟后保留下來。
參數(shù)4:指定隊(duì)列的排他性。如果設(shè)置為 true,則該隊(duì)列只能被聲明它的連接使用,一般用于臨時(shí)隊(duì)列。false表示隊(duì)列不是排它的。
參數(shù)5:指定隊(duì)列的自動刪除,如果設(shè)置為 true,則在隊(duì)列不再被使用時(shí)將自動刪除。在這里,它是 false,表示隊(duì)列不會自動刪除。
*/
$channel->queue_declare('hello', false, false, false, false);


/*
參數(shù)1:隊(duì)列名稱
參數(shù)2:這是消費(fèi)者標(biāo)簽(consumer tag),用于唯一標(biāo)識消費(fèi)者。在這里,它是空字符串,表示不為消費(fèi)者指定任何特定的標(biāo)簽。
參數(shù)3:如果設(shè)置了無本地字段,則服務(wù)器將不會向發(fā)布消息的連接發(fā)送消息。
參數(shù)4:是指定是否自動確認(rèn)消息(auto-ack)。設(shè)置為true則表示消費(fèi)者在接收到消息后會立即確認(rèn)消息,RabbitMQ將會將消息標(biāo)記為已處理并從隊(duì)列中刪除。false表示消費(fèi)者會手動確認(rèn)消息,即在處理消息后,通過調(diào)用 $channel->basic_ack($deliveryTag) 手動確認(rèn)消息。
參數(shù)5:指定是否獨(dú)占消費(fèi)者。如果設(shè)置為true,則表示只有當(dāng)前連接能夠使用該消費(fèi)者。在這里,它是true,表示只有當(dāng)前連接可以使用這個(gè)消費(fèi)者。
參數(shù)6:如果設(shè)置了,服務(wù)器將不會對該方法作出響應(yīng)??蛻舳瞬粦?yīng)等待答復(fù)方法。如果服務(wù)器無法完成該方法,它將引發(fā)通道或連接異常。
參數(shù)7:回調(diào)參數(shù),拿到數(shù)據(jù)怎樣處理。
*/
$channel->basic_consume('hello', '', false, true, false, false, function ($msg) {
    echo $msg->body;
});

//通過死循環(huán)持久化當(dāng)前進(jìn)程,實(shí)時(shí)消費(fèi)
$channel->consume();

工作隊(duì)列模式(Work Queues)

  • 極簡概括:類比Redis的lPush、Rpop。但是RabbitMQ可以針對一個(gè)隊(duì)列有多個(gè)消費(fèi)者,但一條消息,只能被一個(gè)消費(fèi)者消費(fèi)一次,不能多次消費(fèi)。為了避免消費(fèi)者處理數(shù)據(jù)傾斜問題(有的隊(duì)列處理任務(wù)多,有的處理的少),所以使用了輪詢的方式,挨個(gè)處理任務(wù)。
  • 解決問題:耗時(shí)且不需要串行執(zhí)行的任務(wù),可以丟給隊(duì)列,例如發(fā)短信、郵件、大量數(shù)據(jù)導(dǎo)入導(dǎo)出。
  • 測試普通用法

生產(chǎn)者代碼,在cli模式下,依次輸入1~10,執(zhí)行10次

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);
//獲取命令行參數(shù)
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');

$channel->close();
$connection->close();

消費(fèi)者1代碼,cli模式下運(yùn)行,依次返回1、3、5、7、9,可見RabbitMQ不管消費(fèi)節(jié)點(diǎn)處理的時(shí)間,只會根據(jù)消費(fèi)者數(shù)量輪詢處理,哪怕其中任意幾個(gè)隊(duì)列任務(wù)重,其它隊(duì)列任務(wù)輕松。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->basic_consume('hello', '', false, true, false, false,
    function ($msg) {
        echo "收到消息,內(nèi)容為{$msg->getBody()}\n";
        sleep(5);
        echo "成功處理消息\n";
    }
);

$channel->consume();

消費(fèi)者2代碼,(與消費(fèi)者1代碼唯一不同的,就是sleep函數(shù)的時(shí)間),cli模式下運(yùn)行,依次返回2、4、6、8、10,可見RabbitMQ不管消費(fèi)節(jié)點(diǎn)處理的時(shí)間,只會根據(jù)消費(fèi)者數(shù)量輪詢處理,哪怕其中任意幾個(gè)隊(duì)列任務(wù)重,其它隊(duì)列任務(wù)輕松。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->basic_consume('hello', '', false, true, false, false,
    function ($msg) {
        echo "收到消息,內(nèi)容為{$msg->getBody()}\n";
        sleep(10);
        echo "成功處理消息\n";
    }
);

$channel->consume();

發(fā)布訂閱模式(Pub/Sub)

  • 極簡概括:生產(chǎn)者生產(chǎn)數(shù)據(jù),所有隊(duì)列關(guān)聯(lián)的消費(fèi)者都接收(類似廣播),使用交換機(jī)的扇出(Fanout)模式實(shí)現(xiàn)。
  • 解決問題:聊天室的功能,可以由它實(shí)現(xiàn)。

生產(chǎn)者代碼:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
/**
參數(shù)1:交換機(jī)名稱。
參數(shù)2:交換機(jī)類型,這里是扇出。
參數(shù)3:當(dāng)passive參數(shù)設(shè)置為true時(shí),表示不會實(shí)際創(chuàng)建新的交換機(jī)或隊(duì)列,而是用來檢查已經(jīng)存在的交換機(jī)或隊(duì)列是否已經(jīng)存在。如果存在,則返回成功,如果不存在,則返回失敗。passive參數(shù)主要用于檢查交換機(jī)或隊(duì)列是否存在,而不是實(shí)際創(chuàng)建新的實(shí)體
參數(shù)4:交換機(jī)是否持久化,即當(dāng)RabbitMQ服務(wù)器重啟時(shí),交換機(jī)會不會被重新創(chuàng)建。
參數(shù)5:當(dāng)所有綁定的隊(duì)列都與交換機(jī)解綁后,是否自動刪除交換機(jī)。
*/
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);


$msg = new AMQPMessage('扇出測試');
//發(fā)送給指定的交換機(jī)
$channel->basic_publish($msg, 'fanout_test');


$channel->close();
$connection->close();

消費(fèi)者1代碼:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機(jī)初始化
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
//創(chuàng)建臨時(shí)隊(duì)列,用于接受隊(duì)列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊(duì)列綁定指定的交換機(jī)
$channel->queue_bind($queue_name, 'fanout_test');

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

$channel->consume();


$channel->close();
$connection->close();

消費(fèi)者2代碼(同1):

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機(jī)初始化
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
//創(chuàng)建臨時(shí)隊(duì)列,用于接受隊(duì)列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊(duì)列綁定指定的交換機(jī)
$channel->queue_bind($queue_name, 'fanout_test');

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

$channel->consume();


$channel->close();
$connection->close();

路由模式(Routing)

  • 極簡概括:通過路由鍵,將消息發(fā)送給指定的(任意數(shù)量)消費(fèi)者,一個(gè)消費(fèi)者可配置多個(gè)路由鍵。
  • 解決問題:只發(fā)送部分消費(fèi)者。

生產(chǎn)端代碼,只讓消費(fèi)者1消費(fèi)。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('direct_test', 'direct', false, false, false);


$msg = new AMQPMessage('扇出測試');
//發(fā)送給指定的交換機(jī),并指定路由鍵
$channel->basic_publish($msg, 'direct_test', 'consumer1');


$channel->close();
$connection->close();

消費(fèi)者1代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機(jī)初始化
$channel->exchange_declare('direct_test', 'direct', false, false, false);
//創(chuàng)建臨時(shí)隊(duì)列,用于接受隊(duì)列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊(duì)列綁定指定的交換機(jī),并聲明路由鍵
$channel->queue_bind($queue_name, 'direct_test', 'consumer1');

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

$channel->consume();


$channel->close();
$connection->close();

消費(fèi)者2代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機(jī)初始化
$channel->exchange_declare('direct_test', 'direct', false, false, false);
//創(chuàng)建臨時(shí)隊(duì)列,用于接受隊(duì)列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊(duì)列綁定指定的交換機(jī),并聲明路由鍵
$channel->queue_bind($queue_name, 'direct_test', 'consumer2');

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

$channel->consume();


$channel->close();
$connection->close();

主題模式(Topics)

  • 極簡概括:生產(chǎn)者獲取消息,編寫指定的匹配規(guī)則,使被匹配的隊(duì)列的消費(fèi)者能夠消費(fèi)消息。
  • 解決問題:實(shí)現(xiàn)更加靈活的,消息分發(fā)模式。
  • 備注:主題模式下的路由鍵,多個(gè)標(biāo)識用多個(gè)點(diǎn)將其分開(例如aaa.bbb.ccc),最長不超過255個(gè)字符。支持通配符,*匹配一個(gè)標(biāo)識(奇葩設(shè)計(jì),和#就不應(yīng)該反過來嗎),#匹配任意個(gè)標(biāo)識。
    通配符用法*.*.xxx,表示xxx結(jié)尾的路由鍵,*.xxx.*表示包含xxx的路由鍵。a.#可以匹配a,也可以匹配,a.b。
    對應(yīng)的,當(dāng)一個(gè)隊(duì)列綁定的鍵是#,那就類似于fanout模式,如果一個(gè)隊(duì)列中沒有任何通配符,那就類似于direct模式。

生產(chǎn)者代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明隊(duì)列類型為主題
$channel->exchange_declare('topic_test', 'topic', false, false, false);

$msg = new AMQPMessage('topic測試數(shù)據(jù)');


/*
以下路由鍵可以接受到消息
a.b
a.*.*
a.*.*.*
#.z
z
a.x.y.z
abc.z
*/
$arr = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z', 'x', 'y', 'z', 'a', 'ab', 'ac', 'ad','a.x.y.z', 'abc.z'];

foreach($arr as $v) {
    $channel->basic_publish($msg, 'topic_logs', $v);
}

$channel->close();
$connection->close();

消費(fèi)者代碼

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明topic模式的交換機(jī)
$channel->exchange_declare('topic_test', 'topic', false, false, false);
//創(chuàng)建臨時(shí)隊(duì)列
$queue_name = $channel->queue_declare("", false, false, true, false)[0];

$binding_keys = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z'];
//綁定多個(gè)路由鍵
foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

$callback = function ($msg) {
    echo 'RoutingKey:', $msg->getRoutingKey(), ' --- Msg:', $msg->getBody(), "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

try {
    $channel->consume();
} catch (\Throwable $exception) {
    echo $exception->getMessage();
}

$channel->close();
$connection->close();

遠(yuǎn)程過程調(diào)用模式(RPC)

  • 極簡概括:(對于PHP,RPC用的不多,中間件RPC用的更少)使得客戶端可以像調(diào)用本地方法一樣調(diào)用遠(yuǎn)程方法,同時(shí)隱藏了底層網(wǎng)絡(luò)通信細(xì)節(jié)。

調(diào)用端代碼

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RpcClient {
    private $connection;
    private $channel;
    private $queue_name;
    private $response;
    private $corr_id;

    public function __construct() {
        $this->connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
        $this->channel = $this->connection->channel();
        $this->queue_name = $this->channel->queue_declare("", false, false, true, false)[0];
        $this->channel->basic_consume($this->queue_name, '', false, true, false, false, array($this, 'onResponse'));
    }

    public function onResponse($rep) {
        if ($rep->get('correlation_id') == $this->corr_id) {
            $this->response = $rep->body;
        }
    }

    public function call($str) {
        $this->response = null;
        $this->corr_id = uniqid();


        $this->channel->basic_publish(new AMQPMessage($str, [
            'correlation_id' => $this->corr_id,
            'reply_to' => $this->queue_name
        ]), '', 'rpc_queue');

        while (! $this->response) {
            $this->channel->wait();
        }

        return $this->response;
    }
}

$fibonacci_rpc = new RpcClient();
$response = $fibonacci_rpc->call('客戶端向服務(wù)端發(fā)送數(shù)據(jù)');
echo $response, "\n";

服務(wù)端代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->queue_declare('rpc_queue', false, false, false, false);

$callback = function ($req) {
    echo $req->getBody(), "\n";

    $msg = new AMQPMessage('服務(wù)端成功接收:'. $req->getBody(), ['correlation_id' => $req->get('correlation_id')]);

    $req->getChannel()->basic_publish($msg, '', $req->get('reply_to'));
    $req->ack();
};

$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

$channel->consume();


$channel->close();
$connection->close();

死信隊(duì)列(TTL過期)

  • 極簡概括:消息超時(shí)未處理,會被放置死信隊(duì)列中。
  • 備注:注釋很清楚,這需要先執(zhí)行這段代碼(創(chuàng)建普通與死信,交換機(jī)、隊(duì)列、和路由鍵),然后再關(guān)閉(模擬消費(fèi)者掛掉,10秒后,此時(shí)超時(shí)的消息就會存入死信隊(duì)列,至于死信的隊(duì)列的數(shù)據(jù),是重試,還是給開發(fā)者提示,看產(chǎn)品需求)。
  • 現(xiàn)象:流程走完后,登錄控制臺,到Queues選項(xiàng)卡,查看隊(duì)列列表的Ready列,原先的normal_queue隊(duì)列由原先的10變成了0(執(zhí)行一次生產(chǎn)者代碼,里面有一個(gè)for循環(huán)),而dead_queue里面的Ready,由原先的0,變成了10(10個(gè)消息全部超時(shí),到了死信隊(duì)列)。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

//聲明普通交換機(jī)名稱
$normal_exchange    = 'normal_exchange';
//聲明死信交換機(jī)名稱
$dead_exchange      = 'dead_exchange';
//聲明普通隊(duì)列名稱
$normal_queue       = 'normal_queue';
//聲明死信隊(duì)列名稱
$dead_queue         = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key   = 'dead_routing_key';


//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機(jī)
$channel->exchange_declare($dead_exchange, 'direct', false, false);


//配置普通隊(duì)列異常時(shí),轉(zhuǎn)發(fā)給死信隊(duì)列的荷載參數(shù),就指望著這個(gè)普通隊(duì)列有問題了,才會把消息數(shù)據(jù)轉(zhuǎn)發(fā)到死信隊(duì)列,轉(zhuǎn)發(fā)到那里,肯定是要配置的。
$payload = new AMQPTable();
//設(shè)置消息生存時(shí)間為15秒
$payload->set('x-message-ttl', 10000);
//定位普通隊(duì)列出異常了,要轉(zhuǎn)發(fā)的交換機(jī)
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉(zhuǎn)發(fā)的交換機(jī)還不夠,還得知道那個(gè)隊(duì)列,不然交換機(jī)不知道路由那個(gè)消息到達(dá)那個(gè)隊(duì)列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);


//聲明普通隊(duì),就是等普通隊(duì)列出問題了,才把數(shù)據(jù)丟給死信隊(duì)列,所以普通(注意是普通隊(duì)列)隊(duì)列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊(duì)列,其實(shí)死信隊(duì)列本身是一個(gè)普通隊(duì)列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';


//綁定普通交換機(jī)與普通隊(duì)列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機(jī)與死信隊(duì)列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);


$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {
    echo $msg->getBody(), "\n";
});

//常駐進(jìn)程
$channel->consume();

$channel->close();
$connection->close();

然后再執(zhí)行生產(chǎn)者代碼,模擬發(fā)消息。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->exchange_declare('normal_exchange', 'direct', false, false, true);


for($i = 0; $i< 10; $i++) {
    $msg = new AMQPMessage($i, [
        //配置過期時(shí)間為10秒,讓生產(chǎn)者控制過期時(shí)間
        'expiration' => '10000'
    ]);

    $channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}


$channel->close();
$connection->close();

極簡死信隊(duì)列消費(fèi)示例

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);

$channel->consume();

$channel->close();
$connection->close();

死信隊(duì)列(隊(duì)列達(dá)到最大長度放不下)

  • 極簡概括:生產(chǎn)的消息數(shù)量超過了消費(fèi)者端設(shè)置了最大長度,剩余的消息會被放入死信隊(duì)列。
  • 注意:本次測試,需要把原先的隊(duì)列刪除,否則隊(duì)列長度設(shè)置不生效。
  • 現(xiàn)象:流程走完后,登錄控制臺,到Queues選項(xiàng)卡,查看隊(duì)列列表的Ready列,原先的normal_queue隊(duì)列由原先的0變成了8(執(zhí)行一次生產(chǎn)者代碼,里面有一個(gè)for循環(huán)),而dead_queue里面的Ready,由原先的0,變成了2(10 - 8)。

消費(fèi)者端代碼,這一步是為了初始化普通和死信交換機(jī)、隊(duì)列、路由鍵,并且需要執(zhí)行后Ctrl + C強(qiáng)制停止,保證生產(chǎn)者生產(chǎn)的消息,不被能正常的消費(fèi)(不然怎么演示不正?,F(xiàn)象時(shí)的死信隊(duì)列?)。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

//聲明普通交換機(jī)名稱
$normal_exchange    = 'normal_exchange';
//聲明死信交換機(jī)名稱
$dead_exchange      = 'dead_exchange';
//聲明普通隊(duì)列名稱
$normal_queue       = 'normal_queue';
//聲明死信隊(duì)列名稱
$dead_queue         = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key   = 'dead_routing_key';


//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機(jī)
$channel->exchange_declare($dead_exchange, 'direct', false, false);


//配置普通隊(duì)列異常時(shí),轉(zhuǎn)發(fā)給死信隊(duì)列的荷載參數(shù),就指望著這個(gè)普通隊(duì)列有問題了,才會把消息數(shù)據(jù)轉(zhuǎn)發(fā)到死信隊(duì)列,轉(zhuǎn)發(fā)到那里,肯定是要配置的。
$payload = new AMQPTable();
//設(shè)置最多存儲8條消息
$payload->set('x-max-length', 8);
//定位普通隊(duì)列出異常了,要轉(zhuǎn)發(fā)的交換機(jī)
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉(zhuǎn)發(fā)的交換機(jī)還不夠,還得知道那個(gè)隊(duì)列,不然交換機(jī)不知道路由那個(gè)消息到達(dá)那個(gè)隊(duì)列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);


//聲明普通隊(duì),就是等普通隊(duì)列出問題了,才把數(shù)據(jù)丟給死信隊(duì)列,所以普通(注意是普通隊(duì)列)隊(duì)列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊(duì)列,其實(shí)死信隊(duì)列本身是一個(gè)普通隊(duì)列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';


//綁定普通交換機(jī)與普通隊(duì)列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機(jī)與死信隊(duì)列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);



$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {
    echo $msg->getBody(), "\n";
});

//常駐進(jìn)程
$channel->consume();

$channel->close();
$connection->close();

生產(chǎn)者代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->exchange_declare('normal_exchange', 'direct', false, false, true);


for($i = 0; $i< 10; $i++) {
    $msg = new AMQPMessage($i, [
        //配置過期時(shí)間為10秒,讓生產(chǎn)者控制過期時(shí)間
        'expiration' => '10000'
    ]);

    $channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}


$channel->close();
$connection->close();

極簡死信隊(duì)列消費(fèi)示例

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);

$channel->consume();

$channel->close();
$connection->close();

死信隊(duì)列(消息被拒絕,且requeue為false)

  • 極簡概括:消費(fèi)者拒絕了消息,并且將requeue為false,的消息,會被放入死信隊(duì)列。
  • 注意:本次測試,需要把原先的隊(duì)列刪除,避免影響。
  • 現(xiàn)象:流程走完后,登錄控制臺,到Queues選項(xiàng)卡,查看隊(duì)列列表的Ready列,原先的normal_queue隊(duì)列由原先的0變成了0(執(zhí)行一次生產(chǎn)者代碼,里面有一個(gè)for循環(huán)),而dead_queue里面的Ready,由原先的0,變成了3(10 - 7,消費(fèi)者的代碼中,有if判斷)。

這次讓消費(fèi)者正常消費(fèi)代碼即可,不用Ctrl + C強(qiáng)制中斷,正常接受生產(chǎn)者者的數(shù)據(jù)。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

//聲明普通交換機(jī)名稱
$normal_exchange    = 'normal_exchange';
//聲明死信交換機(jī)名稱
$dead_exchange      = 'dead_exchange';
//聲明普通隊(duì)列名稱
$normal_queue       = 'normal_queue';
//聲明死信隊(duì)列名稱
$dead_queue         = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key   = 'dead_routing_key';


//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機(jī)
$channel->exchange_declare($dead_exchange, 'direct', false, false);


//配置普通隊(duì)列異常時(shí),轉(zhuǎn)發(fā)給死信隊(duì)列的荷載參數(shù),就指望著這個(gè)普通隊(duì)列有問題了,才會把消息數(shù)據(jù)轉(zhuǎn)發(fā)到死信隊(duì)列,轉(zhuǎn)發(fā)到那里,肯定是要配置的。
$payload = new AMQPTable();
//定位普通隊(duì)列出異常了,要轉(zhuǎn)發(fā)的交換機(jī)
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉(zhuǎn)發(fā)的交換機(jī)還不夠,還得知道那個(gè)隊(duì)列,不然交換機(jī)不知道路由那個(gè)消息到達(dá)那個(gè)隊(duì)列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);


//聲明普通隊(duì),就是等普通隊(duì)列出問題了,才把數(shù)據(jù)丟給死信隊(duì)列,所以普通(注意是普通隊(duì)列)隊(duì)列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊(duì)列,其實(shí)死信隊(duì)列本身是一個(gè)普通隊(duì)列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';


//綁定普通交換機(jī)與普通隊(duì)列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機(jī)與死信隊(duì)列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);



$channel->basic_consume($normal_queue, '', false, false, false, false, function($msg) {
    if($msg->getBody() > 6)  {
        //手動拒絕消息,不批量,且不讓重入隊(duì)列
        $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, false);
    } else {
        //手動確認(rèn)消息
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
});

//常駐進(jìn)程
$channel->consume();

$channel->close();
$connection->close();

生產(chǎn)者代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->exchange_declare('normal_exchange', 'direct', false, false, true);


for($i = 0; $i< 10; $i++) {
    $msg = new AMQPMessage($i, [
        //配置過期時(shí)間為10秒,讓生產(chǎn)者控制過期時(shí)間
        'expiration' => '10000'
    ]);

    $channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}


$channel->close();
$connection->close();

極簡死信隊(duì)列消費(fèi)示例

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);

$channel->consume();

$channel->close();
$connection->close();

延時(shí)隊(duì)列(基于TTL過期式的死信隊(duì)列,不推薦使用)

  • 極簡概括:生產(chǎn)出來的隊(duì)列任務(wù),不會立馬消費(fèi),而是等到指定時(shí)間。
  • 解決問題:自動確認(rèn)訂單,自動取消未支付訂單,等。
  • 原理分析:常規(guī)的生產(chǎn)和消息鏈路配置了死信隊(duì)列的兜底機(jī)制,若普通隊(duì)列因TTL過期,會自動把消息放入死信隊(duì)列,利用這個(gè)特性,可以做出來延遲隊(duì)列,延時(shí)隊(duì)列的延遲機(jī)制,就是普通隊(duì)列的TTL過期時(shí)間,延時(shí)任務(wù)的處理機(jī)制,就是消費(fèi)死信隊(duì)列的代碼段。因此可以專門選一個(gè)死信隊(duì)列,作為延時(shí)隊(duì)列來用。
  • 注意:
    • 例如延遲10秒,并不能保證一定會在第10秒被處理,會有小誤差(絕大部分業(yè)務(wù)場景能接受)。
    • 延遲消息是排隊(duì)處理的,第一個(gè)延遲10秒,第二個(gè)消息延遲5秒,默認(rèn)情況下,第二個(gè)會延遲15秒,因?yàn)橄⑹桥抨?duì)(隊(duì)列先進(jìn)先出)處理的,所以這個(gè)情況需要優(yōu)化,這是RabbitMQ死信隊(duì)列實(shí)現(xiàn)延時(shí)隊(duì)列的巨大缺陷。
      擴(kuò)展:利用Redis的sorted set也可以作為延時(shí)隊(duì)列,key為唯一標(biāo)識符,score為執(zhí)行的時(shí)間的時(shí)間戳,val為要執(zhí)行的序列化代碼,用while(true)起一個(gè)常駐進(jìn)程,用于消費(fèi)當(dāng)前時(shí)間戳下的任務(wù),如果要添加任務(wù),就網(wǎng)Zset中添加數(shù)據(jù)。常駐進(jìn)程到時(shí)間了,就會消費(fèi)。

初始化普通、死信交換機(jī)、隊(duì)列、路由鍵。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

//聲明普通交換機(jī)名稱
$normal_exchange    = 'normal_exchange';
//聲明死信交換機(jī)名稱
$dead_exchange      = 'dead_exchange';
//聲明普通隊(duì)列名稱
$normal_queue       = 'normal_queue';
//聲明死信隊(duì)列名稱
$dead_queue         = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key   = 'dead_routing_key';


//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機(jī)
$channel->exchange_declare($dead_exchange, 'direct', false, false);


//配置普通隊(duì)列異常時(shí),轉(zhuǎn)發(fā)給死信隊(duì)列的荷載參數(shù),就指望著這個(gè)普通隊(duì)列有問題了,才會把消息數(shù)據(jù)轉(zhuǎn)發(fā)到死信隊(duì)列,轉(zhuǎn)發(fā)到那里,肯定是要配置的。
$payload = new AMQPTable();
//定位普通隊(duì)列出異常了,要轉(zhuǎn)發(fā)的交換機(jī)
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉(zhuǎn)發(fā)的交換機(jī)還不夠,還得知道那個(gè)隊(duì)列,不然交換機(jī)不知道路由那個(gè)消息到達(dá)那個(gè)隊(duì)列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);


//聲明普通隊(duì),就是等普通隊(duì)列出問題了,才把數(shù)據(jù)丟給死信隊(duì)列,所以普通(注意是普通隊(duì)列)隊(duì)列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊(duì)列,其實(shí)死信隊(duì)列本身是一個(gè)普通隊(duì)列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';


//綁定普通交換機(jī)與普通隊(duì)列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機(jī)與死信隊(duì)列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);

$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {

});

生產(chǎn)者代碼,產(chǎn)生延時(shí)任務(wù)

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->exchange_declare('normal_exchange', 'direct', false, false, true);

for($i = 0;$i < 10; $i++) {
    $msg = new AMQPMessage(microtime(true), [
        'expiration' => '3000'
    ]);

    $channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}


$channel->close();
$connection->close();

延時(shí)任務(wù)處理代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$callback = function ($msg) {
    $consumer_time = microtime(true);
    $product_time  = $msg->getBody();
    echo '時(shí)間處理誤差:', bcsub($consumer_time, $product_time, 4), "\n";
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);

$channel->consume();

$channel->close();
$connection->close();

延時(shí)隊(duì)列(基于RabbitMQ延遲交換機(jī)插件,推薦使用)

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.0/rabbitmq_delayed_message_exchange-3.10.0.ez
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
發(fā)現(xiàn)如下字樣,就說明安裝成功。
Enabling plugins on node rabbit@lnmp:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@lnmp...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

started 1 plugins.


systemctl restart rabbitmq-server

生產(chǎn)者代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$delay_exchange    = 'delay_exchange';
$delay_queue       = 'delay_queue';
$delay_routing_key = 'delay_routing_key';


//聲明交換機(jī)類型為direct
$payload = new AMQPTable();
$payload->set('x-delayed-type', 'direct');
//聲明延時(shí)隊(duì)列交換機(jī),參數(shù)2的類型,是安裝插件后才有的,固定值
$channel->exchange_declare($delay_exchange, 'x-delayed-message', false, false, true, false, false, $payload);

//聲明一個(gè)自定義延遲隊(duì)列
$channel->queue_declare($delay_queue, false, false, false, false);
//隊(duì)列綁定交換機(jī)
$channel->queue_bind($delay_queue, $delay_exchange, $delay_routing_key);

//發(fā)送延遲消息
$msg = new AMQPMessage(microtime(true), [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    //這里配置超時(shí)時(shí)間,固定格式。
    'application_headers' => new AMQPTable(['x-delay' => 5000])
]);

$channel->basic_publish($msg, $delay_exchange, $delay_routing_key);


$channel->close();
$connection->close();

消費(fèi)者代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$callback = function ($msg) {
    $consumer_time = microtime(true);
    $product_time  = $msg->getBody();
    echo '時(shí)間處理誤差:', bcsub($consumer_time, $product_time, 4), "\n";
};
$channel->basic_consume('delay_queue', '', false, true, false, false, $callback);

$channel->consume();


$channel->close();
$connection->close();

延時(shí)隊(duì)列提前執(zhí)行或延后執(zhí)行的方案

某些業(yè)務(wù)場景,可能需要提前觸發(fā),或者延期處理,這就需要一些外的操作,才能完成。

隊(duì)列優(yōu)先級

  • 不同的隊(duì)列和可以設(shè)置不同的優(yōu)先級。
  • 在堆積的情況下才生效,否則生產(chǎn)一個(gè)消息處理一個(gè),無阻塞,就不存在多個(gè)消息的優(yōu)先處理的問題。
  • 要設(shè)置優(yōu)先級,那么隊(duì)列和消息都需要設(shè)置優(yōu)先級。
  • 優(yōu)先級越大,越先被處理。
  • 支持1到255之間的優(yōu)先級,強(qiáng)烈建議使用1到5之間的值,更高的優(yōu)先級值需要更多的CPU和內(nèi)存資源,因?yàn)镽abbitMQ需要在內(nèi)部為每個(gè)優(yōu)先級維護(hù)一個(gè)子隊(duì)列,從1到為給定隊(duì)列配置的最大值。

首先啟動生產(chǎn)者代碼:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$test_exchange    = 'test_exchange';
$test_queue       = 'test_queue';
$test_routing_key = 'test_routing_key';

$channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false);

$payload = new AMQPTable();
$payload->set('x-max-priority', 20);
//聲明一個(gè)自定義延遲隊(duì)列
$channel->queue_declare($test_queue, false, false, false, false, false,$payload);
//隊(duì)列綁定交換機(jī)
$channel->queue_bind($test_queue, $test_exchange, $test_routing_key);

for($i =0; $i < 10; $i++) {
    $msg = new AMQPMessage($i, [
        //為每個(gè)消息設(shè)置不同的優(yōu)先級
        'priority' => $i,
    ]);

    $channel->basic_publish($msg, $test_exchange, $test_routing_key, true);
}

$channel->close();
$connection->close();

消費(fèi)者代碼:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$callback = function ($msg) {
    echo $msg . "\n";
};
$channel->basic_consume('test_queue', '', false, true, false, false, $callback);

$channel->consume();

$channel->close();
$connection->close();

惰性隊(duì)列

惰性隊(duì)列用的極少,因?yàn)槭谴鎯υ诖疟P中的。在消費(fèi)者掛掉的情況下,避免RabbitMQ積累了太多的消息,消耗內(nèi)存去采取的策略。

高可用

性能與高可用權(quán)衡的架構(gòu)思維

加了RabbitMQ組件可以看做是分布式系統(tǒng),分布式系統(tǒng)有有一個(gè)CAP定理,CAP只能同時(shí)滿足兩個(gè)。
對于RabbitMQ消息丟失,或者重復(fù)消費(fèi)的問題,若當(dāng)前需求不能容忍,就需要額外的環(huán)節(jié)來彌補(bǔ)。如果當(dāng)前需求(小概率)能容忍,去掉高可用環(huán)節(jié),那么在性能上就會提升。
例如支付寶,支付時(shí)總是會延遲幾秒鐘,一是走支付寶安全風(fēng)控系統(tǒng),二是要極致要求穩(wěn)定高可用的環(huán)節(jié)必然性能下降,三是等支付回調(diào),然而其它接口內(nèi)容卻是瞬間加載。
所以沒有最好的架構(gòu),只有最合適的架構(gòu),所以要結(jié)合業(yè)務(wù)場景,判斷業(yè)務(wù)容忍度下限,與開發(fā)復(fù)雜度,在高可用與性能之間權(quán)衡,這是架構(gòu)師必備思維。

如何避免RabbitMQ消息丟失?

  • 百分99.9%投遞策略:事先要處理的消息綁一個(gè)唯一標(biāo)識(讓RabbitMQ和MySQL自始至終都能知道是哪一個(gè),避免因業(yè)務(wù)需求生產(chǎn)兩份同樣的消息區(qū)分不出來),先寫入MySQL,并預(yù)定一個(gè)未消費(fèi)的缺省狀態(tài),直到收到RabbitMQ的確認(rèn)通知,改變狀態(tài)。再加一個(gè)循環(huán)定時(shí)任務(wù)(定時(shí)的周期看業(yè)務(wù)需求容忍度與組件性能),超時(shí)未處理的消息,讓定時(shí)任務(wù)手動觸發(fā)。
    但這種也不是沒有弊端,如果RabbitMQ的確認(rèn)通知由于網(wǎng)絡(luò)抖動沒有發(fā)出去,那么定時(shí)任務(wù)就會讓其重復(fù)消費(fèi)。
  • 生產(chǎn)消息傳輸感知:RabbitMQ掛了,生產(chǎn)消息發(fā)送給MQ時(shí),就會超時(shí),報(bào)錯(cuò),此時(shí)把消息放入MySQL做兜底(可以臨時(shí)不處理,但是別丟),避免消息丟失。
  • RabbitMQ自帶的交換機(jī)、隊(duì)列、消息持久化機(jī)制。
  • RabbitMQ自帶的發(fā)送確認(rèn)、消費(fèi)確認(rèn)、事務(wù)隊(duì)列機(jī)制。

如何防止RabbitMQ重復(fù)消費(fèi)?

  • 兜底策略:在數(shù)據(jù)集層上最好有個(gè)兜底策略,如唯一索引,更新數(shù)據(jù)前的狀態(tài)機(jī)判斷等,避免由于程序設(shè)計(jì)疏忽或者故障等原因?qū)е碌闹貜?fù)消費(fèi)。
  • 上游生產(chǎn)防重:消息重復(fù)消費(fèi),一方面是消息重復(fù)發(fā)送引起的,一般在接口處理的上游,會有一些防重策略(數(shù)據(jù)冪等),上游杜絕,也是一種策略。
  • 消費(fèi)端上游判斷:首先把要處理的消息綁一個(gè)唯一標(biāo)識(讓RabbitMQ和MySQL自始至終都能知道是哪一個(gè),避免因業(yè)務(wù)需求生產(chǎn)兩份同樣的消息區(qū)分不出來),消費(fèi)者獲取到消息時(shí),再把唯一標(biāo)識寫入具有唯一索引約束的MySQL表,重復(fù)消費(fèi),MySQL唯一索引就要報(bào)錯(cuò),利用插入失敗的特性,阻止重復(fù)消費(fèi)。

RabbitMQ不公平分發(fā)

  • 不公平分發(fā):推薦操作,避免一個(gè)消費(fèi)者很忙,其它消費(fèi)者不做任何工作的情況發(fā)生,防止壓力傾斜。
    可在每個(gè)消費(fèi)者端調(diào)用basic_consume()方法之前配置,注意這種行為在消息堆積的情況下生效。
    被設(shè)置不公平分發(fā)的消費(fèi)者中,可從網(wǎng)頁端控制臺->Channels選項(xiàng)卡->表格->所在行數(shù)據(jù)的Prefetch中看到。
/*
參數(shù)1:預(yù)取大小,通常情況下為null,表示不限制消息大小。
參數(shù)2:預(yù)取數(shù)量,表示消費(fèi)者每次最多接收的消息數(shù)量,值為權(quán)重比例,可以為任意正整數(shù)。
參數(shù)3:是否將預(yù)取限制應(yīng)用到channel級別(true)或者消費(fèi)者級別(false)。
*/
$channel->basic_qos(null, 1, false);

RabbitMQ可靠消息機(jī)制(發(fā)布確認(rèn))

  • 極簡概括:是用來保證消息不丟失的的重要功能,生產(chǎn)者將消息發(fā)送給MQ,MQ把數(shù)據(jù)保存在磁盤上之后,會返回生產(chǎn)者成功保存的反饋。需要生產(chǎn)者端的隊(duì)列以及消息持久化的前提,就是為了防止隊(duì)列或者小寫將要持久化的時(shí)候RabbitMQ出故障的間隙情況發(fā)生。
    隊(duì)列持久化、消息持久化、發(fā)布確認(rèn)3個(gè)因素加起來,才能保證消息不丟失。
    發(fā)布確認(rèn)模式有3種:

    • 單個(gè)確認(rèn):性能最低,生產(chǎn)一條消息確認(rèn)一個(gè),后面的消息排隊(duì)。
    • 批量確認(rèn):性能比單個(gè)確認(rèn)好,但是出問題時(shí),無法定位是那條消息。
    • 異步確認(rèn):是性能與高可用性權(quán)衡的方案(很多服務(wù)端組件都有這樣的),利用回調(diào)函數(shù)來達(dá)到消息可靠性傳遞的。
  • 單個(gè)確認(rèn),發(fā)送100條數(shù)據(jù),耗時(shí)0.337秒,大部分場景夠用,高并發(fā)場景除外。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->queue_declare('hello', false, true, false, false);


//開啟發(fā)布確認(rèn)模式
$channel->confirm_select();

$start = microtime(true);
for($i = 0; $i < 100; $i++) {
    $msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    $channel->basic_publish($msg, '', 'hello');
    //生產(chǎn)一條數(shù)據(jù),發(fā)送一條確認(rèn)消息,參數(shù)值為超時(shí)時(shí)間,單位:秒
    $channel->wait_for_pending_acks(10.000);
}
echo microtime(true) - $start; // 0.337秒

$channel->close();
$connection->close();
  • 批量確認(rèn),發(fā)送100條數(shù)據(jù),耗時(shí)0.048秒,時(shí)間節(jié)省了很多
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->queue_declare('hello', false, true, false, false);


//開啟發(fā)布確認(rèn)模式
$channel->confirm_select();

$start = microtime(true);
for($i = 0; $i < 100; $i++) {
    $msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    $channel->basic_publish($msg, '', 'hello');
}
//生產(chǎn)一條數(shù)據(jù),發(fā)送一條確認(rèn)消息,參數(shù)值為超時(shí)時(shí)間,單位:秒
$channel->wait_for_pending_acks(10.000);

echo microtime(true) - $start; // 0.048秒

$channel->close();
$connection->close();
  • 異步確認(rèn):(推薦使用)生產(chǎn)者會先把消息發(fā)送到隊(duì)列中,MQ會為其自動編號(有了這個(gè)編號,方便異步回調(diào)時(shí)定位失敗的消息),并調(diào)用broker模塊判斷是否收到確認(rèn)回調(diào),由于確認(rèn)的過程是異步的,所以對性能影響較小。至于回調(diào)的內(nèi)容,怎么操作,重試還是做其它事,根據(jù)業(yè)務(wù)來判定。
    異步的操作函數(shù),由set_ack_handle()(消息確認(rèn)時(shí)調(diào)用)和set_nack_handler()(消息拒絕確認(rèn)時(shí)調(diào)用)函數(shù)完成。
    本次測試耗時(shí)0.012秒。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->set_ack_handler(
    function (\PhpAmqpLib\Message\AMQPMessage $message){
//        echo $message->getBody();
    }
);
$channel->set_nack_handler(
    function (\PhpAmqpLib\Message\AMQPMessage $message){
//        echo $message->getBody();
    }
);

$channel->queue_declare('hello', false, true, false, false);
//開啟發(fā)布確認(rèn)模式
$channel->confirm_select();

$start = microtime(true);
for($i = 0; $i < 100; $i++) {
    $msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    $channel->basic_publish($msg, '', 'hello');
}
echo microtime(true) - $start; // 0.012秒

$channel->close();
$connection->close();

RabbitMQ可靠消息機(jī)制(事務(wù)消息)

  • 極簡概括:用的相對較少,用于確保消息被可靠的發(fā)送,但是會犧牲掉一些性能,有事務(wù),往往意味著是多條消息的一次性處理(不然生產(chǎn)一條消費(fèi)一條,回滾怎么撤回呢),例如發(fā)送10個(gè)消息,其中n(0<n<=10)個(gè)消息發(fā)送故障,或者ack異常,則10條全部回滾。
  • 用法:tx_select()類比MySQL的begin,tx_commit類比MySQL的commit(),tx_rollback類比MySQL的commit(),可能出現(xiàn)異常情況,所以用try catch包裹。
  • 注意:如果消費(fèi)端設(shè)置了自動確認(rèn)機(jī)制,事務(wù)會失效,必須為手動確認(rèn)。

消費(fèi)者代碼,首先啟動

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$callback = function ($msg) {
    echo $msg->getBody() . "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

//手動確認(rèn)
$channel->basic_consume('test_queue', '', false, false, false, false, $callback);

$channel->consume();

$channel->close();
$connection->close();

先測試生產(chǎn)者,不用事務(wù)的情況,看看生產(chǎn)者異常情況下,消息是否會回滾。
命令行提示出異常了,消費(fèi)者返回012三條消息,說明沒有回滾。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$test_exchange    = 'test_exchange';
$test_queue       = 'test_queue';
$test_routing_key = 'test_routing_key';

$channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false);


$payload = new AMQPTable();
$payload->set('x-max-priority', 20);
//聲明一個(gè)自定義延遲隊(duì)列
$channel->queue_declare($test_queue, false, false, false, false, false,$payload);
//隊(duì)列綁定交換機(jī)
$channel->queue_bind($test_queue, $test_exchange, $test_routing_key);

try {
    for($i =0; $i < 10; $i++) {
        if($i == 3) {
            throw new Exception('測試異常');
        }

        $msg = new AMQPMessage($i);
        $channel->basic_publish($msg, $test_exchange, $test_routing_key, true);
    }

    echo '成功執(zhí)行';
} catch(\Exception $e) {
    echo '出異常了';
}

$channel->close();
$connection->close();

再測試生產(chǎn)者,用事務(wù)的情況,看看生產(chǎn)者異常情況下,消息是否會回滾。
命令行提示出異常了,消費(fèi)者返回0條消息,說明生產(chǎn)端異常,也會回滾。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$test_exchange    = 'test_exchange';
$test_queue       = 'test_queue';
$test_routing_key = 'test_routing_key';

$channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false);


$payload = new AMQPTable();
$payload->set('x-max-priority', 20);
//聲明一個(gè)自定義延遲隊(duì)列
$channel->queue_declare($test_queue, false, false, false, false, false,$payload);
//隊(duì)列綁定交換機(jī)
$channel->queue_bind($test_queue, $test_exchange, $test_routing_key);



try {
    $channel->tx_select();
    for($i =0; $i < 10; $i++) {
        if($i == 3) {
            throw new Exception('測試異常');
        }

        $msg = new AMQPMessage($i);
        $channel->basic_publish($msg, $test_exchange, $test_routing_key, true);
    }

    echo '成功執(zhí)行';
    //提交事務(wù)
    $channel->tx_commit();
} catch(\Exception $e) {
    echo '出異常了';
    //回滾事務(wù)
    $channel->tx_rollback();
}

$channel->close();
$connection->close();

RabbitMQ可靠消息機(jī)制(消息應(yīng)答)

  • 極簡概括: 為了彌補(bǔ)事務(wù)消息的性能問題。如果某個(gè)消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)掛掉了,(此時(shí)肯定是已經(jīng)獲取到了這條數(shù)據(jù)才去消費(fèi)的,那么工作隊(duì)列中的數(shù)據(jù)就會被刪除),就可能導(dǎo)致數(shù)據(jù)的丟失。為了解決因此問題,RabbitMQ添加了消息應(yīng)答機(jī)制,消費(fèi)端在消息處理完成后,告訴RabbitMQ這條數(shù)據(jù)已經(jīng)被成功的處理,因此RabbitMQ才可以刪除這條數(shù)據(jù),應(yīng)答機(jī)制又分為兩種情況。
    • 自動應(yīng)答:不推薦使用,獲取到消息確認(rèn)應(yīng)答,這不代表后續(xù)流程一定會處理成功。
    • 手動應(yīng)答:推薦使用,告訴MQ消息被成功處理或者失敗。

測試手動確認(rèn)或拒絕應(yīng)答(無論是確認(rèn),或者拒絕,RabbitMQ都任務(wù)此消息已經(jīng)消費(fèi)過了)

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);
//獲取命令行參數(shù)
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');

$channel->close();
$connection->close();

這里只有消費(fèi)者1,手動拒絕消息

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->basic_consume('hello', '', false, false, false, false,
    function ($msg) {
        echo "收到消息,內(nèi)容為{$msg->getBody()}\n";
        sleep(5);
        echo "拒絕處理\n";
        /*
        basic_nack和basic_ack參數(shù)一致
        參數(shù)1:string 消息唯一標(biāo)識符
        參數(shù)2:bool 是否確認(rèn)接受或拒絕多個(gè)消息
        參數(shù)3:bool 表示是否重新放入消息隊(duì)列
        */
        $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
    }
);

$channel->consume();

這里只有消費(fèi)者1,手動確認(rèn)消息,basic_nack(拒絕)方法改為basic_ack(同意)方法即可。

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->basic_consume('hello', '', false, false, false, false,
    function ($msg) {
        echo "收到消息,內(nèi)容為{$msg->getBody()}\n";
        sleep(5);
        echo "成功處理消息\n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
);

$channel->consume();

RabbitMQ的故障轉(zhuǎn)移機(jī)制

  • 消息重新入隊(duì)的故障轉(zhuǎn)移機(jī)制:如果某個(gè)消費(fèi)者由于某些原因失去連接,導(dǎo)致消息未發(fā)送ACK確認(rèn)(默認(rèn)超時(shí)時(shí)間為30分鐘),那么RabbitMQ有讓消息重新排隊(duì)的機(jī)制,如果此時(shí)其它消費(fèi)者可以處理,那就讓其它消費(fèi)者處理。注意:重新排隊(duì)的機(jī)制只能保證消息不丟失,不能保證失去連接的那個(gè)消費(fèi)者沒處理這個(gè)消息,所以重新排隊(duì)的故障轉(zhuǎn)移機(jī)制,讓其他消費(fèi)者處理的過程,有重復(fù)消費(fèi)的可能,所以要在代碼邏輯中,添加一些兜底的機(jī)制(例如數(shù)據(jù)庫唯一索引、分布式鎖等)。

測試故障轉(zhuǎn)移,生產(chǎn)者生產(chǎn)出一條消息,消費(fèi)者1會立即捕獲這條消息,但是處理時(shí)間為5秒鐘,如果在這5秒鐘內(nèi)Ctrl + C強(qiáng)行終止,則RabbitMQ立即會讓消費(fèi)者2去消費(fèi)這條數(shù)據(jù),進(jìn)而保證消息不丟失。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);
//獲取命令行參數(shù)
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');

$channel->close();
$connection->close();

消費(fèi)者1代碼如下:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->basic_consume('hello', '', false, false, false, false,
    function ($msg) {
        echo "收到消息,內(nèi)容為{$msg->getBody()}\n";
        sleep(5);
        echo "成功處理消息\n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
);

$channel->consume();

消費(fèi)者2代碼如下

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->basic_consume('hello', '', false, false, false, false,
    function ($msg) {
        echo "收到消息,內(nèi)容為{$msg->getBody()}\n";
        sleep(10);
        echo "成功處理消息\n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
);

$channel->consume();

RabbitMQ交換機(jī)、隊(duì)列、消息持久化

以php-amqplib/php-amqplib為例,

  • 交換機(jī)持久化:exchange_declare()方法參數(shù)4設(shè)置為true即可。

  • 隊(duì)列持久化:exchange_declare()方法參數(shù)3設(shè)置為true即可。

  • 消息持久化:AMQPMessage()對象參數(shù)2添加['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]即可。

  • 對于設(shè)置為持久化存儲的情況:RabbitMQ會開啟一個(gè)進(jìn)程,采用追加寫的方式,對數(shù)據(jù)進(jìn)行實(shí)時(shí)持久化存儲。

  • 對于設(shè)置為非持久化存儲的:RabbitMQ會開啟一個(gè)進(jìn)程,當(dāng)內(nèi)存不夠時(shí),采用追加寫的方式,對數(shù)據(jù)進(jìn)行實(shí)時(shí)持久化存儲。

  • 存儲方式:持久化存儲時(shí),RabbitMQ會內(nèi)部維護(hù)一張ETS的表,用于記錄消息的(id、偏移量、有效數(shù)據(jù)、左邊文件、右邊文件的元數(shù)據(jù))。

  • 注意:如果一個(gè)隊(duì)列已經(jīng)存在且沒有配置持久化,若再次配置持久化,會報(bào)錯(cuò)。如果一個(gè)隊(duì)列持久化了,在網(wǎng)頁端控制臺->queues->隊(duì)列堆在表格->所在隊(duì)列的Features項(xiàng),會顯示D。
    將消息標(biāo)記為持久化并不能完全保證消息不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,但是當(dāng)RabbitMQ已經(jīng)設(shè)置為持久化,不能完全保證不會丟失消息,接受了一個(gè)消息但還沒有保存它時(shí),仍然有一個(gè)很短的時(shí)間空隙,這個(gè)間隙出問題了,還是有丟失的可能。

集成到Laravel

  • 方案1:可以集成composer require php-amqplib/php-amqplib到框架,將主機(jī)端口用戶名密碼放入配置,php artisan make:command xxx,然后去運(yùn)行它,優(yōu)點(diǎn)是用法更貼近原生,缺點(diǎn)是少了Laravel Queue自帶的優(yōu)雅特性。
  • 方案2:換Laravel RabbitMQ的包,這樣更方便的使用Laravel優(yōu)雅的Queue用法,不用關(guān)注中間件的底層實(shí)現(xiàn)。
    親測非延時(shí)隊(duì)列和延時(shí)隊(duì)列都可正常使用,延時(shí)隊(duì)列不會造成像死信隊(duì)列實(shí)現(xiàn)延時(shí)隊(duì)列的(第一個(gè)延時(shí)5秒,第二個(gè)延遲10秒,結(jié)果第一個(gè)5秒執(zhí)行,第二個(gè)15秒執(zhí)行)那樣消息堆積的情況。
composer require vladimir-yuldashev/laravel-queue-rabbitmq

vim laravel/config/queue.php,添加以下配置,并修改'driver' => 'rabbitmq'。

        'rabbitmq' => [
            'driver' => 'rabbitmq',
            'hosts' => [
                [
                    'host' => '192.168.0.180',
                    'port' => '5672',
                    'user' => 'admin',
                    'password' => '12345678',
                    'vhost' => '/',
                ]
            ],
            'lazy' => false,
            'options' => [
                'queue' => [
                    'prioritize_delayed' =>  false,
                    'queue_max_priority' => 10,
                ],
            ],
        ],

集群與監(jiān)控

大部分公司仍舊用的是框架自帶的基于Redis實(shí)現(xiàn)的隊(duì)列或延時(shí)隊(duì)列,RabbitMQ都很少用,更別說用到集群。
集群方面的,小公司用不上,大公司有專門的運(yùn)維。
后期更新……

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容