前言:
消息隊(duì)列的主要作用是實(shí)現(xiàn)系統(tǒng)間的解耦、異步處理和削峰填谷。 由于消息隊(duì)列的異步使用特性,天然的會(huì)存在一定概率消息丟失的情況。

方案1:消息落庫
消息落庫重發(fā)是基于MQ的confirm機(jī)制,在消息發(fā)送失敗后自動(dòng)重發(fā)。

Step 1: 首先把消息信息(業(yè)務(wù)數(shù)據(jù))存儲(chǔ)到數(shù)據(jù)庫中,緊接著,我們?cè)侔堰@個(gè)消息記錄也存儲(chǔ)到一張消息記錄表里(或者另外一個(gè)同源數(shù)據(jù)庫的消息記錄表)
Step 2:發(fā)送消息到MQ Broker節(jié)點(diǎn)(采用confirm方式發(fā)送,會(huì)有異步的返回結(jié)果)
Step 3、4:生產(chǎn)者端接受MQ Broker節(jié)點(diǎn)返回的Confirm確認(rèn)消息結(jié)果,然后進(jìn)行更新消息記錄表里的消息狀態(tài)。比如默認(rèn)Status = 0 當(dāng)收到消息確認(rèn)成功后,更新為1即可!
Step 5:但是在消息確認(rèn)這個(gè)過程中可能由于網(wǎng)絡(luò)閃斷、MQ Broker端異常等原因?qū)е?回送消息失敗或者異常。這個(gè)時(shí)候就需要發(fā)送方(生產(chǎn)者)對(duì)消息進(jìn)行可靠性投遞了,保障消息不丟失,100%的投遞成功?。ㄓ幸环N極限情況是閃斷,Broker返回的成功確認(rèn)消息,但是生產(chǎn)端由于網(wǎng)絡(luò)閃斷沒收到,這個(gè)時(shí)候重新投遞可能會(huì)造成消息重復(fù),需要消費(fèi)端去做冪等處理)所以我們需要有一個(gè)定時(shí)任務(wù),(比如每5分鐘拉取一下處于中間狀態(tài)的消息,當(dāng)然這個(gè)消息可以設(shè)置一個(gè)超時(shí)時(shí)間,比如超過1分鐘 Status = 0 ,也就說明了1分鐘這個(gè)時(shí)間窗口內(nèi),我們的消息沒有被確認(rèn),那么會(huì)被定時(shí)任務(wù)拉取出來)
Step 6:接下來我們把中間狀態(tài)的消息進(jìn)行重新投遞 retry send,繼續(xù)發(fā)送消息到MQ ,當(dāng)然也可能有多種原因?qū)е掳l(fā)送失敗
Step 7:我們可以采用設(shè)置最大努力嘗試次數(shù),比如投遞了3次,還是失敗,那么我們可以將最終狀態(tài)設(shè)置為Status = 2 ,最后 交由人工解決處理此類問題(或者把消息轉(zhuǎn)儲(chǔ)到失敗表中)。
表結(jié)構(gòu)和代碼示例
CREATE TABLE IF NOT EXISTS `message_log`
(
`message_id` varchar(30) NOT NULL COMMENT '消息唯一ID',
`message` varchar(1000) DEFAULT '' COMMENT '消息內(nèi)容',
`business_id` varchar(40) NOT NULL COMMENT '業(yè)務(wù)id,比如記錄訂單號(hào)',
`try_count` int(4) DEFAULT '0' COMMENT '重試次數(shù)',
`status` tinyint(2) DEFAULT '0' COMMENT ' 消息投遞狀態(tài) 0:投遞中 1:投遞成功 2:投遞失敗',
`next_retry_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '下一次投遞時(shí)間',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時(shí)間',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后一次更新時(shí)間',
PRIMARY KEY (`message_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
創(chuàng)建訂單方法
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderMapper orderMapper;
private final MessageLogMapper messageLogMapper;
private final RocketMQProducer rocketMQProducer;
//創(chuàng)建訂單
public void createOrder(Order order) {
//插入業(yè)務(wù)數(shù)據(jù)
orderMapper.insert(order);
//插入消息記錄表數(shù)據(jù)
MessageLog messageLog = new MessageLog();
//消息唯一ID
messageLog.setMessageId(messageId);
//保存消息整體
messageLog.setMessage(JSONObject.toJSONString(order));
//設(shè)置消息狀態(tài)為0 表示發(fā)送中
messageLog.setStatus(0);
//設(shè)置下一次執(zhí)行時(shí)間
messageLog.setNextRetryTime(nextRetryTime);
messageLogMapper.insert(brokerMessageLog);
//發(fā)送消息
rocketMQProducer.sendOrder(order);
}
}
消息生產(chǎn)者
@Component
public class RocketMQProducer {
public void sendOrder(Order order) {
//1.創(chuàng)建消息
Message message = new Message("test_quick_topic",// 主題
"TagA",// 標(biāo)簽
"KeyA",// 用戶自定義的key,唯一的標(biāo)識(shí)
FastJsonConvertUtil.convertObjectToJSON(order).getBytes()); //消息內(nèi)容實(shí)體(byte[])
try {
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//如果confirm返回成功 則進(jìn)行更新
messageLogMapper.changeMessageLogStatus();
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
//失敗則進(jìn)行具體的后續(xù)操作:重試 或者補(bǔ)償?shù)仁侄? System.err.println("-----------異常處理-----------");
}
});
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
定時(shí)任務(wù)
@Component
public class RetryMessageTasker {
@Scheduled(initialDelay = 5000, fixedDelay = 10000)
public void reSend() {
System.out.println("----------------定時(shí)任務(wù)開始----------------");
//pull status = 0 and timeout message
List<MessageLog> list = getNeedReSendMsgList();
for (MessageLog messageLog : list) {
if (messageLog.getTryCount() > maxTryCount) {
//update fail message
continue;
}
//更新try_count
// resend
try {
sendOrder(getMessage());
} catch (Exception e) {
e.printStackTrace();
System.err.println("-----------異常處理-----------");
}
}
}
}
該方案只能保證消息從生產(chǎn)者到MQ之間的可靠性投遞,解決辦法:
方式1. 在消息表中新增 消費(fèi)成功狀態(tài),下游消費(fèi)者變更消費(fèi)狀態(tài)(要考慮多個(gè)業(yè)務(wù)消費(fèi)的情況)
方式2. 使用業(yè)務(wù)正確性校驗(yàn)平臺(tái)BCP檢查上下游業(yè)務(wù)數(shù)據(jù)是否一致,進(jìn)行修復(fù)
方案2:二次確認(rèn)檢測(cè)
二次確認(rèn)檢測(cè)是基于延時(shí)投遞機(jī)制實(shí)現(xiàn)的,主要目的是為了減少數(shù)據(jù)庫操作,提高并發(fā)量。

Step 1:先將業(yè)務(wù)數(shù)據(jù)進(jìn)行入庫,然后上游服務(wù)將消息M1發(fā)送出去
Step 2:在發(fā)送消息M1之后,緊接著生產(chǎn)端再次發(fā)送一條延遲消息(Second Send Delay Check),即延遲檢查投遞消息M3
Step 3:消費(fèi)端去監(jiān)聽指定隊(duì)列,將收到的消息進(jìn)行處理
Step 4:處理完成之后,發(fā)送一個(gè)confirm消息M2,也就是回送響應(yīng),但是這里響應(yīng)不是正常的ACK,而是重新生成一條消息,投遞到MQ中
Step 5:
下游Callback Check Service是一個(gè)單獨(dú)的服務(wù),其實(shí)它扮演了方案一的存儲(chǔ)消息的DB角色,它通過MQ去監(jiān)聽下游服務(wù)發(fā)送的confirm消息M2,如果下游Callback Check Service收到下游服務(wù)的confirm消息M2,那么就對(duì)消息做持久化存儲(chǔ),即將消息持久化到DB中Step 6:10分鐘之后MQ Server推送了延遲消息發(fā)送M3
Step 7:
下游Callback Check Service收到延遲消息發(fā)送M3后,Check消息后去檢查DB中是否存在消息M2,如果存在,則不需要做任何處理,如果不存在或者消費(fèi)失敗了,那么下游Callback Check Service就需要主動(dòng)發(fā)起RPC通信給上游服務(wù),上游服務(wù)收到信息后就會(huì)重新查詢業(yè)務(wù)消息然后將消息M1發(fā)送出去
該方案能夠保證消息從生成者端到消費(fèi)者的可靠性投遞,消費(fèi)者都能消費(fèi)到,生產(chǎn)者也就自然而然是可靠性的投遞。
方案對(duì)比
| 方案 | 優(yōu)點(diǎn) | 缺點(diǎn) |
|---|---|---|
| 消息落庫 | 實(shí)現(xiàn)簡(jiǎn)單 | 發(fā)送消息前需要2次DB操作,影響并發(fā)性能 |
| 二次確認(rèn)檢測(cè) | 減少了數(shù)據(jù)庫操作,提高并發(fā)量 | 不一定能保障百分百投遞成功,但是基本上可以保障大概99.9%的消息是OK的,有些特別極端的情況只能是使用定時(shí)任務(wù)去、BCP或人工去做補(bǔ)償了, |
參考:
阿神-RabbitMQ消息可靠性投遞解決方案
美團(tuán)業(yè)務(wù)正確性校驗(yàn)平臺(tái) BCP的設(shè)計(jì)與實(shí)踐