flink的local-cluster模式

Flink中的Local-cluster(本地集群)模式,主要用于測(cè)試、學(xué)習(xí),可幫助我們快速入門flink。

1)local-cluster模式配置

local-cluster模式基本屬于零配置。
配置步驟為:

1.上傳Flink的安裝包flink-1.12.0-bin-scala_2.11.tgz到hadoop162

2.解壓
tar -zxvf flink-1.12.0-bin-scala_2.11.tgz -C /opt/module

3.進(jìn)入目錄/opt/module, 把剛剛解壓出來的文件夾 flink-1.12.0 復(fù)制為 flink-local :
cd /opt/module
cp -r flink-1.12.0 flink-local

2)local-cluster模式下運(yùn)行無界的WordCount

代碼:

package com.evscn;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class WC_UnBoundStream {
    public static void main(String[] args) throws Exception {
        // 獲取執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 從文件中獲取流
        DataStreamSource<String> stream = env.socketTextStream("hadoop162", 9999);

        // 做轉(zhuǎn)換
        SingleOutputStreamOperator<Tuple2<String, Long>> tmp = stream.flatMap((String value, Collector<Tuple2<String, Long>> out) -> {
            String[] arr = value.split(" ");
            for (String s : arr) {
                out.collect(Tuple2.of(s, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        KeyedStream<Tuple2<String, Long>, String> keyedStream = tmp.keyBy(value -> value.f0);
        SingleOutputStreamOperator<Tuple2<String, Long>> ans = keyedStream.sum(1);
        ans.print("sum: ");

        // 啟動(dòng)執(zhí)行環(huán)境
        env.execute();
    }
}

打包并上傳:

image.png

提交任務(wù):


image.png

測(cè)試輸入:

圖片.png

如下圖,觀察 slot數(shù)、CPU核數(shù) 等信息,可見CPU核數(shù)是8核(虛擬8核,等價(jià)于8核),而flink的local-cluster模式下,只分配了1個(gè)核(即slot)(此時(shí)可簡單認(rèn)為1個(gè)slot對(duì)應(yīng)1個(gè)核)給潛在的任務(wù)(Jobs)——這是因?yàn)椋a中設(shè)定了此任務(wù)的并行度為1【env.setParallelism(1);】:

圖片.png

如下兩圖,可觀察TM(TaskManager)的相關(guān)信息,留意心跳時(shí)間會(huì)不斷被更新,每次心跳發(fā)生時(shí),TaskManager得以與其他組件溝通:

圖片.png
圖片.png

在瀏覽器中查看程序的執(zhí)行情況:

圖片.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容