00:00
前面我们已经介绍过了,Value state list state以及map state,这三种状态在实际应用当中其实非常的常见,使用也非常的简单。除此之外,Flink还给我们提供了另外一种比较特殊的k set,那就是所谓的聚合状态。前面我们已经提到了,聚合状态里边其实有两种,Reducing state和aggreg state,他们的特点可以认为就是一种特殊的value state,它使用起来的话就像是只是一个值而已。只不过呢,这个值比较特殊,它不是直接更新的,而是之前每一次传来的值经过聚合计算之后得到的一个结果。而reducing state和agggating state,它俩的区别就在于,Reducing state要传入的是一个reduce function,做一个规约,而aggregating this呢,它要传入的是一个aggregate方式,做的是一个更加一般化的聚合。
01:02
所以我们就自然能想到,在实际应用当中,Aggregating state会更加的灵活,它的应用也更加的广泛。所以在这里我们就以aggreating state为例来讲解一下聚合状态在实际项目当中的应用。在这里我们可以举一个非常简单的例子,就是还是基于前面我们的even的数据随机生成之后,然后每隔五个数据去统计一次当前五个数据的平均时间戳,然后看一下呃,他们的这个数据的疏密程度是怎么样的,这个就可以说是一个非常简单的应用,我们看一下,看一看使用are state怎么样来实现这样一个功能。好,那接下来我们就在代码里边直接去创建一个plus。这个就叫做。Average统计平均时间戳time stamp example。
02:01
同样,还是把那方法先写出来。Exception,那这个处理的流程,前面的数据源,包括这个环境的定义,显然跟前面还是一样的。所以我们可以直接把。Window这一部分,前部前面这一部分代码都copy过来。当然了,最后的因为execute执行起来这个也是完全一样的,直接可以放在这里,然后接下来关键就在于。我们需要去做一个对应的实现了。自定义实现。平均时间戳的统计。那这里区别在于我们自然能想到啊,如果开窗口的话,那是跟时间有关,我们现在每隔五个就要去做一次这样的一个统计,那这其实跟时间没关系,它其实是跟数量有关,所以这里有点像是一个技术窗口的一种感觉。
03:04
那如果要是说是技术窗口的话,我们能不能直接就调用一个count window,然后里边传入一个agg function去做增量头就可以了呢?很可惜,Count window里边没有对应这样的接口,我们之前所有的关于这个增量聚合函数和全窗口函数的传递传入都是基于时间窗口而言的,所以现在我们就只能自己去实现了。那现在我们就是自定义实现这样的一个过程,Stream首先先做一个K,我们要统计的是到底是按照用户他点击的次数去做划分呢,还是按照每个页面URL被点击的次数去做划分呢?这里我们还是就是使用user作为key,就是统计每个用户他点击的频次,然后看一看它的访问数据疏密程度是什么样的,这就代表了,诶这段时间用户到底有多活跃,哎,这个是比较有实际意义的。
04:02
那这样的话,我们就。Data的user。我们先把它定义出来,然后接下来我们自然想到了,那可以直接去还是用process啊,定义一个process就可以实现了,但是仔细一想的话,这里面我们只跟个数有关。跟时间没有关系,那之前我们必须要用kid process function,一般都是不光用到了状态,还用到了定时器,这个时候是没有办法,那现在不涉及到定时器,那是不是就可以不用process function的呢?那确实是的,我们这里面只要做一个简单的转换,但是要自定义状态就可以了,所以我这里可以直接使用flat map这样的一个算子。啊,那里边我们自然想到了,需要去自定义一个Fla function,这里边我们肯定要用到状态,所以一定是一个Fla function。
05:01
我这里可以定义一下,这个叫做avgts。我们还是把最后的结果直接定义成string类型,然后直接打印输出就可以了。所以接下来的关键就在于。实现自定义的。Rich。Flat map。Function。好,那接下来public。Static plus。AV gts result。现在我们需要去。继承flat map function这样一个抽象类。哎,那这里面flat map当然就只有输入输出两个类型了,输入当然是输出的话,我们直接定义成了词。里边必须要实现的有一个flat map方法。那在这个过程当中,我们需要结合一些状态来进行计算。
06:04
那需要定义哪些状态呢?首先我们应该需要去定义一个聚合的状态。定义一个聚合的状态。用来保存。平均时间戳。那这个聚合的状态,我们就干脆直接把它定义成aggregating state就好了啊,这样的话就省得我们再去在外边去做这个复杂的计算了啊,就是所有的计算都以这个everydaygggate方式里边的定义为准就可以了,我们的这个方式就是用来计算平均数,那这里面我们就直接定义一个。Aggreg it。A state,我们知道有两个。当前是有两个泛型的,它的泛型呢就是input和output,哎,所以这里边并没有涉及到当前聚合状态的类型,我们只要把这个input output写在这就可以了。
07:07
啊,这里还需要注意的一点是,当前的输出其实并不是整个rich map的输出,而是我们当前用来做这个聚合计算的输出。哎,那我们计算这个平均时间戳,最后到底要保存一个什么呢?哎,那可以自己定义了,我们如果想要保存double,那就把它定义成double,如果说我们也不需要double,直接取整就可以,那就时间戳嘛,直接取long就好了。长整形的话,我们还比较方便把它转换成一个可视化的时间啊。那接下来我们把它定义出来,Avgts,呃,我们叫AJ。State aggreg state,延禧,另外,除了它之外,我们会发现还应该得有一个状态,因为我们有这个count的限制。每来一个数就要统计加一,来一个就要加一,等到加到五的时候,按照我们这个要求,那是啊,相当于是一个五传一个五这样的一个技术窗口,那所以如果要有这样一个值的话。
08:12
我们当前还应该有一个属性了。Private。长整形。Count。然后哎,我们可以直接把当前的。构造方法要列举出来。当前传进来的话,哎,我们当前就是要以这个count作为一个标准,然后去判断,达到这个count值的时候,就要去输出结果了,就要去把当前的平均时间戳要输出了,哎,那所以我怎么能知道到底有没有达到这个count值呢?那还是需要一个状态来进行统计的。那就是保存当前用户已经访问了几次。那当前这个状态,当然就用一个value就可以了。
09:01
定义一个值状态。保存。用户。访问的次数。所以这里面我们单独的定义一个value state。里边当然就是长整形啊。当前这个我们直接就叫做好了。所以接下来我们需要在open生命周期里面。生命周期方法里边,把对应的这两个状态都要获取到它的状态控制距啊,所以首先这个。AV GT state啊state这个比较麻烦一点,我们知道。Time contact这个时要getgg这1STATE。然后这里边我们看到中间这个类型是object,它本身应该是我们定义的aggregate function里边。
10:10
中间聚合状态的类型,那我们现在要计算一个平均数,它中间的聚合状态应该是什么呢?哎,我们会发现啊,中间聚合的状态并不一定要跟最后的输出结果一样。我们当前做这个聚合的时候,其实是需要保存两个值的,一个是当前的。个数当前的次数,那另外一个应该是。啊,当前所有已经拿到的数据,时间戳的总和,所以这里面我们应该保存两个数,然后最后你要获取它平均值的时候,两个一除就完事了。所以我们可以把当前的类型。定义成一个二元组就是中间状态,定义成一个二元组。啊,那么里边当然就是两个成整形了,一个是当前的次各式,一个是所有时间处的和,所以就是一个sum,一个count。
11:08
这个定义完的话,里边就需要传入三个参数。这里边我们可以空行来写,首先是当前这一个状态的内,它的名称我们就把它叫做avgps。然后第二个参数,对于aggregating c的script而言,第二个参数是一个aggregate方式,最后是那个类型,类型是ACC的类型,也就是中间聚合状态的类型,这个不要忘记,所以我们可以先把第二个位置先空下来,然后最后一个位置,那应该是。Tips。点。元组类型啊,然后type。
12:00
电长整形,然后第二个位置也是长整形。这是我们要去处理的这个过程啊,三个参数的第一个和第三个,那关键是中间这个稍微复杂一点,中间我们要传的是一个。你有一个aggregate方式。那这里面这个过程跟我们在窗口函数里面实现的那个过程是完全一样的,那首先create accumulator这个。中间的状态,这是一个累加器umulator,它初始值是什么呢?当然初始值。都是零嘛,所以就是0L0L。然后接下来每来一个元素的时候,调用I的方法啊,这个应该怎么做呢?那当然就是还是返回一个二元组,这个二元组是要在之前accumulator基础上,它的第一个位置,如果我们当成的话,那接下来就是accumulator点零。
13:00
就要加上当前的时间戳,就是所有数据时间戳的和。然后第二个位置如果当做count的话,那就是F1,再加上一就可以了。所以这个过程其实还是非常简单的,然后get result的时候,因为我们得到的也是长整型,那这个就不用做其他的转换了,不用做类型转换了啊,那就直接accumulator f0,然后除以accumul f1就可以。非常简单,那最后当然这里面还还有一个这个。默认方法,这里边我们也不需要去考虑各种默认的形式,直接返回now就是没有问题,这就是我们定义的这个I的方式。它的聚合逻辑就都已经定义在这里。那这样的话。当然后面还有一个count state。我们需要把它也。获取到get,然后get,这就简单多了,需要去创建一个里边的话,我们就把它叫做。Count。
14:01
然后给一个点类型。这就是我们对于状态的定义。有了状态的定义,接下来在map里边就可以处理具体的逻辑了。那么我们这里的核心逻辑当然就是每来一条数据。当前的就要加一啊,所以我们把这个可以叫做current count就加一。因为当前我们定义的这个count值本身是类里边的一个属性,类似于技术窗口的长度,我们这里边呢,还应该有一个current count,所以我们单独把它定义出来吧。Current count。那这个是当前状态里边拿出来的值点value啊,那我们判断一下,如果当前的current count等于nu的话。这个时候我们就让它直接等于一就可以了。
15:01
Else,如果不让的话,那当然就是直接加加。这是我们呃,基本能够想到的一个基本的处理情况,然后接下来当然就是需要去更新状态。Countate。直接update当前的count值,然后接下来还有就是avgt。当前的聚合状态也要做一个更新,这个非常简单,直接把最新的数据value添加进去,它自动可以帮我们做。增量的聚合计算,那另外还需要做一个判断的是根据当前的current count,判断它是否达到了我们累积的那个技术窗口的长度当前的上限啊,所以这里边我们要判断。如果。达到。Count次数就输出结果。这里边我们需要做一个if判断,Cut count啊,这因为是包装类型啊,所以我们直接调它的E方法和count去做一个比较,那如果它俩相等,或者说我们说它这个如果要是超过了这个count值的话,就直接alt.collect。
16:17
输出,那当前我们输出的就是当前用户是谁,然后他过去几次访问啊,过去count次访问的平均时间戳到底是多少,所以接下来我们可以写一个。当前的用户当然是value user了。呃,那么他过去。多少次呢?很明显是counts。是访问。平均时间戳为。这个时候我们可以直接把当前average里边的数据get拿到可以了,当然如果说想要更加的可视化一点的话,我们可以把它通过这个time做一个转换,然后转换成一个。
17:14
年月日十分秒的这样的形式啊,这里直接看时间戳也是一样的,那另外我们需要把当前的状态进行一个清理。清理状态。之后就可以重新开始,重新开始统计了啊,那所以这里面清理状态的话非常简单,直接调方法,那另外avg。PS。A state也是直接这样方法清空就可以。现在还在报错,我们看一下上面这里我们定义的是一个长整型的传入,所以应该传入一个5L,这样的话就没有问题了,运行一下看一看得到的结果是什么样的。我们可以看到每一条数据输入之后,现在还没有任何的输出,因为我们是根据user做了K,那是每一个用户累积点击达到五次的时候才会有输出,哎,我们看到爱ice在这里已经有了五次点击了,这样的话就可以得到他的过去五次访问的平均时间,戳那后边,呃,如果Bob达到的话也就可以输出了啊,Carry达到又可以输出了。
18:25
所以后边我们看到这里爱丽丝又有一次输出,我们会看到第一次输出之后到后边一次。两次三次五,果然是有五条数据,然后他就输出了一个呃,平均访问的时间戳,这就类似于一个技术窗口,当然了,对于我们这个需求呢,也可以做一个更改,就是不要把它当成一个滚动的技术窗口,而是什么呢?类似于之前我们周期性的计算PV一样,是每隔五个数据,我就输出统计一次当前所有访问的平均时间戳,那如果要有这样一个需求,又应该怎么样去做计算呢?
19:10
其实也非常简单,就是清理状态的时候,我只把当前的count次数清清零,而不要把聚合状态清零就可以了,接下来如果再来新的数据的话,就在之前的基础上继续计算平均值就完事了啊。所以这样的话,如果我们。重新运行一下会看到呃,整体的情形啊,整体的这个运作的机制还是一样的,还是每隔五个数,当前这个USER5次访问之后才会得到一个一个平均数,但是呢,当前的平均数可能就会比之前我们的那个计算要小一点,因为它是所有数据历史数据的平均值。这个过程应该仔细计算一下,我们就可以看到到底是怎么样,这就是利用了聚合状态areating set实现了一个。
20:05
平均时间戳的统计计算。
我来说两句