00:00
现在已经了解了怎么样在代码里边去提取时间戳,生成水位线,关键核心就在于要去创建一个watermark strategy,把这个water mark策略作为参数给到这个ign time stamp and water marks,而这里边的核心呢,又是要有一个water mark genator啊,就是我们所说的一个水位线的生成器,以及一个。Time signer时间戳的分配器,一个提取器。为了方便我们应用,在flink里边内置了两种用法,针对有序流和乱序流。那他们本身呢,都是给我们提供了一个watermark的生成器,一个generator。而对应的那个time sample a signer,这个还要我们再去调一个with方法,然后把它单独的声明出来啊,因为你这个线程这个它并不知道我们当前数据的字段,你到底是哪个嘛,所以这个他没办法集成,所以就是说呃,在这儿你需要用这样的两个操作分别获得water generator和time sample s。
01:05
那假如说现成的这种生成autom strategy的方法满足不了我的需求,我们的这个业务需求非常的奇葩,必须得去自定义,那怎么办呢?其实这个也简单,那就是前面我们说的,你直接去new不就完了吗?直接实现一下这个watermark strategy接口就完事了。那这里边就会有两种方法,一种是周期性的生成watermark,另外一种是断点式的生成watermark。所以这里边就是涉及到这个water genator的实现,其实time stamp a啊,这个时间戳的这个分配器,提取器基本上都一样,就是调一下这个方法,然后从这个数据里边提取不就完了吗?提取字段就完了吗?关键就在于water mark是周期性的生成还是断点式的生成。那这两种方法它的区别在哪里呢?还记得我们之前看源码的时候,Watermark generator里边的两个方法吗?What generator里边。
02:01
一个on event,一个on periodicit,哎,所以如果你想周期性的生成的话,在这个里边去发吗?用这个output,它不是有一个it这个方法吗?在这里边去发就可以了,那如果说你要是想以数据去出发点式的生成的话,那就是在这个方法里边什么都不要做,而在这个uneven的方法里边去发送当前的automark。好,大家可以看一下体的代码怎么实现啊,周期性的生成,期性生成怎么生成呢?这里边ADD source,然后assign assign的时候我们直接new自己去new了一个custom,呃,Strategy,这是我们直接要定义一个类了,直接在下边去创建一个内部的静态类,没有直接在这里边去拗这个接口,所以这里边实现了o RA strategy这样的一个接口。里边这里边要实现的关键就是一个create time sample aigner,哎,大家看这里边可以实现这个方法啊,啊,这个方法实现了之后,其实相当于把我们的那个with time就不用再调了啊,就是你既然有了这个time了嘛,啊,那接下来我们就不用再去点with了,这里边要实现的跟那个with方法里边一样,都是要有一个方法叫extract time,都是从字段里面去提取啊,这个大同小异就不说了,然后另外关键就是楼层rock generator。
03:21
Map generator怎么办呢?诶,那这里边我就又去new一个customed out of allness,这就是我又要处理乱序数据了,周期性的生成处理乱序数据,那怎么办呢?这个类就是要去实现一个water mark generator这样一个接口。然后怎么办,就在on event里边这个接口的one event方法里边去。更新当前的时间戳,然后不发送水位线,为什么呢?哎,我要周期性的发送,这是来了数据之后,我要保存一下当前的最大时间戳,我总得有这个保存的操作啊,要不然我怎么知道当前最大时间戳是什么呢?
04:01
然后我还定义一个延迟时间delay time,呃,然后每来一个之后更新一下当前的最大时间戳,那么周期性的生成的时候调用on periodicit这个方法,在这里我才去发送,调这个it waterma方法去发送。这里边new了一个water mark,它的时间就是最大的时间戳减去延迟时间,而且还要减一,诶为什么要减一呢?这就提到了我们之前窗口在做这个闭合的时候啊,啊一个窗口都是左闭右开啊,八点到九点的这个窗口是包括八点的数据,不包括九点的这个数据的。哎,所以这样的话,我们的这个water mark应该是怎么样去生成呢?比方说来了一个九点的数据,我们现在的water mark应该是多少呢?啊,如果正常来讲啊,假如说当前延迟时间是五秒,那九点的数据来了,那当前的这个water mark就应该是08:59:55。
05:03
所以现在这个窗口肯定还不关,那什么时候关呢?哎,等到9.05秒的时候。这个数据来了,这是对应的这个water啊。那么它对应的watermark又是多少呢?当然就是九点整了。按照我们之前的想法应该是这样,哎,那如果说要是直接就是这样的话,那就会有一个问题,如果它对应的是九点整的数据的话,我们说它代表的含义是小于等于这个water mark的时间,对应的那个数据就都不会来了。那如果whatmark已经到九点的话,是表示九点钟的数据不会再来了吗?不是的,我们之前说有可能后面还会来九点的数据啊,因为九点的数据是不包含在这个窗口里边的嘛,我当前是要只是要直接关闭这个窗口而已,九点整的数据还可以之后再来。
06:00
所以我现在的操作是什么呢?再减一减一是减一毫秒,哎,所以这就相当于又减了一个。很小很小的数,跟这个时间非常的接近,但不是。严格意义上的这个实践。那要这样的话,那就说明是什么呢?是九点减一毫秒之前的所有的数据都到齐了,小于等于这些数据都到齐了,但是九点的数据是不是还有可能继续来啊,刚好这是不是就跟我们这个窗口的定义就刚好一致了,窗口也是前闭后开嘛,我要的就是九点,不包括九点之前的所有的数据,哪怕比他小一毫秒的数据也都到齐了,不能再来了,但是九点整的数据还可以来。啊,所以这里边我们就二再减去。刚好就跟窗口的这个包含的数据的范围能够对应起来,这样的话就可以很方便的实现了对于窗口时间的控制了啊,所以这里给大家解释一下层代码,之前我们不是说处理乱序数据,处理乱序数据的时候我们是点了这个for bounded out of out这个吗?然后在这里边我们是拗了一个bounded out water,这是实现了这个water genator接口的一个类,在这里边它到底是怎么生成water的呢?
07:16
啊,大家看他就是就是这样做的。当前最大的时间戳max time stamp减去out waterness这个毫秒数最大延迟,然后减掉一,它就是保证了我们当前的这个做法。而与之对应的前面升序数据的这个处理,这里边声序数据处理的时候,大家还记得它直接就继承自邦地的auTo Borderness乱序数据的处理。所以说你可以认为升序就是特殊的一种乱序数据的auto生成器,它是怎么个特殊法呢?哎,它其实就是把它的duration制成了零。制成零是什么意思?乱序程度是零,那不就是升序吗?啊,那如果制成零的话,大家看这是零,那是不是就相当于最后还是变成了最大的时间戳减一啊啊,就最后还是要减一的。
08:06
这样的话就跟我们那个窗口的结束时间刚好就对上。可以给大家解释一下,以后测试的时候,假如说十秒的这个数据来了。假如说现在这个延迟啊,延迟两秒钟,我们不再去说它真正生成的奥mark应该是多少呢?应该是十减二再减一毫秒对不对,理论上应该是不到八秒,是七秒,然后999毫秒这个时间点。但是以后我们测试的时候,不会这么麻烦的去跟大家说了啊,就统一跟大家说,就不考虑这个一毫秒这个问题了,我们直接就是说啊,十秒的数据来了,现在延迟两秒,那么当前的water是八秒,那么八秒的窗口要关闭了,这样就完全对上了,你都不考虑这个最后的这个界限,它就刚好对。呃,那另外还有一个是断点式的生成,断点式生成非常类似,那就是在实现这个watermark generator接口的时候,这两个方法里边我们重点在实现on event这个方法啊,那on periodic1米的这里就啥都不要干了,周期性什么都不做,这里边就是只要来了数据我们就判断,比方说这里是遇到特定的user的时候,诶,这个时候就直接发出一个水位线,如果刚才的user是maryry,它才触发我们的时间推推进,哎,但但是这个好像没什么意义啊,除非你有特别特殊的需求。
09:27
这就是关于自定义水位线的这个生成,然后另外还有一种特殊的用法是在自定义的数据源里边发送水位线啊,这个更有意思,就是说我们可以一开始定义这个s function的时候,直接就发水位线,这是通过什么来发呢?这个也很简单,就是我们前面不是把这个数据随机生成好了,然后包装成这个event要发了吗?那之前我们是contact,直接点collect就把这个数据发出去了。那现在不要这样,我们发的时候用collect with time stamp就带着时间戳去发。
10:03
那这样的话,就是有当前的数据,还有他自己的时间戳也要发送出去。这个时间戳有什么含义呢?还记得一取时间戳的时候得到的这两参数吗?哎,就在这儿对应着呢,我们在那个圆那里,你可以指定当前的数据是什么,还可以指定当前它的时间戳是什么,所以我们提取的时候也能把这两个参数都拿到,然后你去对比啊,你可以做一个处理啊,但一般情况如果有这个的话,那就那就不用再去提了嘛啊,那另外就是说还有水位线,水位线的话。可以调context上下文的it watermark方法,然后直接生成水位线啊,这个就是在测试的时候非常有用啊,需要注意的是,假如说我们用了这个在测试源里边直接发送水位线,那在后面就不要再去定义了啊,两者只能用一个啊,就是你这个分配水位线这个算子肯定也是只有一个有效嘛,只是分配一次就可以了,要不然你时间不是乱了吗?这是关于水位线自定义的一些方法。
我来说两句