RabbitMQ系列(三)RabbitMQ交換器Exchange介紹與實(shí)踐

RabbitMQ交換器Exchange介紹與實(shí)踐

導(dǎo)讀

有了Rabbit的基礎(chǔ)知識(shí)之后(基礎(chǔ)知識(shí)詳見(jiàn):深入解讀RabbitMQ工作原理及簡(jiǎn)單使用),本章我們重點(diǎn)學(xué)習(xí)一下Rabbit里面的exchange(交換器)的知識(shí)。

交換器分類(lèi)

RabbitMQ的Exchange(交換器)分為四類(lèi):

  • direct(默認(rèn))
  • headers
  • fanout
  • topic

其中headers交換器允許你匹配AMQP消息的header而非路由鍵,除此之外headers交換器和direct交換器完全一致,但性能卻很差,幾乎用不到,所以我們本文也不做講解。

注意:fanout、topic交換器是沒(méi)有歷史數(shù)據(jù)的,也就是說(shuō)對(duì)于中途創(chuàng)建的隊(duì)列,獲取不到之前的消息。

1、direct交換器

direct為默認(rèn)的交換器類(lèi)型,也非常的簡(jiǎn)單,如果路由鍵匹配的話,消息就投遞到相應(yīng)的隊(duì)列,如圖:

image

使用代碼:channel.basicPublish("", QueueName, null, message)推送direct交換器消息到對(duì)于的隊(duì)列,空字符為默認(rèn)的direct交換器,用隊(duì)列名稱(chēng)當(dāng)做路由鍵。

direct交換器代碼示例

發(fā)送端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 聲明隊(duì)列【參數(shù)說(shuō)明:參數(shù)一:隊(duì)列名稱(chēng),參數(shù)二:是否持久化;參數(shù)三:是否獨(dú)占模式;參數(shù)四:消費(fèi)者斷開(kāi)連接時(shí)是否刪除隊(duì)列;參數(shù)五:消息其他參數(shù)】
channel.queueDeclare(config.QueueName, false, false, false, null);
String message = String.format("當(dāng)前時(shí)間:%s", new Date().getTime());
// 推送內(nèi)容【參數(shù)說(shuō)明:參數(shù)一:交換機(jī)名稱(chēng);參數(shù)二:隊(duì)列名稱(chēng),參數(shù)三:消息的其他屬性-路由的headers信息;參數(shù)四:消息主體】
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));

接收端,持續(xù)接收消息:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 聲明隊(duì)列【參數(shù)說(shuō)明:參數(shù)一:隊(duì)列名稱(chēng),參數(shù)二:是否持久化;參數(shù)三:是否獨(dú)占模式;參數(shù)四:消費(fèi)者斷開(kāi)連接時(shí)是否刪除隊(duì)列;參數(shù)五:消息其他參數(shù)】
channel.queueDeclare(config.QueueName, false, false, false, null);
Consumer defaultConsumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {
        String message = new String(body, "utf-8"); // 消息正文
        System.out.println(workName + "收到消息 => " + message);
        channel.basicAck(envelope.getDeliveryTag(), false); // 手動(dòng)確認(rèn)消息【參數(shù)說(shuō)明:參數(shù)一:該消息的index;參數(shù)二:是否批量應(yīng)答,true批量確認(rèn)小于當(dāng)前id的消息】
    }
};
channel.basicConsume(config.QueueName, false, "", defaultConsumer);

接收端,獲取單條消息

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // 消息確認(rèn)

持續(xù)消息獲取使用:basic.consume;單個(gè)消息獲取使用:basic.get。

注意:不能使用for循環(huán)單個(gè)消息消費(fèi)來(lái)替代持續(xù)消息消費(fèi),因?yàn)檫@樣性能很低;

消息的發(fā)后既忘特性

發(fā)后既往只的是接受者不知道消息的來(lái)源是誰(shuí)發(fā)送的,如果想要指定消息的發(fā)送者,需要包含在發(fā)送內(nèi)容里面,這點(diǎn)就像我們?cè)谛偶锩孀⒚髯约旱男彰粯樱挥羞@樣才能知道發(fā)送者是誰(shuí)。

消息確認(rèn)

看了上面的代碼我們可以知道,消息接收到之后必須使用channel.basicAck()方法手動(dòng)確認(rèn)(非自動(dòng)確認(rèn)刪除模式下),那么問(wèn)題來(lái)了。

消息收到未確認(rèn)會(huì)怎么樣?

如果應(yīng)用程序接收了消息,因?yàn)閎ug忘記確認(rèn)接收的話,消息在隊(duì)列的狀態(tài)會(huì)從“Ready”變?yōu)椤癠nacked”,如圖:

image

如果消息收到卻未確認(rèn),Rabbit將不會(huì)再給這個(gè)應(yīng)用程序發(fā)送更多的消息了,這是因?yàn)镽abbit認(rèn)為你沒(méi)有準(zhǔn)備好接收下一條消息。

此條消息會(huì)一直保持Unacked的狀態(tài),直到你確認(rèn)了消息,或者斷開(kāi)與Rabbit的連接,Rabbit會(huì)自動(dòng)把消息改完Ready狀態(tài),分發(fā)給其他訂閱者。

當(dāng)然你可以利用這一點(diǎn),讓你的程序延遲確認(rèn)該消息,直到你的程序處理完相應(yīng)的業(yè)務(wù)邏輯,這樣可以有效的防治Rabbit給你過(guò)多的消息,導(dǎo)致程序崩潰。

消息確認(rèn)Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);

channel.basicAck(long deliveryTag, boolean multiple)為消息確認(rèn),參數(shù)1:消息的id;參數(shù)2:是否批量應(yīng)答,true批量確認(rèn)小于次id的消息。

總結(jié):消費(fèi)者消費(fèi)的每條消息都必須確認(rèn)。

消息拒絕

消息在確認(rèn)之前,可以有兩個(gè)選擇:

選擇1:斷開(kāi)與Rabbit的連接,這樣Rabbit會(huì)重新把消息分派給另一個(gè)消費(fèi)者;

選擇2:拒絕Rabbit發(fā)送的消息使用channel.basicReject(long deliveryTag, boolean requeue),參數(shù)1:消息的id;參數(shù)2:處理消息的方式,如果是true,Rabbib會(huì)重新分配這個(gè)消息給其他訂閱者,如果設(shè)置成false的話,Rabbit會(huì)把消息發(fā)送到一個(gè)特殊的“死信”隊(duì)列,用來(lái)存放被拒絕而不重新放入隊(duì)列的消息。

消息拒絕Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); //消息拒絕

2、fanout交換器——發(fā)布/訂閱模式

fanout有別于direct交換器,fanout是一種發(fā)布/訂閱模式的交換器,當(dāng)你發(fā)送一條消息的時(shí)候,交換器會(huì)把消息廣播到所有附加到這個(gè)交換器的隊(duì)列上。

比如用戶(hù)上傳了自己的頭像,這個(gè)時(shí)候圖片需要清除緩存,同時(shí)用戶(hù)應(yīng)該得到積分獎(jiǎng)勵(lì),你可以把這兩個(gè)隊(duì)列綁定到圖片上傳的交換器上,這樣當(dāng)有第三個(gè)、第四個(gè)上傳完圖片需要處理的需求的時(shí)候,原來(lái)的代碼可以不變,只需要添加一個(gè)訂閱消息即可,這樣發(fā)送方和消費(fèi)者的代碼完全解耦,并可以輕而易舉的添加新功能了。

和direct交換器不同,我們?cè)诎l(fā)送消息的時(shí)候新增channel.exchangeDeclare(ExchangeName, "fanout"),這行代碼聲明fanout交換器。

發(fā)送端:

final String ExchangeName = "fanoutec"; // 交換器名稱(chēng)
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 聲明fanout交換器
String message = "時(shí)間:" + new Date().getTime();
channel.basicPublish(ExchangeName, "", null, message.getBytes("UTF-8"));

接受消息不同于direct,我們需要聲明fanout路由器,并使用默認(rèn)的隊(duì)列綁定到fanout交換器上。

接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 聲明fanout交換器
String queueName = channel.queueDeclare().getQueue(); // 聲明隊(duì)列
channel.queueBind(queueName, ExchangeName, "");
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
    }
};
channel.basicConsume(queueName, true, consumer);

fanout和direct的區(qū)別最多的在接收端,fanout需要綁定隊(duì)列到對(duì)應(yīng)的交換器用于訂閱消息。

其中channel.queueDeclare().getQueue()為隨機(jī)隊(duì)列,Rabbit會(huì)隨機(jī)生成隊(duì)列名稱(chēng),一旦消費(fèi)者斷開(kāi)連接,該隊(duì)列會(huì)自動(dòng)刪除。

注意:對(duì)于fanout交換器來(lái)說(shuō)routingKey(路由鍵)是無(wú)效的,這個(gè)參數(shù)是被忽略的。

3、topic交換器——匹配訂閱模式

最后介紹的是topic交換器,topic交換器運(yùn)行和fanout類(lèi)似,但是可以更靈活的匹配自己想要訂閱的信息,這個(gè)時(shí)候routingKey路由鍵就排上用場(chǎng)了,使用路由鍵進(jìn)行消息(規(guī)則)匹配。

假設(shè)我們現(xiàn)在有一個(gè)日志系統(tǒng),會(huì)把所有日志級(jí)別的日志發(fā)送到交換器,warning、log、error、fatal,但我們只想處理error以上的日志,要怎么處理?這就需要使用topic路由器了。

topic路由器的關(guān)鍵在于定義路由鍵,定義routingKey名稱(chēng)不能超過(guò)255字節(jié),使用“.”作為分隔符,例如:com.mq.rabbit.error。

消費(fèi)消息的時(shí)候routingKey可以使用下面字符匹配消息:

  • "*"可以匹配所有內(nèi)容;
  • "#"匹配0和多個(gè)字符;

例如發(fā)布了一個(gè)“com.mq.rabbit.error”的消息:

能匹配上的路由鍵:

  • cn.mq.rabbit.*

  • cn.mq.rabbit.#

  • #.error

  • cn.mq.#

  • #

不能匹配上的路由鍵:

  • cn.mq.*
  • *.error
  • *

所以如果想要訂閱所有消息,可以使用“#”匹配。

注意:fanout、topic交換器是沒(méi)有歷史數(shù)據(jù)的,也就是說(shuō)對(duì)于中途創(chuàng)建的隊(duì)列,獲取不到之前的消息。

發(fā)布端:

String routingKey = "com.mq.rabbit.error";
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // 聲明topic交換器
String message = "時(shí)間:" + new Date().getTime();
channel.basicPublish(ExchangeName, routingKey, null, message.getBytes("UTF-8"));

接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // 聲明topic交換器
String queueName = channel.queueDeclare().getQueue(); // 聲明隊(duì)列
String routingKey = "#.error";
channel.queueBind(queueName, ExchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(routingKey + "|接收消息 => " + message);
    }
};
channel.basicConsume(queueName, true, consumer);

擴(kuò)展部分—自定義線程池

如果需要更大的控制連接,用戶(hù)可自己設(shè)置線程池,代碼如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

其實(shí)看過(guò)源碼的同學(xué)可能知道,factory.newConnection本身默認(rèn)也有線程池的機(jī)制,ConnectionFactory.class部分源碼如下:

private ExecutorService sharedExecutor;
public Connection newConnection() throws IOException, TimeoutException {
        return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())));
}
public void setSharedExecutor(ExecutorService executor) {
        this.sharedExecutor = executor;
}

其中this.sharedExecutor就是默認(rèn)的線程池,可以通過(guò)setSharedExecutor()方法設(shè)置ConnectionFactory的線程池,如果不設(shè)置則為null。

用戶(hù)如果自己設(shè)置了線程池,像本小節(jié)第一段代碼寫(xiě)的那樣,那么當(dāng)連接關(guān)閉的時(shí)候,不會(huì)自動(dòng)關(guān)閉用戶(hù)自定義的線程池,所以用戶(hù)必須自己手動(dòng)關(guān)閉,通過(guò)調(diào)用shutdown()方法,否則可能會(huì)阻止JVM的終止。

官方的建議是只有在程序出現(xiàn)嚴(yán)重性能瓶頸的時(shí)候,才應(yīng)該考慮使用此功能。

項(xiàng)目地址

GitHub:https://github.com/vipstone/rabbitmq-java.git

image
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本章我們重點(diǎn)學(xué)習(xí)一下Rabbit里面的exchange(交換器)的知識(shí)。 交換器分類(lèi) RabbitMQ的Excha...
    Java大生閱讀 415評(píng)論 0 1
  • 一、前言 RabbitMQ是一個(gè)開(kāi)源的消息代理軟件(面向消息的中間件),它的核心作用就是創(chuàng)建消息隊(duì)列,異步接收和發(fā)...
    Java中文社群_老王閱讀 1,022評(píng)論 0 50
  • 什么叫消息隊(duì)列? 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡(jiǎn)單,比如只包含文本字符串,也可以更復(fù)...
    Agile_dev閱讀 2,442評(píng)論 0 24
  • 今晚沒(méi)有作業(yè),都是回家復(fù)習(xí),明天全部考試,家長(zhǎng)們的心比孩子都激動(dòng),今晚接回來(lái)和兒子復(fù)習(xí)了一下,和他說(shuō)了一下考試注意...
    2018級(jí)高佳浩媽媽閱讀 313評(píng)論 0 1
  • 2019年12月3日,2:30報(bào)告會(huì)開(kāi)始,2:00我就到達(dá)了會(huì)場(chǎng),期待聽(tīng)到專(zhuān)家的報(bào)告,讓自己有所改變,聽(tīng)畢...
    031a025ff3e6閱讀 376評(píng)論 0 1

友情鏈接更多精彩內(nèi)容