Initialization
A Kafka sink is usually added with code like the following:
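input.addSink(
        new FlinkKafkaProducer<>(
                "bar",                                      // target topic
                new KafkaSerializationSchemaImpl(),         // user's KafkaSerializationSchema implementation
                properties,                                 // Kafka producer configuration
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)) // delivery semantic
        .name("Example Sink");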
During initialization, the call to input.addSink (DataStream#addSink) creates a StreamSink object: StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));. Here sinkFunction is the FlinkKafkaProducer that was passed in. The StreamSink constructor hands this object to the userFunction field of its parent class, AbstractUdfStreamOperator. The source is:
StreamSink.java
public StreamSink(SinkFunction<IN> sinkFunction) {
    super(sinkFunction);
    chainingStrategy = ChainingStrategy.ALWAYS;
}
AbstractUdfStreamOperator.java
public AbstractUdfStreamOperator(F userFunction) {
    this.userFunction = requireNonNull(userFunction);
    checkUdfCheckpointingPreconditions();
}
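To make this wiring concrete, here is a minimal, self-contained sketch. SimpleSinkFunction, SimpleUdfOperator and SimpleStreamSink are hypothetical stand-ins, not Flink classes; they only reproduce the two points described above: the sink function ends up in the userFunction field, and a null function is rejected immediately by requireNonNull.

```java
import java.util.Objects;

public class SinkWiringSketch {

    // Hypothetical stand-in for Flink's SinkFunction (not the real interface).
    interface SimpleSinkFunction<IN> {
        void invoke(IN value) throws Exception;
    }

    // Stand-in for AbstractUdfStreamOperator: keeps the user function in a field.
    abstract static class SimpleUdfOperator<F> {
        protected final F userFunction;

        SimpleUdfOperator(F userFunction) {
            // Fail fast on null, like requireNonNull in the Flink constructor.
            this.userFunction = Objects.requireNonNull(userFunction);
        }
    }

    // Stand-in for StreamSink: passes the sink function up to the parent class.
    static class SimpleStreamSink<IN> extends SimpleUdfOperator<SimpleSinkFunction<IN>> {
        final String chainingStrategy;

        SimpleStreamSink(SimpleSinkFunction<IN> sinkFunction) {
            super(sinkFunction);              // sinkFunction becomes userFunction
            this.chainingStrategy = "ALWAYS"; // plays the role of ChainingStrategy.ALWAYS
        }
    }

    // Returns true if building a sink around a null function is rejected.
    static boolean rejectsNull() {
        try {
            new SimpleStreamSink<String>(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        SimpleSinkFunction<String> producer = value -> System.out.println("send " + value);
        SimpleStreamSink<String> sink = new SimpleStreamSink<>(producer);
        System.out.println(sink.userFunction == producer); // true: the same object
        System.out.println(rejectsNull());                 // true
    }
}
```

Rejecting null in the constructor means a misconfigured job fails while the graph is being built, not when the first record arrives.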
Task execution
At runtime, StreamSink sends data by calling the following method:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    sinkContext.element = element;
    userFunction.invoke(element.getValue(), sinkContext);
}
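The per-record path can be modeled in the same simplified way. In the sketch below, Element, SinkContext, SinkFn and Sink are hypothetical types standing in for StreamRecord, the sink context, SinkFunction and StreamSink. It shows that each incoming record is first stored in the context and then passed to the user function's invoke, which in the real job is FlinkKafkaProducer#invoke.

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessElementSketch {

    // Hypothetical stand-in for StreamRecord<IN>: a value plus a timestamp.
    static final class Element<IN> {
        final IN value;
        final long timestamp;

        Element(IN value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for the sink context: exposes the record currently being processed.
    static final class SinkContext<IN> {
        Element<IN> element;

        long timestamp() {
            return element.timestamp;
        }
    }

    // Stand-in for SinkFunction#invoke(value, context).
    interface SinkFn<IN> {
        void invoke(IN value, SinkContext<IN> context) throws Exception;
    }

    // Stand-in for StreamSink: one invoke() call per incoming record.
    static final class Sink<IN> {
        private final SinkFn<IN> userFunction;
        private final SinkContext<IN> sinkContext = new SinkContext<>();

        Sink(SinkFn<IN> userFunction) {
            this.userFunction = userFunction;
        }

        void processElement(Element<IN> element) throws Exception {
            sinkContext.element = element;                   // make the record visible via the context
            userFunction.invoke(element.value, sinkContext); // delegate to the user's sink function
        }
    }

    // Pushes records through the sink and returns what the user function saw.
    static List<String> run(List<Element<String>> records) {
        List<String> seen = new ArrayList<>();
        Sink<String> sink = new Sink<>((value, ctx) -> seen.add(value + "@" + ctx.timestamp()));
        try {
            for (Element<String> r : records) {
                sink.processElement(r);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Element<String>> records = new ArrayList<>();
        records.add(new Element<>("a", 1L));
        records.add(new Element<>("b", 2L));
        System.out.println(run(records)); // prints [a@1, b@2]
    }
}
```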
In other words, the method actually called is FlinkKafkaProducer#invoke. The FlinkKafkaProducer constructor requires a FlinkKafkaProducer.Semantic, defined as:
public enum Semantic {
    EXACTLY_ONCE,
    AT_LEAST_ONCE,
    NONE
}
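As a quick reference, the sketch below encodes the per-semantic behavior described in the rest of this article: whether the producer is flushed during a checkpoint (in preCommit) and whether beginTransaction opens a real Kafka transaction. The enum is a local copy and both helper methods are hypothetical; they summarize the switch statements in FlinkKafkaProducer rather than call them.

```java
public class SemanticSketch {

    // Local copy of the three values of FlinkKafkaProducer.Semantic.
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    // True if preCommit flushes the producer, i.e. on every checkpoint.
    static boolean flushesOnCheckpoint(Semantic s) {
        switch (s) {
            case EXACTLY_ONCE:
            case AT_LEAST_ONCE:
                return true;
            case NONE:
                return false;
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
    }

    // True if beginTransaction starts a real Kafka transaction.
    static boolean usesKafkaTransactions(Semantic s) {
        return s == Semantic.EXACTLY_ONCE;
    }

    public static void main(String[] args) {
        for (Semantic s : Semantic.values()) {
            System.out.println(s + ": flush=" + flushesOnCheckpoint(s)
                    + ", transactional=" + usesKafkaTransactions(s));
        }
        // EXACTLY_ONCE: flush=true, transactional=true
        // AT_LEAST_ONCE: flush=true, transactional=false
        // NONE: flush=false, transactional=false
    }
}
```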
The sections below describe the overall flow of sending data to Kafka under each of the three semantics.
Semantic.NONE
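This mode does nothing extra and relies entirely on the Kafka producer's own behavior. After FlinkKafkaProducer#invoke hands a record to the producer, Flink does not wait for Kafka to confirm that the record arrived; in particular, nothing is flushed when a checkpoint is taken. The send call in invoke is: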
transaction.producer.send(record, callback);
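The practical consequence of NONE can be illustrated with a toy worst-case model. The sketch assumes a hypothetical BufferedProducer whose send() only buffers records (as KafkaProducer may do until linger.ms or batch.size triggers a request) and whose crash() drops anything not yet delivered. A real producer also sends in the background, so some records may get through anyway; the point is that without a flush at checkpoint time nothing guarantees it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NoneSemanticSketch {

    // Hypothetical in-memory producer: send() only buffers records.
    static final class BufferedProducer {
        final List<String> buffer = new ArrayList<>();
        final List<String> delivered = new ArrayList<>(); // what the "broker" has received

        void send(String record) {
            buffer.add(record);
        }

        void flush() {
            delivered.addAll(buffer); // push everything buffered to the broker
            buffer.clear();
        }

        void crash() {
            buffer.clear(); // records never delivered are lost
        }
    }

    // Sends records, takes a checkpoint, then fails; returns what reached the broker.
    static List<String> sendCheckpointThenFail(List<String> records, boolean flushOnCheckpoint) {
        BufferedProducer producer = new BufferedProducer();
        for (String r : records) {
            producer.send(r);
        }
        if (flushOnCheckpoint) {
            producer.flush(); // what AT_LEAST_ONCE / EXACTLY_ONCE do in preCommit
        }
        producer.crash(); // failure right after the checkpoint completed
        return producer.delivered;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c");
        System.out.println(sendCheckpointThenFail(records, false)); // prints []
        System.out.println(sendCheckpointThenFail(records, true));  // prints [a, b, c]
    }
}
```

Because the checkpoint has already recorded the source position past these records, they are not replayed after recovery, which is why NONE gives no delivery guarantee.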
Semantic.AT_LEAST_ONCE
Under this semantic, records are sent exactly as described above. In addition, if checkpointing is enabled, FlinkKafkaProducer#snapshotState first calls the snapshotState method of its parent class (TwoPhaseCommitSinkFunction), which eventually calls FlinkKafkaProducer#preCommit:
@Override
protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
    switch (semantic) {
        case EXACTLY_ONCE:
        case AT_LEAST_ONCE:
            flush(transaction);
            break;
        case NONE:
            break;
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
    }
    checkErroneous();
}
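The order in which these hooks fire can be sketched as follows. This is a simplified, hypothetical model of what the parent class does on a checkpoint: it calls preCommit on the open transaction, keeps that transaction as pending under the checkpoint id, and opens a new one for the records that follow; pending transactions are committed only once the checkpoint is confirmed complete. Txn and the log strings are illustrative only and are not Flink types.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SnapshotOrderSketch {

    // Hypothetical transaction handle; only carries an id for illustration.
    static final class Txn {
        final int id;

        Txn(int id) {
            this.id = id;
        }
    }

    private final List<String> log = new ArrayList<>();
    private final Map<Long, Txn> pendingCommit = new LinkedHashMap<>();
    private int nextTxnId = 0;
    private Txn current = beginTransaction(); // a transaction is open from the start

    private Txn beginTransaction() {
        Txn t = new Txn(nextTxnId++);
        log.add("begin " + t.id);
        return t;
    }

    private void preCommit(Txn t) {
        log.add("preCommit " + t.id); // FlinkKafkaProducer flushes here (EXACTLY_ONCE / AT_LEAST_ONCE)
    }

    // Checkpoint hook: pre-commit the open transaction, park it under the
    // checkpoint id, then open a fresh transaction.
    void snapshotState(long checkpointId) {
        preCommit(current);
        pendingCommit.put(checkpointId, current);
        current = beginTransaction();
    }

    // Once a checkpoint is confirmed, commit every pending transaction up to it.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, Txn>> it = pendingCommit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Txn> entry = it.next();
            if (entry.getKey() <= checkpointId) {
                log.add("commit " + entry.getValue().id);
                it.remove();
            }
        }
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        SnapshotOrderSketch sink = new SnapshotOrderSketch();
        sink.snapshotState(1L);
        sink.notifyCheckpointComplete(1L);
        System.out.println(sink.log()); // prints [begin 0, preCommit 0, begin 1, commit 0]
    }
}
```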
For AT_LEAST_ONCE, preCommit calls flush, which in turn executes:
transaction.producer.flush();
That is, everything handed to send is pushed to the Kafka brokers right away. The KafkaProducer API documentation describes it as follows:
flush()
Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.
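To illustrate the "blocks on the completion" part, the sketch below simulates an asynchronous producer with a single background I/O thread. AsyncProducer and its methods are hypothetical and are not the Kafka client API: send() returns immediately, while flush() waits on every future returned so far, so once it returns, every record has been acknowledged by the simulated broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FlushSketch {

    // Hypothetical asynchronous producer: send() returns at once and the
    // "network" work runs on a background I/O thread.
    static final class AsyncProducer implements AutoCloseable {
        private final ExecutorService io = Executors.newSingleThreadExecutor();
        private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        final ConcurrentLinkedQueue<String> acked = new ConcurrentLinkedQueue<>();

        void send(String record) {
            inFlight.add(CompletableFuture.runAsync(() -> {
                sleepQuietly(5);   // simulated network latency
                acked.add(record); // simulated broker acknowledgement
            }, io));
        }

        // Like KafkaProducer#flush: block until every request sent so far has completed.
        void flush() {
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0])).join();
            inFlight.clear();
        }

        @Override
        public void close() {
            io.shutdown();
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Sends n records, flushes, and reports how many were acknowledged by then.
    static int ackedAfterFlush(int n) {
        try (AsyncProducer producer = new AsyncProducer()) {
            for (int i = 0; i < n; i++) {
                producer.send("record-" + i);
            }
            producer.flush();
            return producer.acked.size();
        }
    }

    public static void main(String[] args) {
        System.out.println(ackedAfterFlush(20)); // prints 20
    }
}
```

Without the flush() call the count returned would depend on timing; the blocking wait is what lets preCommit treat the data as delivered.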
Semantic.EXACTLY_ONCE
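EXACTLY_ONCE also calls send and flush, but it additionally enables the Kafka producer's transaction mechanism (see Kafka transactions for details). The beginTransaction source in FlinkKafkaProducer is shown below; only EXACTLY_ONCE actually starts a transaction, while AT_LEAST_ONCE and NONE reuse a non-transactional producer.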
@Override
protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
    switch (semantic) {
        case EXACTLY_ONCE:
            FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
            producer.beginTransaction();
            return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
        case AT_LEAST_ONCE:
        case NONE:
            // Do not create new producer on each beginTransaction() if it is not necessary
            final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
            if (currentTransaction != null && currentTransaction.producer != null) {
                return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
            }
            return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
    }
}
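The difference in producer handling can be modeled with a small hypothetical class. It assumes a simplified Producer type in which a null transactionalId means non-transactional, and, unlike the real code, it tracks the current transaction itself instead of relying on the parent class. Under EXACTLY_ONCE each call creates a new transactional producer; under AT_LEAST_ONCE and NONE the existing producer is reused.

```java
public class BeginTransactionSketch {

    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    // Hypothetical producer handle; a null transactionalId means non-transactional.
    static final class Producer {
        final String transactionalId;
        boolean inTransaction;

        Producer(String transactionalId) {
            this.transactionalId = transactionalId;
        }
    }

    // Stand-in for KafkaTransactionState: wraps the producer used by one transaction.
    static final class TxnState {
        final Producer producer;

        TxnState(Producer producer) {
            this.producer = producer;
        }

        boolean isTransactional() {
            return producer.transactionalId != null;
        }
    }

    private final Semantic semantic;
    private TxnState current; // simplification: tracked here instead of in a parent class
    private int nextTransactionalId = 0;
    int producersCreated = 0;

    BeginTransactionSketch(Semantic semantic) {
        this.semantic = semantic;
    }

    TxnState beginTransaction() {
        switch (semantic) {
            case EXACTLY_ONCE: {
                // A new transactional producer for every transaction (hypothetical id format).
                Producer producer = new Producer("txn-" + nextTransactionalId++);
                producersCreated++;
                producer.inTransaction = true; // stands in for producer.beginTransaction()
                current = new TxnState(producer);
                return current;
            }
            case AT_LEAST_ONCE:
            case NONE: {
                // Reuse the existing non-transactional producer when there is one.
                if (current != null && current.producer != null) {
                    current = new TxnState(current.producer);
                } else {
                    producersCreated++;
                    current = new TxnState(new Producer(null));
                }
                return current;
            }
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
    }

    public static void main(String[] args) {
        for (Semantic s : Semantic.values()) {
            BeginTransactionSketch sink = new BeginTransactionSketch(s);
            TxnState first = sink.beginTransaction();
            sink.beginTransaction();
            sink.beginTransaction();
            System.out.println(s + ": transactional=" + first.isTransactional()
                    + ", producersCreated=" + sink.producersCreated);
        }
        // EXACTLY_ONCE: transactional=true, producersCreated=3
        // AT_LEAST_ONCE: transactional=false, producersCreated=1
        // NONE: transactional=false, producersCreated=1
    }
}
```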
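The other difference from AT_LEAST_ONCE shows up at checkpoint time: transaction-related information is saved into nextTransactionalIdHintState, and the contents of this state are persisted as part of the checkpoint. As the check on getIndexOfThisSubtask() shows, only subtask 0 writes it: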
if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
    checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
    long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;

    // If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
    // case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this
    // scaling up.
    if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
        nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
    }

    nextTransactionalIdHintState.add(new FlinkKafkaProducer.NextTransactionalIdHint(
            getRuntimeContext().getNumberOfParallelSubtasks(),
            nextFreeTransactionalId));
}
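The only arithmetic in this snippet is the scale-up adjustment, extracted below as a standalone function. The parameter names mirror the fields in the source; the example values (a pool size of 5, parallelism going from 2 to 4) are purely illustrative. With nextFreeTransactionalId = 10, scaling up from 2 to 4 subtasks gives 10 + 4 * 5 = 30, while unchanged or reduced parallelism leaves it at 10.

```java
public class TransactionalIdHintSketch {

    // Mirrors the scale-up adjustment applied before subtask 0 stores
    // the NextTransactionalIdHint in nextTransactionalIdHintState.
    static long adjustedNextFreeId(long nextFreeTransactionalId,
                                   int lastParallelism,
                                   int currentParallelism,
                                   int kafkaProducersPoolSize) {
        if (currentParallelism > lastParallelism) {
            // Scaled up: new subtasks may have generated ids from scratch,
            // so skip past the whole range they could have used.
            nextFreeTransactionalId += (long) currentParallelism * kafkaProducersPoolSize;
        }
        return nextFreeTransactionalId;
    }

    public static void main(String[] args) {
        int poolSize = 5; // illustrative value
        System.out.println(adjustedNextFreeId(10, 2, 4, poolSize)); // prints 30 (scaled up)
        System.out.println(adjustedNextFreeId(10, 4, 4, poolSize)); // prints 10 (unchanged)
        System.out.println(adjustedNextFreeId(10, 4, 2, poolSize)); // prints 10 (scaled down)
    }
}
```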
Summary
This article introduced the basic implementation of FlinkKafkaProducer. A follow-up article will explain in detail how Flink achieves end-to-end exactly-once semantics together with Kafka.
Note: this article is based on Flink 1.9.0 and Kafka 2.3.