Computing PV and UV with Flink

Preface

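I have been using Flink for a long time, yet I suddenly realized I had never computed PV and UV with it, even though this is one of the most common Flink use cases and a frequent interview question. So I made up a scenario and worked through the computation myself.
Based on Flink 1.12.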

Scenario

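Whenever a delivery courier listens for orders, an event is written to a dedicated topic. The goal is to compute, for each day, how many couriers listened for orders and the total number of order-listening events.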

Message format in Kafka

{"locTime":"2020-12-28 12:32:23","courierId":12,"other":"aaa"}

locTime: the time the event occurred; courierId: the courier ID
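The job needs two things from locTime: an epoch-millisecond event timestamp for Flink and a yyyyMMdd day key for bucketing the counts. A minimal sketch of both conversions, assuming locTime is wall-clock time in UTC+8 (which the Time.hours(-8) window offset in the job implies). The class and method names are hypothetical, and JSON parsing is left out.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the original job: converts a locTime string
public class LocTimeParser {
    // Assumption: locTime is local time in UTC+8 (China Standard Time, no DST)
    private static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");
    private static final DateTimeFormatter IN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Event timestamp in epoch milliseconds, as a timestamp assigner would return it
    public static long toEpochMilli(String locTime) {
        return LocalDateTime.parse(locTime, IN).atZone(ZONE).toInstant().toEpochMilli();
    }

    // Day key such as "20201228", used as the key of the PV/UV counters
    public static String toDayKey(String locTime) {
        return LocalDateTime.parse(locTime, IN).format(DAY);
    }

    public static void main(String[] args) {
        String locTime = "2020-12-28 12:32:23";
        System.out.println(toEpochMilli(locTime)); // 1609129943000
        System.out.println(toDayKey(locTime));     // 20201228
    }
}
```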

For each day, count how many distinct couriers listened for orders (UV) and how many order-listening events there were in total (PV).
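Stripped of streaming concerns, the two metrics are a per-day count and a per-day distinct count. The batch sketch below (plain Java, no Flink; the Event class is a hypothetical stand-in for a parsed message) pins down the expected numbers that the streaming job has to reproduce incrementally.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical batch reference for the definitions above
public class DailyPvUv {
    // A parsed listening event: day key (yyyyMMdd) and courier id
    public static final class Event {
        final String day;
        final String courierId;

        public Event(String day, String courierId) {
            this.day = day;
            this.courierId = courierId;
        }
    }

    // PV: every event counts
    public static Map<String, Long> pv(List<Event> events) {
        Map<String, Long> result = new HashMap<>();
        for (Event e : events) {
            result.merge(e.day, 1L, Long::sum);
        }
        return result;
    }

    // UV: number of distinct couriers per day
    public static Map<String, Long> uv(List<Event> events) {
        Map<String, Set<String>> couriers = new HashMap<>();
        for (Event e : events) {
            couriers.computeIfAbsent(e.day, d -> new HashSet<>()).add(e.courierId);
        }
        Map<String, Long> result = new HashMap<>();
        couriers.forEach((day, ids) -> result.put(day, (long) ids.size()));
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("20201228", "12"),
                new Event("20201228", "12"),
                new Event("20201228", "13"));
        System.out.println(pv(events)); // {20201228=3}
        System.out.println(uv(events)); // {20201228=2}
    }
}
```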

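import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import redis.clients.jedis.JedisCluster;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// FlinkHelp, Json2Others, DateUtils and RedisUtil are the author's own helper classes (not shown);
// env, topics, properties, parameter and redisHp are defined in the enclosing main method.
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topics, new SimpleStringSchema(), properties);
        FlinkHelp.setOffset(parameter, consumer);
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String element, long recordTimestamp) {
                                try {
                                    Map<String, Object> map = Json2Others.json2map(element);
                                    String locTime = map.get("locTime").toString();
                                    LocalDateTime startDateTime =
                                            LocalDateTime.parse(locTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                                    // locTime is wall-clock time in UTC+8; use a fixed zone instead of the machine's
                                    // default offset so the result matches the Time.hours(-8) window offset below
                                    return startDateTime.atZone(ZoneId.of("Asia/Shanghai")).toInstant().toEpochMilli();
                                } catch (Exception e) {
                                    // Malformed message (bad JSON, missing or unparsable locTime):
                                    // fall back to the Kafka record timestamp instead of failing the job
                                    return recordTimestamp;
                                }
                            }
                        }).withIdleness(Duration.ofSeconds(1)));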

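        // windowAll is non-parallel (parallelism 1): one tumbling window per calendar day,
        // the -8 h offset aligns the windows to midnight in UTC+8
        env.addSource(consumer).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                // Placeholder: lets every message through; malformed messages could be dropped here
                return true;
            }
        }).windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))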
                .allowedLateness(Time.minutes(1))
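//              .trigger(CountTrigger.of(5))// Calling trigger() several times does not combine triggers: each call replaces the previous one
                // With event time the window may fire late; a more robust approach is to trigger on processing time and add an explicit check on the window
                // The watermark never moves backwards, so if messages arrive out of order (a relatively later message arrives early), the window may fire late
                // In the extreme case the window stops firing until a message with a timestamp >= watermark + triggerTime arrives
                // ContinuousProcessingTimeTrigger has the same problem
                // Fire every 30 s of event time so intermediate results are emitted before the daily window closes
                .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
                // Replaying historical data is a problem: if the backlog is consumed in under 10 s, the window is never triggered and is skipped; likewise when consuming
//              .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                // After each firing, remove the processed elements from window state (window size 0, doEvictAfter = true)
                // The same effect (clearing window state) can also be achieved with a custom trigger that purges (FIRE_AND_PURGE)
                .evictor(TimeEvictor.of(Time.seconds(0), true))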
                .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
                    private JedisCluster jedisCluster;
                    private MapState<String, String> courierInfoMapState;
                    private MapStateDescriptor<String, String> mapStateDescriptor;
                    private MapStateDescriptor<String, Long> mapStateUVDescriptor;
                    private MapState<String, Long> courierInfoUVMapState;
                    private MapStateDescriptor<String, Long> mapStatePVDescriptor;
                    private MapState<String, Long> courierInfoPVMapState;
                    private String beforeDay = "";
                    private String currentDay = "";

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        StateTtlConfig ttlConfig = StateTtlConfig
                                .newBuilder(org.apache.flink.api.common.time.Time.hours(25))
                                // Default; as of 1.12.0 TTL supports only processing time, not event time
                                .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
                                // RocksDB state backend only: expired entries are dropped during compaction
                                .cleanupInRocksdbCompactFilter(1000)
                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                                .build();

                        mapStateDescriptor =
                                new MapStateDescriptor<String, String>("courierInfos", TypeInformation.of(String.class), TypeInformation.of(String.class));
                        mapStateDescriptor.enableTimeToLive(ttlConfig);
                        courierInfoMapState = getRuntimeContext().getMapState(mapStateDescriptor);

                        mapStateUVDescriptor =
                                new MapStateDescriptor<String, Long>("courierUVStateDesc", TypeInformation.of(String.class), TypeInformation.of(Long.class));
                        mapStateUVDescriptor.enableTimeToLive(ttlConfig);
                        courierInfoUVMapState = getRuntimeContext().getMapState(mapStateUVDescriptor);

                        mapStatePVDescriptor =
                                new MapStateDescriptor<String, Long>("courierPVStateDesc", TypeInformation.of(String.class), TypeInformation.of(Long.class));
                        mapStatePVDescriptor.enableTimeToLive(ttlConfig);
                        courierInfoPVMapState = getRuntimeContext().getMapState(mapStatePVDescriptor);


                        jedisCluster = RedisUtil.getJedisCluster(redisHp);
                    }

                    @Override
                    public void close() throws Exception {
                        RedisUtil.closeConn(jedisCluster);
                    }

                    @Override
                    public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                        Iterator<String> iterator = elements.iterator();
                        TimeWindow window = context.window();
                        System.out.println(" window = "
                                + DateUtils.millisecondsToDateStr(window.getStart(), "yyyy-MM-dd HH:mm:ss")
                                + "-" + DateUtils.millisecondsToDateStr(window.getEnd(), "yyyy-MM-dd HH:mm:ss"));
                        while (iterator.hasNext()) {
                            Map<String, Object> map = Json2Others.json2map(iterator.next());
                            String courierId = map.get("courierId").toString();
                            String day = map.get("locTime").toString().split(" ")[0].replace("-", "");
                            if (courierInfoPVMapState.contains(day)) {
                                courierInfoPVMapState.put(day, courierInfoPVMapState.get(day) + 1);
                            } else {
                                courierInfoPVMapState.put(day, 1L);
                            }
                            if (!courierInfoMapState.contains(day + "-" + courierId)) {
                                if (courierInfoUVMapState.contains(day)) {
                                    courierInfoUVMapState.put(day, courierInfoUVMapState.get(day) + 1);
                                } else {
                                    courierInfoUVMapState.put(day, 1L);
                                }
                                courierInfoMapState.put(day + "-" + courierId, "");
                            }
                            currentDay = day;
                        }

                        HashMap<String, String> map = new HashMap<String, String>();
                        map.put(currentDay + "-pv", courierInfoPVMapState.get(currentDay).toString());
                        map.put(currentDay + "-uv", courierInfoUVMapState.get(currentDay).toString());
                        if (!beforeDay.isEmpty() && !currentDay.equals(beforeDay)) {
                            // The day has changed since the last firing: write the previous day's counts once more.
                            // After more than 25 hours (the state TTL) the previous day's state has expired and its
                            // numbers are no longer valid; get() then returns null, so skip it instead of throwing an NPE
                            Long beforePv = courierInfoPVMapState.get(beforeDay);
                            Long beforeUv = courierInfoUVMapState.get(beforeDay);
                            if (beforePv != null && beforeUv != null) {
                                map.put(beforeDay + "-pv", beforePv.toString());
                                map.put(beforeDay + "-uv", beforeUv.toString());
                            }
                        }
                        map.forEach((k, v) -> {
                            System.out.println(k + ":" + v);
                        });
                        jedisCluster.hmset("test_courier_puv:", map);
                        jedisCluster.expire("test_courier_puv:", 3 * 24 * 60 * 60);

                        beforeDay = currentDay;

                    }
                });
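To see why Time.hours(-8) makes each window cover one local calendar day, it helps to run the window-start arithmetic by hand. The sketch below reimplements the usual tumbling-window alignment formula in plain Java; to my knowledge it matches TimeWindow.getWindowStartWithOffset in Flink 1.12, but treat it as an illustration rather than Flink's own code. The sample timestamp is 2020-12-28 12:32:23 in UTC+8.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;

// Illustration of tumbling-window alignment with an offset (not Flink's class)
public class DayWindowAlignment {
    static final long SIZE = TimeUnit.DAYS.toMillis(1);
    static final long OFFSET = -TimeUnit.HOURS.toMillis(8);

    // Start of the tumbling window of the given size and offset that contains timestamp
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    public static void main(String[] args) {
        long ts = 1609129943000L; // 2020-12-28 12:32:23 in UTC+8
        ZoneId zone = ZoneId.of("Asia/Shanghai");

        long start = windowStart(ts, OFFSET, SIZE);
        System.out.println(Instant.ofEpochMilli(start).atZone(zone));        // 2020-12-28T00:00+08:00[Asia/Shanghai]
        System.out.println(Instant.ofEpochMilli(start + SIZE).atZone(zone)); // 2020-12-29T00:00+08:00[Asia/Shanghai]

        // Without the offset, windows are aligned to UTC midnight, i.e. 08:00 local time
        long noOffset = windowStart(ts, 0, SIZE);
        System.out.println(Instant.ofEpochMilli(noOffset).atZone(zone));     // 2020-12-28T08:00+08:00[Asia/Shanghai]
    }
}
```

Without the offset, events between 00:00 and 08:00 local time would be counted in the previous day's window.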

Sample output

20201227-pv:1111111
20201227-uv:111
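These values are running counters kept in MapState rather than a recount of the whole day: the evictor is meant to clear window state after each firing, so every call to process() sees only the elements that arrived since the previous firing. The sketch below replays that counting logic with plain HashMap/HashSet standing in for the three MapStates (an assumption: no Flink, no TTL; the class is hypothetical) to show that two partial firings yield the same -pv/-uv fields as the batch definition.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the window function's state across firings
public class IncrementalPvUv {
    public static final class Event {
        final String day;
        final String courierId;

        public Event(String day, String courierId) {
            this.day = day;
            this.courierId = courierId;
        }
    }

    private final Map<String, Long> pvState = new HashMap<>();   // like courierInfoPVMapState
    private final Map<String, Long> uvState = new HashMap<>();   // like courierInfoUVMapState
    private final Set<String> seenCouriers = new HashSet<>();    // like courierInfoMapState, keys "day-courierId"

    // One trigger firing: only the events that arrived since the previous firing
    public void fire(List<Event> newEvents) {
        for (Event e : newEvents) {
            pvState.merge(e.day, 1L, Long::sum);                     // every event counts towards PV
            if (seenCouriers.add(e.day + "-" + e.courierId)) {      // first time this courier is seen that day
                uvState.merge(e.day, 1L, Long::sum);
            }
        }
    }

    // The fields written to the Redis hash, e.g. "20201228-pv"
    public Map<String, String> snapshot(String day) {
        Map<String, String> out = new HashMap<>();
        out.put(day + "-pv", String.valueOf(pvState.getOrDefault(day, 0L)));
        out.put(day + "-uv", String.valueOf(uvState.getOrDefault(day, 0L)));
        return out;
    }

    public static void main(String[] args) {
        IncrementalPvUv fn = new IncrementalPvUv();
        fn.fire(Arrays.asList(new Event("20201228", "12"), new Event("20201228", "13")));
        fn.fire(Arrays.asList(new Event("20201228", "12")));
        System.out.println(fn.snapshot("20201228").get("20201228-pv")); // 3
        System.out.println(fn.snapshot("20201228").get("20201228-uv")); // 2
    }
}
```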
