flink配置

腳本

-c,--class <classname> 程序的入口(main method or getplan()).只有在jar程序的manifest中沒有指定class

-m,--jobmanager <host:port> 在哪運行yarn-cluster

-C,--classpath <url> 代碼路徑

-p,--parallelism <parallelism> 并行度

-ynm,--yarnname <arg> 設置application的名字

-yjm,--yarnjobManagerMemory <arg> JobManager Container的內存

-ytm,--yarntaskManagerMemory <arg> TaskManager Container的內存

-s,--fromSavepoint <savepointPath> savepoint保存的地方,路徑需寫到chk-某個數(shù)

-yn,--yarncontainer <arg> Number of YARN container to allocate(=Number of Task Managers)

-ys,--yarnslots <arg> Number of slots per TaskManager

-yq -yD env.java.opts.taskmanager="-Dsun.stdout.encoding=utf-8"

代碼中配置

//狀態(tài)管理器MemoryStateBackend,FsStateBackend,RocksDBStateBackend后倆需要指定路徑

env.setStateBackend(stateBackend);

//設置保存間隔,每 1000ms 開始一次 checkpoint

env.enableCheckpointing(1000);

//exactly-ance 和 at-least-once 語義選擇,設置模式為精確一次 (這是默認值)

env.enableCheckpointing(10,EXACTLY_ONCE);

//checkpoint最小時間間隔

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

//Checkpoint 超時時間,Checkpoint 必須在一分鐘內完成,否則就會被拋棄

env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);

//最大并行執(zhí)行的檢查點數(shù)量,默認是一個

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

//開啟在 job 中止后仍然保留的 externalized checkpoints

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

//當checkpoint出現(xiàn)錯誤時是否關閉應用,默認是true,我們可以手動設置為false

env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

//默認的重啟策略是:固定延遲無限重啟

//此處設置重啟策略為:出現(xiàn)異常重啟1次,隔5秒一次

bsEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.seconds(5)));

//設置任務處理的時間,事件時間,注入時間,處理時間

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容