前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink时间系统系列之Processing Time源码分析

flink时间系统系列之Processing Time源码分析

作者头像
Flink实战剖析
发布2022-04-18 11:18:57
9750
发布2022-04-18 11:18:57
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

flink 中Processing Time也就是处理时间在watermark定时生成、ProcessFunction中定时器与时间类型的窗口中都有使用,但是其内部是如何实现注册定时器、如何调用、如何容错保证在任务挂掉在下次重启仍然能够触发任务执行,都是我们今天的主题。首先需要了解一下在flink内部时间系统是由哪些类来共同完成这件事,下面画了一个简易的类关系图:

AbstractStreamOperator: flink runtime 的核心operator, 包含了一个operator生命周期所有的执行方法(后面做单独介绍),其包含一个InternalTimeServiceManager的对象,在initializeState完成初始化;

InternalTimeServiceManager:flink 内部提供时间服务的manager, getInternalTimerService负责注册一个时间服务类型InternalTimeService、advanceWatermark在eventTime中使用(下节分享),snapshotStateForKeyGroup对InternalTimeService进行checkpoint,restoreStateForKeyGroup任务重启恢复InternalTimeService;

InternalTimerServiceImpl:负责具体的时间服务操作,包含KeyGroupedInternalPriorityQueue属性,是一种flink自身实现的优先级队列,存储的数据是TimerHeapInternalTimer类型,包含三个属性key/namespace/timestamp,在优先级队列中按照timestamp进行排序,InternalTimerServiceImpl的startTimerService方法主要用于时间状态恢复初始化,registerProcessingTimeTimer注册处理时间定时器,registerEventTimeTimer注册事件时间定时器,onProcessingTime方法是实现ProcessingTimeCallback接口的回调方法,用于处理时间触发执行方法,另外还有两个对应的delete删除注册的定时器方法,advanceWatermark方法同样用于事件时间中,snapshotTimersForKeyGroup对该timerservice进行checkpoint, restoreTimersForKeyGroup时间状态恢复;

ProcessingTimeService : 负责processing time 的处理类,实现类是SystemProcessingTimeService,包含registerTimer的方法,用于注册一个具体的定时器;

TimerHeapInternalTimer:实现了InternalTimer接口,为什么要这个单独讲一下,由于它flink 注册定时器的基本注册对象,所有需要注册定时器的最后基本都会转换成为该对象然后进行注册,在这里我们需要理解一点,在flink内部确定一个具体的状态的具体数据需要key/namespce, 第一个具体代表的是operator/statedesc,由于TimerHeapInternalTimer需要容错所以同样包含key/namespace,从另外一个角度也说明在一个operator中如果我们多次注册同一个key相同的时间,达到的效果是一样,只会触发一次(默认在flink内部namespce是相同的),同样也说明了注册的定时器必须是在keyedStream中。

以上就是flink内部时间所涉及的核心类,那么接下来具体看一下注册一个processing time 将会发生哪些方法的调用:

registerProcessingTimeTimer方法接受两个参数:namespace与time, namespace在普通的keyedStream 中namespace表示VoidNamespace, 在WindowedStream中namespace表示的是Window对象,time表示的是注册的触发时间,在这个方法里面主要做两件事情:

一、将namespace与time转换为InternalTimer存入KeyGroupedInternalPriorityQueue优先级队列中,其中key从当前的KeyContext中获取,如果该Queue中包含相同的key/namspace/time,将不会被添加进去并且不会执行下面调用

二、调用SystemProcessingTimeService.registerTimer方法,传入具体的时间参数time,在registerTimer方法中使用ScheduledThreadPoolExecutor提交一个定时执行的方法,定时执行对象是实现Runnable接口的TriggerTask,那么当达到执行时间就会执行里面的run方法。

processing time 触发调用逻辑:

TriggerTask就是上面提到的实现Runnable接口的类,在run方法里面会调用

ProcessingTimeCallback.onProcessingTime方法,ProcessingTimeCallback就是在InternalTimerServiceImpl.registerProcessingTimeTimer 调用是传入进来的,传入的是当前对象this, 也就是会调用InternalTimerServiceImpl.onProcessingTime方法,在该方法中会循环遍历KeyGroupedInternalPriorityQueue这个优先级队列,如果获取到的时间小于调用的触发时间,那么就会执行Triggerable.onProcessingTime方法,Triggerable表示具体定时操作接口,例如WindowOperator/KeyedProcessOperator 都实现了该接口。

注册的定时数据都存储在KeyGroupedInternalPriorityQueue这个优先级队列中,也就是内存中,如果任务出现问题挂掉了,那么内存数据就会丢失,所以需要对其进行备份,备份入口是InternalTimeServiceManager.snapshotStateForKeyGroup 会将其Map<String, InternalTimerServiceImpl<K, ?>> 做checkpoint,从而对InternalTimerServiceImpl<K, ?>中具体的队列做checkpoint,相反数据恢复调用InternalTimeServiceManager.restoreStateForKeyGroup 方法恢复InternalTimerServiceImpl。对于processing time任务恢复重启有一个重要的方法InternalTimerServiceImpl.startTimerService 在获取InternalTimerServiceImpl时会被调用,里面的有一个判断逻辑processingTimeTimersQueue获取数据,如果不为空,那么就调用processingTimeService.registerTimer 重新注册定时器。

以上关于flink 时间系统与Processing Time的源码分析逻辑,最好还是对照源码多看几遍。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档