
Computing PV and UV with Flink

shengjk1
Published 2020-12-31 11:51:21
Collected in the column: 码字搬砖

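Preface

I have been using Flink for a long time, and it suddenly struck me that I had never actually computed PV and UV with it, even though this is one of the most common Flink use cases and a frequent interview question. So I came up with a scenario and worked through it. Everything here is based on Flink 1.12.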

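Scenario

Whenever a courier starts listening for orders, a message is sent to a dedicated topic. The goal is to compute, for each day, how many couriers listened for orders and how many listen events there were in total.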

Message format in Kafka

{"locTime":"2020-12-28 12:32:23","courierId":12,"other":"aaa"}

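locTime: the time the event occurred; courierId: the courier's ID.

Compute how many distinct couriers listened for orders in a day (UV) and how many listen events there were in total (PV).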
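To make the two metrics concrete before bringing in Flink, here is a minimal plain-Java sketch of the counting rule: every event adds to the day's PV, and only the first event of a given courier on a given day adds to the UV. The class `PvUvSketch`, the `Event` type, and the sample events are made up for illustration and are not part of the job; the day key (`yyyyMMdd`) and the `day-courierId` dedup key mirror what the Flink code uses.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PvUvSketch {

    /** One listen event: event time as "yyyy-MM-dd HH:mm:ss" plus the courier ID. */
    public static final class Event {
        final String locTime;
        final long courierId;

        public Event(String locTime, long courierId) {
            this.locTime = locTime;
            this.courierId = courierId;
        }
    }

    /** Returns day -> {pv, uv}, with the day formatted as yyyyMMdd. */
    public static Map<String, long[]> count(List<Event> events) {
        Map<String, long[]> result = new HashMap<>();
        // "day-courierId" pairs that have already been counted towards UV
        Set<String> seen = new HashSet<>();
        for (Event e : events) {
            String day = e.locTime.split(" ")[0].replace("-", "");
            long[] pvUv = result.computeIfAbsent(day, d -> new long[2]);
            pvUv[0]++; // PV: every event counts
            if (seen.add(day + "-" + e.courierId)) {
                pvUv[1]++; // UV: only a courier's first event of the day counts
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample data
        List<Event> events = Arrays.asList(
                new Event("2020-12-28 12:32:23", 12),
                new Event("2020-12-28 12:40:00", 12),
                new Event("2020-12-28 13:00:00", 13),
                new Event("2020-12-29 00:00:01", 12));
        count(events).forEach((day, c) ->
                System.out.println(day + " pv=" + c[0] + " uv=" + c[1]));
    }
}
```

For these four sample events, 2020-12-28 ends up with PV 3 and UV 2, and 2020-12-29 with PV 1 and UV 1. The streaming job has to keep the same bookkeeping in state, because the events arrive over time rather than as one list.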

Code

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topics, new SimpleStringSchema(), properties);
		FlinkHelp.setOffset(parameter, consumer);
		consumer.assignTimestampsAndWatermarks(
				WatermarkStrategy.<String>forMonotonousTimestamps()
						.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
							@Override
							public long extractTimestamp(String element, long recordTimestamp) {
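								// If the payload cannot be parsed, fall back to the Kafka record timestamp
								// rather than letting LocalDateTime.parse fail on an empty string.
								try {
									Map<String, Object> map = Json2Others.json2map(element);
									String locTime = map.get("locTime").toString();
									LocalDateTime startDateTime =
											LocalDateTime.parse(locTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
									// locTime is interpreted using the JVM's current zone offset, which has to
									// agree with the -8h window offset below (the job is assumed to run in UTC+8).
									return startDateTime.toInstant(OffsetDateTime.now().getOffset()).toEpochMilli();
								} catch (IOException | RuntimeException e) {
									return recordTimestamp;
								}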
							}
						}).withIdleness(Duration.ofSeconds(1)));

		env.addSource(consumer).filter(new FilterFunction<String>() {
			@Override
			public boolean filter(String value) throws Exception {
				return true;
			}
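		}).windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) // one-day windows; the -8h offset aligns them to midnight in UTC+8
				.allowedLateness(Time.minutes(1))
//				.trigger(CountTrigger.of(5))// Setting several triggers just means the later one overrides the earlier one
				// Using event time can delay window firing; the best fix is to build on processing time and add a check against the window
				// The watermark never moves backwards, so if a message arrives early (out of order: a message that should have come later shows up first), the window may fire late
				// In the extreme case the window does not fire at all until a message with timestamp >= watermark + triggerTime arrives
				// The same applies to ContinuousProcessingTimeTrigger
				.trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
				// Problematic when catching up on historical data: the backlog may be fully consumed in under 10s, so the window is never triggered and gets skipped; the same goes for regular consumption
//				.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
				// Clear the data held in window state once it has been processed
				// A custom trigger could achieve the same clearing of window state (purge)
				.evictor(TimeEvictor.of(Time.seconds(0), true))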
				.process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
					private JedisCluster jedisCluster;
					private MapState<String, String> courierInfoMapState;
					private MapStateDescriptor<String, String> mapStateDescriptor;
					private MapStateDescriptor<String, Long> mapStateUVDescriptor;
					private MapState<String, Long> courierInfoUVMapState;
					private MapStateDescriptor<String, Long> mapStatePVDescriptor;
					private MapState<String, Long> courierInfoPVMapState;
					private String beforeDay = "";
					private String currentDay = "";

					@Override
					public void open(Configuration parameters) throws Exception {
						StateTtlConfig ttlConfig = StateTtlConfig
								.newBuilder(org.apache.flink.api.common.time.Time.hours(25))
								// default; event time is not supported as of 1.12.0
								.setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
								.cleanupInRocksdbCompactFilter(1000)
								.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//default
								.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
								.build();

						mapStateDescriptor =
								new MapStateDescriptor<String, String>("courierInfos", TypeInformation.of(String.class), TypeInformation.of(String.class));
						mapStateDescriptor.enableTimeToLive(ttlConfig);
						courierInfoMapState = getRuntimeContext().getMapState(mapStateDescriptor);

						mapStateUVDescriptor =
								new MapStateDescriptor<String, Long>("courierUVStateDesc", TypeInformation.of(String.class), TypeInformation.of(Long.class));
						mapStateUVDescriptor.enableTimeToLive(ttlConfig);
						courierInfoUVMapState = getRuntimeContext().getMapState(mapStateUVDescriptor);

						mapStatePVDescriptor =
								new MapStateDescriptor<String, Long>("courierPVStateDesc", TypeInformation.of(String.class), TypeInformation.of(Long.class));
						mapStatePVDescriptor.enableTimeToLive(ttlConfig);
						courierInfoPVMapState = getRuntimeContext().getMapState(mapStatePVDescriptor);


						jedisCluster = RedisUtil.getJedisCluster(redisHp);
					}

					@Override
					public void close() throws Exception {
						RedisUtil.closeConn(jedisCluster);
					}

					@Override
					public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
						Iterator<String> iterator = elements.iterator();
						TimeWindow window = context.window();
						System.out.println(" window = "
								+ DateUtils.millisecondsToDateStr(window.getStart(), "yyyy-MM-dd HH:mm:ss")
								+ "-" + DateUtils.millisecondsToDateStr(window.getEnd(), "yyyy-MM-dd HH:mm:ss"));
						while (iterator.hasNext()) {
							Map<String, Object> map = Json2Others.json2map(iterator.next());
							String courierId = map.get("courierId").toString();
							String day = map.get("locTime").toString().split(" ")[0].replace("-", "");
							if (courierInfoPVMapState.contains(day)) {
								courierInfoPVMapState.put(day, courierInfoPVMapState.get(day) + 1);
							} else {
								courierInfoPVMapState.put(day, 1L);
							}
							if (!courierInfoMapState.contains(day + "-" + courierId)) {
								if (courierInfoUVMapState.contains(day)) {
									courierInfoUVMapState.put(day, courierInfoUVMapState.get(day) + 1);
								} else {
									courierInfoUVMapState.put(day, 1L);
								}
								courierInfoMapState.put(day + "-" + courierId, "");
							}
							currentDay = day;
						}

						// Always publish the counters of the current day.
						HashMap<String, String> map = new HashMap<String, String>();
						map.put(currentDay + "-pv", courierInfoPVMapState.get(currentDay).toString());
						map.put(currentDay + "-uv", courierInfoUVMapState.get(currentDay).toString());
						// When the day has rolled over, also publish the previous day's counters.
						// The state TTL is 25 hours: past that point yesterday's entries have expired
						// and get() returns null, so skip them instead of failing.
						if (!currentDay.equals(beforeDay) && !beforeDay.isEmpty()) {
							Long beforePv = courierInfoPVMapState.get(beforeDay);
							Long beforeUv = courierInfoUVMapState.get(beforeDay);
							if (beforePv != null && beforeUv != null) {
								map.put(beforeDay + "-pv", beforePv.toString());
								map.put(beforeDay + "-uv", beforeUv.toString());
							}
						}
						map.forEach((k, v) -> {
							System.out.println(k + ":" + v);
						});
						jedisCluster.hmset("test_courier_puv:", map);
						jedisCluster.expire("test_courier_puv:", 3 * 24 * 60 * 60);

						beforeDay = currentDay;

					}
				});
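The `Time.hours(-8)` offset passed to `TumblingEventTimeWindows.of` is easy to get wrong, so here is a small plain-Java sketch of the alignment arithmetic. It is not Flink's own code: the class `DailyWindowSketch`, its helper names, and the use of `Math.floorMod` are assumptions made for illustration, and `locTime` is assumed to be UTC+8 wall-clock time.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DailyWindowSketch {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ZoneOffset UTC_PLUS_8 = ZoneOffset.ofHours(8); // assumed business time zone
    private static final long DAY_MS = 24L * 60 * 60 * 1000;
    private static final long OFFSET_MS = -8L * 60 * 60 * 1000; // mirrors Time.hours(-8)

    /** Parses a locTime string as UTC+8 wall-clock time and returns epoch milliseconds. */
    public static long toEpochMilli(String locTime) {
        return LocalDateTime.parse(locTime, FMT).toInstant(UTC_PLUS_8).toEpochMilli();
    }

    /** Start of the tumbling window containing the timestamp, for a given offset and window size. */
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    /** Formats epoch milliseconds as UTC+8 wall-clock time. */
    public static String format(long epochMilli) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMilli), UTC_PLUS_8).format(FMT);
    }

    public static void main(String[] args) {
        long ts = toEpochMilli("2020-12-28 12:32:23");
        long start = windowStart(ts, OFFSET_MS, DAY_MS);
        // prints "2020-12-28 00:00:00 - 2020-12-29 00:00:00"
        System.out.println(format(start) + " - " + format(start + DAY_MS));
    }
}
```

With an offset of 0 the same event would fall into a window starting at 2020-12-28 08:00:00 (UTC+8), because one-day windows are aligned to the epoch in UTC by default. The -8h offset is what moves the boundary to local midnight.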

Sample results

[Figure: two screenshots of the sample results]
This article is shared from the author's personal site/blog. Originally published: 2020-12-28.
