Kafka源碼學習 Producer

基于0.10.1版本

整體流程

發送消息流程.png

Producer.send()入口

/**
 * Asynchronous send path of the producer: waits for metadata, picks a partition,
 * validates the record size, appends the record to the accumulator and wakes the
 * sender when the append result asks for it.
 *
 * NOTE: this excerpt is truncated - key/value serialization, the catch clauses and
 * the end of the method are not shown here.
 *
 * @param record   the record to send
 * @param callback user callback; wrapped together with the interceptors when interceptors are configured
 * @return the future taken from the accumulator's append result
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // Wait for the cluster metadata; the time spent waiting is subtracted from maxBlockTimeMs below
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            // (serialization producing serializedKey / serializedValue is omitted from this excerpt)
            // Determine the target partition
            int partition = partition(record, serializedKey, serializedValue, cluster);
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            // Validate the serialized record size
            ensureValidRecordSize(serializedSize);
            tp = new TopicPartition(record.topic(), partition);
            // Fall back to the current time when the record carries no timestamp
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
            // Append the record to the record accumulator
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
            // Wake the sender thread when the append reported a full batch or created a new batch
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        }
doSend.png

RecordAccumulator.append方法利用了分段鎖,當并發出現時,前一個線程需要的內存空間比較大,不滿足寫入,后一個線程需要的內存空間比較小,滿足寫入。提高了并發,此處也可以看出,如果消息需要有發送順序的保證,一個Producer實例不要在多處調用。

/**
 * Appends a record to the batch queue of the given partition.
 *
 * First tries the last batch of the partition's deque while holding the deque lock.
 * If that fails, releases the lock, allocates a buffer, re-acquires the lock and
 * retries; only if the retry also fails is a new RecordBatch created from the buffer
 * and added to the deque.
 *
 * @param tp             target topic-partition
 * @param timestamp      record timestamp
 * @param key            serialized key
 * @param value          serialized value
 * @param callback       callback passed through to the batch append
 * @param maxTimeToBlock passed to the buffer allocation as its time limit
 * @return the append result (future, batch-full flag, new-batch flag)
 * @throws InterruptedException declared by the method; presumably raised by the blocking allocation - confirm
 * @throws IllegalStateException if the accumulator is closed
 */
public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // Count this append as in progress; the counter is decremented in the finally block
        appendsInProgress.incrementAndGet();
        try {
            // Get (or create) the Deque of batches for this partition
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }
            // The buffer must hold at least this record, even when it is larger than batchSize
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            // Allocation happens outside the deque lock
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                // NOTE(review): this path throws without calling free.deallocate(buffer) for the buffer allocated above.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // Try appending again
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                // Success here presumably means another thread added a new RecordBatch while the lock was released
                if (appendResult != null) {
                    // Give back the buffer allocated above; it is not needed
                    free.deallocate(buffer);
                    return appendResult;
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }
消息寫入收集器流程.png

RecordAccumulator.tryAppend方法,獲取最后一個RecordBatch嘗試寫入

/**
 * Tries to append the record to the last batch of the given deque.
 *
 * @return the append result, or null when the deque is empty or the last batch rejected the record
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
        RecordBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
            // Append was rejected (null future): close the batch's records.
            // Per the article this makes the MemoryRecords non-writable and flips the buffer for reading - confirm against MemoryRecords.close()
            if (future == null)
                last.records.close();
            else
                // Second argument (deque.size() > 1 || last.records.isFull()) signals whether the Sender thread can be woken
                return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
        }
        return null;
    }

RecordBatch.tryAppend寫入

/**
 * Appends the record to this batch if there is room for it.
 *
 * @param now time stored as the batch's last append time
 * @return a future for the appended record, or null when the batch has no room for it
 */
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        // No room in this batch: return null so the caller can fall back to a new RecordBatch
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            // The future is created with the record count before this record is counted; -1 stands for a null key/value length
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            // When a callback was supplied, keep it together with its future in thunks
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }

判斷是否需要new RecordBatch

/**
 * Decides whether the given key/value still fits into these records.
 *
 * @return false when the records are not writable; otherwise the result of the capacity check below
 */
public boolean hasRoomFor(byte[] key, byte[] value) {
        // Not in write mode: per the article, the batch is already full or is being sent
        if (!this.writable)
            return false;
        /**
         * Has anything been written to these MemoryRecords yet?
         * Nothing written: check that the initial buffer capacity >= the record size.
         * Something written: check that writeLimit (per the article, batch.size) >= bytes written so far + the record size.
         * Both cases are needed because (per the article) a record larger than batch.size gets a buffer
         * that holds only that one record, whereas buffers taken from the BufferPool are all batch.size,
         * in which case initialCapacity = writeLimit.
         *
          */
        return this.compressor.numRecordsWritten() == 0 ?
            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
            this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
    }

BufferPool源碼解讀

BufferPool是Buffer緩沖區,創建出來的Buffer大小都等于batch.size,如果控制每條消息都小于或者等于batch.size,充分利用了緩沖區,性能會更好。

BufferPool 類主要變量

/**
 * Field excerpt of BufferPool (constructor and methods are not shown here).
 */
public final class BufferPool {

    /**
     * Maximum size of the pool in bytes; per the article, the buffer.memory setting.
     */
    private final long totalMemory;
    /**
     * Fixed size of each pooled buffer; per the article, the batch.size setting.
     */
    private final int poolableSize;
    private final ReentrantLock lock;
    /**
     * Queue of free buffers.
     */
    private final Deque<ByteBuffer> free;
    /**
     * Queue of conditions waiting for a buffer to be allocated.
     */
    private final Deque<Condition> waiters;
    /**
     * Amount of memory currently available.
     */
    private long availableMemory;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;
}

通過下面的代碼獲取當前需要申請的Buffer內存空間大小

// Requested buffer size: the larger of batchSize and this record's size plus the log overhead
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));

如果超過batch.size,通過ByteBuffer.allocate(size)來申請空間,不從BufferPool中獲取。

BufferPool主要方法就是申請空間allocate,釋放空間deallocate,具體流程如下


申請空間流程.png
釋放空間流程.png

Sender 發送線程

每次往消息收集器寫完消息,都會檢驗下,是否需要喚醒發送線程Sender

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
 // Wake the sender thread when the append reported a full batch or created a new batch
 if (result.batchIsFull || result.newBatchCreated) {
        log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
 }

KafkaClient 分析

KafkaClient 關鍵字段

/**
 * Field excerpt of NetworkClient (only the fields discussed in the article are shown).
 */
public class NetworkClient implements KafkaClient {

    /* the selector used to perform network i/o */
    // selector that performs the NIO operations
    private final Selectable selector;
    /* the state of each node's connection */
    // connection state of each node
    private final ClusterConnectionStates connectionStates;
    /* the set of requests currently being sent or awaiting a response */
    // requests about to be sent or waiting for a response
    private final InFlightRequests inFlightRequests;

}

Sender 分析

Sender 關鍵字段

/**
 * Field excerpt of Sender (only the fields discussed in the article are shown).
 */
public class Sender implements Runnable {

    /* the state of each nodes connection */
    // state of each connection
    private final KafkaClient client;

    /* the record accumulator that batches records */
    // the record accumulator
    private final RecordAccumulator accumulator;

    /* the flag indicating whether the producer should guarantee the message order on the broker or not. */
    // whether only one in-flight RecordBatch per topic must be guaranteed;
    // per the article this is the case when max.in.flight.requests.per.connection is set to 1
    private final boolean guaranteeMessageOrder;

    /* the maximum request size to attempt to send to the server */
    private final int maxRequestSize;

    /* the number of acknowledgements to request from the server */
    private final short acks;
}

Sender是一個線程類,看看它的核心方法run

    /**
     * One iteration of the sender: finds the nodes with data ready to send, drains the
     * accumulator per node, aborts expired batches, turns the drained batches into
     * produce requests, passes them to the client and polls the client.
     *
     * @param now the current time, passed through to the accumulator and the client
     */
    void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the nodes that have RecordBatches ready to be sent
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        // if the leader of any topic is unknown, re-add those topics and request a metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        // remove the nodes the client is not ready to send to;
        // per the article, client.ready also initiates the connection so the node can be used on a later call
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // convert TopicPartition -> List<RecordBatch> into node id (broker id) -> List<RecordBatch>
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        // when ordering must be guaranteed (per the article: max.in.flight.requests.per.connection = 1),
        // mute every drained partition so that only one RecordBatch per partition is in flight
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        // abort the batches that have expired and record an error for each of them;
        // per the article, aborting runs the batch's done(), which ends in callback.onCompletion,
        // where the application can decide whether to resend
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        // convert node id (broker id) -> List<RecordBatch> into a List<ClientRequest>
        List<ClientRequest> requests = createProduceRequests(batches, now);
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        // with nodes ready to send, poll without waiting
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        // hand each request to the client; per the article it becomes the pending Send of the KafkaChannel
        for (ClientRequest request : requests)
            client.send(request, now);
        this.client.poll(pollTimeout, now);
    }

client.send(request, now)需要滿足下面的條件,才能寫入待發送的Send中

/**
 * Checks whether a request can currently be sent to the given node.
 *
 * @param node id of the node
 * @return true only when all three conditions below hold
 */
private boolean canSendRequest(String node) {
        // node is connected && its channel is ready && more in-flight requests are allowed
        return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}

//inFlightRequests.canSendMore
/**
 * Returns true when the node has no queued requests, or when the request at the head
 * of its queue has completed and the queue holds fewer than
 * maxInFlightRequestsPerConnection requests.
 */
public boolean canSendMore(String node) {
        Deque<ClientRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}

看看NetworkClient.poll方法

/**
 * Performs the network I/O through the selector, then collects the resulting
 * responses and invokes the callbacks of the requests that have one.
 *
 * @param timeout upper bound for the selector poll; the smallest of this value, the metadata timeout and requestTimeoutMs is used
 * @param now     the current time, used for the metadata update check
 * @return the responses gathered by the handle* steps
 */
public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            // this is where the NIO work actually happens
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        // handle sends that have completed
        handleCompletedSends(responses, updatedNow);
        // handle responses that have been received
        handleCompletedReceives(responses, updatedNow);
        // handle disconnected connections (per the article, this re-requests metadata)
        handleDisconnections(responses, updatedNow);
        // handle newly established connections (per the article, they must pass verification)
        handleConnections();
        // handle requests that have timed out
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    // call the onComplete method of the request's callback
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
}

來看看真正操作NIO的

/**
 * Runs one select pass and processes the keys that became ready as well as the
 * immediately connected keys, recording select and I/O time metrics.
 *
 * @param timeout select timeout; must be >= 0
 * @throws IllegalArgumentException if timeout is negative
 * @throws IOException declared by the method; presumably raised by the underlying select - confirm
 */
public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");

        clear();

        // do not block in select when there are staged receives or immediately connected keys
        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;

        /* check ready keys */
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            // process the keys that were selected as ready
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            // process the immediately connected keys
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }

        addToCompletedReceives();

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

        // we use the time at the end of select to ensure that we don't close any connections that
        // have just been processed in pollSelectionKeys
        // close idle connections (per the article, judged by the connections.max.idle.ms setting)
        maybeCloseOldestConnection(endSelect);
}
/**
 * Processes each selection key: finishes pending connects, prepares channels that are
 * connected but not ready, reads and writes on ready channels, and closes channels
 * whose key is invalid or whose handling threw an exception.
 *
 * @param selectionKeys          the keys to process; each key is removed from the iteration source as it is handled
 * @param isImmediatelyConnected true to run the finish-connect step regardless of key.isConnectable()
 * @param currentTimeNanos       time passed to the idle expiry manager (when one is set)
 */
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            KafkaChannel channel = channel(key);
            sensors.maybeRegisterConnectionMetrics(channel.id());
            if (idleExpiryManager != null)
                idleExpiryManager.update(channel.id(), currentTimeNanos);
            try {
                // connect step: skip the rest for this key while the connection is not finished yet
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                                socketChannel.socket().getReceiveBufferSize(),
                                socketChannel.socket().getSendBufferSize(),
                                socketChannel.socket().getSoTimeout(),
                                channel.id());
                    } else
                        continue;
                }

                if (channel.isConnected() && !channel.ready())
                    channel.prepare();
                // read: keep reading until channel.read() returns null, staging every receive
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }
                // write: a non-null Send from channel.write() is recorded as a completed send
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }
                // the key is no longer valid: close the channel and record the disconnect
                if (!key.isValid()) {
                    close(channel);
                    this.disconnected.add(channel.id());
                }

            } catch (Exception e) {
                // any failure closes the channel; IOExceptions are logged at debug level, everything else at warn
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel);
                this.disconnected.add(channel.id());
            }
        }
}
最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容