前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 窗口剔除器 Evictor

Flink 窗口剔除器 Evictor

作者头像
smartsi
发布2021-09-08 11:13:29
2.3K0
发布2021-09-08 11:13:29
举报
文章被收录于专栏:SmartSi

1. 简介

除了 WindowAssignerTrigger 之外,Flink 的窗口模型还允许指定一个可选的 Evictor。Evictor 提供了在使用 WindowFunction 之前或者之后从窗口中删除元素的能力。为此,Evictor 接口提供了两个方法:

代码语言:javascript
复制
public interface Evictor<T, W extends Window> extends Serializable {
  // 可选的删除元素,在窗口函数之前调用
  void evictBefore(Iterable<TimestampedValue<T>> elements,
      int size, W window, EvictorContext evictorContext);

  // 可选的删除元素,在窗口函数之后调用
  void evictAfter(Iterable<TimestampedValue<T>> elements,
      int size, W window, EvictorContext evictorContext);

  interface EvictorContext {
      // 当前处理时间
      long getCurrentProcessingTime();
      MetricGroup getMetricGroup();
      // 当前Watermark
      long getCurrentWatermark();
  }
}

evictBefore() 用于在使用窗口函数之前从窗口中删除元素,而 evictAfter() 用于在使用窗口函数之后从窗口中删除元素。

2. 内置 Evictor

Flink 本身内置实现了三种 Evictor,分别是 CountEvictor、DeltaEvictor 和 TimeEvictor。

默认情况下,所有内置的 Evictors 都是在触发窗口函数之前使用。

2.1 CountEvictor

CountEvictor 用于在窗口中保留用户指定数量的元素。如果窗口中的元素超过用户指定的阈值,会从窗口头部开始删除剩余元素。

2.1.1 内部实现

CountEvictor 需要实现 Evictor 接口的 evictBefore 和 evictAfter 方法,以实现调用窗口函数之前和之后的窗口元素删除逻辑:

代码语言:javascript
复制
private final boolean doEvictAfter;
@Override
public void evictBefore(
        Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
    if (!doEvictAfter) {
        evict(elements, size, ctx);
    }
}

@Override
public void evictAfter(
        Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
    if (doEvictAfter) {
        evict(elements, size, ctx);
    }
}

doEvictAfter 是在构造 CountEvictor 时传入的一个变量,用以指定是否在使用窗口函数之后对元素进行删除操作。如果不指定,默认为 false,即在使用窗口函数之后不对元素进行删除。从上面代码中可以看出,不论是 evictBefore,还是 evictAfter,最后都会调用 evict() 方法:

代码语言:javascript
复制
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (size <= maxCount) {
        return;
    } else {
        int evictedCount = 0;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                iterator.hasNext(); ) {
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break;
            } else {
                iterator.remove();
            }
        }
    }
}

从上面可以看出,如果当前窗口元素个数小于等于用户指定的阈值则不做删除操作,否则会从窗口迭代器的头部开始删除多余的元素(size - maxCount)。

2.1.2 如何使用

如下代码所示,在触发使用窗口函数之前保留2个元素:

代码语言:javascript
复制
DataStream<Tuple2<String, Long>> result = stream
    // 格式转换
    .map(tuple -> Tuple2.of(tuple.f0, tuple.f1)).returns(Types.TUPLE(Types.STRING, Types.LONG))
    // 根据key分组
    .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
        @Override
        public String getKey(Tuple2<String, Long> value) throws Exception {
            return value.f0;
        }
    })
    // 处理时间滚动窗口 滚动大小60s
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // 在触发使用窗口函数之前保留2个元素
    .evictor(CountEvictor.of(2))
    // 窗口函数
    .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Long>> out) throws Exception {
            // Watermark
            long watermark = context.currentWatermark();
            String watermarkTime = DateUtil.timeStamp2Date(watermark);
            // 窗口开始与结束时间
            TimeWindow window = context.window();
            String start = DateUtil.timeStamp2Date(window.getStart());
            String end = DateUtil.timeStamp2Date(window.getEnd());
            // 窗口中元素
            List<Long> values = Lists.newArrayList();
            for (Tuple2<String, Long> element : elements) {
                values.add(element.f1);
            }
            LOG.info("[Process] Key: {}, Watermark: [{}|{}], Window: [{}|{}, {}|{}], Values: {}",
                    key, watermarkTime, watermark, start, window.getStart(), end, window.getEnd(), values
            );
        }
    });

完整代码请查阅CountEvictorExample

假如输入流如下所示,我们一起看看输出效果:

代码语言:javascript
复制
A,1,2021-08-30 12:07:20
A,2,2021-08-30 12:07:22
A,3,2021-08-30 12:07:33
A,4,2021-08-30 12:07:44
A,5,2021-08-30 12:07:55
A,6,2021-08-30 12:08:34
A,7,2021-08-30 12:08:45
A,8,2021-08-30 12:08:56
A,9,2021-08-30 12:09:30
A,10,2021-08-30 12:09:35

2.2 DeltaEvictor

根据用户自定的 DeltaFunction 函数来计算窗口中最后一个元素与其余每个元素之间的差值,如果差值大于等于用户指定的阈值就会删除该元素。

2.2.1 内部实现

DeltaEvictor 与 CountEvictor 一样,都需要实现 Evictor 接口的 evictBefore 和 evictAfter 方法,只是最终调用的 evict() 函数的内部实现逻辑不一样:

代码语言:javascript
复制
private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
    // 窗口最后一个元素
    TimestampedValue<T> lastElement = Iterables.getLast(elements);
    // 遍历整个窗口,与每一个元素进行比较
    for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<T> element = iterator.next();
        if (deltaFunction.getDelta(element.getValue(), lastElement.getValue())
                >= this.threshold) {
            iterator.remove();
        }
    }
}

deltaFunction 函数以及 threshold 变量是在构造函数中传入的。

首先获取窗口中的最后一个元素并遍历整个窗口,然后调用用户指定的 deltaFunction 计算与每一个元素的差值。如果差值大于等于用户自定的阈值就删除该元素。

2.2.2 如何使用

如下代码所示,在触发窗口函数计算之前剔除与最后一个元素值差大于等于1的元素:

代码语言:javascript
复制
DataStream<Tuple2<String, Long>> result = stream
    // 格式转换
    .map(tuple -> Tuple2.of(tuple.f0, tuple.f1)).returns(Types.TUPLE(Types.STRING, Types.LONG))
    // 根据key分组
    .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
        @Override
        public String getKey(Tuple2<String, Long> value) throws Exception {
            return value.f0;
        }
    })
    // 处理时间滚动窗口 滚动大小60s
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // 剔除与最后一个元素值差大于1的元素
    .evictor(DeltaEvictor.of(1, new DeltaFunction<Tuple2<String, Long>>() {
        @Override
        public double getDelta(Tuple2<String, Long> oldDataPoint, Tuple2<String, Long> newDataPoint) {
            return oldDataPoint.f1 - newDataPoint.f1;
        }
    }))
    // 窗口函数
    .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Long>> out) throws Exception {
            // Watermark
            long watermark = context.currentWatermark();
            String watermarkTime = DateUtil.timeStamp2Date(watermark);
            // 窗口开始与结束时间
            TimeWindow window = context.window();
            String start = DateUtil.timeStamp2Date(window.getStart());
            String end = DateUtil.timeStamp2Date(window.getEnd());
            // 窗口中元素
            List<Long> values = Lists.newArrayList();
            for (Tuple2<String, Long> element : elements) {
                values.add(element.f1);
            }
            LOG.info("[Process] Key: {}, Watermark: [{}|{}], Window: [{}|{}, {}|{}], Values: {}",
                    key, watermarkTime, watermark, start, window.getStart(), end, window.getEnd(), values
            );
        }
    });

完整代码请查阅 DeltaEvictorExample

假如输入流如下所示,我们一起看看输出效果:

代码语言:javascript
复制
A,4,2021-08-30 12:07:20
A,1,2021-08-30 12:07:22
A,3,2021-08-30 12:07:33
A,6,2021-08-30 12:07:44
A,3,2021-08-30 12:07:55
A,6,2021-08-30 12:08:34
A,5,2021-08-30 12:08:45
A,1,2021-08-30 12:08:56
A,6,2021-08-30 12:09:30

2.3 TimeEvictor

以毫秒为单位的时间间隔 windowSize 作为参数,在窗口所有元素中找到最大时间戳 max_ts 并删除所有时间戳小于 max_ts - windowSize 的元素。我们可以理解为只保留最新 windowSize 毫秒内的元素。

2.3.1 内部实现

TimeEvictor 与 DeltaEvictor、CountEvictor 一样,都需要实现 Evictor 接口的 evictBefore 和 evictAfter 方法,只是最终调用的 evict() 函数的内部实现逻辑不一样:

代码语言:javascript
复制
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (!hasTimestamp(elements)) {
        return;
    }
    // 最大时间戳
    long currentTime = getMaxTimestamp(elements);
    // windowSize 保留元素的时间间隔
    long evictCutoff = currentTime - windowSize;
    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
            iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();
        if (record.getTimestamp() <= evictCutoff) {
            iterator.remove();
        }
    }
}
// 第一个元素是否有时间戳
private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {
    Iterator<TimestampedValue<Object>> it = elements.iterator();
    if (it.hasNext()) {
        return it.next().hasTimestamp();
    }
    return false;
}
// 窗口中最大时间戳
private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
    long currentTime = Long.MIN_VALUE;
    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
            iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();
        currentTime = Math.max(currentTime, record.getTimestamp());
    }
    return currentTime;
}

首先获取当前窗口中最大的时间戳,减去用户指定时间间隔 windowSize,得到一个 evictCutoff,然后遍历窗口全部元素,删除时间戳小于等于 evictCutoff 的元素。

2.3.2 如何使用

如下代码所示,在触发窗口函数计算之前只保留最近10s内的元素:

代码语言:javascript
复制
DataStream<Tuple2<String, Long>> result = stream
    // 格式转换
    .map(tuple -> Tuple2.of(tuple.f0, tuple.f1)).returns(Types.TUPLE(Types.STRING, Types.LONG))
    // 根据key分组
    .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
        @Override
        public String getKey(Tuple2<String, Long> value) throws Exception {
            return value.f0;
        }
    })
    // 处理时间滚动窗口 滚动大小60s
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // 保留窗口中最近10s内的元素
    .evictor(TimeEvictor.of(Time.seconds(10)))
    // 窗口函数
    .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Long>> out) throws Exception {
            // Watermark
            long watermark = context.currentWatermark();
            String watermarkTime = DateUtil.timeStamp2Date(watermark);
            // 窗口开始与结束时间
            TimeWindow window = context.window();
            String start = DateUtil.timeStamp2Date(window.getStart());
            String end = DateUtil.timeStamp2Date(window.getEnd());
            // 窗口中元素
            List<Long> values = Lists.newArrayList();
            for (Tuple2<String, Long> element : elements) {
                values.add(element.f1);
            }
            LOG.info("[Process] Key: {}, Watermark: [{}|{}], Window: [{}|{}, {}|{}], Values: {}",
                    key, watermarkTime, watermark, start, window.getStart(), end, window.getEnd(), values
            );
        }
    });

完整代码请查阅TimeEvictorExample

假如输入流如下所示,我们一起看看输出效果:

代码语言:javascript
复制
A,1,2021-08-30 12:07:20
A,2,2021-08-30 12:07:22
A,3,2021-08-30 12:07:44
A,4,2021-08-30 12:07:55
A,5,2021-08-30 12:07:54
A,6,2021-08-30 12:08:34
A,7,2021-08-30 12:08:45
A,8,2021-08-30 12:08:56
A,9,2021-08-30 12:09:30

欢迎关注我的公众号和博客:

相关推荐:

参考:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-09-052,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 简介
  • 2. 内置 Evictor
    • 2.1 CountEvictor
      • 2.1.1 内部实现
      • 2.1.2 如何使用
    • 2.2 DeltaEvictor
      • 2.2.1 内部实现
      • 2.2.2 如何使用
    • 2.3 TimeEvictor
      • 2.3.1 内部实现
      • 2.3.2 如何使用
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档