rabbitmq中間件搭建在本地虛擬機上,詳情搭建過程可查看:rabbitmq安裝部署
使用上次搭建的dubbo項目補充rabbitmq實現(xiàn),代碼可參考:20分鐘springboot搭建dubbo服務(wù)
首先查看virtual-host配置(VirtualHost相當月一個相對獨立的RabbitMQ服務(wù)器,每個VirtualHost之間是相互隔離的。exchange、queue、message不能互通。)

rabbitmq原理結(jié)構(gòu)

生產(chǎn)者/消費者模型,類似于交換機。Exchange交換器,共有四種類型,不同的類型對應不同的路由策略。
Queue:消息隊列,接收消息、緩存消息。
Exchange:交換機,一方面接收生產(chǎn)者發(fā)送來的消息。另一方面知道如何處理消息,例如交給特別的隊列,或者全部的隊列,或者將消息丟棄。到底如何操作取決于Exchange是哪種類型:
根據(jù)交換機類型不同,分為3種發(fā)布模式:
Direct<定向>:1對1-----一個消息只能被一個消費者消費;把消息交給符合特定routing key(queue與exchange的關(guān)系key) 的隊列。
Topic<通配符>:1對多-----一個消息可以被多個消費者消費(輪詢);把消息交給符合routing pattern (路由模式)的隊列。
Fanout<廣播>:將消息分發(fā)給所有綁定到交換機的隊列。
消息隊列內(nèi)生產(chǎn)者添加消息隊列數(shù)據(jù),消費者接收并使用隊列中的數(shù)據(jù),上次搭建的簡單的dubbo服務(wù)中consumer發(fā)出請求,provider提供查詢數(shù)據(jù)庫的服務(wù),具體如下圖:

繼續(xù)完成代碼實現(xiàn)
consumer主體結(jié)構(gòu)如下:

補充consumer的pom文件rabbitmq配置
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
補充consumer內(nèi)yml的rabbitmq配置
spring:
application:
name: consumer
profiles:
active: test
#配置rabbitMq 服務(wù)器
rabbitmq:
host: 10.1.31.199
port: 5672
username: admin
password: admin
#虛擬host 可以不設(shè)置,使用server默認host
virtual-host: /
注意1) rabbitmq的默認web端口號是15672,接扣訪問端口是5672
2)rabbitmq的默認virtualhost配置為"/"
在config文件夾添加DirectRabbitConfig類,配置rabbitmq的配置信息如下:
package com.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig {
//隊列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
// exclusive:默認也是false,只能被當前創(chuàng)建的連接使用,而且當連接關(guān)閉后隊列即被刪除。此參考優(yōu)先級高于durable
// autoDelete:是否自動刪除,當沒有生產(chǎn)者或者消費者使用此隊列,該隊列會自動刪除。
// return new Queue("TestDirectQueue",true,true,false);
//一般設(shè)置一下隊列的持久化就好,其余兩個就是默認false
return new Queue("TestDirectQueue",true);
}
//Direct交換機 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange",true,false);
}
//綁定 將隊列和交換機綁定, 并設(shè)置用于匹配鍵:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
}
創(chuàng)建測試接口SendMessageController類,完成消息隊列數(shù)據(jù)的添加
package com.example.consumer.openapi;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
@RequestMapping("/demo")
public class SendMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發(fā)送等等方法
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//將消息攜帶綁定鍵值:TestDirectRouting 發(fā)送到交換機TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
}
注意:1)convertAndSend的方法中exchange是Virtual host的name決定了在哪個queue存放消息,routingKey則確定了queue與exchange的綁定,不填寫時自動為exchange的name。
2)rabbitTemplate與amqpTemplate方法,rabbitTemplate實現(xiàn)自amqpTemplate接口,使用起來并無區(qū)別
啟動項目,訪問url,執(zhí)行rabbitmq消息寫入:

寫入成功:

provider主體結(jié)構(gòu):

首先同理consumer,修改provider的pom文件及yml文件
在service文件夾內(nèi)添加DirectReceiver類如下:
package com.example.provider.service.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "TestDirectQueue")//監(jiān)聽的隊列名稱 TestDirectQueue
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消費者收到消息 : " + testMessage.toString());
}
}
啟動provider項目,查看監(jiān)聽到的消息如下:

簡單的消息隊列完成。
RabbitTemplate和AmqpTemplate的使用區(qū)別:
兩者都能調(diào)用convertAndSend方法向隊列發(fā)送消息,而
API:amqpTemplate.convertAndSend("隊列名",“消息內(nèi)容”)此處隊列名必須與創(chuàng)建的隊列一致。
API:amqpTemplate.convertAndSend("交換機名",“路由鍵”,“消息內(nèi)容”)
具體實現(xiàn)可詳看使用方法。