前言
項(xiàng)目開發(fā)中經(jīng)常會使用消息隊(duì)列來完成異步處理、應(yīng)用解耦、流量控制等功能。雖然消息隊(duì)列的出現(xiàn)解決了一些場景下的問題,但是同時也引出了一些問題,其中使用消息隊(duì)列時如何保證消息的可靠性就是一個常見的問題。如果在項(xiàng)目中遇到需要保證消息一定被消費(fèi)的場景時,如何保證消息不丟失,如何保證消息的可靠性?
最近在寫項(xiàng)目時使用 RabbitMQ 消息隊(duì)列中間件時也遇到了需要保證消息可靠性的場景,所以將如何保持消息可靠性的方案記錄下來,下面將講解一下如何保證 RabbitMQ 的消息可靠性。
如何保證 RabbitMQ 的消息可靠性
先放一張 RabbitMQ 是如何消息傳遞的圖:

生產(chǎn)者Producer 將消息發(fā)送到指定的 交換機(jī)Exchange,交換機(jī)根據(jù)路由規(guī)則路由到綁定的 隊(duì)列Queue 中,最后和消費(fèi)者建立連接后,將消息推送給 消費(fèi)者Consumer。
那么消息會在哪些環(huán)節(jié)丟失呢,列出可能出現(xiàn)消息丟失的場景有:
- 生產(chǎn)者將消息發(fā)送到 RabbitMQ Server 異常:可能因?yàn)榫W(wǎng)絡(luò)問題造成 RabbitMQ 服務(wù)端無法收到消息,造成生產(chǎn)者發(fā)送消息丟失場景。
- RabbitMQ Server 中消息在交換機(jī)中無法路由到指定隊(duì)列:可能由于代碼層面或配置層面錯誤導(dǎo)致消息路由到指定隊(duì)列失敗,造成生產(chǎn)者發(fā)送消息丟失場景。
- RabbitMQ Server 中存儲的消息丟失:可能因?yàn)?RabbitMQ Server 宕機(jī)導(dǎo)致消息未完全持久化或隊(duì)列丟失導(dǎo)致消息丟失等持久化問題,造成 RabbitMQ Server 存儲的消息丟失場景。
- 消費(fèi)者消費(fèi)消息異常:可能在消費(fèi)者接收到消息后,還沒來得及消費(fèi)消息,消費(fèi)者宕機(jī)或故障等問題,造成消費(fèi)者無法消費(fèi)消息導(dǎo)致消息丟失的場景。
以上就是 RabbitMQ 可能出現(xiàn)消息丟失的場景,接下來將依次講解如何避免這些消息丟失的場景問題。
由于在項(xiàng)目開發(fā)中使用的是 RabbitMQ,所以使用 Spring Boot 集成的 AMQP 依賴即可使用 RabbitMQ。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1. 保證生產(chǎn)者發(fā)送消息到 RabbitMQ Server
為了避免因?yàn)榫W(wǎng)絡(luò)故障或閃斷問題導(dǎo)致消息無法正常發(fā)送到 RabbitMQ Server 的情況,RabbitMQ 提供了兩種方案讓生產(chǎn)者可以感知到消息是否正確無誤的發(fā)送到 RabbitMQ Server中,這兩種方案分別是 事務(wù)機(jī)制 和 發(fā)送方確認(rèn)機(jī)制。下面分別介紹一下這兩種機(jī)制如何實(shí)現(xiàn)。
事務(wù)機(jī)制
先說配置和使用:
-
配置類中配置事務(wù)管理器
/** * 消息隊(duì)列配置類 * * @author 單程車票 */ @Configuration public class RabbitMQConfig { /** * 配置事務(wù)管理器 */ @Bean public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); } } -
通過添加事務(wù)注解 + 開啟事務(wù)實(shí)現(xiàn)事務(wù)機(jī)制
/** * 消息業(yè)務(wù)實(shí)現(xiàn)類 * * @author 單程車票 */ @Service public class RabbitMQServiceImpl { @Autowired private RabbitTemplate rabbitTemplate; @Transactional // 事務(wù)注解 public void sendMessage() { // 開啟事務(wù) rabbitTemplate.setChannelTransacted(true); // 發(fā)送消息 rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message); } }
通過上面的配置即可實(shí)現(xiàn)事務(wù)機(jī)制,執(zhí)行流程為:在生產(chǎn)者發(fā)送消息之前,開啟事務(wù),而后發(fā)送消息,如果消息發(fā)送至 RabbitMQ Server 失敗后,進(jìn)行事務(wù)回滾,重新發(fā)送。如果 RabbitMQ Server 接收到消息,則提交事務(wù)。
可以發(fā)現(xiàn)事務(wù)機(jī)制其實(shí)是同步操作,存在阻塞生產(chǎn)者的情況直到 RabbitMQ Server 應(yīng)答,這樣其實(shí)會很大程度上降低發(fā)送消息的性能,所以一般不會使用事務(wù)機(jī)制來保證生產(chǎn)者的消息可靠性,而是使用發(fā)送方確認(rèn)機(jī)制。
發(fā)送方確認(rèn)機(jī)制
先說配置和使用:
-
配置文件
spring: rabbitmq: publisher-confirm-type: correlated # 開啟發(fā)送方確認(rèn)機(jī)制配置屬性有三種分別為:
-
none:表示禁用發(fā)送方確認(rèn)機(jī)制 -
correlated:表示開啟發(fā)送方確認(rèn)機(jī)制 -
simple:表示開啟發(fā)送方確認(rèn)機(jī)制,并支持waitForConfirms()和waitForConfirmsOrDie()的調(diào)用。
這里一般使用
correlated開啟發(fā)送方確認(rèn)機(jī)制即可,至于simple的waitForConfirms()方法調(diào)用是指串行確認(rèn)方法,即生產(chǎn)者發(fā)送消息后,調(diào)用該方法等待 RabbitMQ Server 確認(rèn),如果返回 false 或超時未返回則進(jìn)行消息重傳。由于串行性能較差,這里一般都是用異步 confirm 模式。 -
-
通過調(diào)用
setConfirmCallback()實(shí)現(xiàn)異步 confirm 模式感知消息發(fā)送結(jié)果/** * 消息業(yè)務(wù)實(shí)現(xiàn)類 * * @author 單程車票 */ @Service public class RabbitMQServiceImpl { @Autowired private RabbitTemplate rabbitTemplate; @Override public void sendMessage() { // 發(fā)送消息 rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message); // 設(shè)置消息確認(rèn)回調(diào)方法 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * MQ確認(rèn)回調(diào)方法 * @param correlationData 消息的唯一標(biāo)識 * @param ack 消息是否成功收到 * @param cause 失敗原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // 記錄日志 log.info("ConfirmCallback...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]"); if (!ack) { // 出錯處理 ... } } }); } }
生產(chǎn)者發(fā)送消息后通過調(diào)用 setConfirmCallback() 可以將信道設(shè)置為 confirm 模式,所有消息會被指派一個消息唯一標(biāo)識,當(dāng)消息被發(fā)送到 RabbitMQ Server 后,Server 確認(rèn)消息后生產(chǎn)者會回調(diào)設(shè)置的方法,從而實(shí)現(xiàn)生產(chǎn)者可以感知到消息是否正確無誤的投遞,從而實(shí)現(xiàn)發(fā)送方確認(rèn)機(jī)制。并且該模式是異步的,發(fā)送消息的吞吐量會得到很大提升。
上面就是發(fā)送放確認(rèn)機(jī)制的配置和使用,使用這種機(jī)制可以保證生產(chǎn)者的消息可靠性投遞,并且性能較好。
2. 保證消息能從交換機(jī)路由到指定隊(duì)列
在確保生產(chǎn)者能將消息投遞到交換機(jī)的前提下,RabbitMQ 同樣提供了消息投遞失敗的策略配置來確保消息的可靠性,接下來通過配置來介紹一下消息投遞失敗的策略。
先說配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # 開啟發(fā)送方確認(rèn)機(jī)制
publisher-returns: true # 開啟消息返回
template:
mandatory: true # 消息投遞失敗返回客戶端
mandatory 分為 true 失敗后返回客戶端 和 false 失敗后自動刪除兩種策略。顯然設(shè)置為 false 無法保證消息的可靠性。
到這里的配置是可以保證生產(chǎn)者發(fā)送消息的可靠性投遞。
通過調(diào)用 setReturnCallback() 方法設(shè)置路由失敗后的回調(diào)方法:
/**
* 消息業(yè)務(wù)實(shí)現(xiàn)類
*
* @author 單程車票
*/
@Service
public class RabbitMQServiceImpl {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendMessage() {
// 發(fā)送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
// 設(shè)置消息確認(rèn)回調(diào)方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* MQ確認(rèn)回調(diào)方法
* @param correlationData 消息的唯一標(biāo)識
* @param ack 消息是否成功收到
* @param cause 失敗原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 記錄日志
log.info("ConfirmCallback...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
if (!ack) {
// 出錯處理
...
}
}
});
// 設(shè)置路由失敗回調(diào)方法
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* MQ沒有將消息投遞給指定的隊(duì)列回調(diào)方法
* @param message 投遞失敗的消息詳細(xì)信息
* @param replyCode 回復(fù)的狀態(tài)碼
* @param replyText 回復(fù)的文本內(nèi)容
* @param exchange 消息發(fā)給哪個交換機(jī)
* @param routingKey 消息用哪個路郵鍵
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 記錄日志
log.info("Fail Message["+message+"]==>replyCode["+replyCode+"]" +"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
// 出錯處理
...
}
});
}
}
通過調(diào)用 setReturnCallback() 方法即可實(shí)現(xiàn)當(dāng)交換機(jī)路由到指定隊(duì)列失敗后回調(diào)方法,拿到被退回的消息信息,進(jìn)行相應(yīng)的處理如記錄日志或重傳等等。
3. 保證消息在 RabbitMQ Server 中的持久化
對于消息的持久化,只需要在發(fā)送消息時將消息持久化,并且在創(chuàng)建交換機(jī)和隊(duì)列時也保證持久化即可。
配置如下:
/**
* 消息隊(duì)列
*/
@Bean
public Queue queue() {
// 四個參數(shù):name(隊(duì)列名)、durable(持久化)、 exclusive(獨(dú)占)、autoDelete(自動刪除)
return new Queue(MESSAGE_QUEUE, true);
}
/**
* 直接交換機(jī)
*/
@Bean
public DirectExchange exchange() {
// 四個參數(shù):name(交換機(jī)名)、durable(持久化)、autoDelete(自動刪除)、arguments(額外參數(shù))
return new DirectExchange(Direct_Exchange, true, false);
}
在創(chuàng)建交換機(jī)和隊(duì)列時通過構(gòu)造方法將持久化的參數(shù)都設(shè)置為 true 即可實(shí)現(xiàn)交換機(jī)和隊(duì)列的持久化。
@Override
public void sendMessage() {
// 構(gòu)造消息(將消息持久化)
Message message = MessageBuilder.withBody("單程車票".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
// 向MQ發(fā)送消息(消息內(nèi)容都為消息表記錄的id)
rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
}
在發(fā)送消息前通過調(diào)用 MessageBuilder 的 setDeliveryMode(MessageDeliveryMode.PERSISTENT) 在構(gòu)造消息時設(shè)置消息持久化(MessageDeliveryMode.PERSISTENT)即可實(shí)現(xiàn)對消息的持久化。
通過確保消息、交換機(jī)、隊(duì)列的持久化操作可以保證消息的在 RabbitMQ Server 中不丟失,從而保證可靠性,其實(shí)除了持久化之外還需要保證 RabbitMQ 的高可用性,否則 MQ 都宕機(jī)或磁盤受損都無法確保消息的可靠性,關(guān)于高可用性這里就不作過多說明,有興趣的可以去了解一下。
4. 保證消費(fèi)者消費(fèi)的消息不丟失
在保證發(fā)送方和 RabbitMQ Server 的消息可靠性的前提下,只需要保證消費(fèi)者在消費(fèi)消息時異常消息不丟失即可保證消息的可靠性。
RabbitMQ 提供了 消費(fèi)者應(yīng)答機(jī)制 來使 RabbitMQ 能夠感知到消費(fèi)者是否消費(fèi)成功消息,默認(rèn)情況下,消費(fèi)者應(yīng)答機(jī)制是自動應(yīng)答的,也就是RabbitMQ 將消息推送給消費(fèi)者,便會從隊(duì)列刪除該消息,如果消費(fèi)者在消費(fèi)過程失敗時,消息就存在丟失的情況。所以需要將消費(fèi)者應(yīng)答機(jī)制設(shè)置為手動應(yīng)答,只有消費(fèi)者確認(rèn)消費(fèi)成功后才會刪除消息,從而避免消息的丟失。
下面來看看如何配置消費(fèi)者手動應(yīng)答:
spring:
rabbitmq:
publisher-confirm-type: correlated # 開啟發(fā)送方確認(rèn)機(jī)制
publisher-returns: true # 開啟消息返回
template:
mandatory: true # 消息投遞失敗返回客戶端
listener:
simple:
acknowledge-mode: manual # 開啟手動確認(rèn)消費(fèi)機(jī)制
通過 listener.simple.acknowledge-mode = manual 即可將消費(fèi)者應(yīng)答機(jī)制設(shè)置為手動應(yīng)答。
之后只需要在消費(fèi)消息時,通過調(diào)用 channel.basicAck() 與 channel.basicNack() 來根據(jù)業(yè)務(wù)的執(zhí)行成功選擇是手動確認(rèn)消費(fèi)還是手動丟棄消息。
/**
* 監(jiān)聽消費(fèi)隊(duì)列的消息
*/
@RabbitListener(queues = RabbitMQConfig.MESSAGE_QUEUE)
public void onMessage(Message message, Channel channel) {
// 獲取消息索引
long index = message.getMessageProperties().getDeliveryTag();
// 解析消息
byte[] body = message.getBody();
...
try {
// 業(yè)務(wù)處理
...
// 業(yè)務(wù)執(zhí)行成功則手動確認(rèn)
channel.basicAck(index, false);
}catch (Exception e) {
// 記錄日志
log.info("出現(xiàn)異常:{}", e.getMessage());
try {
// 手動丟棄信息
channel.basicNack(index, false, false);
} catch (IOException ex) {
log.info("丟棄消息異常");
}
}
}
這里說明一下 basicAck() 與 basicNack() 的參數(shù)說明:
-
void basicAck(long deliveryTag, boolean multiple)方法(會拋異常):- deliveryTag:該消息的index
- multiple:是否批量處理(true 表示將一次性ack所有小于deliveryTag的消息)
-
void basicNack(long deliveryTag, boolean multiple, boolean requeue)方法(會拋異常):- deliveryTag:該消息的index
- multiple:是否批量處理(true 表示將一次性ack所有小于deliveryTag的消息)
- requeue:被拒絕的是否重新入隊(duì)列(true 表示添加在隊(duì)列的末端;false 表示丟棄)
通過設(shè)置手動確認(rèn)消費(fèi)者應(yīng)答機(jī)制即可保證消費(fèi)者在消費(fèi)信息時的消息可靠性。
Spring Boot 提供的消息重試機(jī)制
除了消費(fèi)者應(yīng)答機(jī)制外,Spring Boot也提供了一種重試機(jī)制,只需要通過配置即可實(shí)現(xiàn)消息重試從而確保消息的可靠性,這里簡單介紹一下:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 開啟自動確認(rèn)消費(fèi)機(jī)制
retry:
enabled: true # 開啟消費(fèi)者失敗重試
initial-interval: 5000ms # 初始失敗等待時長為5秒
multiplier: 1 # 失敗的等待時長倍數(shù)(下次等待時長 = multiplier * 上次等待時間)
max-attempts: 3 # 最大重試次數(shù)
stateless: true # true無狀態(tài);false有狀態(tài)(如果業(yè)務(wù)中包含事務(wù),這里改為false)
通過配置在消費(fèi)者的方法上如果執(zhí)行失敗或執(zhí)行異常只需要拋出異常(一定要出現(xiàn)異常才會觸發(fā)重試,注意:不要捕獲異常) 即可實(shí)現(xiàn)消息重試,這樣也可以保證消息的可靠性。
總要有總結(jié)
上面就是我在項(xiàng)目中關(guān)于如何保證 RabbitMQ 的消息可靠性的配置和實(shí)現(xiàn)方案了。下面想聊聊我在實(shí)際使用消息隊(duì)列實(shí)現(xiàn)消息可靠性時遇到的問題。
消費(fèi)者消費(fèi)消息需要保證冪等性
由于實(shí)現(xiàn)了消息可靠性導(dǎo)致消息重發(fā)或消息重試造成消費(fèi)者可能會存在消息被重復(fù)消費(fèi)的情況,這種情況就需要保證消息不被重復(fù)消費(fèi),也就是消息保證冪等性。
實(shí)現(xiàn)冪等性的方法有很多:借助數(shù)據(jù)庫的樂觀鎖或悲觀鎖、借助 redis 的分布式鎖、借助 redis 實(shí)現(xiàn) token 機(jī)制等等都可以很好的保證消息的冪等性。
使用消息隊(duì)列很難做到 100% 的消息可靠性
我在項(xiàng)目實(shí)際開發(fā)中使用 RabbitMQ 實(shí)現(xiàn)消息可靠性,實(shí)踐后的感受是消息隊(duì)列很難能做到 100% 的消息可靠性,上面的實(shí)現(xiàn)方案中 RabbitMQ 提供的機(jī)制做到的是盡可能地減小消息丟失的幾率。
大多數(shù)情況下消息丟失都是因?yàn)榇a出現(xiàn)錯誤,那么這樣無論進(jìn)行多少次重發(fā)都是無法解決問題的,這樣只會增加 CPU 的開銷,所以我認(rèn)為更好的解決辦法是通過記錄日志的方式等待后續(xù)回溯時更好的發(fā)現(xiàn)問題并解決問題。對于一些不是很需要保證百分百可靠性的場景,都可以通過記錄日志的方式來保證消息可靠性即可。
我在項(xiàng)目中采用的是消息落庫的方式,先將消息落庫,而后生產(chǎn)者將消息發(fā)送給 MQ,使用數(shù)據(jù)庫記錄消息的消費(fèi)情況,對于重試多次仍然無法消費(fèi)成功的消息,后續(xù)通過定時任務(wù)調(diào)度的方式對這些無法消費(fèi)成功的消息進(jìn)行補(bǔ)償。我認(rèn)為這樣可以盡可能地保證消息的可靠性。但是同樣這樣也帶來了問題就是消息落庫需要數(shù)據(jù)庫磁盤IO的開銷,增大數(shù)據(jù)庫壓力同時降低了性能。
總之,在實(shí)現(xiàn)消息的可靠性時,應(yīng)該根據(jù)項(xiàng)目的需求來考慮如何處理。對于消息要求可靠性低的只需要在出錯時記錄日志方便后續(xù)回溯解決出錯問題即可,對于消息可靠性要求高的則可以采用消息落庫 + 定時任務(wù)的方式盡可能保證百分百的可靠性。