00:00
前面我们利用mapate和定时器模拟了一个滚动窗口啊,那当然了,这个滚动窗口主要是基于时间的,这是一个固定时间长度的时间窗口,那我们自然想到了,能不能实现技术窗口呢?哎,当然也是可以的,而且技术窗口其实更加的简单,我们自然就想到了,那其实就是来一个我们就加一,来一个count加一,然后当当前的个数达到我们的限定的。技术窗口的长度的时候,然后直接就输出结果不就完了吗?连定时器都不需要用了,更加简单啊。所以接下来呢,我们就可以直接使用一个value state去进行一个实现,当然了,我们也可以直接使用一个聚合状态,因为每来一个就加一嘛,我们自然想到了,那这个用reduce function或者是aggregate function就更容易实现。所以接下来呢,我们所举的聚合状态的应用实例就是实现一个类似技术窗口的功能,我们对于用户点击事件流里边每五个数据到来之后,我们就技算一次。
01:06
而且这个呢,我们计算的稍微复杂一点,我们要统计一次这五个数据的平均时间戳,也就是按照用户去进行分组啊,那统计每一个用户过去五次点击访问事件平均时间到底是什么时候?诶,那这个大概能说明一个就是用户最近一段时间内访问的频繁程度啊,所以也是有一些具体的分析意义的啊,那所以接下来我们就在代码里边利用聚合状态来对这个需求做一个实现啊,那当然了,前面我们也提到这个过程当中没有涉及到定时器,所以说我们也没必要使用process function啊,直接使用一个rich Fla map function就可以直接搞定好,所以接下来直接还是新建一个SC的object。那这里我们要统计的是平均的时间戳,所以我们可以把它叫做。
02:02
Every time stamp example。没方法写出来啊,那前面当然还是完全一样啊,先get execution environment,因为。把下划线对应的引入DV set parallelism1。因为接下来我们就直接at source,还是以我们自定义的click source作为测试数据源,后边去提取时间戳,生成水位线啊,Time Sam提取出来,然后接下来当然就是KBY了,我们现在是以用户作为分组的标准,那就是下划线点user,接下来直接做一个flat map,里边要实现的是一个rich flat map function啊,我们把它叫做。Average timemp啊,我们就叫做avg time吧。后边直接把结果做一个打印输出env执行起来,这就是我们整个流处理的程序的基本结构。接下来就是实现。
03:03
自定义的rich。Flat map方式。Plus average time stand。Extends rich flat map function。那它的泛型参数有两个,就是input output啊,输入输出。输入当然还是event,输出的话,我们还是包装成STEM吧,一句话一条信息打印输出啊,这里必须要实现的是一个Fla map方法,然后接下来呢,因为我们想要自定义状态,做状态编程,去统计每五次访问的平均时间戳,哎,那所以这里边我们要去定义一个聚合状态。我们还是用lazy的方式,哎,直接定义这个我们就叫AV gts time stamp,聚合状态a j state。那它的类型是一个aggreating state。
04:01
同样,这里边它也需要有泛型参数,主要就是input output输入和输出,这里我们的输入当然还是event,那它的输出是什么呢?平均时间戳,哎,那这个时间戳的话,我们当然还是直接给一个长整型吧,或者如果我们想计算最后得到一个double类型的话,那就输出一个double也是可以的啊,我们这里就直接用长景音好了。那它的获取还是get runtime contact,然后get a state,这里边我们需要去new一个。Aggreating。State script。啊,那这个script里边需要有三个翻新参数,好,那这个就是input output,前面我们都已经有了,接下来中间最关键的还有一个acccc ACC是中间聚合状态的类型,哎,所以接下来我们求这个平均时间戳的话,需要什么样的中间状态呢?诶,那其实我们知道啊,就是每来一个数据之后,我们要做什么样的聚合呢?啊,那如果是求平均值的话,我们知道它其实最后就是所有数据的总和上值。
05:13
然后再除以所有数据的个数,除以一个count值。啊,那对于这两个值而言,都可以每来一条数据就直接做一个叠加聚合啊,那比方说sum的话,那就是每来一个我们就把他们STEM提取出来,做一个叠加嘛,那countt值呢,更简单,每来一个直接加一嘛,所以中间的状态我们其实可以保存sum和countt的二元组,把它们保存起来,每来一个就更新这两个值,然后接下来呢,等到最后想要达到,比方说已经有五个数据了,达到这个五个数了,接下来我们就sum除以count得到平均数输出就可以了,哎,这就是我们整体的一个想法,所以这里我们需要去写的类型输入还是。
06:01
中间的聚合状态呢?两个长整形,一个some,一个count。最后输出一个长整型了,那然后接下来这个描述器script里边需要有三个参数,第一个参数是名称啊,那这名称的话,我们就还是叫avgts吧,平均时间戳。第二个参数,注意这是一个aggregate function式啊,那所以我们要你用一个aggregate方式。里边的类型还是input ACC还有output啊,那这个类型其实跟外边我们这个script这里边指定的是完全一样,可以直接copy过来。然后里边必须要实现的有四个抽象方法,Create accumul,哎,创建的这个累加器,初始的时候当然就是零了。0L0L。初始的sum是零,哎,那count值也是零。
07:02
接下来第二个抽象方法,爱的方法,那就是每来一个数据的时候,我们需要怎么样去聚合状态,哎,那就是ACC有什么样的变化啊,那这个其实也很简单,ACC有两个字段吗?这个二元组第一个字段。在之前的萨的基础上。下划线一的基础上要加上当前数据的time STEM。而第二个字段呢,哎,那就是ACC。在之前的下划线二的基础上再加一就完事了。然后接下来get result,这就是最后输出结果的时候,我们要怎么样去进行计算啊,那我们现在呢,其实就是可以取这个中间聚合状态里边的第一个字段除以第二个字段就完了啊,这里我们还是一个整数除法啊,直接ACC点下划线一除以ACC点下划线二,这样的话得到的结果直接输出。啊,那关于墨的话,我们现在不涉及到会话窗口,那当然就没有必要实现对应的这个合并方法了,如果说我们想要实现的话,那也很简单啊,那这里边有两个状态怎么合并呢?那不就是下划线一加下划线一,下划线二加下划线二嘛,两个字段分别合并就完事了。
08:17
啊,这就是关于我们中间所定义的这个agate function,注意后边还得有一个参数在aggregating state script里边,它的泛型参数三个in ACC out,呃,我们需要注意啊,就是在它的构造方法里边,这里边最后一个参数指定类型的时候,只需要指定中间聚合状态,累加状态ACC的类型啊,那所以这里边我们的这个类型其实就是。这里的二元组类型吧,Long long啊,那这里边做一个class of。把这个二元组类型放进,哎,这就是完整的定义,稍微有一点麻烦,关键就是搞清楚类型到底是什么,然后把中间的aggregate function要实现出来。
09:06
这就是我们关于状态的一个定义。定义完聚合状态呢?那接下来我们就可以考虑处理flat map里边的具体逻辑了啊,那这个具体逻辑其实就是每来一条数据,我们就把这条数据直接添加到聚合状态里边,那到时候调用的其实就是aggregate function里边的ADD方法,那这两个状态就都更新了,然后呢,等到当前的个数达到了五个,这个时候我们就直接调用get result方法,直接输出结果就完了。但是这里还有一个问题,就是我们在这里想要判断当前已经达到了五个数的时候,那是得直接找到这个聚合状态里边它的ACC累加器里边的第二个字段,这个时候我们才能知道当前到底是一共有几个。
10:02
而这个累加器呢,我们在外边对于这个aggregating state而言,它并没有对应的方法能够让我们直接获取到内部的累加器状态到底是多少,哎,它只能是调用,我们看这个aggreg state啊,它本身继承字marging state,然后呢,继承字ending state,那这里边其实只有get和AT2个方法。那所以我们获取不到里边的ACC状态到底是什么,那该怎么办呢?哎,那这个时候我们就干脆再用另外一个值状态value来单独的保存一下,当前一共有几个数,每来一个加一几个数达到五次的时候,哎,那这个时候就把之前我们聚合好的结果直接拿出来就完了。啊,那所以这里边看着有点麻烦啊,主要就是因为aggreg state没有给我们提供对应的这样的方法啊,那所以这里边我们在。定义一个值状态。
11:02
保存。当前已经到达的数据个数。那这里我们还是用lazy的方式去做一个声明啊,它它其实就是一个count值,我们就把它叫做count吧。Get runtime context,然后直接get state,你有一个value state script,那这里呢,它要保存的值其实就是一个长整型的count值嘛,所以就是long。然后我们就把它叫做count class of law。诶,这就是我们想要保存的信息,所以接下来呢,在Fla map里边,我们就是每来一条数据就要做一个判断,然后进行状态的更新啊,那所以首先来了数据之后啊,其实这个非常简单,就是a state直接ADD当前的数据就可以了。然后另外呢,注意还得。更新count值。
12:00
更新count值,哎,这个就是我们先拿到当前的count到底是多少con.value。然后呢A呢,就是countate去做一个update,这里边我们不用去判断它到底是不是初始值,因为我们知道valueate啊,长整形初始值就是零,所以这样的话直接获取,然后去阿佩它加一就完了。Can't加一。然后接下来呢,哎,那就是要判断。是否达到了?技术。窗口的长度,也就是他的size啊,五次如果达到的话,那就输出结果,没有达到的话继续啊,就什么都不做,所以我们就判断用countate拿到它里边的值,判断一下是否已经等于我们现在指定的五,哎,那当然了,如果说我们想要跟之前的这个滚动时间窗口类似的话,想把这个参数传进来的话,也是可以的啊,这里我们就直接写死在这里了,等于五的时候做一个判断。
13:08
那如果等于五的话,我们这里就直接输出对应的结果collector.collect来做一个输出。啊,先说一下到底是哪个用户,我们直接从数据里边提取点user,然后说它的。平均时间戳为。那接下来的平均时间戳还是在aggregating state里边要去做一个获取,直接点get拿到就可以了,哎,这就是我们最后输出的结果。所以这里就类似于我们所说的计算窗口结果。并且输出,哎,那窗口输出了计算结果之后,接下来还应该做一个窗口的清理,窗口的销毁啊,那所以这里。我们直接做窗口销毁的话,那就应该把当前的统计的这个count清零就可以了,哎,那所以这个清零的话就直接countate.clear把这个直状态value清空,接下来下一个数据再到来的时候呢,哎,那就count又从零开始重重新累加了。
14:19
这就是我们整个的处理流程,那接下来我们还是直接运行来测试一下,看一看效果到底是怎么样的。我们现在是每五个数要输出一个结果,所以应该按之前我们click啊,每一秒输出一个数的话,应该是五秒钟会输出一个结果。然后我们看到输出了一个Bob,前五秒钟应该是只有Bob的信息啊,他的平均时间戳是啊48410 Mary的平均时间戳51235。哦,那有时候呢,我们可能就会连续输出两条信息啊,那就是Bob和Mary他的平均时间戳。我们会发现这个时间是不太均匀的,因为我们当前是一个技术窗口,每五个数就会统计输出一次当前的结果,啊,这就是我们当前能够看到的效果啊,啊,那如果说我们想要看的更加明显一点的话,我们也可以在代码里边把对应的信息更多的输出一些,比如说我们在这里可以把每一条输入的数据。
15:25
都打印出来,哎,我们在前面这里啊,读取数据源之后,把它叫做stream,然后呢,呃,经过处理转换之后,最终的一个打印结果,这是result,然后我们可以把每一条数据本身也做一个打印输出,这个其实就相当于是我们的input了,这个时候我们再来运行一下。就可以看到每一次输出,它对应的前面的输入到底是什么样的啊,我们看每隔一秒钟会有一条数据生成。啊,我们看现在一直都没有输出的数据。
16:00
那是要等到某一个用户有五条数据到来之后才会有输出,所以我们现在可以看一下carry的第一次输出的结果,那那其实就是凯瑞已经有五条数据到来之后,第五条数据到来之后,把前面的五个数据五个时间戳做了一个平均的统计,这就是我们能够看到的这个效果啊,这样的话就看的比之前会更加的明显,那每一次输出的时间点呢,确实都是不均匀的,因为我们当前都是随机生成嘛,按照每个用户他有五个数据的时候,这个时候就会统计一次。这就是关于聚合状态aggregating state的一个具体的应用案例。
我来说两句