基于 Kafka 0.10.1 版本
整體流程

發送消息流程.png
Producer.send()入口
/**
 * Asynchronously sends a record: waits for cluster metadata, picks the partition,
 * validates the serialized size, appends the record to the accumulator and wakes
 * the sender thread when a batch is ready.
 *
 * NOTE: this is an abridged excerpt. Key/value serialization, the catch clauses
 * and the method's closing brace are not shown here.
 *
 * @param record   the record to send
 * @param callback user callback; wrapped in an InterceptorCallback when interceptors are set
 * @return the future taken from the accumulator's append result
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// Wait for cluster metadata; the time spent waiting is deducted from maxBlockTimeMs below.
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
// (Serialization producing serializedKey / serializedValue is omitted from this excerpt.)
// Determine the target partition.
int partition = partition(record, serializedKey, serializedValue, cluster);
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
// Validate the serialized record size.
ensureValidRecordSize(serializedSize);
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
// Append the record to the record accumulator.
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
// Wake the sender when the append reports a full batch (the accumulator sets batchIsFull when the
// partition's deque holds more than one batch or the last batch is full) or when a new batch was created.
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
}

doSend.png
RecordAccumulator.append方法利用了分段鎖:先對Deque加鎖嘗試寫入,失敗后釋放鎖再申請內存,最后重新加鎖寫入。當並發出現時,前一個線程需要的內存空間比較大,暫時不滿足寫入,后一個線程需要的內存空間比較小,滿足寫入,不必等待前一個線程,從而提高了並發。由此也可以看出,如果消息需要有發送順序的保證,一個Producer實例不要在多處(多個線程)並發調用。
/**
 * Appends a record to the batch queue of the given partition, creating a new batch when
 * the last one cannot take the record.
 *
 * The deque lock is taken twice: once for a cheap first attempt, and again after the
 * buffer has been allocated outside the lock, so that a blocking allocation does not
 * hold up other appenders on the same partition.
 *
 * The allocated buffer is now released in {@code finally} on every path where it was not
 * handed over to a new batch. Previously it leaked when the second {@code closed} check
 * threw or when the first append into the fresh batch failed.
 *
 * @param tp             the partition the record is sent to
 * @param timestamp      the record timestamp
 * @param key            the serialized key (may be null)
 * @param value          the serialized value (may be null)
 * @param callback       user callback attached to the record (may be null)
 * @param maxTimeToBlock maximum time in milliseconds to block waiting for buffer memory
 * @return the append result: record future, whether a batch is full, whether a new batch was created
 * @throws InterruptedException  declared by the original signature (the buffer allocation is the blocking call)
 * @throws IllegalStateException if the accumulator has been closed
 */
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    appendsInProgress.incrementAndGet();
    // Stays non-null only while this call still owns an allocated buffer.
    ByteBuffer buffer = null;
    try {
        // Look up (or create) the deque of batches for this partition.
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // No room in the last batch: allocate a buffer big enough for at least this record.
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");

            // Retry: another thread may have added a batch with room while we were allocating.
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) {
                // The buffer we allocated is not needed; it is returned to the pool in finally.
                return appendResult;
            }

            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
            dq.addLast(batch);
            incomplete.add(batch);
            // The batch now owns the buffer; make sure finally does not release it.
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        try {
            if (buffer != null)
                free.deallocate(buffer);
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }
}

消息寫入收集器流程.png
RecordAccumulator.tryAppend方法,獲取最后一個RecordBatch嘗試寫入
/**
 * Tries to append the record to the last batch of the given deque.
 *
 * @return the append result when the last batch accepted the record; {@code null} when the
 *         deque is empty or the last batch had no room (in which case its records are closed)
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
    RecordBatch tail = deque.peekLast();
    if (tail == null)
        return null;

    FutureRecordMetadata appended = tail.tryAppend(timestamp, key, value, callback, time.milliseconds());
    if (appended != null) {
        // "Full" here also covers the case where more than one batch is queued for the partition.
        boolean batchIsFull = deque.size() > 1 || tail.records.isFull();
        return new RecordAppendResult(appended, batchIsFull, false);
    }

    // The record did not fit: close the batch's records so no further writes go into it
    // (presumably this switches the underlying buffer to read mode - confirm in MemoryRecords).
    tail.records.close();
    return null;
}
RecordBatch.tryAppend寫入
/**
 * Appends the record to this batch if there is room for it.
 *
 * @param timestamp the record timestamp
 * @param key       the serialized key (may be null)
 * @param value     the serialized value (may be null)
 * @param callback  callback to register for this record (may be null)
 * @param now       current time, stored as the last append time
 * @return a future for the appended record, or {@code null} when the batch has no room
 *         and the caller has to use a different batch
 */
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
    if (!this.records.hasRoomFor(key, value))
        return null;

    long crc = this.records.append(offsetCounter++, timestamp, key, value);
    this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
    this.lastAppendTime = now;

    // A null key/value is reported with length -1.
    int keyLength = key == null ? -1 : key.length;
    int valueLength = value == null ? -1 : value.length;
    // The current record count is passed before it is incremented below.
    FutureRecordMetadata recordFuture = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                 timestamp, crc,
                                                                 keyLength,
                                                                 valueLength);
    // Remember the callback together with its future, if one was supplied.
    if (callback != null)
        thunks.add(new Thunk(callback, recordFuture));
    this.recordCount++;
    return recordFuture;
}
判斷是否需要new RecordBatch
/**
 * Tells whether a record with the given key and value still fits into these records.
 *
 * Two cases are distinguished:
 * - nothing written yet: the record must fit into the initial capacity of the buffer;
 * - something already written: the estimated bytes written plus this record must stay
 *   within the write limit.
 * Per the original notes, the first case lets a single record larger than batch.size
 * occupy a buffer of its own.
 *
 * @return {@code false} when the records are not writable or the record does not fit
 */
public boolean hasRoomFor(byte[] key, byte[] value) {
    // Not writable any more: no further appends are accepted.
    if (!this.writable)
        return false;

    if (this.compressor.numRecordsWritten() == 0)
        return this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value);

    return this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
}
BufferPool源碼解讀
BufferPool是Buffer緩沖區,池中復用的Buffer大小都等于batch.size。如果控制每條消息都小于或者等于batch.size,就能充分利用緩沖區,性能會更好。
BufferPool 類主要變量
/**
 * Main fields of the producer's buffer pool (excerpt: constructor and methods are not shown).
 */
public final class BufferPool {
/**
 * Maximum size of the buffer pool in bytes; per the original notes this is the
 * buffer.memory configuration.
 */
private final long totalMemory;
/**
 * Fixed size of each pooled buffer; per the original notes this is the
 * batch.size configuration.
 */
private final int poolableSize;
private final ReentrantLock lock;
/**
 * Queue of free buffers.
 */
private final Deque<ByteBuffer> free;
/**
 * Queue of conditions for callers waiting for a buffer to be allocated.
 */
private final Deque<Condition> waiters;
/**
 * Amount of memory currently available.
 */
private long availableMemory;
private final Metrics metrics;
private final Time time;
private final Sensor waitTime;
}
通過下面的代碼獲取當前需要申請的Buffer內存空間大小:
// Requested buffer size: the larger of the configured batch size and the bytes needed
// for this single record (log overhead plus record size).
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
如果超過batch.size,通過ByteBuffer.allocate(size)來申請空間,不從BufferPool中獲取。
BufferPool主要方法就是申請空間allocate,釋放空間deallocate,具體流程如下

申請空間流程.png

釋放空間流程.png
Sender 發送線程
每次往消息收集器寫完消息,都會檢查一下是否需要喚醒發送線程Sender
// Append the record to the accumulator, then decide whether the sender thread must be woken.
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
// Wake the sender when the append reports a full batch (the accumulator sets batchIsFull when the
// partition's deque holds more than one batch or the last batch is full) or when a new batch was created.
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
KafkaClient 分析
KafkaClient 關鍵字段
/**
 * Key fields of the KafkaClient implementation discussed in these notes
 * (excerpt: other fields, constructors and methods are not shown).
 */
public class NetworkClient implements KafkaClient {
/* the selector used to perform network i/o */
// i.e. the NIO selector wrapper
private final Selectable selector;
/* the state of each node's connection */
// connection state per node
private final ClusterConnectionStates connectionStates;
/* the set of requests currently being sent or awaiting a response */
// requests about to be sent or waiting for their response
private final InFlightRequests inFlightRequests;
}
Sender 分析
Sender 關鍵字段
/**
 * Key fields of the producer's Sender runnable
 * (excerpt: other fields, constructors and methods are not shown).
 */
public class Sender implements Runnable {
/* the state of each nodes connection */
// the client that tracks the state of each connection
private final KafkaClient client;
/* the record accumulator that batches records */
// the record accumulator
private final RecordAccumulator accumulator;
/* the flag indicating whether the producer should guarantee the message order on the broker or not. */
// Per the original notes: set when max.in.flight.requests.per.connection is 1, so that only one
// RecordBatch is in flight at a time (the notes say per topic; run() mutes per partition) - confirm against the producer config.
private final boolean guaranteeMessageOrder;
/* the maximum request size to attempt to send to the server */
private final int maxRequestSize;
/* the number of acknowledgements to request from the server */
private final short acks;
}
Sender是一個線程類,看看它的核心方法run
/**
 * Runs one iteration of the send loop: finds the nodes with sendable batches, drops the
 * nodes the client is not ready to talk to, drains the batches per node, turns them into
 * produce requests, hands the requests to the client and finally polls the client.
 *
 * @param now the current time in milliseconds, passed on to the accumulator and the client
 */
void run(long now) {
    Cluster cluster = metadata.fetch();

    // Nodes for which the accumulator has batches that satisfy the send conditions.
    RecordAccumulator.ReadyCheckResult readiness = this.accumulator.ready(cluster, now);

    // Any topic with an unknown leader is added to the metadata and an update is requested.
    if (!readiness.unknownLeaderTopics.isEmpty()) {
        for (String unknownLeaderTopic : readiness.unknownLeaderTopics) {
            this.metadata.add(unknownLeaderTopic);
        }
        this.metadata.requestUpdate();
    }

    // Drop every node the client is not ready to send to, and remember the shortest
    // connection delay so the poll below does not sleep past it.
    long notReadyTimeout = Long.MAX_VALUE;
    for (Iterator<Node> it = readiness.readyNodes.iterator(); it.hasNext(); ) {
        Node candidate = it.next();
        if (this.client.ready(candidate, now)) {
            continue;
        }
        it.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(candidate, now));
    }

    // Regroup the queued batches: node id -> batches to send to that node.
    Map<Integer, List<RecordBatch>> drained = this.accumulator.drain(cluster,
                                                                     readiness.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);

    // When ordering must be guaranteed, mute every partition that was just drained.
    // (Per the original notes this is the max.in.flight.requests.per.connection = 1 case.)
    if (guaranteeMessageOrder) {
        for (List<RecordBatch> drainedForNode : drained.values()) {
            for (RecordBatch drainedBatch : drainedForNode) {
                this.accumulator.mutePartition(drainedBatch.topicPartition);
            }
        }
    }

    // Abort batches that have expired and record their records as errors.
    List<RecordBatch> expired = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    for (RecordBatch expiredBatch : expired) {
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    }

    sensors.updateProduceRequestMetrics(drained);

    // Turn node id -> batches into the list of client requests.
    List<ClientRequest> requests = createProduceRequests(drained, now);

    // With data ready to go, poll must not block; otherwise wait until the next ready check
    // or until a not-yet-ready node may be retried, whichever comes first.
    boolean dataReady = !readiness.readyNodes.isEmpty();
    if (dataReady) {
        log.trace("Nodes with data ready to send: {}", readiness.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
    }
    long pollTimeout = dataReady ? 0 : Math.min(readiness.nextReadyCheckDelayMs, notReadyTimeout);

    // Queue the requests on the client; the network I/O happens in poll.
    for (ClientRequest request : requests) {
        client.send(request, now);
    }
    this.client.poll(pollTimeout, now);
}
client.send(request, now)需要滿足下面的條件,才能寫入待發送的Send中
/**
 * Tells whether a request may be sent to the given node: the node must be connected,
 * its channel must be ready, and the in-flight requests must allow another send.
 */
private boolean canSendRequest(String node) {
    if (!connectionStates.isConnected(node))
        return false;
    if (!selector.isChannelReady(node))
        return false;
    return inFlightRequests.canSendMore(node);
}
//inFlightRequests.canSendMore
/**
 * Tells whether another request may be queued for the given node.
 *
 * @return {@code true} when nothing is queued for the node, or when the request at the head
 *         of the queue has completed and the queue is below the per-connection limit
 */
public boolean canSendMore(String node) {
    Deque<ClientRequest> pending = requests.get(node);
    if (pending == null || pending.isEmpty())
        return true;
    // The request at the head of the queue must have completed before more may be sent.
    if (!pending.peekFirst().request().completed())
        return false;
    return pending.size() < this.maxInFlightRequestsPerConnection;
}
看看NetworkClient.poll方法
/**
 * Performs one round of network I/O and collects the resulting responses.
 *
 * @param timeout maximum time to wait; the selector is polled with the smallest of this,
 *                the metadata timeout and the request timeout
 * @param now     current time in milliseconds, used for the metadata update check
 * @return the responses gathered in this round, after their callbacks have been invoked
 */
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        // This is where the NIO work actually happens.
        long selectTimeout = Utils.min(timeout, metadataTimeout, requestTimeoutMs);
        this.selector.poll(selectTimeout);
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);     // sends that have completed
    handleCompletedReceives(responses, updatedNow);  // responses that have been received
    handleDisconnections(responses, updatedNow);     // connections that were dropped
    handleConnections();                             // connections that were newly established
    handleTimedOutRequests(responses, updatedNow);   // requests that timed out

    // invoke callbacks
    for (ClientResponse completed : responses) {
        if (!completed.request().hasCallback())
            continue;
        try {
            completed.request().callback().onComplete(completed);
        } catch (Exception e) {
            // A failing callback must not prevent the remaining callbacks from running.
            log.error("Uncaught error in request completion:", e);
        }
    }

    return responses;
}
來看看真正操作NIO的
/**
 * Runs one select cycle and processes the keys that became ready.
 *
 * @param timeout maximum time to block in select; forced to 0 when staged receives or
 *                immediately connected keys are already waiting
 * @throws IllegalArgumentException if {@code timeout} is negative
 * @throws IOException              declared by the original signature
 */
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    clear();

    // Do not block when there is already work waiting to be processed.
    boolean workPending = hasStagedReceives() || !immediatelyConnectedKeys.isEmpty();
    long selectTimeout = workPending ? 0 : timeout;

    /* check ready keys */
    long selectStartNanos = time.nanoseconds();
    int readyKeyCount = select(selectTimeout);
    long selectEndNanos = time.nanoseconds();
    this.sensors.selectTime.record(selectEndNanos - selectStartNanos, time.milliseconds());

    if (readyKeyCount > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // Keys the selector reported as ready.
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, selectEndNanos);
        // Keys of connections that were established immediately.
        pollSelectionKeys(immediatelyConnectedKeys, true, selectEndNanos);
    }

    addToCompletedReceives();

    long ioEndNanos = time.nanoseconds();
    this.sensors.ioTime.record(ioEndNanos - selectEndNanos, time.milliseconds());

    // we use the time at the end of select to ensure that we don't close any connections that
    // have just been processed in pollSelectionKeys
    // (per the original notes, idleness is judged against connections.max.idle.ms)
    maybeCloseOldestConnection(selectEndNanos);
}
/**
 * Processes each selection key in turn: finishes pending connects, prepares channels that
 * are connected but not yet ready, reads available receives, performs pending writes, and
 * closes channels whose key is no longer valid or whose handling threw an exception.
 *
 * @param selectionKeys          the keys to process; each key is removed from the iterable as it is handled
 * @param isImmediatelyConnected when true, every key is treated as having a connect to finish
 * @param currentTimeNanos       timestamp passed to the idle-expiry manager for each channel
 */
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// Remove the key right away so it is not handled again.
iterator.remove();
KafkaChannel channel = channel(key);
sensors.maybeRegisterConnectionMetrics(channel.id());
// Record activity on this channel for idle-connection tracking, when enabled.
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);
try {
// Finish a pending connect; when it has not completed yet, skip this key for now.
if (isImmediatelyConnected || key.isConnectable()) {
if (channel.finishConnect()) {
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
SocketChannel socketChannel = (SocketChannel) key.channel();
log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
socketChannel.socket().getReceiveBufferSize(),
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
channel.id());
} else
continue;
}
// Connected but not ready yet: let the channel prepare itself
// (presumably handshake / authentication - confirm in KafkaChannel).
if (channel.isConnected() && !channel.ready())
channel.prepare();
// Read: keep reading until the channel returns no further receive, staging each one.
// Skipped while the channel still has a staged receive.
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
}
// Write: a non-null result from write() is recorded as a completed send.
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
// The key is no longer valid: close the channel and report it as disconnected.
if (!key.isValid()) {
close(channel);
this.disconnected.add(channel.id());
}
} catch (Exception e) {
// Any failure closes the channel; IOExceptions are logged at debug as a disconnect,
// everything else at warn as an unexpected error.
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel);
this.disconnected.add(channel.id());
}
}
}