00:00
大家就会发现,其实我们不光主流得输出这个统计结果,在这个过程当中,是不是还得输出一个黑名单啊,黑名单报警信息这个东西到哪里去输出呢?哎,我们自然就会想到在这个过滤的过程当中,既然定义了process function,是不是还可以输出测输出流啊,哎,可以,我们可以定一个测输出流,包装成这样的一个样域样例类,把它输出出来,对吧?然后我们只要去监测这个测输出流,就可以拿到当前的黑名单去做对应的处理了啊,所以这是我们的一个想法,那既然有测输出流,所以是不是大家记得那个怎么样拿到那个这个特殊物流吗?是不是得定义一个outbook tab呀,所以我们在为了方便统一使用啊,在在这个外边我们统一做一个定义,比方说这里边定义一个。Black list output tag,它是一个output tag啊,那么它的类型应该就是,是不是就是black list warning啊,那当然这里边我们又出来T给一个名字就叫blackist对吧,这样把它定义出来就可以了,定义侧输出流的tag对吧,标签。
01:29
呃,那之后我们是不是就可以统一用这个东西了,后边如果我们要去做这个呃输出的时候,这里大家要注意前面我们做了filter了,这里是不是做分组的时候,开窗聚合的时候就应该直接用对filter black list stream的输出结果啊,这里面主流是不是应该输出的是没有被呃,就是过滤之后留下来的那些数据还是正常输出啊,那数据输出的数据类型是不是还应该是我们那个原始的?
02:01
A click event这样的类型啊,所以这个过程当中,相当于我们这个process,就是一个相当于复杂版的filter,对不对啊,相当于做了一个这样的一个操作啊好。当然了,下边我们感兴趣的可能这个ad count,这里边要做一个输出,这是count,然后是不是还应该有一个测输出流,拿到它的输出啊,那可能是tu black list stream,可以获取什么get啊,当然这里边因为前面报错了,我们拿不到它的那个定义啊,Get a side output,对不对?可以直接拿到船进来那个标签,然后就直接拿到可以去P了。呃,这里边是呃,Black list。把它输出啊,这是我们改进后的现在的这个情况,那接下来我们最关键的其实就是去实现这个自定义的process方式,这个process方式我们为了能够一起用这个tag,那因为这个tag我们是定义在这个object里边了,所以我就把这个process方式是不是可以直接定义在这里啊,也放在object里边,对吧?呃,到时候那个那个呃,Tag就可以一起去使用了,这里定义class filter,呃,那么这个I其实对于我们而言是一个上限,对吧?Max count最大能够允许的这个计数的个数,能够点击的个数,然后他需要去实现一个什么接口呢?
03:38
对,它是一个key的process方式,这里边它的类型是KIO,大家看到了啊,KIO的话K是什么。KBY之后的这个K对不对啊,所以这里边我们写成这个元组类型就方便一点,要不然还得是抓temple对吧,这里边。
04:01
这里面是什么,就是一个两个long类型对不对,一个user ID,一个一个ad对广告的ID,然后另外输出呢。输出是不是还是我们的那个基本的样例类啊,Ad click event对吧?然后最后还应该有一个,诶这里边是输入输出对吧,IO对吧,KIO,所以输入是ad click event,输出是不是也是ad click event啊,就是这样的一个东西,那大家看实现之后是不是我们这里边就已经都不报错了,接下来就把它做一个实现就可以,那首先在这个过程当中,大家知道先要我们是状态编程吧,是不是先要定义状态啊,这里面我们涉及到哪几个状态呢?呃,其实有好几个我们,呃首先啊,这个定义状态首先要保存什么呢?首先是不是得定义一个当前用户对当前广告到底点击了多少次,这个count是一定要保存的,对不对,保存。
05:12
当前用户对当前广告的点击量啊,所以我们lazy用这种方式就不用生命周期了,对吧,就不用在open里边去创建了。直接定义一个count state,大家想想它是个什么类型呢?大家觉得还是一个list state,还是一个map state,还是一个什么样的类型?大家注意当前是不是已经对user ID和aidd做过k buy了,做过k buy的话,那是不是当前已经确定就是这个用户和对这个广告的一个数据啊,所以接下来进入到我们当前处理的过程当中,是不是只要有一个状态就可以对,那是不是其实这就一个值就够了,对吧?哎,所以是一个value state就够了,大家注意啊,就如果前面我们没有针对它去做KY的话,那是不是相当于不同的不同的用户,不同的广告,我们得按照那个KY的方式去保存啊,所以大家看前面的这个KY。
06:21
有点像就是前面已经存了K了,对吧,所以后面我们这里直接存一个value就可以了,相当于从更外层把它保存成一个k value这样一个map,好,这里把它要引入啊。那呃,里边的数据类型是什么呢?Count,那当然就是浪了,对吧?然后定义的过程上下文get wrong time contacts,呃,继续大家value state的话就直接get state,里边要去new一个value state script,呃,当然里边要传一个对名字concept,然后给一个class of law,对吧?这样就可以把它定义出来。
07:08
然后我们还得定义什么呢?诶这里边可能就涉及到具体的一些操作了,大家会想到在这个过程当中是不是呃,就是首先我们要去把所有的这个广告点击量要保存下来,然后来了一个新的点击的时候,判断是否超过了那个上限,对吧?呃,如果超过上限的话,就得把它呃相当于就过滤掉,然后输出到那个对应的呃黑名单里面去,让大家想到有些用户如果有这种刷单行为,或者是呃,就是相当于用了一个一个代码程序自己在那点击的话,那是不是很快就会超过我们定义的那个上限啊,然后它是不是还会有大量的那个数据进来。大家想我们到那个时候去输出那个黑名单的时候,是每次都得输出黑名单吗?是不是其实黑名单输出一次就够了,后边只要判断把它那个行为过滤就可以了,对吧,后边就不需要再去测输出流,再去输出一遍,诶这个用户黑名单,那所以这里边我们还得还得有一个什么状态去。
08:20
去处理这件事情呢,是不是得保存一个。是不是得保存一个到底是否已经发送过他的这个黑名单状态啊,哎,所以这里边我们定义一个状态保存是否发送过黑名单的。状态啊,所以这是个标记位对吧?Lazy啊,这个我们就叫is好了。Is sa blacklist。呃,这里边它应该还是一个value类型,大家能想到吧,里边就是对里边是一个bully,同样get runtime contacts get state对吧?呃,这里边你有一个value state script,那里边就要去定义,比方说这个is sa state of。
09:28
布尔类型对吧,啊,这是另外的一个一个状态,然后接下来还需要用什么呢。哎,大家可能会想到我们,我们还定了一个需求是每到每天的二十四点零点的时候,是不是要清空之前的状态啊,那也就是说,呃,我们是不是得定义一个。定义一个定时器触发的时间啊,啊,那大家会想到就是这个时间在我们的清理状态的过程当中,有可能要去判断是否定义了这个定时器,或者在某些特定的情况下,还有可能会清除定时器,对吧?所以是不是这个时间戳也得保存起来啊,哎,所以这里边还得去保存定时器触发的时间戳对吧?所以这里边lazy。
10:26
呃,这里边叫reset timer,就是重置我们那个状态的那个时间,同样它也是一个value state,里边的类型是时间戳,肯定就是浪了,Get runtime context get state,所以大家看里边的操作其实都一样,对吧?这个就是,哦,这里错了啊,State script。我们把它定义叫reset state class of了。
11:03
好,我们已经把想要用的这些状态都定义出来了,接下来是不是就得实现一个process element方法啊,就是每一条数据来了之后做什么操作,我们要定义出来,那具体是做什么操作呢?每一条数据来了之后,我们其实是不是就应该得判断到底之前已经存了多少,对不对,这count值到底是多少,然后啊,如果要是超过那个上限的话,那我们就把它,诶直接就就就就那个把它输出到那个黑名单里面去,当然中间还有很多逻辑判断啊,这里边不管怎么样,最开始第一步是不是先要拿出count的那个状态啊,对,取出count状态啊,这个我们把它保存在current count这里边怎么取呢?Count state是不是点value啊,Value state直接这么拿就可以,然后接下来大家注意这里边,如果这个拿出来是。
12:03
是零的话对吧,那是不是我们是需要去注册定时器的呀,就假如说一开始这是当前这个,呃,就是当前第一天这个这个用户他的第一次点击行为的话,那是不是相当于得注册一个定时器,然后到时候要把这个状态到第二天零点的时候把这个状态清空啊,所以第一次来的时候需要注册定时器,然后到时候呃,定时器触发的时候是把所有状态清空,这里边如果是第一次处理呃,注册定时器,所以我们的判断其实就可以直接看。直接看什么呢?看current count如果等于零的话,之前没有统计过对吧,所以现在哎,直接来创建这样一个定时器,那我们创建定时器应该是什么时间呢?大家想想。
13:09
第二天的零点对不对,对吧,这个定时器是每天零点出发啊,那这个这个怎么去计算啊。啊,大家可以看一看,我们当前能够拿到的时间是什么。能够拿到的是不是一个一个毫秒数啊,对吧?大家可能会想到我直接拿到当前的那个,呃,那个天数,然后直接加一不就完了吗?呃,但是现在我们直接能够拿到的是一个毫秒数,所以我们可以怎么怎么样做呢?首先我们先拿一下毫秒数啊,当前的毫秒数在哪里呢?是不是要从上下文里边,大家看是不是有time service啊,Time service可以拿到当前的current processing processing time对不对,当前的系统时间。
14:05
那么当然这个系统时间怎么样,就是变成了一个一个就是天的当当今天的一个状态呢,是不是可以直接除以。对,大家会想到我直接除以,首先除以1000,那是不是就变成秒了。然后再除以60是不是变成分钟啊,再除以60变成小时,再除以24是不是变成天啊,所以这就相当于一天当中有这么多毫秒。我把当前的。这个毫秒数除以这一个每天有的这个毫秒数,得到的是一个什么?是不是就是天数啊,这个天数就是从1970年1月1号到现在为止的那个天数,对不对?哎,这里边我们就不要把它变成这样的一个,呃,一个double类型,直接就把它当成一个int类型或者long,是不是直接得到就是一个整形啊,对吧,一个整数,然后是不是在它的基础上。
15:17
去。去加一是不是得到的就是。明天是从1970年开始数的第几天啊,我们真正注册的时候,是不是想要的是明天零点的那个时间戳,所以这个天数是不是还得再乘以对,是不是再乘以我们这里边定义的这这一串啊,对吧?是不是就是这样啊?所以大家想一想,哎,这个过程我们就实现了,这个每天零点触发这样一个一个系统时间定义好的一个定时操作啊,这样就实现了。然后接下来,当然既然已经定义了定时器,我们保存定时器的那个那个状态是不是就就得更新啊,啊,所以reset timer就得update,用当前的TS把这个要update出来,而这里边大家注意啊,这只是先定义了这个时间戳是什么,还没注册定时器呢,用什么注册呢?还是上下文的type service里边有register注册定时器,这里我们注册的是even time timer还是processing time timer,对,这里边是系统实现,所以是register processing time timer把TS传进去,哎,这是我们这个处理的过程。
16:41
然后接下来大家会想到,呃,除了这个判断一开始的这个状态去注册我们呃清空的这个定时器之外,还得去干什么呢?对核心的流程是不是得判断技术是否达到上限啊,对吧。
17:01
如果达到,则加入黑名单。所以这里边也很简单,直接去current count,如果要大于等于我们定义的max count的话,是不是就应该做一些操作了?呃,这里面注意,如果大于等于的话,我们是不是还得判断它到底是不是已经发送过黑名单没有,对吧?如果已经发送过,是不是不需要再发了?哎,所以这里边判断是否发送过黑名单,如果。呃,就是只发送一次对吧,我们的目标就是只发送一次,所以这里边判断的是if啊,那这里是不是我们应该把那个is sent blackist要拿出来啊,它的value。
18:02
如果不是真的话,如果它是它是false的话,那是不是说明没有发送过,没有发送过,那我们现在是不是就得发送一次,如果发送过是不是这一段逻辑就不用执行了,所以接下来我们发送,那首先这个先把这个is sent black list这个状态是不是应该制成true啊。对吧,哎处啊,然后接下来输出,输出的时候是怎么样发送黑名单是输出到输出到测输出流,所以这里边是ctx测输出流output,先给一个output tag,这是我们前面定义好的blackist output tag,第二个参数是value,我们要给的value是什么?是不是一个样一类啊,Blacklist warning包装好这里边的数据user ID message user ID就是value.user ID,然后是value.a did,最后一个字符串类型的。
19:12
报警信息对吧?啊,这里边比方说我们叫呃他click over。超过多少次呢?是不是超过我们这个max呀,对吧。呃呃,这么多次啊,Times,然后我们的是today对吧,今天超过了100次这样的一个输出结果。另外大家会想到,如果说这里边是真的已经大于等于max count的话,后边我们还要把它再做这个主流的输出吗?是不是就不要做主流输出了呀,如果是这样的话,我就应该直接把它return对吧。诶,这样做操作啊,当然我们也可以用那个if else那样的一些操作,对吧,这里边我就直接在这里把它return了,如果说。
20:07
这里边还比这个max com的小的话,那是不是这个过滤相当于没有生效,接下来是不是就应该正常输出结果啊,这里边要注意是不是还得把我们的那个计数要加一,所以我们当前的这个计数状态加一。输出。数据到主流啊,所以我们其实是count要去update一下,那么这里边是不是要用current count再加一把它更新啊,另外主流的输出是不是应该是out.C这里边给的是什么东西呢?是不是就是当前的啊。大家想想是不是这样?我们是不是输入的是value,它是一个ad click event输出,其实不用做什么操作,对吧,就是对只是过滤一下,如果他要是满足,只要没超过这个上限,那是不是原封不动把它输出到后边我们去做。
21:17
接下来做聚合,开窗聚合就完事了呀,那所以大家看这样就已经实现我们这个过程了,这就是我们做这个呃,Filter的一个过程,当然了我们还有一个一小块没有没有做完什么没做呢?对我们定时器是不是还那个触发操作没做呢,定时器我们触发的时候要干什么事情啊。对,其实很简单,定时器触发时清空状态,重新来重新进来过对吧?啊,当然这里面大家可以加一个判断,就是我可以判断。这个time STEM,就是是不是我当前这个触发的那个时间戳啊,定时器的那个时间戳对不对,如果说我当前的这个状态跟我本身存着的那个状态,就假如说我没有清空之前没有把这个状态清空过的话,那是不是现在就应该真正的把状态清空了,对吧?啊,这里边把这个is sent blacklist清空,还有什么状态count state。
22:24
清空,呃,另外我们这个reset timer。Value,呃,这个清空对吧?哎,就是这样的一个过程,所以大家看这一部分代码的话,主要就是大家发现在这个过程当中需要去自定义process function,去实现我们这样一个相对复杂一些的黑名单过滤的一个需求,这里给大家运行一下看一看。大家可以看到现在正常已经运行得到了结果,我们看到在这个过程当中,诶北京的那一条,它最多是统计到多少啊,哎,这是不是就是之前是121对吧,现在最多它是多少,是不是就到100就不再增长了呀,为什么?
23:13
到了四对,因为是不是已经到了我们的上限啊,当然我们这个上限是没有把它之前,就是大家可能会想到这100个里边是不是之前也有它刷出来的那个统计量啊啊,但是我们没有把之前的那个删掉,但是到100之后是不是就不再增长了,呃,另外我们还可以看到,呃,这个测殊输流应该有东西,对不对,既然它已经到了上限。哦,大家看测出测输出流在前面输出了,输出了一个什么,就是这个9371666这个用户,他点1715这个广告是不是超过了100次啊,今天超过了100次,那我们其实在数据里边也可以非常直观的看到,是不是就是他啊对吧,他在这里大量的刷屏去点,所以在这个过程当中,我们就把它真正的过滤出来,放到这个黑名单里边去了啊,这就是我们这一个指标需要去实现的一个任务。
我来说两句