消息模式
一般來說,消息隊(duì)列有兩種模式:
- 生產(chǎn)者消費(fèi)者模式(Queue);
image.png
消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消息消費(fèi)者從queue中取出并且消費(fèi)消息。
消息被消費(fèi)以后,queue中不再有存儲(chǔ),所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。Queue支持存在多個(gè)消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)。
- 一種是發(fā)布者訂閱者模式(Topic);
image.png
消息生產(chǎn)者(發(fā)布)將消息發(fā)布到topic中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息。和點(diǎn)對(duì)點(diǎn)方式不同,發(fā)布到topic的消息會(huì)被所有訂閱者消費(fèi)。
利用redis這兩種場(chǎng)景的消息隊(duì)列都能實(shí)現(xiàn)。
Queue 模式
生產(chǎn)者生產(chǎn)消息放到隊(duì)列中,多個(gè)消費(fèi)者同時(shí)監(jiān)聽隊(duì)列,誰先搶到消息誰就會(huì)從隊(duì)列中取走消息,即對(duì)于每個(gè)消息最多只能被一個(gè)消費(fèi)者擁有。
具體的方法就是創(chuàng)建一個(gè)任務(wù)隊(duì)列,生產(chǎn)者主動(dòng)lpush消息,而消費(fèi)者去rpop數(shù)據(jù)。但是這樣存在一個(gè)問題,就是消費(fèi)者需要主動(dòng)去請(qǐng)求數(shù)據(jù),周期性的請(qǐng)求會(huì)造成資源的浪費(fèi)。如果可以實(shí)現(xiàn)一旦有新消息加入隊(duì)列就通知消費(fèi)者就好了,這時(shí)借助brpop命令就可以實(shí)現(xiàn)這樣的需求。brpop和rpop命令相似,唯一區(qū)別就是當(dāng)列表中沒有元素時(shí),brpop命令會(huì)一直阻塞住連接,直到有新元素加入。
package com.hand.hap.message;
import redis.clients.jedis.Jedis;
import java.util.List;
public class TestQueue {
static Jedis jedis = new Jedis("localhost", 6379);
public static void main(String[] args) throws Exception {
TestQueue test = new TestQueue();
Thread thred2 = new Thread(test.runnable, "消息發(fā)送者線程");
thred2.start();
while (true) {
Thread.currentThread().sleep(1);
// 設(shè)置超時(shí)時(shí)間為0,表示無限期阻塞
List<String> message = jedis.brpop(0, "queue1");
System.out.println(message.toString());
}
}
Runnable runnable = () -> {
Long count = 0L;
while (count < 10) {
System.out.println(Thread.currentThread().getName());
count++;
jedis.lpush("queue1", "message: hello redis queue" + count);
}
};
}
分別在命令行下執(zhí)行以下命令,結(jié)果如下
brpop queue1 0
pop queue1 0
image.png
發(fā)布者訂閱者模式
發(fā)布者生產(chǎn)消息放到隊(duì)列里,多個(gè)監(jiān)聽隊(duì)列的訂閱者都會(huì)受到同一份消息,訂閱者可以訂閱多個(gè)Topic。
package com.hand.hap.message;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import java.util.Date;
public class TestTopic {
static Jedis jedis = new Jedis("localhost", 6379);
public static void main(String[] args) throws Exception {
MessageHandler n = new MessageHandler();
Thread thread1 = new Thread(n);
thread1.start();
Thread thread2 = new Thread(n);
thread2.start();
Thread thread3 = new Thread(n);
thread3.start();
Thread.currentThread().sleep(1000);
// 向“channel1”的頻道發(fā)送消息, 返回訂閱者的數(shù)量
Long publishCount = jedis.publish("channel1", new Date() + ": hello redis channel1");
System.out.println("發(fā)送成功,該頻道有" + publishCount + "個(gè)訂閱者");
jedis.publish("channel1", "close channel");
}
}
class MessageHandler extends JedisPubSub implements Runnable {
/**
* channel頻道接收到新消息后,執(zhí)行的邏輯
*
* @param channel
* @param message
*/
@Override
public void onMessage(String channel, String message) {
// 執(zhí)行邏輯
System.out.println(channel + "頻道發(fā)來消息:" + message);
// 如果消息為 close channel, 則取消此頻道的訂閱
if ("close channel".equals(message)) {
this.unsubscribe(channel);
}
}
/**
* channel頻道有新的訂閱者時(shí)執(zhí)行的邏輯
*
* @param channel
* @param subscribedChannels
*/
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(channel + "頻道新增了" + subscribedChannels + "個(gè)訂閱者");
}
/**
* channel頻道有訂閱者退訂時(shí)執(zhí)行的邏輯
*
* @param channel
* @param subscribedChannels
*/
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println(channel + "頻道退訂成功");
}
@Override
public void run() {
MessageHandler handler = new MessageHandler();
Jedis jedis = new Jedis("localhost", 6379);
jedis.subscribe(handler, "channel1");
/**
* 使用下面會(huì)報(bào)錯(cuò)
* ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context
*/
// TestTopic.jedis.subscribe(handler, "channel1");
}
}
Spring繼承
本章節(jié)要介紹 Redis兩種消息隊(duì)列模式在Spring中的應(yīng)用,依賴于spring-data-redis。所以在此之前,簡(jiǎn)單的了解以下spring-data-redis。
RedisMessageListenerContainer
RedisMessageListenerContainer在spring-data-redis中負(fù)責(zé)消息監(jiān)聽??蛻舫绦蛐枰约簩?shí)現(xiàn)MessageListener,并以指定的topic注冊(cè)到RedisMessageListenerContainer,這樣,在指定的topic上如果有消息,RedisMessageListenerContainer便會(huì)通知該MessageListener。好了,有關(guān)于對(duì) spring-data-redis 的了解到這里就可以結(jié)束了,因?yàn)楸疚牡闹饕康氖墙榻BRedis兩種消息隊(duì)列模式在Hap中的應(yīng)用,其它的可以根據(jù)自己興趣選擇看或者不看。
配置文件
Hap中提供了兩種消息隊(duì)列支持:Redis 和 RabbitMQ。相對(duì)來說 RabbitMQ 相對(duì) Redis 更重量級(jí)一些,如果只是一些簡(jiǎn)單的業(yè)務(wù)場(chǎng)景,使用 Redis 作為消息隊(duì)列也足夠了。RabbitMQ不屬本文的講解范文,所以主要看看 Redis 消息隊(duì)列的配置文件。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">
<bean id="mapSerializer" class="org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer">
<constructor-arg type="java.lang.Class" value="java.util.HashMap"/>
<property name="objectMapper" ref="objectMapper"/>
</bean>
<!-- 發(fā)布消息工具類 -->
<bean class="com.hand.hap.message.impl.MessagePublisherImpl"/>
<!--發(fā)布/訂閱監(jiān)聽-->
<bean class="com.hand.hap.message.TopicListenerContainer">
<property name="connectionFactory" ref="v2redisConnectionFactory"/>
<property name="recoveryInterval" value="10000"/>
<!--<property name="messageListeners" ref="messageListeners"/>-->
</bean>
<bean id="simpleQueueConsumer" class="com.hand.hap.message.impl.SimpleMessageConsumer"/>
<!--隊(duì)列監(jiān)聽-->
<bean class="com.hand.hap.message.QueueListenerContainer">
<property name="connectionFactory" ref="v2redisConnectionFactory"/>
<property name="recoveryInterval" value="5000"/>
<property name="stringRedisSerializer" ref="stringRedisSerializer"/>
<property name="listeners">
<list>
<!-- auto detect bean with annotation QueueMonitor -->
</list>
</property>
</bean>
</beans>
v2redisConnectionFactory 在application-redis.x配置文件中已經(jīng)定義過了,這里沒必要刨根問底這些配置項(xiàng)對(duì)應(yīng)的的各個(gè)含義,沒有太大意思。SimpleMessageConsumer 在這里沒什么做哦有那個(gè),所以,這里主要關(guān)注的內(nèi)容有個(gè):MessagePublisherImpl、TopicListenerContainer、、QueueListenerContainer。除了MessagePublisherImpl是和消息發(fā)布有關(guān),其它兩個(gè)都是屬于消息監(jiān)聽,剛好對(duì)應(yīng)Redis消息隊(duì)列的兩種模式,TopicListenerContainer 對(duì)應(yīng) Topic 模式, QueueListenerContainer對(duì)應(yīng) Queue模式。
MessagePublisherImpl
這其實(shí)是一個(gè)發(fā)布消息工具類。不太清楚為什么要在配置文件里面配置,加個(gè)注解不是會(huì)更好嗎?或許也是為了方便我們理解吧。其源碼如下
/*
* #{copyright}#
*/
package com.hand.hap.message.impl;
import com.hand.hap.message.components.ChannelAndQueuePrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hand.hap.message.IMessagePublisher;
@Component
public class MessagePublisherImpl implements IMessagePublisher {
@Autowired
private ObjectMapper objectMapper;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private Logger logger = LoggerFactory.getLogger(MessagePublisherImpl.class);
@Override
public void publish(String channel, Object message) {
//添加前綴
channel = ChannelAndQueuePrefix.addPrefix(channel);
if (message instanceof String || message instanceof Number) {
redisTemplate.convertAndSend(channel, message.toString());
} else {
try {
redisTemplate.convertAndSend(channel, objectMapper.writeValueAsString(message));
} catch (JsonProcessingException e) {
if (logger.isErrorEnabled()) {
logger.error("publish message failed.", e);
}
}
}
}
@Override
public void rPush(String list, Object message) {
message(list, message);
}
@Override
public void message(String name, Object message) {
if (message instanceof String || message instanceof Number) {
redisTemplate.opsForList().rightPush(name, message.toString());
} else {
try {
redisTemplate.opsForList().rightPush(name, objectMapper.writeValueAsString(message));
} catch (JsonProcessingException e) {
if (logger.isErrorEnabled()) {
logger.error("push data failed.", e);
}
}
}
}
}
發(fā)布消息的時(shí)候,也是調(diào)用spring-data-redis的API,這里就不過多說明。
TopicListenerContainer
這個(gè)類要具體分析,起源嗎如下:
/*
* Copyright Hand China Co.,Ltd.
*/
package com.hand.hap.message;
import com.hand.hap.core.AppContextInitListener;
import com.hand.hap.message.components.ChannelAndQueuePrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisSerializer;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author shengyang.zhou@hand-china.com
*/
public class TopicListenerContainer extends RedisMessageListenerContainer implements AppContextInitListener {
private Logger logger = LoggerFactory.getLogger(TopicListenerContainer.class);
@Override
public boolean isAutoStartup(){
return false;
}
@Override
public void contextInitialized(ApplicationContext applicationContext) {
Map<String, Object> monitors = applicationContext.getBeansWithAnnotation(TopicMonitor.class);
Map<MessageListener, Collection<? extends Topic>> listeners = new HashMap<>();
monitors.forEach((k, v) -> {
Class<?> clazz = v.getClass();
TopicMonitor tm = clazz.getAnnotation(TopicMonitor.class);
String mn = MethodReflectUtils.getTopicMethodName(tm.method(), v);
List<Method> avaMethods = MethodReflectUtils.findMethod(clazz, new MethodReflectUtils.FindDesc(mn, 2));
if (avaMethods.isEmpty()) {
if (logger.isErrorEnabled()) {
logger.error("can not find proper method of name '{}' for bean {}", mn, v);
}
return;
}
SimpleMessageListener adaptor = new SimpleMessageListener(v, avaMethods.get(0));
List<Topic> topics = new ArrayList<>();
for (String t : tm.channel()) {
//添加前綴
t = ChannelAndQueuePrefix.addPrefix(t);
Topic topic = new PatternTopic(t);
topics.add(topic);
}
listeners.put(adaptor, topics);
});
setMessageListeners(listeners);
// start(); // auto call
// if (listeners != null) {
// for (ITopicMessageListener receiver : listeners) {
// MessageListenerAdapter messageListener = new MessageListenerAdapter(receiver, "onTopicMessage");
// if (receiver.getRedisSerializer() != null) {
// messageListener.setSerializer(receiver.getRedisSerializer());
// }
// messageListener.afterPropertiesSet();
// List<Topic> topics = new ArrayList<>();
// for (String t : receiver.getTopic()) {
// Topic topic = new PatternTopic(t);
// topics.add(topic);
// }
// listeners.put(messageListener, topics);
// }
// }
}
private static class SimpleMessageListener implements MessageListener {
private RedisSerializer redisSerializer;
private Object target;
private Method method;
private Logger logger;
SimpleMessageListener(Object target, Method method) {
this.target = target;
this.method = method;
Class p0 = method.getParameterTypes()[0];
redisSerializer = MethodReflectUtils.getProperRedisSerializer(p0);
logger = LoggerFactory.getLogger(target.getClass());
}
@Override
public void onMessage(Message message, byte[] pattern) {
try {
Object obj = redisSerializer.deserialize(message.getBody());
String p = new String(pattern, "UTF-8");
//去掉前綴
p = ChannelAndQueuePrefix.removePrefix(p);
method.invoke(target, obj, p);
} catch (Exception e) {
Throwable thr = e;
while (thr.getCause() != null) {
thr = thr.getCause();
}
if (logger.isErrorEnabled()) {
logger.error(thr.getMessage(), thr);
}
}
}
}
}
實(shí)現(xiàn)了 AppContextInitListener 類,所以在項(xiàng)目啟動(dòng)的時(shí)候 contextInitialized 方法將被調(diào)用。整體思路就是:在項(xiàng)目啟動(dòng)的時(shí)候,根據(jù)注解找到所以的 訂閱者,然后維護(hù)“訂閱者”和“主題”的關(guān)系,然后交給消息監(jiān)聽器。
Map<String, Object> monitors = applicationContext.getBeansWithAnnotation(TopicMonitor.class);
就是通過這行代碼找到所有的消息訂閱者。TopicMonitor 注解如下,包括兩個(gè)屬性:channel 和 method。channel 就是主題,;可能大家對(duì)method 的作用不是很理解,我們知道,實(shí)現(xiàn)了 MessageListener 的消息監(jiān)聽器,在收到消息的時(shí)候,會(huì)執(zhí)行 onMessage() 方法,這個(gè) method ,就是為了讓消息監(jiān)聽器收到消息時(shí)可以執(zhí)行別的方法,而這個(gè)方法名就是通過 method 屬性定義。
package com.hand.hap.message;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface TopicMonitor {
String[] channel() default {};
/**
* default empty,auto detect depends on object type.
* <p>
* IQueueMessageListener:onQueueMessage<br>
* OTHERS:onMessage
*
*/
String method() default "";
}
獲取到所有的消息執(zhí)訂閱者之后,然后進(jìn)行遍歷,下面就是獲取方法名的具體邏輯
String mn = MethodReflectUtils.getTopicMethodName(tm.method(), v);
public static String getTopicMethodName(String mn, Object target) {
if (org.apache.commons.lang.StringUtils.isBlank(mn)) {
if (target instanceof ITopicMessageListener) {
mn = ITopicMessageListener.DEFAULT_METHOD_NAME;
} else {
mn = IMessageConsumer.DEFAULT_METHOD_NAME;
}
}
return mn;
}
也就是說,如果代碼中是同通過 實(shí)現(xiàn) ITopicMessageListener 來實(shí)現(xiàn)消息監(jiān)聽的話,則會(huì)執(zhí)行 onTopicMessage 方法,或者會(huì)執(zhí)行 onMessage 方法。
image.png
image.png
如上所示,在獲取到方法名之后,組裝 "消息訂閱者" 和 "Topic" 為Map對(duì)象, 因?yàn)?一個(gè) 訂閱者 可以訂閱多個(gè) Topic,所以是一對(duì)多的關(guān)系。組裝之后,然后將 listeners 交給 spring-data-redi管理,最后通過反射執(zhí)行具體的邏輯代碼。
QueueListenerContainer
QueueListenerContainer沒有繼承RedisMessageListenerContainer,所以它的實(shí)現(xiàn)方式有些不同,相當(dāng)于是框架為我們封裝了一套API,其具體實(shí)現(xiàn)如下:
/*
* #{copyright}#
*/
package com.hand.hap.message;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import com.hand.hap.core.AppContextInitListener;
import redis.clients.jedis.Jedis;
public class QueueListenerContainer implements AppContextInitListener, DisposableBean, SmartLifecycle {
private Logger logger = LoggerFactory.getLogger(QueueListenerContainer.class);
private RedisConnectionFactory connectionFactory;
private static final int PHASE = 9999;
private static final long MIN_RECOVERY_INTERVAL = 2000L;
private static final long DEFAULT_RECOVERY_INTERVAL = 5000L;
/**
* 100ms.
*/
private static final long IDLE_SLEEP_TIME = 100L;
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
private volatile boolean running = false;
private ExecutorService executorService;
private List<IQueueMessageListener<?>> listeners;
private List<MonitorTask> monitorTaskList = new ArrayList<>();
private RedisSerializer<String> stringRedisSerializer;
public RedisConnectionFactory getConnectionFactory() {
return connectionFactory;
}
public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public long getRecoveryInterval() {
return recoveryInterval;
}
public void setRecoveryInterval(long recoveryInterval) {
this.recoveryInterval = recoveryInterval;
if (recoveryInterval < MIN_RECOVERY_INTERVAL) {
if (logger.isWarnEnabled()) {
logger.warn("minimum for recoveryInterval is {}", MIN_RECOVERY_INTERVAL);
}
this.recoveryInterval = MIN_RECOVERY_INTERVAL;
}
}
public List<IQueueMessageListener<?>> getListeners() {
return listeners;
}
public void setListeners(List<IQueueMessageListener<?>> listeners) {
this.listeners = listeners;
}
public RedisSerializer<String> getStringRedisSerializer() {
return stringRedisSerializer;
}
@Autowired
public void setStringRedisSerializer(RedisSerializer<String> stringRedisSerializer) {
this.stringRedisSerializer = stringRedisSerializer;
}
@Override
public void destroy() throws Exception {
stop();
}
@Override
public void contextInitialized(ApplicationContext applicationContext) {
if (listeners == null) {
listeners = new ArrayList<>();
}
Map<String, Object> lts = applicationContext.getBeansWithAnnotation(QueueMonitor.class);
lts.forEach((k, v) -> {
Class clazz = v.getClass();
QueueMonitor qm = (QueueMonitor) clazz.getAnnotation(QueueMonitor.class);
final String queue = qm.queue();
String mn = MethodReflectUtils.getQueueMethodName(qm.method(), v);
List<Method> methods = MethodReflectUtils.findMethod(clazz, new MethodReflectUtils.FindDesc(mn, 2));
if (methods.isEmpty()) {
if (logger.isErrorEnabled()) {
logger.error("can not find proper method of name '{}' for bean {}", mn, v);
}
return;
}
final Method method = methods.get(0);
IQueueMessageListener qml = new SimpleQueueListener(queue, v, method);
listeners.add(qml);
});
executorService = Executors.newFixedThreadPool(listeners.size());
for (IQueueMessageListener<?> receiver : listeners) {
MonitorTask task = new MonitorTask(receiver);
monitorTaskList.add(task);
executorService.execute(task);
}
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public void start() {
if (!running) {
running = true;
if (logger.isDebugEnabled()) {
logger.debug("startup success");
}
}
}
@Override
public void stop() {
if (isRunning()) {
running = false;
monitorTaskList.forEach(MonitorTask::stop);
executorService.shutdownNow();
if (logger.isDebugEnabled()) {
logger.debug("shutdown complete");
}
}
}
@Override
public boolean isRunning() {
return running;
}
@Override
public int getPhase() {
return PHASE;
}
private static class SimpleQueueListener implements IQueueMessageListener {
private String queue;
private Object target;
private Method method;
private RedisSerializer redisSerializer;
private Logger logger;
SimpleQueueListener(String queue, Object target, Method method) {
this.queue = queue;
this.target = target;
this.method = method;
this.redisSerializer = MethodReflectUtils.getProperRedisSerializer(method.getParameterTypes()[0]);
this.logger = LoggerFactory.getLogger(target.getClass());
}
@Override
public String getQueue() {
return queue;
}
@Override
public RedisSerializer getRedisSerializer() {
return redisSerializer;
}
@Override
public void onQueueMessage(Object message, String queue) {
try {
method.invoke(target, message, queue);
} catch (Exception e) {
Throwable thr = e;
while (thr.getCause() != null) {
thr = thr.getCause();
}
if (logger.isErrorEnabled()) {
logger.error(thr.getMessage(), thr);
}
}
}
}
/**
*
* @param <T>
*/
private class MonitorTask<T> implements SchedulingAwareRunnable {
private IQueueMessageListener<T> receiver;
private RedisConnection connection;
private boolean running = false;
MonitorTask(IQueueMessageListener<T> receiver) {
this.receiver = receiver;
Assert.notNull(receiver, "receiver is null.");
Assert.hasText(receiver.getQueue(), "queue is not valid");
}
public void stop() {
running = false;
safeClose(true);
}
@Override
public void run() {
running = true;
T message;
while (running) {
try {
if (connection == null) {
connection = connectionFactory.getConnection();
}
message = fetchMessage(connection, receiver.getQueue());
if (message == null) {
sleep_(IDLE_SLEEP_TIME);
continue;
}
} catch (Throwable thr) {
if (!running) {
break;
}
safeClose();
if (logger.isDebugEnabled()) {
logger.error("exception occurred while get message from queue [" + receiver.getQueue() + "]",
thr);
logger.debug("try recovery after {}ms", getRecoveryInterval());
}
sleep_(getRecoveryInterval());
continue;
}
try {
receiver.onQueueMessage(message, receiver.getQueue());
} catch (Throwable thr) {
if (logger.isWarnEnabled()) {
logger.warn("exception occurred while receiver consume message.", thr);
}
}
}
if (logger.isDebugEnabled()) {
logger.debug("stop monitor:" + this);
}
safeClose();
}
T fetchMessage(RedisConnection connection, String queue) {
List<byte[]> bytes = connection.bLPop(0, stringRedisSerializer.serialize(queue));
if (bytes == null || bytes.isEmpty()) {
return null;
}
return receiver.getRedisSerializer().deserialize(bytes.get(1));
}
void safeClose(boolean... closeNative) {
if (connection != null) {
try {
if (closeNative.length > 0 && closeNative[0]) {
// close native connection to interrupt blocked
// operation
((Jedis) connection.getNativeConnection()).disconnect();
}
connection.close();
} catch (Exception e) {
// if (logger.isErrorEnabled()) {
// logger.error(e.getMessage(), e);
// }
}
}
connection = null;
}
void sleep_(long time) {
try {
Thread.sleep(time);
} catch (Exception e) {
// if (logger.isErrorEnabled()) {
// logger.error(e.getMessage(), e);
// }
}
}
@Override
public boolean isLongLived() {
return true;
}
}
}
前面的實(shí)現(xiàn)基本類似,區(qū)別如果實(shí)現(xiàn)了IQueueMessageListener 類,消息監(jiān)聽器收到消息的時(shí)候會(huì)執(zhí)行 onQueueMessage 方法;另一個(gè)區(qū)別就是這里用了另外一個(gè) 注解 QueueMonitor
package com.hand.hap.message;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface QueueMonitor {
String queue() default "";
/**
* default empty,auto detect depends on object type.
* <p>
* ITopicMessageListener:onTopicMessage<br>
* OTHERS:onMessage
*
*/
String method() default "";
}
image.png
image.png
獲取到所有的消息監(jiān)聽器之后,開啟多線程執(zhí)行消息監(jiān)聽。因?yàn)榇嬖诙鄠€(gè)消費(fèi)者監(jiān)聽同一個(gè)隊(duì)列的情況,存在消費(fèi)者競(jìng)爭(zhēng),所以需要判斷一下消息是否還在。
使用方式
package com.hand.hap.activiti.manager;
import com.hand.hap.activiti.util.ActivitiUtils;
import com.hand.hap.hr.dto.Employee;
import com.hand.hap.hr.dto.Position;
import com.hand.hap.hr.mapper.EmployeeMapper;
import com.hand.hap.hr.mapper.PositionMapper;
import com.hand.hap.message.IMessageConsumer;
import com.hand.hap.message.TopicMonitor;
import org.activiti.engine.identity.Group;
import org.activiti.engine.impl.persistence.entity.UserEntity;
import org.activiti.engine.impl.persistence.entity.data.impl.MybatisUserDataManager;
import org.activiti.spring.SpringProcessEngineConfiguration;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author shengyang.zhou@hand-china.com
*/
@TopicMonitor(channel = "employee.change")
public class CustomUserDataManager extends MybatisUserDataManager
implements IMessageConsumer<Employee>, InitializingBean {
@Autowired
private PositionMapper positionMapper;
@Autowired
private EmployeeMapper employeeMapper;
public Map<String, UserEntity> userCache = new HashMap<>();
@Autowired
private SpringProcessEngineConfiguration pec;
public CustomUserDataManager() {
super(null);
}
@Override
public List<Group> findGroupsByUser(String userId) {
List<Position> positions = positionMapper.getPositionByEmployeeCode(userId);
List<Group> gs = new ArrayList<>();
for (Position position : positions) {
gs.add(ActivitiUtils.toActivitiGroup(position));
}
return gs;
}
/**
* 這個(gè)方法使用非常頻繁,做緩存支持
*
* @param entityId
* @return
*/
@Override
public UserEntity findById(String entityId) {
UserEntity userEntity = userCache.get(entityId);
if (userEntity == null) {
Employee employee = employeeMapper.queryByCode(entityId);
userEntity = ActivitiUtils.toActivitiUser(employee);
userCache.put(entityId, userEntity);
}
return userEntity;
}
@Override
public void onMessage(Employee message, String pattern) {
userCache.remove(message.getEmployeeCode());
}
@Override
public void afterPropertiesSet() throws Exception {
this.processEngineConfiguration = pec;
}
}
package com.hand.hap.hr.service.impl;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.StringUtil;
import com.hand.hap.account.dto.User;
import com.hand.hap.account.dto.UserRole;
import com.hand.hap.account.mapper.UserRoleMapper;
import com.hand.hap.account.service.IUserRoleService;
import com.hand.hap.account.service.IUserService;
import com.hand.hap.cache.impl.UserCache;
import com.hand.hap.core.IRequest;
import com.hand.hap.hr.dto.Employee;
import com.hand.hap.hr.dto.UserAndRoles;
import com.hand.hap.hr.mapper.EmployeeAssignMapper;
import com.hand.hap.hr.mapper.EmployeeMapper;
import com.hand.hap.hr.service.IEmployeeService;
import com.hand.hap.message.IMessagePublisher;
import com.hand.hap.message.TopicMonitor;
import com.hand.hap.mybatis.common.Criteria;
import com.hand.hap.system.dto.DTOStatus;
import com.hand.hap.system.service.impl.BaseServiceImpl;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@Service
public class EmployeeServiceImpl extends BaseServiceImpl<Employee> implements IEmployeeService {
@Autowired
EmployeeMapper employeeMapper;
@Autowired
private IMessagePublisher messagePublisher;
@Autowired
private IUserRoleService userRoleService;
@Autowired
private IUserService userService;
@Autowired
private UserRoleMapper userRoleMapper;
@Autowired
private EmployeeAssignMapper employeeAssignMapper;
@Autowired
private UserCache userCache;
@Autowired
private ApplicationContext applicationContext;
@Override
public List<Employee> submit(IRequest request, List<Employee> list) {
self().batchUpdate(request, list);
for (Employee e : list) {
messagePublisher.publish("employee.change", e);
}
return list;
}
.......
}
當(dāng)在界面修改了員工信息的時(shí)候,會(huì)調(diào)用 messagePublisher.publish("employee.change", e); 方法,因?yàn)?CustomUserDataManager 監(jiān)聽了 "employee.change" 主題的消息,所以在發(fā)布之后,會(huì)執(zhí)行CustomUserDataManager .onMessage 的方法,消息隊(duì)列的使用方式與之類似。
思考
在Hap中,定義過多的 Queue 消息監(jiān)聽器會(huì)影響系統(tǒng)性能嗎?一個(gè)消息監(jiān)聽器就要開啟一個(gè)線程,如果消息監(jiān)聽器太多,線程池夠用嗎?不知道自己的思路有沒有問題,值得驗(yàn)證。








