00:00
接下来我们就针对每一个模块,每一个具体的需求来做一个详细的讲解,我们主要是以第一个需求,也就是实时热门商品统计,主要以它为例,我们详细的拆开,后面的话我们会做一个简单的描述啊,那首先我们来看一下这个,呃,需求到底是怎么样去想要做到什么样的事情呢?哎,看一下这个需求的基本描述啊,我们主要就是统计实时的热门商品,那统计多长时间内的热门呢?是近期的,不是所有的热门,对吧,不是比方说我们这个项目上线这个几年了,我从几年之前开始统计热门的商品,那大家知道这个就没有时效性了吗?我们这个要求时效性,所以只统计最近一个小时之内的热门商品啊,当然这个主要是受限于数据啊,我们的数据没有那么多啊,如果说正常来讲的话,可能我们会统计一天甚至一周对吧,甚至一个月,可能是统计这么长度时间范围,那个数据量就会大一点啊,然后我们要求呢。
01:00
它还要非常频繁的更新,大家看到要求是每五分钟更新一次,所以说现在我们这个要求是什么呢?就是我总是要统计最近一个小时之内的,呃,就是所有商品里边热门的商品,对吧?而且呢,每隔五分钟我就要统计一下过去一个小时之内,最近一小时之内的热门商品,呃,那意思就是说,假如说我们九点钟有一个窗口,对吧,那他统计的应该是什么呢?应该是八点到九点,大家想想是不是这样,八点到九点之内的这一个小时之内的所有商品统计一个热门度对吧?排行,然后统计我们统计top几啊,把它取出来,然后呢,五分钟之后再更新,所以是下一个就是8.05~9.05,又是一个统计的区间,对吧?哎,所以大家看到我们的输出就应该是什么时候呢,九点输出一次九点。
02:00
百05输出一次,下一个就应该是09:10 09:15对吧,五分钟输出一次,大家想想这是什么呢?这不就是我们说的滑动窗口嘛,对吧?哎,所以基于这个需求,我们一下就想到了要构建一个滑动窗口来解决这个这个问题,我们构建多长的滑动窗口,一个小时滑动不长五分钟啊,就是这样的一个思路,然后另外就是说我们数据里边你怎么样来统计这个热门度呢?对吧?一个商品的热门度是什么含义呢?哎,我们数据里边不是有那个PV行为吗?我们就用用户对于商品的浏览次数来表示当前商品的热门度,但这个就看产品需求定义了,对吧?啊,就是产品那边,他如果定义说你现在是拿这个下单量订单量啊,或者说我是以这个成交成交额来看这个热门度的话,那你就得需要用这种数据来来提取来去判断了,我们现在就直接用浏览次数就是用户。
03:00
有的那个行行为是PV的,我们提取出来,然后作为当前商品热门度的一个统计啊,啊,那所以当然我们就是要过滤这个行为了啊,整体思路其实还是非常简单的,接下来大家来看一看具体实现的过程,好,那我们首先会想到一上来之后应该是一个big stream,对吧?呃,就是所有的数据都来了,呃,每一个用户对于每一个商品的这个点击行为,PV行为应该都在里边啊那大家会想到,首先我们应该先做一个过滤,对吧?先来一个filter,这里边没有写出来,但是我们能想到啊,先filter一下,所有是PV的行为拿出来,然后接下来我们得做一个什么操作呢?啊,大家想我们接下来是要统计每个商品被浏览的次数,对不对,所以我们接下来统计次数的时候,就像word count一样,你得按照那个word去做分组,然后才能统计啊,那现在我们要统计的这个word类似于word是什么呢?是商。
04:00
分品对吧?呃,商品被点了多少次,被浏览了多少次,所以接下来我们要做一个分区,做一个分组啊,大家可以认为就是做一个分组,做一个KBY操作得到一个k stream,嗯,大家想一想,现在这个k stream每一组每个这个分区里边啊,每个分组里边它的K是什么呢?我们现在是以商品作为一个统计次数的标准,对吧?呃,这个商品被点了多少次?有同学可能想,哎,我这个那个既然是要做分组嘛,那肯定是user ID啊,用user ID分组嘛,但是大家注意啊,你如果按user ID分组,然后再统计当前组内的个数的话,那就变成了每一个用户点了多少次商品,对吧?不管哪个商品,看他点了多少次,这看起来就更像一个这个用户画像,你要做做某种统计的这种操作了,而我们现在不是,我们现在是针对商品,看一个商品被点了多少次。所以当前。
05:00
那我们的分组标志是应该是item ID对吧?商品ID做分组,然后接下来当然就是开时间窗了,呃,这个时间窗我们开一个滑动窗口,不同的窗口之间会有重叠对吧?呃,分居之后啊,我们做这个key by操作之后啊,得到的是一个kid stream,然后再开窗之后得到的是一个window the stream啊大家记记得我们那个time window对吧?点window.time window,然后接下来就是聚合了,聚合得到的当然就是每一个窗口内。诶,我输出当前每一个商品,统计出来它有多少个对吧?哎,大家看到这里边我们我们输出的这个又变成了一个data stream,那这个data stream里边的数据结构应该是什么样子的呢?它应该是,哎,我应该是每一个窗口,大家看我这外面的框是颜色是不一样的啊,比方说这个红色框,就表示前面第一个红色框,红色窗口里边输出的结果,那这个框里边有蓝色的这个分组,有橙色的这个分组,他们分别,这就相当于是每个商品嘛,分别都会统计出一个个数,然后输出一个结果,对吧?哎,大家知道做K外之后,每个组里边他们都是并行不悖的,各自各自处理各自的,对吧?你到一个窗口这个时间节点输出的时候,我们会同时输出好几个,呃,不同ID啊,就是不同K对应的他们的那个统计结果,所以大家看就是这个时间点。
06:28
其实前面我们这两个数据应该几乎都是同时输出对吧?哎,这俩几乎都是同时出来的,然后后边诶,到了这个绿色窗口关窗的时间节点,这俩几乎又是同时出来,然后后边到了这个紫色的这个节点,几乎又是同时出来,所以接下来诶,那大家就会想到了,我直接输出这个就完了吗?其实还不行,我们接下来是要针对每一个窗口里边的这些每个商品是不是还要做一个排序啊,哎,所以这里边接下来你还得针对当前这一个窗口输出的数据来做一个排序,再做一个最后的结果输出,所以后面还涉及到这个问题啊,那我们先一步一步看啊,先把前面这个步步骤给大家再详细的再说明一下,那首先我们做的是这个KBY对吧,当前我们也已经知道了要做。
07:19
分组的这个K是item ID按照商品来分组,那接下来得到的就是,诶大家看就是商品一,比方说10.01有一个有一个点击对吧,零四有一个点击,零五有一个点击,这是他们这一组,然后商品二呢,哎,同样也有它的一组点击数据,接下来接下来就是开窗了,对吧?设置一个时间窗口,那当然我们的要求是滑动嘛,一小时的窗口长度,后边是五分钟的滑动步长,就按这个来定义就可以把它设置出来,这个大家要注意,我们这里边注意两点,一个是左闭前闭后开对吧,左闭右开啊那那这里边要求的就是假如我来了一个10:05的数据的话,那它应该是属于什么呢?
08:06
应该是属于10.05~11.05这个窗口,对吧?呃,而不是属于,呃,就是就是那个10.05之前的窗口啊,那当然了,它同时大家发现了,它其实也应该属于十点到11点的窗口,对吧?呃,因为我们当前的窗口长度是一小时,所有这些数据都应该属于它啊,只不过它是不属于,就是10.05要关闭的那个窗口,它不属于啊,所以大家会发现现在我们的这个数据啊,同一个数据应该是会被分发到不同的窗口里边去,对吧?哎,那这里给大家一个思考题,大家想一想,现在同一份数据应该被分发到多少个窗口里边去做统计呢?所以大家能够想到,我们这里边其实应该是一个数据属同时属于12个窗口,对吧?啊,就直接用这个窗口长度60分钟除以五分钟的滑动步长,60除以五就是它同时属于的窗口个数。然后接下来我们。
09:06
有了这个,呃,开窗之后,那我们还记得啊,首先你要定义窗口分配器,这还窗口操作没定义完的,后边还得有一个窗口函数,对吧,你得定义每一个窗口到点的时候到底要做什么计算,那现在呢,我们其实就是要做一个聚合嘛,要做的这个聚合操作用什么来做呢?诶,这就涉及到了一个问题,大家回忆一下,之前我们看这个图啊,大家看最后拿到的这个数据,有同学可能想了,哎,那一开始你不是说我们定义的那个类型就从文件里边提取出来对吧?包装好了之后,呃,那个已经做过ETL了,包装好了之后就叫做user behavior啊那那可以啊,你这里边呃定义了这个呃窗口对吧,开窗然后去聚合,聚合出来不就是一个count值嘛,那我输出就输出一个长整形就完了嘛,对吧?呃,这这里边我这个呃长有有几个数我就输出几对吧,这个完全没问题啊,你就按照这个这个数字统计不就完了吗?但是大家注意,我这里边输出了这个结果,这里边你如果只输出一个数的话,我是不是连到底它这个对应的ID是谁都不知道啊,哎,所以你这里边不能那么简单粗暴对吧,直接就就做这样一个count啊,那有同学可能就想了,呃,那那这样,那你稍微的做一个调整对吧,你就像我们之前的那个,呃,Word count1。
10:27
样,你map成一个二元组嘛,你像之前我们是一个word一个一个一对吧,你像这里边也是一样,你这里边就来一个那个item ID,然后来一个一,来一个item ID来一个一,你后边根据这个一个ID一个一个它的count值这个二元组把它做一个统计,这个不就完了吗?诶这个也是一个,呃,就是很好的想法,对吧?就像我们当时做那个word count一样,这个输出就会非常的直白,但这里边又有一个问题,大家想这里边就是我这里边直接在在这输出的时候啊,这里边我怎么知道它到底是属于哪个窗口呢?哎,主要有这样一个问题对吧?哎,有同学那就想啊,你这里边既然是隔五分钟才输出一次,那五分钟到点的时候,这里输出的当然就是当前这个窗口里边统计出来的,呃,一堆一堆这个count值了,你就把这个收集起来做一个排序,然后取它的前几名不就完事了吗?哎。
11:27
动作当然也是可以的,但这个前提是什么呢?前提是我得知道,就是我这里边处理的这是这是一条完整的流,对吧?我得知道哪些我现在处理完了之后,接下来在处理的时候,你就不要跟之前的一起排队了,对不对?诶大家想一想,我要排的并不是到目前为止所有数据里边的前几名,我要排的其实就是当前这一个窗口,这里边的几个数据排一个队而已,那你这里边还得指定我当前到底是属于哪个窗口才行,对不对,要不然的话,我后面怎么去排排这个队呢?没法去排了呀。哎,所以这里边就涉及到一个问题,我最后输出的这个数据类型呢,应该还要再包进来一个信息,包进来一个窗口的信息,Window的信息,对不对?所以大家发现我现在最后其实是想要这三个信息包在一起,就是有一个当前的item ID有一。
12:27
个它统计出来的那个count的数量,然后呢,还得有当前统计的这个窗口的一个信息,对吧?那窗口的信息我们可以,呃,大家可能说窗口呢,用什么来表示呢?你用它那个结束时间来表示不就完了吗?对吧?几点钟现在要要关要统计的这个窗口,我用那个window and大家还记得吗?哎,就是之前我们不是在那个CQL里边啊,Table API里边就有一个window window end的那个属性可以直接提出来吗?同样现在我们也可以拿这个属性出来作为窗口的一个代表,所以接下来我们其实想输出的数据结构是这样一个类型,三个属性的这样一个一个数据类型,对吧?哎,那我这里边可以先把它做一个定义啊,啊,就是我把它包成另外的一个样例类,我管它叫做item will count啊,就是每一个商品我去呃,被浏览的那个当前窗口内被浏览的次数的一个统计,对吧,这里边三个字段,一个item ID。
13:27
然后一个window and,另外还有一个countt数量,最后我其实想要拿到的是这个对吧?哎,那这里边就有一个问题了,我们窗口要做聚合,拿到这么一个东西,你怎么样去做聚合呢?哎,有同学说这sum就完了吗?直接sum不就得到了吗?之前我们做那个word countt的时候确实是二元组,对吧?哎,直接some得到的还是二元组,直接就得到最后的结果了,或者我们直接用reduce reduce也可以很方便得到结果,对吧?但是发现大家会发现一个问题,就是这里边是不是输入的类型和输出的类型必须是一样的呀,对吧?诶,你sum之前是二元组,输出还是二元组,你reduce我们之之前用那个呃,传感器的例子啊,输入之前是聚合之前是s reading,那你reduce聚合完之后还是sensor reading对吧?
14:20
哎,这个过程其实它的这个数据类型是不能改的,另外还有一个非常重要的问题,就是我们在做这这个过程当中,没有办法拿到窗口的信息,对吧?大家还记得我们在聚合的过程当中,你你只有一个中间的状态,另外还有一个就是当前所有数据,呃,就是新最新来的那个数据,对吧?只能拿到这些信息,那我们的这个窗口信息到哪去拿呢?诶大家就回忆起来了,之前我们说过除了增量聚合函数之外,是不是还有一个全窗口函数可以得到当前窗口里边更多的信息啊,哎,所以接下来我们就想到了,诶,那是不是得用一个全窗口函数呢?但是我们又觉得全窗口函数它这个计算起来这个实时性不好,对吧?哎,它它并不是来一个处理一个,他是把数据全攒齐了,做批处理了,尽管他能拿到的东西更多,我们就想能不能结合起来这两者呢,把它的优势结合在一。
15:20
一起去用呢,诶可以这里边给大家来介绍一下aggregate的一种特殊的用法,或者说是最为一般化的一种用法啊,这里面就是怎么去用这个aate呢,之前给大家说过这个,呃,窗口聚合的时候,你可以直接点sum对吧?呃,可以直接点mean mean by max max by,另外还可以直接调点reduce啊,直接调这个aggregate.food这几个我们说里边默认传的应该都是一个。啊,就是aggregate里边传的就是就是一个aggregate function嘛,就是一个增量聚合函数,Reduce的话,里边要传的一个就是reduce function,这是我们之前给大家说过的啊,就直接可以这么去用,直接做一个增量聚合,但是现在呢,它还可以传第二个参数。
16:08
这两个参数分别是什么呢?前边这个是还是我们想要的那个aggregate function增量聚合函数对吧?哎,我这个简写了啊,而后边这个就是我们说的那个全窗口函数window function,哎,那这两个如果放在一起之后,它的效果是什么样的呢?它的效果其实还是每来一条数据之后,调用前面的这个增量聚和函数函数去做增量聚合,来一个处理一个,来一个处理一个,然后最后如果我们要去窗口输出的时候怎么办呢?窗口输出的时候再去调一下我们这里的这个全窗口函数window function,然后在这个函数里边,我可以获取到之前已经聚合好的那个状态,另外还能够拿到窗口相关的一些信息,额外的一些信息也能够做包装处理,所以最后的效果就是我可以在这里。
17:09
前面这个aggregate function里边做增量聚合来一个处理一个,而在后边的这个全窗口函数里边呢,拿到之前的聚合结果在哎,结合窗口里边的信息包装成一个想要的书书数据类型啊,所以这里边就用到了这样一个方式啊啊,然后接下来我们来看一下就是具体的这个操作啊,这里边大家看一下前面我们自己定义的这个count hg,这就是自己要实现的一个LG方式了啊,那大家看这里边这个在data stream API里边这个调用就非常的舒服啊,就是只要重写它这个接口里边的所有方法就可以了,那大家看这里边的这个方法是不是跟我们之前讲到的table,呃,就是table API里边自定义的。Tableable aggregate方式或者是aggregate方式非常像啊,啊对吧,大家看这个非常的像啊,所以基本上tableable API里面的那个就是借鉴了这里边的这种实现方式啊,同样也是先去创建一个accumulator,那accumulator是什么?不就是当前的那个聚合状态吗?啊,所以这里边我们做一个这样的一个聚合啊,定义一个状态,然后呢,它有一个方法叫做ADD,哎,这个也是必须去重写的一个方法,这个ADD的方法,那就是每来一条数据之后,你定义当前的这个状态怎么改变,对吧?哎,大家看到这个就直接操作这个状态改变,有点像我们之前看到的那个accumulate,对吧?只不过那个accumulate并不是重写的,而是写死的,你必须去叫那个名,对吧?啊,现在这个的话,你就是重写这个爱的方法就可以了,另外还有一个获取最后结果的方法,这个叫做get result对吧?呃,就是直接把我们当前想要打拿到的那个数据直接拿到,那这里边怎么样去拿到当前的这个结果呢?
18:56
我们要要输出什么结果呢?就输出当前的这个状态就行了,对吧?因为这里边你包装不成我们想要的那个样例类,那个item view count,而是怎么样呢?而是把这个状态只要交给后边的window function,然后由它去包装就完事了,所以这里边呃,有时候我们可以说前面的这一步操作,这叫一个预聚合,对吧?就是每来一条数据先做一个预聚合,聚合出一个结果,最后给到window function里边,那window function其实它拿到的就是调了我们aggregate function里边的get result方法,对吧,拿到了它的这个结果啊,那那当然这里边实现的就是这个aggre function接口了,然后我们再看看那window function里边干什么呢?啊,这个window function它实现的是啊scalela里边的这个treat这个接口,Window function这个接口,对吧,然后这里边它主要的要实现的一个方法是apply方法,大家还记得那个全窗口函数,呃,那个window function吗?对吧,里边就是。
19:56
现一个apply方法,这里边就能够拿到当前的K,拿到当前的window对吧?另外还能拿到聚合的结果对吧?Aggregate result就是我们前面预聚合已经传过来的那个数啊,大家看这里面是个特able类型,为什么是个可迭代类型呢?因为本来这是个全窗口函数对吧?它本来预计拿的不是聚合结果,本来是要干什么呢?本来是要把所有的数都放在这儿的,就是我们说所有数据都收集齐了才调用这里的这个window function嘛,呃,所以这这个interable类型本来是要放我们的全量数据的,只不过这里没有全量数据了,就一个结果放在这儿完事,对吧?所以大家看,接下来其实就是把item ID拿出来,把count值拿出来,另外利用这个window再拿一个它的and,对吧?调用window.get and这个方法,拿出来包装成样例类输出就完事了,大家看输出的时候又用到了一个collector的collect方法,对吧,又做了一个这样。
20:56
这样的一个输出,这就是我们最后窗口输出的结果。
我来说两句