RabbitMQ交換器Exchange介紹與實(shí)踐
導(dǎo)讀
有了Rabbit的基礎(chǔ)知識(shí)之后(基礎(chǔ)知識(shí)詳見(jiàn):深入解讀RabbitMQ工作原理及簡(jiǎn)單使用),本章我們重點(diǎn)學(xué)習(xí)一下Rabbit里面的exchange(交換器)的知識(shí)。
交換器分類(lèi)
RabbitMQ的Exchange(交換器)分為四類(lèi):
- direct(默認(rèn))
- headers
- fanout
- topic
其中headers交換器允許你匹配AMQP消息的header而非路由鍵,除此之外headers交換器和direct交換器完全一致,但性能卻很差,幾乎用不到,所以我們本文也不做講解。
注意:fanout、topic交換器是沒(méi)有歷史數(shù)據(jù)的,也就是說(shuō)對(duì)于中途創(chuàng)建的隊(duì)列,獲取不到之前的消息。
1、direct交換器
direct為默認(rèn)的交換器類(lèi)型,也非常的簡(jiǎn)單,如果路由鍵匹配的話,消息就投遞到相應(yīng)的隊(duì)列,如圖:

使用代碼:channel.basicPublish("", QueueName, null, message)推送direct交換器消息到對(duì)于的隊(duì)列,空字符為默認(rèn)的direct交換器,用隊(duì)列名稱(chēng)當(dāng)做路由鍵。
direct交換器代碼示例
發(fā)送端:
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 聲明隊(duì)列【參數(shù)說(shuō)明:參數(shù)一:隊(duì)列名稱(chēng),參數(shù)二:是否持久化;參數(shù)三:是否獨(dú)占模式;參數(shù)四:消費(fèi)者斷開(kāi)連接時(shí)是否刪除隊(duì)列;參數(shù)五:消息其他參數(shù)】
channel.queueDeclare(config.QueueName, false, false, false, null);
String message = String.format("當(dāng)前時(shí)間:%s", new Date().getTime());
// 推送內(nèi)容【參數(shù)說(shuō)明:參數(shù)一:交換機(jī)名稱(chēng);參數(shù)二:隊(duì)列名稱(chēng),參數(shù)三:消息的其他屬性-路由的headers信息;參數(shù)四:消息主體】
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
接收端,持續(xù)接收消息:
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 聲明隊(duì)列【參數(shù)說(shuō)明:參數(shù)一:隊(duì)列名稱(chēng),參數(shù)二:是否持久化;參數(shù)三:是否獨(dú)占模式;參數(shù)四:消費(fèi)者斷開(kāi)連接時(shí)是否刪除隊(duì)列;參數(shù)五:消息其他參數(shù)】
channel.queueDeclare(config.QueueName, false, false, false, null);
Consumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "utf-8"); // 消息正文
System.out.println(workName + "收到消息 => " + message);
channel.basicAck(envelope.getDeliveryTag(), false); // 手動(dòng)確認(rèn)消息【參數(shù)說(shuō)明:參數(shù)一:該消息的index;參數(shù)二:是否批量應(yīng)答,true批量確認(rèn)小于當(dāng)前id的消息】
}
};
channel.basicConsume(config.QueueName, false, "", defaultConsumer);
接收端,獲取單條消息
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // 消息確認(rèn)
持續(xù)消息獲取使用:basic.consume;單個(gè)消息獲取使用:basic.get。
注意:不能使用for循環(huán)單個(gè)消息消費(fèi)來(lái)替代持續(xù)消息消費(fèi),因?yàn)檫@樣性能很低;
消息的發(fā)后既忘特性
發(fā)后既往只的是接受者不知道消息的來(lái)源是誰(shuí)發(fā)送的,如果想要指定消息的發(fā)送者,需要包含在發(fā)送內(nèi)容里面,這點(diǎn)就像我們?cè)谛偶锩孀⒚髯约旱男彰粯樱挥羞@樣才能知道發(fā)送者是誰(shuí)。
消息確認(rèn)
看了上面的代碼我們可以知道,消息接收到之后必須使用channel.basicAck()方法手動(dòng)確認(rèn)(非自動(dòng)確認(rèn)刪除模式下),那么問(wèn)題來(lái)了。
消息收到未確認(rèn)會(huì)怎么樣?
如果應(yīng)用程序接收了消息,因?yàn)閎ug忘記確認(rèn)接收的話,消息在隊(duì)列的狀態(tài)會(huì)從“Ready”變?yōu)椤癠nacked”,如圖:

如果消息收到卻未確認(rèn),Rabbit將不會(huì)再給這個(gè)應(yīng)用程序發(fā)送更多的消息了,這是因?yàn)镽abbit認(rèn)為你沒(méi)有準(zhǔn)備好接收下一條消息。
此條消息會(huì)一直保持Unacked的狀態(tài),直到你確認(rèn)了消息,或者斷開(kāi)與Rabbit的連接,Rabbit會(huì)自動(dòng)把消息改完Ready狀態(tài),分發(fā)給其他訂閱者。
當(dāng)然你可以利用這一點(diǎn),讓你的程序延遲確認(rèn)該消息,直到你的程序處理完相應(yīng)的業(yè)務(wù)邏輯,這樣可以有效的防治Rabbit給你過(guò)多的消息,導(dǎo)致程序崩潰。
消息確認(rèn)Demo:
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
channel.basicAck(long deliveryTag, boolean multiple)為消息確認(rèn),參數(shù)1:消息的id;參數(shù)2:是否批量應(yīng)答,true批量確認(rèn)小于次id的消息。
總結(jié):消費(fèi)者消費(fèi)的每條消息都必須確認(rèn)。
消息拒絕
消息在確認(rèn)之前,可以有兩個(gè)選擇:
選擇1:斷開(kāi)與Rabbit的連接,這樣Rabbit會(huì)重新把消息分派給另一個(gè)消費(fèi)者;
選擇2:拒絕Rabbit發(fā)送的消息使用channel.basicReject(long deliveryTag, boolean requeue),參數(shù)1:消息的id;參數(shù)2:處理消息的方式,如果是true,Rabbib會(huì)重新分配這個(gè)消息給其他訂閱者,如果設(shè)置成false的話,Rabbit會(huì)把消息發(fā)送到一個(gè)特殊的“死信”隊(duì)列,用來(lái)存放被拒絕而不重新放入隊(duì)列的消息。
消息拒絕Demo:
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); //消息拒絕
2、fanout交換器——發(fā)布/訂閱模式
fanout有別于direct交換器,fanout是一種發(fā)布/訂閱模式的交換器,當(dāng)你發(fā)送一條消息的時(shí)候,交換器會(huì)把消息廣播到所有附加到這個(gè)交換器的隊(duì)列上。
比如用戶(hù)上傳了自己的頭像,這個(gè)時(shí)候圖片需要清除緩存,同時(shí)用戶(hù)應(yīng)該得到積分獎(jiǎng)勵(lì),你可以把這兩個(gè)隊(duì)列綁定到圖片上傳的交換器上,這樣當(dāng)有第三個(gè)、第四個(gè)上傳完圖片需要處理的需求的時(shí)候,原來(lái)的代碼可以不變,只需要添加一個(gè)訂閱消息即可,這樣發(fā)送方和消費(fèi)者的代碼完全解耦,并可以輕而易舉的添加新功能了。
和direct交換器不同,我們?cè)诎l(fā)送消息的時(shí)候新增channel.exchangeDeclare(ExchangeName, "fanout"),這行代碼聲明fanout交換器。
發(fā)送端:
final String ExchangeName = "fanoutec"; // 交換器名稱(chēng)
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 聲明fanout交換器
String message = "時(shí)間:" + new Date().getTime();
channel.basicPublish(ExchangeName, "", null, message.getBytes("UTF-8"));
接受消息不同于direct,我們需要聲明fanout路由器,并使用默認(rèn)的隊(duì)列綁定到fanout交換器上。
接收端:
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 聲明fanout交換器
String queueName = channel.queueDeclare().getQueue(); // 聲明隊(duì)列
channel.queueBind(queueName, ExchangeName, "");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
}
};
channel.basicConsume(queueName, true, consumer);
fanout和direct的區(qū)別最多的在接收端,fanout需要綁定隊(duì)列到對(duì)應(yīng)的交換器用于訂閱消息。
其中channel.queueDeclare().getQueue()為隨機(jī)隊(duì)列,Rabbit會(huì)隨機(jī)生成隊(duì)列名稱(chēng),一旦消費(fèi)者斷開(kāi)連接,該隊(duì)列會(huì)自動(dòng)刪除。
注意:對(duì)于fanout交換器來(lái)說(shuō)routingKey(路由鍵)是無(wú)效的,這個(gè)參數(shù)是被忽略的。
3、topic交換器——匹配訂閱模式
最后介紹的是topic交換器,topic交換器運(yùn)行和fanout類(lèi)似,但是可以更靈活的匹配自己想要訂閱的信息,這個(gè)時(shí)候routingKey路由鍵就排上用場(chǎng)了,使用路由鍵進(jìn)行消息(規(guī)則)匹配。
假設(shè)我們現(xiàn)在有一個(gè)日志系統(tǒng),會(huì)把所有日志級(jí)別的日志發(fā)送到交換器,warning、log、error、fatal,但我們只想處理error以上的日志,要怎么處理?這就需要使用topic路由器了。
topic路由器的關(guān)鍵在于定義路由鍵,定義routingKey名稱(chēng)不能超過(guò)255字節(jié),使用“.”作為分隔符,例如:com.mq.rabbit.error。
消費(fèi)消息的時(shí)候routingKey可以使用下面字符匹配消息:
- "*"可以匹配所有內(nèi)容;
- "#"匹配0和多個(gè)字符;
例如發(fā)布了一個(gè)“com.mq.rabbit.error”的消息:
能匹配上的路由鍵:
cn.mq.rabbit.*
cn.mq.rabbit.#
#.error
cn.mq.#
#
不能匹配上的路由鍵:
- cn.mq.*
- *.error
- *
所以如果想要訂閱所有消息,可以使用“#”匹配。
注意:fanout、topic交換器是沒(méi)有歷史數(shù)據(jù)的,也就是說(shuō)對(duì)于中途創(chuàng)建的隊(duì)列,獲取不到之前的消息。
發(fā)布端:
String routingKey = "com.mq.rabbit.error";
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // 聲明topic交換器
String message = "時(shí)間:" + new Date().getTime();
channel.basicPublish(ExchangeName, routingKey, null, message.getBytes("UTF-8"));
接收端:
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // 聲明topic交換器
String queueName = channel.queueDeclare().getQueue(); // 聲明隊(duì)列
String routingKey = "#.error";
channel.queueBind(queueName, ExchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(routingKey + "|接收消息 => " + message);
}
};
channel.basicConsume(queueName, true, consumer);
擴(kuò)展部分—自定義線程池
如果需要更大的控制連接,用戶(hù)可自己設(shè)置線程池,代碼如下:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
其實(shí)看過(guò)源碼的同學(xué)可能知道,factory.newConnection本身默認(rèn)也有線程池的機(jī)制,ConnectionFactory.class部分源碼如下:
private ExecutorService sharedExecutor;
public Connection newConnection() throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())));
}
public void setSharedExecutor(ExecutorService executor) {
this.sharedExecutor = executor;
}
其中this.sharedExecutor就是默認(rèn)的線程池,可以通過(guò)setSharedExecutor()方法設(shè)置ConnectionFactory的線程池,如果不設(shè)置則為null。
用戶(hù)如果自己設(shè)置了線程池,像本小節(jié)第一段代碼寫(xiě)的那樣,那么當(dāng)連接關(guān)閉的時(shí)候,不會(huì)自動(dòng)關(guān)閉用戶(hù)自定義的線程池,所以用戶(hù)必須自己手動(dòng)關(guān)閉,通過(guò)調(diào)用shutdown()方法,否則可能會(huì)阻止JVM的終止。
官方的建議是只有在程序出現(xiàn)嚴(yán)重性能瓶頸的時(shí)候,才應(yīng)該考慮使用此功能。
項(xiàng)目地址
GitHub:https://github.com/vipstone/rabbitmq-java.git