00:00
接下来呢,我们把当前项目里边的几大模块分别给大家做一个简单的分析,那我们的重点呢,会放在第一个模块,也就是第一个需求实时热门商品统计上面啊,大家会呃跟通过这一个模块,这个需求的实现,把这个设计思路捋清楚,那那后面我们写代码的时候,就照着这个思路直接做一个实现就可以了。那首先我们看一下这个当前的基本需求啊,实时热门商品统计,那大家会想到首先有一个问题,就是说你要统计是历史全部的商品的热门,还是一段时间内,近期一段时间内的热门商品统计,对吧?我们这里的统计需求是近期的热门商品统计,然后这个近期进到多长时间呢?啊,这是受限于我们的数据啊数据我们的数据量不大嘛,所以说你如果统计太长时间,你统计一个月的,那那我们这个数据只有总共只有几天的数据,对吧?那那这个就统计不到了,所以我们统计的是一小时内的热门商品,然后这里大家注意啊,我当前统计一小时内的热门商品,是最近一小时内的热门商品,我要求还有一个很快的更新频率,所以我要求五分钟就更新一次,那大家想我的更新意思指的是什么呢?
01:21
其实是就是,诶对,就是我过五分钟是不是就要更新过去这一小时之内最近啊,最新的这个热门商品,Top top盆到底是到底是哪几个,对吧?哎,所以这里边其实就相当于是比方说我八点到九点统计一次,九点钟统计出来,然后是不是09:05再统计一次,八点到零五,8.05~9.05之内的这个最热门商品啊,对吧,就隔五分钟随时去统计最近一小时的热门商品啊,那么还有一个要求,就是说你既然要统计它的那个热门嘛,那我总得有一个热门度的标志啊,什么叫做这个商品热门呢?按什么来评判呢?我们就按数据里边UKVR不是有浏览次数,有PV吗?我们就用PV的次数,每一个商品PV的次数来表示它在当前时间段内的热门度。
02:14
那整体大家看知道这个需求之后,现在大家有有思路了吗?怎么实现。其实非常简单,前面你说统计近一一个小时的热门商品,然后五分钟更新一次,这不说的就是一个滑动窗口啊,对吧,我们之前说统计的这个时间段,这是窗口长度,那更新的频率是不是就是滑动不长啊,所以我只要去开一个一小时长度滑动距离为五分钟的窗口就可以了。啊,然后另外是不是就是筛选过滤PV行为,接下来开窗啊,那那接下来是不是就是做一个聚合统计啊,统计出这个窗口内所有的商品,他们被PV浏览的那个次数,统计出一个值来,然后接下来做一个做一个排序降序,我取套喷是不是就完事了啊,这个整体思路其实还是比较简单的啊,那接下来我们具体给大家看一看这个数据结构的一个转变。
03:12
首先基于我们来了之后,所有数据在一起啊,都是data stream,那大家想首先是不是应该做一个KBY分组啊,我这里写的是分区,其实大家知道就是分组的意思啊,啊基于哈奇code重分区对吧?那KBY之后得到的是Kate Kate stream。就可以开窗了,那开一个滑动时间窗口,接下来统计每一个窗口内的数据,做一个聚合,这个聚合得到的这个值,大家会想到是不是我应该带着当前的那个,就是我应该知道当前是哪个商品,然后得知道当前它的那个统计出来的个数是多少,大家想想是不是应该这样,因为你如果当前不带着是哪个商品的话,我我就是一堆统计出来,比方说统计出来100 200、300,就输出这么几个数,那我哪知道这到底谁是谁啊?最后我的那个top n一定得告诉我哪个商品它点击了多少次对吧,热门度是多少,然后排行第几,应该是这样的一个输出,所以最终这个数据里边是不是得包含着我们当前的。
04:16
就是当前的商品信息啊啊,那另外这里边还有另外一个问题,大家看我这里边当时分组的时候,不就是按按商品分组了吗?KY,你看按照这个颜色,蓝色的分蓝色,橙色的分橙色,那后边同样是蓝色,是不是不同窗口也会有不同的数据,一个统计数据出来啊。那大想一下,那后边我做排序的时候,是要对所有的这些数据都要排序吗?是不是还是应该一个窗口统计出来的结果,所有的这个商品做一个排序啊,不同窗口统计的结果是不是就不要混在一起去去排序了?要不然的话,你是不是同样是蓝色的这个商品,你在不同的窗口是不是他要自己跟自己排一次啊,这就没道理了,好,所以大家看我们最后聚合出来的结果,其实是不是应该包含三个主要信息。
05:09
最重要的当然是当前的count值,然后另外呢,还得知道当前你的这个count到底是哪个商品,对吧?得有商品的信息,另外还还需要一个是不是窗口的信息啊,所以我最后得出来的这个数据统计的结果应该是哪个哪个时间段的窗口,哪个一小时的窗口内,然后当前的商品被浏览了多少次,对吧?是这样的一个数据结果。啊,这就是我们整体的一个思路,然后接下来针对这个在每一个窗口内再去排序是不是就可以啊,啊,这是基本思路啊,然后接下来我们把具体的过程再给大家过一遍,首先原始的数据来了之后,首先是不是要分区啊,分组对吧?按照什么字段分组呢?哎,对,这个大家要注意啊,因为一开始我们看那个user behavior的时候,大家记得第一个字段就是user ID,那有同学说那上来之后分组按照user ID分组啊,习惯了对吧?所有这个分析用户数据的时候都是按照uz ID分组。
06:07
我们现在要讨论的这个热门商品是不是跟用户就没关系啊,对吧?我们说是用户行为分析,其实这个只是一个整个平台的一个统计指标,对不对啊,其实跟具体用户没关系,所以我们是要按照商品ID分组,后边同一个商品ID,然后就取count值对吧?来一个就count加一,然后大家看啊,首先我按照这个item ID做一个分组,接下来是不是要开窗了啊,开窗我们现在是滑动时间窗口,是不是直接太window就可以了,然后里边一小时的时间长度啊,就是给了个60分钟啊,然后滑动不长五分钟,这个没有任何问题,对吧?直接设置这样的一个滑动窗口就可以了。这里大家需要注意的一点是,每一个数据它会它只属于一个窗口吗?诶不是,显然这个窗口应该有重叠部分对吧?诶滑动窗口吗?那同一个数据属于多少个窗口呢?
07:07
对,这个大家知道是不是,就是窗口长度除以滑动不长一除的话12个对吧?好,所以接下来我们就知道了,大家要注意呃,时间窗口首先有一个特点是区间左闭右开对吧,所以比方说10:10这个数据它是不是应该属于。十点到11点,10.05~11.05,另外还属于十点十分到11:10,但是不属于10:15~11:15,再后一个就不属于了,对吧,另外它也不属于。它也不属于九点十分到10:10是不是也不属于啊,啊右边开的嘛,对吧,开区间啊,所以我们就知道它最前面属于的那个窗口应该是最早的那个窗口应该是哪个。我们说最早往前面数的话,09:10,九点十分到09:15九对九点十分到10:10,这个它是不属于的,大家知道这是刚好不属于对不对,因为它不包含10:10这个数据嘛,那再往后推移五分钟,09:15~10:15,是不是就应该包含它了呀?哎,所以它最初属于的第一个窗口应该是09:15~10:15,然后五分钟一个,五分钟一个,接下来是不是09:20~10:20对吧?啊,09:25~10:25,一直往后一直到最后一个是不是就是十点十分到11:10啊。
08:41
然后大家看这个过程,这这不就是一小时之内的五分钟隔一个五分钟隔一个一共12个窗口嘛,啊,所以同一个数据会被分发到不同的窗口,我们当前的这个场景下,一个数据分别属于12个窗口。然后接下来我们就做聚合了,呃,接下来这个聚合的过程,大家首先想一下,我想要得到一个什么结果呢?
09:04
得到聚合想要的是那个,哎,对,大家想到是要item ID,然后看的次数,还要有窗口信息对吧?哎,之前我们想的是最后要得到这样一个结果,所以接下来呢,我们应该定义一个输出的数据类型啊,输输出的数据结构,比方说我包装成一个,也是包装成一个这个po类型啊,类似于抓va病,我把它叫做item view count,每个商品被浏览的那个技数值,对吧?然后里边三个字段,Item ID window and,还有count,因为大家会想到这个,我用什么来表示当前window信息呢?是不是不用那么复杂,只要有window的一个结束时间就够了,对吧?一看结束时间,当然你知道是什么时候开始,什么时候结束啊,这个比较明显啊,所以接下来我们就可以通过一个聚合操作,最后得到一个item will count类型的一个data stream。那大家想一下,我们接下来怎么去做聚合呢?
10:00
首先我们做这个窗口聚合的时候,有两种选择,一种叫增量聚合,另外一种是全窗口聚合,对吧?诶,那之前我们讲的。增量聚合是不是从效率上来讲更加的合理一点,对吧?呃,就效率上更高一些,来一个就是聚合一次嘛,真正的流处理,那全窗口函数有什么好处呢?全窗口函数是不是能拿到更多的信息啊,那大家想一下,当前如果说我只是做一个reduce,或者只做一个这个aregate的话。如果里边只传一个aity,如果是reduce function,大家知道那个数据类型不能变,对吧,那你本来包装好,比方说我们那个叫user behavior嘛,那你这里边是不是最后输出也得是user behavior啊,啊,你这个想得到I count,这不行,所以我呃,我就想到什么样的类型可以变呢?Aggreate function可以变,对吧?哎,这是首先我们想到的。但是只有一个a function的话,是不是最后我根本拿不到当前的window信息啊,哎,所以这个就比较麻烦了,那那怎么样做呢?啊,大家就想起来了,之前我们是不是有这样的一个把增量聚合函数和全窗口函数结合在一起的这种API调用方式啊。
11:11
所以现在我的操作就是前同时传两个函数进来,前边这是一个。增量聚合函数aggate方式,来一个聚合一个,来一个聚合一个,对吧,增量聚合,然后后边这个呢,全窗口函数,但是大家注意,全窗口函数本来应该拿到所有数据,但是放在这儿它就不用去拿所有数据了,而是只拿到前面对聚合的最后结果就可以了,然后我们在包装,结合当前的window信息窗口信息包装一下输出结果是不是就完事了,哎,所以这就是我们当前的这个使用的过程啊啊,那具体来说的话,Aggregate这里边。大家还记得这个aggre function,我们这个data three没PI啊,里边的这个agg function是不是有必须要重写这样的四个方法呀,对吧?啊,就是create accumulator初始值,然后大家想我这里面的状态是不是只要保存一个count那个计数值就行了,那所以这里边我就是一个长整型的accumulator,然后ADD每来一个数据是不是加一就完了,之前我们不是做过这个count吗?所以这个非常简单啊,最后是不是把这个拿呃,取结果的时候get result,把这个accumulator输出就可以了,所以非常的简单啊,实现了aggregate function接口,那后边呢,全窗口函数,这里边的核心是在于是不是要提取出当前的item ID和window and呀。
12:33
对吧,呃,就是把这两个从当前的这个K啊temple和当前的这个window里面提取出来,然后另外这个count值从哪里拿呢?当然就是当前的这个input,对吧。当前的input它本来应该是所有数据,当前窗口的所有数据,现在你放在我们aggregate第二个参数里边,它其实是拿到的是前面聚合的结果,对吧,所以是不是input里面就一个值,所以我是不是直接点next拿出来就完事了啊,所以这个其实比较简单的啊。
13:04
啊,这就是我们前面做这个,呃,窗口的统计输出的一个过程,那统计得到这个item view count啊,我们不是得到这样一个结果吗?得到这个结果,接下来我们要。是不是按照窗口然后分组,然后再排序输出套盆就可以了,哎,所以接下来我们要进一步继续做一个分组。本来我们这里面得到的它是不做区分的,对吧,你完整的直接在这儿去做一个排序的话,呃,这是不合理的,所以接下来我们首先要做一个KBY,接下来KBY的是什么。哎,对,是时间那个window and那个时间戳对不对,只要是之前我们相同窗口聚合出来的结果,它的那个window and是不是一定一样啊,不管它的那个item ID是哪个,对吧?大家还记得之前我们那个窗口聚合结果是不是到一个时间点之后,不同的K是会同时输出啊对吧?一下就好多数据都出来了,然后它的窗口都是一样的,Window是一样的,所以我是要把所有的这些数据都收集起来,然后去排序。
14:07
但这里面就有一个另外一个问题啊,有同学可能想到我这里边已经批败了,那后边怎么样去做排序呢?大家想到排序的话,这个就有点麻烦,这不像我们之前求和或者count来一个处理一个就完事了,对吧?那你说要排序的话,我我什么时候排才算给他排完了呢?什么时候才有这个top n了呢?哎,我们应该想到的是不是应该还是要等到当前这个窗口,所有数据都到齐了,然后我直接给他做所有的数据做一个排序输出top盆就完事了,那问题就又来了,这现在我的数据理论上是同时输出的,但是我现在流处理啊,流处理是不是形式上还应该是一个一个出来的呀?哎,那你说我到底是碰到哪一个的时候,这就算是。就算是所有数据都到齐了呢,这个就很难理解对吧,就是这个怎么样去做呢?我们现在还是一一条数据一条数据啊,这个现在的数据就是这个聚合结果嘛,Item view count,那来了一个这个抗ITEM1啊,这个商品一的一个技术结果是四啊,那现在就可以结结束了吗?那当然不是啊,那假如说又来了一个二,这就可以结束了吗?那当然还有可能要来其他的商品的统计结果,对不对,我并不知道当前这个窗口内统计的商品有多少个,对吧?这就很麻烦,那怎么办?
15:32
哎,有一个很简单的想法,大家会想到我是不是稍微的等一段时间,是不是把所有数据等齐了就完事了,这还是之前我们那个watermark延迟的思路,对吧,我就等他等,等到这个时间不就完事了吗?那这里边就有一个问题,我到底是等多长时间呢?哎,可能有同学说,那这个简单,你你等上一分钟肯定都来了,对吧,但是这个是不是代表我们的延迟就太高了呀,对吧,你要本来五分钟输出一次,结果你最后的结果是延迟了一分钟才输出的。
16:06
所以这里边我们可以有这样的一个做法,就是首先我可以设计一个状态编程的思路。我首先是什么来了,所有的数据聚合出来的结果,根据窗口先做一个分组对吧,然后每一个分组对应的是不是都设置一个list state呀,把当前的那个数据是不是全存进来,存成一个列表就一直存着,来一个就存一个,来一个就存一个,全存进去。但是我要干一件什么事呢?我要注册一个定时器,这个定时器就是要延迟,稍微延迟,等它所有到齐了触发对吧,我注册的定时器呢,大家注意啊,我就注册一个啊,这这就涉及到我们整个的这个流程了啊。我就设注册一个当前的window end结束时间,然后再加上100毫秒,或者甚至我可以再短一点,我加上一毫秒是不是都可以啊?
17:03
因为大家想一下,我当前设置这个的话,我我只要把它设置成什么,就可以设置成事件时间触发是是不是就可以了呀?如果是事件时间触发的话,那是不是就应该是wal mark到了这个程度,然后就会触发我当前这个定时计算,那么我现在wal mark既然已经到了window and加100或者是加一了。那大家想是不是我前面water涨到window and,是不是就前面窗口肯定全输出了呀,哎,所以现在你既然已经过了这个时间点了,当然所有数据都到齐了,对吧,这个时候出发就可以把所有数据收集起来进行排序输出了。啊,这就是我们整体的一个思路,所以大家会发现,现在是不是我们就需要用到状态编程,还要用到定时器,那是不是必须得上。Process function了呀啊,必须得放大招了啊,那大家再回忆一下process function,呃,它是我们的底层API,那既然是现在我们是针对分组,而且是开窗之后做的这个操作,那我们现在是不是必须是一个kid process function啊,每个每个键,每个窗口里边都要有对应的这个输出对吧?啊,那所以接下来我们是以window and作为K的保证,我们排序的都是同一个窗口内的所有的数据啊,那我们具体的操作就是。
18:21
注册一个定时器到点的时候,呃,就是数据都到齐的时候再去出发,那出发的时候是不是从Lisa set里边把所有的数据拿出来排序就好了啊,所以整个的实现思路还是比较简单的啊。呃,那大家再回忆一下,就还有我们必须要实现的一些方法,是不是有生命周期啊,因为k process function是继承自rich function的,对吧?它是一个负函数,所以有生命周期,Open close这些都有。另外最重要的方法就是process element,每来一个方法是不是就要每来一个元素就要调这个方法,对吧?我们这里边主要就是把这个数直接扔到状态里面就完事了。
19:01
然后另外得注册定时器啊,还有一个方法叫on timer,那当然我们就是时间定时器,触发了,说明所有数据都到齐了,是不是把状态list里边的数据都拿出来排序输出套盆就完事了,哎,这就是我们完整的一个处理流程,画出这个程序处理的结构图的话,就是这样一个过程,对吧?Open生命周期里边,我们把对应的那个状态先创建出来啊,然后接下来每来一个元素调process element,把它都添加到这个list set里边来,然后呢,注册定时器,设置这个定时时间window and,再加一点点,对吧?然后到时间的时候,事件时间吧,Water mark到了的时候,我们直接触发拿到当前所有的数据排序输出。这就是我们这一个需求啊,实时热门商品套喷的一个处理思路,呃,另外还有一些其他的指标,那我们给大家还是类似的过一遍啊,比方说后面第二个指标是统计这个实时的热门页面,呃,大家知道这个热门页面的话,这不也是一个top喷吗?你既然有热门,那是不是肯定有一个比方说统计出它的这个呃访问量,然后做一个排序,对吧,输出这个top n,所以整体实现思路跟前面完全一样,只不过这个例子呢,我们会把它换成。
20:20
数据源不是有点behavior了,就是我们现在如果没有对应的那个日志数据的话,买点日志的话,那怎么办呢。可以直接从web服务器日志里边,它的那个对URL的访问,是不是也可以代表当前的这个页面的,呃,热门度啊,哎,所以我们就用这个来指标做一个筛选啊啊,那再往后还有这个PV和UV,这个大家就更加熟悉了啊,我们可以从买点日志里边选取里边的那个PV操作,这不就相当于是可以统计PV的嘛,本来它叫PV嘛,那另外还有就是UVUV的话是是不是要对相同的用户做一个去重啊,这里就涉及到我们对于uz ID的一个去重操作了啊啊,常规的想法大家会想到怎么样去重呢?
21:04
直接放在这个,呃,首先最简单的其实是放在我们内存里边的一个set结构,是不是就去重了对吧?你直接放在set里边,它那个把把当前的这个uz ID啊直接塞进去,这不就驱重了吗?啊那另外大家想到如果说我们当前这个内存里边放不下的话,因为大家想这个相当于我们那个状态是不是很大呀,那是不是可以扔到里边去做驱虫啊,就是挺呃很常见的啊,也是容易想到的一个进行驱虫的一个扩展的方案啊,但是这里还涉及到另外一个就是假如说超级海量的这个数据的话,后面我们会给大家具体去分析啊,就是在什么场景下就里面也放不下了,那怎么办呢?那就得啊,有同学说那可能放h base对吧,放放更大的地方,那我们还要实时的呀,对吧,我们要要那个查询起来要要更快才行啊,所以这个其实是有一些数据结构和算法上的优化的,我们可以用一个什么样的数据结构呢?这个叫布隆过滤器。
22:04
可以用这个去做一个驱虫。啊,这个后面我们讲到再说,然后另外接下来的这个统计类指标,还有这个市场推广的统计,这主要就是我们可能按照这个APP下载量对吧,有不同的推广下载渠道,我们统计出来哪个渠道下载量更多,就可以去,呃,去制定一些接下来的这个营销策略了啊,所以它是跟这个市场营销相关的一些统计指标啊,那除了这个之外,还有这个广告页面广告的统计,它也跟这个市场营营销有关,因为我们可能涉及到这个每个页面上的广告投放,呃,也包括这个广告计价,对吧,你怎么样去给这个页面的广告去定价啊,这些都是跟页面广告的点击量有关的。那在这个里边呢,又涉及到另外一个问题,就是假如说你统计它的时候是靠点击量去给他计价的话,那就不排除有些人发现了这一点,是不是要给你刷单啊,并拼命的刷点击量,对吧?啊,这这类似于一个刷单行为,所以我们想把它检测出来,把这个用户添加到黑名单里面去,那这个需求又怎么做呢?
23:10
啊,这个需求就是后面我们要自己定义一个process function去做一个这样的过滤了,对吧?因为大家会想到这个操作可能会比较复杂,而且你过滤完了之后,相当于这个用户加入黑名单,是不是应该是一个一个测输出流啊,哎,所以这里边当然是要用到process function啊,这个是关于我们前面统计类的指标,然后关于这个页面,页面广告统计的时候,这个刷单行为检测加入黑名单,这其实已经涉及到一些。这其实已经属于风控类指标了,对吧?哎,所以这只是同一个需求,但它其实用同时用到了统计指标和风控指标,然后接下来的内容呢,那主要就是跟风控相关了,比方说恶意登录啊,那大家知道这个如果要是短时间内连续登录失败的话,比方说我们这里面给一个限制啊,同一个用户如果两秒之内连续两次登录失败,我就直接就检测到就报警。
24:05
那这个需求怎么样去实现呢?呃,自然我们也能想到。类似于之前我们做那个传感器温度上升的那个检测啊,可以去设置一个定时器,对吧,那这个当然就是要用这个process方式来去做一个实现了,那后面我们会想到还有更加现成的类库可以实现。相同的功能,那就是所谓的CP,它可以做这个复杂事件处理啊,这个是后续我们要讲的内容。呃,那再往后呢,还有这个订单支付的一个实时监控,这主要说的就是用户下单一段时间之后,应该订单要设置成失效的,对吧?啊,我们传统来讲这一部分内容其实是业务系统需要去做的,对吧?啊,我们有时候可能直接把它丢到那个release里边去,Release可以设置那个呃,超时时间嘛,TTL对吧?啊然后接下来如果说他这个超时了之后,可能业务系统那边还要做一个查询啊,做一个这个轮询或者说呃,实时。
25:04
检测一下它的状态,然后发现他如果失效的话,可能更改更改对应的那个字段就完事了,我们现在相当于是把这个业务功能直接用flink相当于承接过来了,我们想把它直接检测到,然后给他一个输出报警啊,那大家也可以想到怎么去做呢。哎,同样也是可以用process方式去做,也可以基于capp去做。啊,最后还有一个是实时对账,就是我们说的订单支付了之后,最后我还得再看到底那个账户到了没,对吧?呃,我们认为的那个订单支付成功,就是第三方支付平台给我们返回信息,他说交易成功就交易成功了,那我们得看一看到底到账了没有,所以这个需求涉及到的是一个。两条流要做join,要做合并了,对吧,要做合流操作,然后去做匹配判断了啊,所以接下来我们这个需求要给大家用connect和Co process function去做一个处理啊,当然了,这还涉及到就是关于呃,Flink里边留的draw影的这种操作,我们也可以到在后边给大家做一个扩展啊,讲一讲怎么样用状语实现类似的功能。
26:12
这就是关于我们所有模块、所有需求的一个分析和实现思路。
我来说两句