RabbitMQ生產(chǎn)端消息的確認

如果不進行特殊配置,默認情況下發(fā)送消息的操作是不會返回任何信息給生產(chǎn)者的,也就是默認情況下生產(chǎn)者是不知道消息有沒有正確地到達服務(wù)器。這會出現(xiàn)消息丟失的情況。

RabbitMQ針對這個問題,提供了兩種解決方式:

(1)通過事務(wù)機制實現(xiàn)。

(2)通過發(fā)送方確認(publisher confirm)機制實現(xiàn)。

1、事務(wù)機制

RabbitMQ客戶端中與事務(wù)機制相關(guān)的方法有三個:channel.txSelect、channel.txCommit和channel.txRollback。channel.txSelect用于將當(dāng)前的信道設(shè)置成事務(wù)模式,channel.txCommit用于提交事務(wù),channel.txRollback用于事務(wù)回滾。在通過channel.txSelect方法開啟事務(wù)之后,我們便可以發(fā)布消息給RabbitMQ了,如果事務(wù)提交成功,則消息一定到達了RabbitMQ中,如果在事務(wù)提交執(zhí)行之前由于RabbitMQ異常崩潰或者其他原因拋出異常,這個時候我們便可以將其捕獲,進而通過執(zhí)行channel.txRollback方法來實現(xiàn)事務(wù)回滾。注意這里的RabbitMQ中的事務(wù)機制與大多數(shù)數(shù)據(jù)庫中的事務(wù)概念并不相同,需要注意區(qū)分。

關(guān)鍵示例代碼如下圖所示:

上面代碼對應(yīng)的AMQP協(xié)議流轉(zhuǎn)過程如下圖所示:

可以發(fā)現(xiàn)開啟事務(wù)機制與不開啟多了四個步驟:

(1)客戶端發(fā)送Tx.Select,將信道設(shè)置為事務(wù)模式。

(2)Broker回復(fù)Tx.Select-Ok,確認已將信道設(shè)置為事務(wù)模式。

(3)在發(fā)送完消息之后,客戶端發(fā)送Tx.Commit提交事務(wù)。

(4)Broker回復(fù)Tx.Commit-Ok,確認事務(wù)提交。

上面所陳述的是正常的情況下的事務(wù)機制運轉(zhuǎn)過程,而事務(wù)回滾是什么樣子的呢?我們先來參考下面一段實例代碼,來看看怎么使用事務(wù)回滾。

上面代碼中很明顯有一個java.lang.ArithmeticException,在事務(wù)提交之前捕獲到異常,之后顯式的提交事務(wù)回滾,其AMQP協(xié)議流轉(zhuǎn)過程如下圖所示:

如果要發(fā)送多條消息,則將channel.basicPublish和channel.txCommit等方法包裹進循環(huán)內(nèi)即可,可以參考如下示例代碼:

事務(wù)確實能夠解決消息發(fā)送方和RabbitMQ之前消息確認的問題,只有消息成功被RabbitMQ接收,事務(wù)才能提交成功,否則便可在捕獲異常之后進行事務(wù)回滾,與此同時可以進行消息重發(fā)。但是使用事務(wù)機制會“吸干”RabbitMQ的性能,那么有沒有更好的方法既能保證消息發(fā)送方確認消息已經(jīng)正確送達,又能基本上不帶來性能上的損失呢?從AMQP協(xié)議層面來看并沒有更好的辦法,但是RabbitMQ提供了一個改進方案,即發(fā)送方確認機制。

2、發(fā)送方確認機制

生產(chǎn)者將信道設(shè)置成confirm(確認)模式,一旦信道進入confirm模式,所有在該信道上面發(fā)布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,RabbitMQ就會發(fā)送一個確認(Basic.Ack)給生產(chǎn)者(包含消息的唯一ID),這就使得生產(chǎn)者知曉消息已經(jīng)正確到達了目的地了。如果消息和隊列是可持久化的,那么確認消息會在消息寫入磁盤之后發(fā)出。RabbitMQ回傳給生產(chǎn)者的確認消息中的deliveryTag包含了確認消息的序號,此外RabbitMQ也可以設(shè)置channel.basicAck方法中的multiple參數(shù),表示到這個序號之前的所有消息都可以得到了處理,可以參考下圖。

事務(wù)機制在一條消息發(fā)送之后會使發(fā)送端阻塞,以等待RabbitMQ的回應(yīng),之后才能繼續(xù)發(fā)送下一條消息。相比之下,發(fā)送方確認機制最大的好處在于它是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認的同時繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認之后,生產(chǎn)者應(yīng)用程序便可以通過回調(diào)方法來處理該確認消息,如果RabbitMQ因為自身內(nèi)部錯誤導(dǎo)致消息丟失,就會發(fā)送一條nack(Basic.Nack)命令,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該nack命令。

生產(chǎn)者通過調(diào)用channel.confirmSelect方法(即Confirm.Select命令)將信道設(shè)置為confirm模式,之后RabbitMQ會返回Confirm.Select-Ok命令表示同意生產(chǎn)者將當(dāng)前信道設(shè)置為confirm模式。所有被發(fā)送的后續(xù)消息都被ack或者nack一次,不會出現(xiàn)一條消息既被ack又被nack的情況,并且RabbitMQ也并沒有對消息被confirm的快慢做任何保證。

下面看一下publisher confirm機制怎么運作,簡要代碼如下圖所示:

如果發(fā)送多條消息,只需要將channel.basicPublish和channel.waitForConfirms方法包裹在循環(huán)里面即可,可以參考事務(wù)機制,不過不需要把channel.confirmSelect方法包裹在循環(huán)內(nèi)部。

在publisher confirm模式下發(fā)送多條消息的AMQP協(xié)議流轉(zhuǎn)過程如下圖所示:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容