00:00
啊,我们已经通过value set值状态对flink当中k state的用法有了一个基本的了解,那我们说除了最简单的值状态类型之外呢,弗林还支持其他类型的k state啊,那主要就是有一些集合类型的状态可以进行保存,比如说前面我们也已经用过的列表状态list state。顾名思义,这就是把我们想要保存的数据以列表的形式组织起来了,啊,就保存成一张列表了,那在list里边呢?同样这也是一个接口,它有一个泛型,这个泛型参数就指定了我们在这个列表里边每一个数据元素具体的数据类型。那同样Lisa state也有一系列的方法来对于这个状态进行操作了啊啊,那这些方法呢,就有点像列表的一些get set方法啊,比如说最简单的有一个就是get get,它返回的是一个interable类型,这是直接获取当前的列表状态啊,因为我们知道列表本身是一个集合类型嘛,那么它得到的是一个可迭代的类型。
01:08
然后另外呢,对应的还有set方法,那set这里也是跟直状态类似,是一个update update方法里边直接传入的就是一个list啊,所以这里面我们就是相当于把一个列表直接写入到当前的列表状态里面直接覆盖。另外呢,除了覆盖这种比较重的操作,我们还可以在这个列表里边追加元素,这个可能是我们在列表状态里边使用最多的方法啊,那就是A,之前我们在代码当中,哎,之前第七章我们曾经实现过top n,使用key process方式实现过top n的需求,在这里边我们定义了一个列表状态,那其实最关键的对它的调用啊,方法的调用就是每来一个数据就把它添加到当前的列表里面,哎,那其实这个方法就是调一个at就可以了。
02:00
把当前数据添加到列表状态。那最后呢,还有一个叫at all at all,那就是也是追加,只不过呢,不是追加一个元素,而是直接追加一组元素,这一组元素就是一个list了。啊,所以整体来讲列表状态还是非常容易理解的啊,那对应的我们在获取一个列表状态的时候,定义一个列表状态的时候。同样需要去传入一个状态的描述器,那它的描述器就叫做list state script,跟前面我们提到的这个value state script是完全一致的,所以接下来呢,我们也可以在代码里边对列表状态来进行一个测试啊,那前面我们就直接说的是定义状态吧。接下来直接追加一个列表状态。List。好,我们把它放在这里,List state同样要引入对应的类。同样我们还是把当前的数据元素啊,作为一个列表保存起来啊,那接下来同样在这个open生命周期里边也应该有对应的补货的方法,那我们还是调用gettime contacts获取当前的运行上下文。注意现在要调用的方法,既然是要获取一个列表状态,那自然就是get list state啊,所以我们看对于不同的状态类型。
03:21
Link给我们提供的接口本身就是不一样啊,那这里边就不会搞错了,里边传入的自然就是一个list state script,那里边同样一个名称my list。注意这个名称一定跟上面是不同的啊,另外还得传一个类型,这个类型的话也是我们这里的泛型类型啊,就是对应的这个Lia state的泛型类型,填到这就可以。Event。这就是我们基本的一个定义啊,那当然了,现在接下来又可以对list做一个处理了,我们可以调它的get方法诶,去得到当前列表状态里边的所有的数据啊,那另外呢,更加常见的用法就是直接A啊,把当前的一个数据添加到列表状态里。
04:11
这个其实非常简单,我们也就不用去做更多的测试了。那除了列表状态之外,另外还有一种集合类型的状态,在状态编程里边非常的常见,那就是映射状态,一个map mapst state,哎,那顾名思义就是我们当前要保存的状态呢,是一个一个的键值对k value的形式,作为一个状态整体把它保存起来。所以对于map sit这个接口呢,它的泛型就得有两个了啊,一个UK,一个UV,因为你既然是线值对了,那键和值当然是可以有不同的类型了,哎,所以分别对应着当前要保存的键的类型和值的类型,然后呢,同样提供了一系列的调用的方法,我们看到可以直接调一个get,然后把K传进来,哎,那么get得到的当然就是对应的值了,另外呢,还可以put put就相当于是把一个键值对直接写入到当前的map里边来啊,那如果说这个K已经存在的话,当然这就是一个更新操作了,另外还可以put套,这就是把一组KPI全进行更新,另外还有remove,就对应的这个K直接删掉啊,当然了,还有常见的这些一些方法的调用啊,Contains,那判断某个key在不在当前的map当中啊,另外呢,还可以调用entriess keys values,获取当前所有的建制,对,或者获取所有的。
05:37
集合获取所有值的集合,另外还有一个判断是否为空啊,这些其实都是在一个map里边啊,或者说我们在Java里边哈希map常见的一些方法的调用,所以使用起来也是非常简单的啊,我们这里也可以简单的做一个测试吧,上面我们再去定义一个。Mapstate。我们需要指定两个泛型参数。
06:03
这里我们干脆就让它有一点实际意义吧,比如说我们既然是event嘛的数据都来了,那我们就看到其实直接使用一个这样的map转换,或者reach Fla map function也可以实现之前我们类似于word count进行聚合的功能,诶那怎么聚合呢?其实就很简单的,可以使用一个map,我们就可以直接对当前每个用户浏览访问的页面的次数进行一个统计啊,那也就是说现在就是每一个用户的访问数据来了之后,我们按照user做一个判断。当前的用户有一次点击事件,我们后边就加一,来一个就加一,来一个就加一,哎,所以这就相当于word count了啊,String用户名称作为当前的K,那保存对应的抗值我们用一个长整型来进行保存啊,那同样在上边先把它定义出来,下边需要在open生命周期里边。去获取运行上下文,然后get map state里边要传入的当然就是一个map state script啊,那这个script的泛型自然能够想到同样也是两个了啊,那这样的话我们在这里就应该写string。
07:18
然后里边的参数啊,我们看一下它的构造方法,它的构造方法同样最简单的方式呢,也是要传三个,因为你现在除了一个名称之外,还得传两个类型,一个是K的类型,另外一个是值的类型啊,那所以现在我们其实就是跟上面也类似,先写一个就是my map。然后接下来class of stream,另外还有一个class of长整型long,这样的话就搞定了。好接下来我们可以看一下这个map state到底应该去怎么做操作啊,那为了跟上面输出的这个值状态的结果做一点区分啊,我们写一个这个分割线吧。
08:00
单画线的一个分割线,然后接下来我们来输出对应的map里面的内容,那现在我们想要把每个用户访问页面的点击频次直接保存进去,诶,那这个时候我们要保存的这个key当然就是用户名了,诶,那所以这里边我们直接就可以map去做一个。Put操作,Put对应的K就是当前数据里边的user来了一条数据,那当然它对应的之前我们已经保存的那个count值,它的访问次数就要在加一嘛啊,那我们应该先把之前的那个访问次数先拿到,哎,所以这里边我们可以上面啊,先多写几句吧,不要写在一句一行里边了,我们上面先拿到当前的一个访问频次这个count值。那就是map state要去直接get硬点user对应的值,哎,那这里边还有另外一个问题,就是我们得考虑,那假如说当前这条数据是这个用户第一次访问点击的事件呢?诶,那如果是这样的话,我们当前get显然就没有对应的key码,那就要报错了,所以这里边我们还得处理一下这个初始情况,其实这个也简单,我们可以直接做一个一步判断。
09:12
那就是如果里边contains当前的这个K的话,那么count值我们就直接去get,直接把它拿到,那如果要是不包括呢?哎,那就else,直接返回零不就完了吗?这就是我们当前的count值,所以这里边我们想要put的其实就是直接count加一,这样就完了。啊,当然了,我们做完这个操作,也可以直接在控制台做一个打印输出,哎,那类似的,我们可以输出这样一条信息,我们现在看到的啊,其实就是某个用户。用户Dollar硬点user,他的访问频次为。Dollar。啊,其实我们可以不用count加一,我们也可以直接从map里边去做一个,再次做一个get啊,就是当前已经来了事件之后,这个GET1定能捕获到对应的值,那所以我们可以直接get in.user这样的话打印输出就可以看到它的结果了。
10:16
啊,这就是关于这个map的一个简单的用法啊,可以实现类似于work的功能啊。呃,那为了让最终的显示效果更加的清晰,我们再加一个分割线吧,就是每一条数据来了之后,我们都有一个双画线的这个分割线,把它分割开,这样的话我们在打印输出的时候就会看得更明显,好,现在可以重新运行一下,我们看一看这个map能够看到的结果又是什么样的。好,我们可以看到啊,前面首先这个直状态我们就不说了,来了一条数据之后,哎,那Mary的访问频次就是一。哎,然后第二条数据是爱丽丝的访问数据,所以我们看他的访问频次呢,还是一跟前面Mary没有关系。
11:00
如果再来一个carry,那它的频次也是一,哎,直到我们看又来了一个carry的数据,这个时候它的频次就变成二,那么又来了一个Bob的访问数据,当前的频次同样也是添加到二,再来一条的话就添加到三了,这就是所谓的这个map state,非常容易的就可以实现一个count统计。
我来说两句