01_A_flink集群部署 (standalone 、yarn)與job提交

flink常用的部署模式可能有如下幾種

  • standalone cluster模式
  • flink on yarn模式
  • flink on kubernetes模式
  • flink on Mesos
    本章主要介紹前兩種模式,以Centos6.8為例,選擇三臺(tái)機(jī)器(linux01、linux02、linux03)來(lái)搭建flink集群

1.standalone cluster模式

1.1安裝環(huán)境

  • Java 1.8.x或更高版本
  • ssh(集群節(jié)點(diǎn)之間配置互信,可以免密登錄)
    集群之間每臺(tái)節(jié)點(diǎn)的安裝結(jié)構(gòu)保持一致

1.2下載安裝

1.1.1版本選擇

可以根據(jù)對(duì)flink功能的選擇、hadoop的版本、scala的版本選擇合適的flink,這里我們選擇Apache Flink 1.7.2 with Hadoop? 2.7 for Scala 2.11版本的flink

如果機(jī)器不可訪問(wèn)外網(wǎng)則直接進(jìn)入flink下載頁(yè)下載合適版本

flink下載頁(yè)

如果可以訪問(wèn)外網(wǎng)則直接wget

cd /opt/soft
wget http://mirror.bit.edu.cn/apache/flink/flink-1.7.2/flink-1.7.2-bin-hadoop27-scala_2.11.tgz

1.1.2安裝規(guī)劃

linux01 linux02 linux03
slave slave master

1.1.3安裝

在/opt/soft目錄下解壓文件

#解壓
tar -zxvf flink-1.7.2-bin-hadoop27-scala_2.11.tgz

#進(jìn)入目錄
cd flink-1.7.2

1.3集群配置

#進(jìn)入conf目錄
cd conf

#配置flink-conf.yaml
vim flink-conf.yaml
#修改如下屬性
jobmanager.rpc.address: linux03
env.java.home=/opt/soft/jdk1.8.0_144

#修改masters文件
vim masters
#添加主節(jié)點(diǎn)
linux03

#修改slaves文件
vim slaves
#添加從節(jié)點(diǎn)
linux01
linux02

常用配置參數(shù)列表如下

屬性 說(shuō)明 默認(rèn)值
jobmanager.rpc.address jobmanager地址 localhost
jobmanager.rpc.port jobmanager端口 6123
jobmanager.heap.size jobmanagerJVM堆內(nèi)存大小 1024m
taskmanager.heap.size taskmanagerJVM堆內(nèi)存大小 1024m
taskmanager.numberOfTaskSlots 每個(gè)taskmanager的slot數(shù)量,根據(jù)taskmanager所在節(jié)點(diǎn)的cpu數(shù)量決定 1
parallelism.default flink任務(wù)默認(rèn)的并行度 1
rest.port flink webui端口 8081
io.tmp.dirs flink中間計(jì)算結(jié)果的臨時(shí)存儲(chǔ)路徑 /tmp

官方配置參數(shù)

1.4同步flink

將linux01上配置好的flink推到linux02和linux03上

scp -r /opt/soft/flink-1.7.2 work@linux02:/opt/soft
scp -r /opt/soft/flink-1.7.2 work@linux03:/opt/soft

1.5啟動(dòng)集群

linux03節(jié)點(diǎn)啟動(dòng)集群

#在master節(jié)點(diǎn)啟動(dòng)集群
bin/start-cluster.sh

出現(xiàn)如下提示則說(shuō)明啟動(dòng)成功


啟動(dòng)成功

查看三臺(tái)節(jié)點(diǎn)linux01、linux02、linux03的java進(jìn)程
linux01


linux01

linux02
linux02

linux03


image.png

訪問(wèn)master節(jié)點(diǎn)的8081端口http://linux03:8081

flink-webui

1.6 提交測(cè)試

編寫(xiě)測(cè)試代碼如下,并編譯打包 在linux01節(jié)點(diǎn)提前執(zhí)行nc -lk 12345,不然程序報(bào)錯(cuò)

object SocketStream {
  def main(args: Array[String]): Unit = {


    //flink 監(jiān)控socket 端口  累計(jì)輸入單詞的次數(shù)   nc -lk 12345
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.socketTextStream("linux01",12345)
      .flatMap(line => line.split(" "))
      .map((_,1))
      .keyBy(0)
      .sum(1)

    stream.print()

    env.execute("socketWcJob")
  }
}

1.6.1 提交方式一:flink-webUI提交

在web界面選擇Submit new job,點(diǎn)擊Add new+,選擇對(duì)應(yīng)的jar


步驟一

點(diǎn)擊Upload


步驟二

勾選對(duì)應(yīng)的程序,填寫(xiě)對(duì)應(yīng)的參數(shù),點(diǎn)擊Submit
步驟三

這里可以看到對(duì)應(yīng)的job已經(jīng)啟動(dòng)


job啟動(dòng)

我們?cè)趌inux01節(jié)點(diǎn)事先啟動(dòng)好的socket連接輸入字符


socket words

然后在task manager中查看輸出


輸出

然后點(diǎn)擊Running Jobs,選擇對(duì)應(yīng)運(yùn)行job,點(diǎn)擊cancel停止任務(wù)
停止程序

1.6.2 提交方式二:命令行提交

將打包好的jar上傳至服務(wù)器

/opt/soft/flink-1.7.2/bin/flink run -c flinka.dstream.SocketStream ./flink-1.0-SNAPSHOT.jar

成功提交之后可以進(jìn)入webUI界面查看job運(yùn)行情況


成功提交

在Task Managers 查看運(yùn)行結(jié)果


運(yùn)行結(jié)果

停止程序 可以用命令行也可以在ui界面
執(zhí)行bin/flink list 查看job列表

執(zhí)行bin/flink cancel id 停止任務(wù)


停止任務(wù)

1.7增刪 / 啟停節(jié)點(diǎn)(JobManager、TaskManager)

可以使用bin/jobmanager.sh和bin/taskmanager.sh腳本為運(yùn)行中的集群添加JobManager和TaskManager實(shí)例

添加jobmanager:

bin/jobmanager.sh  ((start|start-foreground) cluster)  | stop | stop-all

添加taskmanager:

bin/taskmanager.sh start | start-foreground | stop | stop-all

在新增節(jié)點(diǎn)時(shí)需要更新配置文件
例如新增linux04 TaskManager,需要在集群的slaves文件中新增linux04
然后在linux04節(jié)點(diǎn)運(yùn)行

bin/taskmanager.sh start

到此standalone cluster模式配置成功

2.flink on yarn模式

2.1 yarn模式介紹

flink on yarn目前有兩種模式可供選擇

  • Yarn Session Model
  • Single Job Model
    兩者區(qū)別如下
    Yarn Session:會(huì)在yarn上長(zhǎng)時(shí)間啟動(dòng)一個(gè)flink session集群,用戶可以由命令行、api、web頁(yè)面將flink任務(wù)提交到flink集群,多個(gè)flink程序公用一個(gè)JobManager和TaskManager
    Single Job:與mapreduce任務(wù)類似,每一個(gè)flink程序作為一個(gè)application提交到y(tǒng)arn集群,且每個(gè)任務(wù)都有自己的JobManager和TaskManager,程序執(zhí)行完畢則釋放資源
    區(qū)別
Flink和YARN如何交互

1.上傳依賴jar和配置文件
2.向yarn申請(qǐng)資源
3.啟動(dòng)applicationMaster
4.啟動(dòng)worker節(jié)點(diǎn)

2.2安裝環(huán)境

  • Hadoop 2.2及以上
  • HDFS(或其他分布式文件系統(tǒng))
  • flink所在節(jié)點(diǎn)配置了YARN_CONF_DIR或者HADOOP_CONF_DIR環(huán)境變量,flink會(huì)通過(guò)這些變量讀取Hadoop的配置
    (如果沒(méi)有這些環(huán)境變量也可以在flink-conf.yaml文件中通過(guò)fs.hdfs.hadoopconf屬性指定,或者在啟動(dòng)時(shí)臨時(shí)對(duì)環(huán)境變量進(jìn)行賦值,不過(guò)官方推薦用配置環(huán)境變量的方式)
//三臺(tái)節(jié)點(diǎn)都之配置上HADOOP_CONF_DIR
HADOOP_CONF_DIR=/opt/soft/hadoop-2.7.7/etc/hadoop

2.3 Yarn Session模式

2.3.1 節(jié)點(diǎn)配置

節(jié)點(diǎn) linux01 linux02 linux03
hdfs NameNode、DataNode DataNode DataNode
yarn NodeManager ResourceManager、NodeManager NodeManager
flink slave slave master

2.3.2 啟動(dòng)yarn session

在linux01啟動(dòng)HDFS

/opt/soft/hadoop-2.7.7/sbin/start-dfs.sh

在linux02啟動(dòng)yarn

/opt/soft/hadoop-2.7.7/sbin/start-yarn.sh

linux03啟動(dòng)flink和yarn-session

//啟動(dòng)yarn-session
bin/yarn-session.sh

參數(shù)

parameter parameter Dynamic properties Dynamic properties
-n --container <arg> Number of YARN container to allocate (=Number of Task Managers) Optional taskManager個(gè)數(shù)
-D <property=value> use value for given property 參數(shù)
-d --detached If present runs the job in detached mode 以分離模式運(yùn)行
-h --help Help for the Yarn session CLI. 幫助
-id --applicationId <arg> Attach to running YARN session 綁定yarn applicationid
-j --jar <arg> Path to Flink jar file flink jar路徑
-jm --jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) jobManager分配的內(nèi)存(默認(rèn):1MB)
-nl --nodeLabel <arg> Specify YARN node label for the YARN application 指定yarn application的標(biāo)簽
-nm --name <arg> Set a custom name for the application on YARN 指定yarn application的名稱
-q --query Display available YARN resources (memory| cores) 顯示可用的資源(內(nèi)存、cpu)
-qu --queue <arg> Specify YARN queue. 指定yarn隊(duì)列
-s --slots <arg> Number of slots per TaskManager 指定taskManager中slot的數(shù)量
-sae --shutdownOnAttachedExit If the job is submitted in attached mode| perform a best-effort cluster shutdown when the CLI is terminated abruptly| e.g.| in response to a user interrupt| such as typing Ctrl + C 本地cli進(jìn)程終止關(guān)閉集群
-st --streaming Start Flink in streaming mode 以流模式啟動(dòng)flink
-tm --taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB) taskManager的內(nèi)存(默認(rèn):1MB)
-z --zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode HA模式下zookeeper的保存路徑

啟動(dòng)成功

啟動(dòng)成功

啟動(dòng)失敗可能原因

  • HDFS和Yarn沒(méi)啟動(dòng)
  • HADOOP_CONF_DIR 配置有誤
  • Yarn 分配內(nèi)存不夠

在上圖中可以看到master節(jié)點(diǎn)變成了linux01 http://linux01:45622,這說(shuō)明了在flink on yarn模式下flink中的master不是固定的,yarn flink 會(huì)覆蓋掉flink-conf.yaml配置文件中的jobmanager.rpc.address

分離模式啟動(dòng)
看過(guò)官網(wǎng)和其他資料說(shuō)的都比較模糊,這里詳細(xì)說(shuō)下
當(dāng)我們執(zhí)行bin/yarn-session.sh,會(huì)在本地啟動(dòng)FlinkYarnSessionCli進(jìn)程,然后在由此進(jìn)程啟動(dòng)yarn session集群,此時(shí)FlinkYarnSessionCli相當(dāng)于前臺(tái)啟動(dòng),與yarn交互的信息會(huì)一直顯示在控制臺(tái)
此時(shí)Ctrl+C和輸入stop都會(huì)終止FlinkYarnSessionCli進(jìn)程,區(qū)別如下

  • Ctrl+C 終止FlinkYarnSessionCli進(jìn)程而不會(huì)終止yarn session 集群
  • 輸入stop 即會(huì)終止FlinkYarnSessionCli進(jìn)程也會(huì)終止yarn session 集群

那么分離模式的作用是什么呢?首先看一下分離模式啟動(dòng)的命令 bin/yarn-session.sh -d
-d代表detached,意思就是把FlinkYarnSessionCli和yarn session分離,對(duì)FlinkYarnSessionCli的操作不會(huì)影響到y(tǒng)arn session,且在執(zhí)行bin/yarn-session.sh -d時(shí),當(dāng)yarn session創(chuàng)建完畢,FlinkYarnSessionCli會(huì)自動(dòng)停止
此時(shí)不可通過(guò)flink也就是FlinkYarnSessionCli去控制yarn session集群
需要以yarn停止application的方式終止yarn session
yarn application -kill <appId>

附加到現(xiàn)有yarn session
與detached的作用剛好相反,如果我們想通過(guò)FlinkYarnSessionCli來(lái)控制yarn session的話,我們可以啟動(dòng)一個(gè)FlinkYarnSessionCli來(lái)附加到對(duì)應(yīng)的yarn session上去
例如 已經(jīng)啟動(dòng)的yarn session的appid是application_1568879202413_0003
我們執(zhí)行 yarn-session.sh -id application_1568879202413_0003,就會(huì)在當(dāng)前節(jié)點(diǎn)啟動(dòng)一個(gè)FlinkYarnSessionCli進(jìn)程并附加到application_1568879202413_0003這個(gè)應(yīng)用上
此時(shí)我們?cè)诿钚休斎雜top可以直接停止yarn session
注意直接Ctrl + C只能停止FlinkYarnSessionCli進(jìn)程,不能停止yarn session,想通過(guò)FlinkYarnSessionCli停止yarn session只能通過(guò)輸入stop

2.3.3 任務(wù)提交

這里提交方式同standalone 模式類似,也分為通過(guò)web頁(yè)面提交和命令行提交,需要注意的是如果通過(guò)web頁(yè)面提交可以由三種方式訪問(wèn)到web頁(yè)面

  • 方式一:直接在啟動(dòng)yarn-session時(shí)可以看見(jiàn)jobManager地址


    方式一
  • 方式二:通過(guò)yarn web
    首先我們?cè)L問(wèn)yarn,可以看見(jiàn)yarn session已經(jīng)啟動(dòng)


    方式二

    隨后點(diǎn)擊ApplicationMaster,跳轉(zhuǎn)到web頁(yè)面,再點(diǎn)擊submit new job,再點(diǎn)擊here就可以訪問(wèn)jobmanager


    方式二
  • 方式三:通過(guò)yarn 命令yarn application -list


    方式三

2.3.4 停止yarn session

由于我們是本地啟動(dòng)一個(gè)進(jìn)程來(lái)維護(hù)yarn session,所以這里我們可以通過(guò)kill掉進(jìn)程或者通過(guò)yarn來(lái)停止
yarn session

#停止本地進(jìn)程
kill -9 pid

#停止yarn application
yarn application -kill application_xxx_xxx

2.4 Single Job模式

前面已經(jīng)介紹過(guò)Single Job模式每提交一個(gè)flink程序都會(huì)在yarn生成一個(gè)application,運(yùn)行完畢就釋放資源
那么怎么提交獨(dú)立的flink程序呢? 只需要加上-m yarn-cluster即可

/opt/soft/flink-1.7.2/bin/flink run -m yarn-cluster -c flinka.dstream.SocketStream ./flink-1.0-SNAPSHOT.jar

-m 運(yùn)行模式,這里使用yarn-cluster,即yarn集群模式。
-ys slot個(gè)數(shù)。
-ynm Yarn application的名字。
-yn task manager 數(shù)量。
-yjm job manager 的堆內(nèi)存大小。
-ytm task manager 的堆內(nèi)存大小。
-d detach模式。可以運(yùn)行任務(wù)后無(wú)需再控制臺(tái)保持連接。
-c 指定jar包中class全名

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容