00:00
我们已经了解了在flink当中生成水位线的通用方法,那就是基于当前的data stream去调用一个assign time stamps and water marks这样的一个方法里边传入一个water mark strategy water mark的生成策略,然后在这个生成策略里边最重要的需要去实现的两个抽象方法,一个叫做create watermark generator,另外一个叫做create time step sign。那关于这个time sample s呢,我们看本身里边是有一个默认的实现的,也就是说如果我们当前数据里边自己就带着。已经指定好的time stamp的话,诶,那其实是可以直接提取出来的,注意这个并不是说我们日志数据里边有一个时间就可以直接提取了啊,就是我们说的啊,这个时间我们必须要明确的指定,这是当前我们流处理可以使用的time Sam才可以啊,那如果说没有的话,当然我们也可以看到这里边可以去重写一个extra time stamp这样一个方法,把对应的时间戳从数据当中提取出来。
01:06
啊,那一般情况更加重要的呢,就是我们基于提取出来的时间戳,又可以去制定一个water mark生成的策略,这就是我们最重要的需要去实现的东西,那在这个watermark generator啊,要返回这个生成器的时候,我们实现这个接口里边,其实是要实现两个抽象方法,一个叫做on event,另外一个叫做on periodic event啊,所以接下来我们在代码当中啊,其实就是要去实现。你有一个water mark generator里边要去实现one的方法和on periodic1米的方法。啊,那我们就会发现了,这个过程还是有点麻烦的,如果去自己做的话,肯定会非常的复杂。弗link也帮我们考虑到了这一点,所以他在watermark strategy这个接口里边,我们会发现下面内容其实还很多,它其实给我们直接就已经定义好了一些现成的生成水位线的策略,然后我们看到下边可以调很多辅助的默认实现的方法啊,比如说这里有with time sta s,哎,那我们知道这个显然是要去提供一个time stamp。
02:18
提取时间戳的,那后边呢?哎,真正我们这里需要调用的是for monotonous time stamps和for bounded out of orderness这两个方法,这两个静态方法,它会直接帮我们返回一个water strategy啊,相当于给我们内置了两种不同的watermark的生成策略。那之前我们说生成water mark其实有两种基本的策略,那就是一种是周期性生成,另外一种是非周期性的,来一个数据就考虑要不要生成水位线啊,那在这里呢,这两种静态方法其实都是周期性去生成水位线的,因为我们知道在实际的应用过程当中啊,一般周期性这样效率会比较高一点,那它俩的区别在于什么呢?这其实是针对不同的流处理场景啊。
03:11
首先上面这个for monotonous time stamps monotone。这个单词是持续的,连续的,有这个意思啊,所以它其实针对的就是我们当前数据是已经排好序的有序流啊,也就是说就是我们说的那种最简单最理想的场景,所有数据按照他们生成数据的时间点,按照先后顺序都已经排好了。生成早的数据就先到,生成晚的数据就后到,都是按照这个时间戳从小到大依次排列,时间戳是单调递增的,那我们直接把这个时间戳提出来作为watermark其实就可以了,哎,所以这就是我们所说的有序流的处理策略。对应着后边这个fored out of orderne呢?那就是乱序流的处理策略,所以接下来我们可以在代码当中去做一个测试,我们直接放在上面。第一个是。
04:07
有序流。的。水位线生成策略。所以这里边我们其实调用的就是直接assign time and water marks,然后里边可以去调用water mark strategy,它下边的静态方法for monotonous time steps啊,所以我们看到对于这种有序流而言啊,为什么这里边它只是for mono time stamps呢?没有考虑water mark呢?就是因为当前是有续流嘛,我直接把时间戳提取出来就可以作为当前的水平线了,诶,那所以其实我们没有必要再去指定。水位线具体的生成策略啊,只要提出时间戳就跟水位线一样,所以我们关键就是指定一个对应的提取时间戳的这样的一个方法就可以了,那对于当前的这一个静态方法,那么我们我们会发现它其实是有泛型类型的啊,但这是一个泛型方法,所以这里边我们需要指定当前的数据类型。
05:12
当然了,我们前面输入的数据类型都是event,所以我们可以把它写在这里,这一个方法里边没有任何的参数,而它的返回值呢,是一个拉姆达表达式啊,我们看可以看到它的参数是CX,然后返回值是一个谬了一个ending water,诶,这是怎么回事呢?诶,这主要是因为它要返回的是一个mark strategy嘛,我们知道这本来是一个接口,然后呢,诶,其他的方法其实这里边都有默认的实现,唯一我们需要专门实现的一个抽象方法,其实啊,那我们看到其实就是这里的。Create water generator,所以这可以理解成就是一个单一抽象方法接口,就是我们所说的Sam sum接口。跟我们前面讲的函数类其实是一样的啊,单一抽象方法接口,我们就可以用一个拉达表达式来把它做一个具体的实现啊,所以在这里的话,我们可以看到就直接返回了一个拉姆达表达式,实现了当前的这个单一抽象接口啊,那这里边这个拉姆达表达式其实表示的就是我们create watermark generator的具体的内容,那当然它的返回其实就是一个watermark generator啊,所以这里边我们可以看到啊,A sendtime stamp water marks,它本身继承自bonded out of water needs water marks,然后这本身又是一个water mark generator,这就是当前在watermark strategy里边对于这个内容的返回。
06:44
那接下来我们还要做什么事情呢?那当然就是需要去指定提取时间戳的策略了啊,因为这里边我们数据里边其实本身是没有时间戳指定的啊,尽管我们知道这个字段它就是时间戳,但是呢,弗link不知道啊,我们需要明确的指定,它是当前我们可以用在流处理里边的时间戳,所以接下来我们还应该有专门的一个方法调用去定义当前的时间戳提取。
07:12
好,那这个方法调用呢,也在当前的water strategy里边有一个方法叫做with time stamp,这就是前面我们看到的这个方法啊,默认它里边有实现,我们当然可以去重写了,这里我们可以看到这个方法,它里边可以传一个time stamp sign supplier,也可以传一个able time sample signer啊呃,就是这个是可序列化的time sample aer啊,这个可能一般情况下我们使用会更加简单一点,所以一般是用这种方式做一个传递就可以,所以接下来我们看应该怎么去用它,哎,那在这里的话就是直接后边调一个点with time step sign,因为我们看到当前这个mountain time stamps返回的是一个auto strategy嘛,所以基于auto strategy我们还可以继续调,调完了之后,它返回的依然是一个auto strategy,相当于给它配置了一个对应的time和sign,所以接下来我们在这儿直接调这个方法。
08:12
好,我们看里边必须要传入参数,这个参数我们的实现就是一个s liable。Time stamp a sign啊,那里边有一个关键的方法,当然就是extract time stamp提取时间戳的方法。这里我们看到它有两个参数,一个是event类型,这当然就是当前的每一条数据了,当前我们的数据元素可以捕获到,我们就是要从这个T里边去提取对应的数据,那后边这一个长整型的数据是什么呢?这其实就是我们的数据源也可以自己已经带着真正意义上的时间戳,就是已经指定好了,这就是当前的时间戳啊,那我们可以直接用这个也是可以。啊,当然了,现在我们并没有自带的时间戳啊,所以需要从T里边去做一个提取,那是t.TIME3,把它提取出来就可以了啊,所以前面我们一直在强调,大家可能会觉得这明明日志数据里边就有时间戳,为什么我们说没有还需要做提取呢?啊,主要问题就在于当前对于弗link而言,他只知道我们这个event里边有一个。
09:21
叫做time step的长整型的字段,但是并没有指定成我当前可用的时间戳,所以我们知道这个对于flink而言,你必须有专门的一个指定,把它叫做当前的time step,才可以在后边的事件时间里边使用它,所以这就是我们当前提取时间戳的一个过程。提取出时间戳之后,那当然了,我们现在既然是有续流嘛,那就是按照这个升序的时间戳依次生成对应的水位线就可以了啊,所以整体来讲这个调用还是非常简单的,这是第一种情况啊,那接下来呢,我们当然就要考虑第二种情况,那是乱续流的。
10:01
水位线生成策略。啊,那同样我们还是基于stream去调用一个assign times and water marks方法啊,里边同样还是要传一个water mark strategy,现在我们调用的那就是另外一个方法,叫做for bed的out of orderness方法啊,那同样这个我们看到啊,也是一个翻新方法,我们给定当前的类型还是event。这里只要把这个泛型指定之后,后边的对应的这些类型就可以自动给我们匹配出来啊,所以这个还是非常方便的。然后我们看到,诶现在不一样了,现在这还在报错,所以这个方法是有参数的,这个参数呢是一个duration,诶那我们知道这是一个持续时间或者是时间间隔的意思。那他这里给的这个参数名称叫做max out of order needs,所以这里是在指定什么呢?回忆一下之前我们所说的,对于乱序流而言,我们的基本的生成策略其实就是要基于当前最大时间戳指定一个延迟,我们要延迟发车嘛,那这个延迟到底给什么呢?最好就是给当前整个数据流里边的最大乱序程度啊,就是本身出现乱序之后,那么当前它的这个乱序到底延迟了几秒钟呢?我们就等这个时间就可以了。所以这里所谓的max out of orderness最大乱序程度就是我们这里需要去等待,需要去延迟的时间。
11:37
啊,那最终我们看到它返回的就是一个邦ED out water waters啊这样一个类啊,当然我们知道最后这个肯定它是实现了water generator的啊,我们这里本身需要的就是这个generator嘛,所以。我们只要在调用的时候给它传入一个延迟时间就可以了啊,那当然了,跟前面我们升序流是一样的啊,有序流是一样的,那这里面同样我们应该去指定一个。
12:04
时间戳的提取方法啊,那也就是在后边with time a sign就可以了啊,所以接下来的这个方式比上边的话,其实就是多了一个参数而已,当前这个参数我们指定,比如说可以是延迟两秒钟,哎,那我们就给一个duration。点,我们看到有zero,有of days,有hours million minutes nine seconds啊,比方说这里是秒,那么我们就of second2,那就是两秒钟的一个延迟时间。当然了,后边我们应该继续有一个with time stamp a sign,里边我们可以去new一个szable time sta sign,那这里同样的提取策略还是t.time项,提取出来就可以。啊,这就是乱序流里边水位线上的策略,那至于这个延迟时间给多少,那就是我们说的要去做一个综合考量了,我们要根据当前数据乱序程度的一个分布情况,选择一个合适的啊,那不能太大,也不能太小,在实际应用的过程当中呢,一般可能这个到不了秒级别,因为我们知道啊,对于这个flink实时流处理来讲,一般这个乱序程度主要就是在数据传输过程当中啊,因为分布式的处理和传输带来的一个不同程度的延迟,导致了这里的乱序啊,那这个一般可能就是在这个几十毫秒啊,甚至几毫秒之间就可以了,所以一般这个我们都给的可能是这个毫秒数啊,那这里是为了方便后面我们测试,可能一般我们给一个几秒钟整秒数来做测试就可以了。
13:35
我们可以看到啊,有序流和乱序流它的水位线生成策略是非常的类似的啊,直接在这里去调一个静态的方法,然后后边加上对应的时间戳提取策略就可以了,诶那关于这个有序流呢,其实点进去点到源码里边啊,我们看这个sending time samplemarks。本身它所返回的这个generator,我们会看到啊,它其实本身就是邦ED out of al water marks它的一个子类啊,它就继承自乱序流的这个生成策略。
14:10
那它具体来讲是一个什么样的策略呢?其实就是给了一个零毫秒的一个延迟时间啊,所以我们就知道了啊,对于升序流而言,其实就是延迟为零的乱序流生成策略啊,那所以本质上它俩就是完全一样。啊,那这里边我们还需要多说一点的是,对于我们这里边最终生成的水位线到底是怎么样得到的呢?这个我们可以点到源码里边看一下这个帮out order water rocks。在这里边我们会看到啊,它其实本身在这个类里边,我们看定义了一个out of order media,这是我们当前的延迟时间,另外还定义了一个this.max time stepmp,这就是我们当前要保存的已经获取到的最大时间戳。然后我们看它最终不是要实现返回一个automark generator吗?啊,那这里这个auTo Generator我们看必须有两个实现,两个抽象方法,一个叫on event,另外一个叫on periodicit,我们看它是怎么做的,One event这里,哎,那当然就是每来一个数据,来了数据之后怎么办呢?我们现在是要周期性的生成水位线来数据并不生成。
15:23
而是把当前数据的时间戳拿到,然后去比较一下,跟之前最大的比较一下,然后保存一个当前最大,也就是说更新当前的最大时间戳,数据来了之后做的是这件事情。那什么时候生成水位线呢?那就是周期性的去发射了on periodicit,在这个方法里边,我们看到它有一个output参数,里边有一个it water mark方法,这个里边就会new一个water mark,创建一个water mark的对象,然后把它发射出去,那我们可以看到water其实就是link底层啊,给我们实现的一个final class final类啊,那在这个类里边,它最重要的属性我们看到其实就是一个TIME3。
16:09
这就是我们说的啊,Water它是一种特殊的数据结构,里边最重要的呢,就是表示当前时间进展的一个时间戳啊,那所以我们new这个watermark的时候呢,也就是直接传一个时间戳就可以了。这个时间戳就代表了当前的时间点,到底是什么时间点呢?我们看到它的算法就是当前的最大时间戳减去我们设置好的延迟时间,注意后边还要再减一。诶,这跟我们说的稍微有一点不一样,因为我们想的,如果我们延迟两秒钟的话,那应该就是比方说当前这个时间说是十秒,那就直接减两秒,然后等于八秒不就完了吗?这就是一个八秒的时间戳吗?注意它后面还要再减一,这里减的一是。一毫秒。也就是说当前这个water mark啊,它里面给的这个长整型的时间戳是以毫秒为单位的,哎,那所以前面我们这个max time Sam,如果是十秒的话,那应该是十乘以1000,也就是1万,那后面两秒的延迟,那应该是二乘以1000是2000,所以最后减掉之后是8000,再减一一毫秒,最终是7999。
17:24
方面,这是我们当前的事件时间的进展,也就是我们当前的表进展到了这个时间点。诶,那他为什么要这么做呢?奇奇怪怪的,后面还要再减一呢,呃,这个主要是为了解决我们的一个矛盾啊,那我们想到之前对于watermark水位线的特性,我们讲到有一个特点,就是假如说有一个watermark t。来了,生成了,那么我们知道它的时间戳是T,就表示当前的事件时间。Even time已经进展到了当前T这个时刻。
18:01
那它表示的意思是小于等于T时刻的所有的数据都到齐了,所有的事件都发生了。诶,那所以我们发现它是包含了T这个时刻的,这跟我们日常生活当中的经验是一样的,就是时间嘛,我们的表如果已经到八点了,那很显然八点这一时刻要发生的事儿也就都发生了,所以这跟我们的对于时间的日常经验是一样的。但是呢,这会带来另外一个问题,就是我们在做流处理的过程当中。那流处理里边所有的数据,它是一个一个来的呀,那同样都是八点钟的数据,它的时间戳都是八点。但是第一个八点钟的数据来了之后,他就会推进我们对应的水位线到达八点钟,那如果说他就准准的已经到了八点的话,很显然我们应该是八点这一时刻的数据都到齐了,后边不应该再来了,但是后面明明还有可能继续来啊,相同时间戳的这个数据,即使我们当前没有乱序,他也有可能继续来。
19:03
所以为了解决这个问题,我们就考虑,即使是有续流啊,我们生成水位线的时候,也不要直接就按照当前这个数据里边带着的时间戳去生成对应的水位线,而是在它的基础上减掉一个很小的时间,就表示什么呢?就是这一时刻之前的所有数据都来了,但是唯独这一时刻的数据还有可能继续来,那就相当于啊,相当于把我们这个watermark的含义就变成了直小。于T的这个数据来了,不包含等于了啊,那这个怎么办呢?呃,那我们就减一个最小的时间单位,在flink里边,最小时间单位就是毫秒,所以我们这里边再减去一个一毫秒就可以了。就是说water mark的含义还是没变,它还是啊,你来了什么时候的water mark就表示当前的事件进展到这儿了,小于等于这一时刻的所有的事件都发生了,只不过呢,我们生成的water mark跟数据本身带着的时间戳,他们稍微的有一点点滞后,相当于我们滞后了一毫秒,这样就可以保证在后边还可以来同样时刻的数据。
20:17
这就是link里边内置的水位线生成策略,一般情况下我们在生产环境里边啊,用的更多的,呃,肯定就是这个for bounded out of orderne这个方法啊,我们针对乱序流去生成一个水位线,指定一个延迟时间就可以了啊,另外还要去指定时间戳提取策略。
我来说两句