SimpleMessageListenerContainer 即簡(jiǎn)單消息監(jiān)聽(tīng)容器。
這個(gè)類(lèi)非常的強(qiáng)大,我們可以對(duì)他進(jìn)行很多的設(shè)置,用對(duì)于消費(fèi)者的配置項(xiàng),這個(gè)類(lèi)都可以滿足。它有監(jiān)聽(tīng)單個(gè)或多個(gè)隊(duì)列、自動(dòng)啟動(dòng)、自動(dòng)聲明功能。
它可以設(shè)置事務(wù)特性、事務(wù)管理器、事務(wù)屬性、事務(wù)并發(fā)、是否開(kāi)啟事務(wù)、回滾消息等。但是我們?cè)趯?shí)際生產(chǎn)中,很少使用事務(wù),基本都是采用補(bǔ)償機(jī)制。
它可以設(shè)置消費(fèi)者數(shù)量、最小最大數(shù)量、批量消費(fèi)。
它可以設(shè)置消息確認(rèn)和自動(dòng)確認(rèn)模式、是否重回隊(duì)列、異常捕獲 Handler 函數(shù)。
它可以設(shè)置消費(fèi)者標(biāo)簽生成策略、是否獨(dú)占模式、消費(fèi)者屬性等。
-
它還可以設(shè)置具體的監(jiān)聽(tīng)器、消息轉(zhuǎn)換器等等。
注意: SimpleMessageListenerContainer 可以進(jìn)行動(dòng)態(tài)設(shè)置,比如在運(yùn)行中的應(yīng)用可以動(dòng)態(tài)的修改其消費(fèi)者數(shù)量的大小、接收消息的模式等。
很多基于 rabbitMQ 的自制定化后端管控臺(tái)在進(jìn)行設(shè)置的時(shí)候,也是根據(jù)這一去實(shí)現(xiàn)的。所以可以看出 SpringAMQP 非常的強(qiáng)大。
代碼示例:
代碼地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 項(xiàng)目下
在上一節(jié)的 SpringAMQP 之 RabbitTemplate 的 RabbitMQConfig 原有代碼的基礎(chǔ)上加上
@Bean //connectionFactory 也是要和最上面方法名保持一致
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003()); //監(jiān)聽(tīng)的隊(duì)列
container.setConcurrentConsumers(1); //當(dāng)前的消費(fèi)者數(shù)量
container.setMaxConcurrentConsumers(5); // 最大的消費(fèi)者數(shù)量
container.setDefaultRequeueRejected(false); //是否重回隊(duì)列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //簽收模式
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消費(fèi)端的標(biāo)簽策略
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
log.info("----------消費(fèi)者: " + msg);
}
});
return container;
}
單元測(cè)試?yán)锩孢€是繼續(xù)使用上一次的代碼
@Test
public void testSendMessage2() throws Exception {
//1 創(chuàng)建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234 --spring.abc".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send! -spring.amqp");
rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send! -rabbit.abc");
}
運(yùn)行 test 看日志,綁定的三個(gè)隊(duì)列都能被消費(fèi)
自此,SimpleMessageListenerContainer 的簡(jiǎn)單使用就介紹完畢, container 里面還有很多方法等著你來(lái)使用,我就不繼續(xù)介紹了。