2021-01-17-Flink-22 (Flink Window Functions)

1. Global Windows

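A global window assigns all records with the same key to one single window. Each key has its own global window, and these windows are independent of each other. For non-keyed windows there is only one global window. A global window has no end boundary, and its default trigger is NeverTrigger, so unless you specify a trigger the window never fires. countWindow(n) and countWindowAll(n), used in the examples below, are built on GlobalWindows with a count trigger that fires (and purges the window) every n elements.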
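To make the count-trigger behaviour concrete, here is a small plain-Java simulation (no Flink dependency) of what keyBy(...).countWindow(n) followed by a sum does: each key accumulates its own elements, and that key's window fires and is cleared once it holds n of them. CountWindowSketch and its add method are hypothetical names for illustration, not Flink API; a non-keyed countWindowAll(n) behaves like this with a single key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation (not Flink API) of keyBy(...).countWindow(size) followed by a sum:
// every key has its own global window, which fires and is purged once it holds `size` elements.
class CountWindowSketch {
    private final int size;
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    CountWindowSketch(int size) {
        this.size = size;
    }

    // Adds one element; returns the window's sum if this element fills the key's window, else null.
    Integer add(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < size) {
            return null; // count not reached yet: this key's window does not fire
        }
        int sum = 0;
        for (int v : buffer) {
            sum += v;
        }
        buffer.clear(); // purge the window so the next `size` elements start fresh
        return sum;
    }

    public static void main(String[] args) {
        CountWindowSketch windows = new CountWindowSketch(3);
        String[] words = {"a", "b", "a", "a", "b", "b"};
        for (String word : words) {
            Integer result = windows.add(word, 1);
            if (result != null) {
                System.out.println(word + " -> " + result);
            }
        }
        // prints "a -> 3" (after the 4th element), then "b -> 3" (after the 6th)
    }
}
```

Note that keys fire independently: "a" fires before "b" even though both eventually reach the count.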

reduce/sum

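import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

/**
 * Created with IntelliJ IDEA, 2021-01-16 21:31.
 * Sums every 5 integers read from the socket (non-keyed count window).
 */
public class Reduce {
    public static void main(String[] args) throws Exception {
        // Local environment with the web UI (the UI needs the flink-runtime-web dependency)
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // One integer per line, e.g. typed into `nc -lk 8888`
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Integer> map = source.map(Integer::parseInt);
        // countWindowAll(5): a single GlobalWindow that fires and is purged every 5 elements
        AllWindowedStream<Integer, GlobalWindow> windowedStream = map.countWindowAll(5);
        SingleOutputStreamOperator<Integer> reduce = windowedStream.reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer accumulator, Integer value) throws Exception {
                return accumulator + value;
            }
        });

        reduce.print();
        environment.execute("job");
    }
}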

Keyby

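Note on lambdas: Java erases the generic types of a lambda, so Flink cannot infer that the map below produces Tuple2<String, Integer>, and the result type has to be declared with returns(...): SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(x -> Tuple2.of(x, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));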

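import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

/**
 * Created with IntelliJ IDEA, 2021-01-16 21:31.
 * Counts words; each word's window fires after 5 occurrences of that word.
 */
public class Keyby {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        // The lambda's Tuple2 type is erased, so declare it with returns(...)
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(x -> Tuple2.of(x, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
        /* Equivalent anonymous class (needs import org.apache.flink.api.common.functions.MapFunction);
           no returns(...) is needed because the types are in the signature:
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });*/
        // Key by the word; keyBy(0) also works but is deprecated and yields a KeyedStream<..., Tuple>
        KeyedStream<Tuple2<String, Integer>, String> stream = map.keyBy(x -> x.f0);
        // One count window per key: it fires when that key has received 5 elements
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> window = stream.countWindow(5);
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = window.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
                return Tuple2.of(accumulator.f0, accumulator.f1 + value.f1);
            }
        });
        reduce.print();
        environment.execute("job");
    }
}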

apply

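import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/**
 * Created with IntelliJ IDEA, 2021-01-16 22:12.
 * Collects every 5 integers and emits them in ascending order.
 */
public class Apply {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Integer> map = source.map(Integer::parseInt);
        AllWindowedStream<Integer, GlobalWindow> countWindowAll = map.countWindowAll(5);
        // apply() receives all elements of the window at once, unlike the incremental reduce()
        SingleOutputStreamOperator<Integer> streamOperator = countWindowAll.apply(new AllWindowFunction<Integer, Integer, GlobalWindow>() {
            @Override
            public void apply(GlobalWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
                List<Integer> list = new ArrayList<>();
                for (Integer value : values) {
                    list.add(value);
                }
                list.sort(Integer::compare);
                for (Integer value : list) {
                    out.collect(value);
                }
            }
        });
        // setParallelism(1) is called on the sink returned by print(), not on the window,
        // so a single print subtask outputs the sorted values in order
        streamOperator.print().setParallelism(1);
        environment.execute("job");
    }
}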

2. Tumbling Windows

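Tumbling windows are time-based. The window assigner places each incoming record into a fixed-length window according to its time, and the windows roll forward by that same fixed length, so consecutive windows never overlap.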

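If the of method of a tumbling window assigner is given one argument, a new window of that length is created periodically. For example, with TumblingProcessingTimeWindows.of(Time.hours(1)), window boundaries are aligned to the epoch (1970-01-01 00:00:00 UTC), so each window starts on the hour of the system's processing time: [1:00:00.000, 1:59:59.999] is one window, [2:00:00.000, 2:59:59.999] is the next, and so on. (The hh:mm:ss notation is only for readability; window boundaries are actually millisecond timestamps.)
The of method can also take a second argument, an offset that shifts the window boundaries, for example to match a time zone. Because windows are aligned to UTC, this matters mainly for day-long windows outside UTC+0: a one-day window without an offset runs from 08:00 to 08:00 China Standard Time (UTC+8), so in China you pass an offset of Time.hours(-8), as in TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)), to get windows from local midnight to midnight.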
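The alignment rule can be checked in a few lines of plain Java. This sketch assumes the arithmetic of Flink's TimeWindow.getWindowStartWithOffset: a window starts at the largest value of the form offset + k * size that is not after the timestamp. TumblingWindowSketch and windowStart are illustrative names, not Flink API.

```java
import java.time.Instant;
import java.time.ZoneOffset;

class TumblingWindowSketch {
    // Start of the tumbling window containing `timestamp` (all values in milliseconds).
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        if (remainder < 0) {
            remainder += size; // Java's % keeps the sign of the dividend; shift into [0, size)
        }
        return timestamp - remainder;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long day = 24 * hour;
        long ts = Instant.parse("2021-01-17T01:30:00Z").toEpochMilli();

        // One-hour windows, no offset: the window starts on the hour
        System.out.println(Instant.ofEpochMilli(windowStart(ts, 0, hour))); // 2021-01-17T01:00:00Z

        // One-day windows, no offset: start at 00:00 UTC, i.e. 08:00 in UTC+8
        System.out.println(Instant.ofEpochMilli(windowStart(ts, 0, day)).atOffset(ZoneOffset.ofHours(8)));
        // 2021-01-17T08:00+08:00

        // One-day windows with offset -8h: start at local midnight in UTC+8
        System.out.println(Instant.ofEpochMilli(windowStart(ts, -8 * hour, day)).atOffset(ZoneOffset.ofHours(8)));
        // 2021-01-17T00:00+08:00
    }
}
```

The same record (09:30 local time) lands in a window starting at 08:00 local time without the offset, and at local midnight with it.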

Non-Keyed Tumbling Windows

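import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Created with IntelliJ IDEA, 2021-01-17 19:48.
 * Sums all integers received in each 10-second processing-time window.
 */
public class Test1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        // windowAll(): one window for the whole (non-keyed) stream
        AllWindowedStream<String, TimeWindow> stream = source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // The values stay strings: parse, add, and convert back
        SingleOutputStreamOperator<String> streamOperator = stream.reduce((x, y) -> String.valueOf(Integer.parseInt(x) + Integer.parseInt(y))).returns(Types.STRING);
        streamOperator.print();
        environment.execute("job");
    }
}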

Keyed Tumbling Windows

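import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Created with IntelliJ IDEA, 2021-01-17 19:48.
 * Counts how often each number arrives in every 5-second processing-time window.
 */
public class Test1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> operator = source.map(x -> Tuple2.of(Integer.parseInt(x), 1)).returns(Types.TUPLE(Types.INT, Types.INT));
        KeyedStream<Tuple2<Integer, Integer>, Integer> keyBy = operator.keyBy(x -> x.f0);
        // window() keeps the keying: each number gets its own 5-second window.
        // windowAll() on a KeyedStream would ignore the key and build one non-keyed window.
        WindowedStream<Tuple2<Integer, Integer>, Integer, TimeWindow> window = keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        // Sum field 1, i.e. the occurrence count per number
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> sum = window.sum(1);
        sum.print();
        environment.execute("job");
    }
}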

3. Sliding Windows

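Sliding windows are also time-based. The assigner puts each record into fixed-length windows according to its time, and an additional slide parameter (the slide step) sets how often a new window starts. When the slide is smaller than the window size, consecutive windows overlap, so a record can belong to several windows.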

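The of method of a sliding window assigner takes two arguments: the window size and the slide. For example, with SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)), window starts are aligned to multiples of the slide, based on each record's event time: [1:00:00.000, 1:00:09.999] is one window and [1:00:05.000, 1:00:14.999] is the next. The two windows overlap, and with these arguments every record falls into 10 s / 5 s = 2 windows. New windows keep being created as time advances.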
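The assignment can be sketched in plain Java. The loop below is modelled on the approach Flink's sliding assigners use, as far as I know (assumed here, not copied from Flink): find the latest window start at or before the timestamp, aligned to the slide, then step back by the slide while the window still contains the record. SlidingWindowSketch and assignWindows are illustrative names, and the offset is fixed at 0 for simplicity.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {
    // Largest multiple of `slide` that is <= timestamp (offset assumed to be 0).
    static long windowStart(long timestamp, long slide) {
        long remainder = timestamp % slide;
        if (remainder < 0) {
            remainder += slide; // keep the remainder in [0, slide) for negative timestamps
        }
        return timestamp - remainder;
    }

    // Every [start, end) sliding window that contains `timestamp`, latest window first.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = windowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 10 s windows sliding every 5 s; one record at t = 7 s
        for (long[] w : assignWindows(7_000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + (w[1] - 1) + "]");
        }
        // prints "[5000, 14999]" and then "[0, 9999]"
    }
}
```

As in the text, the end is printed as end - 1 because a window's end is exclusive.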

Non-Keyed Sliding Windows

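import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Created with IntelliJ IDEA, 2021-01-17 20:13.
 * Every 5 seconds, sums the integers received in the last 10 seconds.
 */
public class Test2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Integer> map = source.map(Integer::parseInt);
        // Window size 10 s, slide 5 s: every element is counted in two windows
        AllWindowedStream<Integer, TimeWindow> stream = map.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
        // For a non-tuple type such as Integer, position 0 means the value itself
        SingleOutputStreamOperator<Integer> sum = stream.sum(0);
        sum.print();
        environment.execute("job");
    }
}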

Keyed Sliding Windows

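import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Created with IntelliJ IDEA, 2021-01-17 20:13.
 * Every 5 seconds, counts how often each number arrived in the last 10 seconds.
 */
public class Test2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> returns = source.map(x -> Tuple2.of(Integer.parseInt(x), 1)).returns(Types.TUPLE(Types.INT, Types.INT));
        KeyedStream<Tuple2<Integer, Integer>, Integer> keyBy = returns.keyBy(x -> x.f0);
        // window() keeps the keying; windowAll() on a KeyedStream would merge all keys into one window
        WindowedStream<Tuple2<Integer, Integer>, Integer, TimeWindow> stream = keyBy.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
        // Sum field 1 (the count); summing field 0 would just add up the key itself
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> sum = stream.sum(1);
        sum.print();
        environment.execute("job");
    }
}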

4. Session Windows

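Session windows are divided by gaps in activity: when no data arrives for longer than the specified gap, the next record starts a new window. Session windows therefore have no fixed start or end time, and windows do not overlap. The gap can be a fixed duration, or you can pass a function that computes a dynamic gap from the data in each record.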
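The merging idea can be sketched in plain Java for a single key: each record opens a window [t, t + gap), and windows that overlap or touch are merged into one session. This is an illustration under those assumptions (SessionWindowSketch and sessions are hypothetical names, not Flink API); Flink merges windows incrementally as records arrive, while the sketch sorts the timestamps first, which yields the same final sessions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SessionWindowSketch {
    // Groups timestamps (ms) into [start, end) sessions for a fixed gap.
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long t : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && t <= last[1]) {
                last[1] = Math.max(last[1], t + gap); // within the gap: extend the current session
            } else {
                result.add(new long[] {t, t + gap}); // gap exceeded: start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 3_000L, 20_000L, 22_000L};
        for (long[] s : sessions(timestamps, 5_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints "[1000, 8000)" and then "[20000, 27000)"
    }
}
```

With a dynamic gap, the only change would be computing the gap per record instead of using one constant.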

// wordAndOne is assumed to be a DataStream<Tuple2<String, Integer>> of (word, 1) pairs.
// Event-time session windows (the stream needs timestamps and watermarks assigned)
wordAndOne
        .keyBy(x -> x.f0) // key selector: group by the word
        .window(EventTimeSessionWindows.withGap(Time.minutes(10))) // fixed gap of 10 minutes
        .sum(1); // when a window fires, sum field f1 of its records

wordAndOne
        .keyBy(x -> x.f0) // key selector: group by the word
        .window(EventTimeSessionWindows.withDynamicGap((element) -> {
            return element.f1 * 1000; // dynamic gap in milliseconds: field f1 times 1000, returned as a long
        }))
        .sum(1); // when a window fires, sum field f1 of its records

// Processing-time session windows
wordAndOne
        .keyBy(x -> x.f0) // key selector: group by the word
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) // fixed gap of 10 minutes
        .sum(1); // when a window fires, sum field f1 of its records

wordAndOne
        .keyBy(x -> x.f0) // key selector: group by the word
        .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
            return element.f1 * 1000; // dynamic gap in milliseconds: field f1 times 1000, returned as a long
        }))
        .sum(1); // when a window fires, sum field f1 of its records