flink教程-詳解flink 1.11中的新部署模式-Application模式

背景

目前對(duì)于flink來(lái)說(shuō),生產(chǎn)環(huán)境一般有兩個(gè)部署模式,一個(gè)是 session模式,一個(gè)是per job模式。

session模式

這種模式會(huì)預(yù)先在yarn或者或者k8s上啟動(dòng)一個(gè)flink集群,然后將任務(wù)提交到這個(gè)集群上,這種模式,集群中的任務(wù)使用相同的資源,如果某一個(gè)任務(wù)出現(xiàn)了問(wèn)題導(dǎo)致整個(gè)集群掛掉,那就得重啟集群中的所有任務(wù),這樣就會(huì)給集群造成很大的負(fù)面影響。

per job模式

考慮到集群的資源隔離情況,一般生產(chǎn)上的任務(wù)都會(huì)選擇per job模式,也就是每個(gè)任務(wù)啟動(dòng)一個(gè)flink集群,各個(gè)集群之間獨(dú)立運(yùn)行,互不影響,且每個(gè)集群可以設(shè)置獨(dú)立的配置。

per job模式的問(wèn)題

目前,對(duì)于per job模式,jar包的解析、生成JobGraph是在客戶(hù)端上執(zhí)行的,然后將生成的jobgraph提交到集群。很多公司都會(huì)有自己的實(shí)時(shí)計(jì)算平臺(tái),用戶(hù)可以使用這些平臺(tái)提交flink任務(wù),如果任務(wù)特別多的話(huà),那么這些生成JobGraph、提交到集群的操作都會(huì)在實(shí)時(shí)平臺(tái)所在的機(jī)器上執(zhí)行,那么將會(huì)給服務(wù)器造成很大的壓力。

此外這種模式提交任務(wù)的時(shí)候會(huì)把本地flink的所有jar包先上傳到hdfs上相應(yīng) 的臨時(shí)目錄,這個(gè)也會(huì)帶來(lái)大量的網(wǎng)絡(luò)的開(kāi)銷(xiāo),所以如果任務(wù)特別多的情況下,平臺(tái)的吞吐量將會(huì)直線(xiàn)下降。

引入application模式

所以針對(duì)flink per job模式的一些問(wèn)題,flink 引入了一個(gè)新的部署模式--Application模式。 目前 Application 模式支持 Yarn 和 K8s 的部署方式,Yarn Application 模式會(huì)在客戶(hù)端將運(yùn)行任務(wù)需要的依賴(lài)都上傳到 Flink Master,然后在 Master 端進(jìn)行任務(wù)的提交。

此外,還支持遠(yuǎn)程的用戶(hù)jar包來(lái)提交任務(wù),比如可以將jar放到hdfs上,進(jìn)一步減少上傳jar所需的時(shí)間,從而減少部署作業(yè)的時(shí)間。

具體的使用命令是:

/bin/flink run-application -p 1 -t yarn-application \
-yD yarn.provided.lib.dirs="hdfs://localhost/flink/libs" \
hdfs://localhost/user-jars/HelloWold.jar

通過(guò)程序提交任務(wù)

當(dāng)我們要做一個(gè)實(shí)時(shí)計(jì)算平臺(tái)的時(shí)候,會(huì)需要通過(guò)程序來(lái)提交任務(wù)到集群,這時(shí)候需要我們自己封裝一套API來(lái)實(shí)現(xiàn)提交flink任務(wù)到集群,目前主要的生產(chǎn)環(huán)境還是以yarn居多,所以我們今天講講怎么通過(guò)api的方式把一個(gè)任務(wù)以application的方法提交到y(tǒng)arn集群。

  • 引入相關(guān)的配置到classpath里
    core-site.xml
    hdfs-site.xml
    yarn-site.xml

  • 定義相關(guān)的配置參數(shù)

        //flink的本地配置目錄,為了得到flink的配置
        String configurationDirectory = "/Users/user/work/flink/conf/";
        //存放flink集群相關(guān)的jar包目錄
        String flinkLibs = "hdfs://hadoopcluster/data/flink/libs";
        //用戶(hù)jar
        String userJarPath = "hdfs://hadoopcluster/data/flink/user-lib/TopSpeedWindowing.jar";
        String flinkDistJar = "hdfs://hadoopcluster/data/flink/libs/flink-yarn_2.11-1.11.0.jar";
  • 獲取flink的配置

這里其實(shí)還可以設(shè)置很多的配置參數(shù),比如yarn的隊(duì)列名字等等,大家根據(jù)自己的需要來(lái)設(shè)置。

//      獲取flink的配置
        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
                configurationDirectory);
                
        //設(shè)置為application模式
        flinkConfiguration.set(
                DeploymentOptions.TARGET,
                YarnDeploymentTarget.APPLICATION.getName());
        //yarn application name
        flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobName");
        
        .........
  • 設(shè)置用戶(hù)jar的參數(shù)和主類(lèi)
//      設(shè)置用戶(hù)jar的參數(shù)和主類(lèi)
        ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);
  • 提交任務(wù)到集群
        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                flinkConfiguration,
                yarnConfiguration,
                yarnClient,
                clusterInformationRetriever,
                true);
        ClusterClientProvider<ApplicationId> clusterClientProvider = null;
        try {
            clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
                    clusterSpecification,
                    appConfig);
        } catch (ClusterDeploymentException e){
            e.printStackTrace();
        }

完整代碼請(qǐng)參考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/cluster/SubmitJobApplicationMode.java

Application模式源碼解析

通過(guò)上面提交的腳本我們看到入口是從flink bin目錄下flink命令開(kāi)始的,我們看下這個(gè)文件的最后一行代碼,也就是提交任務(wù)的入口類(lèi):org.apache.flink.client.cli.CliFrontend,接下來(lái)我們基于flink 1.11的源碼簡(jiǎn)單梳理一下flink是如何把一個(gè)任務(wù)提交到y(tǒng)arn集群的。

exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

入口

在CliFrontend的main方法里,我們看到做了這么幾件事。

  1. 獲取flink的配置目錄
  2. 加載flink的配置
  3. 加載并解析命令行參數(shù)
  4. 通過(guò)CliFrontend.parseParameters方法來(lái)執(zhí)行具體的操作
        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration
        final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
            configuration,
            configurationDirectory);

        try {
            final CliFrontend cli = new CliFrontend(
                configuration,
                customCommandLines);

            SecurityUtils.install(new SecurityConfiguration(cli.configuration));
            int retCode = SecurityUtils.getInstalledContext()
                    .runSecured(() -> cli.parseParameters(args));
            System.exit(retCode);
        }

執(zhí)行具體的操作

在parseParameters方法里,解析出來(lái)要執(zhí)行的操作,然后通過(guò)一個(gè)switch來(lái)進(jìn)入要執(zhí)行的方法,我們這里是進(jìn)入runApplication方法。

            switch (action) {
                case ACTION_RUN:
                    run(params);
                    return 0;
                case ACTION_RUN_APPLICATION:
                    runApplication(params);
                    return 0;
                case ACTION_LIST:
                    list(params);
                    return 0;
                 ..........
            }        

runApplication方法

在這個(gè)方法里,主要是用傳進(jìn)來(lái)的命令行參數(shù)構(gòu)造出來(lái)flink的配置對(duì)象Configuration,以及application模式所需的配置ApplicationConfiguration,包括入口類(lèi),jar包參數(shù),最后

   // 用傳進(jìn)來(lái)的命令行參數(shù)構(gòu)造出來(lái)flink的配置對(duì)象Configuration
    final Configuration effectiveConfiguration = getEffectiveConfiguration(
                activeCommandLine, commandLine, programOptions, Collections.singletonList(uri.toString()));
                
        //構(gòu)造包含入口類(lèi)和jar包參數(shù)的配置ApplicationConfiguration
        final ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
                
        deployer.run(effectiveConfiguration, applicationConfiguration);

構(gòu)造ClusterDescriptor

上面的方法會(huì)進(jìn)入ApplicationClusterDeployer的run方法,在這里會(huì)根據(jù)配置使用工廠(chǎng)類(lèi)構(gòu)造不同的ClusterDescriptor,比如是k8s的話(huà)會(huì)構(gòu)造KubernetesClusterDescriptor,部署在yarn的話(huà)會(huì)構(gòu)造YarnClusterDescriptor。之后會(huì)通過(guò)deployApplicationCluster來(lái)部署application模式的flink程序。

Deploy Application Cluster

我們這里以yarn集群為例,進(jìn)入YarnClusterDescriptor#deployApplicationCluster方法,在這個(gè)方法里,我們看到經(jīng)過(guò)一些簡(jiǎn)單的檢查之后,調(diào)用了private方法YarnClusterDescriptor#deployInternal,這個(gè)deployInternal是一個(gè)提供公共功能的方法,可以看下其他的部署模式,yarn session模式,per job模式,都是調(diào)用的這個(gè)方法,只是參數(shù)不同而已。

我們簡(jiǎn)單看下這個(gè)方法:

    /**
     * This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
     *
     * @param clusterSpecification 一些配置參數(shù)
     * @param applicationName yarn job的名字
     * @param yarnClusterEntrypoint 入口類(lèi)
     * @param jobGraph 程序的jobGraph,可為空
     * @param detached 是否是隔離模式
     */
    private ClusterClientProvider<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached) throws Exception {

在這個(gè)方法里,將會(huì)根據(jù)不同的部署模式做一些必要的檢查,然后啟動(dòng)yarn容器的操作。比如per job模式,上傳flink jar包等等,都是在這個(gè)方法完成的。此外,該方法會(huì)一直阻塞到ApplicationMaster/JobManager部署成功,之后會(huì)進(jìn)入用戶(hù)程序的入口類(lèi)ApplicationClusterEntryPoint來(lái)執(zhí)行用戶(hù)程序。

ApplicationClusterEntryPoint

yarn組件啟動(dòng)完成之后,開(kāi)始執(zhí)行用戶(hù)的程序,在這個(gè)類(lèi)里,會(huì)做以下的一些工作:

  • 下載必要的jar或者resources
  • 進(jìn)行l(wèi)eader選舉,決定誰(shuí)執(zhí) main 方法
  • 用戶(hù)程序退出時(shí)終止集群
  • 保證HA和容錯(cuò)

application模式提交任務(wù)到y(tǒng)arn集群,大概的流程就先講到這里,flink任務(wù)執(zhí)行的流程,后續(xù)再寫(xiě)篇文章專(zhuān)門(mén)介紹。

更多精彩信息,歡迎關(guān)注我的公眾號(hào)【大數(shù)據(jù)技術(shù)與應(yīng)用實(shí)戰(zhàn)】

logo.jpg

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容