Flink Learning, Part 10: Window & ProcessingTime Example

This example uses Processing Time and handles the stream with a window. Let's look at the demo:

package myflink.job;

import com.alibaba.fastjson.JSON;
import myflink.model.UrlInfo;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Date;
import java.util.Properties;

public class WindowTest {

    public static void main(String[] args) throws Exception {

        // Read data from Kafka
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "metric-group");
        properties.put("auto.offset.reset", "latest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        SingleOutputStreamOperator<UrlInfo> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>(
                        "testjin",// topic
                        new SimpleStringSchema(),
                        properties
                )
        ).setParallelism(1)
                // map transforms one data stream into another; here String --> UrlInfo
                .map(string -> {
                    UrlInfo urlInfo = JSON.parseObject(string, UrlInfo.class);
                    urlInfo.setDomain(urlInfo.generateDomain());
                    return urlInfo;
                });

        // keyBy on domain
        KeyedStream<UrlInfo, String> keyedStream = dataStreamSource.keyBy(new KeySelector<UrlInfo, String>() {
            @Override
            public String getKey(UrlInfo urlInfo) throws Exception {
                return urlInfo.getDomain();
            }
        });

        // Set the time characteristic to Processing Time
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Use a 30-second tumbling window (timeWindow with a single size argument)
        SingleOutputStreamOperator<UrlInfo> windowReduceStream = keyedStream.timeWindow(Time.seconds(30))
        .reduce((ReduceFunction<UrlInfo>) (t1, t2) -> {
            UrlInfo urlInfo = new UrlInfo();

            // All records in one keyed window share the same key, so the domain is the same; take it from t1
            urlInfo.setDomain(t1.getDomain());
            urlInfo.setUrl(urlInfo.getDomain() + "/reduce/" + DateFormatUtils.format(new Date(),"yyyy-MM-dd'T'HH:mm:ss"));
            urlInfo.setHash(DigestUtils.md5Hex(urlInfo.getUrl()));

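            // Accumulate the count: +1 for each newly merged record (t2).
            // The first record of a window is kept as-is, so the total equals the
            // number of records only if every record arrives with count = 1.
            urlInfo.setCount(t1.getCount() + 1);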

            return urlInfo;
        }).returns(UrlInfo.class);

        windowReduceStream.addSink(new PrintSinkFunction<>());

        env.execute("execute window reduce info");
    }
}

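As you can see, the keyed stream is processed with a 30-second tumbling window (`timeWindow(Time.seconds(30))`), and the reduce counts the total number of records that arrive for each domain within each window.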
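To make the window assignment concrete: with processing time, a record belongs to the tumbling window that contains the machine's clock time when the window operator sees it, and with zero offset the windows are aligned to the epoch. Flink's `TimeWindow.getWindowStartWithOffset` computes the start essentially as `timestamp - (timestamp - offset + windowSize) % windowSize` (this form assumes non-negative timestamps). The plain-Java sketch below has no Flink dependency; the class name, method names, and arrival times are made up for illustration. It applies that formula and counts records per (window start, domain), the grouping that `keyBy(domain)` plus a 30-second tumbling window produces.

```java
import java.util.Map;
import java.util.TreeMap;

public class PerKeyWindowCount {

    // Window size used in the article: 30 seconds, in milliseconds.
    static final long WINDOW_SIZE_MS = 30_000L;

    // Start of the tumbling window containing `timestamp` (non-negative timestamps),
    // following the rounding used by Flink's TimeWindow.getWindowStartWithOffset.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Count records per (window start, domain): each distinct pair corresponds to
    // one result emitted by keyBy(domain) + a 30-second tumbling window.
    static Map<String, Long> countPerWindowAndKey(String[] domains, long[] arrivalMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < domains.length; i++) {
            long start = windowStart(arrivalMs[i], 0L, WINDOW_SIZE_MS);
            counts.merge(start + "|" + domains[i], 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical arrival times in ms since the epoch.
        String[] domains = {"so.com", "baidu.com", "so.com", "baidu.com", "so.com"};
        long[] arrivals = {1_000L, 2_000L, 29_999L, 30_000L, 45_000L};
        countPerWindowAndKey(domains, arrivals)
            .forEach((bucket, n) -> System.out.println(bucket + " -> " + n));
    }
}
```

Running it prints four buckets: in the first window `so.com` has 2 records and `baidu.com` has 1, and in the second window each domain has 1. Two domains therefore yield two output lines per window.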

Because window + reduce is used, only one result per domain comes out every 30 seconds.
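A window `ReduceFunction` does not wait for the window to end and then reduce all records at once. Flink keeps one accumulated value per key and window, stores the first record as-is, merges each later record into it as it arrives, and emits the accumulated value once when the window fires. The plain-Java sketch below imitates that fold for one key in one window. It uses a hypothetical `Event` class in place of `UrlInfo`, and it counts by giving each record `count = 1` and summing in the reduce. This is a common variant, not the article's code; unlike `t1.getCount() + 1`, it does not depend on the first record's initial count.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

public class WindowReduceSketch {

    // Hypothetical stand-in for UrlInfo: only the fields the count needs.
    static final class Event {
        final String domain;
        final long count;

        Event(String domain, long count) {
            this.domain = domain;
            this.count = count;
        }
    }

    // Same shape as a Flink ReduceFunction<T>: (accumulated, next) -> merged.
    // Every record enters with count = 1, so summing yields the record count.
    static final BinaryOperator<Event> SUM_COUNTS =
        (acc, next) -> new Event(acc.domain, acc.count + next.count);

    // Fold the records of one key in one window in arrival order: the first
    // record is kept as-is, and each later record is merged into the accumulator.
    static Optional<Event> reduceWindow(List<Event> records) {
        Event acc = null;
        for (Event e : records) {
            acc = (acc == null) ? e : SUM_COUNTS.apply(acc, e);
        }
        return Optional.ofNullable(acc);
    }

    public static void main(String[] args) {
        List<Event> window = Arrays.asList(
            new Event("so.com", 1), new Event("so.com", 1), new Event("so.com", 1));
        // prints: so.com count=3
        reduceWindow(window).ifPresent(r -> System.out.println(r.domain + " count=" + r.count));
    }
}
```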

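Run it and look at the results. The `2>` prefix is added by `PrintSinkFunction` and identifies the parallel sink subtask that printed the line; the timestamp in `url` comes from `new Date()` in the reduce, so it reflects when the reduce ran rather than the window boundary: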

2> UrlInfo(id=0, url=so.com/reduce/2019-01-24T17:35:56, hash=e7b48416a083727b703df80008dfe4e8, domain=so.com, count=16)
2> UrlInfo(id=0, url=baidu.com/reduce/2019-01-24T17:35:59, hash=e478c32f727bd95507a409d6c6b08146, domain=baidu.com, count=6)

2> UrlInfo(id=0, url=baidu.com/reduce/2019-01-24T17:36:26, hash=b22e6462ab7f2a263eb7934fa0fe110f, domain=baidu.com, count=3)
2> UrlInfo(id=0, url=so.com/reduce/2019-01-24T17:36:29, hash=7da591487d9c624ae7209b7c2028eec0, domain=so.com, count=5)

2> UrlInfo(id=0, url=so.com/reduce/2019-01-24T17:36:59, hash=f2a7487a54a4fb193d5acbac00a0d539, domain=so.com, count=5)
2> UrlInfo(id=0, url=baidu.com/reduce/2019-01-24T17:36:53, hash=ce326552180fe4e1465a90ac7baeb380, domain=baidu.com, count=3)