RabbitMQ整理

kafka與RabbitMQ的區(qū)別?
1、確認機制不同
在RabbitMQ中,消息確認是指生產(chǎn)者發(fā)送消息到RabbitMQ后,等待RabbitMQ返回確認信息,確認消息已經(jīng)被正確地接收并處理。通常情況下,【消息確認是同步進行的】即必須等到 【消息隊列】 接收到和處理了消息后,才會返回確認信息。這樣可以確保消息隊列完全處理了消息,避免消息的丟失,保證了數(shù)據(jù)的一致性和可靠性。
而在Kafka中,為了保證高吞吐率,沒有提供消息確認機制(at-most-once和at-least-once除外),數(shù)據(jù)是異步推送到broker中的,一旦生產(chǎn)者把消息推送到broker后,就不會再對該消息進行確認處理,消費者需要自己對消息的處理情況進行確認。在Kafka中,也有一些同步的模式(例如producer.send()方法中的同步調(diào)用),可以確保發(fā)送的消息被同步復制到Kafka的所有副本,并在接收到確認后才會返回。

2、實施部署不同
RabbitMQ 穩(wěn)定可靠,數(shù)據(jù)?致,?持多協(xié)議,有消息確認,基于erlang語?
Kafka ?吞吐,?性能,快速持久化,?消息確認,?消息遺漏,可能會有有重復消息,依賴于zookeeper,成本高,“成本高”的含義是指 Kafka 部署和運維所需要的成本相比 RabbitMQ更高需要更加復雜的基礎(chǔ)設(shè)施來支持它的運行,這些基礎(chǔ)設(shè)施包括:
一、 集群環(huán)境成本高:Kafka 集群需要依賴 ZooKeeper 來實現(xiàn)節(jié)點的選舉和協(xié)調(diào)工作,而 ZooKeeper 集群的部署和配置都比 RabbitMQ 更加復雜,需要更多的服務(wù)器資源和維護成本
二. 數(shù)據(jù)持久化成本高:Kafka 依賴磁盤的高速讀寫特性來實現(xiàn)其快速的消息持久化,因此需要更高的成本來維護磁盤的健康狀態(tài)和數(shù)據(jù)備份和恢復等工作。

總的來說,Kafka 的高吞吐、高性能的特點是與高成本密切相關(guān)的,如果一個應(yīng)用場景對性能要求很高,且具備一定的技術(shù)架構(gòu)能力,那么可以選擇使用 Kafka。而如果應(yīng)用場景對可靠性和穩(wěn)定性要求較高,對【性能要求不太高,且業(yè)務(wù)規(guī)模不太大 ,那么選擇 RabbitMQ 更為合適。

邏輯結(jié)構(gòu):
用一棟大樓類比說明
1、Broker 是指 RabbitMQ 服務(wù)器實例,并根據(jù)預(yù)先定義的規(guī)則將消息發(fā)送到合適的隊列或 Exchange
2、exchange是 RabbitMQ 中用于接收和分發(fā)消息的中間組件,它負責接收來自生產(chǎn)者的消息,并將其路由到與之相關(guān)的1個或多個隊列 (生產(chǎn)者到交換機)
3、隊列:房間是存放消息的地方(每個房間)
4、綁定:綁定規(guī)定了交換機和隊列之間的連接(樓層之間的通道)
5、虛擬主機:虛擬主機則是 RabbitMQ 中用于【邏輯隔離】的重要概念,它類似于一個命名空間(每層樓)
可以在一個 Broker 內(nèi)部創(chuàng)建多個不同的虛擬主機,每個虛擬主機都有自己的命名空間,并且可以獨立配置和管理其中Exchange、Queue、Binding、用戶權(quán)限等

在 RabbitMQ 中是可以設(shè)置多個交換機的。通過設(shè)置不同類型的交換機以及它們之間的綁定關(guān)系,可以實現(xiàn)復雜的消息路由和處理邏輯
交換機類型和工作模式都涉及消息在 RabbitMQ 中的傳遞和處理,但它們的焦點和作用有所不同。交換機類型主要關(guān)注消息的路由和分發(fā),決定消息如何被發(fā)送到隊列中;而工作模式主要關(guān)注消費者對消息的接收和處理方式,決定消息如何被消費

消息發(fā)送模式:
1、簡單模式:一個隊列只有一個消費者
2、工作模式:多個消費者監(jiān)聽同一個隊列,但消費者中只有一個消費者會成功消費消息(負載均衡)
3、發(fā)布/訂閱模式:一個交換機綁定多個隊列,消息同時被所有隊列消費(集體通知)
4、路由模式:它通過在交換機和隊列之間建立綁定關(guān)系,并定義路由鍵來實現(xiàn)消息的有選擇性地路由傳遞(訂閱廣告)

有一個即時消息系統(tǒng)或者新聞訂閱系統(tǒng)需要將消息或新聞推送給多個用戶,這時可以采用發(fā)布/訂閱模式。生產(chǎn)者將消息發(fā)送到交換機 (exchange),交換機將消息廣播給多個訂閱者,訂閱者通過綁定交換機來接收消息。在這種模式下,每個訂閱者都會接收到相同的消息,可以采用不同的隊列來對訂閱者進行分組,以實現(xiàn)訂閱者之間的隔離和不同級別的消息推送

  • 工作模式和發(fā)布訂閱模式的區(qū)別?
    在工作模式中,多個消費者可以并行地處理來自同一個隊列的任務(wù)
    每個消息只能由一個消費者處理,但是多個消息可以同時被多個消費者處理
    在發(fā)布訂閱模式中,雖然消息會被廣播到多個訂閱者,但是每個訂閱者獨立地接收和處理消息。因此,不同的訂閱者可以并行地處理自己接收到的消息

  • 路由模式和發(fā)布訂閱模式的區(qū)別?
    發(fā)布訂閱Fanout(廣播)模式:在 Fanout 模式中,交換機會將消息廣播到綁定的所有隊列。無論隊列是否有不同的路由鍵或其他綁定條件,交換機都會將消息傳遞給所有綁定的隊列。
    路由Redirect(重定向)模式:在 Redirect 模式中,交換機會將消息發(fā)送到1個特定的隊列而不是廣播到所有綁定的隊列。這種模式適用于需要將消息定向發(fā)送到1個特定的隊列

創(chuàng)建用戶和創(chuàng)建交換機
創(chuàng)建用戶:創(chuàng)建用戶名、密碼
創(chuàng)建交換機:
1、選擇虛擬主機
2、選擇交換機類型
3、持久性……
在創(chuàng)建交換機時,durability 是一個可選的參數(shù),用于配置交換機的持久性。持久性指的是當消息代理(如 RabbitMQ)重新啟動時,交換機是否仍然存在。如果將 durability 設(shè)置為 true,交換機將被標記為持久的,即使消息代理重新啟動,交換機也將保留。這意味著在消息代理恢復正常運行后,交換機將保持不變
如果你的應(yīng)用程序需要頻繁地更改或刪除交換機,并且你不希望在每次更改時手動刪除或重新創(chuàng)建交換機,那么可以選擇非持久化交換機
4、交換機綁定隊列
發(fā)布消息并設(shè)置消息屬性
在 RabbitMQ 的 Java 客戶端 API 中,可以在發(fā)布消息時設(shè)置這些消息屬性。
下面是一個示例代碼,展示了如何設(shè)置消息的持久性、優(yōu)先級和過期時間

// 創(chuàng)建連接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 創(chuàng)建消息屬性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)  // 設(shè)置消息的持久性
        .priority(5)  // 設(shè)置消息的優(yōu)先級
        .expiration("10000")  // 設(shè)置消息的過期時間
        .build();

// 發(fā)布消息
String exchangeName = "exchange1";
String routingKey = "routingKey1";
String message = "Hello, RabbitMQ!";
channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());

// 關(guān)閉連接和通道
channel.close();
connection.close();

在上述代碼中,首先創(chuàng)建了連接和通道。然后使用 AMQP.BasicProperties.Builder 類來創(chuàng)建消息屬性對象,通過鏈式調(diào)用方法來設(shè)置屬性,例如使用 deliveryMode 方法設(shè)置消息的持久性(1非持久2持久),使用 priority 方法設(shè)置消息的優(yōu)先級,使用 expiration 方法設(shè)置消息的過期時間。最后,調(diào)用 basicPublish 方法發(fā)布消息時,將消息屬性對象作為參數(shù)傳遞,即可在消息中包含相應(yīng)的屬性。

使用RabbitMQ傳遞對象
發(fā)送和接收的都是字符串/字節(jié)數(shù)組類型的消息

  • 使用序列化對象

消息提供者

@Service
public class MQService {
   @Resource
   private AmqpTemplate amqpTemplate;

   public void sendGoodsToMq(Goods goods){
   //消息隊列可以發(fā)送 字符串、字節(jié)數(shù)組、序列化對象
   byte[] bytes = SerializationUtils.serialize(goods);
   amqpTemplate.convertAndSend("","queue1",bytes);
   }
}

消息消費者

@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {

   @RabbitHandler
   public void receiveMsg(byte[] bs){
   Goods goods = (Goods) SerializationUtils.deserialize(bs);
   System.out.println("byte[]---"+goods);
   }
}

PS:為什么要實現(xiàn)序列化接口?
1、易于傳輸和存儲:將 Java 對象序列化為字節(jié)數(shù)組后,可以將其轉(zhuǎn)換為不同的格式,例如 JSON 或 XML,用于傳輸?shù)讲煌南到y(tǒng)之間。字節(jié)數(shù)組還可以在數(shù)據(jù)庫中進行持久化存儲,以便以后檢索和使用。
2、平臺無關(guān)性:通過使用序列化對象,可以將 Java 對象轉(zhuǎn)換為平臺無關(guān)的字節(jié)序列,這樣它們可以在不同的系統(tǒng)之間進行傳輸和存儲,包括跨語言和跨平臺的場景。
3、可擴展性:可以在對象中添加新字段和屬性,并保證可以與舊版本兼容,以便在不同版本的應(yīng)用程序之間進行傳輸和存儲。這可以通過維護對象的序列化版本來實現(xiàn)。
4、高效性:通過序列化對象,可以減少在網(wǎng)絡(luò)或磁盤 I/O 期間傳輸數(shù)據(jù)的大小,從而提高傳輸效率并減少網(wǎng)絡(luò)流量。這是因為序列化過程通常會去除對象中的一些額外信息或元數(shù)據(jù),只保留必要的數(shù)據(jù)以及用于恢復對象的必要信息。

  • 使用JSON字符串傳遞

消息提供者

@Service
public class MQService {
   @Resource
   private AmqpTemplate amqpTemplate;
   public void sendGoodsToMq(Goods goods) throws 
JsonProcessingException {
    //消息隊列可以發(fā)送 字符串、字節(jié)數(shù)組、序列化對象
    ObjectMapper objectMapper = new ObjectMapper();
    String msg = objectMapper.writeValueAsString(goods);
    amqpTemplate.convertAndSend("","queue1",msg);
  }
}

RabbitMQ事務(wù)、消息確認和return機制、手動ACK
RabbitMQ事務(wù)指的是基于客戶端實現(xiàn)的事務(wù)管理,當在消息發(fā)送過程中添加了事務(wù),處理效率降低幾十倍甚至上百倍

Connection connection = RabbitMQUtil.getConnection(); //connection 表
示與 host1的連接

Channel channel = connection.createChannel();

channel.txSelect(); //開啟事務(wù)

try{
 channel.basicPublish("ex4", "k1", null, msg.getBytes());
 channel.txCommit(); //提交事務(wù)

}catch (Exception e){
 channel.txRollback(); //事務(wù)回滾

}finally{
 channel.close();
 connection.close();
}

同步等待:為了實現(xiàn)事務(wù)的原子性,事務(wù)提交是一個同步操作,即發(fā)送端會等待事務(wù)提交的結(jié)果,只有在返回提交成功的響應(yīng)后才會繼續(xù)發(fā)送下一條消息。
重復操作:如果發(fā)送端在發(fā)送消息的過程中出現(xiàn)錯誤或異常,事務(wù)會回滾并且消息會重發(fā),這種方式可以保證消息的可靠性傳遞
所以,在實際場景中,如果對消息的實時性要求較高或?qū)ο⑻幚淼耐掏铝坑休^高要求,建議盡量避免使用事務(wù)

PS:消息吞吐量(Message Throughput):消息吞吐量指的是系統(tǒng)在單位時間內(nèi)處理的消息量,對于需要高吞吐量的應(yīng)用來說,高效地傳遞和處理消息是非常重要的。使用事務(wù)機制會對消息發(fā)送的性能造成一定的影響,因為事務(wù)機制需要等待事務(wù)提交的確認,會增加發(fā)送消息的延遲。

事務(wù)如何確保消息發(fā)送的可靠性?
消息的可靠性:從 生產(chǎn)者發(fā)送消息 —— 消息隊列存儲消息 —— 消費者消費消息的整個過程中消息的安全性及可控性

RabbitMQ提供了消息確認機制及return機制
意義:消息確認機制可以幫助發(fā)送方確保消息成功發(fā)送到交換機,并可以跟蹤消息是否被一個或多個消費者成功消費。Return 機制則是確保了消息成功從交換機分發(fā)到隊列的過程,當消息無法成功路由到隊列時,消息會被 Return給消息提供者

  • 消息確認機制:當消息提供者將消息發(fā)送到交換機時,交換機會對消息提供者進行反饋(到達交換機)
  • return機制:當消息達到交換機之后,交換機會將消息分發(fā)到隊列,MQ會將分發(fā)的結(jié)果也會反饋給消息提供者(到達隊列)

在 RabbitMQ 中使用消息確認機制可以確保消息被成功發(fā)送到交換機,并被一個或多個消費者成功消費。

RabbitMQ 提供了同步和異步兩種消息確認方式

同步消息確認指的是,在消息發(fā)送之后,發(fā)送方會阻塞等待確認結(jié)果。具體來說,發(fā)送方通過 waitForConfirms() 方法等待 RabbitMQ 發(fā)送確認結(jié)果,直到接收到確認結(jié)果或者超時時間到達。如果確認結(jié)果為消息成功發(fā)送到交換機并被一個或多個消費者消費,則 waitForConfirms()方法返回 true,否則返回 false。

異步消息確認指的是,在消息發(fā)送之后,發(fā)送方不會阻塞等待確認結(jié)果。相反,它通過添加ConfirmListener來異步處理確認回調(diào)。確認回調(diào)會在消息成功發(fā)送到交換機并被一個或多個消費者消費或者發(fā)送失敗時觸發(fā)。

異步消息確認相較于同步消息確認,其對消息發(fā)送方更加友好。使用異步確認的優(yōu)點是可以提高發(fā)送消息的吞吐量,因為發(fā)送方能夠繼續(xù)發(fā)送下一條消息,而不需要等待每條消息的確認結(jié)果。另外,異步確認還可以通過監(jiān)聽確認回調(diào)在消息發(fā)送失敗時及時發(fā)現(xiàn)并處理這種情況,而同步確認則只能通過超時等待的方式判斷消息是否發(fā)送成功。

普通maven項目的消息確認機制
下面是 RabbitMQ 中 Java 客戶端使用消息確認和批量發(fā)送消息的示例代碼,以及使用確認監(jiān)聽器異步處理確認結(jié)果:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MessageSender {
    private static final String EXCHANGE_NAME = "ex1";
    private static final String ROUTING_KEY = "a";
    private static final String MESSAGE = "Hello, RabbitMQ!";
    private static final int MESSAGE_COUNT = 10;

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //開啟消息確Z
        channel.confirmSelect();

        //批量發(fā)送消息
        for (int i=0 ; i<MESSAGE_COUNT ; i++) {
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());
        }、
        //假如發(fā)送消息需要10s,waitForConfirms會進?阻塞狀態(tài)(同步)
        //boolean b = channel.waitForConfirms();

        //使用監(jiān)聽器異步處理確認結(jié)果
        channel.addConfirmListener(new ConfirmListener() {
            //參數(shù)1:long deliveryTag 返回消息的標識
            //參數(shù)2:boolean multiple 是否為批量confirm
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("~~~~~消息成功發(fā)送到交換機");
            }

            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("~~~~~消息發(fā)送到交換機失敗");
            }
        });
    }
}

當調(diào)用 channel.waitForConfirms() 時,它會阻塞當前線程,直到 RabbitMQ 確認消息是否成功發(fā)送到隊列或交換機。
換句話說,在以下情況下,waitForConfirms() 會阻塞程序繼續(xù)執(zhí)行:

  • 當你發(fā)送一條消息后,還沒有收到 RabbitMQ 的確認消息;
  • 當發(fā)送消息時發(fā)生異常,如網(wǎng)絡(luò)中斷或發(fā)送超時;
  • 當你設(shè)置了批量發(fā)送消息的模式,并且尚未收到足夠數(shù)量的消息確認,或者超過了設(shè)置的超時時間。
    簡而言之,waitForConfirms() 會等待確認消息的返回,確保消息成功發(fā)送到 RabbitMQ。
    消息確認機制不光監(jiān)聽成功的消息同時也監(jiān)聽失敗的消息

spring項目的return機制

@Component
public class MyReturnListener implements RabbitTemplate.ReturnsCallback {
 @Autowired
 private AmqpTemplate amqpTemplate;

 @Autowired
 private RabbitTemplate rabbitTemplate;

 @PostConstruct
 public void init(){
   rabbitTemplate.setReturnsCallback(this);
 }
 @Override
  public void returnedMessage(ReturnedMessage returnedMessage) {
     System.out.println("消息從交換機分發(fā)到隊列失敗");
     String exchange = returnedMessage.getExchange();
     String routingKey = returnedMessage.getRoutingKey();
     String msg = returnedMessage.getMessage().toString();
     amqpTemplate.convertAndSend(exchange,routingKey,msg);
  }
}

在上述代碼中,returnedMessage方法是用于處理消息從交換機分發(fā)到隊列失敗的回調(diào)方法。當消息無法路由到指定的隊列時,會觸發(fā)該方法。
下面是代碼中各個參數(shù)的含義:
returnedMessage對象:表示被退回的消息的相關(guān)信息。
getExchange():獲取退回消息所使用的交換機名稱。
getRoutingKey():獲取退回消息時所使用的路由鍵。
getMessage():獲取退回的消息對象,可以通過toString()方法將其轉(zhuǎn)換為字符串。
在該方法中,它首先打印了一條消息提示,表示消息從交換機分發(fā)到隊列失敗。然后,通過 amqpTemplate.convertAndSend(exchange,routingKey, msg) 方法將消息重新發(fā)送到指定的交換機和路由鍵,以便重新分發(fā)消息

RabbitMQ消費者手動應(yīng)答(手動ACK)

@Component
@RabbitListener(queues = "queue01")
public class Consumer1 {

    @RabbitHandler
    public void process(String msg, Channel channel, Message message) throws IOException {
        try {
            System.out.println("成功接收到消息 msg = " + msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            System.err.println("接收消息失敗 msg = " + msg);
        }
    }
}

在上述代碼中,channel.basicAck 是用于手動確認消息的方法,用于告知 RabbitMQ 已經(jīng)成功處理(消費)了一個消息。下面是 channel.basicAck 方法的各個參數(shù)的詳細說明:

  1. deliveryTag:這是一個消息的唯一標識符。每個消息都有一個獨特的 deliveryTag,用于標識消息的順序。通過 message.getMessageProperties().getDeliveryTag() 可以獲取當前處理消息的 deliveryTag。
    PS:每條消息在 RabbitMQ 中都會被分配一個唯一deliveryTag,用于標識消息的順序。deliveryTag 是一個連續(xù)遞增的正整數(shù),每次消費一條消息時,deliveryTag 會被分配并隨消息一起傳遞給消費者。消費者在處理完一條消息后,需要對該消息進行手動 ACK 才能告知 RabbitMQ 已經(jīng)成功處理了該消息,此時會將該消息的 deliveryTag 返回給 RabbitMQ 作為確認

  2. multiple:這是一個布爾值,用于指定是否確認多個消息。對于簡單地確認單個消息,可以將其設(shè)置為 false。如果要確認多個消息,可以將其設(shè)置為 true,這將確認所有比當前 deliveryTag 小(包括當前 deliveryTag)的未確認消息。
    PS:在消息隊列系統(tǒng)中,通常會有多個消息同時被發(fā)送到隊列中等待消費。當消費者處理消息時,通常會以一批消息的方式進行確認,而非逐條確認。因此,multiple參數(shù)的存在是為了提供一種批量確認消息的機制

    在這種情況下,如果設(shè)置為 true,則表示當前消息之前的所有消息也被認為已經(jīng)處理完成,全部移除。如果設(shè)置為 false,則僅確認當前 deliveryTag 的消息,然后將其從隊列中移除。通常設(shè)置為false。

  3. requeue:這是一個布爾值,用于指定當確認消息時,是否將消息重新加入到隊列。如果設(shè)置為 false,則代表不重新加入隊列,即認為消息已被完全處理。如果設(shè)置為 true,則代表將消息重新加入隊列,以便重新被消費。通常情況下,如果消息處理成功,則設(shè)置為 false,如果消息處理失敗,則設(shè)置為 true。

在上述代碼中,第一個 channel.basicAck 用于確認消息已經(jīng)成功處理,同時使用了 message.getMessageProperties().getDeliveryTag() 來獲取當前消息的 deliveryTag 值。
而第二個 channel.basicAck 是在處理消息出現(xiàn)異常時的回退機制。它使用與第一個 channel.basicAck 相同的參數(shù),以確認消息處理失敗,然后將消息重新加入隊列(requeue 設(shè)置為 true),以便重新消費。同時,使用了 message.getMessageProperties().getDeliveryTag() 獲取當前消息的 deliveryTag 值。

手動ACK的意義體現(xiàn)在以下幾個方面:
1、確??煽啃裕合M者在處理消息時,可能會發(fā)生異常或出現(xiàn)錯誤的情況。如果沒有手動ACK機制,當消費者在處理消息過程中出現(xiàn)異常時,消息隊列將無法得知消息是否被成功處理,可能會導致消息的丟失重復消費。手動ACK可以確保消費者在處理消息成功后再發(fā)送確認,從而確保消息的可靠性。
2、提高消息處理效率:手動ACK機制可以確保消息隊列在收到消費者的ACK確認后,【將該消息從隊列中移除】,從而減少隊列的負荷。通過手動ACK,消費者可以在處理完一條消息后立即發(fā)送確認,減少了消息在隊列中滯留的時間,提高了消息的處理效率

手動ACK與消息確認機制和return機制之間的區(qū)別?
手動 ACK 與消息確認機制:手動 ACK 是消費者在處理完一條消息后,向 RabbitMQ 發(fā)送確認信號,告知 RabbitMQ 已成功處理該消息的機制。通過手動 ACK,消費者可以明確地通知 RabbitMQ 消息已經(jīng)被處理,避免消息丟失重復消費的情況發(fā)生,手動ACK是消費端的機制。消息確認機制則是生產(chǎn)者端的機制,用于確認消息已經(jīng)被發(fā)送到交換機并被消費者接收,確保消息的可靠傳遞。手動 ACK 和消息確認機制共同保證了消息在生產(chǎn)者和消費者之間的可靠傳遞。

手動 ACK 與消息返回機制:消息返回機制會在消息無法被路由到隊列時將消息返回給生產(chǎn)者。在這種情況下,消費者可能無法接收到消息,因此消費者不會發(fā)送 ACK。手動 ACK 可以確保消費者在成功處理消息后再發(fā)送確認信號,而不是直接丟棄未處理的消息。通過手動 ACK,消費者能夠在實際處理消息后再確認消息的處理情況,提高了消息傳遞的完整性和可靠性

消息的冪等性問題
要在 RabbitMQ 中實現(xiàn)冪等性,可以使用 Redis 的 SETNX`(Set if Not Exists)命令來進行輔助。

  • 意義:
    確保消息被處理一次而不會重復處理
    確保消息處理的唯一性和正確性

SETNX 是 Redis 提供的一種原子性命令,用于設(shè)置指定鍵的值,僅當該鍵不存在時才會設(shè)置成功。在 RabbitMQ 的消息處理中,可以將消息的唯一標識或關(guān)鍵信息作為鍵,將已處理消息的標記作為值存儲在 Redis 中,并利用 SETNX 來保證只有第一次處理消息時才會成功設(shè)置標記,從而保證消息的冪等性

下面是一個示例代碼:

@Component
@RabbitListener(queues = "queue01")
public class Consumer1 {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private final String LOCK_PREFIX = "messageLock:";
    private final long LOCK_EXPIRE_TIME = 60 * 1000; // 鎖的過期時間,單位毫秒

    @RabbitHandler
    public void process(String msg, Channel channel, Message message) throws IOException {
        // 使用消息的唯一標識作為鎖的 key
        String lockKey = LOCK_PREFIX + message.getMessageProperties().getDeliveryTag();
        Boolean isLocked = false;

        try {
            // 嘗試獲取鎖
            isLocked = redisTemplate.opsForValue().setIfAbsent(lockKey, "");
            // 設(shè)置鎖的過期時間
            redisTemplate.expire(lockKey, LOCK_EXPIRE_TIME, TimeUnit.MILLISECONDS);

            // 如果成功獲取到了鎖,則處理消息
            if (isLocked) {
                System.out.println("成功獲取到鎖并處理消息 msg = " + msg);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                System.out.println("未獲取到鎖,消息已被其他消費者處理 msg = " + msg);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            System.err.println("處理消息失敗 msg = " + msg);
        } finally {
            // 釋放鎖
            if (isLocked) {
                redisTemplate.delete(lockKey);
            }
        }
    }
}
  1. 添加了 RedisTemplate<String, Object> 對象的自動裝配,該對象用于與 Redis 進行交互。
  2. 聲明了一個以 messageLock: 作為前綴的鎖定鍵,使用消息的唯一標識作為鎖的唯一標識。
  3. 使用 RedisTemplateopsForValue().setIfAbsent() 方法嘗試獲取鎖,如果成功獲取到鎖則繼續(xù)處理該消息,否則說明該消息已被其他消費者處理。
  4. 使用 redisTemplate.expire() 設(shè)置鎖的過期時間,避免死鎖。
  5. finally 塊中,如果成功獲取到了鎖,則釋放鎖。

延遲機制
未完待續(xù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 悲觀鎖 有鎖,每次只有一個人能訪問某個數(shù)據(jù),這和Golang中的互斥鎖一樣。 如果是在分布式下使用鎖,可以使用如 ...
    bysir閱讀 626評論 0 1
  • 什么是冪等性? HTTP/1.1中對冪等性的定義是:一次和多次請求某一個資源對于資源本身應(yīng)該具有同樣的結(jié)果(網(wǎng)絡(luò)超...
    侯華明閱讀 1,479評論 0 0
  • 一分鐘教你知道樂觀鎖和悲觀鎖的區(qū)別 悲觀鎖(Pessimistic Lock), 顧名思義,就是很悲觀,每次去拿數(shù)...
    php紅薯閱讀 4,645評論 2 58
  • 樂觀鎖和悲觀鎖 樂觀鎖:使用無鎖結(jié)構(gòu),無非是對發(fā)生沖突保有樂觀態(tài)度,覺得大多數(shù)情況下沖突不會發(fā)生,一旦發(fā)生就采取重...
    L千年老妖閱讀 2,516評論 0 0
  • 在數(shù)據(jù)庫的鎖機制中介紹過,數(shù)據(jù)庫管理系統(tǒng)(DBMS)中的并發(fā)控制的任務(wù)是確保在多個事務(wù)同時存取數(shù)據(jù)庫中同一數(shù)據(jù)時不...
    java成功之路閱讀 311評論 0 1

友情鏈接更多精彩內(nèi)容