背景
目前對(duì)于flink來(lái)說(shuō),生產(chǎn)環(huán)境一般有兩個(gè)部署模式,一個(gè)是 session模式,一個(gè)是per job模式。
session模式
這種模式會(huì)預(yù)先在yarn或者或者k8s上啟動(dòng)一個(gè)flink集群,然后將任務(wù)提交到這個(gè)集群上,這種模式,集群中的任務(wù)使用相同的資源,如果某一個(gè)任務(wù)出現(xiàn)了問(wèn)題導(dǎo)致整個(gè)集群掛掉,那就得重啟集群中的所有任務(wù),這樣就會(huì)給集群造成很大的負(fù)面影響。
per job模式
考慮到集群的資源隔離情況,一般生產(chǎn)上的任務(wù)都會(huì)選擇per job模式,也就是每個(gè)任務(wù)啟動(dòng)一個(gè)flink集群,各個(gè)集群之間獨(dú)立運(yùn)行,互不影響,且每個(gè)集群可以設(shè)置獨(dú)立的配置。
per job模式的問(wèn)題
目前,對(duì)于per job模式,jar包的解析、生成JobGraph是在客戶(hù)端上執(zhí)行的,然后將生成的jobgraph提交到集群。很多公司都會(huì)有自己的實(shí)時(shí)計(jì)算平臺(tái),用戶(hù)可以使用這些平臺(tái)提交flink任務(wù),如果任務(wù)特別多的話(huà),那么這些生成JobGraph、提交到集群的操作都會(huì)在實(shí)時(shí)平臺(tái)所在的機(jī)器上執(zhí)行,那么將會(huì)給服務(wù)器造成很大的壓力。
此外這種模式提交任務(wù)的時(shí)候會(huì)把本地flink的所有jar包先上傳到hdfs上相應(yīng) 的臨時(shí)目錄,這個(gè)也會(huì)帶來(lái)大量的網(wǎng)絡(luò)的開(kāi)銷(xiāo),所以如果任務(wù)特別多的情況下,平臺(tái)的吞吐量將會(huì)直線(xiàn)下降。
引入application模式
所以針對(duì)flink per job模式的一些問(wèn)題,flink 引入了一個(gè)新的部署模式--Application模式。 目前 Application 模式支持 Yarn 和 K8s 的部署方式,Yarn Application 模式會(huì)在客戶(hù)端將運(yùn)行任務(wù)需要的依賴(lài)都上傳到 Flink Master,然后在 Master 端進(jìn)行任務(wù)的提交。
此外,還支持遠(yuǎn)程的用戶(hù)jar包來(lái)提交任務(wù),比如可以將jar放到hdfs上,進(jìn)一步減少上傳jar所需的時(shí)間,從而減少部署作業(yè)的時(shí)間。
具體的使用命令是:
/bin/flink run-application -p 1 -t yarn-application \
-yD yarn.provided.lib.dirs="hdfs://localhost/flink/libs" \
hdfs://localhost/user-jars/HelloWold.jar
通過(guò)程序提交任務(wù)
當(dāng)我們要做一個(gè)實(shí)時(shí)計(jì)算平臺(tái)的時(shí)候,會(huì)需要通過(guò)程序來(lái)提交任務(wù)到集群,這時(shí)候需要我們自己封裝一套API來(lái)實(shí)現(xiàn)提交flink任務(wù)到集群,目前主要的生產(chǎn)環(huán)境還是以yarn居多,所以我們今天講講怎么通過(guò)api的方式把一個(gè)任務(wù)以application的方法提交到y(tǒng)arn集群。
引入相關(guān)的配置到classpath里
core-site.xml
hdfs-site.xml
yarn-site.xml定義相關(guān)的配置參數(shù)
//flink的本地配置目錄,為了得到flink的配置
String configurationDirectory = "/Users/user/work/flink/conf/";
//存放flink集群相關(guān)的jar包目錄
String flinkLibs = "hdfs://hadoopcluster/data/flink/libs";
//用戶(hù)jar
String userJarPath = "hdfs://hadoopcluster/data/flink/user-lib/TopSpeedWindowing.jar";
String flinkDistJar = "hdfs://hadoopcluster/data/flink/libs/flink-yarn_2.11-1.11.0.jar";
- 獲取flink的配置
這里其實(shí)還可以設(shè)置很多的配置參數(shù),比如yarn的隊(duì)列名字等等,大家根據(jù)自己的需要來(lái)設(shè)置。
// 獲取flink的配置
Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
configurationDirectory);
//設(shè)置為application模式
flinkConfiguration.set(
DeploymentOptions.TARGET,
YarnDeploymentTarget.APPLICATION.getName());
//yarn application name
flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobName");
.........
- 設(shè)置用戶(hù)jar的參數(shù)和主類(lèi)
// 設(shè)置用戶(hù)jar的參數(shù)和主類(lèi)
ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);
- 提交任務(wù)到集群
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
flinkConfiguration,
yarnConfiguration,
yarnClient,
clusterInformationRetriever,
true);
ClusterClientProvider<ApplicationId> clusterClientProvider = null;
try {
clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
clusterSpecification,
appConfig);
} catch (ClusterDeploymentException e){
e.printStackTrace();
}
完整代碼請(qǐng)參考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/cluster/SubmitJobApplicationMode.java
Application模式源碼解析
通過(guò)上面提交的腳本我們看到入口是從flink bin目錄下flink命令開(kāi)始的,我們看下這個(gè)文件的最后一行代碼,也就是提交任務(wù)的入口類(lèi):org.apache.flink.client.cli.CliFrontend,接下來(lái)我們基于flink 1.11的源碼簡(jiǎn)單梳理一下flink是如何把一個(gè)任務(wù)提交到y(tǒng)arn集群的。
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
入口
在CliFrontend的main方法里,我們看到做了這么幾件事。
- 獲取flink的配置目錄
- 加載flink的配置
- 加載并解析命令行參數(shù)
- 通過(guò)CliFrontend.parseParameters方法來(lái)執(zhí)行具體的操作
// 1. find the configuration directory
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. load the global configuration
final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines
final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
configuration,
configurationDirectory);
try {
final CliFrontend cli = new CliFrontend(
configuration,
customCommandLines);
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
int retCode = SecurityUtils.getInstalledContext()
.runSecured(() -> cli.parseParameters(args));
System.exit(retCode);
}
執(zhí)行具體的操作
在parseParameters方法里,解析出來(lái)要執(zhí)行的操作,然后通過(guò)一個(gè)switch來(lái)進(jìn)入要執(zhí)行的方法,我們這里是進(jìn)入runApplication方法。
switch (action) {
case ACTION_RUN:
run(params);
return 0;
case ACTION_RUN_APPLICATION:
runApplication(params);
return 0;
case ACTION_LIST:
list(params);
return 0;
..........
}
runApplication方法
在這個(gè)方法里,主要是用傳進(jìn)來(lái)的命令行參數(shù)構(gòu)造出來(lái)flink的配置對(duì)象Configuration,以及application模式所需的配置ApplicationConfiguration,包括入口類(lèi),jar包參數(shù),最后
// 用傳進(jìn)來(lái)的命令行參數(shù)構(gòu)造出來(lái)flink的配置對(duì)象Configuration
final Configuration effectiveConfiguration = getEffectiveConfiguration(
activeCommandLine, commandLine, programOptions, Collections.singletonList(uri.toString()));
//構(gòu)造包含入口類(lèi)和jar包參數(shù)的配置ApplicationConfiguration
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
deployer.run(effectiveConfiguration, applicationConfiguration);
構(gòu)造ClusterDescriptor
上面的方法會(huì)進(jìn)入ApplicationClusterDeployer的run方法,在這里會(huì)根據(jù)配置使用工廠(chǎng)類(lèi)構(gòu)造不同的ClusterDescriptor,比如是k8s的話(huà)會(huì)構(gòu)造KubernetesClusterDescriptor,部署在yarn的話(huà)會(huì)構(gòu)造YarnClusterDescriptor。之后會(huì)通過(guò)deployApplicationCluster來(lái)部署application模式的flink程序。
Deploy Application Cluster
我們這里以yarn集群為例,進(jìn)入YarnClusterDescriptor#deployApplicationCluster方法,在這個(gè)方法里,我們看到經(jīng)過(guò)一些簡(jiǎn)單的檢查之后,調(diào)用了private方法YarnClusterDescriptor#deployInternal,這個(gè)deployInternal是一個(gè)提供公共功能的方法,可以看下其他的部署模式,yarn session模式,per job模式,都是調(diào)用的這個(gè)方法,只是參數(shù)不同而已。
我們簡(jiǎn)單看下這個(gè)方法:
/**
* This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
*
* @param clusterSpecification 一些配置參數(shù)
* @param applicationName yarn job的名字
* @param yarnClusterEntrypoint 入口類(lèi)
* @param jobGraph 程序的jobGraph,可為空
* @param detached 是否是隔離模式
*/
private ClusterClientProvider<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph,
boolean detached) throws Exception {
在這個(gè)方法里,將會(huì)根據(jù)不同的部署模式做一些必要的檢查,然后啟動(dòng)yarn容器的操作。比如per job模式,上傳flink jar包等等,都是在這個(gè)方法完成的。此外,該方法會(huì)一直阻塞到ApplicationMaster/JobManager部署成功,之后會(huì)進(jìn)入用戶(hù)程序的入口類(lèi)ApplicationClusterEntryPoint來(lái)執(zhí)行用戶(hù)程序。
ApplicationClusterEntryPoint
yarn組件啟動(dòng)完成之后,開(kāi)始執(zhí)行用戶(hù)的程序,在這個(gè)類(lèi)里,會(huì)做以下的一些工作:
- 下載必要的jar或者resources
- 進(jìn)行l(wèi)eader選舉,決定誰(shuí)執(zhí) main 方法
- 用戶(hù)程序退出時(shí)終止集群
- 保證HA和容錯(cuò)
application模式提交任務(wù)到y(tǒng)arn集群,大概的流程就先講到這里,flink任務(wù)執(zhí)行的流程,后續(xù)再寫(xiě)篇文章專(zhuān)門(mén)介紹。
更多精彩信息,歡迎關(guān)注我的公眾號(hào)【大數(shù)據(jù)技術(shù)與應(yīng)用實(shí)戰(zhàn)】
