00:00
来最后一步,那既然已经有了布隆过滤器了,那我们是不是就是实现这个自定义的处理函数,把用布隆过滤器来做这样的一个操作了啊,那接下来我们就在这里边做一个实现,实现自定义的处理函数public static class啊,上面这个函数名称有点长,我们直接copy过来,U we can'result with bloom filter,然后接下来,哎,我们想要去实现的这个,诶大家想想一下,我们当前想要这个实现这个接口是个什么呢?应该是那个process window function对吧?哎,这里边大家注意,既然是process function,那是不是它其实也是那个负函数对吧?那应该是要extend。另外大家还要注意一个是当前是不是也是做了window or的呀?那window or之后调process的话,这传的还是那个process window function吗?不是了,对它这个稍微做一点改变,是不是叫process or window function啊,跟前面那个类似是吧?啊,同样后边这个,呃,是不是也是少一个,少一个K的那个类型就可以了,对吧?大家看到这个TR的当然就是输入输出嘛,In out,少了那个K的类型,然后加一个W。
01:22
就可以了,所以接下来我们就在这里边做一个实现。Extend process or window function里边的类型,当前的输入,那那当然还是这个user behavior,我们定义好的这个po类当前的输出,那是我们想要的page count,对吧,输入输出。然后当前的window的类型,那当然是window time window把这个定义好之后,上边不再报错,这个类型就完全匹配了,这就没问题了啊啊,那接下来我们来实现一下里边的这个具体的过程啊,那这里面首先大家会想到我要跟那边去做连接,那是不是应该在。
02:10
Open生命周期里面去做一个连接啊啊对吧,在在里边去创建这个才是最合适的啊啊所以呢,我在外边先把它做一个生明,然后在open open生命周期里边去做一个实现,做一个连接,所以我在外边先定义这个je连接,对吧?啊那另外还有就是我把布隆过滤器也直接定义在外边布隆过滤器对吧,也是放在那个open生命周期,我再把它拗出来就可以了啊那这里边我直接,呃,诶大家想我这里边要跟那个red去做连接的话,是不是得有red的连接工具啊,诶在之前我们那个给大家讲的是那个弗link卡夫卡,呃呃,弗link跟red的那个本身的连接器,那现在我们要用的其实是一个。是不是就是一个gene客户端啊,只要有客户端我们就可以发请求,那接下来我们需要引入相关依赖啊,大家看直接引入这个客户端啊,Je里当前只在。
03:13
我们的当前的这个,呃。模块里边生效,所以说我直接把这个放在子模块里面po文件就可以了。现在如果引入之后,接下来就可以去把这个gene引入进来了,对吧?啊,首先首先我定义一个je啊。然后接下来,呃,还可以定义一个布隆过滤器my bloom filter,对吧?啊,我直接把它叫做my bloom filter,好,直接先放在这儿,接下来那就是open生命周期。里边,哎,我现在要用的就是je,等于get runtime contacts,然后这里边。呃,不不是不是那个运行上下文这个我就直接连接就完事了,对吧,又不是创建状态啊,直接去new一个je是不是搞定啊,里边传local host,大家对应的主机名和端口号6379,然后下边这个my bloom filter,那就是直接去你有一个my bloom filter,哎,注意这里边还得给那个指定我们当前的那个大小呢,给多大呢?现在我要对1亿个位去重,我给多大?
04:31
哎,那家想,那就是之前我们说的那个1亿个位去做驱虫的话,最后算下来大概可能给个扩展之后,可能给个几十兆或或者100兆,对吧?呃,我们就给个几十兆吧,呃,那大家想就是比方说啊,呃,我我就选一个给一个这个大概64兆对吧?啊或或或者说这个几十兆,因为64兆大家想大概是一个二的整整整次幂嘛,我们是不是要选一个二的整次幂传进来啊,哎,大家想一下64兆的这个大小大概是二写成二的整次幂,应该怎么表达呢?
05:08
大家想那是不是就要表达成二的多少次方个位啊,对吧?哎,那大家想这个64兆个位啊,这个几十兆啊,64兆个位。这个64兆拆开的话,64是二的。二的六次方对不对,然后这个,呃,当当前我们这里边的这个这个兆的话,拆开就是这个拆开就是多少二的。20次方对吧?哎,这个其实没拆完,我们当当时说的这个是大B对吧,BAT啊64兆BAT那接下来是不是还得乘以二的三次方啊,哎,所以大家想我现在所有的这个位的个数是不是就是这么多啊,总共是多少?二的。20这是加起来对不对?呃,这这个幂次肯定就是加起来的嘛,二的29次对吧?啊,所以当前我们是二的29次方,那这里边当然大家可以用那个数学计算去写那个二的29次方对吧?这里边还是我们做一个简单计算,因为之前我们不是说了二的多少次方就是一后边多少个零嘛。
06:20
那我二的29次方是不是一后面29个零,那可以直接是不是直接写一个位位移操作啊,是不是用一个位的左移操作一直接,哎,就是相当于直接左移29位,后面跟29个零就可以了,哎,所以这个其实整体来讲就有很多这个小细节的这种优化的做法啊,直接这么写对吧?所以大家注意这是当前是啊呃,就是。要处理1亿。个数据我们是用大概,呃,就是这里边大家知道这大概是扩充了几倍之后啊,可能是几亿个位,然后最后保存起来大概就是64兆B大小,对吧的位图。
07:15
这就是网站基本的这样的一个定义啊,然后接下来呢,就是需要去实现最关键的一个process方法了,对吧?这里面可能有些同学涉及到我这里边是不是得定义那个状态呢?因为当前我们得有那个count值,对吧,这中间是不是应该得有一个count呀。就是假如说我这里边我去连接外部的那个,呃,Re,里面存着的那个位图,呃,如果说里边不存在当前的那个数据的话,我当前是不是应该count加一,然后最后等到我当前的这个数据都都已经没问题,都已经到齐了之后,窗口到关呃结束时间的时候,是不是我把那个count值直接输出到那个配置view count里面就可以了,诶所以这是一个基本的思路,但是这里边有一个问题,大家如果在外面这去定义状态的话,后面会发现他搞不定为什么呢?就是我们前面这里啊做了一个fire and p。
08:13
它的这个原则是不是要直接把窗口里边的所有数据和状态全干掉啊,所以那我们这里如果定义一个count作为状态的话,后面你会发现它的效果就是什么呢?就永远都是0101对吧,然后就是来一个数变成一,然后就清空了,来一个数又变成又又清空了,对吧?那所以最后是不是又叠加不起来啊,那呃,这个为了避免这种情况,那干脆我这样吧。我是不是直接也可以把当前count的状态也存到red里面去啊对,所以接下来我们其实就是我们将将位图和呃,Count窗口的窗口的count值全部存入RA,好,那接下来就就有这个问题了,那我们当前存在里边的话,它是key value这样的结构,对吧?那你到底当前的这个怎么存呢?对吧?位图的话怎么存呢?
09:14
那家想我们现在位图是不是一个窗口一个位图啊,所以这个非常好,非常简单,是不是我们直接就用,是不是用window and作为这个key就可以了,对吧?直接用window and作为key,所以接下来我定义,首先我拿到当前的window and,这个大家都知道,现在是这个全中函数process方式,它有没有那个window,但是有上下文对吧,我直接拿window是不是就可以了,然后get and拿到,对吧?然后接下来我定义一个当前的bit map key,位图的key就用window and toth,呃,这里边我直接to string也是可以的,对吧?或者我直接用string,呃,这个value of也可以,对吧,我直接把它转成一个string就够够了。
10:05
那另外呢,大家想到那个count值又怎么存呢?Count值是不是也是。一个window一个count值啊,那在想最后我是不是相当于存起来应该是个表啊。对吧,类似于一张哈希表对吧?哎,所以我们最后是要希望啊,把count值存成一张哈希表,哎,那所以这里边我可以定义一个,就是当前定义一个我们当前这张哈希表的名称,对吧?比方说count哈希name,比方说我这个就叫做UV count。这个没问题吧,对吧,我直接叫做它,然后接下来这张表里边存的是不是就是一个K一个value,一个K一个value啊,那就是每一个window是不是一个count,每个window一个count,那它的这个key是不是还是当前的window and呀,对吧,我当前统计的时候还是它啊,所以再定义一下当前是这个count k等于啊,这个我就直接。
11:12
还是to string对吧,或者直接你拿前面那个bit map key其实是一样的啊,这就是基本的一些定义,我们要跟做连接做操作,那我先把这个先定义好,然后接下来呢,就是接下来该怎么做了,这就是关键操操作了啊,是不是要取当前的user ID啊,要去重了嘛啊,所以我定义当前的user ID,我要求的是一个,大家看我要求的是一个string类型,为什么?因为后边是不是要针对他去取那个哈希值啊,哈希值我们给的这个value是个string对吧?所以我这儿直接就把它定义成string就完了啊,或者大家如果觉得这个有点奇怪的话,我还是给一个这个长整型啊。长整形的话,那我们取的时候当然是elements,这是全量数据对吧?但大家知道我是来一个数据就清空一次,是不是里面只有一个数啊,啊又来了对吧?所以我直接拿它的这个next取出来就完事了,然后get user ID对吧?直接做这样一个操作啊,然后接下来是不是要计算计算位图中的。
12:24
偏移量就是我们的那个offset对不对啊,那所以接下来我们还要定义一个这个当前的offset,我们需要调用布隆my bloom filter,布隆过滤器里边的。诶,这个是可以直接调用对吧,直接取这个哈,Code是不是就可以算出来啊,那我们当前要的是user ID to string做一个转换,后边给随便给一个随机数种子,比方说给个61对吧,给一个这个质数啊,给一个呃,不轻易能被整除的这样的一个数,不能被拆分质行数的这样数啊啊,那然后接下来是不是就继续判断它是否存在啊对吧?哎,我们用呃,Red red的它有一个命令叫做获取某个key里边的某一位,就是未操作的命令,叫做get bit用get beat命令。
13:24
那我们要去判断对应位置的值,所以接下来其实就是得到的肯定就是一个布尔类型了,对吧,是否存在嘛,Is exist,接下来是je调用它的方法,大家看是不是直接get bit呀,Get bit里边要传的东西,那是不是就是首先你得告诉我当前,哎,就是到底你要get的是哪个K对吧,到底是你要找哪个值,然后后面是不是得给我一个oppositeet,得给我一个对应位置啊啊,所以接下来我其实要传的就是bit map k以及当前的oppositeet。
14:06
大家看这是不是就是我们想要的那个流程。按照哈西code算出当前的偏移量,然后到当前的位图里边看它到底存在不存在,这个如果直接get bit是零的话,它是不是相当于是false啊,是不是就不存在?如果是一的话,是不是就是存在啊,啊,这其实非常简单啊,呃,那后边啊,这个我们就直接去做判断就好了,If if,当前如果不存在,大家想如果存在的话,是不是啥都不用干啊,对吧,存在就没什么意义了啊,不存在的话现在才有意义,所以接下来是如果不存在。是不是对应位图位置是不是要是不是要制成一啊对吧,制成一。
15:00
直接写这个至一对吧,然后另外我们可以更新当前的那个count值了,对吧?啊,所以接下来我们首先先制成一,那是还是未操作,大家看有get就有set嘛,Set bit,然后对应的那个还是bit mapb k里边给一个offset,大家看它是不是要三个参数了,对吧?后面再给一个参数,那是不是就是出敖boss啊,我们当前要的是一个处对吧,直接至一就完事了。好,所以这就是我们基本的一个操作,当然了还得把那个count值更新一下,对吧。更新呃,Red中保存的count值。那大家想一下,这个就分两种情况,一种情况是之前那个count值就没有对吧,那假如说没有的话,这个是不是我们直接就是一就完事了呀,那如果有的话,那是不是就是先取出之前的count值,然后加一啊,再保存进去对吧?有点像我们那个读状态,然后加加一个数,然后再保存进去那个过程,所以这里面我可以直接初始定义一个当前的UV count,我把它叫做0L。
16:12
就是初始count值,哎,那这是什么意思呢?就是我直接去做一个判断啊,我直接取一下当前这个Je.h get大家知道是在一张哈希表里吧,所以是h get对吧,H get当前的,呃,我们定义的那个叫做。叫做count哈希count哈希内对吧,取这张表,然后现在接下来是不是我要用当前的那个就是count k啊。Count k对应我要看它的那个count值到底是多少啊,那那现在这个值的话,这就这就没准了,对吧,所以这里边我直接把它先定义出来,比方说这得到是一个string,所以说我先把它定义成呃,比比方说啊,我把它定义成UV count string啊把这个定义出来。
17:10
然后接下来那就得判断了啊,如果当前的这个UV count string,如果不等于,那就是它不为空对吧,就是里面有有值的话,并且就是不是这个空的对吧?点equals当前的这个UV count的话,不为空有值的话,那是不是直接我把这个UV count赋值过来就可以了,对吧,直接用这个UV count string。啊,当然这个最好是大家用那个long.value of对吧,long.value of,然后把当前的这个UV count string传进来,把它转成一个长整性值获取到,那如果要是它是那的话,那是不是就还是保持原来的零啊,对吧,这个就没问题了啊,那接下来是不是我们当前的值就应该是加一啊,所以这个我也不用复杂再写了,直接je是不是?呃,可以去set了对吧?呃,不是set是h set对吧哈,西表嘛,还是抗他西name,然后对应的。
18:18
呃,Count t接下来是不是应该是当前的UV count要加一啊啊,那当然这里边我们要的是一个。一个对吧。哦,这里面直接去去做的话,这个还做不了,那我们直接还是拿那个string.value of是不是就可以了,对吧,直接直接把对应的这个值传进来,这样的话就可以去做一个计算了,诶我们看一下这个啊。H set对吧,我们这里面应该是set这个值啊,这样就没问题了。这就是完整的一个处理流程,好啊,那当然了,就是如果大家把这个已经处理完了之后,你想在控制台也做一个打印输出的话,这就相当于我们所有状态都在re里面做了,是吧?如果我们还想打印输出的话,是不是你可以直接out.collect一下呀,对吧?你有一个page view count,当前这里边就是UV,然后后边是window and,另外还有就是当前的这个UV count加一对不对啊,就是当前的这个值啊。
19:27
最后啊,既然我们有open,那是不是对应的还应该有close啊,对吧?最后这个close生命周期里边再做一个genes.close关闭操作就完事了。这就是完整的流程。接下来我们运行这个代码做一下测试啊,这个我们还是先去启启动一个red red server啊,先起起来,然后接下来我们先看一下当前的这个K有没有没东西,那我们直接在这儿运行代码,看看能不能写进去啊,看看这效果怎么样。
20:10
好,执行起来我们看一下。等一下这边如果要是连续控制台有输出的话,其实我们就应该能够到那个red里边找到结果,对吧?哎,大家看到这里边都有这个结果对吧?啊,大家看到现在我们其实这个输出的结果就是有问题的,对吧,就这里边这个count值明明是同一个窗口,它的count值不增加,这明显就是我们代码里边出现问题了是吧,你看这个count它它永远都是一嘛,呃,这明显就就相当于是我们这里边这个加一没加进去,然后之后又清空了,又是又是一那样的一个状态,这其实主要的问题在于。大家看一下这里边这个逻辑,其实我们写错了啊,就是if这里边当前的这个count不为空,我们确实是让它不为空的话,要要等于这个对吧,而后面呢,我们又并且是要让它等于空,对不对,那当然是不是相当于这里面永远都不执行啊,那是不是永远都是零啊,然后零加一呢,永远都是一对吧?啊,所以主要问题出在这儿啊,那怎么改呢?对,非常简单,是不是加一个感叹号不为空啊,我们要的其实就是不为空啊对,我们还是把它停掉之后重新运行一下。
21:19
看一下现在的效果怎么样。哦,这个这个我们先把这个先清掉一下啊,先把它清掉对吧,现在我们看到这里边没了啊,现在重新来运行一下。好,大家看现在这个数值就增长上来了,对吧?这个就是一个正常的状态了啊,所以大家看不停的在增长,当前这个661600啊这个呃,这个窗口大家看一下大概它能涨到什么程度,2万多了已经,诶大概就是两万多对不对,之前我们好像印象当中去重之后,每个窗口就从那个四五万就变成了两三万对吧?呃,刚才大概到27000多就没了,差不多啊,你看现在就是两万多,3万哦,大概三万多对吧?啊,如果大家要是不想光看这个变化看不清楚的话,那我们还是来看一下里边到底是什么样,大家看现在已经有三个窗口的数据了,对吧,三个窗口的位图都在里边了,所以接下来我们看这个,呃,大家知道这个每一个窗口里边,它都是就对应的这个窗口window and的这个K啊,对应都是一个64兆的位图来,所以这个其实最好不要看它里边的东西都是大部分都是零吧,我们现在这个数据比较少啊,几万个数放在这个。
22:37
呃,几亿个位里边,那当然是很很小的啊,所以我们直接就看这个就好了,HK UV count对吧?啊,那大家知道这个第一个窗口。6616002816196,那这个跟我们之前好像得到那个结果应该是差不多的是吧?啊UV count,我们再看第二个窗口,6665200啊。
23:04
大家看这个32160对吧,32000多,那确实跟之前得到那个结果是差不多的,所以最后如果大家想要看到最终结果的话,我们这里面直接h get2看一下啊UV count大家看现在已经有大概有。九个窗口了,已经都统计出来了,我们看到最大是不是现在就到这个34700多,34734746,对吧,跟之前那个数是一样的啊啊,那现在已经都跑完了,我们再再最终看一眼UV count,这是不是最后一个是13,其他都是两万多三万多啊,最大三4746,哎跟之前我们直接在内存里边用一个set做去重,其实效果是完全一样的啊,这就是我们用布隆过滤器去做驱虫这样的一个效果。
我来说两句