RocketMQ Read/Write Separation Mechanism

Introduction

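  • RocketMQ has its own read/write separation logic: the message backlog on the master decides whether a consumer should pull messages from a slave.

  • When a Consumer sends a pull request to a Broker, it decides, for the selected message queue, whether to pull from the Master or from a Slave. The default is the Master.

  • When the Broker receives a pull request, it fetches the messages locally and then checks whether the backlog exceeds a configured percentage of physical memory. If it does, the Broker marks the next pull to go to a Slave, determines the Slave's broker ID, and returns it in the response.

  • After receiving the Broker's response, the Consumer associates the message queue with the broker ID suggested for the next pull and caches it in memory, so that the next pull knows which node to send the request to.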


Broker

public class GetMessageResult {

    private final List<SelectMappedBufferResult> messageMapedList =
        new ArrayList<SelectMappedBufferResult>(100);
    private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
    private GetMessageStatus status;
    private long nextBeginOffset;
    private long minOffset;
    private long maxOffset;
    private int bufferTotalSize = 0;
    // Whether the consumer is advised to pull messages from a slave
    private boolean suggestPullingFromSlave = false;
    private int msgCount4Commercial = 0;
}

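// When the message backlog grows too large, reads are switched to a slave.
// maxOffsetPy is the current maximum physical offset; maxPhyOffsetPulling is the maximum physical
// offset reached by this pull. Their difference represents the message backlog.
// TOTAL_PHYSICAL_MEMORY_SIZE is the physical memory of the host; accessMessageInMemoryMaxRatio defaults to 40.
// The logic below checks whether the backlog exceeds 40% of physical memory and, if so,
// sets suggestPullingFromSlave to true.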

long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
  • The flag that decides whether a consumer should pull from a slave is stored in the GetMessageResult class.

  • suggestPullingFromSlave defaults to false, so by default consumers do not pull from a slave. The Broker recomputes the value each time it receives and handles a consumer's pull request.

  • When the backlog behind the queue being pulled grows too large, reads are switched to a slave. maxOffsetPy is the current maximum physical offset and maxPhyOffsetPulling is the maximum physical offset reached by this pull; their difference represents the backlog. Once the backlog exceeds 40% of physical memory, the Broker suggests pulling from a slave.
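
To make the threshold concrete, here is a minimal, self-contained sketch of the same comparison with the inputs passed in as parameters. The class name SlaveSuggestionSketch and the 16 GiB memory size are assumptions for illustration only; the arithmetic follows the excerpt above.

```java
// Illustrative sketch only: SlaveSuggestionSketch is a hypothetical name,
// not a RocketMQ class. The arithmetic mirrors the broker excerpt above.
final class SlaveSuggestionSketch {

    /** Returns true when the backlog exceeds ratioPercent of physical memory. */
    static boolean suggestPullingFromSlave(long maxOffsetPy,
                                           long maxPhyOffsetPulling,
                                           long totalPhysicalMemoryBytes,
                                           int ratioPercent) {
        long diff = maxOffsetPy - maxPhyOffsetPulling;                         // backlog in bytes
        long memory = (long) (totalPhysicalMemoryBytes * (ratioPercent / 100.0)); // threshold in bytes
        return diff > memory;
    }

    public static void main(String[] args) {
        long gib = 1024L * 1024 * 1024;
        long physical = 16 * gib; // assumed host memory; 40% of it is 6.4 GiB

        // Backlog of 5 GiB is below the 6.4 GiB threshold
        System.out.println(suggestPullingFromSlave(10 * gib, 5 * gib, physical, 40)); // false
        // Backlog of 7 GiB is above the 6.4 GiB threshold
        System.out.println(suggestPullingFromSlave(10 * gib, 3 * gib, physical, 40)); // true
    }
}
```

Dividing the ratio by 100.0 rather than 100 matters here: with integer division, a ratio of 40 would become 0 and every non-empty backlog would trigger the suggestion.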


public class PullMessageResponseHeader implements CommandCustomHeader {
    // suggestWhichBrokerId indicates which broker the next pull should go to
    private Long suggestWhichBrokerId;
    private Long nextBeginOffset;
    private Long minOffset;
    private Long maxOffset;
}


public class PullMessageProcessor implements NettyRequestProcessor {

    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

        response.setOpaque(request.getOpaque());

        final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

        if (getMessageResult != null) {
            response.setRemark(getMessageResult.getStatus().name());
            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
            responseHeader.setMinOffset(getMessageResult.getMinOffset());
            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

            // The store suggests consuming from a slave
            if (getMessageResult.isSuggestPullingFromSlave()) {
                // Pull from the slave
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            } else {
                // Pull from the master
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    break;
                case SLAVE:
                    // On a SLAVE, check whether it is readable; if not, redirect reads to the MASTER
                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                    }
                    break;
            }

            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                // consume too slow ,redirect to another machine
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                }
                // consume ok
                else {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                }
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }
        }

        return response;
    }
}
  • The suggestWhichBrokerId field of PullMessageResponseHeader tells the consumer which brokerId to pull a given MessageQueue's messages from.
  • When slave reads are disabled, the suggestion is set to MASTER_ID.
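
In the excerpt above, the final if/else on isSlaveReadEnable() overwrites the earlier assignments, so the net effect on suggestWhichBrokerId can be condensed into a small pure function. The sketch below is a simplification under that reading: BrokerSuggestionSketch and its parameter names are hypothetical, the broker IDs 0 and 1 are example values, and the PULL_RETRY_IMMEDIATELY response code set on a non-readable slave is not modeled.

```java
// Illustrative sketch only: BrokerSuggestionSketch is a hypothetical name.
// It condenses the net effect of the processRequest excerpt on suggestWhichBrokerId.
final class BrokerSuggestionSketch {

    static final long MASTER_ID = 0L; // same value as MixAll.MASTER_ID

    static long suggestWhichBrokerId(boolean slaveReadEnable,
                                     boolean suggestPullingFromSlave,
                                     long groupBrokerId,
                                     long whichBrokerWhenConsumeSlowly) {
        if (!slaveReadEnable) {
            // Slave reads disabled: always direct the consumer to the master
            return MASTER_ID;
        }
        // Slave reads enabled: redirect only when the store reports a large backlog
        return suggestPullingFromSlave ? whichBrokerWhenConsumeSlowly : groupBrokerId;
    }

    public static void main(String[] args) {
        // Example values: the group normally reads from broker 0 and falls back to broker 1
        System.out.println(suggestWhichBrokerId(true, true, 0L, 1L));  // 1
        System.out.println(suggestWhichBrokerId(true, false, 0L, 1L)); // 0
        System.out.println(suggestWhichBrokerId(false, true, 0L, 1L)); // 0
    }
}
```

One consequence of this reading is that a large backlog alone is not enough: unless slaveReadEnable is turned on in the broker configuration, the consumer keeps being pointed at the master.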


Consumer

public class PullAPIWrapper {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String consumerGroup;
    private final boolean unitMode;
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    private volatile boolean connectBrokerByUser = false;
    private volatile long defaultBrokerId = MixAll.MASTER_ID;
    private Random random = new Random(System.currentTimeMillis());
    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        // Record which brokerId this MessageQueue should pull from next
        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());

        // Unrelated code omitted

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }

    public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
        // Store the suggestion in pullFromWhichNodeTable
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (null == suggest) {
            this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
        } else {
            suggest.set(brokerId);
        }
    }
}
  • After the Consumer receives the pull response, it caches the brokerId suggested for the next pull.


public class PullAPIWrapper {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String consumerGroup;
    private final boolean unitMode;
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    private volatile boolean connectBrokerByUser = false;
    private volatile long defaultBrokerId = MixAll.MASTER_ID;
    private Random random = new Random(System.currentTimeMillis());
    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

    public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        // Find which node under this brokerName the MessageQueue should pull from
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);

        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }

        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }

            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }

            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }


    public long recalculatePullFromWhichNode(final MessageQueue mq) {
        if (this.isConnectBrokerByUser()) {
            return this.defaultBrokerId;
        }

        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (suggest != null) {
            return suggest.get();
        }

        return MixAll.MASTER_ID;
    }
}
  • When pulling messages, the Consumer looks up the brokerId in pullFromWhichNodeTable to determine which broker to send the request to.
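
The consumer-side round trip (cache the suggestion, then read it back on the next pull) can be tried in isolation with the sketch below. It is a stand-in rather than the real client: PullNodeCacheSketch is a hypothetical class, a plain String key replaces MessageQueue, and the connectBrokerByUser branch is left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: PullNodeCacheSketch is a hypothetical name, and a
// String key stands in for MessageQueue so the example has no dependencies.
final class PullNodeCacheSketch {

    static final long MASTER_ID = 0L; // same value as MixAll.MASTER_ID

    private final ConcurrentMap<String, AtomicLong> pullFromWhichNodeTable =
        new ConcurrentHashMap<>(32);

    /** Caches the broker ID suggested by the last pull response for this queue. */
    void updatePullFromWhichNode(String queueKey, long brokerId) {
        AtomicLong suggest = pullFromWhichNodeTable.get(queueKey);
        if (suggest == null) {
            pullFromWhichNodeTable.put(queueKey, new AtomicLong(brokerId));
        } else {
            suggest.set(brokerId);
        }
    }

    /** Returns the cached suggestion, or the master when nothing is cached yet. */
    long recalculatePullFromWhichNode(String queueKey) {
        AtomicLong suggest = pullFromWhichNodeTable.get(queueKey);
        return suggest != null ? suggest.get() : MASTER_ID;
    }

    public static void main(String[] args) {
        PullNodeCacheSketch cache = new PullNodeCacheSketch();
        String queue = "TopicA@broker-a@0"; // made-up queue key

        System.out.println(cache.recalculatePullFromWhichNode(queue)); // 0 (no suggestion yet)
        cache.updatePullFromWhichNode(queue, 1L);                      // broker suggested a slave
        System.out.println(cache.recalculatePullFromWhichNode(queue)); // 1
        cache.updatePullFromWhichNode(queue, MASTER_ID);               // backlog cleared
        System.out.println(cache.recalculatePullFromWhichNode(queue)); // 0
    }
}
```

Because the suggestion is refreshed on every pull response, a consumer that was redirected to a slave returns to the master as soon as the broker stops reporting a large backlog.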