ctx.timerService().registerEventTimeTimer(timeStamp) 就是定义一个事件触发器,触发的时间是 timeStamp | 到达该时间则调用
onTimer(long timestamp, OnTimerContext ctx, Collector<SimplifyMetricEvent> out)
1. data:SingleOutputStreamOperator 调用keyBy形成 KeyedStream,调用process
@Internal
public <R> SingleOutputStreamOperator<R> process(
KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
TypeInformation<R> outputType) {
KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
return transform("KeyedProcess", outputType, operator);
}
keyedProcessFunction
就是上边我们自定义的OutageFunction
。
这里生成的 KeyedProcessOperator
2.
KeyedProcessOperator
public class KeyedProcessOperator<K, IN, OUT>
extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
KeyedProcessOperator
实现了 Triggerable
@Override //实现Triggerable
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
invokeUserFunction(TimeDomain.EVENT_TIME, timer);
}
@Override //实现Triggerable
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.eraseTimestamp();
invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
context.element = element;
userFunction.processElement(element.getValue(), context, collector);
context.element = null;
}
private void invokeUserFunction(
TimeDomain timeDomain,
InternalTimer<K, VoidNamespace> timer) throws Exception {
onTimerContext.timeDomain = timeDomain;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}
userFunction.processElement(element.getValue(), context, collector);
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
userFunction
就是我们上面的OutageFunction
。
这里看到在onEventTime
或者onProcessingTime
方法调用的时候才会调用userFunction.onTimer。那么 onEventTime
什么时候触发呢?
进入到InternalTimerServiceImpl
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}
也就是说InternalTimerServiceImpl
调用advanceWatermark
时我们的onEventTime
方法才调用。而advanceWatermark
方法的入参time
是当前operator的watermark所代表的时间。那么什么时候调用advanceWatermark
呢?这个等下再看。
这个方法里面的eventTimeTimersQueue
是
/**
* Event time timers that are currently in-flight.
*/
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
当我们调用时ctx.timerService().registerEventTimeTimer(current.getSystemTimestamp() + delay);
就是调用
@Override
public void registerEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
向里eventTimeTimersQueue
存储TimerHeapInternalTimer
(包含key,timestamp等)。
当调用advanceWatermark时,更新currentWatermark,从eventTimeTimersQueue里peek出timer,判断当前watermark的时间是否大于timer里的时间,若大于,则从队列里弹出这个timer调用 triggerTarget.onEventTime(timer)
也就是调用 KeyedProcessOperator.onEventTime
,最终调用到里我们自定义OutageFunction
的onTimer
方法。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。