kafka與RabbitMQ的區(qū)別?
1、確認機制不同
在RabbitMQ中,消息確認是指生產(chǎn)者發(fā)送消息到RabbitMQ后,等待RabbitMQ返回確認信息,確認消息已經(jīng)被正確地接收并處理。通常情況下,【消息確認是同步進行的】即必須等到 【消息隊列】 接收到和處理了消息后,才會返回確認信息。這樣可以確保消息隊列完全處理了消息,避免消息的丟失,保證了數(shù)據(jù)的一致性和可靠性。
而在Kafka中,為了保證高吞吐率,沒有提供消息確認機制(at-most-once和at-least-once除外),數(shù)據(jù)是異步推送到broker中的,一旦生產(chǎn)者把消息推送到broker后,就不會再對該消息進行確認處理,消費者需要自己對消息的處理情況進行確認。在Kafka中,也有一些同步的模式(例如producer.send()方法中的同步調(diào)用),可以確保發(fā)送的消息被同步復制到Kafka的所有副本,并在接收到確認后才會返回。
2、實施部署不同
RabbitMQ 穩(wěn)定可靠,數(shù)據(jù)?致,?持多協(xié)議,有消息確認,基于erlang語?
Kafka ?吞吐,?性能,快速持久化,?消息確認,?消息遺漏,可能會有有重復消息,依賴于zookeeper,成本高,“成本高”的含義是指 Kafka 部署和運維所需要的成本相比 RabbitMQ更高需要更加復雜的基礎(chǔ)設(shè)施來支持它的運行,這些基礎(chǔ)設(shè)施包括:
一、 集群環(huán)境成本高:Kafka 集群需要依賴 ZooKeeper 來實現(xiàn)節(jié)點的選舉和協(xié)調(diào)工作,而 ZooKeeper 集群的部署和配置都比 RabbitMQ 更加復雜,需要更多的服務(wù)器資源和維護成本
二. 數(shù)據(jù)持久化成本高:Kafka 依賴磁盤的高速讀寫特性來實現(xiàn)其快速的消息持久化,因此需要更高的成本來維護磁盤的健康狀態(tài)和數(shù)據(jù)備份和恢復等工作。
總的來說,Kafka 的高吞吐、高性能的特點是與高成本密切相關(guān)的,如果一個應(yīng)用場景對性能要求很高,且具備一定的技術(shù)架構(gòu)能力,那么可以選擇使用 Kafka。而如果應(yīng)用場景對可靠性和穩(wěn)定性要求較高,對【性能要求不太高,且業(yè)務(wù)規(guī)模不太大 ,那么選擇 RabbitMQ 更為合適。
邏輯結(jié)構(gòu):
用一棟大樓類比說明
1、Broker 是指 RabbitMQ 服務(wù)器實例,并根據(jù)預(yù)先定義的規(guī)則將消息發(fā)送到合適的隊列或 Exchange
2、exchange是 RabbitMQ 中用于接收和分發(fā)消息的中間組件,它負責接收來自生產(chǎn)者的消息,并將其路由到與之相關(guān)的1個或多個隊列 (生產(chǎn)者到交換機)
3、隊列:房間是存放消息的地方(每個房間)
4、綁定:綁定規(guī)定了交換機和隊列之間的連接(樓層之間的通道)
5、虛擬主機:虛擬主機則是 RabbitMQ 中用于【邏輯隔離】的重要概念,它類似于一個命名空間(每層樓)
可以在一個 Broker 內(nèi)部創(chuàng)建多個不同的虛擬主機,每個虛擬主機都有自己的命名空間,并且可以獨立配置和管理其中Exchange、Queue、Binding、用戶權(quán)限等
在 RabbitMQ 中是可以設(shè)置多個交換機的。通過設(shè)置不同類型的交換機以及它們之間的綁定關(guān)系,可以實現(xiàn)復雜的消息路由和處理邏輯
交換機類型和工作模式都涉及消息在 RabbitMQ 中的傳遞和處理,但它們的焦點和作用有所不同。交換機類型主要關(guān)注消息的路由和分發(fā),決定消息如何被發(fā)送到隊列中;而工作模式主要關(guān)注消費者對消息的接收和處理方式,決定消息如何被消費
消息發(fā)送模式:
1、簡單模式:一個隊列只有一個消費者
2、工作模式:多個消費者監(jiān)聽同一個隊列,但消費者中只有一個消費者會成功消費消息(負載均衡)
3、發(fā)布/訂閱模式:一個交換機綁定多個隊列,消息同時被所有隊列消費(集體通知)
4、路由模式:它通過在交換機和隊列之間建立綁定關(guān)系,并定義路由鍵來實現(xiàn)消息的有選擇性地路由傳遞(訂閱廣告)
有一個即時消息系統(tǒng)或者新聞訂閱系統(tǒng)需要將消息或新聞推送給多個用戶,這時可以采用發(fā)布/訂閱模式。生產(chǎn)者將消息發(fā)送到交換機 (exchange),交換機將消息廣播給多個訂閱者,訂閱者通過綁定交換機來接收消息。在這種模式下,每個訂閱者都會接收到相同的消息,可以采用不同的隊列來對訂閱者進行分組,以實現(xiàn)訂閱者之間的隔離和不同級別的消息推送
工作模式和發(fā)布訂閱模式的區(qū)別?
在工作模式中,多個消費者可以并行地處理來自同一個隊列的任務(wù)
每個消息只能由一個消費者處理,但是多個消息可以同時被多個消費者處理
在發(fā)布訂閱模式中,雖然消息會被廣播到多個訂閱者,但是每個訂閱者獨立地接收和處理消息。因此,不同的訂閱者可以并行地處理自己接收到的消息路由模式和發(fā)布訂閱模式的區(qū)別?
發(fā)布訂閱Fanout(廣播)模式:在 Fanout 模式中,交換機會將消息廣播到綁定的所有隊列。無論隊列是否有不同的路由鍵或其他綁定條件,交換機都會將消息傳遞給所有綁定的隊列。
路由Redirect(重定向)模式:在 Redirect 模式中,交換機會將消息發(fā)送到1個特定的隊列而不是廣播到所有綁定的隊列。這種模式適用于需要將消息定向發(fā)送到1個特定的隊列
創(chuàng)建用戶和創(chuàng)建交換機
創(chuàng)建用戶:創(chuàng)建用戶名、密碼
創(chuàng)建交換機:
1、選擇虛擬主機
2、選擇交換機類型
3、持久性……
在創(chuàng)建交換機時,durability 是一個可選的參數(shù),用于配置交換機的持久性。持久性指的是當消息代理(如 RabbitMQ)重新啟動時,交換機是否仍然存在。如果將 durability 設(shè)置為 true,交換機將被標記為持久的,即使消息代理重新啟動,交換機也將保留。這意味著在消息代理恢復正常運行后,交換機將保持不變
如果你的應(yīng)用程序需要頻繁地更改或刪除交換機,并且你不希望在每次更改時手動刪除或重新創(chuàng)建交換機,那么可以選擇非持久化交換機
4、交換機綁定隊列
發(fā)布消息并設(shè)置消息屬性
在 RabbitMQ 的 Java 客戶端 API 中,可以在發(fā)布消息時設(shè)置這些消息屬性。
下面是一個示例代碼,展示了如何設(shè)置消息的持久性、優(yōu)先級和過期時間:
// 創(chuàng)建連接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 創(chuàng)建消息屬性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 設(shè)置消息的持久性
.priority(5) // 設(shè)置消息的優(yōu)先級
.expiration("10000") // 設(shè)置消息的過期時間
.build();
// 發(fā)布消息
String exchangeName = "exchange1";
String routingKey = "routingKey1";
String message = "Hello, RabbitMQ!";
channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());
// 關(guān)閉連接和通道
channel.close();
connection.close();
在上述代碼中,首先創(chuàng)建了連接和通道。然后使用 AMQP.BasicProperties.Builder 類來創(chuàng)建消息屬性對象,通過鏈式調(diào)用方法來設(shè)置屬性,例如使用 deliveryMode 方法設(shè)置消息的持久性(1非持久2持久),使用 priority 方法設(shè)置消息的優(yōu)先級,使用 expiration 方法設(shè)置消息的過期時間。最后,調(diào)用 basicPublish 方法發(fā)布消息時,將消息屬性對象作為參數(shù)傳遞,即可在消息中包含相應(yīng)的屬性。
使用RabbitMQ傳遞對象
發(fā)送和接收的都是字符串/字節(jié)數(shù)組類型的消息
- 使用
序列化對象
消息提供者
@Service
public class MQService {
@Resource
private AmqpTemplate amqpTemplate;
public void sendGoodsToMq(Goods goods){
//消息隊列可以發(fā)送 字符串、字節(jié)數(shù)組、序列化對象
byte[] bytes = SerializationUtils.serialize(goods);
amqpTemplate.convertAndSend("","queue1",bytes);
}
}
消息消費者
@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {
@RabbitHandler
public void receiveMsg(byte[] bs){
Goods goods = (Goods) SerializationUtils.deserialize(bs);
System.out.println("byte[]---"+goods);
}
}
PS:為什么要實現(xiàn)序列化接口?
1、易于傳輸和存儲:將 Java 對象序列化為字節(jié)數(shù)組后,可以將其轉(zhuǎn)換為不同的格式,例如 JSON 或 XML,用于傳輸?shù)讲煌南到y(tǒng)之間。字節(jié)數(shù)組還可以在數(shù)據(jù)庫中進行持久化存儲,以便以后檢索和使用。
2、平臺無關(guān)性:通過使用序列化對象,可以將 Java 對象轉(zhuǎn)換為平臺無關(guān)的字節(jié)序列,這樣它們可以在不同的系統(tǒng)之間進行傳輸和存儲,包括跨語言和跨平臺的場景。
3、可擴展性:可以在對象中添加新字段和屬性,并保證可以與舊版本兼容,以便在不同版本的應(yīng)用程序之間進行傳輸和存儲。這可以通過維護對象的序列化版本來實現(xiàn)。
4、高效性:通過序列化對象,可以減少在網(wǎng)絡(luò)或磁盤 I/O 期間傳輸數(shù)據(jù)的大小,從而提高傳輸效率并減少網(wǎng)絡(luò)流量。這是因為序列化過程通常會去除對象中的一些額外信息或元數(shù)據(jù),只保留必要的數(shù)據(jù)以及用于恢復對象的必要信息。
- 使用
JSON字符串傳遞
消息提供者
@Service
public class MQService {
@Resource
private AmqpTemplate amqpTemplate;
public void sendGoodsToMq(Goods goods) throws
JsonProcessingException {
//消息隊列可以發(fā)送 字符串、字節(jié)數(shù)組、序列化對象
ObjectMapper objectMapper = new ObjectMapper();
String msg = objectMapper.writeValueAsString(goods);
amqpTemplate.convertAndSend("","queue1",msg);
}
}
RabbitMQ事務(wù)、消息確認和return機制、手動ACK
RabbitMQ事務(wù)指的是基于客戶端實現(xiàn)的事務(wù)管理,當在消息發(fā)送過程中添加了事務(wù),處理效率降低幾十倍甚至上百倍
Connection connection = RabbitMQUtil.getConnection(); //connection 表
示與 host1的連接
Channel channel = connection.createChannel();
channel.txSelect(); //開啟事務(wù)
try{
channel.basicPublish("ex4", "k1", null, msg.getBytes());
channel.txCommit(); //提交事務(wù)
}catch (Exception e){
channel.txRollback(); //事務(wù)回滾
}finally{
channel.close();
connection.close();
}
同步等待:為了實現(xiàn)事務(wù)的原子性,事務(wù)提交是一個同步操作,即發(fā)送端會等待事務(wù)提交的結(jié)果,只有在返回提交成功的響應(yīng)后才會繼續(xù)發(fā)送下一條消息。
重復操作:如果發(fā)送端在發(fā)送消息的過程中出現(xiàn)錯誤或異常,事務(wù)會回滾并且消息會重發(fā),這種方式可以保證消息的可靠性傳遞
所以,在實際場景中,如果對消息的實時性要求較高或?qū)ο⑻幚淼耐掏铝坑休^高要求,建議盡量避免使用事務(wù)
PS:消息吞吐量(Message Throughput):消息吞吐量指的是系統(tǒng)在單位時間內(nèi)處理的消息量,對于需要高吞吐量的應(yīng)用來說,高效地傳遞和處理消息是非常重要的。使用事務(wù)機制會對消息發(fā)送的性能造成一定的影響,因為事務(wù)機制需要等待事務(wù)提交的確認,會增加發(fā)送消息的延遲。
事務(wù)如何確保消息發(fā)送的可靠性?
消息的可靠性:從 生產(chǎn)者發(fā)送消息 —— 消息隊列存儲消息 —— 消費者消費消息的整個過程中消息的安全性及可控性
RabbitMQ提供了消息確認機制及return機制
意義:消息確認機制可以幫助發(fā)送方確保消息成功發(fā)送到交換機,并可以跟蹤消息是否被一個或多個消費者成功消費。Return 機制則是確保了消息成功從交換機分發(fā)到隊列的過程,當消息無法成功路由到隊列時,消息會被 Return給消息提供者
- 消息確認機制:當消息提供者將消息發(fā)送到交換機時,交換機會對消息提供者進行反饋(到達交換機)
- return機制:當消息達到交換機之后,交換機會將消息分發(fā)到隊列,MQ會將分發(fā)的結(jié)果也會反饋給消息提供者(到達隊列)
在 RabbitMQ 中使用消息確認機制可以確保消息被成功發(fā)送到交換機,并被一個或多個消費者成功消費。
RabbitMQ 提供了同步和異步兩種消息確認方式
同步消息確認指的是,在消息發(fā)送之后,發(fā)送方會阻塞等待確認結(jié)果。具體來說,發(fā)送方通過 waitForConfirms() 方法等待 RabbitMQ 發(fā)送確認結(jié)果,直到接收到確認結(jié)果或者超時時間到達。如果確認結(jié)果為消息成功發(fā)送到交換機并被一個或多個消費者消費,則 waitForConfirms()方法返回 true,否則返回 false。
異步消息確認指的是,在消息發(fā)送之后,發(fā)送方不會阻塞等待確認結(jié)果。相反,它通過添加ConfirmListener來異步處理確認回調(diào)。確認回調(diào)會在消息成功發(fā)送到交換機并被一個或多個消費者消費或者發(fā)送失敗時觸發(fā)。
異步消息確認相較于同步消息確認,其對消息發(fā)送方更加友好。使用異步確認的優(yōu)點是可以提高發(fā)送消息的吞吐量,因為發(fā)送方能夠繼續(xù)發(fā)送下一條消息,而不需要等待每條消息的確認結(jié)果。另外,異步確認還可以通過監(jiān)聽確認回調(diào)在消息發(fā)送失敗時及時發(fā)現(xiàn)并處理這種情況,而同步確認則只能通過超時等待的方式判斷消息是否發(fā)送成功。
普通maven項目的消息確認機制
下面是 RabbitMQ 中 Java 客戶端使用消息確認和批量發(fā)送消息的示例代碼,以及使用確認監(jiān)聽器異步處理確認結(jié)果:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MessageSender {
private static final String EXCHANGE_NAME = "ex1";
private static final String ROUTING_KEY = "a";
private static final String MESSAGE = "Hello, RabbitMQ!";
private static final int MESSAGE_COUNT = 10;
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//開啟消息確Z
channel.confirmSelect();
//批量發(fā)送消息
for (int i=0 ; i<MESSAGE_COUNT ; i++) {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());
}、
//假如發(fā)送消息需要10s,waitForConfirms會進?阻塞狀態(tài)(同步)
//boolean b = channel.waitForConfirms();
//使用監(jiān)聽器異步處理確認結(jié)果
channel.addConfirmListener(new ConfirmListener() {
//參數(shù)1:long deliveryTag 返回消息的標識
//參數(shù)2:boolean multiple 是否為批量confirm
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("~~~~~消息成功發(fā)送到交換機");
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("~~~~~消息發(fā)送到交換機失敗");
}
});
}
}
當調(diào)用 channel.waitForConfirms() 時,它會阻塞當前線程,直到 RabbitMQ 確認消息是否成功發(fā)送到隊列或交換機。
換句話說,在以下情況下,waitForConfirms() 會阻塞程序繼續(xù)執(zhí)行:
- 當你發(fā)送一條消息后,還沒有收到 RabbitMQ 的確認消息;
- 當發(fā)送消息時發(fā)生異常,如網(wǎng)絡(luò)中斷或發(fā)送超時;
- 當你設(shè)置了批量發(fā)送消息的模式,并且尚未收到足夠數(shù)量的消息確認,或者
超過了設(shè)置的超時時間。
簡而言之,waitForConfirms()會等待確認消息的返回,確保消息成功發(fā)送到 RabbitMQ。
消息確認機制不光監(jiān)聽成功的消息同時也監(jiān)聽失敗的消息
spring項目的return機制
@Component
public class MyReturnListener implements RabbitTemplate.ReturnsCallback {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息從交換機分發(fā)到隊列失敗");
String exchange = returnedMessage.getExchange();
String routingKey = returnedMessage.getRoutingKey();
String msg = returnedMessage.getMessage().toString();
amqpTemplate.convertAndSend(exchange,routingKey,msg);
}
}
在上述代碼中,returnedMessage方法是用于處理消息從交換機分發(fā)到隊列失敗的回調(diào)方法。當消息無法路由到指定的隊列時,會觸發(fā)該方法。
下面是代碼中各個參數(shù)的含義:
returnedMessage對象:表示被退回的消息的相關(guān)信息。
getExchange():獲取退回消息所使用的交換機名稱。
getRoutingKey():獲取退回消息時所使用的路由鍵。
getMessage():獲取退回的消息對象,可以通過toString()方法將其轉(zhuǎn)換為字符串。
在該方法中,它首先打印了一條消息提示,表示消息從交換機分發(fā)到隊列失敗。然后,通過 amqpTemplate.convertAndSend(exchange,routingKey, msg) 方法將消息重新發(fā)送到指定的交換機和路由鍵,以便重新分發(fā)消息
RabbitMQ消費者手動應(yīng)答(手動ACK)
@Component
@RabbitListener(queues = "queue01")
public class Consumer1 {
@RabbitHandler
public void process(String msg, Channel channel, Message message) throws IOException {
try {
System.out.println("成功接收到消息 msg = " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.err.println("接收消息失敗 msg = " + msg);
}
}
}
在上述代碼中,channel.basicAck 是用于手動確認消息的方法,用于告知 RabbitMQ 已經(jīng)成功處理(消費)了一個消息。下面是 channel.basicAck 方法的各個參數(shù)的詳細說明:
deliveryTag:這是一個消息的唯一標識符。每個消息都有一個獨特的deliveryTag,用于標識消息的順序。通過message.getMessageProperties().getDeliveryTag()可以獲取當前處理消息的deliveryTag。
PS:每條消息在 RabbitMQ 中都會被分配一個唯一的deliveryTag,用于標識消息的順序。deliveryTag 是一個連續(xù)遞增的正整數(shù),每次消費一條消息時,deliveryTag 會被分配并隨消息一起傳遞給消費者。消費者在處理完一條消息后,需要對該消息進行手動 ACK 才能告知 RabbitMQ 已經(jīng)成功處理了該消息,此時會將該消息的 deliveryTag 返回給 RabbitMQ 作為確認-
multiple:這是一個布爾值,用于指定是否確認多個消息。對于簡單地確認單個消息,可以將其設(shè)置為false。如果要確認多個消息,可以將其設(shè)置為true,這將確認所有比當前deliveryTag小(包括當前deliveryTag)的未確認消息。
PS:在消息隊列系統(tǒng)中,通常會有多個消息同時被發(fā)送到隊列中等待消費。當消費者處理消息時,通常會以一批消息的方式進行確認,而非逐條確認。因此,multiple參數(shù)的存在是為了提供一種批量確認消息的機制在這種情況下,如果設(shè)置為
true,則表示當前消息之前的所有消息也被認為已經(jīng)處理完成,全部移除。如果設(shè)置為false,則僅確認當前deliveryTag的消息,然后將其從隊列中移除。通常設(shè)置為false。 requeue:這是一個布爾值,用于指定當確認消息時,是否將消息重新加入到隊列。如果設(shè)置為false,則代表不重新加入隊列,即認為消息已被完全處理。如果設(shè)置為true,則代表將消息重新加入隊列,以便重新被消費。通常情況下,如果消息處理成功,則設(shè)置為false,如果消息處理失敗,則設(shè)置為true。
在上述代碼中,第一個 channel.basicAck 用于確認消息已經(jīng)成功處理,同時使用了 message.getMessageProperties().getDeliveryTag() 來獲取當前消息的 deliveryTag 值。
而第二個 channel.basicAck 是在處理消息出現(xiàn)異常時的回退機制。它使用與第一個 channel.basicAck 相同的參數(shù),以確認消息處理失敗,然后將消息重新加入隊列(requeue 設(shè)置為 true),以便重新消費。同時,使用了 message.getMessageProperties().getDeliveryTag() 獲取當前消息的 deliveryTag 值。
手動ACK的意義體現(xiàn)在以下幾個方面:
1、確??煽啃裕合M者在處理消息時,可能會發(fā)生異常或出現(xiàn)錯誤的情況。如果沒有手動ACK機制,當消費者在處理消息過程中出現(xiàn)異常時,消息隊列將無法得知消息是否被成功處理,可能會導致消息的丟失或重復消費。手動ACK可以確保消費者在處理消息成功后再發(fā)送確認,從而確保消息的可靠性。
2、提高消息處理效率:手動ACK機制可以確保消息隊列在收到消費者的ACK確認后,【將該消息從隊列中移除】,從而減少隊列的負荷。通過手動ACK,消費者可以在處理完一條消息后立即發(fā)送確認,減少了消息在隊列中滯留的時間,提高了消息的處理效率。
手動ACK與消息確認機制和return機制之間的區(qū)別?
手動 ACK 與消息確認機制:手動 ACK 是消費者在處理完一條消息后,向 RabbitMQ 發(fā)送確認信號,告知 RabbitMQ 已成功處理該消息的機制。通過手動 ACK,消費者可以明確地通知 RabbitMQ 消息已經(jīng)被處理,避免消息丟失或重復消費的情況發(fā)生,手動ACK是消費端的機制。消息確認機制則是生產(chǎn)者端的機制,用于確認消息已經(jīng)被發(fā)送到交換機并被消費者接收,確保消息的可靠傳遞。手動 ACK 和消息確認機制共同保證了消息在生產(chǎn)者和消費者之間的可靠傳遞。
手動 ACK 與消息返回機制:消息返回機制會在消息無法被路由到隊列時將消息返回給生產(chǎn)者。在這種情況下,消費者可能無法接收到消息,因此消費者不會發(fā)送 ACK。手動 ACK 可以確保消費者在成功處理消息后再發(fā)送確認信號,而不是直接丟棄未處理的消息。通過手動 ACK,消費者能夠在實際處理消息后再確認消息的處理情況,提高了消息傳遞的完整性和可靠性
消息的冪等性問題
要在 RabbitMQ 中實現(xiàn)冪等性,可以使用 Redis 的 SETNX`(Set if Not Exists)命令來進行輔助。
- 意義:
確保消息被處理一次而不會重復處理
確保消息處理的唯一性和正確性
SETNX 是 Redis 提供的一種原子性命令,用于設(shè)置指定鍵的值,僅當該鍵不存在時才會設(shè)置成功。在 RabbitMQ 的消息處理中,可以將消息的唯一標識或關(guān)鍵信息作為鍵,將已處理消息的標記作為值存儲在 Redis 中,并利用 SETNX 來保證只有第一次處理消息時才會成功設(shè)置標記,從而保證消息的冪等性
下面是一個示例代碼:
@Component
@RabbitListener(queues = "queue01")
public class Consumer1 {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private final String LOCK_PREFIX = "messageLock:";
private final long LOCK_EXPIRE_TIME = 60 * 1000; // 鎖的過期時間,單位毫秒
@RabbitHandler
public void process(String msg, Channel channel, Message message) throws IOException {
// 使用消息的唯一標識作為鎖的 key
String lockKey = LOCK_PREFIX + message.getMessageProperties().getDeliveryTag();
Boolean isLocked = false;
try {
// 嘗試獲取鎖
isLocked = redisTemplate.opsForValue().setIfAbsent(lockKey, "");
// 設(shè)置鎖的過期時間
redisTemplate.expire(lockKey, LOCK_EXPIRE_TIME, TimeUnit.MILLISECONDS);
// 如果成功獲取到了鎖,則處理消息
if (isLocked) {
System.out.println("成功獲取到鎖并處理消息 msg = " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("未獲取到鎖,消息已被其他消費者處理 msg = " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.err.println("處理消息失敗 msg = " + msg);
} finally {
// 釋放鎖
if (isLocked) {
redisTemplate.delete(lockKey);
}
}
}
}
- 添加了
RedisTemplate<String, Object>對象的自動裝配,該對象用于與 Redis 進行交互。 - 聲明了一個以
messageLock:作為前綴的鎖定鍵,使用消息的唯一標識作為鎖的唯一標識。 - 使用
RedisTemplate的opsForValue().setIfAbsent()方法嘗試獲取鎖,如果成功獲取到鎖則繼續(xù)處理該消息,否則說明該消息已被其他消費者處理。 - 使用
redisTemplate.expire()設(shè)置鎖的過期時間,避免死鎖。 - 在
finally塊中,如果成功獲取到了鎖,則釋放鎖。
延遲機制
未完待續(xù)