00:00
了解了k set的基本概念和特点,那接下来呢,我们再来说一说它所支持的数据结构类型啊,那其实我们知道啊,K本身这是一个很宽泛的概念,我们只知道它是一个状态,然后呢,按照当前的K来进行分组的维护,那至于说里边具体保存什么样的数据结构,数据类型,这个是需要我们再去单独定义的,所以接下来我们所要介绍的呢,就是。Flink目前所支持的k state数据类型,那整体来讲,我们可以认为k state它支持两大类型的数据结构,那一种就是值类型,就是我们这个当前的state里边状态里边保存的就是一个值啊,那当然了,关于这个值具体的数据类型是什么样的,我们还可以再去进一步定义,另外呢,就是当前的状态里边我们可以直接保存一个集合类型啊,当然了,这个集合类型就有可能是列表list,也有可能是映射map。好,那接下来我们就分别来进行讲解。
01:06
首先就是值状态,这也是最简单的状态类型啊,啊,Value set,那其实我们就知道里边保存的就是一个值,那value seat本身呢,它是一个接口,然后它还有一个泛型啊,其实我们很容易理解啊,这就是我们所说的只知道状态里保存的是一个值还不够,我还得知道这个值到底是什么数据类型啊,你到底是长整型呢,还是一个字符串类型呢,还是一个其他的比方说样例类类型呢,所以这就需要我们专门使用泛型参数来进行一个定义了啊,我们其实也可以在源码里边去看一下啊,比如说我们可以直接搜索一下value state这个类啊,那其实我们就可以看到这是一个Java的接口,它本身呢,继承自state这样一个接口啊,那我们就知道了,它是一种状态,然后这里边呢,必须要实现两个抽象方法,一个方法叫做value,另外一个方法叫做update,啊,其实这也看的很明显,这就相当于一个get,一个set。
02:06
Value,那就是直接返回当前状态里边保存的T类型的值,相当于一个get方法,那后面的update呢?那就是更新当前状态里边保存的值,更新成什么呢?那就是传进来的这个参数了,相当于一个set方法。所以value可以说是最为简单也最为好用的一种状态。啊,那在代码当中具体去调用的时候,前面我们也已经用过了,比如说之前我们讲的Bill check,这里定义的其实就是一个value state,那它具体调用的过程当中我们会发现啊,必须还要先去创建一个直状态的描述器value state script,那这个描述器里边主要做了什么事呢?哎,那主要其实就是传入了当前状态的名称,哎,我们可以认为这个状态的名称就像我们代码当中的变量名一样。
03:02
在运行上下文里边,就靠这个名称能够把它找到,所以这个名称必须唯一,另外呢,为了让flink底层能够帮我们把这个状态解析出来,能够正确的做序列化,序列化,哎,那还必须指定它的类型啊,所以class指定当前状态的数据类型。然后接下来呢,是把这个描述器作为参数传给了运行是上下文的get state方法,诶,所以这个get state其实是在get什么呢?就是在运行上下文里边,按照这个名称和类型找到对应的那个状态啊,这样的话,接下来我们就可以使用一个本地变量把它进行一个描述,然后接下来就可以按变量去调用它的方法。哦,那我们还是可以专门的来做一个测试吧,我们重新创建一个object,那现在是新的一章,我们去new一个package。CHAPTER09在下边,当前我们要主主要是测试k state,所以我们就是。
04:03
Key state test。没方法写出来,哎,那整体的流程还是非常类似的,Execution environment,先引入get execution environment,把它叫做env,上面我们还是把下划线引进来。全局的并行度直接设成一,方便测试,然后接下来读取数据源,我们还是EV直接去a source。你有一个我们自定义的数据源click source。把它创建出来啊,接下来还是按照时间余下的这种处理标准,我们先去提取时间戳,做一个对应的watermark的指定啊,有了这两步之后,接下来就可以定义我们想要做的处理转换操作了。我们现在主要是测p state,那肯定是先要做一个K的,那KBY的话,我们就以当前的user作为K做一个分组。其实我们现在也没有专门需要去实现的需求,诶,那所以就随便的啊,定义一个map转换吧,然后最后做一个打印输出就完了,这就是我们简单的一个处理流程。
05:12
那按照之前的介绍呢,这里的map里边我们需要去实现一个自定义的map function式,比如说我们直接用一个my map,那这里面我们会想到啊,如果是一个简单的map function的话,那很显然里边是不能使用状态的啊,Map本身是一个无状态的算子啊,如果说现在我们想要使用状态怎么办呢?A,那就可以自定义一个。Rich map。在里面就可以定义状态了,哎,所以接下来我们会class。My map extends rich map function。当然了,这里我们需要传入对应的泛型参数input和OUTPUT2个类型,输入的类型是event。哎,我们需要把对应的样例类引入输出的话,简单起见,我们还是直接转换成string打印输出吧。
06:04
这样的话,接下来里边必须要实现的就是一个map方法啊,那当然了,在这里边我们也不需要专门做什么事情,主要是要测试k state,那我们看一下怎么样自定义一个直状态value state呢?我们现在目前只了解了这种类型啊value state,所以。定义一个值状态。啊,那我们就知道了,定义值状态的时候,我们想要获取这个值状态,其实是需要调用get runtime context啊,这个只有在rich方式里面才能调用,获取当前的运行上下文,然后如果是直状态的话,那就直接get list就可以了,然后里边呢,就要传入一个value state script这里的泛型,哎,那就是对应的那个value state,里边存什么样的类型数据,我们这里的泛型就是什么样的,哎,当前的话,我们干脆就只保存event数据类型吧,里边传两个参数,一个是当前的名称,哎,我们就叫my value吧。
07:05
一个state,另外对应的类型直接class of event。放进来。这样的话就可以得到我们想要的一个value state啊,比方说我们这个就叫做value state。但是这种写法是不对的啊,之前我也介绍过啊,主要就是因为当前的这一句代码在执行的时候,如果放在这儿的话,这相当于是我们这个类里边的一个静态代码块嘛,那在执行这个静态代码的时候呢,我们去调用get runtime context获取当前的运行上下文,其实是获取不到的,这个也很好理解,就是我们当前这个代码的执行,那应该就是在作业要提交的时候,我们想要去生成那个drop graph作业图的时候直接去执行的嘛,那这个时候的执行呢,显然还是没有分布式集群环境的,诶,那所以这跟我们最后去执行任务的环境是完全不同的,哎,所以这个时候呢,我们调用它是不对的,那得怎么样做呢?那就只能在。
08:09
Open生命周期里边去调用这样的一个方法啊,所以我们当前的这一句话啊。必须把它放到这儿。但是我们说这样做也有问题,那就是呃,首先这里就不能private去定义了啊,那我们可以定义这样的一个变量,就叫做value state,获取到当前运行上下环境里边的一个value state的控制句柄啊,这个类型也直接帮我们补全了,这样是没问题的,但是后边我们还想用这个state呀,还想捕获到这个状态怎么办呢?使用这个状态的时候,很明显我们是应该在map方法里面啊来一个数据,接下来要处理的时候,结合这个状态去进行计算,所以value state本身的定义还是应该放到外面来。哎,我们还是应该把它放到这儿,哎,那只不过一开始呢,就只能给一个空值了,而且要把它定义成一个Y类型,一个变量,接下来。
09:05
那就是直接在这里去做一个赋值操作了。重新做一个赋值,哎,这种方式是完全可以的。这就是我们在代码当中具体使用的过程啊。好,已经定义了这个状态之后,那接下来在麦克方法里边,我们就可以捕获状态的值,然后去更新状态里面的内容,这就是。对状态进行操作就可以做这些事儿了,哎,那这里边最简单的方式当然就是直接可以调它的value方法来,这样的话,我们拿到的就是这个纸里边具体的内容啊,我们可以直接做一个打印,看一看里边的内容到底是什么啊,比如我们这里还可以加一些别的信息。直状态为。后面加上当前value state.value然后接下来我们还可以对它进行赋值啊,那要赋值的话,当然就是要调value state的点update方法里边要传入一个event类型的数据,就会更新当前value state的值啊,那这里边我们可以直接把当前传入的这个in,这不就是一个event吗?当前新到的一个事件嘛,我们就直接把它侵入到better state里面。
10:19
这样是完全可以的,下面我们还可以再做一次输出,看一看当前的值到底是多少,那当然了,因为我们现在用的是一个map的转换啊,最后这个必须得返回一个string类型的值,这个稍微有点麻烦,如果我们不想这么麻烦的话,最后就是不想输出结果的话,也可以在这里直接定义一个flat map,因为我们知道如果是flat map操作的话啊。这定义成my flat map,那后边的就想输出就输出,不想输出的话,什么东西都不用返回,直接也是可以的,哎,那如果是这样的话,我们这里就应该是reach flat map方式,那泛型同样还是两个,这里的map就得改成。
11:02
Flat map。啊,当然了,直接这么改的话,因为后面还涉及到这个输入参数和这个返回值的类型啊,我们还是把这个直接去掉,把它删了。重新来自动生成一个就可以了,这样就省去很多麻烦,然后复制过来,那这样的话我们就不用再去collect.collect去输出具体的这个string了,这样的话我们流程就已经定义完了,可以运行一下,看一看能够输出什么东西。好运行起来。我们看一看。好,那我们可以看到啊,现在已经输出对应的结果了,我们看每来一条数据,对应的这里就会输出两句值状态,为什么?哎,那我们可以看一看,一开始的时候,哎,这个值状态是闹,最初的时候因为我们什么数据都没有嘛,当然里边就是空的,所以一开始是闹,然后接下来来了一条Bob的数据,那么对应的值状态就更新成了event Bob的一个点击事件。
12:03
那么我们看第二条数据,很明显这是一个Alice丝的点击时间来了,为什么当前的值状态又是nu呢?哎,这就是我们说的啊,当前是kid state kid state特点是按照当前的key去进行了一个划分,每一个状态实例都只针对当前的key有效,那所以你现在如果是爱丽丝的点击事件的话,诶,那他一开始就不能去直接访问Bob的状态吗?它也是自己的单独的一份状态,初始值也是闹。那么来了一个数据之后,就会把它更新成爱丽丝的一个点击事件,然后下一个又是爱丽丝的点击事件,来了的时候,我们看之前的状态,就是上一次输出的这个状态,然后就会更新成当前新的爱丽丝的点击事件啊,那同样如果又来了Bob的数据呢?第三个数据是Bob的,那么我们看原先访问到的状态,就是之前我们已经保存好的这一个状态更新之后,诶,那就变成一个新的Bob的访问数据了,那同样如果又来了一个其他用户的Mary的访问数据的话,那他之前没有过数据,自然状态就是,那就是空了,就会把当前数据更新到状态。
13:14
这就是所谓的直状态,最为简单的一种set。
我来说两句