
本篇文檔將演示如何使用 Apache Doris Flink Connector 結合 Flink CDC 以及 Doris Stream Load 的兩階段提交,實現(xiàn) MySQL 數(shù)據(jù)庫分庫分表實時高效接入,并實現(xiàn) Exactly Once。
一、概述
在實際業(yè)務系統(tǒng)中為了解決單表數(shù)據(jù)量大帶來的各種問題,我們通常采用分庫分表的方式對庫表進行拆分,以達到提高系統(tǒng)的吞吐量。
但是這樣給后面數(shù)據(jù)分析帶來了麻煩,這個時候我們通常試將業(yè)務數(shù)據(jù)庫的分庫分表同步到數(shù)據(jù)倉庫時,將這些分庫分表的數(shù)據(jù)合并成一個庫、一個表,便于我們后面的數(shù)據(jù)分析。
本篇文檔我們將演示如何基于 Flink CDC 結合 Apache Doris Flink Connector 及 Doris Stream Load 的兩階段提交,實現(xiàn) MySQL 數(shù)據(jù)庫分庫分表實時高效的接入到 Doris 數(shù)據(jù)倉庫中進行分析。
1.1 什么是 CDC
CDC 是 Change Data Capture 變更數(shù)據(jù)獲取的簡稱。
核心思想是,監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù)或數(shù)據(jù)表的插入 INSERT、更新 UPDATE、刪除 DELETE 等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務進行訂閱及消費。
CDC 技術應用場景也非常廣泛,包括:
- 數(shù)據(jù)分發(fā):將一個數(shù)據(jù)源分發(fā)給多個下游,常用于業(yè)務解耦、微服務。
- 數(shù)據(jù)集成:將分散異構的數(shù)據(jù)源集成到數(shù)據(jù)倉庫中,消除數(shù)據(jù)孤島,便于后續(xù)的分析。
- 數(shù)據(jù)遷移:常用于數(shù)據(jù)庫備份、容災等。
1.2 為什么選擇 Flink CDC
Flink CDC 基于數(shù)據(jù)庫日志的 Change Data Capture 技術,實現(xiàn)了全量和增量的一體化讀取能力,并借助 Flink 優(yōu)秀的管道能力和豐富的上下游生態(tài),支持捕獲多種數(shù)據(jù)庫的變更,并將這些變更實時同步到下游存儲。
目前,Flink CDC 的上游已經支持了 MySQL、MariaDB、PG、Oracle、MongoDB 、Oceanbase、TiDB、SQLServer 等數(shù)據(jù)庫。
Flink CDC 的下游則更加豐富,支持寫入 Kafka、Pulsar 消息隊列,也支持寫入 Hudi、Iceberg 、Doris 等,支持寫入各種數(shù)據(jù)倉庫及數(shù)據(jù)湖中。
同時,通過 Flink SQL 原生支持的 Changelog 機制,可以讓 CDC 數(shù)據(jù)的加工變得非常簡單。用戶通過 SQL 便能實現(xiàn)數(shù)據(jù)庫全量和增量數(shù)據(jù)的清洗、打寬、聚合等操作,極大地降低了用戶門檻。此外, Flink DataStream API 支持用戶編寫代碼實現(xiàn)自定義邏輯,給用戶提供了深度定制業(yè)務的自由度。
Flink CDC 技術的核心是支持將表中的全量數(shù)據(jù)和增量數(shù)據(jù)做實時一致性的同步與加工,讓用戶可以方便地獲每張表的實時一致性快照。比如一張表中有歷史的全量業(yè)務數(shù)據(jù),也有增量的業(yè)務數(shù)據(jù)在源源不斷寫入、更新。Flink CDC 會實時抓取增量的更新記錄,實時提供與數(shù)據(jù)庫中一致性的快照,如果是更新記錄,會更新已有數(shù)據(jù);如果是插入記錄,則會追加到已有數(shù)據(jù),整個過程中,F(xiàn)link CDC 提供了一致性保障,即不重不丟。
Flink CDC 具有如下優(yōu)勢:
- Flink 的算子和 SQL 模塊更為成熟和易用;
- Flink 作業(yè)可以通過調整算子并行度的方式輕松擴展處理能力;
- Flink 支持高級的狀態(tài)后端(State Backends),允許存取海量的狀態(tài)數(shù)據(jù);
- Flink 提供更多的 Source 和 Sink 等生態(tài)支持;
- Flink 有更大的用戶基數(shù)和活躍的支持社群,問題更容易解決。
而且 Flink Table / SQL 模塊將數(shù)據(jù)庫表和變動記錄流(例如 CDC 的數(shù)據(jù)流)看做是同一事物的兩面,因此內部提供的 Upsert 消息結構(+I表示新增、-U表示記錄更新前的值、+U表示記錄更新后的值,-D表示刪除)可以與 Debezium 等生成的變動記錄一一對應。
1.3 什么是 Apache Doris
Apache Doris 是一個現(xiàn)代化的 MPP 分析型數(shù)據(jù)庫產品。僅需亞秒級響應時間即可獲得查詢結果,有效地支持實時數(shù)據(jù)分析。Apache Doris 的分布式架構非常簡潔,易于運維,并且可以支持 10PB 以上的超大數(shù)據(jù)集。
Apache Doris 可以滿足多種數(shù)據(jù)分析需求,例如固定歷史報表,實時數(shù)據(jù)分析,交互式數(shù)據(jù)分析和探索式數(shù)據(jù)分析等??梢允箶?shù)據(jù)分析工作更加簡單高效!
1.4 Two-phase commit
什么是 Two-phase commit(2PC)
在分布式系統(tǒng)中,為了讓每個節(jié)點都能夠感知到其他節(jié)點的事務執(zhí)行狀況,需要引入一個中心節(jié)點來統(tǒng)一處理所有節(jié)點的執(zhí)行邏輯,這個中心節(jié)點叫做協(xié)調者(Coordinator),被中心節(jié)點調度的其他業(yè)務節(jié)點叫做參與者(Participant)。
2PC 將分布式事務分成了兩個階段,兩個階段分別為提交請求(投票)和提交(執(zhí)行)。協(xié)調者根據(jù)參與者的響應來決定是否需要真正地執(zhí)行事務,具體流程如下:
提交請求(投票)階段
- 協(xié)調者向所有參與者發(fā)送 prepare 請求與事務內容,詢問是否可以準備事務提交,并等待參與者的響應。
- 參與者執(zhí)行事務中包含的操作,并記錄 undo 日志(用于回滾)和 redo 日志(用于重放),但不真正提交。
- 參與者向協(xié)調者返回事務操作的執(zhí)行結果,執(zhí)行成功返回 yes,否則返回 no。
提交(執(zhí)行)階段
分為成功與失敗兩種情況。
-
若所有參與者都返回 yes,說明事務可以提交:
- 協(xié)調者向所有參與者發(fā)送 Commit 請求。
- 參與者收到 Commit 請求后,將事務真正地提交上去,并釋放占用的事務資源,并向協(xié)調者返回 Ack。
- 協(xié)調者收到所有參與者的 Ack 消息,事務成功完成。
-
若有參與者返回 no 或者超時未返回,說明事務中斷,需要回滾:
- 協(xié)調者向所有參與者發(fā)送 Rollback 請求。
- 參與者收到 Rollback 請求后,根據(jù) undo 日志回滾到事務執(zhí)行前的狀態(tài),釋放占用的事務資源,并向協(xié)調者返回 Ack。
- 協(xié)調者收到所有參與者的 Ack 消息,事務回滾完成。
1.5 Flink 2PC
Flink 作為流式處理引擎,自然也提供了對 Exactly Once 語義的保證。端到端的 Exactly Once 語義,是輸入、處理邏輯、輸出三部分協(xié)同作用的結果。Flink 內部依托檢查點機制和輕量級分布式快照算法 ABS 保證 Exactly Once。而要實現(xiàn)精確一次的輸出邏輯,則需要施加以下兩種限制之一:冪等性寫入(idempotent write)、事務性寫入(transactional write)。
預提交階段的流程

每當需要做 Checkpoint 時,JobManager 就在數(shù)據(jù)流中打入一個屏障(barrier),作為檢查點的界限。屏障隨著算子鏈向下游傳遞,每到達一個算子都會觸發(fā)將狀態(tài)快照寫入狀態(tài)后端的動作。當屏障到達 Kafka sink 后,通過 KafkaProducer.flush() 方法刷寫消息數(shù)據(jù),但還未真正提交。接下來還是需要通過檢查點來觸發(fā)提交階段:
提交階段流程

只有在所有檢查點都成功完成這個前提下,寫入才會成功。這符合前文所述 2PC 的流程,其中 JobManager 為協(xié)調者,各個算子為參與者(不過只有 Sink 一個參與者會執(zhí)行提交)。一旦有檢查點失敗,notifyCheckpointComplete() 方法就不會執(zhí)行。如果重試也不成功的話,最終會調用 abort() 方法回滾事務。
1.6 Doris Stream Load 2PC
1.6.1 Stream Load
Stream Load 是 Apache Doris 提供的一個同步的導入方式,用戶通過發(fā)送 HTTP 協(xié)議發(fā)送請求將本地文件或數(shù)據(jù)流導入到 Doris 中。Stream Load 同步執(zhí)行導入并返回導入結果。用戶可直接通過請求的返回體判斷本次導入是否成功。
Stream Load 主要適用于導入本地文件,或通過程序導入數(shù)據(jù)流中的數(shù)據(jù)。
使用方法:
用戶通過Http Client 進行操作,也可以使用 Curl 命令進行
curl --location-trusted -u user:passwd [-H ""...] -T data.file -H "label:label" -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
這里為了是防止用戶重復導入相同的數(shù)據(jù),使用了導入任務標識 Label。強烈推薦用戶同一批次數(shù)據(jù)使用相同的 Label。這樣同一批次數(shù)據(jù)的重復請求只會被接受一次,保證了 At-Most-Once。
1.6.2 Stream Load 2PC
Aapche Doris 最早的 Stream Load 是沒有兩階段提交的,導入數(shù)據(jù)的時候直接通過 Stream Load 的 HTTP 接口完成數(shù)據(jù)導入,只有成功和失敗。
- 這種在正常情況下是沒有問題的,在分布式環(huán)境下可能為因為某一個導入任務是失敗導致兩端數(shù)據(jù)不一致的情況,特別是在 Doris Flink Connector 里,之前的 Doris Flink Connector 數(shù)據(jù)導入失敗需要用戶自己控制,做異常處理,如果導入失敗之后,將數(shù)據(jù)保存到指定的地方(例如 Kafka),然后手動處理。
- 如果 Flink Job 因為其他問題突然掛掉,這樣會造成部分數(shù)據(jù)成功、部分數(shù)據(jù)失敗,而且失敗的數(shù)據(jù)因為沒有 Checkpoint,重新啟動 Job 也沒辦法重新消費失敗的數(shù)據(jù),造成兩端數(shù)據(jù)不一致。
為了解決上面的這些問題,保證兩端數(shù)據(jù)一致性,我們實現(xiàn)了 Doris Stream Load 2PC,原理如下:
- 提交分成兩個階段
- 第一階段,提交數(shù)據(jù)寫入任務,這個時候數(shù)據(jù)寫入成功后,數(shù)據(jù)狀態(tài)是不可見的,事務狀態(tài)是 PRECOMMITTED
- 數(shù)據(jù)寫入成功之后,用戶觸發(fā) Commit 操作,將事務狀態(tài)變成 VISIBLE,這個時候數(shù)據(jù)可以查詢到
- 如果用戶要方式這一批數(shù)據(jù)只需要通過事務 ID,對事務觸發(fā) Abort 操作,這批數(shù)據(jù)將會被自動刪除掉
1.6.3 Stream Load 2PC 使用方式
- 在 be.conf 中配置
disable_stream_load_2pc=false(重啟生效) - 并且 在 HEADER 中聲明 two_phase_commit=true
發(fā)起預提交:
curl --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load
觸發(fā)事務 Commit 操作:
curl -X PUT --location-trusted -u user:passwd -H "txn_id:18036" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/_stream_load_2pc
對事物觸發(fā) abort 操作:
curl -X PUT --location-trusted -u user:passwd -H "txn_id:18037" -H "txn_operation:abort" http://fe_host:http_port/api/{db}/_stream_load_2pc
1.7 Doris Flink Connector 2PC
我們之前提供了 Doris Flink Connector ,支持對 Doris 表數(shù)據(jù)的讀,Upsert、Delete(Unique key 模型),但是存在可能因為 Job 失敗或者其他異常情況導致兩端數(shù)據(jù)不一致的問題。
為了解決這些問題,我們基于 FLink 2PC 和 Doris Stream Load 2PC 對 Doris Connector 進行了改造升級,保證兩端 Exactly Once。
- 我們會在內存中維護讀寫的 Buffer,在啟動的時候,開啟寫入,并異步的提交,期間通過 HTTP Chunked 的方式持續(xù)的將數(shù)據(jù)寫入到 BE,直到 Checkpoint 的時候,停止寫入,這樣做的好處是避免用戶頻繁提交 HTTP 帶來的開銷,Checkpoint 完成后會開啟下一階段的寫入。
- 在這個 Checkpoint 期間,可能是多個 Task 任務同時在寫一張表的數(shù)據(jù),這些我們都會在這個 Checkpoint 期間對應一個全局的 Label,在 Checkpoint 的時候將這個 Label 對應的寫入數(shù)據(jù)的事務進行統(tǒng)一的一次提交,將數(shù)據(jù)狀態(tài)變成可見。
- 如果失敗 Flink 在重啟的時候會對這些數(shù)據(jù)通過 Checkpoint 進行回放。
- 這樣就可以保證 Doris 兩端數(shù)據(jù)的一致。
二、系統(tǒng)架構
下面我們通過一個完整示例來看怎么去通過 Doris Flink Connector 最新版本(支持兩階段提交),來完成整合 Flink CDC 實現(xiàn) MySQL 分庫分表實時采集入庫。

- 這里通過 Flink CDC 完成 MySQL 分庫分表數(shù)據(jù)采集。
- 然后通過 Doris Flink Connector 來完成數(shù)據(jù)的入庫。
- 最后利用 Doris 的高并發(fā)、高性能的OLAP分析計算能力對外提供數(shù)據(jù)服務。
三、MySQL 安裝配置
3.1 安裝 MySQL
快速使用 Docker 安裝配置 MySQL,具體參照下面的連接:
https://segmentfault.com/a/1190000021523570
3.2 開啟 MySQL Binlog
進入 Docker 容器修改 /etc/my.cnf 文件,在 [mysqld] 下面添加以下內容:
log_bin=mysql_bin
binlog-format=Row
server-id=1
然后重啟 MySQL
systemctl restart mysqld
3.3 準備數(shù)據(jù)
這里演示我們準備了兩個庫 emp_1和emp_2, 每個庫下面主備了兩張表 employees_1,employees_2。并給出了一下初始化數(shù)據(jù):
CREATE DATABASE emp_1;
USE emp_1;
CREATE TABLE employees_1 (
emp_no INT NOT NULL,
birth_date DATE NOT NULL,
first_name VARCHAR(14) NOT NULL,
last_name VARCHAR(16) NOT NULL,
gender ENUM ('M','F') NOT NULL,
hire_date DATE NOT NULL,
PRIMARY KEY (emp_no)
);
INSERT INTO `employees_1` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'),
(10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'),
(10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'),
(10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'),
(10009,'1952-04-19','Sumant','Peac','F','1985-02-18'),
(10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24');
CREATE TABLE employees_2 (
emp_no INT NOT NULL,
birth_date DATE NOT NULL,
first_name VARCHAR(14) NOT NULL,
last_name VARCHAR(16) NOT NULL,
gender ENUM ('M','F') NOT NULL,
hire_date DATE NOT NULL,
PRIMARY KEY (emp_no)
);
INSERT INTO `employees_2` VALUES (10037,'1963-07-22','Pradeep','Makrucki','M','1990-12-05'),
(10038,'1960-07-20','Huan','Lortz','M','1989-09-20'),
(10039,'1959-10-01','Alejandro','Brender','M','1988-01-19'),
(10040,'1959-09-13','Weiyi','Meriste','F','1993-02-14'),
(10041,'1959-08-27','Uri','Lenart','F','1989-11-12'),
(10042,'1956-02-26','Magy','Stamatiou','F','1993-03-21'),
(10043,'1960-09-19','Yishay','Tzvieli','M','1990-10-20'),
(10044,'1961-09-21','Mingsen','Casley','F','1994-05-21'),
(10045,'1957-08-14','Moss','Shanbhogue','M','1989-09-02'),
(10046,'1960-07-23','Lucien','Rosenbaum','M','1992-06-20');
CREATE DATABASE emp_2;
USE emp_2;
CREATE TABLE employees_1 (
emp_no INT NOT NULL,
birth_date DATE NOT NULL,
first_name VARCHAR(14) NOT NULL,
last_name VARCHAR(16) NOT NULL,
gender ENUM ('M','F') NOT NULL,
hire_date DATE NOT NULL,
PRIMARY KEY (emp_no)
);
INSERT INTO `employees_1` VALUES (10055,'1956-06-06','Georgy','Dredge','M','1992-04-27'),
(10056,'1961-09-01','Brendon','Bernini','F','1990-02-01'),
(10057,'1954-05-30','Ebbe','Callaway','F','1992-01-15'),
(10058,'1954-10-01','Berhard','McFarlin','M','1987-04-13'),
(10059,'1953-09-19','Alejandro','McAlpine','F','1991-06-26'),
(10060,'1961-10-15','Breannda','Billingsley','M','1987-11-02'),
(10061,'1962-10-19','Tse','Herber','M','1985-09-17'),
(10062,'1961-11-02','Anoosh','Peyn','M','1991-08-30'),
(10063,'1952-08-06','Gino','Leonhardt','F','1989-04-08'),
(10064,'1959-04-07','Udi','Jansch','M','1985-11-20');
CREATE TABLE employees_2(
emp_no INT NOT NULL,
birth_date DATE NOT NULL,
first_name VARCHAR(14) NOT NULL,
last_name VARCHAR(16) NOT NULL,
gender ENUM ('M','F') NOT NULL,
hire_date DATE NOT NULL,
PRIMARY KEY (emp_no)
);
INSERT INTO `employees_1` VALUES (10085,'1962-11-07','Kenroku','Malabarba','M','1994-04-09'),
(10086,'1962-11-19','Somnath','Foote','M','1990-02-16'),
(10087,'1959-07-23','Xinglin','Eugenio','F','1986-09-08'),
(10088,'1954-02-25','Jungsoon','Syrzycki','F','1988-09-02'),
(10089,'1963-03-21','Sudharsan','Flasterstein','F','1986-08-12'),
(10090,'1961-05-30','Kendra','Hofting','M','1986-03-14'),
(10091,'1955-10-04','Amabile','Gomatam','M','1992-11-18'),
(10092,'1964-10-18','Valdiodio','Niizuma','F','1989-09-22'),
(10093,'1964-06-11','Sailaja','Desikan','M','1996-11-05'),
(10094,'1957-05-25','Arumugam','Ossenbruggen','F','1987-04-18');
四、Doris 安裝配置
這里我們以單機版為例:
首先下載 Doris 1.1 release版本:
https://doris.apache.org/downloads/downloads.html
解壓到指定目錄:
tar zxvf apache-doris-1.1.0-bin.tar.gz -C doris-1.1
解壓到目錄結構是這樣的:
.
├── apache_hdfs_broker
│ ├── bin
│ ├── conf
│ └── lib
├── be
│ ├── bin
│ ├── conf
│ ├── lib
│ ├── log
│ ├── minidump
│ ├── storage
│ └── www
├── derby.log
├── fe
│ ├── bin
│ ├── conf
│ ├── doris-meta
│ ├── lib
│ ├── log
│ ├── plugins
│ ├── spark-dpp
│ ├── temp_dir
│ └── webroot
└── udf
├── include
└── lib
配置 FE 和 BE:
cd doris-1.0
# 配置 fe.conf 和 be.conf,這兩個文件分別在fe和be的conf目錄下
打開這個 priority_networks
修改成自己的IP地址,注意這里是CIDR方式配置IP地址
例如我本地的IP是172.19.0.12,我的配置如下:
priority_networks = 172.19.0.0/24
######
在be.conf配置文件最后加上下面這個配置
disable_stream_load_2pc=false
- 注意這里默認只需要修改 fe.conf 和 be.conf 同樣的上面配置就可以。
- 默認 FE 元數(shù)據(jù)的目錄在 fe/doris-meta 目錄下。
- BE 的數(shù)據(jù)存儲在 be/storage 目錄下。
啟動 FE:
sh fe/bin/start_fe.sh --daemon
啟動 BE:
sh be/bin/start_be.sh --daemon
MySQL 命令行連接 FE,這里新安裝的 Doris 集群默認用戶是 Root 和 Admin,密碼是空:
mysql -uroot -P9030 -h127.0.0.1
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 41
Server version: 5.7.37 Doris version trunk-440ad03
Copyright (c) 2000, 2022, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> show frontends;
+--------------------------------+-------------+-------------+----------+-----------+---------+----------+----------+------------+------+-------+-------------------+---------------------+----------+--------+---------------+------------------+
| Name | IP | EditLogPort | HttpPort | QueryPort | RpcPort | Role | IsMaster | ClusterId | Join | Alive | ReplayedJournalId | LastHeartbeat | IsHelper | ErrMsg | Version | CurrentConnected |
+--------------------------------+-------------+-------------+----------+-----------+---------+----------+----------+------------+------+-------+-------------------+---------------------+----------+--------+---------------+------------------+
| 172.19.0.12_9010_1654681464955 | 172.19.0.12 | 9010 | 8030 | 9030 | 9020 | FOLLOWER | true | 1690644599 | true | true | 381106 | 2022-06-22 18:13:34 | true | | trunk-440ad03 | Yes |
+--------------------------------+-------------+-------------+----------+-----------+---------+----------+----------+------------+------+-------+-------------------+---------------------+----------+--------+---------------+------------------+
1 row in set (0.01 sec)
將BE節(jié)點加入到集群中:
mysql>alter system add backend "172.19.0.12:9050";
這里是你自己的IP地址
查看BE:
mysql> show backends;
+-----------+-----------------+-------------+---------------+--------+----------+----------+---------------------+---------------------+-------+----------------------+-----------------------+-----------+------------------+---------------+---------------+---------+----------------+--------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------------------+
| BackendId | Cluster | IP | HeartbeatPort | BePort | HttpPort | BrpcPort | LastStartTime | LastHeartbeat | Alive | SystemDecommissioned | ClusterDecommissioned | TabletNum | DataUsedCapacity | AvailCapacity | TotalCapacity | UsedPct | MaxDiskUsedPct | Tag | ErrMsg | Version | Status |
+-----------+-----------------+-------------+---------------+--------+----------+----------+---------------------+---------------------+-------+----------------------+-----------------------+-----------+------------------+---------------+---------------+---------+----------------+--------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------------------+
| 10002 | default_cluster | 172.19.0.12 | 9050 | 9060 | 8040 | 8060 | 2022-06-22 12:51:58 | 2022-06-22 18:15:34 | true | false | false | 4369 | 328.686 MB | 144.083 GB | 196.735 GB | 26.76 % | 26.76 % | {"location" : "default"} | | trunk-440ad03 | {"lastSuccessReportTabletsTime":"2022-06-22 18:15:05","lastStreamLoadTime":-1,"isQueryDisabled":false,"isLoadDisabled":false} |
+-----------+-----------------+-------------+---------------+--------+----------+----------+---------------------+---------------------+-------+----------------------+-----------------------+-----------+------------------+---------------+---------------+---------+----------------+--------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
Doris 單機版安裝完成。
五、Flink 安裝配置
5.1 下載安裝 Flink 1.14.4
wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.5-bin-scala_2.12.tgz
tar zxvf flink-1.14.4-bin-scala_2.12.tgz
需要將下面的依賴拷貝到 Flink 安裝目錄下的 lib 目錄下,具體的依賴的 lib 文件如下:
wget https://jiafeng-1308700295.cos.ap-hongkong.myqcloud.com/flink-doris-connector-1.14_2.12-1.0.0-SNAPSHOT.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
啟動 Flink:
bin/start-cluster.sh
啟動后的界面如下:

六、開始同步數(shù)據(jù)到 Doris
6.1 創(chuàng)建 Doris 數(shù)據(jù)庫及表
create database demo;
use demo;
CREATE TABLE all_employees_info (
emp_no int NOT NULL,
birth_date date,
first_name varchar(20),
last_name varchar(20),
gender char(2),
hire_date date,
database_name varchar(50),
table_name varchar(200)
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
6.2 進入 Flink SQL Client
bin/sql-client.sh embedded

開啟 Checkpoint,每隔 10 秒做一次 Checkpoint
Checkpoint 默認是不開啟的,我們需要開啟 Checkpoint 提交事務。
Source 在啟動時會掃描全表,將表按照主鍵分成多個 Chunk。并使用增量快照算法逐個讀取每個 Chunk 的數(shù)據(jù)。作業(yè)會周期性執(zhí)行 Checkpoint,記錄下已經完成的 Chunk。當發(fā)生 Failover 時,只需要繼續(xù)讀取未完成的 Chunk。當 Chunk 全部讀取完后,會從之前獲取的 Binlog 位點讀取增量的變更記錄。Flink 作業(yè)會繼續(xù)周期性執(zhí)行 Checkpoint,記錄下 Binlog 位點,當作業(yè)發(fā)生 Failover,便會從之前記錄的 Binlog 位點繼續(xù)處理,從而實現(xiàn) Exactly Once 語義。
SET execution.checkpointing.interval = 10s;
注意:這里是演示,生產環(huán)境建議 Checkpoint 間隔 60 秒。
6.3 創(chuàng)建 MySQL CDC 表
在 Flink SQL Client 下執(zhí)行下面的 SQL
CREATE TABLE employees_source (
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
emp_no int NOT NULL,
birth_date date,
first_name STRING,
last_name STRING,
gender STRING,
hire_date date,
PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'MyNewPass4!',
'database-name' = 'emp_[0-9]+',
'table-name' = 'employees_[0-9]+'
);
- 'database-name' = 'emp_[0-9]+':這里是使用了正則表達式,同時連接多個庫
- 'table-name' = 'employees_[0-9]+':這里是使用了正則表達式,同時連接多個表
查詢 CDC 表,我們可以看到下面的數(shù)據(jù),標識一切正常
select * from employees_source limit 10;

6.4 創(chuàng)建 Doris Sink 表
CREATE TABLE cdc_doris_sink (
emp_no int ,
birth_date STRING,
first_name STRING,
last_name STRING,
gender STRING,
hire_date STRING,
database_name STRING,
table_name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = '172.19.0.12:8030',
'table.identifier' = 'demo.all_employees_info',
'username' = 'root',
'password' = '',
'sink.properties.two_phase_commit'='true',
'sink.label-prefix'='doris_demo_emp_001'
);
參數(shù)說明:
- connector :指定連接器是 Doris
- fenodes:Doris FE 節(jié)點 IP 地址及 HTTP Port
- table.identifier :Doris 對應的數(shù)據(jù)庫及表名
- username:Doris 用戶名
- password:Doris 用戶密碼
- sink.properties.two_phase_commit:指定使用兩階段提交,這樣在 Stream load 的時候,會在 Http header 里加上 two_phase_commit:true ,不然會失敗
- sink.label-prefix :這個是在兩階段提交的時候必須要加的一個參數(shù),才能保證兩端數(shù)據(jù)一致性,否則會失敗
- 其他參數(shù)參考官方文檔:
- https://doris.apache.org/zh-CN/docs/ecosystem/flink-doris-connector.html
這個時候查詢 Doris sink 表是沒有數(shù)據(jù)的
select * from cdc_doris_sink;

6.5 將數(shù)據(jù)插入到 Doris 表里
執(zhí)行下面的 SQL:
insert into cdc_doris_sink (emp_no,birth_date,first_name,last_name,gender,hire_date,database_name,table_name)
select emp_no,cast(birth_date as string) as birth_date ,first_name,last_name,gender,cast(hire_date as string) as hire_date ,database_name,table_name from employees_source;
然后我們可以看到 Flink WEB UI 上的任務運行信息

這里我們可以看看 TaskManager 的日志信息,會發(fā)現(xiàn)這里是使用兩階段提交的,而且數(shù)據(jù)是通過 Http chunked 方式不斷朝 BE 端進行傳輸?shù)模钡?Checkpoint,才會停止。
Checkpoint 完成后會繼續(xù)下一個任務的提交。
2022-06-22 19:04:08,321 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
"TxnId": 6963,
"Label": "doris_demo_001_0_1",
"TwoPhaseCommit": "true",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 40,
"NumberLoadedRows": 40,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 35721,
"LoadTimeMs": 9046,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 0,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 9041,
"CommitAndPublishTimeMs": 0
}
....
2022-06-22 19:04:18,310 INFO org.apache.doris.flink.sink.writer.DorisStreamLoad [] - load Result {
"TxnId": 6964,
"Label": "doris_demo_001_0_2",
"TwoPhaseCommit": "true",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 0,
"NumberLoadedRows": 0,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 0,
"LoadTimeMs": 9988,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 0,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 9983,
"CommitAndPublishTimeMs": 0
}
2022-06-22 19:04:18,310 INFO org.apache.doris.flink.sink.writer.RecordBuffer [] - start buffer data, read queue size 0, write queue size 3
6.6 查詢 Doris 數(shù)據(jù)
這里插入了 636 條數(shù)
mysql> select count(1) from all_employees_info ;
+----------+
| count(1) |
+----------+
| 634 |
+----------+
1 row in set (0.01 sec)
mysql> select * from all_employees_info limit 20;
+--------+------------+------------+-------------+--------+------------+---------------+-------------+
| emp_no | birth_date | first_name | last_name | gender | hire_date | database_name | table_name |
+--------+------------+------------+-------------+--------+------------+---------------+-------------+
| 10001 | 1953-09-02 | Georgi | Facello | M | 1986-06-26 | emp_1 | employees_1 |
| 10002 | 1964-06-02 | Bezalel | Simmel | F | 1985-11-21 | emp_1 | employees_1 |
| 10003 | 1959-12-03 | Parto | Bamford | M | 1986-08-28 | emp_1 | employees_1 |
| 10004 | 1954-05-01 | Chirstian | Koblick | M | 1986-12-01 | emp_1 | employees_1 |
| 10005 | 1955-01-21 | Kyoichi | Maliniak | M | 1989-09-12 | emp_1 | employees_1 |
| 10006 | 1953-04-20 | Anneke | Preusig | F | 1989-06-02 | emp_1 | employees_1 |
| 10007 | 1957-05-23 | Tzvetan | Zielinski | F | 1989-02-10 | emp_1 | employees_1 |
| 10008 | 1958-02-19 | Saniya | Kalloufi | M | 1994-09-15 | emp_1 | employees_1 |
| 10009 | 1952-04-19 | Sumant | Peac | F | 1985-02-18 | emp_1 | employees_1 |
| 10010 | 1963-06-01 | Duangkaew | Piveteau | F | 1989-08-24 | emp_1 | employees_1 |
| 10011 | 1953-11-07 | Mary | Sluis | F | 1990-01-22 | emp_1 | employees_1 |
| 10012 | 1960-10-04 | Patricio | Bridgland | M | 1992-12-18 | emp_1 | employees_1 |
| 10013 | 1963-06-07 | Eberhardt | Terkki | M | 1985-10-20 | emp_1 | employees_1 |
| 10014 | 1956-02-12 | Berni | Genin | M | 1987-03-11 | emp_1 | employees_1 |
| 10015 | 1959-08-19 | Guoxiang | Nooteboom | M | 1987-07-02 | emp_1 | employees_1 |
| 10016 | 1961-05-02 | Kazuhito | Cappelletti | M | 1995-01-27 | emp_1 | employees_1 |
| 10017 | 1958-07-06 | Cristinel | Bouloucos | F | 1993-08-03 | emp_1 | employees_1 |
| 10018 | 1954-06-19 | Kazuhide | Peha | F | 1987-04-03 | emp_1 | employees_1 |
| 10019 | 1953-01-23 | Lillian | Haddadi | M | 1999-04-30 | emp_1 | employees_1 |
| 10020 | 1952-12-24 | Mayuko | Warwick | M | 1991-01-26 | emp_1 | employees_1 |
+--------+------------+------------+-------------+--------+------------+---------------+-------------+
20 rows in set (0.00 sec)
6.7 測試刪除
mysql> use emp_2;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> show tables;
+-----------------+
| Tables_in_emp_2 |
+-----------------+
| employees_1 |
| employees_2 |
+-----------------+
2 rows in set (0.00 sec)
mysql> delete from employees_2 where emp_no in (12013,12014,12015);
Query OK, 3 rows affected (0.01 sec)
驗證 Doris 數(shù)據(jù)刪除
mysql> select count(1) from all_employees_info ;
+----------+
| count(1) |
+----------+
| 631 |
+----------+
1 row in set (0.01 sec)
七、總結
本文主要介紹了 Flink CDC 分庫分表怎么實時同步,以及其結合 Apache Doris Flink Connector 最新版本整合的 Flink 2PC 和 Doris Stream Load 2PC 的機制及整合原理、使用方法等。希望能給大家?guī)硪稽c幫助。
轉載|SelectDB 公眾號
作者|張家鋒