Spring Boot Project Architecture (4): Implementing ActiveMq, RabbitMq, RocketMq, and Kafka

Message queue implementation

Supported message queues

  • ActiveMq
  • RabbitMq
  • RocketMq
  • Kafka

How each broker implements queue mode and broadcast mode

ActiveMq

Add the dependency (this uses the JMS starter provided by Spring Boot)

        <!--activemq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

Configuration file

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    password: admin
    user: admin

Sending objects

To send objects through ActiveMq, register a message converter bean that handles serialization:

    @Bean(name = "jacksonJmsMessageConverter")
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

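Once registered, this converter replaces the default one. The Jackson converter also handles plain strings, so string messages keep working.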
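To make the role of the `_type` property more concrete, here is a minimal plain-Java sketch of the idea. It is not Spring's actual implementation: the `Envelope` and `TypeIdSketch` classes and their methods are hypothetical names introduced for illustration, and the type id is assumed to be the payload's fully qualified class name. The sender stores the type id next to the JSON text, and the receiver reads it to decide which class the JSON should be bound to.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a JMS text message: string properties plus a text body. */
final class Envelope {
    final Map<String, String> properties = new HashMap<>();
    final String body;

    Envelope(String body) {
        this.body = body;
    }
}

/** Illustrative only: mimics the idea of a type-id property, not Spring's converter. */
final class TypeIdSketch {
    static final String TYPE_ID_PROPERTY = "_type";

    /** Sender side: attach the payload's class name next to the already-serialized JSON. */
    static Envelope wrap(Object payload, String json) {
        Envelope envelope = new Envelope(json);
        envelope.properties.put(TYPE_ID_PROPERTY, payload.getClass().getName());
        return envelope;
    }

    /** Receiver side: read the type id to decide which class the JSON maps to. */
    static Class<?> targetType(Envelope envelope) {
        String typeId = envelope.properties.get(TYPE_ID_PROPERTY);
        if (typeId == null) {
            throw new IllegalStateException("missing " + TYPE_ID_PROPERTY + " property");
        }
        try {
            return Class.forName(typeId);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("unknown type id " + typeId, e);
        }
    }

    public static void main(String[] args) {
        Envelope text = wrap("heyu-test", "\"heyu-test\"");
        System.out.println(text.properties + " -> " + targetType(text).getSimpleName());
    }
}
```

A practical consequence is that sender and receiver must agree on the type id. When the two sides use different class names, Spring's `MappingJackson2MessageConverter` offers `setTypeIdMappings` to map a short id to a class on each side.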

Queue mode

ActiveMq uses queue mode by default. Sending looks like this:

jmsTemplate.convertAndSend("heyu-test-string", "heyu-test");
jmsTemplate.convertAndSend("heyu-test-object", new User("jack", 25));

Consuming looks like this:

    @JmsListener(destination = "heyu-test-string")
    public void stringListener(String message) {
        log.info(message);
    }

    @JmsListener(destination = "heyu-test-object")
    public void objectListener(User user) {
        log.info(JsonUtil.obj2json(user));
    }

Broadcast mode

To use broadcast mode, first register a JmsListenerContainerFactory configured for publish/subscribe. It uses the same Jackson converter for serialization.

    @Bean(name = "topicFactory")
    public JmsListenerContainerFactory<?> topicFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configure) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configure.configure(factory, connectionFactory);
        // enable publish/subscribe (topic) mode instead of queue mode
        factory.setPubSubDomain(true);
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }

Sending looks like this:

Topic heyuTopicString =  () -> "heyu-topic-string";
Topic heyuTopicObject =  () -> "heyu-topic-object";
jmsTemplate.send(heyuTopicString, session -> jacksonJmsMessageConverter.toMessage("topic-string", session));
jmsTemplate.send(heyuTopicObject, session -> jacksonJmsMessageConverter.toMessage(new User("jack", 26), session));

Consuming looks like this:

    @JmsListener(destination = "heyu-topic-string", containerFactory = "topicFactory")
    public void topicString1(String user) {
        log.info(user);
    }

    @JmsListener(destination = "heyu-topic-object", containerFactory = "topicFactory")
    public void topicObject1(User user) {
        log.info(JsonUtil.obj2json(user));
    }

RabbitMq

Add the dependency (Spring Boot's AMQP starter)

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

Configuration file

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 3456
    username: admin
    password: 123456

Queue configuration

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonConverter());
        return template;
    }

    /**
     * Registering a MessageConverter bean replaces the default one. A new converter
     * must be registered before objects can be sent.
     *
     * @return a Jackson-based JSON converter
     */
    @Bean
    public MessageConverter jsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Queues must be declared up front; otherwise the broadcast listeners report errors when receiving.
     */
    @Bean
    public Queue a() {
        return new Queue("heyu-queue-string", true);
    }

    @Bean
    public Queue b() {
        return new Queue("heyu-queue-object", true);
    }

    @Bean
    public Queue c() {
        return new Queue("heyu-fanout-object1", true);
    }

    @Bean
    public Queue s() {
        return new Queue("heyu-fanout-object2", true);
    }

    @Bean
    public Queue f() {
        return new Queue("heyu-fanout-string1", true);
    }

    @Bean
    public Queue g() {
        return new Queue("heyu-fanout-string2", true);
    }

Queue mode

Sending

    rabbitTemplate.convertAndSend("heyu-queue-string", "heyu-string");
    rabbitTemplate.convertAndSend("heyu-queue-object", new User("heyu", 25));

Receiving

    @RabbitListener(queues = "heyu-queue-string")
    public void test1(String str) {
        log.info(str);
    }

    @RabbitListener(queues = "heyu-queue-object")
    public void test2(User user) {
        log.info(JsonUtil.obj2json(user));
    }

Broadcast mode

Sending

    rabbitTemplate.convertAndSend("heyu-fanout-string", "", "heyu-string");
    rabbitTemplate.convertAndSend("heyu-fanout-object", "", new User("heyu", 25));

Receiving

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "heyu-fanout-string1", durable = "true"), exchange = @Exchange(value = "heyu-fanout-string", type = "fanout")))
    public void test3(String str) {
        log.info(str);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "heyu-fanout-string2", durable = "true"), exchange = @Exchange(value = "heyu-fanout-string", type = "fanout")))
    public void test5(String str) {
        log.info(str);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "heyu-fanout-object1", durable = "true"), exchange = @Exchange(value = "heyu-fanout-object", type = "fanout")))
    public void test4(User user) {
        log.info(JsonUtil.obj2json(user));
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "heyu-fanout-object2", durable = "true"), exchange = @Exchange(value = "heyu-fanout-object", type = "fanout")))
    public void test6(User user) {
        log.info(JsonUtil.obj2json(user));
    }
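The queue-mode and broadcast-mode send calls differ in where the message enters the broker. The two-argument `convertAndSend` publishes to the default exchange, which routes by queue name; the three-argument form names a fanout exchange, which copies the message to every bound queue and ignores the routing key (hence the empty string). A minimal in-memory sketch of that routing rule follows. `MiniBroker` and its methods are hypothetical and model only the routing, not RabbitMQ itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of default-exchange and fanout routing. */
final class MiniBroker {
    // queue name -> messages waiting in that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();
    // fanout exchange name -> names of the queues bound to it
    private final Map<String, List<String>> fanoutBindings = new HashMap<>();

    void declareQueue(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    void bindFanout(String exchange, String queue) {
        declareQueue(queue);
        fanoutBindings.computeIfAbsent(exchange, k -> new ArrayList<>()).add(queue);
    }

    /** An empty exchange name means the default exchange: route by queue name. */
    void publish(String exchange, String routingKey, String message) {
        if (exchange.isEmpty()) {
            List<String> target = queues.get(routingKey);
            if (target != null) {
                target.add(message);
            }
            return;
        }
        // Fanout: the routing key is ignored and every bound queue gets a copy.
        List<String> bound = fanoutBindings.getOrDefault(exchange, Collections.emptyList());
        for (String queue : bound) {
            queues.get(queue).add(message);
        }
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }

    public static void main(String[] args) {
        MiniBroker broker = new MiniBroker();
        broker.declareQueue("heyu-queue-string");
        broker.bindFanout("heyu-fanout-string", "heyu-fanout-string1");
        broker.bindFanout("heyu-fanout-string", "heyu-fanout-string2");

        broker.publish("", "heyu-queue-string", "queue message");
        broker.publish("heyu-fanout-string", "", "broadcast message");

        System.out.println(broker.messagesIn("heyu-queue-string"));
        System.out.println(broker.messagesIn("heyu-fanout-string1"));
        System.out.println(broker.messagesIn("heyu-fanout-string2"));
    }
}
```

In this model each listener owns its own queue, which is why the broadcast listeners above bind two differently named queues to the same exchange: two listeners on one queue would split the messages instead of each receiving a copy.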

RocketMq

Add the dependency

        <!--rocketmq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.0</version>
        </dependency>

Configuration file

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: heyu-producer
    sendMessageTimeout: 300000

Queue and broadcast implementation

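RocketMq manages consumption through consumer groups, so this setup needs no separate queue or broadcast configuration. Within one topic, consumers in the same group share the messages (each message is handled by only one of them), while different groups each receive every message. A single service cannot declare the same group twice; to add consumers to a group, run additional instances of the service as a cluster.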

Sending

rocketMQTemplate.syncSend(stringTopic, "stringTopic1");
rocketMQTemplate.syncSend(objectTopic, new User("heyu", 25));

Receiving

@Service
@RocketMQMessageListener(topic = "${heyu.rocketmq.topic.string}", consumerGroup = "heyu-string1")
public class ConsumeStringLister implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info(message);
    }
}

@Service
@RocketMQMessageListener(topic = "${heyu.rocketmq.topic.object}", consumerGroup = "heyu-object1")
public class ConsumeObjectLister implements RocketMQListener<User> {
    @Override
    public void onMessage(User message) {
        log.info(JsonUtil.obj2json(message));
    }
}
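The group rule can be sketched in plain Java: every group subscribed to a topic receives each message once, and inside a group the message goes to only one member. `GroupedTopic` and its per-message round-robin choice of member are assumptions made for illustration; RocketMq's real load balancing assigns message queues to consumers rather than rotating per message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory topic that delivers per consumer group. */
final class GroupedTopic {
    // group name -> consumers registered under that group
    private final Map<String, List<Consumer<String>>> groups = new LinkedHashMap<>();
    // group name -> index of the member that receives the next message
    private final Map<String, Integer> nextMember = new LinkedHashMap<>();

    void subscribe(String group, Consumer<String> consumer) {
        groups.computeIfAbsent(group, k -> new ArrayList<>()).add(consumer);
        nextMember.putIfAbsent(group, 0);
    }

    /** Every group sees the message; within a group only one member handles it. */
    void publish(String message) {
        for (Map.Entry<String, List<Consumer<String>>> entry : groups.entrySet()) {
            List<Consumer<String>> members = entry.getValue();
            int index = nextMember.get(entry.getKey());
            members.get(index).accept(message);
            nextMember.put(entry.getKey(), (index + 1) % members.size());
        }
    }

    public static void main(String[] args) {
        GroupedTopic topic = new GroupedTopic();
        List<String> log = new ArrayList<>();
        topic.subscribe("heyu-string1", m -> log.add("string1/a got " + m));
        topic.subscribe("heyu-string1", m -> log.add("string1/b got " + m));
        topic.subscribe("heyu-string2", m -> log.add("string2/a got " + m));

        topic.publish("first");
        topic.publish("second");
        log.forEach(System.out::println);
    }
}
```

With one group per listener you get broadcast behaviour, and with several instances sharing one group you get queue behaviour, which is the distinction the listeners above rely on.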

Kafka

Add the dependency

        <!--kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

Configuration file

kafka:
  bootstrap-server: 127.0.0.1:9092

Queue configuration

Producer

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-server}")
    private String bootstrapServer;

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    private ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>(10);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}
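The numeric producer settings are easier to read with their units spelled out. The sketch below rebuilds the same map in plain Java using the literal property names that the `ProducerConfig` constants stand for. The class name `ProducerSettingsSketch` is hypothetical, and the serializer entries are left out because they need the Kafka client on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that mirrors the producer settings with literal keys. */
final class ProducerSettingsSketch {

    static Map<String, Object> settings(String bootstrapServers) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers); // broker address list
        props.put("retries", 0);                          // do not resend failed records
        props.put("batch.size", 16 * 1024);               // 16384 bytes per partition batch
        props.put("linger.ms", 1);                        // wait up to 1 ms to fill a batch
        props.put("buffer.memory", 32 * 1024 * 1024);     // 33554432 bytes of send buffer
        return props;
    }

    public static void main(String[] args) {
        settings("127.0.0.1:9092").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Note that with `retries` set to 0 a failed send is not retried, so whether that suits a given service depends on how much message loss it can tolerate.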

Consumer

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-server}")
    private String bootstrapServer;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(10);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

}

Queue and broadcast implementation

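Like RocketMq, Kafka implements queue and broadcast behaviour through consumer groups, and the logic is the same. The only difference is that Kafka does not forbid the same group from appearing more than once within a single service.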

Sending

kafkaTemplate.send("heyu-string", "heyu-string");

Receiving

    @KafkaListener(topics = "heyu-string", groupId = "heyu1")
    public void string(String message){
        log.info(message);
    }

    @KafkaListener(topics = "heyu-string", groupId = "heyu2")
    public void string2(String message){
        log.info(message);
    }