RabbitMQ——如何保證消息發(fā)送的可靠性

[TOC]

一、RabbitMQ消息發(fā)送機(jī)制

消息由生產(chǎn)者生產(chǎn),首先并發(fā)送到交換機(jī)(Exchange),然后由交換機(jī)根據(jù)指定的路由規(guī)則將消息路由到不同的隊(duì)列(Queue)中。最后由不同的消費(fèi)者去消費(fèi)處理。

RabbitMQ

根據(jù)RabbitMQ的機(jī)制分析,想要確保消息發(fā)送的可靠性,需要保證兩個(gè)方面:

  • 確認(rèn)消息到達(dá)交換機(jī),publish
  • 確認(rèn)消息到達(dá)隊(duì)列,routes

這兩步如果全成功了,說(shuō)明消息已經(jīng)成功發(fā)送;任何一步出現(xiàn)問(wèn)題,消息就沒(méi)發(fā)送成功。

二、方案1: 開(kāi)啟事務(wù)機(jī)制

在SpringBoot項(xiàng)目中可以通過(guò)開(kāi)啟RabbitMQ事務(wù)機(jī)制的方式解決,具體操作如下。

  1. 提供一個(gè)事物管理器

    @Bean
    RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
    
  2. 生產(chǎn)者上添加事務(wù)注解并設(shè)置通信信道為事務(wù)模式

    @Service
    public class MsgService {
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        // 添加事務(wù)注解
        @Transactional
        public void send() {
            // 開(kāi)啟事務(wù)模式
            rabbitTemplate.setChannelTransacted(true);
            rabbitTemplate.convertAndSend("ecchange", "queue", "Hello World!");
            throw new RuntimeException();
        }
    }
    

開(kāi)啟事務(wù)模式以后,RabbitMQ生產(chǎn)者發(fā)送消息要經(jīng)過(guò)如下4步:

  1. 客戶端請(qǐng)求,將信道設(shè)置為事務(wù)模式;
  2. 服務(wù)端響應(yīng),設(shè)置完成;
  3. 客戶端發(fā)送消息;
  4. 客戶端提交事務(wù);
  5. 服務(wù)端響應(yīng),確認(rèn)事務(wù)提交。

正常情況我們發(fā)送消息只需要第三步,可以看出上面的步驟比較復(fù)雜,所以事務(wù)模式效率不是很高。

三、方案2: 發(fā)送方確認(rèn)

添加配置,啟用發(fā)送方確認(rèn)。注意,發(fā)送方確認(rèn)和事務(wù)不能同時(shí)存在,會(huì)報(bào)錯(cuò)。

spring:
  rabbitmq:
    # 消息到達(dá)交換器確認(rèn)回調(diào)
    publisher-confirm-type: correlated
    # 消息到達(dá)隊(duì)列回調(diào)
    publisher-returns: true

publisher-confirm-type枚舉如下:

public enum ConfirmType {

  /**
    * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
    * within scoped operations.
    */
    SIMPLE,

    /**
  * Use with {@code CorrelationData} to correlate confirmations with sent
  * messsages.
  */
    CORRELATED,

  /**
    * Publisher confirms are disabled (default).
    */
    NONE

}

配置監(jiān)聽(tīng),實(shí)現(xiàn)回調(diào)

@Slf4j
@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    public static final String EXCHANGE_NAME = "exchange_name";
    public static final String QUEUE_NAME = "queue.name";
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    Queue queue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue()).to(directExchange()).with(QUEUE_NAME);
    }

    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Confirmation callback.
     *
     * @param correlationData correlation data for the callback.
     * @param ack             true for ack, false for nack
     * @param cause           An optional cause, for nack, when available, otherwise null.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("{}:消息成功到達(dá)交換器", correlationData.getId());
        } else {
            log.error("{}:消息發(fā)送失敗", correlationData.getId());
        }
    }


    /**
     * Returned message callback.
     *
     * @param message    the returned message.
     * @param replyCode  the reply code.
     * @param replyText  the reply text.
     * @param exchange   the exchange.
     * @param routingKey the routing key.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("{}:消息未成功路由到隊(duì)列", message.getMessageProperties().getMessageId());

    }
}

三、方案3: 失敗重試

失敗重試分兩種情況,沒(méi)鏈接上mq的鏈接重試和鏈接上了發(fā)送失敗的發(fā)送重試。

1. 鏈接重試

利用Spring Boot自帶的重試機(jī)制,直接通過(guò)配置開(kāi)啟即可:

spring:
  rabbitmq:
    template:
      retry:
        enabled: true #開(kāi)啟重試機(jī)制
        initial-interval: 1000ms #重試起始時(shí)間間隔
        max-attempts: 10 #最大重試次數(shù)
        max-interval: 10000ms #最大重試時(shí)間間隔
        multiplier: 2 #時(shí)間間隔系數(shù)

配置完以后,當(dāng)MQ鏈接斷開(kāi)后,Spring會(huì)進(jìn)行retry連接。retry時(shí)間間隔為起始時(shí)間間隔*系數(shù)。

2. 發(fā)送重試

發(fā)送重試主要針對(duì)的是消息沒(méi)有到達(dá)交換器??傮w思路就是:利用消息確認(rèn)機(jī)制,當(dāng)消息沒(méi)有到達(dá)交換器時(shí),就會(huì)走失敗回調(diào),在回調(diào)方法中進(jìn)行相應(yīng)的業(yè)務(wù)補(bǔ)償處理即可。

利用數(shù)據(jù)庫(kù)存儲(chǔ)發(fā)送的消息記錄,創(chuàng)建數(shù)據(jù)表,大致字段內(nèi)容如下:

字段名 字段類型 字段是否為空 默認(rèn)值 備注
id bigint(20) unsigned N 主鍵,消息id
content text N 消息內(nèi)容
state tinyint(1) N 狀態(tài):0-發(fā)送中,1-成功,2-失敗
route_key varchar(255) N 路由key
exchange varcher(255) N 交換機(jī)
count tinyint N 重試次數(shù)
try_time datetime N 重試時(shí)間
create_time datetime N CURRENT_TIMESTAMP 創(chuàng)建時(shí)間
update_time datetime N CURRENT_TIMESTAMP 更新時(shí)間
del_flag tinyint(1) N 邏輯刪除

操作步驟如下:

  1. 消息發(fā)送前,向表中插入消息發(fā)送記錄,狀態(tài)為0,try_time根據(jù)實(shí)際情況設(shè)置即可。
  2. 在confirm回調(diào)方法中,收到發(fā)送成功的回調(diào),則將該消息的狀態(tài)修改為1。
  3. 通過(guò)定時(shí)job掃描發(fā)送記錄,篩選出狀態(tài)為0,并且過(guò)了重試時(shí)間的消息,重新發(fā)送。重試次數(shù)根據(jù)實(shí)際情況判斷。

注意,在發(fā)送的時(shí)候會(huì)出現(xiàn)重復(fù)發(fā)送的情況,所以在消費(fèi)的時(shí)候需要做好冪等。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容