Flink Kafka Sink Source Code Analysis

Initialization

A Kafka sink is typically added with code like the following:

input.addSink(
   new FlinkKafkaProducer<>(
      "bar",
      new KafkaSerializationSchemaImpl(),
      properties,
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)).name("Example Sink");
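
The properties argument above is an ordinary java.util.Properties object holding Kafka producer settings. As a minimal sketch, assuming a broker address supplied by the caller (the class name SinkPropertiesSketch and the address localhost:9092 are placeholders, not taken from the original code), it could be built like this:

```java
import java.util.Properties;

class SinkPropertiesSketch {
    // Builds the producer configuration that would be passed to the sink constructor.
    // "bootstrap.servers" is the standard Kafka client setting for the broker list.
    static Properties producerProperties(String bootstrapServers) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        return properties;
    }

    public static void main(String[] args) {
        // localhost:9092 is a placeholder address used only for illustration.
        Properties properties = producerProperties("localhost:9092");
        System.out.println(properties.getProperty("bootstrap.servers"));
    }
}
```

Any other Kafka producer setting can be added to the same object in the same way.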

During initialization, calling DataStream#addSink (input.addSink above) creates a StreamSink object, i.e. StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction)); where sinkFunction is the FlinkKafkaProducer object that was passed in. The StreamSink constructor hands this object to the userFunction field of its parent class AbstractUdfStreamOperator. The source code is as follows:

StreamSink.java

public StreamSink(SinkFunction<IN> sinkFunction) {
  super(sinkFunction);
  chainingStrategy = ChainingStrategy.ALWAYS;
}

AbstractUdfStreamOperator.java

public AbstractUdfStreamOperator(F userFunction) {
   this.userFunction = requireNonNull(userFunction);
   checkUdfCheckpointingPreconditions();
}
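
To make this wiring concrete, here is a minimal stand-alone model of an operator that stores a user function at construction time and later forwards each element to it. SketchSinkFunction and SketchStreamSink are hypothetical stand-ins written for this illustration; they are not the Flink classes and omit the context argument, chaining, and checkpoint checks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class DelegationSketch {
    // Hypothetical stand-in for SinkFunction: receives one value per call.
    interface SketchSinkFunction<IN> {
        void invoke(IN value);
    }

    // Hypothetical stand-in for StreamSink together with AbstractUdfStreamOperator.
    static class SketchStreamSink<IN> {
        private final SketchSinkFunction<IN> userFunction;

        SketchStreamSink(SketchSinkFunction<IN> userFunction) {
            // Mirrors the requireNonNull check in the constructor shown above.
            this.userFunction = Objects.requireNonNull(userFunction);
        }

        // The operator does no work of its own; it forwards the element to the user function.
        void processElement(IN element) {
            userFunction.invoke(element);
        }
    }

    // Feeds every input element through the operator and returns what the user function saw.
    static List<String> run(List<String> input) {
        List<String> received = new ArrayList<>();
        SketchStreamSink<String> sink = new SketchStreamSink<>(received::add);
        for (String element : input) {
            sink.processElement(element);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c")));
    }
}
```

In the real sink, the role of the user function is played by FlinkKafkaProducer.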

Task Execution

StreamSink calls the following method to send data:

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
   sinkContext.element = element;
   userFunction.invoke(element.getValue(), sinkContext);
}

In other words, the method actually invoked is FlinkKafkaProducer#invoke. The FlinkKafkaProducer constructor requires a FlinkKafkaProducer.Semantic to be specified, which is one of:

public enum Semantic {
   EXACTLY_ONCE,
   AT_LEAST_ONCE,
   NONE
}

The sections below describe the overall flow of sending data to Kafka under each of the three semantics.

Semantic.NONE

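This mode performs no extra work and relies entirely on the behavior of the Kafka producer itself. After the following call in FlinkKafkaProducer#invoke hands a record to the producer, Flink does nothing further (for example at checkpoints) to confirm that Kafka received it correctly: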

transaction.producer.send(record, callback);

Semantic.AT_LEAST_ONCE

Under this semantic, records are sent through the same path described above. In addition, if checkpointing is enabled, FlinkKafkaProducer#snapshotState first calls the parent class's snapshotState method, which eventually calls FlinkKafkaProducer#preCommit:

@Override
protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
   switch (semantic) {
      case EXACTLY_ONCE:
      case AT_LEAST_ONCE:
         flush(transaction);
         break;
      case NONE:
         break;
      default:
         throw new UnsupportedOperationException("Not implemented semantic");
   }
   checkErroneous();
}

Under AT_LEAST_ONCE the flush method is executed, which in turn calls:

transaction.producer.flush();

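This forces the records previously passed to send to be sent to the Kafka broker immediately and waits for those requests to complete. For the exact meaning, see the KafkaProducer API: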

flush()

Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.
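
The difference between NONE and AT_LEAST_ONCE at checkpoint time can be sketched with a small stand-alone model. BufferingProducer is a hypothetical stand-in whose send only buffers and whose flush delivers everything; it models the worst case in which nothing has been sent in the background, and it is not the Kafka client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FlushOnCheckpointSketch {
    enum Semantic { AT_LEAST_ONCE, NONE }

    // Hypothetical stand-in for a Kafka producer: send() only buffers, flush() delivers.
    static class BufferingProducer {
        final List<String> buffered = new ArrayList<>();
        final List<String> delivered = new ArrayList<>();

        void send(String record) {
            buffered.add(record);
        }

        void flush() {
            delivered.addAll(buffered);
            buffered.clear();
        }
    }

    // Models the preCommit switch: AT_LEAST_ONCE flushes, NONE does nothing.
    static void preCommit(Semantic semantic, BufferingProducer producer) {
        if (semantic == Semantic.AT_LEAST_ONCE) {
            producer.flush();
        }
    }

    // Returns how many records are still only in the client buffer when the checkpoint is taken.
    static int unflushedAtCheckpoint(Semantic semantic, List<String> records) {
        BufferingProducer producer = new BufferingProducer();
        for (String record : records) {
            producer.send(record);
        }
        preCommit(semantic, producer);
        return producer.buffered.size();
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c");
        System.out.println("NONE: " + unflushedAtCheckpoint(Semantic.NONE, records));
        System.out.println("AT_LEAST_ONCE: " + unflushedAtCheckpoint(Semantic.AT_LEAST_ONCE, records));
    }
}
```

In this model, records still buffered when a checkpoint completes are the ones that could be lost after a failure, because a restore would resume from offsets past them. Flushing before the checkpoint removes that window, at the cost of possible duplicates on replay.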

Semantic.EXACTLY_ONCE

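The EXACTLY_ONCE semantic also executes send and flush, but it additionally turns on the Kafka producer's transaction mechanism; see the Kafka transactions documentation for details. The source of beginTransaction in FlinkKafkaProducer is shown below. As it shows, a transaction is actually started only in EXACTLY_ONCE mode.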

@Override
protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
   switch (semantic) {
      case EXACTLY_ONCE:
         FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
         producer.beginTransaction();
         return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
      case AT_LEAST_ONCE:
      case NONE:
         // Do not create new producer on each beginTransaction() if it is not necessary
         final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
         if (currentTransaction != null && currentTransaction.producer != null) {
            return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
         }
         return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
      default:
         throw new UnsupportedOperationException("Not implemented semantic");
   }
}
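
The effect of this switch can be illustrated with a simplified stand-alone model that counts how many producers are created over several calls. SketchProducer and the counter are hypothetical names introduced for this sketch; the model ignores transactional ids and any pooling or recycling, so it only mirrors the branch structure shown above.

```java
class BeginTransactionSketch {
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    // Hypothetical stand-in for a Kafka producer; only records whether it is transactional.
    static class SketchProducer {
        final boolean transactional;

        SketchProducer(boolean transactional) {
            this.transactional = transactional;
        }
    }

    private final Semantic semantic;
    private SketchProducer current;
    private int producersCreated;

    BeginTransactionSketch(Semantic semantic) {
        this.semantic = semantic;
    }

    SketchProducer beginTransaction() {
        switch (semantic) {
            case EXACTLY_ONCE:
                // A fresh transactional producer for every transaction.
                producersCreated++;
                current = new SketchProducer(true);
                return current;
            case AT_LEAST_ONCE:
            case NONE:
                // Reuse the existing non-transactional producer if there is one.
                if (current == null) {
                    producersCreated++;
                    current = new SketchProducer(false);
                }
                return current;
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
    }

    // Runs the given number of beginTransaction() calls and reports how many producers were created.
    static int producersCreatedAfter(Semantic semantic, int transactions) {
        BeginTransactionSketch sink = new BeginTransactionSketch(semantic);
        for (int i = 0; i < transactions; i++) {
            sink.beginTransaction();
        }
        return sink.producersCreated;
    }

    public static void main(String[] args) {
        for (Semantic semantic : Semantic.values()) {
            System.out.println(semantic + ": " + producersCreatedAfter(semantic, 3));
        }
    }
}
```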

Another difference from AT_LEAST_ONCE appears at checkpoint time: transaction-related information is saved to the variable nextTransactionalIdHintState, and the information stored there is persisted as part of the checkpoint.

if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
   checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
   long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;

   // If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
   // case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this
   // scaling up.
   if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
      nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
   }

   nextTransactionalIdHintState.add(new FlinkKafkaProducer.NextTransactionalIdHint(
      getRuntimeContext().getNumberOfParallelSubtasks(),
      nextFreeTransactionalId));
}
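
The arithmetic in this snippet can be isolated into a small function to see how the hint changes when the job is scaled up. The method and parameter names below are chosen for this sketch, and the values in main (including a pool size of 5) are made up for illustration.

```java
class TransactionalIdHintSketch {
    // Mirrors the adjustment above: when parallelism has grown since the last hint,
    // skip ahead by currentParallelism * poolSize ids; otherwise keep the hinted value.
    static long nextFreeTransactionalId(
            long hintedNextFreeId, int lastParallelism, int currentParallelism, int poolSize) {
        long nextFree = hintedNextFreeId;
        if (currentParallelism > lastParallelism) {
            nextFree += (long) currentParallelism * poolSize;
        }
        return nextFree;
    }

    public static void main(String[] args) {
        // Scaling up from 4 to 8 subtasks: 20 + 8 * 5 = 60.
        System.out.println(nextFreeTransactionalId(20, 4, 8, 5));
        // Same parallelism: the hint is stored unchanged.
        System.out.println(nextFreeTransactionalId(20, 4, 4, 5));
        // Scaling down: the hint is also stored unchanged.
        System.out.println(nextFreeTransactionalId(20, 4, 2, 5));
    }
}
```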

Summary

This article covered the basic implementation of FlinkKafkaProducer. A follow-up article will explain in detail how Flink achieves end-to-end exactly-once semantics when combined with Kafka.

Note: this article is based on Flink 1.9.0 and Kafka 2.3.

References

Flink Kafka Source Source Code Analysis
