00:00
我们已经了解了在flink当中给我们内置的两种不同的水位线生成策略啊,那他们整体来讲呢,都是周期性去生成水位线的,一种是针对有序流,另外一种是针对乱序流啊,那一般情况下呢,有了这两种方法,我们其实在生产环境里边实际应用的时候就可以满足大多数需求了,那有时候呢,我们会想到那业务逻辑如果特别复杂,我可能。简单的只给定一个乱序流,里边的延迟时间可能还不能满足我们的需求,诶,那我就想对它进行更加精细的控制,就想按照自己定义的逻辑去发出水位线,能不能实现呢?诶,当然能实现了,那我们就是自己去实现这里通用的watermark strategy这个接口啊,啊,那里边的核心呢,当然就是实现一个create water generator,那里边呢,有两种方式,一种是on,这就是我们说的啊,事件触发,那就是来一个数据就可以判断去进行水位线的发射,另外一个呢,就是on periodic1米,那是周期性的生成水位线啊,所以我们看到就是自定义的话,也就是实现这两个方法就可以了。
01:11
所以接下来呢,我们再简单的来说一下如何自定义。水位线的生成。啊,那么这种方法呢,我们看到啊,除了这个要实现create generator之外,另外啊,我们想到前面我们不是还得with time stamp a sign去专门从数据里面提取时间戳吗?啊,那在这里面也是一样,那所以我们也可以在自定义的water mark strategy里边去重写它的。Create time stamp sign这个方法啊,然后接下来我们也就可以去单独去指定了,当然我们也可以在后面去调用with time stamp啊,这两种方式其实也是类似的,因为他们都会指定返回我们当前的time step aign。所以接下来我们可以在这里去直接重写create time stamp,然后里边想要返回一个time step s啊,那我们就直接去new一个,我们实现一个szable time stamp a sign啊,里边同样的策略啊,t.time stamp提取返回就可以了,然后剩下的关键点就在于自定义这个。
02:21
Create water mark generator方法返回一个water generator了,那这里边呢,就是这两种方法,On event和on periodicit这两个方法去实现具体的逻辑,这就有两种方式了,一种就是周期性的生成,那周期性生成关键就是在这里边去发射水平线啊,那就在这儿可以直接去拗一个water mark。啊,那当然了,这里我们把这个water这个类要引入,注意我们引入的是API flink API common time里边的water,点进去我们可以看到它的构造方法,里边最关键的就是一个time step当前的时间戳啊,那所以我们其实可以直接在这里传入想要生成的时间戳就可以了。啊,那当然了,得到了这个water mark之后,接下来这还没有真正的发出去,要发送出去的话,那得water mark output,利用这里的参数去it water mark。
03:21
把watermark发射出去就可以了,这就是我们周期性发射的一个过程啊,那当然了,如果要是结合我们之前的这种啊,呃,处理乱序流,周期性的去生成一个水位线的话啊,那我们可以参考之前帮ED out of orderne里边的那种实现,我们可以怎么样呢?那就是在这里啊,上面先去定义定义一个。延迟时间。那在这里我们可以直接定义一个delay。比方说我们还是毫秒数啊,那就给一个五秒钟的延迟,那就是5000L长整形的,然后接下来呢,那还得去保存当前的最大时间戳定义。
04:03
属性保存最大时间戳。我们可以把它叫做。Max ts。那这里它的初始值给多少呢?哎,我们可能想到,哎,那是不是给一个最小的值就可以了呢?哎,对,所以我们这里可以直接给一个当前长整形的最小值,也就是说。Long点命value啊,那我们知道它默认啊,这个应该是一个很小的负数啊,就是绝对值很大的一个负数,所以接下来我们有了它之后,是不是就直接可以使用了呢?哎,注意。还不能这样,因为接下来我们执行这个周期性生成的时候,假如一开始什么数据都没有,我们直接基于这个最小值,那后边生成这个watermark的时候,是需要用基于当前最大的时间戳去减去我们的delay,然后再减去一毫秒,哎,按照我们之前的规则啊,那如果是这种逻辑的话,那很显然。
05:07
本身就是长整形的最小值,再一减D类的话,那就变成溢出了,那就变成一个很大的值了,所以这个时候啊,最初我们应该是在它基础上加上一个D类,先把这个先补回来啊,当然了还要减一毫秒的话,那我们再加上一,这样的话,最初这个一减还是当前长整形的最小值。这个就保证后边我们所有的时间戳啊,即使是临时刻的数据来了,我们照样可以正常的生成对应的水平线啊那。同样上面在这个每一个事件来的时候啊,On event数据来的时候,我们还得做一些事情,那就是要更新当前的最大时间戳,这个逻辑跟之前我们在ED out源码里边看到的是一样啊,我们这儿可以直接去调用ma下边的max方法啊,去做一个最大值的比较,之前的最大时间戳,以及当前传入的T的time time做一个比较,得到的最大值返回就可以了。
06:08
当然了,我们发现这里面报错,那就主要因为我们前面把max ts定义成了value va现在要改变它的值,哎,当然它应该是一个VAR了,是一个变量。这就是我们自定义周期性水位线的一个生成啊,其实这个过程非常的简单啊,我们这里自定义生成的是周期性的水位线啊,那当然了,如果说我们不想周期性去生成的话,那其实更加简单,我们直接在on event这个方法里边利用watermark output去发射水位线就可以了,那后面这个方法就没有任何的用途了,因为我们知道这个周期性它是按照当前的系统时间,按照处理时间,每隔多长时间就做一个调用的啊,那当前的这个one,那就是每来一条数据之后做一个调用。这个我们就不再去详细说明了,这就是关于自定义水位线的生成。
我来说两句