如何保證 RabbitMQ 的消息可靠性

前言

項(xiàng)目開發(fā)中經(jīng)常會使用消息隊(duì)列來完成異步處理、應(yīng)用解耦、流量控制等功能。雖然消息隊(duì)列的出現(xiàn)解決了一些場景下的問題,但是同時也引出了一些問題,其中使用消息隊(duì)列時如何保證消息的可靠性就是一個常見的問題。如果在項(xiàng)目中遇到需要保證消息一定被消費(fèi)的場景時,如何保證消息不丟失,如何保證消息的可靠性?

最近在寫項(xiàng)目時使用 RabbitMQ 消息隊(duì)列中間件時也遇到了需要保證消息可靠性的場景,所以將如何保持消息可靠性的方案記錄下來,下面將講解一下如何保證 RabbitMQ 的消息可靠性。

如何保證 RabbitMQ 的消息可靠性

先放一張 RabbitMQ 是如何消息傳遞的圖:

生產(chǎn)者Producer 將消息發(fā)送到指定的 交換機(jī)Exchange,交換機(jī)根據(jù)路由規(guī)則路由到綁定的 隊(duì)列Queue 中,最后和消費(fèi)者建立連接后,將消息推送給 消費(fèi)者Consumer。

那么消息會在哪些環(huán)節(jié)丟失呢,列出可能出現(xiàn)消息丟失的場景有:

  • 生產(chǎn)者將消息發(fā)送到 RabbitMQ Server 異常:可能因?yàn)榫W(wǎng)絡(luò)問題造成 RabbitMQ 服務(wù)端無法收到消息,造成生產(chǎn)者發(fā)送消息丟失場景。
  • RabbitMQ Server 中消息在交換機(jī)中無法路由到指定隊(duì)列:可能由于代碼層面或配置層面錯誤導(dǎo)致消息路由到指定隊(duì)列失敗,造成生產(chǎn)者發(fā)送消息丟失場景。
  • RabbitMQ Server 中存儲的消息丟失:可能因?yàn)?RabbitMQ Server 宕機(jī)導(dǎo)致消息未完全持久化或隊(duì)列丟失導(dǎo)致消息丟失等持久化問題,造成 RabbitMQ Server 存儲的消息丟失場景。
  • 消費(fèi)者消費(fèi)消息異常:可能在消費(fèi)者接收到消息后,還沒來得及消費(fèi)消息,消費(fèi)者宕機(jī)或故障等問題,造成消費(fèi)者無法消費(fèi)消息導(dǎo)致消息丟失的場景。

以上就是 RabbitMQ 可能出現(xiàn)消息丟失的場景,接下來將依次講解如何避免這些消息丟失的場景問題。


由于在項(xiàng)目開發(fā)中使用的是 RabbitMQ,所以使用 Spring Boot 集成的 AMQP 依賴即可使用 RabbitMQ。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1. 保證生產(chǎn)者發(fā)送消息到 RabbitMQ Server

為了避免因?yàn)榫W(wǎng)絡(luò)故障或閃斷問題導(dǎo)致消息無法正常發(fā)送到 RabbitMQ Server 的情況,RabbitMQ 提供了兩種方案讓生產(chǎn)者可以感知到消息是否正確無誤的發(fā)送到 RabbitMQ Server中,這兩種方案分別是 事務(wù)機(jī)制發(fā)送方確認(rèn)機(jī)制。下面分別介紹一下這兩種機(jī)制如何實(shí)現(xiàn)。

事務(wù)機(jī)制

先說配置和使用:

  1. 配置類中配置事務(wù)管理器

    /**
     * 消息隊(duì)列配置類
     *
     * @author 單程車票
     */
    @Configuration
    public class RabbitMQConfig {
        /**
         * 配置事務(wù)管理器
         */
        @Bean
        public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
            return new RabbitTransactionManager(connectionFactory);
        }
    }
    
  2. 通過添加事務(wù)注解 + 開啟事務(wù)實(shí)現(xiàn)事務(wù)機(jī)制

    /**
     * 消息業(yè)務(wù)實(shí)現(xiàn)類
     *
     * @author 單程車票
     */
    @Service
    public class RabbitMQServiceImpl {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Transactional // 事務(wù)注解
        public void sendMessage() {
            // 開啟事務(wù)
            rabbitTemplate.setChannelTransacted(true);
            // 發(fā)送消息
            rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
        }
    }
    

通過上面的配置即可實(shí)現(xiàn)事務(wù)機(jī)制,執(zhí)行流程為:在生產(chǎn)者發(fā)送消息之前,開啟事務(wù),而后發(fā)送消息,如果消息發(fā)送至 RabbitMQ Server 失敗后,進(jìn)行事務(wù)回滾,重新發(fā)送。如果 RabbitMQ Server 接收到消息,則提交事務(wù)。

可以發(fā)現(xiàn)事務(wù)機(jī)制其實(shí)是同步操作,存在阻塞生產(chǎn)者的情況直到 RabbitMQ Server 應(yīng)答,這樣其實(shí)會很大程度上降低發(fā)送消息的性能,所以一般不會使用事務(wù)機(jī)制來保證生產(chǎn)者的消息可靠性,而是使用發(fā)送方確認(rèn)機(jī)制。


發(fā)送方確認(rèn)機(jī)制

先說配置和使用:

  1. 配置文件

    spring:
      rabbitmq:
        publisher-confirm-type: correlated  # 開啟發(fā)送方確認(rèn)機(jī)制
    

    配置屬性有三種分別為:

    • none:表示禁用發(fā)送方確認(rèn)機(jī)制
    • correlated:表示開啟發(fā)送方確認(rèn)機(jī)制
    • simple:表示開啟發(fā)送方確認(rèn)機(jī)制,并支持 waitForConfirms()waitForConfirmsOrDie() 的調(diào)用。

    這里一般使用 correlated 開啟發(fā)送方確認(rèn)機(jī)制即可,至于 simplewaitForConfirms() 方法調(diào)用是指串行確認(rèn)方法,即生產(chǎn)者發(fā)送消息后,調(diào)用該方法等待 RabbitMQ Server 確認(rèn),如果返回 false 或超時未返回則進(jìn)行消息重傳。由于串行性能較差,這里一般都是用異步 confirm 模式。

  2. 通過調(diào)用 setConfirmCallback() 實(shí)現(xiàn)異步 confirm 模式感知消息發(fā)送結(jié)果

    /**
     * 消息業(yè)務(wù)實(shí)現(xiàn)類
     *
     * @author 單程車票
     */
    @Service
    public class RabbitMQServiceImpl {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Override
        public void sendMessage() {
            // 發(fā)送消息
            rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
            // 設(shè)置消息確認(rèn)回調(diào)方法
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                /**
                 * MQ確認(rèn)回調(diào)方法
                 * @param correlationData 消息的唯一標(biāo)識
                 * @param ack 消息是否成功收到
                 * @param cause 失敗原因
                 */
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    // 記錄日志
                    log.info("ConfirmCallback...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
                    if (!ack) {
                        // 出錯處理
                        ...
                    }
                }
            });
        }
    }
    

生產(chǎn)者發(fā)送消息后通過調(diào)用 setConfirmCallback() 可以將信道設(shè)置為 confirm 模式,所有消息會被指派一個消息唯一標(biāo)識,當(dāng)消息被發(fā)送到 RabbitMQ Server 后,Server 確認(rèn)消息后生產(chǎn)者會回調(diào)設(shè)置的方法,從而實(shí)現(xiàn)生產(chǎn)者可以感知到消息是否正確無誤的投遞,從而實(shí)現(xiàn)發(fā)送方確認(rèn)機(jī)制。并且該模式是異步的,發(fā)送消息的吞吐量會得到很大提升。

上面就是發(fā)送放確認(rèn)機(jī)制的配置和使用,使用這種機(jī)制可以保證生產(chǎn)者的消息可靠性投遞,并且性能較好。


2. 保證消息能從交換機(jī)路由到指定隊(duì)列

在確保生產(chǎn)者能將消息投遞到交換機(jī)的前提下,RabbitMQ 同樣提供了消息投遞失敗的策略配置來確保消息的可靠性,接下來通過配置來介紹一下消息投遞失敗的策略。

先說配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 開啟發(fā)送方確認(rèn)機(jī)制
    publisher-returns: true   # 開啟消息返回
    template:
      mandatory: true     # 消息投遞失敗返回客戶端

mandatory 分為 true 失敗后返回客戶端false 失敗后自動刪除兩種策略。顯然設(shè)置為 false 無法保證消息的可靠性。

到這里的配置是可以保證生產(chǎn)者發(fā)送消息的可靠性投遞。

通過調(diào)用 setReturnCallback() 方法設(shè)置路由失敗后的回調(diào)方法:

/**
 * 消息業(yè)務(wù)實(shí)現(xiàn)類
 *
 * @author 單程車票
 */
@Service
public class RabbitMQServiceImpl {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage() {
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
        // 設(shè)置消息確認(rèn)回調(diào)方法
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * MQ確認(rèn)回調(diào)方法
             * @param correlationData 消息的唯一標(biāo)識
             * @param ack 消息是否成功收到
             * @param cause 失敗原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // 記錄日志
                log.info("ConfirmCallback...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
                if (!ack) {
                    // 出錯處理
                    ...
                }
            }
        });

        // 設(shè)置路由失敗回調(diào)方法
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * MQ沒有將消息投遞給指定的隊(duì)列回調(diào)方法
             * @param message 投遞失敗的消息詳細(xì)信息
             * @param replyCode 回復(fù)的狀態(tài)碼
             * @param replyText 回復(fù)的文本內(nèi)容
             * @param exchange 消息發(fā)給哪個交換機(jī)
             * @param routingKey 消息用哪個路郵鍵
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                // 記錄日志
                log.info("Fail Message["+message+"]==>replyCode["+replyCode+"]" +"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
                // 出錯處理
                ...
            }
        });
    }
}

通過調(diào)用 setReturnCallback() 方法即可實(shí)現(xiàn)當(dāng)交換機(jī)路由到指定隊(duì)列失敗后回調(diào)方法,拿到被退回的消息信息,進(jìn)行相應(yīng)的處理如記錄日志或重傳等等。


3. 保證消息在 RabbitMQ Server 中的持久化

對于消息的持久化,只需要在發(fā)送消息時將消息持久化,并且在創(chuàng)建交換機(jī)和隊(duì)列時也保證持久化即可。

配置如下:

/**
 * 消息隊(duì)列
 */
@Bean
public Queue queue() {
    // 四個參數(shù):name(隊(duì)列名)、durable(持久化)、 exclusive(獨(dú)占)、autoDelete(自動刪除)
    return new Queue(MESSAGE_QUEUE, true);
}

/**
 * 直接交換機(jī)
 */
@Bean
public DirectExchange exchange() {
    // 四個參數(shù):name(交換機(jī)名)、durable(持久化)、autoDelete(自動刪除)、arguments(額外參數(shù))
    return new DirectExchange(Direct_Exchange, true, false);
}

在創(chuàng)建交換機(jī)和隊(duì)列時通過構(gòu)造方法將持久化的參數(shù)都設(shè)置為 true 即可實(shí)現(xiàn)交換機(jī)和隊(duì)列的持久化。

@Override
public void sendMessage() {
    // 構(gòu)造消息(將消息持久化)
    Message message = MessageBuilder.withBody("單程車票".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
    // 向MQ發(fā)送消息(消息內(nèi)容都為消息表記錄的id)
    rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
}

在發(fā)送消息前通過調(diào)用 MessageBuildersetDeliveryMode(MessageDeliveryMode.PERSISTENT) 在構(gòu)造消息時設(shè)置消息持久化(MessageDeliveryMode.PERSISTENT)即可實(shí)現(xiàn)對消息的持久化。

通過確保消息、交換機(jī)、隊(duì)列的持久化操作可以保證消息的在 RabbitMQ Server 中不丟失,從而保證可靠性,其實(shí)除了持久化之外還需要保證 RabbitMQ 的高可用性,否則 MQ 都宕機(jī)或磁盤受損都無法確保消息的可靠性,關(guān)于高可用性這里就不作過多說明,有興趣的可以去了解一下。


4. 保證消費(fèi)者消費(fèi)的消息不丟失

在保證發(fā)送方和 RabbitMQ Server 的消息可靠性的前提下,只需要保證消費(fèi)者在消費(fèi)消息時異常消息不丟失即可保證消息的可靠性。

RabbitMQ 提供了 消費(fèi)者應(yīng)答機(jī)制 來使 RabbitMQ 能夠感知到消費(fèi)者是否消費(fèi)成功消息,默認(rèn)情況下,消費(fèi)者應(yīng)答機(jī)制是自動應(yīng)答的,也就是RabbitMQ 將消息推送給消費(fèi)者,便會從隊(duì)列刪除該消息,如果消費(fèi)者在消費(fèi)過程失敗時,消息就存在丟失的情況。所以需要將消費(fèi)者應(yīng)答機(jī)制設(shè)置為手動應(yīng)答,只有消費(fèi)者確認(rèn)消費(fèi)成功后才會刪除消息,從而避免消息的丟失。

下面來看看如何配置消費(fèi)者手動應(yīng)答:

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 開啟發(fā)送方確認(rèn)機(jī)制
    publisher-returns: true   # 開啟消息返回
    template:
      mandatory: true     # 消息投遞失敗返回客戶端
    listener:
      simple:
        acknowledge-mode: manual  # 開啟手動確認(rèn)消費(fèi)機(jī)制

通過 listener.simple.acknowledge-mode = manual 即可將消費(fèi)者應(yīng)答機(jī)制設(shè)置為手動應(yīng)答。

之后只需要在消費(fèi)消息時,通過調(diào)用 channel.basicAck()channel.basicNack() 來根據(jù)業(yè)務(wù)的執(zhí)行成功選擇是手動確認(rèn)消費(fèi)還是手動丟棄消息。

/**
 * 監(jiān)聽消費(fèi)隊(duì)列的消息
 */
@RabbitListener(queues = RabbitMQConfig.MESSAGE_QUEUE)
public void onMessage(Message message, Channel channel) {
    // 獲取消息索引
    long index = message.getMessageProperties().getDeliveryTag();
    // 解析消息
    byte[] body = message.getBody();
    ...
    try {
        // 業(yè)務(wù)處理
        ...
        // 業(yè)務(wù)執(zhí)行成功則手動確認(rèn)
        channel.basicAck(index, false);
    }catch (Exception e) {
        // 記錄日志
        log.info("出現(xiàn)異常:{}", e.getMessage());
        try {
            // 手動丟棄信息
            channel.basicNack(index, false, false);
        } catch (IOException ex) {
            log.info("丟棄消息異常");
        }
    }
}

這里說明一下 basicAck()basicNack() 的參數(shù)說明:

  • void basicAck(long deliveryTag, boolean multiple) 方法(會拋異常):
    • deliveryTag:該消息的index
    • multiple:是否批量處理(true 表示將一次性ack所有小于deliveryTag的消息)
  • void basicNack(long deliveryTag, boolean multiple, boolean requeue) 方法(會拋異常):
    • deliveryTag:該消息的index
    • multiple:是否批量處理(true 表示將一次性ack所有小于deliveryTag的消息)
    • requeue:被拒絕的是否重新入隊(duì)列(true 表示添加在隊(duì)列的末端;false 表示丟棄)

通過設(shè)置手動確認(rèn)消費(fèi)者應(yīng)答機(jī)制即可保證消費(fèi)者在消費(fèi)信息時的消息可靠性。

Spring Boot 提供的消息重試機(jī)制

除了消費(fèi)者應(yīng)答機(jī)制外,Spring Boot也提供了一種重試機(jī)制,只需要通過配置即可實(shí)現(xiàn)消息重試從而確保消息的可靠性,這里簡單介紹一下:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto  # 開啟自動確認(rèn)消費(fèi)機(jī)制
        retry:
          enabled: true # 開啟消費(fèi)者失敗重試
          initial-interval: 5000ms # 初始失敗等待時長為5秒
          multiplier: 1  # 失敗的等待時長倍數(shù)(下次等待時長 = multiplier * 上次等待時間)
          max-attempts: 3 # 最大重試次數(shù)
          stateless: true # true無狀態(tài);false有狀態(tài)(如果業(yè)務(wù)中包含事務(wù),這里改為false)

通過配置在消費(fèi)者的方法上如果執(zhí)行失敗或執(zhí)行異常只需要拋出異常(一定要出現(xiàn)異常才會觸發(fā)重試,注意:不要捕獲異常) 即可實(shí)現(xiàn)消息重試,這樣也可以保證消息的可靠性。


總要有總結(jié)

上面就是我在項(xiàng)目中關(guān)于如何保證 RabbitMQ 的消息可靠性的配置和實(shí)現(xiàn)方案了。下面想聊聊我在實(shí)際使用消息隊(duì)列實(shí)現(xiàn)消息可靠性時遇到的問題。

消費(fèi)者消費(fèi)消息需要保證冪等性

由于實(shí)現(xiàn)了消息可靠性導(dǎo)致消息重發(fā)或消息重試造成消費(fèi)者可能會存在消息被重復(fù)消費(fèi)的情況,這種情況就需要保證消息不被重復(fù)消費(fèi),也就是消息保證冪等性。

實(shí)現(xiàn)冪等性的方法有很多:借助數(shù)據(jù)庫的樂觀鎖或悲觀鎖、借助 redis 的分布式鎖、借助 redis 實(shí)現(xiàn) token 機(jī)制等等都可以很好的保證消息的冪等性。

使用消息隊(duì)列很難做到 100% 的消息可靠性

我在項(xiàng)目實(shí)際開發(fā)中使用 RabbitMQ 實(shí)現(xiàn)消息可靠性,實(shí)踐后的感受是消息隊(duì)列很難能做到 100% 的消息可靠性,上面的實(shí)現(xiàn)方案中 RabbitMQ 提供的機(jī)制做到的是盡可能地減小消息丟失的幾率。

大多數(shù)情況下消息丟失都是因?yàn)榇a出現(xiàn)錯誤,那么這樣無論進(jìn)行多少次重發(fā)都是無法解決問題的,這樣只會增加 CPU 的開銷,所以我認(rèn)為更好的解決辦法是通過記錄日志的方式等待后續(xù)回溯時更好的發(fā)現(xiàn)問題并解決問題。對于一些不是很需要保證百分百可靠性的場景,都可以通過記錄日志的方式來保證消息可靠性即可。

我在項(xiàng)目中采用的是消息落庫的方式,先將消息落庫,而后生產(chǎn)者將消息發(fā)送給 MQ,使用數(shù)據(jù)庫記錄消息的消費(fèi)情況,對于重試多次仍然無法消費(fèi)成功的消息,后續(xù)通過定時任務(wù)調(diào)度的方式對這些無法消費(fèi)成功的消息進(jìn)行補(bǔ)償。我認(rèn)為這樣可以盡可能地保證消息的可靠性。但是同樣這樣也帶來了問題就是消息落庫需要數(shù)據(jù)庫磁盤IO的開銷,增大數(shù)據(jù)庫壓力同時降低了性能。

總之,在實(shí)現(xiàn)消息的可靠性時,應(yīng)該根據(jù)項(xiàng)目的需求來考慮如何處理。對于消息要求可靠性低的只需要在出錯時記錄日志方便后續(xù)回溯解決出錯問題即可,對于消息可靠性要求高的則可以采用消息落庫 + 定時任務(wù)的方式盡可能保證百分百的可靠性。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容