Flink in Practice: Syncing MySQL Data to Doris, and Problems Encountered

Official Flink documentation (Chinese edition): Apache Flink Documentation | Apache Flink

I. Environment Preparation

MySQL 8.0.x (see: Install MySQL)
Doris 2.1.x (see: Deploy Doris Manually)
Flink 1.18 (see: Local Flink Deployment)
JDK

II. Initialize the Tables

Flink SQL does not create tables in MySQL or Doris automatically,
so the tables must be created in advance.
MySQL: omitted
Doris:

DROP TABLE IF EXISTS testdb.d_table_a;
CREATE TABLE IF NOT EXISTS testdb.d_table_a
(
    `id`                  BIGINT COMMENT 'Primary key ID',
    `uid`                 BIGINT COMMENT 'User ID',
    `address`             varchar(512) COMMENT 'Address'
)
UNIQUE KEY(id)
COMMENT "User address table"
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "enable_unique_key_merge_on_write" = "true"
);

# Parameter notes:
# Data model: Doris currently offers three table models, DUPLICATE KEY, UNIQUE KEY and AGGREGATE KEY; this table uses
# UNIQUE KEY(id)
# Distribute the table's data by the hash of the id column, with 1 bucket:
# DISTRIBUTED BY HASH(id) BUCKETS 1
# Table properties: the replica allocation policy, here a single replica in the default location:
# PROPERTIES ("replication_allocation" = "tag.location.default: 1")

# Partial column update: for example, job 1 syncs columns a, b, c and job 2 syncs columns x, y, z. Without this setting, job 2 overwrites columns a, b, c.
# "enable_unique_key_merge_on_write" = "true"
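To make the partial-column-update note concrete, here is a toy model in Java. It is not Doris code: it assumes a Unique Key table keyed by id, represents each row as a map from column name to value, and uses hypothetical method names (upsertFullRow, upsertPartial) to contrast a full-row upsert, where columns the job did not send end up null, with a partial-column upsert that only touches the columns the job sent.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (NOT Doris code) of a Unique Key table keyed by id. */
public class PartialUpdateDemo {
    // Full-row upsert: every column is rewritten; columns missing from the incoming row become null.
    public static void upsertFullRow(Map<Long, Map<String, String>> table, long id,
                                     Map<String, String> row, String[] allColumns) {
        Map<String, String> newRow = new HashMap<>();
        for (String col : allColumns) {
            newRow.put(col, row.get(col)); // null if this job did not send the column
        }
        table.put(id, newRow);
    }

    // Partial-column upsert: only the columns present in the incoming row are overwritten.
    public static void upsertPartial(Map<Long, Map<String, String>> table, long id,
                                     Map<String, String> row) {
        table.computeIfAbsent(id, k -> new HashMap<>()).putAll(row);
    }

    public static void main(String[] args) {
        String[] cols = {"a", "b", "c", "x", "y", "z"};
        Map<String, String> job1 = Map.of("a", "1", "b", "2", "c", "3"); // job 1 syncs a, b, c
        Map<String, String> job2 = Map.of("x", "7", "y", "8", "z", "9"); // job 2 syncs x, y, z

        Map<Long, Map<String, String>> full = new HashMap<>();
        upsertFullRow(full, 1L, job1, cols);
        upsertFullRow(full, 1L, job2, cols);
        System.out.println("full-row: a=" + full.get(1L).get("a") + ", x=" + full.get(1L).get("x"));
        // prints: full-row: a=null, x=7

        Map<Long, Map<String, String>> partial = new HashMap<>();
        upsertPartial(partial, 1L, job1);
        upsertPartial(partial, 1L, job2);
        System.out.println("partial: a=" + partial.get(1L).get("a") + ", x=" + partial.get(1L).get("x"));
        // prints: partial: a=1, x=7
    }
}
```

In Doris itself the partial update also needs the sink-side property 'sink.properties.partial_columns' = 'true' from the sink configuration; the model only shows why the row-level semantics matter.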

III. Implementation

(A) Flink SQL

1. In Flink's bin directory, start sql-client to enter the Flink SQL client:

./sql-client.sh
[Screenshot: the Flink SQL client]

2. Set the checkpoint interval to 10 s:

SET 'execution.checkpointing.interval' = '10s';

3. Create the source (replace the ${...} placeholders with your own connection settings)

CREATE TABLE mysqlSource (
    id BIGINT,
    uid BIGINT,
    address VARCHAR(512),
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '${mysql.hostname}',
    'port' = '${mysql.port}',
    'username' = '${mysql.username}',
    'password' = '${mysql.password}',
    'database-name' = 'testdb',
    'table-name' = 'm_table_a'
);
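The ${...} values in this DDL are placeholders, and Flink SQL option values must be quoted string literals once they are filled in. One way to fill them before calling executeSql is a small template helper. The sketch below is a hypothetical helper (SqlTemplate.render is not a Flink API) that replaces every ${key} with a value from a map and fails fast on a missing key.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: fills ${key} placeholders in a DDL template. */
public class SqlTemplate {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    public static String render(String template, Map<String, String> values) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String key = m.group(1);
            String value = values.get(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for placeholder: " + key);
            }
            // quoteReplacement stops '$' or '\' in values (e.g. passwords) being read as group references
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String ddl = "'hostname' = '${mysql.hostname}', 'port' = '${mysql.port}'";
        System.out.println(render(ddl, Map.of("mysql.hostname", "127.0.0.1", "mysql.port", "3306")));
        // prints: 'hostname' = '127.0.0.1', 'port' = '3306'
    }
}
```

In the Table API job the map could come from the program arguments, for example ParameterTool.fromArgs(args).toMap() with arguments such as --mysql.hostname 127.0.0.1. A value containing a single quote would still need escaping ('' in Flink SQL) before substitution.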

4. Create the sink

CREATE TABLE dorisSink (
    id BIGINT,
    uid BIGINT,
    address VARCHAR(512),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'doris',
  'fenodes' = '${doris.fenodes}',
  'table.identifier' = 'testdb.d_table_a',
  'username' = '${doris.username}',
  'password' = '${doris.password}',
  'sink.properties.partial_columns' = 'true',
  'sink.properties.column_separator' = '$^&@#$',
  'sink.properties.line_delimiter' = '!^*^!'
);

Configuration notes:

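# The data read by Flink SQL looks roughly like this:
# 1,Zhang San,Hunan
# 2,Li Si,Hubei
# 3,Wang Wu,Beijing
# Xicheng District
# 4,Zhao Liu,Guangdong

# In this sample the column separator is ',' and the line delimiter is '\n'. Row 3's address contains a line break, so 'Xicheng District' is treated as a separate record; that record does not match the columns of the Doris table and the write fails.
# To avoid this, set the line delimiter to '!^*^!' (any combination of symbols unlikely to appear in your data will do).
# Set the following parameters:
# column separator
#  'sink.properties.column_separator' = '$^&@#$',
# line delimiter
#  'sink.properties.line_delimiter' = '!^*^!'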

# Allow updating only some columns of a row, matched by id:
# 'sink.properties.partial_columns' = 'true'
# This also requires the following property when the table is created:
# "enable_unique_key_merge_on_write" = "true"
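The line-delimiter problem can be reproduced outside Flink with a simplified model of a CSV-style load body. This is not the Doris connector's serializer: the class and method names are hypothetical, and ',' is used as the column separator only to match the sample above. Splitting on '\n' turns the embedded line break into an extra row, while the rare delimiter from the configuration keeps the row count intact.

```java
import java.util.List;
import java.util.regex.Pattern;

/** Simplified model: rows joined by a column separator and a line delimiter, then split back. */
public class DelimiterDemo {
    // Joins rows into one body string, like a CSV-style load payload.
    public static String serialize(List<List<String>> rows, String colSep, String lineSep) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < rows.size(); i++) {
            if (i > 0) {
                sb.append(lineSep);
            }
            sb.append(String.join(colSep, rows.get(i)));
        }
        return sb.toString();
    }

    // Splits the body back into rows; Pattern.quote makes symbols like '^' and '*' literal.
    public static String[] splitRows(String body, String lineSep) {
        return body.split(Pattern.quote(lineSep));
    }

    public static void main(String[] args) {
        List<List<String>> rows = List.of(
                List.of("1", "Zhang San", "Hunan"),
                List.of("3", "Wang Wu", "Beijing\nXicheng District"));
        // '\n' as line delimiter: the embedded newline yields an extra, malformed row.
        System.out.println(splitRows(serialize(rows, ",", "\n"), "\n").length); // 3
        // Rare delimiter: the row count matches the real data.
        System.out.println(splitRows(serialize(rows, "$^&@#$", "!^*^!"), "!^*^!").length); // 2
    }
}
```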

5. Sync the source data into the sink

INSERT INTO dorisSink SELECT id, uid, address FROM mysqlSource;

6. Open the local Flink Web UI (http://localhost:8081 by default) to check the job status

(B) Table API / DataStream

1. Create a Maven project and add the dependencies:

    <properties>
        <flink.cdc.version>3.2.1</flink.cdc.version>
        <doris.version>24.0.1</doris.version>
        <mysql.version>8.0.26</mysql.version>
        <flink.version>1.18.0</flink.version>
    </properties>

    <dependencies>
        <!-- Connectors -->
        <!-- Flink CDC -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>
        <!-- Doris connector -->
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.18</artifactId>
            <version>${doris.version}</version>
        </dependency>
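        <!-- JDBC connector: not used by the code below; 1.16.1 targets Flink 1.16, so pick a release built for Flink 1.18 if you need it -->
        <dependency>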
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>1.16.1</version>
        </dependency>
        <!-- mysql-connector -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>


        <!-- Dependencies required by the Flink runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1-jre</version>
        </dependency>
    </dependencies>

2. Java code

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ExecuteHandler {
    public static void main(String[] args) {
        // Parses the program arguments from args (if needed)
        // ParameterTool parameterTool = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing with a 10,000 ms (10 s) interval: Flink triggers a checkpoint automatically every 10 seconds.
        env.enableCheckpointing(10000);
        // Use EXACTLY_ONCE checkpointing mode: after recovery from a failure, each record is reflected in Flink's state exactly once, keeping the data consistent and accurate.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Require at least 500 ms between the end of one checkpoint and the start of the next, so frequent checkpoints do not hurt performance.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Checkpoint timeout of 60,000 ms (60 s): a checkpoint that does not complete within 60 s is considered failed.
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // At most 1 concurrent checkpoint: a new checkpoint does not start until the previous one has completed.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Tolerate up to 5 consecutive checkpoint failures; beyond that the job fails and the restart strategy below applies.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        // Store checkpoints on the local file system, so the checkpoint can be located after a failure and specified when restarting.
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
        // Failure-rate restart strategy: after each failure, wait 10 s and restart; once 3 failures occur within a 5-minute window, stop restarting and let the job fail.
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)));
        // Set parallelism to 1
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Create the source
        tableEnv.executeSql("CREATE TABLE mysqlSource (\n" +
                "    id BIGINT,\n" +
                "    uid BIGINT,\n" +
                "    address VARCHAR(512),\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                ")\n" +
                "WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = '${mysql.hostname}',\n" +
                "    'port' = '${mysql.port}',\n" +
                "    'username' = '${mysql.username}',\n" +
                "    'password' = '${mysql.password}',\n" +
                "    'database-name' = 'testdb',\n" +
                "    'table-name' = 'm_table_a'\n" +
                ")");

        // Create the sink
        tableEnv.executeSql("CREATE TABLE dorisSink (\n" +
                "    id BIGINT,\n" +
                "    uid BIGINT,\n" +
                "    address VARCHAR(512),\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'doris',\n" +
                "  'fenodes' = '${doris.fenodes}',\n" +
                "  'table.identifier' = 'testdb.d_table_a',\n" +
                "  'username' = '${doris.username}',\n" +
                "  'password' = '${doris.password}',\n" +
                "  'sink.properties.column_separator' = '$^&@#$',\n" +
                "  'sink.properties.line_delimiter' = '!^*^!'\n" +
                ")");
        // Sync the source data into the sink
        tableEnv.executeSql("INSERT INTO dorisSink SELECT id, uid, address FROM mysqlSource");
    }
}
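The restart-strategy comment above can be illustrated with a simplified Java model of a failure-rate policy using the same parameters (3 failures per 5-minute window). It is a hypothetical class, not Flink's implementation, and it ignores the 10-second restart delay; it only records failure timestamps and decides whether another restart is allowed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified model (NOT Flink code) of a failure-rate restart policy. */
public class FailureRateModel {
    private final int maxFailuresPerInterval;
    private final long intervalMs;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    public FailureRateModel(int maxFailuresPerInterval, long intervalMs) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMs = intervalMs;
    }

    /** Records a failure at nowMs and returns whether the job would be restarted. */
    public boolean onFailure(long nowMs) {
        // Keep only the most recent maxFailuresPerInterval failure timestamps.
        if (failureTimes.size() >= maxFailuresPerInterval) {
            failureTimes.removeFirst();
        }
        failureTimes.addLast(nowMs);
        if (failureTimes.size() < maxFailuresPerInterval) {
            return true;
        }
        // Window is full: restart only if the oldest of these failures lies outside the interval.
        return nowMs - failureTimes.peekFirst() > intervalMs;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        FailureRateModel policy = new FailureRateModel(3, 5 * min);
        System.out.println(policy.onFailure(0));       // true
        System.out.println(policy.onFailure(min));     // true
        System.out.println(policy.onFailure(2 * min)); // false: 3 failures within 5 minutes
    }
}
```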

3. Package the job and upload it to Flink


[Screenshot: uploading the job JAR to Flink]

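The entry class is required; the other three parameters (parallelism, program arguments and savepoint path) depend on your situation. Click Submit and the job should start successfully.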

IV. Problems Encountered

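Running the job on a local single-node deployment produced no errors, but deploying it in application mode through the Flink Kubernetes Operator reported the errors below, even though the Flink startup log listed the corresponding connectors: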

Cannot discover a connector using option: 'connector'='mysql-cdc'
Cannot discover a connector using option: 'connector'='doris'

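Cause:
Flink uses Java's Service Provider Interface (SPI) to load table connector/format factories by their identifiers. The SPI resource file of every table connector/format is named org.apache.flink.table.factories.Factory and lives in the same directory, META-INF/services. When you build an uber JAR for a project that uses several table connectors/formats, these resource files overwrite one another, and Flink then fails to load the table connector/format factories.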
Take flink-sql-connector-mysql-cdc-2.4.1 as an example:


[Screenshot: contents of flink-sql-connector-mysql-cdc-2.4.1]

Official reference: Overview | Apache Flink
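The overwrite problem can be modelled in a few lines. In this toy model (hypothetical class and factory names, not real connector classes), each JAR is a map from resource path to file content. Naive packaging keeps only the last copy of the SPI file, so only one factory stays discoverable; merging service files line by line, which is what a services-merging step during shading does, keeps both.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of building an uber JAR; factory class names are hypothetical. */
public class UberJarSpiDemo {
    public static final String SPI = "META-INF/services/org.apache.flink.table.factories.Factory";

    // Naive packaging: the last JAR that contains a given path wins.
    public static Map<String, String> buildOverwriting(List<Map<String, String>> jars) {
        Map<String, String> uber = new LinkedHashMap<>();
        for (Map<String, String> jar : jars) {
            uber.putAll(jar);
        }
        return uber;
    }

    // Merging packaging: files under META-INF/services/ are concatenated line by line.
    public static Map<String, String> buildMergingServices(List<Map<String, String>> jars) {
        Map<String, String> uber = new LinkedHashMap<>();
        for (Map<String, String> jar : jars) {
            for (Map.Entry<String, String> e : jar.entrySet()) {
                String path = e.getKey();
                if (path.startsWith("META-INF/services/") && uber.containsKey(path)) {
                    uber.put(path, uber.get(path) + "\n" + e.getValue());
                } else {
                    uber.put(path, e.getValue());
                }
            }
        }
        return uber;
    }

    public static void main(String[] args) {
        List<Map<String, String>> jars = List.of(
                Map.of(SPI, "com.example.cdc.HypotheticalMySqlCdcFactory"),
                Map.of(SPI, "com.example.doris.HypotheticalDorisFactory"));
        // Only the Doris factory survives: the CDC factory can no longer be discovered.
        System.out.println(buildOverwriting(jars).get(SPI));
        // Both factory lines are kept.
        System.out.println(buildMergingServices(jars).get(SPI));
    }
}
```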


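Solution:
Package the job as the official documentation describes, using the maven-shade-plugin's ServicesResourceTransformer to merge the META-INF/services files: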

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- ... -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
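After rebuilding, it is worth checking that the shaded JAR really lists every factory. The sketch below is a small stdlib-only Java utility (ListFactories is a hypothetical name) that opens the JAR and prints the entries of META-INF/services/org.apache.flink.table.factories.Factory, skipping blank lines and '#' comments as Java's ServiceLoader does. With both connectors bundled, you would expect factory entries from both the MySQL CDC and the Doris connector.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Lists the table factories registered in a JAR's Flink SPI file. */
public class ListFactories {
    public static final String SPI = "META-INF/services/org.apache.flink.table.factories.Factory";

    public static List<String> factories(String jarPath) throws IOException {
        List<String> result = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(SPI);
            if (entry == null) {
                return result; // the JAR has no Flink factory SPI file at all
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    int hash = line.indexOf('#'); // everything after '#' is a comment
                    if (hash >= 0) {
                        line = line.substring(0, hash);
                    }
                    line = line.trim();
                    if (!line.isEmpty()) {
                        result.add(line);
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java ListFactories <path-to-shaded-job.jar>");
            return;
        }
        factories(args[0]).forEach(System.out::println);
    }
}
```

Pass the path of the shaded job JAR as the only argument. The same file can also be printed directly with unzip -p <jar> META-INF/services/org.apache.flink.table.factories.Factory.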
© Copyright belongs to the author. Contact the author for reprints or content partnerships.
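Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and provides only information storage services.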
