前言
路由隊(duì)列選擇的作用在于發(fā)送消息時(shí)可以指定發(fā)送到某個(gè)broker隊(duì)列,或均衡發(fā)送到broker隊(duì)列,其作用就是在于選擇合適的隊(duì)列進(jìn)行消息發(fā)送。
目前客戶端隊(duì)列選擇分為三種方式:
第一種:可根據(jù)MessageQueueSelector的實(shí)現(xiàn)或自擴(kuò)展實(shí)現(xiàn)選擇隊(duì)列;
第二種:未開啟Broker故障延遲機(jī)制(sendLatencyFaultEnable:false),會(huì)采用默認(rèn)輪訓(xùn)機(jī)制(默認(rèn)是此種實(shí)現(xiàn)方式);
第三種:開啟Broker故障延遲機(jī)制(sendLatencyFaultEnable:true),會(huì)根據(jù)brokerName的可用性選擇隊(duì)列發(fā)送。
接下來我們就以這三種方式展開討論。
一、隊(duì)列選擇
MessageQueueSelector方式隊(duì)列選擇
在了解MessageQueueSelector的方式進(jìn)行隊(duì)列選擇時(shí),我們先回顧下MQProducer接口:
里面有多個(gè)方法簽名帶參數(shù)MessageQueueSelector,其實(shí)就是表明使用此種方式選擇消息隊(duì)列需要顯示穿參數(shù)才能使用;用下面這個(gè)接口方法進(jìn)行舉例分析:
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
接下來我們直接看內(nèi)部實(shí)現(xiàn)源碼如何實(shí)現(xiàn)的:
DefaultMQProducerImpl#sendSelectImpl
private SendResult sendSelectImpl(
Message msg, MessageQueueSelector selector,
Object arg, final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK(); // 狀態(tài)檢測
Validators.checkMessage(msg, this.defaultMQProducer); // 消息驗(yàn)證
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); // 1-隊(duì)列選擇
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); // 熟悉的配方,前面已經(jīng)分析
} else {
throw new MQClientException("select message queue return null.", null); // 異常拋出
}
}
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
分析:
selector.select(topicPublishInfo.getMessageQueueList(), msg, arg) 隊(duì)列選擇;
其實(shí)現(xiàn)有以下三種:
SelectMessageQueueByHash(hash)
SelectMessageQueueByMachineRoom(機(jī)器隨機(jī))
SelectMessageQueueByRandom(隊(duì)列隨機(jī))
當(dāng)然自己也可以定制化擴(kuò)展,你說簡單不簡單?我們可簡單查看其中之一的實(shí)現(xiàn)源碼:
SelectMessageQueueByRandom
public class SelectMessageQueueByRandom implements MessageQueueSelector {
private Random random = new Random(System.currentTimeMillis());
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = random.nextInt(mqs.size());
return mqs.get(value);
}
}
二、輪訓(xùn)機(jī)制
未開啟Broker故障延遲機(jī)制(sendLatencyFaultEnable:false),會(huì)采用默認(rèn)輪訓(xùn)機(jī)制;
來來來,我們直接上大餐,看到下面你就明白原來默認(rèn)機(jī)制是如此的簡單哈。
以下方法為入口:
DefaultMQProducerImpl#selectOneMessageQueue
/**
* 選擇一個(gè)消息隊(duì)列, lastBrokerName 就是上 一 次選擇的執(zhí)行發(fā)送消息失敗的 Broker。第一次執(zhí)行消息隊(duì)列選擇時(shí), lastBrokerName 為 null
* @param tpInfo
* @param lastBrokerName
* @return
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
重頭戲:
MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
...下面分析...
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
備注:
TopicPublishInfo 熟悉不熟悉哈?可以翻翻文章(路由動(dòng)態(tài)更新)selectOneMessageQueue源碼注釋分析,可以簡單理解為隊(duì)列輪詢。
三、隊(duì)列發(fā)送
開啟Broker故障延遲機(jī)制(sendLatencyFaultEnable:true),進(jìn)行選擇隊(duì)列發(fā)送。
3.1 發(fā)送延遲故障
如果發(fā)送延遲故障打開[sendLatencyFaultEnable:true],則發(fā)送時(shí)會(huì)統(tǒng)計(jì)發(fā)送耗時(shí)和失敗[updateFaultItem],當(dāng)某個(gè)broker節(jié)點(diǎn)發(fā)送失敗和發(fā)送耗時(shí)較長,則在一段時(shí)間內(nèi)不再選擇該broker[selectOneMessageQueue]
3.2 流程圖
簡單流程圖片如下:

描述:
LatencyFaultToleranceImpl包含一個(gè)Map(key:brokerName,value:FaultItem可用信息):ConcurrentHashMap faultItemTable
FaultItem 的數(shù)據(jù)結(jié)構(gòu)如下:
class FaultItem implements Comparable<FaultItem> {
//條目唯一鍵,這里為 brokerName。
private final String name;
//本次消息發(fā)送延遲 。(消耗時(shí)間)
private volatile long currentLatency;
//故障規(guī)避開始時(shí)間 = 發(fā)生的時(shí)間 + notAvailableDuration 。(故障恢復(fù)時(shí)間)
private volatile long startTimestamp;
.....
}
備注:
FaultItem包含currentLatency發(fā)送耗時(shí),brokerName節(jié)點(diǎn)名稱,startTimestamp時(shí)間戳后broker可用。
3.3 元數(shù)據(jù)映射
從MQFaultStrategy可以看出:發(fā)送延時(shí)一可用性延時(shí)~元數(shù)據(jù)映射
private final static InternalLogger log = ClientLogger.getLog();
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
//啟用 Broker故障延遲機(jī)制 ,默認(rèn)不啟用
private boolean sendLatencyFaultEnable = false;
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
簡單總結(jié)如下:
latencyMax數(shù)組:50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L 對(duì)應(yīng):發(fā)送耗時(shí)50ms 100ms 550ms 1s 2s 3s 15s
notAvailableDuration數(shù)組:0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L 對(duì)應(yīng):可用延時(shí) 0s 0s 30s 60s 2min 3min 10min
故可以理解為:
發(fā)送耗時(shí)50和100毫秒,則當(dāng)前broker延遲0秒
發(fā)送耗時(shí)550和1000毫秒,則當(dāng)前broker延遲30秒和60秒
發(fā)送耗時(shí)2秒,則當(dāng)前broker延遲2min
發(fā)送耗時(shí)3秒,則當(dāng)前broker延遲3min
發(fā)送耗時(shí)15秒,則當(dāng)前broker延遲10min6
如果發(fā)送失敗,則直接延遲10min
3.4 源碼分析:
1:選擇隊(duì)列
MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) { // 1> 默認(rèn)false,等于true相當(dāng)于開啟
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 2> 判斷可用性-時(shí)間判斷(存儲(chǔ)了所有發(fā)送消息失敗過的broker)
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq; // 3> 說明找到了可用的MessageQueue直接返回
}
}
//4> 嘗試從規(guī)避的 Broker 中選擇一個(gè)可用的 Broker(shuffle),如果沒有找到,將返回 null。--
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
//5>兜底選擇隊(duì)列
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
備注:
此處主要是通過判斷brokerName是否可用、不可用則該brokerName所有queue不可能、繼續(xù)找下一個(gè)brokerName、如果找不到則排序shuffle找一個(gè)可用的。如果最終找不到則調(diào)用TopicPublishInfo.selectOneMessageQueue兜底選擇一個(gè)隊(duì)列返回。
2:更新broker的可用性
(根據(jù)發(fā)送延時(shí)換算可用性延時(shí)):updateFaultItem#updateFaultItem
/**
* 如果 isolation為 true,則使用 30s作為 computeNotAvailableDuration方法的參數(shù);
* 如果 isolation為 false,則使用本次消息發(fā)送時(shí)延作為 computeNotAvailableDuration方法的參數(shù),
* 那 computeNotAvailableDuration 的作用 是 計(jì)算因本次消息發(fā)送故障需要 將 Broker 規(guī)避的時(shí)長,
* @param brokerName
* @param currentLatency
* @param isolation =true 表示10分鐘后可用,參見MQFaultStrategy的數(shù)組元數(shù)據(jù)
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i]; // 計(jì)算可用性延時(shí)
}
return 0;
}
備注:
此處邏輯相對(duì)簡單、小知識(shí)點(diǎn):記?。篿solation=true的特殊情況
3:更新broker可用性延時(shí)
LatencyFaultToleranceImpl#faultItemTable
/**
* 根據(jù) broker名稱從 緩存表中獲 取 Faultitem,如果找到則更新 Faultltem,否則創(chuàng) 建 Faultltem。
* 1) currentLatency、 startTimeStamp被volatile修飾。
* 2) startTimeStamp 為當(dāng)前系統(tǒng)時(shí)間加上需要規(guī)避的時(shí)長 。startTimeStamp 是 判斷 broker當(dāng)前是否可用的直接一句,請(qǐng)看 Faultltem#isAvailable方法。
* @param name brokerName
* @param currentLatency 消息發(fā)送故障延遲時(shí)間 。
* @param notAvailableDuration 不可用持續(xù)時(shí)辰, 在這個(gè)時(shí)間內(nèi), Broker 將被規(guī)避 。
*/
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);//計(jì)算新的可用性延時(shí)
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); //計(jì)算新的可用性延時(shí)
}
}
備注:
此處邏輯就是更新計(jì)算FaultItem的StartTimestamp
四、結(jié)論
通過本文分析我們已經(jīng)清楚消息發(fā)送流程隊(duì)列選擇的三種方式,由于發(fā)送消息流程過程中不能動(dòng)態(tài)切換此三種方式,故每種選擇隊(duì)列方式建議根據(jù)實(shí)際情況進(jìn)行選擇使用;
至此Producer的核心流程源碼已經(jīng)分析完、建議有興趣可以回顧歷史文章。
程序員的核心競爭力其實(shí)還是技術(shù),因此對(duì)技術(shù)還是要不斷的學(xué)習(xí),關(guān)注 “IT巔峰技術(shù)” 公眾號(hào) ,該公眾號(hào)內(nèi)容定位:中高級(jí)開發(fā)、架構(gòu)師、中層管理人員等中高端崗位服務(wù)的,除了技術(shù)交流外還有很多架構(gòu)思想和實(shí)戰(zhàn)案例。
作者是 《 消息中間件 RocketMQ 技術(shù)內(nèi)幕》 一書作者,同時(shí)也是 “RocketMQ 上海社區(qū)”聯(lián)合創(chuàng)始人,曾就職于拼多多、德邦等公司,現(xiàn)任上市快遞公司架構(gòu)負(fù)責(zé)人,主要負(fù)責(zé)開發(fā)框架的搭建、中間件相關(guān)技術(shù)的二次開發(fā)和運(yùn)維管理、混合云及基礎(chǔ)服務(wù)平臺(tái)的建設(shè)。