00:00
我们现在已经了解了窗口函数中的两种类型,增量聚合函数和全窗口函数,它们其实可以说是各有特点,增量聚合函数呢,它的优势其实非常的明显,我们看到它相当于就是一个流处理,每来一条数据就会调用这里的艾特方法进行一次叠加计算,哎,那所以我们窗口在收集数据的过程当中,相当于就把大量的计算工作已经做完了,那最后窗口到达结束时间的时候要输出结果,怎么办呢?直接调get result方法,可能直接输出,也有可能做一个简单的转换计算就可以得到结果了,所以它的处理效率是非常高的。而全窗口函数呢,啊,那对应的啊,就有点像一个批处理,看起来比较笨一点,它会把所有的数据都放在这个elements这样一个集合类型里边,类型里边,然后呢,等到窗口达到了结束时间的时候,想要计算输出结果的时候,直接调用一次这里的process方法。
01:00
或者对于window function的话,那就是调用对应的apply方法,那在这里边统一进行处理,用out点进行结果的输出。但是它的好处在于我们在里边能够包装窗口的各种信息,如果我们使用了process window function的话,在这个上下文里边就会获取到更多的信息。另外我们还可以使用open close对应的这些生命周期方法,诶,这些都是可以的。另外我们看到process window方式里边还有一个clear方法,这个主要是用来清空当前的状态啊,就是如果我们使用了状态编程的话,那么在clear方法里面可以进行清空啊,这是关于全窗口函数和增量聚额函数它们之间的区别,以及它们各自的优势。那我们自然就想到了,诶,那能不能有一种方式让两者他们的优势结合在一起呢?当然有,这就是我们之前所说的啊,可以放在一起去调用,这种调用方式呢?哎,那现在推荐的方法是直接调用aggregate。
02:04
那么我们可以直接传一个参数,那就是传一个aggate function式,那如果说我们要传多个参数的话,那就是第一个参数是一个聚合函数aggregate方式,我们把它叫做预聚合器啊,就是预先先做一个聚合,注意这个聚合函数里边同样有get result方法,那我们想要输出结果的时候是直接调它吗?不是,这个时候呢,我们是要调后边的window function,哎,所以第二个参数呢,就是一个全窗口函数,当然这个全窗口函数可以是window function,我们看也可以是一个拉姆达表达式,这个拉姆达表达式本质上跟window function是一样的啊,因为我们知道这个process运动方式就不止一个单一抽象方法了,所以我们一般用的更多的是第二个参数使用一个process运读方式。它能够获取到的信息更多,那在这种情况下,我们具体的处理流程就变成了,每来一个数据就会调用agggate function的at方法进行叠加统计,增量聚合,诶,聚合完成之后,到窗口的结束时间的时候,这个时候我们接下来就要得到输出结果了,那那输出结果从哪来呢?注意我们是先调用。
03:17
Aggregate function get result方法啊,首先还是要先拿,拿到它,只不过呢,这个结果就不是直接输出到下游算子了,而是把这个结果输出给后边的process window方式。所以接下来的process window function里边,它的process方法,这里的输入elements是谁呢?我们本来认为这里应该是保存了当前的所有数据,窗口里边所有数据都收集在这里,现在就不是了,现在它的element里边就只有一个之前AG function里边给我们get result拿到的这个输出。所以现在就相当于它的输出变成了后边我们process window function的输入。
04:01
最后在这里我们结合当前上下文信息,结合窗口信息,最后再输出整个窗口的结果。这就是两者结合的一种调用。啊,这样去介绍的话,可能我们听着还是有点晕啊,呃,还是直接在代码里边来做一个实现吧,我们想到在之前实现的process window function的这个测试里边,其实就可以利用增量聚合函数做一个改进啊,就比方说这里啊,我们不要直接点process调一个点。这里调一个aggregate啊,那第一个参数呢,就可以传入一个增量聚合函数aggregate function,第二个参数呢,就可以传一个全窗口函数去进行一个包装,哎,那所以这种方式还是非常容易想到,那为了更清楚的说明呢,我们另外举一个例子,把这个场景再稍微的做一个改进,因为我们可以看到啊,在这里不管是我们测试全窗口函数的时候统计UV也好,还是之前增量聚合函数我们计这个PV和UV的比值也好,其实当时都是统计了所有数据,全量数据都放在一组了啊,所以当时呢,其实这个K我们没什么用了,最后所有的数据是放到了同一个分区,那当然了,如果说我们真的是针对全量数据去统计UV的时候,这个没有办法,只有把它全部都放在一起统计,才能得到最终的结果。
05:22
不过在实际应用的过程当中呢,一般情况我们要充分利用现在分布式架构啊,并行处理的这个优势,那可能往往还是先要指定某个K,先做一个分区统计,统计完成的结果呢,再把它合并在一起啊,当然最终还是把所有的要合并在一起啊啊,那接下来呢,我们就来分一个更加细致的场景,那就是我们不要统计全量数据了,我们按照不同的商品,或者说不同的URL,不同的页面来进行一个统计,诶这个的话,这就是更加标准意义上的PV的统计了。因为我们知道啊,呃,PV其实就是page view页面浏览的这个统计,哎,那我们现在呢,就可以根据当前的URL做一个分组,URL不就代表一个页面吗?哎,那所以每一个URL统计出它所有的访问量,注意现在就跟用户没有关系了,所有用户的访问都可以统计在里边,我们统计的是当前页面被点击的次数。
06:24
然后接下来呢,我们还可以包装上窗口的一些信息,哎,这样的话就得到了一个完整的输出,最后的效果相当于就是我们这里看到的啊,我们可以输出一个哪一个时间窗口,然后对应的每一个URL被点击访问的次数到底是多少啊,那对于这样一个需求呢,我们可以看到好像这个里边输出的字段就会稍微的有点多啊啊,我们想到输出的字段首先应该得有一个URL。然后呢,接下来还应该有一个时间窗口window信息,哎,那最后还应该有一个当前它的PV值,哎,或者说这个count值访问次数,那所以呢,我们就考可以考虑啊,使用一个样例类啊,或者Java里面的po类型,就可以把这样的一个数据结构包装起来了,当然我们也可以用简单的三元组啊,但很显然包装成样例类的话,使用会更加的方便,在skyla里面样例类的定义会非常简单嘛,也不耗费我们更多的时间,所以呢,接下来我们就另外创建一个。
07:28
测试的object,现在我们要统计的是每一个页面的URL被访问点击的次数,哎,那所以干脆我们就把这个叫做UR view count啊,这个是一个具体的案例了啊,我们叫做example吧。没方法,先写出来,哎,那首先前面的这些操作流程当然还是一样,我们直接把它copy过来,创建执行环境,然后直接使用我们的测试数据源click source作为当前的数据源,然后把这个下划线对应引入,后边还要提取时间戳,生成auto。
08:03
然后接下来我们就是结合使用。增量聚合函数和全窗口函数。包装统计信息。所以接下来我们最后的统计信息其实就是三部分了,URL,还有窗口信息,另外还有最后一个PV值,或者说count值,那我们就想到了,哎,干脆就可以在外边直接定义这样的一个样例类case class。定义统计输出的。结果数据结构。这个是一个case class,我们就把这个统计输出的结果就叫做URL。Will count就可以了啊,那当然了,里边需要有对应的字段,哎,那首先URL。这相当于是我们的K啊,当前统计的哪个页面啊,然后最关键的它应该有一个PV值,一个count值,我们还是把它定义成长整形。
09:02
然后后面还应该有窗口的信息啊,那我们这里边应该有window start长整型啊,如果是时间戳的话就长整型,如果说我们还想把它做一个转换,比方说啊,解析成年月日十分秒的形式,哎,那它可以是一个string啊,我们只要用相应的这个工具把它做转换就可以了。这里我们就简单一点吧,直接长整型输出,那最后还有一个window and也是一个长整型。这就是我们所定义的URL will cut。接下来我们就可以在这里直接基于stream去做一个KBY,现在的KBY,显然我们就要提取URL作为分组的键值。然后呢,我们当然就可以去定义窗口了啊,比方说我们当前还是一个十秒钟的窗口啊啊,当然了,前面我们如果是滚动窗口的话,这个输出的频率有点太低了啊,我们隔好久啊,隔十秒钟才会得到一个输出结果,所以这里呢,为了让它输出频次更高,我们可以来一个滑动窗口嘛,诶,那所以可以是sliding。
10:05
Even time windows.of这里就要有两个参数了。首先我们还是把对应的time要先引入time.SECOND10秒钟一个窗口。另外我们希望它。快一点输出,那就五秒钟输出一次吧,稍微的频率快一点,然后接下来当然就是掉一个aggregate,它里边现在要传两个参数。一个是我们定义好的预聚合器,也就是聚合函数啊,增量聚合函数,你有一个,呃,我们就把它叫做URL。Count。A,做一个增量聚合,另外我们再定义一个包装窗口信息的全窗口函数,哎,那我们把它叫做URL view count result。最后输出的结果就是它最后我们可以直接做一个打印输出env执行起来,这就是我们整个的程序架构,接下来的关键当然就是实现自定义的增量聚合函数和全窗口函数了,实现。
11:10
增量聚合函数。另外还需要去实现。全窗口函数。好,那么我们这里class URL welcome hg extend。Aggregate function。这里的agggate function,我们首先先把它引入,里边的参数类型我们还记得是三个,它是in ACC out,也就是输入类型和中间聚合状态的类型,还有输出的类型,输入类型当然是event了,诶,这里event我们需要做一个引入。中间的聚合状态呢?呃,其实这里我们的中间聚合状态就不用做任何的处理了,我们就非常简单,增量聚合嘛,那就是对于每一个URL我们已经分组了,那就每来一个数据,我就记一个数就完了嘛,所以我们当前的中间聚合状态就一个长整形的值,就是一个count值。
12:07
然后哎,另外后面我们输出的结果呢,当然也是一个抗值,剩下的那些。我们这里边的count值有了,剩下的URL从哪里去找?诶,那我们知道最后的全窗口函数里边,它可以提取当前K的信息啊,当然可以把URL拿出来,另外还可以拿到窗口信息,所以剩下的信息我们全部到后边的全窗口函数去进行处理就可以了。所以当前把它定义好,接下来就是实现对应的四个抽象方法了啊,首先创建当前的。聚合器create a cuumulator,那我们现在一开始的聚合器,当然什么东西都没有了,就是零嘛,0L,然后接下来艾方法,每来一个数据就调用这里的艾特方法,那我们其实就是加一嘛。ACC加一非常简单。然后我get result的时候返回什么呢?诶,那当然就是返回ACC嘛,我们当前统计出来是多少直接返回。
13:05
那墨呢,这里边不需要有任何的实现,所以我们看这个过程非常的简单,就是。每来一个数据。就加一。所以这个逻辑是非常简单的啊,那得到的这个结果get result,这会直接输出吗?当然不会,我们会把它传给后边的全窗口函数作为输入,所以这里的输入就变成了一个长整形,好,那接下来我们就得实现这个了,Plus。URL result现在是一个process。Window function。里边的参数我们也知道,它有四个参数,首先是输入,现在的输入,注意不是event,而是前面的输出,所以是一个长整性。那它的输出呢?哎,它的输出我们已经想好了,包装成这样的样例类类型吧,所以是URL view count。
14:02
直接放在这儿。后面还有两个参数,一个是当前K的参数,当前K是string。URL嘛,最后还有一个窗口的类型time window。哎,那所以把这些都定义好了之后,接下来就是要实现里边的process方法。这个process方法里边对应的这个数据,我们依次去提取就可以了。提取需要的数据。首先我们应该找当前的URLURL在哪里呢?其实我们知道URL就是这里的K嘛啊,如果我们清楚的话,其实这个URL这个变量都不需要定义了啊,我们甚至可以直接把这个K改成URL就可以了,后面直接拿就完事了,那然后接下来还有一个count值,Count值是在element里面,所以我们这里可以定义一下count等于elements啊,那有同学可能想到,哎,对呀,这里边我们直接elements.size不就是它的个数吗?注意不是,因为现在的elements里边它只放了一个长整形的值,就是之前我们统计出来的。
15:10
那个数量,哎,那所以现在它并没有保存我们的数据嘛,它这里边就有一个值,所以呢,我们不是要取这个集合类型的size,而是要取这个集合类型里边的唯一的那个数据,所以这里我们其实就是直接取它的第一个数,我们可以直接用迭代器点next把它拿出来就完事。那另外我们还应该有当前的。Start起始点和结束点,那这个我们已经非常熟悉了,前面也已经做过,直接用context.window里边去获取start和end就可以了。window.get哎,这样的话我们就捕捉到了想要的所有数据,那最后要输出的时候。输出。数据当然就是调用out.collect我们要把它包装成一个URL will count里面的字段,首先一个URL,然后一个count,一个start,一个and,哎,就是这样,非常的简单。
16:13
这就是我们完整的一个处理流程,所以接下来我们前面都已经写完了,也不再报错了,所以可以直接来运行一下。我们看一看处理的结果输出是什么样子。我们现在的窗口是十秒钟一个,每五秒钟会输出一个结果,哎,我们看到一下就输出了两个,因为我们现在是以URL作为T分组了嘛,所以每一个组,每一个分组对应的URL都会输出它的统计结果,所以我们看啊,每隔五秒钟就会有一堆数据输出,那我们怎么知道哪些数据是同一个窗口里边的呢?诶,那当然是看这个时间戳嘛,如果是同样的起始和结束的话,那当然这就是同一个窗口了,诶,那所以这些数据就都是同一个窗口,我们看就是这个3880000和3890000这个窗口十秒钟之间的所有数据。
17:09
当然了,这里边的个数各有不同啊,点后点击的次数就比较多,是三次啊,那别的呢,可能就只有一两次,所以我们就想到了,有了这个结果呢,后续我们还可以做更多的操作,比如说我们可以输出一下当前这段时间范围内的最热门的几个页面,这就是我们所谓的top n这样一个需求。这是我们统计里边非常经典的一个需求啊,但是现在呢,我们可能还不知道该怎么做,在之后的章节里边,我们会详细进行讲解。这就是关于窗口函数的整体用法。
我来说两句