MQ之RabbitMQ

  • 更新記錄:
    • 2017.07.18 增加 MAC 安裝方式。
      [toc]
      簡書不支持 toc 目錄模式,截圖一張。


      image.png

      image.png

簡介

MQ為Message Queue,消息隊列是應用程和應用程序之間的通信方法。
RabbitMQ是一個開源的,在AMQP基礎上完整的,可復用的企業(yè)消息系統(tǒng)。
支持主流的操作系統(tǒng),Linux、Windows、MacOX等。
多種開發(fā)語言支持,Java、Python、Ruby、.NET、PHP、C/C++、node.js等

AMQP

AMQP是消息隊列的一個協(xié)議。
AMQP,即Advanced Message Queuing Protocol,一個提供統(tǒng)一消息服務的應用層標準高級消息隊列協(xié)議,是應用層協(xié)議的一個開放標準,為面向消息的中間件設計。基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產品,不同的開發(fā)語言等條件的限制。Erlang中的實現(xiàn)有 RabbitMQ等。

RabbitMQ官網地址http://www.rabbitmq.com/

Linux安裝RabbitMQ

安裝Erlang

添加yum支持

在線安裝:

cd /usr/local/src/
mkdir rabbitmq
cd rabbitmq

wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc

sudo yum install erlang

離線安裝:

centos相關資源:
http://omj2w5bt7.bkt.clouddn.com/rabbitmq-server-3.4.1-1.noarch.rpm
http://omj2w5bt7.bkt.clouddn.com/esl-erlang-compat-R14B-1.el6.noarch.rpm
http://omj2w5bt7.bkt.clouddn.com/esl-erlang_17.3-1centos6_amd64.rpm
http://omj2w5bt7.bkt.clouddn.com/esl-erlang-17.3-1.x86_64.rpm

windows相關資源:
http://omj2w5bt7.bkt.clouddn.com/rabbitmq-server-3.4.1.exe
http://omj2w5bt7.bkt.clouddn.com/otp_win32_17.4.exe
http://omj2w5bt7.bkt.clouddn.com/otp_win64_17.3.exe


上傳esl-erlang_17.3-1~centos~6_amd64.rpm
執(zhí)行 yum install esl-erlang_17.3-1~centos~6_amd64.rpm

上傳:esl-erlang-compat-R14B-1.el6.noarch.rpm
yum install esl-erlang-compat-R14B-1.el6.noarch.rpm


安裝RabbitMQ

上傳rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
安裝:
rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

啟動、停止

service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart

設置開機啟動

chkconfig rabbitmq-server on

設置配置文件

cd /etc/rabbitmq
cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/
mv rabbitmq.config.example rabbitmq.config

開啟用戶遠程訪問

vi /etc/rabbitmq/rabbitmq.config

開啟web界面管理工具

rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart

防火墻開放15672端口

/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT
/etc/rc.d/init.d/iptables save



MAC安裝RabbitMQ

安裝:brew install rabbitmq

配置環(huán)境變量:

RabbitMQ Config

需要在 bash 或者 zsh 的環(huán)境變量中添加:

export PATH=$PATH:/usr/local/sbin

不然下面的指令,就必須到/usr/local/sbin目錄下,才能直接執(zhí)行。
啟動:rabbitmq-server
停止:rabbitmqctl stop
狀態(tài):rabbitmqctl status

訪問:http://localhost:15672/
默認的用戶名和密碼是 guestguest



基本使用

訪問界面管理:


使用 guest/guest 進行登錄。

添加用戶

界面介紹:


創(chuàng)建虛擬主機

用戶授權虛擬主機



交換機和隊列綁定

首先創(chuàng)建好交換機和隊列




通過交換機綁定隊列或者通過隊列綁定交換機都可以。



隊列

簡單隊列


P:消息的生產者
C:消息的消費者
紅色:隊列

生產者將消息發(fā)送到隊列,消費者從隊列中獲取消息。

代碼測試:

導入下面測試代碼,運行完 Send.java 消息隊列中就有"Hello World! -- 1"字符串,在運行Recv.java,就會取出字符串。

啟動接收者Recv.java,就可以在 rabbitMQ 管理平臺看到如下的一些信息:



啟動接收者Send.java,就可以在 rabbitMQ 看到隊列中的信息:


Recv.java

public class Recv {

    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監(jiān)聽隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

Send.java

public class Send {

    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 從連接中創(chuàng)建通道
        Channel channel = connection.createChannel();

        // 聲明(創(chuàng)建)隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 消息內容
        String message = "Hello World! -- 1";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        //關閉通道和連接
        channel.close();
        connection.close();
    }
}

ConnectionUtil.java

public class ConnectionUtil {

    public static Connection getConnection() throws Exception {
        //定義連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務地址
        factory.setHost("192.168.18.130");
        //端口
        factory.setPort(5672);
        //設置賬號信息,用戶名、密碼、vhost
        factory.setVirtualHost("/inke-test");
        factory.setUsername("inke");
        factory.setPassword("inke");
        // 通過工程獲取連接
        Connection connection = factory.newConnection();
        return connection;
    }

}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbitmq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rabbitmq-demo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.0.RELEASE</version>
        </dependency>
    </dependencies>
</project>



Work模式

一個生產者、2個消費者。
一個消息只能被一個消費者獲取。

Send.java

public class Send {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 50; i++) {
            // 消息內容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}

Recv.java

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一時刻服務器只會發(fā)一條消息給消費者
        // channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監(jiān)聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            //休眠
            Thread.sleep(10);
            // 返回確認狀態(tài)
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

Recv2.java

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一時刻服務器只會發(fā)一條消息給消費者
        // channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監(jiān)聽隊列,手動返回完成狀態(tài)
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            // 休眠1秒
            Thread.sleep(1000);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

測試結果:

1、消費者1和消費者2獲取到的消息內容是不同的,同一個消息只能被一個消費者獲取。
2、消費者1和消費者2獲取到的消息的數量是相同的,一個是奇數一個是偶數。
其實,這樣是不合理的,應該是消費者1要比消費者2獲取到的消息多才對。

Work模式的“能者多勞”

消費者需要打開這個開關


測試結果:

消費者1比消費者2獲取的消息更多。



消息的確認模式

消費者從隊列中獲取消息,服務端如何知道消息已經被消費呢?

  • 模式1:自動確認
    只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消費,都認為是消息已經成功消費。

  • 模式2:手動確認
    消費者從隊列中獲取消息后,服務器會將該消息標記為不可用狀態(tài),等待消費者的反饋,如果消費者一直沒有反饋,那么該消息將一直處于不可用狀態(tài)。

手動模式:

手動模式:



訂閱模式:

解讀:
1、1個生產者,多個消費者
2、每一個消費者都有自己的一個隊列
3、生產者沒有將消息直接發(fā)送到隊列,而是發(fā)送到了交換機
4、每個隊列都要綁定到交換機
5、生產者發(fā)送的消息,經過交換機,到達隊列,實現(xiàn),一個消息被多個消費者獲取的目的

代碼演示:

需求是:一個后臺服務提供給前臺和搜索展示數據,如何保證數據的統(tǒng)一性。

向交換機中發(fā)送消息。


注意:消息發(fā)送到沒有隊列綁定的交換機時,消息將丟失,因為,交換機沒有存儲消息的能力,消息只能存在在隊列中。

消費者1(看作是前臺系統(tǒng))


消費者2(看作是搜索系統(tǒng))


測試結果:

同一個消息被多個消費者獲取。

在管理工具中查看隊列和交換機的綁定關系:



使用訂閱模式能否實現(xiàn)商品數據的同步?

答案:可以的。

后臺系統(tǒng)就是消息的生產者。
前臺系統(tǒng)和搜索系統(tǒng)是消息的消費者。
后臺系統(tǒng)將消息發(fā)送到交換機中,前臺系統(tǒng)和搜索系統(tǒng)都創(chuàng)建自己的隊列,然后將隊列綁定到交換機,即可實現(xiàn)。

消息,新增商品、修改商品、刪除商品。

前臺系統(tǒng):修改商品、刪除商品。
搜索系統(tǒng):新增商品、修改商品、刪除商品。

所以使用訂閱模式實現(xiàn)商品數據的同步并不合理。



路由模式


如圖所示:c1 隊列綁定了 error ,c2 隊列綁定了 info、error、warning。
如果發(fā)送 error 的消息,那么 c1 和 c2 都可以收到,發(fā)送 info、warning 只有c2可以收到,可以隨意組合,更加自由。
例如:前臺系統(tǒng)只需要收取“更新“”刪除”的消息,而搜索系統(tǒng)還需要收取“新增”的消息。





測試結果:
當發(fā)送的key是 insert 的時候,只有Recv2隊列的test_queue_direct_2才能接收到消息,
當發(fā)送的key是 delete、update的時候,Recv1和Recv2都能接收到消息。



通配符模式

將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。
符號“#”匹配一個或多個詞,符號“”匹配不多不少一個詞。*
因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。
topic交換機是如何工作的:


生產者(后臺系統(tǒng))Send.java

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 消息內容
        String message = "商品刪除,id=1003";
        channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
        System.out.println(" 后臺系統(tǒng): '" + message + "'");

        channel.close();
        connection.close();
    }
}

消費者1(前臺系統(tǒng))Recv.java

public class Recv {

    private final static String QUEUE_NAME = "test_queue_topic_1";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

        // 同一時刻服務器只會發(fā)一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監(jiān)聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 前臺系統(tǒng): '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消費者2(搜索系統(tǒng))Recv2.java

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_topic_2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");

        // 同一時刻服務器只會發(fā)一條消息給消費者
        channel.basicQos(1);

        // 定義隊列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監(jiān)聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 搜索系統(tǒng): '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

測試結果:item.# 匹配所有前綴是 item 的消息。



Spring-Rabbit

Spring項目
http://spring.io/projects

The project consists of two parts; spring-amqp is the base abstraction, and spring-rabbit is the RabbitMQ implementation.
該項目由兩部分組成: spring-amqp是基礎抽象,spring-rabbit是RabbitMQ實現(xiàn)。

代碼演示:

public class SpringMain {
    
    public static void main(final String... args) throws Exception {
        AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                "classpath:spring/rabbitmq-context.xml");
        //RabbitMQ模板
        RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
        //發(fā)送消息
        template.convertAndSend("Hello, world!");
        Thread.sleep(1000);// 休眠1秒
        ctx.destroy(); //容器銷毀
    }
}
/**
 * 消費者
 */
public class Foo {

    //具體執(zhí)行業(yè)務的方法
    public void listen(String foo) {
        System.out.println("消費者: " + foo);
    }
}

rabbitmq-context.xml

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">

    <!-- 定義RabbitMQ的連接工廠 -->
    <rabbit:connection-factory id="connectionFactory"
        host="192.168.18.130" port="5672" username="inke" password="inke" virtual-host="/inke-test" />

    <!-- 定義Rabbit模板,指定連接工廠以及定義exchange -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
    <!-- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
        exchange="fanoutExchange" routing-key="foo.bar" /> -->

    <!-- MQ的管理,包括隊列、交換器等 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- 定義隊列,自動聲明 -->
    <rabbit:queue name="myQueue" auto-declare="true" durable="false"/>

    <!-- 定義交換器,自動聲明 -->
    <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true" durable="false">
        <rabbit:bindings>
            <rabbit:binding queue="myQueue"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

<!--    <rabbit:topic-exchange name="myExchange">
        <rabbit:bindings>
            <rabbit:binding queue="myQueue" pattern="foo.*" />
        </rabbit:bindings>
    </rabbit:topic-exchange> -->

    <!-- 隊列監(jiān)聽 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="foo" method="listen" queue-names="myQueue" />
    </rabbit:listener-container>

    <bean id="foo" class="com.example.rabbitmq.spring.Foo" />

</beans>


<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
這里可以直接選擇發(fā)送到哪個隊列,而不是交換機。
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" queue="myQueue"/>



持久化交換機和隊列

持久化:將交換機或隊列的數據保存到磁盤,服務器宕機或重啟之后依然存在。
非持久化:將交換機或隊列的數據保存到內存,服務器宕機或重啟之后將不存在。

非持久化的性能高于持久化。
如何選擇持久化?非持久化?-- 看需求。





參考自:
某培訓機構
CSDN
spring 官網

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務器。支持消息的持久化、事務、擁塞控...
    jiangmo閱讀 10,520評論 2 34
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,715評論 19 139
  • RabbitMQ筆記 本文參考資料:http://blog.csdn.net/chwshuang/article/...
    wangxiaoda閱讀 2,950評論 0 11
  • 關于消息隊列,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了,但一直沒騰出空,近來分別碰到幾個朋友聊這塊的技術選型,是時...
    預流閱讀 586,709評論 51 787
  • RabbitMQ 簡介 RabbitMQ是一個在AMQP(Advanced Message Queuing Pro...
    Doris_Lee閱讀 72,945評論 2 29

友情鏈接更多精彩內容