00:00
在我们需要的东西都已经准备好了,那关最关键的就是实现我们自定义的这个,呃,当前定义好的这个process function了啊,那所以接下来我们在后边实现自定义的呃窗口处理函数,其实就是一个window function啊,只不过这里边我们是一个process window function对吧,是process里面调的,所以class,然后接下来我们把这个定义好,Extend,一个process window function,哎,我们说这个process function里边它的这个家族成员很多,在不同的地方调process这个接口都会里边传一个不同类型的process方式,那现在我们又接触新的一个类类型process window方式,对吧?我们说它是一个全窗口函数,然后里边它需要传这个类型还是input output k和W,大家看到这里边的处理跟我们前面那个KBY之后,呃,做的那个就是window。
01:00
方式的类型定义是一样的,对吧?所以它也是一个全传播函数啊啊,这里边我们定义当前的数据类型二元组对吧,已经map成二元组了,String long,然后后边output呢,Output我们不是要那个UV count嘛,对吧?呃,包装成样例类,所以就是这个,那当前的key是什么呢?当前的K,你要看KY的时候KBY,什么k by UV对吧?啊,就是这个雅K啊,所以这里边就是一个string,最后还有一个time window,当前的这个窗口的类型定义在这儿看一眼检查一下,诶,不报错对吧?所有的前面这个编译的类型啊,都都已经不报错了,编译已经通过了,呃,所以这里边我们可以看到这个当前没有语法错误的话,就实现它里边的这个process方法就可以了,对吧,这是里边核心的,就是每来一条数据,呃,不是每来一条数据啊,就是我们这里边本来应该是所有数据都收集齐了之后,触发窗口操作的时候,大家看这里边艾。
02:00
本对吧,所有数据跟我们之前那个window度function一样,它等到所有数据都是收集齐的时候,触发窗口计算的时候才会掉一下,这里的这个process跟我们之前那个process element不一样啊,那个是key的process function,那个是每来一条数据都会调的,这里是window,这个process window function它是要收集其数据才调用,但现在不一样了,现在我们知道我是每来一条数据就直接这里边trigger就触发一次这个当前这个函数窗口函数的调用了,那就相当于我这里边是不是又是每来一条数据就调用一次啊,啊,只不过这里边elements只有一个数而已,对吧?啊,所以这里边大家看到啊,我这里定义一下,本来是呃,收集其所有数据。呃,这个窗口触发计算的时候才会调用,现在那就是每来一条数据都调用一次process方法啊,那这里边我们要做这个计算的时候,那这个数据,这个其实是非常明显的,从element里边就可以把当前的这个数据拿出来,但是我们现在重点是要定义那个位图啊,对吧,我要连接red,把那个位图要定义好,哎,所以在外层我还得先做一些基本的定义,比方说我先得把那个跟red的连接先定义好,对吧?呃,这里边我们定义呃,Red red连接啊,以及这个不容过滤器。
03:44
过滤器对吧?啊,然后这里边lazy,我们先定义一个这个jes连接,本来我们知道常见的手段是什么呢?在那个open生命周期里边创建连接,对吧?我们这里面偷懒一下,用这个lazy去定义,这也是完全可以的,我们引入这个je,它的这个客户端里边,当然只要传local host,我们的这个host name啊,还有端口号默认6379传进来,然后另外呢,还要创建一个当前的部门过滤器,我们定义一个m filter,你有个呃,我们当前这个就叫做M,对吧,里边要传一个当前的大小,那我们这个大小传多少呢?我们不是说诶保存这个要要处理这个上亿个uz ID的驱虫的话,我们最后想着给一个60几十兆差不多了,对吧,给个64兆,那64兆对应在我们这里边,应该给一个多大的数呢?
04:41
因为我们这里边的这个size其实应该是什么呀,大家看这里边应该是V的个数对不对?哎,我们这里边就是你你对应的这个cap,我们指指定的是位的个数,比方说我这里边二的四次就是一共有啊16个位对吧,然后对应的你你这里边得到的这个结果不就是到底是第几位吗?对吧,我做一个这个在它范围内做一个映射就完了,所以这里面我们这个cap其实是位的个数,那这里位的个数64兆。
05:12
这么大的这个空间到底有多少位,到底有多少位呢?拆开嘛,对吧,64大家知道二的六次方对吧?然后一兆二的20次方对吧,二的十次方是1K嘛,一兆是1K乘1K,那就是二的20次方,那一个BAT那是二的三次方对吧?所以乘起来二的29次方啊,所以这里边我们传一个二的29次方,这大家注意一下啊,呃,位的个数这里边就是二的,呃,我们用这个写啊,二的六次方,这是六,64对吧,然后乘以二的20次方,这是一兆,我把这我把这个括号写出来啊,这是十,然后面这个是一的。
06:12
这是就是一个字节里边八个八个位对吧,8BIT,哎,所以最终我们为的个数就是这么多,64兆对吧?64兆对吧,是这样的一个过程,那这里面我怎么传这个二的,呃,二的29次方呢?当然你可以用这种次方的这种形式写,对吧?还有另外一种方式,我直接用位移的方式,位运算的方式,我直接一左位移29位,大家想一想。哎,就是我这里边本身定义了一个一对吧,哎,就是000100000001,大家想象一下啊,定义了这么一个数,如果要是左移一位的话,是不是就是二的一次方,左移两位是不是就是后面两个零二的二次方左移29位是不是二的29次方啊,所以我们最终就是这么多个位对吧?啊啊,那如果你要是详细去看这么多个位的话,二的29次方啊,大家知道对应的这个二的十次方应该就是1024,就是1K嘛,对吧?所以这接近于二的三十三十次方就是10G了啊啊呃,就是就就就是一个G了啊,所以这相当于是半个半个G对吧,半个G一个G是10亿嘛,所以这大概是5亿,所以我们大概是用5亿个位保存了,呃,就是1亿个UCID的一个驱重的结果,对吧,所以这个比例还是可以接受的啊。
07:45
好啊,那那这里边我们定义好了这个啊,部门过滤器的大小,然后接下来大家可能会发现你创建是创建好了,但是我们那个re里边根本就没有对应的那个那个内存空间的一个声明啊,你没有定义那那里的K啊,诶不用着急,因为re release里边的K是不是随时定义随时可以用,对吧?而且里边red里边做位操作是非常非常简单的,你直接在后面去做扩展是完全可以的,只要你不超出我们定义的这个,就是相当于定义的这个范围,定义的这个off off off set对吧,我们的那个偏移量你不要超出这个范围,那它就是控制在64兆以内的啊,所以这里边其实是利用了这个red里边的这种k value存储的一个特点啊,未运算比较比较容易,而且容易扩展,所以我们前面连声明都没声明没有在里面专门定义对吧?没有定义死直接操作就完了,好,那接下来后面我们就先定义一下,首先先定义red。
08:45
This中存储,呃,这个位图对吧?Bit map的K的那个键到底是什么?哎,那这里边我定一下啊,呃,就是store的bit map key对吧?把这个定义出来哎,那我这里边用哪个来作为当前的这个存储位图的key呢?我们发现每个窗口不是都要统计一次吗?做一次uva的这个驱重都要有一个位图吗?那我就以这个window end就完了嘛,这不是现成的key吗?哎,所以这里边我们直接从哪里取呢?大家看到现在没有window了,对吧?不像window function式有window,但是我们发现有一个上下文,它里边有window,哎,那这个就比较简单了,我直接拿到这个window再去get and不就完了吗?啊那另外注意我们存储的时候一般用的是一个string类型对吧?我做一个to string转换得到这个string类型的一个window and啊,把它当成key去做存储,然后接下来,因为我们中间不保存任何的状态,对吧?哎,所以这里边呢,我们还得保存一个什么,我还得保。
09:45
当前那个窗口里边UV的那个统计值,因为这就相当于我可以增量聚合嘛,来一个就统计一下当前这个UV到底是多少,要不然的话,我最后是不是还得去再去遍利一下当前那个那个bit map它里边有多少个一对吧,那个就很麻烦了,我中间保存了状态的话就省事了嘛,所以另外将当前窗口的呃,UV count值作为状态,哎,但是我们不保存在当前内存空间对吧?因为window的状态已经全部被清空了,你这里边保存的状态也会被清空对吧?那个破已调用的时候状态也会被清空,所以这里边我们就作为状态保存保RA,哎,那这个怎么去保存呢?我们用一个用一个叫做叫做UV count的这个哈希表来保存。
10:45
哎,为什么是哈希表呢?因为里边还得是k value,对,为什么你你得保存那个我们最后的结果是每一个窗口都得对应的,对应着有一个UV count值啊,对吧?所以里边我们保存的结果还是什么呢?就是一张叫UV count的表,里边存在的结果是每一个窗口我还用这个window and,对吧,然后对应着一个count值,这样的一个k value,对,保存到这张表里边,这是我们的那个状态,中间来一个数据,就不停的聚合这个状态,最终所有数据处理完成之后,得到结果应该跟我们之前这张表里面的结果啊,应该跟我之前去除的结果是一样的,对吧?啊,这是我们的一个基本想法啊,那这里边我定义一个当前的这个UV map对吧,就他的这个表名我们叫,呃,就叫做这个UV count啊,然后接下来诶,我我要去先取一下,当前这个数据来了之后我不知道。
11:45
当前窗口里边已经有没有这个count值了,对吧?我先取一下当前状态是什么,就类似于我们之前做那个状态操作一样,只不过现在这个状态不在我们flink内部里面去管理了,放在release里啊,对吧?啊,这里边我们就是定义把这个count值初始值先给给一个这个零,然后哎,为什么直接给一个初始值零呢?因为我现在要连接red去取这里边的那个状态值,不一定有没有值对吧?哎,没有值的话,我这里边就直接用用这个零就完事了,所以这里边呃,接下来我就是取这个值啊,Get,大家看啊,从red中取出当前窗口的UV count值,所以我这里边调用de连接,哎,什么呢?H get对吧?因为是一张哈希表嘛,我直接h get,那当前的K是什么呢?哎,当然就是UV count map对吧?呃,当前这张这张表UV。
12:45
Count里边,然后呢,诶对应的那个K呢,我就也用当前的这个start bit map k对吧?啊,就是我当前那个位图的那个K,也是当前存储的这个window and嘛,啊对吧?啊,那那这里边我可以单独再定义一下啊,尽管跟那个都一样,所以我可以在这里边单独定义一下啊,就是当前的这个current k,等于还是用这个对吧,防止大家搞混啊。
13:13
还是get and之后再to string保存成string啊,那后面我们用这个current k传进去,获取当前它的那个值,Count值到底是多少,如果说它大家想到如果要等于none的话,那就算了,对吧?哎,那如果不等于none的话,等于none的话就用零值就完了,如果不等于none的话,我再把它做一个负值,好,那这里边就相当于还是你得用这个东西啊。啊,当然我可以在外面只调用一次对吧?我这里面相当于调用了两次啊,就是把这个数拿出来之后,它本身是一个长整形,我还要再土浪把它拿到啊,这就是相当于我们处理那个状态啊,先拿到当前那个count值的一个过程,然后接下来哎,那就是要判断真正的驱虫过程了啊,去重判断当前啊,User ID的哈希值对应的位图位置对吧?是否为,是否为一,如果为一的话,就说明有没有的话,呃,如果为零的话,才是我们现在要叠加C要加一对吧,如果为一的话,那就相当于不做操作了啊,所以我们判断是否为零,这是做的这个去重操作,那首先我们先先得把这个当前user ID拿到对吧,当前的user ID那是element里。
14:40
大家还记得吧,我们的那个每来一条数据,它都在当前的这个elements里边了,当前这个elementsable其实只有一个数,直接拿出来就完事啊,这个我可以去拿它的head,当然也可以拿它的这个last,对吧?啊,其实都一样啊,拿哪个都行,为什么我这里边拿last呢?因为大家想到如果我这里边呃,大家每来一个数据,我都去去做一个这个调用,其实好像有点呃,性能上好像有点浪费是吧?因为你后边要对那个red做操作嘛,所以一个可行的优化方法是什么呢?诶,我可以隔几条数据才去触发一次操作,那这样的话,诶,这这个elements里面就有多条数据了,对吧?呃,我这里边取这个最后一个也是可以的啊,然后我要它的ID的话,那其实这里边的这个,呃,正常来讲啊,当前这里边的这个ID都一样,我们这里边用这个下划线二就是包装好的这个ID嘛,然后把它做一个to string,因为本来ID都是长整形。
15:40
我最终要的都是字符串对吧,因为哈希做计算的时候,输入参数类型也是字符串嘛,接下来是计算啊,接下来当然就是计算那个哈希值对吧,哈希值就是就对应着位图中的偏移量,所以我直接把这个定义成叫做offset。
16:06
怎么做这个处理呢?布隆过滤器里边的哈希函数对吧?直接把这个UID传进去,另外还需要有一个随机数种子,这个无所谓啊,我给一个随便给一个61对吧?只要是类型随便给一个就可以了,当然这就相当于你可以调参对吧?就相于我们这这个哈希函数,通过这个调参你可以避免一些哈希碰撞的情况啊,然后后边,那接下来就是既然已经拿到这个offet了嘛,那就是啊,用red的位操作命令取啊bit map中对应位的值做这个操作对吧?哎,所以我要判断一下,那得到的肯定不是零就是一了,那所以它其实就是一个布尔类型,返回就是一个布尔类型,我可以把它叫做是否存在对吧?1EXIST如果是零的话,不存在就是false,如果是一的话,处就是存在对吧?啊,那所以这里面我直接调用。
17:07
Al里边有一个命令叫做大家看到叫做get bit未操作命令,然后呢,里边我要传当前的这一个这个K对吧?你要操作哪一个,我们re里面都是k value这个存储嘛,你到底要去对于哪一个K去取它的里边的某一位,对吧?那当前哪个K呢?我们定义的当前的这个K对吧?Start bit bit map k取这个啊,然后接下来取哪一位呢?Offset对吧?哎,大家可能说诶你之前都没有去在里边创建这个东西,那取offset是什么呢?没值的话,你直直接定义这个K,其实大家知道re里边你直接定义这个K也是可以的,对吧,里边就都是零了,所有的值都是零,都是空的,然后你直接拿到,当然就是零了,然后你如果要是给里边给一位给了一的话,那其实我们当前就相当于这个bit map里面就有值了,对吧,而且我们会看到res里边控制它的时候,当前的这个bit map大小。
18:07
它就是动态增长的,你如果后边没值,它就相当于没那么大,只要你后边对应的那个位置,大家可能知道啊,就是一开始我这个bit map里边都是零对吧?啊,那其实对于我red而言,我管理这个K,它里边其实就维值,我就根本就没有给它真正的分配空间,只有在什么程度,什么时候呢,比方说我对应的这个大家看从零开始啊,零一第一位给了一个值,好,那我给它分配一个四节空间对吧,它现在这个叫01000000,那假如说我给他后边的这个这个某一位又给来了,来了一个一,哎,那可能这里边我再去给他分配下一个字节的值对吧?呃,就是我再给它分配对应的这个存储空间,所以我们这个位图可以相,可以认为相当于是一个动态扩展的状状态,你假如说没用到这个位的话,我这里边就不需要去分配啊,这就是red做这个位图操作非常舒服的一点啊,它的这个内存还很节省啊,那但是我们能相。
19:07
性它最多不会超过六十六十四兆,为什么有这个自信呢?因为当前的offet是有这个限制的,对吧?有这个限制的不会超过我们定义的二的29次方减一的这个范围啊,那所以它最大也就是64兆嘛,啊所以定义这个一一个呃11EXIST啊是否存在,那然后我们就判断了,如果说啊,如果说存在的话,是不是就什么都不用做对吧?如果不存在的话,我们才要做做事情,如果不存在,那么呃,位图对应位置至一对吧,表示它已经存在了制一,并且将countt值就是我们的那个状态countt值加一对吧,这就是我们要做的操作,哎,那这个怎么样制一呢?都是red的操作啊,Je直接前面不是有get beat吗?有get当然就有set set beat。
20:07
那同样这里边还是start bit map key,然后同样对应的offset后面还得有一个值,到底是给true还是false,对吧?大家看给一个布尔类型的值啊,Bulling对吧?啊,那这里边给一个true,这就表示写一,然后另外呢,哎,状态要做一个更新对吧?Count值加1jeice.h set对吧?这是我们那张表里边状态更新,我们当前那个表本身的那个名叫做UV count map,然后里边我们当前的那个K呢,当前对应的那个K叫做current k对吧?Current k啊,然后对应的位置呢,Count,当前的count不是已经拿出来了吗?前面我们不是已经已经取过这个count值吗?对吧?这里边count加一,然后注意本身传的是一个string对吧?所以to string在做一个转换,这就是我们最后要做的这些操作,最后我们所有的这个数据都已经写到这张,呃,就是所有的这个就是位图都在这个瑞里位图里边,那所有的这个结果状态呢,都在最后我们这张表里边,对吧?把它当做状态来用就可以了啊,那我们现在来做完所有的操作了啊,来做一个测试吧,运行一下啊,首先我们起一下这个red red server先起起来啊,然后我先起一下这个客户端看一眼啊,Red client啊,先看一眼当前这里边有东西没有空的,好,接下来我们就直接看啊,空的情况下,我们直接运行一下。
21:43
当前这个代码,看一看最终的结果是什么样的。好,现在这个代码已经运行起来,哎,但是这个可能写入会比较慢,因为我们每条数据都要连接red做操作,那我们这里边先看一下吧,中间应该有这个一些结果了,对吧?诶大家看这里面已经有一些东西了,那这里边我们看到的这个以一个window and,呃,就作为key的这个东西,这到底是什么呢?这就是我们说的当前的那个bit map那个位图,对吧?啊,当然这里边我最好就不要get这个值了啊,大家知道如果这个值里边已经有很大的那个偏移量的话,它可是64兆啊,我直接把这个64兆读出来,在这里要做一个显示,呃,也也是有点困难的啊,啊大家知道我这里边主要是为了1亿个数据做呃,做驱重而设置的这样的一个位图,那其实我现在数据量根本没那么大,我总共才48万嘛,那其实这里边的应该大部分值应该都是零,对不对啊,这个其实大家一想就能想到啊,大家可以感兴趣,可以下去之后看一看里边的值,好现在我们这里边已经运行完了,我们就看一下当前UV。
22:48
Count里边值到底是什么对吧,我们拿一下这个啊,H get all,因为它是个当前是一个哈奇map嘛,诶大家看这就是最终我们得到的这个结果,呃,一共十个窗口对吧?啊,20个数啊,十个窗口,然后大家看到每一个窗口的这个数值啊,大概两万多三万多对吧,最大三4746,最小的这个啊,最后一个窗口这个因为数少,只有13个,跟我们之前在内存里边用set做驱虫的结果是不是完全一样啊,这就是用这个,呃,布隆过滤器啊,而且我们把它保存在了red里边,用到了red里边的一些未操作做的一个最终的一个实现。
我来说两句