专栏首页码字搬砖Flink 自定义 countAndTimeTrigger

Flink 自定义 countAndTimeTrigger

1.背景

项目中需要自定义 trigger,需要基于两个条件:1. count 即 msg 的个数,当个数大于某个数时触发窗口 2. time 即每个固定的时间触发窗口

2.代码样例

/**
 * @author shengjk1
 * @date 2019/9/4
 */
public class CountAndTimeTrigger extends Trigger<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;
	
	private final long maxCount;
	
	private final ReducingStateDescriptor<Long> stateDesc =
			new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
	
	public CountAndTimeTrigger(long maxCount) {
		super();
		this.maxCount = maxCount;
	}
	
	
	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.registerProcessingTimeTimer(window.maxTimestamp());
		
		ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
		count.add(1L);
		if (count.get() >= maxCount) {
			count.clear();
			return TriggerResult.FIRE_AND_PURGE;
		}
		return TriggerResult.CONTINUE;
	}
	
	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.FIRE;
	}
	
	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}
	
	@Override
	public boolean canMerge() {
		return false;
	}
	
	@Override
	public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
		ctx.mergePartitionedState(stateDesc);
		long windowMaxTimestamp = window.maxTimestamp();
		if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
			ctx.registerProcessingTimeTimer(windowMaxTimestamp);
		}
	}
	
	@Override
	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.getPartitionedState(stateDesc).clear();
		ctx.deleteProcessingTimeTimer(window.maxTimestamp());
	}
	
	
	private static class Sum implements ReduceFunction<Long> {
		private static final long serialVersionUID = 1L;
		
		@Override
		public Long reduce(Long value1, Long value2) throws Exception {
			return value1 + value2;
		}
	}
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • java执行 string code

    工作当中需要执行 string 类型的 java code ,之前有同事用过 mvel ,调研之后发现太多于重量级了,我就想安安静静的执行一段 java 代码 ...

    shengjk1
  • 写给大忙人看的 Flink Window原理

    Window 可以说是 Flink 中必不可少的 operator 之一,在很多场合都有很非凡的表现。今天呢,我们就一起来看一下 window 是如何实现的。

    shengjk1
  • JVM内存模型之直接内存

    直接内存 又称堆外内存,也就是说这不是jvm运行时数据区的一部分,也不是java虚拟机规范中定义的内存区域,但这部分也会被频繁的使用,而且也可能导致OOM。

    shengjk1
  • 聊聊artemis的CriticalAnalyzerPolicy

    activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemi...

    codecraft
  • 聊聊artemis的CriticalAnalyzerPolicy

    activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemi...

    codecraft
  • Spring注解驱动

    @ComponentScan:用于对Component进行扫描,里面有包含与排除规则

    晚上没宵夜
  • 聊聊flink的Triggers

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/wind...

    codecraft
  • 聊聊flink的Triggers

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/wind...

    codecraft
  • okhttp拦截器的使用------修改baseurl

    有的时候做项目的时候会遇到调用接口的url和之前接口定义的url不一样,很可能连retrofit已经设置好的baseurl都要换,但是retrofit并没有提供...

    提莫队长
  • LocalDateTime反序列化,LocalDateTime格式化

    yunlgonn

扫码关注云+社区

领取腾讯云代金券