RabbitMQ消息確認(rèn)、消息持久化等核心知識(shí)總結(jié)

本文參考石杉的架構(gòu)筆記公眾號(hào), 總結(jié)了一下RabbitMQ的核心知識(shí), 非常感謝

一、消息中間件選型

1. ActiveMQ:
  • 優(yōu)點(diǎn): ActiveMQ是老牌的消息中間件,國(guó)內(nèi)很多公司過(guò)去運(yùn)用的還是非常廣泛的,功能很強(qiáng)大。
  • 缺點(diǎn): 沒(méi)法確認(rèn)ActiveMQ可以支撐互聯(lián)網(wǎng)公司的高并發(fā)、高負(fù)載以及高吞吐的復(fù)雜場(chǎng)景,在國(guó)內(nèi)互聯(lián)網(wǎng)公司落地較少。而且使用較多的是一些傳統(tǒng)企業(yè),用ActiveMQ做異步調(diào)用和系統(tǒng)解耦。
2. RabbitMQ:
  • 優(yōu)點(diǎn): 可以支撐高并發(fā)、高吞吐、性能很高,同時(shí)有非常完善便捷的后臺(tái)管理界面可以使用。另外,他還支持集群化、高可用部署架構(gòu)、消息高可靠支持,功能較為完善。而且經(jīng)過(guò)調(diào)研,國(guó)內(nèi)各大互聯(lián)網(wǎng)公司落地大規(guī)模RabbitMQ集群支撐自身業(yè)務(wù)的case較多,國(guó)內(nèi)各種中小型互聯(lián)網(wǎng)公司使用RabbitMQ的實(shí)踐也比較多。除此之外,RabbitMQ的開(kāi)源社區(qū)很活躍,較高頻率的迭代版本,來(lái)修復(fù)發(fā)現(xiàn)的bug以及進(jìn)行各種優(yōu)化,因此,綜合對(duì)比后,RabbitMQ是一個(gè)不錯(cuò)的消息中間件選擇。
  • 缺點(diǎn): 基于erlang語(yǔ)言開(kāi)發(fā)的,所以導(dǎo)致較為難以分析里面的源碼,也較難進(jìn)行深層次的源碼定制和改造,畢竟需要較為扎實(shí)的erlang語(yǔ)言功底才可以。
3. RocketMQ:
  • 優(yōu)點(diǎn): 阿里開(kāi)源的,經(jīng)過(guò)阿里的生產(chǎn)環(huán)境的超高并發(fā)、高吞吐的考驗(yàn),性能卓越,同時(shí)還支持分布式事務(wù)等特殊場(chǎng)景。而且RocketMQ是基于Java語(yǔ)言開(kāi)發(fā)的,適合深入閱讀源碼,有需要可以站在源碼層面解決線上生產(chǎn)問(wèn)題,包括源碼的二次開(kāi)發(fā)和改造。
  • 缺點(diǎn): 社區(qū)活躍度相對(duì)較為一般,不過(guò)也還可以,文檔相對(duì)來(lái)說(shuō)簡(jiǎn)單一些,然后接口這塊不是按照標(biāo)準(zhǔn)JMS規(guī)范走的有些系統(tǒng)要遷移需要修改大量代碼
4. Kafka:
  • 優(yōu)點(diǎn): 專為超高吞吐量的實(shí)時(shí)日志采集、實(shí)時(shí)數(shù)據(jù)同步、實(shí)時(shí)數(shù)據(jù)計(jì)算等場(chǎng)景來(lái)設(shè)計(jì)。在大數(shù)據(jù)領(lǐng)域中配合實(shí)時(shí)計(jì)算技術(shù)(比如Spark Streaming、Storm、Flink)使用的較多。
  • 缺點(diǎn): 相對(duì)于以上幾種中間件來(lái)說(shuō),功能較少,在傳統(tǒng)的MQ中間件使用場(chǎng)景中較少采用。

二、消息中間件的常見(jiàn)使用場(chǎng)景

  • 復(fù)雜系統(tǒng)的解耦: 多個(gè)系統(tǒng)間通過(guò)中間件進(jìn)行數(shù)據(jù)交互, 避免牽一發(fā)而動(dòng)全身, 減少耦合, 提升系統(tǒng)穩(wěn)定性與可擴(kuò)展性
  • 復(fù)雜鏈路的異步調(diào)用: 某些業(yè)務(wù)場(chǎng)景可以通過(guò)異步執(zhí)行減少同步調(diào)用的時(shí)間, 從而大大提高系統(tǒng)響應(yīng)時(shí)間而不影響核心邏輯
  • 瞬時(shí)高峰的削峰處理: 流量高峰期, 可以將請(qǐng)求積壓在MQ中, 服務(wù)器不用一下處理所有請(qǐng)求從而導(dǎo)致系統(tǒng)崩潰, 高峰期后, 消費(fèi)者可以慢慢消費(fèi)

三、系統(tǒng)架構(gòu)引入消息中間件后會(huì)有哪些缺點(diǎn)

  • 系統(tǒng)可用性降低: 引入MQ,系統(tǒng)多了一個(gè)依賴。依賴如果出現(xiàn)問(wèn)題,就會(huì)導(dǎo)致系統(tǒng)可用性降低。一旦引入中間件,就必須考慮這個(gè)中間件是如何部署的,如何保證高可用性
  • 系統(tǒng)穩(wěn)定性降低: 引入MQ, 可能由于網(wǎng)絡(luò)故障、中間件故障、消費(fèi)者異常等原因?qū)е赂鞣N各樣亂七八糟的問(wèn)題產(chǎn)生, 從而使系統(tǒng)穩(wěn)定性下降
  • 分布式一致性問(wèn)題: 多系統(tǒng)協(xié)同處理一個(gè)業(yè)務(wù), 不能保證所有系統(tǒng)都正常處理, 有可能出現(xiàn)系統(tǒng)數(shù)據(jù)不一致的情況, 所以此時(shí)又需要使用可靠消息最終一致性的分布式事務(wù)方案來(lái)保障數(shù)據(jù)一致性。

四、消息發(fā)送確認(rèn)

生產(chǎn)者發(fā)送消息, 先發(fā)送消息到Exchange, 然后Exchange再路由到Queue, 這中間就需要確認(rèn)兩個(gè)事情

  • 確認(rèn)消息是否成功發(fā)送到Exchange
  • 確認(rèn)消息是否從Exchange成功路由到Queue

spring提供了兩個(gè)回調(diào)函數(shù)來(lái)處理這兩種消息發(fā)送確認(rèn)

1. 確認(rèn)消息是否成功發(fā)送到Exchange

有2種方式, 一種是重量級(jí)的事務(wù)消息機(jī)制。采用類事務(wù)的機(jī)制把消息投遞到MQ,可以保證消息不丟失,但是性能極差,經(jīng)過(guò)測(cè)試性能會(huì)呈現(xiàn)幾百倍的下降。

所以說(shuō)現(xiàn)在一般是不會(huì)用這種過(guò)于重量級(jí)的機(jī)制,而是會(huì)用輕量級(jí)的confirm機(jī)制。

另一種方式是confirm機(jī)制, 跟手動(dòng)ack機(jī)制類似, 生產(chǎn)者將消息投遞到RabbitMQ, 且將消息持久化到硬盤后, RabbitMQ會(huì)通過(guò)一個(gè)回調(diào)方法將confirm信息回傳給生產(chǎn)端, 這樣, 如果生產(chǎn)端的服務(wù)接收到了這個(gè)confirm消息,就知道是已經(jīng)持久化到磁盤了。否則如果沒(méi)有接收到confirm消息,那么就說(shuō)明這條消息可能半路丟失了,此時(shí)你就可以重新投遞消息到MQ去,確保消息不會(huì)丟失。

1.1 通過(guò)AMQP的事務(wù)機(jī)制可以保證消息發(fā)送確認(rèn)
事務(wù)機(jī)制主要是通過(guò)對(duì)channel的設(shè)置實(shí)現(xiàn)

channel.txSelect();// 聲明啟動(dòng)事務(wù)模式
channel.txComment();// 提交事務(wù)
channel.txRollback();// 回滾事務(wù)

1.2 使用confirm確認(rèn)機(jī)制
實(shí)現(xiàn)ConfirmCallback并重寫confirm回調(diào)方法, 消息發(fā)送到Broker后觸發(fā)回調(diào), 可以確認(rèn)消息是否成功發(fā)送到Exchange

application.properties:

# 開(kāi)啟confirms回調(diào) P -> Exchange
spring.rabbitmq.publisher-confirms=true

回調(diào):

        // 消息是否成功發(fā)送到Exchange
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息成功發(fā)送到Exchange");
            } else {
                log.info("消息發(fā)送到Exchange失敗: cause: {}", correlationData, cause);
            }
        });
2. 確認(rèn)消息是否從Exchange成功路由到Queue

實(shí)現(xiàn)ReturnCallback并重寫returnedMessage回調(diào)方法, 可以確認(rèn)消息從EXchange路由到Queue失敗, 注意: 這里的回調(diào)是一個(gè)失敗回調(diào), 只有消息從Exchange路由到Queue失敗才會(huì)回調(diào)這個(gè)方法

application.properties:

# 開(kāi)啟returnedMessage回調(diào) Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 觸發(fā)returnedMessage回調(diào)必須設(shè)置mandatory=true, 否則Exchange沒(méi)有找到Queue就會(huì)丟棄掉消息, 而不會(huì)觸發(fā)回調(diào)
spring.rabbitmq.template.mandatory=true

回調(diào):

        // 消息是否從Exchange路由到Queue, 注意: 這是一個(gè)失敗回調(diào), 只有消息從Exchange路由到Queue失敗才會(huì)回調(diào)這個(gè)方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息從Exchange路由到Queue失敗: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
        });

五、消息接收確認(rèn)

消息怎樣才算消費(fèi)成功?

RabbitMQ默認(rèn)自動(dòng)確認(rèn)(ack)消息被正確消費(fèi), 即消息投遞到消費(fèi)者后就自動(dòng)確認(rèn)消息被處理完畢, 并且會(huì)將該消息刪除, 即使消費(fèi)者意外宕機(jī), 或者拋出異常, 如果消費(fèi)者接收到消息, 還沒(méi)處理完成就down掉或者拋出異常, 那么, 這條消息就丟失了

分析一下問(wèn)題出在哪, 問(wèn)題出在RabbitMQ只管消息投遞出去, 而不管消息是否被正確處理就自動(dòng)刪除消息, 所以, 只要將自動(dòng)ack修改為手動(dòng)ack, 消費(fèi)成功才通知RabbitMQ可以刪除該消息即可, 如果消費(fèi)者宕機(jī), 消費(fèi)失敗, 由于RabbitMQ并未收到ack通知, 且感知到該消費(fèi)者狀態(tài)異常(如拋出異常), 就會(huì)將該消息重新推送給其他消費(fèi)者, 讓其他消費(fèi)者繼續(xù)執(zhí)行, 這樣就保證消費(fèi)者掛掉但消息不會(huì)丟失

消息確認(rèn)模式有:

  • AcknowledgeMode.NONE:自動(dòng)確認(rèn)
  • AcknowledgeMode.AUTO:根據(jù)情況確認(rèn)
  • AcknowledgeMode.MANUAL:手動(dòng)確認(rèn)

默認(rèn)情況下消息消費(fèi)者是自動(dòng)ack(確認(rèn))消息的, 如果要手動(dòng)ack(確認(rèn)), 則需要修改確認(rèn)模式為manual

application.properties:

# 設(shè)置手動(dòng)確認(rèn)(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消費(fèi)消息并手動(dòng)確認(rèn):

@Component
@Slf4j
public class LogUserConsumer {

    @Autowired
    UserLogService userLogService;

    @RabbitListener(queues = "log.user.queue")
    public void logUserConsumer(Message message, Channel channel, @Header (AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            log.info("收到消息: {}", message.toString());
            userLogService.insert(MessageHelper.msgToObj(message, UserLog.class));
        } catch (Exception e){
            log.error("logUserConsumer error", e);
            channel.basicNack(tag, false, true);
        } finally {
            channel.basicAck(tag, false);
        }
    }

}
  • 重點(diǎn)在channel.basicAck(tag, false)方法, 第一個(gè)參數(shù)deliveryTag(唯一標(biāo)識(shí) ID):當(dāng)一個(gè)消費(fèi)者向 RabbitMQ 注冊(cè)后,會(huì)建立起一個(gè) Channel ,RabbitMQ 會(huì)用 basic.deliver 方法向消費(fèi)者推送消息,這個(gè)方法攜帶了一個(gè) delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標(biāo)識(shí) ID,是一個(gè)單調(diào)遞增的正整數(shù),delivery tag 的范圍僅限于 Channel

  • 第二個(gè)參數(shù)multiple:為了減少網(wǎng)絡(luò)流量,手動(dòng)確認(rèn)可以被批處理,當(dāng)該參數(shù)為 true 時(shí),則可以一次性確認(rèn) delivery_tag 小于等于傳入值的所有消息

六、消息持久化

消息被投遞到RabbitMQ的內(nèi)存中, 還沒(méi)投遞到消費(fèi)者實(shí)例之前宕機(jī)了, 消息不就丟失了?

可以進(jìn)行消息持久化, 將Exchange、queue和message都持久化到硬盤, 這樣, RabbitMQ重啟時(shí), 會(huì)把持久化的Exchange、queue和message從硬盤重新加載出來(lái), 重新投遞消息

1.1 Exchange的持久化, 聲明交換機(jī)時(shí)指定持久化參數(shù)為true即可

    @Bean
    public DirectExchange logUserExchange() {
        return new DirectExchange("log.user.exchange", true, false);
    }

第二個(gè)參數(shù)durable: 是否持久化, 第三個(gè)參數(shù)autoDelete: 當(dāng)所有綁定隊(duì)列都不再使用時(shí), 是否自動(dòng)刪除交換器, true: 刪除, false: 不刪除

1.2 queue的持久化, 聲明隊(duì)列時(shí)指定持久化參數(shù)為true即可

    @Bean
    public Queue logUserQueue() {
        return new Queue("log.user.queue.name", true);
    }

第二個(gè)參數(shù)durable, 是否持久化

1.3 message的持久化, 是通過(guò)配置deliveryMode實(shí)現(xiàn)的, 生產(chǎn)者投遞時(shí), 指定deliveryModeMessageDeliveryMode.PERSISTENT即可實(shí)現(xiàn)消息的持久化, 投遞和消費(fèi)都需要通過(guò)Message對(duì)象進(jìn)行交互, 為了不每次都寫配置轉(zhuǎn)換的代碼, 我們寫一個(gè)消息幫助類MessageHelper:

public class MessageHelper {

    public static Message objToMsg(Object obj) {
        if (null == obj) {
            return null;
        }

        Message message = MessageBuilder.withBody(JsonUtil.objToStr(obj).getBytes()).build();
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 消息持久化
        message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);

        return message;
    }

    public static <T> T msgToObj(Message message, Class<T> clazz) {
        if (null == message || null == clazz) {
            return null;
        }

        String str = new String(message.getBody());
        T obj = JsonUtil.strToObj(str, clazz);

        return obj;
    }

}

消息投遞時(shí):

rabbitTemplate.convertAndSend("log.user.exchange.name", "log.user.routing.key.name", MessageHelper.objToMsg(userLog));

消息消費(fèi)時(shí)(參考五、消息接收確認(rèn)):

UserLog userLog = MessageHelper.msgToObj(message, UserLog.class);

如果不需要消息持久化, 則不需要通過(guò)Message進(jìn)行轉(zhuǎn)換, 可以直接通過(guò)字符串或者對(duì)象投遞和消費(fèi)

七、unack消息的積壓?jiǎn)栴}

什么叫unack消息的積壓?jiǎn)栴}, 簡(jiǎn)單來(lái)說(shuō)就是消費(fèi)者處理能力有限, 無(wú)法一下將MQ投遞過(guò)來(lái)的所有消息消費(fèi)完, 如果MQ推送消息過(guò)多, 比如可能有幾千上萬(wàn)條消息積壓在某個(gè)消費(fèi)者實(shí)例內(nèi)存中, 此時(shí)這些積壓的消息就處于unack狀態(tài), 如果一直積壓, 就有可能導(dǎo)致消費(fèi)者服務(wù)實(shí)例內(nèi)存溢出、內(nèi)存消耗過(guò)大、甚至內(nèi)存泄露

所以, RabbitMQ是必須要考慮一下消費(fèi)者服務(wù)的處理能力的。

如何解決?

RabbitMQ基于一個(gè)prefetch count來(lái)控制這個(gè)unack message的數(shù)量。

你可以通過(guò) “channel.basicQos(10)” 這個(gè)方法來(lái)設(shè)置當(dāng)前channel的prefetch count。也可以通過(guò)配置文件設(shè)置: spring.rabbitmq.listener.simple.prefetch=10

舉個(gè)例子,比如你要是設(shè)置為10的話,那么意味著當(dāng)前這個(gè)channel里,unack message的數(shù)量不能超過(guò)10個(gè),以此來(lái)避免消費(fèi)者服務(wù)實(shí)例積壓unack message過(guò)多。

這樣的話,就意味著RabbitMQ正在投遞到channel過(guò)程中的unack message,以及消費(fèi)者服務(wù)在處理中的unack message,以及異步ack之后還沒(méi)完成ack的unack message,所有這些message加起來(lái),一個(gè)channel也不能超過(guò)10個(gè)。

如果你要簡(jiǎn)單粗淺的理解的話,也大致可以理解為這個(gè)prefetch count就代表了一個(gè)消費(fèi)者服務(wù)同時(shí)最多可以獲取多少個(gè)message來(lái)處理。

prefetch就是預(yù)抓取的意思,就意味著你的消費(fèi)者服務(wù)實(shí)例預(yù)抓取多少條message過(guò)來(lái)處理,但是最多只能同時(shí)處理這么多消息。

如果一個(gè)channel里的unack message超過(guò)了prefetch count指定的數(shù)量,此時(shí)RabbitMQ就會(huì)停止給這個(gè)channel投遞消息了,必須要等待已經(jīng)投遞過(guò)去的消息被ack了,此時(shí)才能繼續(xù)投遞下一個(gè)消息。

設(shè)置多大合理?

RabbitMQ官方給出的建議是prefetch count一般設(shè)置在100 - 300之間。也就是一個(gè)消費(fèi)者服務(wù)最多接收到100 - 300個(gè)message來(lái)處理,允許處于unack狀態(tài)。

這個(gè)狀態(tài)下可以兼顧吞吐量也很高,同時(shí)也不容易造成內(nèi)存溢出的問(wèn)題。

八、總結(jié)

  • 配置匯總
# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 開(kāi)啟confirms回調(diào) P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 開(kāi)啟returnedMessage回調(diào) Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 觸發(fā)returnedMessage回調(diào)必須設(shè)置mandatory=true, 否則Exchange沒(méi)有找到Queue就會(huì)丟棄掉消息, 而不會(huì)觸發(fā)回調(diào)
spring.rabbitmq.template.mandatory=true
# 設(shè)置手動(dòng)確認(rèn)(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 應(yīng)用場(chǎng)景 異步處理 場(chǎng)景說(shuō)明:用戶注冊(cè)后,需要發(fā)注冊(cè)郵件和注冊(cè)短信,傳統(tǒng)的做法有兩種: 1.串行的方式 2.并行的...
    lijun_m閱讀 1,952評(píng)論 0 3
  • RabbitMQ的學(xué)習(xí)筆記 關(guān)于RabbitMQ的幾個(gè)角色如下: 關(guān)于名詞的通俗解析: 首先我們肯定知道Rabbi...
    ChinaXieShuai閱讀 1,563評(píng)論 0 2
  • _Nobuta閱讀 277評(píng)論 0 0
  • 一、收獲:您本次學(xué)習(xí)最大的收獲是什么? 在學(xué)習(xí)中,最讓我受益的是北京教育學(xué)院外國(guó)語(yǔ)學(xué)院院長(zhǎng)、教授張金秀的講座《運(yùn)用...
    木子_ed3f閱讀 776評(píng)論 0 1
  • 陳小胖啊閱讀 171評(píng)論 0 1

友情鏈接更多精彩內(nèi)容