00:00
那接下来就在代码里边做一个具体的实现。接下来还是在当前的这个包下边新建一个当前我这个叫UV啊,With布隆过滤器bloom filter,英文那叫bloom filter。那我们其实想到了这个前面啊,基本上这个过程都一致,对吧,我还是这个主方法,先写出来,里边创建执行环境,读取数据,然后转换为类型。另外,我们基于它去分配时间戳和water。这个过程都是一样的,下面最后一步en nv XQ执行起来。这里边给一个当前的job name,我叫呃,UV can'with布布隆filter,对吧,Job filter。好,这就是我们整整体的这个程序架构先放在这儿,然后接下来我们想要做的,其实主要就是说利用这个不容过滤器去做这个实现了对吧,那其实本身这里边的原理也是差不多的啊,那所以我就直接先把这个copy过来吧。
01:16
先把这个copy过来,然后这里边可能涉及到一个问题,就是在于首先做的这个filter啊,数据来了之后,把这个PV先PV行为都提取出来,这个肯定是要做的,然后接下来啊,比方说我们这个直接time这个也可以,就我们就不分区了,对吧,就不呃按照那个key重重重新去设计去分组了,直接就全部扔到同一个分区去做计算,Time window2,然后后边这里边可能就要有一点区别,大家想之前我们的这个做法是一个全窗口函数啊,直接上来之后是不是就把它全收集齐了。啊,然后再去做计算,那我们想到现在的话,当然不能这么去做啊,那你如果要是收集齐的话,那这个数据那不就全存在内存里了吗?另外大家想到我也不能在内存里边去设置这个set这样的一个结构,数据结构的中间聚合状态,对吧?啊,就是也不能做这个增量聚合,那我现在的整体思路其实应该是。
02:15
大家想是不是应该就是来一个数,我就应该去到redis里边去判断一下当前这个在不在,对不对,哎,然后如果说在的话,我就啊,直接啥都不用,啥都不用做,就相当于过滤掉了嘛,那如果要是不在的话,那我相当于是不是count要加一啊,然后里边对应的那个数,我要呃,就是对应的那个位对吧,呃,我要把它制成一,表示它已经存在了,整体思路就是这样的一个过程,那这里边就涉及到另外一个问题,我们当前保存当前统计的这个状态,这个count值的话。那是不是应该是在当前这个window状态里面去保存着呀。但是当前我们这个window里边呢,我又不想要存着数据,因为大家想,如果默认我这里边我这里边啊,给一个类似于process这样的一个全窗口函数去做计算的话,那里边是不是相当于所有数据都会保存在当前窗口里边啊,那你如果要是数据都已经能存在窗口里边的话,那还有必要再去再在外面去做这个布隆过滤器嘛,对吧,那已经都存下来了嘛,所以这里边我有一个思路是。
03:27
我要定义一个计算的规则,就是每来一个数据。我立刻就触发一次窗口计算,然后在窗口计算的过程当中去连接red,然后去取出它位图里边保存的那个状态,对吧?然后判断到底在不在里边,我去做怎么样的计算,所以这个过程大家会想到,呃,之前我们也有就是来一个数据,就触发一个计算去去做这个中间,呃,聚合的那个是我们之前讲的那个增量聚合函数,对吧。
04:00
但大家知道增量均衡函数里边没有那么多复杂的可以应用的方法,呃,我们在里边如果要去连接这个,呃,比方说连接的话,可能就没那么方便,对不对?哎,我们在这里边,你最好的方式还是在这里边我们直接用一个process方法啊,有生命周期,有上下文啊,能定义状态,能做的事情就会更多,那假如说我们不用增量最高函数的话。窗口的计算就只有在窗口结束的时候能触发一次呀。或者是后面那个迟到数据来的时候能触发对吧,那那我们之前就是没有到达窗口结束的时间的时候,这个这个数据怎么能来了之后就就触发一次呢。哎,这里面就涉及到我们之前讲的有另外的一个可选API,是不是可以自定义窗口的触发器啊。哎,所以之前我们一直没管,那就是默认条件下时间窗口是不是就是等到到达窗口结束时间的时候触发一个操作啊,那我们现在不是了,我们现在不想要到达窗口结束时间的时候才触发,直接来一个数据就触发一次,而且我是不是可以直接把这个窗口里边的数据都不保存,窗口状态都不留啊,我做直接做到极致,就是来一个数据直接触发一次窗口计算,然后把窗口状态全部清空,那是不是相当于对于我们这里的内存压力就是最小啊。
05:24
啥都不留对吧?啊,当然了,这个代价就是说我们每每来一个是不是都要去连接redis,都要去做判断对吧?啊,所以接下来我们要做的就是这样一件事啊,那是不是在这个time window all下边就得来一个。来一个自定义的trigger啊啊,所以这里面我们定义一个又一个my trigger。然后下边啊,用一个process方式啊,Process window方式来实现这个功能,我们把它叫做UV can'result with布隆过滤器。Bloom filter。
06:02
这就是我们呃定义好的这个整体处理流程,然后这里我们的这个my trigger到底应该怎么去做实现呢?自定义触发器,这里面我们首先把它写出来,Public static class my trigger,其实大家想到这里边我们想要实现的就是一个trigger,对吧?然后这个trigger本身是一个抽象类,所以这里面写的也是extend一个。Trigger。大家看到了,本身有泛型,这里面的类型是不是就是当前的数据类型T,另外还有窗口类型W,所以当前的数据类型是什么?这是不是还数据类型完全没变过对吧?前面只是做了一个future而已嘛,所以还是前面我们的user behavior对吧?哎,所以当前数据进来的时候,User behavior,那么窗口类型是窗口类型time window,好,先把它写好,大家看这样一写这就没问题了,对吧。
07:13
我们看一下这个抽象类里边具体要重写哪些方法。这里面的方法稍微的有点奇怪,大家看到是这么几个方法,叫on element on processing time。On time,这三个都长差不多对吧,都是on什么?最后还有一个clear clear clear的话相对来讲更好理解一点,就是直接清空嘛,对吧?清理啊,就是相当于呃,我们想要做什么操作啊,清,清除什么什么状态啊,做这样的一个操作,这里边要清除的主要是自定义的一些状态,所以如果我们不单独定义的话,那就不用管这个,空着放在这儿就完事了,那关键就是上面这些on啊。大家一看到on联想起来之前是不是有那个on timer啊,哎,是不是就是在什么条件下触发这样的一个东西啊,哎,所以这里边才是这个触发器嘛,所以大家看这个on什么,后面是不是跟着就是当前的条件啊,所以on element指的是什么?是不是就是来了一条数据之后,接下来要干什么事啊?呃,那这个on processing time呢?那是不是就相当于是处理时间发生改变的时候,我要看假如说有这个处理时间定时器的时候,我接下来干要干什么事儿,对吧?就类似于是我们那个处理时间定时器一样的那个那个做那个操作啊,那对应这里面还有on even time on time是不是就是事件时间改变,那大家想是不是就是whatmark前前进推移啊,然后我接下来要做的是不是就相当于是一个类似于on even time timer的那个操作啊,对吧?啊,接下来就看这个事件时间有没有定时器啊呃,需要去触发哪些操作,所以这里边主要就是这些用法。
08:51
那这里面我们看一下这里面做的这个操作,首先能拿到的东西啊,如果是element的话,是不是能拿到当前数据,然后另外还能拿到当前的时间戳time Sam,这是从是不是从数据里面提取出来的呀?啊,另外还有window的信息,还有当前的上下文信息,所以你看他能拿到的东西其实是很多的,如果是on processing time和on even time的话,这里大家看是不是就少了一个element啊?
09:19
这就跟数据没关系了,对吧,只是判断当前的时间戳是多少,当前时间是多少,然后诶,当前的这个窗口和上下文是什么。那我们看一下,它要返回的又是什么东西呢?是一个trigger result trigger result我们点进去看一下,它是一个枚举类型。这个枚举类型里边有四个不同的取值,我们挨个看一下啊,一个叫做continue,大家想这个trigger result,这是不是就表示我当前这个触发器的一个结果啊,触发器的结果当然就是说要不要触发对吧?就当前这个窗口到不要,到底要不要触发计算,那所以continue什么意思,继续,继续,那大家联想起来的话,像我们这个在哪里见过continue,语法里边是不是就是循环里边见过for,循环里面见过肯用啊,继续,那是不是相当于现在就什么都不做,继续下一个下一轮迭代对吧,下一个数据,所以就是相当于什么都不做,No action is taken对吧,什么都不做continue。
10:24
然后还有一个操作,我先看下边啊,还有一个叫做fire fire意思是它本身有火的意思啊,那那我们现在这个做动词的话就是开火,开火,其实额外的一个意思是不是就是触发呀,对吧?触发计算,那所以这里的发样指的就是我要触发一次窗口的计算,哎,那大家想到触发一次窗口的计算,之前我们还说说过触发计算是要输出,输出那个呃,计算结果了,但是窗口关不关呢?大家注意它是不关的。
11:00
关窗口的那个操操作,另外叫做叫做P对吧。P就是有清理清除的意思,所以你看一下这是所有的element,所有的元素在这个window里边,对吧,然后就被清清除掉,然后当前的这个窗口就被直接丢掉了,这是不是我们说的那个真正的关窗操作啊。对吧,哎,这里边就是它,它不做计算,直接就把它关掉,直接把它清空,这是所谓的这个push操作,那另外还有一个叫做fire and per,那顾名思义就是触发一次计算,然后用清空关窗,对吧?啊,清空所有数据和状态,把窗口关掉,所以这就是这个呃,不同的几种不同这个trigger触发器的result结果,那这里边我们再再来看的话,里边它其实包含两个字段的,就我们这里面的定义对吧,它包含两个字段,哪两个字段呢?大家看就是布尔类型的两个,一个叫做fire,一个叫做破,是不是就是控制窗口的两个行为啊,一个是是否触发计算,另外一个是是否清空关闭对吧?诶那所以大家会发现上面这个如果说两个都是false。
12:15
是不是就是又不触发计算也不清空,这个就叫做continue,啥都不干对吧?那如果两个都要做的话,这个是不是就叫fire and per啊对吧?又要计算又要清空啊,那如果要是只触发计算不清空,这个是不是就单独叫做fire啊?然后如果只清空不触发计算是不是就叫做破?哎,这就是这里边它具体的定义啊,枚举类型,结果就这么几个。那所以接下来按照我们的设计思路,当前每一个数据来了之后,我们的想法是不是每一条数据来到,是不是直接就触发。窗口计算并且怎么样,是不是直接清空窗口状态啊,直接把窗口关掉对不对,哎,大家大家想我们想的本来是这个,呃,就是把把窗口不是要直接关掉,我们其实就是把数据都清掉,但是大家其实大家想这里边的这个就是关闭窗口,其实操作没有我们想象的那么重啊,大家想这个窗口里面最重要的是什么。
13:24
窗口其实本身的定义啊,你像我们这个time window啊,大家看它里边主要就是啥呀。是不是就是一个start,一个end呀,所以它其实里边最重要的东西是我们自定义的那些状态和数据,对吧?只要那些清掉了,其实当前窗口就已经没什么意义了,所以这里边我直接关那下一个,如果要是还有下一个当前窗口内的数据来了,来了怎么办呢?来了我再开一个这个start和end,我我能算出来对不对?大家想之前我们那个每个数据来的时候,是不是它也是要自己算一下当前属于哪个窗口啊,哎,所以这个过程其实并并不涉及到我们计算逻辑,就是这个窗口就真的丢了就没了,对吧,你还是可以算出来的呀,呃,所以这没办法,呃,这个没什么关系,对吧?那但是我们这里边如果做了这个清空的话,那它里边的之前的数据和状态就彻底没有了。
14:18
对吧,你后面如果要基于之前的状态去做聚合,那就做不到了啊,所以这里边如果说我们是每来一条数据直接触发计算并清空的话,应该返回什么。是不是直接就是一个trigger result.fire and per,对,好,那所以既然我是按照每个数据来出发的,下边的这个处理时间和事件时间的那个定时操作是不是一点用都没有啊,这里面涉及对吧?所以我就直接给一个continue,啥都不用做,这样就完事了。Result continue,对吧,这就是前面我们这个自定义触发器的一个过程。
我来说两句