00:00
接下来我们再讲下一个需求,就是所谓的恶意登录监控啊,这个需求其实也比较简单,整体来讲就是说我们在网站上啊,假如说用户的这个登录行为出现异常的话,那显然是我们需要对他做一些监控检测啊,然后做一个报警提示的啊,因为本身用户的登录行为涉及到用户的账户安全,这是一个非常重要的一个行为,那对于我们检测的这个场景呢,大家知道就是对于用户的这个登录行为而言,正常来讲一段时间内应该就是这个登录失败的次数应该是有限的,对吧?就即使是用户他忘记密码了,我我们去重新去做尝试,那也应该是有限的次数,而且呢,用户如果自己去手输密码去试的话,这间隔的时间不会太短,对吧,哎,正常来讲肯定是我输入一个之后,哎,再想想密码,然后再敲一遍,怎么也得隔几秒钟,那假如说你在这个短时间内。
01:00
出现大量的登录失败,但这种说呃事件来,呃就是这种事件被我们收集到的话,大家就想到可能就会出现异常,当前的这个账户可能是被人恶意攻击了,对吧,就是拿这个呃机器啊在做暴力破解啊写了一个程序在做破解,所以这个时候我们就要做一个报警的提示了,那所以接下来我们就来给大家说一说,在这个呃flink程序里边怎么样去做这样一件事呢?呃传统大家想这个东西应该怎么做,可能我们一般情况都是用这个呃业务系统本身这个服务器后台,或者说呃,我我我们直接在其他的一些模块里边就把这个判断了,对吧,那你你连续给我发请求,发来之后,我看一下你上一次的那个呃请求,假如说我都保存起来的话啊,我再去判断一下,上一次是不是跟这个呃,间隔太短了,或者说这个这一段时间内你都连续登录失败了啊,那那我就去做一个报警,或者说我直接拦住就就不让他继续。
02:00
对,发这个请求了,对吧,不做这个继续判断了,那这是一种方式,但是大家发现这种方式,传统这种方式啊,对于业务系统而言,它的压力是比较大的,对吧?那就正常来讲,我们业务系统的登录拈,我应该就是放在那儿接收你的请求,就去做那个数据库里边密码数据,呃,用户名密码的判断,你只要匹配上了,我就登录成功,对吧?呃,你接下来你就可以去做操作了,如果匹配不上,我返回一个失败,你继续重新去登录,就正常来讲,我就应该处理这个,你现在呢,又加了很多工作要求我去保存之前一段时间内他登录成功和失败的这个数据,对吧?呃,相当于历史的行为的一些数据,另外呢,还需要我再去判断,哎,就是按照规则,你是多长时间内连续登录失败了,或者说是这个,呃,就是出现了什么样的一个异常,那这种逻辑判断,你再追加到业务逻辑里边,其实对于我们的业务系统的性能影响是比较。
03:00
较大的对吧?呃,你想本来用户发一个请求过来,我连一下数据库查出来直接就返回了,这个很快,但你要做更多的这种选择判断的时候,跟数据库的连接也会更加的频繁,业务系统要处理的事情也会更多,所以这里边我们就要考虑是否可以把它摘出来,用其他的一些系统来做这件事儿呢?啊,那现在我们用这个流实时的流处理去做这件事儿,这就是一个非常的考虑的一个替代的选择啊,啊,那所以接下来我们在代码里边还是把这个需求来给大家再来试一下啊,那这个是另外一个新的模块了,我们有一个module。当前这个我们叫做,呃,就是关于登录失败的一个检测啊,Logging field detect,先把这一个模块创建出来,呃,然后关于我们这里边想要引入的依赖,这个我们现在暂时还想不到,等到用到的时候我们再去引就可以了,所以啊,同样我们还是把这个先放在这儿啊,接下来我们看一下logging feel这个里边。
04:14
还是我们的代码会放在SW下边,重命名一个scalela文件夹啊,那我们这里边需要有这个登录失败或者成功的一个数据啊,当前还是在这个user behavior里边没有对应的登录数据,所以说我们还是自己提供了一些测试的模拟数据啊,这个数据呢,是在这个里边,就是logging logging log啊,我们先把这个数据先copy在resource下边,好把这个拿过来,然后我们大概的看一看这里边长什么样对吧?大家看看这个数据大概是什么情况呢?还是逗号分割CSV文件啊,相当于做过ETL的这个状态啊,里边的数据大家看到,诶,前面这里边只有一个类似于ID的东西,那应该是什么呀?啊,那当然就是user ID对吧?然后后面这个什么点什么,一看这个就是IP嘛,啊,就是用户登录的一个IP,然后另外还有呢,呃,大家看到这里边它并没有一。
05:15
个比方说这个行为叫做loging对吧,这边没有,因为我们ETL出来的话,所有的行为都是login的行为,只是有一个最后结果的不同,对吧?这里边提取了一个这个success和一个feel,后面跟着一个时间戳啊,所以接下来我们要判断的就是这个需求给大家先提出来啊,就是假如在一段时间比较短的一段时间内,用户出现连续多次登录失败的行为的话,我们检测到,那就要做一个报警提示,诶,这就是我们的一个需求啊,那这里边为了简单起见,大家看到文档里边把这个需求一开始的需求是给大家定义成了就是在两秒钟之内连续两次登录失败啊,那当然两秒钟之内这个时间设的短一点倒是没什么问题,但大家可能会想到,你就只检测两次登录失败,这个好像有点不够给力是吧,好像我们应该多检测检检检测几次,这个大家可以。
06:15
自己去做一个扩展啊,我们这里只是以连续登录两次,两两次登录失败作为一个例子给大家来做一个实现,好,那首先我们就在这里边还是skyla下边创建一个object单立对象啊,然后我们带上包名com.at硅谷点logging file,呃,当前我们是这个检测类的指标啊detect,然后我们当前的这个就直接就叫logging feel吧,其实logging feel要报警对吧,检测到它要做报警,呃,Main方法里边我们先放在这儿,首先来定义一下输入输出的样例类类型对吧?啊这里边我们输入输入的登录事件样例类case class啊,这个我们就随便叫一个,比方说我这个叫loging event登。
07:15
事件前面我们也已经看到那几个字段了,User ID长整型啊,然后后边是有一个,呃,就是IP对吧,这个IP我们先列在这儿,然后另外还有一个就是成登录成功还是失败对吧?啊,这个我们叫做event type事件的类型吧,String类型,最后还有一个时间,Time stemmp长整型放在这儿,这就是我们的输入的数据对吧?然后另外还有一个输出,我们输出要什么呢?现在就要报警信息对吧?就假如检测到它这个,呃,登录异常了,我们直接要做一个报警,所以这里边输出报警信息,样例类我们也可以包成一个,呃,样例类类型啊,Case class,我这里边可以定义一个,比方说啊,Login fair warning,那同样我们关心这个报警,当然要提示哪个用户谁登录。
08:15
连续失败了对吧?那这里边我们定义一个uz ID长整型,然后另外还需要就是他登录失败的时间,我这里边关心的是就是第一次登录失败和第二次登录失败对吧?First few time,这里边我直接给一个长整型,把那个时间戳输出就好了,然后另外还有一个,诶,这里边我不要叫second,叫last,就是方便扩展吧,假如说我们现在检测的是连续两次登录失败对吧?哎,那你如果要是连续三次,连续五次呢,我们留下这个扩展的余地,我叫last few time长整型啊,最后我们再输出一个warning message,一个字符串,对吧,报警的一个信息,看到这个的话,我们就知道当前是有问题了啊,所以接下来我们就是还是在这个代码里边给大家做一个整体的实现啊,首先,环境stream execution environment。
09:15
二大家这个应该都轻车熟路啊,上面这个影视转换引入,另外这里边我们还是先把这个全局并行录释成一,不影响结果正确性,然后把这个时间特性设成还是一样啊,事件时间,因为我们这里边有那个时间戳嘛,只要有时间戳,大家看到一般情况我判断都是按照事件时间来判断的,是吧?啊,都是按照这个来,然后接下来那就是读取数据,好,我们定义一个input stream啊,当然了,按照我们之前的流程,最好还是先定义一个这个resource对吧,我们用这个get class,然后get resource,用一个相对路径把这个文件传进来,当前这个logging log啊,Pass copy过来,然后接下来我们这个input stream就直接。
10:15
Read的当前resource的这个get pass就可以了,这样就不需要那个hard code把那个全路径写死了啊,然后接下来呢,我们再定义一个做转换了,对吧?呃,Map成转换成样例类,转换成样例类类型,并提取时间戳和water mark又是一样的流程,我定义一个当前这个比方说叫logging event stream啊,那基于之前的这个input stream去做一个map转换,然后这里边我们还是一样的过程啊,这个就快速写,先拆开,按照逗号分割做sweet,接下来是不是包成样例类啊,我们当前的那个样例类就叫做logging event,这定义的有点多,大家搞清楚哪个是输入个是。
11:15
输出对吧,因warning是输出,这里边这个呃,输入是叫事件,叫logging event,然后里边的字段呢,当然就是ARRAY0,第一个是那个用户ID,所以这里边我要做一个to长整型转成长整型,第二个是IP啊,那直接拿出来stream拿出来用就行了,第三个是那个类型,就是成功还是失败,还是一个字符串,对吧,拿出来直接用就行了,第四个最后一个这是一个时间戳,所以要涂了转成一个长整形。先放到这儿,接下来该分配时间戳和watermark了,诶那又有一个问题,就是到底是升序还是乱序呢?回过头来还是看数据对吧?哎,我大家看到我们这里边你怎么去选取,关键是看数据到底什么样,这就要求我们对这个数据是场景有一有一定的了解啊啊那我们看一看15263,哎,有乱序对吧?啊大家看这个三三后面来了三二,说明我们这里边是有乱序数据的啊,所以这里边如果大家大概的看一看的话,可能这个诶可能有个两秒对吧?呃,我看看一下。
12:26
呃,就是大概看一看的话,可能就是有有这么一个两三秒的这样的一个乱序程度,所以如果这里边我们只是一个估计,做做一个设定的话,那比方说这里边可以给一个as sign time stamp and watermarks里边要引入的这个类,那是bonded out of order,你大家还记得这个吧,对吧?Bounded out of orderness,呃,然后这里边我们提取时间戳的时候用的是element.time step对吧,是不是用这个字段啊啊,我们前面定义好的对吧?然后这里大家要注意到底是它本身是秒还是毫秒呢?来确认一下这个应该是秒是吧?所以这里边要再乘以1000给一个毫秒数,然后另外上边这里边你还得给一个当前的延迟,就是我们认为的watermark的延迟时间,对吧?哎,这里边把这个引入给一个几秒钟就可以了啊,比方说我给两秒或者给三秒,对吧,按照我们之前的一个判断。
13:26
给一个大概的延迟时间,我这边给了三秒啊,这是前面做的这一步,然后接下来,接下来就是继续进行转换操作,对吧?啊继进行。判断和检测,如果两秒之内连续登录失败,输出报警信息,对吧?啊,现在我们就要得到一个报警的理由了,类似于啊,所以接下来我们是logging fair warning stream啊,那我们还是基于之前的这个事件流去做处理吧,啊,那大家会想到这里边我们处理怎么去处理呢?是一个map吗?好像没那么简单是吧?啊,因为这里边我们是首先跟时间有关系,连续两秒钟之内连续两次登录失败,那另外还有一个呢,就是你要判断连续登录失败是不是应该要跟之前的数据还得有关系啊,你还得保持状态对不对?哎,所以在这个处理的过程当中,大家就想到了之前我们在理论部分学习的时。
14:43
时候传感器是不是有一次处理,那个是怎么处理的,温度连续上升报警对吧,十秒钟之内温度连续上升就报警,哎大家想一想,现在这个场景是不是很类似啊?哎,之前是温度连续上升,现在我们是连续登录失败嘛,哎都是有一个时间限定,那之前我们是用什么来处理的呢?
15:05
诶,就是有一个定时器对吧,然后有这个状态编程,设置了状态保存之前的那个温度值,然后去做判断,哎,所以就相当于我们得用一个process function去搞定这件事了,所以现在的想法也是这样,如果你用简单的一些聚合或者开窗搞不定这个需求的话,那你就用process function啊,在用process function之前,那我们得思考一下,因为process方式里边涉及到了状态,涉及到了这个状态编程,对吧?哎,那我们说一般情况用这个状态的时候,我们用的都是kid state。都是按照某个K分组之后,然后去做的这个设置的状态,对吧?是针对当前这一组有用,那我们现在是所有数据都放在一起去统计呢,还是说要分组他们状态也应该是只针对某一个某一个K有用呢?诶,那现在我们得看一看现在的这个数据到底是什么了,对吧,当前诶当前就是这个logging event嘛,有有登录,登录数据有成功有失败,那问题就来了,是所有用户的所有数据来了之后,有成功有失败,我们都混在一起去判断吗?
16:18
肯定是应该只判断某一个用户对不对,那你不能说是我这个用户,呃,登录失败了,然后别的又来一个用户登录失败了,然后你报警了,那你说这到底算是谁的谁的这个账户报警了呢?这个就没道理了,对吧,所以我肯定是针对同一个用户,那这个IP有用没有呢。哦,大家想到IP其实在这里边可能不参与计算,但是呢,我们要求是它即使是不同IP,是不是这个也应该要检测它报警啊,只要是同一个用户ID啊,因为大家知道有些情况下,我们知道这个被攻击的时候,它可能会自动去切换IP去做攻击,对吧?呃,所以在这里边我们是不考虑IP,只要是同一个user ID,后边如果要是连续登录失败的话,我们就检测出来,把它做一个报警啊,那接下来我们就做这样的一件事情,那显然是先需要做一个KBY,根据用户ID user ID做一个分组,然后后边我们做一个自己定义的kid的process function,对吧?啊在这里边我们要自己去实现了,你有一个比方说这个我们就叫呃,Logging feel logning result做一个这样的处理,对吧?然后同样里边我可以传一个数,比方说就是我们说的。
17:40
扩展对吧,当前我要求的是必须要检测到两次登录失败,呃,就是两次以上的登录失败我就报警,那我传一个二进去,就是假如说你当前登录失败次数超过这个了,报警啊,那或者说我这里可以传三传五传十都可以,最后我们得到的结果啊,那就是把这个logging field warning stream做一个打印输出,最后不要忘记还有一个execute啊,当前是logging film detect job,这就是完整的处理流程啊。
我来说两句