springboot使用Rabbitmq實現(xiàn)消息隊列服務(wù)

rabbitmq中間件搭建在本地虛擬機上,詳情搭建過程可查看:rabbitmq安裝部署
使用上次搭建的dubbo項目補充rabbitmq實現(xiàn),代碼可參考:20分鐘springboot搭建dubbo服務(wù)

首先查看virtual-host配置(VirtualHost相當月一個相對獨立的RabbitMQ服務(wù)器,每個VirtualHost之間是相互隔離的。exchange、queue、message不能互通。)


image.png

rabbitmq原理結(jié)構(gòu)


image.png

生產(chǎn)者/消費者模型,類似于交換機。Exchange交換器,共有四種類型,不同的類型對應不同的路由策略。
Queue:消息隊列,接收消息、緩存消息。

Exchange:交換機,一方面接收生產(chǎn)者發(fā)送來的消息。另一方面知道如何處理消息,例如交給特別的隊列,或者全部的隊列,或者將消息丟棄。到底如何操作取決于Exchange是哪種類型:

根據(jù)交換機類型不同,分為3種發(fā)布模式:
Direct<定向>:1對1-----一個消息只能被一個消費者消費;把消息交給符合特定routing key(queue與exchange的關(guān)系key) 的隊列。
Topic<通配符>:1對多-----一個消息可以被多個消費者消費(輪詢);把消息交給符合routing pattern (路由模式)的隊列。
Fanout<廣播>:將消息分發(fā)給所有綁定到交換機的隊列。

消息隊列內(nèi)生產(chǎn)者添加消息隊列數(shù)據(jù),消費者接收并使用隊列中的數(shù)據(jù),上次搭建的簡單的dubbo服務(wù)中consumer發(fā)出請求,provider提供查詢數(shù)據(jù)庫的服務(wù),具體如下圖:


image.png

繼續(xù)完成代碼實現(xiàn)

consumer主體結(jié)構(gòu)如下:


image.png

補充consumer的pom文件rabbitmq配置


    <!--rabbitmq-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

補充consumer內(nèi)yml的rabbitmq配置

spring:
  application:
    name: consumer
  profiles:
    active: test
  #配置rabbitMq 服務(wù)器
  rabbitmq:
    host: 10.1.31.199
    port: 5672
    username: admin
    password: admin
    #虛擬host 可以不設(shè)置,使用server默認host
    virtual-host: /

注意1) rabbitmq的默認web端口號是15672,接扣訪問端口是5672
2)rabbitmq的默認virtualhost配置為"/"
在config文件夾添加DirectRabbitConfig類,配置rabbitmq的配置信息如下:

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {
    //隊列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
        // exclusive:默認也是false,只能被當前創(chuàng)建的連接使用,而且當連接關(guān)閉后隊列即被刪除。此參考優(yōu)先級高于durable
        // autoDelete:是否自動刪除,當沒有生產(chǎn)者或者消費者使用此隊列,該隊列會自動刪除。
        //   return new Queue("TestDirectQueue",true,true,false);

        //一般設(shè)置一下隊列的持久化就好,其余兩個就是默認false
        return new Queue("TestDirectQueue",true);
    }

    //Direct交換機 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("TestDirectExchange",true,false);
    }

    //綁定  將隊列和交換機綁定, 并設(shè)置用于匹配鍵:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }



    @Bean
    DirectExchange lonelyDirectExchange() {
        return new DirectExchange("lonelyDirectExchange");
    }
}

創(chuàng)建測試接口SendMessageController類,完成消息隊列數(shù)據(jù)的添加

package com.example.consumer.openapi;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RestController
@RequestMapping("/demo")
public class SendMessageController {


    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,這提供了接收/發(fā)送等等方法

    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //將消息攜帶綁定鍵值:TestDirectRouting 發(fā)送到交換機TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
        return "ok";
    }
}

注意:1)convertAndSend的方法中exchange是Virtual host的name決定了在哪個queue存放消息,routingKey則確定了queue與exchange的綁定,不填寫時自動為exchange的name。
2)rabbitTemplate與amqpTemplate方法,rabbitTemplate實現(xiàn)自amqpTemplate接口,使用起來并無區(qū)別

啟動項目,訪問url,執(zhí)行rabbitmq消息寫入:


image.png

寫入成功:


image.png

provider主體結(jié)構(gòu):


image.png

首先同理consumer,修改provider的pom文件及yml文件

在service文件夾內(nèi)添加DirectReceiver類如下:

package com.example.provider.service.rabbitmq;


import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = "TestDirectQueue")//監(jiān)聽的隊列名稱 TestDirectQueue
public class DirectReceiver {


    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("DirectReceiver消費者收到消息  : " + testMessage.toString());
    }
}

啟動provider項目,查看監(jiān)聽到的消息如下:


image.png

簡單的消息隊列完成。


RabbitTemplate和AmqpTemplate的使用區(qū)別:

兩者都能調(diào)用convertAndSend方法向隊列發(fā)送消息,而
API:amqpTemplate.convertAndSend("隊列名",“消息內(nèi)容”)此處隊列名必須與創(chuàng)建的隊列一致。
API:amqpTemplate.convertAndSend("交換機名",“路由鍵”,“消息內(nèi)容”)
具體實現(xiàn)可詳看使用方法。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容