00:00
我们已经了解了水位线的概念和特性,那接下来呢,就可以做一个实践,在代码当中去尝试着生成水位线了,其实前面我们已经知道水位线设置的一个基本原则啊,我们知道一般要去处理乱序流数据的话,那么。我们的基本原则就是基于当前已经接收到数据的最大时间戳,然后设置一个延迟时间,多等一会儿啊,我们说的啊,延迟发车,只要多等一会儿,等到所有的乱序数据到齐,这个时候就可以正确处理数据了。那所以这里有一个核心问题,就是我们这个延迟发车,到底延迟多久呢。啊,在实际发车的这个例子里边,我们是知道到底有多少人,到底有哪些人要上这班车的,那我们等到所有人到齐就可以了,但是在处理数据的过程当中,我们要收集八点到九点的数据,我们并不知道八点到9.1共有多少数据啊,所以我们并没有一个完整的结束标准。
01:04
那这个数据有可能它一直会到来啊,就有可能这个延迟的时间也是无限长的,我们等等一分钟啊,那有可能一分钟之后还会有十道数据,等两分钟可能之后还会有十道数据,那这怎么办呢?难道无限等下去吗?所以这里有一个基本的整体原则,那就是我们既不能延迟太高,也不能延迟太低哈,啊其实这个是很容易想到的,因为如果说我们延迟足够高的话,那很显然整个flink的实时流处理啊,这个实时性就大大降低了,而且我们之前的数据都要缓存起来嘛,啊,那整个会占用大量的系统资源,我们整个处理效率会比较低,那如果说把这一个等待的时间延迟时间设置很低好不好呢?诶,当然不好,因为前面我们说啊,设置的这个延迟时间应该最好就是当前数据流里边的。
02:00
最大乱序程度,但是关键问题就在于,在这个数据还没到来之前,我们根本不知道它能乱到什么程度啊啊啊,所以这个时候你如果能够把之前的数据统计出它的一个乱序程度的规律的话,哎,那我们知道往往可能这是一个类似于正态分布的一个规律啊,就是乱序程度的这个时间可能是正态分布的啊,那中间的这个值可能也就是个几毫秒,然后在它某一个范围内,大部分数据的乱序程度啊,都在这个范围之内。所以我们就会发现啊,这我们就要做取舍了,假如是这样的话,那如果说我们想保证。足够快的话,那等待的时间可以设置的很小,但是就有可能绝大多数的数据,乱序的数据啊,迟到数据都没有办法正确处理,那如果说我们把这一个乱序的时间,等待的时间设置的足够大呢,那又会导致我们延迟时间过大,耗费更多的系统资源,它是可以处理这个大部分的迟到数据了啊,正确性得到保证了,但是。
03:03
会占据更多的系统资源,等待的时间更长,所以我们就需要在这两者之间找到一个平衡点啊,就比方说我们选取当前这一个正态分布里边的某一个节点,在它之下。呃,或许百分之九十九点几的数据,乱序数据,迟到数据都可以得到正确的处理,那剩下的概率很小的那些迟到时间很长的数据。那我们就干脆就把它就丢掉了啊,我们就可以不予考虑了啊,所以最终我们其实就是要在正确性和延迟时间上做一个权衡。那接下来呢,我们就可以在代码当中具体的看一看,到底怎么样去设置水位线的延迟。那现在我们是第六章了,所以可以新建一个package。CHAPTER06。然后接下来我们要去进行测试的。主要是。
04:02
水线,所以我们新建一个sky object,那就叫做water mark test。方法先写出来,呃,然后前面呢,我们还是可以从之前的代码当中copy创建执行环境和读取数据源的这个过程。上面同样还是把下划线引入。然后接下来基于当前的这个data stream stream,它就可以调用一个方法叫做。我们看到a sign time stamp and water marks,这就是设置水位线,生成水位线的一个标准方法,标准接口,然后我们看到啊,里边它要传入的东西是什么呢?这里它要传入的是一个water mark strategy,这是一个water mark的生成策略,那这个策略我们可以看到它本身是一个接口,一个interface,在watermark strategy里边最核心的抽象方法其实就是这个create water mark generator。
05:06
它就是要创建一个水位线的生成器,它返回的就是一个water mark generator,啊,那对于这个water generator呢,点进去之后我们会发现,啊,这又是一个接口,它里边有两个核心的抽象方法,一个叫做on event,另外一个叫做on periodicit。哎,那这两个方法从名字上我们就可以看出来,一个是基于事件去生成water mark。另外一个是基于周期性的发射去生成water mark,哎,那所以整体来讲的话,这就对应着我们之前所介绍的生成watermark的两种方式,一种是周期性的隔一段时间去生成一个,另外一个是什么呢?诶,那可能就是每来一个数据,我们知道数据的生成,这就是一个事件嘛,所以那就是基于事件去触发来一个数据,就判断一次可以去生成一个watermark。
06:02
这就是water mark生成的两种策略啊,可以在这里做一个自定义的选取啊,那除了这个water mark generator之外呢,另外在这个watermark策略里边还有一个time step a sign,我们看到这里有一个方法叫做create time stepmp a sign,顾名思义这个东西呢,就是一个时间戳的分配器。好,那我们可能会觉得有点奇怪啊,为什么这里边时间戳不是从数据里边提取的吗?怎么还得做一个时间戳的分配呢?啊,其实这个也容易想到,那就是对于数据而言,里边是带了一个时间戳的字段。但是对于flink而言,它其实对于数据是一视同仁的,这里面的每一个字段并没有做特别的区分,他也不知道你这个当前这个字段叫TS还是叫time stamp呀,或者叫别的名字啊啊,所以这里边必须有我们的代码,由这个RA strategy显示的,指定从数据里边每一个字段去提取当前的时间戳,然后把它分配到当前的数据上。
07:09
这就相当于是什么呢?就相当于在我们当前的数据上又追加了一个字段,这个字段是真正意义上的time STEM,它有可能跟我们前面的某一个字段是完全一样,也有可能呢,是基于之前的这个字段做了一个转换,做了一个改变啊呃,因为我们知道在有一些日志数据里边啊,它本身的那个时间戳并不是长整形的,我们在代码里边要求的是一个长整型的整数。而如果说我们这里面日志它本身是一个年月日十分秒这样的一个形式的话,那我们还需要把它做一个长整型的转换。啊,那所以这个time Sam和signner,它主要就是用来从数据的某个字段里边去提取时间戳,然后分配给当前的数据元素,啊,这是生成水位线的一个基础,有了这个时间戳之后,后边我们看到这个create water mark generator里边,它要返回这个water mark generator啊,这里边就会有对应的数据元素,另外呢,还有一个长整型的二,这个二我们知道就是当前的时间戳了。
08:14
那当然了,如果说我们是基于当前的事件去生成对应的水位线的话,这里边我们需要去获取到数据和目前的提取出来的时间戳,那如果说我们是周期性的去生成水位线的话,那那很显然就不需要其他的东西了。这里周期性的生成水位线呢,默认周期我们说了系统里边设置的是默认200毫秒,那如果说我们想配这个怎么配呢?啊,这个也很简单,我们可以在env这里去get当前的config,获取当前的运行配置,然后接下来呢,可以去set all to water mark in t,自动生成水位线的周期时间间隔,那这里边是一个长整型的时间戳啊,表示一个时间间隔,比方说如果说我们把这个调大一点啊,设置成500毫秒生成一次,哎,那我可以直接给一个500L,诶这个是完全没有问题的啊,那对于这个默认的配置呢,其实我们也可以看到get con返回的,这里调调用底层的Java代码里面的get con返回的是一个execution con,在这一个类里边,它其实是有一些默认的定义的,我们看它的构造方法,这里边我们可以看到。
09:28
Auto to water mark interval,默认就是200毫秒啊,那所以呢,整体来讲,我们在这里就是要去自己实现一个。Water mark strategy其实就是要实现这个东西里边最核心的一个抽象方法,当然就是create water mark generator。这就是关于在代码当中生成wma的通用接口。
我来说两句