00:00
这一部分先给大家有一个整体的概念,有一个整体的了解,我们的重点其实根本不在这里啊,我们的重点是看一看这个flink内置的水位线生成器到底是怎么用的,所以接下来我们现在是新的一张了啊,所以CHAPTER06。然后在下边创建一个watermark。这里不要忘记throws exception啊,其实前面的内容都差不多啊,我们主要是测mark生成,所以我们就把这个直接copy过来吧。然后下面是en execute。所以我们已经知道可以利用get config里边的set all to auto in t去设置我们当前的生成水位线的时间间隔啊,默认情况下我们现在就是事件时间嘛,所以你直接设这个间隔就可以了。然后接下来我们在定义水位线生成策略的时候,它是data stream的方法嘛,所以我们一般情况下就是只要当前是一个data stream,你就可以调这个方法去按照某种策略去生成水位线。
01:06
一般情况下我们到底在哪儿去生成呢?啊,一般推荐就是离圆越近越好,甚至可以在圆里边直接就指定水位线。因为离源越近,后边对于时间的控制就会越精确,因为水位线它会朝下游传递嘛,你如果要是直接在后面的某一步才生成水位线的话,那之前其实相当于时间就都没有控制了,它没有时钟嘛啊,那这里边就是time stamp and water mark啊,那这里之前我们看到的是直接自己去拗一个对应的autowa strategy的一个实现,这个接口的一个实现啊,那现在如果说我们平常不想那么麻烦啊,里边你看那个接口套接口里边东西太多了,那能不能更快的去做一个分配呢?那这个就是我们前面说的,可以直接调弗link内部给我们内置好的微线生成策略加个注释就是。
02:00
有序流的。那这里我们就可以去直接调water mark strategy,看到可以调它的forotonouss这样一个病态的方法,它会返回一个watermark strategy。啊,直接这么调就可以了,然后大家会发现它本身这个方法是一个泛型方法,需要指定当前的这个类型是什么啊,那这个正常来讲,我们自然是前边的数据类型是什么,现在就应该是什么。接下来看一下for monotonous times STEM,呃,里边还需要去实现什么东西的吗?啊,那确实是需要实现一些东西的,因为你直接这么定义了之后,它里边并不知道我们的这个时间戳到底怎么提取啊。因为之前我确实是有一个字段是叫time stamp啊,但是flink怎么知道你这个包装的po类里边字段叫什么名呢?那有些人我就喜欢叫time难道不行吗?啊,我有些人我就喜欢叫TS,难道不行吗?而且你即使是都叫统一叫time stamp,我这里边到底是秒还是毫秒我也不知道啊,你总得给我一个统一的标准,对不对啊,所以这里边目前的这个方法里边。
03:19
它是用了一个ending time watermarks是个什么东西呢?它是继承自这个bed out of waterness啊啊,当然这是后话了,然后在这个类里边,我们可以看一眼也有。On event和on periodicit2个方法,因为它本身是实现了watermark generator这个接口,Strategy里边,我们这里边相当于返回了一个是实现water mark generator接口,那本身在这个strategy里边,它是要什么呢?是除了这个watermark generator之外,还应该要有一个time stamp a s的。
04:00
哎,那所以这里边就需要考虑到这个问题了,那当前我的这个watermark strategy里边没有time s啊,那怎么办呢。我现在这个调用相当于只是有了一个auTo Generator了。再调一个方法,叫做with time stamp a sign r是什么呢?它是返回的这一个watermark strategy里面的一个方法。大家看到是下面这个方法,Base time sample sign,专门用来指定当前怎么样从数据里边提取时间戳啊啊,所以这个方法也是比较好理解的,你前面这里只是一个。Watermark的生成器。那这里边还有那个时间戳没有提取嘛,所以这里边必须要指定一下,那这个里边a sign里边又得传什么呢?要传的是一个time stamp a sign supplier。啊,那这里边我们就简单的给大家实现一下啊,又一个常见的是直接用这个啊,Thereizezable,序列化的a signer,然后把当前的这个泛型直接传进去,那这里边需要实现的就是一个extra time stamp方法。
05:11
它里边呢,参数有两个,一个是当前的element,当前传进来的数据,另外一个是有一个record time stamp,就是如果说本身带着时间戳的话,我这里边也可以获取到,当然我们现在刚刚从这里读进来,这里肯定啥都没有啊,我们就从这里边提就完了嘛,对吧?Element time stamp指定就是拿它作为空前的时间说。所以这里边我们要求的是一个毫秒数,如果说这里我们本身给的是一个秒的话,那怎么办呢?哎,那就在这儿再乘以1000。就完了啊,那有时候会出现这种情况,我们这里边就把这个当成毫秒好了,可直接就是element点。这样就实现了一个非常简单的有序流的watermark生成,然后接下来再给大家看一下。
06:03
乱留的生成。其实跟上面那个也很类似啊。偶续流这种方法,呃,一般也就是做测试采用了。真正处理的时候肯定都是乱序流。好在time water,那同样的里面的乱序流处理,就是之前给大家说过的for ended out of orderness这样的一个用法,然后这里边大家发现前那个有序流里边不需要传参,这还需要传参的,传一个什么呢?它需要传入一个。啊,就是这里边是一个max out of orderness什么意思,最大乱序程度,这不就是我们延迟的那个时间吗?尽量能做到那个延迟时间,然后这个duration是Java time下边的duration。所以。当前我们要做的就是传一个。Duration,然后这里我们可以直接去,你可以是0ZERO,然后可以du.of,也可以直接去of minutes of second,迟到个一秒钟,或者我们测两秒钟吧,看的更清晰一些啊。
07:12
这样的话就定义出了一个本身这里边用的是一个什么?是一个water mark,本质是一个water mark generator。所以之前我们不是说了吗?在这个strategy里边必须要有一个water mark generator,还需要有一个time stamp a signer啊,所以问题又来了,你现在现在拗了之后,这个water strategy里边只有一个watermark generator,只有一个水位线的生成器,没有那个时间戳的提取器,那怎么办呢?跟之前完全一样,那你直接再去with一下不就完了吗?With time sample signer啊,然后接下来里边的这个做法,当然也就是完全一样了啊,我们可以new一个啊,嗯,当然了,这里边看一旦弹出来之后,他认为是。
08:01
Object,为什么说object呢?那是因为前面我们这里没有指定它的泛型类型,对吧,这里面应该是event,然后接下来这样的话,他就直接把这个都弹出来了,我们甚至都不用再去直接override他的那个方法,他直接给我们补全。这里同样是element。看就可以了啊,这就是乱序流的watermark的生成。这样的话,上面这里就不能用。Data stream source了,可以用data stream,也可以用single output dream operator,它本身也是一个算子嘛,这就是生成这个water mark的过程,但是现在我们还看不到它效果,因为这个效果是必须要时间有用,你才能看出来到底是怎么回事,我们现在就只是测了一下到底怎么去生成。
我来说两句