通用的消息隊列(redis,kafka,rabbitmq)--消費者篇

上篇我寫了一個通用的消息隊列(redis,kafka,rabbitmq)--生產(chǎn)者篇,這次寫一個消費者篇.
1.消費者的通用調(diào)用類:


/**
 * 消息隊列處理的handle
 * @author starmark
 * @date 2020/5/1  上午10:56
 */
public interface IMessageQueueConsumerService {


    /**
     * 處理消息隊列的消息
     * @param message 消息
     */
    void receiveMessage(String message);

    /**
     * 返回監(jiān)聽的topic
     * @return 主題
     */
    String topic();

    /**
     *
     * @param consumerType 消費者類型
     * @return 是否支持該消費者類者
     */
    boolean support(String consumerType);
}

只要實現(xiàn)該類的接口就可以實現(xiàn)監(jiān)聽,
redis的消費端,有兩個類,如下:


/**
 * @author starmark
 * @date 2020/5/2  下午3:05
 */
public class MessageQueueRedisConsumerListener implements MessageListener {

    private IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueRedisConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(Message message, byte[] pattern) {
        messageQueueConsumerService.receiveMessage(message.toString());
    }
}

/**
 * 消息隊列服務(wù)端的監(jiān)聽
 *
 * @author starmark
 * @date 2020/5/1  上午10:55
 */
@Service
public class MessageQueueRedisConsumerServiceFactory {


    private List<IMessageQueueConsumerService> messageQueueConsumerServices;

    @Autowired
    public MessageQueueRedisConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("redis")).collect(Collectors.toList());
    }

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
                    new MessageQueueRedisConsumerListener(messageQueueConsumerService));
            messageListenerAdapter.afterPropertiesSet();
            container.addMessageListener(messageListenerAdapter, new PatternTopic(messageQueueConsumerService.topic()));

        });

        return container;
    }


}

kafka消費者也有兩個類,如下:


/**
 * @author starmark
 * @date 2020/5/2  下午3:05
 */
public class MessageQueueKafkaConsumerListener implements MessageListener<String,String> {

    private final IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueKafkaConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        messageQueueConsumerService.receiveMessage(data.value());
    }
}

/**
 * 消息隊列服務(wù)端的監(jiān)聽
 *
 * @author starmark
 * @date 2020/5/1  上午10:55
 */
@Component
public class MessageQueueKafkaConsumerServiceFactory  implements InitializingBean {

    @Autowired
    KafkaProperties kafkaProperties;

    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;

    @Autowired
    public MessageQueueKafkaConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("kafka")).collect(Collectors.toList());
    }




    private KafkaMessageListenerContainer<Integer, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        DefaultKafkaConsumerFactory<Integer, String> cf =
                new DefaultKafkaConsumerFactory<>(props);
        return new KafkaMessageListenerContainer<>(cf, containerProps);
    }


    @Override
    public void afterPropertiesSet() {
        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            ContainerProperties containerProps = new ContainerProperties(messageQueueConsumerService.topic());

            containerProps.setMessageListener(new MessageQueueKafkaConsumerListener(messageQueueConsumerService)
            );
            KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
            container.setBeanName(messageQueueConsumerService.topic() + "kafkaListener");

            container.start();

        });

    }
}

這些類都是實現(xiàn)動態(tài)監(jiān)聽某個主題.

rabbitmq就有點復(fù)雜,因為他要求建了queue才能實現(xiàn)監(jiān)聽,我現(xiàn)在這個代碼,如果生產(chǎn)者沒有創(chuàng)建隊列,會自動幫生產(chǎn)者創(chuàng)建該主題的隊列。其實這是不對的,但不這么做,無法實現(xiàn)監(jiān)聽.


/**
 * @author starmark
 * @date 2020/5/2  下午3:05
 */
public class MessageQueueRabbitmqConsumerListener implements MessageListener  {

    private final IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueRabbitmqConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(Message message) {

        messageQueueConsumerService.receiveMessage(new String(message.getBody()));
    }

}

@Component
public class MessageQueueRabbitmqConsumerServiceFactory implements InitializingBean {

    //自動注入RabbitTemplate模板類
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final ConfigurableApplicationContext applicationContext;
    private final List<IMessageQueueConsumerService> messageQueueConsumerServices;
    private final ConnectionFactory connectionFactory;

    @Autowired
    public MessageQueueRabbitmqConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList, ConfigurableApplicationContext applicationContext, ConnectionFactory connectionFactory) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("rabbitmq")).collect(Collectors.toList());
        this.applicationContext = applicationContext;
        this.connectionFactory = connectionFactory;

    }


    @Override
    public void afterPropertiesSet() {
        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {

            this.registerBean(messageQueueConsumerService.topic(), messageQueueConsumerService.topic());
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setConsumerStartTimeout(6000L);
        ;
            //設(shè)置監(jiān)聽的隊列名,
            String[] types = {messageQueueConsumerService.topic()};
            container.setQueueNames(types);
            container.setMessageListener(new MessageQueueRabbitmqConsumerListener(messageQueueConsumerService));
            container.start();
        });

    }


    private void registerBean(String name, Object... args) {
        if (applicationContext.containsBean(name)) {
            return;
        }
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(Queue.class);
        if (args.length > 0) {
            for (Object arg : args) {
                beanDefinitionBuilder.addConstructorArgValue(arg);
            }
        }
        BeanDefinition beanDefinition = beanDefinitionBuilder.getRawBeanDefinition();

        BeanDefinitionRegistry beanFactory = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
        beanFactory.registerBeanDefinition(name, beanDefinition);

    }
}

至此,通用的消息隊列已完成,這個只能滿足一般情況的使用 .
如果要更高端的使用,直接使用其原生的api會更好.

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容