上篇我寫了一個通用的消息隊列(redis,kafka,rabbitmq)--生產(chǎn)者篇,這次寫一個消費者篇.
1.消費者的通用調(diào)用類:
/**
* 消息隊列處理的handle
* @author starmark
* @date 2020/5/1 上午10:56
*/
public interface IMessageQueueConsumerService {
/**
* 處理消息隊列的消息
* @param message 消息
*/
void receiveMessage(String message);
/**
* 返回監(jiān)聽的topic
* @return 主題
*/
String topic();
/**
*
* @param consumerType 消費者類型
* @return 是否支持該消費者類者
*/
boolean support(String consumerType);
}
只要實現(xiàn)該類的接口就可以實現(xiàn)監(jiān)聽,
redis的消費端,有兩個類,如下:
/**
* @author starmark
* @date 2020/5/2 下午3:05
*/
public class MessageQueueRedisConsumerListener implements MessageListener {
private IMessageQueueConsumerService messageQueueConsumerService;
public MessageQueueRedisConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
this.messageQueueConsumerService = messageQueueConsumerService;
}
@Override
public void onMessage(Message message, byte[] pattern) {
messageQueueConsumerService.receiveMessage(message.toString());
}
}
/**
* 消息隊列服務(wù)端的監(jiān)聽
*
* @author starmark
* @date 2020/5/1 上午10:55
*/
@Service
public class MessageQueueRedisConsumerServiceFactory {
private List<IMessageQueueConsumerService> messageQueueConsumerServices;
@Autowired
public MessageQueueRedisConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
messageQueueConsumerService.support("redis")).collect(Collectors.toList());
}
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
new MessageQueueRedisConsumerListener(messageQueueConsumerService));
messageListenerAdapter.afterPropertiesSet();
container.addMessageListener(messageListenerAdapter, new PatternTopic(messageQueueConsumerService.topic()));
});
return container;
}
}
kafka消費者也有兩個類,如下:
/**
* @author starmark
* @date 2020/5/2 下午3:05
*/
public class MessageQueueKafkaConsumerListener implements MessageListener<String,String> {
private final IMessageQueueConsumerService messageQueueConsumerService;
public MessageQueueKafkaConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
this.messageQueueConsumerService = messageQueueConsumerService;
}
@Override
public void onMessage(ConsumerRecord<String, String> data) {
messageQueueConsumerService.receiveMessage(data.value());
}
}
/**
* 消息隊列服務(wù)端的監(jiān)聽
*
* @author starmark
* @date 2020/5/1 上午10:55
*/
@Component
public class MessageQueueKafkaConsumerServiceFactory implements InitializingBean {
@Autowired
KafkaProperties kafkaProperties;
private final List<IMessageQueueConsumerService> messageQueueConsumerServices;
@Autowired
public MessageQueueKafkaConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
messageQueueConsumerService.support("kafka")).collect(Collectors.toList());
}
private KafkaMessageListenerContainer<Integer, String> createContainer(
ContainerProperties containerProps) {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(props);
return new KafkaMessageListenerContainer<>(cf, containerProps);
}
@Override
public void afterPropertiesSet() {
messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
ContainerProperties containerProps = new ContainerProperties(messageQueueConsumerService.topic());
containerProps.setMessageListener(new MessageQueueKafkaConsumerListener(messageQueueConsumerService)
);
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
container.setBeanName(messageQueueConsumerService.topic() + "kafkaListener");
container.start();
});
}
}
這些類都是實現(xiàn)動態(tài)監(jiān)聽某個主題.
rabbitmq就有點復(fù)雜,因為他要求建了queue才能實現(xiàn)監(jiān)聽,我現(xiàn)在這個代碼,如果生產(chǎn)者沒有創(chuàng)建隊列,會自動幫生產(chǎn)者創(chuàng)建該主題的隊列。其實這是不對的,但不這么做,無法實現(xiàn)監(jiān)聽.
/**
* @author starmark
* @date 2020/5/2 下午3:05
*/
public class MessageQueueRabbitmqConsumerListener implements MessageListener {
private final IMessageQueueConsumerService messageQueueConsumerService;
public MessageQueueRabbitmqConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
this.messageQueueConsumerService = messageQueueConsumerService;
}
@Override
public void onMessage(Message message) {
messageQueueConsumerService.receiveMessage(new String(message.getBody()));
}
}
@Component
public class MessageQueueRabbitmqConsumerServiceFactory implements InitializingBean {
//自動注入RabbitTemplate模板類
@Autowired
private RabbitTemplate rabbitTemplate;
private final ConfigurableApplicationContext applicationContext;
private final List<IMessageQueueConsumerService> messageQueueConsumerServices;
private final ConnectionFactory connectionFactory;
@Autowired
public MessageQueueRabbitmqConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList, ConfigurableApplicationContext applicationContext, ConnectionFactory connectionFactory) {
messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
messageQueueConsumerService.support("rabbitmq")).collect(Collectors.toList());
this.applicationContext = applicationContext;
this.connectionFactory = connectionFactory;
}
@Override
public void afterPropertiesSet() {
messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
this.registerBean(messageQueueConsumerService.topic(), messageQueueConsumerService.topic());
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setConsumerStartTimeout(6000L);
;
//設(shè)置監(jiān)聽的隊列名,
String[] types = {messageQueueConsumerService.topic()};
container.setQueueNames(types);
container.setMessageListener(new MessageQueueRabbitmqConsumerListener(messageQueueConsumerService));
container.start();
});
}
private void registerBean(String name, Object... args) {
if (applicationContext.containsBean(name)) {
return;
}
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(Queue.class);
if (args.length > 0) {
for (Object arg : args) {
beanDefinitionBuilder.addConstructorArgValue(arg);
}
}
BeanDefinition beanDefinition = beanDefinitionBuilder.getRawBeanDefinition();
BeanDefinitionRegistry beanFactory = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
beanFactory.registerBeanDefinition(name, beanDefinition);
}
}
至此,通用的消息隊列已完成,這個只能滿足一般情況的使用 .
如果要更高端的使用,直接使用其原生的api會更好.