00:00
我们现在已经了解了k state中比较常见的一些状态类型啊,那有value state值状态,它只保存一个值啊,那与之对应的呢?另外两种都可以认为是集合类型的状态了,Lisa set保存的是一个列表,Map set保存的是k value这样的键值,对,那除了这些基本类型的状态之外呢?Flink还支持一种比较特殊的状态类型,好,那就是。所谓的聚合状态的类型啊,这又分成两类,一种就叫规约状态reducing state,另外一种叫聚合状态一,更加一般化的聚合状态啊,Aggregating state。那直观上来看的话,这两种状态本质上都是要做一个聚合啊,最后得到的其实还是一个值,所以有时候呢,我们可以认为它们的调用跟前面我们讲到的这个值状态其实是非常像,因为这个状态里边其实保存的就是一个聚合结果啊,就是一个规约的结果,或者是一般化聚合的一个结果,所以就是一个值。
01:03
但是呢,区别就在于我们在使用这个状态的时候,来了新的数据进行处理的时候呢,保存状态的规则,它是要进行规约或者说进行聚合的,哎,这个跟直状态就有所不同了,所以接下来呢,我们可以看一看规约状态和聚合状态到底又是怎么用的,哎,那首先我们还是在上面直接做一个定义啊,规约状态我们直接就定义一个reducing state。它的类型就是reducing state。我们看到本身它也需要有一个泛型参数啊,那这个当然就是一个T类型啊,就当前的数据要保存的类型啊,如果是reducing的话,做规约聚合,我们知道它只能针对同样的数据类型去进行规约,而且得到的结果也是同样的我所以我们保存的中间状态也是相同的数据类型啊,所以这个就不涉及到别的啊,我们当前处理的类型是event。
02:02
那当然它的类型也只能是英文的了,那同样类似的,我们还可以定义一个aggregating state,我就简写吧,就叫a state。这个是aggregating state,我们看到aggregating state的话,它的类型哎就稍微多一点了,它有input和OUTPUT2个类型,因为我们说之前我们也介绍过啊,一般化的aggregate function,它里边是输入输出类型可以不同的,中间聚合的类型也可以不同,哎,所以当然了,我们这里面就可以传入更多的东西了。那比如说我们这里输入的数据类型当然是event,输出的话简单一点我们就直接还是一个string。目前我们没有想到太特殊的应用场景,所以呢,呃,这个输出我们就直接指定成string就可以了,然后这里边有一个问题,就是我们会考虑到,诶,之前那个aate function里边不是还有一个ACC类型吗?还有一个中间聚合状态的累加器类型吗?那那那个类型这里边怎么不去做定义了呢?
03:04
不要着急,接下来我们看看在运行上下文里边怎么样去获取到这两个聚合状态啊,所以首先是reducing state。它的获取同样还是that time contact,接下来我们就是get reducing state,里边要传的当然就是一个reducing state script啊,那里边同样我们需要传入当前的名称my reduce。然后接下来我们看一下。它的构造方法里边呢,又多了一个参数。这里面首先最后一个还是一个class啊,当前的类型啊,那我们就写一个class of event,这个没什么区别,中间多了一个reduce function。所以这就是我们说的啊,规约状态,聚合状态,它跟直状态区别在哪里呢?就多了一个规约或者聚合的这样的一个函数,这个函数类,这就相当于代表了来了新数据的时候,到底要怎么改变这个状态,你像之前的这个value的话,你想改变它,那就直接update嘛,就把一个新的值直接塞进去,直接更新这个状态就可以了。
04:14
那现在我们如果是聚合状态或者规约状态呢,那是要调用这里的reduce function去进行规约处理的,所以这里面其实我们是可以去自定义这样的一个reduce function啊。又一个reduce。Function。哎,那当然了,这里边具体的逻辑呢,还是按照我们之前的啊,就是定义两两规约的一个处理逻辑,到底要怎么样,然后最后呢,还是返回一个event,我们这里也没有什么具体的逻辑,我们干脆就直接返回一个event,使用什么呢?呃,干脆我们就把当前的这个时间戳更新成最新数据的时间戳,而user和URL的信息都用之前的,那这样的话,我们就是使用t.user。
05:02
t.URL另外使用T一点。这就是我们所谓的这个reduce function的一个规约逻辑啊,那除了这个reduce function之外,后边还需要有第三个参数,哎,那这个参数的话,当然就是传一个类型class,我们现在指定的泛型的类型啊。这样的话,规约状态就获取到了,就定义好了,好,这就是索菲的规约状态的一个用法,然后接下来我们再看看aggregating state啊,这个其实非常的类似,哎,那我们既然想到了它也是啊,从运营上下文,现在要获取的就应该是一个get aggregating state啊,然后你一个。Aggregating state。同样这里边需要有泛型,我们看一下这个script的泛型是什么,哎,现在终于看到了我们九维的ACC类型中间聚合的状态类型在这里出现,它是在描述器里边出现,而前面我们定义这个状态类型的时候呢,泛型里边反而没有出现ACC,只有in和out。
06:10
这是我们需要注意的一点啊,所以这里in和out肯定跟上面定义的一样'和string。那中间还需要有一个ACC类型啊,这个ACC类型我们定义一个什么呢?呃,干脆我们就这样吧,就像map state做的那件事一样,我们做一个抗统计,我们就是每来一个用户的访问事件,我们就加一就加一,哎,那这样的话,这个就简单了,可以直接给一个长整型进行一个抗统计。然后里边当然要传入对应的名称,我们把这个叫做MY。然后点进去我们看一下哦,那它的构造方法,当然里边也是需要传类型,这个就不用说了,这个类型注意指定的是ACC的类型,我们中间聚合累加器的类型啊,跟input outook就没关系了,然后另外呢,哎,关键就是要传一个aggregate方式,这个如果我们点进去的就会发现啊,这个aggregate function跟我们之前在窗口增量聚和函数里边使用的aggregate function就是同一个接口。
07:16
里面我们看到同样就是实现这样的四个抽象方法,创建一个累加器,然后每来一个数据元素就调用一个at方法去进行累加,最后的结果呢,调用get result得到,那还有一个默认方法,默认方法一般我们不实现也是可以的,所以这样的话就一目了然了啊,跟我们之前讲的内容就全部可以结合在一起了,所以这里边的关键就是中间我们需要去传入一个。Aggregate function。要把这个要实现出来。那aggregate function的泛型参数当然是in ACC和out,这三个都不能少,然后接下来我们看一下它里边必须要实现的四个抽象方法啊,首先我们create accumulator啊,那当然初始的时候就是零啊,什么都没有嘛,那初始化的时候访问次数就是零,然后每来一条数据。
08:11
怎么办呢?哎,当前这个ACC更新成什么呢?当然就是在之前的基础上加一返回就完了。然后接下来get result,最后返回什么呢?当然就是直接把ACC的这个count数量返回就可以了,哎,但是我们现在最后要的这个out啊,最后我们输出的结果是一个string啊,那干脆。我们就直接把它to string。直接输出了啊,那或者我们还可以加一句,就是当前的这个,呃,聚合出来的状态是什么啊,也可以写的更加的清楚一点,就是我们当前聚合状态。为到底是多少啊?聚合出来的count值到底是多少?那最后的墨呢?我们也可以不实现目前不涉及到状态的合并啊,那最后还应该有一个类型class of,注意是中间聚合状态ACC的类型,所以这里是class of law。
09:07
这样的话就把我们的aggregating state也定义好了啊,那最后呢,我们在这儿还是啊加一个分割线。可以把对应的这两个聚合状态也做一个输出,我们可以print line,当前的reducing state,我们看看它能叫什么方法呢?其实我们看就是一个get方法,一个at方法。所以。在这儿我们可以直接让他做一个A当前的in,把当前的数据直接添加进去,诶,那我们就想就会在之前数据的基础上进行更新,进行合并规约了,那最后呢,我们可以调一个get方法,直接把当前的值拿出来做一个打印输出。同样我们看一下aggregating state,它可以调什么方法呢?啊,最经典的也是一个A,一个get,哎,直接获取当前的值,或者是直接A添加一个数据元素进去,那添加一个数据元素进去,我们知道啊,其实就是累加结果中间的累加器加了一,那我们如果获取最后的值的话,就是会输出这一句聚合状态为多少啊。
10:14
所以这里我们可以在print line打印一下a state点。然后除了这些之外,我们看到啊,所有的状态其实都能够调用一个通用的方法。这个点clear,好,那这就是清空状态,因为我们知道当前如果是自定义KC的自定义状态的话,那就需要我们自己去进行维护了,每来一个K都会有自己对应的一个状态实力,那如果说我们这个K不停的来,不停的增加,那如果说一直不清除的话,不清理的话,那资源就会不停的占用,就会导致最后资源耗尽,哎,所以如果说我们这个状态啊,已经统计结束了,已经告一段落了,那就可以掉clear方法把它清理掉。
11:01
啊,现在的话我们就是看一下得到结果就可以,也不需要做任何的清理,好运行一下。看一看reducing state和aating state到底输出的结果又是什么样?好,我们可以看到来了一个数据,第一个数据是Bob的,哎,所以我们看到啊,输出的当前的这个reducing state,当然就是把这条数据直接更新进去了,而aggregating state呢,就是统计count值一。又来了一个Alice的数据,哎,那它也是直接把当前这条数据写入到reducing state里边,那另外呢,它的聚合状态也是一,因为我们是按照不同的K分别统计的嘛,K的state就是这个特点,分别统计不同用户对应的抗值。啊,那后边我们看到啊,这个Mary来了之后也是同样的结果,爱ice丝再来第二条数据的时候,哎,那之前我们这个map里边可以统计出来它的访问频次是二,现在的聚合结果同样也能统计出来它的访问频次是二,那这里要注意的是这个reducing state。
12:03
我们看前面的这个Alice Alice丝,它的URL是product ID是三,然后它的时间戳是8223,现在呢。时间戳做了一个改变,URL没有变啊,那当然了,这里我们这个好像刚好来的这个数据访问的URL还是一样啊,那我们继续往后看吧。我们可以看下面这一条,Mary的数据,Mary的数据之前的数据访问的是ID为三的商品页面,现在的新数据呢,因为我们直状态是更新过的嘛,新数据是ID为二的商品页面,那reducing state它是会输出最初的URL以及最新的time step。这就是我们之前定义的规约逻辑啊,那当然了,聚合状态跟前面我们的map输出的都是一样,就是Mar用户访问的频次count值。这就是关于k set的一些基本的使用,我们把不同类型的状态都做了一个测试。
我来说两句