RabbitMQ

1. RabbitMQ的介紹和安裝

1.1 介紹

1.1.1 是什么?

MQ全稱為Message Queue,即消息隊(duì)列. 它也是一個(gè)隊(duì)列,遵循FIFO原則.(先進(jìn)先出)

RabbitMQ是由erlang語言開發(fā),基于AMQP(Advanced Message Queue 高級(jí)消息隊(duì)列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊(duì)列,它是一種應(yīng)用程序之間的通信方法,消息隊(duì)列在分布式系統(tǒng)開 發(fā)中應(yīng)用非常廣泛。

RabbitMQ官網(wǎng)地址:https://www.rabbitmq.com/

RabbitMQ是基于AMQP協(xié)議實(shí)現(xiàn)一種MQ

1.1.2 使用場(chǎng)景

開發(fā)中消息隊(duì)列通常有如下應(yīng)用場(chǎng)景:

  1. 提高系統(tǒng)響應(yīng)速度

    任務(wù)異步處理。 將不需要同步處理的并且耗時(shí)長的操作由消息隊(duì)列通知消息接收方進(jìn)行異步處理。提高了應(yīng)用程序的響應(yīng)時(shí)間

  2. 提高系統(tǒng)穩(wěn)定性

    系統(tǒng)掛了關(guān)系,操作內(nèi)容放到消息隊(duì)列

  3. 異步化

  4. 解耦

    應(yīng)用程序解耦合MQ相當(dāng)于一個(gè)中介,生產(chǎn)方通過MQ與消費(fèi)方交互,它將應(yīng)用程序進(jìn)行解耦合。

  5. 排序保證 FIFO(比如應(yīng)用于秒殺場(chǎng)景)

  6. 消除峰值

1.1.3 市面上常見消息隊(duì)列

ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。

ActiveMQ是老牌,不太好用,有點(diǎn)過時(shí)了!
RabbitMQ,Kafka,Redis現(xiàn)在這三個(gè)比較好用!

為什么使用RabbitMQ呢?

  1. 使得簡單,功能強(qiáng)大。
  2. 基于AMQP協(xié)議。
  3. 社區(qū)活躍,文檔完善。
  4. 高并發(fā)性能好,這主要得益于Erlang語言。
  5. Spring Boot默認(rèn)已集成RabbitMQ

1.1.4 相關(guān)介紹

  1. AMQP
    • (1)概念:AMQP,即Advanced Message Queuing Protocol,一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。Erlang中的實(shí)現(xiàn)有RabbitMQ等。
    • (2)官網(wǎng)地址:http://www.amqp.org/
    • (3)總結(jié):AMQP是一套公開的消息隊(duì)列協(xié)議,最早在2003年被提出,它旨在從協(xié)議層定義消息通信數(shù)據(jù)的標(biāo)準(zhǔn)格式,為的就是解決MQ市場(chǎng)上協(xié)議不統(tǒng)一的問題,RabbitMQ就是遵循AMQP標(biāo)準(zhǔn)協(xié)議開發(fā)的MQ服務(wù)
  2. JMS
    • (1)概念:JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個(gè)與具體平臺(tái)無關(guān)的API,絕大多數(shù)MOM提供商都對(duì)JMS提供支持。
    • (2)官網(wǎng):https://www.oracle.com/technetwork/java/jms/index.html
    • (3)總結(jié): JMS是java提供的一套消息服務(wù)API標(biāo)準(zhǔn),其目的是為所有的java應(yīng)用程序提供統(tǒng)一的消息通信的標(biāo)準(zhǔn),類似java的jdbc,只要遵循jms標(biāo)準(zhǔn)的應(yīng)用程序之間都可以進(jìn)行消息通信。它和AMQP有什么不同,jms是java語言專屬的消息服務(wù)標(biāo)準(zhǔn),它是在api層定義標(biāo)準(zhǔn),并且只能用于java應(yīng)用;而AMQP是在協(xié)議層定義的標(biāo)準(zhǔn),是跨語言的 。

1.2 安裝

1.2.1 工作原理

組成部分說明如下:

  1. Broker:消息隊(duì)列服務(wù)進(jìn)程,此進(jìn)程包括兩個(gè)部分:Exchange和Queue。
    • (1)Exchange:消息隊(duì)列交換機(jī),按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個(gè)隊(duì)列,對(duì)消息進(jìn)行過慮。
    • (2)Queue:消息隊(duì)列,存儲(chǔ)消息的隊(duì)列,消息到達(dá)隊(duì)列并轉(zhuǎn)發(fā)給指定的消費(fèi)方。
  2. Producer:消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消息發(fā)送到MQ。
  3. Consumer:消息消費(fèi)者,即消費(fèi)方客戶端,接收MQ轉(zhuǎn)發(fā)的消息。

消息發(fā)送流程:

  1. 生產(chǎn)者和Broker建立TCP連接。
  2. 生產(chǎn)者和Broker建立通道。
  3. 生產(chǎn)者通過通道消息發(fā)送給Broker,由Exchange將消息進(jìn)行轉(zhuǎn)發(fā)。
  4. Exchange將消息轉(zhuǎn)發(fā)到指定的Queue(隊(duì)列)

消息接收流程:

  1. 消費(fèi)者和Broker建立TCP連接
  2. 消費(fèi)者和Broker建立通道
  3. 消費(fèi)者監(jiān)聽指定的Queue(隊(duì)列)
  4. 當(dāng)有消息到達(dá)Queue時(shí)Broker默認(rèn)將消息推送給消費(fèi)者。
  5. 消費(fèi)者接收到消息。

1.2.2 安裝

RabbitMQ由Erlang語言開發(fā),Erlang語言用于并發(fā)及分布式系統(tǒng)的開發(fā),在電信領(lǐng)域應(yīng)用廣泛,OTP(Open Telecom Platform)作為Erlang語言的一部分,包含了很多基于Erlang開發(fā)的中間件及工具庫,安裝RabbitMQ需要安裝Erlang/OTP,并保持版本匹配

RabbitMQ的下載地址:http://www.rabbitmq.com/download.html

我這里使用Erlang/OTP 20.3版本和RabbitMQ3.7.4版本。

  1. 安裝erlang

    下載地址:https://www.erlang.org/downloads

    我這里下載了OTP 20.3 Windows 64位二進(jìn)制文??件 (99142192)-->otp_win64_20.3.exe.運(yùn)行此文件進(jìn)行安裝。

    安裝完以后配置環(huán)境變量!

  2. 安裝RabbitMQ

    下載地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.4

    下載rabbitmq-server-3.7.4.exe

  3. 啟動(dòng)

    安裝成功后會(huì)自動(dòng)創(chuàng)建RabbitMQ服務(wù)并且啟動(dòng)。

    • (啟動(dòng)方式1)從開始菜單啟動(dòng)RabbitMQ完成在開始菜單找到RabbitMQ的菜單?;蛘呷ト蝿?wù)管理器的服務(wù)那里手動(dòng)搞一下!

    • (啟動(dòng)方式2)進(jìn)入安裝目錄下sbin目錄手動(dòng)啟動(dòng):rabbitmq-service.bat install 安裝服務(wù) rabbitmq-service.bat stop 停止服務(wù) rabbitmq-service.bat start 啟動(dòng)服務(wù)

  4. 安裝管理插件

    安裝rabbitMQ的管理插件,方便在瀏覽器端管理RabbitMQ

    管理員身份運(yùn)行 rabbitmq-plugins.bat enable rabbitmq_management

    重新啟動(dòng)RabbitMQ服務(wù)

  5. 瀏覽器訪問測(cè)試

    進(jìn)入瀏覽器,訪問http://localhost:15672

    初始賬號(hào)和密碼:guest/guest

注意:

  1. 安裝erlang和rabbitMQ以管理員身份運(yùn)行。
  2. 當(dāng)卸載重新安裝時(shí)會(huì)出現(xiàn)RabbitMQ服務(wù)注冊(cè)失敗,此時(shí)需要進(jìn)入注冊(cè)表清理erlang。搜索RabbitMQ、ErlSrv,將對(duì)應(yīng)的項(xiàng)全部刪除。

2. Java操作RabbitMQ

2.1 RabbitMQ的消息模型

RabbitMQ提供了6種消息模型,但是第6種其實(shí)是RPC,并不是MQ!那么也就剩下5種。

3、4、5這三種都屬于訂閱模型,只不過進(jìn)行路由的方式不同。

消息模型如下圖所示:
[圖片上傳失敗...(image-b28b51-1569803506043)]

2.2 搭建環(huán)境

  1. 創(chuàng)建maven工程!

    我這里取名RabbitMQTest

  2. 導(dǎo)包

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <!--和springboot2.0.5對(duì)應(yīng)-->
            <version>5.4.1</version>
        </dependency>
  1. 工具類
package cn.wangningbo.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    /**
     * 建立與RabbitMQ的連接
     *
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定義連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設(shè)置服務(wù)地址
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);
        //設(shè)置賬號(hào)信息,用戶名、密碼、vhost
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通過工程獲取連接
        Connection connection = factory.newConnection();
        return connection;
    }
}

注意:我下面為了測(cè)試方便,就沒有處理異常都拋出去了!

2.2.1 "Hello World!"------基本消息模型

2.2.1.1 生產(chǎn)者

package cn.wangningbo._01helloworld;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    //隊(duì)列名稱
    private static final String QUEUE = "helloworld";

    //使用main方法進(jìn)行測(cè)試
    public static void main(String[] args) throws Exception {
        //獲取連接
        Connection connection = ConnectionUtil.getConnection();
        //創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
        Channel channel = connection.createChannel();
        /**
         * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
         * param1:隊(duì)列名稱
         * param2:是否持久化
         * param3:隊(duì)列是否獨(dú)占此連接
         * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
         * param5:隊(duì)列參數(shù)
         */
        channel.queueDeclare(QUEUE, true, false, false, null);
        // 準(zhǔn)備一條消息 // 字符串+當(dāng)前系統(tǒng)時(shí)間的時(shí)間戳
        String message = "helloworld小明" + System.currentTimeMillis();
        /**
         * 消息發(fā)布方法
         * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
         * param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列
         * param3:消息包含的屬性
         * param4:消息體
         */
        /**
         * 這里沒有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯
         示綁定或解除綁定
         * 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱
         */
        channel.basicPublish("", QUEUE, null, message.getBytes());
        System.out.println("Send Message is:'" + message + "'");
    }
}

2.2.1.2 消費(fèi)者

package cn.wangningbo._01helloworld;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    //隊(duì)列名稱 //消費(fèi)哪個(gè)生產(chǎn)者隊(duì)列的消息
    private static final String QUEUE = "helloworld";

    public static void main(String[] args) throws Exception {
        //獲取連接
        Connection connection = ConnectionUtil.getConnection();
        //創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
        Channel channel = connection.createChannel();
        //聲明隊(duì)列
        channel.queueDeclare(QUEUE, true, false, false, null);
        //定義消費(fèi)方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * 消費(fèi)者接收消息調(diào)用此方法
             * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
             * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
            (收到消息失敗后是否需要重新發(fā)送)
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                //交換機(jī)
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息內(nèi)容
                String msg = new String(body, "utf8");
                System.out.println("receive message.." + msg);
            }
        };
        /**
         * 監(jiān)聽隊(duì)列String queue, boolean autoAck,Consumer callback
         * 參數(shù)明細(xì)
         * 1、隊(duì)列名稱
         * 2、是否自動(dòng)回復(fù),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置
         為false則需要手動(dòng)回復(fù)
         * 3、消費(fèi)消息的方法,消費(fèi)者接收到消息后調(diào)用此方法
         */
        channel.basicConsume(QUEUE, true, consumer);
    }
}

2.2.1.3 簡單測(cè)試

工作流程簡述:

  1. 先啟動(dòng)生產(chǎn)者,這時(shí)候會(huì)有一個(gè)消息被丟到默認(rèn)的交換機(jī)里面,這個(gè)交換機(jī)會(huì)把消息丟到指定的隊(duì)列里面,等待被消費(fèi)者消費(fèi)掉!
  2. 再啟動(dòng)消費(fèi)者,這時(shí)候發(fā)現(xiàn)消費(fèi)者消費(fèi)掉了剛才生產(chǎn)者發(fā)送的那條消息,由于消費(fèi)者設(shè)置了自動(dòng)回復(fù),所以隊(duì)列里的那個(gè)消息就被消費(fèi)掉了,隊(duì)里的消息被刪除了!但是程序沒有停止,一直在監(jiān)聽隊(duì)列中是否有新的消息。一旦有新的消息進(jìn)入隊(duì)列,就會(huì)立即打印!

2.2.3.4 消息確認(rèn)機(jī)制(ACK)

通過剛才的案例可以看出,消息一旦被消費(fèi)者接收,隊(duì)列中的消息就會(huì)被刪除。

那么問題來了:RabbitMQ怎么知道消息被接收了呢?

如果消費(fèi)者領(lǐng)取消息后,還沒執(zhí)行操作就掛掉了呢?或者拋出了異常?消息消費(fèi)失敗,但是RabbitMQ無從得知,這樣消息就丟失了!

因此,RabbitMQ有一個(gè)ACK機(jī)制。當(dāng)消費(fèi)者獲取消息后,會(huì)向RabbitMQ發(fā)送回執(zhí)ACK,告知消息已經(jīng)被接收。不過這種回執(zhí)ACK分兩種情況:

  1. 自動(dòng)ACK:消息一旦被接收,消費(fèi)者自動(dòng)發(fā)送ACK
  2. 手動(dòng)ACK:消息接收后,不會(huì)發(fā)送ACK,需要手動(dòng)調(diào)用

只有發(fā)送了回執(zhí)ACK,隊(duì)列的消息就會(huì)被刪除掉!

但是哪種更好呢?

這還需要看消息的重要性!

  1. 如果消息不太重要,丟失也沒有影響,那么自動(dòng)ACK會(huì)比較方便
  2. 如果消息非常重要,不容丟失。那么最好在消費(fèi)完成后手動(dòng)ACK,否則接收消息后就自動(dòng)ACK,RabbitMQ就會(huì)把消息從隊(duì)列中刪除。如果此時(shí)消費(fèi)者宕機(jī),那么消息就丟失了。

我上面的測(cè)試都是自動(dòng)ACK的,如果要手動(dòng)ACK,需要改動(dòng)消費(fèi)者的代碼!

2.2.1.4 自動(dòng)確認(rèn)存在的問題

修改消費(fèi)者里面的這個(gè)方法,讓它1/0發(fā)生異常!

            /**
             * 消費(fèi)者接收消息調(diào)用此方法
             * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
             * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
            (收到消息失敗后是否需要重新發(fā)送)
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                int i = 1 / 0;
                //交換機(jī)
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息內(nèi)容
                String msg = new String(body, "utf8");
                System.out.println("receive message.." + msg);
            }
        };

生產(chǎn)者不做任何修改,直接運(yùn)行,消息發(fā)送成功!

運(yùn)行消費(fèi)者,程序拋出異常。但是消息依然被消費(fèi)!

這時(shí)候就存在問題,這個(gè)消息對(duì)我很重要,不允許丟失,這時(shí)候就需要消費(fèi)者沒報(bào)錯(cuò)的時(shí)候才回執(zhí)ACK!這就需要手動(dòng)回執(zhí)了!

2.2.1.5 手動(dòng)確認(rèn)實(shí)現(xiàn)

修改消費(fèi)者,把自動(dòng)改成手動(dòng)(去掉之前制造的異常)

        /**
         * 監(jiān)聽隊(duì)列String queue, boolean autoAck,Consumer callback
         * 參數(shù)明細(xì)
         * 1、隊(duì)列名稱
         * 2、是否自動(dòng)回復(fù),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置
         為false則需要手動(dòng)回復(fù)
         * 3、消費(fèi)消息的方法,消費(fèi)者接收到消息后調(diào)用此方法
         */
        channel.basicConsume(QUEUE, false, consumer);

生產(chǎn)者不變,再次運(yùn)行!

運(yùn)行消費(fèi)者!消費(fèi)者沒有異常,并且也獲取了消息!但是通過可視化界面看到隊(duì)列中依然存在那個(gè)消息!

這是因?yàn)殡m然我們?cè)O(shè)置了手動(dòng)ACK,但是代碼中并沒有進(jìn)行消息確認(rèn)!所以消息并未被真正消費(fèi)掉。

當(dāng)我們關(guān)掉這個(gè)消費(fèi)者,消息的狀態(tài)再次稱為Ready!

手動(dòng)ACK設(shè)置:channel.basicAck(envelope.getDeliveryTag(), false);當(dāng)消費(fèi)完以后才發(fā)送回執(zhí)ACK!

        //定義消費(fèi)方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * 消費(fèi)者接收消息調(diào)用此方法
             * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
             * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
            (收到消息失敗后是否需要重新發(fā)送)
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
//                int i = 1 / 0;
                //交換機(jī)
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息內(nèi)容
                String msg = new String(body, "utf8");
                System.out.println("receive message.." + msg);
                //手動(dòng)進(jìn)行ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

2.2.1.6 小結(jié)

  1. 發(fā)送端操作流程
    • 1)創(chuàng)建連接
    • 2)創(chuàng)建通道
    • 3)聲明隊(duì)列
    • 4)發(fā)送消息
  2. 接收端操作流程
    • 1)創(chuàng)建連接
    • 2)創(chuàng)建通道
    • 3)聲明隊(duì)列
    • 4)監(jiān)聽隊(duì)列
    • 5)接收消息
    • 6)ack回復(fù)

2.2.2 Work queues

work queues與入門程序相比,多了一個(gè)消費(fèi)端,兩個(gè)消費(fèi)端共同消費(fèi)同一個(gè)隊(duì)列中的消息。

應(yīng)用場(chǎng)景:對(duì)于任務(wù)過重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。

測(cè)試:

  1. 一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者
  2. 生產(chǎn)者發(fā)送多個(gè)消息!

結(jié)果:

  1. 一條消息只會(huì)被一個(gè)消費(fèi)者接收;
  2. rabbit采用輪詢的方式將消息是平均發(fā)送給消費(fèi)者的;
  3. 消費(fèi)者在處理完某條消息后,才會(huì)收到下一條消息。

工作隊(duì)列,又稱任務(wù)隊(duì)列。主要思想就是避免執(zhí)行資源密集型任務(wù)時(shí),必須等待它執(zhí)行完成。相反我們稍后完成任務(wù),我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。 在后臺(tái)運(yùn)行的工作進(jìn)程將獲取任務(wù)并最終執(zhí)行作業(yè)。當(dāng)你運(yùn)行許多工人時(shí),任務(wù)將在他們之間共享,但是一個(gè)消息只能被一個(gè)消費(fèi)者獲取。

模擬流程:

  • P:生產(chǎn)者:任務(wù)的發(fā)布者
  • C1:消費(fèi)者,領(lǐng)取任務(wù)并且完成任務(wù),假設(shè)完成速度較快
  • C2:消費(fèi)者2:領(lǐng)取任務(wù)并完成任務(wù),假設(shè)完成速度慢

2.2.2.1 生產(chǎn)者

對(duì)生產(chǎn)者做一點(diǎn)的小小加工,讓它發(fā)送50個(gè)消息到隊(duì)列,而且是發(fā)送的別太快!

加工部分

        //生產(chǎn)者發(fā)送50個(gè)消息
        for (int i = 1; i <= 50; i++) {
            Thread.sleep(i * 2);
            channel.basicPublish("", QUEUE, null, message.getBytes());
            System.out.println(i + "Send Message is:'" + message + "'");
        }

完整生產(chǎn)者

package cn.wangningbo._02workqueues;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    //隊(duì)列名稱
    private static final String QUEUE = "workqueues";

    //使用main方法進(jìn)行測(cè)試
    public static void main(String[] args) throws Exception {
        //獲取連接
        Connection connection = ConnectionUtil.getConnection();
        //創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
        Channel channel = connection.createChannel();
        /**
         * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
         * param1:隊(duì)列名稱
         * param2:是否持久化
         * param3:隊(duì)列是否獨(dú)占此連接
         * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
         * param5:隊(duì)列參數(shù)
         */
        channel.queueDeclare(QUEUE, true, false, false, null);
        // 準(zhǔn)備一條消息 // 字符串+當(dāng)前系統(tǒng)時(shí)間的時(shí)間戳
        String message = "helloworld小明" + System.currentTimeMillis();
        /**
         * 消息發(fā)布方法
         * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
         * param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列
         * param3:消息包含的屬性
         * param4:消息體
         */
        /**
         * 這里沒有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯
         示綁定或解除綁定
         * 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱
         */
        //生產(chǎn)者發(fā)送50個(gè)消息
        for (int i = 1; i <= 50; i++) {
            Thread.sleep(i * 2);
            channel.basicPublish("", QUEUE, null, message.getBytes());
            System.out.println(i + "Send Message is:'" + message + "'");
        }
    }
}

2.2.2.2 消費(fèi)者1

讓消費(fèi)者1處理的能力強(qiáng)

消費(fèi)者1不需要做什么改變!正常消費(fèi)即可!只是多個(gè)計(jì)數(shù)器統(tǒng)計(jì)消費(fèi)了多少條消息!

package cn.wangningbo._02workqueues;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    //隊(duì)列名稱 //消費(fèi)哪個(gè)生產(chǎn)者隊(duì)列的消息
    private static final String QUEUE = "workqueues";

    public static void main(String[] args) throws Exception {
        //獲取連接
        Connection connection = ConnectionUtil.getConnection();
        //創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
        Channel channel = connection.createChannel();
        //聲明隊(duì)列
        channel.queueDeclare(QUEUE, true, false, false, null);
        //定義消費(fèi)方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            //計(jì)數(shù)器
            int index = 1;
            /**
             * 消費(fèi)者接收消息調(diào)用此方法
             * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
             * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
            (收到消息失敗后是否需要重新發(fā)送)
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                //交換機(jī)
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息內(nèi)容
                String msg = new String(body, "utf8");
                //index用來統(tǒng)計(jì)消費(fèi)了多少條消息
                System.out.println(index+++"receive message.." + msg);
                //手動(dòng)進(jìn)行ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        /**
         * 監(jiān)聽隊(duì)列String queue, boolean autoAck,Consumer callback
         * 參數(shù)明細(xì)
         * 1、隊(duì)列名稱
         * 2、是否自動(dòng)回復(fù),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置
         為false則需要手動(dòng)回復(fù)
         * 3、消費(fèi)消息的方法,消費(fèi)者接收到消息后調(diào)用此方法
         */
        channel.basicConsume(QUEUE, false, consumer);
    }
}

2.2.2.3 消費(fèi)者2

讓消費(fèi)者2處理的能力弱!這里模擬消費(fèi)耗時(shí)!

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

消費(fèi)者2完整代碼

package cn.wangningbo._02workqueues;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    //隊(duì)列名稱 //消費(fèi)哪個(gè)生產(chǎn)者隊(duì)列的消息
    private static final String QUEUE = "workqueues";

    public static void main(String[] args) throws Exception {
        //獲取連接
        Connection connection = ConnectionUtil.getConnection();
        //創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
        Channel channel = connection.createChannel();
        //聲明隊(duì)列
        channel.queueDeclare(QUEUE, true, false, false, null);
        //定義消費(fèi)方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            //計(jì)數(shù)器
            int index = 1;
            /**
             * 消費(fèi)者接收消息調(diào)用此方法
             * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
             * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
            (收到消息失敗后是否需要重新發(fā)送)
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //交換機(jī)
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息內(nèi)容
                String msg = new String(body, "utf8");
                //index用來統(tǒng)計(jì)消費(fèi)了多少條消息
                System.out.println(index+++"receive message.." + msg);
                //手動(dòng)進(jìn)行ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        /**
         * 監(jiān)聽隊(duì)列String queue, boolean autoAck,Consumer callback
         * 參數(shù)明細(xì)
         * 1、隊(duì)列名稱
         * 2、是否自動(dòng)回復(fù),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置
         為false則需要手動(dòng)回復(fù)
         * 3、消費(fèi)消息的方法,消費(fèi)者接收到消息后調(diào)用此方法
         */
        channel.basicConsume(QUEUE, false, consumer);
    }
}

2.2.2.4 測(cè)試

先啟動(dòng)兩個(gè)消費(fèi)者,再啟動(dòng)生產(chǎn)者!

測(cè)試結(jié)果:

  1. 可以發(fā)現(xiàn)兩個(gè)消費(fèi)者各自消費(fèi)了25條消息,而且各不相同,這就實(shí)現(xiàn)了任務(wù)分發(fā)!
  2. 而消費(fèi)者1能力比較強(qiáng),很快就處理完了25條消息,然后處于空閑的監(jiān)聽狀態(tài)。
  3. 而消費(fèi)者2的能力比較弱,一直在緩慢的處理,一條一條的,直到25條處理完畢,然后處于空閑監(jiān)聽狀態(tài)!

2.2.2.5 能者多勞

剛才產(chǎn)生的問題?

  1. 消費(fèi)者1比消費(fèi)者2的效率要高,一次任務(wù)的耗時(shí)較短
  2. 然而兩人最終消費(fèi)的消息數(shù)量是一樣的
  3. 消費(fèi)者1大量時(shí)間處于空閑狀態(tài),消費(fèi)者2一直忙碌

現(xiàn)在的狀態(tài)屬于是把任務(wù)平均分配,正確的做法應(yīng)該是消費(fèi)越快的人,消費(fèi)的越多。

我希望是能者多勞,效率高的多消費(fèi)一些,效率低的少消費(fèi)一些,這樣把所有消息消費(fèi)完所需要的時(shí)間最短!

實(shí)現(xiàn)方法:
使用basicQos方法和prefetchCount = 1設(shè)置。 這告訴RabbitMQ一次不要向工作人員發(fā)送多于一條消息。 或者換句話說,不要向工作人員發(fā)送新消息,直到它處理并確認(rèn)了前一個(gè)消息。相反,它會(huì)將其分派給不是仍然忙碌的下一個(gè)工作人員。

在兩個(gè)消費(fèi)者那里分別設(shè)置

//聲明一次發(fā)1條消息,等到處理完了再發(fā)
            channel.basicQos(1);

2.2.2.6 再次測(cè)試

生產(chǎn)者發(fā)送了50條消息,消費(fèi)者1能力較強(qiáng),消費(fèi)了32條消息,消費(fèi)者2能力較弱,消費(fèi)了18條!

實(shí)現(xiàn)了能者多勞!

2.2.3 訂閱模型分類

2.2.3.1 概述

在之前的模式中,我們創(chuàng)建了一個(gè)工作隊(duì)列。 工作隊(duì)列背后的假設(shè)是:每個(gè)任務(wù)只被傳遞給一個(gè)工作人員。 在這一部分,我們將做一些完全不同的事情 - 我們將會(huì)傳遞一個(gè)信息給多個(gè)消費(fèi)者。 這種模式被稱為“發(fā)布/訂閱”。

  1. 1個(gè)生產(chǎn)者,多個(gè)消費(fèi)者
  2. 每一個(gè)消費(fèi)者都有自己的一個(gè)隊(duì)列
  3. 生產(chǎn)者沒有將消息直接發(fā)送到隊(duì)列,而是發(fā)送到了交換機(jī)
  4. 每個(gè)隊(duì)列都要綁定到交換機(jī)
  5. 生產(chǎn)者發(fā)送的消息,經(jīng)過交換機(jī)到達(dá)隊(duì)列,實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者獲取的目的

X(Exchanges):交換機(jī)一方面:接收生產(chǎn)者發(fā)送的消息。另一方面:知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。

2.2.3.2 分類

Exchange類型有以下幾種:

  1. Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列 all
  2. Direct:定向,把消息交給符合指定routing key 的隊(duì)列 一堆或一個(gè)
  3. Topic:通配符,把消息交給符合routing pattern(路由模式)的隊(duì)列 一堆或者一個(gè)

Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒有任何隊(duì)列與Exchange綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!

2.2.4 訂閱模型-FANOUT

2.2.4.1 工作模式

發(fā)布訂閱模式:

在廣播模式下,消息發(fā)送流程是這樣的:

  • 1) 可以有多個(gè)消費(fèi)者
  • 2) 每個(gè)消費(fèi)者有自己的queue(隊(duì)列)
  • 3) 每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))
  • 4) 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來決定要發(fā)給哪個(gè)隊(duì)列,生產(chǎn)者無法決定。
  • 5) 交換機(jī)把消息發(fā)送給綁定過的所有隊(duì)列
  • 6) 隊(duì)列的消費(fèi)者都能拿到消息。實(shí)現(xiàn)一條消息被多個(gè)消費(fèi)者消費(fèi)

2.2.4.2 生產(chǎn)者

與上面那種非訂閱模式有兩種變化:

  1. 聲明Exchange,不再聲明Queue
  2. 發(fā)送消息到Exchange,不再發(fā)送到Queue
package cn.wangningbo._03fanout;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    //交換機(jī)名稱
    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 聲明exchange的類型,指定類型為fanout
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        // 消息內(nèi)容
        String message = "Hello everyone";
        // 發(fā)布消息到Exchange
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [生產(chǎn)者] Sent '" + message + "'");
        //關(guān)閉連接
        channel.close();
        connection.close();
    }
}

2.2.4.3 消費(fèi)者

消費(fèi)者1

package cn.wangningbo._03fanout;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    // 隊(duì)列名稱
    private final static String QUEUE_NAME = "fanout_exchange_queue_1";
    //交換機(jī)名稱
    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 綁定隊(duì)列到交換機(jī)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 定義隊(duì)列的消費(fèi)者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息體
                String msg = new String(body);
                System.out.println(" [消費(fèi)者1] received : " + msg + "!");
            }
        };
        // 監(jiān)聽隊(duì)列,自動(dòng)返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消費(fèi)者2

package cn.wangningbo._03fanout;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    // 隊(duì)列名稱
    private final static String QUEUE_NAME = "fanout_exchange_queue_2";
    //交換機(jī)名稱
    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 綁定隊(duì)列到交換機(jī)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 定義隊(duì)列的消費(fèi)者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息體
                String msg = new String(body);
                System.out.println(" [消費(fèi)者2] received : " + msg + "!");
            }
        };
        // 監(jiān)聽隊(duì)列,手動(dòng)返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.2.4.4 測(cè)試

先啟動(dòng)一下生產(chǎn)者,來創(chuàng)建交換機(jī)。再啟動(dòng)消費(fèi)者1和消費(fèi)者2。這時(shí)候再啟動(dòng)一次生產(chǎn)者,可以看到兩個(gè)消費(fèi)者都收到了消息!

因?yàn)樯a(chǎn)者把消息丟給了交換機(jī),而交換機(jī)又制定了交換機(jī)類型是fanout類型,兩個(gè)消費(fèi)者的隊(duì)列都綁定到了生產(chǎn)者的交換機(jī)上面!所以兩個(gè)消費(fèi)者都收到了同名交換機(jī)發(fā)出的消息!

應(yīng)用場(chǎng)景:

  1. 注冊(cè)成功后發(fā)送短信和郵件
  2. 消息推送

2.2.5 訂閱模型-Direct

2.2.5.1 工作模式

有選擇性的接收消息

在訂閱模式中,生產(chǎn)者發(fā)布消息,所有消費(fèi)者都可以獲取所有消息。

在路由模式中,我們將添加一個(gè)功能,我們將只能訂閱一部分消息。 例如,我們只能將重要的錯(cuò)誤消息引導(dǎo)到日志文件(以節(jié)省磁盤空間),同時(shí)仍然能夠在控制臺(tái)上打印所有日志消息。

但是,在某些場(chǎng)景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange。

在Direct模型下,隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)

消息的發(fā)送方在向Exchange發(fā)送消息時(shí),也必須指定消息的routing key。如下:

  1. P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key。
  2. X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊(duì)列
  3. C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息
  4. C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息

2.2.5.2 生產(chǎn)者

此處模擬商品的增刪改,發(fā)送消息的RoutingKey分別是:insert、update、delete

生產(chǎn)者

package cn.wangningbo._04direct;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    //交換機(jī)名稱
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 聲明exchange,指定類型為direct
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 消息內(nèi)容
//        String message = "商品新增了, id = 1001";
//        String message = "商品修改了, id = 1001";
        String message = "商品刪除了, id = 1001";
        // 發(fā)送消息,并且指定routing key 為:insert ,代表新增商品
        channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
        System.out.println(" [商品服務(wù):] Sent '" + message + "'");
        // 關(guān)閉
        channel.close();
        connection.close();
    }
}

2.2.5.3 消費(fèi)者

消費(fèi)者1:處假設(shè)消費(fèi)者1只接收兩種類型的消息:更新商品和刪除商品

package cn.wangningbo._04direct;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    // 隊(duì)列名稱
    private final static String QUEUE_NAME = "direct_exchange_queue_1";
    //交換機(jī)名稱
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。假設(shè)此處需要update和delete消息
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
        // 定義隊(duì)列的消費(fèi)者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息體
                String msg = new String(body);
                System.out.println(" [消費(fèi)者1] received : " + msg + "!");
            }
        };
        // 監(jiān)聽隊(duì)列,自動(dòng)ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消費(fèi)者2:此處假設(shè)消費(fèi)者2接收所有類型的消息:新增商品,更新商品和刪除商品。

package cn.wangningbo._04direct;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    // 隊(duì)列名稱
    private final static String QUEUE_NAME = "direct_exchange_queue_2";
    // 交換機(jī)名稱
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。訂閱 insert、update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        // 定義隊(duì)列的消費(fèi)者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息體
                String msg = new String(body);
                System.out.println(" [消費(fèi)者2] received : " + msg + "!");
            }
        };
        // 監(jiān)聽隊(duì)列,自動(dòng)ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.2.5.4 測(cè)試

分別發(fā)送增、刪、改的RoutingKey。結(jié)果:消費(fèi)者1只可以收到修改和刪除的消息,消費(fèi)者2都可以收到!

我們項(xiàng)目中就要用,特定站點(diǎn)靜態(tài)頁面,只能發(fā)到特定站點(diǎn)服務(wù)器

sendMsg routingKey:sitesn???rev:綁定到交換機(jī)要以sitesn來作為routingkey

2.2.6 訂閱模型-Topics

2.2.6.1 工作模式

Topic類型的Exchange與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過Topic類型Exchange可以讓隊(duì)列在綁定Routingkey的時(shí)候使用通配符!

Routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: goods.insert

通配符規(guī)則:

  • :匹配一個(gè)或多個(gè)詞

  • *:匹配不多不少恰好1個(gè)詞

舉例:

  • audit.#:能夠匹配audit.irs.corporate 或者 audit.irs
  • audit.*:只能匹配audit.irs

路由模式:

  1. 每個(gè)消費(fèi)者監(jiān)聽自己的隊(duì)列,并且設(shè)置帶統(tǒng)配符的routingkey。
  2. 生產(chǎn)者將消息發(fā)給broker,由交換機(jī)根據(jù)routingkey來轉(zhuǎn)發(fā)消息到指定的隊(duì)列。

2.2.6.2 生產(chǎn)者

package cn.wangningbo._05topics;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    // 交換機(jī)名稱
    private final static String EXCHANGE_NAME = "topic_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 聲明exchange,指定類型為topic
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 消息內(nèi)容
        String message = "新增商品 : id = 1001";
//        String message = "修改商品 : id = 1001";
//        String message = "刪除商品 : id = 1001";
        // 發(fā)送消息,并且指定routing key 為:insert ,代表新增商品
        channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
        System.out.println(" [商品服務(wù):] Sent '" + message + "'");
        // 關(guān)閉連接
        channel.close();
        connection.close();
    }
}

2.2.6.3 消費(fèi)者

消費(fèi)者1:此處假設(shè)消費(fèi)者1只接收兩種類型的消息:更新商品和刪除商品

package cn.wangningbo._05topics;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    // 隊(duì)列名稱
    private final static String QUEUE_NAME = "topic_exchange_queue_1";
    // 交換機(jī)名稱
    private final static String EXCHANGE_NAME = "topic_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。需要 update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

        // 定義隊(duì)列的消費(fèi)者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息體
                String msg = new String(body);
                System.out.println(" [消費(fèi)者1] received : " + msg + "!");
            }
        };
        // 監(jiān)聽隊(duì)列,自動(dòng)ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消費(fèi)者2:此處假設(shè)消費(fèi)者2接收所有類型的消息:新增商品,更新商品和刪除商品。

package cn.wangningbo._05topics;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    // 隊(duì)列名稱
    private final static String QUEUE_NAME = "topic_exchange_queue_2";
    // 交換機(jī)名稱
    private final static String EXCHANGE_NAME = "topic_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。訂閱 insert、update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");

        // 定義隊(duì)列的消費(fèi)者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息體
                String msg = new String(body);
                System.out.println(" [消費(fèi)者2] received : " + msg + "!");
            }
        };
        // 監(jiān)聽隊(duì)列,自動(dòng)ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.2.6.4 測(cè)試

測(cè)試:先啟動(dòng)生產(chǎn)者,再啟動(dòng)兩個(gè)消費(fèi)者,再啟動(dòng)生產(chǎn)者!分別發(fā)送增、刪、改的RoutingKey。

測(cè)試結(jié)果:結(jié)果:消費(fèi)者1只可以收到修改和刪除的消息,消費(fèi)者2都可以收到!消費(fèi)者使用的是通配符接收!

2.2.7 Header模式

header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對(duì))匹配 隊(duì)列。

案例: 根據(jù)用戶的通知設(shè)置去通知用戶,設(shè)置接收Email的用戶只接收Email,設(shè)置接收sms的用戶只接收sms,設(shè)置兩種 通知類型都接收的則兩種通知都有效。

  1. 生產(chǎn)者

隊(duì)列與交換機(jī)綁定的代碼與之前不同,如下:

Map<String, Object> headers_email = new Hashtable<String, Object>(); 
headers_email.put("inform_type", "email"); 
Map<String, Object> headers_sms = new Hashtable<String, Object>(); 
headers_sms.put("inform_type", "sms"); 
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); 
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms); 

通知

String message = "email inform to user"+i; 
Map<String,Object> headers = new Hashtable<String, Object>(); 
headers.put("inform_type", "email");//匹配email通知消費(fèi)者綁定的header 
//headers.put("inform_type", "sms");//匹配sms通知消費(fèi)者綁定的header 
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); 
properties.headers(headers); 
//Email通知 
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes()); 
  1. 發(fā)送郵件消費(fèi)者
channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS); 
Map<String, Object> headers_email = new Hashtable<String, Object>(); 
headers_email.put("inform_email", "email"); 
//交換機(jī)和隊(duì)列綁定 
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); 
//指定消費(fèi)隊(duì)列 
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer); 

============實(shí)戰(zhàn)演示================

生產(chǎn)者:只發(fā)送給"inform_type", "email"

package cn.wangningbo._06header;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.Hashtable;
import java.util.Map;

public class Producer {
    // 交換機(jī)名稱
    private final static String EXCHANGE_NAME = "headers_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 聲明exchange,指定類型為headers
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
        // 消息內(nèi)容
        String message = "發(fā)送了一個(gè)email消息";
        Map<String, Object> headers = new Hashtable<String, Object>();
        headers.put("inform_type", "email");//匹配email通知消費(fèi)者綁定的header
//        headers.put("inform_type2", "sms");//匹配sms通知消費(fèi)者綁定的header
        AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
        properties.headers(headers);
        // 發(fā)送消息,并綁定到指定的交換機(jī),并指定發(fā)送到指定類型
        channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes());
        System.out.println(" [商品服務(wù):] Sent '" + message + "'");
        // 關(guān)閉
        channel.close();
        connection.close();
    }
}

消費(fèi)者1:只接收"inform_type", "email"

package cn.wangningbo._06header;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;

public class Consumer {
    // 隊(duì)列名稱
    private final static String QUEUE_NAME = "headers_exchange_queue_1";
    // 交換機(jī)名稱
    private final static String EXCHANGE_NAME = "headers_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        Map<String, Object> headersEmail = new Hashtable<String, Object>();
        // 指定接受的類型
        headersEmail.put("inform_type", "email");
//        headersEmail.put("inform_type2", "sms");
        // 綁定隊(duì)列到交換機(jī),同時(shí)指定需要接收消息的類型
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headersEmail);

        // 定義隊(duì)列的消費(fèi)者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息體
                String msg = new String(body);
                System.out.println(" [消費(fèi)者1] received : " + msg + "!");
            }
        };
        // 監(jiān)聽隊(duì)列,自動(dòng)ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消費(fèi)者2:只接收"inform_type2", "sms"

package cn.wangningbo._06header;

import cn.wangningbo.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;

public class Consumer2 {
    // 隊(duì)列名稱
    private final static String QUEUE_NAME = "headers_exchange_queue_2";
    // 交換機(jī)名稱
    private final static String EXCHANGE_NAME = "headers_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連接
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        Map<String, Object> headersSms = new Hashtable<String, Object>();
//        headersSms.put("inform_type", "email");
        headersSms.put("inform_type2", "sms");
        // 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。假設(shè)此處需要update和delete消息
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headersSms);

        // 定義隊(duì)列的消費(fèi)者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // body 即消息體
                String msg = new String(body);
                System.out.println(" [消費(fèi)者2] received : " + msg + "!");
            }
        };
        // 監(jiān)聽隊(duì)列,自動(dòng)ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

測(cè)試:先啟動(dòng)生產(chǎn)者,再啟動(dòng)2個(gè)消費(fèi)者,再啟動(dòng)生產(chǎn)者!

測(cè)試結(jié)果:只有消費(fèi)者1接收到了"inform_type", "email"

2.2.7 RPC

RPC即客戶端遠(yuǎn)程調(diào)用服務(wù)端的方法 ,使用MQ可以實(shí)現(xiàn)RPC的異步調(diào)用,基于Direct交換機(jī)實(shí)現(xiàn),流程如下:

  1. 客戶端即是生產(chǎn)者就是消費(fèi)者,向RPC請(qǐng)求隊(duì)列發(fā)送RPC調(diào)用消息,同時(shí)監(jiān)聽RPC響應(yīng)隊(duì)列。
  2. 服務(wù)端監(jiān)聽RPC請(qǐng)求隊(duì)列的消息,收到消息后執(zhí)行服務(wù)端的方法,得到方法返回的結(jié)果
  3. 服務(wù)端將RPC方法 的結(jié)果發(fā)送到RPC響應(yīng)隊(duì)列
  4. 客戶端(RPC調(diào)用方)監(jiān)聽RPC響應(yīng)隊(duì)列,接收到RPC調(diào)用結(jié)果。

2.3 持久化-解決數(shù)據(jù)安全

如何避免消息丟失?

  1. 消費(fèi)者的ACK機(jī)制??梢苑乐瓜M(fèi)者丟失消息。
  2. 但是,如果在消費(fèi)者消費(fèi)之前,MQ就宕機(jī)了,消息就沒了。

是可以將消息進(jìn)行持久化呢?

要將消息持久化,前提是:隊(duì)列、Exchange都持久化

2.3.1 交換機(jī)持久化

生產(chǎn)者那邊指定一下,第三個(gè)參數(shù)那里true就是持久化

// 聲明exchange,指定類型為topic, 是否持久化
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,true);

2.3.2 隊(duì)列持久化

在消費(fèi)者那里指定一下隊(duì)列持久化

        // 獲取到連接
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 參數(shù)1:聲明隊(duì)列,參數(shù)2:隊(duì)列是否持久化
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

2.3.3 消息持久化

生產(chǎn)者發(fā)送消息的第三個(gè)參數(shù)

        // 發(fā)送消息,并且指定routing key 為:insert ,代表新增商品,參數(shù)3:消息持久化
        channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

3. SpringBoot整合RabbitMQ

3.1 步驟分析

  1. 創(chuàng)建項(xiàng)目
  2. pom導(dǎo)包
  3. 入口類
  4. application.yml配置
  5. 配置類
  6. 生產(chǎn)者
  7. 消費(fèi)者
  8. 測(cè)試

3.2 步驟實(shí)現(xiàn)

3.2.1 創(chuàng)建項(xiàng)目

創(chuàng)建一個(gè)maven項(xiàng)目

3.2.2 pom導(dǎo)包

<dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.0.5.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <!--spirngboot集成RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

3.2.3 入口類

package cn.wangningbo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class APP {
    public static void main(String[] args) {
        SpringApplication.run(APP.class, args);
    }
}

3.2.4 application.yml配置

server:
  port: 44000
spring:
  application:
    name: test-rabbitmq-producer
rabbitmq:
  host: 127.0.0.1
  port: 5672
  username: guest
  password: guest
  virtualHost: /

3.2.5 配置類

package cn.wangningbo.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

    /**
     * 交換機(jī)配置
     * ExchangeBuilder提供了fanout、direct、topic、header交換機(jī)類型的配置
     *
     * @return the exchange
     */
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM() {
        //durable(true)持久化,消息隊(duì)列重啟后交換機(jī)仍然存在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    //聲明隊(duì)列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS() {
        Queue queue = new Queue(QUEUE_INFORM_SMS);
        return queue;
    }

    //聲明隊(duì)列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL() {
        Queue queue = new Queue(QUEUE_INFORM_EMAIL);
        return queue;
    }

    /**
     * channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");
     * 綁定隊(duì)列到交換機(jī) .
     *
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                            @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
    }

    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
    }
}

3.2.6 生產(chǎn)者

使用RarbbitTemplate發(fā)送消息

package cn.wangningbo;

import cn.wangningbo.config.RabbitmqConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = APP.class)
@RunWith(SpringRunner.class)
public class Producer_topics_springboot {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // 生產(chǎn)者
    @Test
    public void testSendByTopics() {
        // 使用for循環(huán)發(fā)送5條消息
        for (int i = 0; i < 5; i++) {
            // 準(zhǔn)備消息
            String message = "sms email inform to user" + i;
            // 參數(shù)1:交換機(jī)名稱,參數(shù)2:使用通配符類型的routingKey,參數(shù)3:要發(fā)送的消息
            rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.sms.email", message);
            // 打印發(fā)送的消息
            System.out.println("Send Message is:'" + message + "'");
        }
    }
}

3.2.7 消費(fèi)者

package cn.wangningbo.handler;

import cn.wangningbo.config.RabbitmqConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消費(fèi)者
 */
@Component
public class ReceiveHandler {
    //監(jiān)聽email隊(duì)列
    @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
    public void receive_email(String msg, Message message, Channel channel) {
        System.out.println(msg);
    }

    //監(jiān)聽sms隊(duì)列
    @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
    public void receive_sms(String msg, Message message, Channel channel) {
        System.out.println(msg);
    }
} 

3.2.8 測(cè)試

啟動(dòng)生產(chǎn)者即可測(cè)試!查看消息到了哪里的消費(fèi)者!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容