00:00
然后后边我们要做的就是实现自定义的process function了。Plus UV count with blue,它应该是一个,它是一个process function,这里边还是基于window去做的process,对吧?呃,前面是这个time window开的窗,所以这里边我们要去实现一个process window function这样一个接口,这里边它的类型应该是什么呢?还是输入输出,还有K,还有window类型对吧,所以它的输入是什么。大家还记得之前这个输入是什么吗?输入是哎,这样的一个二元组,一个string,一个law对吧,输出是不是我们想要的那个,呃,就是UV count那个,就是前面定义好的那个样例类对吧?所以这里输入写二元组string law。
01:06
输出是UV count,那P的话,P是不是就是string啊,我们定义了一个呃,雅雅的那个键值对不对,对吧,就是所有的都是一样的啊,所以是一个string,最后time window的类型,Time window把这个写完之后,上面没有毛病了,对吧,类型都是匹配上的。好,然后接下来在这个里边。是不是最主要的要实现的就是一个process方法啊,因为是process function吧,那这里边大家注意。我们是不是在里边有可能还要去创建这个到red的连接去跟red进行操作啊,那这里边肯定不可能,我们这个方法是什么呢?是最调用的时候要调到这儿来,那大家会想到我难道是说因为前面我垂格已经指定了,是来一个元素就触发一次就会掉到这儿来,那难道我是到这里边每来一个元素触发一次都重新创建一个连接吗?不应该对吧,整个这个je连接应该在外面定义好,所以定义red连接。
02:23
这可以用一个lazy对吧。好,我们从这个red的客户端里边引入。当然这里面参数要传local host是6379。除了red之外,我们是不是还得需要有一个布隆过滤器啊,这里边它本来是一个类,那我们是不是把得把它实例化啊,所以这里边同样我还是lazy定义一个。Blue布隆过滤器这个初始化的时候可以给一个啊,比方说大家觉得这个如果1亿多还不够给力的话,如果他有可能好几亿的话,那比方说我们这里边给一个一左一二十九位可以不可以?
03:18
一左一二十九位的话,表示的数是。再想想大概是多少?到。那二的九次方兆对吧,二的九次方那不就是512嘛,对吧?对,这就是,呃,本来是512位,512个K对不对?512位就是能过滤512个K,那512位具体占的存储空间是多大呢?是不是再除以八对吧,512个位嘛,除以八就是占的这个字节,那么除以八的话,那就应该是64对,正好是二的二十二十六次方64兆对吧?啊,那大家一比对也是,那是吗?27那是16兆,这个29那是不是就是它四倍啊,64兆对,所以我们这里面定义了一个64兆大小的位图,这样的一个波轮过滤器,它能够处理的是5亿多个K对不对,对吧,512兆这样的一个K的量级。
04:28
好,那我们先把这个先定义好,接下来就看这个process里边怎么去做了,大家想想在process里边这里是不是需要。这里需要干什么呢?我是不是把对应的这些东西都要跟red去做操作啊,往red里边去去存,存到位图里边,然后去拿出来判断对吧?呃,在这个之前大家想一想,我们在在这个red里边的存储到底是一个什么样的结构呢?位图是怎么样存的呢?我们先得想好对吧,这里边位。
05:05
图的存储方式是什么样的呢?啊,我们是准备就是啊,大家会想到比较简单的方式,是不是就是一个K,然后对应着一个位图,这就可以了,对吧,那我们的K应该是什么呢?K是。哎,大家想想,Key是我们是所有的这个程序里边就一个位图就搞定了吗。还是要跟什么有关。做过滤。UV做过滤,我们现在是不是每一个大家想想是不是每一个窗口都要做一次过滤啊。那所有的窗口可以共用一个位图吗?共用一个位图的话,就是就是上一个窗口里边,如果要这个user已经来过了,那那一位就变成一了,对不对,那下一个窗口我就我就不要了。
06:03
我就不统,如果下一个窗口时间窗口里边又这个这个用户又有点击行为的话,我就不统计UV了。我们的想法是这样吗?其实不是,对吧?上一个窗口的是不是不应该影响到下一个窗口啊,所以每个窗口是不是都应该是自己单独的一个位图表示啊?哦,所以大家想想这个其实还是还是空间挺大的呢,对吧?啊,大家不要以为是我们定义这个几十兆直接一个图就能搞定了,那有同学说诶你这个那直接放放内存里面好了,对吧?哎,不是那么简单,每个窗口是不是都得有一个啊,所以这里大家会想到我们可以以窗口的时间戳某个时间戳作为这个K,呃,比方说我们可以是不是可以用window and呀。作为它的它的这个K,那么value就是是不是就是位图啊bit map好,所以这是我们现在存储的一个方式,呃,那当然我们可以先把这个当前的K先定义好啊,就是要要做存储的这个K,那其实就是window and,那window and从哪里找呢?诶大家看这个process里边没有window了。
07:24
那window可以从哪里拿?没有window至少有上下文嘛,上下文肯定有,对不对?看一下诶,果然有window,有window当然就可以get end把这个拿出来,呃,当然这里大家注意,就是我们最后要存的时候,是不是尽量要把它转成string去去存get end,本来拿到的是一个long类型的时间戳,所以这里把它做一个to string操作,转成字符串啊,这是我们要存的这个K。啊,然后我们可能还得去定义一个当前到底有多少,就是具体到底这个这个技术,我们的那个UV统计嘛,去重之后那个计数到底是记了多少,是不是还得定义一个这样的变量啊,啊这里边比方说我们就定义一个叫count吧。
08:17
一开始应该是零对吧,啊,那就是每一次去。大家会想到,那这个东西我是不是可以作为一个状态存在内存里边呢?当然本来也是可以的,但是这里面有一个什么问题呢?我们前面是不是在做那个trigger的时候,每来一个元素就会把窗口的所有状态全清空了,而这里边简单了,那后边我们是不是就没办法再去做这个状态编程,因为那个状态是不是也会被清空?每来一个元素就清空,那你这个状态就没法保持了。就就每次来了都是零对不对,每次都是只能加一,那这个当然是不合适的,那如果说我们还想把这个状态能够就是就是能够持续的获取到的话,要这样的话,我们是不是还把它存到red里边就完事了呀,所以再把它存到red里。
09:13
好,所以这里啊。呃,把。每个窗口的count值的u UV count值。也存入。呃,所以要呃,这里如果我们要上来之后调用这个process process方法的时候,是不是应该先把这个count先从red里边拿到啊,然后再判断当前触发的那个触发这个操作的那个新来的元素到底要不要过滤掉,如果过滤掉的话,是不是countt不变。如果没有被过滤掉,是之前没有的,那是不是应该加一,而且要把它的那个对应的位图里边的那个标志位是不是要置一啊,啊,我们的想法其实就是这样,我们就是要改这个count,这个count也存到里面去。所以要。
10:12
先从red中读取啊,那当然这里边我们就判断一下喽,这个这个要去reis里边存放的话,那大家想这个count肯定就是一个就是一个P一个value了嘛,但是但是大家注意啊,这count假如说我们在里边存的这个P就叫count的话。那他就一个值就行了吗?就只有一个值吗?还是应该每一个窗口是不是都应该有一个count值啊,一个时间段内是不是都应该有一个UV的输出,统统计的这个count值,所以在red里边存的这个过程当中,我们也应该是按照不同的时间把它去存下,对不对,要不然的话,那你就是随时更新了,相当于当然也可以做成随时更新,就一个count值对吧,来了之后就更新,来了之后就更新也是可以,这里我们把它存成一个一一张表。
11:14
对吧,呃,那就相当于一个叫做count的表,里边每一个K。是不是相当于又是。当前的那个window的那个值啊,当前window and,然后对应的是真正技术出来的那个UV的count值,对不对啊,这就是这样的一个数据结构啊。存入我们这里写一下吧,呃,表里。呃,存放内容为。这里面我们存的应该是window and,这是key value的意思啊。Key是window and,然后对是一个UV count,对吧,这这是我们要做的这样的一个操作,所以在这里边先读吧,那读的时候一张表,那应该是。
12:13
对,H。哈希表啊,然后h get什么呢,这个表名就应该是存存入。名为。Count的表。存放内容是window and和UV count对吧?所以这里面我们h get是不是要get count这张表啊,然后get对应是不是还得传一个K啊,这个K就应该是。诶,是不是就是刚才window and window and刚好我们已经算过了,是不是就是store k啊好把这个穿进来。假如能够拿到东西,它不为nu的话,对count是不是就应该等于?
13:02
啊,当然这个其实我们应该是把它先拿是吧,呃,这个这个就省得做两次操作了啊。直接把这个写到,呃,给大家觉得可以把这个count写到里边去,对不对啊,就是或者我们这里边就是你直接掉两次其实也是可以的,对吧?前面这里边h get先判断是否为空,因为如果如果本身是空的话,你这里边直接给count赋值肯定是有问题的,对吧?这里边肯定是要做一次判断的,大家可以做一些优化啊,我可以直接再把它get出来count stop,诶这就是当前拿到的这个数据,那注意还要把它再转成浪类型。当前的值对不对?Count的值,当前窗口的count值。好,然后接下来是不是就是要判断当前的这个用户了,那当前用户是什么呢?判断呃,用布隆过滤器判断当前用户。
14:10
是否已经存在,那我是不是得先拿到user ID啊?大家想想是不是这样,所以定义一个user ID。从哪里去拿。现在的数据都应该在。诶,大家看这里边传进来的东西有K有上下文还有。Elements对,Elements是不是相当于就是当前窗口收集起来的所有的数据,哎,所以这是我们现在收集到所有数据,当然前面这是我们定义那个trigger的时候,是每来一条数据就触发一次,那其实大家知道这elements里边是不是。是不是只有一条数据啊。大家想应该是这样对吧。所以这里边我可以直接element啊,直接可以点last,直接把最后一个数拿出来是不是就可以了,反正就这么一条数嘛,然后它的什么是ID呢。
15:11
当前的elements都是一个二元组,前面这个是没用的那个雅key,然后后边这个是不是就是user ID,所以把他的二拿出来,然后啊,当然然后我们布隆过滤器里边是不是要的那个字段是个string啊。所以我们把它to string long再转成一个string啊,这里就是你平常用的string都可以啊,然后接下来要算什么了。已经拿到这个UID了,算什么?是不是得算哈希啊?算出哈希,然后我们就可以到位图里边对应的那个位置去看它到底是零还是一,对吧,如果是零的话,说明没有过,如果是一,那是不是就已经有过了啊?那就驱虫把它排除掉,所以这里我们接下来要算一个offset。
16:05
或者是算它的哈希,算出来是不是算出的哈希就是在位图当中的那个偏移量,对不对啊,它们是一一对应的,所以这里边我们调用bloom的哈希函数,这里边把user ID传进去,另外传一个随机数种子,这个大家随便给啊,比方说我随便给一个61。哦,这种一般会给一个质数对吧,让让这个它尽量分布的散开一些,而且你这个质数可能在允许的范围内,就是大一点比较好。当然,接下来是不是可以判断。到底有没有这个了,对吧?诶这里边那可能我们先定义一个这样的定义一个标志位。判断位图中有没有。
17:08
这一位对吧。呃,所以比方说我定义就叫is exist,那这个是不是又要从从RA里面去取了,从那个位图里边找对应的偏偏移亮这个offset那个位置的那一位到底是零还是一,所以接下来是je。大家知道操作位图里面操作位图有一个方法叫。是不是他可以直接set bit get bit呀?那大家知道里面这个操作吧,哦,大家之前没讲过啊,那可以跟大家说一下,就是有一个方法叫大家看get bit get bit意思就是说直接可以去取某一个某一个我们保存的值里边的某一位,对不对,对吧?啊直接拿它里边的某一位,那这里边我们要的是谁呢?
18:07
靠。现在要的是谁?是count吗?Count是我们保存最后的那个计数值的那张表,对不对?我们的位图是位图,是不是就是一个P一个value啊。对吧,前面这个是我们定义的位图,后边是我们定义的保存那个count值的那张表,现在我们是不是直接到位图里边去找啊,所以现在的K是不是就是store k对store k那对对应的那个位置也得传进去,大家可以看一下这个get beat调用的时候。对,这里是不是就是要把K传进去,还要把offset传进去啊,到底这个偏移量值在哪里,那这里我们的偏移量就是前面算出来的那个哈希值offset。拿到啊,那当然如果要是不存在的话,怎么做。
19:07
如果不存在,对,把那个位图是不是就应该置一啊,如果不存在。位图对应位置位置一。呃,另外我们如果要是统计count的话,Count应该变成。Count是不是应该变成count加一啊,Count加一,所以je我们现在为图制一,是不是对应有一个set bit位操作,同样还是操作store key这个位图,然后接下来是不是得把off set传进去,另外还得把值传进去,对不对?现在要传的值就是啊,大家看这是布尔类型的,是不是true啊,是不是直接传true就可以把它制成一,然后接下来。
20:05
Count加1COUNT本身是不是也在里啊,那现在是不是就变成了h set是不是要改count那个表了?现在改的是count对应的K还是stock k,另外要存的值是,是不是就是count加一,注意这本身是一个long类型,还要把它。To string是不是这样就可以了?啊,我们把对应的这个这个值再返回去,存到我们里面的一张表。大家注意,Re里边存了两部分内容,一个是我们做这个布隆过滤的时候用到的那个位图对吧?啊,那个很大的一个,就几十兆的这样的一个一个存储空间,另外还有一个是相对比较小,就是我们每一个窗口的那个count值也在里边保存着,对吧?啊。
21:02
两部分内容,所以这里面的操作不一样,大家看一部分是未操作操作位图的,另外一部分是h set h get,拿我们那个count值的。啊,当然如果我这里边如果在这个控制台也想有一个输出的话,那是不是可以在流里边把它也输出啊,流里边输出是不是out.collect啊,这里边比方说我包装成UV count,这是输出类型对吧?那么这里边UV count的类型啊,一开始是。大家看我们包装成什么了,UV count window and和UV count对吧?是每个窗口对应的那个UV是多少?所以这里边是不是就直接是stock k啊啊,这就是对应的那个窗口的and吗?然后另外count,诶,是count吗?是不是还得count加一啊,因为这里边count只是在这里边内部定义的一个变量对吧?并不能代表什么,呃,就是本身能改变的这样的一个状态对吧?所以这里边我们要把count再加一返回,当然了,这里边本身这个是一个,本来是一个string了,对吧?存成string了,所以还要too long再把它返回去,这样就没问题了。
22:20
好,这是如果不存在,那如果存在呢?如果存在,那是不是就什么事都不干啊,啊,如果存在的话,是不是位图也不用改,已经是一了,对吧?啊,然后我们那个count是不是也不变啊啊,那这里边我们做的可能就直接在这里输出一下就可以了,每来一条数据我们都输出一下当前是什么样子,UV count,呃,这里就是还是store key to long。另外这里要输出的就是count对吧,诶这里写错了,大写啊。
23:01
You will cut。所以大家看,这就是我们完整的这个处理的过程。这个过程里边,其实呃,我们这里主要要注意的就是大家要搞清楚在redis里边存着的到底是哪些东西,这里边可能就是位图,大家首先要搞清楚这是不能过滤器要用的,另外还穿插了就是中间我们把那个状态要清空,所以说没有办法直接在内存里边存这样的一个东西,而是把这个count值,本来之前我们都是当状态存的,对吧,现在是把这个count值直接引放到red里去了。哎,那可能有同学会想,这个过程当中,你来一个数,就直接输出一下,这个是不是就是。就是处理有点操作太频繁了呀,频繁要跟redis去做这个交互对不对,读写这个操作都很频繁啊,那确实是大家看一下这个测试结果吧,我们现在这里边可以打印输出,然后这里是这个已经执行了,大家首先先看一下之前我们做的那个没有用布轮过滤器的时候,呃,28000多对吧,三万多32356,呃,这个是当时的一个UV的一个统计结果,我们现在如果把它放在这个re里边做不容过滤的话,最后结果是不是应该跟这个一样啊,好,所以我们要跑一下这个代码啊,跑之前先把提起来。
24:42
好,我们先看一下,里边应该没有东西啊,Red起一个命令行,好,里边是空的,我们现在可以看一看这个效果怎么样。大家看这边已经运行起来之后,程序已经有输出结果了,而且呃,大家看这是每来一条数据是不是就要统计输出一次啊,呃,不停的在增长对吧?哦,这里边已经有这个好几千了,因为大家记得当时那个窗口是有几万的,两万多三万多对吧,第一个窗口好像是两万多。
25:22
大家可以大概的看一下啊,诶好像就是两万多对吧,好像是是差不多的。第二个窗口应该就是3万多了,对不对,呃,如果要是正常的话,应该是这样的一个结果。呃,大家可以看到就是这个过程,因为每来一个是三万多对吧,每来一个数据都要去做读写的操作,所以这个过程是不是比刚才就慢多了呀。啊,那其实大家可以想到我们有什么优化的这个空间吗。用什么方式可以做一个优化呢?哎,其实像这个,哎这里边大家会发现有点这个时间换空间的意思,对不对?呃,或者说这个如果我们想让它快的话,就是大家会想到把数据全放到内存里边,那不就快了吗?那就是空间换时间,但现在我们的问题是,如果说全放到内存里边,我们空间又不够,现在是来一个就就直接,呃,直接触发,直接输出,直接读取这个red,全放到red里边,内存是不是相当于啥都不放了,呃里边全是空的,内存是完全就是没没有利用,然后现在又是时间又觉得耗费特别长。
26:38
那我们是不是可以考虑对两者是不是可以结合一下啊对大家可以考虑的一个改进就是我可以对不要每一个元素来了之后,马上就直接把它触发就清空,而是对可能就是说也类似于是把这个窗口分片了,对吧,就隔几个元素,然后我触发一下这个窗口的操作,隔几个元素触发一下,在这这段时间之内是不是数据还应该是。
27:08
还应该缓存在就是我们的内存里边啊啊,那这样就是内存也得到了一些利用,而也不至于说让我们跟那个release的操作啊,这个太过于频繁啊,大家看现在才终于把这个都跑完了啊好最后一个还是13对不对,而且大家看到好最后一个也是三万多,32356应该是一致的,或者我们到这个red里边来。好,大家看上面的这些是不是都是位位图啊,应该是就是几十兆64兆的那个位图对吧?啊,那个我们就不看了,那个肯定看的很痛苦,那我们想看的是。我们是不是应该h get all这个count啊,看看这里边的东西啊,所以大家看这就是我们最终统计出来每一个窗口里边的UV值对不对,输出在这里啊,28000多,其他都是三万多对吧?啊,最后一个是13,这跟我们之前计算的结果是完全一样的啊,这就是一个不容过过滤器的实现。
28:14
大家可以把这一部分再好好的梳理梳理,看看这一部分呃,就是能不能自已把它实现出来。
我来说两句