從kafka到mysql
新建Java項目
最簡單的方式是按照官網的方法,命令行執(zhí)行
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.10.0,不過這種方法有些包還得自行添加,大家可以復制我的pom.xml,我已經將常用的包都放進去了,并且排除了沖突的包。注意的是,本地測試的時候,記得將scope注掉,不然會出現少包的情況。也可以在Run -> Edit Configurations中,勾選Include dependencies with "Provided" scope。最好在resources目錄下丟個log4j的配置文件,這樣有時候方便我們看日志找問題。-
新建完項目之后,我們要做的第一件事,自然是寫個Flink 版本的
Hello World。所以,新建測試類,然后輸入代碼StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream dataStream = env.fromElements("Hello World"); dataStream.print(); env.execute("test");看一下控制臺
4> Hello World如愿以償的得到了想要的結果,不過這個
4>是什么玩應?其實這個4代表是第四個分區(qū)輸出的結果。很多人可能會問,我也妹指定并發(fā)啊,數據怎么會跑到第四個分區(qū)呢?其實是因為本地模式的時候,會以匹配CPU的核數,啟動對應數量的分區(qū)。只要我們在每個算子之后加上setParallelism(1),就會只以一個分區(qū)來執(zhí)行了。至此,我們的DataStream 版的Hellow World試驗完畢,這里主要是為了驗證一下環(huán)境是否正確,接下來才是我們今天的主題從kafka到mysql。另外,如果更想了解DataStream的內容,歡迎大家關注另一個系列Flink DataStream(不過目前還沒開始寫)
新建kafka數據源表
接下來咱們廢話不多說,直接貼代碼
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class FlinkSql02 {
public static final String KAFKA_TABLE_SOURCE_DDL = "" +
"CREATE TABLE user_behavior (\n" +
" user_id BIGINT,\n" +
" item_id BIGINT,\n" +
" category_id BIGINT,\n" +
" behavior STRING,\n" +
" ts TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector.type' = 'kafka', -- 指定連接類型是kafka\n" +
" 'connector.version' = '0.11', -- 與我們之前Docker安裝的kafka版本要一致\n" +
" 'connector.topic' = 'mykafka', -- 之前創(chuàng)建的topic \n" +
" 'connector.properties.group.id' = 'flink-test-0', -- 消費者組,相關概念可自行百度\n" +
" 'connector.startup-mode' = 'earliest-offset', --指定從最早消費\n" +
" 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'format.type' = 'json' -- json格式,和topic中的消息格式保持一致\n" +
")";
public static void main(String[] args) throws Exception {
//構建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//構建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//構建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//通過DDL,注冊kafka數據源表
tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL);
//執(zhí)行查詢
Table table = tEnv.sqlQuery("select * from user_behavior");
//轉回DataStream并輸出
tEnv.toAppendStream(table, Row.class).print().setParallelism(1);
//任務啟動,這行必不可少!
env.execute("test");
}
}
接下來就是激動人性的測試了,右擊,run!查看控制臺
543462,1715,1464116,pv,2017-11-26T01:00
543462,1715,1464116,pv,2017-11-26T01:00
543462,1715,1464116,pv,2017-11-26T01:00
543462,1715,1464116,pv,2017-11-26T01:00
嗯,跟我之前往kafka中丟的數據一樣,沒毛??!
如果大家在使用過程中遇到Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in這種異常,請仔細查看你的DDL語句,是否缺少或者用錯了配置,這里大家可以參考一下Flink官網的手冊,查看一下對應的配置。也可以在下方留言,一起交流。
新建mysql數據結果表
- 現在mysql中把表創(chuàng)建,畢竟flink現在還沒法幫你自動建表,只能自己動手豐衣足食咯。
CREATE TABLE `user_behavior` (
`user_id` bigint(20) DEFAULT NULL,
`item_id` bigint(20) DEFAULT NULL,
`behavior` varchar(255) DEFAULT NULL,
`category_id` bigint(20) DEFAULT NULL,
`ts` timestamp(6) NULL DEFAULT NULL
)
在mysql端創(chuàng)建完成后,回到我們的代碼,注冊mysql數據結果表,并將從kafka中讀取到的數據,插入mysql結果表中。下面是完整代碼,包含kafka數據源表的構建。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class FlinkSql02 {
public static final String KAFKA_TABLE_SOURCE_DDL = "" +
"CREATE TABLE user_behavior (\n" +
" user_id BIGINT,\n" +
" item_id BIGINT,\n" +
" category_id BIGINT,\n" +
" behavior STRING,\n" +
" ts TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector.type' = 'kafka', -- 指定連接類型是kafka\n" +
" 'connector.version' = '0.11', -- 與我們之前Docker安裝的kafka版本要一致\n" +
" 'connector.topic' = 'mykafka', -- 之前創(chuàng)建的topic \n" +
" 'connector.properties.group.id' = 'flink-test-0', -- 消費者組,相關概念可自行百度\n" +
" 'connector.startup-mode' = 'earliest-offset', --指定從最早消費\n" +
" 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'format.type' = 'json' -- json格式,和topic中的消息格式保持一致\n" +
")";
public static final String MYSQL_TABLE_SINK_DDL=""+
"CREATE TABLE `user_behavior_mysql` (\n" +
" `user_id` bigint ,\n" +
" `item_id` bigint ,\n" +
" `behavior` varchar ,\n" +
" `category_id` bigint ,\n" +
" `ts` timestamp(3) \n" +
")WITH (\n" +
" 'connector.type' = 'jdbc', -- 連接方式\n" +
" 'connector.url' = 'jdbc:mysql://localhost:3306/mysql', -- jdbc的url\n" +
" 'connector.table' = 'user_behavior', -- 表名\n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', -- 驅動名字,可以不填,會自動從上面的jdbc url解析 \n" +
" 'connector.username' = 'root', -- 顧名思義 用戶名\n" +
" 'connector.password' = '123456' , -- 密碼\n" +
" 'connector.write.flush.max-rows' = '5000', -- 意思是攢滿多少條才觸發(fā)寫入 \n" +
" 'connector.write.flush.interval' = '2s' -- 意思是攢滿多少秒才觸發(fā)寫入;這2個參數,無論數據滿足哪個條件,就會觸發(fā)寫入\n"+
")"
;
public static void main(String[] args) throws Exception {
//構建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//構建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//構建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//通過DDL,注冊kafka數據源表
tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL);
//通過DDL,注冊mysql數據結果表
tEnv.sqlUpdate(MYSQL_TABLE_SINK_DDL);
//將從kafka中查到的數據,插入mysql中
tEnv.sqlUpdate("insert into user_behavior_mysql select user_id,item_id,behavior,category_id,ts from user_behavior");
//任務啟動,這行必不可少!
env.execute("test");
}
}
打開我們的Navicat,看看我們的數據是否正確輸入到mysql中。
| user_id | item_id | behavior | category_id | ts |
|---|---|---|---|---|
| 543462 | 1715 | pv | 1464116 | 2017-11-26 01:00:00.000 |
| 543462 | 1715 | pv | 1464116 | 2017-11-26 01:00:00.000 |
| 543462 | 1715 | pv | 1464116 | 2017-11-26 01:00:00.000 |
| 543462 | 1715 | pv | 1464116 | 2017-11-26 01:00:00.000 |
成功!并且數據和我們kafka中的數據也是一致,大家也可以通過上一章講過的Java連接kafka來對比驗證數據的一致性,此處就不再贅述。那么好了,本次的Flink Sql之旅就結束,下一章我們將帶大家,在這次課程的基礎上,完成常用聚合查詢以及目前Flink Sql原生支持的維表Join。另外,有同學反映有些地方不知道為什么要這樣做,不想只知其然而不知所以然,我們之后同樣會有另外的專題講述Flink 原理。
附錄
pom.xml
<properties>
<flink.version>1.10.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
<!-- Flink modules -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- CLI dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>javassist</artifactId>
<groupId>org.javassist</groupId>
</exclusion>
<exclusion>
<artifactId>scala-parser-combinators_2.11</artifactId>
<groupId>org.scala-lang.modules</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>kafka-clients</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.37</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
<exclusions>
<exclusion>
<artifactId>force-shading</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.5</version>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.0.5.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.46</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.4.Final</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.10.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<exclude>junit:junit</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
有點亂,懶得整理了,大家直接復制過去用就行。
log4j.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j='http://jakarta.apache.org/log4j/' >
<appender name="myConsole" class="org.apache.log4j.ConsoleAppender">
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern"
value="[%d{dd HH:mm:ss,SSS\} %-5p] [%t] %c{2\} - %m%n" />
</layout>
<!--過濾器設置輸出的級別-->
<filter class="org.apache.log4j.varia.LevelRangeFilter">
<param name="levelMin" value="info" />
<param name="levelMax" value="error" />
<param name="AcceptOnMatch" value="true" />
</filter>
</appender>
<!-- 指定logger的設置,additivity指示是否遵循缺省的繼承機制-->
<logger name="com.runway.bssp.activeXdemo" additivity="false">
<appender-ref ref="myConsole" />
</logger>
<!-- 根logger的設置-->
<root>
<priority value ="debug"/>
<appender-ref ref="myConsole"/>
</root>
</log4j:configuration>
記得要放在resource目錄下,別放錯了。