背景
由于公司內(nèi)部需求較多,并不想每次都寫(xiě)一個(gè) streaming 程序,故而開(kāi)始搭建 flinksql 平臺(tái),基于 jdk1.8,flink1.12.x
效果
傳一個(gè) sql 文件給 jar 包,然后 sql 文件內(nèi)的 sql 將自動(dòng)執(zhí)行
jar 包 vs web 界面
調(diào)研了基于 web 的 zeppline
- zeppline 設(shè)計(jì)的初衷其實(shí)是為了交互式分析
- 基于 zeppline rest api 與現(xiàn)有的監(jiān)控不兼容,需要修改現(xiàn)有監(jiān)控的代碼
- 雖然帶有 web 界面的對(duì)用戶(hù)很是友好,對(duì)于分析人員來(lái)說(shuō),是一個(gè)不錯(cuò)的選擇,但對(duì)于開(kāi)發(fā)人員來(lái)說(shuō),真正的線(xiàn)上長(zhǎng)時(shí)間的運(yùn)行程序,開(kāi)發(fā)成 HA 的 server 還是有必要的
基于以上 3 點(diǎn)最終選擇 jar 作為最終的方式
使用
- 將 sql 寫(xiě)入 xxx.sql 文件中,如
CREATE TEMPORARY FUNCTION MillisecondsToDateStr AS 'io.github.shengjk.udf.MillisecondsToDateStr' LANGUAGE JAVA;
-- ExecutionCheckpointingOptions
set execution.checkpointing.mode=EXACTLY_ONCE;
set execution.checkpointing.timeout=30 min;-- 30min
set execution.checkpointing.interval=1 min ; -- 1min
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
-- ExecutionConfigOptions
set table.exec.state.ttl=1 day; -- 1 day
set table.exec.mini-batch.enabled=true; -- enable mini-batch optimization
set table.exec.mini-batch.allow-latency=1 s; -- 1s
set table.exec.mini-batch.size=1000;
set table.exec.sink.not-null-enforcer=drop;
-- -- dadadadadada
CREATE TABLE orders
(
status int,
courier_id bigint,
id bigint,
finish_time BIGINT
)
WITH (
'connector' = 'kafka','topic' = 'canal_monitor_order',
'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup',
'format' = 'ss-canal-json','ss-canal-json.table.include' = 'orders','scan.startup.mode' = 'earliest-offset');
-- flink.partition-discovery.interval-millis;
CREATE TABLE infos
(
info_index int,
order_id bigint
)
WITH (
'connector' = 'kafka','topic' = 'canal_monitor_order',
'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup',
'format' = 'ss-canal-json','ss-canal-json.table.include' = 'infos','scan.startup.mode' = 'earliest-offset');
CREATE TABLE redisCache
(
finishOrders BIGINT,
courier_id BIGINT,
dayStr String
)
WITH (
'connector' = 'redis',
'hostPort'='localhost:6400',
'keyType'='hash',
'keyTemplate'='test2_${courier_id}',
'fieldTemplate'='${dayStr}',
'valueNames'='finishOrders',
'expireTime'='259200');
create view temp as
select o.courier_id,
(CASE
WHEN sum(infosMaxIndex.info_index) is null then 0
else sum(infosMaxIndex.info_index) end) finishOrders,
o.status,
dayStr
from ((select courier_id,
id,
last_value(status) status,
MillisecondsToDateStr(finish_time, 'yyyyMMdd') dayStr
from orders
where status = 60
group by courier_id, id, MillisecondsToDateStr(finish_time, 'yyyyMMdd'))) o
left join (select max(info_index) info_index, order_id
from infos
group by order_id) infosMaxIndex on o.id = infosMaxIndex.order_id
group by o.courier_id, o.status, dayStr;
INSERT INTO redisCache SELECT finishOrders,courier_id,dayStr FROM temp;
- 將 flinksql-platform 打包并上傳至服務(wù)器
- 將必要的 connector jar 放入到相應(yīng)的目錄下
- 執(zhí)行,如
flink-1.12.0/bin/flink run -p 3 -yt ./flinkjar/ -C file:///home/shengjk/flinkjar/test-udf.jar -C file:///home/shengjk/flinkjar/jedis-2.10.2.jar -m yarn-cluster -ynm sqlDemo -c io.github.shengjk.Main ./flinksql-platform-1.0-SNAPSHOT.jar --sqlPath ./xxx.sql
其中
-C 添加 udfJar 等第三方 jar 包 -C 參數(shù)apply到了client端生成的JobGraph里,然后提交JobGraph來(lái)運(yùn)行的
-yt 目錄 將 udfJar 等第三方 jar 包提交到 TaskManager 上
總括
更詳細(xì)的內(nèi)容,請(qǐng)移步 flinksql-platform