架構(gòu)師方案-業(yè)務(wù)角度下保證消息的可靠性的投遞

前言:

消息隊(duì)列的主要作用是實(shí)現(xiàn)系統(tǒng)間的解耦、異步處理和削峰填谷。 由于消息隊(duì)列的異步使用特性,天然的會(huì)存在一定概率消息丟失的情況。


image.png

方案1:消息落庫

消息落庫重發(fā)是基于MQ的confirm機(jī)制,在消息發(fā)送失敗后自動(dòng)重發(fā)。

image.png
  • Step 1: 首先把消息信息(業(yè)務(wù)數(shù)據(jù))存儲(chǔ)到數(shù)據(jù)庫中,緊接著,我們?cè)侔堰@個(gè)消息記錄也存儲(chǔ)到一張消息記錄表里(或者另外一個(gè)同源數(shù)據(jù)庫的消息記錄表)

  • Step 2:發(fā)送消息到MQ Broker節(jié)點(diǎn)(采用confirm方式發(fā)送,會(huì)有異步的返回結(jié)果)

  • Step 3、4:生產(chǎn)者端接受MQ Broker節(jié)點(diǎn)返回的Confirm確認(rèn)消息結(jié)果,然后進(jìn)行更新消息記錄表里的消息狀態(tài)。比如默認(rèn)Status = 0 當(dāng)收到消息確認(rèn)成功后,更新為1即可!

  • Step 5:但是在消息確認(rèn)這個(gè)過程中可能由于網(wǎng)絡(luò)閃斷、MQ Broker端異常等原因?qū)е?回送消息失敗或者異常。這個(gè)時(shí)候就需要發(fā)送方(生產(chǎn)者)對(duì)消息進(jìn)行可靠性投遞了,保障消息不丟失,100%的投遞成功?。ㄓ幸环N極限情況是閃斷,Broker返回的成功確認(rèn)消息,但是生產(chǎn)端由于網(wǎng)絡(luò)閃斷沒收到,這個(gè)時(shí)候重新投遞可能會(huì)造成消息重復(fù),需要消費(fèi)端去做冪等處理)所以我們需要有一個(gè)定時(shí)任務(wù),(比如每5分鐘拉取一下處于中間狀態(tài)的消息,當(dāng)然這個(gè)消息可以設(shè)置一個(gè)超時(shí)時(shí)間,比如超過1分鐘 Status = 0 ,也就說明了1分鐘這個(gè)時(shí)間窗口內(nèi),我們的消息沒有被確認(rèn),那么會(huì)被定時(shí)任務(wù)拉取出來)

  • Step 6:接下來我們把中間狀態(tài)的消息進(jìn)行重新投遞 retry send,繼續(xù)發(fā)送消息到MQ ,當(dāng)然也可能有多種原因?qū)е掳l(fā)送失敗

  • Step 7:我們可以采用設(shè)置最大努力嘗試次數(shù),比如投遞了3次,還是失敗,那么我們可以將最終狀態(tài)設(shè)置為Status = 2 ,最后 交由人工解決處理此類問題(或者把消息轉(zhuǎn)儲(chǔ)到失敗表中)。

表結(jié)構(gòu)和代碼示例


CREATE TABLE IF NOT EXISTS `message_log`
(
    `message_id`      varchar(30) NOT NULL COMMENT '消息唯一ID',
    `message`         varchar(1000)  DEFAULT '' COMMENT '消息內(nèi)容',
    `business_id`     varchar(40) NOT NULL COMMENT '業(yè)務(wù)id,比如記錄訂單號(hào)',
    `try_count`       int(4)       DEFAULT '0' COMMENT '重試次數(shù)',
    `status`          tinyint(2)   DEFAULT '0' COMMENT ' 消息投遞狀態(tài)  0:投遞中 1:投遞成功   2:投遞失敗',
    `next_retry_time` datetime     DEFAULT CURRENT_TIMESTAMP COMMENT '下一次投遞時(shí)間',
    `create_time`     datetime     DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時(shí)間',
    `update_time`     datetime     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后一次更新時(shí)間',
    PRIMARY KEY (`message_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;
創(chuàng)建訂單方法
@Service
@RequiredArgsConstructor
public class OrderService {
        
        private final OrderMapper orderMapper;
        
        private final MessageLogMapper messageLogMapper;

        private final RocketMQProducer rocketMQProducer;
        
        
        //創(chuàng)建訂單
        public void createOrder(Order order) {
                //插入業(yè)務(wù)數(shù)據(jù)
                orderMapper.insert(order);
                //插入消息記錄表數(shù)據(jù)
                MessageLog messageLog = new MessageLog();
                //消息唯一ID
                messageLog.setMessageId(messageId);
                //保存消息整體
                messageLog.setMessage(JSONObject.toJSONString(order));
                //設(shè)置消息狀態(tài)為0 表示發(fā)送中
                messageLog.setStatus(0);
                //設(shè)置下一次執(zhí)行時(shí)間
                messageLog.setNextRetryTime(nextRetryTime);
                messageLogMapper.insert(brokerMessageLog);
                
                //發(fā)送消息
                rocketMQProducer.sendOrder(order);

        }
}
消息生產(chǎn)者
@Component
public class RocketMQProducer {

    public void sendOrder(Order order) {
        //1.創(chuàng)建消息
        Message message = new Message("test_quick_topic",// 主題
                "TagA",// 標(biāo)簽
                "KeyA",// 用戶自定義的key,唯一的標(biāo)識(shí)
                FastJsonConvertUtil.convertObjectToJSON(order).getBytes()); //消息內(nèi)容實(shí)體(byte[])

        try {
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
             
                    //如果confirm返回成功 則進(jìn)行更新
                    messageLogMapper.changeMessageLogStatus();
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                    //失敗則進(jìn)行具體的后續(xù)操作:重試 或者補(bǔ)償?shù)仁侄?                   System.err.println("-----------異常處理-----------");
                }
            });
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
定時(shí)任務(wù)

@Component
public class RetryMessageTasker {

    @Scheduled(initialDelay = 5000, fixedDelay = 10000)
    public void reSend() {
        System.out.println("----------------定時(shí)任務(wù)開始----------------");
        //pull status = 0 and timeout message 
        List<MessageLog> list = getNeedReSendMsgList();
        for (MessageLog messageLog : list) {
            if (messageLog.getTryCount() > maxTryCount) {
                //update fail message 
                continue;
            }
        
            //更新try_count
            // resend
            try {
                sendOrder(getMessage());
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println("-----------異常處理-----------");
            }
            
        }

    }
}
該方案只能保證消息從生產(chǎn)者到MQ之間的可靠性投遞,解決辦法:

方式1. 在消息表中新增 消費(fèi)成功狀態(tài),下游消費(fèi)者變更消費(fèi)狀態(tài)(要考慮多個(gè)業(yè)務(wù)消費(fèi)的情況)
方式2. 使用業(yè)務(wù)正確性校驗(yàn)平臺(tái)BCP檢查上下游業(yè)務(wù)數(shù)據(jù)是否一致,進(jìn)行修復(fù)

方案2:二次確認(rèn)檢測(cè)

二次確認(rèn)檢測(cè)是基于延時(shí)投遞機(jī)制實(shí)現(xiàn)的,主要目的是為了減少數(shù)據(jù)庫操作,提高并發(fā)量。

image.png
  • Step 1:先將業(yè)務(wù)數(shù)據(jù)進(jìn)行入庫,然后上游服務(wù)將消息M1發(fā)送出去

  • Step 2:在發(fā)送消息M1之后,緊接著生產(chǎn)端再次發(fā)送一條延遲消息(Second Send Delay Check),即延遲檢查投遞消息M3

  • Step 3:消費(fèi)端去監(jiān)聽指定隊(duì)列,將收到的消息進(jìn)行處理

  • Step 4:處理完成之后,發(fā)送一個(gè)confirm消息M2,也就是回送響應(yīng),但是這里響應(yīng)不是正常的ACK,而是重新生成一條消息,投遞到MQ中

  • Step 5:下游Callback Check Service是一個(gè)單獨(dú)的服務(wù),其實(shí)它扮演了方案一的存儲(chǔ)消息的DB角色,它通過MQ去監(jiān)聽下游服務(wù)發(fā)送的confirm消息M2,如果下游Callback Check Service收到下游服務(wù)的confirm消息M2,那么就對(duì)消息做持久化存儲(chǔ),即將消息持久化到DB中

  • Step 6:10分鐘之后MQ Server推送了延遲消息發(fā)送M3

  • Step 7:下游Callback Check Service收到延遲消息發(fā)送M3后,Check消息后去檢查DB中是否存在消息M2,如果存在,則不需要做任何處理,如果不存在或者消費(fèi)失敗了,那么下游Callback Check Service就需要主動(dòng)發(fā)起RPC通信給上游服務(wù),上游服務(wù)收到信息后就會(huì)重新查詢業(yè)務(wù)消息然后將消息M1發(fā)送出去

該方案能夠保證消息從生成者端到消費(fèi)者的可靠性投遞,消費(fèi)者都能消費(fèi)到,生產(chǎn)者也就自然而然是可靠性的投遞。

方案對(duì)比

方案 優(yōu)點(diǎn) 缺點(diǎn)
消息落庫 實(shí)現(xiàn)簡(jiǎn)單 發(fā)送消息前需要2次DB操作,影響并發(fā)性能
二次確認(rèn)檢測(cè) 減少了數(shù)據(jù)庫操作,提高并發(fā)量 不一定能保障百分百投遞成功,但是基本上可以保障大概99.9%的消息是OK的,有些特別極端的情況只能是使用定時(shí)任務(wù)去、BCP或人工去做補(bǔ)償了,

參考:

阿神-RabbitMQ消息可靠性投遞解決方案
美團(tuán)業(yè)務(wù)正確性校驗(yàn)平臺(tái) BCP的設(shè)計(jì)與實(shí)踐

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容