rabbitmq保證消息可靠 消息不丟失

三端可靠

  1. 發(fā)送方和mq保證消息送達(dá)到mq
  2. mq保證保存的消息不丟失
  3. 消費(fèi)方和mq一起保證消息被成功消費(fèi)

發(fā)送方和mq保證消息送達(dá)到mq

方案一、rabbitmq如果是用spring boot提供的模版接口發(fā)送 需要調(diào)用rabbitTemplate.convertSendAndReceive()方法發(fā)送 這個(gè)是當(dāng)消息成功到隊(duì)列了才會(huì)返回結(jié)果 如果失敗則會(huì)拋異常 不過這就會(huì)導(dǎo)致等待時(shí)間比較長(zhǎng) 適合高可靠場(chǎng)景
不過一般在業(yè)務(wù)開發(fā)都是完成業(yè)務(wù)以后再發(fā)消息 比如插入訂單表一筆訂單 發(fā)送訂單創(chuàng)建的消息 這兩步是需要保證原子性的 要么都成功要么都失敗 rabbitmq支持事務(wù)消息 不過如果出現(xiàn)下面情況

  • 開啟事務(wù)
  • 插入訂單表
  • 發(fā)送mq消息
  • 提交數(shù)據(jù)庫(kù)事務(wù)成功
  • 提交mq事務(wù)失敗
  • 消息丟失

所以如果是用rabbitmq的事務(wù)消息來(lái)做 其實(shí)在極端情況是會(huì)丟失消息的 在這里可以采用一個(gè)異步命令組件提供的方案https://github.com/bojiw/asyncmd

  • 開啟事務(wù)
  • 插入訂單表
  • 插入異步命令表
  • 提交數(shù)據(jù)庫(kù)事務(wù)
  • 線程掃描異步命令表?yè)迫∠?/li>
  • 通過rabbitTemplate.convertSendAndReceive()方法發(fā)送
  • 如果失敗 則重試 并且報(bào)警

方案二、如采用rabbitTemplate.convertAndSend和confirms(消費(fèi)回調(diào))加Return(錯(cuò)誤回調(diào))模式

  • convertAndSend 發(fā)送到mq 立刻返回 不管交換機(jī)是否成功處理 所以并發(fā)會(huì)高
  • confirms(消費(fèi)回調(diào)) 實(shí)現(xiàn)接口ConfirmCallback 消息成功發(fā)送到rabbitmq交換機(jī)上則會(huì)回調(diào)接口 入?yún)ck為true代表成功發(fā)送到交換機(jī) false代表異常
  • Return(錯(cuò)誤回調(diào)) 實(shí)現(xiàn)接口ReturnCallback 消息從交換機(jī)到隊(duì)列 成功不會(huì)回調(diào) 如果發(fā)送到隊(duì)列失敗 則會(huì)調(diào)用回調(diào)

上面這種方式如果在回調(diào)中處理消息發(fā)送失敗的邏輯時(shí)出現(xiàn)異?;蛘邞?yīng)用服務(wù)器掛了 則會(huì)導(dǎo)致消息丟失 因?yàn)橹粫?huì)回調(diào)一次
這種情況可以采用加一張消息表 先插入消息表 然后掃表發(fā)送消息 confirms回調(diào)成功 則更新表狀態(tài) 如果回調(diào)的時(shí)候異常 則消息表會(huì)重新發(fā)送 這種就會(huì)出現(xiàn)消息重發(fā)的情況 不過一般消息消費(fèi)者都要保證冪等 所以這個(gè)問題不大 不過如果出現(xiàn)以下情況

  • 數(shù)據(jù)庫(kù)有兩個(gè)字段 confirms默認(rèn)0 和 return 默認(rèn)0
  • 回調(diào)成功confirms=1 回調(diào)失敗confirms=2 錯(cuò)誤回調(diào)return=2
  • 當(dāng)回調(diào)成功 confirms=1 錯(cuò)誤回調(diào)處理失敗沒有成功更新表 則return還是0
  • 這個(gè)時(shí)候你掃表就不確定需不需要重發(fā)消息 因?yàn)槿绻⒊晒Φ疥?duì)列 表的狀態(tài)也是confirms=1 return=0
  • 無(wú)法對(duì)發(fā)送隊(duì)列成功和發(fā)送隊(duì)列失敗可在回調(diào)異常這兩種情況做區(qū)分

這里邏輯就會(huì)出問題 所以只能處理消息成功到交換機(jī) 是否到隊(duì)列則不管 因?yàn)橐话愣际浅晒Φ?除了極端情況 比如隊(duì)列被人誤刪除
方案二和方案一其實(shí)從整個(gè)流程來(lái)講 發(fā)送消息速度其實(shí)差不多的 不過可靠性還是方案一高一點(diǎn)

mq保證保存的消息不丟失

消息、交換機(jī)、隊(duì)列都需要設(shè)置持久化

消費(fèi)方和mq一起保證消息被成功消費(fèi)

消費(fèi)者開啟手動(dòng)確認(rèn)

acknowledge="manual"

在業(yè)務(wù)代碼里 成功處理業(yè)務(wù) 才返回給rabbitmq消費(fèi)成功的確認(rèn)

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

如果業(yè)務(wù)處理失敗則重新放到隊(duì)列重新消費(fèi)

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

由消費(fèi)者 只有業(yè)務(wù)成功處理才進(jìn)行ack 記得需要做好冪等
不過這里rabbitmq在重試這塊沒有做好 如果不確定會(huì)一直重試 如果因?yàn)橐蕾嚨囊粋€(gè)系統(tǒng)掛了 要一個(gè)小時(shí)以后才會(huì)啟動(dòng)成功 在這一個(gè)小時(shí)里會(huì)一直重試 這就會(huì)對(duì)rabbitmq和消費(fèi)者帶來(lái)一定的壓力 這塊也可以采用異步命令組件提供的方案https://github.com/bojiw/asyncmd

  • 接收消息
  • 把消息插入異步命令
  • 返回rabbitmq成功
  • 異步組件執(zhí)行業(yè)務(wù)邏輯
  • 調(diào)用接口失敗重試
  • 重試一定次數(shù)則不重試 由人工進(jìn)行處理 也可以把重試間隔設(shè)置的長(zhǎng)一點(diǎn) 比如前三次每隔1s重試 第四次隔一個(gè)小時(shí)重試
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容