00:00
整体流程已经定义好,接下来就是自定义实现这样一个process function了,啊,Key的process function式,好,那么我们这里边的这个class,呃,我们定义那个叫什么叫filter blacklist啊,Filter blacklist user result,然后它需要去实现的是一个kid process function,然后大家还记得里边要传的当前的这一个类型是KIO,对吧?啊,那这里边当前的K是什么呢?是一个二元组类型,大家还记得我们定义的那个UID和I did,那是两个长整型构成的一个二元组,然后接下来呢,还有输入和输出,大家注意一下,输入当然是样一类类型,Ad click clock,输出是什么呢?对,也是对吧,当前我们做的是filter,所以输出是原封不动的类型,对吧?
01:00
主流输出还是它这个定义好之后,正常来讲上面应该不报错,大家要检查一下,随时养成一种好习惯啊,那接下来我们本身要实现的啊,重写的一个方法,那就是process element了,我们知道就是每来一条数据都会调用到这里的这个process element做一个处理,然后在里边呢,我们因为用到了对应的状态,所以我们要先把状态定义出来,来定义状态,那首先我们是不是应该要保存点击量啊,对吧?用户对广告的点击量,哎,那所以这个既然是点击量,那就是一个count而已嘛,对吧?哎,所以我们就定义一个count state,它不就是一个value state吗?我们知道它应该是一个长整形,因为我们后面的那个count都是长整形嘛,哎,这里边还是按照我们。
02:00
译的那个过程啊,Get run runtime contacts,然后get state,你有一个value state script,然后里边同样还是当前的name,我把这个放大一点,当前的name给一个就叫做count。然后另外还有class,当前是长整性,诶这就是我们最基本的一个定义啊,肯定需要这样一个状态,然后这里大家注意诶,我这里边定义有问题,你如果要是直接这么定义的话,是不是会导致我们一开始这个类创建实例的时候拿不到上下文啊,对吧?哎,所以这里边要不你把它放到open生命周期里边去定义,要不我们这里边前面加一个lazy懒加载,延迟加载,那除了这一个状态,我们还需要什么呢?诶大家可能会想到还需要有一个就是诶我后面不是需要用到那个定时器吗?哎,那定时器假如说啊,当然这个例子里边我们好像没有用到更多的定时器,假如说我用到很多定时器的话,我当那个on timer啊那个方法出发的时候,我要去执行当前的那个清空操作,对吧,我怎么知道当前是到了那个零点钟要去做清空操作呢。
03:16
哎,那是不是一个简单方法是我可以去把当时的那个当时设置的定时器的那个时间戳保存成一个状态存起来啊,对吧,到时候我只要判断诶,只要是当前的时间,等于我当时保存的要去做清空,要去重置的那个时间的话,那我就直接做清空操作,如果不是的话,那就算了,对吧,那就是可能是别的操作,有可能我们别的行为定义了定时器啊,那所以这里边再给大家定义一个,呃,就是这里啊广告的点击量,另外还有一个呃,就是定,就是每天零点定时清空状态的时间戳,就是还要保存一个这个对吧?啊,同样还是lazy定义一个当前这个我叫做reset重置,重置的那个timer state。
04:16
T的TS啊,其实就是它的那个时间戳state,这里面定义一个value state,那大家知道这个是不是也是一个长整型啊时间戳嘛,呃,那后面其实都几乎完全一样,我直接copy过来好了。后面这里大家要注意的是名字千万不能相同对吧?诶当前是一个reset的TS啊,就是重置的一个时间戳类型,当然还是这个长整型啊,就注意要把这个要改过来就对了,然后另外还得有一个状态,还得有一个什么呢?因为大家想到假如说啊,我当前一个用户啊,大家知道我后边不是说如果他发生发现他点击这个行为太多,超过我们那个上限的话,我就把它直接输出到这个测输出流里面去嘛,哎,那在当前我定义的这个过程当中,你如果说他达到这个上限之后,如果说每次达到大家知道只要达到上限之后,接下来他可能不停还在那刷,对吧,那后面刷的是不是都达到上限了,那难道说就是同一个人达到上限之后,每次刷的这个行为都要再去给他这个测殊流里边去输出一次报警吗?其实不用,我们只输出一次就完了,对吧,那是不是就需要有一个状态保存这个。
05:32
用户到底有没有输出过报警信息,有没有进过黑名单对吧?哎,那所以接下来我们用什么来表示他是否已经进入过黑名单呢?一个布尔类型的value state是不是就可以搞定这个事儿啊,对吧?我们也把它保存成状态,大家注意啊,为什么要保存成状态呢?有同学说,哎,你定义一个本地变量不就完了吗?注意,如果是本地变量的话,那是不是就不是针对当前P,就是所有当前这个分区任务对吧?当前slo上执行的所有数据来了之后都可以调用到,都可以用到访问到我们这个变量,然后都可以更改它的值啊,对吧?那你这个出false就代表不了我当前这个用户的行为了,所以接下来我们要定义一个,还有一个状态啊,啊,这个是标记当前用户是否已经进入黑名单。
06:32
哎,那所以这里边我们重新定义一个,这个叫做呃,Is black is,呃,我我直接就叫做is black得了,对吧?Is black state value state里边这个类型应该是一个布尔类型,对吧?哎,这我还是把后面复制过来,但是要注意这里边的描述啊,描述器类型也要改成布尔类型,后边类型改过来,注意这里边的这个名称也要改,对吧?当前我们这个叫is black,好,这就是整体的这个处理,我们先把这个该用到的这个状态先定义好,好啊这稍微有点麻烦啊,要考虑到的逻辑比较多,那接下来就是我们具体的梳理流程了,来了数据之后,到底到底怎么做呢?诶,首先我们先把那个当前的状态先拿到对吧来,那比方说我这里边啊,首先我定义一个就是current count,当前。
07:32
的那个计数值到底是多少呢?我把con.value先拿到,然后接下来大家会发现啊,是不是我得判断要不要去注册一个定时器啊,对吧?哎,就是我们要清空的那个零点的那个定时器,什么时候注册呢?诶大家会想到这一个注册定时器,其实跟当前用户是否黑名单报警没关系,对吧,跟你那个count去判断也没关系,其实就是只要在今天范围内,这用户只要他那个数据来了,有状态了,那接下来我是不是就应该注册一个定时器,到时候要清空啊,对吧,因为你不累计嘛,即使比方说我们现在累计的那个上限是100,那你用户今天点这个点了十几次,那你不要说到明天的时候还累计,继续往后加,我肯定要清空,对吧,并不是说只有这个,呃,他做了这个报警,达到上限的时候我才清空,所以这里边其实没有任何的关系,只要是第一次,第一个数据来了,我就直接注册定时器,对吧,所以这里边。
08:32
判判断,只要是第一个数据来了,直接注册零点的清空状态定时器啊,那这里边当然就是一个一服判断了,哎,那什么叫做第一个数据来了呢?那你当前的这个count假如是零的话,肯定就是第一个数据嘛,对吧?正常来讲肯定后面我们要去加加一的嘛,所以如果说它等于零,我定义一个时间戳,这个时间戳就要是相当于是今天晚上的12点,今天晚上24点,或者说是明天的零点,对吧?哎,那这个时间戳怎么去拿到呢?大家想一下,哎,这个有点奇怪是吧,我要拿这个就是当当前,呃,有些同学可能说,哎,那你这个直接去我我用这个当前时间去去加一个,加一个24小时不就完了吗?是是这么。
09:32
他吗?不是对吧,你这个的话,那就相当于是来了一个数据,你让他过一天之后去出发了,而不是说等到今天晚上的24点去出发,诶那所以这里边就有一个小技巧,我们做一个什么操作,利用时间出戳去做一个,就是相当于做一个这个除法计算,大家看一下这个具体的操作啊,那首先这里边我要用拿到当前的时间,我拿什么时间,大家可能想到我现在不需要拿事件时间了,我直接拿处理时间就好,对不对,而我就判断当前的现在这个处理时间就够了,然后我当前的处理时间呢,不能直接往后面加,我做一个大家看啊,做一个除法操作,我要除什么呢?首先当前的处理时间大家知道啊,我拿到的这个是一个什么毫秒为单位的时间戳,对吧,也就是说它是相当于从这个1970年1月1号零点,然后就伦敦标准时间。
10:32
开始到现在为止的一个毫秒数,哎,那所以你想我如果要是直接除以1000是什么效果,那是不是变成了从1970年1月1号零点开始到现在的一个秒数对吧?哎,取整数的话肯定就是一个秒数,然后我再除以,呃,我这个里边乘就表示再除对吧?再除以大家想接下来除什么?再除以60是不是就表示从1970年1月1号零点到现在的分钟数对吧?哎,再除以60就是小时数,再除以24是不是就是到现在为止的天数啊,整天数对吧?刨除了零头之后的整天数。
11:19
那他想是不是相当于这个整天数是什么,这个时间点是什么。是不是就是今天零点的那个时间点啊。大家想想,整天数如果要转换成一个时间戳的话,那是不是就就是相当于是零点钟的那个准准的那个时间对吧?哎,这就是我们能想到的这个过程啊,然后你既然是要明天嘛,要明天零点的那个时间,那怎么办?哎,是不是对后面加一是不是就完事了呀,注意这是明天的那个天数,整天数如果要拿到时间戳的话,是不是整个再乘以啊,那那其实我可以直接copy前面,但是大家知道我可以现在整天数是不是再乘以24,这就是小时数对吧?再乘以60就是分钟数,再乘以60就是秒数,再乘以1000就是毫秒数,就是从1970年1月1号零点到明天零点的一个毫秒数,那是不是就是它的时间戳啊,哎,所以就以它作为当前的这个时间戳进行判决啊,这个稍微有点绕啊,大家。
12:28
可能要稍微的琢磨琢磨,看看这个到底是怎么做的啊,这里还有一个小细节,就是大家如果呃有同学下来之后,把这个做一个测试啊,你如果输出这个时间的话,可能会发现有问题,为什么呢?输出的这个时间发现每天我清空状态的时间是什么呢?是早上八点钟。诶啊,这个大家一想就想到了,既然是八点钟啊,那我就想到了,我们现在是东八区,北京时间东八区对吧?诶为什么是八点钟清空呢?因为你现在按照这个时间戳,这是不是都是按照伦敦那个标准时间定义的呀,对吧?是按照伦敦时间的毫秒数,所以你按照这个整除,然后再乘以,乘出来之后呢,是伦敦时间的零点去清空,那伦敦时间的零点我们这边东东边要早八个小时对吧?那当然就是说是我们这边早上八点钟了,你如果想做一个调整的话,那怎么做呢?啊对,这不就是后面你再去减去八个小时,八个小时乘以什么?呃,乘以这个六六十秒对吧?呃,六乘以60分,再乘以60秒,再乘以1000毫秒,再减去这个,这就是一个我们当前北京时间的每天的零点了啊,稍微有点麻烦啊,大家只要大概知道这个过程就可以,这是涉及到一些具体在。
13:47
做这个项目应用的过程当中,可能这个还是比较实际对吧,要不然你这个时间设的不对的话,可能这个清空的时间就不太对劲啊,然后这里面接下来我就需要把它首先保存到我们的状态里面,大家还记得对吧?我需要把这个定时器保存在我们那个定义好的重置时间时,呃,时间戳的那个状态里边啊,然后另外呢,注册定时器timer service当前,那我既然是零点准时清空,那直接注册一个处理时间定时期就完事了,这个不用那个事件时间了,对吧,我就到那个机器时间到零点直接出发就完事,这是前面我们的这个过程,然后接下来呢,其实也比较容易,那就是要判断接下来判断什么,判断countt值对吧,判断countt值是否已经达到定义的阈值,定义的阈值呃,如果超过。
14:47
错,呃,那么就输出到黑名单测输出流对吧?啊,那如果没有的话,那是不是就就不用做这个操作了,对吧?如果没到的话,就加一直接往后加就完事了嘛,这里边有一个阈值,那其实我们知道这个可以作为一个相当于我们的一个参数直接传进来对吧?啊,比方说我们这里边定义一个100,那其实在这儿的话,我们可以定义一个max count,比方说哎这哎这个还是没有换过来啊。
15:25
这里边我们定义一个长整形,或者说直接给一个int类型的这样一个count值,那我们要判断的就是跟传进来的这个值比,到底有没有超过对吧?哎,比方说我们现在的要求是current count,如果要是大于等于max count的话,那就输出报警,诶,但是大家注意要输出报警的时候,我们只输出一次对吧?啊,还要判断是否已经在黑名单里了,黑名单里了啊,没有的话才输出特殊数流,就是只输出一次嘛,所以再加一重一符判断,我们现在要求,如果我们之前的那个,哎,不是有一个is black state对吧?如果它这个点value取出来之后,大家知道默认这个里边应该都是false对吧?默认不是应该是。
16:25
零嘛,状态里边没有给初值应该是零对吧?啊,那那对于这个布尔类型当然就是false了,那对于这个false拿出来之后,那表示不在里边对吧?啊,那一开始应该进来之后,这个应该是处非取飞之后取反之后是触,所以一开始应该是不在里边的,那如果不在里边的话,我就需要把它在制成处对吧,下一次来了就不要再去再去做这个输出了,那另外呢,直接输出一次输出到侧输出流用什么?大家记得是ctx对点output对吧,里边要给一个当前的output tag,那这个output tag里边的类型和我们对应的那个名称的定义是不是必须跟这里边的这个一样啊对吧?你在外边要去取的时候,和我们这里边输出的时候定义的这个必须是一模一样,所以我们直接把这个copy过来写在这儿,然后后边呢,还需要定义什么数据输出到这儿呢?诶,那我们得定义一个black。
17:25
User warning自己包装成样另类对吧?啊,我们现在要当前的user ID,当前的a didd,诶那另外还有一个报警信息,比方说我直接就说这个,呃,当前诶,Click ad超过了多少次对吧?比方说我加上这个max count啊,Times today,输出这样一个一句话对吧,告诉你当前它超过这个上限了,这就是我们完整的一个黑黑名单啊,测试出流报警的一个过程,然后这里边还有一个问题是,呃,那假如说诶这里边我们这个大于等于啊,假如说我们已经做了这样一个报警的话,那接下来后边还要做那个countt加一吗?就不用了,对吧?诶,因为而且你要把它滤掉,不要再往后面去输出数据了,那当然这里边我们可以控制,就是你不要去做那个out.CLA的操作不就完了吗?或者还有一个非常简单粗暴的方法啊。
18:25
可以怎么样,只要他超过了这个上限,我直接return后边就什么都不不行了,对吧?哎,这个其实就非常简单,然后接下来呢,如果说没有在这里边直接return,那说明还没达到上限,没达到上限的话,那是正常情况,Count加一。然后将数据原样输出,就相当于直接放行对吧?啊,直接放到后面去做那个count统计窗口count统计就可以了,所以接下来我们做的是当前的这个update一下,用这个当前的值加一做一个update,然后呢,out.collect不是close啊,点collect直接输出原样的这个value输出就完事了,我们输出到主流的就是当前原样的这个ad click,呃,Log,对吧,就是原样的这个样例类类型输出,而输出到侧输出流的是我们定义好的这个black list user warning报警信息,所以大家要把这个要区分开啊,到底做了哪些操作,这个还没完,最后还有一个。
19:39
还有一个on timer对吧?这定时器触发的时候,哎,我们要去做一个清空操作,那我们这里边做一个判断,就是其实这里边不做判断也行,因为只有这一个地方注册定时器嘛,对吧?啊,那这里边我们判断的就是如果当前的触发的这个time sample等于我们定义好的那个重置的那个时间的话,这真的到了零点了啊,那这个时候去做一些清空back,呃,Is black做一个clear,另外还有一个countate去做一个clear,对吧?接下来就所有的都从头开始就可以了,好,这就是我们整个完整处理的一个流程,好好,那接下来我们把这个运行一下,大家再看看效果啊,之前大家记得那个北京的那个输出有120多,对吧,我们接下来看一看这个效果怎么样?
20:29
啊,大家看到这个输出的结果,诶,这里边大家会看到上面我们看一眼,你看这个北京这里边的这个这个输出好像就没有达到之前我们那个120多对吧,到100就截止了,就不再输出了啊,然后我们看一下有没有这个warning信息啊,诶我们这里边看不到这个汪信息。找一下前面我们看到有一个地方好像是。
21:01
诶哦,大家看这个是因为被被截掉了是吧,这个是我们前面有很多信息这里被截掉了啊呃,这个为了让大家完整的看到我们输出的那个报警信息,我们可以把这个稍微的时间调短一点,比方说这里边我们给一个一小时吧,对吧。再重新运行一下,看一下,接下来我们再重新运行一下,看看这个得到的结果是什么,好,现在我们看一下当前的这个输出结果,诶大家看到现在最上面有一个warning对吧?这里边black list user warning啊,这个937166,这个用户点1715这个广告超过了100次,确实是把它过滤出来了,对吧?而且做了一个测殊输流的输出报警,我们回到数据里边去看一眼的话,当时我们看到大量重复的这个点击,诶果然就是它,对吧?当前这个用户点这个1715广告有大量的重复点击,我们现在经过检测之后把它过滤出来了,而且后边的这个统计窗口,当前的那个输出的count值的时候,也把里边的内容就过滤掉了啊,那当然有的同学说,诶,这里边你还是有100次啊,就是说大家发现它过滤的是什么呢?是超过100次的那些count给过滤掉了那。
22:23
如果要是在100次之内的,它还是加到这里了,对吧?那这个有没有必要再去做一个减法呢?当然是可以做的,这也就是为什么我们看啊前面这里边做这个操作的时候,我们在前面这里啊,就直接超过之后就直接给他return了,不要再去加一,有些同学可能想到你这里边如果再去加一,其实也也可以对吧,只要不让它输出不就完了吗?呃,你直接之前到了100,然后你去你去加一,然后去呃,变成101,下一次还是大于等于max count嘛,这其实没什么影响,但是大家会想到你后边如果要去,呃,后边我们最后统计出来的结果去去直接减的时候可能就没那么方便,对吧?哎,我们现在的话,你就知道它里边每一个状态啊,肯定都是统计它多加了100,你直接在那个count里边减100不就完了吗?啊,所以这其实是比较简单的一种,呃,实现的一种方式。
我来说两句