00:00
接下来我们再看一看其他的需求,那第二个需求呢,其实是主要是针对这个实时流量的一个统计,那首先哎,我们说可以去统计实时的热门页面啊,那对于这个实时热门页面,而且给大家的要求是就是从这个web服务器的日志里面去提取,就当前我们没有那个呃,本身用户的一些数据了啊,那没有买点日志的话,那就只有从web服务器日志里面去提取,那这这样的话我们怎么做呢?也很简单,先做一个ETL,把我们想要的数据先拿出来,对吧,来包装成样一类,然后接下来呢,哎,它要的是统计每分钟的IP访问量,取出访问量最大的五个地址,然后每五秒钟更新一次,那大家看这个这个需求只不过就是前面我们那个是每小时的,现在是每分钟对吧,之前是每五分钟更新一次,现在是每五秒更新一次,那大家看这不还是一个top n吗?只不过就是把这个之前的商品换成了现在的。
01:00
访问地址对吧,那就把那个URL作为当前的那个K去做一个分配统计台的。com值就可以了,所以这个其实整体来讲思路是一模一样啊,所以呃,我们后面讲到再给大家再再讲这个需求啊,就基本上是前面的一个复习,然后在这个基础上呢,我们还可以给大家统计经典的PV和UV这样的两个指标,那大家知道PV的话就相当于一般都是一个都是一个滚动窗口对吧?哎,直接就是每小时访问量哎,统计一个PV,然后如果要是说我们想要对这个访量还要做用户去重的话,那就变成了一个UV对吧?哎,那这样的一个统计PV,大家想到非常简单对吧?啊,这个直接那个开一个滚动窗口,直接把它算count的值啊,算出来就完了,那这个UV要去做去重怎么去呢?很简单的一个想法是,我就在内存空间里边啊,就相当于或者说我用一个状态,或者说用一个本地变量。
02:00
对吧,直接把所有的数据都存下来,然后就存成一个set set是不是自动就会去重,对吧?我们就把那个user ID直接,呃,那个放到这个set里边,它如果来了同样的user ID,那进去之后还是同样的一个元素嘛,那就不会不会再增加了,最后我只统计这个set里边有多少个,不就完事了吗?呃,这是最简单的一种实现,但是我们大家可能会想到,如果说你把所有的数据都放在当前的内存里边,不管是本地变量还是状态,对吧?你只要是定义在内存里边,那那现在是不是就很有可能出现oom的情况啊,如果要是状态,这个数据量特别特别大,你要做驱虫的这个用户数量特别多,怎么办呢?诶,这有一个解决思路啊,有同学可能想到了,我可以先放red嘛,对吧,啊,Red也是一种,就是首先是内存放不下,之后扩展到这是一个基本的想法,那假如说里边也放不下了呢,对吧,因为大家知道类似于也。
03:00
是一种内存级的数据库嘛,它的容量也是有限的,它其实本身也是可以认为它的这个代价跟跟内存是差不多的啊,访问速度是非踌,那所以这里边给大家提供一个思路,就是可以用传说中的布隆过滤器去进行去除啊,那具体怎么做,到时候我们讲到这个需求的时候再给大家详细展开,呃,那后边的话,那就是关于市场营销的统计分析了,呃,比方说像这个APP市场推广统计,那主要就是买点日志里边,呃,可能会收集到了,我们做这个APP推广的时候,用户是从不同的渠道来点击下载安装的,对吧?哎,那这样的话,我们就可以按照不同的推广渠道去统计到底有多少用户通过这个渠道来做了这个下载安装,那这个其实就非常简单,这不也是按照渠道做一个K,然后去呃收集所有的数据,然后我们做count就完了嘛,所以这就是之前跟大家说的啊,只要把前面的。
04:00
鲍恩,搞定了后边所有的统计需求,其实只是业务场景稍微的换了一下逻辑可能需要调整,大部分的思路都一样啊,这个就没什么问题,对吧?啊啊,当然了,这里边说是可以用process function进行处理,然后自定义输出信息啊,这个就具体调整了,对吧?你你想要用这个aggregate,我们增量聚合就增量聚合,想要用别的啊,用别的也可以,后边还有一个呢,是这我们讲到这个广告页面,广告统计本身这个统计非常简单,那我其实就还是把那个所有的数据做一个count嘛,对吧?呃,做一个统计,这里面关键麻烦的是,就是我这里要针对那个刷单式的频繁点击行为做过滤,就是有一个广告黑名单过滤的一个需求,这个需求怎么去做呢?啊,我们自然就想到了,那你要是做这样的一个过滤的话,那我可以统计就是同一个用户,如果点击同一个同一个。
05:00
广告点击次数太多的话,那我就要报警对吧,那就要把他加入到黑名单里边,所以这里边呃,我可以去设置一个状态啊,这个状态那就是保存的是一个用户针对某一个广告的一个点击量,对吧?啊直接把它保存下来,那然后我统计的这个过程呢,应该还有时间限制,你不能说是这个,呃就是一直从头开始一直统计啊,那万一说这个你你经过统计了两三年之后,这用户他还真是就点那个广告,点的到了我们的那个上限,那怎么办呢?哎,所以一般都有一个短时间内的统计周期,比方说你就在一天之内,当天点击超过了100次,那我就觉得你这个用户有问题,对吧?呃,这样的话可以给他做一个报警,总有一个这个,呃,就是对对这个有一个限定,那所以对于这样一个,就是当天我要统计他这个状态,统计他点击次数,那大家想是不是就到第二天零点的时候要把这个状态清空呢,那这。
06:00
那第二天零点的时候要清空状态,这个怎么去做呢?那就只能用一个定时器了,对吧?啊,所以大家看这里边我用到了状态,又用到了定时器,那怎么办?Process function对吧?啊,另外就还可以怎么样呢?我还可以用测输出流进行过滤,就是主流里边我输出的就是过滤之后的,对吧?因为这个类似于既然是过滤嘛,类似于一个filter一样,呃,类似于一个rich filter function一样啊,就是我可以把它做一个,呃,就是拦截对吧?如果说达到这种条件,我这个数据就不往后发送,不做后续的那个统计点击量,贡贡献这个点击量了,那但是如果说达到了那个上限呢,我是不是还要有一个黑名单的过滤,输出一个黑名单啊,那这个黑名单怎么去输出呢?用侧输出流对吧?哎,所以这个需求就是相当于把这个process function能干的事都干了一遍啊,定时器对吧,特殊出流,呃,状态编程所有的东西都都用上了,这个需求还是比较典型的一个例子啊。
07:00
大家可以稍作调整,就把它变成一个刷单行为的检测啊,这个其实还是很容易的,那后面呃,还有就是关于这个登录的监控,恶意登录监控,那这个就是如果短时间内频繁登录失败的话,比方说同一个用户可以是不同的IP,对吧,两秒之内连续,我们这个就比较简单啊,就是两秒之内连续两次登录失败,我们就直接报警了啊,这个次数就比较少,那大家想到这里边我们怎么样去判断这个呢?有点像我们之前讲过的那个,诶十秒钟之内温度连续上升之类的这种这种判断对不对啊,所以大家就想到了,我可以去用process function设一个定时器,对吧?哎,设一个两秒钟之后触发的定时器,然后呢,在这过程当中失败的行为呢,都存入到一个list state里边去,等到它触发的时候,看里边诶是不是有两次连续两次的登录失败啊,这是比较简单的一种实现方式啊,然后还有另外一种要给大家说的就是cepcp,它就是用于这种复杂事件检测的场景啊,啊,它相当于可以实现事件流里边的一个模式匹配,大家先大概有这样一个概念啊,就用用来做这个啊监控啊,事件检测啊,做这个风控CP经常会用到先有这样一个概念,后面我们讲到再详细展开,呃,那后面呢,最后还有一个拈是订单相关的拈,一个是。
08:26
就是支付的实时监控对吧?这里边主要有两个需求,一个就是说超时应该要要报警对吧,超时的话应该他这个状态是呃,要做一个调整的,另外还有一个就是进行一个一个实时对账对吧?这两个需求,那大家想到对于这样的需求我怎么样去判断呢?哎,那这个超时这类似于比方说下单之后15分钟没有支付做一个报警,那怎么办?定一个定时器嘛,对吧,如果要是你用这个process function的话,还是定一个定时器去做一个实现就可以了,那另外呢,同样也可以用cep这个库去进行视线流里边的检测,另外它还可以实现这个超时事件的一个检测啊,所以说呃,这个应用还是比较比较广的啊呃,CP基本上就贯穿了,贯穿了我们后边的这几个需求,最后还有一个就是实时对照,实时对账,主要说呢,这就是牵扯到了两条流的匹配,对吧?啊,那这里边我们给大家讲一个。
09:26
Connect合并两条流,然后再用这个Co process function去做一个匹配处理啊,当然有同学可能想到合并两条流,那有没有别的方法呢?之前我们讲过也可以UN,哎,但是我们这里边可能两条流的数据结构不一样,那就不能用union了嘛,那另外就是前面也给大家就是简单的提到过一个概念,就是可以做状语,对吧?两条流可以做状语,那这个场景适不适合状语呢?到时候我们可以展开再给大家分析一下啊,都给大家做一个整体的实现,这就是接下来我们要实现的所有需求,我们先整个的过了一遍。
我来说两句