00:00
关于处理函数这一部分呢,我们已经了解了基本处理函数process function,按键分区处理函数的process function,还有窗口处理函数,那就是process window function和process or window function。除了这几种之外呢,我们知道处理函数的分类里边主要有八种啊,那另外还有四种涉及到的是多流转换之后合并两条流之后的一些操作,哎,所以呢,我们会在下一章继续进行展开,现在呢,我们重点是要考虑在什么场景下要用到处理函数呢?呃,其实前面我们说啊,处理函数本身。这是我们底层的一个接口嘛,所以它是我们处理问题的大招,理论上来讲什么需求都能用它搞定,诶,但是有一些场景我们会知道啊,它本身比较简单,像之前我们举的这些例子里边,呃,我们知道如果统计这个PVUV,统计某段时间内啊,用户的访问量,或者说经过去重之后的独立访客数到底有多少,这些我们都可以直接用一个简单的增量聚合把它搞定啊,或者即使是我们有一些其他的需求啊,想要包装一些窗口的信息,也可以使用一个增量聚合加全窗口函数直接把它搞定,诶,那什么时候非用处理函数这样的大招不可呢?
01:20
诶,那我们自然就想到了,就是有一些非常复杂的需求,我们可能就非用它不可,比如说之前我们提到的,这里我们只统计这个URL,每一个URL当前在一个窗口内被浏览的次数,统计了它的活跃度,诶,那我们每一个进行输出之后会发现啊,每一个URL就对应着一个数字,一个count值啊,那另外一个URL又对应着一个count值。我们最后能不能把这一个窗口之内的所有URL进行一个排序,然后输出当前活跃的前几名呢?哎,所以这其实就是一个非常经典的统计指标,那就是top n。
02:02
比如说诶,我们针对当前的一个电商网站啊,就可以去统计一段时间内最热门的商品啊,那一般情况商品用什么来代表呢?诶,当然就是看这个商品详情页被浏览了多少次啊,我们就可以统计URL被点击访问的次数啊,那所以呢,呃,一般情况如果在实际应用的时候,统计的可能是一天啊,或者说一个小时,至少是这样的一个长度啊,现在我们的数据比较少,要做测试的话,那我们可以缩小一下时间范围,比如说我们就直接统计最近十秒钟内最热门的两个URL链接,然后每五秒钟更新一次,诶那我们知道这个每五秒钟更新一次,这其实是代表了当前的输出频率嘛,那这个需求很显然就可以使用一个十秒长度和滑动不长是五秒钟的滑动窗口来进行实现。那这里的这个热门度当然就直接可以用URL的访问量来进行表示了啊,那所以接下来我们就考虑怎么样实现这样的一个top n的问题,哎,那所以对于这个问题而言,前面我们其实已经做了一部分操作了啊,那这不就是我们这个URL count这个例子里边去做的事情吗?我们针对每一个URL进行一个分组开窗,然后进行聚合统计,这里边我们同时用到了增量聚合函数和全窗口函数,把最终每一个URL在当前窗口内统计出的访问频次,它的活跃度用一个。
03:32
URL view count这样一个样例类的形式来进行了一个表达啊,那所以这里面我们其实已经知道当前窗口内每一个URL被访问了多少次了,所以接下来我们要做的其实就是针对输出的所有的这些URL view count按照count值做一个排序就可以了。但是这个排序怎么办呢?这个稍微有点麻烦,因为我们知道前面做了这个KBY开窗之后啊,它的输出当前在这个窗口里边,我们这里做的所有操作都是针对当前K的。
04:06
所以最终呢,这个count就只有一个值,我们现在能够访问到,能够获取到的肯定也就只有一个值,所以我们现在是要把之前输出的结果再进行一个收集,再进行一个处理,才能够进行排序。这个就稍微有点麻烦了,那为了解决这个问题,我们可以考虑,那干脆啊,我就不要先按之前这个按URL进行一个分组了,我们干脆直接上一个大招。就是什么呢?我们可以直接基于datascript开窗,WINDOW2把所有数据都收集在一起,然后接下来调一个点process方法。我们应用一个。全窗口函数process or window function。把所有数据都收集齐了,都放在里边了,那接下来诶,我们该怎么统计,怎么统计嘛,先把个数一个一个都加起来,然后放在一起,再做一个排序输出就可以了,哎,所以这就是我们想到的第一种方法啊,非常简单粗暴的使用process or window function,好,接下来我们就来具体做一个实现。
05:10
我们新建一个scalela的object啊,这个是主要想实现top n的需求,而用到的是process or window function,所以我们叫做process window。好,Main方法里边首先还是我们非常熟悉的创建执行环境,然后读取数据源啊,分配当前的时间,戳生成watermark啊,所以这个都是非常类似啊,我们直接copy过来,然后我们还是把上边的stream execution environment改成下划线,引入相应的影视转换。那接下来我们要做的操作其实也非常简单,那就是。不做KPI分组,直接开窗统计。啊,那当然了,为了稍微的简单一点,我们会发现啊,在这里别的数据信息,在我们这个event里边别的信息都没用,我们现在只关心URL,就它什么时候来的,呃,也不关心了,因为当前我们已经做过了时间戳的提取,那时间戳呢,就相当于已经成为了我们每条数据上的一个附加字段,作为一个烙印已经印在上面了啊,那这个当然就不需要了啊啊那另外关于user这个也不重要啊,我们不关心到底是谁访问的,只关心每一个页面到底被访问了多少次啊,那当然我们现在并不是统计的都是商品页面,这相当于就是top n的活跃页面的统计,并不是top n商品的统计啊,这这个是可以按照我们的需求再做一个截取啊,我们这里就不做更复杂的筛选了,直接把对应的URL拿出来就完事了,所以这里边呢,我们可以做一个简单的map转换。
06:49
直接提取出所有的URL就完事,接下来直接开窗,WINDOW2,按照之前我们所说的需求,诶,这里我们要实现的是一个sliding event time Windows。
07:01
是一个滑动窗口里边传入当前的窗口大小,我们先引入。In time.time seconds,十秒一个。滑动步长是五秒钟。赛组。然后接下来诶,那就是直接做一个process操作,里边就是我们想要实现的。Process or?Window function。我们这里就直接用一个匿名类来进行实现啊,那前面我们只讲了process window方式,没有说process or window方式,其实这个实现是非常类似的,我们可以看到它本身也是一个抽象类里边的参数类型呢。比之前的process window function就少了一个,因为当前没有K嘛,哎,所以就是输入输出,还有当前的window类型,然后同样它也是继承自抽象的rich方式,这个抽象类,然后接下来必须要实现的抽象方法叫做process,所以整体来讲是一样的啊,可以拿到当前的所有数据,Elements,另外有上下文,还有一个alt,就是少了那个K而已嘛,所以接下来我们这里边写的输入的数据类型,因为我们已经把它转成URL了,那那当然输入的就只剩下string了。
08:14
输出的数据类型啊,这个我们也简单一点,干脆就直接打印成具体的信息展示输出吧,控制台打印输出,所以就是string,最后包装成信息输出啊,那最后window信息的话,窗口信息的话,当然就是time window,把对应的类要做一个引入。里边必须要实现的是一个process方法,在这个process方法里边,其实就是所有的数据都在elements里啊,我们想要的数都在这儿了,那接下来该怎么办呢?Top n,其实前面我们说了,主要就是两步走嘛,第一步是先要完成之前URL count这个例子里面我们做的事情啊,先要按照每一个URL分组统计出它的个数到底是多少。然后啊,那关于这个窗口的信息我们不用管,因为窗口我们已经开了嘛,接下来我们是直接统计每个URL的个数,然后把它做一个排序就完事了,哎,所以整体来讲,我们这里就是两步走,第一步。
09:13
统计。每个URL的。访问次数。相当于就是做我们之前的URL view count这个例子里面的事情,诶,那这个时候我们该怎么去做呢?嗯,那如果要自己实现的话,很显然我们就应该有一个k value这样的保存了,哎,那就是每个URL。这就是一个K。然后它对应的count值,那就是一个value。那什么样的数据结构能够实现简单的k value这样键值对的保存呢?诶,自然我们想到了,那就是哈希表吧,我们直接定义这样的一个哈希map,它就可以实现K86的保存,那接下来我们要做的事情就是直接初始化一个map。
10:02
以URL作为K。以它的count值作为。Value,哎,这就是我们想要保存信息,统计URL访问频次的这样的一个数据结构啊,那所以接下来我们就直接把它叫做URL count map吧,啊,它的key是URL value是count。接下来我们直接用scla当中给我们提供的map数据类型啊,那当然了,默认SC里边的map呢,我们看预定义的是不可变的map类型,那我们这里要的呢,其实是可变的map类型,因为我们当前在统计的时候,Count值肯定是会发生变化的嘛。所以我们这里可以指定一下我们要用的是multiple下边的map啊,这里我们引入scla collection multiple,哎,那这样的话,接下来我们就可以直接去更改里边的值了啊,那对应的K和value的数据类型呢,K是string。
11:02
URL,那value count值,我们给一个long,先创建这样的一个空map,然后接下来就要从elements里边去读取数据,依次写入到map当中,把它的个数依次叠加,来一个数就加一,来一个数就加一啊,那所以这个过程也非常简单了,我们直接可以调用当前集合类型的for each方法里边去实现一个拉达表达式,每一个data来了之后,哎,那我们干脆这里判断一下当前这个map里边,里边到底有没有这个K嘛,所以呢,我们可以直接调用map的contains方法去判断它是否包含当前的这个K啊,那当然了,也可以用另外的一种写法,就是可以用这个模式匹配啊,就是我们直接在这里去。Get当前的K的值啊,我们当前get当然就是data了,我们本身data就是URL嘛,直接get,然后接下来做一个模式匹配match。那么如果说当前有值的话,那得到的应该是一个萨,我们把它叫做count。
12:05
如果当前的K已经存在,哎,那么我们要做的就是在它之前的值的基础上加一,然后再写进去,所以这个map做一个put操作,里边K还是当前的data,那么值就变成count加一。当然了,如果说没有值的话,如果是那的话。那么我们要返回的当然就是直接PUT1就可以了。诶,那所以其实这个跟我们直接做一个一判断啊,判断是否contains这个K啊,如果要是没有的话,直接负一,有的话再获取出来加一,其实是一样啊,做完了这一步操作,那接下来就是第二步,那就是对map当中的所有数据进行一个排序,提取它们当中最大的两个值了啊,那所以当前我们就是。对。数据进行排序提取。
13:02
对于map来讲,如果我们直接对它排序,这个方法不太好调用,那很显然,如果说我们能够把它转换成一个list的话,转换成一个列表集合类型的话,那就可以直接对它进行salt方法的调用了吗?所以我们可以对URL count map做一个to list转换,得到列表之后呢,就可以直接调用S方法了,哎,那我们这里可以直接SBY,然后接下来指定的字段当然就是下划线二,那就是第二个count值作为我们排序的依据。那当前这个thought by,我们知道默认情况下,它其实应该是从小到大的一个升序排列,那现在要降序怎么办呢?呃,当然了,我们可以直接在后边再做一个reverirs反转,这当然是可以的,那不需要这么麻烦,我们可以直接在前面加一个负号就完了嘛,那这样的话,本身这里是从小到大的一个排列,那如果我们加了负号的话,那相当于就是从大到小。所以排好序之后,接下来我们就可以直接take前N个,我们当前呢,少一点啊,就取前两名,那就是TOP1和TOP2,最大的前两个count值的对应的URL信息我们就都提取出来了。
14:14
接下来呢,我们可以把它做一个保存,这个命名就把它叫做URL count list的吧,这是已经排好序的列表。接下来呢,呃,那我们还应该把这个数据做一个包装输出,因为我们最后要的是一个string类型的信息,直接在控制台打印的,所以这里还有一步,那就是包装信息打印输出。所以这里边我们可以直接定义一个result啊,因为我们的信息可能比较多,所以呢,我们在这里构建一个string builder,然后在后边不停的添加信息就可以了啊,就是构建一个我们想要的打印输出的信息,那首先呢,我们是按照当前窗口一个一个输出信息的,首先我们应该有一个窗口的信息了,Resultend一下。
15:03
我们做一个字符串,差值当前的窗口是什么?那主要窗口的信息,就是窗口的起始点和结束点,那从哪里去拿呢?当然是上下文,哎,所以context。点window.get start,这是窗口的起始点,然后加一个波浪线,接下来是窗口的结束点,同样是contacts window.get哎,这就是我们初始的一个信息,然后接下来呢,哎,我们为了方便看的更清楚一点啊,可以加一个这样的相当于分割线一样的信息。后边加一个换行符,然后接下来那就是打印当前窗口内的TOP1 top2对应的URL是什么?Count值是什么?我们想每一行输出一个,那应该用一个for循环来实现对应的需求。所以接下来那就是定义一个循环变量。我们这里其实就是要便利URL count list里边的所有数据,那但是呢,我们还需要去包装上,比方说当前是top几,它的排名信息也要有,所以最好我们还是用一个索引来进行便利吧,啊,那这里边如果使用索引的话,也不用从零开始了,我们直接可以用URL count list的。
16:17
Indic,哎,这就直接获取它的索引列表。然后接下来里边要做的信息,哎,那首先我们应该拿到当前的这个数据啊,现在我们的数据其实都已经包装成了一个URL,一个count这样的二元组,所以我们就把它叫做ta吧,然后从URL count list里边去拿到它的第个数据,然后接下来那就是result后边不停的添加我们想要的信息了,诶,那首先呃,我们就是。浏览量。Top,那top几呢?后边我们跟上Dollar I加一来,注意I索引位置是从零开始的,我们现在要的top几当然是从top一开始了,后面可以空一格,然后继续end。
17:04
接下来就是URL到底是什么,当然就是。二元组的第一个字段就是我们想要的URL,那后边当然继续跟就是浏览量到底是多少。那这个是DOLLAR2元组,二元组的第二个字段就是count值,好,那当前信息输出完了,我们换一行,这样的话就把所有的窗口信息都已经包装好了。这个信息已经有了,之后呢?接下来注意,这只是得到了一个string builder,我们当前的窗口,想要输出这个信息的话,那是得用out.click方法,所以后边还得有一句。out.collect把result做一个输出,当然了,它本身是string builder,我们还得调用to string方法得到string进行输出。这样的话,我们这个窗口算子的输出就已经定义好了,那如果说想在控制台看到具体的信息的话,那最后还应该定义一个流处理的think算子,点print啊,那最后不要忘记执行起来,这就是我们完整的流程。
18:15
好,那接下来我们可以运行一下,看看得到的结果到底是什么样子。我们现在应该是十秒钟一个窗口统计,当前十秒钟内的信息,五秒钟就会输出一次啊,那第一个窗口输出的信息当然就会比较少了,后面我们看的话,诶,这个窗口信息就会稍微多一点,我们看前两名就都输出了第一个窗口,甚至连呃第二个URL都没有啊,只有TOP1,然后后边我们看到。统计的数据就会越来越多,浏览量也有可能会越来越大啊,我们看到确实是第一名的浏览量会更大一点,前面都是并列的啊,后的话我们就看到有可能第一名是三次,第二名是两次访问。这就是关于热门页面top n的一个实现,我们使用了process or window function做了一个统计。
我来说两句