00:00
在已经了解了什么是水位线,也知道了水位线的基本的特性,而且知道了在数据流里边,我们去创建生成一个水位线的基本原则是什么样的,那接下来呢,我们再来总结一下生成水位线的总体原则,之前我们已经说了生成水位线那就是。一种方式是基于数据来触发,另外一种是周期性,前面我们已经解释过,一般是周期性的实际使用会比较多一点,当然基于数据就是相当于类似于这个应该叫断点式的生成水位线啊,这也是一种策略啊,就是在有些场景下这种可能会有用,一般情况我们就周期性就可以了,呃,另外一个对于这个乱序数据,实际的乱序数据的处理该怎么样呢?那就是要设置一个延迟时间。这就有一个原则性的问题,到底设置多大呢?啊,我们之前这里是等两秒,那之前给大家说了,等两秒的话,假如九秒钟要关一个窗口,在九后面的这个迟到的七和八是可以正常处理的,正常把它收入到九秒钟窗口理来的,但是这个22秒之后的这个17,它就不会收入到18秒关闭的那个窗口里边来了啊,因为20这个到了,等两秒的话,现在的水位线auto马就到18了嘛,这个时候有18秒该关的窗口,这不就应该关了吗?啊,所以等到后面再来17这个就已经不敢趟了。
01:24
到底我这个应该等多久呢?那就等多一点呗,两秒不够,那我就等五秒,五秒不够我等十秒啊,所以这里面就涉及到一个问题,就是到底等多久。那总体原则理论上如果你想保证绝对的正确的话,那应该就是。等足够长的时间,什么叫足够长的时间啊?就是你把所有的数据都统计出来,然后看他们所有数据两两之间的那个乱序程度。就是看后面如果来了一个迟到的数据的话,比前面能迟到多久,最大的那个乱序程度,最大的那个迟到时间提出来作为我们等待的时间,这就肯定没问题,但是这样的话问题就又来了,我们现在在实际处理的时候,流数据它是源源不断来的呀,如果它真的是无限流的话,永远等不到所有数据,而且一开始上线你就得设置好water mark的延迟机制啊,你不能说等到所有数据来了之后,我再去重新设置water呀,那这个就变成批处理了,所以一开始我根本不知道后面的数据是什么的时候,我又怎么能知道它最大乱序程度是多少呢?
02:34
啊,这就有两种方式了,一种就是我们可以做一个经验性的提取啊,就是之前我有经验,我看之前的那个数据,大概它乱序乱到什么程度,然后我给他拍脑袋想一个,啊,当然也可以有更高级的方法,那就是我可以做一个规律性的总结,自己写一个统计程序,把它的这个之前所有的那个乱序程度做一个统计,甚至还可以画出对应的乱序分布的一个曲线啊,我最后把最大的范围给它截出来,作为当前的。
03:02
延迟时间,这样的话就能基本上保证处理的这个结果是正确的,所有的这个迟到数据都能被处理,这是一种想法,那另外一种想法就是,那我既然不知道怎么样,那我就尽量把这个时间调大呗,问题就又来了,那你多大是大呢?你如果五秒不够的话,十秒够大吗?一分钟够大吗?一分钟如果还不够的话,难道调十分钟一个小时吗?啊,这是没有止境的,所以大家会发现这个水位线的延迟等多久的这个策略啊,它是应该要做一个权衡的,就是一方面我们不应该把它调的特别大,因为如果水位线调的特别大的话,这不就是相当于是一个延迟吗?啊,尽管这个延迟是因为数据本身迟到了,数据本身的延迟导致的,但是你现在还不太确定数据到底能延迟多久的时候,你把它调的很大,这个延迟是实实在在的呀,你这里直接水位线延迟一分钟,本来实际一分钟前的数据已经到了啊,那视线时间肯定进展到这儿了,但是这个时候呢,你为了等他迟到的数据,你设置的事件时间是一分钟之前。
04:09
那就相当于我们的实时流处理的这个延迟至少是一分钟以上了啊,所以这个不能调太大,但是呢也不能调太小,你要是调太小的话,很有可能大量的那个乱序数据都会等到窗口关闭了之后才到来,就没办法正确处理了。所以我们应该要。做一个权衡,做一个取舍,如果说你希望的是要更快,实时性更强,那可以适当的把水位线设置的低一点啊,就是水位线的那个延迟设置的小一点,那你如果希望是结果要更加准确,你可以把它延迟设置的高一点。那整体来讲的话,比较高级的做法是什么呢?你先去总结一下历史数据里边它的乱序程度,正常来讲应该是有这样的一个分布情况的啊,果是乱序程度的话,可能是从零开始,从零开始有这样的一个分布曲线,比方说是这样啊,有可能这样的一个曲线,有可能这个大部分的数据乱序可能都是在比方说从这到这儿啊。
05:15
乱序可能是在一毫秒到。五毫秒之间。在这个范围内,乱序的数据可能就涵盖了95%的数据了,诶大部分这个东西都是一个长尾分布,这个很正常的,你比方说在这之后,如果要是十毫秒之内的话,可能就百分之之99的数据都等到了啊,那如果100毫秒的话,可能就99.9%就都到了啊啊,那那所以在这种情况下,这就是看你要求对这个正确性的要求多高了,你如果要求的越高,那你就等的时间越长,所以最后就是你要到底是保证百分之九十九点几几位精度的这样的一个正确性,那么你就可以按照这个分布去截取对应的延迟的时间。
06:00
这是比较标准的高级的做法啊,那实际项目应用当中可能也没人那么麻烦啊,一般情况就是经验性的给一个值就可以了啊,所以一般情况这个给多少呢?因为我们flink的这个实时性啊,它是可以达到毫秒级别的延迟的,而我们这里边的这个乱序程度主要是因为什么呢?主要就是因为分布式架构里边彼此之间那个网络传输,然后分布式处理这个带来的延迟,这个延迟一般有多大呢?啊,其实一般也就是几十毫秒,最多几百毫秒级别的这个延迟,所以一般我们设置这个可能是在秒级别以下,不会一设这设置个这个一分钟,这这个就太夸张了啊,我们后面测试的话,可能呃,一般就给个一两秒啊,设五秒也是有可能的,为了测试嘛,但实际的话可能都是几百毫秒甚至几十毫秒这样去设。那然后接下来我们就看一看代码里边到底怎么做了,呃,代码里边我们就要考虑到一个真正意义上的水位线生成策略的定义,在data stream API里边要想去生成水位线。
07:05
自己去定义这个水位线的生成策略,那需要调一个方法,有一个叫做assign time stamp and water marks这样的一个方法。基于当前的这个data stream,我们就可以调a stems and water marks,其实这个方法本身就是data下面的一个方法,它本身也是一个算子。它本身返回的这是一个single output stream operator,跟我们之前讲到的map reduce,它们其实都是一个算子,那我们这里边的这一个算子,它主要是用来干什么呢?根据名字非常直观的可以看到分配时间戳和。Watermark和水平线。然后里边要传一个参数,传的参数就是一个water mark的生成策略,Water mark strategy,哎,这就是当前要调用的一个特殊的算子啊,那这里边是不是就是new一个奥strategy就可以了呢?啊,确实啊,理论上是这样的,但是我们会看到是一个interface。
08:10
本身是一个接口,这个接口我们需要自己去单独实现才可以,诶,那这个就有点儿麻烦,诶,那所以对于一般做这个应用而言呢,我们肯定不会去自己单独去做这个事情,Flink里边有内置的东西让我们使用,我们先来看一下这个接口里边到底有什么东西吧。这里边最重要的方法是什么呢?什么东西可以直接返回一个watermark strategy呢?可以调这个接口里边的for bonded out of orderness是一个直接给我们提供好的一个静态方法,直接调这个,它里边就给我们自己有一个对应的实现,然后就会返回一个water strategy。从这个名字大家已经看出来了,Out of is什么意思,没有次序,没有顺序,所以它是处理乱序数据的嘛,啊,然后同样另外还有一个叫ball monotonness,这个单词是单调的,单调性的这样的一个含义,所以它表示的是什么呢?这个就是表示单调递增的数据,就是没有乱序的数据,那么你可以调这样的一个静态方法,直接对应的返回一个watermark strategy。
09:19
这两种方法是我们在实际的代码当中最常用的。那更加一般化的做法是什么呢?更加一般化的做法当然就是自己去实现water strategy了啊,你就这里stream STEM and water,然后你自己去,你有一个类,然后去实现auto strategy这个接口都有一个auto strategy,最关键的啊,这里边要去实现的一个方法就是叫做create watermark generator。这个东西主要是用来生成watermark,创建一个water的生成器,哎,这是发出watermark的核心方法,然后另外我们会想到这个算子,它不光是要发出water mark,它还有一个操作叫做aign time step,这又是干什么呢?
10:07
大家看到当前实现的这个类里边有两个方法是我们往往需要去重写的,一个是create watermark generator,另外一个叫做create time stamp a signer,这里报错主要是因为我们这里就不是data source了,那我们干脆把它直接叫做data stream。不管是three source也好,还是这里返回的operator也好,都是data stream,这样的话他就不会再去报错了。所以这两个方法。一个是提取时间戳,分配时间戳,它主要是用来干什么呢?主要就是基于当前的事件,我们这里不是把这个数据传进来了吗?数据在这个流里边传到这个算子任务的时候,我就可以基于这个数据提取它里边的那个时间戳的信息,把它提出来,然后提出来之后干什么呢?提出来之后就可以在这个create auTo Generator方法里边基于这个时间戳去。
11:06
你有一个自己的对应的那个watermark出来,把它发射出去。这里边核心的这两个方法啊,一个方法要返回的是一个time step sign。另外一个方法返回的是water generator啊,这就是这个是一个提取时间戳的分配器,而这个是watermark的生成器,这个如果继续追到源码里边,大家会看到它又是一个接口,里边有两个关键的方法,一个叫做on event,另外一个叫做on periodic。哎,这这是什么意思呢?简单来讲的话,On event on什么?这就是相当于一个事件触发的那个感觉,所以on event是什么呢?就是当前的事件来了之后,我该怎么办嘛,啊,所以大家看到这个one其实就是我当前事件,事件不就是数据吗?数据来了之后,接下来我该怎么办,是不是要去发出一个water mark啊,通过这个water mark output,大家看到也是一个一个这个接口,它就可以去it water mark调这个方法就可以直接去发出一个water。
12:14
这是它成的这个逻辑啊,源码里边的实现啊,那另外还有一个on periodicit,那就是说啊,周期性的调用,隔一段时间就调用一次啊,所以大家就看到了这里边auTo Generator这两个方法呀,就对应着前面我们说过的两种生成watermark的机制,一种叫做欧期性的生成,另外一种叫做工具数据基于事件的断点式的生成。啊,这个稍后我们再说这个具体代码里边怎么样去自定义实现,那我们一般情况用到的啊,Flink底层给我们实现的处理乱序数据,或者是单调递增数据的watermark策略生成的策略,都是基于周期性的这个处罚。那这个周期时间是可以去自定义的,这个时间默认是200毫秒,如果你想去设的话,那在哪里设呢?
13:06
Env。来看,拿到当前的这个执行的配置,然后去set all to water mark in t,然后给一个长整型的,比方说我觉得200毫秒太长了,我给一个100,就是100毫秒触发一次water mark啊,触发一个这个water生成的这个机制。这是关于wal generator生成的这个过程啊,啊,那另外就是前面我们还看到了,除了这个wal mark generator之外,还有一个time stamp sign,这个的话也是一个interface,它就更简单了,只有一个方法叫做extract time stamp。这里边的话就是从当前的数据里边,我要去提取一个时间戳出来,你告诉我到底从数据哪个字段去提取,当成当前的时间戳,那这个时间戳呢,又会。
14:03
在这个water generator里边,作为当前事件的时间戳。当成这个参数传进来,以事件为基准触发这个watermark的时候,那我怎么知道当前的对应的那个时间戳是什么呢?你直接拿这个就可以了,你都不需要再从事件里边再去做提取了,因为我们已经有提取的那个策略了。这是时间戳和water mark之间的关系。
我来说两句