1.序篇
廢話不多說,咱們先直接上本文的目錄和結(jié)論,小伙伴可以先看結(jié)論快速了解博主期望本文能給小伙伴們帶來什么幫助:
- 背景及應(yīng)用場景介紹:博主期望你能了解到,flink sql 提供了輕松訪問外部存儲的 lookup join(與上節(jié)不同,上節(jié)說的是流與流的 join)。lookup join 可以簡單理解為使用 flatmap 訪問外部存儲數(shù)據(jù)然后將維度字段拼接到當(dāng)前這條數(shù)據(jù)上面
- 來一個實(shí)戰(zhàn)案例:博主以曝光用戶日志流關(guān)聯(lián)用戶畫像(年齡、性別)維表為例介紹 lookup join 應(yīng)該達(dá)到的關(guān)聯(lián)的預(yù)期效果。
-
flink sql lookup join 的解決方案以及原理的介紹:主要介紹 lookup join 的在上述實(shí)戰(zhàn)案例的 sql 寫法,博主期望你能了解到,lookup join 是基于處理時間的,并且 lookup join 經(jīng)常會由于訪問外部存儲的 qps 過高而導(dǎo)致背壓,產(chǎn)出延遲等性能問題。我們可以借鑒在 DataStream api 中的維表 join 優(yōu)化思路在 flink sql 使用
local cache,異步訪問維表,批量訪問維表三種方式去解決性能問題。 -
總結(jié)及展望:官方并沒有提供
批量訪問維表的能力,因此博主自己實(shí)現(xiàn)了一套,具體使用方式和原理實(shí)現(xiàn)敬請期待下篇文章。
2.背景及應(yīng)用場景介紹
維表作為 sql 任務(wù)中一種常見表的類型,其本質(zhì)就是關(guān)聯(lián)表數(shù)據(jù)的額外數(shù)據(jù)屬性,通常在 join 語句中進(jìn)行使用。比如源數(shù)據(jù)有人的 id,你現(xiàn)在想要得到人的性別、年齡,那么可以通過用戶 id 去關(guān)聯(lián)人的性別、年齡,就可以得到更全的數(shù)據(jù)。
維表 join 在離線數(shù)倉中是最常見的一種數(shù)據(jù)處理方式了,在實(shí)時數(shù)倉的場景中,flink sql 目前也支持了維表的 join,即 lookup join,生產(chǎn)環(huán)境可以用 mysql,redis,hbase 來作為高速維表存儲引擎。
Notes:
在實(shí)時數(shù)倉中,常用實(shí)時維表有兩種更新頻率
- 實(shí)時的更新:維度信息是實(shí)時新建的,實(shí)時寫入到高速存儲引擎中。然后其他實(shí)時任務(wù)在做處理時實(shí)時的關(guān)聯(lián)這些維度信息。
- 周期性的更新:對于一些緩慢變化維度,比如年齡、性別的用戶畫像等,幾萬年都不變化一次的東西??,實(shí)時維表的更新可以是小時級別,天級別的。
3.來一個實(shí)戰(zhàn)案例
來看看在具體場景下,對應(yīng)輸入值的輸出值應(yīng)該長啥樣。
需求指標(biāo):使用曝光用戶日志流(show_log)關(guān)聯(lián)用戶畫像維表(user_profile)關(guān)聯(lián)到用戶的維度之后,提供給下游計(jì)算分性別,年齡段的曝光用戶數(shù)使用。此處我們只關(guān)心關(guān)聯(lián)維表這一部分的輸入輸出數(shù)據(jù)。
來一波輸入數(shù)據(jù):
曝光用戶日志流(show_log)數(shù)據(jù)(數(shù)據(jù)存儲在 kafka 中):
| log_id | timestamp | user_id |
|---|---|---|
| 1 | 2021-11-01 00:01:03 | a |
| 2 | 2021-11-01 00:03:00 | b |
| 3 | 2021-11-01 00:05:00 | c |
| 4 | 2021-11-01 00:06:00 | b |
| 5 | 2021-11-01 00:07:00 | c |
用戶畫像維表(user_profile)數(shù)據(jù)(數(shù)據(jù)存儲在 redis 中):
| user_id(主鍵) | age | sex |
|---|---|---|
| a | 12-18 | 男 |
| b | 18-24 | 女 |
| c | 18-24 | 男 |
注意:redis 中的數(shù)據(jù)結(jié)構(gòu)存儲是按照 key,value 去存儲的。其中 key 為 user_id,value 為 age,sex 的 json。如下圖所示:
user_profile redis
預(yù)期輸出數(shù)據(jù)如下:
| log_id | timestamp | user_id | age | sex |
|---|---|---|---|---|
| 1 | 2021-11-01 00:01:03 | a | 12-18 | 男 |
| 2 | 2021-11-01 00:03:00 | b | 18-24 | 女 |
| 3 | 2021-11-01 00:05:00 | c | 18-24 | 男 |
| 4 | 2021-11-01 00:06:00 | b | 18-24 | 女 |
| 5 | 2021-11-01 00:07:00 | c | 18-24 | 男 |
flink sql lookup join 登場。下面是官網(wǎng)的鏈接。
4.flink sql lookup join
4.1.lookup join 定義
以上述案例來說,lookup join 其實(shí)簡單理解來,就是每來一條數(shù)據(jù)去 redis 里面摟一次數(shù)據(jù)。然后把關(guān)聯(lián)到的維度數(shù)據(jù)給拼接到當(dāng)前數(shù)據(jù)中。
熟悉 DataStream api 的小伙伴萌,簡單來理解,就是 lookup join 的算子就是 DataStream api 中的 flatmap 算子中處理每一條來的數(shù)據(jù),針對每一條數(shù)據(jù)去訪問用戶畫像的 redis。(實(shí)際上,flink sql api 中也確實(shí)是這樣實(shí)現(xiàn)的!sql 生成的 lookup join 代碼就是繼承了 flatmap)
4.2.上述案例解決方案
來看看上述案例的 flink sql lookup join sql 怎么寫:
CREATE TABLE show_log ( log_id BIGINT, `timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)), user_id STRING, proctime AS PROCTIME())WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.user_id.length' = '1', 'fields.log_id.min' = '1', 'fields.log_id.max' = '10');CREATE TABLE user_profile ( user_id STRING, age STRING, sex STRING ) WITH ( 'connector' = 'redis', 'hostname' = '127.0.0.1', 'port' = '6379', 'format' = 'json', 'lookup.cache.max-rows' = '500', 'lookup.cache.ttl' = '3600', 'lookup.max-retries' = '1');CREATE TABLE sink_table ( log_id BIGINT, `timestamp` TIMESTAMP(3), user_id STRING, proctime TIMESTAMP(3), age STRING, sex STRING) WITH ( 'connector' = 'print');-- lookup join 的 query 邏輯INSERT INTO sink_tableSELECT s.log_id as log_id , s.`timestamp` as `timestamp` , s.user_id as user_id , s.proctime as proctime , u.sex as sex , u.age as ageFROM show_log AS sLEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS uON s.user_id = u.user_id
這里使用了 for SYSTEM_TIME as of 時態(tài)表的語法來作為維表關(guān)聯(lián)的標(biāo)識語法。
Notes:
實(shí)時的 lookup 維表關(guān)聯(lián)能使用處理時間去做關(guān)聯(lián)。
運(yùn)行結(jié)果如下:
| log_id | timestamp | user_id | age | sex |
|---|---|---|---|---|
| 1 | 2021-11-01 00:01:03 | a | 12-18 | 男 |
| 2 | 2021-11-01 00:03:00 | b | 18-24 | 女 |
| 3 | 2021-11-01 00:05:00 | c | 18-24 | 男 |
| 4 | 2021-11-01 00:06:00 | b | 18-24 | 女 |
| 5 | 2021-11-01 00:07:00 | c | 18-24 | 男 |
flink web ui 算子圖如下:
flink web ui
但是?。。〉牵。?!但是?。?!
flink 官方并沒有提供 redis 的維表 connector 實(shí)現(xiàn)。
沒錯,博主自己實(shí)現(xiàn)了一套。關(guān)于 redis 維表的 connector 實(shí)現(xiàn),直接參考下面的文章。都是可以從 github 上找到源碼拿來用的!
flink sql 知其所以然(二)| 自定義 redis 數(shù)據(jù)維表(附源碼)
4.3.關(guān)于維表使用的一些注意事項(xiàng)
- 同一條數(shù)據(jù)關(guān)聯(lián)到的維度數(shù)據(jù)可能不同:實(shí)時數(shù)倉中常用的實(shí)時維表都是在不斷的變化中的,當(dāng)前流表數(shù)據(jù)關(guān)聯(lián)完維表數(shù)據(jù)后,如果同一個 key 的維表的數(shù)據(jù)發(fā)生了變化,已關(guān)聯(lián)到的維表的結(jié)果數(shù)據(jù)不會再同步更新。舉個例子,維表中 user_id 為 1 的數(shù)據(jù)在 08:00 時 age 由 12-18 變?yōu)榱?18-24,那么當(dāng)我們的任務(wù)在 08:01 failover 之后從 07:59 開始回溯數(shù)據(jù)時,原本應(yīng)該關(guān)聯(lián)到 12-18 的數(shù)據(jù)會關(guān)聯(lián)到 18-24 的 age 數(shù)據(jù)。這是有可能會影響數(shù)據(jù)質(zhì)量的。所以小伙伴萌在評估你們的實(shí)時任務(wù)時要考慮到這一點(diǎn)。
- 會發(fā)生實(shí)時的新建及更新的維表博主建議小伙伴萌應(yīng)該建立起數(shù)據(jù)延遲的監(jiān)控機(jī)制,防止出現(xiàn)流表數(shù)據(jù)先于維表數(shù)據(jù)到達(dá),導(dǎo)致關(guān)聯(lián)不到維表數(shù)據(jù)
4.4.再說說維表常見的性能問題及優(yōu)化思路
所有的維表性能問題都可以總結(jié)為:高 qps 下訪問維表存儲引擎產(chǎn)生的任務(wù)背壓,數(shù)據(jù)產(chǎn)出延遲問題。
舉個例子:
-
在沒有使用維表的情況下:一條數(shù)據(jù)從輸入 flink 任務(wù)到輸出 flink 任務(wù)的時延假如為
0.1 ms,那么并行度為1的任務(wù)的吞吐可以達(dá)到1 query / 0.1 ms = 1w qps。 -
在使用維表之后:每條數(shù)據(jù)訪問維表的外部存儲的時長為
2 ms,那么一條數(shù)據(jù)從輸入 flink 任務(wù)到輸出 flink 任務(wù)的時延就會變成2.1 ms,那么同樣并行度為1的任務(wù)的吞吐只能達(dá)到1 query / 2.1 ms = 476 qps。兩者的吞吐量相差21 倍。
這就是為什么維表 join 的算子會產(chǎn)生背壓,任務(wù)產(chǎn)出會延遲。
那么當(dāng)然,解決方案也是有很多的。拋開 flink sql 想一下,如果我們使用 DataStream api,甚至是在做一個后端應(yīng)用,需要訪問外部存儲時,常用的優(yōu)化方案有哪些?這里列舉一下:
- 按照 redis 維表的 key 分桶 + local cache:通過按照 key 分桶的方式,讓大多數(shù)據(jù)的維表關(guān)聯(lián)的數(shù)據(jù)訪問走之前訪問過得 local cache 即可。這樣就可以把訪問外部存儲 2.1 ms 處理一個 query 變?yōu)樵L問內(nèi)存的 0.1 ms 處理一個 query 的時長。
-
異步訪問外存:DataStream api 有異步算子,可以利用線程池去同時多次請求維表外部存儲。這樣就可以把 2.1 ms 處理 1 個 query 變?yōu)?2.1 ms 處理 10 個 query。吞吐可變優(yōu)化到
10 / 2.1 ms = 4761 qps。 -
批量訪問外存:除了異步訪問之外,我們還可以批量訪問外部存儲。舉一個例子:在訪問 redis 維表的 1 query 占用 2.1 ms 時長中,其中可能有 2 ms 都是在網(wǎng)絡(luò)請求上面的耗時 ,其中只有 0.1 ms 是 redis server 處理請求的時長。那么我們就可以使用 redis 提供的 pipeline 能力,在客戶端(也就是 flink 任務(wù) lookup join 算子中),攢一批數(shù)據(jù),使用 pipeline 去同時訪問 redis sever。這樣就可以把 2.1 ms 處理 1 個 query 變?yōu)?7ms(2ms + 50 * 0.1ms) 處理 50 個 query。吞吐可變?yōu)?
50 query / 7 ms = 7143 qps。博主這里測試了下使用 redis pipeline 和未使用的時長消耗對比。如下圖所示。
redis pipeline
博主認(rèn)為上述優(yōu)化效果中,最好用的是 1 + 3,2 相比 3 還是一條一條發(fā)請求,性能會差一些。
既然 DataStream 可以這樣做,flink sql 必須必的也可以借鑒上面的這些優(yōu)化方案。具體怎么操作呢?看下文騷操作
4.5.lookup join 的具體性能優(yōu)化方案
- 按照 redis 維表的 key 分桶 + local cache:sql 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udaf 中做訪問 redis 處理,并且 udaf 產(chǎn)出的結(jié)果只能是一條,所以這種實(shí)現(xiàn)起來非常復(fù)雜。我們選擇不做 keyby 分桶。但是我們可以直接使用 local cache 去做本地緩存,雖然【直接緩存】的效果比【先按照 key 分桶再做緩存】的效果差,但是也能一定程度上減少訪問 redis 壓力。在博主實(shí)現(xiàn)的 redis connector 中,內(nèi)置了 local cache 的實(shí)現(xiàn),小伙伴萌可以參考下面這部篇文章進(jìn)行配置。
-
異步訪問外存:目前博主實(shí)現(xiàn)的 redis connector 不支持異步訪問,但是官方實(shí)現(xiàn)的 hbase connector 支持這個功能,參考下面鏈接文章的,點(diǎn)開之后搜索
lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/ - 批量訪問外存:這玩意官方必然沒有實(shí)現(xiàn)啊,但是,但是,但是,經(jīng)過博主周末兩天的瘋狂 debug,改了改源碼,搞定了基于 redis 的批量訪問外存優(yōu)化的功能。
4.6.基于 redis connector 的批量訪問機(jī)制優(yōu)化
先描述一下大概是個什么東西,具體怎么用。
你只需要在 StreamTableEnvironment 中的 table config 配置上 is.dim.batch.mode 為 true,sql 不用做任何改動的情況下,flink lookup join 算子會自動優(yōu)化,優(yōu)化效果如下:
lookup join 算子的每個 task 上,每攢夠 30 條數(shù)據(jù) or 每隔五秒(處理時間) 去觸發(fā)一次批量訪問 redis 的請求,使用的是 jedis client 的 pipeline 功能訪問 redis server。實(shí)測性能有很大提升。
關(guān)于這個批量訪問機(jī)制的優(yōu)化介紹和使用方式介紹,小伙伴們先別急,下篇文章會詳細(xì)介紹到。
5.總結(jié)與展望
本文主要介紹了 flink sql lookup join 的使用方式,并介紹了一些經(jīng)常出現(xiàn)的性能問題以及優(yōu)化思路,總結(jié)如下:
- 背景及應(yīng)用場景介紹:博主期望你能了解到,flink sql 提供了輕松訪問外部存儲的 lookup join(與上節(jié)不同,上節(jié)說的是流與流的 join)。lookup join 可以簡單理解為使用 flatmap 訪問外部存儲數(shù)據(jù)然后將維度字段拼接到當(dāng)前這條數(shù)據(jù)上面
- 來一個實(shí)戰(zhàn)案例:博主以曝光用戶日志流關(guān)聯(lián)用戶畫像(年齡、性別)維表為例介紹 lookup join 應(yīng)該達(dá)到的關(guān)聯(lián)的預(yù)期效果。
-
flink sql lookup join 的解決方案以及原理的介紹:主要介紹 lookup join 的在上述實(shí)戰(zhàn)案例的 sql 寫法,博主期望你能了解到,lookup join 是基于處理時間的,并且 lookup join 經(jīng)常會由于訪問外部存儲的 qps 過高而導(dǎo)致背壓,產(chǎn)出延遲等性能問題。我們可以借鑒在 DataStream api 中的維表 join 優(yōu)化思路在 flink sql 使用
local cache,異步訪問維表,批量訪問維表三種方式去解決性能問題。 -
總結(jié)及展望:官方并沒有提供
批量訪問維表的能力,因此博主自己實(shí)現(xiàn)了一套,具體使用方式和原理實(shí)現(xiàn)敬請期待下篇文章。