专栏首页SmartSiFlink 窗口剔除器 Evictor

Flink 窗口剔除器 Evictor

1. 简介

除了 WindowAssignerTrigger 之外,Flink 的窗口模型还允许指定一个可选的 Evictor。Evictor 提供了在使用 WindowFunction 之前或者之后从窗口中删除元素的能力。为此,Evictor 接口提供了两个方法:

public interface Evictor<T, W extends Window> extends Serializable {
  // 可选的删除元素,在窗口函数之前调用
  void evictBefore(Iterable<TimestampedValue<T>> elements,
      int size, W window, EvictorContext evictorContext);

  // 可选的删除元素,在窗口函数之后调用
  void evictAfter(Iterable<TimestampedValue<T>> elements,
      int size, W window, EvictorContext evictorContext);

  interface EvictorContext {
      // 当前处理时间
      long getCurrentProcessingTime();
      MetricGroup getMetricGroup();
      // 当前Watermark
      long getCurrentWatermark();
  }
}

evictBefore() 用于在使用窗口函数之前从窗口中删除元素,而 evictAfter() 用于在使用窗口函数之后从窗口中删除元素。

2. 内置 Evictor

Flink 本身内置实现了三种 Evictor,分别是 CountEvictor、DeltaEvictor 和 TimeEvictor。

默认情况下,所有内置的 Evictors 都是在触发窗口函数之前使用。

2.1 CountEvictor

CountEvictor 用于在窗口中保留用户指定数量的元素。如果窗口中的元素超过用户指定的阈值,会从窗口头部开始删除剩余元素。

2.1.1 内部实现

CountEvictor 需要实现 Evictor 接口的 evictBefore 和 evictAfter 方法,以实现调用窗口函数之前和之后的窗口元素删除逻辑:

private final boolean doEvictAfter;
@Override
public void evictBefore(
        Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
    if (!doEvictAfter) {
        evict(elements, size, ctx);
    }
}

@Override
public void evictAfter(
        Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
    if (doEvictAfter) {
        evict(elements, size, ctx);
    }
}

doEvictAfter 是在构造 CountEvictor 时传入的一个变量,用以指定是否在使用窗口函数之后对元素进行删除操作。如果不指定,默认为 false,即在使用窗口函数之后不对元素进行删除。从上面代码中可以看出,不论是 evictBefore,还是 evictAfter,最后都会调用 evict() 方法:

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (size <= maxCount) {
        return;
    } else {
        int evictedCount = 0;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                iterator.hasNext(); ) {
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break;
            } else {
                iterator.remove();
            }
        }
    }
}

从上面可以看出,如果当前窗口元素个数小于等于用户指定的阈值则不做删除操作,否则会从窗口迭代器的头部开始删除多余的元素(size - maxCount)。

2.1.2 如何使用

如下代码所示,在触发使用窗口函数之前保留2个元素:

DataStream<Tuple2<String, Long>> result = stream
    // 格式转换
    .map(tuple -> Tuple2.of(tuple.f0, tuple.f1)).returns(Types.TUPLE(Types.STRING, Types.LONG))
    // 根据key分组
    .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
        @Override
        public String getKey(Tuple2<String, Long> value) throws Exception {
            return value.f0;
        }
    })
    // 处理时间滚动窗口 滚动大小60s
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // 在触发使用窗口函数之前保留2个元素
    .evictor(CountEvictor.of(2))
    // 窗口函数
    .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Long>> out) throws Exception {
            // Watermark
            long watermark = context.currentWatermark();
            String watermarkTime = DateUtil.timeStamp2Date(watermark);
            // 窗口开始与结束时间
            TimeWindow window = context.window();
            String start = DateUtil.timeStamp2Date(window.getStart());
            String end = DateUtil.timeStamp2Date(window.getEnd());
            // 窗口中元素
            List<Long> values = Lists.newArrayList();
            for (Tuple2<String, Long> element : elements) {
                values.add(element.f1);
            }
            LOG.info("[Process] Key: {}, Watermark: [{}|{}], Window: [{}|{}, {}|{}], Values: {}",
                    key, watermarkTime, watermark, start, window.getStart(), end, window.getEnd(), values
            );
        }
    });

完整代码请查阅CountEvictorExample

假如输入流如下所示,我们一起看看输出效果:

A,1,2021-08-30 12:07:20
A,2,2021-08-30 12:07:22
A,3,2021-08-30 12:07:33
A,4,2021-08-30 12:07:44
A,5,2021-08-30 12:07:55
A,6,2021-08-30 12:08:34
A,7,2021-08-30 12:08:45
A,8,2021-08-30 12:08:56
A,9,2021-08-30 12:09:30
A,10,2021-08-30 12:09:35

2.2 DeltaEvictor

根据用户自定的 DeltaFunction 函数来计算窗口中最后一个元素与其余每个元素之间的差值,如果差值大于等于用户指定的阈值就会删除该元素。

2.2.1 内部实现

DeltaEvictor 与 CountEvictor 一样,都需要实现 Evictor 接口的 evictBefore 和 evictAfter 方法,只是最终调用的 evict() 函数的内部实现逻辑不一样:

private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
    // 窗口最后一个元素
    TimestampedValue<T> lastElement = Iterables.getLast(elements);
    // 遍历整个窗口,与每一个元素进行比较
    for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<T> element = iterator.next();
        if (deltaFunction.getDelta(element.getValue(), lastElement.getValue())
                >= this.threshold) {
            iterator.remove();
        }
    }
}

deltaFunction 函数以及 threshold 变量是在构造函数中传入的。

首先获取窗口中的最后一个元素并遍历整个窗口,然后调用用户指定的 deltaFunction 计算与每一个元素的差值。如果差值大于等于用户自定的阈值就删除该元素。

2.2.2 如何使用

如下代码所示,在触发窗口函数计算之前剔除与最后一个元素值差大于等于1的元素:

DataStream<Tuple2<String, Long>> result = stream
    // 格式转换
    .map(tuple -> Tuple2.of(tuple.f0, tuple.f1)).returns(Types.TUPLE(Types.STRING, Types.LONG))
    // 根据key分组
    .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
        @Override
        public String getKey(Tuple2<String, Long> value) throws Exception {
            return value.f0;
        }
    })
    // 处理时间滚动窗口 滚动大小60s
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // 剔除与最后一个元素值差大于1的元素
    .evictor(DeltaEvictor.of(1, new DeltaFunction<Tuple2<String, Long>>() {
        @Override
        public double getDelta(Tuple2<String, Long> oldDataPoint, Tuple2<String, Long> newDataPoint) {
            return oldDataPoint.f1 - newDataPoint.f1;
        }
    }))
    // 窗口函数
    .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Long>> out) throws Exception {
            // Watermark
            long watermark = context.currentWatermark();
            String watermarkTime = DateUtil.timeStamp2Date(watermark);
            // 窗口开始与结束时间
            TimeWindow window = context.window();
            String start = DateUtil.timeStamp2Date(window.getStart());
            String end = DateUtil.timeStamp2Date(window.getEnd());
            // 窗口中元素
            List<Long> values = Lists.newArrayList();
            for (Tuple2<String, Long> element : elements) {
                values.add(element.f1);
            }
            LOG.info("[Process] Key: {}, Watermark: [{}|{}], Window: [{}|{}, {}|{}], Values: {}",
                    key, watermarkTime, watermark, start, window.getStart(), end, window.getEnd(), values
            );
        }
    });

完整代码请查阅 DeltaEvictorExample

假如输入流如下所示,我们一起看看输出效果:

A,4,2021-08-30 12:07:20
A,1,2021-08-30 12:07:22
A,3,2021-08-30 12:07:33
A,6,2021-08-30 12:07:44
A,3,2021-08-30 12:07:55
A,6,2021-08-30 12:08:34
A,5,2021-08-30 12:08:45
A,1,2021-08-30 12:08:56
A,6,2021-08-30 12:09:30

2.3 TimeEvictor

以毫秒为单位的时间间隔 windowSize 作为参数,在窗口所有元素中找到最大时间戳 max_ts 并删除所有时间戳小于 max_ts - windowSize 的元素。我们可以理解为只保留最新 windowSize 毫秒内的元素。

2.3.1 内部实现

TimeEvictor 与 DeltaEvictor、CountEvictor 一样,都需要实现 Evictor 接口的 evictBefore 和 evictAfter 方法,只是最终调用的 evict() 函数的内部实现逻辑不一样:

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (!hasTimestamp(elements)) {
        return;
    }
    // 最大时间戳
    long currentTime = getMaxTimestamp(elements);
    // windowSize 保留元素的时间间隔
    long evictCutoff = currentTime - windowSize;
    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
            iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();
        if (record.getTimestamp() <= evictCutoff) {
            iterator.remove();
        }
    }
}
// 第一个元素是否有时间戳
private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {
    Iterator<TimestampedValue<Object>> it = elements.iterator();
    if (it.hasNext()) {
        return it.next().hasTimestamp();
    }
    return false;
}
// 窗口中最大时间戳
private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
    long currentTime = Long.MIN_VALUE;
    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
            iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();
        currentTime = Math.max(currentTime, record.getTimestamp());
    }
    return currentTime;
}

首先获取当前窗口中最大的时间戳,减去用户指定时间间隔 windowSize,得到一个 evictCutoff,然后遍历窗口全部元素,删除时间戳小于等于 evictCutoff 的元素。

2.3.2 如何使用

如下代码所示,在触发窗口函数计算之前只保留最近10s内的元素:

DataStream<Tuple2<String, Long>> result = stream
    // 格式转换
    .map(tuple -> Tuple2.of(tuple.f0, tuple.f1)).returns(Types.TUPLE(Types.STRING, Types.LONG))
    // 根据key分组
    .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
        @Override
        public String getKey(Tuple2<String, Long> value) throws Exception {
            return value.f0;
        }
    })
    // 处理时间滚动窗口 滚动大小60s
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // 保留窗口中最近10s内的元素
    .evictor(TimeEvictor.of(Time.seconds(10)))
    // 窗口函数
    .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Long>> out) throws Exception {
            // Watermark
            long watermark = context.currentWatermark();
            String watermarkTime = DateUtil.timeStamp2Date(watermark);
            // 窗口开始与结束时间
            TimeWindow window = context.window();
            String start = DateUtil.timeStamp2Date(window.getStart());
            String end = DateUtil.timeStamp2Date(window.getEnd());
            // 窗口中元素
            List<Long> values = Lists.newArrayList();
            for (Tuple2<String, Long> element : elements) {
                values.add(element.f1);
            }
            LOG.info("[Process] Key: {}, Watermark: [{}|{}], Window: [{}|{}, {}|{}], Values: {}",
                    key, watermarkTime, watermark, start, window.getStart(), end, window.getEnd(), values
            );
        }
    });

完整代码请查阅TimeEvictorExample

假如输入流如下所示,我们一起看看输出效果:

A,1,2021-08-30 12:07:20
A,2,2021-08-30 12:07:22
A,3,2021-08-30 12:07:44
A,4,2021-08-30 12:07:55
A,5,2021-08-30 12:07:54
A,6,2021-08-30 12:08:34
A,7,2021-08-30 12:08:45
A,8,2021-08-30 12:08:56
A,9,2021-08-30 12:09:30

欢迎关注我的公众号和博客:

相关推荐:

参考:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink学习笔记

    流式计算是大数据计算的痛点,第1代实时计算引擎Storm对Exactly Once 语义和窗口支持较弱,使用的场景有限且无法支持高吞吐计算;Spark Stre...

    数据社
  • 《从0到1学习Flink》—— 介绍Flink中的Stream Windows

    目前有许多数据分析的场景从批处理到流处理的演变, 虽然可以将批处理作为流处理的特殊情况来处理,但是分析无穷集的流数据通常需要思维方式的转变并且具有其自己的术语(...

    zhisheng
  • Flink1.4 窗口触发器与Evictors

    触发器(Trigger)决定了窗口(请参阅窗口概述)博文)什么时候使用窗口函数处理窗口内元素。每个窗口分配器都带有一个默认的触发器。如果默认触发器不能满足你的要...

    smartsi
  • Flink 窗口之Window机制

    数据分析场景见证了批处理到流处理的演变过程。尽管批处理可以作为流处理的一种特殊情况来处理,但分析永无止境的流数据通常需要转变一种思维方式,并使用它自己的专门术语...

    smartsi
  • 聊聊flink的Evictors

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/wind...

    codecraft
  • 一文搞懂 Flink window 元素的顺序问题

    大概的意思就是说,evictor 尽管可以从窗口开始时移除元素但是并不保证,这个元素是第一个或最后一个到达窗口的。 why?

    shengjk1
  • 聊聊flink的Evictors

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/wind...

    codecraft
  • Flink 滑动窗口优化

    Flink 的窗口功能非常强大,因为要支持各种各样的窗口,像滑动窗口和滚动窗口这样的对齐窗口,像会话窗口这样的非对齐窗口,复杂度也会比较高。其中在超长滑动窗口的...

    有赞coder
  • 一网打尽Flink中的时间、窗口和流Join

    首先,我们会学习如何定义时间属性,时间戳和水位线。然后我们将会学习底层操作process function,它可以让我们访问时间戳和水位线,以及注册定时器事件。...

    王知无-import_bigdata
  • Flink 流计算算子函数详解

    Flink 的算子函数和spark的大致一样,但是由于其是流处理的模式,所有还要有需要加强理解的地方

    黑白格
  • Flink 彻底理解 window(窗口)

    Window 是处理无限流的核心。Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层的引擎是一个流式引擎,在上面实现了流处...

    kk大数据
  • Flink基础:时间和水印

    本篇终于到了Flink的核心内容:时间与水印。最初接触这个概念是在Spark Structured Streaming中,一直无法理解水印的作用。直到使用了一段...

    用户1154259
  • 写给大忙人看的 Flink Window原理

    Window 可以说是 Flink 中必不可少的 operator 之一,在很多场合都有很非凡的表现。今天呢,我们就一起来看一下 window 是如何实现的。

    shengjk1
  • Flink 计算 PV UV

    使用 flink 很长一段时间了,突然发现竟然没有计算过 pv uv,这可是 flink 常见的计算场景了,面试时也是常问题之一。故自己想了一个场景来计算一下。...

    shengjk1
  • 基于 Flink 的动态欺诈检测系统(下)

    在本系列的前两篇文章中,我们描述了如何基于动态更新配置(欺诈检测规则)来实现灵活的数据流分区,以及如何利用 Flink 的 Broadcast 机制在运行时在相...

    zhisheng
  • Flink在实时在实时计算平台和实时数仓中的企业级应用小结

    在过去的这几年时间里,以 Storm、Spark、Flink 为代表的实时计算技术接踵而至。2019 年阿里巴巴内部 Flink 正式开源。整个实时计算领域风起...

    王知无-import_bigdata
  • Flink实战(七) - Time & Windows编程

    掌握Flink中三种常用的Time处理方式,掌握Flink中滚动窗口以及滑动窗口的使用,了解Flink中的watermark。

    JavaEdge
  • Flink架构、原理与部署测试

    Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时,提供支持流处理和批处理两种类型应用的功能。

    Florian
  • Flink重点难点:维表关联理论和Join实战

    数据流操作的另一个常见需求是对两条数据流中的事件进行联结(connect)或Join。Flink DataStream API中内置有两个可以根据时间条件对数据...

    王知无-import_bigdata

扫码关注云+社区

领取腾讯云代金券