00:00
已经实现了一个基本版本的UV的统计,那大家会发现这个过程当中,我们其实是用到了一个本身tla里边的set结构,大家想一想,这个我们在处理这个UV去重的过程当中,我们对于内存的一个占用情况是什么样呢?哎,我们自然就想到,其实在这个过程当中,首先window里边我们说它会把所有的数据全存下来,对吧?啊,Window在处理的过程当中,是把这个所有数据如果是全窗口函数的话,所有数据是存成状态的啊,那我们就想到这个数据量本身就非常非常大,对吧?然后另外呢,我们还在内存里边又创建了这样的一个set结构,进行这个去重操作,具体做运算的时候呢,又有这么一个大数据量的一个一个存储过程,所以在整个这个过程当中,我们尽管实现非常简单,但是如果数据量大到一定程度,可能我们当前这个window的这个操作就有点扛不住了。
01:00
啊,那有同学可能就想到,哎,这个数据量我们现在统计这个一个小时嘛,一个小时能有多少呢?诶大家不要小瞧这一个小时之内的用户uva的这个,呃,就是点击量啊,这样的一个数据,对于一些大厂,对于一些顶级的这个企业而言,他们的用户访问数据量一个小时里边的,哎,那那可是大家能够想到啊,就是可能会达到这个,呃,百万千万甚至一级的用户来来做这个访问操作的,那本身的数据量就非常非常大,即使你去重之后得到的这个set结构也会非常非常大,对吧?那有同学可能想到了,哎,我这里边这个,呃,你既然说这个window里边把所有的数据都存下来了,这个数据确实是量有点大了,呃,有点大,那我只存这个ID嘛,或者说我做那个增量聚合对吧,我只保保持一个这个set作为当前的状态,然后我就每来一个数据,就把它添加到这个set里,就把那个。
02:00
User ID添加到当前这个set里,这样的话,不就把我所有数据要保存的这个内存内,内存的占用就可以省略掉了吗?诶,当然这是一个很很简单的一个优化的方向啊,啊,但是我们会想到这个尽管能优化一些,把这些不必要的其他的信息可以滤掉啊,最后我就保存ID,保存一个set,但是你想这个数据它还是很大呀,大到如果说上亿个user ID的话,呃,我们先大概的算一算,这个上亿user ID的话,我们存储空间需要多大,对吧?呃,你比方说这个当前这个上亿的user ID,按照我们这个数量级来讲的话,1亿就是十的八次方啊,那我们一般情况说这个,呃,当当前这个每一个每每一个user啊,User ID如果说我们一般去保存的话,那可能,呃,就像这里边,如果我们用一个长整型的话,那可能只有这个八个字节,但一般情况下我们都会。
03:00
这个十个字节,甚至这个几十个字节,这个这个量级对吧?啊,那比方说我们这里面就简单一点啊,就乘以这个啊,当前是这个八八个字节,那是二的三次bitt对吧?大B,也就是说当前我们的整个要保存的数据量应该是这个量级,那十的八次方这是呃,这是1亿啊,大家如果要是对应来做这个,呃就是转换的话,我们转换成二进制可能会更加好理解一点,它本身十的六次方我们认为就是一兆嘛,那这里边就相当于应该有呃就是按按我们这里边的这个统计,就应该是有十的,呃,就是100乘以十的六次方,100兆对吧,100兆再乘以呃这个8BIT,那这里边相当于就应该有800兆,对吧。诶,这是我们就是一个窗口里边,你如果要去存储这个所有的上亿的user ID,假如一个只只用这个就是八个字节来表示的话啊,那实际上大家知道一般情况我们在做这个考量的时候,哎,我们会会,呃,一般认为一条数据的这个数据量,我们会认为是在1K左右,对吧?啊,1KB左右,而如果说你如果按照这个标准去看里边的每一个字段,我们的user ID的话,你考虑到它,它能够取值的这个各种不同,对吧,因为我们这里长整形的话,你可能想到它就是一个数嘛,但事实上很多我们在做这个呃,ID去做存储的时候,它其实都是一个字符串对吧?哎,都是这个比较复杂的啊,很长的一个字符串,打乱的这样的一个编码的一个形式,所以说往往这个数据量呢,可能你这个这个八个bit根本搞不定,你看这八个bit一个窗口里边的这个状态啊,我当前要存的这个set已经达到了800兆。
04:58
接近一个G的这个级别了,那假如说你这里边啊,就是如果要达到这个几十兆,甚至呃几十几十个bit,甚至100个bit这样的一个一个一个字节的存储的话,ID如果要达到这个量级的话,大家想想啊,你这里边如果用一个100个bit by bitt的话,整个我们就是十的十次方。
05:21
啊,十的十次方,这个可就不是一般的量级了,大家知道十的九次方是一个G对吧?哎,所以如果是十的八次方乘以100个BAT的话,这个得到的可就是十个大概啊,这个不是精确的值,大概十个GBAT对吧?10GB,那你说一个窗口的状态,我要存十个G,这这基本上就已经不可能了,对吧?啊,就是我们最小的情况可能也得接近一个G,一个窗口的状态啊,就一个G,然后你如果要是这个比较严重比较大的那种情况下,那可能得达到几个G甚至十个G这个级别啊,这个就是完全不可接受,那有同学可能就想到,那这个用什么样的方法去做这个存储呢?诶,我们一种替代方式是想到可以用red,对吧,Red作为一个内存数据库,我们对于一些啊,就是要快速访问读取的一些应用而言,经常会把它作为一个类似于扩展存。
06:22
储的一个呃地方对吧,而且我们知道在release里边,你定义这个set数据结构,直接做这个驱重其实非常方便的,但是我们会想到你假如说是几个G,至少大家看到啊,就是最小800兆大概一个G啊,如果大一点的话到十个G,那基本上平均来看的话,可能是个几G的这样一个量级,对吧,一个窗口就几个G,那你想想我们在同时在做计算的过程当中,哎,可能这个窗口是非常多的,对吧,我们要占用的内存,那可能就是连re这样的一个数据库也是也是不能接受的啊,因为re本身这个内存,其实数据库大家知道它本身成本是比较高昂的嘛,哎,那这里面就有一个问题了,那如果要出现这种情情况,这种场景到底该怎么做呢?
07:08
在这种超大数据量的情况下啊,如果要做数据驱虫的话,那可能你直接放内存或者说放release都不是一个好的选择,这里边给大家提出一个,呃,就是基本的想法,就是可以用特殊的数据结构来做这件事儿,这就是我们在文档里面给大家提到的布隆过滤器,那布隆过滤器到底是个什么东西呢?呃,简单来讲的话,其实这个布隆过滤器其实就是一个大家可以认为就是一个位图的存储,对吧?哎,简单来理解,它就是它里边的这个存储空间,就是一个一张大的位图,什么叫位图呢?Bit map嘛,就是里边每一位就是按照位去保存的一张,呃,一个大的存储空间,对吧?里边的一位都有它单独的含义,单独的这个意义在里边,那我们这里边每一位代表什么呢?代表与之对应的某个字符串或者某一个。
08:09
ID啊,我们如果要做ID去重的话,某个ID是否存在,因为大家想到每一位不就里边存存什么一位不就是要不是零,要不是一吗?那零和一不就能代表一个布尔类型的值吗?所以我这里边其实判断这个当前我们要去重的时候,大家会发现啊,我其实并不需要把当前的这个user ID所有的数据全存下来,哎,大家想你一个user ID可能这个,呃,就是十几几十个这个BAT对吧,几十个字节,甚至上百个字节,那那这个没有必要啊,我只需要知道这个UID到底在不在,就是有没有存在过,有没有出现过就可以了,我并不需要知道它到底长什么样,到底每个字节是什么样子的,对吧?哎,所以利用这个特性,我们就可以对它进行压缩,如果压缩到极致的话,那就是就是我们这里边想到的这个想法啊,就是一个user ID,它。
09:09
它的这比方说我们以这个十个或者说几十个BAT啊字节做作为例子的话,压缩之后就应该可以只以一位来表示,对吧?当然这一位你必须得有得有规定,就是说比方说我一张很大的一个位图,那位按顺序排列对吧?我必须得知道哪一位代表的是哪个user ID,大家想想是不是这样,你不能说是这个,哎,直接我就把这个user ID压成一位随便放,那我怎么知道它代表哪个uz ID呢?下次再来了之后,我找不到了,我不知道到底去哪找,它到底存不存在,所以这里边我们这张位图含义就在于里边的每一位跟user ID之间都会有一个一一的对应关系,对吧?哎,那这样的一个对应关系就是说我通过UCID做一个快速的计算,直接就能把它直接对应的找到那个对应的那个位上去,那这样的话怎么。
10:09
怎样做这个快速的匹配关系呢?诶,我们自然想到了哈希嘛,做一个哈希计算对吧?诶,所以其实所谓的不容过滤器,它的根本的算法含义就是说我们把一个想要做去重的一个user ID啊,一个ID,一个字符串,然后取一下哈哈希,然后呢,这个哈希值就对应着我们一张很大的位图里边的一个偏移量,对吧?哈希值我们算出来一个这个整数啊,一个正整数,那么它就对应一个偏移量,这个偏移量上对应的位图里边我们肯定存了一个零或者一,对不对,哎,那这里边就是你如果存了一个零的话,我就表示当前这个uz ID没有出现过,那如果说要存了一个一的话,我就表示当前uz ID已经出现过了,那最后我怎么统计一共有多少个去重之后的UID呢?是不是直接看当前的这一个bit map这个位图里边到底有多少?
11:09
一是不是就可以了啊,所以整个的这个处理过程其实是非常简单的一个过程啊,就是这样的一个对应关系,当然这里面涉及到一个问题,就是大家想到你要做哈希的话,那是不是就有一个问题,首先是有一个哈希函数的选择对吧?另外还有一个问题就是你哈希做完之后,对应的是我们这里边的某一个位,那假如说呃,就是你你这个取,取这个之后,有可能是每一个哈希对应每一个位,也有可能哈希还要做一个那个曲模运算对吧?啊,就多个哈希对应同一个位,你这里面会想到如果是多个哈希,那就更不用说了啊,那肯定就会出现碰撞对吧?就同样的ID,不同的ID就就对应着同一个位了,那即使是算出来是不同的哈希,但是大家,呃,就是即使是我们每一个哈希都对应着不同的位,对吧,那我们在做这个哈希计算的时候,如果说哈希函数选的不好,是不是也会出现不同的uz ID。
12:09
经过求取哈希之后,得到了同样的一个一个位上的这个数据啊,对吧?呃,这就是我们所说的这个哈希碰撞,那怎么解决这个问题呢?哎,那简单来讲就是一个你的哈希函数得选的好一点,对吧?针对我们当前user ID的这个长度,当前它的特点去选择对应匹配的哈希函数,另外还有一个要求,就是我们这里边尽量要稍微做一个,诶,不要那么稠密对吧?你比方说我一共有1亿个数据啊,然后我就让它对应到1亿位上,每一位都必须严严格的对应,对应着一个uz ID,那这个出现哈希碰撞的概率就很高啊,你没有任何容错空间嘛,稍微这个哈希函数出点问题,你就两个就撞上了,对吧?啊,因为你都是一个萝卜,一个坑,必须是占到它单独的那个位置上去才行啊,这种情况就不好,那可以怎么样呢?我可以把它做的稀疏一点,对吧?啊,就比方说我一共要呃判断这个1亿个数据。
13:09
我给你2亿个位去保存这1亿个数据的状态啊,那这种情况就有可能隔一个对吧,或者隔几个才来一个状态,表示我当前这个位置,那出现碰撞的概率就会小很多啊,所以这是一般做这个波轮过滤器的时候常用的一些处理的手段,但是大家知道现在不用过滤器其实是一个非常成熟被非常完善的数据结构啊,大家用谷歌里边给我们提供的这个组件啊,大家知道底层就会给我们做这个哈希函数的优化,对吧,然后给我们它可以算出来一个,呃,就是最优的哈希函数和当前位图最优的一个大小,这都是可以去,呃,就是你直接调包就可以实现的,但是在呃在这里呢,我们还是要手动给大家做一个实现,让大家对这个底层的原理理解的更加的清楚一些啊,那这里边我们先大概的给大家还是算一笔账吧啊,就是前面我们说了啊,每一个user ID,假如说这里边我们以这个十个BAT来算。
14:09
的话,十个BAT相当于是呃,不算太太多对吧,大家说这个一个长整形也是八个字节嘛啊,那所以这个十个BAT肯定不多啊,如果是有1亿个当前这个有有1亿个这个呃,数据啊,就是我们说上亿用户UID要做去除,那乘起来之后,这就是1G对吧,十的九次方111GB。这是我们当前如果你要完整的把它存成一个map啊,就是存成一个set啊,不管是在red里边还是在内存里边要占据的空间一个G对吧?哎,那如果说现在我让每一个uz ID都对应到位图里边是一位的话,那大家再看一下当前1亿个位占据的空间就变成了多少呢?
15:04
就是十的八次方位对吧?哎,那大家知道这个,呃,一个BAT字节是八个位嘛,那所以我再除以八,那大大概就是这个,呃,就是128再乘以十的,这这里边相当于你是用这个1000除以八,呃,应应该是这个大概一百一百二十五对吧,125大概就是一一百左右啊100,然后再乘以,大家想到就是乘以这个十的五次方对吧?哎,那所以我们这里边得到的其实大概就是十乘以十的六次方,也就是十兆对吧。大概是这样一个数量级啊,那所以这样的一个对应关系的话,我们就看的就非常明明显了啊,你这样原先的这个一个G,我觉得放内存和放可能都不太能接受对吧?一个窗口一个G或者几个G,这个接受不了,哎,那这里边如果要是一个窗口只有十兆的话,哎,那我直接放内存或者说放re release好像都是一个很好的选择,对吧?哎,这个就是看起来就完全没什么问题了啊,那另外我们还提到了,因为要考虑到哈希碰撞的问题啊,就是布隆过滤器,从严格意义上来讲,它应该是什么呢?它是一个。
16:21
概率性的表示这个当前某一个user ID在不在我们这个位图里边出现过的一个一个数据结构啊,啊,他问含义什么叫概率性呢?就是说因为它有可能出现哈希碰撞,所以假如说我经过求取它的这个哈希值啊,一个UID。求这个哈希之后,如果说对应的到了这一位上,假如说这一位上是零的话,那我可以确确认它肯定没有出现过,这个是可以保证的,为什么?因为即使是有别的uz ID求哈希之后也是到了这个位上,那发发现这一位是零,那说明别的uz ID也没有嘛,对吧?那肯定这个当前这个uz ID也没出现过,只要有弱,这里边是不是就应该是一啊啊,但是如果这里是本身是一的话,它只表示有一定的概率,这个uz ID出现过,这个概率就是不发生哈希碰撞的概率,对吧?你假如说发生了哈希碰撞啊,另外就是呃,这个发生了哈希碰撞的话,还得就是别的user ID也已经出现过,对吧?这种情况下,哎,碰撞到了同一位上,而且他也出现过,这里边写了一啊,所以你如果要是当前这个user ID发现它是一的话,说不准自己是没有出现过,是别人出现过,所以我们说这是一个。
17:42
概率性的表示一个一个UID啊,是否出现过的一个数据结构啊,那那这里面我们其实可以不考虑那么多啊,我们怎么样避免这个哈希碰撞呢?啊,一个是这个哈希函数的选择,我们这里边先先就不考虑这个了,这主要是一个算法的问题,那这里边我们能做的是什么呢?你把这个空间稍微扩展一点嘛,就我们说的,你让这个位图稀疏一点,不要那么严格的一个萝卜一个坑,对吧,这样的话就好做多了,那比方说我可以给它稍微的扩展上几倍对吧?哎,我给它做一个比方说,我用一个50兆60兆的一个,比方说64兆的一个空间去表示它,哎,那当前的这个数据的稀收度就非常高了,出现哈希碰撞的概率也就很少了,对吧,哎,那你说当前的一个,呃,一个窗口里边要保持的这个状态啊,所有的这个数据是60多兆,这个看起来也可以接受,对吧,你不想放在内存里边的话,我扔在red里边嘛,肯定可以。
18:42
搞定这个数据啊,但是你那里几个G的话,这个我们就搞不定了,所以这就是关于这个布轮过滤器使用的一个原理啊,那接下来我们就在代码里边还是给大家来做一个简单的实现啊,我们用一个手写的方式把这个底层的原理给大家,呃,相当于都都实现一下啊,啊然后这里边我们有一个基本的想法是后边呢,要把这个位图还是存到red里边去,那所以这里边还涉及到跟red之间的一个交互,对吧,因为我们设置这个,假如说64兆这个,呃,这样一个位图的话,那其实还是稍微有点大的,我不想放在内存,那放到red里面去,所以这里边我需要引入相关的依赖啊,这个大家知道,呃,这个不是我们那个和这个flink的连接器的依赖,我就是只是想去操作一下对吧,向他去发指令而已,所以这里边我们直接引入的是这个je对吧,那就是的客户端这个依赖啊,我这里边用的2.8.1,我这里就直接把这个引入了。
19:42
呃,大家知道我当前就应该是只在自己的这个POM文件里边引入就可以对吧?呃,这不需要在副项目里面引入,因为只是当前这个UV驱重用到了它,好先把这个引入,接下来我们在代码里边去新建一个scla的单立对象object,然后接下来,呃,我们当前这个就是UV,呃,With布隆过滤器对吧?Bloom好先把它定义出来啊,然后main方法里边,那整体的流程我们想到其实跟前面肯定是差不多的,对吧?诶,我们前面这个就不改了啊,而且这里边我们就还用之前的这个WINDOW2这个方式直接给呃,就是把把把这个先啊,或者我们不用WINDOW2也行啊,先抄到这个filter对吧,把这个PV行为全抄出来,全这个过滤出来,然后这里边还是上面,因为我们不想引入对应的那个class啊,所以单独做一个类的引入这里边啊,上面不要忘记把。
20:42
这个下划线啊,影食转换引入,然后时间特性引入后边啊,后边就没什么了啊,主要就是做这个啊样衣类类型的一个转换,然后分配时间戳watermark,后边呢,我们基于它做了一个filter,把对应的这个PV行为选取出来,然后接下来啊,如果说呃,大家觉得这一块我们不想去做,呃,那个WINDOW2的话,也可以还是自己去定义一个K对吧?哎,那这个定义K太麻烦了,我还是就直接用我们之前那个雅K的那个形式给大家直接做出来吧,对吧,比方说这里边有一个呃,Map成一个二元组,我们这里边就是给一个默认的所有数据都一样的一个P啊,就就不做那个随机生成了啊,然后给一个这个UV啊,然后后边我们想要的主要是什么呢?主要想要的就是它当前的那个userz ID对吧?啊,就别的数据其实我都不关心,我关键只要是有这个userz ID就够了,然后前面的那个。
21:42
啊呃,当前的这个UV呢,主要是用来分组去开窗对吧?做这个,呃,当前做这个聚合操作要用的,那我用这个下划线一全部分到同一个组,接下来开一个一小时的时间窗口,Time还是引入这个window in time.time hours1,这前面的过程基本上差不多啊,我有时候可能用这个不同的实现方式啊,大家知道这里边还是分到了一个组里边,跟WINDOW2其实差不多的,然后呢,我相当于已经对这个数据做了一个一个,呃,就是筛选了,对吧?就呃,不,不能说筛选就是已经做了,砍掉了一些没用的东西了,我只提取了当前的这个user ID传到了后边啊,那接下来我们就要做一个啊,大家可能想到这个可能操作有点复杂,那我直接放大招吧,用一个process function式对吧,这里边我实现一个UV count with bloom哎,这样的自定义的一个process function。
22:42
然后在这个过程当中,我们会想到啊,呃,假如说我这里边这个process function还是像我们之前说的啊,就是你要不是增量聚合函数,要不是全窗口函数,对吧?如果是全窗口函数的话,那就不用说了,你这就相当于是什么把所有的数据不就全存在当前这个窗口的状态里了吗?全放在这个input里边,直接拿出来再去做操作吗?哎,这就完全没意义吗?你如果要是我们当前呃这个窗口的状态就能把所有数据都存下的话,那何必还要再去另外做这个布准过滤器呢?哎,就没必要了,对吧?那另外还有一个想法就是说,诶,我们做一个增量聚合,就是那你增量聚合的状态是什么呢?哎,大家会想到这里边我增量聚合的状态应该得是一个啊,就是应该得是一个set类型,或者是一个不能过滤器,对吧?啊,这个实现起来其实是可能的,但是呢,处理起来就相对来讲会麻烦一点啊,因为你如果要做这样的一个状态的定义的。
23:42
啊,我们其实是需要对这个window做更多的配置的,那这里边有一个非常简单的实现机制,我们有一个这样非常简单的实践思路啊,就是你不是window里边就不想存更多的状态,存更多的数据吗?好,我直接window里边什么状态都不存,然后我定义一个什么呢?每来一条数据之后,我直接把当前的这个,呃,所有的操作啊,全放到release里边去做操作,然后window的状态呢,全部清空。大家还记得之前我们有一个那个trigger的定义,对吧?Trigger里边定义两种操作,一种是窗口的计算触发那个fire,另外一个是就是啊所谓的这一个清空,对吧?P,所以当前我可以定义一个什么,我自己定义一个trigger,自定义一个触发器,大家还记得这个可选的API对吧,我定义一个my trigger。
24:42
那么当前这个自定义的触发器主要是用来干什么呢?自定义触发器主要就是用来每来一条数据,我根本就什么都不存,那我应该做什么操作?我直接触发一次后边的窗口计算,然后直接清空所有的状态。
25:02
所以大家想到这里边我应该实现的是什么呢?Fire and per对吧?哎,就是前面大家提到的那个几种不同的返回的那个trigg根result啊,我们这里边相当于就是既触发计算又直接清空状态,我当前这个窗口里边什么都不留,什么都不存,然后所有用到的那些东西呢,我全存到里面去,那具体的逻辑当然就都在当前我们定义的这个process方式里边连接,呃,这个对吧,去做呃位图的保存,然后去做判断,然后去去重,所有的这些东西都放在里边就完事了。好,这是一个非常简单直观的思路啊,所以我们这是自定义触发器,呃呃,这个我们就直接在后边给大家先把这个实现出来啊,我们呃自定义触发器啊,触发器它主要的做法就是每来一条数据直接清空。
26:07
呃,直接触发窗口计算,也就是调我们后面的这个全窗口函数,对吧,里边的那个计算并清空。窗口状态,所以这里边我们其实做的就是这样的一个操作啊,那我们看一下本身要实现的这一个当前的这一个触发器应该是什么样的呢?My trigger啊,然后extend啊,大家知道这个本身它实现的就是一个trigger对吧?之前我们在呃源码里面也看过这个trigger啊,它有一些这个fli底层已经给我们实现的even time trigger,对吧?Proing time trigger其实是有这样的一个类型的,然后它里边传入的这个数据类型又是什么呢?两个泛型,一个是当前的数据类型T,另外一个是当前的window类型W啊,所以这里边我们传的数据类型大家注意一下啊,不要再出现之前的错误,我们现在的数据类型是二元组对吧?所以里边的这个数据类型string,然后另外还有一个,哎,我定义成一个长整型,哎U的ID本身就是长整形对吧,这个不需要加L什么的啊放在这儿,然后另外当前的。
27:19
Window类型开window,把这个定义好,然后大家看上面就不再报错了,对吧?类型匹配这就不再报错了,看一下必须要重写哪些方法,有四个方法需要重写好,然后这里面大家看其实有三个方法都差不多,什么意思呢?On time on processing time,还有一个on element,大家一看这个on就想起来之前我们那个on timer对吧,定时器触发的时候on timer,所以它是类似于一个事件触发的一个操作,就是某种事件来了之后,我要做什么操作,它就用这个on来定义啊大家如果做过那个前端编程的话,应该也,呃,就是写过JS的话,大家肯定知道对吧?哎,我们一个页面上有一个有一个按钮啊,有一个button,然后你定义它那个on click方法啊on click什么意思,就是我当我鼠标点这个按钮点一下的时候,出现这个事件的时候,我做什么操作,这里边也类似,我们这里边之前接触过on timer定时器触发的时候做什么操作,现在呢。
28:19
On even的time on even time指的就是什么?那就是收到water mark的时候,有water mark改变的时候,我现在要做什么操作,那还有就是on processing time对吧?也就是当前我这个系统系统时间有进展的时候作我当前要做什么操作?那另外还有一个on element,这个就很好理解了,那就是每来一个数据元素的时候,大家看数据不是二元组吗?每来一个数据的时候,我当前做什么操作?那家想我现在做什么操作呢?不就是每来一个数据的时候,清空状态触发我的那个窗口计算吗?啊,大家看怎么控制我触发窗口计算和清空状态呢?返回值都是trigger result对吧?哎,所以我就是通过当前的这个trigger result来控制做什么操作,哎,这个就比较简单了,我现在就是on element的时候,这里边真正做一个操作,返回一个trigger trigger result,这是一个里边可选枚举类型,对吧。
29:19
我要的就是fire and per,诶,那至于别的情况下,假如说watermark,或者说这个这个处理时间,那就更没什么用了,对吧,我们当前是事件时间吧,那watermark不管对吧,我只关心数据,数据来了之后直接触发计算清空,好,那所以前面这个就是什么都不做,什么都不做,那个叫什么continue,对吧?啊,就是可以是只fire,只计算,也可以是只清空,也可以是两者都做,也可以都不做,都不做,这个叫continue啊,所以我们用这个啊,那同样后边这个processing time下边也是continue,对吧?啊,另外还有一个clear clear这里是什么呢?是我们在这个trigger里边也可以定义一些状态,或者说定义一些上下文相关的一些东西,那在这里呢,做一个收尾工作,就是做一个清理工作,那我们现在啥都没定义啊,哎,那当然啥都不用做对吧,它也不需要有返回值嘛,Unit类只是做清理工作而已,所以整体来讲我们这个trigger啊,定义还是比较简单的。
30:19
那后边重点就在于怎么样去实现这样一个UV count with b自定义的这个呃处理流程了,对吧?啊,那上面我们先把这个完整的写完,就是做完这个操作之后,UV stream就可以去做一个print打印输出,然后后面最后大家不要忘记当前是UV withm中,这就是我们整个的处理流程,先定义好。
我来说两句