- 更新記錄:
-
2017.07.18 增加 MAC 安裝方式。
[toc]
簡書不支持 toc 目錄模式,截圖一張。
image.png
image.png
-
簡介
MQ為Message Queue,消息隊列是應用程和應用程序之間的通信方法。
RabbitMQ是一個開源的,在AMQP基礎上完整的,可復用的企業(yè)消息系統(tǒng)。
支持主流的操作系統(tǒng),Linux、Windows、MacOX等。
多種開發(fā)語言支持,Java、Python、Ruby、.NET、PHP、C/C++、node.js等
AMQP
AMQP是消息隊列的一個協(xié)議。
AMQP,即Advanced Message Queuing Protocol,一個提供統(tǒng)一消息服務的應用層標準高級消息隊列協(xié)議,是應用層協(xié)議的一個開放標準,為面向消息的中間件設計。基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產品,不同的開發(fā)語言等條件的限制。Erlang中的實現(xiàn)有 RabbitMQ等。
RabbitMQ官網地址http://www.rabbitmq.com/
Linux安裝RabbitMQ
安裝Erlang
添加yum支持
在線安裝:
cd /usr/local/src/
mkdir rabbitmq
cd rabbitmq
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
sudo yum install erlang
離線安裝:
centos相關資源:
http://omj2w5bt7.bkt.clouddn.com/rabbitmq-server-3.4.1-1.noarch.rpm
http://omj2w5bt7.bkt.clouddn.com/esl-erlang-compat-R14B-1.el6.noarch.rpm
http://omj2w5bt7.bkt.clouddn.com/esl-erlang_17.3-1centos6_amd64.rpm
http://omj2w5bt7.bkt.clouddn.com/esl-erlang-17.3-1.x86_64.rpm
windows相關資源:
http://omj2w5bt7.bkt.clouddn.com/rabbitmq-server-3.4.1.exe
http://omj2w5bt7.bkt.clouddn.com/otp_win32_17.4.exe
http://omj2w5bt7.bkt.clouddn.com/otp_win64_17.3.exe
上傳esl-erlang_17.3-1~centos~6_amd64.rpm
執(zhí)行 yum install esl-erlang_17.3-1~centos~6_amd64.rpm
上傳:esl-erlang-compat-R14B-1.el6.noarch.rpm
yum install esl-erlang-compat-R14B-1.el6.noarch.rpm
安裝RabbitMQ
上傳rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
安裝:
rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm
啟動、停止
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
設置開機啟動
chkconfig rabbitmq-server on
設置配置文件
cd /etc/rabbitmq
cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/
mv rabbitmq.config.example rabbitmq.config
開啟用戶遠程訪問
vi /etc/rabbitmq/rabbitmq.config

開啟web界面管理工具
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
防火墻開放15672端口
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT
/etc/rc.d/init.d/iptables save
MAC安裝RabbitMQ
安裝:brew install rabbitmq
配置環(huán)境變量:
RabbitMQ Config
需要在 bash 或者 zsh 的環(huán)境變量中添加:
export PATH=$PATH:/usr/local/sbin
不然下面的指令,就必須到/usr/local/sbin目錄下,才能直接執(zhí)行。
啟動:rabbitmq-server
停止:rabbitmqctl stop
狀態(tài):rabbitmqctl status
訪問:http://localhost:15672/
默認的用戶名和密碼是 guest 和 guest
基本使用
訪問界面管理:

使用 guest/guest 進行登錄。
添加用戶

界面介紹:


創(chuàng)建虛擬主機

用戶授權虛擬主機

交換機和隊列綁定
首先創(chuàng)建好交換機和隊列



通過交換機綁定隊列或者通過隊列綁定交換機都可以。



隊列
簡單隊列

P:消息的生產者
C:消息的消費者
紅色:隊列
生產者將消息發(fā)送到隊列,消費者從隊列中獲取消息。
代碼測試:
導入下面測試代碼,運行完 Send.java 消息隊列中就有"Hello World! -- 1"字符串,在運行Recv.java,就會取出字符串。
啟動接收者Recv.java,就可以在 rabbitMQ 管理平臺看到如下的一些信息:



啟動接收者Send.java,就可以在 rabbitMQ 看到隊列中的信息:


Recv.java
public class Recv {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監(jiān)聽隊列
channel.basicConsume(QUEUE_NAME, true, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
Send.java
public class Send {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 從連接中創(chuàng)建通道
Channel channel = connection.createChannel();
// 聲明(創(chuàng)建)隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息內容
String message = "Hello World! -- 1";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//關閉通道和連接
channel.close();
connection.close();
}
}
ConnectionUtil.java
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設置服務地址
factory.setHost("192.168.18.130");
//端口
factory.setPort(5672);
//設置賬號信息,用戶名、密碼、vhost
factory.setVirtualHost("/inke-test");
factory.setUsername("inke");
factory.setPassword("inke");
// 通過工程獲取連接
Connection connection = factory.newConnection();
return connection;
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rabbitmq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rabbitmq-demo</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
</dependencies>
</project>
Work模式

一個生產者、2個消費者。
一個消息只能被一個消費者獲取。
Send.java
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
// 消息內容
String message = "" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
Recv.java
public class Recv {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一時刻服務器只會發(fā)一條消息給消費者
// channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監(jiān)聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
//休眠
Thread.sleep(10);
// 返回確認狀態(tài)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
Recv2.java
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一時刻服務器只會發(fā)一條消息給消費者
// channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監(jiān)聽隊列,手動返回完成狀態(tài)
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
// 休眠1秒
Thread.sleep(1000);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
測試結果:
1、消費者1和消費者2獲取到的消息內容是不同的,同一個消息只能被一個消費者獲取。
2、消費者1和消費者2獲取到的消息的數量是相同的,一個是奇數一個是偶數。
其實,這樣是不合理的,應該是消費者1要比消費者2獲取到的消息多才對。
Work模式的“能者多勞”
消費者需要打開這個開關

測試結果:
消費者1比消費者2獲取的消息更多。
消息的確認模式
消費者從隊列中獲取消息,服務端如何知道消息已經被消費呢?
模式1:自動確認
只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消費,都認為是消息已經成功消費。模式2:手動確認
消費者從隊列中獲取消息后,服務器會將該消息標記為不可用狀態(tài),等待消費者的反饋,如果消費者一直沒有反饋,那么該消息將一直處于不可用狀態(tài)。
手動模式:

手動模式:
訂閱模式:

解讀:
1、1個生產者,多個消費者
2、每一個消費者都有自己的一個隊列
3、生產者沒有將消息直接發(fā)送到隊列,而是發(fā)送到了交換機
4、每個隊列都要綁定到交換機
5、生產者發(fā)送的消息,經過交換機,到達隊列,實現(xiàn),一個消息被多個消費者獲取的目的

代碼演示:
需求是:一個后臺服務提供給前臺和搜索展示數據,如何保證數據的統(tǒng)一性。
向交換機中發(fā)送消息。

注意:消息發(fā)送到沒有隊列綁定的交換機時,消息將丟失,因為,交換機沒有存儲消息的能力,消息只能存在在隊列中。
消費者1(看作是前臺系統(tǒng))

消費者2(看作是搜索系統(tǒng))

測試結果:
同一個消息被多個消費者獲取。
在管理工具中查看隊列和交換機的綁定關系:


使用訂閱模式能否實現(xiàn)商品數據的同步?
答案:可以的。
后臺系統(tǒng)就是消息的生產者。
前臺系統(tǒng)和搜索系統(tǒng)是消息的消費者。
后臺系統(tǒng)將消息發(fā)送到交換機中,前臺系統(tǒng)和搜索系統(tǒng)都創(chuàng)建自己的隊列,然后將隊列綁定到交換機,即可實現(xiàn)。
消息,新增商品、修改商品、刪除商品。
前臺系統(tǒng):修改商品、刪除商品。
搜索系統(tǒng):新增商品、修改商品、刪除商品。
所以使用訂閱模式實現(xiàn)商品數據的同步并不合理。
路由模式


如圖所示:c1 隊列綁定了 error ,c2 隊列綁定了 info、error、warning。
如果發(fā)送 error 的消息,那么 c1 和 c2 都可以收到,發(fā)送 info、warning 只有c2可以收到,可以隨意組合,更加自由。
例如:前臺系統(tǒng)只需要收取“更新“”刪除”的消息,而搜索系統(tǒng)還需要收取“新增”的消息。





測試結果:
當發(fā)送的key是 insert 的時候,只有Recv2隊列的test_queue_direct_2才能接收到消息,
當發(fā)送的key是 delete、update的時候,Recv1和Recv2都能接收到消息。
通配符模式
將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。
符號“#”匹配一個或多個詞,符號“”匹配不多不少一個詞。*
因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。
topic交換機是如何工作的:


生產者(后臺系統(tǒng))Send.java
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息內容
String message = "商品刪除,id=1003";
channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
System.out.println(" 后臺系統(tǒng): '" + message + "'");
channel.close();
connection.close();
}
}
消費者1(前臺系統(tǒng))Recv.java
public class Recv {
private final static String QUEUE_NAME = "test_queue_topic_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
// 同一時刻服務器只會發(fā)一條消息給消費者
channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監(jiān)聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 前臺系統(tǒng): '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消費者2(搜索系統(tǒng))Recv2.java
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_topic_2";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");
// 同一時刻服務器只會發(fā)一條消息給消費者
channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監(jiān)聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 搜索系統(tǒng): '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

測試結果:item.# 匹配所有前綴是 item 的消息。
Spring-Rabbit
Spring項目
http://spring.io/projects

The project consists of two parts; spring-amqp is the base abstraction, and spring-rabbit is the RabbitMQ implementation.
該項目由兩部分組成: spring-amqp是基礎抽象,spring-rabbit是RabbitMQ實現(xiàn)。
代碼演示:
public class SpringMain {
public static void main(final String... args) throws Exception {
AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
"classpath:spring/rabbitmq-context.xml");
//RabbitMQ模板
RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
//發(fā)送消息
template.convertAndSend("Hello, world!");
Thread.sleep(1000);// 休眠1秒
ctx.destroy(); //容器銷毀
}
}
/**
* 消費者
*/
public class Foo {
//具體執(zhí)行業(yè)務的方法
public void listen(String foo) {
System.out.println("消費者: " + foo);
}
}
rabbitmq-context.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
<!-- 定義RabbitMQ的連接工廠 -->
<rabbit:connection-factory id="connectionFactory"
host="192.168.18.130" port="5672" username="inke" password="inke" virtual-host="/inke-test" />
<!-- 定義Rabbit模板,指定連接工廠以及定義exchange -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
<!-- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="fanoutExchange" routing-key="foo.bar" /> -->
<!-- MQ的管理,包括隊列、交換器等 -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- 定義隊列,自動聲明 -->
<rabbit:queue name="myQueue" auto-declare="true" durable="false"/>
<!-- 定義交換器,自動聲明 -->
<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true" durable="false">
<rabbit:bindings>
<rabbit:binding queue="myQueue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- <rabbit:topic-exchange name="myExchange">
<rabbit:bindings>
<rabbit:binding queue="myQueue" pattern="foo.*" />
</rabbit:bindings>
</rabbit:topic-exchange> -->
<!-- 隊列監(jiān)聽 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="foo" method="listen" queue-names="myQueue" />
</rabbit:listener-container>
<bean id="foo" class="com.example.rabbitmq.spring.Foo" />
</beans>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
這里可以直接選擇發(fā)送到哪個隊列,而不是交換機。
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" queue="myQueue"/>
持久化交換機和隊列
持久化:將交換機或隊列的數據保存到磁盤,服務器宕機或重啟之后依然存在。
非持久化:將交換機或隊列的數據保存到內存,服務器宕機或重啟之后將不存在。
非持久化的性能高于持久化。
如何選擇持久化?非持久化?-- 看需求。

參考自:
某培訓機構
CSDN
spring 官網

