Kafka's design makes heavy use of the Selector + Channel + Buffer model, so before starting, here is a brief introduction to NIO's Selector + Channel + Buffer.
NIO's Selector + Channel + Buffer
Buffers
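A Buffer in Java NIO is used to interact with NIO channels.
A buffer is essentially a block of memory that data can be written into and later read back from. This memory is wrapped in an NIO Buffer object, which provides a set of methods for accessing it conveniently.
Standard IO operates on byte streams and character streams, whereas NIO operates on channels (Channel) and buffers (Buffer): data is always read from a channel into a buffer, or written from a buffer into a channel.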
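A minimal sketch of the buffer cycle described above, using the standard java.nio.ByteBuffer API. The class and method names (BufferFlipDemo, writeThenRead) are illustrative only: the buffer is filled in write mode, flip() switches it to read mode, and clear() makes it ready for writing again.

```java
import java.nio.ByteBuffer;

// Illustrative demo of the ByteBuffer write -> flip() -> read -> clear() cycle.
class BufferFlipDemo {
    static int[] writeThenRead() {
        ByteBuffer buf = ByteBuffer.allocate(8);          // capacity 8; write mode (position 0, limit 8)
        buf.put((byte) 1).put((byte) 2).put((byte) 3);    // each put advances the position
        int positionAfterWrite = buf.position();          // 3
        buf.flip();                                       // limit = 3, position = 0: ready for reading
        int limitAfterFlip = buf.limit();                 // 3
        int sum = 0;
        while (buf.hasRemaining()) {                      // read only what was written
            sum += buf.get();
        }
        buf.clear();                                      // position = 0, limit = capacity: ready for writing again
        return new int[] {positionAfterWrite, limitAfterFlip, sum, buf.limit()};
    }
}
```

Forgetting flip() is the classic mistake: the reader would start at position 3 and see no data.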
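Channels
Java NIO channels are similar to streams, but with some differences:
You can both read data from a channel and write data to it, whereas a stream is usually one-way (either input or output).
Channels can be read and written asynchronously (in non-blocking mode).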
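A minimal sketch of one channel used in both directions, assuming a temporary file as the data source (FileChannelDemo and roundTrip are illustrative names, not from the original). Bytes go from a buffer into the FileChannel, then from the same channel back into a buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative demo: a single FileChannel opened for READ and WRITE.
class FileChannelDemo {
    static String roundTrip(String text) {
        try {
            Path tmp = Files.createTempFile("nio-demo", ".bin");
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                ByteBuffer out = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    ch.write(out);                        // buffer -> channel
                }
                ch.position(0);                           // rewind the same channel
                ByteBuffer in = ByteBuffer.allocate((int) ch.size());
                while (in.hasRemaining()) {
                    if (ch.read(in) == -1) {              // channel -> buffer; may read fewer bytes per call
                        break;
                    }
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            } finally {
                Files.deleteIfExists(tmp);                // the channel is already closed at this point
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```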
The figure below shows how Buffers interact with Channels:

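Selectors
A selector is used to monitor events on multiple channels (for example, a connection being opened or data arriving). The Selector is the Java NIO component that can examine one or more NIO channels and determine which of them are ready for events such as reads or writes. This way, a single thread can manage multiple channels, and therefore multiple network connections.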
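A minimal sketch of one thread serving several channels through one Selector. It assumes java.nio.channels.Pipe source channels as stand-ins for network sockets, so it runs without opening ports; SelectorDemo and serveOnce are hypothetical names. Only the channels that actually received data show up in the selected-key set.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// One thread, one Selector, three channels (pipe source ends stand in for sockets).
class SelectorDemo {
    static Set<String> serveOnce(int... channelsWithData) {
        try (Selector selector = Selector.open()) {
            List<Pipe> pipes = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                Pipe pipe = Pipe.open();
                pipe.source().configureBlocking(false);                    // register() requires non-blocking mode
                pipe.source().register(selector, SelectionKey.OP_READ, i); // attachment = channel index
                pipes.add(pipe);
            }
            for (int i : channelsWithData) {                               // "data arrives" on some channels only
                pipes.get(i).sink().write(ByteBuffer.wrap(("msg" + i).getBytes(StandardCharsets.UTF_8)));
            }
            Set<String> received = new TreeSet<>();
            ByteBuffer buf = ByteBuffer.allocate(64);
            for (int round = 0; round < 10 && received.size() < channelsWithData.length; round++) {
                selector.select(500);                                      // wait until some channel is readable
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();                                           // the selected-key set is never cleared for us
                    if (key.isReadable()) {
                        buf.clear();
                        ((Pipe.SourceChannel) key.channel()).read(buf);
                        buf.flip();
                        received.add(key.attachment() + ":" + StandardCharsets.UTF_8.decode(buf));
                    }
                }
            }
            for (Pipe p : pipes) {
                p.source().close();
                p.sink().close();
            }
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```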
The figure below shows a single thread using one Selector to handle 3 Channels:

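Non-blocking IO
In non-blocking mode, when a thread asks a channel to read data into a buffer, the call returns immediately with whatever data is currently available (possibly none), so the thread can do other things instead of waiting; once data has been read into the buffer, the thread can go on to process it. Writing from a buffer to a channel works the same way.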
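A minimal sketch of a non-blocking read, assuming a java.nio.channels.Pipe in place of a socket (NonBlockingReadDemo is an illustrative name). The first read returns 0 immediately because nothing has arrived yet; after the write, a later read returns the bytes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

// In non-blocking mode read() never waits: it returns 0 when no data is available yet.
class NonBlockingReadDemo {
    static int[] readBeforeAndAfterWrite() {
        try {
            Pipe pipe = Pipe.open();
            Pipe.SourceChannel source = pipe.source();
            source.configureBlocking(false);
            ByteBuffer buf = ByteBuffer.allocate(16);
            int before = source.read(buf);               // nothing written yet: returns 0 instead of blocking
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1, 2, 3, 4}));
            int after = 0;
            for (int i = 0; i < 100 && after == 0; i++) {
                after = source.read(buf);
                if (after == 0) {
                    Thread.sleep(5);                     // the thread is free to do other work here
                }
            }
            source.close();
            pipe.sink().close();
            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

In a real event loop, the sleep-and-retry loop would be replaced by waiting in Selector.select().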
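Broker request handling flow
Below, we trace the whole request-handling process by analyzing the source code of its key steps (Kafka 2.3).
- KafkaServer: the entry class of Kafka's network layer is SocketServer.
kafka.Kafka is the entry class of a Kafka broker, and kafka.Kafka.main() is the Kafka server's main() method, i.e. the broker's startup entry point. Following the call stack kafka.Kafka.main() -> kafkaServerStartable.startup() -> KafkaServer.startup(), we can trace the code from main() all the way to the creation of SocketServer, the network-layer object. In other words, a Kafka server initializes and starts its SocketServer during startup.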
def main(args: Array[String]): Unit = {
  try {
    val serverProps = getPropsFromArgs(args)
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
    // ... (partly omitted)
    kafkaServerStartable.startup()
    kafkaServerStartable.awaitShutdown()
  }
  catch {
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  Exit.exit(0)
}

class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters)
  // ...
  def startup() {
    try server.startup()
    catch {
      // ...
    }
  }
}
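SocketServer handles new connections, requests, and responses to and from the broker. Kafka supports two types of request planes:
- Data plane: handles requests from clients and from other brokers in the cluster. The threading model is one Acceptor thread per listener, which handles new connections; multiple listeners can be configured by specifying several comma-separated endpoints in "listeners" in KafkaConfig. Each Acceptor has N Processor threads, each with its own selector, which read requests from the sockets, plus M Handler threads, which handle the requests and hand the responses back to the Processor threads for writing (N is num.network.threads, M is num.io.threads).
- Control plane: handles requests from the controller. It is optional and is configured by specifying "control.plane.listener.name"; if it is not configured, controller requests are handled by the data plane. The threading model is one Acceptor thread that handles new connections, one Processor thread with its own selector that reads requests from the socket, and one Handler thread that handles requests and hands the responses back to the Processor thread for writing.
SocketServer's startup method creates the Acceptor and Processor threads for both the control plane and the data plane, and starts all the Processor threads: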
def startup(startupProcessors: Boolean = true) {
  this.synchronized {
    connectionQuotas = new ConnectionQuotas(config, time)
    // Control plane
    createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
    // Data plane
    createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
    if (startupProcessors) {
      // Start the control-plane Processor thread
      startControlPlaneProcessor()
      // Start the data-plane Processor threads
      startDataPlaneProcessors()
    }
  }
}

private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                  endpoints: Seq[EndPoint]): Unit = synchronized {
  endpoints.foreach { endpoint =>
    connectionQuotas.addListener(config, endpoint.listenerName)
    // One Acceptor per endpoint; its Processors are created and added to the Acceptor's processor array
    val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
    addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
    // ... (omitted: start the Acceptor thread)
  }
}
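To make the thread layout concrete, here is a hypothetical model (not Kafka's own parser) of what the loop above builds: one Acceptor per endpoint in a listeners string such as PLAINTEXT://:9092,SSL://:9093, and dataProcessorsPerListener (num.network.threads) Processors for each Acceptor. The parsing is deliberately simplified and does not handle IPv6 hosts or listener.security.protocol.map; ListenerLayoutSketch and its methods are illustrative names. If a control-plane listener is configured, it adds one more Acceptor with a single Processor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the data-plane thread layout; not Kafka's listener parser.
class ListenerLayoutSketch {
    static final class Endpoint {
        final String listenerName;
        final String host;
        final int port;

        Endpoint(String listenerName, String host, int port) {
            this.listenerName = listenerName;
            this.host = host;
            this.port = port;
        }
    }

    // Simplified parsing of "NAME://host:port,NAME2://host:port" (an empty host means all interfaces).
    static List<Endpoint> parseListeners(String listeners) {
        List<Endpoint> result = new ArrayList<>();
        for (String raw : listeners.split(",")) {
            String s = raw.trim();
            int sep = s.indexOf("://");
            int colon = s.lastIndexOf(':');
            if (sep < 0 || colon <= sep) {
                throw new IllegalArgumentException("Bad listener: " + s);
            }
            result.add(new Endpoint(s.substring(0, sep), s.substring(sep + 3, colon),
                    Integer.parseInt(s.substring(colon + 1))));
        }
        return result;
    }

    // Returns {acceptor threads, processor threads}: one Acceptor per endpoint,
    // numNetworkThreads Processors per Acceptor.
    static int[] threadCounts(String listeners, int numNetworkThreads) {
        int acceptors = parseListeners(listeners).size();
        return new int[] {acceptors, acceptors * numNetworkThreads};
    }
}
```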
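- In its constructor, the Acceptor first calls openServerSocket() to open the socket of the EndPoint it is responsible for, i.e. it binds the port and starts listening. The Acceptor then holds and manages an ArrayBuffer of Processors (SocketServer adds them through addDataPlaneProcessors, as shown above); each Processor is an independent thread.
- The Acceptor thread's run() method keeps listening for connection requests (ACCEPT) on its ServerChannel. It assigns each new connection to a Processor in round-robin order; if that Processor cannot take the connection without blocking, it moves on to the next one, and it blocks only on the last attempt.
The actual hand-off of a new connection to a Processor happens in assignNewConnection: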
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              connectionQuotas: ConnectionQuotas,
                              metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  private val nioSelector = NSelector.open()
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  private val processors = new ArrayBuffer[Processor]()

  /**
   * Accept loop that checks for new connection attempts
   */
  def run() {
    // Register the ServerChannel with the Selector and listen for ACCEPT events
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessorIndex = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable) {
                  accept(key).foreach { socketChannel =>
                    // Round-robin: try each Processor once, block only on the last attempt
                    var retriesLeft = synchronized(processors.length)
                    var processor: Processor = null
                    do {
                      retriesLeft -= 1
                      processor = synchronized {
                        // wrap the index around the processor array before using it
                        currentProcessorIndex = currentProcessorIndex % processors.length
                        processors(currentProcessorIndex)
                      }
                      currentProcessorIndex += 1
                      // hand the connection over via assignNewConnection
                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                  }
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          // ...
        }
      }
    } finally {
      // ... (close the server socket and the selector)
    }
  }
  // ...
}
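- In assignNewConnection, the call to processor.accept puts the SocketChannel into the new-connection queue that each Processor maintains; the Processor later takes connections from this queue for further processing.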
private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
  // processor.accept enqueues the channel into the Processor's newConnections queue
  if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
    // ...
    true
  } else
    false
}
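A hypothetical sketch of the hand-off in the two snippets above, with plain BlockingQueues standing in for each Processor's newConnections queue (RoundRobinAssignSketch and its methods are illustrative names). Like the Acceptor loop, it tries each Processor once in round-robin order using a non-blocking offer(), and falls back to a blocking put() only on the last attempt, when retriesLeft reaches 0. The real Processor.accept also calls wakeup() on its selector after enqueuing, which this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of the Acceptor's round-robin hand-off into bounded per-Processor queues.
class RoundRobinAssignSketch<C> {
    private final List<BlockingQueue<C>> newConnections = new ArrayList<>();
    private int currentProcessorIndex = 0;

    RoundRobinAssignSketch(int processors, int queueSize) {
        for (int i = 0; i < processors; i++) {
            newConnections.add(new ArrayBlockingQueue<>(queueSize));
        }
    }

    // Mirrors the idea of Processor.accept: offer() first, block with put() only if mayBlock.
    private boolean accept(BlockingQueue<C> queue, C conn, boolean mayBlock) throws InterruptedException {
        if (queue.offer(conn)) {
            return true;
        }
        if (mayBlock) {
            queue.put(conn);
            return true;
        }
        return false;
    }

    // Mirrors the do/while loop in Acceptor.run; returns the index of the chosen Processor.
    int assign(C conn) {
        int retriesLeft = newConnections.size();
        try {
            while (true) {
                retriesLeft -= 1;
                currentProcessorIndex = currentProcessorIndex % newConnections.size();
                int chosen = currentProcessorIndex;
                currentProcessorIndex += 1;
                if (accept(newConnections.get(chosen), conn, retriesLeft == 0)) {
                    return chosen;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while assigning a connection", e);
        }
    }

    C poll(int processor) {                  // what the Processor thread would do in configureNewConnections()
        return newConnections.get(processor).poll();
    }

    int queued(int processor) {
        return newConnections.get(processor).size();
    }
}
```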
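- Each Processor maintains its own Selector, which monitors only the channels belonging to that Processor. This maximizes parallelism and isolation between the Processor threads. Each Processor also maintains a responseQueue, used to interact with KafkaRequestHandler, as described later in the flow.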
private[kafka] class Processor(val id: Int,
                               time: Time,
                               maxRequestSize: Int,
                               requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               connectionsMaxIdleMs: Long,
                               failedAuthenticationDelayMs: Int,
                               listenerName: ListenerName,
                               securityProtocol: SecurityProtocol,
                               config: KafkaConfig,
                               metrics: Metrics,
                               credentialProvider: CredentialProvider,
                               memoryPool: MemoryPool,
                               logContext: LogContext,
                               connectionQueueSize: Int = ConnectionQueueSize) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  // Queue of new connections handed over by the Acceptor; drained in run()
  private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
  // Each Processor maintains its own responseQueue
  private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
  // Each Processor maintains its own Selector
  private val selector = createSelector(
    ChannelBuilders.serverChannelBuilder(listenerName,
      listenerName == config.interBrokerListenerName,
      securityProtocol,
      config,
      credentialProvider.credentialCache,
      credentialProvider.tokenCache,
      time))

  // Visible to override for testing
  protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
    channelBuilder match {
      case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
      case _ =>
    }
    new KSelector(
      maxRequestSize,
      connectionsMaxIdleMs,
      failedAuthenticationDelayMs,
      metrics,
      time,
      "socket-server",
      metricTags,
      false,
      true,
      channelBuilder,
      memoryPool,
      logContext)
  }

  override def run() {
    // Marks initialization as finished (via a CountDownLatch): this Processor is now running
    startupComplete()
    try {
      while (isRunning) {
        try {
          // setup any new connections that have been queued up
          configureNewConnections()
          // register any new responses for writing
          // (drains this Processor's responseQueue, which handler threads fill via RequestChannel.sendResponse())
          processNewResponses()
          // KSelector.poll() performs the actual network reads and writes
          poll()
          // hands completed requests to RequestChannel and mutes each such connection:
          // no further request is read from it until its response has been sent
          processCompletedReceives()
          // once a response has been fully sent, the connection is unmuted again
          processCompletedSends()
          processDisconnected()
          closeExcessConnections()
        } catch {
          // ...
        }
      }
    } finally {
      // ...
    }
  }
}
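- In run(), configureNewConnections takes new connections from the Processor's own newConnections queue and registers each one with its selector, listening for OP_READ events. Internally it calls register(), which registers the newly accepted SocketChannel with the server-side Selector for OP_READ; when a read event occurs, the corresponding request can be taken out for further processing.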
private def configureNewConnections() {
  var connectionsProcessed = 0
  while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
    // Take the next new SocketChannel
    val channel = newConnections.poll()
    try {
      // Register the SocketChannel with this Processor's selector
      selector.register(connectionId(channel.socket), channel)
      connectionsProcessed += 1
    } catch {
      case e: Throwable =>
        // ...
    }
  }
}

// org.apache.kafka.common.network.Selector (Java)
public void register(String id, SocketChannel socketChannel) throws IOException {
    ensureNotRegistered(id);
    registerChannel(id, socketChannel, SelectionKey.OP_READ);
    this.sensors.connectionCreated.record();
}
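A hypothetical sketch of why connections travel through a queue instead of being registered by the Acceptor directly: only the Processor thread touches its own Selector. The Acceptor side enqueues and calls wakeup(), and the Processor later drains at most connectionQueueSize channels and registers each for OP_READ, as configureNewConnections() does. Pipe source channels stand in for accepted SocketChannels; NewConnectionRegisterSketch and standInChannel are illustrative names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical model: the Acceptor only enqueues; the Processor thread does all Selector registration.
class NewConnectionRegisterSketch {
    private final int connectionQueueSize;
    private final ArrayBlockingQueue<SelectableChannel> newConnections;
    private final Selector selector;

    NewConnectionRegisterSketch(int connectionQueueSize) {
        this.connectionQueueSize = connectionQueueSize;
        this.newConnections = new ArrayBlockingQueue<>(connectionQueueSize);
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Acceptor thread: enqueue, then wake the Processor out of select().
    boolean accept(SelectableChannel channel) {
        boolean accepted = newConnections.offer(channel);
        if (accepted) {
            selector.wakeup();
        }
        return accepted;
    }

    // Processor thread: like configureNewConnections(), register queued channels for OP_READ.
    int configureNewConnections() {
        int processed = 0;
        while (processed < connectionQueueSize && !newConnections.isEmpty()) {
            SelectableChannel channel = newConnections.poll();
            try {
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ);
                processed++;
            } catch (IOException e) {
                // a real server would close the channel and record the failure here
            }
        }
        return processed;
    }

    int registeredForRead() {
        int n = 0;
        for (SelectionKey key : selector.keys()) {
            if (key.interestOps() == SelectionKey.OP_READ) {
                n++;
            }
        }
        return n;
    }

    // Stand-in for an accepted SocketChannel: the source end of an in-memory Pipe.
    static SelectableChannel standInChannel() {
        try {
            return Pipe.open().source();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void close() {
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```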
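- RequestChannel relays messages from the network layer to the business layer, and delivers the business layer's results back to the network layer, which returns them to the client. In Kafka 2.3, SocketServer constructs one RequestChannel for the data plane (plus a separate one for the control plane, if configured). The RequestChannel constructor initializes requestQueue, which holds the requests received by the network layer that are about to be handed to the business layer. RequestChannel also keeps a map of the Processors (processors); each Processor owns its own responseQueue, which holds that Processor's responses until the network layer returns them to the clients.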
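class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup {
  import RequestChannel._
  val metrics = new RequestChannel.Metrics
  // Requests received by the network layer, waiting for the handler threads
  private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
  // Processors by id; each Processor owns its own responseQueue
  private val processors = new ConcurrentHashMap[Int, Processor]()

  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

  // Used by KafkaRequestHandler: wait at most `timeout` ms for the next request
  def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
    requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
  // ...
}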
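- Processor.processCompletedReceives() iterates over completedReceives: each completely received message is parsed, wrapped, and handed to RequestChannel, which passes it on to the business layer. Concretely, RequestChannel.sendRequest puts the request into requestQueue, where the request-handler threads pick it up. The Processor then mutes the connection, so that no further request is read from it until the response has been sent.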
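private def processCompletedReceives() {
  selector.completedReceives.asScala.foreach { receive =>
    try {
      openOrClosingChannel(receive.source) match {
        case Some(channel) =>
          val header = RequestHeader.parse(receive.payload)
          // ... (omitted: SASL re-authentication handshakes are handled here instead)
          val nowNanos = time.nanoseconds()
          if (channel.serverAuthenticationSessionExpired(nowNanos)) {
            // ... (close the expired connection)
          } else {
            val connectionId = receive.source
            // ... (omitted: build `req`, a RequestChannel.Request, from the header and payload)
            // Hand the request to the Handlers through RequestChannel.requestQueue
            requestChannel.sendRequest(req)
            // Stop reading from this connection: no further request is accepted until the response is sent
            selector.mute(connectionId)
            handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
          }
        case None =>
          // ...
      }
    } catch {
      // ...
    }
  }
}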
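A hypothetical, heavily simplified model of the RequestChannel hand-off described above: one shared request queue, one response queue per Processor, and a per-connection mute flag. The class names echo Kafka's (Request, Response, Processor), but the code is not Kafka's. In particular, completeReceive returning false only stands in for the selector not reading from a muted channel, and "sending" a response is just recorded in a list.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical model of Processor <-> RequestChannel <-> handler hand-off with mute/unmute.
class RequestChannelSketch {
    static final class Request {
        final int processorId;
        final String connectionId;
        final String payload;

        Request(int processorId, String connectionId, String payload) {
            this.processorId = processorId;
            this.connectionId = connectionId;
            this.payload = payload;
        }
    }

    static final class Response {
        final Request request;
        final String body;

        Response(Request request, String body) {
            this.request = request;
            this.body = body;
        }
    }

    static final class Channel {
        final ArrayBlockingQueue<Request> requestQueue;            // shared by all Processors
        final Map<Integer, Processor> processors = new ConcurrentHashMap<>();

        Channel(int queueSize) { requestQueue = new ArrayBlockingQueue<>(queueSize); }

        void addProcessor(Processor p) { processors.put(p.id, p); }

        void sendRequest(Request r) {
            try {
                requestQueue.put(r);                               // blocks while the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        Request receiveRequest() { return requestQueue.poll(); }   // handler side

        void sendResponse(Response r) {                            // route back to the Processor that read the request
            Processor p = processors.get(r.request.processorId);
            if (p != null) {
                p.responseQueue.add(r);
            }
        }
    }

    static final class Processor {
        final int id;
        final LinkedBlockingDeque<Response> responseQueue = new LinkedBlockingDeque<>();
        final Set<String> muted = new HashSet<>();
        final List<String> sent = new ArrayList<>();

        Processor(int id) { this.id = id; }

        // Like processCompletedReceives(): hand the request off, then mute the connection.
        boolean completeReceive(Channel rc, String connectionId, String payload) {
            if (muted.contains(connectionId)) {
                return false;
            }
            rc.sendRequest(new Request(id, connectionId, payload));
            muted.add(connectionId);
            return true;
        }

        // Like processNewResponses() followed by processCompletedSends(): "send", then unmute.
        void processNewResponses() {
            Response r;
            while ((r = responseQueue.poll()) != null) {
                sent.add(r.request.connectionId + "<-" + r.body);
                muted.remove(r.request.connectionId);
            }
        }
    }
}
```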
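- KafkaRequestHandler (the request-handler thread) and KafkaRequestHandlerPool (its thread pool)
The part of KafkaRequestHandler to focus on is its run method: it takes the requests that the Processors put into RequestChannel and calls KafkaApis, which handles each request type separately.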
class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          val totalHandlerThreads: AtomicInteger,
                          val requestChannel: RequestChannel,
                          apis: KafkaApis,
                          time: Time) extends Runnable with Logging {
  // ... (fields such as shutdownComplete and stopped omitted)

  def run() {
    while (!stopped) {
      // Take a request from RequestChannel.requestQueue (waits at most 300 ms)
      val req = requestChannel.receiveRequest(300)
      req match {
        case RequestChannel.ShutdownRequest =>
          shutdownComplete.countDown()
          return
        case request: RequestChannel.Request =>
          try {
            // KafkaApis.handle() passes the request to the business layer
            apis.handle(request)
          } catch {
            // ... (error handling)
          } finally {
            request.releaseBuffer()
          }
        case null => // poll timed out: continue
      }
    }
    shutdownComplete.countDown()
  }
}
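- The KafkaRequestHandlerPool constructor creates and starts multiple KafkaRequestHandler threads; the pool size is set with the num.io.threads configuration property.
All KafkaRequestHandlers in the pool compete for requests in RequestChannel.requestQueue. Since requestQueue is an ArrayBlockingQueue, each handler takes requests by calling ArrayBlockingQueue.poll() with a timeout (receiveRequest(300) waits at most 300 ms).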
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
  // ... (aggregateIdleMeter definition omitted)
  // The array of KafkaRequestHandler runnables, one per thread
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    createHandler(i)
  }

  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
  }
  // ...
}
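A hypothetical sketch of the competing-consumer pattern used by KafkaRequestHandlerPool: several daemon threads poll one shared ArrayBlockingQueue with a 300 ms timeout (like receiveRequest(300)), treat a timeout (null) as "try again", and exit when they receive a shutdown marker, counting down a CountDownLatch as KafkaRequestHandler does. HandlerPoolSketch and runPool are illustrative names, and the SHUTDOWN object stands in for RequestChannel.ShutdownRequest; one marker is queued per thread, after all the requests.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of N handler threads competing for one shared request queue.
class HandlerPoolSketch {
    static final Object SHUTDOWN = new Object();     // stands in for RequestChannel.ShutdownRequest

    static int runPool(int numThreads, int numRequests) {
        BlockingQueue<Object> requestQueue = new ArrayBlockingQueue<>(numRequests + numThreads);
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch shutdownComplete = new CountDownLatch(numThreads);
        for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        Object req = requestQueue.poll(300, TimeUnit.MILLISECONDS); // like receiveRequest(300)
                        if (req == SHUTDOWN) {
                            return;
                        } else if (req != null) {
                            handled.incrementAndGet();       // stands in for apis.handle(request)
                        }                                    // null: timed out, poll again
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    shutdownComplete.countDown();
                }
            }, "kafka-request-handler-" + i);
            t.setDaemon(true);
            t.start();
        }
        try {
            for (int r = 0; r < numRequests; r++) {
                requestQueue.put("request-" + r);
            }
            for (int i = 0; i < numThreads; i++) {
                requestQueue.put(SHUTDOWN);                  // one shutdown marker per handler
            }
            if (!shutdownComplete.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handlers did not shut down");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled.get();
    }
}
```

Because each thread stops after exactly one marker and the markers sit behind every request in the FIFO queue, all requests are handled before the latch reaches zero.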
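- KafkaApis works like a utility class: it parses user requests and hands them to the business layer, so we can think of it as Kafka's API layer. As KafkaRequestHandler.run() above shows, this is done by calling KafkaApis.handle():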
def handle(request: RequestChannel.Request) {
  request.header.apiKey match {
    case ApiKeys.PRODUCE => handleProduceRequest(request)
    case ApiKeys.FETCH => handleFetchRequest(request)
    case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
    case ApiKeys.METADATA => handleTopicMetadataRequest(request)
    case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
    // other ApiKeys omitted
  }
  // exception handling omitted
}
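A hypothetical Java analogue of the match expression in handle(). The ApiKey enum below is an illustrative stand-in for Kafka's ApiKeys, and each branch only returns the name of the Kafka handler it would call, so the routing can be checked in isolation. Keys that are not listed fall through to an error, playing the role of the omitted cases.

```java
// Hypothetical sketch of KafkaApis.handle(): dispatch on the request's API key.
class ApiDispatchSketch {
    enum ApiKey { PRODUCE, FETCH, LIST_OFFSETS, METADATA, LEADER_AND_ISR, SASL_HANDSHAKE }

    static String handle(ApiKey apiKey, String request) {
        switch (apiKey) {
            case PRODUCE:        return "handleProduceRequest(" + request + ")";
            case FETCH:          return "handleFetchRequest(" + request + ")";
            case LIST_OFFSETS:   return "handleListOffsetRequest(" + request + ")";
            case METADATA:       return "handleTopicMetadataRequest(" + request + ")";
            case LEADER_AND_ISR: return "handleLeaderAndIsrRequest(" + request + ")";
            default:
                // every other key is left out of this sketch
                throw new IllegalArgumentException("Unsupported API key in this sketch: " + apiKey);
        }
    }
}
```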
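- Taking ApiKeys.PRODUCE as an example to follow the rest of the flow: handleProduceRequest involves two important methods, sendResponseCallback() and replicaManager.appendRecords(). appendRecords writes the records and receives sendResponseCallback as its responseCallback; the callback, through sendResponse(), calls requestChannel.sendResponse() to hand the response to RequestChannel.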
def handleProduceRequest(request: RequestChannel.Request) {
  val produceRequest = request.body[ProduceRequest]
  // ...
  // Callback: sends the business layer's final result to the response queue of the Processor that owns the connection
  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
    // ... (omitted: merge responseStatus into mergedResponseStatus and compute maxThrottleTimeMs)
    // Send the response immediately. In case of throttling, the channel has already been muted.
    if (produceRequest.acks == 0) {
      // ... (acks = 0: the producer expects no response; only errors and throttling are handled here)
    } else {
      // sendResponse() calls requestChannel.sendResponse(), which puts the response into the Processor's responseQueue
      sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
    }
  }

  // appendRecords contains the logic that writes the records
  replicaManager.appendRecords(
    timeout = produceRequest.timeout.toLong,
    requiredAcks = produceRequest.acks,
    internalTopicsAllowed = internalTopicsAllowed,
    isFromClient = true,
    entriesPerPartition = authorizedRequestInfo,
    responseCallback = sendResponseCallback,
    recordConversionStatsCallback = processingStatsCallback)
  // ...
}
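A hypothetical sketch of the callback hand-off in handleProduceRequest: the caller passes a response callback into an append method, and the callback either queues nothing (acks = 0, the producer expects no response) or puts a response on the response queue. ProduceCallbackSketch, appendRecords and the string-based status map are illustrative; in Kafka, with acks=all the callback may fire later, once the replicas have caught up (through the delayed-produce purgatory), whereas this sketch calls it synchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical model of the callback hand-off between the API layer and the replica layer.
class ProduceCallbackSketch {
    final List<String> responseQueue = new ArrayList<>();        // stands in for the Processor's responseQueue
    final Map<String, List<String>> log = new TreeMap<>();       // partition -> appended records

    // Stands in for replicaManager.appendRecords(): append, then report per-partition status via the callback.
    void appendRecords(Map<String, String> entriesPerPartition, Consumer<Map<String, String>> responseCallback) {
        Map<String, String> status = new TreeMap<>();
        for (Map.Entry<String, String> e : entriesPerPartition.entrySet()) {
            log.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            status.put(e.getKey(), "NONE");                      // "NONE" = no error
        }
        responseCallback.accept(status);
    }

    void handleProduceRequest(short acks, Map<String, String> records) {
        appendRecords(records, status -> {
            if (acks != 0) {
                responseQueue.add("ProduceResponse" + status);   // like sendResponse(...)
            }
            // acks == 0: no response is queued (the real code may still send a no-op or close on error)
        });
    }
}
```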
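- Finally, as mentioned when discussing the Processor, Processor.processNewResponses() takes the responses for its own connections from its responseQueue (filled via requestChannel.sendResponse()) and prepares them to be sent back to the clients.
A picture is worth a thousand words, so let's finish by reviewing the whole broker request-handling flow in one figure.
The overall flow is shown below: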

References
https://blog.csdn.net/zhanyuanlin/article/details/76906583
http://ifeve.com/channels/