Message Queue Implementations
Supported message queues
- ActiveMq
- RabbitMq
- RocketMq
- Kafka
How each broker implements queue mode and broadcast mode
ActiveMq
Add the dependency (this uses the JMS starter provided by Spring Boot)
<!--activemq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
Configuration file
spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    password: admin
    user: admin
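Sending objects
To send objects with ActiveMq, register a message converter bean:
@Bean(name = "jacksonJmsMessageConverter")
public MessageConverter jacksonJmsMessageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    // Send the JSON payload as a JMS TextMessage
    converter.setTargetType(MessageType.TEXT);
    // Message property that carries the type id used to pick the target class on receipt
    converter.setTypeIdPropertyName("_type");
    return converter;
}
This converter replaces the default one, and the Jackson converter handles plain strings as well.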
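The examples in this document send new User("jack", 25), but the User class itself is not shown. Below is a minimal sketch of what such a payload class might look like, assuming two fields named name and age. These field names are guesses based on the constructor arguments, not taken from the original project. Jackson's default deserialization typically expects a no-argument constructor plus accessor methods, so the sketch includes both.

```java
// Hypothetical payload class; the field names are assumptions
// inferred from calls such as new User("jack", 25).
class User {
    private String name;
    private int age;

    // No-argument constructor, typically needed by Jackson when deserializing.
    User() {
    }

    User(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
```

In a real project the class would normally be declared public in its own file so that listeners in other packages can use it.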
Queue mode
ActiveMq uses queue mode by default. Sending:
jmsTemplate.convertAndSend("heyu-test-string", "heyu-test");
jmsTemplate.convertAndSend("heyu-test-object", new User("jack", 25));
Consuming:
@JmsListener(destination = "heyu-test-string")
public void stringListener(String message) {
    log.info(message);
}

@JmsListener(destination = "heyu-test-object")
public void objectListener(User user) {
    log.info(JsonUtil.obj2json(user));
}
Broadcast mode
To use broadcast mode, first register a JmsListenerContainerFactory configured for publish/subscribe. It also uses the Jackson message converter.
@Bean(name = "topicFactory")
public JmsListenerContainerFactory<?> topicFactory(ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configure) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    configure.configure(factory, connectionFactory);
    // Use the publish/subscribe domain (topics) instead of queues
    factory.setPubSubDomain(true);
    factory.setMessageConverter(jacksonJmsMessageConverter());
    return factory;
}
Sending:
Topic heyuTopicString = () -> "heyu-topic-string";
Topic heyuTopicObject = () -> "heyu-topic-object";
jmsTemplate.send(heyuTopicString, session -> jacksonJmsMessageConverter.toMessage("topic-string", session));
jmsTemplate.send(heyuTopicObject, session -> jacksonJmsMessageConverter.toMessage(new User("jack", 26), session));
Consuming:
@JmsListener(destination = "heyu-topic-string", containerFactory = "topicFactory")
public void topicString1(String message) {
    log.info(message);
}

@JmsListener(destination = "heyu-topic-object", containerFactory = "topicFactory")
public void topicObject1(User user) {
    log.info(JsonUtil.obj2json(user));
}
RabbitMq
Add the dependency (the Spring Boot AMQP starter)
<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Configuration file
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 3456
    username: admin
    password: 123456
Queue configuration
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(jsonConverter());
    return template;
}

/**
 * Registering a MessageConverter bean replaces the default one.
 * To send objects, a new converter must be registered first.
 *
 * @return the Jackson-based converter
 */
@Bean
public MessageConverter jsonConverter() {
    return new Jackson2JsonMessageConverter();
}
/**
 * The queues must be declared first; otherwise an error is reported when receiving in broadcast mode.
 */
@Bean
public Queue a() {
    return new Queue("heyu-queue-string", true);
}

@Bean
public Queue b() {
    return new Queue("heyu-queue-object", true);
}

@Bean
public Queue c() {
    return new Queue("heyu-fanout-object1", true);
}

@Bean
public Queue s() {
    return new Queue("heyu-fanout-object2", true);
}

@Bean
public Queue f() {
    return new Queue("heyu-fanout-string1", true);
}

@Bean
public Queue g() {
    return new Queue("heyu-fanout-string2", true);
}
Queue mode
Sending:
rabbitTemplate.convertAndSend("heyu-queue-string", "heyu-string");
rabbitTemplate.convertAndSend("heyu-queue-object", new User("heyu", 25));
Receiving:
@RabbitListener(queues = "heyu-queue-string")
public void test1(String str) {
    log.info(str);
}

@RabbitListener(queues = "heyu-queue-object")
public void test2(User user) {
    log.info(JsonUtil.obj2json(user));
}
Broadcast mode
Sending (the first argument is the fanout exchange, the second is an empty routing key):
rabbitTemplate.convertAndSend("heyu-fanout-string", "", "heyu-string");
rabbitTemplate.convertAndSend("heyu-fanout-object", "", new User("heyu", 25));
Receiving:
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "heyu-fanout-string1", durable = "true"),
        exchange = @Exchange(value = "heyu-fanout-string", type = "fanout")))
public void test3(String str) {
    log.info(str);
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "heyu-fanout-string2", durable = "true"),
        exchange = @Exchange(value = "heyu-fanout-string", type = "fanout")))
public void test5(String str) {
    log.info(str);
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "heyu-fanout-object1", durable = "true"),
        exchange = @Exchange(value = "heyu-fanout-object", type = "fanout")))
public void test4(User user) {
    log.info(JsonUtil.obj2json(user));
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "heyu-fanout-object2", durable = "true"),
        exchange = @Exchange(value = "heyu-fanout-object", type = "fanout")))
public void test6(User user) {
    log.info(JsonUtil.obj2json(user));
}
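To make the fanout behaviour above concrete, here is a toy in-memory model in plain Java. It is not the RabbitMQ client API: the class FanoutSketch and its methods bind, publish, and messagesIn are hypothetical names invented for illustration. It only demonstrates the delivery rule, namely that every queue bound to the exchange gets its own copy of each message and the routing key plays no part.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a fanout exchange; not the RabbitMQ client API.
class FanoutSketch {
    // exchange name -> names of the queues bound to it
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<Object>> queues = new LinkedHashMap<>();

    // Binds a queue to an exchange, creating the queue if needed.
    void bind(String exchange, String queue) {
        bindings.computeIfAbsent(exchange, k -> new ArrayList<>()).add(queue);
        queues.computeIfAbsent(queue, k -> new ArrayList<>());
    }

    // Copies the message into every bound queue; the routing key is ignored.
    void publish(String exchange, String routingKey, Object message) {
        List<String> bound = bindings.getOrDefault(exchange, Collections.<String>emptyList());
        for (String queue : bound) {
            queues.get(queue).add(message);
        }
    }

    // Returns the messages currently held by the queue (empty if unknown).
    List<Object> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.<Object>emptyList());
    }
}
```

With heyu-fanout-string1 and heyu-fanout-string2 both bound to heyu-fanout-string, a single publish leaves one copy of the message in each queue, which is why test3 and test5 above both log it.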
RocketMq
Add the dependency
<!--rocketmq-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>
Configuration file
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: heyu-producer
    sendMessageTimeout: 300000
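Queue and broadcast implementation
RocketMq manages messages by consumer group, so there is no separate queue mode or broadcast mode. Within one topic, consumers in the same group share the messages, so each message is consumed by only one of them, while every different group receives each message. Within a single service the same group cannot be declared twice; to add consumers to a group, run more instances of the service as a cluster.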
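The group rule can be illustrated with a small in-memory model in plain Java. This is not the RocketMQ API: the class ConsumerGroupSketch and its methods subscribe and publish are hypothetical, the second group name used in the usage note is made up, and round-robin selection inside a group is an assumption chosen for simplicity (a real broker balances load in its own way). The sketch only shows that each group sees every message once, while consumers inside one group split the messages between them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of consumer-group delivery; not the RocketMQ API.
class ConsumerGroupSketch {
    // group name -> one inbox per consumer registered in that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group name -> number of messages delivered so far (drives round-robin)
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    // Registers a new consumer in the group and returns its inbox.
    List<String> subscribe(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, k -> new ArrayList<>()).add(inbox);
        delivered.putIfAbsent(group, 0);
        return inbox;
    }

    // Every group receives the message once; inside a group,
    // exactly one consumer is picked in turn (assumed round-robin).
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> inboxes = entry.getValue();
            int count = delivered.get(entry.getKey());
            inboxes.get(count % inboxes.size()).add(message);
            delivered.put(entry.getKey(), count + 1);
        }
    }
}
```

For example, with two consumers subscribed under heyu-string1 and one under a second group such as heyu-string2, publishing four messages gives each heyu-string1 consumer two of them, while the single heyu-string2 consumer receives all four.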
Sending:
rocketMQTemplate.syncSend(stringTopic, "stringTopic1");
rocketMQTemplate.syncSend(objectTopic, new User("heyu", 25));
Receiving:
@Service
@RocketMQMessageListener(topic = "${heyu.rocketmq.topic.string}", consumerGroup = "heyu-string1")
public class ConsumeStringLister implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info(message);
    }
}

@Service
@RocketMQMessageListener(topic = "${heyu.rocketmq.topic.object}", consumerGroup = "heyu-object1")
public class ConsumeObjectLister implements RocketMQListener<User> {
    @Override
    public void onMessage(User message) {
        log.info(JsonUtil.obj2json(message));
    }
}
Kafka
Add the dependency
<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Configuration file
kafka:
  bootstrap-server: 127.0.0.1:9092
Queue configuration
Producer
@Configuration
public class KafkaProducerConfig {
    @Value("${kafka.bootstrap-server}")
    private String bootstrapServer;

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    private ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>(10);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Values are serialized with StringSerializer, so only String payloads can be sent with this configuration
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}
Consumer
@Configuration
public class KafkaConsumerConfig {
    @Value("${kafka.bootstrap-server}")
    private String bootstrapServer;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(10);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
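Queue and broadcast implementation
Like RocketMq, Kafka uses consumer groups to implement both queue and broadcast delivery, and the logic is the same. The only difference is that Kafka does not forbid declaring the same group more than once within a single service.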
Sending:
kafkaTemplate.send("heyu-string", "heyu-string");
Receiving:
@KafkaListener(topics = "heyu-string", groupId = "heyu1")
public void string(String message) {
    log.info(message);
}

@KafkaListener(topics = "heyu-string", groupId = "heyu2")
public void string2(String message) {
    log.info(message);
}