00:00
在我们整个。处理流程都已经清晰了,那最后就是要实现一个自定义的process,就是一个window process function对吧?呃,Process window function啊,呃,那那么这里边最关键的我们是不是得有一个布隆过滤器这样的一个数据结构啊,啊,那当然实际生产项目当中,可能我们是直接引入这个第三方给我们构建的那些组件,对吧?比方说你引入这个谷歌的那个瓜A里边提供的那个bloom filter那样一个数据类型,其实在现在这个,呃,就当当前这个。Flink里边本身也是有对应的这个布隆过滤器的引用的,诶大家看这里边是不是在这个,呃呃,瓜瓦18这个谷歌给我们提供的这个包里边,本身就有这样一个布隆过滤器,这样一个实现啊,就甚至连这个flink本身底层的这个运行时算子里边也有自定义的一个布隆过滤器,对吧?啊,所以大家如果想要想要用这个东西的话,肯定就是直接用这个第三方的这个工具就可以了,好,那我们这里边的话,呃,主要是要给大家把这个原理讲解的更清晰一点,所以这里呢,我们不要用现成的,我们来自定义一个,自定义一个布隆过滤器,其实这个原理非常简单啊。
01:18
啊,那这里面我public static class,我定义一个my bloom filter。呃,那大家想到这里边自己自定义的这个布隆过滤器,最核心的东西应该是什么呢?我们前面说过,布隆过滤器里边最重要的其实就是一个位图,然后还有一个哈希函数,对吧,主要就是这俩,那这里边位图,我是在这里边直接就要创建出这对应的一个位图吗?那就不对了,对吧,我在代码里边直接创建的话,这是不是又变成了一个内存里面的数据结构啊,诶所以大家知道,呃,就是我这里边不直接利用这个,像这个,呃,谷歌啊,其他的这些给我们提供的这个数据结构,也主要就就是因为我们当前是想想把它丢到red对吧?呃,就是我当前的这个位图,当然大家可以直接觉得那个如果不大的话,你放在当前的这个内存里边也是可以的,但是我们想到前面我们那个做扩展,如果说它出现这个哈希碰撞的话,我们尽量要给它扩的大一点,对不对,当前一个窗口按照那个扩大之后,至少也要几十兆或者上百兆,那你说这个放在内存里边好像也还是有点占地方,对吧?哎,所以我们自然想到也把它直接丢到这个里面去,所以。
02:33
所以接下来呢,我们其实是把它要放到那个red里边去做一个存储,那这里边我自己定义的这个布隆过滤器,我应该存什么呢?哎,那其实不用存什么,我这里面是不是直接只要知道一个大小就可以了,对吧,我这里边定义。位图的大小啊,所以这里边我直接private啊,定义一个,直接就用一个integer定义这个大小吧,直接用一个cap对吧,当前的这个capacity啊,当前的容量哎,那这里面大家要注意,就是我一般这个大小要定义成一般需要定定义为。
03:16
二的整次幂,这个大家能理解吧?啊对吧,如果是二的整次幂的话,我们做计算或者说做这个内存分配的时候都更加的简单一些对吧?啊,就或者是最好是八的整次幂对吧?啊,就这样分配这个位的时候,按字节一个一个去分配啊,这个会更整齐一点啊,那所以一般这个数据是这样去用的啊,所以这里边我可以把当前的constructor做一个实现传参的时候,创建这个布隆过滤器的时候,直接把这个参数传进来,就相当于我就创建了这样一个大小的位图,只不过这个位图大家知道在release里对吧?哎,那正常来讲,我这儿是不是还应该连接red去创建这个位图啊。但大家想里边有必要我们单独去创建吗?
04:02
是不是你之前没有没有这个key的话,然后你直接用它就可以啊,对吧,直接set就可以直接set它的值,所以说这里边我们不要不需要单独去创建,这就没有没有任何的必要啊啊,所以这里边就省略了很多操作,那接下来是不是剩下一个就是我们必须要去实现一个哈希函数了,对吧。实现一个哈希函数,好,那所以这里边我们定义一个public,那这个哈希函数我直接返回一个长整型吧,返回一个long啊啊,然后这个我就叫做哈希code,诶大家会想到这里边传进来是不是应该传一个string类型的值啊,我把这个叫做value。然后另外一般我们在做计算的时候呢,还可以传一个参数,就是我把当前传一个随机数的种子,对吧?啊,就是相当于这个这个种子不一样的话,后边相当于我做计算的过程就不一样了,对吧,算出来结果就不一样了,也作为我们参数可调的一部分,这里边我给一个整数类型的随机数种子,这就是当前的这个哈奇函数。
05:11
那其实大家能想到我定一个长整型的这个result啊。初始值我给一个零,大家能想到就是我做哈希的时候主要是怎么做吗?最简单的方法是不是就是把这个字符串每一个字,每一个字符都拿出来,然后求它对应的那个阿斯玛值对吧,然后对应的到我们这个result里边去做一个叠加呀,那这个叠加的话,有同学可能想那就直接sum起来不就完了吗?但这样做不合适,对,大家想,如果你要是直接算起来的话,那是不是ABC和CBA是一样的呀,那就太容易碰撞了,对吧?啊,所以这里边我们最常见的一种做法其实是。对,做一个拼接,然后或者说我们这个对每一位,对每一个字符,不同位置的每一个字符去做一个做一个权重的调整,对吧?给它乘以一个,我们就乘当前的这个随机数种子做一个权重调整就可以了,所以具体来做的话,那就是一个for循环啊,我这里边int因为有对应的位置啊,Int I等于零,I小于value的length,对吧,然后I加加。
06:28
那在里边的话,我其实就是当前的result,就等于什么呢?之前的result我可以直接让它先乘以一个CD,然后再加上。Value。Char at。当前的这个I这个位置,这样是不是就可以了,我直接拿到当前这个位置对应的阿索玛值,然后呢,在之前那个result基础上去做,要要做叠加,但是叠加之前呢,先把这个result乘以一个sit对吧。就是这样的话,大家想到是不是我最初的那一位,是不是后边乘的C的就会多啊,每往后这个再再往后变历遍历一个字节,我这里边的这个数据就相当于再多成一个C对吧?啊,每往后就多成一个CD,所以这样的话,就相当于做这个调整的时候,它就可以得到更多的呃,更多不同的这个权重啊啊,那当然大家知道,就是最终我们做这个哈希计算,这是一个很简单的一个实现啊,最后return的话,那是。
07:32
有同学可能想,那我直接把这个result return不就完了吗?这里大家注意,我们最好还是要保证你前面不是有这个capacity吗?有范围吗?那假如说你这儿做的这个计算,你这有成有加,万一要超过那个上限怎么办呢?哎,对,那大家想到我是不是应该对那个容量做一个取域操作啊?哎,那这里边我可以做一个优化,就是大家知道取值操作还是比较就底层计算是比较复杂的啊,那怎么样操作更快呢?
08:03
我直接做一个with操作,大家看一下我的这个写法,直接去按谓语与什么呢?我定义的capacity减一。大家想一下这个是什么意思?这主要是利用了我们前面说的。当前的这个容量必须是什么呢?二的整次幂对不对,那如果要是二的整次幂的话,写成二进制,最最终底层底底层的这个零幺零幺二进制码的话,大家想它有个特点,就是二二的整次密码,大家回忆一下二的话。是幺零对不对,那如果四的话就是100。八的话就是1000对吧?哎,这个是非常简单的,所以二的整次幂是不是都是一个一后面跟好多个零啊,哎,所以是二的一次幂是不是后面就跟一个零,二的二次幂后面就跟两个零对吧?三次幂就跟三个零,所以它其实就是二的几次幂,就是一后面跟几个零,那大家想如果它再减一的话,那变成什么了?
09:12
是不是就是二的几次幂,结果减一之后,它就是几个一对不对,然后这几个一跟result去做一个按位取。按位与,那最后得到结果是不是就是只截取了最后的这几个几个位啊,对吧?比方说我现在二的N次幂啊,那我减了之后是不是就是N个一啊对吧,N个一最后如果跟它按位于,大家想前面补齐是不是全是零啊?所以你跟零做与当然就全是零了,那跟一语是不是本身自己是什么就是什么啊,就相当于我就直接截取了最后的N位数据对吧?啊,这就是我们当前这个布隆过滤器的一个设计啊啊那当然了,就是说在实际操作的时候,这个哈西扣的肯定不会这么简单,大家可能也知道,呃,在当前底层的话,其实一般比较常用的有一种哈希函数叫做家可能也听说过这个叫摸摸哈希对吧,大家看这个阿卡里边就用到了啊,就或者这个skyla里边哈哈什这个包里边也有这个算法叫mor哈希三啊,那尽管大家听着这个名字好像有点有点萌啊摸摸哈希其实他这里边。
10:26
这是表示它操作的,它其实就是U,然后U什么意思呢?MU表示一个操作,叫multipla乘法吗?R表示rotate,表示翻转旋转对吧?哎,所以它整体的做法就是你来了一个字符串,来了一个值,对吧,它就不停的乘,然后翻转乘翻转成翻转,这样来回做的话,最后得到那个碰撞,碰撞率就非常的低对吧?这个哈气函数就非常的好啊,所以我们这里边就不去专门去实现那么复杂的过程了,我们如果直接要用的话,你是不是直接调那个就完事了啊,这里我们只是把这个原理给大家讲一下就可以。
我来说两句