00:00
我们已经讲完了UV的一个实现,但是大家会发现在这个过程当中,其实UV是有一些有一些问题的,一个很严重的问题就是我们这里把它放在哪里了,Set,这个set是在哪里呢?在什么存储空间里呢?在内存里,然后那大家自然就会想到了,那我这个size是要存全部的,尽管不是全量数据,是把那个ID一个一个拿出来存了,对吧?但他存的是所有数据的ID,对吧?那假如说我这个数据非常非常多的时候怎么办呢?就之前跟大家讲的,阿里双11你统计一个小时的这个量,甚至要统计一天的量的时候,那那个数据可真的是非常非常庞大啊,一个小时可能就是上一条几十亿条对吧,十几亿几十亿条数据,这种情况下你都放内存。大家可以大概的估算一下,比方说就是几亿条数据的话,一条数据假如按这个按多大算,呃几呃,大家可能想到你按几K算的话,或者你就算比较小啊,几几百几百个bit对吧,几百个bit的话,那相当于1亿的数据量,就是几百亿个字节,百亿字节的话大概是多少。
01:24
就是几十个G对吧,就是几十个G,那假如说你的这个数据量更大的话,那那那这个几乎是不可接受的一个状态,对不对,你的这个内存肯定是搞不定的,那有同学说,哎,那放red嘛,Red不是也可以做驱虫吗?我们直接把它放到那个set里边对不对?呃,这个过程,呃,对rei也撑爆了,对吧?呃,这个显然是不太合理的一种方式。另外大家可能会想到还有一个问题,我们在这里边其实是把所有的这个数据全收集起来之后,一下去做的这个取重操作,对不对,那我们其实还可以有一个优化,就是是不是可以有类似于这种预聚合的这种操作啊,其实可以是来一个就把它就把它查重,先过滤一次对不对,然后最后我们数据来完了之后,也就过滤完了,最后只要输出一个这个size,输出这个结果就可以了啊,这也是一个优化的点,当然我们可以在内存里边做优化,呃,那现在如果要是内存放不下的话,我们就得考虑。
02:32
用别的一些实现方法了,比方说我们用什么样的方法来实现呢?其实这个简单的一个思想,就是我现在其实就是数据存不下对吧,那如果数据存不下怎么怎么办,怎么做。啊,有些有些同学说那就不存了呗,那你这个我们现在就是要对所有的key去做查查重去重,你如果不存这个key的话,后面来的数据你怎么知道它,它到底这个呃有没有过出呃出现过没有呢?对吧?哎,那他就会想到我可以是不是把这个K的信息简化一下,把它压缩一下呀。
03:19
啊,大家就会想到,哎,对呀,我们这个K,其实我并不关心这个K到底是什么,对吧,我关心的是什么呢?对,我只关心在这一个sat里边,它到底有没有啊这这这就说这个都都关心你飞得高不高,呃,飞得高不高,没人关心你飞的累不累,对吧,我们现在是不管他,不关心他具体到底ID是什么,只关心他。有没有对,只关心他这个状态,那大家想有没有这个状态是不是就是一个布尔类型啊,如果我们进一步压缩的话,有或者没有,这是不是就是一个零一这样的一个状态啊,如果压缩到极致,应该是用什么就可以保表示,有没有这样的一个数据,是不是用一个位就能表示啊,呃,二进制嘛,你零跟一就是一个位的状态嘛,所以如果压缩到极致,我们的一个数据,一个要要去重的这个ID这个这个字段啊,其实对应到我们的压缩之后的状态就是一个位。
04:26
所以我们的一个思路是什么呢?就是把所有的数据都保存到。一个所谓的位图里边来,对吧,它的状态都保存到一个位图里边来,这个位图又是什么呢?就是一位一位组成的,那来想这不就是一个正常的一个一个数据嘛,对吧,就是组合起来的一个很大的一个数据,只不过它里边就是每一位都代表一个特殊的含义,都代表。某一个字段到底存不存在对吧?啊,它就是这样的一个含义而已啊,所以这样的一个东西就叫一个未图,那我们具体的一个实现呢,在这个过程当中,我们要又怎么样去实现这个去重呢?啊,这个思想叫布容过滤器,那布容过滤器其实它的底层的数据结构简单来讲就是一个,呃,就是一个位图啊,就是大家如果要是就是简单来说的话,当然它不不仅仅是存储了一个位图,还得包括什么呢?
05:29
大家想你光是把这个来一个数据,我就存到对应的一个位置上去了,那你怎么对应,是不是这个就很有讲究啊,对吧,正常来讲我是不是应该是。就是单独的一个一个KR这样的,要存的去重的这样的一个键值,必须把它对应到唯一的,而且是能够确定找到的一个一个位置上对应一个位对不对?那大家想我应该用什么样的方法去找它对应的这个位呢?对,这其实显然就是应该用计算一个哈希值对吧?啊,用这个哈希函数去实现这样一个计算,所以实际计算的过程当中,大家会发现它最终实现的是一个什么呢?它是一个实现的是一个,呃,就是概率性的告诉你。
06:19
一个数据是可不可能存在对吧,它是要不是告诉你如果是零的话,表示这个数据肯定不存在,对不对,不存在的话是确定的,那假如说这个位置是一的话,是表示这个数据一定存在吗。不是一定存在,而是可能存在,为什么可能存在呢?因为是哈希之后,对吧?因为我并不是我们完整的把这个元素直接存进去了,而是哈希之后存到了一个位上,哈希的话,那就跟我们选择的哈希函数有关系了,如果哈希函数比较简单,在某种情况下就会出现冲突,所谓的哈希碰撞对吧?一旦要碰撞,那其实就是两个不同的K有可能会对应到同一个位上,对,所以那你当前这个位是变成一了,那其实我并不确定到底是哪个K导致它变成了一,对不对,所以这只能是代表一定的概率。
07:17
当然了,如果我们这个哈希函数设计的好,设计的比较复杂,往往我们都要实现这个就是好几个哈希函数做做反复计算对吧,最后得到一个随机性很强的一个结果,那我们这里呢,可能就给大家只是示意啊,做一个简单的一个实现,自己手写一个简单的哈希函数,呃,那那这个过程大家会想到它得到的是一个不确定的一个结果,那说明它这个压缩是压缩了,但是好像并不是特别的好,是吧,那它主要的特点就在于。就是在于极大的节省了空间,那大家可以想象一下,你假如说要把一个呃,几百个100BIT啊,100字节的一个数据压缩到一位的话,这个压缩率大概在多少啊。
08:08
100个bit的话,呃,八位,那八百八百位对不对,那差不多是1100:1000:1左右的这个压缩比,那你假如说用到了这样的一个状态的话,那大家想这个如果我们之前那个几十个G的这个数据,就相当于压缩到了几十个兆。那几十兆这个证是不是就随便存了对吧,甚至你更大的数据,我照样可以把它存起来啊,我这个就这样的一个位图,那处理起来就存储起来,就没有太大的这个存储压力了啊,所以大家看这其实是处理这种大数据,呃,这个环境下,极极大量海量数据情况下,要判断这个这个舰池是否存在一个常用的方法啊,就是用这样的一个布鲁过滤器啊,所以它其实就是用一个位图存储我们所有这个键是否存在的一个状态,当然这是一个概率性的一个状态。
09:08
好,接下来呢,我们就来具体实现一下吧,啊,当然这里边我们要实现的时候还是先new一个。Object。这个叫UV with布隆过滤器对吧?啊b blue filter,所以就是布隆过滤器。呃,首先这里边大家要注意一下啊,我需要去引入一个新的依赖,因为现在我们的这个布隆过滤器的这个结果呢,就是存储的这个数据结构呢,我不准备直接放在内存里边。呃,因为这个大家会想到啊,这个数据有可能比较大,对不对啊,我我就直接把它放到red里面去好了,那所以这里边所有的这个过程,我要跟red去交互,要有red的读取操作,但是不是我得引入red相关的依赖啊,啊这个大家要注意一下,所以POM文件里边这个是子模块,对吧?Po文件引入red的依赖,看一下文档里边。
10:08
啊,有同学说,那我们把之前写过的那个,呃,Think引入的那个连接器引入就可以了嘛。我们要引入的是连接器吗?诶,这个大家要注意一下啊,这里我们引入的就直接是对reis的客户端,我们就是直接对reis做读取,对不对,并不是flik跟redis那边要做写入SK的这个连接啊,并不是这一块啊,所以我们就直接操作reddi就好了,把这个dependency引入。然后我们在这里边就可以去。呃,数据的话,我们就还用这个user behavior对吧,然后在这里边main函数其实还是一样,这个我们就直接抄之前的内容吧,我们可以抄到哪里啊,先引入环境,然后定义数据源,呃,包成user behavior,然后做这个时间戳给定对吧,然后统计这个。
11:15
Filter出来。至少可以做到这。我们把这些该引入的东西先引入。然后接下来哎,基于已经做过PV之后的这个状态,我是不是可以去对它做一些统计啊,啊,当然之前我们是直接那个type window2了,现在的话我把它转换一下用还是用那个P的方式。我们就直接type type type window对吧,基于k stream去做操作,那这个转换是不是还是要把这个data,我们既然是还是所有的数据要统计,是不是还得给他一个,哎,类似于UV对吧,或者是叫就叫Du k这样的一个字段,用它来做K后边是不是要把。
12:14
这里边我们是还是直接给一个一就可以了吗。我们要要什么数据,是不是得要用户的ID啊,要不然你到时候怎么去存,怎么去做那个哈希判断它查重呢,对吧,所以这里边必须要传入的是data.user ID。把这个要穿入,然后接下来我们就可以K了。KY当前的第一个元素,这相当于还是所有的数据都来了,对吧,接下来开窗,Time window time window这里边还是。呃,HOURS1小时,然后接下来大家注意,如果说我们要把它往这个red里边写的话,那相当于我们本身的这个内存里边,大家想一想这个过程里边,我们的这个这个窗口啊,它的这个计算我们定义好的那个,我们要写入的这个操作,肯定都是在窗口处理函数里面写的,对不对,本身窗口处理函数是什么时候调用啊。
13:24
窗口是不是都是在窗口关闭的时候才去调用一下啊,那有同学就想啊,那没问题啊,你窗口关闭的时候,然后去呃呃,调用这个red的连接,把我们的数据都往那个red里边放,然后放到一个bitmap里边,对吧,然后去查重不就完了吗?大家想一想,你在这个过程当中,窗口是不是要把所有的数据都先缓存下来,存下来,收集起来啊,你要是窗口都能把数据都存下来了,你还有必要去做那个bitmap吗?你直接在直接在这里面内存里面都搞定不就完事了吗?还那么麻烦干什么呢?
14:06
所以我们现在既然要往red的那个bitmap里边去存,就是因为窗口这里是不是内存放不下,数据放不下。所以我们是不是就不应该等到窗口所有数据都到齐,窗口关闭的时候再去做操作啊。所以我们是不是在之前。或者大家会想到一个最简单的处理过程,过程就是我们就真的流处理了,是不是来一个数据就应该触发一次窗口那个操作啊。哎,所以现在我们相当于就得做什么操做什么事情。触发窗口操作。有一个窗口的API,可以去做自定义的触发窗口操作的那个方法叫。触发器trigger对不对,所以现在我们就要去自定义实现一个trigger了啊,那当然这里边我们比方说这个叫my trigger啊,实现一个这个东西。
15:06
当然了,后边啊,这些东西做完了之后,是不是一定要去做处理啊,这个处理我就直接放大招了,呃,当然大家就是如果想用apply或者用别的也都可以,对不对,我这里边因为这个方便我做所有的操作啊,我就直接这个,呃,Process这里边UV count with blue。定义一个这样的process方式,最后我把它打印输出,不要忘记执行,呃,这里边UV with bloom drop,好,这就是我们的程序的主体流程,所以接下来我们的任务其实还比较重啊,大家看,还得又实现一个自定义的trigger,哎哟,这又是什么玩意儿呢?好,大家来看一下。
16:02
呃,My trigger对吧?啊这个,那那后面肯定要实现一个什么接口了,要实现的什么接口是什么东西呢?就是一个trigger对吧。好,我们把trigger引入,然后看一下它肯定有类型,这里的类型是什么啊?这个比较简单,就是当前的数据类型再加上一个窗口的类型就可以,所以当前的数据类型应该是什么呢?当前的数据类型。好,那应该是map之后对吧,那是不是一个一个WK,这是个string,还有一个user ID这是一个了,对,所以它是一个二元组啊。这样的类型对吧,后边是window的类型,那是time window。好,所以大家看实现之后,上面不报错了啊,然后我们看一看它里边到底要实现哪些方法呢?重写哪些方法呢?有这么几个方法,一个叫on even time啊,所以就是说假如我们定义的时间语义是even time的话,我当前触发怎么去触发对吧?啊,那大家自然会想到这种触发当然是跟auto mark有关嘛,Auto mark对应来了之后,它就调到这儿来了,然后接下来是on processing time,也就是说如果在处理时间语义下怎么去出发对吧?出发这个窗口,另外还有clear clear就是最后收尾的时候了,对吧?我在那个做了窗口清理操作之后,关闭之后要有哪些收尾的工作啊,这里面如果没有的话,我们什么都不定义就可以。最后还有一个,嗯,Element on element,顾名思义,那就是每来一个元素的时候,做什么操作对吧,是否要出发对不对,那就是这样的一个状态。
17:56
那这里边这是我们的自定义窗口触发器,呃,那我们现在是想定义什么呢。
18:07
是不是我们现在的想法很简单,就是想在每个元素来的时候定义它真正的触发,对不对?哎,那那大家会想到我们如果要是不定义默认的,那那那个trigger它应该是什么情况下出发啊,是不是应该是even time的时候出发啊,那even time到什么情况?到什么时候去出发呢?是不是对water要找到?哎,这个这个给大家看一看吧,啊,既然讲到这里了,就给大家看一下啊,Trigger这里大家看有哪些trigger呢。是不是有这个presenting time。Trigger有even time trigger对吧,还有count trigger之类的,那比方说这里边给大家看一下even trigger,我们如果定义了事件时间语义的话,是不是就应该调用到even time trigger这里来啊,对吧,其实相当于用的就是它,那大家看它的处理是什么呢?
19:05
每来一个元素的时候,它是干什么?诶大家看它是,如果它是不是要判断当前window的最大的那个时间戳啊,然后是不是还要看当前的water mark对不对,如果要是当前的water mark已经超过了大于等于window的最大时间戳的话。它就返回一个,大家看返回一个什么。Trigger result.fire fire就是触发对吧?大家知道fire有开火的意思吧,所以这其实就是发的意思,大家看一眼这个那个trigger result到底有哪哪几种类型啊,Continue continue,就是什么都不做对吧?然后是不是就是继续呃,进行处理这个窗口接下来的工作对吧?啊,当前什么都不做,不做他的触发,然后还有一个叫呃,当然先看下面啊,一个叫fire fire就是。
20:04
触发当前的这个窗口操作对不对啊,注意触发窗口的计算操作,但是并不会清空窗口,不会关闭窗口,不会把窗口给状态全部清除掉,对吧?那什么样的情况下是清除窗口的这个状态呢?Perch,对,就是清除窗口的状态啊,那还有一个叫做fire and perch,它的意思就是触发它的计算,对,计算完成之后又把它清除掉啊,那大家这里就看到了。Even time tIgEr,呃呃,Eventime trigger,它的做法是什么呢?它的做法其实就是如果来了一个元素的时候,我先判断它当前的这个water mark是否已经大于我们的那个窗口的最大时间戳,其实就是已经到关窗时间了,对吧,是否已经在关窗时间之外了,如果要已经达到了的话,是不是直接就应该fire啊,对吧?然后如果要没达到的话,那干什么?他是注意他是干什么,他继续,对吧,继续之前还做了一个事情。
21:13
是不是注册了一个,这这这是不是很熟悉啊,这就是注册了一个定时器嘛,对吧,这个定时器其实就是什么呢?是不是就是关窗的那个操作啊,大家想想它是按照这个最大的时间戳,是不是定义了一个英time timer对吧?定义了这样的一个定时器,然后大家看看,那那他到这个on time的时候干什么事情呢。On even time的时候是判断这个time是否等于最大的时间车,如果要等于的话,是不是就直接fire啊,如果不等于是不是就继续continue对吧?啊,然后啊,如果process的话,他什么都不做,直接continue对不对?
22:00
然后clear的时候,大家看它是不是要这个delete当前定义好的那个那个定时器啊,啊,所以大家看诶,它为什么没有做P那个那个关窗还有清空的那个操作呢,那是大家会想到是不是要在触发这个定时器的时候。他才去做操作对吧,所以大家能想到是在这里面包装好了,我们这里边既然是自定义了,那是不是把这个该清空也要清空啊,该触发要触发,大家看看我们现在怎么样去做这个事情啊。On even time,我们现在想的是不是就是直接直接不用管那个关窗时间了都对吧,我现在是不是相当于就是直接一个,呃,就是相当于一个元素来了之后,直接就就触发一次操作,直接就往里边去判断一次写一次对吧,那这个是不是前面on even time直接返回。对,返回trigger result.continue对吧,然后processing time,那当然是不是也一样啊,Trigger。
23:08
result.continue啊,那这里面clear clear是不是表示我最后关闭的时候要做什么操作啊,这里边我好像也没定义什么定时器,所以这里面不用管对吧?啊直接啥都不用做。啊,那大家可能会想,那你不用去去破清空那些东西吗?哎,我可以怎么样啊。我是不是可以直接在这里边要触发的过程当中就直接把它清空啊,这里边我返回可以返回一个什么result,这里是不是要去fire了。呃,大家想fire的话,是不是还不会清空窗口的数据啊,是不是接下来窗口所有的数据还存在状态里边,还都一个一个来了之后还存在里边啊,那这个最后是不是还会撑爆内存。
24:02
所以我干脆就来一条fire,一次直接就清空,那所以是不是clear这里边也不用做任何操作了,相当于每个数据来了之后,直接就全把这个清状态全清空,全搞定了啊,所以就干干净净什么都没剩啊,这就是最简单的一种实现啊,直接在这里fire and per,所以大家看一下它的思路就是呃,每来一条数据。就。直接触发口操作,并清空所有窗口状态。好,这就是我们实现的这个自定义的触发器。
我来说两句