目錄:
- 前言
- quickStart
- 單刀直入
- Remoting 模塊發(fā)送消息實(shí)現(xiàn)
- 如何處理返回值
- Broker Server 處理消息流程
前言
RocketMQ 目前在國內(nèi)應(yīng)該是比較流行的 MQ 了,樓主目前也在使用中,今天借著本文,理理 RocketMQ 發(fā)送一條消息到存儲(chǔ)一條消息的過程。
注意:本文主線是發(fā)送到存儲(chǔ),因此,閱讀源碼時(shí),其他和這條線相關(guān)度不高的代碼,會(huì)酌情閱讀。另外,本文的目的是為了看清一條消息是如何被發(fā)出且被存儲(chǔ)的,代碼中,關(guān)于 MQ 文件系統(tǒng)的優(yōu)化,設(shè)計(jì)等,并不會(huì)花很多篇幅介紹。
quickStart
來自官方源碼 example 的一段發(fā)送代碼:
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
單刀直入
我們直接看看 send 方法。
send 方法會(huì)設(shè)置一個(gè)默認(rèn)的 timeout, 3 秒。
默認(rèn)使用 SYNC 模式,另外有 Async 和 OneWay 模式。
我們需要處理方法簽名中的 Client 端的異常,網(wǎng)絡(luò)異常,Broker 端的異常,線程中斷異常。
DefaultMQProducerImpl # sendDefaultImpl 方法就是發(fā)送的主要邏輯。

這端代碼里,有個(gè)有趣的地方,可以提一下,關(guān)于更新故障時(shí)間的策略,RMQ 有一個(gè)類 MQFaultStrategy,用來處理 MQ 錯(cuò)誤,然后對(duì) MQ Server 進(jìn)行服務(wù)降級(jí)。
對(duì)照?qǐng)D:

這個(gè)策略具體內(nèi)容:如果發(fā)送一條消息在 550 毫秒以內(nèi),那么就不用降級(jí),如果550 毫秒以外,就進(jìn)行容錯(cuò)降級(jí)(熔斷)30 秒,以此類推。
再看 DefaultMQProducerImpl # sendKernelImpl 發(fā)送到內(nèi)核的方法實(shí)現(xiàn)。
先找到 broker 的地址。嘗試壓縮大于 4M 的消息(批量消息不壓縮)。執(zhí)行各種鉤子。構(gòu)造 Request 對(duì)象(存放數(shù)據(jù)),Context 上下文對(duì)象(存放調(diào)用上下文)。
這里會(huì)設(shè)置一個(gè)消息生成時(shí)間,即 bornTimestamp。后面使用消息軌跡的時(shí)候,可以查看。
最后,如果是 SYNC 模式,就調(diào)用 MQClientAPIImpl 來發(fā)送消息,這一層還是在 Client 模塊里,在這一層,會(huì)設(shè)置更詳細(xì)的消息細(xì)節(jié),構(gòu)造命令對(duì)象。最后調(diào)用 remotingClient # invokeSync 發(fā)送消息。
注意,在 MQClientAPIImpl # sendMessage 這一層,會(huì)給命令對(duì)象設(shè)置一個(gè) CmdCode,叫 SEND_MESSAGE,這個(gè)東西就是一個(gè)和 Broker 的契約,Broker 會(huì)根據(jù)這個(gè) Code 進(jìn)行不同的策略。另外,如果這里用 RPC 的方式,例如,使用一個(gè)接口的抽象方法,然后 Broker 對(duì)抽象方法進(jìn)行 RPC 調(diào)用,這樣可不可以呢?
最后,看看 remotingClient # invokeSync 是如何實(shí)現(xiàn)的。
Remoting 模塊發(fā)送消息實(shí)現(xiàn)
invokeSync 方法首先執(zhí)行 RPCBefore 鉤子,類似 Spring 的各種 Bean 擴(kuò)展組件,然后就是對(duì)超時(shí)進(jìn)行判斷??梢钥吹?,每個(gè)方法幾乎都有對(duì)超時(shí)的判斷,超時(shí)判斷和超時(shí)處理在分布式場(chǎng)景非常重要。
然后根據(jù) addr 找到對(duì)應(yīng)的 Socket Channel。然后執(zhí)行 invokeSyncImpl 方法。
這里其實(shí)和其他大部分的 RPC 框架都是類似的了,生產(chǎn)一個(gè)永遠(yuǎn)自增的 Request ID,創(chuàng)建一個(gè) Feature 對(duì)象,和這個(gè) ID 綁定,方便 Netty 返回?cái)?shù)據(jù)對(duì)這個(gè) ID 對(duì)應(yīng)的線程進(jìn)行喚醒。
然后調(diào)用 Netty 的 writeAndFlush 方法,將數(shù)據(jù)寫進(jìn) Socket,同時(shí)添加一個(gè)監(jiān)聽器,如果發(fā)送失敗,喚醒當(dāng)前線程。
發(fā)送完畢之后,當(dāng)前線程進(jìn)行等待,使用 CountDownLatch.wait 方法實(shí)現(xiàn),當(dāng) Netty 返回?cái)?shù)據(jù)時(shí),使用 CountDownLatch.countDown 進(jìn)行喚醒,然后返回從 Broker 寫入的結(jié)果,可能成功,也可能失敗,需要到上層(Client 層)解析,網(wǎng)絡(luò)層只負(fù)責(zé)網(wǎng)絡(luò)的事情。
我們知道, Netty 會(huì)使用 Handler 處理出去的數(shù)據(jù)和返回的數(shù)據(jù),我們看看 Client 端 Netty 有哪些 Handler.
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
我們看到,這里使用了一個(gè) Encoder,Decoder,空閑處理器,連接管理器,ClientHandler。
XXCoder 就是對(duì) Cmd 對(duì)象進(jìn)行序列化和反序列化的。這里的空閑使用的讀寫最大空閑時(shí)間為 120s,超過這個(gè),就會(huì)觸發(fā)空閑事件。RMQ 就會(huì)關(guān)閉 Channel 連接。而針對(duì)空閑事件進(jìn)行處理的就是連接管理器了。
連接管理器處理空閑、Close、Connect、異常等事件,使用監(jiān)聽器模式,不同的監(jiān)聽器對(duì)不同的事件進(jìn)行處理。另外,這里也許可以借鑒 EventBus,每個(gè)事件可以設(shè)置多個(gè)監(jiān)聽器。
如何處理返回值
我們看了 RMQ 中 Netty 的設(shè)計(jì),再看看返回值處理就簡單了,NettyClientHandler 會(huì)在 channelRead0 方法處理 Netty Server 的返回值。對(duì)應(yīng) RMQ,則是 processMessageReceived 方法。該方法很簡潔:
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
其實(shí),這是一個(gè)模板方法,固定算法,由子類實(shí)現(xiàn),分為 Request 實(shí)現(xiàn)和 Response 實(shí)現(xiàn)。我們看看 Response 實(shí)現(xiàn)。
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
// 找到 Response .
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {// 返回結(jié)果
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
這里,通過 cmd 對(duì)象的 Request ID 找到 Feature,執(zhí)行 responseFuture.putResponse,設(shè)置返回值,喚醒阻塞等待的發(fā)送線程。這里還有一個(gè) release 調(diào)用,這個(gè)和異步發(fā)送有關(guān),默認(rèn)最大同時(shí) 65535 個(gè)異步請(qǐng)求,具體就不展開了。
好,到這里,喚醒阻塞的發(fā)送線程,返回?cái)?shù)據(jù),客戶端層面的發(fā)送就結(jié)束了,我們小結(jié)一下。根據(jù)模塊層次,我們記錄一下 sendMessage 的過程:

層次還是比較清晰的。
我們?cè)賮砜纯?Server 端如何處理一條消息的。
Broker Server 處理消息流程
從哪里入手呢?
我們上面看源碼,看到有個(gè) SEND_MESSAGE Code,是 Client 和 Broker Server 的一個(gè)約定代碼,我們看看這個(gè)代碼在哪里用的。
在 broker 模塊的 BrokerController 類中,有個(gè) registerProcessor 方法,會(huì)將 SEND_MESSAGE Code 和一個(gè) SendMessageProcessor 對(duì)象綁定。
這一步我們停一下,再去看看 netty Server 端的 Handler。
NettyRemotingServer 是處理 Request 的類,他的 ServerBootstrap 會(huì)在 pipeline 中添加一個(gè) NettyServerHandler 處理器,這個(gè)處理器的 channelRead0 方法會(huì)調(diào)用 NettyRemotingServer 的父類 processMessageReceived 方法。
這個(gè)方法會(huì)從 processorTable 里,根據(jù) Cmd Code,也就是 SEND_MESSAGE 獲取對(duì)應(yīng)的 Processor, Processor 由 2 部分組成,一部分是處理數(shù)據(jù)的對(duì)象,一部分是這個(gè)對(duì)象所對(duì)應(yīng)的線程池。用于異步處理邏輯,防止阻塞 Netty IO 線程。
關(guān)鍵代碼:
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);// 處理.
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
前后都是執(zhí)行一些鉤子,例如 ACL 啥的。
這里我們小結(jié)一下,RMQ 會(huì)有一個(gè) BrokerController 類,會(huì)注冊(cè) Code 和 Processor 的綁定關(guān)系,BrokerController 也會(huì)把這些綁定,注冊(cè)到 Netty Server 中,當(dāng) Netty Server 從 Socket 收到 Cmd 對(duì)象,根據(jù) Cmd 對(duì)象的 Code,就可以找到對(duì)應(yīng) Processor 類,對(duì)數(shù)據(jù)進(jìn)行處理。
中間是處理 Request 請(qǐng)求的。這個(gè) processRequest 方法,有很多的實(shí)現(xiàn),如下圖,我們主要看 SendMessageProcessor 的實(shí)現(xiàn)。

SendMessageProcessor # sendMessage 是處理消息的主要邏輯。
關(guān)鍵代碼:
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
消息存儲(chǔ)引擎,這里我們看 DefaultMessageStore 的 putMessage 實(shí)現(xiàn)。
首先一堆校驗(yàn)。注意,其中有一個(gè)地方:
if (this.isOSPageCacheBusy()) {// 檢查 mmp 忙不忙.
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
由于 RMQ 寫數(shù)據(jù)是王 PageCache 里面寫的,因此,如果寫的慢,就是 PageCache 忙,這里忙的標(biāo)準(zhǔn)是,如果鎖文件的時(shí)間,超過了 1 秒,那就是忙。
最后調(diào)用 PutMessageResult result = this.commitLog.putMessage(msg) 寫數(shù)據(jù)。
如果耗時(shí)超過 500 毫秒,就會(huì)打印日志。這樣我們排查問題的時(shí)候,可以看看 storeStats 的日志。
看看 commitLog 的 putMessage 方法實(shí)現(xiàn)。
先拿到最新的 MappedFile 文件,MappedFile 文件的命名是用 offset 命名的,一個(gè)文件默認(rèn) 1gb,這個(gè)大小和 mmp 的機(jī)制有關(guān),通常不能過大。
然后上鎖,這段代碼是可以說整個(gè) RMQ Server 的熱點(diǎn)區(qū)域,
這里上鎖會(huì)記錄上鎖的時(shí)間,方便前面做 PageCache Busy 的判斷。
寫入代碼:
result = mappedFile.appendMessage(msg, this.appendMessageCallback)
寫完之后,釋放鎖,如果超過 500 毫秒,打印 cost time 日志。
統(tǒng)計(jì)。
處理刷盤和slave 同步,這里看刷盤策略和同步策略,是 SYNC 還是 ASYNC。
經(jīng)過我的測(cè)試,同步刷盤和異步刷盤的性能差距是 10 倍。
而 Slave 的數(shù)據(jù)同步,如果用 SYNC 模式,tps 最高也就 2000 多一丟度,為什么?內(nèi)網(wǎng),兩臺(tái)機(jī)器 ping 一下都要 0.2 毫秒,一秒最多 5000 次,再加上處理邏輯, 2000 已經(jīng)到頂了,網(wǎng)絡(luò)成了瓶頸。
如果用全異步的話,我的 4c8g 的機(jī)器,單機(jī) tps 最高能 2 萬多。美滋滋。
跑題了。
我們看看 mappedFile.appendMessage 方法的實(shí)現(xiàn)。
一路追蹤,有個(gè)關(guān)鍵邏輯, 在 appendMessagesInner 里:
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner) {
// 寫數(shù)據(jù)到 緩存
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
代碼中,使用了 mappedFile 從 Linux 映射的 MMap buffer,對(duì)數(shù)據(jù)進(jìn)行寫入。我們看看 doAppend 方法。
我們可以看到,一條消息有太多的內(nèi)容:
總長度、魔數(shù)、CRC 校驗(yàn)、隊(duì)列 ID、各種 flag、存儲(chǔ)時(shí)間,物理 offset、存儲(chǔ) IP、時(shí)間戳、擴(kuò)展屬性等等。
最終,這條消息會(huì)被寫入到 MMap 中。
那什么時(shí)候刷盤呢?
如果是 SYNC 模式,執(zhí)行 CommitLog 的 handleDiskFlush 的方法時(shí),就會(huì)立刻刷盤并等待刷盤結(jié)果。
如果是 ASYNC 模式,執(zhí)行 CommitLog 的 handleDiskFlush 的方法時(shí),會(huì)通知異步線程進(jìn)行刷盤,但不等待結(jié)果。
另外,如果沒有新數(shù)據(jù),則為 500ms 執(zhí)行一次刷盤策略。
簡單說下異步刷盤:
- 默認(rèn)刷盤 4 頁,Linux 一頁是 4kb 數(shù)據(jù),4頁就是 16kb。
- 如果寫的數(shù)據(jù)減去已經(jīng)刷的數(shù)據(jù),剩下的數(shù)據(jù)大于等于 4 頁,就執(zhí)行刷盤.
- 執(zhí)行 mappedByteBuffer.force() 或者 fileChannel.force(false);
我們這里小結(jié)一下,看看 RMQ Server 處理處理一條消息的:

總結(jié)
來張大圖總結(jié)一下。

篇幅有限,下篇再一起看看 RMQ 如何消費(fèi)消息。