00:00
接下来下一个模块是叫做恶意登录监控啊,这一部分大家可能知道,对于一个网站而言,不光是电商网站啊啊,就是用户的登录啊,当然了还包括其他的一些,像后面我们要讲到的这个支付对吧?啊,订单这些动作其实并不是很频繁的,业务需求,业务的操作,如果说像登录这种行为,一个用户在短时间内大量的登录失败。那我们就会发现这其实有问题的,对吧?啊,这可能很可能就是在被人暴力破解密码,很可能有黑客在攻击,所以对于我们的系统而言。出于风险控制的角度考虑,出于安全性的考虑,是不是应该非常实时的把这种异常的行为检测出来,做一个报警提示啊,所以这是我们接下来要实现的这一部分,就是恶意登录监控的一个模块。呃,比方说我们现在的这个需求啊,需求就是说我们简单一点啊,就是在两秒钟之内,用户连续两次登录失败就报警啊,但是这个大家感觉好像不太符合常这个这个平常好像有点就是你两次登录失败就直接报警,好像是有一点太严格了,对吧?啊但是有时候有些网站其实也会的,就是你连续很短时间内很快输入,输入错误的话,它就会弹一个提示啊,这个其实是也是比较正常的一个状态,那我们想这个东西用什么去实现呢?接下来这个模块会给大家介绍两种不同的实现方法,一种是常规的状态编程啊,那大家就会想到我们应该,呃,如果用到定时器的话,那还应该就是直接用这个process方对吧?呃,在里边定义状态,保存什么样的东西,然后做各种各样的逻辑处理,在某个时间点可能要去触发什么操作,把这些都定义好就可以实现这个功能。
01:54
另外一种方法是啊,就是用比较高级的API或者库来实现这个需求,这个库叫做CP啊,就是flink里边给我们提供了一个CP库,实现事件流的一个模式匹配,呃,类似于模式匹配的一种状态啊,我们首先在这个模块里边就得去加入这样的一个相关的依赖,对不对啊,所以这里边我们先去创建一个新的模块,然后把这个依赖先加进去。
02:29
当前的这个模块就叫做law detect检测。我们把刚才需要引入的dependency先放在这里。需要把这个flink cep。引入,当然我们现在第一第一次要实现的这一部分代码呢,并不涉及到cep,我们先来做一个简单的。状态编程,当然我们的源码还是放在SW下边的scla里边。
03:07
啊,另外大家会想到resources里边应该得有数据源,对不对啊,我们这里边用的是什么数据源呢?啊,同样给大家提供了一份数据啊,叫做。Logging log对吧,大家可以看到这里边有很多这个用户logging的这样的一些行为,正常来讲是不是这个东西也应该从买点日志里边去提取,所以把它存复制到这个目录下边来,大家大概的看一眼,这里面包含了什么信息呢?这个简单的信息,其实大家会想到,既然是用户的登录行为嘛,那这里边对显然应该有一个用户ID啊,User ID,然后后边对有一个他登录的IP啊,这个是我们可以获取到的对吧?然后接下来哦,还有一个登录到底成功还是失败,这样一个状态,Success还是feel对吧?最后还有一个时间戳,都是大家能够想到的一些基本的一些数据啊,然后我们就可以在tla下边去创建自己的object,加上包名,这个叫做at硅谷点login feel detect。
04:29
类名就叫logging trip,好,然后第一步大家还是想到,既然是这样的一个数据类型,我们是不是先定义好这个输入的登录事件数据类型?靓丽类对吧?所以case class logging event,刚才我们也已经看到了,先要有一个user ID对吧,这个是一个浪类型,然后后边是IP string,后面还有成功还是失败,对吧?我们这个叫even type吧,一个string,最后还有一个时间戳啊,Even time或者我们叫time step都可以,这是一个long类型,有输入对应想到有输出对吧,我们还是提前定义好吧,输出应该是一个报警信息,对不对,输出的。
05:31
呃,异常报警信息样例类。K plus定义一个叫做warning的样音乐,呃,这里边可能我们关心的就是什么呢?首先是不是哪一个用户登录异常了,这个要知道有点,然后这个我们浪类型啊,然后我们关心的可能是因为我们是定义好超过某个线,超过呃这个两次我们就直接就报警了,那所以我们可能更关心的是在多长的时间范围内,然后它的登录失败,这个超过了两次对不对啊,所以这里面我们可以定义一个first few type。
06:20
呃,这里边我们可以定义一个long,大家也可以把它转成一个string,对吧?啊,这个就是都一样,我们就这里就不详细讲了,Blast。Type,这也是一个了。最后再给出一个warning messages,一个字符串的输出信息,这是我们预先定义好的内容。接下来就在这个代码里边去做一个实现啊,这个过程还是很类似啊,这个我就直接copy吧,这个大家觉得应该不需要再详细讲了吧,我们从前面已经实现过的一个例子里边,把这些先写进来。
07:04
环境先创建出来。当然这里边大家注意把这个影视转换的这一部分引入,然后把这个时间语义要用这个英文态,后边的话,那同样还是resource get class get resource。呃,这里边我们要传入的,诶,当前。数据文件的对应的这个位置,然后接下来这个是读取数据对吧,读取事件数据。接下来我们就可以去得到一个logging event stream,当然是env file,对吧?呃,当然这里边我们需要用那个resource.get pass把它传进去。
08:09
读进来之后,常规操作是不是还需要做一个map呀?呃,这个没办法啊,只能是重新再做一个实现data RA,同样这里边也是一个CSV文件,那是不是可以去按照逗号去做一个分割,然后包装成logging event,呃,里边user ID是0tri.long对吧,图浪,然后后边的那个IP是一诶一。直接直接tri,它本身就是一个stream,对不对啊,然后接下来后边的这个类型本身也是string,所以直接tri就可以,最后的时间戳是一个long类型,所以我们要把它做一个to long的操作,把它写完就可以了,然后接下来诶常规步骤大家记得接下来是不是该做什么了,是不是该做那个分配时间戳了啊,这个我们要根据当前的数据来看,那我们看一眼这个当前的数据是什么样子呢?
09:25
看看是不是升序的,升序我们就简单了,对吧,1582633A32,这是不是有可能有乱序啊啊,所以这个过程当中,我们如果要想正确处理的话,哎,可以去设置一个walmark的一个延迟,对吧?啊,假如说对实时性要求不那么高的话,那我们就直接给一个比方说,大家看到可能这个最大延迟是几秒钟啊。好,可能会有这个五六秒钟几秒钟的延迟,对不对,所以这里边我可以直接给一个。
10:05
这个时候是乱序数据就得用asign time STEM and water marks去传了,你有一个对最简单的实现bonded的out of orderness timetime,这里面的时间戳提取就从element.even time里面提取,对不对啊,这里边它需要去是不是要乘以1000啊,因为它本来是秒。然后另外这里边的延迟我们可以给一个啊,这个就我们看可能是五秒对吧,大家自己看那个数据啊,按照我们的这个需求来去给一个就可以,比方说time点五秒啊,给一个SECOND5,诶,这就得到我们基本的logging stream了,然后接下来大家会想到我们是不是正常来讲的话,得去对它去进行处理啊,对吧,处理之后是不是应该要得到一个一个warning的一个输出结果啊啊,所以这里边我们就直接定义一个warning stream。
11:11
呃,那么直接用这个logging even stream,接下来我们是要干什么事情呢?是做状态编程,呃,那当然了,就是我们应该是直接process对吧?那process的时候我们还得考虑,你是考虑所有的数据呢,还是先把数据分组之后,因为大家知道分组之后有一个好处,就是说我们那个状态link自然就给我们隔隔离开了,分别管理了,对不对,不需要再去考虑,哎,不同的那个P是什么样的情况,那这里边我们需要给他按照某个东西去做分组吗?哎,这里大家注意这里边所有的数据里边,我们关心的是什么呢?是所有人放在一起考虑,他是连续失败了,还是说我只考虑某一个人,他是他是失败了还是成功了,是不是每一个人啊,诶这里大家注意,这里边有人还有IP,那我们是主要考虑什么呢?是以IP为准还是以人为准呢?哎,有同学说是用这个用户的IP为准,有些同学说用这个,诶对,这个就看具体情况了,对吧?啊有些时候可能我们拿不到那个user ID,呃,就是只拿到这个这个IP的话,那可能你只能用IP。
12:33
但是如果要是登录行为的话,正常是不是肯定他是要去测某一个是某一个用户的登录啊,正常应该肯定这个数据是能拿到的啊,另外就是有一些人去做攻击的时候,是不是他应该是同一个用户去登录,但是在不停的换IP啊,呃,你这种情况他其实也是攻击的一种行为,所以我们用用这个IP去做测试,呃,去做这个分组,其实是那不是一个很好的选择,我们应该用对user ID用户ID。
13:06
去做一个分组。所以以用户ID做分组,然后直接放大招,是不是可以点process啊,对吧?在这里边就是只要是当前用户的所有的行为都来了,都到这一组里边,我就判断就可以了,如果是连续的登录失败,是不是就应该报警啊,而且在两秒钟之内对不对?所以大家看这个需求是不是跟我们之前在理论那一部分讲的温度连续上升,其实那个有点有点接近对不对啊,实现的这个过程是有点接近的啊,所以这个我们先做一个简单实现,你有一个logging warning这样的一个process function,但是这里边我们可以传一个参数啊,就是。就是我们说的,你到底是要要求对吧,是登录失败几次就算,那么这里边我们要求是登录失败两次就算,就把这个二传进去,那最后可以把它做一个print输出。
14:15
我们还要把这个执行起来对不对?呃,这里边给一个叫log fair detect job,所以接下来我们的核心任务还是去做这个process function,呃,大家看就是flink本身的这种编程的特点啊,如果要是你那个比较简单的一些需求的话,大家做一些统计指标的话,那其实你用dance stream也好,对吧,你开窗去聚合也好,或者用这个table API也好,或者是用这个直接写CQ也好,其实都很简单,而且都大同小异,都很类似,但是如果要是比较复杂一点的需求,那可能很多情况就需要用到process function这样的底层API了。呃,这里我们定义的这个应该叫做它的呃,Max feel times对吧,最多能能这个失败几次。
15:13
超过这个限额就就报警了,对不对,他要去实现一个什么接口呢?Process function啊,它是做了key外之后的,所以是一个key的process方式,这里边它的状态是不是按照key要做隔离啊,对吧,就各自管理各自的,这个就非趁,然后这里边要传的对k iok是user ID应该是long类型,对我们是用下划线这个选举出来的,所以直接给long就可以,那么输入是一个对样利类logging event,输出是一个也是样利类包装的,是不是叫warning啊,对吧?所以这个过程其实非常简单啊,因为我们在前面已经把这个样利类都定义好了,所以大家看这个过程其实非常清晰,就是直接一个点process,那么这个里边是不是就是啊,定义好的key是浪类型的字段uz ID,然后输入是什么,输出是什么,一目了然,对吧。
16:13
对吧?好,上面已经不报错了,我们接下来是要实现它的process element方法,那么既然是状态编程,是不是在前面还是先得去定义很多状态啊,所以大家想一想,我们要定义什么状态呢?我们要定义的状态是不是就得保存上一次的那个,哎,对,上一次登录失败的那个状态对不对?呃,我们得达到它,然后就可以,其实这个最简单的一个实现是什么?我就来一个登录失败就塞到这个一个例子里边,是不是可以这么做啊,来一个就塞到list里边。然后我什么时候去判断。
17:01
一个简单的想法,因为我们还有一个要求是限定两秒钟之内呢,对不对?哎,那么如果要做判断,我是不是可以注册一个定时器啊,第一次登录失败的时候,注册一个定时器,然后后面来一个就塞一个,来一个就塞一个,等到两秒钟的时候触发,对触发定时器判断里边到底有几个登录失败,如果要是比两个大,那是不是就输出报警啊,如果比两个小啊,那就不输出报警,直接清空对不对,在中间过程当中。如果要是说。有这个,呃,大家会想到,如果要是有这个登录成功的数据来了怎么办?那是不是就应该清空状态啊,重新来对不对啊,所以这个其实实现很简单啊,这是一个简单实现,所以我们先定义状态,保存两秒内的所有失败事件,登录失败事件,所以现在我们定义的。
18:07
呃,如果大家要觉得我们这个,呃,这个情形下面只考虑两次登录失败比较简单的话,那是不是其实是可以就是。就是我直接就保存上一次的一个登录失败事件就可以,对不对,但是我们这里边把它提取出来了,因为你这个数量是不是传进来的呀,啊,所以这里边你那样去实现可能就不太好,我们为了更加的通用,这里边我们要用一个list来把它保存起来,所以这里定义的是一个logging,呃,Fair state,它其实是一个list state,它的类型应该是央离类,对吧,Logging。输入进来。然后再去获取它的这个状态句柄的时候,还是要new一个list state script。
19:05
诶,大家看这里哦,这个这个我们应该首先是那个上下文对吧,首先得去get wrong time,然后get list state,然后再去new一个list state script,诶这样里边我们传进来它的那个名称,这里边我们就叫logging feel state,对吧,然后还应该得有它的类型class of logging event,把这个传进来就可以了。好,然后接下来我们就得看了,每来一个事件,接下来怎么处理呢?Process element是来一个事件处理一次吗?那这里边我们首先是不是要判断判断类型是否是feel?是不是应该只添加field,添加到我们这里边来啊,对吧。
20:03
事件到状态,哎,那大家想一下,有同学可能就说了说了,那你到这里才来这个判断干什么呢?我之之前你这个状态是什么,我一个filter把它搞定不就完了吗。这个逻辑上对不对。这里大家要注意那含含义就是言下之意就是说如果我们这里中间要有success的话,对我们的逻辑有没有影响呢?其实是有影响的,为什么?对,如果要是我两秒钟之内登录失败了两次,但是中间有一次成功了,那你说这个还还叫报警的情况吗?那人家已经登录成功了,那说明这是知道密码对吧?呃,当然也有可能是已经试出来了,但但但这个不重要啊,就是我们正常理解的话,他既然登录成功,那应该是不应该报警的,应该把这个状态清空对不对?所以登录成功也是有相对应的操作的,所以这里边不能直接在外面把它全部filter掉,所以这里边我们就if value.event不是type time啊,是type,如果是feel的话,接下来干什么,是不是就应该把我们当前的这个Y。
21:24
要添加到logging field state里边状态里面来啊,对吧,如果是feel的话,直接添加进去,另外注意还得做一个什么操作,他一次是否有对,当然这里边大家会还会想到,就是说需要判断它是否是第一次,哎,那那大家想如果不是第一次的话,我可以怎么样。哎,如果不是第一次我怎么办呢?如果是第一次怎么办呢?是第一次,第一次,对,如果是第一次的话,是不是应该要创建一个定时器啊,如果不是第一次的话,是不是就不用再创建定时器了,哎,所以这里边其实是,呃,就是这样的一个一个。
22:16
就是在我们ADD之前,是不是应该要去判断一下当前里边是不是有值啊,对不对啊,所以这里边其实是我们应该在外边把这个状态先拿出来,对不对,定义一个logging feel state。呃,就就logging feel list,它应该是什么呢?从状态里边是不是要把对应的这个东西拿到啊,呃,当然了,然后接下来我们是不是得判断这个里边到底有没有东西啊。
23:00
啊,所以这个东西是不是我相当于得拿到他的一个。大家可能会想到是不是拿他的size啊,诶,但是这里边好像拿不到他的size,他因为这里拿到是一个就是一个可迭代的类型,对不对啊,那这里边能拿到size吗?也拿不到,所以是不是相当于是得遍利这个所有的数据,才能够知道这里边到底有多少个多少个数据啊啊,所以我们可能一般不用这种方式去拿它的这个数据个数,而是怎么样只要看他。呃,这个feel list里边是不是只要看它。Has next是不是就可以了对吧?呃,那当然就是如果他没有的话。如果有的话,是不是比较正常,我们就不用做别的操作,如果没有的话,这里是不是就应该去注册一个定时器啊对,所以这里边怎么注册定时器呢?CTx.time service点呃,当然这里边我去注册的是什么even timer对吧?那这里边的时间戳给什么呢?
24:25
加两秒秒,是不是应该是两秒钟之后出发,那是当前的那个时间戳,再加两秒钟对吧,所以是value点的type。呃,不是type啊,Time,然后乘以1000,注意这才是毫秒数,然后再加上2000,对吧?因为定义定时器的时候,这个时间戳是不是必须是一个毫秒啊,所以我们要按照这种方式把它定义出来,然后定义好了之后,哎,不管到底有没有当前,只要是feel,是不是我们就应该把它添加到这个列表里边来啊,那假如说假如说不是费用呢?
25:12
Else的话,这是不是就是如果是成功对吧,如果是成功,那么是不是直接清空状态。Logging film state.clear大家可以看到是这样的一个过程,对吧?然后接下来这是我们这个每一个元素来了之后的这个处理的过程,那呃,大家会想到那个接下来是还得做什么事情呢?是不是得制定那个定时器触发的时候再操作啊,对吧?所以这里边我们要定义一个触发操作,触发定时器的时候。
26:00
根据是不是要根据状态里的失败个数个数决定是否输出报警,对吧,所以是这样的一个过程,然后大家想一下,那首先是不是我得知道里边到底有多少个啊,那到底有多少个,大家发现这下没办法去取巧了,是不是只有。对对,只有便利了,只有把那个所有的东西都拿出来,然后或者说把所有的这个,呃,这个保存到一个,比方说list buffer里边,对不对啊,那这里边我们就还用这个保存到list buffer的这个处理过程吧,比方说叫log in fields or logging fields,它是一个list buffer。对吧。你有一个Li buffer,然后接下来我们是不是可以先定义一个这个迭代器,它从哪里去拿,是不是它的get,然后点interator,这就是一个迭代器啊,已经拿到它的话,是不是,如果对。
27:17
Pass next的话,我们是不是就直接把它保存到all logging fields里边去啊,哎,这跟我们之前的那个做法是一样的,对吧。我们之前在做排序的时候,是不是也要先把它便利先提取出来啊,现在想要知道它的这个具体的每呃,就是大小,其实也是做一个这样的一个操作,那接下来是不是就要判断。判断个数,所以我们if什么?是不是all long fields它的啊,Length对吧?啊,这里它是list buffer,它有一个length,如果它大于等于。
28:04
我们定义好的那个max f t的话,超过我们定义好的那个失败上限的话,是不是就应该输出报警信息啊。对吧,主流里边输出这里边我们是包装成一个warning,首先是这里要的是那个user ID user ID怎么拿。这里边所有数据是不是都是一样的ID啊,啊根据这个做的KBY嘛,对吧,但这里边大家会看到,那么K从哪里去拿呢?能不能拿到K呢?啊其实也是能拿到的,怎么样呢?是不是ctx可以get current k啊。大家看上下文里边随时都能够所有的东西都能拿到对不对,或者有些同学说,哎,那你直接这里边都已经保存到状态中了嘛啊,那我这里是不是也可以随便拿出一个来,对吧,是不是也可以啊,那既然它这个长度是大于这个max few times了,那肯定是不是第一个存在啊,点had拿出第一个来,他的user ID是不是没没问题,然后接下来后边是first few time,那是不是也是直接拿第一个啊,Head点。
29:21
太。另外last few time,那这个我们能拿had就能拿last,对不对啊,这里边even time传进去,最后再给一个报警信息,比方说这个报警信息我们叫呃,Logging in2秒钟对吧,Second,呃,一共几次呢?我们这里边。是不是要输出他的这个LS啊,他在这里边真的已经这个爆就是fail了这么多次对吧,两秒钟之内这么多次times啊,这是我们要给的一个输出的结果,那另外大家注意这里边我们到时间了,触发这个这个呃,定时器已经触发完了之后,输出判断输出了这个报警信息了,最后是不是应该把对状态都清空,清空状态。
30:23
Logging feel data clear啊,这就是我们完整的一个处理流程啊,把这个过程就已经存到了这里面,好,接下来我们来运行一下,看看它的结果怎么样。看到已经得到了输出结果,这里边输出了一条报警信息,1035这个用户他在842和844这两秒钟之内。啊,连续登录失败了三次对吧,呃,我们看一下这个数据里边是不是这个样子,1035,诶大家看到了对吧。
31:00
对,是不是就是这连续的三次fail啊,对吧,呃,424344连续的这三次fail,导致他这里边就输出了这样的一个结果啊,那么大家看一看,就是这样的一个简单实现,它有没有什么问题呢?就是如果接单次成功,他就这。诶对,大家会想到,如果说这里边中间出现那个,就比方说我们改一下这个数据啊,啊最后一次是吗?我们先中间插一次成功啊,Success给一次成功,那大家会想到这样的输出结果应该是什么,重新跑一下,看到这样的一个效果,是不是同样有一次报警,只不过这里就变成了四三到四四之间是不是有两次。这个呃,就是log in file对吧,这里边做了一次报警,那大家可能会想到,那我要不是在这里的这样的一个success呢?啊,有同学说我把最下边改成success是吗?那最下面改成success会出现一个什么情况?
32:10
大家想一下这会出现什么情况,14对吧。按照我们的逻辑,他是怎么做的呢?是不是好,我们来看一下吧,口说无凭啊。大家可以看到现在的话,它里边就没有任何的输出了。逻辑应该是,呃,就是至少我们这么实现的就肯定没有bug对吧?啊,就是说逻辑上的bug是另外的事情,就至少它是正常能运行的,刚才我们数据如果对的话,肯定是有输出的,但这里面大家看,如果我们把这个success放在后边的话,它直接前面这两个也检测不到了,为什么呢?因为我们的要求是不是定了一个定时器两秒钟之后才出发,在这个之间是不是来的数据它继续会处理啊,一旦来了一个success是不是直接就清空了,所以相当于这里边就根本就没有检测到,最后出发的时候就没有检测到我们前面的那个F的情况,对不对?
33:20
所以大家发现这种实现其实还是有问题的,对不对,而且大家会知道这是在后面有success的情况下,那假如说我们就是,呃,就是都是费,那有一种是什么情况呢?有可能在这两秒钟之内发生了很多很多费,对吧?它是机器再去测,那两秒钟之内可能根本就不仅仅是两三次的问题了,有可能一秒钟之内就成百上千次费用就来了,那你说这个时候如果我们不能及时的把它检测出来,你非得等到两秒钟之后再去给它做处理的话,那说不准那个时候密码都已经是碰出来了,对吧?呃,这个都完全有可能的,所以大家发现我们其实更好的一种处理方式是什么呢?不应该等到两秒钟之后再去做判断,对不对,是不是就应该是对,一旦判断这个次数达到这个标准了,就应该。
34:17
是是不是就应该直接把它做一个报警输出啊,应该更实时一点啊,所以这其实从这个角度考虑的话,我们这个代码是有改进的空间的啊,下节课我们就可以在基于这个基础上对它进行一些改进。
我来说两句