首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >源码 - KeyedProcessFunction 执行调用

源码 - KeyedProcessFunction 执行调用

原创
作者头像
用户7283316
修改2020-05-08 15:31:10
1.3K0
修改2020-05-08 15:31:10
举报
文章被收录于专栏:FFF

ctx.timerService().registerEventTimeTimer(timeStamp) 就是定义一个事件触发器,触发的时间是 timeStamp | 到达该时间则调用

onTimer(long timestamp, OnTimerContext ctx, Collector<SimplifyMetricEvent> out)


1. data:SingleOutputStreamOperator 调用keyBy形成 KeyedStream,调用process

@Internal
    public <R> SingleOutputStreamOperator<R> process(
            KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
            TypeInformation<R> outputType) {

        KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
        return transform("KeyedProcess", outputType, operator);
    }

keyedProcessFunction 就是上边我们自定义的OutageFunction。 这里生成的 KeyedProcessOperator

2.KeyedProcessOperator

public class KeyedProcessOperator<K, IN, OUT>
        extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {

KeyedProcessOperator实现了 Triggerable

        @Override  //实现Triggerable
    public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.setAbsoluteTimestamp(timer.getTimestamp());
        invokeUserFunction(TimeDomain.EVENT_TIME, timer);
    }

    @Override //实现Triggerable
    public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.eraseTimestamp();
        invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement(element.getValue(), context, collector);
        context.element = null;
    }

    private void invokeUserFunction(
            TimeDomain timeDomain,
            InternalTimer<K, VoidNamespace> timer) throws Exception {
        onTimerContext.timeDomain = timeDomain;
        onTimerContext.timer = timer;
        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
        onTimerContext.timeDomain = null;
        onTimerContext.timer = null;
    }

userFunction.processElement(element.getValue(), context, collector); userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); userFunction就是我们上面的OutageFunction。 这里看到在onEventTime或者onProcessingTime方法调用的时候才会调用userFunction.onTimer。那么 onEventTime 什么时候触发呢?

3.以onEventTime为例
InternalTimerServiceImpl
InternalTimerServiceImpl

进入到InternalTimerServiceImpl

public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;

        InternalTimer<K, N> timer;

        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }

也就是说InternalTimerServiceImpl调用advanceWatermark时我们的onEventTime方法才调用。而advanceWatermark方法的入参time是当前operator的watermark所代表的时间。那么什么时候调用advanceWatermark呢?这个等下再看。 这个方法里面的eventTimeTimersQueue

       /**
     * Event time timers that are currently in-flight.
     */
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;

当我们调用时ctx.timerService().registerEventTimeTimer(current.getSystemTimestamp() + delay); 就是调用

    @Override
    public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

向里eventTimeTimersQueue存储TimerHeapInternalTimer(包含key,timestamp等)。 当调用advanceWatermark时,更新currentWatermark,从eventTimeTimersQueue里peek出timer,判断当前watermark的时间是否大于timer里的时间,若大于,则从队列里弹出这个timer调用 triggerTarget.onEventTime(timer) 也就是调用 KeyedProcessOperator.onEventTime,最终调用到里我们自定义OutageFunctiononTimer方法。


原文链接 : https://www.jianshu.com/p/b1d0e6895625

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 3.以onEventTime为例
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档