
線程池的創(chuàng)建
org.apache.zookeeper.server.WorkerService.workers的創(chuàng)建


如果threadsAreAssignable=true
則workers會(huì)有numWorkerThreads=8個(gè)執(zhí)行器ExecutorService對(duì)象,其中corePoolSize=1
通過(guò)AcceptThread接收連接
org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#run

org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#select
selector.select()是個(gè)阻塞操作,當(dāng)有客戶端連接。這里會(huì)往下執(zhí)行,此時(shí)的事件是OP_ACCEPT事件

org.apache.zookeeper.server.NIOServerCnxnFactory.AcceptThread#doAccept
創(chuàng)建SocketChannel Socket通道

設(shè)置非阻塞configureBlocking,并獲取SelectorThread選擇器線程

org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection
把當(dāng)前連接添加到SelectorThread#acceptedQueue隊(duì)列中,并喚醒阻塞。

這里需要了解喚醒的是哪里的阻塞呢?
SelectorThread中的select()也調(diào)用了selector.select()方法,這個(gè)線程也是一直運(yùn)行的。如果有客戶端連接,通過(guò)AcceptThread線程處理后添加到acceptedQueue隊(duì)列后,說(shuō)明有客戶端可能需要讀寫(xiě)事件,便會(huì)這里解阻塞。如何本來(lái)就沒(méi)有客戶端連接,這里是一直阻塞的。
連接后添加到那個(gè)selectorThreads的隊(duì)列中,即如何選擇selectorThreads線程?
創(chuàng)建AcceptThread對(duì)象時(shí),便會(huì)獲取selectorIterator的迭代器對(duì)象

在添加隊(duì)列時(shí),便對(duì)selectorIterator進(jìn)行迭代獲取。
SelectorThreads數(shù)量可以通過(guò)numSelectorThreads設(shè)置

SelectorThread注冊(cè)讀事件
org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run
如果沒(méi)有連接事件過(guò)來(lái),便會(huì)一直阻塞

能夠繼續(xù)往下執(zhí)行是由于添加到acceptedQueue隊(duì)列后,解阻塞了。
這里主要有三步:
1、select()
org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#select
這里將會(huì)一直阻塞在這selector.select();在第一次循環(huán)時(shí),由于SelectionKey對(duì)象為null,沒(méi)有讀寫(xiě)事件便會(huì)調(diào)用②的processAcceptedConnections方法處理這個(gè)連接。為什么不能繼續(xù)執(zhí)行也就是SelectionKey為什么是null,因?yàn)樵诮邮盏娇蛻舳诉B接后,并沒(méi)有注冊(cè)讀或?qū)懯录?。所以?huì)先調(diào)用processAcceptedConnections處理這個(gè)連接

注冊(cè)了讀或?qū)懯录螅艜?huì)調(diào)用到handleIO方法
org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run進(jìn)行while(!stopped)循環(huán)。

當(dāng)我們?cè)诳蛻舳溯斎隿reate /aaa 123時(shí),這是一個(gè)isWritable事件,會(huì)調(diào)用到handleIO處理讀寫(xiě)事件,交給workerPool工作線程池處理
org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#handleIO

handleIO方法主要步驟包括:
把當(dāng)前SelectorThread和當(dāng)前SelectionKey封裝為IOWorkRequest對(duì)象、獲取當(dāng)前SelectionKey的NIOServerCnxn對(duì)象。
NIOServerCnxn#disableSelectable設(shè)置selectable.set(false),key.interestOps(0);不能處理當(dāng)前連接的其他請(qǐng)求
交給workerPool調(diào)度workRequest
org.apache.zookeeper.server.WorkerService#schedule

ScheduledWorkRequest.run()
org.apache.zookeeper.server.WorkerService.ScheduledWorkRequest

org.apache.zookeeper.server.NIOServerCnxnFactory.IOWorkRequest#doWork

org.apache.zookeeper.server.NIOServerCnxn#doIO
服務(wù)端接收到客戶端的命令,讀取數(shù)據(jù)


2、processAcceptedConnections
org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processAcceptedConnections
把放入到acceptedQueue隊(duì)列中接收的連接,取出來(lái)注冊(cè)O(shè)P_READ讀事件,然后添加NIOServerCnxn對(duì)象與當(dāng)前key綁定。這里就是給當(dāng)前連接添加附加對(duì)象NIOServerCnxn,即每一個(gè)連接都會(huì)有一個(gè)表示當(dāng)前連接的上下文對(duì)象。

org.apache.zookeeper.server.NIOServerCnxnFactory#addCnxn
把當(dāng)前連接的NIOServerCnxn(Nio服務(wù)端上下文)添加到cnxns集合

3、processInterestOpsUpdateRequests
從updateQueue隊(duì)列中獲取,并更改當(dāng)前事件
org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processInterestOpsUpdateRequests

總結(jié):
zookeeper默認(rèn)的接收數(shù)據(jù)模式是通過(guò)NIO。
如果想更改數(shù)據(jù)接收模式,可以修改zookeeper.serverCnxnFactory配置是org.apache.zookeeper.server.NettyServerCnxnFactory
通過(guò)AcceptThread線程接收連接,然后把當(dāng)前連接通過(guò)SelectorThread線程處理讀或?qū)懯录?br>
通過(guò)一系列的封裝,再把封裝后的對(duì)象交給work工作線程池處理。最終會(huì)調(diào)用到NIOServerCnxn的doIO處理讀寫(xiě)事件。
讀事件就是客戶端發(fā)送的數(shù)據(jù),寫(xiě)事件就是服務(wù)端返回(響應(yīng))的數(shù)據(jù)