FlinkSQL 平臺(tái)

背景

由于公司內(nèi)部需求較多,并不想每次都寫(xiě)一個(gè) streaming 程序,故而開(kāi)始搭建 flinksql 平臺(tái),基于 jdk1.8,flink1.12.x

效果

傳一個(gè) sql 文件給 jar 包,然后 sql 文件內(nèi)的 sql 將自動(dòng)執(zhí)行

jar 包 vs web 界面

調(diào)研了基于 web 的 zeppline

  1. zeppline 設(shè)計(jì)的初衷其實(shí)是為了交互式分析
  2. 基于 zeppline rest api 與現(xiàn)有的監(jiān)控不兼容,需要修改現(xiàn)有監(jiān)控的代碼
  3. 雖然帶有 web 界面的對(duì)用戶(hù)很是友好,對(duì)于分析人員來(lái)說(shuō),是一個(gè)不錯(cuò)的選擇,但對(duì)于開(kāi)發(fā)人員來(lái)說(shuō),真正的線(xiàn)上長(zhǎng)時(shí)間的運(yùn)行程序,開(kāi)發(fā)成 HA 的 server 還是有必要的

基于以上 3 點(diǎn)最終選擇 jar 作為最終的方式

使用

  1. 將 sql 寫(xiě)入 xxx.sql 文件中,如
CREATE TEMPORARY FUNCTION MillisecondsToDateStr AS 'io.github.shengjk.udf.MillisecondsToDateStr' LANGUAGE JAVA;


-- ExecutionCheckpointingOptions
set execution.checkpointing.mode=EXACTLY_ONCE;
set execution.checkpointing.timeout=30 min;--  30min
set execution.checkpointing.interval=1 min ; -- 1min
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

-- ExecutionConfigOptions
set table.exec.state.ttl=1 day;  -- 1 day
set table.exec.mini-batch.enabled=true; -- enable mini-batch optimization
set table.exec.mini-batch.allow-latency=1 s; -- 1s
set table.exec.mini-batch.size=1000;
set table.exec.sink.not-null-enforcer=drop;

-- -- dadadadadada
CREATE TABLE orders
(
   status      int,
   courier_id  bigint,
   id          bigint,
   finish_time BIGINT
)
WITH (
   'connector' = 'kafka','topic' = 'canal_monitor_order',
   'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup',
   'format' = 'ss-canal-json','ss-canal-json.table.include' = 'orders','scan.startup.mode' = 'earliest-offset');

-- flink.partition-discovery.interval-millis;
CREATE TABLE infos
(
   info_index int,
   order_id   bigint
)
WITH (
   'connector' = 'kafka','topic' = 'canal_monitor_order',
   'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup',
   'format' = 'ss-canal-json','ss-canal-json.table.include' = 'infos','scan.startup.mode' = 'earliest-offset');


CREATE TABLE redisCache
(
   finishOrders BIGINT,
   courier_id   BIGINT,
   dayStr       String
)
WITH (
   'connector' = 'redis',
   'hostPort'='localhost:6400',
   'keyType'='hash',
   'keyTemplate'='test2_${courier_id}',
   'fieldTemplate'='${dayStr}',
   'valueNames'='finishOrders',
   'expireTime'='259200');

create view temp as
select o.courier_id,
      (CASE
           WHEN sum(infosMaxIndex.info_index) is null then 0
           else sum(infosMaxIndex.info_index) end) finishOrders,
      o.status,
      dayStr
from ((select courier_id,
             id,
             last_value(status)                             status,
             MillisecondsToDateStr(finish_time, 'yyyyMMdd') dayStr
      from orders
      where status = 60
      group by courier_id, id, MillisecondsToDateStr(finish_time, 'yyyyMMdd'))) o
        left join (select max(info_index) info_index, order_id
                   from infos
                   group by order_id) infosMaxIndex on o.id = infosMaxIndex.order_id
group by o.courier_id, o.status, dayStr;


INSERT INTO redisCache SELECT finishOrders,courier_id,dayStr FROM temp;

  1. 將 flinksql-platform 打包并上傳至服務(wù)器
  2. 將必要的 connector jar 放入到相應(yīng)的目錄下
  3. 執(zhí)行,如
flink-1.12.0/bin/flink  run -p 3 -yt ./flinkjar/  -C file:///home/shengjk/flinkjar/test-udf.jar -C file:///home/shengjk/flinkjar/jedis-2.10.2.jar  -m yarn-cluster -ynm sqlDemo  -c io.github.shengjk.Main ./flinksql-platform-1.0-SNAPSHOT.jar --sqlPath ./xxx.sql

其中
-C 添加 udfJar 等第三方 jar 包 -C 參數(shù)apply到了client端生成的JobGraph里,然后提交JobGraph來(lái)運(yùn)行的
-yt 目錄 將 udfJar 等第三方 jar 包提交到 TaskManager 上

總括

更詳細(xì)的內(nèi)容,請(qǐng)移步 flinksql-platform

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 概述 2019 年是大數(shù)據(jù)實(shí)時(shí)計(jì)算領(lǐng)域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (內(nèi)部的 Flin...
    王知無(wú)閱讀 3,339評(píng)論 2 11
  • 久違的晴天,家長(zhǎng)會(huì)。 家長(zhǎng)大會(huì)開(kāi)好到教室時(shí),離放學(xué)已經(jīng)沒(méi)多少時(shí)間了。班主任說(shuō)已經(jīng)安排了三個(gè)家長(zhǎng)分享經(jīng)驗(yàn)。 放學(xué)鈴聲...
    飄雪兒5閱讀 7,868評(píng)論 16 22
  • 今天感恩節(jié)哎,感謝一直在我身邊的親朋好友。感恩相遇!感恩不離不棄。 中午開(kāi)了第一次的黨會(huì),身份的轉(zhuǎn)變要...
    余生動(dòng)聽(tīng)閱讀 10,918評(píng)論 0 11
  • 可愛(ài)進(jìn)取,孤獨(dú)成精。努力飛翔,天堂翱翔。戰(zhàn)爭(zhēng)美好,孤獨(dú)進(jìn)取。膽大飛翔,成就輝煌。努力進(jìn)取,遙望,和諧家園??蓯?ài)游走...
    趙原野閱讀 3,549評(píng)論 1 1
  • 在妖界我有個(gè)名頭叫胡百曉,無(wú)論是何事,只要找到胡百曉即可有解決的辦法。因?yàn)槭侵缓偞蠹乙杂瀭饔灲形摇皟A城百曉”,...
    貓九0110閱讀 3,728評(píng)論 7 3

友情鏈接更多精彩內(nèi)容