00:00
在已经对广告的点击量啊,按照不同的省份做了一个基本的统计,那大家其实会发现在之前点击的统计这个点击量的时候,我们是不是根本没有对这个用户进行去重的操作,对吧?啊,那大家可能就会想到,那对于这一个用户点击而言,我们好像是没有必要去重啊,啊,因为假如说这个用户他就对这个广告特别感兴趣,那确实是有可能会点好几次的,对吧?啊看完了之后再看啊,这个是有可能的,但是大家想到如果说。用户本身或者说其他的一些平台,他就知道当前我们是这个点击量是会统计起来去做这个定价分析判断的,那就会有动力想要给我们是不是刷一个点击量出来啊,哎,所以大家看到这就是类似于一个我们讲到的平台里边的刷单行为的一个操作,只不过这里是在刷广告的点击量。像我们前面看到的这个运行结果,我们看到某一些窗口里边,北京这个,呃,省份里边统计出来这个数据量的,明显比其他省份的那个数据量大很多,对吧?诶,所以大家其实有这样的一个理由去怀疑,就是说当前这个是不是当前是某一个人对吧?或者说就是某些特定的这个特某几个人拼命的在那刷某个广告的点击量刷出来的呢?诶这就是我们值得考虑的一个问题了,所以我们现在考虑的就是如果出现这种情况,比方说我设置一个上限,假如同一个用户点击同一个广告。
01:32
如果超过了100次的话,大家想这个数量已经很大了,对吧,如果超过100次的话,我有充分的理由相信这个用户一定是有问题的,那是不是我就应该做一个报警输出啊,输出一个报警信息,然后呢?呃,这个时候我要提示一下当前这个用户可能有问题啊,那至于说后续的操作,我到底是应该比方说把这个用户直接就封号对吧,不让他做继续操作了,还是说比方说发一个这个短信推送啊,说诶您当前账户可能有异常,这个就是我们后续的这个业务逻辑了,所以我们现在要做的就是检测到这样的行为,然后输出一个报警,那大家会想到当前在这个业务逻辑里边,我输出这个报警信息的话。
02:17
我当前的主要业务,业务逻辑还是要统计分省份的那个广告,广告点击量对吧?哎,所以我这里面并不是最终要输出一个报警信息的,那我这个报警该该怎么输出呢。这个报警就相当于是一个黑名单对吧,就是当前这个用户,我就直接把它添加到黑名单里边啊,如果他这个在刷单的话,那自然大家就想到了,我是不是可以做一个特殊出流,做一个黑名单的报警啊。也就是说我们当前主流里边,整体来讲还是按部就班,一个一个一个数据往后面传递,对吧?呃,得到的这个数据,呃,然后我们看这个sta ad statistics by province这个代码,那前面我们得到这个数据之后,后边KBY,然后去开窗去做聚合,这个流程是不变的,关键就是在前边我可能要单独做一个判断。
03:11
就是当前用户是否点击已经超过了100次,对吧,那超过100次的话,大家想我当前这个用户的点击数据是不是也就不应该到后边去做聚合统计了。啊,对吧,啊,你后边这里边做聚合统计的时候,我就应该刨除掉它这个,呃,异常的这些点击量啊,就不要再贡贡贡献这个点击量了,所以整体流程看起来的话,这就有点像一个。是不是有点像一个过滤操作一个filter啊。啊,所以那大家想一下,我当前这个需求是不是直接用一个filter就能搞定呢。首先我们想到filter的话,那是来一个数据,直接就判断这个数据里边有什么字段,然后去做过滤,我们现在显然不仅仅是要这个,我是不是还得判断,首先来了一个数据,我判断他的这个哪个用户,然后点击哪个哪个广告,对吧?然后接下来是不是我还得知道他之前点了多少次啊。
04:06
然后我才能,诶又点了一次,我判断是不是超过那个上限了,如果超过上限,诶,那就应该黑名单报警了,对吧,那如果不超过的话,是不是就正常把这个数据输出到后边,让他去开窗做聚合统计啊,哎,这是这样的一个思路,所以在这个filter的过程当中,其实是用到了一个状态的。那如果想要用状态,我们想到那至少就得用一个reach filter function对吧?啊,用一个这个负函数就可以自定义状态了,但是大家想到如果要是有这样的一个rich function,状态是有了,但是我们输出是不是正常情况下是要主流,是不是要正常输出到把这个数据原封不动的输出到后边来啊。对吧,如果要是出现异常的话,是不是应该把它拦住过滤掉,但是要输出一个侧输出流的报警信息啊,所以大家看现在这个主流和侧输出流又分开了。
05:01
那我们一般的瑞士方能做到,能做到测数数流吗?好像也做不到对不对,哎,所以当当当前我们这个需求就只能用一种方法了。那就是传说中的大招底层API process function,对吧,就只能用它来做一个调整,所以接下来我们呃用这种方式给大家做一个实现啊,看一看基于这个呃,用户点击对于广告点击量的这种行为啊,刷单行为,我们做一个检测和黑名单过滤报警。啊,那接下来大家想这一步操作应该差在哪里。是不是应该在开窗聚合之前啊啊,所以前面的这一步我们就把它叫成了第三步,上边再加一步,这步操作是第二步,我们管它叫。啊,对同一个用户点击同一个广告的行为进行啊检测。
06:07
黑名单报警对吧?呃,还要做这个报警,那大家知道这里边我们要用的就是一个process function了,那那前面的这个流我们首先基于这个ad click event stream是不是基于它首先要做,哎,大家想一下,现在我要分组吗?要K吗?因为后边我要定义那个状态的时候啊,大家会想到是不是当前是同一个用户,然后我应该是点击同一个广告对不对?呃,因为这里边就是涉及到如果说你是这个不同用户的话,那自然就没有报警的这一说了,对吧?那点击量大这个完全没有影响,那如果是同一个用户,那他点不同的广告是不是也有可能有有用户就是这个偏好,就喜欢看广告啊,他就不停的点,对吧,那超过一定数量的话,他也没问题啊啊所以我们要统计的是这个广告到底被点击了多少次,对吧?我们按照省省份划分这个广告被点击点击了多少次,然后安排我们当前的这个定价策略,所以说我们要统计的是当前同一个用户对同一个广告点击的这种次数,所以我们设置的这个状态是不是也应该是一个分组状态,就是一个我们所说的kid state啊。
07:19
那么当前的K是不是就应该是同一个用户和同一个广告啊,所以大家看这又是一个组合K对吧?啊,因为我们说定义了那个k state的话,它只针对当前K有效对吧?我们在处理的过程当中就不用再去判断是不是同一个用户同一个广告了,那接下来就直接做K败这个KBY里边是不是直接给字段啊,当前的user ID以及a did对吧?所以我们是基于用户ID和广告ID做分组。
08:00
然后接下来。那下一步操作是不是就可以直接做一个process处理了啊,所以这里边大家也知道了,我们要用的是一个key的process方式,对不对啊,就是做了这个分组之后的啊,里边就可以定义k state啊,可以定义这个分组状态了,所以我又一个当前自定义啊,我管它叫做filter过滤黑名单,呃,就是黑名单用户嘛,对吧,Filter。Black list user对吧?啊,这样的一个处理处理的过程,然后这里边我可以直接传一个参数,就是当前我们定义的那个点击的上限是多少对吧?比方说我们这里边定义的是100啊,但是这个数量是很大了,一般情况可能也也不要点击那么多次对吧,你可能设置个十或者20也差不多了啊呃,因为大家想那个同一个用户点同一个广告,你如果拼命点的话,这是个很少有说正常用户他点一个广告要点十次以上是吧?呃,所以这个就看大家具体的业务需求怎么样去定义这个逻辑了啊,那这里面我把这个定义出来,管这个叫做。
09:07
这个我叫做啊future。ADD click stream对吧?因为大家想到我当前定义的这个是不是最后流出来的数据,还应该是正常点击的数据啊,所以大家想到这里边我最后输出的结果应该是什么?主流里边输出的结果是不是还是应该是ad click呀,对吧,就本身我的那个点击事件是不是照常从这里边流出就可以了。我这相当于加了一个过滤器嘛,对吧,如果说呃,我这里边点击的这个行为没有触发报警的话,不是刷单行为,我是不是直接就直接就顺着这个过滤器就直接出去了,然后在后边再做一个统计报警,对吧?所以这里边后边这个result stream的话,就不再基于前面的ad click event了,而是基于filter ad click stream,对吧?啊,中间差了这么一步啊,做这么一步操作,所以接下来的关键就在于怎么样去实现这样一个自定义的处理函数了。
10:12
好,我们在下边把这个写出来,实现自定义处理函数public static class啊,我们把上面这个还是copy一下啊,这个名称copy下来。然后大家知道接下来它既然是一个process function,那当然就应该不是implement,而是对吧?然后接下来给一个当前继承的是K的process function,它里边的泛型大家还记得KIO对吧?呃,当前的K是什么类型呢?因为我们是两个字段,是不是肯定是元组类型啊,Temp类型,然后接下来输入的数据类型是不是ad click event啊,输出的类型大家注意是不是也是ad click event呀,诶,对吧,因为它是就是一个过滤器嘛啊对吧,只是在某些场景下要把它拦住啊,那正常情况下都是主流直接输出就完事了,所以还是ad click event,所以如果我们这么一写的话,大家看到上边的这个类型啊,除了这参数我们还没定义对吧,别的类型都就都对了,而且后边这里边我们的处理是不是也都对啊,因为这里边我们要的就还是得得是一个ad click event类型的数据输入,然后得到一个这个呃,Count view嘛,所以接下来我们实现这个啊,首先定义属性,这个属性是一个点击次数上限,对吧,最大能点击多少次啊,所以。
11:48
我把它定义出来private就定义一个integer,当前的这个叫count,呃,我把它叫upper bond吧,大家知道upper bond就是上线的意思,对吧。好,然后我们把这个constructor定义出来,然后除了这个上线之外,接下来大家想到是不是还应该定义一些状态啊,定义状态那主要就是保存什么,是不是就是当前用户对某一广告。
12:22
的点击次数对吧,主要就是保存这个,大家想这是不是应该是一个。我们定义k state,这应该是一个value state对吧,就只有一个值。Value。把它定义出来,里边保存的类型count值嘛,那我们就定义成长整形吧,啊,里边给一个名称,当前这个就叫做count state,另外我们还想定义什么状态呢?呃,大家再仔细思考一下这个处理流程,我们在后边就是判断啊,当前这个用户如果已经达到了这个count上限的话,那大家想是不是当前我就应该是把他拦住,首先是不往后输出对吧,然后是不是接下来要触发那个报警啊。
13:06
但是大家想,如果说这个用户他就是如果说我们没有把它封号的话,接下来是不是他可以继续不停的点啊,那如果已经达到这个上限之后,是不是他点一次我们就得报警一次,点一次报警一次啊,所以这里大家会想到,对于这个同一个用户,如果已经把它输出到这个黑名单报警过一次的话,那是不是可以就不再报警了呀,对吧,报警过一次了,那接下来是不是我只要把他拦住,不要不要让他再输出到这个后边的这个窗口统计就可以了,那这里边这个报警的话,就不再去继续重复报警了。所以大家想到,那怎么判断它到底报警过没有呢?是不是还得有一个标志状态啊,所以这里边我们再定义一个一个标志。标志状态。保存啊,就是当前用户是否已经被呃发送到了。
14:08
这个我们所说的这个黑名单里面测试流对吧,黑名单里那同样,当然这个也是一个value state了。Value it里边保存的当然就是一个布尔类型的值了。Bull,我们就把它叫做is sent state,对吧,是否已经发送啊,这就是我们基本的对于这个状态的定义,然后接下来当然是要在open生命周期里边。Get runtime contact在运行上下文里边获取状态句柄对吧?啊,所以首先是这个count set gettime contact,然后get里边要你一个value state script描述器,接下来里边的定义,这个就是我们自己定义啊,当前的这个count啊。或者说我叫这个ad count对吧。
15:00
它的类型是长整型long.class然后这里边我们还可以直接给一个初始值,因为大家知道一开始上来它肯定是nu对吧,那我们不想判断那个nu的话,初始应该是多少。零对吧,这个count值嘛,当然就是零了,然后接下来另外还有一个is set set get,同样还是get wrongtime cons get set里边我们又一个value state script,接下来里边这个是is sent这样一个标志,我们要是给的是boing.class。当然了,也可以给一个初始值,当前初始值应该给false对吧,一开始当然是没有没有直接,呃就是传传递到输出到测出流那个黑名单里面去啊,这就是我们一开始基本的一些定义,然后接下来关键的当然就是具体的处理流程了,那是不是应该有一个每一个数据来了,应该有一个process,调一个process element方法,在这个process element方法里边,我们想要做的事情是不是主要就是要对,是不是要判断当前啊,就是这个用户啊对。
16:12
呃,同意广告的点击次数,那大家想是不是,如果不够对吧,不够上限是不是就就加一,就是count加一对吧,Count加一。然后正常输出就不做任何的改变,对吧,直接把原封不动的这个数据正常输出就可以了,正常输出那如果要是达到上限呢。达到上限是不是直接直接过滤掉,并是不是测输出流报警啊,测输出流啊,就是输出这个黑名单对吧,输出黑名单报警啊,这就是我们整个的一个处理流程啊,啊,那这里面的关键当然就是要判断这个count值了,那大家想是不是我首先应该把这个count值拿出来啊,对吧,我们首先获取。
17:15
当前的count值,那这个获取的话,比方说我定义一个current count,就从count set里边value拿出来,对吧,这是获取值读取值的这样一个过程,然后接下来那就要做这个判断了,对吧?那那大家想是不是就是直接就是if。我们当前定义的那个上限current count,如果要是达到大于等于啊,如果达到了这个上限的话,我们那个上限叫做count upper bond,对吧?如果达到了这个上限,那是不是接下来就要加入黑名单测试这个报警了,那这里面是不是接下来还得做一重判断啊?啊,因为就是当前这个把它过滤掉是肯定要过滤掉的,但是是否要输出到黑名单报警,这个还要看是不是判断是否输出过啊,对吧,是否输出到黑名单过。
18:16
如果如果没有的话。就输出对吧,就测输出流,输出到侧输出流。所以这里面我们要做的是if,前面我们没有直接取那个状态值,这里边我们直接取吧,Is sand state啊,其实大家知道它点value里边就是一个布尔类型的值,如果它为true的话,那是不是不要做任何事情啊,那我们现在是不是如果它为false的话才要做下面的事情啊,对吧?所以这里边是如果false的话,接下来那就要做一些测试物流了,首先我把这个是不是应该状态应该更改一下呀,对吧?做一个update啊,把它制成处对吧?如果是false的话,我们这个更新状态。
19:11
然后接下来当然就是测输出流,怎么输出Ctx.output大家还记得这个这个做法吧,对吧,做这样的一个输出啊,那这里边我要去你有一个output tag了。Out啊,当然这里边我输出的这个数据类型,这可以有进一步的定义了,那大家想我当前想要输出什么样的数据类型呢?呃,这个就当前其实应该单独有一个对应的这个可以有一个这个报警的输出类型,对吧,那我报警的过程。就也应该把这个对应的放在这个B下边。比方说诶,这这个不是hot it是吧,我们要把对应的这个放在对应的包下边啊,当前这个病下边你有一个当前的报警信息,比方说我就叫做black list user warning对吧。
20:08
那么这样一个类型,我们里边最关心的就是哪个用户点哪个广告,然后点的,呃,这个超过上限了对吧?呃,所以大家知道这个用户肯定有问题,然后有可能是不是那个广告也有问题啊,啊所以这两个数据我们都要啊private,呃,就是长整型的一个user ID。把这个放在这儿对吧,然后private长整型的一个a did放在这儿,另外我们可以再追加一个字符串类型的一个message warning的一个message啊,这就是我们呃想能够想想到的啊,拿到的这些所有的数据,那接下来就是constructor啊,空参的创建出来,带参数的创建出来,下面还有get set创建出来。哦,最后这个to string也创建出来,好,这个port类已经定义好了之后,接下来我们就看这output tag那应该给什么类型。
21:08
这个就确定了,是不是就是我们定义好的那个,呃,对black blackist user warning这样一个类型啊,然后里边需要去给一个对应的当前的那个,相当于就是名称,或者说当前的这个ID,对吧?所以这里边我叫black list啊,那大家知道这个output传这个数据的时候,是不是还应该再把当前的数据也也要列在这儿啊,对吧?所以那接下来是不是得有一个你有一个这样的一个。Blacklist user warning啊,对吧,要不然的话你不符合这个类型的定义嘛,啊,所以这里边我们还得扭一个出来啊,那当年大家想当前的ID是不是就是value里边get user ID啊,那ad不是就是value里边get ad啊,啊所以这个其实非常简单啊,那后面再来一个报警的字符串,比方说我,我说它这个click点击超过over多少次呢?我们定义的那个上限对吧,Count upper bond这么多这么多次,对吧,这就是我们输出的这个报警信息。
22:20
呃,然后大家会想到,既然这里面已经输出报警信息了,不管输输出没输出过是不是,接下来我其实都要把这个滤掉,就是不再做剩下的操作了,所以我直接就来一个return是不是就完事了,对吧?到这一步直接就滤掉,一旦要满足这个条件直接滤掉,所以我们不在执行下边操作,这是这个基本的一个设设置啊,基本大家能够想到的一个过程。然后接下来呢,哎,那如果要是前面这里边没有返回,那怎么办,如果没有返回。
23:00
那是不是就点击次数是不是应该加一啊对吧,点击次数加一更新状态,然后是不是正常输出当前。数据到主流啊,是不是这样的一个过程,好,所以接下来那大家想我现在要做的是不是就是count state做一个update,做一个更新里边那就是诶是不是这里边这个current count是不是要加一,然后保存进去啊,啊这非常简单啊,然后接下来是不是正常输出到主流,应该怎么输出out.collect对吧?然后里边是不是直接把value输出出去就可以了,哎,这就是一个我们做这个呃,黑名单过滤的一个基本流程啊,基本操作这个代码呢,其实还有一点点小问题,大家会发现,就我们这里边统计这个信息就是count加王王齐磊加的时候,大家发现我们这里边是无限累加的,对吧。
24:06
就相当于这个用户,只要有这个用户有这个广告,他只要点过,那就会加一,点过就会加一大家想你,而且我们这个状态也不轻嘛。那是不是相当于这个数据会不停的增长,不停的增长,那有可能你过了好几年之后,有可能这个用户真的点击量就达到这个这个呃,我们定义的这个上限了,对吧,然后还真的就做了一个报警,这其实完全没有必要的,对不对?另外从我们代码运行的程度,呃,这个内存管理的角度来讲,是不是我们这个状态也应该要有一个清空的一个时间点啊,一个标志啊,那大家想实际运行的时候,一般什么时候清空呢?哎,对,大家想一般就是以天为单位,对吧,一天每天到零点的时候是不是清空,第二天重新开始统计啊,对吧?所以这是有一个时间段范围的啊,不是说你你这个永远去累加这个点击,那那用户这个长时间的点击,其实是并不是一个异常的这种刷单的行为,如果要是短时间啊,一天当中刷了这么多,那这这这就是一个刷单行为啊,所以接下来我们把这个再做一个定义,做一个清理,那所以大家想既然是要每一天晚上零点的时候。
25:17
或者24点的时候啊,做这样的一个清空操作,那我们应该怎么做,是不是应该注册一个零点的计时器啊呃,定时器对吧,所以接下来大家看到我们当前是不是应该在process element里边。这里边啊,Process element里边是不是首先就应该注册一个定时器,但是这个定时器是每一个数据来了之后都要注册吗?哎,所以大家这里这里需要注意啊,就下边我们这里边已经是做这个判断了,对吧,我们是呃,判断是否报警。那上边是不是要先做一个判断,就是判断是否是不是是第一个数啊。
26:01
第一个第一个数据,如果是第一个数据,那说明是不是没有定时器,我们去注册一个啊,如果要是不是第一个数据,是不是肯定之前我们我们的规则是第一个数据来了之后就注册对不对,注册一个零点,那呃定时器嘛,那这样的话,呃,那就相当于不用注册了,对吧?就如果是的话,注册一个。第二天零点的定时器啊,所以这里边我们有这样一个判断啊,所以if current count,我们前面不是已经get拿到了吗?如果它等于零的话啊,或者大家知道我不给初始值零的话,我直接判断它那个状态是否为为空是不是也可以啊。对吧,是否为none也能判断是不是第一个啊啊,那所以这里边我们要定义一个时间戳,这个时间戳怎么给呢。那当然了,这里边我们就想到底要的是处理时间还是事件时间啊,这个我们一般情况既然是零点吧,我直接就按系这个系统时间了,对吧,处理时间了啊,所以我先获取到当前timer service是不是可以拿到当前的这个current processing time呀,先拿到啊,拿到这个这个时间之后,那大家想我怎么样能拿到第二天零点的那个时间戳呢?
27:20
哎,大家看我的操作,我是直接用它除以一个数除以。24,然后乘以。60再乘以60,再乘以1000,大家知道这个啥意思吗?这是不是就相当于是一天当中乘以60,哎,那那这一天24小时对吧?乘以60是不是这么多分钟啊,再乘60这么多秒,再乘1000这么多毫秒,一天当中有这么多毫秒。我现在的这个时间,时间戳是不是就是有从1970年1月1号开始到现在为止的那个时间戳啊呃,多到毫秒数对不对,这个时间出这个到现在为止的毫秒总数啊,那么毫秒总数除以每一天有的毫秒数,这是不是就是到目前为止的天数啊,整数对吧,我们整数除法嘛,天数,所以那那就是今天的天数喽。
28:20
那今天的天数接下来是不是后边再加一就是明天的天数啊,第二天的天数对不对,那么明天的天数我接下来是不是在乘以这一串每一天的毫秒数,是不是就是第二天零点准时的那个时间戳,那个毫秒为单位的时间戳啊啊,所以是这样的一个计算过程啊。当然在实际应用的时候,大家可能会考,考虑到当前这个时间,它是以什么为单呃为基准的呢?是那个UTC伦敦时间对吧?UUTC标准时间,所以我们如果这么算的话,算下来这个零点,这是哪的零点啊,伦敦时间的零点对不对?所以就相当于说我们北京时间每天早上的八点钟,我们不是都挖区嘛,所以如果说我这里边要要是我们当前北京时间零点的话,怎么办再减掉。
29:12
八乘以,哎,后面是不是60乘以,60乘以1000啊,对吧,那就做这样的一个定义啊,注册这样一个定时器,那当然了,下面是不是timer service,呃,然后register当前是processing time,呃,Timer对吧?然后注册这样的一个,以这个TS注册一个,这样就完事了啊,但是你想注册那个even time timer也行,但是even time timer它的推进要靠那个,要靠数据来出发,对不对,数据的那个时间戳增长了才能触发啊啊大家知道我选这个晚上零点的这个时间就是因为数据少嘛,对吧,那那会儿可能没什么点击量,没什么数据啊啊然后接下来大家想到既然有这个定时器,是不是后面就必须得有一个on timer啊,那这个定时器触发的时候怎么办?是不是直接清空所有状态啊,对吧。
30:03
所有状态,所以我们当前的状态就是count state clear,另外还有一个iscent state clear,第二天重新来过就可以了,对吧?好,这就是完整的流程。好,那接下来我们运行一下测试一下,看看效果怎么样。好,大家看到这里面报错了,这个报错主要还是因为大家看这个我们类型定义的时候有问题,对吧,就当前我们定义那个类型auto tag类型的时候读不出来对不对?诶这个原因是啥?是不是还是我们少了那个花括号啊,所以这里边大家看这里是不是要加一个花括号啊,然后另外其实我们这个代码还有问题,就是就是如果说我们做特殊报警的话,这里边是不是看不到信息啊。哎,所以这里边我们其实可以加一步,就是基于大家看这里边得到的这个是filter ad click stream,那么基于它是不是可以做一个get side output,然后我这里边应该要把这个out是不是要。
31:06
要写在这儿啊,对吧,我把这个直接放过来。但是大家也可以定义成全局的啊,我这里面直接把它拿过来,然后print一下当前这个是,呃,就是blackli black list user对吧。这就是我们最后测输出流,也可以有对应的那个报警信息的输出了啊,我们再来运行一下。大家看一下当前运行的结果,诶大家看现在这个北京这里边是不是统计就直接给拦到100了。诶,那说明是不是之前有问题啊,我们看一下有没有那个测出流报警信息,诶大家看果然有一个当前测出流里边统计USER937166这个用户,然后点这个1715这个A广告超过了100次,那我们看一下之前的那个数据里边是不是这个样子呢。
32:02
看一下哦,大家看果然是不是就是这个用户点这个1715,这里边在不停的不停的点在刷这个点击量啊啊,所以我们就把它。把它直接筛选出来了,对吧?啊,所以这就是我们当前这个做黑名单广告点击量,黑名单报警的一个具体的实现。
我来说两句