Flink 使用介紹相關(guān)文檔目錄
前言
近期一個需求是寫一套demo用來證明Flink能夠精準一次投送。筆者設(shè)計了如下場景:Flink從Kafka消費數(shù)據(jù),然后原封不動再發(fā)送回Kafka。中間模擬Flink作業(yè)失敗恢復(fù)的場景。Flink作業(yè)恢復(fù)之后,仍可以繼續(xù)發(fā)送數(shù)據(jù)到Kafka。輸出的數(shù)據(jù)和消費的數(shù)據(jù)相比,不丟失也不重復(fù)。
環(huán)境信息
- Flink 1.13.2
- Kafka 1.1.1
- Hadoop 3.1.1
要點分析
需要格外注意的有如下內(nèi)容:
- Flink一定要啟用checkpoint。
- Flink CheckpointMode一定要配置為
EXACTLY_ONCE。 - Flink Kafka數(shù)據(jù)源配置要禁用自動commit。
- Flink Kafka數(shù)據(jù)源配置要配置隔離級別為
read_committed。 - Flink Kafka Producer要配置
EXACTLY_ONCE(內(nèi)部會啟用事務(wù)和冪等性)。
主要注意的是,如果我們使用kafka-console-consumer等外部系統(tǒng)讀取Flink寫入到Kafka的數(shù)據(jù)來驗證數(shù)據(jù)是否重復(fù)或丟失的話,必須保證這個外部系統(tǒng)也是配置了EXACTLY_ONCE相關(guān)支持的。例如kafka-console-consumer需要配置隔離級別為read_committed。否則即便是Flink處理數(shù)據(jù)的時候的確實現(xiàn)了exactly_once,由于kafka-console-consumer讀到了未提交的數(shù)據(jù),讀到的數(shù)據(jù)會“重復(fù)”。干擾結(jié)果的驗證。
實現(xiàn)代碼
Flink主程序代碼如下所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 非必須
env.setParallelism(1);
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String sourceBroker = parameterTool.get("source-broker");
String sinkBroker = parameterTool.get("sink-broker");
String sourceTopic = parameterTool.get("source-topic");
String sinkTopic = parameterTool.get("sink-topic");
String checkpointPath = parameterTool.get("ckp-path");
CheckpointingMode checkpointingMode = CheckpointingMode.valueOf(mode);
// checkpoint時間間隔,必須到啟動checkpoint
env.enableCheckpointing(666, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(checkpointPath);
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(sourceBroker)
.setTopics(sourceTopic)
.setGroupId("source")
.setValueOnlyDeserializer(new SimpleStringSchema())
// 必須項,否則數(shù)據(jù)會有重復(fù)
// 禁用kafka source自動提交offset
.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
// 必須項,否則數(shù)據(jù)會有重復(fù)
// 配置kafka source隔離級別為讀提交
.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
.build();
DataStreamSource<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka_source");
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", sinkBroker);
// exactly once模式必須要配置
properties.setProperty("transaction.timeout.ms", String.valueOf(5 * 60 * 1000));
// 啟用冪等性,下面設(shè)置FlinkKafkaProducer的時候指定了EXACTLY_ONCE會自動啟用事務(wù),可以不配置此項
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// 配置transactional id,下面設(shè)置FlinkKafkaProducer的時候指定了EXACTLY_ONCE會自動啟用事務(wù),可以不配置此項
properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer");
KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
return new ProducerRecord<>(
sinkTopic,
element.getBytes(StandardCharsets.UTF_8));
}
};
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
sinkTopic,
serializationSchema,
properties,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
stream.addSink(kafkaSink).name("kafka_sink");
// execute program
env.execute("Exactly once demo");
演示方式
這里Kafka數(shù)據(jù)源topic為source,輸出數(shù)據(jù)topic為sink。
啟動任意kafka數(shù)據(jù)源(也可以使用console producer),向source topic寫入數(shù)據(jù)。
啟動kafka console consumer,監(jiān)視Flink輸出的數(shù)據(jù)。注意務(wù)必要添加隔離級別參數(shù),設(shè)置為read_committed。命令示例如下:
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --isolation-level read_committed --topic sink
將上面代碼編譯輸出為Flink作業(yè)jar,使用flink命令提交。
flink run -m yarn-cluster -c xxx.xxx.xxx xxx.jar --source-broker master:9092,node1:9092,node2:9092 --source-topic source --sink-broker master:9092,node1:9092,node2:9092 --sink-topic sink --ckp-path hdfs://xxx:9000/path/to/checkpoint/
任務(wù)運行之后一段時間。通過Flink管理頁面找到TaskManager container所在的節(jié)點。使用kill pid命令終止進程。等待Flink自動恢復(fù)。成功恢復(fù)后再觀察kafka console consumer,輸出數(shù)據(jù)應(yīng)不重復(fù)不遺漏。