前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊flink的Sliding Window

聊聊flink的Sliding Window

原创
作者头像
code4it
发布2019-01-03 10:40:40
1.5K0
发布2019-01-03 10:40:40
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下flink的Sliding Window

SlidingEventTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java

代码语言:javascript
复制
@PublicEvolving
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
​
    private final long size;
​
    private final long slide;
​
    private final long offset;
​
    protected SlidingEventTimeWindows(long size, long slide, long offset) {
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
        }
​
        this.size = size;
        this.slide = slide;
        this.offset = offset;
    }
​
    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
            for (long start = lastStart;
                start > timestamp - size;
                start -= slide) {
                windows.add(new TimeWindow(start, start + size));
            }
            return windows;
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }
​
    public long getSize() {
        return size;
    }
​
    public long getSlide() {
        return slide;
    }
​
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
​
    @Override
    public String toString() {
        return "SlidingEventTimeWindows(" + size + ", " + slide + ")";
    }
​
    public static SlidingEventTimeWindows of(Time size, Time slide) {
        return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
    }
​
    public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
        return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
            offset.toMilliseconds() % slide.toMilliseconds());
    }
​
    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }
​
    @Override
    public boolean isEventTime() {
        return true;
    }
}
  • SlidingEventTimeWindows继承了Window,其中元素类型为Object,而窗口类型为TimeWindow;它有三个参数,一个是size,一个是slide,一个是offset,其中offset必须大于等于0,offset必须大于slide,size必须大于0
  • assignWindows方法以slide作为size通过TimeWindow.getWindowStartWithOffset(timestamp, offset, slide)计算lastStart,然后以为start + size > timestamp为循环条件,每次对start减去slide,挨个计算TimeWindow(start, start + size);getDefaultTrigger方法返回的是EventTimeTrigger;getWindowSerializer方法返回的是TimeWindow.Serializer();isEventTime返回的为true
  • SlidingEventTimeWindows提供了of静态工厂方法,可以指定size、slide及offset参数,它对于传入的offset参数转为毫秒然后与slide.toMilliseconds()取余作为最后的offset值

SlidingProcessingTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java

代码语言:javascript
复制
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
​
    private final long size;
​
    private final long offset;
​
    private final long slide;
​
    private SlidingProcessingTimeWindows(long size, long slide, long offset) {
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
        }
​
        this.size = size;
        this.slide = slide;
        this.offset = offset;
    }
​
    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        timestamp = context.getCurrentProcessingTime();
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart;
            start > timestamp - size;
            start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }
​
    public long getSize() {
        return size;
    }
​
    public long getSlide() {
        return slide;
    }
​
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
​
    @Override
    public String toString() {
        return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
    }
​
    public static SlidingProcessingTimeWindows of(Time size, Time slide) {
        return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
    }
​
    public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {
        return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
            offset.toMilliseconds() % slide.toMilliseconds());
    }
​
    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }
​
    @Override
    public boolean isEventTime() {
        return false;
    }
}
  • SlidingProcessingTimeWindows继承了Window,其中元素类型为Object,而窗口类型为TimeWindow;它有三个参数,一个是size,一个是slide,一个是offset,其中offset必须大于等于0,offset必须大于slide,size必须大于0
  • assignWindows方法以slide作为size通过TimeWindow.getWindowStartWithOffset(timestamp, offset, slide)计算lastStart(与SlidingEventTimeWindows不同的是SlidingProcessingTimeWindows的这个方法里头使用context.getCurrentProcessingTime()值重置了timestamp),然后以为start + size > timestamp为循环条件,每次对start减去slide,挨个计算TimeWindow(start, start + size);getDefaultTrigger方法返回的是ProcessingTimeTrigger;getWindowSerializer方法返回的是TimeWindow.Serializer();isEventTime返回的为false
  • SlidingEventTimeWindows提供了of静态工厂方法,可以指定size、slide及offset参数,它对于传入的offset参数转为毫秒然后与slide.toMilliseconds()取余作为最后的offset值

小结

  • flink的Sliding Window分为SlidingEventTimeWindows及SlidingProcessingTimeWindows,它们都继承了WindowAssigner,其中元素类型为Object,而窗口类型为TimeWindow;它有三个参数,一个是size,一个是slide,一个是offset,其中offset必须大于等于0,offset必须大于slide,size必须大于0
  • WindowAssigner定义了assignWindows、getDefaultTrigger、getWindowSerializer、isEventTime这几个抽象方法,同时定义了抽象静态类WindowAssignerContext;它有两个泛型,其中T为元素类型,而W为窗口类型;SlidingEventTimeWindows及SlidingProcessingTimeWindows的窗口类型为TimeWindow,它有start及end属性,其中start为inclusive,而end为exclusive,maxTimestamp返回的是end-1,它还提供了mergeWindows及getWindowStartWithOffset静态方法;前者用于合并重叠的时间窗口,后者用于获取指定timestamp、offset、windowSize的window start
  • SlidingEventTimeWindows及SlidingProcessingTimeWindows的不同在于assignWindows、getDefaultTrigger、isEventTime方法;前者assignWindows使用的是参数中的timestamp,而后者使用的是context.getCurrentProcessingTime();前者的getDefaultTrigger返回的是EventTimeTrigger,而后者返回的是ProcessingTimeTrigger;前者isEventTime方法返回的为true,而后者返回的为false

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SlidingEventTimeWindows
  • SlidingProcessingTimeWindows
  • 小结
  • doc
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档