- 1. RabbitMQ的介紹和安裝
- 2. Java操作RabbitMQ
- 3. SpringBoot整合RabbitMQ
1. RabbitMQ的介紹和安裝
1.1 介紹
1.1.1 是什么?
MQ全稱為Message Queue,即消息隊(duì)列. 它也是一個(gè)隊(duì)列,遵循FIFO原則.(先進(jìn)先出)
RabbitMQ是由erlang語言開發(fā),基于AMQP(Advanced Message Queue 高級(jí)消息隊(duì)列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊(duì)列,它是一種應(yīng)用程序之間的通信方法,消息隊(duì)列在分布式系統(tǒng)開 發(fā)中應(yīng)用非常廣泛。
RabbitMQ官網(wǎng)地址:https://www.rabbitmq.com/
RabbitMQ是基于AMQP協(xié)議實(shí)現(xiàn)一種MQ
1.1.2 使用場(chǎng)景
開發(fā)中消息隊(duì)列通常有如下應(yīng)用場(chǎng)景:
-
提高系統(tǒng)響應(yīng)速度
任務(wù)異步處理。 將不需要同步處理的并且耗時(shí)長的操作由消息隊(duì)列通知消息接收方進(jìn)行異步處理。提高了應(yīng)用程序的響應(yīng)時(shí)間
-
提高系統(tǒng)穩(wěn)定性
系統(tǒng)掛了關(guān)系,操作內(nèi)容放到消息隊(duì)列
異步化
-
解耦
應(yīng)用程序解耦合MQ相當(dāng)于一個(gè)中介,生產(chǎn)方通過MQ與消費(fèi)方交互,它將應(yīng)用程序進(jìn)行解耦合。
排序保證 FIFO(比如應(yīng)用于秒殺場(chǎng)景)
消除峰值
1.1.3 市面上常見消息隊(duì)列
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
ActiveMQ是老牌,不太好用,有點(diǎn)過時(shí)了!
RabbitMQ,Kafka,Redis現(xiàn)在這三個(gè)比較好用!
為什么使用RabbitMQ呢?
- 使得簡單,功能強(qiáng)大。
- 基于AMQP協(xié)議。
- 社區(qū)活躍,文檔完善。
- 高并發(fā)性能好,這主要得益于Erlang語言。
- Spring Boot默認(rèn)已集成RabbitMQ
1.1.4 相關(guān)介紹
- AMQP
- (1)概念:AMQP,即Advanced Message Queuing Protocol,一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。Erlang中的實(shí)現(xiàn)有RabbitMQ等。
- (2)官網(wǎng)地址:http://www.amqp.org/
- (3)總結(jié):AMQP是一套公開的消息隊(duì)列協(xié)議,最早在2003年被提出,它旨在從協(xié)議層定義消息通信數(shù)據(jù)的標(biāo)準(zhǔn)格式,為的就是解決MQ市場(chǎng)上協(xié)議不統(tǒng)一的問題,RabbitMQ就是遵循AMQP標(biāo)準(zhǔn)協(xié)議開發(fā)的MQ服務(wù)
- JMS
- (1)概念:JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個(gè)與具體平臺(tái)無關(guān)的API,絕大多數(shù)MOM提供商都對(duì)JMS提供支持。
- (2)官網(wǎng):https://www.oracle.com/technetwork/java/jms/index.html
- (3)總結(jié): JMS是java提供的一套消息服務(wù)API標(biāo)準(zhǔn),其目的是為所有的java應(yīng)用程序提供統(tǒng)一的消息通信的標(biāo)準(zhǔn),類似java的jdbc,只要遵循jms標(biāo)準(zhǔn)的應(yīng)用程序之間都可以進(jìn)行消息通信。它和AMQP有什么不同,jms是java語言專屬的消息服務(wù)標(biāo)準(zhǔn),它是在api層定義標(biāo)準(zhǔn),并且只能用于java應(yīng)用;而AMQP是在協(xié)議層定義的標(biāo)準(zhǔn),是跨語言的 。
1.2 安裝
1.2.1 工作原理
組成部分說明如下:
- Broker:消息隊(duì)列服務(wù)進(jìn)程,此進(jìn)程包括兩個(gè)部分:Exchange和Queue。
- (1)Exchange:消息隊(duì)列交換機(jī),按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個(gè)隊(duì)列,對(duì)消息進(jìn)行過慮。
- (2)Queue:消息隊(duì)列,存儲(chǔ)消息的隊(duì)列,消息到達(dá)隊(duì)列并轉(zhuǎn)發(fā)給指定的消費(fèi)方。
- Producer:消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消息發(fā)送到MQ。
- Consumer:消息消費(fèi)者,即消費(fèi)方客戶端,接收MQ轉(zhuǎn)發(fā)的消息。
消息發(fā)送流程:
- 生產(chǎn)者和Broker建立TCP連接。
- 生產(chǎn)者和Broker建立通道。
- 生產(chǎn)者通過通道消息發(fā)送給Broker,由Exchange將消息進(jìn)行轉(zhuǎn)發(fā)。
- Exchange將消息轉(zhuǎn)發(fā)到指定的Queue(隊(duì)列)
消息接收流程:
- 消費(fèi)者和Broker建立TCP連接
- 消費(fèi)者和Broker建立通道
- 消費(fèi)者監(jiān)聽指定的Queue(隊(duì)列)
- 當(dāng)有消息到達(dá)Queue時(shí)Broker默認(rèn)將消息推送給消費(fèi)者。
- 消費(fèi)者接收到消息。
1.2.2 安裝
RabbitMQ由Erlang語言開發(fā),Erlang語言用于并發(fā)及分布式系統(tǒng)的開發(fā),在電信領(lǐng)域應(yīng)用廣泛,OTP(Open Telecom Platform)作為Erlang語言的一部分,包含了很多基于Erlang開發(fā)的中間件及工具庫,安裝RabbitMQ需要安裝Erlang/OTP,并保持版本匹配
RabbitMQ的下載地址:http://www.rabbitmq.com/download.html
我這里使用Erlang/OTP 20.3版本和RabbitMQ3.7.4版本。
-
安裝erlang
下載地址:https://www.erlang.org/downloads
我這里下載了OTP 20.3 Windows 64位二進(jìn)制文??件 (99142192)-->otp_win64_20.3.exe.運(yùn)行此文件進(jìn)行安裝。
安裝完以后配置環(huán)境變量!
-
安裝RabbitMQ
下載地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.4
下載rabbitmq-server-3.7.4.exe
-
啟動(dòng)
安裝成功后會(huì)自動(dòng)創(chuàng)建RabbitMQ服務(wù)并且啟動(dòng)。
(啟動(dòng)方式1)從開始菜單啟動(dòng)RabbitMQ完成在開始菜單找到RabbitMQ的菜單?;蛘呷ト蝿?wù)管理器的服務(wù)那里手動(dòng)搞一下!
(啟動(dòng)方式2)進(jìn)入安裝目錄下sbin目錄手動(dòng)啟動(dòng):rabbitmq-service.bat install 安裝服務(wù) rabbitmq-service.bat stop 停止服務(wù) rabbitmq-service.bat start 啟動(dòng)服務(wù)
-
安裝管理插件
安裝rabbitMQ的管理插件,方便在瀏覽器端管理RabbitMQ
管理員身份運(yùn)行 rabbitmq-plugins.bat enable rabbitmq_management
重新啟動(dòng)RabbitMQ服務(wù)
-
瀏覽器訪問測(cè)試
進(jìn)入瀏覽器,訪問http://localhost:15672
初始賬號(hào)和密碼:guest/guest
注意:
- 安裝erlang和rabbitMQ以管理員身份運(yùn)行。
- 當(dāng)卸載重新安裝時(shí)會(huì)出現(xiàn)RabbitMQ服務(wù)注冊(cè)失敗,此時(shí)需要進(jìn)入注冊(cè)表清理erlang。搜索RabbitMQ、ErlSrv,將對(duì)應(yīng)的項(xiàng)全部刪除。
2. Java操作RabbitMQ
2.1 RabbitMQ的消息模型
RabbitMQ提供了6種消息模型,但是第6種其實(shí)是RPC,并不是MQ!那么也就剩下5種。
3、4、5這三種都屬于訂閱模型,只不過進(jìn)行路由的方式不同。
消息模型如下圖所示:
[圖片上傳失敗...(image-b28b51-1569803506043)]
2.2 搭建環(huán)境
-
創(chuàng)建maven工程!
我這里取名RabbitMQTest
導(dǎo)包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<!--和springboot2.0.5對(duì)應(yīng)-->
<version>5.4.1</version>
</dependency>
- 工具類
package cn.wangningbo.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
/**
* 建立與RabbitMQ的連接
*
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置服務(wù)地址
factory.setHost("127.0.0.1");
//端口
factory.setPort(5672);
//設(shè)置賬號(hào)信息,用戶名、密碼、vhost
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
// 通過工程獲取連接
Connection connection = factory.newConnection();
return connection;
}
}
注意:我下面為了測(cè)試方便,就沒有處理異常都拋出去了!
2.2.1 "Hello World!"------基本消息模型
2.2.1.1 生產(chǎn)者
package cn.wangningbo._01helloworld;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
//隊(duì)列名稱
private static final String QUEUE = "helloworld";
//使用main方法進(jìn)行測(cè)試
public static void main(String[] args) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
Channel channel = connection.createChannel();
/**
* 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
* param1:隊(duì)列名稱
* param2:是否持久化
* param3:隊(duì)列是否獨(dú)占此連接
* param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
* param5:隊(duì)列參數(shù)
*/
channel.queueDeclare(QUEUE, true, false, false, null);
// 準(zhǔn)備一條消息 // 字符串+當(dāng)前系統(tǒng)時(shí)間的時(shí)間戳
String message = "helloworld小明" + System.currentTimeMillis();
/**
* 消息發(fā)布方法
* param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
* param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列
* param3:消息包含的屬性
* param4:消息體
*/
/**
* 這里沒有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯
示綁定或解除綁定
* 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱
*/
channel.basicPublish("", QUEUE, null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
}
2.2.1.2 消費(fèi)者
package cn.wangningbo._01helloworld;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
//隊(duì)列名稱 //消費(fèi)哪個(gè)生產(chǎn)者隊(duì)列的消息
private static final String QUEUE = "helloworld";
public static void main(String[] args) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
Channel channel = connection.createChannel();
//聲明隊(duì)列
channel.queueDeclare(QUEUE, true, false, false, null);
//定義消費(fèi)方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 消費(fèi)者接收消息調(diào)用此方法
* @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
* @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
(收到消息失敗后是否需要重新發(fā)送)
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
//交換機(jī)
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//消息id
long deliveryTag = envelope.getDeliveryTag();
//消息內(nèi)容
String msg = new String(body, "utf8");
System.out.println("receive message.." + msg);
}
};
/**
* 監(jiān)聽隊(duì)列String queue, boolean autoAck,Consumer callback
* 參數(shù)明細(xì)
* 1、隊(duì)列名稱
* 2、是否自動(dòng)回復(fù),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置
為false則需要手動(dòng)回復(fù)
* 3、消費(fèi)消息的方法,消費(fèi)者接收到消息后調(diào)用此方法
*/
channel.basicConsume(QUEUE, true, consumer);
}
}
2.2.1.3 簡單測(cè)試
工作流程簡述:
- 先啟動(dòng)生產(chǎn)者,這時(shí)候會(huì)有一個(gè)消息被丟到默認(rèn)的交換機(jī)里面,這個(gè)交換機(jī)會(huì)把消息丟到指定的隊(duì)列里面,等待被消費(fèi)者消費(fèi)掉!
- 再啟動(dòng)消費(fèi)者,這時(shí)候發(fā)現(xiàn)消費(fèi)者消費(fèi)掉了剛才生產(chǎn)者發(fā)送的那條消息,由于消費(fèi)者設(shè)置了自動(dòng)回復(fù),所以隊(duì)列里的那個(gè)消息就被消費(fèi)掉了,隊(duì)里的消息被刪除了!但是程序沒有停止,一直在監(jiān)聽隊(duì)列中是否有新的消息。一旦有新的消息進(jìn)入隊(duì)列,就會(huì)立即打印!
2.2.3.4 消息確認(rèn)機(jī)制(ACK)
通過剛才的案例可以看出,消息一旦被消費(fèi)者接收,隊(duì)列中的消息就會(huì)被刪除。
那么問題來了:RabbitMQ怎么知道消息被接收了呢?
如果消費(fèi)者領(lǐng)取消息后,還沒執(zhí)行操作就掛掉了呢?或者拋出了異常?消息消費(fèi)失敗,但是RabbitMQ無從得知,這樣消息就丟失了!
因此,RabbitMQ有一個(gè)ACK機(jī)制。當(dāng)消費(fèi)者獲取消息后,會(huì)向RabbitMQ發(fā)送回執(zhí)ACK,告知消息已經(jīng)被接收。不過這種回執(zhí)ACK分兩種情況:
- 自動(dòng)ACK:消息一旦被接收,消費(fèi)者自動(dòng)發(fā)送ACK
- 手動(dòng)ACK:消息接收后,不會(huì)發(fā)送ACK,需要手動(dòng)調(diào)用
只有發(fā)送了回執(zhí)ACK,隊(duì)列的消息就會(huì)被刪除掉!
但是哪種更好呢?
這還需要看消息的重要性!
- 如果消息不太重要,丟失也沒有影響,那么自動(dòng)ACK會(huì)比較方便
- 如果消息非常重要,不容丟失。那么最好在消費(fèi)完成后手動(dòng)ACK,否則接收消息后就自動(dòng)ACK,RabbitMQ就會(huì)把消息從隊(duì)列中刪除。如果此時(shí)消費(fèi)者宕機(jī),那么消息就丟失了。
我上面的測(cè)試都是自動(dòng)ACK的,如果要手動(dòng)ACK,需要改動(dòng)消費(fèi)者的代碼!
2.2.1.4 自動(dòng)確認(rèn)存在的問題
修改消費(fèi)者里面的這個(gè)方法,讓它1/0發(fā)生異常!
/**
* 消費(fèi)者接收消息調(diào)用此方法
* @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
* @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
(收到消息失敗后是否需要重新發(fā)送)
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
int i = 1 / 0;
//交換機(jī)
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//消息id
long deliveryTag = envelope.getDeliveryTag();
//消息內(nèi)容
String msg = new String(body, "utf8");
System.out.println("receive message.." + msg);
}
};
生產(chǎn)者不做任何修改,直接運(yùn)行,消息發(fā)送成功!
運(yùn)行消費(fèi)者,程序拋出異常。但是消息依然被消費(fèi)!
這時(shí)候就存在問題,這個(gè)消息對(duì)我很重要,不允許丟失,這時(shí)候就需要消費(fèi)者沒報(bào)錯(cuò)的時(shí)候才回執(zhí)ACK!這就需要手動(dòng)回執(zhí)了!
2.2.1.5 手動(dòng)確認(rèn)實(shí)現(xiàn)
修改消費(fèi)者,把自動(dòng)改成手動(dòng)(去掉之前制造的異常)
/**
* 監(jiān)聽隊(duì)列String queue, boolean autoAck,Consumer callback
* 參數(shù)明細(xì)
* 1、隊(duì)列名稱
* 2、是否自動(dòng)回復(fù),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置
為false則需要手動(dòng)回復(fù)
* 3、消費(fèi)消息的方法,消費(fèi)者接收到消息后調(diào)用此方法
*/
channel.basicConsume(QUEUE, false, consumer);
生產(chǎn)者不變,再次運(yùn)行!
運(yùn)行消費(fèi)者!消費(fèi)者沒有異常,并且也獲取了消息!但是通過可視化界面看到隊(duì)列中依然存在那個(gè)消息!
這是因?yàn)殡m然我們?cè)O(shè)置了手動(dòng)ACK,但是代碼中并沒有進(jìn)行消息確認(rèn)!所以消息并未被真正消費(fèi)掉。
當(dāng)我們關(guān)掉這個(gè)消費(fèi)者,消息的狀態(tài)再次稱為Ready!
手動(dòng)ACK設(shè)置:channel.basicAck(envelope.getDeliveryTag(), false);當(dāng)消費(fèi)完以后才發(fā)送回執(zhí)ACK!
//定義消費(fèi)方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 消費(fèi)者接收消息調(diào)用此方法
* @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
* @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
(收到消息失敗后是否需要重新發(fā)送)
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
// int i = 1 / 0;
//交換機(jī)
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//消息id
long deliveryTag = envelope.getDeliveryTag();
//消息內(nèi)容
String msg = new String(body, "utf8");
System.out.println("receive message.." + msg);
//手動(dòng)進(jìn)行ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
2.2.1.6 小結(jié)
- 發(fā)送端操作流程
- 1)創(chuàng)建連接
- 2)創(chuàng)建通道
- 3)聲明隊(duì)列
- 4)發(fā)送消息
- 接收端操作流程
- 1)創(chuàng)建連接
- 2)創(chuàng)建通道
- 3)聲明隊(duì)列
- 4)監(jiān)聽隊(duì)列
- 5)接收消息
- 6)ack回復(fù)
2.2.2 Work queues
work queues與入門程序相比,多了一個(gè)消費(fèi)端,兩個(gè)消費(fèi)端共同消費(fèi)同一個(gè)隊(duì)列中的消息。
應(yīng)用場(chǎng)景:對(duì)于任務(wù)過重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。
測(cè)試:
- 一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者
- 生產(chǎn)者發(fā)送多個(gè)消息!
結(jié)果:
- 一條消息只會(huì)被一個(gè)消費(fèi)者接收;
- rabbit采用輪詢的方式將消息是平均發(fā)送給消費(fèi)者的;
- 消費(fèi)者在處理完某條消息后,才會(huì)收到下一條消息。
工作隊(duì)列,又稱任務(wù)隊(duì)列。主要思想就是避免執(zhí)行資源密集型任務(wù)時(shí),必須等待它執(zhí)行完成。相反我們稍后完成任務(wù),我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。 在后臺(tái)運(yùn)行的工作進(jìn)程將獲取任務(wù)并最終執(zhí)行作業(yè)。當(dāng)你運(yùn)行許多工人時(shí),任務(wù)將在他們之間共享,但是一個(gè)消息只能被一個(gè)消費(fèi)者獲取。
模擬流程:
- P:生產(chǎn)者:任務(wù)的發(fā)布者
- C1:消費(fèi)者,領(lǐng)取任務(wù)并且完成任務(wù),假設(shè)完成速度較快
- C2:消費(fèi)者2:領(lǐng)取任務(wù)并完成任務(wù),假設(shè)完成速度慢
2.2.2.1 生產(chǎn)者
對(duì)生產(chǎn)者做一點(diǎn)的小小加工,讓它發(fā)送50個(gè)消息到隊(duì)列,而且是發(fā)送的別太快!
加工部分
//生產(chǎn)者發(fā)送50個(gè)消息
for (int i = 1; i <= 50; i++) {
Thread.sleep(i * 2);
channel.basicPublish("", QUEUE, null, message.getBytes());
System.out.println(i + "Send Message is:'" + message + "'");
}
完整生產(chǎn)者
package cn.wangningbo._02workqueues;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
//隊(duì)列名稱
private static final String QUEUE = "workqueues";
//使用main方法進(jìn)行測(cè)試
public static void main(String[] args) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
Channel channel = connection.createChannel();
/**
* 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
* param1:隊(duì)列名稱
* param2:是否持久化
* param3:隊(duì)列是否獨(dú)占此連接
* param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
* param5:隊(duì)列參數(shù)
*/
channel.queueDeclare(QUEUE, true, false, false, null);
// 準(zhǔn)備一條消息 // 字符串+當(dāng)前系統(tǒng)時(shí)間的時(shí)間戳
String message = "helloworld小明" + System.currentTimeMillis();
/**
* 消息發(fā)布方法
* param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
* param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列
* param3:消息包含的屬性
* param4:消息體
*/
/**
* 這里沒有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯
示綁定或解除綁定
* 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱
*/
//生產(chǎn)者發(fā)送50個(gè)消息
for (int i = 1; i <= 50; i++) {
Thread.sleep(i * 2);
channel.basicPublish("", QUEUE, null, message.getBytes());
System.out.println(i + "Send Message is:'" + message + "'");
}
}
}
2.2.2.2 消費(fèi)者1
讓消費(fèi)者1處理的能力強(qiáng)
消費(fèi)者1不需要做什么改變!正常消費(fèi)即可!只是多個(gè)計(jì)數(shù)器統(tǒng)計(jì)消費(fèi)了多少條消息!
package cn.wangningbo._02workqueues;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
//隊(duì)列名稱 //消費(fèi)哪個(gè)生產(chǎn)者隊(duì)列的消息
private static final String QUEUE = "workqueues";
public static void main(String[] args) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
Channel channel = connection.createChannel();
//聲明隊(duì)列
channel.queueDeclare(QUEUE, true, false, false, null);
//定義消費(fèi)方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
//計(jì)數(shù)器
int index = 1;
/**
* 消費(fèi)者接收消息調(diào)用此方法
* @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
* @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
(收到消息失敗后是否需要重新發(fā)送)
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
//交換機(jī)
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//消息id
long deliveryTag = envelope.getDeliveryTag();
//消息內(nèi)容
String msg = new String(body, "utf8");
//index用來統(tǒng)計(jì)消費(fèi)了多少條消息
System.out.println(index+++"receive message.." + msg);
//手動(dòng)進(jìn)行ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
/**
* 監(jiān)聽隊(duì)列String queue, boolean autoAck,Consumer callback
* 參數(shù)明細(xì)
* 1、隊(duì)列名稱
* 2、是否自動(dòng)回復(fù),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置
為false則需要手動(dòng)回復(fù)
* 3、消費(fèi)消息的方法,消費(fèi)者接收到消息后調(diào)用此方法
*/
channel.basicConsume(QUEUE, false, consumer);
}
}
2.2.2.3 消費(fèi)者2
讓消費(fèi)者2處理的能力弱!這里模擬消費(fèi)耗時(shí)!
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
消費(fèi)者2完整代碼
package cn.wangningbo._02workqueues;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
//隊(duì)列名稱 //消費(fèi)哪個(gè)生產(chǎn)者隊(duì)列的消息
private static final String QUEUE = "workqueues";
public static void main(String[] args) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
Channel channel = connection.createChannel();
//聲明隊(duì)列
channel.queueDeclare(QUEUE, true, false, false, null);
//定義消費(fèi)方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
//計(jì)數(shù)器
int index = 1;
/**
* 消費(fèi)者接收消息調(diào)用此方法
* @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
* @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
(收到消息失敗后是否需要重新發(fā)送)
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//交換機(jī)
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//消息id
long deliveryTag = envelope.getDeliveryTag();
//消息內(nèi)容
String msg = new String(body, "utf8");
//index用來統(tǒng)計(jì)消費(fèi)了多少條消息
System.out.println(index+++"receive message.." + msg);
//手動(dòng)進(jìn)行ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
/**
* 監(jiān)聽隊(duì)列String queue, boolean autoAck,Consumer callback
* 參數(shù)明細(xì)
* 1、隊(duì)列名稱
* 2、是否自動(dòng)回復(fù),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置
為false則需要手動(dòng)回復(fù)
* 3、消費(fèi)消息的方法,消費(fèi)者接收到消息后調(diào)用此方法
*/
channel.basicConsume(QUEUE, false, consumer);
}
}
2.2.2.4 測(cè)試
先啟動(dòng)兩個(gè)消費(fèi)者,再啟動(dòng)生產(chǎn)者!
測(cè)試結(jié)果:
- 可以發(fā)現(xiàn)兩個(gè)消費(fèi)者各自消費(fèi)了25條消息,而且各不相同,這就實(shí)現(xiàn)了任務(wù)分發(fā)!
- 而消費(fèi)者1能力比較強(qiáng),很快就處理完了25條消息,然后處于空閑的監(jiān)聽狀態(tài)。
- 而消費(fèi)者2的能力比較弱,一直在緩慢的處理,一條一條的,直到25條處理完畢,然后處于空閑監(jiān)聽狀態(tài)!
2.2.2.5 能者多勞
剛才產(chǎn)生的問題?
- 消費(fèi)者1比消費(fèi)者2的效率要高,一次任務(wù)的耗時(shí)較短
- 然而兩人最終消費(fèi)的消息數(shù)量是一樣的
- 消費(fèi)者1大量時(shí)間處于空閑狀態(tài),消費(fèi)者2一直忙碌
現(xiàn)在的狀態(tài)屬于是把任務(wù)平均分配,正確的做法應(yīng)該是消費(fèi)越快的人,消費(fèi)的越多。
我希望是能者多勞,效率高的多消費(fèi)一些,效率低的少消費(fèi)一些,這樣把所有消息消費(fèi)完所需要的時(shí)間最短!
實(shí)現(xiàn)方法:
使用basicQos方法和prefetchCount = 1設(shè)置。 這告訴RabbitMQ一次不要向工作人員發(fā)送多于一條消息。 或者換句話說,不要向工作人員發(fā)送新消息,直到它處理并確認(rèn)了前一個(gè)消息。相反,它會(huì)將其分派給不是仍然忙碌的下一個(gè)工作人員。
在兩個(gè)消費(fèi)者那里分別設(shè)置
//聲明一次發(fā)1條消息,等到處理完了再發(fā)
channel.basicQos(1);
2.2.2.6 再次測(cè)試
生產(chǎn)者發(fā)送了50條消息,消費(fèi)者1能力較強(qiáng),消費(fèi)了32條消息,消費(fèi)者2能力較弱,消費(fèi)了18條!
實(shí)現(xiàn)了能者多勞!
2.2.3 訂閱模型分類
2.2.3.1 概述
在之前的模式中,我們創(chuàng)建了一個(gè)工作隊(duì)列。 工作隊(duì)列背后的假設(shè)是:每個(gè)任務(wù)只被傳遞給一個(gè)工作人員。 在這一部分,我們將做一些完全不同的事情 - 我們將會(huì)傳遞一個(gè)信息給多個(gè)消費(fèi)者。 這種模式被稱為“發(fā)布/訂閱”。
- 1個(gè)生產(chǎn)者,多個(gè)消費(fèi)者
- 每一個(gè)消費(fèi)者都有自己的一個(gè)隊(duì)列
- 生產(chǎn)者沒有將消息直接發(fā)送到隊(duì)列,而是發(fā)送到了交換機(jī)
- 每個(gè)隊(duì)列都要綁定到交換機(jī)
- 生產(chǎn)者發(fā)送的消息,經(jīng)過交換機(jī)到達(dá)隊(duì)列,實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者獲取的目的
X(Exchanges):交換機(jī)一方面:接收生產(chǎn)者發(fā)送的消息。另一方面:知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
2.2.3.2 分類
Exchange類型有以下幾種:
- Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列 all
- Direct:定向,把消息交給符合指定routing key 的隊(duì)列 一堆或一個(gè)
- Topic:通配符,把消息交給符合routing pattern(路由模式)的隊(duì)列 一堆或者一個(gè)
Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒有任何隊(duì)列與Exchange綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!
2.2.4 訂閱模型-FANOUT
2.2.4.1 工作模式
發(fā)布訂閱模式:
在廣播模式下,消息發(fā)送流程是這樣的:
- 1) 可以有多個(gè)消費(fèi)者
- 2) 每個(gè)消費(fèi)者有自己的queue(隊(duì)列)
- 3) 每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))
- 4) 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來決定要發(fā)給哪個(gè)隊(duì)列,生產(chǎn)者無法決定。
- 5) 交換機(jī)把消息發(fā)送給綁定過的所有隊(duì)列
- 6) 隊(duì)列的消費(fèi)者都能拿到消息。實(shí)現(xiàn)一條消息被多個(gè)消費(fèi)者消費(fèi)
2.2.4.2 生產(chǎn)者
與上面那種非訂閱模式有兩種變化:
- 聲明Exchange,不再聲明Queue
- 發(fā)送消息到Exchange,不再發(fā)送到Queue
package cn.wangningbo._03fanout;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
//交換機(jī)名稱
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明exchange的類型,指定類型為fanout
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 消息內(nèi)容
String message = "Hello everyone";
// 發(fā)布消息到Exchange
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [生產(chǎn)者] Sent '" + message + "'");
//關(guān)閉連接
channel.close();
connection.close();
}
}
2.2.4.3 消費(fèi)者
消費(fèi)者1
package cn.wangningbo._03fanout;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
// 隊(duì)列名稱
private final static String QUEUE_NAME = "fanout_exchange_queue_1";
//交換機(jī)名稱
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [消費(fèi)者1] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,自動(dòng)返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消費(fèi)者2
package cn.wangningbo._03fanout;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
// 隊(duì)列名稱
private final static String QUEUE_NAME = "fanout_exchange_queue_2";
//交換機(jī)名稱
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [消費(fèi)者2] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,手動(dòng)返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
2.2.4.4 測(cè)試
先啟動(dòng)一下生產(chǎn)者,來創(chuàng)建交換機(jī)。再啟動(dòng)消費(fèi)者1和消費(fèi)者2。這時(shí)候再啟動(dòng)一次生產(chǎn)者,可以看到兩個(gè)消費(fèi)者都收到了消息!
因?yàn)樯a(chǎn)者把消息丟給了交換機(jī),而交換機(jī)又制定了交換機(jī)類型是fanout類型,兩個(gè)消費(fèi)者的隊(duì)列都綁定到了生產(chǎn)者的交換機(jī)上面!所以兩個(gè)消費(fèi)者都收到了同名交換機(jī)發(fā)出的消息!
應(yīng)用場(chǎng)景:
- 注冊(cè)成功后發(fā)送短信和郵件
- 消息推送
2.2.5 訂閱模型-Direct
2.2.5.1 工作模式
有選擇性的接收消息
在訂閱模式中,生產(chǎn)者發(fā)布消息,所有消費(fèi)者都可以獲取所有消息。
在路由模式中,我們將添加一個(gè)功能,我們將只能訂閱一部分消息。 例如,我們只能將重要的錯(cuò)誤消息引導(dǎo)到日志文件(以節(jié)省磁盤空間),同時(shí)仍然能夠在控制臺(tái)上打印所有日志消息。
但是,在某些場(chǎng)景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange。
在Direct模型下,隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)
消息的發(fā)送方在向Exchange發(fā)送消息時(shí),也必須指定消息的routing key。如下:
- P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key。
- X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊(duì)列
- C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息
- C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息
2.2.5.2 生產(chǎn)者
此處模擬商品的增刪改,發(fā)送消息的RoutingKey分別是:insert、update、delete
生產(chǎn)者
package cn.wangningbo._04direct;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
//交換機(jī)名稱
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明exchange,指定類型為direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 消息內(nèi)容
// String message = "商品新增了, id = 1001";
// String message = "商品修改了, id = 1001";
String message = "商品刪除了, id = 1001";
// 發(fā)送消息,并且指定routing key 為:insert ,代表新增商品
channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
System.out.println(" [商品服務(wù):] Sent '" + message + "'");
// 關(guān)閉
channel.close();
connection.close();
}
}
2.2.5.3 消費(fèi)者
消費(fèi)者1:處假設(shè)消費(fèi)者1只接收兩種類型的消息:更新商品和刪除商品
package cn.wangningbo._04direct;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
// 隊(duì)列名稱
private final static String QUEUE_NAME = "direct_exchange_queue_1";
//交換機(jī)名稱
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。假設(shè)此處需要update和delete消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [消費(fèi)者1] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,自動(dòng)ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消費(fèi)者2:此處假設(shè)消費(fèi)者2接收所有類型的消息:新增商品,更新商品和刪除商品。
package cn.wangningbo._04direct;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
// 隊(duì)列名稱
private final static String QUEUE_NAME = "direct_exchange_queue_2";
// 交換機(jī)名稱
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。訂閱 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [消費(fèi)者2] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,自動(dòng)ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
2.2.5.4 測(cè)試
分別發(fā)送增、刪、改的RoutingKey。結(jié)果:消費(fèi)者1只可以收到修改和刪除的消息,消費(fèi)者2都可以收到!
我們項(xiàng)目中就要用,特定站點(diǎn)靜態(tài)頁面,只能發(fā)到特定站點(diǎn)服務(wù)器
sendMsg routingKey:sitesn???rev:綁定到交換機(jī)要以sitesn來作為routingkey
2.2.6 訂閱模型-Topics
2.2.6.1 工作模式
Topic類型的Exchange與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過Topic類型Exchange可以讓隊(duì)列在綁定Routingkey的時(shí)候使用通配符!
Routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: goods.insert
通配符規(guī)則:
-
:匹配一個(gè)或多個(gè)詞
- *:匹配不多不少恰好1個(gè)詞
舉例:
- audit.#:能夠匹配audit.irs.corporate 或者 audit.irs
- audit.*:只能匹配audit.irs
路由模式:
- 每個(gè)消費(fèi)者監(jiān)聽自己的隊(duì)列,并且設(shè)置帶統(tǒng)配符的routingkey。
- 生產(chǎn)者將消息發(fā)給broker,由交換機(jī)根據(jù)routingkey來轉(zhuǎn)發(fā)消息到指定的隊(duì)列。
2.2.6.2 生產(chǎn)者
package cn.wangningbo._05topics;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
// 交換機(jī)名稱
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明exchange,指定類型為topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 消息內(nèi)容
String message = "新增商品 : id = 1001";
// String message = "修改商品 : id = 1001";
// String message = "刪除商品 : id = 1001";
// 發(fā)送消息,并且指定routing key 為:insert ,代表新增商品
channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
System.out.println(" [商品服務(wù):] Sent '" + message + "'");
// 關(guān)閉連接
channel.close();
connection.close();
}
}
2.2.6.3 消費(fèi)者
消費(fèi)者1:此處假設(shè)消費(fèi)者1只接收兩種類型的消息:更新商品和刪除商品
package cn.wangningbo._05topics;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
// 隊(duì)列名稱
private final static String QUEUE_NAME = "topic_exchange_queue_1";
// 交換機(jī)名稱
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。需要 update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [消費(fèi)者1] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,自動(dòng)ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消費(fèi)者2:此處假設(shè)消費(fèi)者2接收所有類型的消息:新增商品,更新商品和刪除商品。
package cn.wangningbo._05topics;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
// 隊(duì)列名稱
private final static String QUEUE_NAME = "topic_exchange_queue_2";
// 交換機(jī)名稱
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。訂閱 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [消費(fèi)者2] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,自動(dòng)ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
2.2.6.4 測(cè)試
測(cè)試:先啟動(dòng)生產(chǎn)者,再啟動(dòng)兩個(gè)消費(fèi)者,再啟動(dòng)生產(chǎn)者!分別發(fā)送增、刪、改的RoutingKey。
測(cè)試結(jié)果:結(jié)果:消費(fèi)者1只可以收到修改和刪除的消息,消費(fèi)者2都可以收到!消費(fèi)者使用的是通配符接收!
2.2.7 Header模式
header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對(duì))匹配 隊(duì)列。
案例: 根據(jù)用戶的通知設(shè)置去通知用戶,設(shè)置接收Email的用戶只接收Email,設(shè)置接收sms的用戶只接收sms,設(shè)置兩種 通知類型都接收的則兩種通知都有效。
- 生產(chǎn)者
隊(duì)列與交換機(jī)綁定的代碼與之前不同,如下:
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_type", "email");
Map<String, Object> headers_sms = new Hashtable<String, Object>();
headers_sms.put("inform_type", "sms");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
通知
String message = "email inform to user"+i;
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("inform_type", "email");//匹配email通知消費(fèi)者綁定的header
//headers.put("inform_type", "sms");//匹配sms通知消費(fèi)者綁定的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(headers);
//Email通知
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
- 發(fā)送郵件消費(fèi)者
channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_email", "email");
//交換機(jī)和隊(duì)列綁定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
//指定消費(fèi)隊(duì)列
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
============實(shí)戰(zhàn)演示================
生產(chǎn)者:只發(fā)送給"inform_type", "email"
package cn.wangningbo._06header;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Hashtable;
import java.util.Map;
public class Producer {
// 交換機(jī)名稱
private final static String EXCHANGE_NAME = "headers_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明exchange,指定類型為headers
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
// 消息內(nèi)容
String message = "發(fā)送了一個(gè)email消息";
Map<String, Object> headers = new Hashtable<String, Object>();
headers.put("inform_type", "email");//匹配email通知消費(fèi)者綁定的header
// headers.put("inform_type2", "sms");//匹配sms通知消費(fèi)者綁定的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(headers);
// 發(fā)送消息,并綁定到指定的交換機(jī),并指定發(fā)送到指定類型
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes());
System.out.println(" [商品服務(wù):] Sent '" + message + "'");
// 關(guān)閉
channel.close();
connection.close();
}
}
消費(fèi)者1:只接收"inform_type", "email"
package cn.wangningbo._06header;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
public class Consumer {
// 隊(duì)列名稱
private final static String QUEUE_NAME = "headers_exchange_queue_1";
// 交換機(jī)名稱
private final static String EXCHANGE_NAME = "headers_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Map<String, Object> headersEmail = new Hashtable<String, Object>();
// 指定接受的類型
headersEmail.put("inform_type", "email");
// headersEmail.put("inform_type2", "sms");
// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要接收消息的類型
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headersEmail);
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [消費(fèi)者1] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,自動(dòng)ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消費(fèi)者2:只接收"inform_type2", "sms"
package cn.wangningbo._06header;
import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
public class Consumer2 {
// 隊(duì)列名稱
private final static String QUEUE_NAME = "headers_exchange_queue_2";
// 交換機(jī)名稱
private final static String EXCHANGE_NAME = "headers_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Map<String, Object> headersSms = new Hashtable<String, Object>();
// headersSms.put("inform_type", "email");
headersSms.put("inform_type2", "sms");
// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。假設(shè)此處需要update和delete消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headersSms);
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [消費(fèi)者2] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,自動(dòng)ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
測(cè)試:先啟動(dòng)生產(chǎn)者,再啟動(dòng)2個(gè)消費(fèi)者,再啟動(dòng)生產(chǎn)者!
測(cè)試結(jié)果:只有消費(fèi)者1接收到了"inform_type", "email"
2.2.7 RPC
RPC即客戶端遠(yuǎn)程調(diào)用服務(wù)端的方法 ,使用MQ可以實(shí)現(xiàn)RPC的異步調(diào)用,基于Direct交換機(jī)實(shí)現(xiàn),流程如下:
- 客戶端即是生產(chǎn)者就是消費(fèi)者,向RPC請(qǐng)求隊(duì)列發(fā)送RPC調(diào)用消息,同時(shí)監(jiān)聽RPC響應(yīng)隊(duì)列。
- 服務(wù)端監(jiān)聽RPC請(qǐng)求隊(duì)列的消息,收到消息后執(zhí)行服務(wù)端的方法,得到方法返回的結(jié)果
- 服務(wù)端將RPC方法 的結(jié)果發(fā)送到RPC響應(yīng)隊(duì)列
- 客戶端(RPC調(diào)用方)監(jiān)聽RPC響應(yīng)隊(duì)列,接收到RPC調(diào)用結(jié)果。
2.3 持久化-解決數(shù)據(jù)安全
如何避免消息丟失?
- 消費(fèi)者的ACK機(jī)制??梢苑乐瓜M(fèi)者丟失消息。
- 但是,如果在消費(fèi)者消費(fèi)之前,MQ就宕機(jī)了,消息就沒了。
是可以將消息進(jìn)行持久化呢?
要將消息持久化,前提是:隊(duì)列、Exchange都持久化
2.3.1 交換機(jī)持久化
生產(chǎn)者那邊指定一下,第三個(gè)參數(shù)那里true就是持久化
// 聲明exchange,指定類型為topic, 是否持久化
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,true);
2.3.2 隊(duì)列持久化
在消費(fèi)者那里指定一下隊(duì)列持久化
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 參數(shù)1:聲明隊(duì)列,參數(shù)2:隊(duì)列是否持久化
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
2.3.3 消息持久化
生產(chǎn)者發(fā)送消息的第三個(gè)參數(shù)
// 發(fā)送消息,并且指定routing key 為:insert ,代表新增商品,參數(shù)3:消息持久化
channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
3. SpringBoot整合RabbitMQ
3.1 步驟分析
- 創(chuàng)建項(xiàng)目
- pom導(dǎo)包
- 入口類
- application.yml配置
- 配置類
- 生產(chǎn)者
- 消費(fèi)者
- 測(cè)試
3.2 步驟實(shí)現(xiàn)
3.2.1 創(chuàng)建項(xiàng)目
創(chuàng)建一個(gè)maven項(xiàng)目
3.2.2 pom導(dǎo)包
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.0.5.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--spirngboot集成RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
3.2.3 入口類
package cn.wangningbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class APP {
public static void main(String[] args) {
SpringApplication.run(APP.class, args);
}
}
3.2.4 application.yml配置
server:
port: 44000
spring:
application:
name: test-rabbitmq-producer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /
3.2.5 配置類
package cn.wangningbo.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
/**
* 交換機(jī)配置
* ExchangeBuilder提供了fanout、direct、topic、header交換機(jī)類型的配置
*
* @return the exchange
*/
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM() {
//durable(true)持久化,消息隊(duì)列重啟后交換機(jī)仍然存在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
//聲明隊(duì)列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS() {
Queue queue = new Queue(QUEUE_INFORM_SMS);
return queue;
}
//聲明隊(duì)列
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL() {
Queue queue = new Queue(QUEUE_INFORM_EMAIL);
return queue;
}
/**
* channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");
* 綁定隊(duì)列到交換機(jī) .
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
}
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
}
}
3.2.6 生產(chǎn)者
使用RarbbitTemplate發(fā)送消息
package cn.wangningbo;
import cn.wangningbo.config.RabbitmqConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = APP.class)
@RunWith(SpringRunner.class)
public class Producer_topics_springboot {
@Autowired
RabbitTemplate rabbitTemplate;
// 生產(chǎn)者
@Test
public void testSendByTopics() {
// 使用for循環(huán)發(fā)送5條消息
for (int i = 0; i < 5; i++) {
// 準(zhǔn)備消息
String message = "sms email inform to user" + i;
// 參數(shù)1:交換機(jī)名稱,參數(shù)2:使用通配符類型的routingKey,參數(shù)3:要發(fā)送的消息
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.sms.email", message);
// 打印發(fā)送的消息
System.out.println("Send Message is:'" + message + "'");
}
}
}
3.2.7 消費(fèi)者
package cn.wangningbo.handler;
import cn.wangningbo.config.RabbitmqConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消費(fèi)者
*/
@Component
public class ReceiveHandler {
//監(jiān)聽email隊(duì)列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
public void receive_email(String msg, Message message, Channel channel) {
System.out.println(msg);
}
//監(jiān)聽sms隊(duì)列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
public void receive_sms(String msg, Message message, Channel channel) {
System.out.println(msg);
}
}
3.2.8 測(cè)試
啟動(dòng)生產(chǎn)者即可測(cè)試!查看消息到了哪里的消費(fèi)者!