
A look at Flink's consecutive windowed operations

code4it
Published 2019-01-23 18:01:22
Collected in the column 码匠的流水账

This article takes a look at consecutive windowed operations in Flink.

Example

DataStream<Integer> input = ...;

DataStream<Integer> resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer());

DataStream<Integer> globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction());
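  • The example first partitions the stream by key, then aggregates the values of each key within the given window using the Summer reduce function (a sum, judging by its name). It then applies windowAll to the resulting DataStream with the same time WindowAssigner as the first stage. Within one and the same time window, the data is therefore first aggregated per partition and then aggregated globally, which is the shape of problems such as top-k elements.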
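The two stages can be modelled without Flink to see what each one contributes. The following is a minimal sketch in plain Java, not Flink API: the `Event` type, the class name, and all method names are hypothetical, timestamps are assumed to be non-negative milliseconds, and watermarks and lateness are ignored entirely.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of "sum per key per window, then top-k per window". Not Flink API. */
class TwoStageWindowSketch {

    /** Hypothetical event: a key, a value, and an event-time timestamp in milliseconds. */
    static final class Event {
        final String key;
        final int value;
        final long timestamp;

        Event(String key, int value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Start of the tumbling window that contains a non-negative timestamp. */
    static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    /** Stage 1: per window, sum the values of each key (the role of keyBy + window + reduce). */
    static Map<Long, Map<String, Integer>> sumPerKeyPerWindow(List<Event> events, long sizeMillis) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (Event e : events) {
            result.computeIfAbsent(windowStart(e.timestamp, sizeMillis), w -> new TreeMap<>())
                  .merge(e.key, e.value, Integer::sum);
        }
        return result;
    }

    /** Stage 2: per window, keep the k largest per-key sums (the role of windowAll + process). */
    static Map<Long, List<Integer>> topKPerWindow(Map<Long, Map<String, Integer>> sums, int k) {
        Map<Long, List<Integer>> result = new TreeMap<>();
        for (Map.Entry<Long, Map<String, Integer>> window : sums.entrySet()) {
            List<Integer> values = new ArrayList<>(window.getValue().values());
            values.sort(Comparator.reverseOrder());
            result.put(window.getKey(), new ArrayList<>(values.subList(0, Math.min(k, values.size()))));
        }
        return result;
    }
}
```

Stage 2 only sees the per-key sums, not the keys, which matches the `DataStream<Integer>` type of `resultsPerKey` in the example.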

TimestampsAndPeriodicWatermarksOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java

public class TimestampsAndPeriodicWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

    private static final long serialVersionUID = 1L;

    private transient long watermarkInterval;

    private transient long currentWatermark;

    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
        super(assigner);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();

        currentWatermark = Long.MIN_VALUE;
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

        if (watermarkInterval > 0) {
            long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

        output.collect(element.replace(element.getValue(), newTimestamp));
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        // register next timer
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }

        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

    /**
     * Override the base implementation to completely ignore watermarks propagated from
     * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
     * watermarks from here).
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();

        // emit a final watermark
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }
    }
}
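  • If assignTimestampsAndWatermarks is called with an AssignerWithPeriodicWatermarks argument, the operator that gets created is TimestampsAndPeriodicWatermarksOperator. In open() it registers a delayed task (a processing-time timer) based on the configured watermarkInterval.
  • When that timer fires it calls back into onProcessingTime, which calls getCurrentWatermark on the AssignerWithPeriodicWatermarks to obtain a watermark and then registers a new timer for getProcessingTimeService().getCurrentProcessingTime() + watermarkInterval. The watermarkInterval is the value set through env.getConfig().setAutoWatermarkInterval.
  • So onProcessingTime does two things: by re-registering the timer each time it produces a periodic effect, and it emits the watermark returned by getCurrentWatermark only when that watermark is non-null and greater than currentWatermark.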
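The "emit only when the watermark advances" rule is easy to check in isolation. Below is a minimal sketch in plain Java that does not use any Flink class. It assumes a bounded-out-of-orderness style assigner (largest timestamp seen minus a fixed delay), which is an illustrative choice and not something the operator itself prescribes; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the periodic watermark logic. It does not use Flink classes. */
class PeriodicWatermarkSketch {

    private final long maxOutOfOrdernessMillis;      // hypothetical assigner setting
    private long maxSeenTimestamp = Long.MIN_VALUE;  // largest event time seen so far
    private long currentWatermark = Long.MIN_VALUE;  // last watermark that was emitted
    private final List<Long> emitted = new ArrayList<>();

    PeriodicWatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Role of extractTimestamp in this model: remember the largest event time. */
    void onElement(long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
    }

    /** Role of getCurrentWatermark: null until an element has been seen. */
    Long candidateWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return null;
        }
        return maxSeenTimestamp - maxOutOfOrdernessMillis;
    }

    /** Role of onProcessingTime: emit only if the candidate advances the watermark. */
    boolean onTimer() {
        Long candidate = candidateWatermark();
        if (candidate != null && candidate > currentWatermark) {
            currentWatermark = candidate;
            emitted.add(candidate);
            return true;
        }
        // The real operator registers its next timer whether or not it emitted.
        return false;
    }

    List<Long> emittedWatermarks() {
        return emitted;
    }
}
```

An out-of-order element never moves the watermark backwards here: it cannot lower `maxSeenTimestamp`, and the `candidate > currentWatermark` guard filters out a timer tick that brings nothing new.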

SystemProcessingTimeService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

public class SystemProcessingTimeService extends ProcessingTimeService {

    private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);

    private static final int STATUS_ALIVE = 0;
    private static final int STATUS_QUIESCED = 1;
    private static final int STATUS_SHUTDOWN = 2;

    // ------------------------------------------------------------------------

    /** The containing task that owns this time service provider. */
    private final AsyncExceptionHandler task;

    /** The lock that timers acquire upon triggering. */
    private final Object checkpointLock;

    /** The executor service that schedules and calls the triggers of this task. */
    private final ScheduledThreadPoolExecutor timerService;

    private final AtomicInteger status;

    public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) {
        this(failureHandler, checkpointLock, null);
    }

    public SystemProcessingTimeService(
            AsyncExceptionHandler task,
            Object checkpointLock,
            ThreadFactory threadFactory) {

        this.task = checkNotNull(task);
        this.checkpointLock = checkNotNull(checkpointLock);

        this.status = new AtomicInteger(STATUS_ALIVE);

        if (threadFactory == null) {
            this.timerService = new ScheduledThreadPoolExecutor(1);
        } else {
            this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
        }

        // tasks should be removed if the future is canceled
        this.timerService.setRemoveOnCancelPolicy(true);

        // make sure shutdown removes all pending tasks
        this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    @Override
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    @Override
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {

        // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
        // T says we won't see elements in the future with a timestamp smaller or equal to T.
        // With processing time, we therefore need to delay firing the timer by one ms.
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            return timerService.schedule(
                    new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
        }
        catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(delay);
            }
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            else {
                // something else happened, so propagate the exception
                throw e;
            }
        }
    }

    //......
}
  • The registerTimer method of SystemProcessingTimeService schedules a delayed TriggerTask for the given timestamp. The timerService is the JDK's ScheduledThreadPoolExecutor. When the service status is STATUS_ALIVE, TriggerTask's run method invokes onProcessingTime on the ProcessingTimeCallback (here, TimestampsAndPeriodicWatermarksOperator).
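The scheduling pattern above (a one-shot delayed task that re-registers itself) can be tried with the JDK alone. This is a minimal sketch under stated assumptions: the class and method names are hypothetical, there is no checkpoint lock and no status handling, and only the delay arithmetic is copied from registerTimer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** JDK-only model of one-shot timers that re-register themselves. Not Flink API. */
class TimerRegistrationSketch {

    /** Same arithmetic as registerTimer: never negative, plus 1 ms. */
    static long delayMillis(long targetTimestamp, long now) {
        return Math.max(targetTimestamp - now, 0) + 1;
    }

    /** Fires a callback the given number of times; returns true if all firings happened in time. */
    static boolean fireRepeatedly(int times, long intervalMillis) {
        ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
        timerService.setRemoveOnCancelPolicy(true);
        CountDownLatch remaining = new CountDownLatch(times);

        Runnable callback = new Runnable() {
            @Override
            public void run() {
                remaining.countDown();
                if (remaining.getCount() > 0) {
                    // Re-register a one-shot timer, as onProcessingTime does.
                    long now = System.currentTimeMillis();
                    timerService.schedule(this, delayMillis(now + intervalMillis, now), TimeUnit.MILLISECONDS);
                }
            }
        };

        long now = System.currentTimeMillis();
        timerService.schedule(callback, delayMillis(now + intervalMillis, now), TimeUnit.MILLISECONDS);
        try {
            return remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timerService.shutdownNow();
        }
    }
}
```

Because each firing schedules the next one relative to the current time, the effective period is the interval plus the time the callback takes, not a fixed rate.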

WindowOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

    //......
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {

            //......

        } else {
            for (W window: elementWindows) {

                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }

        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    /**
     * Emits the contents of the given window using the {@link InternalWindowFunction}.
     */
    @SuppressWarnings("unchecked")
    private void emitWindowContents(W window, ACC contents) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        processContext.window = window;
        userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
    }

    //......
}
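  • WindowOperator's processElement method adds the element to windowState (the author identifies it as HeapAggregatingState here, i.e. the contents accumulate in memory). It then calls triggerContext.onElement, which delegates to trigger.onElement (the trigger here is EventTimeTrigger), to obtain a TriggerResult. If the result says fire, emitWindowContents is called; if it says purge, windowState is cleared. emitWindowContents sets the output timestamp to window.maxTimestamp() and calls userFunction.process to run the user-defined window function.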
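The call `timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp())` in the listing is what makes two consecutive windows with the same assigner line up: a result stamped with the last millisecond of its window is assigned to the window of the same bounds downstream. The following plain-Java sketch illustrates that arithmetic. It assumes non-negative timestamps and no window offset; the names are hypothetical, and the lateness check reflects my reading of isWindowLate (max timestamp plus allowed lateness compared against the watermark), which the listing above does not show.

```java
/** Plain-Java model of tumbling-window arithmetic. Not Flink API. */
class TumblingWindowSketch {

    /** Start of the tumbling window that contains a non-negative timestamp (no offset). */
    static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    /** Last timestamp that still belongs to the window [start, start + size). */
    static long maxTimestamp(long windowStart, long sizeMillis) {
        return windowStart + sizeMillis - 1;
    }

    /** Assumed lateness rule: the window is late once the watermark has reached its cleanup time. */
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLatenessMillis, long watermark) {
        return windowMaxTimestamp + allowedLatenessMillis <= watermark;
    }

    public static void main(String[] args) {
        long size = 5000L;
        long upstreamStart = windowStart(1200L, size);             // element at t=1200
        long resultTimestamp = maxTimestamp(upstreamStart, size);  // timestamp of the emitted result
        long downstreamStart = windowStart(resultTimestamp, size); // window of the result downstream
        System.out.println(upstreamStart + " " + resultTimestamp + " " + downstreamStart);
    }
}
```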

EventTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}
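  • EventTimeTrigger's onElement method checks whether window.maxTimestamp() <= ctx.getCurrentWatermark(). If so it returns TriggerResult.FIRE, telling WindowOperator that it can call emitWindowContents. Otherwise it registers an event-time timer for window.maxTimestamp() and returns TriggerResult.CONTINUE; the window then fires from onEventTime when that timer is reached.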
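The interplay between onElement, the registered timer, and the advancing watermark can be modelled in a few lines. This is a simplified sketch of the trigger's decisions only, in plain Java with hypothetical names: it keeps one set of timers for a single key, and it leaves out the late-window check that WindowOperator performs before the trigger is even consulted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java model of the EventTimeTrigger decisions for a single key. Not Flink API. */
class EventTimeTriggerSketch {

    enum Result { CONTINUE, FIRE }

    private long currentWatermark = Long.MIN_VALUE;
    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();

    /** Mirrors onElement: fire at once if the watermark already passed the window, else set a timer. */
    Result onElement(long windowMaxTimestamp) {
        if (windowMaxTimestamp <= currentWatermark) {
            return Result.FIRE;
        }
        eventTimeTimers.add(windowMaxTimestamp);
        return Result.CONTINUE;
    }

    /** Advances the watermark and returns the max timestamps of the windows whose timers fire. */
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= currentWatermark) {
            fired.add(eventTimeTimers.pollFirst());
        }
        return fired;
    }
}
```

Registering the same timer twice is harmless in this model because the timers live in a set, so several elements of one window lead to a single firing.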

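Summary

  • Flink supports consecutive windowed operations: for example, partition by key, aggregate the values of each key within a given window, and then apply windowAll to the resulting DataStream with the same time WindowAssigner. Within one and the same time window, the data is first aggregated per partition and then aggregated globally (useful for problems such as top-k elements).
  • AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks each serve two purposes: extracting a timestamp from the element to use as its event time, and emitting watermarks. Elements do not necessarily arrive in strict event-time order, so a watermark bounds how long a window waits for stragglers: it tells the window that all elements with an event time less than or equal to the watermark can be considered to have arrived. Based on its own time range, and with the help of the trigger, the window can then decide whether it may close and start processing its data. In consecutive windowed operations, the upstream watermark is forwarded to the downstream operations.
  • The Trigger tells WindowOperator when a window can be closed and its data processed (that is, when it returns TriggerResult.FIRE). For EventTimeTrigger, the decision in onElement depends on the watermark: it returns TriggerResult.FIRE if window.maxTimestamp() <= ctx.getCurrentWatermark().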

doc

  • Consecutive windowed operations
Originally published on 2019-01-09 on the WeChat official account 码匠的流水账.