00:00
接下来我们再来介绍一下列表状态的具体使用,前面我们在讲到top n具体的实现的过程当中,其实已经用到了一个列表状态啊,那简单来说就是我们可能要保存很多个状态值,那这些很多个状态值呢,他们都是相同的类型,这样就可以把它放在一个列表里面,这就是所谓的list state,那除了之前我们top n的这个需求里边可以这么用之外,还有什么样的场景具体使用的时候需要用到列表状态呢?那我们可以简单的思考一下关于CQ里边两张表的作用,两张表的连接这样一个过程啊。那后面我们会讲到,在flink里边,它也有上层的API flinkq也可以使用Joy去进行两条流的河流操作,我们知道对于draw而言。比如说我们看一下这样的一条语法,Select from a,然后inner join b,那就是做一个A和B的内连接,Where a.ID等于b.ID那就是以ID作为它们俩之间连接的K相等的,所有的数据都要两两匹配在一起。
01:19
我们知道在CQ里边这样的语句是要慎用的,因为做这样的一个连接啊,其实那两张表里边的所有数据都要做一个扫描,而且做一个两两匹配,其实一个笛卡尔级的计算,那在flink里边如果是内连的话,其实比较简单,前面我们知道如果直接使用window,如果直接使用窗口连接的话,它本身就是一个内连接,是可以获取到所有ID相等的匹配起来的,两两的匹配对的。但是如果说我们想要做全外连接又怎么办呢?全外连接是需要把每条流里的所有数据都保存下来啊,那那其实我们在这个fli CQ里边这样一条语句本身也是要慎用的,因为它的底层逻辑也是会把A流和B流里边所有的数据都保存下来,然后进行join。
02:16
那怎么样能够优化这样一个过程呢?哎,这里可以提供一个思路,就是我们可以使用列表状态,把两条流里边已经到达的所有数据做一个列表保存,保存下来之后呢,只跟之前的所有数据去做一个一一的配对比较就可以了。接下来我们可以在代码里边利用列表状态对两条流实现一个全外连接的双流状。好,接下来我们就可以去新建一个Java类,我们要实现的是两条流的状啊,所以是to stream。Join example。
03:00
呃,那首先我们还是把整个的框架先写好exception,然后前面同样的也应该有当前环境的定义,执行环境的定义,以及数据源的输入啊,那当然现在我们的数据源就不要用这个click s了啊,那这里我们干脆就直接定义一个,就像文档里面定义的这样一个A流和B流,然后定义他们的第一个字段去做一个,做一个连接,做一个匹配就可以了啊,那这样的话,这是一个三元组,是一个string string long类型的三元组,第二个字段呢,表示的是当前流的。标号流的名称STEM1STEM2啊,然后最后一个长整型,很明显就是一个时间戳了标准的一个日志数据进行提取,进行过滤筛选之后啊EL之后得到的一个结果,所以接下来我们可以直接把这个三元组先。类型先copy过来。
04:01
然后后边那当然就是了。我们可以直接from。Element。Elements。里边啊,就直接定义这个三元组就可以了,现在我们想要的就是一个A,然后一个。STREAM1。1000第一秒钟。然后同样的类似来一个B1。2000啊,这样的话我们就有了初始的两条数据,构建了这样的一条,然后接下来同样还应该assign time stamp and waters。这里我们直接用。Strategy,然后我们可以直接调用forulous time steps啊,直接序的啊,单调递增的生成对应的呃,Water,然后接下来里边需要去传入with time,传入你一个。
05:09
Z sample,那当然了,这里我们给定的这样的一个类型啊,也应该是三元组。具体我们要实现一个CT这样一个方法啊,那当然了,前面这个应该把三元组要放在。这个方法前面这样的话就类型不会报错,那这里面我们返回的。提取的时间自然就是ELEMENT2。第三个元素就是当前的时间戳啊,这是我们基本的一个定义,然后后边的二啊,这相当于是STREAM1 stream2也可以有类似的定义。可以直接做一个copy。
06:01
我们把上面啊给一个STREAM2,然后接下来这两呃对应的两个。数据我们就直接把它改成二就可以了啊,这里可以给。三和四。后边整个的定义,如果说我们这里第二条流也是这样的三元组的话,那后边所有的定义都可以不变。接下来我们关键就是要看看怎么样去做这样的两条流的做一个啊,相当于我们要做的是一个full join啊,不要做inner join,如果inner join的话,前面我们知道直接可以使用。前面我们定义的value state去做一个保存,然后去做一个匹配就可以了,那如果想要做一个全外连接的话,哎,那我们知道是需要把所有的数据都保存下来的,这个时候就必须要用到list。所以接下来我们。自定义。列表状态进行。全外。
07:01
连接。所以接下来我们要做的是STEM1啊,那既然是两条流要进行连接,那肯定现在我们是一个connect操作了啊,那首先我们把这两条流都做一个K第一条。当然是以它的F0作为K,然后接下来connect第二条流STREAM2。我们也把它以当前的。F0第一个字段作为K。分组之后两条流做一个连接啊,那接下来要做的操作,因为用到了状态,当然就需要有process function出现了process,然后里边我们需要的是一个process。我们在这里就直接以另类的形式写在这里就可以了啊,那当然了,还涉及到一个输出的数据类型,输出的数据类型为了简单起见,我们也还是直接转换成词缀输出就可以了。
08:04
我们知道,在Co里必须要实现的是。两个方法,一个process element1element2,这样的两个方法,一国两制嘛,啊,针对两条流分别去进行处理,如果我们这两这里的两条流类型也都一样,然后处理的方式也都是完全对称的,所以其实我们只要实现一个后面的都copy过来就好了。接下来关键就在于怎么样去做对应的处理呢?因为我们是全外连接,所以很明显我们需要有两个list,保存每一条流里的所有数据,这样的话,比方说我们在STREAM1第一条流里的数据来了之后,我就可以跟STREAM2里的所有数据,之前到的所有数据去做一个连接匹配,然后输出了。啊,那同样STREAM2也是他可以跟stream之前已经到的所有数据去做一个匹配输出。那我们可能想到全外连接,因为我们现在是流处理啊,那这两条流。
09:04
当前数据来了之后,显然他只能跟另外一条流里之前已经到的所有数据,在这个列表里的所有数据去做一个匹配输出,那之后再来的数据怎么办呢?诶,没关系,之后再来的数据不就可以反过来跟STEM1里面的已经到的数据去做匹配了吗?诶,那所以这样的话,我们就能够把所有两条流里的数据都做一个笛卡G11做一个匹配,这就是全外连接的过程。那接下来我们就。具体来做一个实现关键在于。在定义两个list state。所以我们定义列表状态。用于保存。两条流中。已经到达的所有数据。
10:01
那么这两条,这两个list state的定义呢?我们可以把这个直接定义成private啊,这个属性嘛,这个是没关系的。它的作用范围肯定只是在当前这个类里边。那对应的我们也需要。保存完整的三元组类型。State先把定义出来啊,我们先把它叫stream list state。那同样类似的还应该有STREAM2。List state先把它生命出来,然后接下来他们的具体定义同样应该在open生命周期当中去做一个生命啊,那这个就需要stream list state需要用get contact方法,然后去get list,然后接下来要一个list set的描述器,Script。里边同样传入的还是一个名称。
11:02
这个叫。另外还应该有一个类型,类型的话,这里是一个三元组,元组类型啊,我们干脆就。Types,这里边我们直接types.temple用flink给我们已经实现的这个types类去对它的类型做一个定义啊,那这里面我们就可以把对应的。元组里的每一个字段,每一个属性元素的类型填在里边,Type。首先是一个,然后types。需要大写。然后还是。最后是一个太子。长征型浪。这就是我们整个定义的过程。哎,那尽管比较长,但是其实我们知道这个过程跟前面的定义都是完全一样的,那同样还应该有一个STEM2,也是要做一个获取,同样也是三元组,这个我们把它叫做STREAM2LIST。
12:04
把这个三元组定义出来,获取到。呃,当然了,上面我们在定义这个类型的时候,也可以让这一个Lisa state的类型简化一些,它不一定非要把我们完整的这个三元组都保存进来,因为这里边我们知道三元组里边第二个字段明明就只是表示了当前我们是哪一条流啊,就只是这个stream stream2的一个名称,所以我既然当前已经是STREAM1。有一个list state来表示STEM2也放在一个列表状态里边表示了,那当然中间这个就不需要保存了啊,那所以这里干脆我们可以把它换成一个二元组也是成立的。换成二元组的话,那里边中间就可以省掉一个字段。后边的定义当然也要更改。变成二元组。
13:00
同样后边在这里的声明就可以少少掉中间的一个type点缀。这样的话,呃,我们定义的这个状态会更小一点,更简洁一点,因为我们所要的数据只有这些。接下来关键就在于后边的process element1process element2怎么处理,其实我们知道这个就很简单了,那就是每来一条新的数据,我就另外一个流,里边已经保存的所有数据一一匹配,去做一个输出就完了。哎,那所以关键就是我要。获取。另一条。刘忠。所有数据。然后配对输出。那我们这里的输出是一个string,那就是直接把两个数据我们拼在一起打印就可以了啊,那这里其实就是一个for循环了,要便利另外一条琉璃已经到的所有数据嘛,那这个数据是在哪儿呢?呃,很简单,所有的数据。
14:03
应该都保存在了这个list state,那那这里边我们就把这个叫做一个right吧。他应该从STREAM2。List state里边去做一个获取,哎,那我们知道直接做一个get把它获取出来就可以了啊,那当然现在我们是把这个list的类型已经改成了二元组,所以我们把这个换成二元。得到这样一个类型。然后接下来直接out.C去做一个输出就可以了,那我们当前比方说为了区分,我们就把这个啊左右连接,就相当于第一条流是左表。然后第二条流是右边,这样的话,我们这里就直接把left。然后拼接上。加一个箭头。然后拼接上就可以。
15:02
那那同样,呃,在下边。STREAM2的元素如果到来之后,我们要做的事情其实跟他也是完全一样的,只不过这里边我们获取的应该是STREAM1。List里边的所有元素,把它叫做left,然后同样做这样的一个拼接。那当然了,这里不是完全完全对称,因为本身left是一个三元组,Right是一个二元组啊,这边stream里边这里的处理是反过来的,如果我们想把它这两个做成同样的,呃,这样的一个匹配的话,我们也可以直接把它们对应的那个数据提取出来,再做一个完整的包装,这个都是没有问题的。这里需要注意的是,不光是把之前的所有另外一条流里的所有数据都拿出来做一个匹配。还应该。还得把当前的数据添加到。自己对应的这条流到list state里面去,要不然的话,后边我们就没有办法把它,呃,就是后边到的数据就没有办法一一再做匹配了嘛,那所以这里面还应该做一个事情,就是一。
16:13
这里我们还应该把当前的元素要添加到第一条流的列表状态啊,那这里面ADD的话,我们需要ADD是是一个。一个二元组,所以稍微的还需要麻烦一点,我们需要把left的F0和left的F2这两个值拿出来。啊,那当然了,前面如果说我们这个打印只想看到当前它的呃,F0F2这两个参数的话,我们也可以做一个拼接就是。0F2。中间。再加一个空格。这样的话把它拼接在一起也就可以了啊,那下面我们也可以把这个right的F0。
17:02
加上。Right的。F2,这样的话就都把中间的那个字段省略掉了,这就是我们想要做的这个操作啊,那当然了,下边。第二条同样也应该把对应的STREAM2LIST做一个更新。我们这里需要把。点F0和点F2写进来,这就是我们整个处理的这个过程。啊,所以呃,简单来讲的话,我们要做的其实就是每来一个数据都把它添加到当前的list列表里,另外呢,把它跟另外一条流里之前的所有数据配对输出,这就是这个处理流程。好,那到这里为止。我们就已经做完了connect之后进行河流的完整的操作啊,那最后我们直接把这一个,因为它本身是stream嘛,直接做一个打印输出就可以了,最后不要忘记DNA execute执行起。
18:06
接下来我们看一下输出的结果是什么样。我们就看到了当前可以把每一条流里的所有对应的数据都做一个匹配输出,我们现在是因为每一条流里A和B都各只有一条数据,那如果说我们再多加一些数据的话。这里来一个三。这里来一个六。那我们知道。这样直接去运行的话,A就应该有一个完整的笛卡尔机,应该有对应的四条输出,我们看到诶,这是前面这个A1和第一秒的数据,和这个第三秒的数据,这个都是第一条流里的,而后边。
19:01
第三秒的数据和第六秒的数据,这个都是第二条流里的啊,所以这个我们可以看的非常的明显,这里的括号只是在于我们这里拼接的时候,一个是直接把元组直接打印输出的,而另外一个是我们自己做了一个拼接,只是这样一个区别而已。所以这就是我们可以看到所做的使用列表状态保存数据进行全外连接的一个典型的应用。
我来说两句