RocketMQ—Producer(五)路由隊(duì)列選擇

前言

路由隊(duì)列選擇的作用在于發(fā)送消息時(shí)可以指定發(fā)送到某個(gè)broker隊(duì)列,或均衡發(fā)送到broker隊(duì)列,其作用就是在于選擇合適的隊(duì)列進(jìn)行消息發(fā)送。

目前客戶端隊(duì)列選擇分為三種方式:

  • 第一種:可根據(jù)MessageQueueSelector的實(shí)現(xiàn)或自擴(kuò)展實(shí)現(xiàn)選擇隊(duì)列;

  • 第二種:未開啟Broker故障延遲機(jī)制(sendLatencyFaultEnable:false),會(huì)采用默認(rèn)輪訓(xùn)機(jī)制(默認(rèn)是此種實(shí)現(xiàn)方式);

  • 第三種:開啟Broker故障延遲機(jī)制(sendLatencyFaultEnable:true),會(huì)根據(jù)brokerName的可用性選擇隊(duì)列發(fā)送。

接下來我們就以這三種方式展開討論。

一、隊(duì)列選擇

MessageQueueSelector方式隊(duì)列選擇
在了解MessageQueueSelector的方式進(jìn)行隊(duì)列選擇時(shí),我們先回顧下MQProducer接口:
里面有多個(gè)方法簽名帶參數(shù)MessageQueueSelector,其實(shí)就是表明使用此種方式選擇消息隊(duì)列需要顯示穿參數(shù)才能使用;用下面這個(gè)接口方法進(jìn)行舉例分析:

SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

接下來我們直接看內(nèi)部實(shí)現(xiàn)源碼如何實(shí)現(xiàn)的:

DefaultMQProducerImpl#sendSelectImpl

private SendResult sendSelectImpl(
  Message msg, MessageQueueSelector selector,
  Object arg, final CommunicationMode communicationMode,
  final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, 
  MQBrokerException, InterruptedException {
  long beginStartTime = System.currentTimeMillis();
  this.makeSureStateOK();     // 狀態(tài)檢測
  Validators.checkMessage(msg, this.defaultMQProducer); // 消息驗(yàn)證

  TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  if (topicPublishInfo != null && topicPublishInfo.ok()) {
      MessageQueue mq = null;
      try {
          mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); // 1-隊(duì)列選擇
      } catch (Throwable e) {
          throw new MQClientException("select message queue throwed exception.", e);
      }

      long costTime = System.currentTimeMillis() - beginStartTime;
      if (timeout < costTime) {
          throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
      }
      if (mq != null) {
          return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); // 熟悉的配方,前面已經(jīng)分析
      } else {
          throw new MQClientException("select message queue return null.", null); // 異常拋出
      }
  }
  throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
  }

分析:

selector.select(topicPublishInfo.getMessageQueueList(), msg, arg) 隊(duì)列選擇;

其實(shí)現(xiàn)有以下三種:

  • SelectMessageQueueByHash(hash)

  • SelectMessageQueueByMachineRoom(機(jī)器隨機(jī))

  • SelectMessageQueueByRandom(隊(duì)列隨機(jī))

當(dāng)然自己也可以定制化擴(kuò)展,你說簡單不簡單?我們可簡單查看其中之一的實(shí)現(xiàn)源碼:

SelectMessageQueueByRandom

public class SelectMessageQueueByRandom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = random.nextInt(mqs.size());
        return mqs.get(value);
    }
}

二、輪訓(xùn)機(jī)制

未開啟Broker故障延遲機(jī)制(sendLatencyFaultEnable:false),會(huì)采用默認(rèn)輪訓(xùn)機(jī)制;

來來來,我們直接上大餐,看到下面你就明白原來默認(rèn)機(jī)制是如此的簡單哈。

以下方法為入口:

DefaultMQProducerImpl#selectOneMessageQueue

/**
* 選擇一個(gè)消息隊(duì)列, lastBrokerName 就是上 一 次選擇的執(zhí)行發(fā)送消息失敗的 Broker。第一次執(zhí)行消息隊(duì)列選擇時(shí), lastBrokerName 為 null
* @param tpInfo
* @param lastBrokerName
* @return
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

重頭戲:

MQFaultStrategy#selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        ...下面分析...
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

備注:

TopicPublishInfo 熟悉不熟悉哈?可以翻翻文章(路由動(dòng)態(tài)更新)selectOneMessageQueue源碼注釋分析,可以簡單理解為隊(duì)列輪詢。

三、隊(duì)列發(fā)送

開啟Broker故障延遲機(jī)制(sendLatencyFaultEnable:true),進(jìn)行選擇隊(duì)列發(fā)送。

3.1 發(fā)送延遲故障

如果發(fā)送延遲故障打開[sendLatencyFaultEnable:true],則發(fā)送時(shí)會(huì)統(tǒng)計(jì)發(fā)送耗時(shí)和失敗[updateFaultItem],當(dāng)某個(gè)broker節(jié)點(diǎn)發(fā)送失敗和發(fā)送耗時(shí)較長,則在一段時(shí)間內(nèi)不再選擇該broker[selectOneMessageQueue]

3.2 流程圖

簡單流程圖片如下:

1.png

描述:

  1. LatencyFaultToleranceImpl包含一個(gè)Map(key:brokerName,value:FaultItem可用信息):ConcurrentHashMap faultItemTable

  2. FaultItem 的數(shù)據(jù)結(jié)構(gòu)如下:

class FaultItem implements Comparable<FaultItem> {
  //條目唯一鍵,這里為 brokerName。
  private final String name;
  //本次消息發(fā)送延遲 。(消耗時(shí)間)
  private volatile long currentLatency;
  //故障規(guī)避開始時(shí)間 = 發(fā)生的時(shí)間 + notAvailableDuration 。(故障恢復(fù)時(shí)間)
  private volatile long startTimestamp;

  .....
}

備注:

FaultItem包含currentLatency發(fā)送耗時(shí),brokerName節(jié)點(diǎn)名稱,startTimestamp時(shí)間戳后broker可用。

3.3 元數(shù)據(jù)映射

從MQFaultStrategy可以看出:發(fā)送延時(shí)一可用性延時(shí)~元數(shù)據(jù)映射

private final static InternalLogger log = ClientLogger.getLog();
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
//啟用 Broker故障延遲機(jī)制 ,默認(rèn)不啟用
private boolean sendLatencyFaultEnable = false;
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

簡單總結(jié)如下:

  • latencyMax數(shù)組:50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L 對(duì)應(yīng):發(fā)送耗時(shí)50ms 100ms 550ms 1s 2s 3s 15s

  • notAvailableDuration數(shù)組:0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L 對(duì)應(yīng):可用延時(shí) 0s 0s 30s 60s 2min 3min 10min

故可以理解為:

  1. 發(fā)送耗時(shí)50和100毫秒,則當(dāng)前broker延遲0秒

  2. 發(fā)送耗時(shí)550和1000毫秒,則當(dāng)前broker延遲30秒和60秒

  3. 發(fā)送耗時(shí)2秒,則當(dāng)前broker延遲2min

  4. 發(fā)送耗時(shí)3秒,則當(dāng)前broker延遲3min

  5. 發(fā)送耗時(shí)15秒,則當(dāng)前broker延遲10min6

  6. 如果發(fā)送失敗,則直接延遲10min

3.4 源碼分析:

1:選擇隊(duì)列

MQFaultStrategy#selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
  if (this.sendLatencyFaultEnable) { // 1> 默認(rèn)false,等于true相當(dāng)于開啟
      try {
          int index = tpInfo.getSendWhichQueue().getAndIncrement();
          for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
              int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
              if (pos < 0)
                  pos = 0;
              MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
              if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 2> 判斷可用性-時(shí)間判斷(存儲(chǔ)了所有發(fā)送消息失敗過的broker)
                  if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                      return mq; // 3> 說明找到了可用的MessageQueue直接返回
              }
          }
          //4> 嘗試從規(guī)避的 Broker 中選擇一個(gè)可用的 Broker(shuffle),如果沒有找到,將返回 null。--
          final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
          int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
          if (writeQueueNums > 0) {
              final MessageQueue mq = tpInfo.selectOneMessageQueue();
              if (notBestBroker != null) {
                  mq.setBrokerName(notBestBroker);
                  mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
              }
              return mq;
          } else {
              latencyFaultTolerance.remove(notBestBroker);
          }
      } catch (Exception e) {
          log.error("Error occurred when selecting message queue", e);
      }
      //5>兜底選擇隊(duì)列
      return tpInfo.selectOneMessageQueue();
  }
  return tpInfo.selectOneMessageQueue(lastBrokerName);
}

備注:

此處主要是通過判斷brokerName是否可用、不可用則該brokerName所有queue不可能、繼續(xù)找下一個(gè)brokerName、如果找不到則排序shuffle找一個(gè)可用的。如果最終找不到則調(diào)用TopicPublishInfo.selectOneMessageQueue兜底選擇一個(gè)隊(duì)列返回。

2:更新broker的可用性

(根據(jù)發(fā)送延時(shí)換算可用性延時(shí)):updateFaultItem#updateFaultItem

/**
 * 如果 isolation為 true,則使用 30s作為 computeNotAvailableDuration方法的參數(shù);
 * 如果 isolation為 false,則使用本次消息發(fā)送時(shí)延作為 computeNotAvailableDuration方法的參數(shù),
 * 那 computeNotAvailableDuration 的作用 是 計(jì)算因本次消息發(fā)送故障需要 將 Broker 規(guī)避的時(shí)長,
 * @param brokerName
 * @param currentLatency
 * @param isolation =true 表示10分鐘后可用,參見MQFaultStrategy的數(shù)組元數(shù)據(jù)
 */
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i]; // 計(jì)算可用性延時(shí)
    }

    return 0;
}

備注:

此處邏輯相對(duì)簡單、小知識(shí)點(diǎn):記?。篿solation=true的特殊情況

3:更新broker可用性延時(shí)

LatencyFaultToleranceImpl#faultItemTable

/**
 * 根據(jù) broker名稱從 緩存表中獲 取 Faultitem,如果找到則更新 Faultltem,否則創(chuàng) 建 Faultltem。
 * 1) currentLatency、 startTimeStamp被volatile修飾。
 * 2) startTimeStamp 為當(dāng)前系統(tǒng)時(shí)間加上需要規(guī)避的時(shí)長 。startTimeStamp 是 判斷 broker當(dāng)前是否可用的直接一句,請(qǐng)看 Faultltem#isAvailable方法。
 * @param name brokerName
 * @param currentLatency 消息發(fā)送故障延遲時(shí)間 。
 * @param notAvailableDuration 不可用持續(xù)時(shí)辰, 在這個(gè)時(shí)間內(nèi), Broker 將被規(guī)避 。
 */
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);//計(jì)算新的可用性延時(shí)
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); //計(jì)算新的可用性延時(shí)
    }
}

備注:

此處邏輯就是更新計(jì)算FaultItem的StartTimestamp

四、結(jié)論

  1. 通過本文分析我們已經(jīng)清楚消息發(fā)送流程隊(duì)列選擇的三種方式,由于發(fā)送消息流程過程中不能動(dòng)態(tài)切換此三種方式,故每種選擇隊(duì)列方式建議根據(jù)實(shí)際情況進(jìn)行選擇使用;

  2. 至此Producer的核心流程源碼已經(jīng)分析完、建議有興趣可以回顧歷史文章。


程序員的核心競爭力其實(shí)還是技術(shù),因此對(duì)技術(shù)還是要不斷的學(xué)習(xí),關(guān)注 “IT巔峰技術(shù)” 公眾號(hào) ,該公眾號(hào)內(nèi)容定位:中高級(jí)開發(fā)、架構(gòu)師、中層管理人員等中高端崗位服務(wù)的,除了技術(shù)交流外還有很多架構(gòu)思想和實(shí)戰(zhàn)案例。

作者是 《 消息中間件 RocketMQ 技術(shù)內(nèi)幕》 一書作者,同時(shí)也是 “RocketMQ 上海社區(qū)”聯(lián)合創(chuàng)始人,曾就職于拼多多、德邦等公司,現(xiàn)任上市快遞公司架構(gòu)負(fù)責(zé)人,主要負(fù)責(zé)開發(fā)框架的搭建、中間件相關(guān)技術(shù)的二次開發(fā)和運(yùn)維管理、混合云及基礎(chǔ)服務(wù)平臺(tái)的建設(shè)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容