SpringAMQP 消息容器 - SimpleMessageListenerContainer

SimpleMessageListenerContainer 即簡(jiǎn)單消息監(jiān)聽(tīng)容器。

  • 這個(gè)類(lèi)非常的強(qiáng)大,我們可以對(duì)他進(jìn)行很多的設(shè)置,用對(duì)于消費(fèi)者的配置項(xiàng),這個(gè)類(lèi)都可以滿足。它有監(jiān)聽(tīng)單個(gè)或多個(gè)隊(duì)列、自動(dòng)啟動(dòng)、自動(dòng)聲明功能。

  • 它可以設(shè)置事務(wù)特性、事務(wù)管理器、事務(wù)屬性、事務(wù)并發(fā)、是否開(kāi)啟事務(wù)、回滾消息等。但是我們?cè)趯?shí)際生產(chǎn)中,很少使用事務(wù),基本都是采用補(bǔ)償機(jī)制。

  • 它可以設(shè)置消費(fèi)者數(shù)量、最小最大數(shù)量、批量消費(fèi)。

  • 它可以設(shè)置消息確認(rèn)和自動(dòng)確認(rèn)模式、是否重回隊(duì)列、異常捕獲 Handler 函數(shù)。

  • 它可以設(shè)置消費(fèi)者標(biāo)簽生成策略、是否獨(dú)占模式、消費(fèi)者屬性等。

  • 它還可以設(shè)置具體的監(jiān)聽(tīng)器、消息轉(zhuǎn)換器等等。

    注意: SimpleMessageListenerContainer 可以進(jìn)行動(dòng)態(tài)設(shè)置,比如在運(yùn)行中的應(yīng)用可以動(dòng)態(tài)的修改其消費(fèi)者數(shù)量的大小、接收消息的模式等。

很多基于 rabbitMQ 的自制定化后端管控臺(tái)在進(jìn)行設(shè)置的時(shí)候,也是根據(jù)這一去實(shí)現(xiàn)的。所以可以看出 SpringAMQP 非常的強(qiáng)大。

代碼示例:

代碼地址:  https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api 項(xiàng)目下

在上一節(jié)的 SpringAMQP 之 RabbitTemplate 的 RabbitMQConfig 原有代碼的基礎(chǔ)上加上

    @Bean   //connectionFactory 也是要和最上面方法名保持一致
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003());    //監(jiān)聽(tīng)的隊(duì)列
        container.setConcurrentConsumers(1);    //當(dāng)前的消費(fèi)者數(shù)量
        container.setMaxConcurrentConsumers(5); //  最大的消費(fèi)者數(shù)量
        container.setDefaultRequeueRejected(false); //是否重回隊(duì)列
        container.setAcknowledgeMode(AcknowledgeMode.AUTO); //簽收模式
        container.setExposeListenerChannel(true);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {    //消費(fèi)端的標(biāo)簽策略
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });


        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                log.info("----------消費(fèi)者: " + msg);
            }
        });
        return container;
    }

單元測(cè)試?yán)锩孢€是繼續(xù)使用上一次的代碼

    @Test
    public void testSendMessage2() throws Exception {
        //1 創(chuàng)建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("mq 消息1234   --spring.abc".getBytes(), messageProperties);

        rabbitTemplate.send("topic001", "spring.abc", message);

        rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!   -spring.amqp");
        rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!  -rabbit.abc");
    }

運(yùn)行 test 看日志,綁定的三個(gè)隊(duì)列都能被消費(fèi)


image

自此,SimpleMessageListenerContainer 的簡(jiǎn)單使用就介紹完畢, container 里面還有很多方法等著你來(lái)使用,我就不繼續(xù)介紹了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容