前言
本篇從Hudi角度介紹Flink常用的幾種讀寫操作。內(nèi)容主要來源于官網(wǎng)例子和個人理解。
讀者如果想了解從編譯、部署到使用的步驟,請參考:Flink 使用之 Hudi 編譯部署、配置和使用
Flink讀
參考鏈接:
Table & Query Types | Apache Hudi
SQL Queries | Apache Hudi
控制讀類型的配置項為hoodie.datasource.query.type,具有如下三個值:
- snapshot:快照讀,是默認(rèn)的行為,獲取最新版本的全量數(shù)據(jù)。從行存儲和列存儲中獲取最新的數(shù)據(jù)版本返回給用戶。
- incremental:增量讀。讀取某一段時間范圍內(nèi)的數(shù)據(jù)。用戶如果配置了commit時間范圍(起或者止,也可以兩者均有),自動啟動增量讀模式,無需用戶顯式去配置此配置項。
- read_optimized:讀優(yōu)化。僅基于列存儲獲取最新版本的數(shù)據(jù)。對于COW表而言獲取的是最新數(shù)據(jù),對于MOR表而言,行存儲(log文件)中的數(shù)據(jù)會被忽略掉。直到compaction將log中的數(shù)據(jù)合并到列存儲的時候,新變更的數(shù)據(jù)才能夠被查詢到。讀優(yōu)化模式讀取數(shù)據(jù)的耗時要比快照讀模式少。
Snapshot Query
默認(rèn)的查詢方式,查詢表中已存在的全量數(shù)據(jù)。如果是MOR表,查詢的時候不僅考慮parquet列存儲的數(shù)據(jù),還會合并avro log中的增量數(shù)據(jù)??梢詥⒂胐ata skipping加快讀取速度。啟用data skipping需要配置如下參數(shù):
metadata.enabled = true
hoodie.metadata.index.column.stats.enable = true
read.data.skipping.enabled = true
此外還有一個配置項hoodie.metadata.index.column.stats.column.list,表示哪些字段的統(tǒng)計信息需要索引。如果不配置,默認(rèn)為索引所有的字段。
Streaming Query
流模式增量讀取。和批模式讀取不同的是,流模式讀取會一直持續(xù)不斷的返回實時更新的結(jié)果。而批模式則在查詢結(jié)束之后退出,往后再更新的數(shù)據(jù),只有再次執(zhí)行批模式讀取的時候才能夠獲取到。
啟用流模式讀取需要配置read.streaming.enabled為true。
和流模式讀取相關(guān)的配置項還有:
- read.start-commit:從哪個commit開始讀取。需要配置commit的時間戳,格式為
yyyyMMddHHmmss。還可以配置為earliest,意為最早的commit。默認(rèn)值為最近一次的commit(即lastest commit)。 - read.streaming.skip_compaction:是否跳過compaction instant。跳過compaction instant的目的是防止數(shù)據(jù)重復(fù)和在開啟changelog模式的情況下,保持其語義的正確性。
增量讀取
增量讀取字面上理解是只讀取部分時間范圍內(nèi)的數(shù)據(jù)??膳浜狭髂J阶x取使用,讀取新攝入的數(shù)據(jù)。也可以配合批模式查詢,指定起(read.start-commit)止(read.end-commit)時間范圍,查詢某時間范圍內(nèi)的數(shù)據(jù)歷史值。也可以只指定read.end-commit,查詢某個時間點之前的歷史值,實現(xiàn)時間穿梭查詢。需要注意的是時間范圍是閉區(qū)間。
前面提及的配置項的解釋如下:
- read.start-commit:從哪個commit開始讀取。需要配置commit的時間戳,格式為
yyyyMMddHHmmss。還可以配置為earliest,意為最早的commit。默認(rèn)值為最近一次的commit(即lastest commit)。 - read.end-commit:讀取截止到哪個commit,配置格式,默認(rèn)值與
read.start-commit相同。
CDC Query
Hudi 0.14.1版本開始支持。下游可以查詢到數(shù)據(jù)的變更信息,更新前的數(shù)據(jù)和更新之后的數(shù)據(jù)。適用于Flink流式增量讀取。CDC讀取需要配置'cdc.enabled' = 'true'。
使用例子:
CREATE TABLE hudi_table(
ts BIGINT,
uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
rider VARCHAR(20),
driver VARCHAR(20),
fare DOUBLE,
city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
'connector' = 'hudi',
'path' = 'file:///tmp/hudi_table',
'table.type' = 'COPY_ON_WRITE',
'cdc.enabled' = 'true' -- this option enable the cdc log enabled
);
-- 插入初始數(shù)據(jù)
INSERT INTO hudi_table
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'),
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'),
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
-- 設(shè)置為批運行模式
SET 'execution.runtime-mode' = 'batch';
此時打開另一個Flink SQL client,執(zhí)行:
CREATE TABLE hudi_table(
ts BIGINT,
uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
rider VARCHAR(20),
driver VARCHAR(20),
fare DOUBLE,
city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs:///hudi_table',
'table.type' = 'COPY_ON_WRITE',
'cdc.enabled' = 'true',
'read.streaming.enabled'='true'
);
-- 設(shè)置執(zhí)行模式為流模式
SET 'execution.runtime-mode' = 'streaming';
-- 使用流模式查詢hudi_table表
select * from hudi_table;
在此窗口等待執(zhí)行結(jié)果。
然后在第一個窗口中執(zhí)行:
UPDATE hudi_table SET fare = 25.0 WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';
在第二個窗口中可觀察到CDC查詢結(jié)果。
Flink寫
參考鏈接:Using Flink | Apache Hudi
Bulk insert
Bulk insert用戶將來自其他數(shù)據(jù)源的初始數(shù)據(jù)導(dǎo)入到Hudi表中。避免index bootstrap步驟。適合將初始數(shù)據(jù)導(dǎo)入到Hudi中,是效率最高的寫入方式。
需要注意的是:
bulk_insert可以減少數(shù)據(jù)序列化以及合并操作,于此同時,該數(shù)據(jù)寫入方式會跳過數(shù)據(jù)去重,所以用戶需要保證數(shù)據(jù)的唯一性。
bulk_insert在批量寫入模式中更有效率。默認(rèn)情況下,批量執(zhí)行模式按照partition path對輸入記錄進(jìn)行排序,然后將這些記錄寫入Hudi,該方式可以避免頻繁切換文件句柄,導(dǎo)致寫入性能下降。
bulk_insert通過write.tasks參數(shù)指定并行度,并行度影響小文件的數(shù)量。
和bulk insert相關(guān)的配置項有:
| 參數(shù)名稱 | 是否必須 | 默認(rèn)值 | 參數(shù)說明 |
|---|---|---|---|
write.operation |
true |
upsert |
設(shè)置為 bulk_insert 開啟bulk insert |
write.tasks |
false |
4 |
bulk_insert 并行度對應(yīng)寫入的文件數(shù),但是有最大文件大小限制。實際寫入的文件個數(shù) >= write.bucket_assign.tasks
|
write.bulk_insert.shuffle_input |
false |
true |
寫入前是否根據(jù)分區(qū)字段進(jìn)行數(shù)據(jù)重分布。 啟用此選項將減少小文件的數(shù)量,可能導(dǎo)致數(shù)據(jù)傾斜 |
write.bulk_insert.sort_input |
false |
true |
寫入前是否根據(jù)分區(qū)字段對數(shù)據(jù)進(jìn)行排序。 啟用此選項將在寫任務(wù)寫多個分區(qū)時減少小文件的數(shù)量 |
write.bulk_insert.sort_input.by_record_key |
false |
false |
寫入前是否根據(jù)record_key字段對數(shù)據(jù)進(jìn)行排序。 |
write.sort.memory |
false |
128 |
排序算子可用托管內(nèi)存。 默認(rèn)為 128 MB |
使用示例:
create table hudi.call_center
with (
'connector' = 'hudi',
'path' = 'hdfs:///call_center',
'table.type' = 'COPY_ON_WRITE',
'write.operation' = 'bulk_insert',
'hoodie.datasource.write.recordkey.field' = 'cc_call_center_sk'
)
like call_center;
Index bootstrap
載入表中存量數(shù)據(jù)的索引到Flink的state中,保存于checkpoint。通俗來說是為現(xiàn)有的Hudi表建立起索引,加快后面upsert的速度。
相關(guān)的配置參數(shù):
| 參數(shù)名稱 | 是否必須 | 默認(rèn)值 | 參數(shù)說明 |
|---|---|---|---|
index.bootstrap.enabled |
true |
false |
當(dāng)啟用index bootstrap功能時,將Hudi表中的記錄全部加載到Flink state中 |
index.partition.regex |
false |
* |
優(yōu)化參數(shù),設(shè)置正則表達(dá)式來過濾分區(qū)。 默認(rèn)所有分區(qū)都加載到Flink狀態(tài) |
write.index_bootstrap.tasks |
false |
和write.tasks值相同 |
決定index bootstrap過程的task數(shù)量,影響index bootstrap的速度 |
使用方法:
-
CREATE TABLE創(chuàng)建與Hudi表,table.type必須正確配置。 - 設(shè)置
index.bootstrap.enabled = true啟用index bootstrap功能。 - 在
flink-conf.yaml文件中啟用 checkpoint,設(shè)置execution.checkpointing.tolerable-failed-checkpoints = n(取決于Flink checkpoint執(zhí)行時間)。因為index bootstrap時間可能會很長,只有在index bootstrap完成的時候,checkpoint才能夠成功。 - 等待checkpoint第一次成功,預(yù)示著index bootstrap完成。
- 在index bootstrap完成后,用戶可以退出并保存savepoint。
- 重啟任務(wù),設(shè)置
index.bootstrap.enable為false。
注意事項:
- Index bootstrap過程是阻塞的,期間無法完成checkpoint。
- Index bootstrap由輸入數(shù)據(jù)觸發(fā)。 用戶需要確保每個分區(qū)中至少有一條記錄。
- Index bootstrap并發(fā)執(zhí)行。用戶可以在日志文件中查找
finish loading the index under partition以及Load record form file相關(guān)字眼,跟蹤index bootstrap的進(jìn)度。 - Checkpoint第一次成功表明index bootstrap已完成。 如果從checkpoint恢復(fù),不需要重復(fù)索引過程。
Changelog模式
Hudi可以記錄數(shù)據(jù)的中間狀態(tài)(I / -U / U / D) ,類似于Flink的changelog stream。Hudi MOR表以行的形式存儲,支持保留變更狀態(tài)信息。
啟用changelog模式需要在表中開啟changelog.enabled=true配置項。開啟之后數(shù)據(jù)變更的中間結(jié)果都會被保留下來。
注意:
- 批量讀方式任然會合并中間結(jié)果,無論是否啟用changelog。
- 啟用changelog模式Hudi也只是盡力去保留中間變更數(shù)據(jù)。異步壓縮會將changelog數(shù)據(jù)合并為最終結(jié)果。所以說如果數(shù)據(jù)沒有被及時消費掉,那么這條數(shù)據(jù)只能讀取到它的最終狀態(tài)。為了緩解這種情況,可以配合設(shè)置
compaction.delta_commits和/或compaction.delta_seconds,讓compaction間隔時間加大,從而增加中間變更數(shù)據(jù)的保留時間。clean.retain_commits可以控制clean操作保留最近多少個commit。保留的commit個數(shù)越多,changelog記錄保留的時間越長,容許下游延遲消費的能力越強(qiáng)。
Upsert和Insert
Upsert和Insert都是插入數(shù)據(jù)操作。不同之處在于upsert比insert多了數(shù)據(jù)去重的功能。Upsert在插入數(shù)據(jù)之前會查詢已存在的數(shù)據(jù)在哪個數(shù)據(jù)文件(根據(jù)表schema定義的primary key字段去確認(rèn)數(shù)據(jù)是否存在),然后去更新這條數(shù)據(jù)。Insert操作沒有查詢已存在數(shù)據(jù)和更新數(shù)據(jù)的行為,直接將新數(shù)據(jù)插入,因此會出現(xiàn)重復(fù)數(shù)據(jù)。Insert相比upsert操作速度更快。
Upsert和Insert使用write.operation配置項控制。默認(rèn)為upsert。
下面的例子我們修改默認(rèn)的數(shù)據(jù)寫入操作為insert:
CREATE TABLE hudi_table(
ts BIGINT,
uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
rider VARCHAR(20),
driver VARCHAR(20),
fare DOUBLE,
city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs:///hudi_table',
'table.type' = 'COPY_ON_WRITE',
'write.operation' = 'insert'
);
-- 使用insert模式插入uuid字段重復(fù)的數(shù)據(jù),不會更新已有數(shù)據(jù),uuid重復(fù)的數(shù)據(jù)會插入
insert into hudi_table values(...);
-- 查詢時發(fā)現(xiàn)新插入的uuid重復(fù)的數(shù)據(jù)和原有數(shù)據(jù)可以共存
select * from hudi_table;
對于Insert模式,MOR表使用小文件合并策略,新數(shù)據(jù)優(yōu)先追加在最小的log文件中;COW表會直接寫入parquet文件,不適用任何的小文件合并策略。如果我們需要在COW insert模式啟動小文件和并策略的話,需要設(shè)置write.insert.cluster為true,啟用inline clustering。