三端可靠
- 發(fā)送方和mq保證消息送達(dá)到mq
- mq保證保存的消息不丟失
- 消費(fèi)方和mq一起保證消息被成功消費(fèi)
發(fā)送方和mq保證消息送達(dá)到mq
方案一、rabbitmq如果是用spring boot提供的模版接口發(fā)送 需要調(diào)用rabbitTemplate.convertSendAndReceive()方法發(fā)送 這個(gè)是當(dāng)消息成功到隊(duì)列了才會(huì)返回結(jié)果 如果失敗則會(huì)拋異常 不過這就會(huì)導(dǎo)致等待時(shí)間比較長(zhǎng) 適合高可靠場(chǎng)景
不過一般在業(yè)務(wù)開發(fā)都是完成業(yè)務(wù)以后再發(fā)消息 比如插入訂單表一筆訂單 發(fā)送訂單創(chuàng)建的消息 這兩步是需要保證原子性的 要么都成功要么都失敗 rabbitmq支持事務(wù)消息 不過如果出現(xiàn)下面情況
- 開啟事務(wù)
- 插入訂單表
- 發(fā)送mq消息
- 提交數(shù)據(jù)庫(kù)事務(wù)成功
- 提交mq事務(wù)失敗
- 消息丟失
所以如果是用rabbitmq的事務(wù)消息來(lái)做 其實(shí)在極端情況是會(huì)丟失消息的 在這里可以采用一個(gè)異步命令組件提供的方案https://github.com/bojiw/asyncmd
- 開啟事務(wù)
- 插入訂單表
- 插入異步命令表
- 提交數(shù)據(jù)庫(kù)事務(wù)
- 線程掃描異步命令表?yè)迫∠?/li>
- 通過rabbitTemplate.convertSendAndReceive()方法發(fā)送
- 如果失敗 則重試 并且報(bào)警
方案二、如采用rabbitTemplate.convertAndSend和confirms(消費(fèi)回調(diào))加Return(錯(cuò)誤回調(diào))模式
- convertAndSend 發(fā)送到mq 立刻返回 不管交換機(jī)是否成功處理 所以并發(fā)會(huì)高
- confirms(消費(fèi)回調(diào)) 實(shí)現(xiàn)接口ConfirmCallback 消息成功發(fā)送到rabbitmq交換機(jī)上則會(huì)回調(diào)接口 入?yún)ck為true代表成功發(fā)送到交換機(jī) false代表異常
- Return(錯(cuò)誤回調(diào)) 實(shí)現(xiàn)接口ReturnCallback 消息從交換機(jī)到隊(duì)列 成功不會(huì)回調(diào) 如果發(fā)送到隊(duì)列失敗 則會(huì)調(diào)用回調(diào)
上面這種方式如果在回調(diào)中處理消息發(fā)送失敗的邏輯時(shí)出現(xiàn)異?;蛘邞?yīng)用服務(wù)器掛了 則會(huì)導(dǎo)致消息丟失 因?yàn)橹粫?huì)回調(diào)一次
這種情況可以采用加一張消息表 先插入消息表 然后掃表發(fā)送消息 confirms回調(diào)成功 則更新表狀態(tài) 如果回調(diào)的時(shí)候異常 則消息表會(huì)重新發(fā)送 這種就會(huì)出現(xiàn)消息重發(fā)的情況 不過一般消息消費(fèi)者都要保證冪等 所以這個(gè)問題不大 不過如果出現(xiàn)以下情況
- 數(shù)據(jù)庫(kù)有兩個(gè)字段 confirms默認(rèn)0 和 return 默認(rèn)0
- 回調(diào)成功confirms=1 回調(diào)失敗confirms=2 錯(cuò)誤回調(diào)return=2
- 當(dāng)回調(diào)成功 confirms=1 錯(cuò)誤回調(diào)處理失敗沒有成功更新表 則return還是0
- 這個(gè)時(shí)候你掃表就不確定需不需要重發(fā)消息 因?yàn)槿绻⒊晒Φ疥?duì)列 表的狀態(tài)也是confirms=1 return=0
- 無(wú)法對(duì)發(fā)送隊(duì)列成功和發(fā)送隊(duì)列失敗可在回調(diào)異常這兩種情況做區(qū)分
這里邏輯就會(huì)出問題 所以只能處理消息成功到交換機(jī) 是否到隊(duì)列則不管 因?yàn)橐话愣际浅晒Φ?除了極端情況 比如隊(duì)列被人誤刪除
方案二和方案一其實(shí)從整個(gè)流程來(lái)講 發(fā)送消息速度其實(shí)差不多的 不過可靠性還是方案一高一點(diǎn)
mq保證保存的消息不丟失
消息、交換機(jī)、隊(duì)列都需要設(shè)置持久化
消費(fèi)方和mq一起保證消息被成功消費(fèi)
消費(fèi)者開啟手動(dòng)確認(rèn)
acknowledge="manual"
在業(yè)務(wù)代碼里 成功處理業(yè)務(wù) 才返回給rabbitmq消費(fèi)成功的確認(rèn)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
如果業(yè)務(wù)處理失敗則重新放到隊(duì)列重新消費(fèi)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
由消費(fèi)者 只有業(yè)務(wù)成功處理才進(jìn)行ack 記得需要做好冪等
不過這里rabbitmq在重試這塊沒有做好 如果不確定會(huì)一直重試 如果因?yàn)橐蕾嚨囊粋€(gè)系統(tǒng)掛了 要一個(gè)小時(shí)以后才會(huì)啟動(dòng)成功 在這一個(gè)小時(shí)里會(huì)一直重試 這就會(huì)對(duì)rabbitmq和消費(fèi)者帶來(lái)一定的壓力 這塊也可以采用異步命令組件提供的方案https://github.com/bojiw/asyncmd
- 接收消息
- 把消息插入異步命令
- 返回rabbitmq成功
- 異步組件執(zhí)行業(yè)務(wù)邏輯
- 調(diào)用接口失敗重試
- 重試一定次數(shù)則不重試 由人工進(jìn)行處理 也可以把重試間隔設(shè)置的長(zhǎng)一點(diǎn) 比如前三次每隔1s重試 第四次隔一個(gè)小時(shí)重試