Spring Boot(十三)RabbitMQ安裝與集成

一、前言

RabbitMQ是一個(gè)開源的消息代理軟件(面向消息的中間件),它的核心作用就是創(chuàng)建消息隊(duì)列,異步接收和發(fā)送消息,MQ的全程是:Message Queue中文的意思是消息隊(duì)列。

1.1 使用場(chǎng)景

  • 削峰填谷:用于應(yīng)對(duì)間歇性流量提升對(duì)于系統(tǒng)的“破壞”,比如秒殺活動(dòng),可以把請(qǐng)求先發(fā)送到消息隊(duì)列在平滑的交由系統(tǒng)去處理,當(dāng)訪問量大于一定數(shù)量的時(shí)候,還可以直接屏蔽后續(xù)操作,給前臺(tái)的用戶友好的顯示;

  • 延遲處理:可以進(jìn)行事件后置,比如訂單超時(shí)業(yè)務(wù),用戶下單30分鐘未支付取消訂單;

  • 系統(tǒng)解耦:消息隊(duì)列也可以幫開發(fā)人員完成業(yè)務(wù)的解耦,比如用戶上傳頭像的功能,最初的設(shè)計(jì)是用戶上傳完之后才能發(fā)帖,后面有增加了經(jīng)驗(yàn)系統(tǒng),需要在上傳頭像之后增加經(jīng)驗(yàn)值,到后來又上線了金幣系統(tǒng),上傳頭像之后可以增加金幣,像這種需求的不斷升級(jí),如果在業(yè)務(wù)代碼里面寫死每次該業(yè)務(wù)代碼是很不優(yōu)雅的,這個(gè)時(shí)候如果使用消息隊(duì)列,那么只需要增加一個(gè)訂閱器用于介紹用戶上傳頭像的消息,再執(zhí)行經(jīng)驗(yàn)的增加和金幣的增加是非常簡(jiǎn)單的,并且在不改動(dòng)業(yè)務(wù)模塊業(yè)務(wù)代碼的基礎(chǔ)上可以輕松實(shí)現(xiàn),如果后期需要撤銷某個(gè)模塊了,只需要?jiǎng)h除訂閱器即可,就這樣就降低了系統(tǒng)開發(fā)的耦合性;

1.2 為什么使用RabbitMQ?

現(xiàn)在市面上比較主流的消息隊(duì)列還有Kafka、RocketMQ、RabbitMQ,它們的介紹和區(qū)別如下:

  • Kafka是LinkedIn開源的分布式發(fā)布-訂閱消息系統(tǒng),目前歸屬于Apache定級(jí)項(xiàng)目。Kafka主要特點(diǎn)是基于Pull的模式來處理消息消費(fèi),追求高吞吐量,一開始的目的就是用于日志收集和傳輸。0.8版本開始支持復(fù)制,對(duì)消息的重復(fù)、丟失、錯(cuò)誤沒有嚴(yán)格要求,適合產(chǎn)生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務(wù)的數(shù)據(jù)收集業(yè)務(wù)。

  • RabbitMQ是使用Erlang語言開發(fā)的開源消息隊(duì)列系統(tǒng),基于AMQP協(xié)議來實(shí)現(xiàn)。AMQP的主要特征是面向消息、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)、可靠性、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi),對(duì)數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場(chǎng)景,對(duì)性能和吞吐量的要求還在其次。

  • RocketMQ是阿里開源的消息中間件,它是純Java開發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點(diǎn)。RocketMQ思路起源于Kafka,但并不是Kafka的一個(gè)Copy,它對(duì)消息的可靠傳輸及事務(wù)性做了優(yōu)化,目前在阿里集團(tuán)被廣泛應(yīng)用于交易、充值、流計(jì)算、消息推送、日志流式處理、binglog分發(fā)等場(chǎng)景。

簡(jiǎn)單總結(jié): Kafka的性能最好,適用于對(duì)消息吞吐量達(dá),對(duì)消息丟失不敏感的系統(tǒng);RocketMQ借鑒了Kafka并提高了消息的可靠性,修復(fù)了Kafka的不足;RabbitMQ性能略低于Kafka,并實(shí)現(xiàn)了AMQP(Advanced Message Queuing Protocol)高級(jí)消息隊(duì)列協(xié)議的標(biāo)準(zhǔn),有非常好的穩(wěn)定性。

支持語言對(duì)比

  • RocketMQ 支持語言:Java、C++、Golang
  • Kafka 支持語言:Java、Scala
  • RabbitMQ 支持語言:C#、Java、Js/NodeJs、Python、Ruby、Erlang、Perl、Clojure、Golang

1.3 RabbitMQ特點(diǎn)

RabbitMQ的特點(diǎn)是易用、擴(kuò)展性好(集群訪問)、高可用,具體如下:

  • 可靠性:持久化、消息確認(rèn)、事務(wù)等保證了消息的可靠性;
  • 伸縮性:集群服務(wù),可以很方便的添加服務(wù)器來提高系統(tǒng)的負(fù)載;
  • 高可用:集群狀態(tài)下部分節(jié)點(diǎn)出現(xiàn)問題依然可以運(yùn)行;
  • 多語言支持:RabbitMQ幾乎支持了所有的語言,比如Java、.Net、Nodejs、Golang等;
  • 易用的管理頁面:RabbitMQ提供了易用了網(wǎng)頁版的管理監(jiān)控系統(tǒng),可以很方便的完成RabbitMQ的控制和查看;
  • 插件機(jī)制:RabbitMQ提供了許多插件,可以豐富和擴(kuò)展Rabbit的功能,用戶也可編寫自己的插件;

1.4 RabbitMQ基礎(chǔ)知識(shí)

在了解消息通訊之前首先要了解3個(gè)概念:生產(chǎn)者、消費(fèi)者和代理。

生產(chǎn)者:消息的創(chuàng)建者,負(fù)責(zé)創(chuàng)建和推送數(shù)據(jù)到消息服務(wù)器;

消費(fèi)者:消息的接收方,用于處理數(shù)據(jù)和確認(rèn)消息;

代理:就是RabbitMQ本身,用于扮演“快遞”的角色,本身不生產(chǎn)消息,只是扮演“快遞”的角色。

(一)消息發(fā)送原理

首先你必須連接到Rabbit才能發(fā)布和消費(fèi)消息,那怎么連接和發(fā)送消息的呢?

你的應(yīng)用程序和Rabbit Server之間會(huì)創(chuàng)建一個(gè)TCP連接,一旦TCP打開,并通過了認(rèn)證,認(rèn)證就是你試圖連接Rabbit之前發(fā)送的Rabbit服務(wù)器連接信息和用戶名和密碼,有點(diǎn)像程序連接數(shù)據(jù)庫,使用Java有兩種連接認(rèn)證的方式,后面代碼會(huì)詳細(xì)介紹,一旦認(rèn)證通過你的應(yīng)用程序和Rabbit就創(chuàng)建了一條AMQP信道(Channel)。

信道是創(chuàng)建在“真實(shí)”TCP上的虛擬連接,AMQP命令都是通過信道發(fā)送出去的,每個(gè)信道都會(huì)有一個(gè)唯一的ID,不論是發(fā)布消息,訂閱隊(duì)列或者接收消息都是通過信道完成的。

(二)為什么不通過TCP直接發(fā)送命令?

對(duì)于操作系統(tǒng)來說創(chuàng)建和銷毀TCP會(huì)話是非常昂貴的開銷,假設(shè)高峰期每秒有成千上萬條連接,每個(gè)連接都要?jiǎng)?chuàng)建一條TCP會(huì)話,這就造成了TCP連接的巨大浪費(fèi),而且操作系統(tǒng)每秒能創(chuàng)建的TCP也是有限的,因此很快就會(huì)遇到系統(tǒng)瓶頸。

如果我們每個(gè)請(qǐng)求都使用一條TCP連接,既滿足了性能的需要,又能確保每個(gè)連接的私密性,這就是引入信道概念的原因。

(三)RabbitMQ名稱解釋

ConnectionFactory(連接管理器): 應(yīng)用程序與Rabbit之間建立連接的管理器,程序代碼中使用;

Channel(信道): 消息推送使用的通道;

Exchange(交換器): 用于接受、分配消息;

Queue(隊(duì)列): 用于存儲(chǔ)生產(chǎn)者的消息;

RoutingKey(路由鍵): 用于把生成者的數(shù)據(jù)分配到交換器上;

BindingKey(綁定鍵): 用于把交換器的消息綁定到隊(duì)列上;

看到上面的解釋,最難理解的路由鍵和綁定鍵了,那么他們具體怎么發(fā)揮作用的,請(qǐng)看下圖:

1.5 交換器分類

RabbitMQ的Exchange(交換器)分為四類:

  • direct(默認(rèn))
  • headers
  • fanout
  • topic

其中headers交換器允許你匹配AMQP消息的header而非路由鍵,除此之外headers交換器和direct交換器完全一致,但性能卻很差,幾乎用不到,所以我們這里不做解釋。

1.5.1 direct交換器

direct為默認(rèn)的交換器類型,也非常的簡(jiǎn)單,如果路由鍵匹配的話,消息就投遞到相應(yīng)的隊(duì)列,如下圖:

1.5.2 fanout交換器

fanout有別于direct交換器,fanout是一種發(fā)布/訂閱模式的交換器,當(dāng)你發(fā)送一條消息的時(shí)候,交換器會(huì)把消息廣播到所有附加到這個(gè)交換器的隊(duì)列上。

注意: 對(duì)于fanout交換器來說routingKey(路由鍵)是無效的,這個(gè)參數(shù)是被忽略的。

1.5.3 topic交換器

topic交換器運(yùn)行和fanout類似,但是可以更靈活的匹配自己想要訂閱的信息,這個(gè)時(shí)候routingKey路由鍵就排上用場(chǎng)了,使用路由鍵進(jìn)行消息(規(guī)則)匹配。

topic路由器的關(guān)鍵在于定義路由鍵,定義routingKey名稱不能超過255字節(jié),使用“.”作為分隔符,例如:com.mq.rabbit.error。

匹配規(guī)則

匹配表達(dá)式可以用“*”和“#”匹配任何字符,具體規(guī)則如下:

  • “*”匹配一個(gè)分段(用“.”分割)的內(nèi)容;
  • “#”匹配所有字符;

例如發(fā)布了一個(gè)“cn.mq.rabbit.error”的消息:

能匹配上的路由鍵:

  • cn.mq.rabbit.*
  • cn.mq.rabbit.#
  • #.error
  • cn.mq.#
  • #

不能匹配上的路由鍵:

  • cn.mq.*
  • *.error
  • *

1.6 消息持久化

RabbitMQ隊(duì)列和交換器有一個(gè)不可告人的秘密,就是默認(rèn)情況下重啟服務(wù)器會(huì)導(dǎo)致消息丟失,那么怎么保證Rabbit在重啟的時(shí)候不丟失呢?答案就是消息持久化。

當(dāng)你把消息發(fā)送到Rabbit服務(wù)器的時(shí)候,你需要選擇你是否要進(jìn)行持久化,但這并不能保證Rabbit能從崩潰中恢復(fù),想要Rabbit消息能恢復(fù)必須滿足3個(gè)條件:

  1. 投遞消息的時(shí)候durable設(shè)置為true,消息持久化,代碼:channel.queueDeclare(x, true, false, false, null),參數(shù)2設(shè)置為true持久化;
  2. 設(shè)置投遞模式deliveryMode設(shè)置為2(持久),代碼:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),參數(shù)3設(shè)置為存儲(chǔ)純文本到磁盤;
  3. 消息已經(jīng)到達(dá)持久化交換器上;
  4. 消息已經(jīng)到達(dá)持久化的隊(duì)列;

持久化工作原理

Rabbit會(huì)將你的持久化消息寫入磁盤上的持久化日志文件,等消息被消費(fèi)之后,Rabbit會(huì)把這條消息標(biāo)識(shí)為等待垃圾回收。

持久化的缺點(diǎn)

消息持久化的優(yōu)點(diǎn)顯而易見,但缺點(diǎn)也很明顯,那就是性能,因?yàn)橐獙懭胗脖P要比寫入內(nèi)存性能較低很多,從而降低了服務(wù)器的吞吐量,盡管使用SSD硬盤可以使事情得到緩解,但他仍然吸干了Rabbit的性能,當(dāng)消息成千上萬條要寫入磁盤的時(shí)候,性能是很低的。

所以使用者要根據(jù)自己的情況,選擇適合自己的方式。

學(xué)習(xí)更多RabbitMQ知識(shí),訪問:https://gitbook.cn/gitchat/activity/5b558d54c28306099b47ae9c

二、在Docker中安裝RabbitMQ

(1)下載鏡像

https://hub.docker.com/r/library/rabbitmq/tags/

  • alpine 輕量版
  • management 帶插件的版本

從鏡像的大小也可以很直觀的看出來alpine是輕量版。

使用命令:

docker pull rabbitmq:3.7.7-management

下載帶management插件的版本。

(2)運(yùn)行RabbitMQ

使用命令:

docker run -d --hostname myrabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7.7-management

  • -d 后臺(tái)運(yùn)行
  • --hostname 主機(jī)名稱
  • --name 容器名稱
  • -p 15672:15672 http訪問端口,映射本地端口到容器端口
  • -p 5672:5672 amqp端口,映射本地端口到容器端口

正常啟動(dòng)之后,訪問:http://localhost:15672/

登錄網(wǎng)頁管理頁面,用戶名密碼:guest/guest,登錄成功如下圖:

三、RabbitMQ集成

3.1 添加依賴

如果用Idea創(chuàng)建新項(xiàng)目,可以直接在創(chuàng)建Spring Boot的時(shí)候,點(diǎn)擊“Integration”面板,選擇RabbitMQ集成,如下圖:

如果是老Maven項(xiàng)目,直接在pom.xml添加如下代碼:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2 配置RabbitMQ信息

在application.properties設(shè)置如下信息:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test

3.3 代碼

3.3 代碼實(shí)現(xiàn)

本節(jié)分別來看三種交換器:direct、fanout、topic的實(shí)現(xiàn)代碼。

3.3.1 Direct Exchange

3.3.1.1 配置隊(duì)列

創(chuàng)建DirectConfig.java代碼如下:

package com.example.rabbitmq.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {
    final static String QUEUE_NAME = "direct"; //隊(duì)列名稱
    final static String EXCHANGE_NAME = "mydirect"; //交換器名稱
    @Bean
    public Queue queue() {
        // 聲明隊(duì)列 參數(shù)一:隊(duì)列名稱;參數(shù)二:是否持久化
        return new Queue(DirectConfig.QUEUE_NAME, false);
    }
    // 配置默認(rèn)的交換機(jī),以下部分都可以不配置,不設(shè)置使用默認(rèn)交換器(AMQP default)
    @Bean
    DirectExchange directExchange() {
        // 參數(shù)一:交換器名稱;參數(shù)二:是否持久化;參數(shù)三:是否自動(dòng)刪除消息
        return new DirectExchange(DirectConfig.EXCHANGE_NAME, false, false);
    }
    // 綁定“direct”隊(duì)列到上面配置的“mydirect”路由器
    @Bean
    Binding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with(DirectConfig.QUEUE_NAME);
    }
}

3.3.1.2 發(fā)送消息

創(chuàng)建Sender.java代碼如下:

package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
 * 消息發(fā)送者-生產(chǎn)消息
 */
@Component
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    public void driectSend(String message) {
        System.out.println("Direct 發(fā)送消息:" + message);
        //參數(shù)一:交換器名稱,可以省略(省略存儲(chǔ)到AMQP default交換器);參數(shù)二:路由鍵名稱(direct模式下路由鍵=隊(duì)列名稱);參數(shù)三:存儲(chǔ)消息
        this.rabbitTemplate.convertAndSend("direct", message);
    }
}

注意:

  • 在direct交換器中,路由鍵名稱就是隊(duì)列的名稱;
  • 發(fā)送消息“convertAndSend”的時(shí)候,第一個(gè)參數(shù)為交換器的名稱,非必填可以忽略,如果忽略則會(huì)把消息發(fā)送到默認(rèn)交換器“AMQP default”;

3.3.1.3 消費(fèi)消息

創(chuàng)建Receiver.java代碼如下:

package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 消息接收者-消費(fèi)消息
 */
@Component
@RabbitListener(queues = "direct")
public class Receiver {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    @RabbitHandler
    /**
     * 監(jiān)聽消費(fèi)消息
     */
    public void process(String message) {
        System.out.println("Direct 消費(fèi)消息:" + message);
    }
}

3.3.1.4 測(cè)試代碼

使用Spring Boot中的默認(rèn)測(cè)試框架JUnit進(jìn)行單元測(cè)試,不了解JUnit的可以參考我的上一篇文章,創(chuàng)建MQTest.java代碼如下:

package com.example.rabbitmq.mq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
import static org.junit.Assert.*;

@RunWith(SpringRunner.class)
@SpringBootTest
public class MQTest {
    @Autowired
    private Sender sender;
    @Test
    public void driectTest() {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
        sender.driectSend("Driect Data:" + sf.format(new Date()));
    }
}

執(zhí)行之后,效果如下圖:

表示消息已經(jīng)被發(fā)送并被消費(fèi)了。

3.3.2 Fanout Exchange

3.3.2.1 配置隊(duì)列

創(chuàng)建FanoutConfig.java代碼如下:

package com.example.rabbitmq.mq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    final static String QUEUE_NAME = "fanout"; //隊(duì)列名稱
    final static String QUEUE_NAME2 = "fanout2"; //隊(duì)列名稱
    final static String EXCHANGE_NAME = "myfanout"; //交換器名稱
    @Bean
    public Queue queueFanout() {
        return new Queue(FanoutConfig.QUEUE_NAME);
    }
    @Bean
    public Queue queueFanout2() {
        return new Queue(FanoutConfig.QUEUE_NAME2);
    }
    //配置交換器
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(FanoutConfig.EXCHANGE_NAME);
    }
    // 綁定隊(duì)列到交換器
    @Bean
    Binding bindingFanoutExchangeQueue(Queue queueFanout, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueFanout).to(fanoutExchange);
    }
    // 綁定隊(duì)列到交換器
    @Bean
    Binding bindingFanoutExchangeQueue2(Queue queueFanout2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueFanout2).to(fanoutExchange);
    }
}

3.3.2.2 發(fā)送消息

創(chuàng)建FanoutSender.java代碼如下:

package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FanoutSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    public void send(String message) {
        System.out.println("發(fā)送消息:" + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME, message);
    }
    public void send2(String message) {
        System.out.println("發(fā)送消息2:" + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME2, message);
    }
}

3.3.2.3 消費(fèi)消息

創(chuàng)建兩個(gè)監(jiān)聽類,第一個(gè)FanoutReceiver.java代碼如下:

package com.example.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
@RabbitListener(queues = "fanout")
public class FanoutReceiver {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Fanout(FanoutReceiver)消費(fèi)消息:" + msg);
    }
}

第二個(gè)FanoutReceiver2.java代碼如下:

package com.example.rabbitmq.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout2")
public class FanoutReceiver2 {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Fanout(FanoutReceiver2)消費(fèi)消息:" + message);
    }
}

3.3.2.4 測(cè)試代碼

創(chuàng)建FanoutTest.java代碼如下:

package com.example.rabbitmq.mq;
import com.example.rabbitmq.RabbitmqApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class FanoutTest {
    @Autowired
    private FanoutSender sender;

    @Test
    public void Test() throws InterruptedException {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
        sender.send("Time1 => " + sf.format(new Date()));
        sender.send2("Date2 => " + sf.format(new Date()));
    }
}

運(yùn)行測(cè)試代碼,輸出結(jié)果如下:

發(fā)送消息:Time1 => 2018-09-11
發(fā)送消息2:Date2 => 2018-09-11
Fanout(FanoutReceiver2)消費(fèi)消息:Time1 => 2018-09-11
Fanout(FanoutReceiver2)消費(fèi)消息:Date2 => 2018-09-11
Fanout(FanoutReceiver)消費(fèi)消息:Time1 => 2018-09-11
Fanout(FanoutReceiver)消費(fèi)消息:Date2 => 2018-09-11

總結(jié): 可以看出fanout會(huì)把消息分發(fā)到所有訂閱到該交換器的隊(duì)列,fanout模式是忽略路由鍵的。

3.3.3 Topic Exchange

3.3.3.1 配置隊(duì)列

@Configuration
public class TopicConfig {
    final static String QUEUE_NAME = "log";
    final static String QUEUE_NAME2 = "log.all";
    final static String QUEUE_NAME3 = "log.all.error";
    final static String EXCHANGE_NAME = "topicExchange"; //交換器名稱
    @Bean
    public Queue queuetopic() {
        return new Queue(TopicConfig.QUEUE_NAME);
    }
    @Bean
    public Queue queuetopic2() {
        return new Queue(TopicConfig.QUEUE_NAME2);
    }
    @Bean
    public Queue queuetopic3() {
        return new Queue(TopicConfig.QUEUE_NAME3);
    }
    // 配置交換器
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(TopicConfig.EXCHANGE_NAME);
    }
    // 綁定隊(duì)列到交換器,并設(shè)置路由鍵(log.#)
    @Bean
    Binding bindingtopicExchangeQueue(Queue queuetopic, TopicExchange topicExchange) {
        return BindingBuilder.bind(queuetopic).to(topicExchange).with("log.#");
    }
    // 綁定隊(duì)列到交換器,并設(shè)置路由鍵(log.*)
    @Bean
    Binding bindingtopicExchangeQueue2(Queue queuetopic2, TopicExchange topicExchange) {
        return BindingBuilder.bind(queuetopic2).to(topicExchange).with("log.*");
    }
    // 綁定隊(duì)列到交換器,并設(shè)置路由鍵(log.*.error)
    @Bean
    Binding bindingtopicExchangeQueue3(Queue queuetopic3, TopicExchange topicExchange) {
        return BindingBuilder.bind(queuetopic3).to(topicExchange).with("log.*.error");
    }
}

3.3.3.2 發(fā)布消息

@Component
public class TopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    public void topicSender(String message) {
        String routingKey = "log.all.error";
        System.out.println(routingKey + " 發(fā)送消息:" + message);
        this.rabbitTemplate.convertAndSend(TopicConfig.EXCHANGE_NAME, routingKey, message);
    }
}

3.3.3.3 消費(fèi)消息

@Component
@RabbitListener(queues = "log")
public class TopicReceiver {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("log.# 消費(fèi)消息:" + msg);
    }
}
@Component
@RabbitListener(queues = "log.all")
public class TopicReceiver2 {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("log.* 消費(fèi)消息:" + msg);
    }
}
@Component
@RabbitListener(queues = "log.all.error")
public class TopicReceiver3 {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("log.*.error 消費(fèi)消息:" + msg);
    }
}

3.3.3.4 測(cè)試代碼

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class FanoutTest {
    @Autowired
    private FanoutSender fanoutSender;
    @Test
    public void Test() {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
        fanoutSender.send("Time1 => " + sf.format(new Date()));
        fanoutSender.send2("Date2 => " + sf.format(new Date()));
    }
}

輸出結(jié)果:

log.all.error 發(fā)送消息:time => 2018-09-11
log.# 消費(fèi)消息:time => 2018-09-11
log.*.error 消費(fèi)消息:time => 2018-09-11

總結(jié): 在Topic Exchange中“#”可以匹配所有內(nèi)容,而“*”則是匹配一個(gè)字符段的內(nèi)容。

以上示例代碼Github地址:https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq

參考文檔

阿里 RocketMQ 優(yōu)勢(shì)對(duì)比:https://juejin.im/entry/5a0abfb5f265da43062a4a91

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 什么叫消息隊(duì)列? 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡(jiǎn)單,比如只包含文本字符串,也可以更復(fù)...
    Agile_dev閱讀 2,442評(píng)論 0 24
  • http://blog.csdn.net/zl18310999566/article/details/543410...
    sherlock_6981閱讀 1,604評(píng)論 0 0
  • RabbitMQ 即一個(gè)消息隊(duì)列,主要是用來實(shí)現(xiàn)應(yīng)用程序的異步和解耦,同時(shí)也能起到消息緩沖,消息分發(fā)的作用。 消息...
    彩虹之夢(mèng)閱讀 1,161評(píng)論 2 1
  • 本章我們重點(diǎn)學(xué)習(xí)一下Rabbit里面的exchange(交換器)的知識(shí)。 交換器分類 RabbitMQ的Excha...
    Java大生閱讀 415評(píng)論 0 1
  • 喜歡講笑話是因?yàn)橐粋€(gè)男孩子,他幾乎是我的整個(gè)青春,認(rèn)識(shí)十幾年,卻一直都不能在一起。理由很簡(jiǎn)單,因?yàn)樗幌矚g我,或者...
    錦衣夜行1992閱讀 357評(píng)論 0 1

友情鏈接更多精彩內(nèi)容