Learning Apache Flink(API)

This article is based on the official Apache Flink v1.3 documentation and uses the Scala API. For the basic architecture, see "Learning Apache Flink(BASIC)".

Business Scenario


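Flink consumes data from Kafka, applies an initial filter to obtain a result set, "tags" each record, filters on the tag, and finally writes the result back to Kafka. For example, each record in topic foo has the form "imsi,lac,cell". First, select all records whose imsi field starts with 460. Then use the lac and cell fields to decide whether the record falls in the specified area, and add a field isSpecifiedLocation whose value is true or false. The fields finally written to Kafka are "imsi,lac,cell,isSpecifiedLocation,timestamp", and only records with isSpecifiedLocation equal to true are written.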
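The per-record logic described above can be sketched in plain Java, without any Flink dependency. This is only an illustration of the filter, tag, filter sequence: the class and method names are hypothetical, and the "lac equals cell" test is the stand-in rule used by the demo code in this article, not a real location lookup.

```java
import java.util.Optional;

final class TagPipelineSketch {

    /**
     * Hypothetical helper: applies the scenario's three steps to one CSV record.
     * Returns the output record, or empty if the record is filtered out.
     */
    static Optional<String> process(String csvLine, long timestamp) {
        String[] fields = csvLine.split(",", -1);
        if (fields.length != 3) {
            return Optional.empty();               // not an "imsi,lac,cell" record
        }
        String imsi = fields[0];
        String lac = fields[1];
        String cell = fields[2];

        // Step 1: keep only records whose imsi starts with 460
        if (!imsi.startsWith("460")) {
            return Optional.empty();
        }

        // Step 2: tag the record (assumed demo rule: lac equals cell)
        boolean isSpecifiedLocation = lac.equals(cell);

        // Step 3: keep only records tagged true
        if (!isSpecifiedLocation) {
            return Optional.empty();
        }
        return Optional.of(String.join(",",
                imsi, lac, cell,
                String.valueOf(isSpecifiedLocation),
                String.valueOf(timestamp)));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(process("4601234,1,1", now)); // kept
        System.out.println(process("4601234,2,1", now)); // dropped: lac differs from cell
        System.out.println(process("1231234,1,1", now)); // dropped: imsi does not start with 460
    }
}
```

In the Flink job itself, steps 1 and 3 are expressed as Table API filters and step 2 as a DataStream map.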

Reading Kafka Data with Flink


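Note: the Kafka version used in this article is 0.10.0.
The Provided TableSources in the official documentation only cover JSON and Avro formats for Kafka. If the data in the topic is in CSV format, you can write a custom KafkaCsvTableSource and CsvRowDeserializationSchema, modeled on Kafka010JsonTableSource and JsonRowDeserializationSchema, to parse it (see the complete code at the end of this article for the implementation). A TableSource can then be registered as follows: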

//Register a TableSource
val kafkaTableSource = new KafkaCsvTableSource(
      "foo",
      properties,
      new CsvRowDeserializationSchema(typeInfo),
      typeInfo)

tableEnv.registerTableSource("KafkaCsvTable", kafkaTableSource)
val kafkaCsvTable = tableEnv.scan("KafkaCsvTable")
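The CSV deserialization behind this TableSource essentially turns the raw bytes of each Kafka message into one value per column. A minimal plain-Java sketch of that step follows. It assumes UTF-8 encoded messages and a plain comma split with no quoting or escaping; the class and method names are hypothetical and are not part of Flink.

```java
import java.nio.charset.StandardCharsets;

final class CsvSplitSketch {

    /**
     * Hypothetical helper: splits one Kafka message into exactly fieldCount values.
     * Missing trailing fields stay null; surplus fields are ignored.
     */
    static String[] parse(byte[] message, int fieldCount) {
        String line = new String(message, StandardCharsets.UTF_8);
        // A limit of -1 keeps trailing empty fields instead of dropping them
        String[] parts = line.split(",", -1);

        String[] fields = new String[fieldCount];  // elements start out as null
        for (int i = 0; i < fieldCount && i < parts.length; i++) {
            fields[i] = parts[i];
        }
        return fields;
    }

    public static void main(String[] args) {
        byte[] message = "4601234,1,1".getBytes(StandardCharsets.UTF_8);
        String[] fields = parse(message, 3);
        System.out.println("imsi=" + fields[0] + ", lac=" + fields[1] + ", cell=" + fields[2]);
    }
}
```

A deserialization schema would copy these values into a Row in the same order as the field names of the declared row type.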

Once you have a Table, you can use the Table API to filter the data:

val filterResult = kafkaCsvTable.where('imsi like "460%").select("imsi,lac,cell")

Dynamically Adding Fields to a DataStream


  1. Convert the Table to a DataStream
val dsRow: DataStream[Row] = tableEnv.toAppendStream(filterResult)
  2. Add the fields
val newDsRows = dsRow.map(row => {
  // Allocate a new Row with room for the two extra fields
  val ret = new Row(row.getArity + 2)

  // Copy the existing fields unchanged
  for (i <- 0 until row.getArity) {
    ret.setField(i, row.getField(i))
  }

  // Demo rule: the record counts as "in the specified area" when lac equals cell
  val isSpecifiedLocation = ret.getField(1) == ret.getField(2)
  ret.setField(row.getArity, isSpecifiedLocation)

  // Append the processing time as the timestamp field
  ret.setField(row.getArity + 1, System.currentTimeMillis())

  ret
})
  3. Register the new DataStream as a Table and apply the final filter
 tableEnv.registerDataStream("newTable", newDsRows)
 val newKafkaCsvTable = tableEnv.scan("newTable")
 val newResult = newKafkaCsvTable.filter('isSpecifiedLocation === true).select("imsi,lac,cell,isSpecifiedLocation,timestamp")

Writing Data from Flink to Kafka


This article uses the Kafka09JsonTableSink class provided by Flink to write the result directly in JSON format.

 val sink = new Kafka09JsonTableSink("bar", properties, new FlinkFixedPartitioner[Row])
 newResult.writeToSink(sink)
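To make the output format concrete, the following plain-Java sketch shows how a row's field names and values could map to one flat JSON object per record. It is a hand-rolled illustration, not the serializer Flink uses internally; the class and method names are hypothetical, and the escaping is deliberately minimal.

```java
final class JsonRowSketch {

    /** Hypothetical helper: renders parallel name/value arrays as a flat JSON object. */
    static String toJson(String[] names, Object[] values) {
        if (names.length != values.length) {
            throw new IllegalArgumentException("names and values must have the same length");
        }
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < names.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(escape(names[i])).append("\":");
            Object value = values[i];
            if (value == null || value instanceof Number || value instanceof Boolean) {
                sb.append(value);                  // null, numbers and booleans are unquoted
            } else {
                sb.append('"').append(escape(value.toString())).append('"');
            }
        }
        return sb.append('}').toString();
    }

    /** Escapes only backslashes and double quotes, which is enough for this sketch. */
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        String[] names = {"imsi", "lac", "cell", "isSpecifiedLocation", "timestamp"};
        Object[] values = {"4601234", "1", "1", true, 1511222771896L};
        System.out.println(toJson(names, values));
    }
}
```

The string columns come out quoted, while the boolean tag and the long timestamp are written as bare JSON values.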

Test Case


  1. Run ./bin/flink run -c com.woople.streaming.scala.examples.kafka.FlinkKafkaDemo /opt/flink-tutorials-1.0-bundle.jar

  2. Write the record 4601234,1,1 to topic foo. Topic bar then receives {"imsi":"4601234","lac":"1","cell":"1","isSpecifiedLocation":true,"timestamp":1511222771896}. An input of 4601234,2,1 does not satisfy the condition and produces no output.

Troubleshooting


The following error came up while debugging the code:

org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.

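I found FLINK-6500 online and followed the approach described there. Adding the line below resolved the problem: it puts an explicit RowTypeInfo in implicit scope, so the result of the map is no longer typed as GenericTypeInfo<Row>. Here types and names are the field types and field names of the new row, as defined in the complete code.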

implicit val tpe: TypeInformation[Row] = new RowTypeInfo(types, names)

Complete Code


pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.woople</groupId>
    <artifactId>flink-tutorials</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>
    </dependencies>
    
    <build>
        <defaultGoal>package</defaultGoal>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <configuration>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>eclipse-add-source</id>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>attach-scaladocs</id>
                        <phase>verify</phase>
                        <goals>
                            <goal>doc-jar</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>2.11.8</scalaVersion>
                    <recompileMode>incremental</recompileMode>
                    <useZincServer>true</useZincServer>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <shadedArtifactAttached>false</shadedArtifactAttached>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <finalName>${project.artifactId}-${project.version}-bundle</finalName>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

KafkaCsvTableSource.java

package com.woople.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import java.util.Properties;

public class KafkaCsvTableSource implements StreamTableSource<Row> {
    /** The Kafka topic to consume. */
    private final String topic;

    /** Properties for the Kafka consumer. */
    private final Properties properties;

    /** Deserialization schema to use for Kafka records. */
    private final DeserializationSchema<Row> deserializationSchema;

    /** Type information describing the result type. */
    private final TypeInformation<Row> typeInfo;

    /**
     * Creates a generic Kafka {@link StreamTableSource}.
     *
     * @param topic                 Kafka topic to consume.
     * @param properties            Properties for the Kafka consumer.
     * @param deserializationSchema Deserialization schema to use for Kafka records.
     * @param typeInfo              Type information describing the result type.
     */
    public KafkaCsvTableSource(
            String topic,
            Properties properties,
            DeserializationSchema<Row> deserializationSchema,
            TypeInformation<Row> typeInfo) {

        this.topic = Preconditions.checkNotNull(topic, "Topic");
        this.properties = Preconditions.checkNotNull(properties, "Properties");
        this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
        this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information");
    }

    /**
     * NOTE: This method is for internal use only for defining a TableSource.
     *       Do not use it in Table API programs.
     */
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
        // Version-specific Kafka consumer
        FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
        return env.addSource(kafkaConsumer);
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return typeInfo;
    }

    /**
     * Returns the version-specific Kafka consumer.
     *
     * @param topic                 Kafka topic to consume.
     * @param properties            Properties for the Kafka consumer.
     * @param deserializationSchema Deserialization schema to use for Kafka records.
     * @return The version-specific Kafka consumer
     */
    private FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
        return new FlinkKafkaConsumer010<Row>(topic, deserializationSchema, properties);
    }
    /**
     * Returns the deserialization schema.
     *
     * @return The deserialization schema
     */
    protected DeserializationSchema<Row> getDeserializationSchema() {
        return deserializationSchema;
    }

    @Override
    public String explainSource() {
        return "";
    }
}

CsvRowDeserializationSchema.java

package com.woople.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.io.IOException;

public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
    /** Type information describing the result type. */
    private final TypeInformation<Row> typeInfo;

    /** Field names to parse. Indices match fieldTypes indices. */
    private final String[] fieldNames;

    /** Types to parse fields as. Indices match fieldNames indices. */
    private final TypeInformation<?>[] fieldTypes;

    /** Flag indicating whether to fail on a missing field. */
    private boolean failOnMissingField;

    /**
     * Creates a CSV deserialization schema for the given fields and types.
     *
     * @param typeInfo   Type information describing the result type. The field names
     *                   determine how many comma-separated values are read per record.
     */
    public CsvRowDeserializationSchema(TypeInformation<Row> typeInfo) {
        Preconditions.checkNotNull(typeInfo, "Type information");
        this.typeInfo = typeInfo;

        this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
        this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
    }

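    @Override
    public Row deserialize(byte[] message) throws IOException {
        try {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset
            String line = new String(message, java.nio.charset.StandardCharsets.UTF_8);
            // A limit of -1 keeps trailing empty fields, e.g. "a,b," yields three values
            String[] values = line.split(",", -1);

            Row row = new Row(fieldNames.length);
            for (int i = 0; i < fieldNames.length; i++) {
                if (i < values.length) {
                    row.setField(i, values[i]);
                } else if (failOnMissingField) {
                    throw new IllegalStateException("Missing field '" + fieldNames[i] + "'.");
                } else {
                    // By default a missing field is set to null
                    row.setField(i, null);
                }
            }

            return row;
        } catch (Throwable t) {
            throw new IOException("Failed to deserialize CSV record.", t);
        }
    }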

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return typeInfo;
    }

    /**
     * Configures the failure behaviour if a CSV field is missing.
     *
     * <p>By default, a missing field is ignored and the field is set to null.
     *
     * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
     */
    public void setFailOnMissingField(boolean failOnMissingField) {
        this.failOnMissingField = failOnMissingField;
    }
}

FlinkKafkaDemo.scala

package com.woople.streaming.scala.examples.kafka
import java.util.Properties
import com.woople.flink.streaming.connectors.kafka.{CsvRowDeserializationSchema, KafkaCsvTableSource}
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSink
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object FlinkKafkaDemo {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    val typeInfo = Types.ROW_NAMED(Array("imsi","lac","cell"), Types.STRING, Types.STRING, Types.STRING)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.1.236.66:6667")
    properties.setProperty("group.id", "test")

    //Register a TableSource
    val kafkaTableSource = new KafkaCsvTableSource(
      "foo",
      properties,
      new CsvRowDeserializationSchema(typeInfo),
      typeInfo)

    tableEnv.registerTableSource("KafkaCsvTable", kafkaTableSource)
    val kafkaCsvTable = tableEnv.scan("KafkaCsvTable")
    val filterResult = kafkaCsvTable.where('imsi like "460%").select("imsi,lac,cell")
    val dsRow: DataStream[Row] = tableEnv.toAppendStream(filterResult)

    {
      val types = Array[TypeInformation[_]](
        Types.STRING,
        Types.STRING,
        Types.STRING,
        Types.BOOLEAN,
        Types.LONG)
      val names =  Array("imsi","lac","cell","isSpecifiedLocation","timestamp")

      implicit val tpe: TypeInformation[Row] = new RowTypeInfo(types, names)

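      val newDsRows = dsRow.map(row => {
        // Allocate a new Row with room for the two extra fields
        val ret = new Row(row.getArity + 2)

        // Copy the existing fields unchanged
        for (i <- 0 until row.getArity) {
          ret.setField(i, row.getField(i))
        }

        // Demo rule: the record counts as "in the specified area" when lac equals cell
        val isSpecifiedLocation = ret.getField(1) == ret.getField(2)
        ret.setField(row.getArity, isSpecifiedLocation)

        // Append the processing time as the timestamp field
        ret.setField(row.getArity + 1, System.currentTimeMillis())

        ret
      })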

      tableEnv.registerDataStream("newTable", newDsRows)
      val newKafkaCsvTable = tableEnv.scan("newTable")
      val newResult = newKafkaCsvTable.filter('isSpecifiedLocation === true).select("imsi,lac,cell,isSpecifiedLocation,timestamp")

      val sink = new Kafka09JsonTableSink("bar", properties, new FlinkFixedPartitioner[Row])
      newResult.writeToSink(sink)
      env.execute("Flink kafka demo")
    }
  }
}

Summary


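This article is only a simple example; the code does not take performance or similar factors into account. The topics covered here will be explored in more depth later.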
