Hudi 使用之Flink讀寫

前言

本篇從Hudi角度介紹Flink常用的幾種讀寫操作。內(nèi)容主要來源于官網(wǎng)例子和個人理解。

讀者如果想了解從編譯、部署到使用的步驟,請參考:Flink 使用之 Hudi 編譯部署、配置和使用

Flink讀

參考鏈接:
Table & Query Types | Apache Hudi
SQL Queries | Apache Hudi

控制讀類型的配置項為hoodie.datasource.query.type,具有如下三個值:

  • snapshot:快照讀,是默認(rèn)的行為,獲取最新版本的全量數(shù)據(jù)。從行存儲和列存儲中獲取最新的數(shù)據(jù)版本返回給用戶。
  • incremental:增量讀。讀取某一段時間范圍內(nèi)的數(shù)據(jù)。用戶如果配置了commit時間范圍(起或者止,也可以兩者均有),自動啟動增量讀模式,無需用戶顯式去配置此配置項。
  • read_optimized:讀優(yōu)化。僅基于列存儲獲取最新版本的數(shù)據(jù)。對于COW表而言獲取的是最新數(shù)據(jù),對于MOR表而言,行存儲(log文件)中的數(shù)據(jù)會被忽略掉。直到compaction將log中的數(shù)據(jù)合并到列存儲的時候,新變更的數(shù)據(jù)才能夠被查詢到。讀優(yōu)化模式讀取數(shù)據(jù)的耗時要比快照讀模式少。

Snapshot Query

默認(rèn)的查詢方式,查詢表中已存在的全量數(shù)據(jù)。如果是MOR表,查詢的時候不僅考慮parquet列存儲的數(shù)據(jù),還會合并avro log中的增量數(shù)據(jù)??梢詥⒂胐ata skipping加快讀取速度。啟用data skipping需要配置如下參數(shù):

metadata.enabled = true
hoodie.metadata.index.column.stats.enable = true
read.data.skipping.enabled = true

此外還有一個配置項hoodie.metadata.index.column.stats.column.list,表示哪些字段的統(tǒng)計信息需要索引。如果不配置,默認(rèn)為索引所有的字段。

Streaming Query

流模式增量讀取。和批模式讀取不同的是,流模式讀取會一直持續(xù)不斷的返回實時更新的結(jié)果。而批模式則在查詢結(jié)束之后退出,往后再更新的數(shù)據(jù),只有再次執(zhí)行批模式讀取的時候才能夠獲取到。
啟用流模式讀取需要配置read.streaming.enabledtrue
和流模式讀取相關(guān)的配置項還有:

  • read.start-commit:從哪個commit開始讀取。需要配置commit的時間戳,格式為yyyyMMddHHmmss。還可以配置為earliest,意為最早的commit。默認(rèn)值為最近一次的commit(即lastest commit)。
  • read.streaming.skip_compaction:是否跳過compaction instant。跳過compaction instant的目的是防止數(shù)據(jù)重復(fù)和在開啟changelog模式的情況下,保持其語義的正確性。

增量讀取

增量讀取字面上理解是只讀取部分時間范圍內(nèi)的數(shù)據(jù)??膳浜狭髂J阶x取使用,讀取新攝入的數(shù)據(jù)。也可以配合批模式查詢,指定起(read.start-commit)止(read.end-commit)時間范圍,查詢某時間范圍內(nèi)的數(shù)據(jù)歷史值。也可以只指定read.end-commit,查詢某個時間點之前的歷史值,實現(xiàn)時間穿梭查詢。需要注意的是時間范圍是閉區(qū)間。
前面提及的配置項的解釋如下:

  • read.start-commit:從哪個commit開始讀取。需要配置commit的時間戳,格式為yyyyMMddHHmmss。還可以配置為earliest,意為最早的commit。默認(rèn)值為最近一次的commit(即lastest commit)。
  • read.end-commit:讀取截止到哪個commit,配置格式,默認(rèn)值與read.start-commit相同。

CDC Query

Hudi 0.14.1版本開始支持。下游可以查詢到數(shù)據(jù)的變更信息,更新前的數(shù)據(jù)和更新之后的數(shù)據(jù)。適用于Flink流式增量讀取。CDC讀取需要配置'cdc.enabled' = 'true'
使用例子:

CREATE TABLE hudi_table(  
ts BIGINT,  
uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,  
rider VARCHAR(20),  
driver VARCHAR(20),  
fare DOUBLE,  
city VARCHAR(20)  
)  
PARTITIONED BY (`city`)  
WITH (  
'connector' = 'hudi',  
'path' = 'file:///tmp/hudi_table',  
'table.type' = 'COPY_ON_WRITE',  
'cdc.enabled' = 'true' -- this option enable the cdc log enabled  
);
-- 插入初始數(shù)據(jù)
INSERT INTO hudi_table  
VALUES  
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),  
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),  
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),  
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),  
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'),  
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'),  
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'),  
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
-- 設(shè)置為批運行模式
SET 'execution.runtime-mode' = 'batch';  

此時打開另一個Flink SQL client,執(zhí)行:

CREATE TABLE hudi_table(
    ts BIGINT,
    uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
    rider VARCHAR(20),
    driver VARCHAR(20),
    fare DOUBLE,
    city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///hudi_table',
  'table.type' = 'COPY_ON_WRITE',
  'cdc.enabled' = 'true',
  'read.streaming.enabled'='true'
);
-- 設(shè)置執(zhí)行模式為流模式
SET 'execution.runtime-mode' = 'streaming';  
-- 使用流模式查詢hudi_table表
select * from hudi_table;

在此窗口等待執(zhí)行結(jié)果。
然后在第一個窗口中執(zhí)行:

UPDATE hudi_table SET fare = 25.0 WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';

在第二個窗口中可觀察到CDC查詢結(jié)果。

Flink寫

參考鏈接:Using Flink | Apache Hudi

Bulk insert

Bulk insert用戶將來自其他數(shù)據(jù)源的初始數(shù)據(jù)導(dǎo)入到Hudi表中。避免index bootstrap步驟。適合將初始數(shù)據(jù)導(dǎo)入到Hudi中,是效率最高的寫入方式。
需要注意的是:
bulk_insert可以減少數(shù)據(jù)序列化以及合并操作,于此同時,該數(shù)據(jù)寫入方式會跳過數(shù)據(jù)去重,所以用戶需要保證數(shù)據(jù)的唯一性。
bulk_insert在批量寫入模式中更有效率。默認(rèn)情況下,批量執(zhí)行模式按照partition path對輸入記錄進(jìn)行排序,然后將這些記錄寫入Hudi,該方式可以避免頻繁切換文件句柄,導(dǎo)致寫入性能下降。
bulk_insert通過write.tasks參數(shù)指定并行度,并行度影響小文件的數(shù)量。

和bulk insert相關(guān)的配置項有:

參數(shù)名稱 是否必須 默認(rèn)值 參數(shù)說明
write.operation true upsert 設(shè)置為 bulk_insert 開啟bulk insert
write.tasks false 4 bulk_insert 并行度對應(yīng)寫入的文件數(shù),但是有最大文件大小限制。實際寫入的文件個數(shù) >= write.bucket_assign.tasks
write.bulk_insert.shuffle_input false true 寫入前是否根據(jù)分區(qū)字段進(jìn)行數(shù)據(jù)重分布。 啟用此選項將減少小文件的數(shù)量,可能導(dǎo)致數(shù)據(jù)傾斜
write.bulk_insert.sort_input false true 寫入前是否根據(jù)分區(qū)字段對數(shù)據(jù)進(jìn)行排序。 啟用此選項將在寫任務(wù)寫多個分區(qū)時減少小文件的數(shù)量
write.bulk_insert.sort_input.by_record_key false false 寫入前是否根據(jù)record_key字段對數(shù)據(jù)進(jìn)行排序。
write.sort.memory false 128 排序算子可用托管內(nèi)存。 默認(rèn)為 128 MB

使用示例:

create table hudi.call_center  
    with (  
        'connector' = 'hudi',  
        'path' = 'hdfs:///call_center',  
        'table.type' = 'COPY_ON_WRITE',  
        'write.operation' = 'bulk_insert',  
        'hoodie.datasource.write.recordkey.field' = 'cc_call_center_sk'  
        )  
    like call_center;

Index bootstrap

載入表中存量數(shù)據(jù)的索引到Flink的state中,保存于checkpoint。通俗來說是為現(xiàn)有的Hudi表建立起索引,加快后面upsert的速度。

相關(guān)的配置參數(shù):

參數(shù)名稱 是否必須 默認(rèn)值 參數(shù)說明
index.bootstrap.enabled true false 當(dāng)啟用index bootstrap功能時,將Hudi表中的記錄全部加載到Flink state中
index.partition.regex false * 優(yōu)化參數(shù),設(shè)置正則表達(dá)式來過濾分區(qū)。 默認(rèn)所有分區(qū)都加載到Flink狀態(tài)
write.index_bootstrap.tasks false write.tasks值相同 決定index bootstrap過程的task數(shù)量,影響index bootstrap的速度

使用方法:

  1. CREATE TABLE創(chuàng)建與Hudi表,table.type必須正確配置。
  2. 設(shè)置index.bootstrap.enabled = true啟用index bootstrap功能。
  3. flink-conf.yaml文件中啟用 checkpoint,設(shè)置execution.checkpointing.tolerable-failed-checkpoints = n(取決于Flink checkpoint執(zhí)行時間)。因為index bootstrap時間可能會很長,只有在index bootstrap完成的時候,checkpoint才能夠成功。
  4. 等待checkpoint第一次成功,預(yù)示著index bootstrap完成。
  5. 在index bootstrap完成后,用戶可以退出并保存savepoint。
  6. 重啟任務(wù),設(shè)置index.bootstrap.enablefalse。

注意事項:

  1. Index bootstrap過程是阻塞的,期間無法完成checkpoint。
  2. Index bootstrap由輸入數(shù)據(jù)觸發(fā)。 用戶需要確保每個分區(qū)中至少有一條記錄。
  3. Index bootstrap并發(fā)執(zhí)行。用戶可以在日志文件中查找finish loading the index under partition以及Load record form file相關(guān)字眼,跟蹤index bootstrap的進(jìn)度。
  4. Checkpoint第一次成功表明index bootstrap已完成。 如果從checkpoint恢復(fù),不需要重復(fù)索引過程。

Changelog模式

Hudi可以記錄數(shù)據(jù)的中間狀態(tài)(I / -U / U / D) ,類似于Flink的changelog stream。Hudi MOR表以行的形式存儲,支持保留變更狀態(tài)信息。
啟用changelog模式需要在表中開啟changelog.enabled=true配置項。開啟之后數(shù)據(jù)變更的中間結(jié)果都會被保留下來。
注意:

  1. 批量讀方式任然會合并中間結(jié)果,無論是否啟用changelog。
  2. 啟用changelog模式Hudi也只是盡力去保留中間變更數(shù)據(jù)。異步壓縮會將changelog數(shù)據(jù)合并為最終結(jié)果。所以說如果數(shù)據(jù)沒有被及時消費掉,那么這條數(shù)據(jù)只能讀取到它的最終狀態(tài)。為了緩解這種情況,可以配合設(shè)置compaction.delta_commits 和/或 compaction.delta_seconds,讓compaction間隔時間加大,從而增加中間變更數(shù)據(jù)的保留時間。clean.retain_commits可以控制clean操作保留最近多少個commit。保留的commit個數(shù)越多,changelog記錄保留的時間越長,容許下游延遲消費的能力越強(qiáng)。

Upsert和Insert

Upsert和Insert都是插入數(shù)據(jù)操作。不同之處在于upsert比insert多了數(shù)據(jù)去重的功能。Upsert在插入數(shù)據(jù)之前會查詢已存在的數(shù)據(jù)在哪個數(shù)據(jù)文件(根據(jù)表schema定義的primary key字段去確認(rèn)數(shù)據(jù)是否存在),然后去更新這條數(shù)據(jù)。Insert操作沒有查詢已存在數(shù)據(jù)和更新數(shù)據(jù)的行為,直接將新數(shù)據(jù)插入,因此會出現(xiàn)重復(fù)數(shù)據(jù)。Insert相比upsert操作速度更快。
Upsert和Insert使用write.operation配置項控制。默認(rèn)為upsert。
下面的例子我們修改默認(rèn)的數(shù)據(jù)寫入操作為insert:

CREATE TABLE hudi_table(
    ts BIGINT,
    uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
    rider VARCHAR(20),
    driver VARCHAR(20),
    fare DOUBLE,
    city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///hudi_table',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'insert'
);
-- 使用insert模式插入uuid字段重復(fù)的數(shù)據(jù),不會更新已有數(shù)據(jù),uuid重復(fù)的數(shù)據(jù)會插入
insert into hudi_table values(...);
-- 查詢時發(fā)現(xiàn)新插入的uuid重復(fù)的數(shù)據(jù)和原有數(shù)據(jù)可以共存
select * from hudi_table;

對于Insert模式,MOR表使用小文件合并策略,新數(shù)據(jù)優(yōu)先追加在最小的log文件中;COW表會直接寫入parquet文件,不適用任何的小文件合并策略。如果我們需要在COW insert模式啟動小文件和并策略的話,需要設(shè)置write.insert.clustertrue,啟用inline clustering。

參考鏈接:Using Flink | Apache Hudi

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容