背景
- 本章主要記錄PMQ中關(guān)于消息從生產(chǎn)者端提交到消費(fèi)者消費(fèi)消息的過程
- 從中我們可以看到實(shí)現(xiàn)一個(gè)消息系統(tǒng)需要的發(fā)送、存儲(chǔ)、消費(fèi)和重試環(huán)節(jié)
- 一些有用的編碼風(fēng)格和技巧也值得學(xué)習(xí)
消息產(chǎn)生和消費(fèi)架構(gòu)總覽

PMQ架構(gòu)圖

PMQ消息端到端流程
Client端上報(bào)消息的過程
簡單流程

客戶端消息發(fā)送流程圖
client代碼流程
- (1)從client中上報(bào)消息代碼
//topicName=消息topic的名字,ProducerDataDto=消息體中的內(nèi)容
MqClient.publish(topicName, "",new ProducerDataDto("kantlin"+String.valueOf(i)));
- (2)client中publish方法的實(shí)現(xiàn)(MqResource.class)
public boolean publish(PublishMessageRequest request, int retryTimes) {
......
//這里集成了cat監(jiān)控,對(duì)每個(gè)發(fā)送的消息進(jìn)行鏈路追蹤【排查問題的關(guān)鍵】
Transaction transaction = Tracer.newTransaction("mq-client-publish", request.getTopicName());
Timer.Context timer1 = MetricSingleton.getMetricRegistry()
.timer("mq.client.publish.time?topic=" + request.getTopicName()).time();
try {
//上報(bào)到PMD broke的接口地址
String url = MqConstanst.CONSUMERPRE + "/publish";
//調(diào)用post請(qǐng)求將消息發(fā)送到broker中【轉(zhuǎn)(3)】
PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true);
......
transaction.setStatus(Transaction.SUCCESS);
//如果可處理的發(fā)送未成功,則上報(bào)到cat中
if (!response.isSuc()) {
......
addCat(request2);
}
//返回給客戶端本地上報(bào)結(jié)果
return response.isSuc();
} catch (Exception e) {
//如果是未處理異常的話,除了會(huì)上報(bào)到cat還會(huì)發(fā)送對(duì)應(yīng)的郵件告警
......
addCat(request2);
......
sendMail(mailRequest);
return false;
} finally {
transaction.complete();
timer1.stop();
}
}
- (3)client中post方法的實(shí)現(xiàn)(HttpClient.class)
public String post(String url, Object reqObj) throws IOException, BrokerException {
......
Response response = null;
//基于cat進(jìn)行鏈路追蹤
Transaction transaction = Tracer.newTransaction("mq-http", url);
try {
......
//獲取http客戶端并發(fā)送,有異常的話記錄到cat中并向上拋,不做處理
Request request=requestbuilder.build();
response = client.newCall(request).execute();
transaction.setStatus(Transaction.SUCCESS);
......
}
......
finally {
transaction.complete();
......
}
}
Rest端接受消費(fèi)者消息
簡單流程
- rest端收到client publish上來的消息,先對(duì)消息進(jìn)行驗(yàn)證
- 將消息轉(zhuǎn)換為適合存儲(chǔ)的bean對(duì)象
- 對(duì)消息進(jìn)行保存并通知消費(fèi)者消費(fèi)
- 通過消息查詢出全部訂閱者,并向符合條件的訂閱者起可pull數(shù)據(jù)的消息
代碼流程
- (1)client上報(bào)到rest端的接口(ConsumerController.java)
@PostMapping("/publish")
public PublishMessageResponse publish(@RequestBody PublishMessageRequest request) {
setSubEnv(request);
//轉(zhuǎn)【2】
PublishMessageResponse response = consumerService.publish(request);
return response;
}
- (2)rest端的push方法(ConsumerServiceImpl.java),這里面比較關(guān)鍵的方法是getAllLocatedTopicWriteQueue()用于獲取可寫的隊(duì)列(一個(gè)隊(duì)列可以理解為一張表),getAllLocatedTopicQueue()則是獲取全部隊(duì)列。上述兩個(gè)方法是通過QueueServiceImpl.start()中起來常駐線程來更新的。
public PublishMessageResponse publish(PublishMessageRequest request) {
......
try {
Map<String, List<QueueEntity>> queueMap = queueService.getAllLocatedTopicWriteQueue();
Map<String, List<QueueEntity>> topicQueueMap = queueService.getAllLocatedTopicQueue();
if (queueMap.containsKey(request.getTopicName()) || topicQueueMap.containsKey(request.getTopicName())) {
List<QueueEntity> queueEntities = queueMap.get(request.getTopicName());
if (queueEntities == null || queueEntities.size() == 0) {
......
//如果可寫隊(duì)列中沒有包含的話,但是總的topic中有,則可能表示有偏差,因此需要重新更新緩存數(shù)據(jù)
if (topicQueueMap.containsKey(request.getTopicName()) && soaConfig.getPublishMode() == 1) {
queueEntities = topicQueueMap.get(request.getTopicName());
updateQueueCache(request.getTopicName());
} else {
updateQueueCache(request.getTopicName());
return response;
}
}
//如果可寫隊(duì)列包含對(duì)應(yīng)的topic name 那么才對(duì)消息進(jìn)行保存
if (queueEntities.size() > 0) {
//轉(zhuǎn)【3】
saveMsg(request, response, queueEntities);
}
} else {
......
}
} catch (Exception e) {
......
} finally {
......
}
return response;
}
- (3)將請(qǐng)求轉(zhuǎn)換成對(duì)應(yīng)的消息對(duì)象進(jìn)行保存(ConsumerServiceImpl.java)
protected void saveSynMsg1(PublishMessageRequest request, PublishMessageResponse response,
List<QueueEntity> queueEntities) {
......
Map<String, PartitionInfo> partitionMap = new HashMap<>();
Map<Long, List<Message01Entity>> msgQueueMap = new HashMap<>();
//引用傳遞用于構(gòu)造消息實(shí)體,如果request沒有指定partitionInfo的話,那么partitionMap的key為默認(rèn)值Long.MAX_VALUE
createMsg(request, msgQueueMap, partitionMap);
for (Map.Entry<Long, List<Message01Entity>> entry : msgQueueMap.entrySet()) {
//分三種情況進(jìn)行消息進(jìn)行保存
if (queueMap.containsKey(entry.getKey())) {
doSaveMsg(request, response, Arrays.asList(queueMap.get(entry.getKey())), entry.getValue());
} else if (entry.getKey() == Long.MAX_VALUE) {
//轉(zhuǎn)【4】
doSaveMsg(request, response, queueEntities, entry.getValue());
} else {
entry.getValue().forEach(t1 -> {
if (partitionMap.containsKey(t1.getTraceId())) {
if (partitionMap.get(t1.getTraceId()).getStrictMode() == 0) {
doSaveMsg(request, response, queueEntities, Arrays.asList(t1));
}
}
});
}
}
}
- (4)每個(gè)可寫的隊(duì)列都保存一份消息(ConsumerServiceImpl.java)
private void doSaveMsg(PublishMessageRequest request, PublishMessageResponse response,
List<QueueEntity> queueEntities, List<Message01Entity> message01Entities) {
int tryCount = 0;
int queueSize = queueEntities.size();
......
int count = counterTemp.get(key).incrementAndGet();
while (tryCount <= queueSize) {
try {
QueueEntity temp = queueEntities.get(count % queueEntities.size());
count++;
......
//關(guān)鍵邏輯,上一步可支持寫的隊(duì)列都要保存消息
//轉(zhuǎn)【5】
doSaveMsg(message01Entities, request, response, temp);
......
} catch (Exception e) {
......
}
}
if (last != null) {
......
}
......
}
- (5)對(duì)消息進(jìn)程保存并通知相關(guān)的客戶端來拉取消息消費(fèi)(ConsumerServiceImpl.java)
protected void doSaveMsg(List<Message01Entity> message01Entities, PublishMessageRequest request,
PublishMessageResponse response, QueueEntity temp) {
//動(dòng)態(tài)的設(shè)置service連接的數(shù)據(jù)庫
message01Service.setDbId(temp.getDbNodeId());
......
try {
......
//將消息批量的插入數(shù)據(jù)庫中
message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities);
if (soaConfig.getMqPushFlag() == 1) {
//通知對(duì)應(yīng)的client端來pull data進(jìn)行消費(fèi)轉(zhuǎn)【6】
notifyClient(temp);
}
......
return;
} catch (Exception e) {
......
} finally {
......
}
}
- (6)對(duì)client進(jìn)行消息拉取的通知(ConsumerServiceImpl.java)
public void notifyClient(QueueEntity queueEntity) {
try {
......
//獲取消息主要是topic相關(guān)的全部訂閱者的詳情
List<QueueOffsetEntity> queueOffsetList = queueIdQueueOffsetMap.get(queueEntity.getId());
......
Map<String, List<MsgNotifyDto>> notifyMap = new HashMap<>();
//對(duì)訂閱者進(jìn)行遍歷
for (QueueOffsetEntity queueOffset : queueOffsetList) {
//如果訂閱者可以立刻接收且滿足限速條件的話,則立刻發(fā)送
if (consumerGroupMap.get(queueOffset.getConsumerGroupName()).getPushFlag() == 1
&& speedLimit(queueEntity.getId())) {
//拼接訂閱者的回調(diào)client的ip和地址
String clienturl = "http://" + consumerVo.ip + ":" + consumerVo.port;
if (!notifyMap.containsKey(clienturl)) {
notifyMap.put(clienturl, new ArrayList<>());
}
//封裝回調(diào)的消息體,這里只有消費(fèi)者名和消息ID
MsgNotifyDto msgNotifyDto = new MsgNotifyDto();
msgNotifyDto.setConsumerGroupName(queueOffset.getConsumerGroupName());
msgNotifyDto.setQueueId(queueEntity.getId());
notifyMap.get(clienturl).add(msgNotifyDto);
}
}
if (notifyMap.size() == 0) {
return;
}
......
for (String url : notifyMap.keySet()) {
//構(gòu)建完客戶端地址后,將消息體進(jìn)行封裝后發(fā)送給客戶端接口
try {
MsgNotifyRequest request = new MsgNotifyRequest();
request.setMsgNotifyDtos(notifyMap.get(url));
if (notifyFailTentativeLimit(url)) {
//請(qǐng)求的client的地址上/mq/client/notify
httpClient.postAsyn(url + "/mq/client/notify", request, new ConsumerServiceImpl.NotifyCallBack(url));
}
} catch (Exception e) {
log.error("給客戶端發(fā)送拉取通知異常:", e);
}
}
......
} catch (Exception e) {
}
}
Client pull消息及消費(fèi)過程
client消費(fèi)消息的過程
- (1)當(dāng)客戶端啟動(dòng)完成后,重平衡監(jiān)聽線程會(huì)通過長鏈接的方式向broker查詢當(dāng)前實(shí)例的隊(duì)列訂閱情況。此過程即是重平衡的過程,重平衡過程在上一章節(jié)有介紹,不在此細(xì)述。
- (2)當(dāng)重平衡完成后,當(dāng)前消費(fèi)實(shí)例,最終可能會(huì)被分配到一些隊(duì)列(如上圖的queue),客戶端會(huì)根據(jù)這些隊(duì)列信息進(jìn)行消費(fèi)。
- (3)queue的處理是PMQ消息系統(tǒng)客戶端消費(fèi)的關(guān)鍵。下面來詳細(xì)介紹單個(gè)queue的處理過程,多個(gè)queue跟單個(gè)queue處理過程一致。
- (4)當(dāng)重平衡完成后,broker會(huì)將分配的隊(duì)列的元數(shù)據(jù)信息返回給客戶端,比如queue的id,queue偏移量等信息。客戶端會(huì)為每個(gè)queue開啟一個(gè)線程,此線程會(huì)根據(jù)這些元數(shù)據(jù)信息定時(shí)向broker拉取消息。注意是每個(gè)queue都有對(duì)應(yīng)的獨(dú)立拉取線程。
- (5)當(dāng)線程拉取到消息后,會(huì)將拉取的消息緩存到當(dāng)前queue對(duì)應(yīng)的緩沖隊(duì)列中。如果此時(shí)緩沖隊(duì)列滿了,則暫停拉取直到緩沖隊(duì)列不再滿載為止。緩存完畢后,會(huì)開啟新的一輪拉取,如果出現(xiàn)拉取的消息為空,則拉取線程會(huì)sleep 50ms時(shí)間,再開啟新的拉取。如果再次拉取還是沒有消息,則加大等待拉取時(shí)間。直到拉取等待最大值。一旦拉取到新的消息,則重新開始新的循環(huán)。
- (6)重平衡完成后,在每個(gè)隊(duì)列中,也會(huì)啟動(dòng)一個(gè)消費(fèi)調(diào)度線程,它會(huì)定時(shí)循環(huán)獲取緩沖隊(duì)列的消息,然后根據(jù)緩沖消息數(shù)量和批量消費(fèi)的條數(shù),計(jì)算出執(zhí)行線程的個(gè)數(shù)。在PMQ中,批量消費(fèi)條數(shù)和線程數(shù)可以通過后臺(tái)控制頁面進(jìn)行動(dòng)態(tài)調(diào)整,實(shí)時(shí)生效。
- (7)計(jì)算出待執(zhí)行的消費(fèi)條數(shù)后,會(huì)啟動(dòng)相對(duì)應(yīng)的線程進(jìn)行消費(fèi)。注意此線程是一個(gè)線程池。消費(fèi)線程根據(jù)批量消費(fèi)條數(shù)從緩沖隊(duì)列中獲取待消費(fèi)的消息,然后調(diào)用消費(fèi)者實(shí)例的本地方法。
- (8)當(dāng)消息消費(fèi)完成后,會(huì)更新內(nèi)存中當(dāng)前隊(duì)列的偏移量。偏移量提交線程,會(huì)定時(shí)提交相關(guān)的偏移量。
-
(9)如果出現(xiàn)消費(fèi)失敗,會(huì)將此消費(fèi)失敗的消息,發(fā)送到對(duì)應(yīng)的失敗topic中,然后進(jìn)行重新消費(fèi)。失敗topic的消費(fèi)邏輯與正常topic的消費(fèi)邏輯一致。
客戶端消費(fèi)核心架構(gòu)圖
Client pull消息代碼流程
- (1)當(dāng)broker保存完消息后,如果client是可立刻通知且開啟狀態(tài)的話,則會(huì)回調(diào)對(duì)應(yīng)請(qǐng)求接口(MqClientStatController.java)
@RequestMapping("/mq/client/notify")
public void notify(@RequestBody MsgNotifyRequest request) {
//如果客戶端是開啟標(biāo)準(zhǔn)(客戶端有黑名單和不開啟消費(fèi)的卻別)
if (isOpenFlag()) {
......
try {
//轉(zhuǎn)【2】
msgNotifyService.notify(request);
......
} catch (Exception e) {
......
}
}
}
- (2)client端開始處理拉取消息消費(fèi)的通知(MsgNotifyService.java)
public void notify(MsgNotifyRequest request) {
//從broke處獲取消費(fèi)者組的最新詳情,同時(shí)更新緩存中的信息
IConsumerPollingService consumerPollingService = MqClient.getMqFactory().createConsumerPollingService();
Map<String, IMqGroupExcutorService> groups = consumerPollingService.getMqExcutors();
if (groups != null && request != null && request.getMsgNotifyDtos() != null) {
request.getMsgNotifyDtos().forEach(msgNotifyDto -> {
if (groups.containsKey(msgNotifyDto.getConsumerGroupName())) {
//獲取當(dāng)前最新消費(fèi)者組及其對(duì)應(yīng)的隊(duì)列詳情
IMqGroupExcutorService iMqGroupExcutorService = groups.get(msgNotifyDto.getConsumerGroupName());
Map<Long, IMqQueueExcutorService> queues = iMqGroupExcutorService.getQueueEx();
if (queues.containsKey(msgNotifyDto.getQueueId())) {
//消費(fèi)者組中包含了待消費(fèi)消息的隊(duì)列Id的話,則要進(jìn)行消費(fèi)(這里可以理解為消費(fèi)前進(jìn)行下判斷)轉(zhuǎn)【3】
queues.get(msgNotifyDto.getQueueId()).notifyMsg();
}
}
});
}
}
- (3)client端請(qǐng)求broke拉取消息(MqQueueExcutorService.java)
protected boolean doPullingData() {
if (pullFlag.compareAndSet(false, true)) {
......
if (consumerQueueDto != null) {
......
try {
......
if (checkOffsetVersion(consumerQueueDto)) {
......
PullDataResponse response = null;
if (checkOffsetVersion(consumerQueueDto)) {
//請(qǐng)求broke端的pullData方法來拉取數(shù)據(jù)
response = mqResource.pullData(request);
}
......
if (response != null && response.getMsgs() != null && response.getMsgs().size() > 0) {
//如果請(qǐng)求的消息列表不為空,則緩存起來轉(zhuǎn)【4】
cacheData(response, consumerQueueDto);
......
}
}
......
} catch (Exception e) {
......
} finally {
......
}
}
return false;
} else {
return true;
}
}
- (4)將response中獲取到的消息放到阻塞隊(duì)列blockQueue中(MqQueueExcutorService.java)
protected void cacheData(PullDataResponse response, ConsumerQueueDto pre) {
if (checkOffsetVersion(pre)) {
for (MessageDto t1 : response.getMsgs()) {
......
//將請(qǐng)求回來的數(shù)據(jù)放到阻塞隊(duì)列messages中,并記錄log
while (true && checkOffsetVersion(pre)) {
try {
messages.put(t1);
addPullLog(t1);
break;
} catch (Exception e) {
}
......
}
......
}
}
}
Client 消費(fèi)消息代碼流程
- 從上面流程中我們知道client端拉取到了消息并放在阻塞隊(duì)列中,那么在client中也得啟動(dòng)一個(gè)消費(fèi)線程在處理messages隊(duì)列中的數(shù)據(jù)。而這個(gè)啟動(dòng)流程則是發(fā)生在client端進(jìn)行初始化的時(shí)候。
- 在客戶端啟動(dòng)后調(diào)用consumerPollingService的start()方法(MqClient.java)
private static boolean doRegisterConsumerGroup(Map<String, ConsumerGroupVo> groups) {
......
//消費(fèi)組service在client初始化時(shí)候執(zhí)行start方法
consumerPollingService = mqFactory.createConsumerPollingService();
//轉(zhuǎn)【2】
consumerPollingService.start();
......
}
- (2)獲取消費(fèi)者組列表并分別啟動(dòng)MqGroupExcutorService(ConsumerPollingService.java)
@Override
public void start() {
if (startFlag.compareAndSet(false, true)) {
......
executor.execute(new Runnable() {
@Override
public void run() {
while (!isStop) {
......
//啟動(dòng)常駐進(jìn)程,每秒更新消費(fèi)者組信息
longPolling();
......
}
}
});
}
}
protected void longPolling() {
if (mqContext.getConsumerId() > 0 && mqContext.getConsumerGroupVersion() != null
&& mqContext.getConsumerGroupVersion().size() > 0) {
......
//請(qǐng)求broker獲取最新的消費(fèi)者組的信息
GetConsumerGroupResponse response = mqResource.getConsumerGroup(request);
//處理相關(guān)的請(qǐng)求信息
handleGroup(response);
......
} else {
Util.sleep(1000);
}
}
protected void handleGroup(GetConsumerGroupResponse response) {
......
//遍歷消費(fèi)者組,創(chuàng)建對(duì)應(yīng)的消費(fèi)組實(shí)例
response.getConsumerGroups().entrySet().forEach(t1 -> {
if (!isStop) {
if (!mqExcutors.containsKey(t1.getKey())) {
mqExcutors.put(t1.getKey(), mqFactory.createMqGroupExcutorService());
}
log.info("consumer_group_data_change,消費(fèi)者組" + t1.getKey() + "發(fā)生重平衡或者meta更新");
// 進(jìn)行重平衡操作或者更新元數(shù)據(jù)信息
mqExcutors.get(t1.getKey()).rbOrUpdate(t1.getValue(), response.getServerIp());
mqContext.getConsumerGroupVersion().put(t1.getKey(), t1.getValue().getMeta().getVersion());
}
});
......
mqExcutors.values().forEach(t1 -> {
//啟動(dòng)消費(fèi)組實(shí)例MqGroupExcutorService類中的start方法,轉(zhuǎn)【3】
t1.start();
});
}
- (3)消費(fèi)者組實(shí)例中啟動(dòng)對(duì)應(yīng)queue的消費(fèi)實(shí)例(MqGroupExcutorService.java)
//消費(fèi)者組連續(xù)三次重平衡的版本號(hào)不變的話則開始啟動(dòng)組下隊(duì)列的消費(fèi)
public void start() {
if (!isRunning) {
versionCount++;
......
if (versionCount >= mqContext.getConfig().getRbTimes()) {
//啟動(dòng)隊(duì)列
doStartQueue();
isRunning = true;
}
}
}
protected void doStartQueue() {
......
if (localConsumerGroup != null && localConsumerGroup.getQueues() != null
&& localConsumerGroup.getQueues().size() > 0) {
//從消費(fèi)者組中獲取隊(duì)列表,每個(gè)都單獨(dú)啟動(dòng)消費(fèi)實(shí)例
localConsumerGroup.getQueues().values().forEach(t1 -> {
IMqQueueExcutorService mqQueueExcutorService = mqFactory
.createMqQueueExcutorService(localConsumerGroup.getMeta().getName(), t1);
mqEx.put(t1.getQueueId(), mqQueueExcutorService);
//啟動(dòng)每個(gè)隊(duì)列的監(jiān)聽和消費(fèi)線程,轉(zhuǎn)【4】
mqQueueExcutorService.start();
});
}
......
}
- (4)監(jiān)聽隊(duì)列中的消息,啟動(dòng)單獨(dú)的處理線程消費(fèi)(MqQueueExcutorService.java)
//此時(shí)監(jiān)聽到具體的待消費(fèi)隊(duì)列
@Override
public void start() {
if (this.iSubscriber != null || this.iAsynSubscriber != null) {
if (isStart.compareAndSet(false, true)) {
//啟動(dòng)時(shí)會(huì)pull一次待消費(fèi)的消息,后續(xù)的pull都需要由broke進(jìn)行觸發(fā)【如果開啟立刻消費(fèi)的話】
executor.execute(new Runnable() {
@Override
public void run() {
pullingData();
}
});
executor.execute(new Runnable() {
@Override
public void run() {
while (!isStop) {
if (isRunning) {
//開啟重置的消息出來線程,注意此處不能加鎖,因?yàn)橛行?huì)出現(xiàn)延消費(fèi),然后出現(xiàn)阻塞
handleData();
} else {
Util.sleep(50);
}
}
}
});
}
}
}
protected void handleData() {
......
//阻塞隊(duì)列messages中的消息量
int msgSize = messages.size();
//刷新訂閱關(guān)系,根據(jù)消費(fèi)者和topic name找到對(duì)應(yīng)的消費(fèi)處理類,更新在iSubscriber對(duì)象中
refreshSubscriber();
if (temp != null && msgSize > 0 && temp.getThreadSize() + 2 - executor.getActiveCount() > 0
&& (iSubscriber != null|| iAsynSubscriber != null)
&& (temp.getTimeout() == 0 || (temp.getTimeout() > 0 && timeOutCount.get() == 0))) {
......
//開啟處理數(shù)據(jù)
doHandleData(temp, msgSize);
......
} else {
Util.sleep(10);
}
}
private void doHandleData(ConsumerQueueDto pre, int msgSize) {
// 線程批次概念關(guān)鍵代碼
......
CountDownLatch countDownLatch = new CountDownLatch(startThread);
//按照批次進(jìn)行消息消費(fèi)
batchExcute(pre, startThread, batchRecorderId, countDownLatch);
......
}
private void batchExcute(ConsumerQueueDto pre, int startThread, long batchRecorderId,
CountDownLatch countDownLatch) {
for (int i = 0; i < startThread; i++) {
if (executor != null) {
//每個(gè)消息對(duì)應(yīng)單獨(dú)的處理類MsgThread,轉(zhuǎn)【5】
executor.execute(new MqQueueExcutorService.MsgThread(pre, batchRecorderId, countDownLatch, timeOutCount));
}
}
}
- (5)反射到對(duì)應(yīng)的處理類中進(jìn)行處理(MqQueueExcutorService.java)
public class MsgThread implements Runnable {
......
@Override
public void run() {
......
if (isRunning && checkOffsetVersion(pre)) {
//啟動(dòng)消息消費(fèi)
maxId = threadExcute(pre);
//更新offset
updateOffset(pre, maxId);
}
......
}
}
protected long threadExcute(ConsumerQueueDto pre) {
if (isRunning && (iSubscriber != null || iAsynSubscriber != null)) {
......
//從
if (messageMap.size() > 0) {
......
//對(duì)消息進(jìn)行記錄并提交到對(duì)應(yīng)處理類進(jìn)行出來
List<Long> failIds = invokeMessage(pre, messageMap);
......
}
......
}
return 0;
}
protected List<Long> invokeMessage(ConsumerQueueDto temp, Map<Long, MessageDto> messageMap) {
List<MessageDto> dtos = new ArrayList<>(messageMap.values());
......
//消費(fèi)消息
failIds = doMessageReceived(dtos);
......
}
protected List<Long> doMessageReceived(List<MessageDto> dtos) throws Exception {
if (consumerQueueRef.get().getTimeout() > 0) {
return new MessageInvokeCommandForThreadIsolation(consumerGroupName, consumerQueueRef.get(), dtos,
iSubscriber, iAsynSubscriber).execute();
} else {
//走此分支的話,則根據(jù)剛剛(4)中已經(jīng)確定iSubscriber處理類來進(jìn)行出來,轉(zhuǎn)【6】
return MessageInvokeCommandForThreadIsolation.invoke(dtos, iSubscriber, iAsynSubscriber,
consumerQueueRef.get());
}
}
- (6)反射類方法(MessageInvokeCommandForThreadIsolation.java)
public static List<Long> invoke(List<MessageDto> dtos, ISubscriber iSubscriber, IAsynSubscriber iAsynSubscriber,
ConsumerQueueDto pre) throws Exception {
......
if (iSubscriber != null) {
//回到client消費(fèi)實(shí)現(xiàn)類中
failIds = iSubscriber.onMessageReceived(dtos);
......
}else if (iAsynSubscriber != null) {
......
}
return failIds;
}
- (7)定義一個(gè)消費(fèi)者實(shí)現(xiàn)類
public class TestSub implements ISubscriber {
@Override
public List<Long> onMessageReceived(List<MessageDto> messages) {
System.out.println(messages.get(0).getBody());
return new ArrayList<>();
}
}
- (8)消費(fèi)組和topic的對(duì)應(yīng)關(guān)系是配置在xml和web ui中
<?xml version="1.0" encoding="UTF-8" ?>
<messageQueue>
<consumer groupName="test1sub">
<topics>
<topic name="test1" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>
<topic name="test4" receiverType="com.ppdai.infrastructure.demo.TestSub"></topic>
</topics>
</consumer>
</messageQueue>
