flink常用的部署模式可能有如下幾種
- standalone cluster模式
- flink on yarn模式
- flink on kubernetes模式
- flink on Mesos
本章主要介紹前兩種模式,以Centos6.8為例,選擇三臺(tái)機(jī)器(linux01、linux02、linux03)來(lái)搭建flink集群
1.standalone cluster模式
1.1安裝環(huán)境
- Java 1.8.x或更高版本
- ssh(集群節(jié)點(diǎn)之間配置互信,可以免密登錄)
集群之間每臺(tái)節(jié)點(diǎn)的安裝結(jié)構(gòu)保持一致
1.2下載安裝
1.1.1版本選擇
可以根據(jù)對(duì)flink功能的選擇、hadoop的版本、scala的版本選擇合適的flink,這里我們選擇Apache Flink 1.7.2 with Hadoop? 2.7 for Scala 2.11版本的flink
如果機(jī)器不可訪問(wèn)外網(wǎng)則直接進(jìn)入flink下載頁(yè)下載合適版本

如果可以訪問(wèn)外網(wǎng)則直接wget
cd /opt/soft
wget http://mirror.bit.edu.cn/apache/flink/flink-1.7.2/flink-1.7.2-bin-hadoop27-scala_2.11.tgz
1.1.2安裝規(guī)劃
| linux01 | linux02 | linux03 |
|---|---|---|
| slave | slave | master |
1.1.3安裝
在/opt/soft目錄下解壓文件
#解壓
tar -zxvf flink-1.7.2-bin-hadoop27-scala_2.11.tgz
#進(jìn)入目錄
cd flink-1.7.2
1.3集群配置
#進(jìn)入conf目錄
cd conf
#配置flink-conf.yaml
vim flink-conf.yaml
#修改如下屬性
jobmanager.rpc.address: linux03
env.java.home=/opt/soft/jdk1.8.0_144
#修改masters文件
vim masters
#添加主節(jié)點(diǎn)
linux03
#修改slaves文件
vim slaves
#添加從節(jié)點(diǎn)
linux01
linux02
常用配置參數(shù)列表如下
| 屬性 | 說(shuō)明 | 默認(rèn)值 |
|---|---|---|
| jobmanager.rpc.address | jobmanager地址 | localhost |
| jobmanager.rpc.port | jobmanager端口 | 6123 |
| jobmanager.heap.size | jobmanagerJVM堆內(nèi)存大小 | 1024m |
| taskmanager.heap.size | taskmanagerJVM堆內(nèi)存大小 | 1024m |
| taskmanager.numberOfTaskSlots | 每個(gè)taskmanager的slot數(shù)量,根據(jù)taskmanager所在節(jié)點(diǎn)的cpu數(shù)量決定 | 1 |
| parallelism.default | flink任務(wù)默認(rèn)的并行度 | 1 |
| rest.port | flink webui端口 | 8081 |
| io.tmp.dirs | flink中間計(jì)算結(jié)果的臨時(shí)存儲(chǔ)路徑 | /tmp |
1.4同步flink
將linux01上配置好的flink推到linux02和linux03上
scp -r /opt/soft/flink-1.7.2 work@linux02:/opt/soft
scp -r /opt/soft/flink-1.7.2 work@linux03:/opt/soft
1.5啟動(dòng)集群
linux03節(jié)點(diǎn)啟動(dòng)集群
#在master節(jié)點(diǎn)啟動(dòng)集群
bin/start-cluster.sh
出現(xiàn)如下提示則說(shuō)明啟動(dòng)成功

查看三臺(tái)節(jié)點(diǎn)linux01、linux02、linux03的java進(jìn)程
linux01

linux02

linux03

訪問(wèn)master節(jié)點(diǎn)的8081端口http://linux03:8081

1.6 提交測(cè)試
編寫(xiě)測(cè)試代碼如下,并編譯打包 在linux01節(jié)點(diǎn)提前執(zhí)行nc -lk 12345,不然程序報(bào)錯(cuò)
object SocketStream {
def main(args: Array[String]): Unit = {
//flink 監(jiān)控socket 端口 累計(jì)輸入單詞的次數(shù) nc -lk 12345
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("linux01",12345)
.flatMap(line => line.split(" "))
.map((_,1))
.keyBy(0)
.sum(1)
stream.print()
env.execute("socketWcJob")
}
}
1.6.1 提交方式一:flink-webUI提交
在web界面選擇Submit new job,點(diǎn)擊Add new+,選擇對(duì)應(yīng)的jar

點(diǎn)擊Upload

勾選對(duì)應(yīng)的程序,填寫(xiě)對(duì)應(yīng)的參數(shù),點(diǎn)擊Submit

這里可以看到對(duì)應(yīng)的job已經(jīng)啟動(dòng)

我們?cè)趌inux01節(jié)點(diǎn)事先啟動(dòng)好的socket連接輸入字符

然后在task manager中查看輸出

然后點(diǎn)擊Running Jobs,選擇對(duì)應(yīng)運(yùn)行job,點(diǎn)擊cancel停止任務(wù)

1.6.2 提交方式二:命令行提交
將打包好的jar上傳至服務(wù)器
/opt/soft/flink-1.7.2/bin/flink run -c flinka.dstream.SocketStream ./flink-1.0-SNAPSHOT.jar
成功提交之后可以進(jìn)入webUI界面查看job運(yùn)行情況

在Task Managers 查看運(yùn)行結(jié)果

停止程序 可以用命令行也可以在ui界面
執(zhí)行bin/flink list 查看job列表
執(zhí)行bin/flink cancel id 停止任務(wù)

1.7增刪 / 啟停節(jié)點(diǎn)(JobManager、TaskManager)
可以使用bin/jobmanager.sh和bin/taskmanager.sh腳本為運(yùn)行中的集群添加JobManager和TaskManager實(shí)例
添加jobmanager:
bin/jobmanager.sh ((start|start-foreground) cluster) | stop | stop-all
添加taskmanager:
bin/taskmanager.sh start | start-foreground | stop | stop-all
在新增節(jié)點(diǎn)時(shí)需要更新配置文件
例如新增linux04 TaskManager,需要在集群的slaves文件中新增linux04
然后在linux04節(jié)點(diǎn)運(yùn)行
bin/taskmanager.sh start
到此standalone cluster模式配置成功
2.flink on yarn模式
2.1 yarn模式介紹
flink on yarn目前有兩種模式可供選擇
- Yarn Session Model
- Single Job Model
兩者區(qū)別如下
Yarn Session:會(huì)在yarn上長(zhǎng)時(shí)間啟動(dòng)一個(gè)flink session集群,用戶可以由命令行、api、web頁(yè)面將flink任務(wù)提交到flink集群,多個(gè)flink程序公用一個(gè)JobManager和TaskManager
Single Job:與mapreduce任務(wù)類似,每一個(gè)flink程序作為一個(gè)application提交到y(tǒng)arn集群,且每個(gè)任務(wù)都有自己的JobManager和TaskManager,程序執(zhí)行完畢則釋放資源
區(qū)別

1.上傳依賴jar和配置文件
2.向yarn申請(qǐng)資源
3.啟動(dòng)applicationMaster
4.啟動(dòng)worker節(jié)點(diǎn)
2.2安裝環(huán)境
- Hadoop 2.2及以上
- HDFS(或其他分布式文件系統(tǒng))
- flink所在節(jié)點(diǎn)配置了YARN_CONF_DIR或者HADOOP_CONF_DIR環(huán)境變量,flink會(huì)通過(guò)這些變量讀取Hadoop的配置
(如果沒(méi)有這些環(huán)境變量也可以在flink-conf.yaml文件中通過(guò)fs.hdfs.hadoopconf屬性指定,或者在啟動(dòng)時(shí)臨時(shí)對(duì)環(huán)境變量進(jìn)行賦值,不過(guò)官方推薦用配置環(huán)境變量的方式)
//三臺(tái)節(jié)點(diǎn)都之配置上HADOOP_CONF_DIR
HADOOP_CONF_DIR=/opt/soft/hadoop-2.7.7/etc/hadoop
2.3 Yarn Session模式
2.3.1 節(jié)點(diǎn)配置
| 節(jié)點(diǎn) | linux01 | linux02 | linux03 |
|---|---|---|---|
| hdfs | NameNode、DataNode | DataNode | DataNode |
| yarn | NodeManager | ResourceManager、NodeManager | NodeManager |
| flink | slave | slave | master |
2.3.2 啟動(dòng)yarn session
在linux01啟動(dòng)HDFS
/opt/soft/hadoop-2.7.7/sbin/start-dfs.sh
在linux02啟動(dòng)yarn
/opt/soft/hadoop-2.7.7/sbin/start-yarn.sh
linux03啟動(dòng)flink和yarn-session
//啟動(dòng)yarn-session
bin/yarn-session.sh
參數(shù)
| parameter | parameter | Dynamic properties | Dynamic properties |
|---|---|---|---|
| -n | --container <arg> | Number of YARN container to allocate (=Number of Task Managers) Optional | taskManager個(gè)數(shù) |
| -D | <property=value> | use value for given property | 參數(shù) |
| -d | --detached | If present runs the job in detached mode | 以分離模式運(yùn)行 |
| -h | --help | Help for the Yarn session CLI. | 幫助 |
| -id | --applicationId <arg> | Attach to running YARN session | 綁定yarn applicationid |
| -j | --jar <arg> | Path to Flink jar file | flink jar路徑 |
| -jm | --jobManagerMemory <arg> | Memory for JobManager Container with optional unit (default: MB) | jobManager分配的內(nèi)存(默認(rèn):1MB) |
| -nl | --nodeLabel <arg> | Specify YARN node label for the YARN application | 指定yarn application的標(biāo)簽 |
| -nm | --name <arg> | Set a custom name for the application on YARN | 指定yarn application的名稱 |
| -q | --query | Display available YARN resources (memory| cores) | 顯示可用的資源(內(nèi)存、cpu) |
| -qu | --queue <arg> | Specify YARN queue. | 指定yarn隊(duì)列 |
| -s | --slots <arg> | Number of slots per TaskManager | 指定taskManager中slot的數(shù)量 |
| -sae | --shutdownOnAttachedExit | If the job is submitted in attached mode| perform a best-effort cluster shutdown when the CLI is terminated abruptly| e.g.| in response to a user interrupt| such as typing Ctrl + C | 本地cli進(jìn)程終止關(guān)閉集群 |
| -st | --streaming | Start Flink in streaming mode | 以流模式啟動(dòng)flink |
| -tm | --taskManagerMemory <arg> | Memory per TaskManager Container with optional unit (default: MB) | taskManager的內(nèi)存(默認(rèn):1MB) |
| -z | --zookeeperNamespace <arg> | Namespace to create the Zookeeper sub-paths for high availability mode | HA模式下zookeeper的保存路徑 |
啟動(dòng)成功

啟動(dòng)失敗可能原因
- HDFS和Yarn沒(méi)啟動(dòng)
- HADOOP_CONF_DIR 配置有誤
- Yarn 分配內(nèi)存不夠
在上圖中可以看到master節(jié)點(diǎn)變成了linux01 http://linux01:45622,這說(shuō)明了在flink on yarn模式下flink中的master不是固定的,yarn flink 會(huì)覆蓋掉flink-conf.yaml配置文件中的jobmanager.rpc.address
分離模式啟動(dòng)
看過(guò)官網(wǎng)和其他資料說(shuō)的都比較模糊,這里詳細(xì)說(shuō)下
當(dāng)我們執(zhí)行bin/yarn-session.sh,會(huì)在本地啟動(dòng)FlinkYarnSessionCli進(jìn)程,然后在由此進(jìn)程啟動(dòng)yarn session集群,此時(shí)FlinkYarnSessionCli相當(dāng)于前臺(tái)啟動(dòng),與yarn交互的信息會(huì)一直顯示在控制臺(tái)
此時(shí)Ctrl+C和輸入stop都會(huì)終止FlinkYarnSessionCli進(jìn)程,區(qū)別如下
- Ctrl+C 終止FlinkYarnSessionCli進(jìn)程而不會(huì)終止yarn session 集群
- 輸入stop 即會(huì)終止FlinkYarnSessionCli進(jìn)程也會(huì)終止yarn session 集群
那么分離模式的作用是什么呢?首先看一下分離模式啟動(dòng)的命令 bin/yarn-session.sh -d
-d代表detached,意思就是把FlinkYarnSessionCli和yarn session分離,對(duì)FlinkYarnSessionCli的操作不會(huì)影響到y(tǒng)arn session,且在執(zhí)行bin/yarn-session.sh -d時(shí),當(dāng)yarn session創(chuàng)建完畢,FlinkYarnSessionCli會(huì)自動(dòng)停止
此時(shí)不可通過(guò)flink也就是FlinkYarnSessionCli去控制yarn session集群
需要以yarn停止application的方式終止yarn session
yarn application -kill <appId>
附加到現(xiàn)有yarn session
與detached的作用剛好相反,如果我們想通過(guò)FlinkYarnSessionCli來(lái)控制yarn session的話,我們可以啟動(dòng)一個(gè)FlinkYarnSessionCli來(lái)附加到對(duì)應(yīng)的yarn session上去
例如 已經(jīng)啟動(dòng)的yarn session的appid是application_1568879202413_0003
我們執(zhí)行 yarn-session.sh -id application_1568879202413_0003,就會(huì)在當(dāng)前節(jié)點(diǎn)啟動(dòng)一個(gè)FlinkYarnSessionCli進(jìn)程并附加到application_1568879202413_0003這個(gè)應(yīng)用上
此時(shí)我們?cè)诿钚休斎雜top可以直接停止yarn session
注意直接Ctrl + C只能停止FlinkYarnSessionCli進(jìn)程,不能停止yarn session,想通過(guò)FlinkYarnSessionCli停止yarn session只能通過(guò)輸入stop
2.3.3 任務(wù)提交
這里提交方式同standalone 模式類似,也分為通過(guò)web頁(yè)面提交和命令行提交,需要注意的是如果通過(guò)web頁(yè)面提交可以由三種方式訪問(wèn)到web頁(yè)面
-
方式一:直接在啟動(dòng)yarn-session時(shí)可以看見(jiàn)jobManager地址
方式一 -
方式二:通過(guò)yarn web
首先我們?cè)L問(wèn)yarn,可以看見(jiàn)yarn session已經(jīng)啟動(dòng)
方式二
隨后點(diǎn)擊ApplicationMaster,跳轉(zhuǎn)到web頁(yè)面,再點(diǎn)擊submit new job,再點(diǎn)擊here就可以訪問(wèn)jobmanager
方式二 -
方式三:通過(guò)yarn 命令yarn application -list
方式三
2.3.4 停止yarn session
由于我們是本地啟動(dòng)一個(gè)進(jìn)程來(lái)維護(hù)yarn session,所以這里我們可以通過(guò)kill掉進(jìn)程或者通過(guò)yarn來(lái)停止
yarn session
#停止本地進(jìn)程
kill -9 pid
#停止yarn application
yarn application -kill application_xxx_xxx
2.4 Single Job模式
前面已經(jīng)介紹過(guò)Single Job模式每提交一個(gè)flink程序都會(huì)在yarn生成一個(gè)application,運(yùn)行完畢就釋放資源
那么怎么提交獨(dú)立的flink程序呢? 只需要加上-m yarn-cluster即可
/opt/soft/flink-1.7.2/bin/flink run -m yarn-cluster -c flinka.dstream.SocketStream ./flink-1.0-SNAPSHOT.jar
-m 運(yùn)行模式,這里使用yarn-cluster,即yarn集群模式。
-ys slot個(gè)數(shù)。
-ynm Yarn application的名字。
-yn task manager 數(shù)量。
-yjm job manager 的堆內(nèi)存大小。
-ytm task manager 的堆內(nèi)存大小。
-d detach模式。可以運(yùn)行任務(wù)后無(wú)需再控制臺(tái)保持連接。
-c 指定jar包中class全名




