Redis Queue

消息模式

一般來說,消息隊(duì)列有兩種模式:

  • 生產(chǎn)者消費(fèi)者模式(Queue);
image.png

消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消息消費(fèi)者從queue中取出并且消費(fèi)消息。
消息被消費(fèi)以后,queue中不再有存儲(chǔ),所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。Queue支持存在多個(gè)消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)。

  • 一種是發(fā)布者訂閱者模式(Topic);
image.png

消息生產(chǎn)者(發(fā)布)將消息發(fā)布到topic中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息。和點(diǎn)對(duì)點(diǎn)方式不同,發(fā)布到topic的消息會(huì)被所有訂閱者消費(fèi)。

利用redis這兩種場(chǎng)景的消息隊(duì)列都能實(shí)現(xiàn)。

Queue 模式

生產(chǎn)者生產(chǎn)消息放到隊(duì)列中,多個(gè)消費(fèi)者同時(shí)監(jiān)聽隊(duì)列,誰先搶到消息誰就會(huì)從隊(duì)列中取走消息,即對(duì)于每個(gè)消息最多只能被一個(gè)消費(fèi)者擁有。
具體的方法就是創(chuàng)建一個(gè)任務(wù)隊(duì)列,生產(chǎn)者主動(dòng)lpush消息,而消費(fèi)者去rpop數(shù)據(jù)。但是這樣存在一個(gè)問題,就是消費(fèi)者需要主動(dòng)去請(qǐng)求數(shù)據(jù),周期性的請(qǐng)求會(huì)造成資源的浪費(fèi)。如果可以實(shí)現(xiàn)一旦有新消息加入隊(duì)列就通知消費(fèi)者就好了,這時(shí)借助brpop命令就可以實(shí)現(xiàn)這樣的需求。brpop和rpop命令相似,唯一區(qū)別就是當(dāng)列表中沒有元素時(shí),brpop命令會(huì)一直阻塞住連接,直到有新元素加入。

package com.hand.hap.message;

import redis.clients.jedis.Jedis;

import java.util.List;

public class TestQueue {
    static Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws Exception {
        TestQueue test = new TestQueue();
        Thread thred2 = new Thread(test.runnable, "消息發(fā)送者線程");
        thred2.start();

        while (true) {
            Thread.currentThread().sleep(1);

            // 設(shè)置超時(shí)時(shí)間為0,表示無限期阻塞
            List<String> message = jedis.brpop(0, "queue1");

            System.out.println(message.toString());
        }
    }


    Runnable runnable = () -> {
        Long count = 0L;
        while (count < 10) {
            System.out.println(Thread.currentThread().getName());
            count++;
            jedis.lpush("queue1", "message: hello redis queue" + count);

        }
    };

}

分別在命令行下執(zhí)行以下命令,結(jié)果如下

brpop queue1 0

pop queue1 0
image.png

發(fā)布者訂閱者模式

發(fā)布者生產(chǎn)消息放到隊(duì)列里,多個(gè)監(jiān)聽隊(duì)列的訂閱者都會(huì)受到同一份消息,訂閱者可以訂閱多個(gè)Topic。

package com.hand.hap.message;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

import java.util.Date;

public class TestTopic {
    static Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws Exception {
        MessageHandler n = new MessageHandler();
        Thread thread1 = new Thread(n);
        thread1.start();

        Thread thread2 = new Thread(n);
        thread2.start();

        Thread thread3 = new Thread(n);
        thread3.start();

        Thread.currentThread().sleep(1000);

        // 向“channel1”的頻道發(fā)送消息, 返回訂閱者的數(shù)量
        Long publishCount = jedis.publish("channel1", new Date() + ": hello redis channel1");
        System.out.println("發(fā)送成功,該頻道有" + publishCount + "個(gè)訂閱者");
        jedis.publish("channel1", "close channel");


    }
}


class MessageHandler extends JedisPubSub implements Runnable {

    /**
     * channel頻道接收到新消息后,執(zhí)行的邏輯
     *
     * @param channel
     * @param message
     */
    @Override
    public void onMessage(String channel, String message) {
        // 執(zhí)行邏輯
        System.out.println(channel + "頻道發(fā)來消息:" + message);
        // 如果消息為 close channel, 則取消此頻道的訂閱
        if ("close channel".equals(message)) {
            this.unsubscribe(channel);
        }
    }

    /**
     * channel頻道有新的訂閱者時(shí)執(zhí)行的邏輯
     *
     * @param channel
     * @param subscribedChannels
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println(channel + "頻道新增了" + subscribedChannels + "個(gè)訂閱者");
    }


    /**
     * channel頻道有訂閱者退訂時(shí)執(zhí)行的邏輯
     *
     * @param channel
     * @param subscribedChannels
     */
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println(channel + "頻道退訂成功");
    }

    @Override
    public void run() {
        MessageHandler handler = new MessageHandler();
        Jedis jedis = new Jedis("localhost", 6379);
        jedis.subscribe(handler, "channel1");

        /**
         * 使用下面會(huì)報(bào)錯(cuò)
         * ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context
         */
//        TestTopic.jedis.subscribe(handler, "channel1");
    }
}

Spring繼承

本章節(jié)要介紹 Redis兩種消息隊(duì)列模式在Spring中的應(yīng)用,依賴于spring-data-redis。所以在此之前,簡(jiǎn)單的了解以下spring-data-redis。

RedisMessageListenerContainer

RedisMessageListenerContainer在spring-data-redis中負(fù)責(zé)消息監(jiān)聽??蛻舫绦蛐枰约簩?shí)現(xiàn)MessageListener,并以指定的topic注冊(cè)到RedisMessageListenerContainer,這樣,在指定的topic上如果有消息,RedisMessageListenerContainer便會(huì)通知該MessageListener。好了,有關(guān)于對(duì) spring-data-redis 的了解到這里就可以結(jié)束了,因?yàn)楸疚牡闹饕康氖墙榻BRedis兩種消息隊(duì)列模式在Hap中的應(yīng)用,其它的可以根據(jù)自己興趣選擇看或者不看。

配置文件

Hap中提供了兩種消息隊(duì)列支持:Redis 和 RabbitMQ。相對(duì)來說 RabbitMQ 相對(duì) Redis 更重量級(jí)一些,如果只是一些簡(jiǎn)單的業(yè)務(wù)場(chǎng)景,使用 Redis 作為消息隊(duì)列也足夠了。RabbitMQ不屬本文的講解范文,所以主要看看 Redis 消息隊(duì)列的配置文件。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">


    <bean id="mapSerializer" class="org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer">
        <constructor-arg type="java.lang.Class" value="java.util.HashMap"/>
        <property name="objectMapper" ref="objectMapper"/>
    </bean>

    <!-- 發(fā)布消息工具類 -->
    <bean class="com.hand.hap.message.impl.MessagePublisherImpl"/>

    <!--發(fā)布/訂閱監(jiān)聽-->
    <bean class="com.hand.hap.message.TopicListenerContainer">
        <property name="connectionFactory" ref="v2redisConnectionFactory"/>
        <property name="recoveryInterval" value="10000"/>
        <!--<property name="messageListeners" ref="messageListeners"/>-->
    </bean>

    <bean id="simpleQueueConsumer" class="com.hand.hap.message.impl.SimpleMessageConsumer"/>

    <!--隊(duì)列監(jiān)聽-->
    <bean class="com.hand.hap.message.QueueListenerContainer">
        <property name="connectionFactory" ref="v2redisConnectionFactory"/>
        <property name="recoveryInterval" value="5000"/>
        <property name="stringRedisSerializer" ref="stringRedisSerializer"/>
        <property name="listeners">
            <list>
                <!-- auto detect bean with annotation QueueMonitor -->
            </list>
        </property>
    </bean>

</beans>

v2redisConnectionFactory 在application-redis.x配置文件中已經(jīng)定義過了,這里沒必要刨根問底這些配置項(xiàng)對(duì)應(yīng)的的各個(gè)含義,沒有太大意思。SimpleMessageConsumer 在這里沒什么做哦有那個(gè),所以,這里主要關(guān)注的內(nèi)容有個(gè):MessagePublisherImplTopicListenerContainer、、QueueListenerContainer。除了MessagePublisherImpl是和消息發(fā)布有關(guān),其它兩個(gè)都是屬于消息監(jiān)聽,剛好對(duì)應(yīng)Redis消息隊(duì)列的兩種模式,TopicListenerContainer 對(duì)應(yīng) Topic 模式, QueueListenerContainer對(duì)應(yīng) Queue模式。

MessagePublisherImpl

這其實(shí)是一個(gè)發(fā)布消息工具類。不太清楚為什么要在配置文件里面配置,加個(gè)注解不是會(huì)更好嗎?或許也是為了方便我們理解吧。其源碼如下

/*
 * #{copyright}#
 */

package com.hand.hap.message.impl;

import com.hand.hap.message.components.ChannelAndQueuePrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hand.hap.message.IMessagePublisher;

@Component
public class MessagePublisherImpl implements IMessagePublisher {

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private Logger logger = LoggerFactory.getLogger(MessagePublisherImpl.class);

    @Override
    public void publish(String channel, Object message) {
        //添加前綴
        channel = ChannelAndQueuePrefix.addPrefix(channel);
        if (message instanceof String || message instanceof Number) {
            redisTemplate.convertAndSend(channel, message.toString());
        } else {
            try {
                redisTemplate.convertAndSend(channel, objectMapper.writeValueAsString(message));
            } catch (JsonProcessingException e) {
                if (logger.isErrorEnabled()) {
                    logger.error("publish message failed.", e);
                }
            }
        }
    }

    @Override
    public void rPush(String list, Object message) {
        message(list, message);
    }

    @Override
    public void message(String name, Object message) {
        if (message instanceof String || message instanceof Number) {
            redisTemplate.opsForList().rightPush(name, message.toString());
        } else {
            try {
                redisTemplate.opsForList().rightPush(name, objectMapper.writeValueAsString(message));
            } catch (JsonProcessingException e) {
                if (logger.isErrorEnabled()) {
                    logger.error("push data failed.", e);
                }
            }
        }
    }
}

發(fā)布消息的時(shí)候,也是調(diào)用spring-data-redis的API,這里就不過多說明。

TopicListenerContainer

這個(gè)類要具體分析,起源嗎如下:

/*
 * Copyright Hand China Co.,Ltd.
 */

package com.hand.hap.message;

import com.hand.hap.core.AppContextInitListener;
import com.hand.hap.message.components.ChannelAndQueuePrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisSerializer;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author shengyang.zhou@hand-china.com
 */
public class TopicListenerContainer extends RedisMessageListenerContainer implements AppContextInitListener {

    private Logger logger = LoggerFactory.getLogger(TopicListenerContainer.class);

    @Override
    public boolean isAutoStartup(){
        return false;
    }

    @Override
    public void contextInitialized(ApplicationContext applicationContext) {
        Map<String, Object> monitors = applicationContext.getBeansWithAnnotation(TopicMonitor.class);
        Map<MessageListener, Collection<? extends Topic>> listeners = new HashMap<>();
        monitors.forEach((k, v) -> {
            Class<?> clazz = v.getClass();
            TopicMonitor tm = clazz.getAnnotation(TopicMonitor.class);
            String mn = MethodReflectUtils.getTopicMethodName(tm.method(), v);
            List<Method> avaMethods = MethodReflectUtils.findMethod(clazz, new MethodReflectUtils.FindDesc(mn, 2));

            if (avaMethods.isEmpty()) {
                if (logger.isErrorEnabled()) {
                    logger.error("can not find proper method of name '{}' for bean {}", mn, v);
                }
                return;
            }

            SimpleMessageListener adaptor = new SimpleMessageListener(v, avaMethods.get(0));
            List<Topic> topics = new ArrayList<>();
            for (String t : tm.channel()) {
                //添加前綴
                t = ChannelAndQueuePrefix.addPrefix(t);
                Topic topic = new PatternTopic(t);
                topics.add(topic);
            }

            listeners.put(adaptor, topics);
        });
        setMessageListeners(listeners);
        // start(); // auto call
//        if (listeners != null) {
//            for (ITopicMessageListener receiver : listeners) {
//                MessageListenerAdapter messageListener = new MessageListenerAdapter(receiver, "onTopicMessage");
//                if (receiver.getRedisSerializer() != null) {
//                    messageListener.setSerializer(receiver.getRedisSerializer());
//                }
//                messageListener.afterPropertiesSet();
//                List<Topic> topics = new ArrayList<>();
//                for (String t : receiver.getTopic()) {
//                    Topic topic = new PatternTopic(t);
//                    topics.add(topic);
//                }
//                listeners.put(messageListener, topics);
//            }
//        }
    }

    private static class SimpleMessageListener implements MessageListener {
        private RedisSerializer redisSerializer;

        private Object target;
        private Method method;

        private Logger logger;

        SimpleMessageListener(Object target, Method method) {
            this.target = target;
            this.method = method;
            Class p0 = method.getParameterTypes()[0];
            redisSerializer = MethodReflectUtils.getProperRedisSerializer(p0);
            logger = LoggerFactory.getLogger(target.getClass());
        }

        @Override
        public void onMessage(Message message, byte[] pattern) {
            try {
                Object obj = redisSerializer.deserialize(message.getBody());
                String p = new String(pattern, "UTF-8");
                //去掉前綴
                p = ChannelAndQueuePrefix.removePrefix(p);
                method.invoke(target, obj, p);
            } catch (Exception e) {
                Throwable thr = e;
                while (thr.getCause() != null) {
                    thr = thr.getCause();
                }
                if (logger.isErrorEnabled()) {
                    logger.error(thr.getMessage(), thr);
                }
            }
        }
    }
}

實(shí)現(xiàn)了 AppContextInitListener 類,所以在項(xiàng)目啟動(dòng)的時(shí)候 contextInitialized 方法將被調(diào)用。整體思路就是:在項(xiàng)目啟動(dòng)的時(shí)候,根據(jù)注解找到所以的 訂閱者,然后維護(hù)“訂閱者”和“主題”的關(guān)系,然后交給消息監(jiān)聽器。

        Map<String, Object> monitors = applicationContext.getBeansWithAnnotation(TopicMonitor.class);

就是通過這行代碼找到所有的消息訂閱者。TopicMonitor 注解如下,包括兩個(gè)屬性:channel 和 method。channel 就是主題,;可能大家對(duì)method 的作用不是很理解,我們知道,實(shí)現(xiàn)了 MessageListener 的消息監(jiān)聽器,在收到消息的時(shí)候,會(huì)執(zhí)行 onMessage() 方法,這個(gè) method ,就是為了讓消息監(jiān)聽器收到消息時(shí)可以執(zhí)行別的方法,而這個(gè)方法名就是通過 method 屬性定義。

package com.hand.hap.message;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;


@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface TopicMonitor {
    String[] channel() default {};

    /**
     * default empty,auto detect depends on object type.
     * <p>
     * IQueueMessageListener:onQueueMessage<br>
     * OTHERS:onMessage
     *
     */
    String method() default "";
}

獲取到所有的消息執(zhí)訂閱者之后,然后進(jìn)行遍歷,下面就是獲取方法名的具體邏輯

    String mn = MethodReflectUtils.getTopicMethodName(tm.method(), v);

    public static String getTopicMethodName(String mn, Object target) {
        if (org.apache.commons.lang.StringUtils.isBlank(mn)) {
            if (target instanceof ITopicMessageListener) {
                mn = ITopicMessageListener.DEFAULT_METHOD_NAME;
            } else {
                mn = IMessageConsumer.DEFAULT_METHOD_NAME;
            }
        }
        return mn;
    }

也就是說,如果代碼中是同通過 實(shí)現(xiàn) ITopicMessageListener 來實(shí)現(xiàn)消息監(jiān)聽的話,則會(huì)執(zhí)行 onTopicMessage 方法,或者會(huì)執(zhí)行 onMessage 方法。

image.png

image.png

如上所示,在獲取到方法名之后,組裝 "消息訂閱者" 和 "Topic" 為Map對(duì)象, 因?yàn)?一個(gè) 訂閱者 可以訂閱多個(gè) Topic,所以是一對(duì)多的關(guān)系。組裝之后,然后將 listeners 交給 spring-data-redi管理,最后通過反射執(zhí)行具體的邏輯代碼。

QueueListenerContainer

QueueListenerContainer沒有繼承RedisMessageListenerContainer,所以它的實(shí)現(xiàn)方式有些不同,相當(dāng)于是框架為我們封裝了一套API,其具體實(shí)現(xiàn)如下:

/*
 * #{copyright}#
 */

package com.hand.hap.message;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;

import com.hand.hap.core.AppContextInitListener;

import redis.clients.jedis.Jedis;


public class QueueListenerContainer implements AppContextInitListener, DisposableBean, SmartLifecycle {

    private Logger logger = LoggerFactory.getLogger(QueueListenerContainer.class);

    private RedisConnectionFactory connectionFactory;

    private static final int PHASE = 9999;

    private static final long MIN_RECOVERY_INTERVAL = 2000L;

    private static final long DEFAULT_RECOVERY_INTERVAL = 5000L;

    /**
     * 100ms.
     */
    private static final long IDLE_SLEEP_TIME = 100L;

    private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

    private volatile boolean running = false;

    private ExecutorService executorService;

    private List<IQueueMessageListener<?>> listeners;

    private List<MonitorTask> monitorTaskList = new ArrayList<>();

    private RedisSerializer<String> stringRedisSerializer;

    public RedisConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public long getRecoveryInterval() {
        return recoveryInterval;
    }

    public void setRecoveryInterval(long recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
        if (recoveryInterval < MIN_RECOVERY_INTERVAL) {
            if (logger.isWarnEnabled()) {
                logger.warn("minimum for recoveryInterval is {}", MIN_RECOVERY_INTERVAL);
            }
            this.recoveryInterval = MIN_RECOVERY_INTERVAL;
        }
    }

    public List<IQueueMessageListener<?>> getListeners() {
        return listeners;
    }

    public void setListeners(List<IQueueMessageListener<?>> listeners) {
        this.listeners = listeners;
    }

    public RedisSerializer<String> getStringRedisSerializer() {
        return stringRedisSerializer;
    }

    @Autowired
    public void setStringRedisSerializer(RedisSerializer<String> stringRedisSerializer) {
        this.stringRedisSerializer = stringRedisSerializer;
    }

    @Override
    public void destroy() throws Exception {
        stop();
    }

    @Override
    public void contextInitialized(ApplicationContext applicationContext) {
        if (listeners == null) {
            listeners = new ArrayList<>();
        }
        Map<String, Object> lts = applicationContext.getBeansWithAnnotation(QueueMonitor.class);
        lts.forEach((k, v) -> {
            Class clazz = v.getClass();
            QueueMonitor qm = (QueueMonitor) clazz.getAnnotation(QueueMonitor.class);
            final String queue = qm.queue();
            String mn = MethodReflectUtils.getQueueMethodName(qm.method(), v);
            List<Method> methods = MethodReflectUtils.findMethod(clazz, new MethodReflectUtils.FindDesc(mn, 2));
            if (methods.isEmpty()) {
                if (logger.isErrorEnabled()) {
                    logger.error("can not find proper method of name '{}' for bean {}", mn, v);
                }
                return;
            }
            final Method method = methods.get(0);
            IQueueMessageListener qml = new SimpleQueueListener(queue, v, method);
            listeners.add(qml);

        });
        executorService = Executors.newFixedThreadPool(listeners.size());
        for (IQueueMessageListener<?> receiver : listeners) {
            MonitorTask task = new MonitorTask(receiver);
            monitorTaskList.add(task);
            executorService.execute(task);
        }
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    @Override
    public void stop(Runnable callback) {
        stop();
        callback.run();
    }

    @Override
    public void start() {
        if (!running) {
            running = true;

            if (logger.isDebugEnabled()) {
                logger.debug("startup success");
            }
        }
    }

    @Override
    public void stop() {
        if (isRunning()) {
            running = false;
            monitorTaskList.forEach(MonitorTask::stop);
            executorService.shutdownNow();
            if (logger.isDebugEnabled()) {
                logger.debug("shutdown complete");
            }
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public int getPhase() {
        return PHASE;
    }

    private static class SimpleQueueListener implements IQueueMessageListener {
        private String queue;
        private Object target;
        private Method method;
        private RedisSerializer redisSerializer;
        private Logger logger;

        SimpleQueueListener(String queue, Object target, Method method) {
            this.queue = queue;
            this.target = target;
            this.method = method;
            this.redisSerializer = MethodReflectUtils.getProperRedisSerializer(method.getParameterTypes()[0]);
            this.logger = LoggerFactory.getLogger(target.getClass());
        }

        @Override
        public String getQueue() {
            return queue;
        }

        @Override
        public RedisSerializer getRedisSerializer() {
            return redisSerializer;
        }

        @Override
        public void onQueueMessage(Object message, String queue) {
            try {
                method.invoke(target, message, queue);
            } catch (Exception e) {
                Throwable thr = e;
                while (thr.getCause() != null) {
                    thr = thr.getCause();
                }
                if (logger.isErrorEnabled()) {
                    logger.error(thr.getMessage(), thr);
                }
            }
        }
    }

    /**
     * 
     * @param <T>
     */
    private class MonitorTask<T> implements SchedulingAwareRunnable {

        private IQueueMessageListener<T> receiver;
        private RedisConnection connection;

        private boolean running = false;

        MonitorTask(IQueueMessageListener<T> receiver) {
            this.receiver = receiver;
            Assert.notNull(receiver, "receiver is null.");
            Assert.hasText(receiver.getQueue(), "queue is not valid");
        }

        public void stop() {
            running = false;
            safeClose(true);
        }

        @Override
        public void run() {
            running = true;
            T message;
            while (running) {
                try {
                    if (connection == null) {
                        connection = connectionFactory.getConnection();
                    }
                    message = fetchMessage(connection, receiver.getQueue());
                    if (message == null) {
                        sleep_(IDLE_SLEEP_TIME);
                        continue;
                    }
                } catch (Throwable thr) {
                    if (!running) {
                        break;
                    }
                    safeClose();
                    if (logger.isDebugEnabled()) {
                        logger.error("exception occurred while get message from queue [" + receiver.getQueue() + "]",
                                thr);
                        logger.debug("try recovery after {}ms", getRecoveryInterval());
                    }
                    sleep_(getRecoveryInterval());
                    continue;
                }
                try {
                    receiver.onQueueMessage(message, receiver.getQueue());
                } catch (Throwable thr) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("exception occurred while receiver consume message.", thr);
                    }
                }
            }
            if (logger.isDebugEnabled()) {
                logger.debug("stop monitor:" + this);
            }
            safeClose();
        }

        T fetchMessage(RedisConnection connection, String queue) {
            List<byte[]> bytes = connection.bLPop(0, stringRedisSerializer.serialize(queue));
            if (bytes == null || bytes.isEmpty()) {
                return null;
            }
            return receiver.getRedisSerializer().deserialize(bytes.get(1));
        }

        void safeClose(boolean... closeNative) {
            if (connection != null) {
                try {
                    if (closeNative.length > 0 && closeNative[0]) {
                        // close native connection to interrupt blocked
                        // operation
                        ((Jedis) connection.getNativeConnection()).disconnect();
                    }
                    connection.close();
                } catch (Exception e) {
                    // if (logger.isErrorEnabled()) {
                    // logger.error(e.getMessage(), e);
                    // }
                }
            }
            connection = null;
        }

        void sleep_(long time) {
            try {
                Thread.sleep(time);
            } catch (Exception e) {
                // if (logger.isErrorEnabled()) {
                // logger.error(e.getMessage(), e);
                // }
            }
        }

        @Override
        public boolean isLongLived() {
            return true;
        }
    }
}

前面的實(shí)現(xiàn)基本類似,區(qū)別如果實(shí)現(xiàn)了IQueueMessageListener 類,消息監(jiān)聽器收到消息的時(shí)候會(huì)執(zhí)行 onQueueMessage 方法;另一個(gè)區(qū)別就是這里用了另外一個(gè) 注解 QueueMonitor

package com.hand.hap.message;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;


@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface QueueMonitor {
    String queue() default "";

    /**
     * default empty,auto detect depends on object type.
     * <p>
     * ITopicMessageListener:onTopicMessage<br>
     * OTHERS:onMessage
     * 
     */
    String method() default "";

}
image.png

image.png

獲取到所有的消息監(jiān)聽器之后,開啟多線程執(zhí)行消息監(jiān)聽。因?yàn)榇嬖诙鄠€(gè)消費(fèi)者監(jiān)聽同一個(gè)隊(duì)列的情況,存在消費(fèi)者競(jìng)爭(zhēng),所以需要判斷一下消息是否還在。

使用方式

package com.hand.hap.activiti.manager;

import com.hand.hap.activiti.util.ActivitiUtils;
import com.hand.hap.hr.dto.Employee;
import com.hand.hap.hr.dto.Position;
import com.hand.hap.hr.mapper.EmployeeMapper;
import com.hand.hap.hr.mapper.PositionMapper;
import com.hand.hap.message.IMessageConsumer;
import com.hand.hap.message.TopicMonitor;
import org.activiti.engine.identity.Group;
import org.activiti.engine.impl.persistence.entity.UserEntity;
import org.activiti.engine.impl.persistence.entity.data.impl.MybatisUserDataManager;
import org.activiti.spring.SpringProcessEngineConfiguration;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author shengyang.zhou@hand-china.com
 */
@TopicMonitor(channel = "employee.change")
public class CustomUserDataManager extends MybatisUserDataManager
        implements IMessageConsumer<Employee>, InitializingBean {

    @Autowired
    private PositionMapper positionMapper;

    @Autowired
    private EmployeeMapper employeeMapper;

    public Map<String, UserEntity> userCache = new HashMap<>();

    @Autowired
    private SpringProcessEngineConfiguration pec;

    public CustomUserDataManager() {
        super(null);
    }

    @Override
    public List<Group> findGroupsByUser(String userId) {
        List<Position> positions = positionMapper.getPositionByEmployeeCode(userId);
        List<Group> gs = new ArrayList<>();
        for (Position position : positions) {
            gs.add(ActivitiUtils.toActivitiGroup(position));
        }
        return gs;
    }

    /**
     * 這個(gè)方法使用非常頻繁,做緩存支持
     *
     * @param entityId
     * @return
     */
    @Override
    public UserEntity findById(String entityId) {
        UserEntity userEntity = userCache.get(entityId);
        if (userEntity == null) {
            Employee employee = employeeMapper.queryByCode(entityId);
            userEntity = ActivitiUtils.toActivitiUser(employee);
            userCache.put(entityId, userEntity);
        }
        return userEntity;
    }

    @Override
    public void onMessage(Employee message, String pattern) {
        userCache.remove(message.getEmployeeCode());
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        this.processEngineConfiguration = pec;
    }
}
package com.hand.hap.hr.service.impl;

import com.github.pagehelper.PageHelper;
import com.github.pagehelper.StringUtil;
import com.hand.hap.account.dto.User;
import com.hand.hap.account.dto.UserRole;
import com.hand.hap.account.mapper.UserRoleMapper;
import com.hand.hap.account.service.IUserRoleService;
import com.hand.hap.account.service.IUserService;
import com.hand.hap.cache.impl.UserCache;
import com.hand.hap.core.IRequest;
import com.hand.hap.hr.dto.Employee;
import com.hand.hap.hr.dto.UserAndRoles;
import com.hand.hap.hr.mapper.EmployeeAssignMapper;
import com.hand.hap.hr.mapper.EmployeeMapper;
import com.hand.hap.hr.service.IEmployeeService;
import com.hand.hap.message.IMessagePublisher;
import com.hand.hap.message.TopicMonitor;
import com.hand.hap.mybatis.common.Criteria;
import com.hand.hap.system.dto.DTOStatus;
import com.hand.hap.system.service.impl.BaseServiceImpl;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

@Service
public class EmployeeServiceImpl extends BaseServiceImpl<Employee> implements IEmployeeService {

    @Autowired
    EmployeeMapper employeeMapper;

    @Autowired
    private IMessagePublisher messagePublisher;

    @Autowired
    private IUserRoleService userRoleService;

    @Autowired
    private IUserService userService;

    @Autowired
    private UserRoleMapper userRoleMapper;

    @Autowired
    private EmployeeAssignMapper employeeAssignMapper;

    @Autowired
    private UserCache userCache;

    @Autowired
    private ApplicationContext applicationContext;

    @Override
    public List<Employee> submit(IRequest request, List<Employee> list) {
        self().batchUpdate(request, list);
        for (Employee e : list) {
            messagePublisher.publish("employee.change", e);
        }
        return list;
    }
.......
}

當(dāng)在界面修改了員工信息的時(shí)候,會(huì)調(diào)用 messagePublisher.publish("employee.change", e); 方法,因?yàn)?CustomUserDataManager 監(jiān)聽了 "employee.change" 主題的消息,所以在發(fā)布之后,會(huì)執(zhí)行CustomUserDataManager .onMessage 的方法,消息隊(duì)列的使用方式與之類似。

思考

在Hap中,定義過多的 Queue 消息監(jiān)聽器會(huì)影響系統(tǒng)性能嗎?一個(gè)消息監(jiān)聽器就要開啟一個(gè)線程,如果消息監(jiān)聽器太多,線程池夠用嗎?不知道自己的思路有沒有問題,值得驗(yàn)證。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,697評(píng)論 19 139
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,351評(píng)論 25 708
  • 朋友難求,能碰到志同道合之人,更是人生中莫大的幸事。有心人會(huì)發(fā)現(xiàn),我是很少為某個(gè)人寫東西的,但,今天我有一些話要送...
    左左兮閱讀 343評(píng)論 0 0
  • 沙沙大盜閱讀 130評(píng)論 0 0
  • 今天的我,又在看過去的動(dòng)漫,電視劇了。我很喜歡懷念過去,懷念童年,因?yàn)楝F(xiàn)在的日子過得乏味無趣,但我也懶得去...
    芝芝AZ閱讀 225評(píng)論 0 0

友情鏈接更多精彩內(nèi)容