前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 自定义 countAndTimeTrigger

Flink 自定义 countAndTimeTrigger

作者头像
shengjk1
发布2020-05-21 10:38:34
9670
发布2020-05-21 10:38:34
举报
文章被收录于专栏:码字搬砖码字搬砖

1.背景

项目中需要自定义 trigger,需要基于两个条件:1. count 即 msg 的个数,当个数大于某个数时触发窗口 2. time 即每个固定的时间触发窗口

2.代码样例

代码语言:javascript
复制
/**
 * @author shengjk1
 * @date 2019/9/4
 */
public class CountAndTimeTrigger extends Trigger<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;
	
	private final long maxCount;
	
	private final ReducingStateDescriptor<Long> stateDesc =
			new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
	
	public CountAndTimeTrigger(long maxCount) {
		super();
		this.maxCount = maxCount;
	}
	
	
	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.registerProcessingTimeTimer(window.maxTimestamp());
		
		ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
		count.add(1L);
		if (count.get() >= maxCount) {
			count.clear();
			return TriggerResult.FIRE_AND_PURGE;
		}
		return TriggerResult.CONTINUE;
	}
	
	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.FIRE;
	}
	
	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}
	
	@Override
	public boolean canMerge() {
		return false;
	}
	
	@Override
	public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
		ctx.mergePartitionedState(stateDesc);
		long windowMaxTimestamp = window.maxTimestamp();
		if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
			ctx.registerProcessingTimeTimer(windowMaxTimestamp);
		}
	}
	
	@Override
	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.getPartitionedState(stateDesc).clear();
		ctx.deleteProcessingTimeTimer(window.maxTimestamp());
	}
	
	
	private static class Sum implements ReduceFunction<Long> {
		private static final long serialVersionUID = 1L;
		
		@Override
		public Long reduce(Long value1, Long value2) throws Exception {
			return value1 + value2;
		}
	}
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-05-20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.背景
  • 2.代码样例
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档