本文參考石杉的架構(gòu)筆記公眾號(hào), 總結(jié)了一下RabbitMQ的核心知識(shí), 非常感謝
一、消息中間件選型
1. ActiveMQ:
- 優(yōu)點(diǎn): ActiveMQ是老牌的消息中間件,國(guó)內(nèi)很多公司過(guò)去運(yùn)用的還是非常廣泛的,功能很強(qiáng)大。
- 缺點(diǎn): 沒(méi)法確認(rèn)ActiveMQ可以支撐互聯(lián)網(wǎng)公司的高并發(fā)、高負(fù)載以及高吞吐的復(fù)雜場(chǎng)景,在國(guó)內(nèi)互聯(lián)網(wǎng)公司落地較少。而且使用較多的是一些傳統(tǒng)企業(yè),用ActiveMQ做異步調(diào)用和系統(tǒng)解耦。
2. RabbitMQ:
- 優(yōu)點(diǎn): 可以支撐高并發(fā)、高吞吐、性能很高,同時(shí)有非常完善便捷的后臺(tái)管理界面可以使用。另外,他還支持集群化、高可用部署架構(gòu)、消息高可靠支持,功能較為完善。而且經(jīng)過(guò)調(diào)研,國(guó)內(nèi)各大互聯(lián)網(wǎng)公司落地大規(guī)模RabbitMQ集群支撐自身業(yè)務(wù)的case較多,國(guó)內(nèi)各種中小型互聯(lián)網(wǎng)公司使用RabbitMQ的實(shí)踐也比較多。除此之外,RabbitMQ的開(kāi)源社區(qū)很活躍,較高頻率的迭代版本,來(lái)修復(fù)發(fā)現(xiàn)的bug以及進(jìn)行各種優(yōu)化,因此,綜合對(duì)比后,RabbitMQ是一個(gè)不錯(cuò)的消息中間件選擇。
- 缺點(diǎn): 基于erlang語(yǔ)言開(kāi)發(fā)的,所以導(dǎo)致較為難以分析里面的源碼,也較難進(jìn)行深層次的源碼定制和改造,畢竟需要較為扎實(shí)的erlang語(yǔ)言功底才可以。
3. RocketMQ:
- 優(yōu)點(diǎn): 阿里開(kāi)源的,經(jīng)過(guò)阿里的生產(chǎn)環(huán)境的超高并發(fā)、高吞吐的考驗(yàn),性能卓越,同時(shí)還支持分布式事務(wù)等特殊場(chǎng)景。而且RocketMQ是基于Java語(yǔ)言開(kāi)發(fā)的,適合深入閱讀源碼,有需要可以站在源碼層面解決線上生產(chǎn)問(wèn)題,包括源碼的二次開(kāi)發(fā)和改造。
- 缺點(diǎn): 社區(qū)活躍度相對(duì)較為一般,不過(guò)也還可以,文檔相對(duì)來(lái)說(shuō)簡(jiǎn)單一些,然后接口這塊不是按照標(biāo)準(zhǔn)JMS規(guī)范走的有些系統(tǒng)要遷移需要修改大量代碼
4. Kafka:
- 優(yōu)點(diǎn): 專為超高吞吐量的實(shí)時(shí)日志采集、實(shí)時(shí)數(shù)據(jù)同步、實(shí)時(shí)數(shù)據(jù)計(jì)算等場(chǎng)景來(lái)設(shè)計(jì)。在大數(shù)據(jù)領(lǐng)域中配合實(shí)時(shí)計(jì)算技術(shù)(比如Spark Streaming、Storm、Flink)使用的較多。
- 缺點(diǎn): 相對(duì)于以上幾種中間件來(lái)說(shuō),功能較少,在傳統(tǒng)的MQ中間件使用場(chǎng)景中較少采用。
二、消息中間件的常見(jiàn)使用場(chǎng)景
- 復(fù)雜系統(tǒng)的解耦: 多個(gè)系統(tǒng)間通過(guò)中間件進(jìn)行數(shù)據(jù)交互, 避免牽一發(fā)而動(dòng)全身, 減少耦合, 提升系統(tǒng)穩(wěn)定性與可擴(kuò)展性
- 復(fù)雜鏈路的異步調(diào)用: 某些業(yè)務(wù)場(chǎng)景可以通過(guò)異步執(zhí)行減少同步調(diào)用的時(shí)間, 從而大大提高系統(tǒng)響應(yīng)時(shí)間而不影響核心邏輯
- 瞬時(shí)高峰的削峰處理: 流量高峰期, 可以將請(qǐng)求積壓在MQ中, 服務(wù)器不用一下處理所有請(qǐng)求從而導(dǎo)致系統(tǒng)崩潰, 高峰期后, 消費(fèi)者可以慢慢消費(fèi)
三、系統(tǒng)架構(gòu)引入消息中間件后會(huì)有哪些缺點(diǎn)
- 系統(tǒng)可用性降低: 引入MQ,系統(tǒng)多了一個(gè)依賴。依賴如果出現(xiàn)問(wèn)題,就會(huì)導(dǎo)致系統(tǒng)可用性降低。一旦引入中間件,就必須考慮這個(gè)中間件是如何部署的,如何保證高可用性
- 系統(tǒng)穩(wěn)定性降低: 引入MQ, 可能由于網(wǎng)絡(luò)故障、中間件故障、消費(fèi)者異常等原因?qū)е赂鞣N各樣亂七八糟的問(wèn)題產(chǎn)生, 從而使系統(tǒng)穩(wěn)定性下降
- 分布式一致性問(wèn)題: 多系統(tǒng)協(xié)同處理一個(gè)業(yè)務(wù), 不能保證所有系統(tǒng)都正常處理, 有可能出現(xiàn)系統(tǒng)數(shù)據(jù)不一致的情況, 所以此時(shí)又需要使用可靠消息最終一致性的分布式事務(wù)方案來(lái)保障數(shù)據(jù)一致性。
四、消息發(fā)送確認(rèn)
生產(chǎn)者發(fā)送消息, 先發(fā)送消息到Exchange, 然后Exchange再路由到Queue, 這中間就需要確認(rèn)兩個(gè)事情
- 確認(rèn)消息是否成功發(fā)送到Exchange
- 確認(rèn)消息是否從Exchange成功路由到Queue
spring提供了兩個(gè)回調(diào)函數(shù)來(lái)處理這兩種消息發(fā)送確認(rèn)
1. 確認(rèn)消息是否成功發(fā)送到Exchange
有2種方式, 一種是重量級(jí)的事務(wù)消息機(jī)制。采用類事務(wù)的機(jī)制把消息投遞到MQ,可以保證消息不丟失,但是性能極差,經(jīng)過(guò)測(cè)試性能會(huì)呈現(xiàn)幾百倍的下降。
所以說(shuō)現(xiàn)在一般是不會(huì)用這種過(guò)于重量級(jí)的機(jī)制,而是會(huì)用輕量級(jí)的confirm機(jī)制。
另一種方式是confirm機(jī)制, 跟手動(dòng)ack機(jī)制類似, 生產(chǎn)者將消息投遞到RabbitMQ, 且將消息持久化到硬盤后, RabbitMQ會(huì)通過(guò)一個(gè)回調(diào)方法將confirm信息回傳給生產(chǎn)端, 這樣, 如果生產(chǎn)端的服務(wù)接收到了這個(gè)confirm消息,就知道是已經(jīng)持久化到磁盤了。否則如果沒(méi)有接收到confirm消息,那么就說(shuō)明這條消息可能半路丟失了,此時(shí)你就可以重新投遞消息到MQ去,確保消息不會(huì)丟失。
1.1 通過(guò)AMQP的事務(wù)機(jī)制可以保證消息發(fā)送確認(rèn)
事務(wù)機(jī)制主要是通過(guò)對(duì)channel的設(shè)置實(shí)現(xiàn)
channel.txSelect();// 聲明啟動(dòng)事務(wù)模式
channel.txComment();// 提交事務(wù)
channel.txRollback();// 回滾事務(wù)
1.2 使用confirm確認(rèn)機(jī)制
實(shí)現(xiàn)ConfirmCallback并重寫confirm回調(diào)方法, 消息發(fā)送到Broker后觸發(fā)回調(diào), 可以確認(rèn)消息是否成功發(fā)送到Exchange
application.properties:
# 開(kāi)啟confirms回調(diào) P -> Exchange
spring.rabbitmq.publisher-confirms=true
回調(diào):
// 消息是否成功發(fā)送到Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息成功發(fā)送到Exchange");
} else {
log.info("消息發(fā)送到Exchange失敗: cause: {}", correlationData, cause);
}
});
2. 確認(rèn)消息是否從Exchange成功路由到Queue
實(shí)現(xiàn)ReturnCallback并重寫returnedMessage回調(diào)方法, 可以確認(rèn)消息從EXchange路由到Queue失敗, 注意: 這里的回調(diào)是一個(gè)失敗回調(diào), 只有消息從Exchange路由到Queue失敗才會(huì)回調(diào)這個(gè)方法
application.properties:
# 開(kāi)啟returnedMessage回調(diào) Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 觸發(fā)returnedMessage回調(diào)必須設(shè)置mandatory=true, 否則Exchange沒(méi)有找到Queue就會(huì)丟棄掉消息, 而不會(huì)觸發(fā)回調(diào)
spring.rabbitmq.template.mandatory=true
回調(diào):
// 消息是否從Exchange路由到Queue, 注意: 這是一個(gè)失敗回調(diào), 只有消息從Exchange路由到Queue失敗才會(huì)回調(diào)這個(gè)方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息從Exchange路由到Queue失敗: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});
五、消息接收確認(rèn)
消息怎樣才算消費(fèi)成功?
RabbitMQ默認(rèn)自動(dòng)確認(rèn)(ack)消息被正確消費(fèi), 即消息投遞到消費(fèi)者后就自動(dòng)確認(rèn)消息被處理完畢, 并且會(huì)將該消息刪除, 即使消費(fèi)者意外宕機(jī), 或者拋出異常, 如果消費(fèi)者接收到消息, 還沒(méi)處理完成就down掉或者拋出異常, 那么, 這條消息就丟失了
分析一下問(wèn)題出在哪, 問(wèn)題出在RabbitMQ只管消息投遞出去, 而不管消息是否被正確處理就自動(dòng)刪除消息, 所以, 只要將自動(dòng)ack修改為手動(dòng)ack, 消費(fèi)成功才通知RabbitMQ可以刪除該消息即可, 如果消費(fèi)者宕機(jī), 消費(fèi)失敗, 由于RabbitMQ并未收到ack通知, 且感知到該消費(fèi)者狀態(tài)異常(如拋出異常), 就會(huì)將該消息重新推送給其他消費(fèi)者, 讓其他消費(fèi)者繼續(xù)執(zhí)行, 這樣就保證消費(fèi)者掛掉但消息不會(huì)丟失
消息確認(rèn)模式有:
- AcknowledgeMode.NONE:自動(dòng)確認(rèn)
- AcknowledgeMode.AUTO:根據(jù)情況確認(rèn)
- AcknowledgeMode.MANUAL:手動(dòng)確認(rèn)
默認(rèn)情況下消息消費(fèi)者是自動(dòng)ack(確認(rèn))消息的, 如果要手動(dòng)ack(確認(rèn)), 則需要修改確認(rèn)模式為manual
application.properties:
# 設(shè)置手動(dòng)確認(rèn)(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消費(fèi)消息并手動(dòng)確認(rèn):
@Component
@Slf4j
public class LogUserConsumer {
@Autowired
UserLogService userLogService;
@RabbitListener(queues = "log.user.queue")
public void logUserConsumer(Message message, Channel channel, @Header (AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
log.info("收到消息: {}", message.toString());
userLogService.insert(MessageHelper.msgToObj(message, UserLog.class));
} catch (Exception e){
log.error("logUserConsumer error", e);
channel.basicNack(tag, false, true);
} finally {
channel.basicAck(tag, false);
}
}
}
重點(diǎn)在
channel.basicAck(tag, false)方法, 第一個(gè)參數(shù)deliveryTag(唯一標(biāo)識(shí) ID):當(dāng)一個(gè)消費(fèi)者向 RabbitMQ 注冊(cè)后,會(huì)建立起一個(gè) Channel ,RabbitMQ 會(huì)用 basic.deliver 方法向消費(fèi)者推送消息,這個(gè)方法攜帶了一個(gè) delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標(biāo)識(shí) ID,是一個(gè)單調(diào)遞增的正整數(shù),delivery tag 的范圍僅限于 Channel第二個(gè)參數(shù)multiple:為了減少網(wǎng)絡(luò)流量,手動(dòng)確認(rèn)可以被批處理,當(dāng)該參數(shù)為 true 時(shí),則可以一次性確認(rèn) delivery_tag 小于等于傳入值的所有消息
六、消息持久化
消息被投遞到RabbitMQ的內(nèi)存中, 還沒(méi)投遞到消費(fèi)者實(shí)例之前宕機(jī)了, 消息不就丟失了?
可以進(jìn)行消息持久化, 將Exchange、queue和message都持久化到硬盤, 這樣, RabbitMQ重啟時(shí), 會(huì)把持久化的Exchange、queue和message從硬盤重新加載出來(lái), 重新投遞消息
1.1 Exchange的持久化, 聲明交換機(jī)時(shí)指定持久化參數(shù)為true即可
@Bean
public DirectExchange logUserExchange() {
return new DirectExchange("log.user.exchange", true, false);
}
第二個(gè)參數(shù)durable: 是否持久化, 第三個(gè)參數(shù)autoDelete: 當(dāng)所有綁定隊(duì)列都不再使用時(shí), 是否自動(dòng)刪除交換器, true: 刪除, false: 不刪除
1.2 queue的持久化, 聲明隊(duì)列時(shí)指定持久化參數(shù)為true即可
@Bean
public Queue logUserQueue() {
return new Queue("log.user.queue.name", true);
}
第二個(gè)參數(shù)durable, 是否持久化
1.3 message的持久化, 是通過(guò)配置deliveryMode實(shí)現(xiàn)的, 生產(chǎn)者投遞時(shí), 指定deliveryMode為MessageDeliveryMode.PERSISTENT即可實(shí)現(xiàn)消息的持久化, 投遞和消費(fèi)都需要通過(guò)Message對(duì)象進(jìn)行交互, 為了不每次都寫配置轉(zhuǎn)換的代碼, 我們寫一個(gè)消息幫助類MessageHelper:
public class MessageHelper {
public static Message objToMsg(Object obj) {
if (null == obj) {
return null;
}
Message message = MessageBuilder.withBody(JsonUtil.objToStr(obj).getBytes()).build();
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 消息持久化
message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
return message;
}
public static <T> T msgToObj(Message message, Class<T> clazz) {
if (null == message || null == clazz) {
return null;
}
String str = new String(message.getBody());
T obj = JsonUtil.strToObj(str, clazz);
return obj;
}
}
消息投遞時(shí):
rabbitTemplate.convertAndSend("log.user.exchange.name", "log.user.routing.key.name", MessageHelper.objToMsg(userLog));
消息消費(fèi)時(shí)(參考五、消息接收確認(rèn)):
UserLog userLog = MessageHelper.msgToObj(message, UserLog.class);
如果不需要消息持久化, 則不需要通過(guò)Message進(jìn)行轉(zhuǎn)換, 可以直接通過(guò)字符串或者對(duì)象投遞和消費(fèi)
七、unack消息的積壓?jiǎn)栴}
什么叫unack消息的積壓?jiǎn)栴}, 簡(jiǎn)單來(lái)說(shuō)就是消費(fèi)者處理能力有限, 無(wú)法一下將MQ投遞過(guò)來(lái)的所有消息消費(fèi)完, 如果MQ推送消息過(guò)多, 比如可能有幾千上萬(wàn)條消息積壓在某個(gè)消費(fèi)者實(shí)例內(nèi)存中, 此時(shí)這些積壓的消息就處于unack狀態(tài), 如果一直積壓, 就有可能導(dǎo)致消費(fèi)者服務(wù)實(shí)例內(nèi)存溢出、內(nèi)存消耗過(guò)大、甚至內(nèi)存泄露
所以, RabbitMQ是必須要考慮一下消費(fèi)者服務(wù)的處理能力的。
如何解決?
RabbitMQ基于一個(gè)prefetch count來(lái)控制這個(gè)unack message的數(shù)量。
你可以通過(guò) “channel.basicQos(10)” 這個(gè)方法來(lái)設(shè)置當(dāng)前channel的prefetch count。也可以通過(guò)配置文件設(shè)置: spring.rabbitmq.listener.simple.prefetch=10
舉個(gè)例子,比如你要是設(shè)置為10的話,那么意味著當(dāng)前這個(gè)channel里,unack message的數(shù)量不能超過(guò)10個(gè),以此來(lái)避免消費(fèi)者服務(wù)實(shí)例積壓unack message過(guò)多。
這樣的話,就意味著RabbitMQ正在投遞到channel過(guò)程中的unack message,以及消費(fèi)者服務(wù)在處理中的unack message,以及異步ack之后還沒(méi)完成ack的unack message,所有這些message加起來(lái),一個(gè)channel也不能超過(guò)10個(gè)。
如果你要簡(jiǎn)單粗淺的理解的話,也大致可以理解為這個(gè)prefetch count就代表了一個(gè)消費(fèi)者服務(wù)同時(shí)最多可以獲取多少個(gè)message來(lái)處理。
prefetch就是預(yù)抓取的意思,就意味著你的消費(fèi)者服務(wù)實(shí)例預(yù)抓取多少條message過(guò)來(lái)處理,但是最多只能同時(shí)處理這么多消息。
如果一個(gè)channel里的unack message超過(guò)了prefetch count指定的數(shù)量,此時(shí)RabbitMQ就會(huì)停止給這個(gè)channel投遞消息了,必須要等待已經(jīng)投遞過(guò)去的消息被ack了,此時(shí)才能繼續(xù)投遞下一個(gè)消息。
設(shè)置多大合理?
RabbitMQ官方給出的建議是prefetch count一般設(shè)置在100 - 300之間。也就是一個(gè)消費(fèi)者服務(wù)最多接收到100 - 300個(gè)message來(lái)處理,允許處于unack狀態(tài)。
這個(gè)狀態(tài)下可以兼顧吞吐量也很高,同時(shí)也不容易造成內(nèi)存溢出的問(wèn)題。
八、總結(jié)
- 配置匯總
# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 開(kāi)啟confirms回調(diào) P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 開(kāi)啟returnedMessage回調(diào) Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 觸發(fā)returnedMessage回調(diào)必須設(shè)置mandatory=true, 否則Exchange沒(méi)有找到Queue就會(huì)丟棄掉消息, 而不會(huì)觸發(fā)回調(diào)
spring.rabbitmq.template.mandatory=true
# 設(shè)置手動(dòng)確認(rèn)(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
Github
完整項(xiàng)目請(qǐng)查看我的GitHub, 通過(guò)RabbitMQ實(shí)現(xiàn)了用戶登錄日志的記錄
https://github.com/wangzaiplus/springboot/tree/wxw另外, 關(guān)于windows安裝RabbitMQ詳細(xì)教程, 請(qǐng)參考:
http://m.itdecent.cn/p/c7726ba4b046關(guān)于Linux centos7安裝RabbitMQ詳細(xì)教程, 請(qǐng)參考:
http://m.itdecent.cn/p/ee9f7594212b