Flink data processing

1.1 Flink data sources

1.1.1 Introduction to sources

A source is the data input of a program. You add a source to your program with StreamExecutionEnvironment.addSource(sourceFunction). Flink ships with many ready-made sources, and you can also write your own:

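  • (1) Implement the SourceFunction interface to define a non-parallel source (parallelism 1)
  • (2) Implement the ParallelSourceFunction interface or extend RichParallelSourceFunction to define a parallel source

In most cases, however, the built-in sources are all you need.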

Ways to obtain a source

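  • (1) File-based
    readTextFile(path)
    Reads a text file line by line according to the TextInputFormat rules and returns each line.
  • (2) Socket-based
    socketTextStream
    Reads data from a socket; elements can be separated by a delimiter.
  • (3) Collection-based
    fromCollection(Collection)
    Creates a data stream from a Java collection; all elements in the collection must have the same type.
  • (4) Custom input
    addSource can read data from third-party systems.
    Flink ships with a set of built-in connectors that provide the corresponding sources (e.g. Kafka).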

Available connectors
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)

1.1.2 Collections as a data source

StreamingSourceFromCollection.java

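import java.util.ArrayList;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingSourceFromCollection {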
    public static void main(String[] args) throws Exception {
        // Step 1: get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        // Step 2: create some sample data
        ArrayList<String> data = new ArrayList<String>();
        data.add("hadoop");
        data.add("spark");
        data.add("flink");
        // Step 3: create a source from the collection
        DataStreamSource<String> dataStream = env.fromCollection(data);
        // Step 4: transformation
        SingleOutputStreamOperator<String> addPreStream = dataStream.map(new MapFunction<String, String>() {

            public String map(String word) throws Exception {
                return "mi_" + word;
            }
        });
        // Step 5: process the result (print it)
        addPreStream.print().setParallelism(1);
        // Step 6: start the job
        env.execute("StreamingSourceFromCollection");

    }
}

1.1.3 Custom single-parallelism source

MyNoParalleSource.java

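import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Long is the type of the records this source emits.
 *
 * Implementing SourceFunction means this source supports only one
 * parallel instance (parallelism 1).
 */
public class MyNoParalleSource implements SourceFunction<Long> {
    private long number = 1L;
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;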
    @Override
    public void run(SourceContext<Long> sct) throws Exception {
        while (isRunning){
            // Emit the value downstream
            sct.collect(number);
            number++;
            // Produce one record per second
            Thread.sleep(1000);
        }

    }

    @Override
    public void cancel() {
        isRunning=false;
    }
}

StreamingDemoWithMyNoPralalleSource.java

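import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingDemoWithMyNoPralalleSource {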
    public static void main(String[] args) throws Exception {

        /**
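         * 1. Get the program entry point: a local environment with the web UI (the UI needs the flink-runtime-web dependency)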
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());


        /**
         * 2. Get the data source
         */

        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);

        /**
         * 3. Process the data
         */
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received data: " + value);
                return value;
            }
        }).setParallelism(2);

        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                // Keep only even numbers
                return number % 2 == 0;
            }
        }).setParallelism(2);

        filterDataStream.print().setParallelism(1);

        env.execute("StreamingDemoWithMyNoPralalleSource");
    }
}
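The custom sources in this section follow the SourceFunction contract: Flink calls run() on the source's own thread and, when the job is cancelled, calls cancel() from a different thread. The plain-Java sketch below models that contract without any Flink dependency; the class CountingSourceSketch and its LongConsumer callback are hypothetical stand-ins for SourceFunction and SourceContext.collect(). It shows why the running flag should be volatile: otherwise the loop thread is not guaranteed to see the write made by the cancelling thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.LongConsumer;

// Hypothetical stand-in for a single-parallelism SourceFunction<Long>; not Flink's API.
final class CountingSourceSketch {
    private long number = 1L;
    // volatile: cancel() is called on a different thread than run()
    private volatile boolean isRunning = true;

    // Like SourceFunction.run(): emit 1, 2, 3, ... until cancelled.
    void run(LongConsumer collect, long sleepMillis) throws InterruptedException {
        while (isRunning) {
            collect.accept(number);
            number++;
            Thread.sleep(sleepMillis);
        }
    }

    // Like SourceFunction.cancel(): only flips the flag; the loop then exits by itself.
    void cancel() {
        isRunning = false;
    }

    // Runs the source on its own thread, cancels it after runMillis and returns what it emitted.
    static List<Long> runFor(long runMillis) {
        CountingSourceSketch source = new CountingSourceSketch();
        List<Long> emitted = new CopyOnWriteArrayList<>();
        Thread worker = new Thread(() -> {
            try {
                source.run(value -> emitted.add(value), 10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            Thread.sleep(runMillis);
            source.cancel();
            worker.join();
        } catch (InterruptedException e) {
            source.cancel();
            Thread.currentThread().interrupt();
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Roughly ten values; the exact count depends on thread scheduling.
        System.out.println(runFor(100));
    }
}
```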

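RichParallelSourceFunction supports a parallelism greater than 1. The difference between RichParallelSourceFunction and RichSourceFunction is that the former lets you set any parallelism, while the latter (like a plain SourceFunction such as MyNoParalleSource) has a fixed parallelism of 1: trying to set a larger value with setParallelism() fails with the following error: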

Exception in thread "main" java.lang.IllegalArgumentException: The maximum parallelism of non parallel operator must be 1.

1.1.4 Custom multi-parallelism source

MyParalleSource.java

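import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

/**
 * This source supports a parallelism greater than 1
 */
public class MyParalleSource implements ParallelSourceFunction<Long> {
    private long number = 1L;
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;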
    @Override
    public void run(SourceContext<Long> sct) throws Exception {
        while (isRunning){
            sct.collect(number);
            number++;
            // Produce one record per second
            Thread.sleep(1000);
        }

    }

    @Override
    public void cancel() {
        isRunning=false;
    }
}

StreamingDemoWithMyPralalleSource.java

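import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingDemoWithMyPralalleSource {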
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<Long> numberStream = env.addSource(new MyParalleSource()).setParallelism(2);

        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received data: " + value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });

        filterDataStream.print().setParallelism(1);
        env.execute("StreamingDemoWithMyPralalleSource");
    }
}
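Because MyParalleSource keeps number in an instance field, each of the two parallel instances created by setParallelism(2) counts on its own, so every value reaches the map and filter operators twice. The plain-Java model below illustrates this; the class name is hypothetical, and the round-robin interleaving is an assumption made for determinism (the real interleaving between subtasks is not deterministic).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a ParallelSourceFunction whose instances each own a counter; not Flink's API.
final class ParallelSourceSketch {
    // One parallel instance with its own `number` field, like MyParalleSource.
    static final class CounterInstance {
        private long number = 1L;

        long next() {
            return number++;
        }
    }

    // Emits `perInstance` values from each of `parallelism` instances, interleaved round-robin.
    static List<Long> emit(int parallelism, int perInstance) {
        List<CounterInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new CounterInstance());
        }
        List<Long> merged = new ArrayList<>();
        for (int step = 0; step < perInstance; step++) {
            for (CounterInstance instance : instances) {
                merged.add(instance.next());
            }
        }
        return merged;
    }

    // How often each value arrives downstream.
    static Map<Long, Integer> histogram(List<Long> values) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long value : values) {
            counts.merge(value, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Long> merged = emit(2, 3);
        System.out.println(merged);            // [1, 1, 2, 2, 3, 3]
        System.out.println(histogram(merged)); // {1=2, 2=2, 3=2}
    }
}
```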

1.2 Common transformations

1.2.1 map and filter

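import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Source: 1 2 3 4 5 ... arriving continuously
 * Use map to print each value that is received
 * Use filter to keep only the even numbers
 */
public class MapDemo {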
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
        //flink FlatMap/map -> spark FlatMap/map  -> Scala flatmap/Map
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received data: " + value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;//true
            }
        });

        filterDataStream.print().setParallelism(1);
        env.execute("StreamingDemoWithMyNoPralalleSource");
    }


}

1.2.3 union

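import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Merges several streams: the new stream contains the data of all of them.
 * The restriction of union is that all merged streams must have the same type.
 * Here union is combined with timeWindowAll.
 */
public class unionDemo {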
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Get the data sources
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1); // note: this source only supports parallelism 1

        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);

        // Merge text1 and text2 into one stream
        DataStream<Long> text = text1.union(text2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received raw data: " + value);
                return value;
            }
        });
        // Sum the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
        // Print the result
        sum.print().setParallelism(1);
        String jobName = unionDemo.class.getSimpleName();
        env.execute(jobName);
    }
}
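timeWindowAll(Time.seconds(2)) puts all elements of the unioned stream into non-overlapping 2-second tumbling windows (in this generation of Flink, based on processing time by default) and sums each window. The plain-Java sketch below models only the window assignment and the summing; the arrival timestamps are made up, and the formula start = timestamp - timestamp % size assumes non-negative timestamps and no window offset.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of timeWindowAll(size).sum(0); not Flink's API.
final class TumblingWindowSumSketch {
    // Start of the tumbling window containing timestampMillis (no offset, non-negative time).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Sums values per window; timestamps[i] is the arrival time of values[i].
    static Map<Long, Long> sumPerWindow(long[] timestamps, long[] values, long sizeMillis) {
        Map<Long, Long> sums = new TreeMap<>();
        for (int i = 0; i < values.length; i++) {
            sums.merge(windowStart(timestamps[i], sizeMillis), values[i], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Both unioned sources emit 1, 2, 3 at (made-up) times 0 s, 1 s and 2 s.
        long[] timestamps = {0, 0, 1000, 1000, 2000, 2000};
        long[] values = {1, 1, 2, 2, 3, 3};
        System.out.println(sumPerWindow(timestamps, values, 2000)); // {0=6, 2000=6}
    }
}
```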

1.2.4 connect, CoMap and CoFlatMap

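import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * Similar to union, but connects exactly two streams, whose types may differ,
 * and applies a different processing method to the data of each stream.
 */
public class ConnectionDemo {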
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Get the data sources
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1); // note: this source only supports parallelism 1

        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);


        SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "str_" + value;
            }
        });

        // Connect the two streams (connect, not union: the types differ)
        ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);



        SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
            // This method handles the elements of source 1
            @Override
            public Object map1(Long value) throws Exception {
                return value;
            }
            // This method handles the elements of source 2
            @Override
            public Object map2(String value) throws Exception {
                return value;
            }
        });

        // Print the result
        result.print().setParallelism(1);
        String jobName = ConnectionDemo.class.getSimpleName();
        env.execute(jobName);
    }
}

1.2.5 Split and Select

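import java.util.ArrayList;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Splits one stream into several streams according to a rule.
 * Use case:
 * In practice a source stream often mixes several kinds of data that need different
 * processing rules, so we split it by some rule into several streams and apply
 * different logic to each of them.
 */
public class SplitDemo {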
    public static void main(String[] args) throws  Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Get the data source
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1); // note: this source only supports parallelism 1

        // Split the stream by parity
        SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value) {
                ArrayList<String> outPut = new ArrayList<>();
                if (value % 2 == 0) {
                    outPut.add("even"); // even
                } else {
                    outPut.add("odd"); // odd
                }
                return outPut;
            }
        });

        // Select one or more of the split streams
        DataStream<Long> evenStream = splitStream.select("even");

        DataStream<Long> oddStream = splitStream.select("odd");
        DataStream<Long> moreStream = splitStream.select("odd","even");

        // Print the results
        // even numbers
        evenStream.print().setParallelism(1);
        // odd numbers
//        oddStream.print().setParallelism(1);
        // all elements
//        moreStream.print().setParallelism(1);
        String jobName = SplitDemo.class.getSimpleName();
        env.execute(jobName);

    }
}
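split() and select() were deprecated and, as far as I know, removed in Flink 1.12, where side outputs are the suggested replacement. The routing rule itself is easy to model in plain Java: the selector gives each element one or more tags, and select(...) keeps, in arrival order, the elements carrying any of the requested tags. The class below is hypothetical and uses no Flink types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java model of split(OutputSelector) + select(names...); not Flink's API.
final class SplitSelectSketch {
    // Same rule as the OutputSelector in SplitDemo: one tag per element, by parity.
    static List<String> tags(long value) {
        return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
    }

    // Keeps, in arrival order, every element that carries at least one requested tag.
    static List<Long> select(List<Long> input, String... names) {
        List<String> wanted = Arrays.asList(names);
        List<Long> selected = new ArrayList<>();
        for (long value : input) {
            for (String tag : tags(value)) {
                if (wanted.contains(tag)) {
                    selected.add(value);
                    break;
                }
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L);
        System.out.println(select(input, "even"));        // [2, 4, 6]
        System.out.println(select(input, "odd"));         // [1, 3, 5]
        System.out.println(select(input, "odd", "even")); // [1, 2, 3, 4, 5, 6]
    }
}
```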

1.3 Common sink operations

1.3.1 print() / printToErr()

Prints the value of each element's toString() method to standard output or standard error.

1.3.2 writeAsText()

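import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Source: 1 2 3 4 5 ... arriving continuously
 * Use map to print each value that is received
 * Use filter to keep only the even numbers, then write them to a text file
 */
public class WriteTextDemo {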
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received data: " + value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });

        filterDataStream.writeAsText("D:\\flinkout\\value.txt").setParallelism(1);

        filterDataStream.print();

        env.execute("StreamingDemoWithMyNoPralalleSource");
    }

}


1.3.3 Sinks provided by Flink

Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
Google PubSub (source/sink)

1.3.4 Custom sink

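import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Redis connector from Apache Bahir (flink-connector-redis)
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Writes the data to Redis
 */
public class SinkForRedisDemo {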
    public static void main(String[] args) throws  Exception {
        // Program entry point
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Data source
        DataStreamSource<String> text = env.socketTextStream("bigdata02", 8888, "\n");
        // Target Redis command: LPUSH l_words word
        // Assemble the data: turn each String into a Tuple2<String, String>
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                // (key, value): push every line onto the Redis list "l_words"
                return new Tuple2<>("l_words", value);
            }
        });
        // Create the Redis connection configuration
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("bigdata04").setPort(6379).setPassword("bigdata04").build();

        // Create the Redis sink
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());



        l_wordsData.addSink(redisSink);


        env.execute("StreamingDemoToRedis");

    }

    /**
     * The logic for writing the data into Redis
     */
    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        // Extracts the Redis key to operate on from each received record
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }
        // Extracts the Redis value to operate on from each received record
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}
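With RedisCommand.LPUSH, each record is pushed onto the head of the Redis list named by getKeyFromData, so the list ends up in reverse arrival order (LRANGE key 0 -1 returns the newest entry first). The plain-Java model below uses an ArrayDeque instead of a Redis client; the class name is hypothetical, and the key l_words is the one named in the LPUSH comment of the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the Redis list commands involved; not a Redis client.
final class LpushSketch {
    private final Map<String, Deque<String>> lists = new HashMap<>();

    // LPUSH key value: insert at the head of the list stored at key.
    void lpush(String key, String value) {
        lists.computeIfAbsent(key, k -> new ArrayDeque<>()).addFirst(value);
    }

    // LRANGE key 0 -1: the whole list from head to tail.
    List<String> lrangeAll(String key) {
        return new ArrayList<>(lists.getOrDefault(key, new ArrayDeque<>()));
    }

    public static void main(String[] args) {
        LpushSketch redis = new LpushSketch();
        // What the sink does per socket line: key = f0 ("l_words"), value = f1 (the line).
        for (String word : new String[]{"hadoop", "spark", "flink"}) {
            redis.lpush("l_words", word);
        }
        System.out.println(redis.lrangeAll("l_words")); // [flink, spark, hadoop]
    }
}
```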

1.4 State

1.4.1 State overview

Apache Flink®: Stateful Computations over Data Streams
Recall the word count example

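import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// Count word occurrences in real time
public class WordCount {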
    public static void main(String[] args) throws Exception{
        // Create the program entry point
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Data input
        DataStreamSource<String> myDataStream = env.socketTextStream("bigdata02", 1234);
        // Data processing
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = myDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(new Tuple2<>(word, 1));
                    //out.collect(Tuple2.of(word,1));
                }

            }
        }).keyBy(0)
                .sum(1);
        // Data output
        result.print();
        // Start the application
        env.execute("WordCount");

    }
}

Input

hadoop,hadoop 
hadoop 
hive,hadoop 

Output

4> (hadoop,1) 
4> (hadoop,2) 
4> (hadoop,3) 
1> (hive,1) 
4> (hadoop,4)

As you can see, the word counts accumulate. Without state management there would be no accumulation, which is why Flink has the concept of state.
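The accumulation comes from keyed state: keyBy(0).sum(1) keeps one running count per word. A plain-Java model of that behaviour, where a HashMap stands in for Flink's managed keyed state (the class name is hypothetical), reproduces the output above for the three input lines, leaving out the subtask prefixes such as 4>:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of flatMap(split on ",") + keyBy(word) + sum(count); not Flink's API.
final class KeyedSumSketch {
    // Stands in for the managed keyed state behind sum(1): one running count per word.
    private final Map<String, Integer> countPerWord = new HashMap<>();

    // Processes one input line and returns the (word,count) records emitted for it.
    List<String> processLine(String line) {
        List<String> out = new ArrayList<>();
        for (String word : line.split(",")) {
            int updated = countPerWord.merge(word, 1, Integer::sum);
            out.add("(" + word + "," + updated + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        KeyedSumSketch job = new KeyedSumSketch();
        for (String line : new String[]{"hadoop,hadoop", "hadoop", "hive,hadoop"}) {
            for (String record : job.processLine(line)) {
                System.out.println(record);
            }
        }
        // Prints (hadoop,1) (hadoop,2) (hadoop,3) (hive,1) (hadoop,4), one per line
    }
}
```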

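State generally refers to the state of a specific task/operator. State can be recorded, so data can be recovered after a failure. Flink has two basic kinds of state, Keyed State and Operator State, and both can exist in two forms: raw state and managed state.
Managed state: state managed by the Flink framework; this is what we normally use.
Raw state: the user manages the concrete data structure of the state; when taking a checkpoint, the framework reads and writes the content as byte[] and knows nothing about its internal structure. Managed state is recommended for state on DataStreams; raw state is used when implementing a custom operator. Since it is rarely needed in practice, we will not cover it here.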

1.4.2 State types

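Operator State (task level)

    1. Operator state is task-level state: simply put, each task (parallel instance) has its own state.
    2. Example: each parallel instance of the Kafka connector source has to record the topic partitions it consumes and their offsets.

Keyed State (per key)

    1. Keyed state records the state of each key.
    2. Managed keyed state comes in six types:
        1. ValueState
        2. ListState
        3. MapState
        4. ReducingState
        5. AggregatingState
        6. FoldingState (deprecated in later Flink versions in favor of AggregatingState)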

Figure: understanding state (in the figure, the data source is Kafka)

1.4.3 Keyed State examples

ValueState

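import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverageWithValueState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {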

    /**
     * 1. ValueState is a kind of keyed state
     * 2. ValueState holds exactly one value (per key)
     *
     * Idea:
     * long1: how many times the current key has been seen
     * long2: running sum of the values
     * if (long1 == 3) {
     *     avg = long2 / long1
     * }
     */

    private ValueState<Tuple2<Long, Long>> countAndSum;

    @Override
    public void open(Configuration parameters) throws Exception {

        ValueStateDescriptor<Tuple2<Long, Long>> average = new ValueStateDescriptor<>(
                "average",
                Types.TUPLE(Types.LONG, Types.LONG)
        );

        countAndSum = getRuntimeContext().getState(average);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {

        Tuple2<Long, Long> currentState = countAndSum.value();
        if (currentState == null) {
            currentState = Tuple2.of(0L, 0L);
        }
        // Count how many times the key has been seen
        currentState.f0 += 1;
        // Add up the values
        currentState.f1 += element.f1;
        countAndSum.update(currentState);

        if (currentState.f0 ==3){
            double avg =(double)currentState.f1/currentState.f0;
            out.collect(Tuple2.of(element.f0,avg));
            // Clear the state (only the state of the current key)
            countAndSum.clear();
        }
    }
}

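import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 *  Requirement: whenever 3 elements with the same key have been received,
 *  compute the average of their values
 *  (i.e. the average of every 3 values per key in a keyed stream).
 *
 *  Input 1,3  1,7  1,5  ->  output 1,5.0
 *  Input 2,4  2,2  2,5  ->  output 2,3.666
 *
 *  Output record: (key as long, average as double)
 */
public class TestKeyedStateMain {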
    public static void main(String[] args) throws  Exception{
        // Program entry point
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Data source
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // Output:
        //(1,5.0)
        //(2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithValueState()) // flatMap/map + state: feels like writing a custom function
                .print();

        env.execute("TestStatefulApi");
    }
}

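Result output: (1,5.0) and (2,3.6666666666666665), as noted in the code comments (the original screenshot is not preserved).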
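To see where (1,5.0) and (2,3.6666666666666665) come from, the plain-Java model below replays the six input tuples through the same logic. A HashMap from key to (count, sum) stands in for ValueState, which Flink scopes to the current key; removing one entry corresponds to countAndSum.clear(), which only clears the state of the current key. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of CountWindowAverageWithValueState; not Flink's API.
final class CountAverageSketch {
    // key -> {count, sum}: what ValueState<Tuple2<Long, Long>> holds for each key
    private final Map<Long, long[]> countAndSum = new HashMap<>();

    // Like flatMap(): returns "(key,avg)" once a key has seen 3 values, otherwise null.
    String process(long key, long value) {
        // A missing entry plays the role of value() == null -> start from (0, 0)
        long[] state = countAndSum.computeIfAbsent(key, k -> new long[2]);
        state[0] += 1;
        state[1] += value;
        if (state[0] == 3) {
            double avg = (double) state[1] / state[0];
            countAndSum.remove(key); // like clear(): only this key's state is dropped
            return "(" + key + "," + avg + ")";
        }
        return null;
    }

    // Feeds {key, value} pairs in order and collects the emitted records.
    static List<String> run(long[][] input) {
        CountAverageSketch function = new CountAverageSketch();
        List<String> out = new ArrayList<>();
        for (long[] kv : input) {
            String record = function.process(kv[0], kv[1]);
            if (record != null) {
                out.add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] input = {{1, 3}, {1, 7}, {2, 4}, {1, 5}, {2, 2}, {2, 5}};
        System.out.println(run(input)); // [(1,5.0), (2,3.6666666666666665)]
    }
}
```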

ListState

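import java.util.ArrayList;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverageWithListState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {

    /**
     * Keeps every element received for the current key, e.g.
     * 1,3
     * 1,7
     * 1,5
     */
    private ListState<Tuple2<Long, Long>> elementsByKey;


    @Override
    public void open(Configuration parameters) throws Exception {

        ListStateDescriptor<Tuple2<Long, Long>> average = new ListStateDescriptor<>(
                "average",
                Types.TUPLE(Types.LONG, Types.LONG)
        );
        elementsByKey = getRuntimeContext().getListState(average);
    }


    @Override
    public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {
        // Append the new element to the list kept for the current key
        elementsByKey.add(element);

        // Copy the list so we can check how many elements it holds
        ArrayList<Tuple2<Long, Long>> allElements = new ArrayList<>();
        for (Tuple2<Long, Long> ele : elementsByKey.get()) {
            allElements.add(ele);
        }

        if (allElements.size() == 3) {
            long count = 0;
            long sum = 0;
            for (Tuple2<Long, Long> ele : allElements) {
                count++;
                sum += ele.f1;
            }
            double avg = (double) sum / count;
            out.collect(Tuple2.of(element.f0, avg));

            // Clear the list of the current key
            elementsByKey.clear();
        }
    }
}
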
public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        // Program entry point
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Data source
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // Output:
        //(1,5.0)
        //(2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithListState()) // flatMap/map + state: feels like writing a custom function
                .print();

        env.execute("TestStatefulApi");
    }
}

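Result output: (1,5.0) and (2,3.6666666666666665), as noted in the code comments (the original screenshot is not preserved).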

MapState

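import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 *  MapState<K, V>: keeps one Map for each key
 *      put()    puts a key/value pair into the state
 *      values() returns all values stored in the MapState
 *      clear()  clears the state
 */
public class CountWindowAverageWithMapState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // Managed keyed state.
    // Map key: a unique value; map value: one value received for the current Flink key.
    // State declared during development can be thought of as a helper variable.
    // Map semantics: putting the same map key again overwrites the previous value,
    // so for input 1,3 / 1,5 / 1,7 a fixed map key would keep only the last value.
    // That is why each element gets a random UUID as its map key.
    private MapState<String, Long> mapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the state
        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<String, Long>(
                        "average",  // state name
                        String.class, Long.class); // key and value types of the map
        mapState = getRuntimeContext().getMapState(descriptor);
    }

    /**
     * Example map contents for key 1 after input 1,3 / 1,5 / 1,7
     * (the map keys are random UUID strings):
     *   dfsfsdafdsf,3
     *   dfsfxxxfdsf,5
     *   xxxx323123,7
     */
    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Double>> out) throws Exception {

        mapState.put(UUID.randomUUID().toString(), element.f1);

        // If the current key has been seen 3 times, compute and emit the average
        List<Long> allElements = new ArrayList<>();
        for (Long value : mapState.values()) {
            allElements.add(value);
        }

        if (allElements.size() == 3) {
            long count = 0;
            long sum = 0;
            for (Long ele : allElements) {
                count++;
                sum += ele;
            }
            double avg = (double) sum / count;
            out.collect(Tuple2.of(element.f0, avg));

            // Clear the state
            mapState.clear();
        }
    }
}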

public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        // Program entry point
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Data source
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // Output:
        //(1,5.0)
        //(2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithMapState()) // flatMap/map + state: feels like writing a custom function
                .print();

        env.execute("TestStatefulApi");
    }
}

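Result output: (1,5.0) and (2,3.6666666666666665), as noted in the code comments (the original screenshot is not preserved).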
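The UUID map keys in CountWindowAverageWithMapState matter because a MapState behaves like a Map: putting an existing map key again overwrites its value. The plain-Java sketch below, with a HashMap standing in for the MapState of a single Flink key (hypothetical class name), shows that a fixed map key never lets the count reach 3, while random UUID keys keep all three values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Plain-Java illustration: a HashMap stands in for the MapState of one Flink key.
final class MapStateKeySketch {
    // Puts the values 3, 7, 5 of one Flink key and returns how many map entries remain.
    static int entriesAfterThreeValues(boolean randomMapKeys) {
        Map<String, Long> mapState = new HashMap<>();
        for (long value : new long[]{3L, 7L, 5L}) {
            String mapKey = randomMapKeys ? UUID.randomUUID().toString() : "1";
            mapState.put(mapKey, value); // an existing map key is overwritten
        }
        return mapState.size();
    }

    public static void main(String[] args) {
        System.out.println(entriesAfterThreeValues(false)); // 1: the average is never computed
        System.out.println(entriesAfterThreeValues(true));  // 3: the average can be computed
    }
}
```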

ReducingState

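import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 *  ReducingState<T>: keeps one aggregated value for each key
 *      get()   returns the state value
 *      add()   adds a value to the state (it is reduced with the stored value)
 *      clear() clears the state
 */
public class SumFunction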
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // The type of the state is the type of the final accumulated sum
    private ReducingState<Long> sumState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the state
        ReducingStateDescriptor<Long> descriptor =
                new ReducingStateDescriptor<Long>(
                        "sum",  // state name
                        new ReduceFunction<Long>() { // reduce (aggregation) function
                            @Override
                            public Long reduce(Long value1, Long value2) throws Exception {
                                return value1 + value2;
                            }
                        }, Long.class); // type of the value kept in the state
        sumState = getRuntimeContext().getReducingState(descriptor);
    }

    /**
     *
     * 3
     * 5
     * 7
     *
     * @param element
     * @param out
     * @throws Exception
     */
    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Long>> out) throws Exception {
        // Add the value to the state (it is reduced into the running sum)
        sumState.add(element.f1);

        out.collect(Tuple2.of(element.f0, sumState.get()));
    }
}

public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        // Program entry point
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Data source
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // Output:
        dataStreamSource
                .keyBy(0)
                .flatMap(new SumFunction()) // running sum
                .print();

        env.execute("TestStatefulApi");
    }
}

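Output: a running sum is emitted per key after every element, i.e. (1,3), (1,10), (1,15) for key 1 and (2,4), (2,6), (2,11) for key 2 (the original screenshot is not preserved; records of different keys may be interleaved differently).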
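ReducingState keeps a single value per key and folds every add() into it with the ReduceFunction. The plain-Java model of SumFunction below is hypothetical: Map.merge stands in for the reduce step, and the first value of a key is stored as-is. It produces the running sums per key; in Flink, records of different keys may be printed in a different order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of a keyed ReducingState<Long> used by SumFunction; not Flink's API.
final class ReducingStateSketch {
    private final Map<Long, Long> reducedPerKey = new HashMap<>();
    private final BinaryOperator<Long> reduceFunction;

    ReducingStateSketch(BinaryOperator<Long> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    // Like add(): the first value of a key is stored as-is, later ones are reduced into it.
    void add(long key, long value) {
        reducedPerKey.merge(key, value, reduceFunction);
    }

    // Like get(): the current reduced value of the key (null if the key has no state).
    Long get(long key) {
        return reducedPerKey.get(key);
    }

    // Replays {key, value} pairs and records what SumFunction would emit after each one.
    static List<String> run(long[][] input) {
        ReducingStateSketch sumState = new ReducingStateSketch(Long::sum);
        List<String> out = new ArrayList<>();
        for (long[] kv : input) {
            sumState.add(kv[0], kv[1]);
            out.add("(" + kv[0] + "," + sumState.get(kv[0]) + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] input = {{1, 3}, {1, 7}, {2, 4}, {1, 5}, {2, 2}, {2, 5}};
        System.out.println(run(input)); // [(1,3), (1,10), (2,4), (1,15), (2,6), (2,11)]
    }
}
```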

AggregatingState

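import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ContainsValueFunction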
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> {
    /**
     * 1, contains:3 and 5
     */
    private AggregatingState<Long, String> totalStr; // helper field

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the state
        AggregatingStateDescriptor<Long, String, String> descriptor =
                new AggregatingStateDescriptor<Long, String, String>(
                        "totalStr",  // state name

                        // comparable to a user-defined aggregate function in Spark SQL
                        new AggregateFunction<Long, String, String>() {
                            // creates the initial accumulator (used when the key has no accumulator yet)
                            @Override
                            public String createAccumulator() {
                                return "Contains:";
                            }

                            @Override
                            public String add(Long value, String accumulator) {
                                if ("Contains:".equals(accumulator)) {
                                    return accumulator + value;
                                }
                                return accumulator + " and " + value;
                            }

                            @Override
                            public String merge(String a, String b) {
                                return a + " and " + b;
                            }

                            @Override
                            public String getResult(String accumulator) {
                                //contains:1
                                //contains: 1 and 3 and
                                return accumulator;
                            }
                        }, String.class); // type of the accumulator kept in the state
        totalStr = getRuntimeContext().getAggregatingState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, String>> out) throws Exception {
        totalStr.add(element.f1);

        out.collect(Tuple2.of(element.f0, totalStr.get()));
    }
}
public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        // Program entry point
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Data source
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // Output:
        dataStreamSource
                .keyBy(0)
                .flatMap(new ContainsValueFunction()) // flatMap/map + state: feels like writing a custom function
                .print();

        env.execute("TestStatefulApi");
    }
}

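Output: for key 1 the emitted strings are Contains:3, Contains:3 and 7, Contains:3 and 7 and 5; for key 2 they are Contains:4, Contains:4 and 2, Contains:4 and 2 and 5 (the original screenshot is not preserved).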
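AggregatingState drives the AggregateFunction: createAccumulator() when a key has no accumulator yet, add() for every new value, and getResult() when get() is called. The plain-Java replay below mirrors the three methods used by ContainsValueFunction for a single key; the class is hypothetical. merge() is left out because, as far as I know, it is only needed when accumulators have to be combined (for example when session windows merge) and is not called from a plain flatMap.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java replay of the AggregateFunction used by ContainsValueFunction, for one key.
final class ContainsAggregateSketch {
    static String createAccumulator() {
        return "Contains:";
    }

    static String add(long value, String accumulator) {
        if ("Contains:".equals(accumulator)) {
            return accumulator + value;
        }
        return accumulator + " and " + value;
    }

    static String getResult(String accumulator) {
        return accumulator;
    }

    // Adds the values of a single key one by one and records getResult() after each add,
    // which is what flatMap emits via totalStr.get().
    static List<String> replay(long... values) {
        String accumulator = createAccumulator();
        List<String> results = new ArrayList<>();
        for (long value : values) {
            accumulator = add(value, accumulator);
            results.add(getResult(accumulator));
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(replay(3, 7, 5)); // [Contains:3, Contains:3 and 7, Contains:3 and 7 and 5]
        System.out.println(replay(4, 2, 5)); // [Contains:4, Contains:4 and 2, Contains:4 and 2 and 5]
    }
}
```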

1.5 State backend

1.5.1 Overview

State backends supported by Flink:

MemoryStateBackend
FsStateBackend
RocksDBStateBackend

1.5.2 MemoryStateBackend

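By default, state is kept on the TaskManager's JVM heap, and when a checkpoint is taken it is stored on the JobManager's heap.

  • Drawbacks:
    Only suitable for small state (by default each individual state is limited to 5 MB), and state data may be lost
  • Advantages:
    Convenient for development and testing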

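1.5.3 FsStateBackend

State is kept on the TaskManager's heap, and when a checkpoint is taken it is written to files in a configured file system (HDFS, etc.)

  • Drawbacks:
    State size is limited by TaskManager memory
  • Advantages:
    Fast state access
    State is not lost
    Use case: production, including jobs with fairly large state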

1.5.4 RocksDBStateBackend

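State is stored in RocksDB (an embedded key-value store) and ultimately kept in local files; when a checkpoint is taken it is written to a configured file system (HDFS, etc.)

  • Drawbacks:
    Slower state access
  • Advantages:
    Can hold very large state
    State is not lost
    Use case: production jobs with very large state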

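1.5.5 Configuring the state backend

(1) Per job

Change the job code: env.setStateBackend(new FsStateBackend("hdfs://bigdata02:9000/flink/checkpoints"));
or new MemoryStateBackend()
or new RocksDBStateBackend(filebackend, true); (requires an additional dependency; the second argument enables incremental checkpoints)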

Additional dependency

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>

(2) Cluster-wide

Edit flink-conf.yaml:

state.backend: filesystem
state.checkpoints.dir: hdfs://bigdata02:9000/flink/checkpoints

Note: state.backend can take one of the following values: jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend)

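1.6 Checkpoints (fault tolerance)

1.6.1 Checkpoint overview

  • (1) To make state fault tolerant, Flink needs to checkpoint it.
  • (2) Checkpointing is the core of Flink's fault-tolerance mechanism. Based on its configuration, Flink periodically takes snapshots of the state of every operator/task in the stream and persists them. If the program crashes unexpectedly, it can be restarted and selectively restored from these snapshots, correcting the data errors the failure would otherwise cause.
  • (3) Flink's checkpointing interacts with durable storage for streams and state, which requires two things: a persistent source that can replay events for a certain amount of time (typical examples are persistent message queues such as Apache Kafka and RabbitMQ, or file systems such as HDFS, S3 and GFS), and durable storage for the state, typically a distributed file system (such as HDFS, S3 or GFS).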

Figure: taking a snapshot

Figure: restoring from a snapshot
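The two requirements in point (3) fit together as follows: a snapshot records the source position together with the operator state, and recovery restores the state and rewinds the source to that position. The plain-Java model below uses hypothetical names; a List<String> plays the replayable source and a HashMap the keyed word counts. It checkpoints after a given element, simulates a crash, restores and replays, and ends with the same counts as a run without failure.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of checkpoint + restore with a replayable source; not Flink's API.
final class CheckpointReplaySketch {
    // A snapshot stores the source position together with a copy of the operator state.
    static final class Snapshot {
        final int sourceOffset;
        final Map<String, Integer> counts;

        Snapshot(int sourceOffset, Map<String, Integer> counts) {
            this.sourceOffset = sourceOffset;
            this.counts = new HashMap<>(counts);
        }
    }

    // Counts words from `log`, takes a snapshot after `checkpointAfter` elements, crashes after
    // `crashAfter` elements (at most log.size()), then restores the snapshot and replays the log.
    static Map<String, Integer> runWithFailure(List<String> log, int checkpointAfter, int crashAfter) {
        Map<String, Integer> counts = new HashMap<>();
        Snapshot snapshot = new Snapshot(0, counts);
        for (int offset = 0; offset < crashAfter; offset++) {
            counts.merge(log.get(offset), 1, Integer::sum);
            if (offset + 1 == checkpointAfter) {
                snapshot = new Snapshot(offset + 1, counts);
            }
        }
        // Crash: the in-memory counts are lost. Restore the state and rewind the source.
        counts = new HashMap<>(snapshot.counts);
        for (int offset = snapshot.sourceOffset; offset < log.size(); offset++) {
            counts.merge(log.get(offset), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "a", "c", "a", "b");
        // Offsets 2..4 are processed twice (before and after the crash), yet each is counted once.
        System.out.println(runWithFailure(log, 2, 5)); // {a=3, b=2, c=1}
    }
}
```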

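1.6.2 Checkpoint configuration

Checkpointing is disabled by default and has to be enabled before use. Once it is enabled, the checkpointing mode can be Exactly-once or At-least-once. The default is Exactly-once, which is the right choice for most applications; At-least-once may be used by applications that need ultra-low latency (consistently a few milliseconds).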

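import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; // needs flink-statebackend-rocksdb
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * state:
 * keyed
 * operator  -> checkpoint
 */
public class WordCount4 {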
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("hostname");
        int port = parameterTool.getInt("port");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

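        // An interval of 10-15 s is fine for this demo.
        // With large state, checkpointing roughly every 5 minutes is recommended;
        // Alibaba reportedly gives the same advice for their own jobs.
        env.enableCheckpointing(10000); // checkpoint interval in milliseconds (10 s)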

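        FsStateBackend fsStateBackend = new FsStateBackend("hdfs://bigdata02:9000/flink_1/checkpoint");

        // Alternative backend, only suitable for development and testing
        MemoryStateBackend memoryStateBackend = new MemoryStateBackend();

        env.setStateBackend(fsStateBackend);

        // Each setStateBackend() call replaces the previous one, so RocksDB is the backend actually used
        env.setStateBackend(new RocksDBStateBackend("hdfs://bigdata02:9000/flink_2/checkpoint"));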

        // setCheckpointingMode: whether duplicates are allowed (EXACTLY_ONCE vs AT_LEAST_ONCE)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // setMinPauseBetweenCheckpoints: minimum pause (ms) between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // setCheckpointTimeout: a checkpoint that does not complete within this many ms is discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // enableExternalizedCheckpoints: keep the checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.of(10, TimeUnit.SECONDS) // delay between attempts
        ));

        DataStreamSource<String> dataStream = env.socketTextStream(hostname, port);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy(0)
                .sum(1);

        result.print();

        env.execute("WordCount check point....");
    }
}

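1.7 Recovering data (fault tolerance)

1.7.1 Restart strategies overview (retries)

Flink supports different restart strategies, which control how a job is restarted when a failure occurs. The cluster is started with a default restart strategy, which is used whenever no job-specific strategy is defined. If a restart strategy is specified when the job is submitted, it overrides the cluster default. The default restart strategy is set in Flink's configuration file flink-conf.yaml; the configuration parameter restart-strategy defines which strategy is used.

Common restart strategies

  • (1) Fixed delay
  • (2) Failure rate
  • (3) No restart

If checkpointing is not enabled, the no-restart strategy is used.
If checkpointing is enabled but no restart strategy is configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts. The restart strategy can be configured in flink-conf.yaml, which applies globally, or set dynamically in the application code, which overrides the global configuration.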

1.7.2 Restart strategies

Fixed delay

Option 1: global configuration in flink-conf.yaml
restart-strategy: fixed-delay 
restart-strategy.fixed-delay.attempts: 3 
restart-strategy.fixed-delay.delay: 10 s

Option 2: set it in the application code
 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay between attempts
 ));

Failure rate

Option 1: global configuration in flink-conf.yaml
 restart-strategy: failure-rate
 restart-strategy.failure-rate.max-failures-per-interval: 3
 restart-strategy.failure-rate.failure-rate-interval: 5 min 
 restart-strategy.failure-rate.delay: 10 s 

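Option 2: set it in the application code
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // maximum number of failures within one interval
  Time.of(5, TimeUnit.MINUTES), // the interval over which failures are counted
  Time.of(10, TimeUnit.SECONDS) // delay between attempts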
));

No restart

Option 1: global configuration in flink-conf.yaml
 restart-strategy: none
Option 2: set it in the application code
env.setRestartStrategy(RestartStrategies.noRestart());
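The fixed-delay strategy boils down to a counter: with attempts = 3 and delay = 10 s, the first three failures each lead to a restart after 10 seconds and the fourth one fails the job. The plain-Java sketch below models that reading of the documented attempts/delay parameters; it is not Flink's implementation, and the class name is hypothetical.

```java
// Plain-Java model of the fixed-delay restart strategy's attempts/delay parameters; not Flink's API.
final class FixedDelayRestartSketch {
    private final int maxAttempts;
    private final long delayMillis;
    private int failures = 0;

    FixedDelayRestartSketch(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    // Called on every failure: returns the delay before the restart, or -1 if the job fails for good.
    long onFailure() {
        failures++;
        return failures <= maxAttempts ? delayMillis : -1;
    }

    public static void main(String[] args) {
        FixedDelayRestartSketch strategy = new FixedDelayRestartSketch(3, 10_000);
        for (int i = 1; i <= 4; i++) {
            System.out.println("failure " + i + " -> " + strategy.onFailure());
        }
        // failures 1 to 3 -> 10000, failure 4 -> -1
    }
}
```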

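1.7.3 Retaining multiple checkpoints

By default, when checkpointing is enabled, Flink retains only the most recent successfully completed checkpoint, and when the program fails it recovers from that checkpoint. Being able to keep several checkpoints and choose which one to recover from is more flexible: for example, if we find that the records processed over the last 4 hours were handled incorrectly, we may want to roll the whole state back to 4 hours ago. Flink supports retaining multiple checkpoints; add the following setting to conf/flink-conf.yaml to specify the maximum number of checkpoints to keep:

state.checkpoints.num-retained: 20

After this change, you can inspect the directory on HDFS where the checkpoints are stored:

hdfs dfs -ls hdfs://bigdata02:9000/flink/checkpoints

To roll back to a particular checkpoint, just specify the path of that checkpoint when restarting the job (for example with bin/flink run -s <checkpoint path>).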
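state.checkpoints.num-retained behaves like a bounded queue of completed checkpoints: when a new checkpoint completes and more than N are stored, the oldest one is discarded. A plain-Java model of that bookkeeping, with a hypothetical class and checkpoint ids standing in for checkpoint directories:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of keeping the last N completed checkpoints; not Flink's API.
final class RetainedCheckpointsSketch {
    private final int numRetained;
    private final Deque<Long> completed = new ArrayDeque<>();

    RetainedCheckpointsSketch(int numRetained) {
        this.numRetained = numRetained;
    }

    // Registers a completed checkpoint and returns the ids discarded to respect the limit.
    List<Long> addCompleted(long checkpointId) {
        completed.addLast(checkpointId);
        List<Long> discarded = new ArrayList<>();
        while (completed.size() > numRetained) {
            discarded.add(completed.removeFirst());
        }
        return discarded;
    }

    // Ids that can still be used to restore, oldest first.
    List<Long> retained() {
        return new ArrayList<>(completed);
    }

    public static void main(String[] args) {
        RetainedCheckpointsSketch store = new RetainedCheckpointsSketch(3);
        for (long id = 1; id <= 5; id++) {
            store.addCompleted(id);
        }
        System.out.println(store.retained()); // [3, 4, 5]
    }
}
```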

© Copyright belongs to the author. For reprints or content cooperation, please contact the author.
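Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and only provides information storage services.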
