00:00
好,接下来给大家介绍flink里边的时间语义和water mark,呃,大家可能一看这个就会有点觉得有点有点奇怪是吧?什么叫时间语义,什么又叫这个water mark呢?其实上节课我们已经讲到了那个窗口操作嘛,Window API,那么大家其实可以知道,在流处理里边,在Spark里边,我们好像窗口操作很简单,对不对?那就好像我定义了那个窗口,反正它都是一批的嘛,然后你该怎么操作怎么操作都是RDD做就行了。那在流处理里边的窗口操作的概窗口操作的这个语义啊,它离不开两个很重要的概念,一个。是它的状态,状态管理,那这个我们到后面再给大家详细讲状态编程对吧?另外还有一个很重要的概念就是时间语义,所以接下来我们就要重点跟跟大家讨论讨论所谓的这个时间语义,这个time到底是什么东西,好,这是我们这一课里边的主要内容啊,首先讨论一下flink里边的时间语义,然后我们会去给大家看看怎么样在代码当中设置even time。
01:14
呃,这个是看起来是一个比较特殊的时间,对吧?一个特殊的时间语义,然后给大家重点讲一讲所谓的水位线,也就是water mark这个概念,最后跟大家说一说water的传递引入和设定啊,就是综合起来给大家讲一讲啊,呃,这个时间呢,其实大家会发现可能是流处理里边就是非常重要的一个特性了,大家可能知道就是说流处理最给力的一个特性吧,应该就是低延迟,对不对,快对吧,这个所谓的低延迟,所谓的快,其实。哎,这就是跟时间相关的一个概念,对不对?呃,但是大家会想到啊,就是你你一味的求快,其实并不是并不是最好的一种解决方案,因为就像我们上午跟大家提到exactly,呃,就是端对端那个exactly one的那个保证一样,对吧?你如果要是求足够快的话,能保证它绝对状态一致吗?那就保证不了了,对不对?哎,所以在有些时候我们还要考虑更多的事情,比如说真实世界这个真实的系统里边做数据传输的时候,特别我们这是一个分布式系统,大家会想到有数据的延迟,对吧,有网络的延迟,那很容易就会出现一个什么样的状况呢?很多数据它不是按照发生的时间的那个顺序来的。
02:43
有可能是乱的对不对?哎,这就是我们所谓这个乱序事件啊,乱序数据的一个问题,那么对于这样的这些乱序数据,我们一味求快,那是不是就得不到正确的那个聚合结果啊,对吧?你开的窗口可能就没有办法正确关闭了,所以在这种条件下,能不能既快速又能提供比较准确的一个计算结果,这其实是我们比较关心的一个一种情况,对吧?呃,那么呃,在在这之前呢,如果我们想解决这个问题,解决乱序事件的问题,那首先就要跟大家讨论一下所谓的时间语义了啊,一说这个时间语义大家扯这么多啊,大家就会想这个时间语义那还不简单吗?我还我还不知道这个时间是什么意思嘛,对吧,我们天天这个时间,呃,就是那个表,就是在咔咔咔一直在动着,对吧,我们时间就在一直平缓的在流逝着,这不就是每天生活当中最常见的东西吗?
03:43
啊呃,但是大家想一下啊,大家看一下这张图,这张图也是这个官网上一个很经典的图啊,大家可能在很多文章里面都见到过,这个表示什么呢。啊,大家看一看,在分布式的流处理领域,这个时间还真不是简简单单的就是我们觉得呃,司空见惯的一一个概念,啊,来大家看一下,在我们这个流处理里边,我们说。
04:11
Flink这样的流处理框架,它是事件驱动的,所以本身数据产生的时候,那是不是应该有一个事件生成的时间啊,哎,所以大家看啊,这里边有一个这个表,这里边我们当时发生这个事件的时候,这是一个事件。然后生成这个数据之后,它就会进入到比方说我们后面写到日志里边,对吧,卡夫卡进入消息队列,然后不同的分区分布式的这个数据结构对吧,那那是不是有可能这个数据就乱了,有可能我这里先发生的数据,有可能会后传到后面来。后发生的事件反而先来了,完全有可能,对不对?哎,这就是我们所谓的乱序的情况,所以接下来进入到我们的flink系统里边的时候,这是不是又有一个时间啊,有先有后,对不对啊,有可能有些数据先来,有些数据后来,所以进入flink这个data source这一步进入到source算子这里边来的时候,这又有一个时间,不同的时间,这个时间我们把它。
05:17
啊,大家会看到这就给出不同的这个时间概念了,对不对啊,最后还有就是说在我们flink内部程序代码内部又还会做各种各样的传输分区,对吧,各种各样的操作,那么是不是进入先进来的,也有可能是后处理啊,也有可能后进来的先处理对不对?所以后面真正处理的时候,某一个算子任务啊,我们这个window,特别是开窗去给它做处理的时候,时间还有可能不一样,所以在这整个的从事件生成发生到进入flink里边,到处理处理完毕。其实大家发现有三种不同的时间语义,按照刚才的这个说法就可以分成一个叫啊,事件发生的时间,对吧,叫even time。
06:08
另外一个叫进入flink系统的时间,就是摄入的一个时间,对不对?哎,这个叫in interesting type,大家看这是事件时间对不对,这是摄入时间,然后最后还有一个就是让我们具体的那个算子执行这个操作的时候的本地系统时间,对不对啊,这个我们叫做processing type处理时间,这就是分布式系统,特别是分布式流处理框架里边。可以遇到的不同的时间概念,好,那呃,接下来大家可以想一想,那这既然有不同的时间了,那我们用的时候以哪个时间为标准呢?这这就这就涉及到其他的一些概念了,对不对?你得看一看这个东西到底是怎么怎么算的啊,那给大家举一个生活当中的例子啊,大家就会想到到底哪种时间语义更加重要,更加是我们想要处理的那种场景呢?给大家举一个这个电影星球大战的一个例子。
07:10
呃,有有同学看过星球大战吧,有呃呃,有同学看过看,但是好像看过的同学不太多,呃,可能因为这个星球大战第一部他这个太早了是吧,这个对于大家年纪可能没有看过之前的话,就后面的可能也不太愿意想去看了,对吧?呃,给大家简单的说一下啊,星球大战是个什么样的状况呢?大家看他第一部是七七年就拍出来了,对,七七年就拍出来了,但是大家看。按照后边的这个划分,它是一个什么样呢?它其实应该是星球大战四,应该是第四部,为什么呢?因为大家知道有时候有些电影啊,它那个故事他是先拍,拍出来之后,诶,这个很卖座对吧,很大,大卖了,票房很好,后面就拍续集对不对,经常就拍续集,拍着拍着拍着,续集拍的又不过瘾,他有时候还翻回头去拍前传对不对?哎,所以大家看星球大战就是一个典型的例子啊,大家看七七年拍了第一部之后,后边。
08:11
八零年的时候拍第二部,这就是一个续集,对不对,对吧,他就是新希望,然后帝国反击战,然后八三年又拍第三部,这又是一个续集,它的时间顺序是顺下来的,然后呢,诶,过了十几年之后,就觉得这个玩意儿不不好玩了,不想炒冷饭了,对吧,到九九年的时候再拍,拍的就变成前传了,呃,前传了对吧,不是前传啊,前前传,呃,所以大家看到在拍的这个时间,其实从真正这个故事发生的时间来看的话。这其实是第一步,对不对,这是星球大战最开始的那个故事的起源,幽灵的威胁啊,然后后边大家看什么克隆人进攻啊,什么西斯复仇啊,这些其实是,呃,就是所谓的前前传啊,就发生的时间应该更早,但是他拍的更迟。
09:01
所以大家看在这个例子里边,这就有点像什么呢?这里的这个故事发生的背景时间,就相当于是我们前面讲的哪种时间语义故事发生的时间是不是就像是我们那个事件发生的那个时间啊,Even time,对,然后大家会发现这个故事已经发生了,我们现在要去拍这个电影,然后要去看,我们观众要去看这个电影,那是不是拍这个电影的时候可以认为是一个摄入时间啊,对吧,拍摄时间对不对?所以下边这个有点像是一个拍摄时间,或者是一个上映时间,那如果要是我们看的时候上映的时间,那是不是就相当于我们真正的处理这个故事的时间啊,那是不是相当于是processing time。哦,那大大家就会想到,那我们更关心的是什么呢?更关心的是哪哪种时间语义呢?啊,这这就这就看你到底关心的是什么了,对吧?那大家可能会想到我们一般去看电影的时候,其实你更关心的是不是应该是这个故事的梗概啊,故事发展的进度对吧?所以我们在看电影的时候,可能更关心的是他的even time,它本身故事发生的时间对不对啊,但是你如果是一个比方说,你是一个电影从业人员的话,有可能你看电影的时候,你更关注,诶你这个什么时候上映,它的票房是多少对吧?你有可能关注这个,那是不是就可能要用到它的这个processing time对吧?就是跟我什么时候看这个东西可能要有关系了。所以大家看其实。
10:40
不同的时间语义会有不同的应用场合。啊,但是我们能看出来,往往我们更关心的其实是这个事件事件even态对吧,大家可能看这个生活当中的例子,大家觉得有点有点远啊,我们在在看一个具体的比方说我们在这个真正的it行业里边要做数据处理的一个环境啊,比方说这里边我们考虑一个这是个,这是考虑一个什么呢?我们考虑一个我们在玩一个在线手游,大家平常可能也也平常玩这些游戏对不对,比方说我们玩的是一个休闲类的游戏啊,就比方说是一个什么消消乐啊,或者什么那个对吧,就是什么答题闯关啊,类似于这种一个游戏,大家知道这种游戏是不是相当于平常你单机自己也能玩啊,但是可能它有些任务,或者有些什么东西,是不是要联网才能才能去提交,才能去实现对吧?诶所以比方说现在我们玩的时候啊,它它有一个有一个挑战,比方说就是你如果在一定。
11:45
的时间内,完成一些挑战,完成一些特定的操作,就可以给你奖励,比方说五分钟,或者说那个两分钟之内你能过掉,比方说过掉五关对吧,过掉十关,我就给你什么什么奖励,哎,这可能是这个游戏里边的一个设定啊,那所以我们就很开心,我们就这个,呃,上班的路上,哎,坐着这个坐坐着车,坐着地铁,我们就就很开心的在那刷,对不对啊,你几分钟之内只要过了几关,我就可以把这个奖励拿到手,你就在那里刷了,但是。
12:19
本身你玩的很溜啊,这个很很厉害,两分钟之内大家看啊,在这里边上面这个时间是什么呢?这其实就是我们玩的时间对吧,玩的时候比方说我在上班的路上,08:22开始玩儿,08:23分到这儿的时候,中间已经过了这么多关了,过了八关了,那我应该能拿到能拿到这个奖励了,对不对。但是很不幸,大家知道我们过程当中有可能坐地铁对吧,有可能对大家看,诶上面这个信号到到这一段时间内断网了,对不对,那我们刚好完成这个任务的时候,该提交该拿奖励了,结果断网了,那大家会想到啊,那就断网就断网呗,过一会儿我出了地铁,是不是它就又有网了呀,有网之后按道理他是不是还应该把那个奖励给我啊,哎,我们是这么想的,但是大家看看,如果你是这个写这个服务器后台的这个程序员的话,你怎么做这个后台的时间处理呢,大家看。
13:18
后台的时间。我们其实看到的是怎么样的一个情况,是不是你在进地铁之前的这几关过关的信息快速的发过来了,对吧,然后是不是后面几关的过关信息是隔了很久才发过来的呀?那所以大家说我现在处理的时候应该按哪种时间处理呢?到底算不算你在比方说两分钟之内把这个都都做完了呢,过完关了呢,大家可能想到,假如说我按服务器的处理时间的话,大家看我要求的两分钟之内,是不是你相当于只过了三关啊,对吧。但是如果我按这种方式来做的话,直接按我服务器处理的时间来算的话,那肯定这个结果就是什么呢,以后我们。
14:08
再也不想玩它了,马上卸载什么破游戏对吧?所以我我一旦是没网,这个进了地铁,你直接过了关都都不给我算了,那这肯定不爽啊,直接卸载不玩了啊,所以更好的方式应该是去看什么呢?是不是要看本身用户过关的时候记录下来的他的那个时间啊,而不是对进入到我们这里处理的时间,那大家看如果我们用even time的话,是不是两分钟其实应该有这么长啊,对吧?1TIME其实到这儿的时候,这也是两分钟之内发生的事情,可能我们就过了八关,那就应该给发放奖励了啊,所以这里边大家可以看到有一些应用的场合,其实是不应该用processing time的,而是直接就应该用even time啊,这也是一个典型的例子啊,那大家可以想到就是在实际应用当中,我们怎么样用这个even time呢?Processing time这个好算,这个我机器时间是什么就是什么,对不对啊,这个就不用系统。
15:09
时间嘛,那医院的态度,我哪知道你这中间到底隔了多久啊,对吧,我怎么知道你这个数据隔了这么久来的这个数据,这是两分钟的之内的数据呢。诶,所以呢,对,大家会发现这个数据在提交的时候,在生成这个数据它里边的内容的时候,是不是应该把它生成的时间那个时间戳也要写进去啊,哎,所以大家就会想到,比方说我们一般提取数据不都是在日志里边吗?大家看这条日志啊,什么2017年,呃,几点几分,然后后边这是个infer,对吧?然后feel over to,这个RM,这是一条日志信息,日志信息里边我们是不是就应该把它的时间转换成一个可用的时间戳,这就是我们的even time对不对啊,所以大家其实是从从这里能够看到该怎么去做处理的,好,那那大家想到我们讲了这些的话,那processing time为什么还要提这个时间语义呢?它就完全没用吗?当然不是,对,当然,肯定它是有用的。首先大家会想到什么时候processing time有用。
16:20
没有。对,大家会想到你这里边是能提取出数据里边的时间戳,对不对,那假如说我们一开始这个日志里边就没有记录,当然正常来讲日志里面应该有对吧,但假如说你就没有时间呢?那是不是就是不是就没办法啊啊这是一种情况。另外大家还会想到。如果说我们想要更低的延迟,更快的处理速度的话,那你说我要去等他这么长时间,去等这两分钟的数据吗?大家看,如果我用even time,是不是我得等更长的时间啊,才能把它处理完对不对?尽管这个结果可能正正更加正确,但如果你想要更低的延迟的话,如果说这个结果不正确,我也可以容忍。
17:04
那是不是我直接processing time就好啊,这就是绝对实时对不对?哎,所以大家可以看到就是在有些场合其实还是需要这样的processing time这样的时间语义的啊好,然后接下来大家看一看在代码里边怎么样去设置even time,设置这个事件时间的语义啊这这个其实也很简单,大家看在代码里边就是直接对那个执行环境,我们不是有一个那个env吗?拿到了当前这个流逝的这个执行环境对吧?直接调它的一个方法叫set stream time characteristic,对时间特性对吧?设置时间特性设置成什么呢?Time characteristic.even time啊对,这里边把这个设置就可以了。那大家就会想到,那之前我们从来都没设过,对不对,那没设过,那应该是什么,我们处理的时候按什么时间处理。
18:04
对,当然就是processing time对吧,当前是什么时间就按什么时间去处理啊,所以这是这个对时间语义的一个设定啊,那大家会想到就是说这里边设了这个even time,后面我们处理的时候,它就真能按照even time来处理了吗?我这一射,哎,你给我按照他发生的那个时间来处理啊,弗Li就知道该怎么处理了吗?他并不知道,因为flink很傻,他并不知道那个事件发生的时间到底是什么,对不对,尽管有可能那个时间是不是已经在我们的那个数据里边了呀,但是我们是不是还得明确的告诉flink,这个数据里边的这个字段就是你要的那个事件时间的时间戳,对吧?哎,所以大家注意啊,具体时间我们还应该有一步,就是从数据里边提取时间戳,给它分配清楚,告诉flink哪个是我们要的事件时间,哎,这就是这个对这个事件时间的一个设置啊。
19:10
好,呃,那么大家现在已经知道我们更想要的是这个事件时间了,然后还有一个还有一个问题,大家就会想到,我们既然要用事件时间的话,如果用处理时间processing time这个比较简单,它是不是就是就是顺序往下流转的对吧?这个不会出现任何的其他情况,但是如果用even time事件时间的话,那大家想它的这个流动,它的推推进是不是要跟我们本身来的那个数据有关系啊,数据的那个时间戳大,是不是代表我们现在的这个事件时间就推进的比较,推进到比较后来了呀,如果数据的时间数比较小,是不是说明我们现在时间还早啊?大家就会想到这就有问题了,之前我们不是说数据有可能是乱序的吗?
20:02
有可能本身发生比较早的数据很迟才来吗?那我们的这个事件时间even time还能保证吗?好,所以接下来给大家看一下这个所谓乱序数据的影响啊,这个乱序数据有什么影响呢?大家看一下上面这张图,上面这张图是一个很理想的一个情况,我们想要的是什么呢?就是一个串行的一条数据流,123456,这个不是,呃,这个也是它里边的数据,大家可以认为这就是他那个log里边写的那个时间戳,对吧?啊,就是按照它发生的时间来来写的,所以大家看我想要的就是第一秒的数据,哎,第一个先来,我先处理,后面紧跟着就是第二秒,第三秒,第四秒,这个是不是就非常好啊。我处理起来是不是就我如果要是这里边有一个时间窗口的话,是不是第二秒的数据来了,那是不是第二秒结束的时间窗口就可以关闭了,这个就很准对吧,一点问题都没有,很可惜,实际的情况往往不是这样,而是什么呢?
21:06
大家看就是145236,大致的这个趋势是逐渐增长的,但是局部是不是就有可能出现乱序。哎,所以大家会想到,那你如果看到一来了,看到四来了,那是你觉得是时间已经到四了吗。你要是觉得,哎,这里边大家可以看一看啊,就是由于这个网络分布式这些原因,大家大家会发现它会导致这个乱序数据产生,对不对,在前面我们这个理想情况下,假如我们要有一个五秒的时间窗口的话,是不是正好等到五秒那个数据来的时候,把这个窗口关闭,所有五秒前的数据是不是都收进来了,哎这个就没问题,那假如下边我们这种情况的话。你看到五这个数据来的时候,你就想关闭五秒的时间窗口,是不是大家会想到这里边是不是只收进来三条数据啊,那是不是这就有一个问题了,这个时候就直接关吗?
22:06
如果你直接关后边是不是大家会看到乱序数据对吧,二三后边来这个数据是不是就丢掉了。哎,所以这其实就是乱序数据对我们系统的一个影响,如果你这里边提前已经关掉,那这个窗口处理的数据就不正确了,对吧,该收的数据都没收进去。哎,所以这就是窗乱序数据对窗口数计算的一个影响,它会让窗口计算不准确,那怎么解决这个问题呢?有些同学可能想到了,之前你不是讲过可以那个什么允许延迟数据嘛,对吧,那那这里边你可以把这个剩下的那个数据,等窗口关闭之后,把它放到另外一个流里边延迟处理对不对?但是其实我们想要更好的处理是什么呢?是应该他就让这个窗口把它收进去,对不对。那我们怎么样就让他把这个窗口收进去呢?
23:03
哎,其实有一个想法是什么?有一个想法就是那我能不能让窗口稍微等他一会儿,等他该来的数据都来了之后再关闭呢?大家想想这是不是也是一个思路。对吧?啊,这其实有点像什么呢?有点像大家那个平常坐车的时候,平常如果我们要是大家有过赶那个汽车啊,有时候那个班车什么的这种汽车的话,可能就有这样的经验,比方说我们一般汽车八点钟准时发车,那有可能出现什么情况呢?到八点的时候发现,诶这个人稀稀,哈哈,就没几个,对不对,然后司机或者说那个呃,卖票的这个售票员,他就说,哎,那我们再再再等几分钟吧,对吧,有些人是不是没来啊,对吧,该来的人他坐这班车的人可能还没来,我们等几分钟吧,那大想这个等几分钟。是不是就相当于是要等这个乱序数据,该来的数据迟来了,对吧?等乱序数据属于这班车的,赶这班车的人等他来,哎,等完这几分钟之后,该来的人都来齐了,我们到时候再发车,这个发车是不是就相当于窗口关闭做操作,做计算操作了啊,所以一样在flink里边也有类似的这样的一个延迟发车的一个机制。
24:23
这个机制是什么呢?就是后边要给大家讲的。Wal水位线。
我来说两句