00:00
我们来看一个实际当中非常常见的应用案例,那就是top n啊,这个top n其实在电商网站里边,各个网站里边都经常使用,比如说现在我们的需求就是统计一段时间内的热门URL啊,这个说是热门URL,其实就是热门页面嘛,看哪个页面被访问的次数最多,比方说我们现在具体需求就是统计最近十秒钟之内。最热门的两个URL链接两个页面,然后每五秒钟更新一次。通过这个需求描述,这应该用什么东西来实现呢?这个需求很简单,那我就是统计这段时间内URL被点击的次数嘛,而且这里说的是统计最近十秒钟内的,那大家想这是不是应该是一个窗口啊?哎,我就开一个十秒钟窗口不就可以了吗?然后每五秒钟要更新一次,什么意思?也就是说我应该是每五秒钟就会输出。
01:01
过去十秒钟之内的最热门的两个页面的访问的次数,哎,所以大家看,如果画出这个窗口的话,那就是十秒钟一个窗口,然后隔五秒之后又一个窗口出现了。统计一个,又隔五秒之后,又一个窗口出现了,统计一次,大家看,这不就是一个典型的。滑动窗口吗?长度为十秒滑动不长为五秒的滑动窗口啊,所以通过当前的这个分析啊,一眼就能看出来用什么API去解决这个问题啊,那这里边我们这个统计十秒钟内,然后五秒钟更新一次,这个显然不是一个实际需求,我们这里是方便做测试,你如果要是实际的话,统计一天统计一个小时,我们这数据量可能要求都很大啊,你测试会非常的麻烦,所以我们这里边就还是以十秒钟为例讲解一下。其实前面我们该做的这个事情也已经做了一部分,比如说还记得我们统计过一个URL view count啊,我们包装了这样的一个po类,然后做了一个这个URL count的example,当时我们做的操作其实就是统计一段时间内十秒钟窗口,当然这个是个滚动窗口啊,统计这个滚动窗口十秒钟内的。
02:17
每个URL的访问量,现在是不是也要统计每个URL的访问量啊,你要统计热门URL嘛,那是不是要按照它的访问量排序啊。哎,所以现在我们的核心需求其实是要统计出每个URL的访问量。然后还要把它收集起来做一个排序。这个问题就稍微有点麻烦了啊,前面的这个访问量很简单,我们前面这个代码都已经搞定了,那如果说还要收集起来排序的话,接下来该怎么做呢?啊,那为了理解的更加透彻,我们直接新建一个代码,然后把前做过的这个再来重复一遍吧。
03:00
Open example。好,我们这个给大家完整的做一个书写啊,Main方法throws exception。然后下边首先要创建一个执行环境,Execution environment at。叫做烟V。我们直接把全局的并行度设成一,方便控制台打印输出啊,测试的时候方便一点,然后接下来,哎,那当然就是提取数据了,读取数据,这里边就还用那个click sourcece吧,ADD the source啊,你一个click source经典的测试数据源,然后后边呢,ADD了数据之后,直接ign time stamp and watermarks,直接把对应的automark strategy写进来,但这里边可以用bounded out of orderness乱序,也可以直接用升序,对不对,因为呃,我们这个click s直接就是声序列过来的嘛,啊,所以那为了一般话的话,我们还是一个这个乱序吧,这边给一个duration啊,因为即使是定义了乱句,我也可以直接给一个zero。
04:06
然后表示当前没有延迟啊,这跟升序是一模一样的啊,你知道所谓的没有延迟就是减一,相当于是一毫秒,然后接下来breath time sta a signer,然后我们这里边你有一个the lizable,呃,Time sta signer,大家看到这里边是object没有自动给我们补全它,因为我们这儿没有给这个对应的泛型,给了之后那就可以自动补全了啊,那接下来我们这里返回的当然是element r time3先把这个先分出来嘛。好,这里完成之后,就是我们一开始输入的stream。好,最下边的env execute我们先写出来去执行,那中间就是物理转换的过程了,我们知道一开始想要做的肯定是先去把这个URL分组,然后统计它们分别的个数,哎,所以这里边其实就是第一步嘛。
05:02
第一步,我们是按照URL分组。统计窗口内,我们是按这个时间窗口来统计的啊,窗口内每个URL的访问量。这就跟之前做的一样了啊,我们这个就是stream r,先按照URL分组,那当然是K了,用贝塔的URL做一个分组。走之后开窗,我们当前要的是一个滑动窗口,所以是tbling time Windows of2个参数,一个time second10。另一个TIMES5。就是我们定义的滑动窗口对吧。哦,这里写错了啊,我们应该写那个滑动啊。之前我们用的是滚动,现在我们用的是滑动,然后接下来当然就是要去做聚合了,我们之前已经非常熟悉啊,这个聚合的过程其实就是来一个加一,来一个加一嘛,然后最后的结果我们包装成一个URL view count啊,带着时间窗口的对应的那些信息,然后方便我们后面做排序嘛,因为大家想你最后出来的数据它是一连串啊,诶它本身这个数据不分窗口的,那你如果要是不带窗口信息的话,后边我收集它怎么知道。
06:24
哪些数据属于哪个窗口,大家知道,不同窗口的数据你是不能混在一起排序的,对吧?哎,所以你不能统一的把它做排序啊,还是要把这个按照窗口去做一个分组的,所以接下来我们这里边就还是aggregate前面一个增量聚合函数,前面都已经实现了URL view count agg,这这里已经有的话,干脆我们直接用吧,直接可以调URL count count view example里边直线的这一个过程其实非常简单,就是拟一个长整型的累加器,来一个加一,来一个加一啊,太简单了。然后下边的这个URL view count result呢,也非常简单,就是统计出来的那个结果,把它拿出来,然后呃,包装这个我们当前窗口的start and,把它放到这个view count里边不就完了吗?URL。
07:17
Will result,好,这样的话我们就搞定了这一步。目前我们做完了聚合之后,这个我们叫URL count stream,给一个给一个可读性更强的一个名字。啊,然后为了让大家看的清楚一点,我们在这儿啊打印一下ul stream。这个就是UIL。那现在的关键就在于第二步。第二步我们要。位于同一窗口统计出的。访问量并行收集和排序,还要输出TOP2。
08:05
那这里面的问题就在于接下来这个怎么收集呢?诶,那你就还是再去开窗吧,再开一个十秒钟长度,然后每隔五秒去滑动一次的这个窗口,然后到点的时候,里边收集到的就应该是同样的这个数据,诶,但是大家注意啊。一开始的时候啊,这个event里边我们当前是有数据的时间戳的,每一个数据对应是有时间戳的,那现在如果我们要基于这个窗口做了统计之后,它的输出结果里边这个时间戳又应该是什么呢?这里边其实就涉及到这样一个问题了,对吧,就是我们当前输出的东西到底是什么。到底我们应该怎么样去处理这个输出的东西,举个例子吧,我们这里边的这个数据,数据来了之后啊,比方说这里边一个是home访问了一下主页,我就拿H来表示啊,一个是访问了一下,比方说购物车,一个访问了一下product具体的商品页面啊,然后我们再来一个home。
09:07
再来一个cut。哎,大家会想到这样,如果要去统计的话,我们进来前面这个读取转换啊,分配时间,说这些算子我们不说了啊,我们就直接到这个窗口这里来吧,到了window算子这里。Window算子这里,假如说我一个窗口里边有这么些数据的话,最后输出的是什么呢?他们来的时候是一个一个来的,比方说现在我这个零到十秒的窗口。来的时候一个一个来都收集到了里边,然后依次去做叠加,大家注意叠加统计的时候是按照K来统计的,所以我们保存起来,最后要输出的是什么呢?是不是要针对每个K有一个输出啊?而且这里面窗口计算过程当中是没有输出的,这里面没有任何输出。只是有那个水位线在不停的往前推进,它会传递到下面去,但是窗口只有到了结束时间的时候才会触发计算输出到下游,所以它是到了水位线涨到现在没有延迟啊,所以涨到十秒的时候,那么这个时候触发当前窗口的计算输出对应的结果,所以是水位线到了十秒,这里会突然一下输出H它有两个,C有两个。
10:27
P有一个啊,当然我们输出的结果应该是都包装成UR view count了,呃,最里边核心的这个数据主要就是他俩,然后后面还带着起始点是零,结束点是十,起始点是零,结束点是十。包装好的这样的数据,他们应该理论上是同时输出的。但是自然我们想到不可能同时输出啊,既然是流嘛,呃,一个任务数据往下游输出的时候,肯定还是有顺序一个一个输出的,所以呢,它就是快速的啊,我把这个化成三角吧。
11:00
快速的跟在后边连续的做了一波输出,它是这个样子,然后输出之后呢,现在的每一个URL里边有几几条数据啊,肯定只有一条数据,对不对,我们都已经聚合起来了呀。肯定就只有当前URL的一个统计数字嘛。那我们现在应该要做什么事呢?其实是要把当前这个窗口里边输出的这些数据收集起来,要做一个排序。那我是不是直接在这儿啊,比方说我来一个process。用一个process方式收集所有的数据,把所有数据都扔到里边来,那接下来是不是整个这条流里边啊,窗口聚合的所有结果,所有数据。都会放到这个process里边来。这个就稍微有点麻烦了啊,如果这个时候啊,所有数据放到process里边来的话,那就不止一个窗口突出的结果会扔进来了,下一个窗口十啊,下一个窗口应该是五到。
12:03
15秒,因为我们是滑动窗口嘛。这个窗口里边可能也会有一组数据。他是不是也会流进来啊。啊,大家要注意啊,窗口都已经聚合完毕了,现在我们是不是得到的就是一个流,就是一个data stream呀。啊,它只不过就是这里边你连着输出了几条数据,数据可能有疏有密而已,它还是一条流。它是一个普通的data stream,所以这些数据直接传递到后边的时候,这边process是搞不清楚它到底属于哪个窗口的。你只能是看到,诶,可能是同时诶,EEE突然来了好几条数据,然后呢,稍微隔了一会儿,然后eee又来好几条数据,他可能会有这样的一个情况出现。所以大家会发现啊,在这种场景下,后边如果想要让他明确收集到所有的数据排序,这个很麻烦,为什么呢?呃,关键就在于前面我们提到的啊。
13:01
过了这个窗口window操作之后,它输出的这个数据的这个架势是同时输出好几个,哎,然后隔一段时间,像我们这个五秒钟一次的话,就是隔五秒钟,然后又EEE输出好几个,但它还是流,所以它是有先后顺序的,那所以这里就涉及到一个问题,我们在下游想要去做排序的话。你怎么排呢?你是来了一个数据就排吗?这肯定不对啊,你你来了一个数据,他点击了三次,它就一定是排第一吗?后面还会有数据的呀,啊,那大家就会想到,那我就来两个数据再排,那你TOP2他们俩就一定是TOP2吗?也不是啊,后面还会来对不对,然后来了之后呢,就他们之间可能有间隙,然后呢,可能中间又隔一段时间又会来别的数据,那这个大家可能就觉觉得这个就很乱了,我们现在中间这个距离还有点儿远,就是隔了五秒钟,那假如说我们隔一秒就输出一次的话,甚至有可能这个数据前后的乱序都会导致不同窗口输出的这个结果都有可能会打乱,那假如真的出现这种情况,那我们后面又该怎么做呢?
14:09
我难道把它收集起来之后统一排序吗?显然不对啊,这不同的窗口里面的数据是不应该统一去排序的。那到底应该怎么做,这就是我们想要去考虑的一个问题,那其实有一个最简单的想法,我干脆就把所有数据,你就不要先去开窗,按照URL去统计,然后统计出了它的这个值,就直接把它输出了,因为大家知道一旦要是KBY之后去开窗,那就当前窗口内统计的只有一个URL的数据。啊,最后是不同URL数据是分别输出的,那我们现在呢,我干脆就把所有数据都汇集到一起,所有的URL都放在一,统计的时候还是得分开的,对吧,你不能混在一起嘛,那就变成PV了嘛。啊那我现在呢,是要把所有的数据放在一起,然后又要把URL分开,分别统计它们的个数,统计完了之后,我就可以直接给他做一个排序了嘛。
15:08
啊,这个大家自然也就想到能能有对应的这样的一个行为去处理它,那中间我们可以用一个比方说哈希map这样的一些数据结构啊,那这个就比较简单一些,哈希map k就是当前的URL,这就相当于是我们KBY了嘛,只不过是在哈希map里边KBY了,用K来表示URL后边的值,跟上一个它对应的count值。啊,那最后我们就可以把所有的,就当前这个窗口要触发的时候,我可以把保存好的所有的数据拿出来,然后按从大到小的顺序放到一个这个release里边去做一个排序,这个其实是大家最容易理解,最容易想到的一种实现。
我来说两句