A Look at Flink's TimerService

code4it
Published 2019-01-17 11:31:08

This article takes a look at Flink's TimerService.

TimerService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimerService.java

@PublicEvolving
public interface TimerService {

    String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";

    String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";

    long currentProcessingTime();

    long currentWatermark();

    void registerProcessingTimeTimer(long time);

    void registerEventTimeTimer(long time);

    void deleteProcessingTimeTimer(long time);

    void deleteEventTimeTimer(long time);
}
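  • The TimerService interface declares six methods: currentProcessingTime, currentWatermark, registerProcessingTimeTimer, registerEventTimeTimer, deleteProcessingTimeTimer, and deleteEventTimeTimer. As its two message constants indicate, registering and deleting timers is only supported on keyed streams.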
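
To make the interface more concrete, here is a minimal sketch of one common way such methods are used: an inactivity timeout that keeps a single event-time timer and moves it forward on every element. Everything in it is an assumption made for illustration: `Timers` is a local stand-in that copies two method shapes from TimerService, and `RecordingTimers`, `Handler`, and the 60-second gap are hypothetical. In a real job the equivalent calls would go through `ctx.timerService()` inside a `KeyedProcessFunction`.

```java
import java.util.TreeSet;

final class InactivityTimeoutSketch {

    // Local stand-in mirroring two TimerService methods (not the Flink interface itself).
    interface Timers {
        void registerEventTimeTimer(long time);
        void deleteEventTimeTimer(long time);
    }

    // Hypothetical in-memory implementation that only records which timers are set.
    static final class RecordingTimers implements Timers {
        final TreeSet<Long> eventTimers = new TreeSet<>();

        @Override public void registerEventTimeTimer(long time) { eventTimers.add(time); }
        @Override public void deleteEventTimeTimer(long time) { eventTimers.remove(time); }
    }

    // Hypothetical per-key logic: keep exactly one timer, GAP_MS after the latest element.
    static final class Handler {
        static final long GAP_MS = 60_000L; // assumed timeout, chosen for the example
        private final Timers timers;
        private Long pendingTimer = null;   // would live in keyed state in a real job

        Handler(Timers timers) { this.timers = timers; }

        void onElement(long elementTimestamp) {
            if (pendingTimer != null) {
                timers.deleteEventTimeTimer(pendingTimer); // drop the stale timeout
            }
            pendingTimer = elementTimestamp + GAP_MS;
            timers.registerEventTimeTimer(pendingTimer);
        }
    }

    public static void main(String[] args) {
        RecordingTimers timers = new RecordingTimers();
        Handler handler = new Handler(timers);
        handler.onElement(1_000L);
        handler.onElement(5_000L); // replaces the timer set by the first element
        System.out.println(timers.eventTimers); // prints [65000]
    }
}
```

Deleting the previous timer before registering the new one keeps the number of pending timers per key at one, which is why the sketch remembers the last registered timestamp.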

SimpleTimerService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/SimpleTimerService.java

@Internal
public class SimpleTimerService implements TimerService {

    private final InternalTimerService<VoidNamespace> internalTimerService;

    public SimpleTimerService(InternalTimerService<VoidNamespace> internalTimerService) {
        this.internalTimerService = internalTimerService;
    }

    @Override
    public long currentProcessingTime() {
        return internalTimerService.currentProcessingTime();
    }

    @Override
    public long currentWatermark() {
        return internalTimerService.currentWatermark();
    }

    @Override
    public void registerProcessingTimeTimer(long time) {
        internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void registerEventTimeTimer(long time) {
        internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void deleteProcessingTimeTimer(long time) {
        internalTimerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void deleteEventTimeTimer(long time) {
        internalTimerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, time);
    }
}
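  • SimpleTimerService implements TimerService by delegating every call to an InternalTimerService<VoidNamespace>; its register and delete methods pass VoidNamespace.INSTANCE as the namespace.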

InternalTimerService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerService.java

@Internal
public interface InternalTimerService<N> {

    long currentProcessingTime();

    long currentWatermark();

    void registerProcessingTimeTimer(N namespace, long time);

    void deleteProcessingTimeTimer(N namespace, long time);

    void registerEventTimeTimer(N namespace, long time);

    void deleteEventTimeTimer(N namespace, long time);
}
  • InternalTimerService is the internal counterpart of TimerService. It adds the notion of a namespace: registerProcessingTimeTimer, deleteProcessingTimeTimer, registerEventTimeTimer, and deleteEventTimeTimer each take an additional namespace parameter.
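
The effect of the extra parameter can be illustrated with a small self-contained sketch. It assumes that a timer is identified by its namespace together with its timestamp (the key is left out for brevity), so two timers with the same time but different namespaces do not collide. `NamespacedTimers`, `InMemoryTimers`, `NoNamespace`, and `SimpleFacade` are hypothetical names; `NoNamespace.INSTANCE` only plays the role that VoidNamespace.INSTANCE plays in SimpleTimerService.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class NamespaceSketch {

    // Hypothetical stand-in for a namespaced timer API (shape borrowed from InternalTimerService).
    interface NamespacedTimers<N> {
        void registerEventTimeTimer(N namespace, long time);
        void deleteEventTimeTimer(N namespace, long time);
    }

    // A timer is identified by (namespace, timestamp) in this sketch.
    static final class Timer<N> {
        final N namespace;
        final long timestamp;

        Timer(N namespace, long timestamp) {
            this.namespace = namespace;
            this.timestamp = timestamp;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Timer)) return false;
            Timer<?> other = (Timer<?>) o;
            return timestamp == other.timestamp && Objects.equals(namespace, other.namespace);
        }

        @Override public int hashCode() { return Objects.hash(namespace, timestamp); }
    }

    static final class InMemoryTimers<N> implements NamespacedTimers<N> {
        final Set<Timer<N>> timers = new HashSet<>();

        @Override public void registerEventTimeTimer(N namespace, long time) {
            timers.add(new Timer<>(namespace, time));
        }

        @Override public void deleteEventTimeTimer(N namespace, long time) {
            timers.remove(new Timer<>(namespace, time));
        }
    }

    // Single-valued namespace used when the caller has no namespace of its own.
    enum NoNamespace { INSTANCE }

    // Facade without a namespace parameter: every call is forwarded with the same constant.
    static final class SimpleFacade {
        private final NamespacedTimers<NoNamespace> delegate;

        SimpleFacade(NamespacedTimers<NoNamespace> delegate) { this.delegate = delegate; }

        void registerEventTimeTimer(long time) { delegate.registerEventTimeTimer(NoNamespace.INSTANCE, time); }
        void deleteEventTimeTimer(long time) { delegate.deleteEventTimeTimer(NoNamespace.INSTANCE, time); }
    }

    public static void main(String[] args) {
        InMemoryTimers<String> windowed = new InMemoryTimers<>();
        windowed.registerEventTimeTimer("window-A", 100L);
        windowed.registerEventTimeTimer("window-B", 100L); // same time, different namespace
        windowed.deleteEventTimeTimer("window-A", 100L);   // leaves window-B's timer alone
        System.out.println(windowed.timers.size());        // prints 1

        InMemoryTimers<NoNamespace> backing = new InMemoryTimers<>();
        SimpleFacade simple = new SimpleFacade(backing);
        simple.registerEventTimeTimer(100L);
        simple.registerEventTimeTimer(100L);               // same identity, so still one timer
        System.out.println(backing.timers.size());         // prints 1
    }
}
```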

InternalTimerServiceImpl

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java

public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
​
    private final ProcessingTimeService processingTimeService;
​
    private final KeyContext keyContext;
​
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
​
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
​
    private final KeyGroupRange localKeyGroupRange;
​
    private final int localKeyGroupRangeStartIdx;
​
    private long currentWatermark = Long.MIN_VALUE;
​
    private ScheduledFuture<?> nextTimer;
​
    // Variables to be set when the service is started.
​
    private TypeSerializer<K> keySerializer;
​
    private TypeSerializer<N> namespaceSerializer;
​
    private Triggerable<K, N> triggerTarget;
​
    private volatile boolean isInitialized;
​
    private TypeSerializer<K> keyDeserializer;
​
    private TypeSerializer<N> namespaceDeserializer;
​
    private InternalTimersSnapshot<K, N> restoredTimersSnapshot;
​
    InternalTimerServiceImpl(
        KeyGroupRange localKeyGroupRange,
        KeyContext keyContext,
        ProcessingTimeService processingTimeService,
        KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
        KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {
​
        this.keyContext = checkNotNull(keyContext);
        this.processingTimeService = checkNotNull(processingTimeService);
        this.localKeyGroupRange = checkNotNull(localKeyGroupRange);
        this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue);
        this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue);
​
        // find the starting index of the local key-group range
        int startIdx = Integer.MAX_VALUE;
        for (Integer keyGroupIdx : localKeyGroupRange) {
            startIdx = Math.min(keyGroupIdx, startIdx);
        }
        this.localKeyGroupRangeStartIdx = startIdx;
    }
​
    public void startTimerService(
            TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            Triggerable<K, N> triggerTarget) {
​
        if (!isInitialized) {
​
            if (keySerializer == null || namespaceSerializer == null) {
                throw new IllegalArgumentException("The TimersService serializers cannot be null.");
            }
​
            if (this.keySerializer != null || this.namespaceSerializer != null || this.triggerTarget != null) {
                throw new IllegalStateException("The TimerService has already been initialized.");
            }
​
            // the following is the case where we restore
            if (restoredTimersSnapshot != null) {
                CompatibilityResult<K> keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                    this.keyDeserializer,
                    null,
                    restoredTimersSnapshot.getKeySerializerConfigSnapshot(),
                    keySerializer);
​
                CompatibilityResult<N> namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                    this.namespaceDeserializer,
                    null,
                    restoredTimersSnapshot.getNamespaceSerializerConfigSnapshot(),
                    namespaceSerializer);
​
                if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) {
                    throw new IllegalStateException("Tried to initialize restored TimerService " +
                        "with incompatible serializers than those used to snapshot its state.");
                }
            }
​
            this.keySerializer = keySerializer;
            this.namespaceSerializer = namespaceSerializer;
            this.keyDeserializer = null;
            this.namespaceDeserializer = null;
​
            this.triggerTarget = Preconditions.checkNotNull(triggerTarget);
​
            // re-register the restored timers (if any)
            final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();
            if (headTimer != null) {
                nextTimer = processingTimeService.registerTimer(headTimer.getTimestamp(), this);
            }
            this.isInitialized = true;
        } else {
            if (!(this.keySerializer.equals(keySerializer) && this.namespaceSerializer.equals(namespaceSerializer))) {
                throw new IllegalArgumentException("Already initialized Timer Service " +
                    "tried to be initialized with different key and namespace serializers.");
            }
        }
    }
​
    @Override
    public long currentProcessingTime() {
        return processingTimeService.getCurrentProcessingTime();
    }
​
    @Override
    public long currentWatermark() {
        return currentWatermark;
    }
​
    @Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            // check if we need to re-schedule our timer to earlier
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
                nextTimer = processingTimeService.registerTimer(time, this);
            }
        }
    }
​
    @Override
    public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }
​
    @Override
    public void deleteProcessingTimeTimer(N namespace, long time) {
        processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }
​
    @Override
    public void deleteEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }
​
    @Override
    public void onProcessingTime(long time) throws Exception {
        // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
        // inside the callback.
        nextTimer = null;
​
        InternalTimer<K, N> timer;
​
        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            processingTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);
        }
​
        if (timer != null && nextTimer == null) {
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
        }
    }
​
    public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;
​
        InternalTimer<K, N> timer;
​
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }
​
    public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int keyGroupIdx) {
        return new InternalTimersSnapshot<>(
            keySerializer,
            keySerializer.snapshotConfiguration(),
            namespaceSerializer,
            namespaceSerializer.snapshotConfiguration(),
            eventTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx),
            processingTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx));
    }
​
    @SuppressWarnings("unchecked")
    public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredSnapshot, int keyGroupIdx) {
        this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredSnapshot;
​
        if (areSnapshotSerializersIncompatible(restoredSnapshot)) {
            throw new IllegalArgumentException("Tried to restore timers " +
                "for the same service with different serializers.");
        }
​
        this.keyDeserializer = restoredTimersSnapshot.getKeySerializer();
        this.namespaceDeserializer = restoredTimersSnapshot.getNamespaceSerializer();
​
        checkArgument(localKeyGroupRange.contains(keyGroupIdx),
            "Key Group " + keyGroupIdx + " does not belong to the local range.");
​
        // restore the event time timers
        eventTimeTimersQueue.addAll(restoredTimersSnapshot.getEventTimeTimers());
​
        // restore the processing time timers
        processingTimeTimersQueue.addAll(restoredTimersSnapshot.getProcessingTimeTimers());
    }
​
    @VisibleForTesting
    public int numProcessingTimeTimers() {
        return this.processingTimeTimersQueue.size();
    }
​
    @VisibleForTesting
    public int numEventTimeTimers() {
        return this.eventTimeTimersQueue.size();
    }
​
    @VisibleForTesting
    public int numProcessingTimeTimers(N namespace) {
        return countTimersInNamespaceInternal(namespace, processingTimeTimersQueue);
    }
​
    @VisibleForTesting
    public int numEventTimeTimers(N namespace) {
        return countTimersInNamespaceInternal(namespace, eventTimeTimersQueue);
    }
​
    private int countTimersInNamespaceInternal(N namespace, InternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) {
        int count = 0;
        try (final CloseableIterator<TimerHeapInternalTimer<K, N>> iterator = queue.iterator()) {
            while (iterator.hasNext()) {
                final TimerHeapInternalTimer<K, N> timer = iterator.next();
                if (timer.getNamespace().equals(namespace)) {
                    count++;
                }
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Exception when closing iterator.", e);
        }
        return count;
    }
​
    @VisibleForTesting
    int getLocalKeyGroupRangeStartIdx() {
        return this.localKeyGroupRangeStartIdx;
    }
​
    @VisibleForTesting
    List<Set<TimerHeapInternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() {
        return partitionElementsByKeyGroup(eventTimeTimersQueue);
    }
​
    @VisibleForTesting
    List<Set<TimerHeapInternalTimer<K, N>>> getProcessingTimeTimersPerKeyGroup() {
        return partitionElementsByKeyGroup(processingTimeTimersQueue);
    }
​
    private <T> List<Set<T>> partitionElementsByKeyGroup(KeyGroupedInternalPriorityQueue<T> keyGroupedQueue) {
        List<Set<T>> result = new ArrayList<>(localKeyGroupRange.getNumberOfKeyGroups());
        for (int keyGroup : localKeyGroupRange) {
            result.add(Collections.unmodifiableSet(keyGroupedQueue.getSubsetForKeyGroup(keyGroup)));
        }
        return result;
    }
​
    private boolean areSnapshotSerializersIncompatible(InternalTimersSnapshot<?, ?> restoredSnapshot) {
        return (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
            (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer()));
    }
}
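  • InternalTimerServiceImpl implements both InternalTimerService and ProcessingTimeCallback (which declares the onProcessingTime method).
  • startTimerService mainly initializes the keySerializer, namespaceSerializer, and triggerTarget fields; if processingTimeTimersQueue already has a head timer (for example after a restore), it also registers that timer with processingTimeService. registerEventTimeTimer and deleteEventTimeTimer operate on eventTimeTimersQueue, while registerProcessingTimeTimer and deleteProcessingTimeTimer operate on processingTimeTimersQueue. Both queues are of type KeyGroupedInternalPriorityQueue, with TimerHeapInternalTimer as the element type.
  • Event-time timers fire in advanceWatermark (AbstractStreamOperator's processWatermark calls InternalTimeServiceManager's advanceWatermark, which calls InternalTimerServiceImpl's advanceWatermark): it polls every timer whose timestamp is less than or equal to the given time from eventTimeTimersQueue, sets the current key to the timer's key, and invokes triggerTarget.onEventTime. Processing-time timers fire in onProcessingTime (the TriggerTask and RepeatedTriggerTask scheduled by SystemProcessingTimeService call back into ProcessingTimeCallback's onProcessingTime): it polls every timer whose timestamp is less than or equal to the given time from processingTimeTimersQueue, invokes triggerTarget.onProcessingTime, and, if a later timer remains, registers it with processingTimeService.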
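
The processing-time path is the less obvious of the two, because only the head of the queue is ever scheduled. The following sketch replays that logic under simplifying assumptions: timers are bare timestamps in a `TreeSet` (no key, no namespace), and `FakeScheduler` is a hypothetical stand-in for the scheduling service that merely remembers the one pending wake-up. It is a model of the quoted register and onProcessingTime logic, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class ProcessingTimeSketch {

    // Hypothetical scheduler stand-in: remembers only the single pending wake-up.
    static final class FakeScheduler {
        Long pendingWakeUp = null; // null means nothing is scheduled
        int scheduleCalls = 0;

        void schedule(long time) { pendingWakeUp = time; scheduleCalls++; }
        void cancel() { pendingWakeUp = null; }
    }

    static final class Timers {
        private final TreeSet<Long> queue = new TreeSet<>(); // ordered, ignores duplicates
        private final FakeScheduler scheduler;
        final List<Long> fired = new ArrayList<>();

        Timers(FakeScheduler scheduler) { this.scheduler = scheduler; }

        void register(long time) {
            Long oldHead = queue.isEmpty() ? null : queue.first();
            if (queue.add(time)) {
                long nextTriggerTime = oldHead != null ? oldHead : Long.MAX_VALUE;
                // Only a timer earlier than the current head needs a new wake-up.
                if (time < nextTriggerTime) {
                    scheduler.cancel();
                    scheduler.schedule(time);
                }
            }
        }

        // Called when the scheduled wake-up time has been reached.
        void onProcessingTime(long time) {
            scheduler.pendingWakeUp = null;
            while (!queue.isEmpty() && queue.first() <= time) {
                fired.add(queue.pollFirst()); // stands in for triggerTarget.onProcessingTime
            }
            // Re-arm for the new head, if any timer is left and nothing is scheduled yet.
            if (!queue.isEmpty() && scheduler.pendingWakeUp == null) {
                scheduler.schedule(queue.first());
            }
        }
    }

    public static void main(String[] args) {
        FakeScheduler scheduler = new FakeScheduler();
        Timers timers = new Timers(scheduler);
        timers.register(300L); // first timer: schedules a wake-up at 300
        timers.register(100L); // earlier than the head: reschedules to 100
        timers.register(200L); // later than the head: no rescheduling
        System.out.println(scheduler.scheduleCalls); // prints 2
        timers.onProcessingTime(250L);
        System.out.println(timers.fired);            // prints [100, 200]
        System.out.println(scheduler.pendingWakeUp); // prints 300
    }
}
```

Scheduling only the head keeps the number of outstanding wake-ups at one regardless of how many timers are queued; the price is the cancel-and-reschedule step whenever an earlier timer arrives.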

Triggerable

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/Triggerable.java

@Internal
public interface Triggerable<K, N> {

    /**
     * Invoked when an event-time timer fires.
     */
    void onEventTime(InternalTimer<K, N> timer) throws Exception;

    /**
     * Invoked when a processing-time timer fires.
     */
    void onProcessingTime(InternalTimer<K, N> timer) throws Exception;
}
  • The Triggerable interface declares the onEventTime and onProcessingTime methods that InternalTimerService calls back. Operators such as WindowOperator, IntervalJoinOperator, KeyedProcessOperator, and KeyedCoProcessOperator implement Triggerable, so they can react to the onEventTime or onProcessingTime callback of a timer.
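
How the callback fits together with the watermark can be sketched in a few lines. The model below is an assumption-laden simplification: `Target` is a hypothetical one-method stand-in for Triggerable, timers carry only a String key and a timestamp, and a plain `java.util.PriorityQueue` replaces the key-grouped queue (unlike a set-like queue, it does not de-duplicate). It shows the two steps the quoted advanceWatermark performs per timer: restore the key, then call back.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class EventTimeSketch {

    // Simplified timer: key and timestamp only (no namespace).
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical stand-in for the event-time half of Triggerable.
    interface Target {
        void onEventTime(Timer timer);
    }

    static final class Timers {
        private final PriorityQueue<Timer> queue =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
        private final Target target;
        String currentKey;                      // plays the role of the operator's key context
        long currentWatermark = Long.MIN_VALUE;

        Timers(Target target) { this.target = target; }

        void registerEventTimeTimer(long time) {
            queue.add(new Timer(currentKey, time)); // timer captures the key being processed
        }

        void advanceWatermark(long time) {
            currentWatermark = time;
            Timer timer;
            while ((timer = queue.peek()) != null && timer.timestamp <= time) {
                queue.poll();
                currentKey = timer.key;         // restore the key before calling back
                target.onEventTime(timer);
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Timers timers = new Timers(timer -> log.add(timer.key + "@" + timer.timestamp));
        timers.currentKey = "user-1";
        timers.registerEventTimeTimer(20L);
        timers.currentKey = "user-2";
        timers.registerEventTimeTimer(10L);
        timers.advanceWatermark(15L); // fires only the timer at 10
        timers.advanceWatermark(25L); // fires the timer at 20
        System.out.println(log);      // prints [user-2@10, user-1@20]
    }
}
```

Restoring the key before the callback matters because the callback typically reads keyed state, which has to belong to the key that registered the timer rather than to whichever element was processed last.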

Summary

  • The TimerService interface declares currentProcessingTime, currentWatermark, registerProcessingTimeTimer, registerEventTimeTimer, deleteProcessingTimeTimer, and deleteEventTimeTimer. Its implementation class is SimpleTimerService, which mainly delegates to InternalTimerService.
  • InternalTimerService is the internal counterpart of TimerService. It adds the notion of a namespace: registerProcessingTimeTimer, deleteProcessingTimeTimer, registerEventTimeTimer, and deleteEventTimeTimer each take an additional namespace parameter. Its implementation class, InternalTimerServiceImpl, implements both InternalTimerService and ProcessingTimeCallback (which declares the onProcessingTime method). registerEventTimeTimer and deleteEventTimeTimer operate on eventTimeTimersQueue, while registerProcessingTimeTimer and deleteProcessingTimeTimer operate on processingTimeTimersQueue. Both queues are of type KeyGroupedInternalPriorityQueue, with TimerHeapInternalTimer as the element type.
  • In InternalTimerServiceImpl, event-time timers fire in advanceWatermark (AbstractStreamOperator's processWatermark calls InternalTimeServiceManager's advanceWatermark, which calls InternalTimerServiceImpl's advanceWatermark): it removes every event-time timer whose timestamp is less than or equal to the given time and then invokes triggerTarget.onEventTime.
  • In InternalTimerServiceImpl, processing-time timers fire in onProcessingTime (the TriggerTask and RepeatedTriggerTask scheduled by SystemProcessingTimeService call back into ProcessingTimeCallback's onProcessingTime): it removes every processing-time timer whose timestamp is less than or equal to the given time and then invokes triggerTarget.onProcessingTime.
  • The Triggerable interface declares the onEventTime and onProcessingTime methods that InternalTimerService calls back. Operators such as WindowOperator, IntervalJoinOperator, KeyedProcessOperator, and KeyedCoProcessOperator implement Triggerable, so they can react to the onEventTime or onProcessingTime callback of a timer.

doc

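Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.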
