
Computing TopN with Flink

shengjk1
Published 2020-12-31 14:28:58
Included in the column: 码字搬砖

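Preface

I have been using Flink for a long time and recently realized that I had never computed a TopN, even though it is one of the most common Flink use cases. So I came up with a scenario to try it out. The code is based on Flink 1.12.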

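Scenario

Whenever a food-delivery courier listens for orders, a message is sent to a dedicated Kafka topic. From it we want to know, for each day, how many couriers listened for orders and how many listen events there were in total.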

Message format in Kafka

{"locTime":"2020-12-28 12:32:23","courierId":12,"other":"aaa"}

locTime: the time at which the event occurred; courierId: the courier's ID.
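A minimal sketch of how one message maps to an event timestamp and to the per-day key used by the first keyBy. It assumes locTime is Beijing time (UTC+8), which is what the -8 h window offset in the job implies. The class and method names are hypothetical, and a simple regex stands in for the author's Json2Others helper, which the article does not show.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: derives the event time and the "yyyyMMdd-courierId" key from one message.
// The regexes assume the flat message format shown above; a real job would use a JSON parser.
public class ListenMessage {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final Pattern LOC_TIME = Pattern.compile("\"locTime\"\\s*:\\s*\"([^\"]+)\"");
    private static final Pattern COURIER_ID = Pattern.compile("\"courierId\"\\s*:\\s*(\\d+)");

    static String field(Pattern p, String json) {
        Matcher m = p.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("missing field in: " + json);
        }
        return m.group(1);
    }

    // Event time in epoch millis; locTime is read as UTC+8 regardless of the JVM's default zone.
    public static long eventTimeMillis(String json) {
        LocalDateTime t = LocalDateTime.parse(field(LOC_TIME, json), FMT);
        return t.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
    }

    // Key for the first keyBy: the day without dashes, "-", then the courier ID.
    public static String dayCourierKey(String json) {
        String day = field(LOC_TIME, json).split(" ")[0].replace("-", "");
        return day + "-" + field(COURIER_ID, json);
    }

    public static void main(String[] args) {
        String msg = "{\"locTime\":\"2020-12-28 12:32:23\",\"courierId\":12,\"other\":\"aaa\"}";
        System.out.println(eventTimeMillis(msg)); // 1609129943000
        System.out.println(dayCourierKey(msg));   // 20201228-12
    }
}
```

Pinning the offset to UTC+8, rather than using the server's current offset, keeps timestamps and day boundaries identical no matter which time zone the TaskManagers run in.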

Goal: find the top 5 couriers by number of listens within each day.
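The core of a TopN is a bounded min-heap: keep at most N entries and, whenever the heap grows past N, drop the smallest. The job's onTimer applies this pattern to its MapState entries. Below is a standalone sketch of the same idea using only java.util; the class name and the sample counts are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical illustration of top-N selection with a bounded min-heap.
public class TopN {
    // Returns up to n entries with the largest values, highest first.
    public static List<Map.Entry<String, Long>> topN(Map<String, Long> counts, int n) {
        // Min-heap ordered by count: the head is the smallest entry currently kept.
        PriorityQueue<Map.Entry<String, Long>> heap =
                new PriorityQueue<>((a, b) -> Long.compare(a.getValue(), b.getValue()));
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            heap.offer(e);
            if (heap.size() > n) {
                heap.poll(); // evict the smallest so at most n entries remain
            }
        }
        // Polling a min-heap yields ascending order; addFirst reverses it to descending.
        Deque<Map.Entry<String, Long>> ranked = new ArrayDeque<>();
        while (!heap.isEmpty()) {
            ranked.addFirst(heap.poll());
        }
        return new ArrayList<>(ranked);
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        long[] sample = {3, 9, 1, 7, 5, 8, 2}; // made-up listen counts for couriers "1".."7"
        for (int i = 0; i < sample.length; i++) {
            counts.put(String.valueOf(i + 1), sample[i]);
        }
        System.out.println(topN(counts, 5)); // [2=9, 6=8, 4=7, 5=5, 1=3]
    }
}
```

Each insertion costs O(log N) and memory stays at N entries, so the cost of a ranking pass grows only with the number of couriers, not with the number of listen events.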

Code

// Assumes Flink 1.12 with flink-connector-kafka and Jedis. env, topics, properties, parameter and redisHp
// are set up earlier in main(); FlinkHelp, Json2Others, RedisUtil and CourierListenInfos are the
// author's own helper classes and are not shown in the article.
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import redis.clients.jedis.JedisCluster;

import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.*;

		FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topics, new SimpleStringSchema(), properties);
		FlinkHelp.setOffset(parameter, consumer);
		consumer.assignTimestampsAndWatermarks(
				WatermarkStrategy.<String>forMonotonousTimestamps()
						.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
							@Override
							public long extractTimestamp(String element, long recordTimestamp) {
								try {
									Map<String, Object> map = Json2Others.json2map(element);
									String locTime = map.get("locTime").toString();
									// locTime is Beijing time: use a fixed UTC+8 offset (matching the -8h window
									// offset below) instead of the server's zone
									return LocalDateTime.parse(locTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
											.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
								} catch (IOException | DateTimeParseException e) {
									// unparseable message: fall back to the Kafka record timestamp
									return recordTimestamp;
								}
							}
						}).withIdleness(Duration.ofSeconds(1)));

		SingleOutputStreamOperator<CourierListenInfos> process = env.addSource(consumer).filter(new FilterFunction<String>() {
			@Override
			public boolean filter(String value) {
				// drop messages that cannot be parsed or lack the fields used below
				try {
					Map<String, Object> map = Json2Others.json2map(value);
					return map.get("locTime") != null && map.get("courierId") != null;
				} catch (IOException e) {
					return false;
				}
			}
		}).keyBy(new KeySelector<String, String>() {
			@Override
			public String getKey(String value) throws Exception {
				// key = yyyyMMdd-courierId: one key per courier per day
				Map<String, Object> map = Json2Others.json2map(value);
				String courierId = map.get("courierId").toString();
				String day = map.get("locTime").toString().split(" ")[0].replace("-", "");
				return day + "-" + courierId;
			}
		})
				// one-day tumbling windows aligned to midnight UTC+8
				.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
				.allowedLateness(Time.minutes(1))
				// calling trigger() several times does not chain triggers: the last call replaces the earlier ones
//				.trigger(CountTrigger.of(5))
				// fire every 30 s of event time so the ranking updates during the day
				.trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
				// a processing-time trigger causes problems when replaying historical data
//				.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
				// after each firing, evict the window's elements so the next firing sees only new records
				.evictor(TimeEvictor.of(Time.seconds(0), true))
				.process(new ProcessWindowFunction<String, CourierListenInfos, String, TimeWindow>() {
					// running listen count for this day-courier key, kept across firings
					private ReducingState<Long> listenCount;

					@Override
					public void open(Configuration parameters) throws Exception {
						StateTtlConfig ttlConfig = StateTtlConfig
								.newBuilder(org.apache.flink.api.common.time.Time.hours(25))
								// the default; event-time TTL is not supported as of 1.12.0
								.setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
								.cleanupInRocksdbCompactFilter(1000)
								.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
								.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
								.build();

						// Long::sum adds each firing's increment to the running total
						ReducingStateDescriptor<Long> descriptor =
								new ReducingStateDescriptor<Long>("listenCount", Long::sum, TypeInformation.of(Long.class));
						descriptor.enableTimeToLive(ttlConfig);
						listenCount = getRuntimeContext().getReducingState(descriptor);
					}

					@Override
					public void process(String key, Context context, Iterable<String> elements, Collector<CourierListenInfos> out) throws Exception {
						// the evictor empties the window after every firing, so elements holds only
						// the records that arrived since the previous firing
						long newListens = 0;
						for (String ignored : elements) {
							newListens++;
						}
						listenCount.add(newListens);

						// key has the form yyyyMMdd-courierId
						String[] parts = key.split("-", 2);
						out.collect(new CourierListenInfos(parts[0], parts[1], listenCount.get()));
					}
				});

		process.keyBy(new KeySelector<CourierListenInfos, String>() {
			@Override
			public String getKey(CourierListenInfos value) throws Exception {
				return value.getDay();
			}
		}).process(new KeyedProcessFunction<String, CourierListenInfos, String>() {
			private JedisCluster jedisCluster;
			// "day#courierId" -> latest cumulative listen count
			private MapState<String, Long> courierInfoCountMapState;

			@Override
			public void open(Configuration parameters) throws Exception {
				StateTtlConfig ttlConfig = StateTtlConfig
						.newBuilder(org.apache.flink.api.common.time.Time.hours(25))
						// the default; event-time TTL is not supported as of 1.12.0
						.setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
						.cleanupInRocksdbCompactFilter(1000)
						.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
						.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
						.build();

				MapStateDescriptor<String, Long> descriptor =
						new MapStateDescriptor<String, Long>("courierInfoCountMapState", TypeInformation.of(String.class), TypeInformation.of(Long.class));
				descriptor.enableTimeToLive(ttlConfig);
				courierInfoCountMapState = getRuntimeContext().getMapState(descriptor);

				jedisCluster = RedisUtil.getJedisCluster(redisHp);
			}

			@Override
			public void close() throws Exception {
				RedisUtil.closeConn(jedisCluster);
			}

			@Override
			public void processElement(CourierListenInfos value, Context ctx, Collector<String> out) throws Exception {
				// upstream emits cumulative counts, so overwriting keeps the latest value
				courierInfoCountMapState.put(value.getDay() + "#" + value.getCourierId(), value.getListenCount());
				// re-rank once the watermark passes the next whole second; timers with the same
				// timestamp are deduplicated per key, so a burst of updates causes one re-ranking
				long watermark = ctx.timerService().currentWatermark();
				ctx.timerService().registerEventTimeTimer((watermark / 1000 + 1) * 1000);
			}

			@Override
			public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
				String day = ctx.getCurrentKey();
				// min-heap on listen count: the head is the smallest of the current top 5
				PriorityQueue<CourierListenInfos> top = new PriorityQueue<>(
						(o1, o2) -> Long.compare(o1.getListenCount(), o2.getListenCount()));

				for (Map.Entry<String, Long> entry : courierInfoCountMapState.entries()) {
					String[] split = entry.getKey().split("#", -1);
					top.offer(new CourierListenInfos(split[0], split[1], entry.getValue()));
					if (top.size() > 5) {
						top.poll(); // drop the smallest, keeping at most 5
					}
				}

				// keep only the top 5 in state: counts only grow, so a courier outside the top 5
				// can re-enter only through a new, larger update, which processElement writes back
				courierInfoCountMapState.clear();
				// polling the min-heap yields ascending order; addFirst reverses it to descending
				Deque<CourierListenInfos> ranked = new ArrayDeque<>();
				while (!top.isEmpty()) {
					CourierListenInfos info = top.poll();
					courierInfoCountMapState.put(info.getDay() + "#" + info.getCourierId(), info.getListenCount());
					ranked.addFirst(info);
				}

				// value format: courierId#count pairs separated by commas, highest count first
				StringJoiner tops = new StringJoiner(",");
				for (CourierListenInfos info : ranked) {
					tops.add(info.getCourierId() + "#" + info.getListenCount());
				}
				jedisCluster.hset("test_courier_tops", day + "-top5", tops.toString());
			}
		}).setParallelism(1);
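The two pieces of time arithmetic in the job can be checked without a cluster. The first method below restates the formula that Flink 1.12's `TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)` uses to place a record in a tumbling window. With a one-day size and a -8 h offset, windows start at midnight UTC+8. The second method rounds a watermark up to the next whole second, as the timer registration in processElement does. The class name is hypothetical, and the sample timestamp is the one from the example message.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper restating the time arithmetic used by the job.
public class WindowMath {
    static final long DAY_MS = 24L * 60 * 60 * 1000;    // Time.days(1)
    static final long OFFSET_MS = -8L * 60 * 60 * 1000; // Time.hours(-8)

    // Same formula as TimeWindow.getWindowStartWithOffset in Flink 1.12 (for non-negative timestamps).
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Timer timestamp: the first whole second strictly after the watermark.
    public static long nextWholeSecond(long watermark) {
        return (watermark / 1000 + 1) * 1000;
    }

    public static void main(String[] args) {
        ZoneOffset beijing = ZoneOffset.ofHours(8);
        long ts = LocalDateTime.of(2020, 12, 28, 12, 32, 23).toInstant(beijing).toEpochMilli();
        long start = windowStart(ts, OFFSET_MS, DAY_MS);
        System.out.println(Instant.ofEpochMilli(start).atOffset(beijing));          // 2020-12-28T00:00+08:00
        System.out.println(Instant.ofEpochMilli(start + DAY_MS).atOffset(beijing)); // 2020-12-29T00:00+08:00
        System.out.println(nextWholeSecond(ts + 500));                              // 1609129944000
    }
}
```

Because the window's day boundaries come from the event timestamp while the key's day comes from the locTime string, both must use the same UTC+8 interpretation; otherwise records near midnight would land in a window whose day differs from their key.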

Sample results

[Figure: screenshots of the sample output; images not preserved]
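This article is part of the Tencent Cloud self-media sharing program and is shared from the author's personal site/blog.
Originally published: 2020-12-28. In case of infringement, contact cloudcommunity@tencent.com for removal.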