Spark通信框架Spark Network Common

Spark Network 模塊分析

為什么用Netty通信框架代替Akka

一直以來,基于Akka實(shí)現(xiàn)的RPC通信框架是Spark引以為豪的主要特性,也是與Hadoop等分布式計(jì)算框架對比過程中一大亮點(diǎn),但是時(shí)代和技術(shù)都在演化,從Spark1.3.1版本開始,為了解決大塊數(shù)據(jù)(如Shuffle)的傳輸問題,Spark引入了Netty通信框架,到了1.6.0版本,Netty完全取代了Akka,承擔(dān)Spark內(nèi)部所有的RPC通信以及數(shù)據(jù)流傳輸。

JAVA IO也經(jīng)歷了幾次演化,從最早的BIO(阻塞式/非阻塞IO),到1.4版本的NIO(IO復(fù)用),到1.7版本的NIO2.0/AIO(異步IO)。
基于早期BIO來實(shí)現(xiàn)高并發(fā)網(wǎng)絡(luò)服務(wù)器都是依賴多線程來實(shí)現(xiàn),但是線程開銷較大,BIO的瓶頸明顯,NIO的出現(xiàn)解決了這一大難題,基于IO復(fù)用解決了IO高并發(fā)
但是NIO有也有幾個(gè)缺點(diǎn):

  • API可用性較低(拿ByteBuffer來說,共用一個(gè)curent指針,讀寫切換需要進(jìn)行flip和rewind,相當(dāng)麻煩)
  • 僅僅是API,如果想在NIO上實(shí)現(xiàn)一個(gè)網(wǎng)絡(luò)模型,還需要自己寫很多比如線程池,解碼,半包/粘包,限流等邏輯
  • 著名的NIO-Epoll死循環(huán)的BUG

因?yàn)檫@幾個(gè)原因,促使了很多JAVA-IO通信框架的出現(xiàn),Netty就是其中一員,它也因?yàn)楦叨鹊姆€(wěn)定性,功能性,性能等特性,成為Java開發(fā)的首選

那么Netty和JDK-NIO之間到底是什么關(guān)系?

首先是NIO的上層封裝,Netty提供了NioEventLoopGroup / NioSocketChannel / NioServerSocketChannel的組合來完成實(shí)際IO操作,繼而在此之上實(shí)現(xiàn)數(shù)據(jù)流Pipeline以及EventLoop線程池等功能。

另外它又重寫了NIO,JDK-NIO底層是基于Epoll的LT模式來實(shí)現(xiàn),而Netty是基于Epoll的ET模式實(shí)現(xiàn)的一組IO操作EpollEventLoopGroup / EpollSocketChannel / EpollServerSocketChannel
Netty對兩種實(shí)現(xiàn)進(jìn)行完美的封裝,可以根據(jù)業(yè)務(wù)的需求來選擇不同的實(shí)現(xiàn)

Epoll的ET和LT模式真的有很大的性能差別嗎?單從Epoll的角度來看,ET肯定是比LT要性能好那么一點(diǎn)。如果為了編碼簡潔性,LT還是首選,ET如果用戶層邏輯實(shí)現(xiàn)不夠優(yōu)美,相比ET還會帶來更大大性能開銷

那么Akka又是什么?

從Akka出現(xiàn)背景來說,它是基于Actor的RPC通信系統(tǒng),它的核心概念也是Message,它是基于協(xié)程的,性能不容置疑;基于scala的偏函數(shù),易用性也沒有話說,但是它畢竟只是RPC通信,無法適用大的package/stream的數(shù)據(jù)傳輸,這也是Spark早期引入Netty的原因。

那么Netty為什么可以取代Akka?

首先不容置疑的是Akka可以做到的,Netty也可以做到,但是Netty可以做到,Akka卻無法做到。原因是啥?在軟件棧中,Akka相比Netty要Higher一點(diǎn),它專門針對RPC做了很多事情,而Netty相比更加基礎(chǔ)一點(diǎn),可以為不同的應(yīng)用層通信協(xié)議(RPC,F(xiàn)TP,HTTP等)提供支持,在早期的Akka版本,底層的NIO通信就是用的Netty。

其次一個(gè)優(yōu)雅的工程師是不會允許一個(gè)系統(tǒng)中容納兩套通信框架!最后,雖然Netty沒有Akka協(xié)程級的性能優(yōu)勢,但是Netty內(nèi)部高效的Reactor線程模型,無鎖化的串行設(shè)計(jì),高效的序列化,零拷貝,內(nèi)存池等特性也保證了Netty不會存在性能問題。
那么Spark是怎么用Netty來取代Akka呢?一句話,利用偏函數(shù)的特性,基于Netty“仿造”出一個(gè)簡約版本的Actor模型。

Spark Network Common的實(shí)現(xiàn)

Byte的表示

對于Network通信,不管傳輸?shù)氖切蛄谢蟮膶ο筮€是文件,在網(wǎng)絡(luò)上表現(xiàn)的都是字節(jié)流。在傳統(tǒng)IO中,字節(jié)流表示為Stream;在NIO中,字節(jié)流表示為ByteBuffer;在Netty中字節(jié)流表示為ByteBuff或FileRegion;在Spark中,針對Byte也做了一層包裝,支持對Byte和文件流進(jìn)行處理,即ManagedBuffer;
ManagedBuffer包含了三個(gè)函數(shù)createInputStream(),nioByteBuffer(),convertToNetty()來對Buffer進(jìn)行“類型轉(zhuǎn)換”,分別獲取stream,ByteBuffer,ByteBuff或FileRegion;NioManagedBuffer / NettyManagedBuffer / FileSegmentManagedBuffer也是針對性提供了具體的實(shí)現(xiàn)。

更好的理解ManagedBuffer:比如Shuffle BlockManager模塊需要在內(nèi)存中維護(hù)本地executor生成的shuffle-map輸出的文件引用,從而可以提供給shuffleFetch進(jìn)行遠(yuǎn)程讀取,此時(shí)文件表示為FileSegmentManagedBuffer,shuffleFetch遠(yuǎn)程調(diào)用FileSegmentManagedBuffer.nioByteBuffer / createInputStream函數(shù)從文件中讀取為Bytes,并進(jìn)行后面的網(wǎng)絡(luò)傳輸。如果已經(jīng)在內(nèi)存中bytes就更好理解了,比如將一個(gè)字符數(shù)組表示為NettyManagedBuffer。

Protocol的表示

協(xié)議是應(yīng)用層通信的基礎(chǔ),它提供了應(yīng)用層通信的數(shù)據(jù)表示,以及編碼和解碼的能力。在Spark Network Common中,繼承AKKA中的定義,將協(xié)議命名為Message,它繼承Encodable,提供了encode的能力。

Message根據(jù)請求響應(yīng)可以劃分為RequestMessage和ResponseMessage兩種;對于Response,根據(jù)處理結(jié)果,可以劃分為Failure和Success兩種類型;根據(jù)功能的不同,主要劃分為Stream,ChunkFetch,Rpc。

  • Stream消息就是上面提到的ManagedBuffer中的Stream流,在Spark內(nèi)部,比如SparkContext.addFile操作會在Driver中針對每一個(gè)add進(jìn)來的file / jar會分配唯一的StreamID(file / [filename],jars / [filename]);worker通過該StreamID向Driver發(fā)起一個(gè)StreamRequest的請求,Driver將文件轉(zhuǎn)換為FileSegmentManagedBuffer返回給Worker,這就是StreamMessage的用途之一;

  • ChunkFetch也有一個(gè)類似Stream的概念,ChunkFetch的對象是“一個(gè)內(nèi)存中的Iterator[ManagedBuffer]”,即一組Buffer,每一個(gè)Buffer對應(yīng)一個(gè)chunkIndex,整個(gè)Iterator[ManagedBuffer]由一個(gè)StreamID標(biāo)識。Client每次的ChunkFetch請求是由(streamId,chunkIndex)組成的唯一的StreamChunkId,Server端根據(jù)StreamChunkId獲取為一個(gè)Buffer并返回給Client; 不管是Stream還是ChunkFetch,在Server的內(nèi)存中都需要管理一組由StreamID與資源之間映射,即StreamManager類,它提供了getChunk和openStream兩個(gè)接口來分別響應(yīng)ChunkFetch與Stream兩種操作,并且針對Server的ChunkFetch提供一個(gè)registerStream接口來注冊一組Buffer,比如可以將BlockManager中一組BlockID對應(yīng)的Iterator[ManagedBuffer]注冊到StreamManager,從而支持遠(yuǎn)程Block Fetch操作。

  • Case:對于ExternalShuffleService(一種單獨(dú)shuffle服務(wù)進(jìn)程,對其他計(jì)算節(jié)點(diǎn)提供本節(jié)點(diǎn)上面的所有shuffle map輸出),它為遠(yuǎn)程Executor提供了一種OpenBlocks的RPC接口,即根據(jù)請求的appid,executorid,blockid(appid+executor對應(yīng)本地一組目錄,blockid拆封出)從本地磁盤中加載一組FileSegmentManagedBuffer到內(nèi)存,并返回加載后的streamId返回給客戶端,從而支持后續(xù)的ChunkFetch的操作。
    RPC是第三種核心的Message,和Stream/ChunkFetch的Message不同,每次通信的Body是類型是確定的,在rpcHandler可以根據(jù)每種Body的類型進(jìn)行相應(yīng)的處理。 在Spark1.6.*版本中,也正式使用基于Netty的RPC框架來替代Akka。

Server的結(jié)構(gòu)

Server構(gòu)建在Netty之上,它提供兩種模型NIO和Epoll,可以通過參數(shù)(spark.[module].io.mode)進(jìn)行配置,最基礎(chǔ)的module就是shuffle,不同的IOMode選型,對應(yīng)了Netty底層不同的實(shí)現(xiàn),Server的Init過程中,最重要的步驟就是根據(jù)不同的IOModel完成EventLoop和Pipeline的構(gòu)造

EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
    switch (mode) {
    case NIO:
        return new NioEventLoopGroup(numThreads, threadFactory);
    case EPOLL:
        return new EpollEventLoopGroup(numThreads, threadFactory);
    }
}

public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
    switch(mode) {
        case NIO:
            return NioServerSocketChannel.class;
        case EPOLL:
            return EpollServerSocketChannel.class;
    }
}

Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
    switch (mode) {
        case NIO:
            return NioServerSocketChannel.class;
        case EPOLL:
            return EpollServerSocketChannel.class;
    }
}

channel.pipeline()
    .addLast("encoder", this.encoder)
    .addLast("frameDecoder", NettyUtils.createFrameDecoder())
    .addLast("decoder", this.decoder)
    .addLast("idleStateHandler", new IdleStateHandler(0, 0, this.conf.connectionTimeoutMs() / 1000))
    .addLast("handler", channelHandler);

其中,MessageEncoder/Decoder針對網(wǎng)絡(luò)包到Message的編碼和解碼,而最為核心就TransportRequestHandler,它封裝了對所有請求/響應(yīng)的處理;

TransportChannelHandler內(nèi)部實(shí)現(xiàn)也很簡單,它封裝了responseHandler和requestHandler,當(dāng)從Netty中讀取一條Message以后,根據(jù)判斷路由給相應(yīng)的responseHandler和requestHandler。

public void handle(RequestMessage request) {
    if (request instanceof ChunkFetchRequest) {
        this.processFetchRequest((ChunkFetchRequest)request);
    } else if (request instanceof RpcRequest) {
        this.processRpcRequest((RpcRequest)request);
    } else if (request instanceof OneWayMessage) {
        this.processOneWayMessage((OneWayMessage)request);
    } else {
        if (!(request instanceof StreamRequest)) {
            throw new IllegalArgumentException("Unknown request type: " + request);
        }    
        this.processStreamRequest((StreamRequest)request);
    }
}

public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
    if (request instanceof RequestMessage) {
        this.requestHandler.handle((RequestMessage)request);
    } else {
        this.responseHandler.handle((ResponseMessage)request);
    }
}

Sever提供的RPC,ChunkFecth,Stream的功能都是依賴TransportRequestHandler來實(shí)現(xiàn)的;從原理上來說,RPC與ChunkFecth / Stream還是有很大不同的,其中RPC對于TransportRequestHandler來說是功能依賴,而ChunkFecth / Stream對于TransportRequestHandler來說只是數(shù)據(jù)依賴。

怎么理解?即TransportRequestHandler已經(jīng)提供了ChunkFecth / Stream的實(shí)現(xiàn),只需要在構(gòu)造的時(shí)候,向TransportRequestHandler提供一個(gè)streamManager,告訴RequestHandler從哪里可以讀取到Chunk或者Stream。而RPC需要向TransportRequestHandler注冊一個(gè)rpcHandler,針對每個(gè)RPC接口進(jìn)行功能實(shí)現(xiàn),同時(shí)RPC與ChunkFecth / Stream都會有同一個(gè)streamManager的依賴,因此注入到TransportRequestHandler中的streamManager也是依賴rpcHandler來實(shí)現(xiàn),即rpcHandler中提供了RPC功能實(shí)現(xiàn)和streamManager的數(shù)據(jù)依賴。

Client的結(jié)構(gòu)

Server是通過監(jiān)聽一個(gè)端口,注入rpcHandler和streamManager從而對外提供RPC,ChunkFecth,Stream的服務(wù),而Client即為一個(gè)客戶端類,通過該類,可以將一個(gè)streamId / chunkIndex對應(yīng)的ChunkFetch請求,streamId對應(yīng)的Stream請求,以及一個(gè)RPC數(shù)據(jù)包對應(yīng)的RPC請求發(fā)送到服務(wù)端,并監(jiān)聽和處理來自服務(wù)端的響應(yīng);其中最重要的兩個(gè)類即為TransportClient和TransportResponseHandler分別為上述的“客戶端類”和“監(jiān)聽和處理來自服務(wù)端的響應(yīng)"。

那么TransportClient和TransportResponseHandler是怎么配合一起完成Client的工作呢?由TransportClient將用戶的RPC,ChunkFecth,Stream的請求進(jìn)行打包并發(fā)送到Server端,同時(shí)將用戶提供的回調(diào)函數(shù)注冊到TransportResponseHandler,TransportResponseHandler是TransportChannelHandler的一部分,在TransportChannelHandler接收到數(shù)據(jù)包,并判斷為響應(yīng)包以后,將包數(shù)據(jù)路由到TransportResponseHandler中,在TransportResponseHandler中通過注冊的回調(diào)函數(shù),將響應(yīng)包的數(shù)據(jù)返回給客戶端

Spark Network的功能應(yīng)用--BlockTransfer&&Shuffle

無論是BlockTransfer還是ShuffleFetch都需要跨executor的數(shù)據(jù)傳輸,在每一個(gè)executor里面都需要運(yùn)行一個(gè)Server線程(后面也會分析到,對于Shuffle也可能是一個(gè)獨(dú)立的ShuffleServer進(jìn)程存在)來提供對Block數(shù)據(jù)的遠(yuǎn)程讀寫服務(wù)

在每個(gè)Executor里面,都有一個(gè)BlockManager模塊,它提供了對當(dāng)前Executor所有的Block的“本地管理”,并對進(jìn)程內(nèi)其他模塊暴露getBlockData(blockId: BlockId): ManagedBuffer的Block讀取接口,但是這里GetBlockData僅僅是提供本地的管理功能,對于跨遠(yuǎn)程的Block傳輸,則由NettyBlockTransferService提供服務(wù)。

NettyBlockTransferService本身即是Server,為其他其他遠(yuǎn)程Executor提供Block的讀取功能,同時(shí)它即為Client,為本地其他模塊暴露fetchBlocks的接口,支持通過host/port拉取任何Executor上的一組的Blocks。
源碼位置 spark-core: org.apache.spark.network.netty

NettyBlockTransferService作為一個(gè)Server

NettyBlockTransferService作為一個(gè)Server,與Executor或Driver里面其他的服務(wù)一樣,在進(jìn)程啟動時(shí),由SparkEnv初始化構(gòu)造并啟動服務(wù),在整個(gè)運(yùn)行時(shí)的一部分。

SparkEnv.create

val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
val envInstance = new SparkEnv(... blockTransferService ...)

一個(gè)Server的構(gòu)造依賴RpcHandler提供RPC的功能注入以及提供streamManager的數(shù)據(jù)注入。對于NettyBlockTransferService,該RpcHandler即為NettyBlockRpcServer,在構(gòu)造的過程中,需要與本地的BlockManager進(jìn)行管理,從而支持對外提供本地BlockMananger中管理的數(shù)據(jù)

RpcHandler提供RPC的功能注入在這里還是屬于比較“簡陋的”,畢竟他是屬于數(shù)據(jù)傳輸模塊,Server中提供的chunkFetch和stream已經(jīng)足夠滿足他的功能需要,那現(xiàn)在問題就是怎么從streamManager中讀取數(shù)據(jù)來提供給chunkFetch和stream進(jìn)行使用呢?
就是NettyBlockRpcServer作為RpcHandler提供的一個(gè)Rpc接口之一:OpenBlocks,它接受由Client提供一個(gè)Blockids列表,Server根據(jù)該BlockIds從BlockManager獲取到相應(yīng)的數(shù)據(jù)并注冊到streamManager中,同時(shí)返回一個(gè)StreamID,后續(xù)Client即可以使用該StreamID發(fā)起ChunkFetch的操作。

message match {
    case openBlocks: OpenBlocks =>
        val blocks: Seq[ManagedBuffer] =
            openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
        val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
            logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
        responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)
}
NettyBlockTransferService作為一個(gè)Client

從NettyBlockTransferService作為一個(gè)Server,我們基本可以推測NettyBlockTransferService作為一個(gè)Client支持fetchBlocks的功能的基本方法:

  • Client將一組Blockid表示為一個(gè)openMessage請求,發(fā)送到服務(wù)端,服務(wù)針對該組Blockid返回一個(gè)唯一的streamId
  • Client針對該streamId發(fā)起size(blockids)個(gè)fetchChunk操作
override def uploadBlock(
    hostname: String,
    port: Int,
    execId: String,
    blockId: BlockId,
    blockData: ManagedBuffer,
    level: StorageLevel): Future[Unit] = {
        //發(fā)出openMessage請求
        client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
            streamHandle = (StreamHandle)response;//獲取streamId
            //針對streamid發(fā)出一組fetchChunk
            for (int i = 0; i < streamHandle.numChunks; i++) {
                client.fetchChunk(streamHandle.streamId, i, chunkCallback);
            }
        }
    })
    result.future
}

同時(shí),為了提高服務(wù)端穩(wěn)定性,針對fetchBlocks操作NettyBlockTransferService提供了非重試版本和重試版本的BlockFetcher,分別為OneForOneBlockFetcher和RetryingBlockFetcher,通過參數(shù)(spark.[module].io.maxRetries)進(jìn)行配置,默認(rèn)是重試3次

在Spark,Block有各種類型,可以是ShuffleBlock,也可以是BroadcastBlock等等,對于ShuffleBlock的Fetch,除了由Executor內(nèi)部的NettyBlockTransferService提供服務(wù)以外,也可以由外部的ShuffleService來充當(dāng)Server的功能,并由專門的ExternalShuffleClient來與其進(jìn)行交互,從而獲取到相應(yīng)Block數(shù)據(jù)。功能的原理和實(shí)現(xiàn),基本一致,但是問題來了,為什么需要一個(gè)專門的ShuffleService服務(wù)呢?主要原因還是為了做到任務(wù)隔離,即減輕因?yàn)閒etch帶來對Executor的壓力,讓其專心的進(jìn)行數(shù)據(jù)的計(jì)算。

其實(shí)外部的ShuffleService最終是來自Hadoop的AuxiliaryService概念,AuxiliaryService為計(jì)算節(jié)點(diǎn)NodeManager常駐的服務(wù)線程,早期的MapReduce是進(jìn)程級別的調(diào)度,ShuffleMap完成shuffle文件的輸出以后,即立即退出,在ShuffleReduce過程中由誰來提供文件的讀取服務(wù)呢?即AuxiliaryService,每一個(gè)ShuffleMap都會將自己在本地的輸出,注冊到AuxiliaryService,由AuxiliaryService提供本地?cái)?shù)據(jù)的清理以及外部讀取的功能。

在目前Spark中,也提供了這樣的一個(gè)AuxiliaryService:YarnShuffleService,但是對于Spark不是必須的,如果你考慮到需要“通過減輕因?yàn)閒etch帶來對Executor的壓力”,那么就可以嘗試嘗試。

同時(shí),如果啟用了外部的ShuffleService,對于shuffleClient也不是使用上面的NettyBlockTransferService,而是專門的ExternalShuffleClient,功能邏輯基本一致!

Spark Network的功能應(yīng)用--新的RPC框架

Akka的通信模型是基于Actor,一個(gè)Actor可以理解為一個(gè)Service服務(wù)對象,它可以針對相應(yīng)的RPC請求進(jìn)行處理,如下所示,定義了一個(gè)最為基本的Actor:

class HelloActor extends Actor {
    def receive = {
        case "hello" => println("world")
        case _       => println("huh?")
    }
}

Actor內(nèi)部只有唯一一個(gè)變量(當(dāng)然也可以理解為函數(shù)了),即Receive,它為一個(gè)偏函數(shù),通過case語句可以針對Any信息可以進(jìn)行相應(yīng)的處理,這里Any消息在實(shí)際項(xiàng)目中就是消息包。

另外一個(gè)很重要的概念就是ActorSystem,它是一個(gè)Actor的容器,多個(gè)Actor可以通過name->Actor的注冊到Actor中,在ActorSystem中可以根據(jù)請求不同將請求路由給相應(yīng)的Actor。ActorSystem和一組Actor構(gòu)成一個(gè)完整的Server端,此時(shí)客戶端通過host:port與ActorSystem建立連接,通過指定name就可以相應(yīng)的Actor進(jìn)行通信,這里客戶端就是ActorRef。所有Akka整個(gè)RPC通信系列是由Actor,ActorRef,ActorSystem組成。

Spark基于這個(gè)思想在上述的Network的基礎(chǔ)上實(shí)現(xiàn)一套自己的RPC Actor模型,從而取代Akka。其中RpcEndpoint對應(yīng)Actor,RpcEndpointRef對應(yīng)ActorRef,RpcEnv即對應(yīng)了ActorSystem。

private[spark] trait RpcEndpoint {
    def receive: PartialFunction[Any, Unit] = {
        case _ => throw new SparkException()
    }
    def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case _ => context.sendFailure(new SparkException())
    }
    //onStart(),onStop()
}

RpcEndpoint與Actor一樣,不同RPC Server可以根據(jù)業(yè)務(wù)需要指定相應(yīng)receive/receiveAndReply的實(shí)現(xiàn),在Spark內(nèi)部現(xiàn)在有N多個(gè)這樣的Actor,比如Executor就是一個(gè)Actor,它處理來自Driver的LaunchTask/KillTask等消息。

RpcEnv相對于ActorSystem:

  • 首先它作為一個(gè)Server,它通過NettyRpcHandler來提供了Server的服務(wù)能力
  • 其次它作為RpcEndpoint的容器,它提供了setupEndpoint(name,endpoint)接口,從而實(shí)現(xiàn)將一個(gè)RpcEndpoint以一個(gè)Name對應(yīng)關(guān)系注冊到容器中,從而通過Server對外提供Service
  • 最后它作為Client的適配器,它提供了setupEndpointRef/setupEndpointRefByURI接口,通過指定Server端的Host和PORT,并指定RpcEndpointName,從而獲取一個(gè)與指定Endpoint通信的引用。

RpcEndpointRef即為與相應(yīng)Endpoint通信的引用,它對外暴露了send/ask等接口,實(shí)現(xiàn)將一個(gè)Message發(fā)送到Endpoint中。

這就是新版本的RPC框架的基本功能,它的實(shí)現(xiàn)基本上與Akka無縫對接,業(yè)務(wù)的遷移的功能很小,目前基本上都全部遷移完了。

RpcEnv內(nèi)部實(shí)現(xiàn)原理

RpcEnv不僅從外部接口與Akka基本一致,在內(nèi)部的實(shí)現(xiàn)上,也基本差不多,都是按照MailBox的設(shè)計(jì)思路來實(shí)現(xiàn)的;

RpcEnv即充當(dāng)著Server,同時(shí)也為Client內(nèi)部實(shí)現(xiàn)。
當(dāng)作為Server,RpcEnv會初始化一個(gè)Server,并注冊NettyRpcHandler。RpcHandler的receive接口負(fù)責(zé)對每一個(gè)請求進(jìn)行處理,一般情況下,簡單業(yè)務(wù)可以在RpcHandler直接完成請求的處理,但是考慮一個(gè)RpcEnv的Server上會掛載了很多個(gè)RpcEndpoint,每個(gè)RpcEndpoint的RPC請求頻率不可控,因此需要對一定的分發(fā)機(jī)制和隊(duì)列來維護(hù)這些請求,其中Dispatcher為分發(fā)器,InBox即為請求隊(duì)列;

在將RpcEndpoint注冊到RpcEnv過程中,也間接的將RpcEnv注冊到Dispatcher分發(fā)器中,Dispatcher針對每個(gè)RpcEndpoint維護(hù)一個(gè)InBox,在Dispatcher維持一個(gè)線程池(線程池大小默認(rèn)為系統(tǒng)可用的核數(shù),當(dāng)然也可以通過spark.rpc.netty.dispatcher.numThreads進(jìn)行配置),線程針對每個(gè)InBox里面的請求進(jìn)行處理。當(dāng)然實(shí)際的處理過程是由RpcEndpoint來完成。

其次RpcEnv也完成Client的功能實(shí)現(xiàn),RpcEndpointRef是以RpcEndpoint為單位,即如果一個(gè)進(jìn)程需要和遠(yuǎn)程機(jī)器上N個(gè)RpcEndpoint服務(wù)進(jìn)行通信,就對應(yīng)N個(gè)RpcEndpointRef(后端的實(shí)際的網(wǎng)絡(luò)連接是公用,這個(gè)是TransportClient內(nèi)部提供了連接池來實(shí)現(xiàn)的),當(dāng)調(diào)用一個(gè)RpcEndpointRef的ask/send等接口時(shí)候,會將把“消息內(nèi)容+RpcEndpointRef+本地地址”一起打包為一個(gè)RequestMessage,交由RpcEnv進(jìn)行發(fā)送。注意這里打包的消息里面包括RpcEndpointRef本身是很重要的,從而可以由Server端識別出這個(gè)消息對應(yīng)的是哪一個(gè)RpcEndpoint。

和發(fā)送端一樣,在RpcEnv中,針對每個(gè)remote端的host:port維護(hù)一個(gè)隊(duì)列,即OutBox,RpcEnv的發(fā)送僅僅是把消息放入到相應(yīng)的隊(duì)列中,但是和發(fā)送端不一樣的是:在OutBox中沒有維護(hù)一個(gè)所謂的線程池來定時(shí)清理OutBox,而是通過一堆synchronized來實(shí)現(xiàn)的,add之后立刻消費(fèi)。

摘自:Github/ColZer

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容