PMQ源碼閱讀(2)---消息產(chǎn)生和消費(fèi)過程

背景

  • 本章主要記錄PMQ中關(guān)于消息從生產(chǎn)者端提交到消費(fèi)者消費(fèi)消息的過程
  • 從中我們可以看到實(shí)現(xiàn)一個(gè)消息系統(tǒng)需要的發(fā)送、存儲(chǔ)、消費(fèi)和重試環(huán)節(jié)
  • 一些有用的編碼風(fēng)格和技巧也值得學(xué)習(xí)

消息產(chǎn)生和消費(fèi)架構(gòu)總覽

PMQ架構(gòu)圖

PMQ消息端到端流程

Client端上報(bào)消息的過程

簡單流程
客戶端消息發(fā)送流程圖
client代碼流程
  • (1)從client中上報(bào)消息代碼
//topicName=消息topic的名字,ProducerDataDto=消息體中的內(nèi)容
MqClient.publish(topicName, "",new ProducerDataDto("kantlin"+String.valueOf(i)));
  • (2)client中publish方法的實(shí)現(xiàn)(MqResource.class)
    public boolean publish(PublishMessageRequest request, int retryTimes) {
        ......
        //這里集成了cat監(jiān)控,對(duì)每個(gè)發(fā)送的消息進(jìn)行鏈路追蹤【排查問題的關(guān)鍵】
        Transaction transaction = Tracer.newTransaction("mq-client-publish", request.getTopicName());
        Timer.Context timer1 = MetricSingleton.getMetricRegistry()
                .timer("mq.client.publish.time?topic=" + request.getTopicName()).time();
        try {
            //上報(bào)到PMD broke的接口地址
            String url = MqConstanst.CONSUMERPRE + "/publish";
            //調(diào)用post請(qǐng)求將消息發(fā)送到broker中【轉(zhuǎn)(3)】
            PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true);
            ......
            transaction.setStatus(Transaction.SUCCESS);
            //如果可處理的發(fā)送未成功,則上報(bào)到cat中
            if (!response.isSuc()) {
                ......
                addCat(request2);
            }
            //返回給客戶端本地上報(bào)結(jié)果
            return response.isSuc();
        } catch (Exception e) {
            //如果是未處理異常的話,除了會(huì)上報(bào)到cat還會(huì)發(fā)送對(duì)應(yīng)的郵件告警
            ......
            addCat(request2);
            ......
            sendMail(mailRequest);
            return false;
        } finally {
            transaction.complete();
            timer1.stop();
        }
    }
  • (3)client中post方法的實(shí)現(xiàn)(HttpClient.class)
    public String post(String url, Object reqObj) throws IOException, BrokerException {
        ......
        Response response = null;
        //基于cat進(jìn)行鏈路追蹤
        Transaction transaction = Tracer.newTransaction("mq-http", url);
        try {
            ......
            //獲取http客戶端并發(fā)送,有異常的話記錄到cat中并向上拋,不做處理
            Request request=requestbuilder.build();
            response = client.newCall(request).execute();
            transaction.setStatus(Transaction.SUCCESS);
            ......
        }
            ......
        finally {
            transaction.complete();
            ......
        }
    }

Rest端接受消費(fèi)者消息

簡單流程
  • rest端收到client publish上來的消息,先對(duì)消息進(jìn)行驗(yàn)證
  • 將消息轉(zhuǎn)換為適合存儲(chǔ)的bean對(duì)象
  • 對(duì)消息進(jìn)行保存并通知消費(fèi)者消費(fèi)
  • 通過消息查詢出全部訂閱者,并向符合條件的訂閱者起可pull數(shù)據(jù)的消息
代碼流程
  • (1)client上報(bào)到rest端的接口(ConsumerController.java)
    @PostMapping("/publish")
    public PublishMessageResponse publish(@RequestBody PublishMessageRequest request) {
        setSubEnv(request);
        //轉(zhuǎn)【2】
        PublishMessageResponse response = consumerService.publish(request);
        return response;
    }
  • (2)rest端的push方法(ConsumerServiceImpl.java),這里面比較關(guān)鍵的方法是getAllLocatedTopicWriteQueue()用于獲取可寫的隊(duì)列(一個(gè)隊(duì)列可以理解為一張表),getAllLocatedTopicQueue()則是獲取全部隊(duì)列。上述兩個(gè)方法是通過QueueServiceImpl.start()中起來常駐線程來更新的。
      public PublishMessageResponse publish(PublishMessageRequest request) {
        ......
        try {
            Map<String, List<QueueEntity>> queueMap = queueService.getAllLocatedTopicWriteQueue();
            Map<String, List<QueueEntity>> topicQueueMap = queueService.getAllLocatedTopicQueue();
            if (queueMap.containsKey(request.getTopicName()) || topicQueueMap.containsKey(request.getTopicName())) {
                List<QueueEntity> queueEntities = queueMap.get(request.getTopicName());
                if (queueEntities == null || queueEntities.size() == 0) {
                    ......
                    //如果可寫隊(duì)列中沒有包含的話,但是總的topic中有,則可能表示有偏差,因此需要重新更新緩存數(shù)據(jù)
                    if (topicQueueMap.containsKey(request.getTopicName()) && soaConfig.getPublishMode() == 1) {
                        queueEntities = topicQueueMap.get(request.getTopicName());
                        updateQueueCache(request.getTopicName());
                    } else {
                        updateQueueCache(request.getTopicName());
                        return response;
                    }
                }
                //如果可寫隊(duì)列包含對(duì)應(yīng)的topic name 那么才對(duì)消息進(jìn)行保存
                if (queueEntities.size() > 0) {
                    //轉(zhuǎn)【3】
                    saveMsg(request, response, queueEntities);
                }
            } else {
                ......
            }
        } catch (Exception e) {
            ......
        } finally {
            ......
        }
        return response;
    }
  • (3)將請(qǐng)求轉(zhuǎn)換成對(duì)應(yīng)的消息對(duì)象進(jìn)行保存(ConsumerServiceImpl.java)
        protected void saveSynMsg1(PublishMessageRequest request, PublishMessageResponse response,
                               List<QueueEntity> queueEntities) {
        ......
        Map<String, PartitionInfo> partitionMap = new HashMap<>();
        Map<Long, List<Message01Entity>> msgQueueMap = new HashMap<>();
        //引用傳遞用于構(gòu)造消息實(shí)體,如果request沒有指定partitionInfo的話,那么partitionMap的key為默認(rèn)值Long.MAX_VALUE
        createMsg(request, msgQueueMap, partitionMap);
        for (Map.Entry<Long, List<Message01Entity>> entry : msgQueueMap.entrySet()) {
            //分三種情況進(jìn)行消息進(jìn)行保存
            if (queueMap.containsKey(entry.getKey())) {
                doSaveMsg(request, response, Arrays.asList(queueMap.get(entry.getKey())), entry.getValue());
            } else if (entry.getKey() == Long.MAX_VALUE) {
                //轉(zhuǎn)【4】
                doSaveMsg(request, response, queueEntities, entry.getValue());
            } else {
                entry.getValue().forEach(t1 -> {
                    if (partitionMap.containsKey(t1.getTraceId())) {
                        if (partitionMap.get(t1.getTraceId()).getStrictMode() == 0) {
                            doSaveMsg(request, response, queueEntities, Arrays.asList(t1));
                        }
                    }
                });
            }
        }
    }
  • (4)每個(gè)可寫的隊(duì)列都保存一份消息(ConsumerServiceImpl.java)
private void doSaveMsg(PublishMessageRequest request, PublishMessageResponse response,
                           List<QueueEntity> queueEntities, List<Message01Entity> message01Entities) {
        int tryCount = 0;
        int queueSize = queueEntities.size();
        ......
        int count = counterTemp.get(key).incrementAndGet();
        while (tryCount <= queueSize) {
            try {
                QueueEntity temp = queueEntities.get(count % queueEntities.size());
                count++;
                ......
                //關(guān)鍵邏輯,上一步可支持寫的隊(duì)列都要保存消息
                //轉(zhuǎn)【5】
                doSaveMsg(message01Entities, request, response, temp);
                ......
            } catch (Exception e) {
                ......
            }
        }
        if (last != null) {
            ......
        }
        ......
    }
  • (5)對(duì)消息進(jìn)程保存并通知相關(guān)的客戶端來拉取消息消費(fèi)(ConsumerServiceImpl.java)
    protected void doSaveMsg(List<Message01Entity> message01Entities, PublishMessageRequest request,
                             PublishMessageResponse response, QueueEntity temp) {
        //動(dòng)態(tài)的設(shè)置service連接的數(shù)據(jù)庫
        message01Service.setDbId(temp.getDbNodeId());
        ......
        try {
            ......
            //將消息批量的插入數(shù)據(jù)庫中
            message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities);
            if (soaConfig.getMqPushFlag() == 1) {
                //通知對(duì)應(yīng)的client端來pull data進(jìn)行消費(fèi)轉(zhuǎn)【6】
                notifyClient(temp);
            }
            ......
            return;
        } catch (Exception e) {
           ......
        } finally {
            ......
        }
    }
  • (6)對(duì)client進(jìn)行消息拉取的通知(ConsumerServiceImpl.java)
 public void notifyClient(QueueEntity queueEntity) {
        try {
             ......
            //獲取消息主要是topic相關(guān)的全部訂閱者的詳情
            List<QueueOffsetEntity> queueOffsetList = queueIdQueueOffsetMap.get(queueEntity.getId());
             ......
            Map<String, List<MsgNotifyDto>> notifyMap = new HashMap<>();
             //對(duì)訂閱者進(jìn)行遍歷
            for (QueueOffsetEntity queueOffset : queueOffsetList) {
                //如果訂閱者可以立刻接收且滿足限速條件的話,則立刻發(fā)送
                if (consumerGroupMap.get(queueOffset.getConsumerGroupName()).getPushFlag() == 1
                        && speedLimit(queueEntity.getId())) {

                    //拼接訂閱者的回調(diào)client的ip和地址
                    String clienturl = "http://" + consumerVo.ip + ":" + consumerVo.port;

                    if (!notifyMap.containsKey(clienturl)) {
                        notifyMap.put(clienturl, new ArrayList<>());
                    }
                    //封裝回調(diào)的消息體,這里只有消費(fèi)者名和消息ID
                    MsgNotifyDto msgNotifyDto = new MsgNotifyDto();
                    msgNotifyDto.setConsumerGroupName(queueOffset.getConsumerGroupName());
                    msgNotifyDto.setQueueId(queueEntity.getId());
                    notifyMap.get(clienturl).add(msgNotifyDto);
                }
            }
            if (notifyMap.size() == 0) {
                return;
            }
            ......
            for (String url : notifyMap.keySet()) {
                //構(gòu)建完客戶端地址后,將消息體進(jìn)行封裝后發(fā)送給客戶端接口
                try {
                    MsgNotifyRequest request = new MsgNotifyRequest();
                    request.setMsgNotifyDtos(notifyMap.get(url));
                    if (notifyFailTentativeLimit(url)) {
                        //請(qǐng)求的client的地址上/mq/client/notify
                        httpClient.postAsyn(url + "/mq/client/notify", request, new ConsumerServiceImpl.NotifyCallBack(url));
                    }
                } catch (Exception e) {
                    log.error("給客戶端發(fā)送拉取通知異常:", e);
                }
            }
            ......
        } catch (Exception e) {

        }
    }

Client pull消息及消費(fèi)過程

client消費(fèi)消息的過程
  • (1)當(dāng)客戶端啟動(dòng)完成后,重平衡監(jiān)聽線程會(huì)通過長鏈接的方式向broker查詢當(dāng)前實(shí)例的隊(duì)列訂閱情況。此過程即是重平衡的過程,重平衡過程在上一章節(jié)有介紹,不在此細(xì)述。
  • (2)當(dāng)重平衡完成后,當(dāng)前消費(fèi)實(shí)例,最終可能會(huì)被分配到一些隊(duì)列(如上圖的queue),客戶端會(huì)根據(jù)這些隊(duì)列信息進(jìn)行消費(fèi)。
  • (3)queue的處理是PMQ消息系統(tǒng)客戶端消費(fèi)的關(guān)鍵。下面來詳細(xì)介紹單個(gè)queue的處理過程,多個(gè)queue跟單個(gè)queue處理過程一致。
  • (4)當(dāng)重平衡完成后,broker會(huì)將分配的隊(duì)列的元數(shù)據(jù)信息返回給客戶端,比如queue的id,queue偏移量等信息。客戶端會(huì)為每個(gè)queue開啟一個(gè)線程,此線程會(huì)根據(jù)這些元數(shù)據(jù)信息定時(shí)向broker拉取消息。注意是每個(gè)queue都有對(duì)應(yīng)的獨(dú)立拉取線程。
  • (5)當(dāng)線程拉取到消息后,會(huì)將拉取的消息緩存到當(dāng)前queue對(duì)應(yīng)的緩沖隊(duì)列中。如果此時(shí)緩沖隊(duì)列滿了,則暫停拉取直到緩沖隊(duì)列不再滿載為止。緩存完畢后,會(huì)開啟新的一輪拉取,如果出現(xiàn)拉取的消息為空,則拉取線程會(huì)sleep 50ms時(shí)間,再開啟新的拉取。如果再次拉取還是沒有消息,則加大等待拉取時(shí)間。直到拉取等待最大值。一旦拉取到新的消息,則重新開始新的循環(huán)。
  • (6)重平衡完成后,在每個(gè)隊(duì)列中,也會(huì)啟動(dòng)一個(gè)消費(fèi)調(diào)度線程,它會(huì)定時(shí)循環(huán)獲取緩沖隊(duì)列的消息,然后根據(jù)緩沖消息數(shù)量和批量消費(fèi)的條數(shù),計(jì)算出執(zhí)行線程的個(gè)數(shù)。在PMQ中,批量消費(fèi)條數(shù)和線程數(shù)可以通過后臺(tái)控制頁面進(jìn)行動(dòng)態(tài)調(diào)整,實(shí)時(shí)生效。
  • (7)計(jì)算出待執(zhí)行的消費(fèi)條數(shù)后,會(huì)啟動(dòng)相對(duì)應(yīng)的線程進(jìn)行消費(fèi)。注意此線程是一個(gè)線程池。消費(fèi)線程根據(jù)批量消費(fèi)條數(shù)從緩沖隊(duì)列中獲取待消費(fèi)的消息,然后調(diào)用消費(fèi)者實(shí)例的本地方法。
  • (8)當(dāng)消息消費(fèi)完成后,會(huì)更新內(nèi)存中當(dāng)前隊(duì)列的偏移量。偏移量提交線程,會(huì)定時(shí)提交相關(guān)的偏移量。
  • (9)如果出現(xiàn)消費(fèi)失敗,會(huì)將此消費(fèi)失敗的消息,發(fā)送到對(duì)應(yīng)的失敗topic中,然后進(jìn)行重新消費(fèi)。失敗topic的消費(fèi)邏輯與正常topic的消費(fèi)邏輯一致。


    客戶端消費(fèi)核心架構(gòu)圖
Client pull消息代碼流程
  • (1)當(dāng)broker保存完消息后,如果client是可立刻通知且開啟狀態(tài)的話,則會(huì)回調(diào)對(duì)應(yīng)請(qǐng)求接口(MqClientStatController.java)
    @RequestMapping("/mq/client/notify")
    public void notify(@RequestBody MsgNotifyRequest request) {
        //如果客戶端是開啟標(biāo)準(zhǔn)(客戶端有黑名單和不開啟消費(fèi)的卻別)
        if (isOpenFlag()) {
            ......
            try {
                //轉(zhuǎn)【2】
                msgNotifyService.notify(request);
                ......
            } catch (Exception e) {
                ......
            }
        }
    }
  • (2)client端開始處理拉取消息消費(fèi)的通知(MsgNotifyService.java)
public void notify(MsgNotifyRequest request) {
        //從broke處獲取消費(fèi)者組的最新詳情,同時(shí)更新緩存中的信息
        IConsumerPollingService consumerPollingService = MqClient.getMqFactory().createConsumerPollingService();
        Map<String, IMqGroupExcutorService> groups = consumerPollingService.getMqExcutors();
        if (groups != null && request != null && request.getMsgNotifyDtos() != null) {
            request.getMsgNotifyDtos().forEach(msgNotifyDto -> {
                if (groups.containsKey(msgNotifyDto.getConsumerGroupName())) {
                    //獲取當(dāng)前最新消費(fèi)者組及其對(duì)應(yīng)的隊(duì)列詳情
                    IMqGroupExcutorService iMqGroupExcutorService = groups.get(msgNotifyDto.getConsumerGroupName());
                    Map<Long, IMqQueueExcutorService> queues = iMqGroupExcutorService.getQueueEx();
                    if (queues.containsKey(msgNotifyDto.getQueueId())) {
                        //消費(fèi)者組中包含了待消費(fèi)消息的隊(duì)列Id的話,則要進(jìn)行消費(fèi)(這里可以理解為消費(fèi)前進(jìn)行下判斷)轉(zhuǎn)【3】
                        queues.get(msgNotifyDto.getQueueId()).notifyMsg();
                    }
                }
            });
        }
    }
  • (3)client端請(qǐng)求broke拉取消息(MqQueueExcutorService.java)
    protected boolean doPullingData() {
        if (pullFlag.compareAndSet(false, true)) {
            ......
            if (consumerQueueDto != null) {
                ......
                try {
                    ......
                    if (checkOffsetVersion(consumerQueueDto)) {
                        ......
                        PullDataResponse response = null;
                        if (checkOffsetVersion(consumerQueueDto)) {
                            //請(qǐng)求broke端的pullData方法來拉取數(shù)據(jù)
                            response = mqResource.pullData(request);
                        }
                        ......
                        if (response != null && response.getMsgs() != null && response.getMsgs().size() > 0) {
                            //如果請(qǐng)求的消息列表不為空,則緩存起來轉(zhuǎn)【4】
                            cacheData(response, consumerQueueDto);
                            ......
                        }
                    }
                    ......
                } catch (Exception e) {
                    ......
                } finally {
                    ......
                }
            }
            return false;
        } else {
            return true;
        }
    }
  • (4)將response中獲取到的消息放到阻塞隊(duì)列blockQueue中(MqQueueExcutorService.java)
 protected void cacheData(PullDataResponse response, ConsumerQueueDto pre) {
        if (checkOffsetVersion(pre)) {
            for (MessageDto t1 : response.getMsgs()) {
               ......
                //將請(qǐng)求回來的數(shù)據(jù)放到阻塞隊(duì)列messages中,并記錄log
                while (true && checkOffsetVersion(pre)) {
                    try {
                        messages.put(t1);
                        addPullLog(t1);
                        break;
                    } catch (Exception e) {
                    }
                   ......
                }
                ......
            }
        }
    }
Client 消費(fèi)消息代碼流程
  • 從上面流程中我們知道client端拉取到了消息并放在阻塞隊(duì)列中,那么在client中也得啟動(dòng)一個(gè)消費(fèi)線程在處理messages隊(duì)列中的數(shù)據(jù)。而這個(gè)啟動(dòng)流程則是發(fā)生在client端進(jìn)行初始化的時(shí)候。
  • 在客戶端啟動(dòng)后調(diào)用consumerPollingService的start()方法(MqClient.java)
    private static boolean doRegisterConsumerGroup(Map<String, ConsumerGroupVo> groups) {
        ......
        //消費(fèi)組service在client初始化時(shí)候執(zhí)行start方法
        consumerPollingService = mqFactory.createConsumerPollingService();
        //轉(zhuǎn)【2】
        consumerPollingService.start();
        ......
    }
  • (2)獲取消費(fèi)者組列表并分別啟動(dòng)MqGroupExcutorService(ConsumerPollingService.java)
    @Override
    public void start() {
        if (startFlag.compareAndSet(false, true)) {
            ......
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    while (!isStop) {
                        ......
                        //啟動(dòng)常駐進(jìn)程,每秒更新消費(fèi)者組信息
                        longPolling();
                        ......
                    }
                }
            });
        }
    }

    protected void longPolling() {
        if (mqContext.getConsumerId() > 0 && mqContext.getConsumerGroupVersion() != null
                && mqContext.getConsumerGroupVersion().size() > 0) {
            ......
            //請(qǐng)求broker獲取最新的消費(fèi)者組的信息
            GetConsumerGroupResponse response = mqResource.getConsumerGroup(request);
            //處理相關(guān)的請(qǐng)求信息
            handleGroup(response);
            ......
        } else {
            Util.sleep(1000);
        }
    }

    protected void handleGroup(GetConsumerGroupResponse response) {
        ......
        //遍歷消費(fèi)者組,創(chuàng)建對(duì)應(yīng)的消費(fèi)組實(shí)例
        response.getConsumerGroups().entrySet().forEach(t1 -> {
            if (!isStop) {
                if (!mqExcutors.containsKey(t1.getKey())) {
                    mqExcutors.put(t1.getKey(), mqFactory.createMqGroupExcutorService());
                }
                log.info("consumer_group_data_change,消費(fèi)者組" + t1.getKey() + "發(fā)生重平衡或者meta更新");
                // 進(jìn)行重平衡操作或者更新元數(shù)據(jù)信息
                mqExcutors.get(t1.getKey()).rbOrUpdate(t1.getValue(), response.getServerIp());
                mqContext.getConsumerGroupVersion().put(t1.getKey(), t1.getValue().getMeta().getVersion());
            }
        });
        ......
        mqExcutors.values().forEach(t1 -> {
            //啟動(dòng)消費(fèi)組實(shí)例MqGroupExcutorService類中的start方法,轉(zhuǎn)【3】
            t1.start();
        });
    }
  • (3)消費(fèi)者組實(shí)例中啟動(dòng)對(duì)應(yīng)queue的消費(fèi)實(shí)例(MqGroupExcutorService.java)
 //消費(fèi)者組連續(xù)三次重平衡的版本號(hào)不變的話則開始啟動(dòng)組下隊(duì)列的消費(fèi)
    public void start() {
        if (!isRunning) {
            versionCount++;
            ......
            if (versionCount >= mqContext.getConfig().getRbTimes()) {
                //啟動(dòng)隊(duì)列
                doStartQueue();
                isRunning = true;
            }
        }
    }

    protected void doStartQueue() {
        ......
        if (localConsumerGroup != null && localConsumerGroup.getQueues() != null
                && localConsumerGroup.getQueues().size() > 0) {
            //從消費(fèi)者組中獲取隊(duì)列表,每個(gè)都單獨(dú)啟動(dòng)消費(fèi)實(shí)例
            localConsumerGroup.getQueues().values().forEach(t1 -> {
                IMqQueueExcutorService mqQueueExcutorService = mqFactory
                        .createMqQueueExcutorService(localConsumerGroup.getMeta().getName(), t1);
                mqEx.put(t1.getQueueId(), mqQueueExcutorService);
                //啟動(dòng)每個(gè)隊(duì)列的監(jiān)聽和消費(fèi)線程,轉(zhuǎn)【4】
                mqQueueExcutorService.start();
            });
        }
         ......
    }
  • (4)監(jiān)聽隊(duì)列中的消息,啟動(dòng)單獨(dú)的處理線程消費(fèi)(MqQueueExcutorService.java)
//此時(shí)監(jiān)聽到具體的待消費(fèi)隊(duì)列
    @Override
    public void start() {
        if (this.iSubscriber != null || this.iAsynSubscriber != null) {

            if (isStart.compareAndSet(false, true)) {
                //啟動(dòng)時(shí)會(huì)pull一次待消費(fèi)的消息,后續(xù)的pull都需要由broke進(jìn)行觸發(fā)【如果開啟立刻消費(fèi)的話】
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        pullingData();
                    }
                });
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        while (!isStop) {
                            if (isRunning) {
                                //開啟重置的消息出來線程,注意此處不能加鎖,因?yàn)橛行?huì)出現(xiàn)延消費(fèi),然后出現(xiàn)阻塞
                                handleData();
                            } else {
                                Util.sleep(50);
                            }
                        }
                    }
                });
            }
        }
    }

    protected void handleData() {
        ......
        //阻塞隊(duì)列messages中的消息量
        int msgSize = messages.size();
        //刷新訂閱關(guān)系,根據(jù)消費(fèi)者和topic name找到對(duì)應(yīng)的消費(fèi)處理類,更新在iSubscriber對(duì)象中
        refreshSubscriber();
        if (temp != null && msgSize > 0 && temp.getThreadSize() + 2 - executor.getActiveCount() > 0
                && (iSubscriber != null|| iAsynSubscriber != null)
                && (temp.getTimeout() == 0 || (temp.getTimeout() > 0 && timeOutCount.get() == 0))) {
            ......
            //開啟處理數(shù)據(jù)
            doHandleData(temp, msgSize);
            ......
        } else {
            Util.sleep(10);
        }
    }

    private void doHandleData(ConsumerQueueDto pre, int msgSize) {
        // 線程批次概念關(guān)鍵代碼
        ......
        CountDownLatch countDownLatch = new CountDownLatch(startThread);
        //按照批次進(jìn)行消息消費(fèi)
        batchExcute(pre, startThread, batchRecorderId, countDownLatch);
        ......
    }

    private void batchExcute(ConsumerQueueDto pre, int startThread, long batchRecorderId,
                             CountDownLatch countDownLatch) {
        for (int i = 0; i < startThread; i++) {
            if (executor != null) {
                //每個(gè)消息對(duì)應(yīng)單獨(dú)的處理類MsgThread,轉(zhuǎn)【5】
                executor.execute(new MqQueueExcutorService.MsgThread(pre, batchRecorderId, countDownLatch, timeOutCount));
            }
        }
    }
  • (5)反射到對(duì)應(yīng)的處理類中進(jìn)行處理(MqQueueExcutorService.java)
    public class MsgThread implements Runnable {
        ......
        @Override
        public void run() {
            ......
            if (isRunning && checkOffsetVersion(pre)) {
                //啟動(dòng)消息消費(fèi)
                maxId = threadExcute(pre);
                //更新offset
                updateOffset(pre, maxId);
            }
            ......
        }
    }
    protected long threadExcute(ConsumerQueueDto pre) {
        if (isRunning && (iSubscriber != null || iAsynSubscriber != null)) {
            ......
            //從
            if (messageMap.size() > 0) {
                ......
                //對(duì)消息進(jìn)行記錄并提交到對(duì)應(yīng)處理類進(jìn)行出來
                List<Long> failIds = invokeMessage(pre, messageMap);
                ......
            }
            ......
        }
        return 0;

    }
    protected List<Long> invokeMessage(ConsumerQueueDto temp, Map<Long, MessageDto> messageMap) {
        List<MessageDto> dtos = new ArrayList<>(messageMap.values());
        ......
        //消費(fèi)消息
        failIds = doMessageReceived(dtos);
        ......
    }
    protected List<Long> doMessageReceived(List<MessageDto> dtos) throws Exception {
        if (consumerQueueRef.get().getTimeout() > 0) {
            return new MessageInvokeCommandForThreadIsolation(consumerGroupName, consumerQueueRef.get(), dtos,
                    iSubscriber,  iAsynSubscriber).execute();
        } else {
            //走此分支的話,則根據(jù)剛剛(4)中已經(jīng)確定iSubscriber處理類來進(jìn)行出來,轉(zhuǎn)【6】
            return MessageInvokeCommandForThreadIsolation.invoke(dtos, iSubscriber,  iAsynSubscriber,
                    consumerQueueRef.get());
        }
    }
  • (6)反射類方法(MessageInvokeCommandForThreadIsolation.java)
    public static List<Long> invoke(List<MessageDto> dtos, ISubscriber iSubscriber, IAsynSubscriber iAsynSubscriber,
                                    ConsumerQueueDto pre) throws Exception {
         ......
        if (iSubscriber != null) {
            //回到client消費(fèi)實(shí)現(xiàn)類中
            failIds = iSubscriber.onMessageReceived(dtos);
             ......
        }else if (iAsynSubscriber != null) {
             ......
        }
        return failIds;
    }
  • (7)定義一個(gè)消費(fèi)者實(shí)現(xiàn)類
    public class TestSub implements ISubscriber {
        @Override
        public List<Long> onMessageReceived(List<MessageDto> messages) {
            System.out.println(messages.get(0).getBody());
            return new ArrayList<>();
        }
    }
  • (8)消費(fèi)組和topic的對(duì)應(yīng)關(guān)系是配置在xml和web ui中
<?xml version="1.0" encoding="UTF-8" ?>
<messageQueue>
    <consumer groupName="test1sub">
        <topics>
            <topic name="test1" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>
            <topic name="test4" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>
        </topics>
    </consumer>
</messageQueue>
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容