理論(后半部分有實(shí)操詳解)
哲學(xué)思考
- 易經(jīng)思維:向各國人講述一種動物叫烏龜,要學(xué)很久的各國語言,但是隨手畫一個(gè)烏龜,全世界的人都能看得懂。
- 道家思維:努力沒有用(指勞神費(fèi)心的機(jī)械性重復(fù)、肢體受累、刻意行為),要用心(深度思考、去感悟、透過現(xiàn)象看本質(zhì))才有用。
- 舉例:類似中學(xué)做不出來的幾何題的底層原理:不是不知道xx定理或公式(招式),而是不知道畫輔助線的思路(內(nèi)功)。
- 總結(jié):萬事萬物、用道家思維思考本質(zhì)與規(guī)律,用易經(jīng)思維從眾多信息中簡化模型練出來的內(nèi)功,叫覺悟(內(nèi)力),而知識僅僅是外功(招式)。
做研發(fā)的,可能距離成功一步之遙,別因?yàn)橐蝗~障目而放棄。
消息隊(duì)列與消息中間件
- 消息隊(duì)列:一種存儲消息的數(shù)據(jù)結(jié)構(gòu),消息隊(duì)列通常是一種先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu),可以確保消息的順序傳遞。類比Redis的List,先進(jìn)先出類比lPush和rPop。
- 消息中間件:是管理消息傳遞的一種組件。功能包含消息隊(duì)列。
- 中間件:中間件是指位于客戶端(或調(diào)用端)和服務(wù)端之間的組件,用于協(xié)調(diào)、存儲、接口、管理組件之間的通信,類比賣家與買家之間的快遞驛站,不是一定要用,但是最好得有。
AMQP協(xié)議
AMQP(Advanced Message Queuing Protocol),是一種用于消息傳遞協(xié)議,類比成HTTP、TCP、UDP這種不同的協(xié)議就行。它定義了消息中間件客戶端與服務(wù)端(買家買家對驛站的溝通)的通信規(guī)則(怎么運(yùn)快遞),包括消息格式(什么類型的快遞)、消息發(fā)布(怎么發(fā)快遞)、消息訂閱(怎么收快遞)、隊(duì)列管理(怎么處理快遞)等。
AMQP高效就高效在把通信(物流階段)能遇到的問題都解決了。
- 異步通信:AMQP支持異步通信,可以讓消息發(fā)送者和接收者在不同的時(shí)間和速度進(jìn)行消息傳遞。
- 消息路由:AMQP協(xié)議定義了靈活的消息路由規(guī)則,可以根據(jù)消息內(nèi)容自動將消息路由到指定的接收者。
- 消息確認(rèn):AMQP支持消息確認(rèn)機(jī)制,確保消息被可靠地接收。
- 批量處理:AMQP協(xié)議支持批量消息處理,可以同時(shí)發(fā)送和接收多個(gè)消息,減少了網(wǎng)絡(luò)通信的開銷和系統(tǒng)的資源消耗。
RabbitMQ
- 官方文檔:https://www.rabbitmq.com/docs
- 極簡概括:使用ErLang語言寫基于AMQP協(xié)議的C/S架構(gòu)的消息中間件,用于不同組件之間高效的傳遞數(shù)據(jù)。
- 解決問題:
- 削峰:賣家一家伙運(yùn)了幾十噸的貨(海量請求)、買家沒地方存、扛不住了,那就放驛站緩沖一下。
- 解耦:沒有驛站中轉(zhuǎn),快遞送過來硬塞給買家,買家不在,這個(gè)流程就走不下去(耦合)。
- 異步:賣家只要發(fā)貨、流程基本走完,剩下的流程交給物流和驛站(中間件),不影響賣家做其它事(非阻塞),買家也一樣、快遞放驛站、我忙我的。需要時(shí)再?。ò葱栌嗛啠?。
- 卷:面試加分項(xiàng),工作用到減輕負(fù)擔(dān)。只想卷死各位,或者被各位卷死。
- 適用場景
- 流量削峰:當(dāng)系統(tǒng)抗不出海量的請求的時(shí)候,把MQ放置在用戶端與業(yè)務(wù)端之間(強(qiáng)制排隊(duì))削去部分峰值流量(Nginx令牌桶和滴水算法、或基于Redis也能實(shí)現(xiàn)),這個(gè)過程和加鎖的缺點(diǎn)差不多,性能會受影響。
- 應(yīng)用解耦:小型項(xiàng)目用不上,大型項(xiàng)目中,庫存、訂單、支付、物流等各種模塊,為了防止硬關(guān)聯(lián)耦合度大,一節(jié)點(diǎn)掛掉其余癱瘓,所以用MQ作為通信的橋梁。
- 異步處理:用的最多,異步發(fā)短信,發(fā)郵件、導(dǎo)出導(dǎo)出文件、延時(shí)任務(wù)、自動取消訂單、推送通知、回調(diào)重試等。
- 助力跳槽漲薪:只想卷死各位,或者被各位卷死。
- 賦能成就感:RabbitMQ都用上了,感覺離架構(gòu)師不遠(yuǎn)了(呵呵)。
- 優(yōu)點(diǎn):
- 應(yīng)用場景就是優(yōu)點(diǎn),主流。
- 豐富的客戶端支持,支持PHP、Java、Python、Ruby、C、Golang、NodeJS、.NET、Objective-C、Swift。
- 缺點(diǎn):
- 性能下降:流量削峰引起的性能下降。
- 運(yùn)維成本:多引入一個(gè)組件,就要考慮它的運(yùn)維成本,以及各項(xiàng)配置問題。
- 大數(shù)據(jù)處理:對于大數(shù)據(jù)、流數(shù)據(jù)、日志分析、更適合Kafka,RabbitMQ性能會下降。
- 安裝困難:成熟的軟件安裝不方便,Erlang環(huán)境依賴強(qiáng),其次是官網(wǎng)還把CentOS7的安裝去掉了。
- 同類產(chǎn)品:RocketMQ(僅支持Java和C++)、ActiveMQ(后期停止維護(hù))、Kafka(大數(shù)據(jù)場景)、基于Redis實(shí)現(xiàn)的任務(wù)隊(duì)列(輕量級使用)、編程語言框架提供的支持(Laravel Queue)、AWSMQ(亞馬遜)、ApsaraMQ(阿里巴巴)、PulsarMQ(Apache)。
工作架構(gòu)圖

- Producer:生產(chǎn)者,類比賣家。
- Connection:客戶端與服務(wù)端的通信層。
- Channel:這里叫信道,類比要發(fā)那個(gè)快遞,或從那家快遞取貨。
- Broker:用于接受和分發(fā)消息的處理,這是MQ Server要干的事,類比驛站。
- Exchange:交換機(jī)(不是switch),指定規(guī)則路由到那個(gè)隊(duì)列,類比分配到那個(gè)貨架的方法。
- Consumer:消費(fèi)者,類比買家。
- 補(bǔ)充:每次訪問RabbitMQ都需要建立一個(gè)連接,海量的請求對于這塊的開銷是絕大的,所以需要在channel與connection內(nèi)部建立邏輯連接,從而減少性能損耗。
- 多租戶設(shè)計(jì):每個(gè)Block里面可以多個(gè)Vhost,Vhost里面,可以有多個(gè)Exchange(交換機(jī)),每個(gè)Exchange可以有多個(gè)Queue。
四大概念的通俗理解
- 生產(chǎn)者:顧名思義生產(chǎn)數(shù)據(jù)的角色,類比賣家發(fā)快遞。
- 消費(fèi)者:生產(chǎn)者產(chǎn)出的數(shù)據(jù),給消費(fèi)者使用,類比買家收快遞。
- 交換機(jī):,用于接受生產(chǎn)者的消息,通過指定模式(4大模式)和路由鍵(交換機(jī)與隊(duì)列綁定標(biāo)識符)分發(fā)給隊(duì)列,一個(gè)交換機(jī)可綁定多個(gè)隊(duì)列,將消息路由到多個(gè)隊(duì)列,類比快遞驛站的分發(fā)到那個(gè)貨架。
- 隊(duì)列:一種數(shù)據(jù)結(jié)構(gòu),存放消息隊(duì)列,類比快遞驛站的貨架。
- 流程:生產(chǎn)者(發(fā)快遞)->交換機(jī)(驛站怎么分類)->隊(duì)列(驛站怎么存)->消費(fèi)者(拿快遞)。
Exchange(交換機(jī))
- 極簡概括:位于生產(chǎn)者和隊(duì)列之間,用于接受生產(chǎn)者的消息,并分發(fā)給隊(duì)列,一個(gè)交換機(jī)可綁定多個(gè)隊(duì)列,將消息路由到多個(gè)隊(duì)列。
- 解決問題:生產(chǎn)者發(fā)一條消息,讓所有或指定消費(fèi)者能夠收到(類似廣播)。如果沒有交換機(jī)機(jī)制,只會有一個(gè)消費(fèi)者能收到此消息。
- 交換機(jī)4大類型:直接(direct)類型、主題(topic)類型、頭(headers)類型、扇出(fanout)類型(下文有詳解)。
- 補(bǔ)充:在常規(guī)模式,工作隊(duì)列模式(生產(chǎn)者端,basic_publish方法參數(shù)2為空字符串),也有一個(gè)默認(rèn)類型交換機(jī),或稱之為無名Exchange。
Routing Key(路由鍵)
- 極簡概括:交換機(jī)綁定隊(duì)列的標(biāo)識符,一個(gè)交換機(jī),可以有多個(gè)Routing key。
- 解決問題:起個(gè)名用于區(qū)分,方便對不同隊(duì)列進(jìn)行不同的操作,就像MySQL表id作用一樣。
死信
無法被消費(fèi)的消息。
死信隊(duì)列
- 極簡概括:隊(duì)列中的消息無法被消費(fèi),若沒有后續(xù)的處理,就成了死信隊(duì)列。
- 解決問題:將消費(fèi)異常的數(shù)據(jù)放入死信隊(duì)列,用于存儲消費(fèi)失敗或者異常時(shí)的情況,確保失敗的消息能夠得到適當(dāng)?shù)奶幚恚ㄖ卦嚮蛴砷_發(fā)者調(diào)試查看用)??梢院唵蔚睦斫鉃檎覀€(gè)地方存失敗的消費(fèi)任務(wù)。也可將計(jì)就計(jì),利用某些特性作為延時(shí)隊(duì)列使用。
- 產(chǎn)生原因:
- 消息TTL過期。
- 隊(duì)列滿了。
- 消息被拒絕(basic reject 或basic nack),并且requeue為false。
有Redis List去實(shí)現(xiàn)消息隊(duì)列,為什么要RabbitMQ?
- 持久化問題:RabbitMQ支持持久化,Redis雖然也支持持久化,但只要不是每次操作都持久化,那么就有丟失數(shù)據(jù)的風(fēng)險(xiǎn)。
- 消息應(yīng)答問題:消息處理成功與失敗,Redis用隊(duì)列無法記錄,任務(wù)消息只會取一個(gè)少一個(gè),而RabbitMQ可以。
- 故障轉(zhuǎn)移問題:Redis哨兵機(jī)制、主從復(fù)制,是針對緩存高可用,做消息中間件有局限性。RabbitMQ支持消息重新入隊(duì)。如果某個(gè)消費(fèi)者由于某些原因失去連接,導(dǎo)致消息未發(fā)送ACK確認(rèn),那么RabbitMQ有讓消息重新排隊(duì)的機(jī)制,如果此時(shí)其它消費(fèi)者可以處理,那就讓其它消費(fèi)者處理。
- 支持消息優(yōu)先級:RabbitMQ支持消息優(yōu)先級,而Redis不支持。
- 廣播支持:RabbitMQ支持廣播,等指定隊(duì)列發(fā)送,而Redis不支持。
- 路由轉(zhuǎn)發(fā):RabbitMQ通過交換機(jī)機(jī)制,支持設(shè)定不同的分發(fā)隊(duì)列規(guī)則,滿足各個(gè)場景,而Redis List需要手動實(shí)現(xiàn)這塊內(nèi)部機(jī)制。
有Redis Sorted Set、或者過期監(jiān)聽去實(shí)現(xiàn)延時(shí)隊(duì)列,為什么要RabbitMQ?
RabbitMQ是推模式還是拉模式?
都有,生產(chǎn)者發(fā)數(shù)據(jù)到MQ是推,消費(fèi)者消費(fèi)消息是拉。
通信方案選擇Push還是Pull?
推或拉是兩種通信的方向選擇,跟MQ無關(guān),但是類似MQ,順便提一下。
個(gè)人認(rèn)為:
- 看業(yè)務(wù)場景,拋開業(yè)務(wù)場景談架構(gòu)都是耍流氓
- 要求實(shí)時(shí)性的,就選擇推送,輪詢耗費(fèi)網(wǎng)絡(luò)資源,調(diào)用端或客戶端每次請求,服務(wù)端都得執(zhí)行一次,尤其是并發(fā)量大或者響應(yīng)流程任務(wù)重的場景。
- 不需要實(shí)時(shí)性的,就拉取,減低耦合,服務(wù)端就純粹的產(chǎn)生服務(wù)端的數(shù)據(jù)就行,客戶端或調(diào)用端誰想拉取讓它自己來。
- 看改動開銷
- 保證工程質(zhì)量的前提下,那種方式開銷小,技術(shù)老大說用哪個(gè),或者同事們習(xí)慣那一種,就用哪一種。
- 看調(diào)用端可信任性。
- 各種鑒權(quán),驗(yàn)證是一方面,數(shù)據(jù)傳輸也是一方面,對于不信任的平臺,白推送的數(shù)據(jù),與對方直接請求獲取。還是有區(qū)別的。
RabbitMQ可以直連隊(duì)列嗎?
RabbitMQ內(nèi)部不可以直連隊(duì)列,但是操作上可以直連隊(duì)列。
就算是常規(guī)(Hello World)模式,沒有聲明交換機(jī),也會經(jīng)過一個(gè)默認(rèn)交換機(jī)。
不過這樣喪失了交換機(jī)靈活的路由分發(fā)功能,適用于簡單的場景。
實(shí)操
安裝
Docker安裝
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
瀏覽器訪問:http://192.168.xxx.xxx:15672
普通安裝
CentOS7的安裝RabbitMQ的教程,已經(jīng)被官網(wǎng)刪除了,支持CentOS8,CentOS需要借助外力。
安裝Erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum -y install erlang
安裝RabbitMQ
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum install rabbitmq-server
開啟服務(wù),并設(shè)置開機(jī)自啟
systemctl start rabbitmq-server
systemctl enable rabbitmq-server
檢查狀態(tài)
systemctl status rabbitmq-server
啟動網(wǎng)頁端控制臺
rabbitmq-plugins enable rabbitmq_management
開啟防火墻
firewall-cmd --zone=public --add-port=80/tcp --permanent
systemctl restart firewalld
新建網(wǎng)頁端登錄用戶,并配置角色與權(quán)限
rabbitmqctl add_user admin 12345678
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl set_permissions -p <virtual_host> <username> <configure_permission> <write_permission> <read_permission>
-p <virtual_host>:指定虛擬主機(jī)的名稱,例如/(默認(rèn)虛擬主機(jī))。
<username>:要為其設(shè)置權(quán)限的用戶名。
<configure_permission>:配置權(quán)限的正則表達(dá)式。允許用戶對隊(duì)列、交換機(jī)等進(jìn)行配置。
<write_permission>:寫權(quán)限的正則表達(dá)式。允許用戶發(fā)布消息。
<read_permission>:讀權(quán)限的正則表達(dá)式。允許用戶獲取消息。
瀏覽器訪問
http://192.168.xxx.xxx:15672
用戶名admin,密碼12345678
命令行常用命令
systemctl start/stop/restart/status/enable rabbitmq-server # RabbitMQ Server開啟、關(guān)停、重啟、狀態(tài)查看、開機(jī)自啟
rabbitmq-plugins enable 插件名 # RabbitMQ Server安裝插件
rabbitmq-plugins list # 插件列表
rabbitmqctl version # 查看RabbitMQ Server版本
rabbitmqctl list_exchanges # 查看交換機(jī)列表
rabbitmqctl list_queues # 查看隊(duì)列列表
rabbitmqctl list_bindings # 查看綁定列表
PHP實(shí)現(xiàn)RabbitMQ Client
- RabbitMQ6大模式官方教程:https://www.rabbitmq.com/tutorials
- 官方擴(kuò)展(不用):https://pecl.php.net/package/amqp/1.11.0/windows
這個(gè)擴(kuò)展官方下載地址有最后一版Windows系統(tǒng)的php_amqp.dll的下載地址,(用Windows是為了方便,在CentOS上還需要編譯,改完P(guān)HP代碼每次需要重新上傳,不想費(fèi)事),但是我使用報(bào)錯(cuò),所以廢棄了。 - 官方推薦:composer require php-amqplib/php-amqplib
PHP操作RabbitMQ思路并不復(fù)雜,有6種工作模式,翻來覆去就是研究消息怎么發(fā),發(fā)到哪里,怎么處理消息的問題。 - Laravel框架:可以使用php-amqplib/php-amqplib,去操作,也可以使用現(xiàn)成的laravel-queue-rabbitmq去操作。composer require vladimir-yuldashev/laravel-queue-rabbitmq
消費(fèi)者常駐進(jìn)程偏好
while (count($channel->callbacks)) {
$channel->wait();
}
//或者
$channel->consume();
常規(guī)模式(Hello World!)
- 極簡概括:生產(chǎn)者生產(chǎn)消息給消費(fèi)者。
- 解決問題:跨進(jìn)程,或跨組件、跨網(wǎng)絡(luò)通信,適用與兩個(gè)角色,但是一個(gè)PHP進(jìn)程無法完成的時(shí)候用。
- 備注:用MySQL、Redis也能實(shí)現(xiàn)。
- 生產(chǎn)者端代碼:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//初始化連接
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
//初始化信道
$channel = $connection->channel();
/*
參數(shù)1:隊(duì)列名
參數(shù)2:在聲明隊(duì)列時(shí)指定是否啟用passively模式,passively模式用于檢查隊(duì)列是否存在,而不是實(shí)際創(chuàng)建一個(gè)新隊(duì)列。如果隊(duì)列不存在,則會返回一個(gè)通知,而不會創(chuàng)建新隊(duì)列。
參數(shù)3:指定隊(duì)列的持久性。在這里,它是false,表示隊(duì)列不是持久的。如果設(shè)置為true,則隊(duì)列將在服務(wù)器重啟時(shí)或宕機(jī)后保留下來。
參數(shù)4:指定隊(duì)列的排他性。如果設(shè)置為 true,則該隊(duì)列只能被聲明它的連接使用,一般用于臨時(shí)隊(duì)列。false表示隊(duì)列不是排它的。
參數(shù)5:指定隊(duì)列的自動刪除,如果設(shè)置為 true,則在隊(duì)列不再被使用時(shí)將自動刪除。在這里,它是 false,表示隊(duì)列不會自動刪除。
*/
$channel->queue_declare('hello', false, false, false, false);
//編輯消息
$msg = new AMQPMessage('Hello World!');
//發(fā)送消息,交換機(jī)用不上,所以留空。這方法沒有返回值
$channel->basic_publish($msg, '', 'hello');
//用完了就關(guān)閉,釋放資源
$channel->close();
$connection->close();
- 消費(fèi)者端代碼
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//初始化連接
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
//初始化信道
$channel = $connection->channel();
/*
參數(shù)1:隊(duì)列名
參數(shù)2:在聲明隊(duì)列時(shí)指定是否啟用passively模式,passively模式用于檢查隊(duì)列是否存在,而不是實(shí)際創(chuàng)建一個(gè)新隊(duì)列。如果隊(duì)列不存在,則會返回一個(gè)通知,而不會創(chuàng)建新隊(duì)列。
參數(shù)3:指定隊(duì)列的持久性。在這里,它是false,表示隊(duì)列不是持久的。如果設(shè)置為true,則隊(duì)列將在服務(wù)器重啟后保留下來。
參數(shù)4:指定隊(duì)列的排他性。如果設(shè)置為 true,則該隊(duì)列只能被聲明它的連接使用,一般用于臨時(shí)隊(duì)列。false表示隊(duì)列不是排它的。
參數(shù)5:指定隊(duì)列的自動刪除,如果設(shè)置為 true,則在隊(duì)列不再被使用時(shí)將自動刪除。在這里,它是 false,表示隊(duì)列不會自動刪除。
*/
$channel->queue_declare('hello', false, false, false, false);
/*
參數(shù)1:隊(duì)列名稱
參數(shù)2:這是消費(fèi)者標(biāo)簽(consumer tag),用于唯一標(biāo)識消費(fèi)者。在這里,它是空字符串,表示不為消費(fèi)者指定任何特定的標(biāo)簽。
參數(shù)3:如果設(shè)置了無本地字段,則服務(wù)器將不會向發(fā)布消息的連接發(fā)送消息。
參數(shù)4:是指定是否自動確認(rèn)消息(auto-ack)。設(shè)置為true則表示消費(fèi)者在接收到消息后會立即確認(rèn)消息,RabbitMQ將會將消息標(biāo)記為已處理并從隊(duì)列中刪除。false表示消費(fèi)者會手動確認(rèn)消息,即在處理消息后,通過調(diào)用 $channel->basic_ack($deliveryTag) 手動確認(rèn)消息。
參數(shù)5:指定是否獨(dú)占消費(fèi)者。如果設(shè)置為true,則表示只有當(dāng)前連接能夠使用該消費(fèi)者。在這里,它是true,表示只有當(dāng)前連接可以使用這個(gè)消費(fèi)者。
參數(shù)6:如果設(shè)置了,服務(wù)器將不會對該方法作出響應(yīng)??蛻舳瞬粦?yīng)等待答復(fù)方法。如果服務(wù)器無法完成該方法,它將引發(fā)通道或連接異常。
參數(shù)7:回調(diào)參數(shù),拿到數(shù)據(jù)怎樣處理。
*/
$channel->basic_consume('hello', '', false, true, false, false, function ($msg) {
echo $msg->body;
});
//通過死循環(huán)持久化當(dāng)前進(jìn)程,實(shí)時(shí)消費(fèi)
$channel->consume();
工作隊(duì)列模式(Work Queues)
- 極簡概括:類比Redis的lPush、Rpop。但是RabbitMQ可以針對一個(gè)隊(duì)列有多個(gè)消費(fèi)者,但一條消息,只能被一個(gè)消費(fèi)者消費(fèi)一次,不能多次消費(fèi)。為了避免消費(fèi)者處理數(shù)據(jù)傾斜問題(有的隊(duì)列處理任務(wù)多,有的處理的少),所以使用了輪詢的方式,挨個(gè)處理任務(wù)。
- 解決問題:耗時(shí)且不需要串行執(zhí)行的任務(wù),可以丟給隊(duì)列,例如發(fā)短信、郵件、大量數(shù)據(jù)導(dǎo)入導(dǎo)出。
- 測試普通用法
生產(chǎn)者代碼,在cli模式下,依次輸入1~10,執(zhí)行10次
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
//獲取命令行參數(shù)
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');
$channel->close();
$connection->close();
消費(fèi)者1代碼,cli模式下運(yùn)行,依次返回1、3、5、7、9,可見RabbitMQ不管消費(fèi)節(jié)點(diǎn)處理的時(shí)間,只會根據(jù)消費(fèi)者數(shù)量輪詢處理,哪怕其中任意幾個(gè)隊(duì)列任務(wù)重,其它隊(duì)列任務(wù)輕松。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, true, false, false,
function ($msg) {
echo "收到消息,內(nèi)容為{$msg->getBody()}\n";
sleep(5);
echo "成功處理消息\n";
}
);
$channel->consume();
消費(fèi)者2代碼,(與消費(fèi)者1代碼唯一不同的,就是sleep函數(shù)的時(shí)間),cli模式下運(yùn)行,依次返回2、4、6、8、10,可見RabbitMQ不管消費(fèi)節(jié)點(diǎn)處理的時(shí)間,只會根據(jù)消費(fèi)者數(shù)量輪詢處理,哪怕其中任意幾個(gè)隊(duì)列任務(wù)重,其它隊(duì)列任務(wù)輕松。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, true, false, false,
function ($msg) {
echo "收到消息,內(nèi)容為{$msg->getBody()}\n";
sleep(10);
echo "成功處理消息\n";
}
);
$channel->consume();
發(fā)布訂閱模式(Pub/Sub)
- 極簡概括:生產(chǎn)者生產(chǎn)數(shù)據(jù),所有隊(duì)列關(guān)聯(lián)的消費(fèi)者都接收(類似廣播),使用交換機(jī)的扇出(Fanout)模式實(shí)現(xiàn)。
- 解決問題:聊天室的功能,可以由它實(shí)現(xiàn)。
生產(chǎn)者代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
/**
參數(shù)1:交換機(jī)名稱。
參數(shù)2:交換機(jī)類型,這里是扇出。
參數(shù)3:當(dāng)passive參數(shù)設(shè)置為true時(shí),表示不會實(shí)際創(chuàng)建新的交換機(jī)或隊(duì)列,而是用來檢查已經(jīng)存在的交換機(jī)或隊(duì)列是否已經(jīng)存在。如果存在,則返回成功,如果不存在,則返回失敗。passive參數(shù)主要用于檢查交換機(jī)或隊(duì)列是否存在,而不是實(shí)際創(chuàng)建新的實(shí)體
參數(shù)4:交換機(jī)是否持久化,即當(dāng)RabbitMQ服務(wù)器重啟時(shí),交換機(jī)會不會被重新創(chuàng)建。
參數(shù)5:當(dāng)所有綁定的隊(duì)列都與交換機(jī)解綁后,是否自動刪除交換機(jī)。
*/
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
$msg = new AMQPMessage('扇出測試');
//發(fā)送給指定的交換機(jī)
$channel->basic_publish($msg, 'fanout_test');
$channel->close();
$connection->close();
消費(fèi)者1代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機(jī)初始化
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
//創(chuàng)建臨時(shí)隊(duì)列,用于接受隊(duì)列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊(duì)列綁定指定的交換機(jī)
$channel->queue_bind($queue_name, 'fanout_test');
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
消費(fèi)者2代碼(同1):
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機(jī)初始化
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
//創(chuàng)建臨時(shí)隊(duì)列,用于接受隊(duì)列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊(duì)列綁定指定的交換機(jī)
$channel->queue_bind($queue_name, 'fanout_test');
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
路由模式(Routing)
- 極簡概括:通過路由鍵,將消息發(fā)送給指定的(任意數(shù)量)消費(fèi)者,一個(gè)消費(fèi)者可配置多個(gè)路由鍵。
- 解決問題:只發(fā)送部分消費(fèi)者。
生產(chǎn)端代碼,只讓消費(fèi)者1消費(fèi)。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('direct_test', 'direct', false, false, false);
$msg = new AMQPMessage('扇出測試');
//發(fā)送給指定的交換機(jī),并指定路由鍵
$channel->basic_publish($msg, 'direct_test', 'consumer1');
$channel->close();
$connection->close();
消費(fèi)者1代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機(jī)初始化
$channel->exchange_declare('direct_test', 'direct', false, false, false);
//創(chuàng)建臨時(shí)隊(duì)列,用于接受隊(duì)列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊(duì)列綁定指定的交換機(jī),并聲明路由鍵
$channel->queue_bind($queue_name, 'direct_test', 'consumer1');
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
消費(fèi)者2代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機(jī)初始化
$channel->exchange_declare('direct_test', 'direct', false, false, false);
//創(chuàng)建臨時(shí)隊(duì)列,用于接受隊(duì)列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊(duì)列綁定指定的交換機(jī),并聲明路由鍵
$channel->queue_bind($queue_name, 'direct_test', 'consumer2');
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
主題模式(Topics)
- 極簡概括:生產(chǎn)者獲取消息,編寫指定的匹配規(guī)則,使被匹配的隊(duì)列的消費(fèi)者能夠消費(fèi)消息。
- 解決問題:實(shí)現(xiàn)更加靈活的,消息分發(fā)模式。
- 備注:主題模式下的路由鍵,多個(gè)標(biāo)識用多個(gè)點(diǎn)將其分開(例如aaa.bbb.ccc),最長不超過255個(gè)字符。支持通配符,*匹配一個(gè)標(biāo)識(奇葩設(shè)計(jì),和#就不應(yīng)該反過來嗎),#匹配任意個(gè)標(biāo)識。
通配符用法*.*.xxx,表示xxx結(jié)尾的路由鍵,*.xxx.*表示包含xxx的路由鍵。a.#可以匹配a,也可以匹配,a.b。
對應(yīng)的,當(dāng)一個(gè)隊(duì)列綁定的鍵是#,那就類似于fanout模式,如果一個(gè)隊(duì)列中沒有任何通配符,那就類似于direct模式。
生產(chǎn)者代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明隊(duì)列類型為主題
$channel->exchange_declare('topic_test', 'topic', false, false, false);
$msg = new AMQPMessage('topic測試數(shù)據(jù)');
/*
以下路由鍵可以接受到消息
a.b
a.*.*
a.*.*.*
#.z
z
a.x.y.z
abc.z
*/
$arr = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z', 'x', 'y', 'z', 'a', 'ab', 'ac', 'ad','a.x.y.z', 'abc.z'];
foreach($arr as $v) {
$channel->basic_publish($msg, 'topic_logs', $v);
}
$channel->close();
$connection->close();
消費(fèi)者代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明topic模式的交換機(jī)
$channel->exchange_declare('topic_test', 'topic', false, false, false);
//創(chuàng)建臨時(shí)隊(duì)列
$queue_name = $channel->queue_declare("", false, false, true, false)[0];
$binding_keys = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z'];
//綁定多個(gè)路由鍵
foreach ($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}
$callback = function ($msg) {
echo 'RoutingKey:', $msg->getRoutingKey(), ' --- Msg:', $msg->getBody(), "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
try {
$channel->consume();
} catch (\Throwable $exception) {
echo $exception->getMessage();
}
$channel->close();
$connection->close();
遠(yuǎn)程過程調(diào)用模式(RPC)
- 極簡概括:(對于PHP,RPC用的不多,中間件RPC用的更少)使得客戶端可以像調(diào)用本地方法一樣調(diào)用遠(yuǎn)程方法,同時(shí)隱藏了底層網(wǎng)絡(luò)通信細(xì)節(jié)。
調(diào)用端代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RpcClient {
private $connection;
private $channel;
private $queue_name;
private $response;
private $corr_id;
public function __construct() {
$this->connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$this->channel = $this->connection->channel();
$this->queue_name = $this->channel->queue_declare("", false, false, true, false)[0];
$this->channel->basic_consume($this->queue_name, '', false, true, false, false, array($this, 'onResponse'));
}
public function onResponse($rep) {
if ($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}
public function call($str) {
$this->response = null;
$this->corr_id = uniqid();
$this->channel->basic_publish(new AMQPMessage($str, [
'correlation_id' => $this->corr_id,
'reply_to' => $this->queue_name
]), '', 'rpc_queue');
while (! $this->response) {
$this->channel->wait();
}
return $this->response;
}
}
$fibonacci_rpc = new RpcClient();
$response = $fibonacci_rpc->call('客戶端向服務(wù)端發(fā)送數(shù)據(jù)');
echo $response, "\n";
服務(wù)端代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('rpc_queue', false, false, false, false);
$callback = function ($req) {
echo $req->getBody(), "\n";
$msg = new AMQPMessage('服務(wù)端成功接收:'. $req->getBody(), ['correlation_id' => $req->get('correlation_id')]);
$req->getChannel()->basic_publish($msg, '', $req->get('reply_to'));
$req->ack();
};
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
死信隊(duì)列(TTL過期)
- 極簡概括:消息超時(shí)未處理,會被放置死信隊(duì)列中。
- 備注:注釋很清楚,這需要先執(zhí)行這段代碼(創(chuàng)建普通與死信,交換機(jī)、隊(duì)列、和路由鍵),然后再關(guān)閉(模擬消費(fèi)者掛掉,10秒后,此時(shí)超時(shí)的消息就會存入死信隊(duì)列,至于死信的隊(duì)列的數(shù)據(jù),是重試,還是給開發(fā)者提示,看產(chǎn)品需求)。
- 現(xiàn)象:流程走完后,登錄控制臺,到Queues選項(xiàng)卡,查看隊(duì)列列表的Ready列,原先的normal_queue隊(duì)列由原先的10變成了0(執(zhí)行一次生產(chǎn)者代碼,里面有一個(gè)for循環(huán)),而dead_queue里面的Ready,由原先的0,變成了10(10個(gè)消息全部超時(shí),到了死信隊(duì)列)。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明普通交換機(jī)名稱
$normal_exchange = 'normal_exchange';
//聲明死信交換機(jī)名稱
$dead_exchange = 'dead_exchange';
//聲明普通隊(duì)列名稱
$normal_queue = 'normal_queue';
//聲明死信隊(duì)列名稱
$dead_queue = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key = 'dead_routing_key';
//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機(jī)
$channel->exchange_declare($dead_exchange, 'direct', false, false);
//配置普通隊(duì)列異常時(shí),轉(zhuǎn)發(fā)給死信隊(duì)列的荷載參數(shù),就指望著這個(gè)普通隊(duì)列有問題了,才會把消息數(shù)據(jù)轉(zhuǎn)發(fā)到死信隊(duì)列,轉(zhuǎn)發(fā)到那里,肯定是要配置的。
$payload = new AMQPTable();
//設(shè)置消息生存時(shí)間為15秒
$payload->set('x-message-ttl', 10000);
//定位普通隊(duì)列出異常了,要轉(zhuǎn)發(fā)的交換機(jī)
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉(zhuǎn)發(fā)的交換機(jī)還不夠,還得知道那個(gè)隊(duì)列,不然交換機(jī)不知道路由那個(gè)消息到達(dá)那個(gè)隊(duì)列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);
//聲明普通隊(duì),就是等普通隊(duì)列出問題了,才把數(shù)據(jù)丟給死信隊(duì)列,所以普通(注意是普通隊(duì)列)隊(duì)列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊(duì)列,其實(shí)死信隊(duì)列本身是一個(gè)普通隊(duì)列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';
//綁定普通交換機(jī)與普通隊(duì)列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機(jī)與死信隊(duì)列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);
$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {
echo $msg->getBody(), "\n";
});
//常駐進(jìn)程
$channel->consume();
$channel->close();
$connection->close();
然后再執(zhí)行生產(chǎn)者代碼,模擬發(fā)消息。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('normal_exchange', 'direct', false, false, true);
for($i = 0; $i< 10; $i++) {
$msg = new AMQPMessage($i, [
//配置過期時(shí)間為10秒,讓生產(chǎn)者控制過期時(shí)間
'expiration' => '10000'
]);
$channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}
$channel->close();
$connection->close();
極簡死信隊(duì)列消費(fèi)示例
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
死信隊(duì)列(隊(duì)列達(dá)到最大長度放不下)
- 極簡概括:生產(chǎn)的消息數(shù)量超過了消費(fèi)者端設(shè)置了最大長度,剩余的消息會被放入死信隊(duì)列。
- 注意:本次測試,需要把原先的隊(duì)列刪除,否則隊(duì)列長度設(shè)置不生效。
- 現(xiàn)象:流程走完后,登錄控制臺,到Queues選項(xiàng)卡,查看隊(duì)列列表的Ready列,原先的normal_queue隊(duì)列由原先的0變成了8(執(zhí)行一次生產(chǎn)者代碼,里面有一個(gè)for循環(huán)),而dead_queue里面的Ready,由原先的0,變成了2(10 - 8)。
消費(fèi)者端代碼,這一步是為了初始化普通和死信交換機(jī)、隊(duì)列、路由鍵,并且需要執(zhí)行后Ctrl + C強(qiáng)制停止,保證生產(chǎn)者生產(chǎn)的消息,不被能正常的消費(fèi)(不然怎么演示不正?,F(xiàn)象時(shí)的死信隊(duì)列?)。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明普通交換機(jī)名稱
$normal_exchange = 'normal_exchange';
//聲明死信交換機(jī)名稱
$dead_exchange = 'dead_exchange';
//聲明普通隊(duì)列名稱
$normal_queue = 'normal_queue';
//聲明死信隊(duì)列名稱
$dead_queue = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key = 'dead_routing_key';
//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機(jī)
$channel->exchange_declare($dead_exchange, 'direct', false, false);
//配置普通隊(duì)列異常時(shí),轉(zhuǎn)發(fā)給死信隊(duì)列的荷載參數(shù),就指望著這個(gè)普通隊(duì)列有問題了,才會把消息數(shù)據(jù)轉(zhuǎn)發(fā)到死信隊(duì)列,轉(zhuǎn)發(fā)到那里,肯定是要配置的。
$payload = new AMQPTable();
//設(shè)置最多存儲8條消息
$payload->set('x-max-length', 8);
//定位普通隊(duì)列出異常了,要轉(zhuǎn)發(fā)的交換機(jī)
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉(zhuǎn)發(fā)的交換機(jī)還不夠,還得知道那個(gè)隊(duì)列,不然交換機(jī)不知道路由那個(gè)消息到達(dá)那個(gè)隊(duì)列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);
//聲明普通隊(duì),就是等普通隊(duì)列出問題了,才把數(shù)據(jù)丟給死信隊(duì)列,所以普通(注意是普通隊(duì)列)隊(duì)列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊(duì)列,其實(shí)死信隊(duì)列本身是一個(gè)普通隊(duì)列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';
//綁定普通交換機(jī)與普通隊(duì)列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機(jī)與死信隊(duì)列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);
$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {
echo $msg->getBody(), "\n";
});
//常駐進(jìn)程
$channel->consume();
$channel->close();
$connection->close();
生產(chǎn)者代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('normal_exchange', 'direct', false, false, true);
for($i = 0; $i< 10; $i++) {
$msg = new AMQPMessage($i, [
//配置過期時(shí)間為10秒,讓生產(chǎn)者控制過期時(shí)間
'expiration' => '10000'
]);
$channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}
$channel->close();
$connection->close();
極簡死信隊(duì)列消費(fèi)示例
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
死信隊(duì)列(消息被拒絕,且requeue為false)
- 極簡概括:消費(fèi)者拒絕了消息,并且將requeue為false,的消息,會被放入死信隊(duì)列。
- 注意:本次測試,需要把原先的隊(duì)列刪除,避免影響。
- 現(xiàn)象:流程走完后,登錄控制臺,到Queues選項(xiàng)卡,查看隊(duì)列列表的Ready列,原先的normal_queue隊(duì)列由原先的0變成了0(執(zhí)行一次生產(chǎn)者代碼,里面有一個(gè)for循環(huán)),而dead_queue里面的Ready,由原先的0,變成了3(10 - 7,消費(fèi)者的代碼中,有if判斷)。
這次讓消費(fèi)者正常消費(fèi)代碼即可,不用Ctrl + C強(qiáng)制中斷,正常接受生產(chǎn)者者的數(shù)據(jù)。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明普通交換機(jī)名稱
$normal_exchange = 'normal_exchange';
//聲明死信交換機(jī)名稱
$dead_exchange = 'dead_exchange';
//聲明普通隊(duì)列名稱
$normal_queue = 'normal_queue';
//聲明死信隊(duì)列名稱
$dead_queue = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key = 'dead_routing_key';
//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機(jī)
$channel->exchange_declare($dead_exchange, 'direct', false, false);
//配置普通隊(duì)列異常時(shí),轉(zhuǎn)發(fā)給死信隊(duì)列的荷載參數(shù),就指望著這個(gè)普通隊(duì)列有問題了,才會把消息數(shù)據(jù)轉(zhuǎn)發(fā)到死信隊(duì)列,轉(zhuǎn)發(fā)到那里,肯定是要配置的。
$payload = new AMQPTable();
//定位普通隊(duì)列出異常了,要轉(zhuǎn)發(fā)的交換機(jī)
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉(zhuǎn)發(fā)的交換機(jī)還不夠,還得知道那個(gè)隊(duì)列,不然交換機(jī)不知道路由那個(gè)消息到達(dá)那個(gè)隊(duì)列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);
//聲明普通隊(duì),就是等普通隊(duì)列出問題了,才把數(shù)據(jù)丟給死信隊(duì)列,所以普通(注意是普通隊(duì)列)隊(duì)列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊(duì)列,其實(shí)死信隊(duì)列本身是一個(gè)普通隊(duì)列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';
//綁定普通交換機(jī)與普通隊(duì)列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機(jī)與死信隊(duì)列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);
$channel->basic_consume($normal_queue, '', false, false, false, false, function($msg) {
if($msg->getBody() > 6) {
//手動拒絕消息,不批量,且不讓重入隊(duì)列
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, false);
} else {
//手動確認(rèn)消息
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
});
//常駐進(jìn)程
$channel->consume();
$channel->close();
$connection->close();
生產(chǎn)者代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('normal_exchange', 'direct', false, false, true);
for($i = 0; $i< 10; $i++) {
$msg = new AMQPMessage($i, [
//配置過期時(shí)間為10秒,讓生產(chǎn)者控制過期時(shí)間
'expiration' => '10000'
]);
$channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}
$channel->close();
$connection->close();
極簡死信隊(duì)列消費(fèi)示例
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
延時(shí)隊(duì)列(基于TTL過期式的死信隊(duì)列,不推薦使用)
- 極簡概括:生產(chǎn)出來的隊(duì)列任務(wù),不會立馬消費(fèi),而是等到指定時(shí)間。
- 解決問題:自動確認(rèn)訂單,自動取消未支付訂單,等。
- 原理分析:常規(guī)的生產(chǎn)和消息鏈路配置了死信隊(duì)列的兜底機(jī)制,若普通隊(duì)列因TTL過期,會自動把消息放入死信隊(duì)列,利用這個(gè)特性,可以做出來延遲隊(duì)列,延時(shí)隊(duì)列的延遲機(jī)制,就是普通隊(duì)列的TTL過期時(shí)間,延時(shí)任務(wù)的處理機(jī)制,就是消費(fèi)死信隊(duì)列的代碼段。因此可以專門選一個(gè)死信隊(duì)列,作為延時(shí)隊(duì)列來用。
- 注意:
- 例如延遲10秒,并不能保證一定會在第10秒被處理,會有小誤差(絕大部分業(yè)務(wù)場景能接受)。
-
延遲消息是排隊(duì)處理的,第一個(gè)延遲10秒,第二個(gè)消息延遲5秒,默認(rèn)情況下,第二個(gè)會延遲15秒,因?yàn)橄⑹桥抨?duì)(隊(duì)列先進(jìn)先出)處理的,所以這個(gè)情況需要優(yōu)化,這是RabbitMQ死信隊(duì)列實(shí)現(xiàn)延時(shí)隊(duì)列的巨大缺陷。
擴(kuò)展:利用Redis的sorted set也可以作為延時(shí)隊(duì)列,key為唯一標(biāo)識符,score為執(zhí)行的時(shí)間的時(shí)間戳,val為要執(zhí)行的序列化代碼,用while(true)起一個(gè)常駐進(jìn)程,用于消費(fèi)當(dāng)前時(shí)間戳下的任務(wù),如果要添加任務(wù),就網(wǎng)Zset中添加數(shù)據(jù)。常駐進(jìn)程到時(shí)間了,就會消費(fèi)。
初始化普通、死信交換機(jī)、隊(duì)列、路由鍵。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明普通交換機(jī)名稱
$normal_exchange = 'normal_exchange';
//聲明死信交換機(jī)名稱
$dead_exchange = 'dead_exchange';
//聲明普通隊(duì)列名稱
$normal_queue = 'normal_queue';
//聲明死信隊(duì)列名稱
$dead_queue = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key = 'dead_routing_key';
//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機(jī)
$channel->exchange_declare($dead_exchange, 'direct', false, false);
//配置普通隊(duì)列異常時(shí),轉(zhuǎn)發(fā)給死信隊(duì)列的荷載參數(shù),就指望著這個(gè)普通隊(duì)列有問題了,才會把消息數(shù)據(jù)轉(zhuǎn)發(fā)到死信隊(duì)列,轉(zhuǎn)發(fā)到那里,肯定是要配置的。
$payload = new AMQPTable();
//定位普通隊(duì)列出異常了,要轉(zhuǎn)發(fā)的交換機(jī)
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉(zhuǎn)發(fā)的交換機(jī)還不夠,還得知道那個(gè)隊(duì)列,不然交換機(jī)不知道路由那個(gè)消息到達(dá)那個(gè)隊(duì)列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);
//聲明普通隊(duì),就是等普通隊(duì)列出問題了,才把數(shù)據(jù)丟給死信隊(duì)列,所以普通(注意是普通隊(duì)列)隊(duì)列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊(duì)列,其實(shí)死信隊(duì)列本身是一個(gè)普通隊(duì)列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';
//綁定普通交換機(jī)與普通隊(duì)列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機(jī)與死信隊(duì)列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);
$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {
});
生產(chǎn)者代碼,產(chǎn)生延時(shí)任務(wù)
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('normal_exchange', 'direct', false, false, true);
for($i = 0;$i < 10; $i++) {
$msg = new AMQPMessage(microtime(true), [
'expiration' => '3000'
]);
$channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}
$channel->close();
$connection->close();
延時(shí)任務(wù)處理代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
$consumer_time = microtime(true);
$product_time = $msg->getBody();
echo '時(shí)間處理誤差:', bcsub($consumer_time, $product_time, 4), "\n";
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
延時(shí)隊(duì)列(基于RabbitMQ延遲交換機(jī)插件,推薦使用)
- 官方插件集合地址:https://www.rabbitmq.com/community-plugins
- rabbitmq-delayed-message-exchange版本地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
- 版本匹配:
rabbitmqctl version發(fā)現(xiàn)是3.10.0,rabbitmq-delayed-message-exchange release頁也有說明:This release has no functional changes but lists RabbitMQ 3.10.x as supported in plugin metadata. - 實(shí)測延時(shí)誤差在5毫秒以內(nèi),滿足99.9%的應(yīng)用場景。
- 安裝插件
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.0/rabbitmq_delayed_message_exchange-3.10.0.ez
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
發(fā)現(xiàn)如下字樣,就說明安裝成功。
Enabling plugins on node rabbit@lnmp:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@lnmp...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
systemctl restart rabbitmq-server
生產(chǎn)者代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$delay_exchange = 'delay_exchange';
$delay_queue = 'delay_queue';
$delay_routing_key = 'delay_routing_key';
//聲明交換機(jī)類型為direct
$payload = new AMQPTable();
$payload->set('x-delayed-type', 'direct');
//聲明延時(shí)隊(duì)列交換機(jī),參數(shù)2的類型,是安裝插件后才有的,固定值
$channel->exchange_declare($delay_exchange, 'x-delayed-message', false, false, true, false, false, $payload);
//聲明一個(gè)自定義延遲隊(duì)列
$channel->queue_declare($delay_queue, false, false, false, false);
//隊(duì)列綁定交換機(jī)
$channel->queue_bind($delay_queue, $delay_exchange, $delay_routing_key);
//發(fā)送延遲消息
$msg = new AMQPMessage(microtime(true), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
//這里配置超時(shí)時(shí)間,固定格式。
'application_headers' => new AMQPTable(['x-delay' => 5000])
]);
$channel->basic_publish($msg, $delay_exchange, $delay_routing_key);
$channel->close();
$connection->close();
消費(fèi)者代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
$consumer_time = microtime(true);
$product_time = $msg->getBody();
echo '時(shí)間處理誤差:', bcsub($consumer_time, $product_time, 4), "\n";
};
$channel->basic_consume('delay_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
延時(shí)隊(duì)列提前執(zhí)行或延后執(zhí)行的方案
某些業(yè)務(wù)場景,可能需要提前觸發(fā),或者延期處理,這就需要一些外的操作,才能完成。
隊(duì)列優(yōu)先級
- 不同的隊(duì)列和可以設(shè)置不同的優(yōu)先級。
- 在堆積的情況下才生效,否則生產(chǎn)一個(gè)消息處理一個(gè),無阻塞,就不存在多個(gè)消息的優(yōu)先處理的問題。
- 要設(shè)置優(yōu)先級,那么隊(duì)列和消息都需要設(shè)置優(yōu)先級。
- 優(yōu)先級越大,越先被處理。
- 支持1到255之間的優(yōu)先級,強(qiáng)烈建議使用1到5之間的值,更高的優(yōu)先級值需要更多的CPU和內(nèi)存資源,因?yàn)镽abbitMQ需要在內(nèi)部為每個(gè)優(yōu)先級維護(hù)一個(gè)子隊(duì)列,從1到為給定隊(duì)列配置的最大值。
首先啟動生產(chǎn)者代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$test_exchange = 'test_exchange';
$test_queue = 'test_queue';
$test_routing_key = 'test_routing_key';
$channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false);
$payload = new AMQPTable();
$payload->set('x-max-priority', 20);
//聲明一個(gè)自定義延遲隊(duì)列
$channel->queue_declare($test_queue, false, false, false, false, false,$payload);
//隊(duì)列綁定交換機(jī)
$channel->queue_bind($test_queue, $test_exchange, $test_routing_key);
for($i =0; $i < 10; $i++) {
$msg = new AMQPMessage($i, [
//為每個(gè)消息設(shè)置不同的優(yōu)先級
'priority' => $i,
]);
$channel->basic_publish($msg, $test_exchange, $test_routing_key, true);
}
$channel->close();
$connection->close();
消費(fèi)者代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg . "\n";
};
$channel->basic_consume('test_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
惰性隊(duì)列
惰性隊(duì)列用的極少,因?yàn)槭谴鎯υ诖疟P中的。在消費(fèi)者掛掉的情況下,避免RabbitMQ積累了太多的消息,消耗內(nèi)存去采取的策略。
高可用
性能與高可用權(quán)衡的架構(gòu)思維
加了RabbitMQ組件可以看做是分布式系統(tǒng),分布式系統(tǒng)有有一個(gè)CAP定理,CAP只能同時(shí)滿足兩個(gè)。
對于RabbitMQ消息丟失,或者重復(fù)消費(fèi)的問題,若當(dāng)前需求不能容忍,就需要額外的環(huán)節(jié)來彌補(bǔ)。如果當(dāng)前需求(小概率)能容忍,去掉高可用環(huán)節(jié),那么在性能上就會提升。
例如支付寶,支付時(shí)總是會延遲幾秒鐘,一是走支付寶安全風(fēng)控系統(tǒng),二是要極致要求穩(wěn)定高可用的環(huán)節(jié)必然性能下降,三是等支付回調(diào),然而其它接口內(nèi)容卻是瞬間加載。
所以沒有最好的架構(gòu),只有最合適的架構(gòu),所以要結(jié)合業(yè)務(wù)場景,判斷業(yè)務(wù)容忍度下限,與開發(fā)復(fù)雜度,在高可用與性能之間權(quán)衡,這是架構(gòu)師必備思維。
如何避免RabbitMQ消息丟失?
- 百分99.9%投遞策略:事先要處理的消息綁一個(gè)唯一標(biāo)識(讓RabbitMQ和MySQL自始至終都能知道是哪一個(gè),避免因業(yè)務(wù)需求生產(chǎn)兩份同樣的消息區(qū)分不出來),先寫入MySQL,并預(yù)定一個(gè)未消費(fèi)的缺省狀態(tài),直到收到RabbitMQ的確認(rèn)通知,改變狀態(tài)。再加一個(gè)循環(huán)定時(shí)任務(wù)(定時(shí)的周期看業(yè)務(wù)需求容忍度與組件性能),超時(shí)未處理的消息,讓定時(shí)任務(wù)手動觸發(fā)。
但這種也不是沒有弊端,如果RabbitMQ的確認(rèn)通知由于網(wǎng)絡(luò)抖動沒有發(fā)出去,那么定時(shí)任務(wù)就會讓其重復(fù)消費(fèi)。 - 生產(chǎn)消息傳輸感知:RabbitMQ掛了,生產(chǎn)消息發(fā)送給MQ時(shí),就會超時(shí),報(bào)錯(cuò),此時(shí)把消息放入MySQL做兜底(可以臨時(shí)不處理,但是別丟),避免消息丟失。
- RabbitMQ自帶的交換機(jī)、隊(duì)列、消息持久化機(jī)制。
- RabbitMQ自帶的發(fā)送確認(rèn)、消費(fèi)確認(rèn)、事務(wù)隊(duì)列機(jī)制。
如何防止RabbitMQ重復(fù)消費(fèi)?
- 兜底策略:在數(shù)據(jù)集層上最好有個(gè)兜底策略,如唯一索引,更新數(shù)據(jù)前的狀態(tài)機(jī)判斷等,避免由于程序設(shè)計(jì)疏忽或者故障等原因?qū)е碌闹貜?fù)消費(fèi)。
- 上游生產(chǎn)防重:消息重復(fù)消費(fèi),一方面是消息重復(fù)發(fā)送引起的,一般在接口處理的上游,會有一些防重策略(數(shù)據(jù)冪等),上游杜絕,也是一種策略。
- 消費(fèi)端上游判斷:首先把要處理的消息綁一個(gè)唯一標(biāo)識(讓RabbitMQ和MySQL自始至終都能知道是哪一個(gè),避免因業(yè)務(wù)需求生產(chǎn)兩份同樣的消息區(qū)分不出來),消費(fèi)者獲取到消息時(shí),再把唯一標(biāo)識寫入具有唯一索引約束的MySQL表,重復(fù)消費(fèi),MySQL唯一索引就要報(bào)錯(cuò),利用插入失敗的特性,阻止重復(fù)消費(fèi)。
RabbitMQ不公平分發(fā)
- 不公平分發(fā):推薦操作,避免一個(gè)消費(fèi)者很忙,其它消費(fèi)者不做任何工作的情況發(fā)生,防止壓力傾斜。
可在每個(gè)消費(fèi)者端調(diào)用basic_consume()方法之前配置,注意這種行為在消息堆積的情況下生效。
被設(shè)置不公平分發(fā)的消費(fèi)者中,可從網(wǎng)頁端控制臺->Channels選項(xiàng)卡->表格->所在行數(shù)據(jù)的Prefetch中看到。
/*
參數(shù)1:預(yù)取大小,通常情況下為null,表示不限制消息大小。
參數(shù)2:預(yù)取數(shù)量,表示消費(fèi)者每次最多接收的消息數(shù)量,值為權(quán)重比例,可以為任意正整數(shù)。
參數(shù)3:是否將預(yù)取限制應(yīng)用到channel級別(true)或者消費(fèi)者級別(false)。
*/
$channel->basic_qos(null, 1, false);
RabbitMQ可靠消息機(jī)制(發(fā)布確認(rèn))
-
極簡概括:是用來保證消息不丟失的的重要功能,生產(chǎn)者將消息發(fā)送給MQ,MQ把數(shù)據(jù)保存在磁盤上之后,會返回生產(chǎn)者成功保存的反饋。需要生產(chǎn)者端的隊(duì)列以及消息持久化的前提,就是為了防止隊(duì)列或者小寫將要持久化的時(shí)候RabbitMQ出故障的間隙情況發(fā)生。
隊(duì)列持久化、消息持久化、發(fā)布確認(rèn)3個(gè)因素加起來,才能保證消息不丟失。
發(fā)布確認(rèn)模式有3種:- 單個(gè)確認(rèn):性能最低,生產(chǎn)一條消息確認(rèn)一個(gè),后面的消息排隊(duì)。
- 批量確認(rèn):性能比單個(gè)確認(rèn)好,但是出問題時(shí),無法定位是那條消息。
- 異步確認(rèn):是性能與高可用性權(quán)衡的方案(很多服務(wù)端組件都有這樣的),利用回調(diào)函數(shù)來達(dá)到消息可靠性傳遞的。
單個(gè)確認(rèn),發(fā)送100條數(shù)據(jù),耗時(shí)0.337秒,大部分場景夠用,高并發(fā)場景除外。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, true, false, false);
//開啟發(fā)布確認(rèn)模式
$channel->confirm_select();
$start = microtime(true);
for($i = 0; $i < 100; $i++) {
$msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', 'hello');
//生產(chǎn)一條數(shù)據(jù),發(fā)送一條確認(rèn)消息,參數(shù)值為超時(shí)時(shí)間,單位:秒
$channel->wait_for_pending_acks(10.000);
}
echo microtime(true) - $start; // 0.337秒
$channel->close();
$connection->close();
- 批量確認(rèn),發(fā)送100條數(shù)據(jù),耗時(shí)0.048秒,時(shí)間節(jié)省了很多
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, true, false, false);
//開啟發(fā)布確認(rèn)模式
$channel->confirm_select();
$start = microtime(true);
for($i = 0; $i < 100; $i++) {
$msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', 'hello');
}
//生產(chǎn)一條數(shù)據(jù),發(fā)送一條確認(rèn)消息,參數(shù)值為超時(shí)時(shí)間,單位:秒
$channel->wait_for_pending_acks(10.000);
echo microtime(true) - $start; // 0.048秒
$channel->close();
$connection->close();
- 異步確認(rèn):(推薦使用)生產(chǎn)者會先把消息發(fā)送到隊(duì)列中,MQ會為其自動編號(有了這個(gè)編號,方便異步回調(diào)時(shí)定位失敗的消息),并調(diào)用broker模塊判斷是否收到確認(rèn)回調(diào),由于確認(rèn)的過程是異步的,所以對性能影響較小。至于回調(diào)的內(nèi)容,怎么操作,重試還是做其它事,根據(jù)業(yè)務(wù)來判定。
異步的操作函數(shù),由set_ack_handle()(消息確認(rèn)時(shí)調(diào)用)和set_nack_handler()(消息拒絕確認(rèn)時(shí)調(diào)用)函數(shù)完成。
本次測試耗時(shí)0.012秒。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->set_ack_handler(
function (\PhpAmqpLib\Message\AMQPMessage $message){
// echo $message->getBody();
}
);
$channel->set_nack_handler(
function (\PhpAmqpLib\Message\AMQPMessage $message){
// echo $message->getBody();
}
);
$channel->queue_declare('hello', false, true, false, false);
//開啟發(fā)布確認(rèn)模式
$channel->confirm_select();
$start = microtime(true);
for($i = 0; $i < 100; $i++) {
$msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', 'hello');
}
echo microtime(true) - $start; // 0.012秒
$channel->close();
$connection->close();
RabbitMQ可靠消息機(jī)制(事務(wù)消息)
- 極簡概括:用的相對較少,用于確保消息被可靠的發(fā)送,但是會犧牲掉一些性能,有事務(wù),往往意味著是多條消息的一次性處理(不然生產(chǎn)一條消費(fèi)一條,回滾怎么撤回呢),例如發(fā)送10個(gè)消息,其中n(0<n<=10)個(gè)消息發(fā)送故障,或者ack異常,則10條全部回滾。
- 用法:tx_select()類比MySQL的begin,tx_commit類比MySQL的commit(),tx_rollback類比MySQL的commit(),可能出現(xiàn)異常情況,所以用try catch包裹。
- 注意:如果消費(fèi)端設(shè)置了自動確認(rèn)機(jī)制,事務(wù)會失效,必須為手動確認(rèn)。
消費(fèi)者代碼,首先啟動
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg->getBody() . "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//手動確認(rèn)
$channel->basic_consume('test_queue', '', false, false, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
先測試生產(chǎn)者,不用事務(wù)的情況,看看生產(chǎn)者異常情況下,消息是否會回滾。
命令行提示出異常了,消費(fèi)者返回012三條消息,說明沒有回滾。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$test_exchange = 'test_exchange';
$test_queue = 'test_queue';
$test_routing_key = 'test_routing_key';
$channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false);
$payload = new AMQPTable();
$payload->set('x-max-priority', 20);
//聲明一個(gè)自定義延遲隊(duì)列
$channel->queue_declare($test_queue, false, false, false, false, false,$payload);
//隊(duì)列綁定交換機(jī)
$channel->queue_bind($test_queue, $test_exchange, $test_routing_key);
try {
for($i =0; $i < 10; $i++) {
if($i == 3) {
throw new Exception('測試異常');
}
$msg = new AMQPMessage($i);
$channel->basic_publish($msg, $test_exchange, $test_routing_key, true);
}
echo '成功執(zhí)行';
} catch(\Exception $e) {
echo '出異常了';
}
$channel->close();
$connection->close();
再測試生產(chǎn)者,用事務(wù)的情況,看看生產(chǎn)者異常情況下,消息是否會回滾。
命令行提示出異常了,消費(fèi)者返回0條消息,說明生產(chǎn)端異常,也會回滾。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$test_exchange = 'test_exchange';
$test_queue = 'test_queue';
$test_routing_key = 'test_routing_key';
$channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false);
$payload = new AMQPTable();
$payload->set('x-max-priority', 20);
//聲明一個(gè)自定義延遲隊(duì)列
$channel->queue_declare($test_queue, false, false, false, false, false,$payload);
//隊(duì)列綁定交換機(jī)
$channel->queue_bind($test_queue, $test_exchange, $test_routing_key);
try {
$channel->tx_select();
for($i =0; $i < 10; $i++) {
if($i == 3) {
throw new Exception('測試異常');
}
$msg = new AMQPMessage($i);
$channel->basic_publish($msg, $test_exchange, $test_routing_key, true);
}
echo '成功執(zhí)行';
//提交事務(wù)
$channel->tx_commit();
} catch(\Exception $e) {
echo '出異常了';
//回滾事務(wù)
$channel->tx_rollback();
}
$channel->close();
$connection->close();
RabbitMQ可靠消息機(jī)制(消息應(yīng)答)
- 極簡概括: 為了彌補(bǔ)事務(wù)消息的性能問題。如果某個(gè)消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)掛掉了,(此時(shí)肯定是已經(jīng)獲取到了這條數(shù)據(jù)才去消費(fèi)的,那么工作隊(duì)列中的數(shù)據(jù)就會被刪除),就可能導(dǎo)致數(shù)據(jù)的丟失。為了解決因此問題,RabbitMQ添加了消息應(yīng)答機(jī)制,消費(fèi)端在消息處理完成后,告訴RabbitMQ這條數(shù)據(jù)已經(jīng)被成功的處理,因此RabbitMQ才可以刪除這條數(shù)據(jù),應(yīng)答機(jī)制又分為兩種情況。
- 自動應(yīng)答:不推薦使用,獲取到消息確認(rèn)應(yīng)答,這不代表后續(xù)流程一定會處理成功。
- 手動應(yīng)答:推薦使用,告訴MQ消息被成功處理或者失敗。
測試手動確認(rèn)或拒絕應(yīng)答(無論是確認(rèn),或者拒絕,RabbitMQ都任務(wù)此消息已經(jīng)消費(fèi)過了)
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
//獲取命令行參數(shù)
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');
$channel->close();
$connection->close();
這里只有消費(fèi)者1,手動拒絕消息
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, false, false, false,
function ($msg) {
echo "收到消息,內(nèi)容為{$msg->getBody()}\n";
sleep(5);
echo "拒絕處理\n";
/*
basic_nack和basic_ack參數(shù)一致
參數(shù)1:string 消息唯一標(biāo)識符
參數(shù)2:bool 是否確認(rèn)接受或拒絕多個(gè)消息
參數(shù)3:bool 表示是否重新放入消息隊(duì)列
*/
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
}
);
$channel->consume();
這里只有消費(fèi)者1,手動確認(rèn)消息,basic_nack(拒絕)方法改為basic_ack(同意)方法即可。
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, false, false, false,
function ($msg) {
echo "收到消息,內(nèi)容為{$msg->getBody()}\n";
sleep(5);
echo "成功處理消息\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
);
$channel->consume();
RabbitMQ的故障轉(zhuǎn)移機(jī)制
- 消息重新入隊(duì)的故障轉(zhuǎn)移機(jī)制:如果某個(gè)消費(fèi)者由于某些原因失去連接,導(dǎo)致消息未發(fā)送ACK確認(rèn)(默認(rèn)超時(shí)時(shí)間為30分鐘),那么RabbitMQ有讓消息重新排隊(duì)的機(jī)制,如果此時(shí)其它消費(fèi)者可以處理,那就讓其它消費(fèi)者處理。注意:重新排隊(duì)的機(jī)制只能保證消息不丟失,不能保證失去連接的那個(gè)消費(fèi)者沒處理這個(gè)消息,所以重新排隊(duì)的故障轉(zhuǎn)移機(jī)制,讓其他消費(fèi)者處理的過程,有重復(fù)消費(fèi)的可能,所以要在代碼邏輯中,添加一些兜底的機(jī)制(例如數(shù)據(jù)庫唯一索引、分布式鎖等)。
測試故障轉(zhuǎn)移,生產(chǎn)者生產(chǎn)出一條消息,消費(fèi)者1會立即捕獲這條消息,但是處理時(shí)間為5秒鐘,如果在這5秒鐘內(nèi)Ctrl + C強(qiáng)行終止,則RabbitMQ立即會讓消費(fèi)者2去消費(fèi)這條數(shù)據(jù),進(jìn)而保證消息不丟失。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
//獲取命令行參數(shù)
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');
$channel->close();
$connection->close();
消費(fèi)者1代碼如下:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, false, false, false,
function ($msg) {
echo "收到消息,內(nèi)容為{$msg->getBody()}\n";
sleep(5);
echo "成功處理消息\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
);
$channel->consume();
消費(fèi)者2代碼如下
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, false, false, false,
function ($msg) {
echo "收到消息,內(nèi)容為{$msg->getBody()}\n";
sleep(10);
echo "成功處理消息\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
);
$channel->consume();
RabbitMQ交換機(jī)、隊(duì)列、消息持久化
以php-amqplib/php-amqplib為例,
交換機(jī)持久化:exchange_declare()方法參數(shù)4設(shè)置為true即可。
隊(duì)列持久化:exchange_declare()方法參數(shù)3設(shè)置為true即可。
消息持久化:AMQPMessage()對象參數(shù)2添加['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]即可。
對于設(shè)置為持久化存儲的情況:RabbitMQ會開啟一個(gè)進(jìn)程,采用追加寫的方式,對數(shù)據(jù)進(jìn)行實(shí)時(shí)持久化存儲。
對于設(shè)置為非持久化存儲的:RabbitMQ會開啟一個(gè)進(jìn)程,當(dāng)內(nèi)存不夠時(shí),采用追加寫的方式,對數(shù)據(jù)進(jìn)行實(shí)時(shí)持久化存儲。
存儲方式:持久化存儲時(shí),RabbitMQ會內(nèi)部維護(hù)一張ETS的表,用于記錄消息的(id、偏移量、有效數(shù)據(jù)、左邊文件、右邊文件的元數(shù)據(jù))。
注意:如果一個(gè)隊(duì)列已經(jīng)存在且沒有配置持久化,若再次配置持久化,會報(bào)錯(cuò)。如果一個(gè)隊(duì)列持久化了,在網(wǎng)頁端控制臺->queues->隊(duì)列堆在表格->所在隊(duì)列的Features項(xiàng),會顯示D。
將消息標(biāo)記為持久化并不能完全保證消息不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,但是當(dāng)RabbitMQ已經(jīng)設(shè)置為持久化,不能完全保證不會丟失消息,接受了一個(gè)消息但還沒有保存它時(shí),仍然有一個(gè)很短的時(shí)間空隙,這個(gè)間隙出問題了,還是有丟失的可能。
集成到Laravel
- 方案1:可以集成composer require php-amqplib/php-amqplib到框架,將主機(jī)端口用戶名密碼放入配置,php artisan make:command xxx,然后去運(yùn)行它,優(yōu)點(diǎn)是用法更貼近原生,缺點(diǎn)是少了Laravel Queue自帶的優(yōu)雅特性。
- 方案2:換Laravel RabbitMQ的包,這樣更方便的使用Laravel優(yōu)雅的Queue用法,不用關(guān)注中間件的底層實(shí)現(xiàn)。
親測非延時(shí)隊(duì)列和延時(shí)隊(duì)列都可正常使用,延時(shí)隊(duì)列不會造成像死信隊(duì)列實(shí)現(xiàn)延時(shí)隊(duì)列的(第一個(gè)延時(shí)5秒,第二個(gè)延遲10秒,結(jié)果第一個(gè)5秒執(zhí)行,第二個(gè)15秒執(zhí)行)那樣消息堆積的情況。
composer require vladimir-yuldashev/laravel-queue-rabbitmq
vim laravel/config/queue.php,添加以下配置,并修改'driver' => 'rabbitmq'。
'rabbitmq' => [
'driver' => 'rabbitmq',
'hosts' => [
[
'host' => '192.168.0.180',
'port' => '5672',
'user' => 'admin',
'password' => '12345678',
'vhost' => '/',
]
],
'lazy' => false,
'options' => [
'queue' => [
'prioritize_delayed' => false,
'queue_max_priority' => 10,
],
],
],
集群與監(jiān)控
大部分公司仍舊用的是框架自帶的基于Redis實(shí)現(xiàn)的隊(duì)列或延時(shí)隊(duì)列,RabbitMQ都很少用,更別說用到集群。
集群方面的,小公司用不上,大公司有專門的運(yùn)維。
后期更新……