00:00
然后接下来呢,我们要基于之前已经实现的这个广告统计,点击量统计的这个代码再做一些改进,我们要考虑一个新的需求,这个需求就是说如果用户出现大量的刷单行为的话,要做一个过滤,而且把当前就是做了刷单行为的这个刷广告点击量行为的这个用户输出到黑名单,输出一份黑名单,把它要添加进去,所以这就相当于有一个风控的指标了,那现在这个指标到底是一个什么样的需求呢?啊,那我们说的就是前面我们统计的时候呢,其实就是根本不做任何的筛选,对吧?所有的用户对于广告的这个点击行为,我们都会收进来,都会在count里边加一贡献一次点击量啊,但是我们现在如果说一个用用户啊,大家考虑这样一个场景,就是看一看我们这里边的这个测试数据,大家其实也能够发现啊,这里边这个测试数据啊。
01:00
这个ad click log里边大家会看到啊,我们当时输出的这一个值里边,好像就是这个北京它统计的这个数据特量特别大,对吧?啊,120多,那这个数据量到底是合理不合理呢?我们如果要检查一下这个北京这个数据的话,大家会发现这里边其实大量的都是重复的数据,对不对,都是同,都是同一个用户,然后特别是他还点了同一个广告啊,因为你如果要是说同一个用户,假如说这个用户就特别喜欢点广告,对吧?啊,不停的在那点不同的广告,那也算的,但关键现在是同一个用户,他就怼着一个广告在那使劲点,那你想这能代表他对这个用,对这个广告就是特别感兴趣吗?好像不能,对吧?哎,所以这里边我们其实就可以判断他可能这个行为是有问题的,哎,那在这个过程当中,我们可能就要设置一个上限啊,比方说这个上限呢,可能大家会想到还需要有一些限制条件,你比方说你设立一个上限。
02:00
说哎,我说呃,你用户点击这次次数,同一个广告点击超过十次之后,我就认为你有问题了,我就要报警,但是你说这个难道说呃用户我们这个广告假如说啊,你挂在这个网站上时间很长,挂了几个月,挂了几年,那难道说这个用户他他一直点这个是不会累积到十次吗?是有可能累计到十次的呀,哎,那有同学可能说,诶,那那我是不是应该要开窗口去做这件事情啊,就是统计这个某一个窗口内的数据呀,诶所以接下来类似我们要做一个时间段的一个判断,就我并不是考虑用户从头到尾历史数据里边,这个广告点击超过十次,我就要报警,而是我这里边可能是比方说我们要求就是一个自然日之内,对吧,一天之内,他如果对这个广告的点击量超过了十次,哎,或者说我们定义大一点啊,就是放宽一点,超过了100次,我就认为它有问题了,要报警,然后把它过滤掉,哎,这是一个。
03:00
可行的一个选择,然后大家会想到是不是相当于到了这个每个就是自然日,我们到零点到24点的时候,就要做一个清清空清除啊,啊对吧,就相当于在这个过程大家怎么想,是不是类似于我要保持一个状态,在这个过程当中不停的count,不停的count,对吧?然后一旦达到上限,比方说我这里边设了一个100是上限,那到了100之后,我这个就count值是不是就不应该再增加了,而且我要做一个行为,就相当于我有一个filter,就把这个用户数据就过滤掉了,后边我做那个分组的,按照这个province啊,做分组统计的时候也不要把它统计进去了,对吧?啊,就是有这样的一个控制,但是这个状态呢,你又不能一直都放在那儿,对吧,你不能说是这个到了100之后,一直就是一就是100了,然后你到明天的时候,他还是100,那那用户再点一次还是不行,我应该要把它清空掉,因为用户有可能是在不同的天数。
04:00
库里边它有可能会重复点击对吧,这是完全有可能的啊,所以我们这里边要做的操作其实还挺复杂,大家想一想啊,啊,就是我要首先要做一个filter,那我们想你直接filter不就完了吗?但是呢,我们可能还要涉及到状态,我需要去保持一个状态,表示用户当前对这个广告点了多少次一个count值,那那大家可能想到了,之前我们不是想到这个每一个算子都可以有状态吗?对吧,我用一个这个reach filter function,那或者说之前我们不是还讲到K之后可以有那个filter with state嘛,之前我们讲的是那个map with state,对吧?Fla map with state,那现在我们用一个filter with state不不也可以吗?对,这个可以实现,但是大家想到你后边这个要清空这个状态的时候,你要每天的就是24点对吧,零点钟的时候准时清空这个状态,这个需求怎么去实现呢。这个就有点麻烦了,对吧,你直接要是这个filter with state好像做不了这件事情啊,它只能是你定义好我们那个状态,然后就不停的叠加,不停的叠加,对吧?你定义那个状态怎么去更改,没有办法定义这个在某个固定的时间段把它清空掉,哎,那所以接下来我们就会想到怎么去定时做清空操作呢?定时器对吧?对,非常好啊,所以我们要注册定时器做这件事,那是不是就必须要用到一个process function了啊,那所以接下来我们的这个需求,最后啊,又要实现一个process function,而且另外我们还想到当时不是说这个达到100之后,就比方说达到那个上限之后,还需要把当前,比方说用户输出一个黑名单,对吧,那你这个黑名单是怎么输出呢?因为大家想到我们当前是一个filter嘛,Filter正常来讲是不是应该是把不让它出现的这个数据,把它滤拦截住,把它过滤掉,然后正。
05:58
正常应该有输出是我们放行的那些数据对吧,那就正常的数据应该把它放行,作为我们的输出要放行的,那你现在如果要再再要输出一个黑名单,那得怎么去输出呢。
06:12
测输出流是不是啊,所以接下来其实我们这个很简单一个需求,其实我们会发现之前process function里边的很多应用的这个例子啊,都在里边有了一个实现啊,就是我们说的可以做状态编程对吧,可以去注册定时器去做一个定时触发的操作,另外还有一个就是测输出流,假如说我们出现这种检测到符合条件,那就输出到测输出流做一个报警,就我们是把这个黑名单输出到测输出流里面去了,那接下来我们在代码里边给大家看一看到底应该怎么做这件这件事情啊,首先大家看一下我们这个代码里边整体的处理流程,前面其实我们已经做的这个操作是什么?就是转换成这个数据读进来之后转换成样例类,然后后边直接就去做这个开窗聚合了,那那我们现在如果要做过滤的话,应该在哪一步做过滤呢。
07:07
大家觉得这个在哪一步做过这个比较合理?其实哎有有同学说,诶应该在这个KBY之前是不是对吧?因因为你前面这个数据如果就是当前我们已经卖成样率类之后,已经读进来之后,其实就应该知道当前是哪个用户对哪个广告做了点击,那我就可以判断了,对吧,我保持一个状态判断是不是超达到了那个上限,达到的话直接把它滤掉,你就不要到下边再去分组开窗统计了,对吧,那后边我们统计的那个数据就会少减减少掉这一部分内容啊,那所以接下来我们是要在这儿中间要插入一部操作,哎,这里啊,我们插入一步过滤操作,并将刷,呃,就是有有刷单行为的,有刷单行为的用户。
08:13
添加,呃,就是输出到测输出流,测输出流,呃,就是我们的所谓的那个黑名单对吧,黑名单报警啊,就是这样的一个过程啊,所以接下来我们其实在中间这里边加一步啊,比方说我这里边定义一个,哎,这个叫呃,Filter,我我这个相当于是添加一步filter操作,另外呢,还要输出一个黑名单输出到侧输出流,所以我叫filter blackist user stream,稍微有点长啊,但是大家知道是做了一个什么操作,就可以基于之前的这个定义好的样例类的这个流,对吧,Ad love stream,然后接下来呢,哎,做一个大家可能知道我要做一个这个filter操作,但是现在我不是做future操作,我做的是process,对吧?然后另外还有一个问题,就是我后边要去做那。
09:13
个状态判断的时候,大家想啊,是不是要针对的是什么呢?每个用户然后点击了同一个广告的时候,然后给他保保存一个count数量,那这个状态大家想想是不是应该只是针对同一个用户和对应的那同一个广告有效啊,你不能说是每来一个数据,我都在后面那个count加一对吧,我必须是针对当前的用户和当前的count,呃,当前的那个广告才有效,那所以这种场景我们应该定义什么呢?定义的状态应该是kid sit对吧?分组状态对不对?所以是不是前面应该先做一个分组啊。前面如果我就针对当前的用户ID和广告ID做了一个分组的话,后边定义的状态是不是就只是针对当前这个分组有效了,就不会再去直接叠加了,对吧?哎,所以接下来我们这个操作相当于是比较,呃,就是用了一个这个利用了这个K败的过程啊,所以把后边的这一个状态就可以直接局限在当前用户和当前广告啊,那接下来我们把这个定义出来,那当前这个data根据什么来做分组呢?这个我们也知道,想要的应该得有得是一个二元组,对吧?我提取的应该是user ID和ADD,那我用data.user ID和data.a did把这两个提出来,然后后边直接做一个process,里面传的是一个key的process function啊,里边我们可以做状态编程,可以注册定时器,另外还可以做测出数流啊,这个我直接把它叫做呃。
10:52
Future black list you the result啊,其实就是做了一个这样的一个,就是获取这样一个结果的操作,对吧?然后后续的操作是不是就直接基于这个做过filter之后的这个流再做操作就可以了,所以大家看我是在这中间把它插了一步放在这儿,好,当然这里边这个类型还不匹配,因为这里边我并没有定义好嘛,对吧?啊,那大家可以知道我这里边主流应该输出的这个数据结构应该是什么呢?
11:28
因为主流我做的是一个过滤,那是不是是样例类的话,还应该是样例类啊,对吧,就正常的数据我应该直接放行,数据类型不改变,所以本来这里边主流里边啊,还是基于ad click log这个样例类类型去做的转换,那这里边就涉及到一个测输出流里边我们是要输出那个黑名单报警信息的,对吧?哎,所以要对针对当前的这个侧输出流的信息做一个单独的定义,那比方说这里边我们还是啊定义一个his class啊,那现在我们这个就是测殊流定义成black list对吧?呃,User,呃,就是做一个warning吧,我们把这个当成一个报警信息warning信息对吧?那我们关于这个报警信息关心的是什么呢?当然就是哪个user user ID场整型,然后哪一个aidd点了哪一个广告,对吧,这个用户有问题,可能这个广告也有问题啊,然后另外我再来一个。
12:29
诶,一个一个字符串的一个输出的一个报警信息就完事了,对吧,我做一个报警提示,那这里面给一个message string,诶把它放在这儿,这就是我们要输出的这个样例类类型,所以最终我要求得到的啊呃,就是整体来讲我们这个count,正常来讲最后的输出是这个样例的练习,而这里边定义的啊,大家看这是测输出流,报警信息就是黑名单报警信息样了一类,所以他俩不一样啊,那后边同样你这里边就是我们要最后要打印输出的时候,这个是当前我们的count result啊,然后另外还应该得有一个,大家知道还应该得有一个测输数流,对吧?那测殊流怎么样去获取呢?当然是基于data stream去调它的get s。
13:29
Outp put那个方法了,哎,那大家说是基于哪个流去get呢?注意大家不要直接基于ADD count result这里边去get,因为它这里边它的转换操作里边并没有定义对应的那个特殊流对吧?哎,我们是在这里啊,这个filter blackist user stream,它里边这一步process里边定义了测殊特殊流,所以要基于它去做转换啊,所以这里边我们把这个拿出来,它去get side output里边大家知道要去传一个output tag对吧?这里边的类型是什么呢?类型是我们定义的特殊出流的这个warning类型,然后这里边给一个比方说我们这个就叫warning吧,好,这是这样的一个过程,对吧,然后print打印输出,比方说这个我还是就叫warning啊,显示的时显示的时候好看一点啊,这是这一部分内容,这是整体的处理流程,我们就定义好了。
我来说两句