00:00
讲完了直状态的具体应用案例,接下来我们再来看一看列表状态list state。其实关于list state啊,之前我们在实现top n的时候已经使用了这样一个列表状态啊,我们是每来一个数据就保存起来,放在一个列表里边不停的添加,然后最后拿出来做一个排序提取。那关于列表状态呢,其实还有另外一个非常经典的应用,那就是可以实现所谓的双流join啊,比如说我们知道啊,呃,在这个CQ里边,我们可以实现两张表的状语操作啊,就是做一个连接,比方说我们这样一句CQ select from a是一张表,然后呢,来做一个内连接,In the draw b,然后where a.ID等于b.ID啊,那这种方式,这就相当于是把他们ID相同的对应的字段,然后匹配起来做一个完整的输出。那其实关于这样一个draw的功能呢,之前我们也讲过了啊,Link给我们提供了window draw和internal draw啊,以及还有Co group这样的操作啊,其实都可以实现类似的功能,但是呢,他们的缺陷在于之前我们所讲到的两条流的状操作都是基于时间去合并的,不管是一个时间间隔还是一个时间窗口,那我们能不能直接就是啊,当前数据所有的跟时间没关系,两条流里边所有的数据都保存下来,然后做一个婴儿join呢?能不能这样去实现呢?啊,当然这个代价可能会比较高啊,一般情况我们要慎做这种操作,但是呢,如果我们真的想实现这个需求,那之前的window drawing internal draw以及Co group就都实现不了,那我们怎么做呢?那就只能做一个connect,做最通用的Co process function的一个实现了。那么在这个Co process function里边我们就可以去。
01:52
使用列表状态保存当前已经到来的数据啊,接下来我们可以在代码里边去做一个具体的实现,那同样我们还是new一个SC的object。
02:05
我们现在的这个具体需求,其实是两条流的一个状语操作,我们就叫to stream draw example。同样那方法还是写在这儿啊,那前面的流程基本上都是差不多的啊,我们先把这个先copy过来。下划线先要引入。当然了,这里又会有所不同,因为我们涉及到了两条流,哎,那我们还是单独来指定一下吧,这个就不要直接用click了,哎,我们就简单的指定两条数据就完了,比如说直接from elements,哎,那这个我们也不用events了啊,直接使用原组类型啊,简单粗暴,比方说我们当前指定一个K。就是一个字符串类型的A,这是一个K,然后呢,我们再指定一下当前这是第一条流语的STREAM1。里的一条数据,最后我们再补充一个时间戳字段。同样的数据,我们可以多创建几条,比方说,第二秒又来了一个B。
03:04
这个K对应的STEM1上面有一条数据啊,后边提取时间戳指定生成watermark的生成策略的话,那就需要使用下划线三这个字段了啊,这是STEM1,然后接下来我们可以把它直接copy一下。再定义一个STREAM2。别的我们也不用多改了,直接把表示流的这个信息改掉,测试输出的结果就会非常的明确,然后再把对应的时间戳改一下,三秒和四秒,同样还是A和B。这两个K对应的数据啊,所以接下来我们肯定是以第一个字段作为K去进行分组,然后进行连接操作的。所以接下来我们当然就是要连接两。调流。进行join操作。啊,这里直接STREAM1去先做一个KBY,当然我们也可以先connect再做KBY啊,我们这里就直接先K吧,用这种方式一个kid stream去connect,另外一个k stream,我们这里使用的是下划线一。
04:10
然后connect。里边STREAM2同样要做一个K。也是以下划线一第一个字段作为当前的K。接下来经过连接之后得到的connected streams,那我们后面既然是要做比较复杂的操作啊,我们就直接process了,里边要实现的就是一个Co process方式啊,我们可以把它叫做这个是一个双流的连接,To stream join。先把这个写在这里,最后哎,那肯定我们还是直接print打印输出。执行起来啊,那这里我们需要去实现自定义的。Co process方式。Class。To stream Joy啊,然后接下来extend一个process function。
05:03
后边的泛型啊,那我们也知道啊,主要就是第一条流的类型,第二条流的输入类型,最后输出的数据类型啊,所以现在我们的这个流里面的类型都是三元组啊string。Three。长整形long,直接把这个copy一下,两条流里的数据类型是一样的。最后的输出的话,那我们还是简单起点,就给一个string吧,打印一个信息输出就可以了,必须要实现的方法那是两个process element啊,Process element1和process element2,所以接下来呢。我们分别对这里的逻辑做一个实现,其实我们也知道两条流完全都是对称的啊,本质上来讲,我们实现一条流里的逻辑,那另外一条流照搬就可以了。所以接下来我们就想到了所有这条流里的数据到来的时候,我们要输出它跟另外一条流理数据对应匹配项的,因而join之后的结果啊,那很显然,现在啊,只要进入到这个方法里边来的数据,那就应该都是同一个K,而且现在我们又没有时间的限制,那所以就是所有进来的第一条流里的数据跟所有进来的第二条流里的数据,那其实就是要做一个笛卡耳机嘛。
06:21
任意两个都要配对起来,哎,那所以这里其实非常的简单,那我们就定义两个列表状态,分别保存两条流里已经到达的数据啊,所以。定义。列表状态。保存。流中已经到达的数据。啊,那我们还是用lazy的这种方式做一个定义吧,首先我们是STREAM1,它对应的list state。它的类型是list state。引入啊,那后面的泛型的话,我们里边保存的数据元素肯定就是三元组类型了,String string了。
07:08
然后我们获取状态句柄的方式,Get runtime contact,然后get list state里边去new一个list state的。Script。好,那同样我们还是要传入它的名称和类型,那名称的话我们就直接叫STREAM1LIST吧啊,然后后边还需要有一个数据类型,那当然就是三元组了,直接放在这里啊,那后边把这个花括号要删掉。我们当前定义了一个script传进去啊,那对应的STREAM2本质上完全一样。直接复制一份,只要改成STREAM2就可以了,那类型也是完全一样的。那这里注意需要把。当前状态的名称描述信息里边的名称也要改过来啊,这样的话我们就定好了这两个列表状态。接下来那就是每一个数据到来之后,我们判断它到底是第一条流的数据还是第二条流的数据,然后去进行对应的处理了,逻辑也非常简单,那就是数据到来之后。
08:11
直接添加到。列表状态中。所以我们这里如果是第一条留里的数据的话,当然添加的就是STREAM1的list state,直接做一个ADD操作,添加的是当前的value。啊,那除此之外呢,我们还应该把当前的数据跟另外一条流里已经到来的所有数据做一个一一配对啊,因为我们就是笛卡尔基嘛,之前已经配好对的,理论上应该都输出过了,那现在没输出的呢,其实就是这个元素跟。另外一条流里边所有元素的配对啊,那所以接下来我们就是。便利。另一条流中。已经到达的所有数据。
09:02
输出配对信息。这就是我们想要的inner join的结果啊,那所以这个便利的过程的话,自然就是一个for循环了。我们这里可以直接。就把它叫做VALUE62吧,啊,那或者说我们前面这个把它定义叫y value61,那接下来这个叫做Y62,这个就看的一目了然,到底是哪一条流里的数据,这里我们要便利的当然就是STREAM2LIST state。我们知道本身这个list state里边它能调用的方法是有限的,这样一个表达式其实是要调它的for each方法,它里边并没有for each方法啊,但是我们知道它有一个get方法,可以返回一个able类型啊,那这样的话我们就可以去调用对应的。For each去进行便利了。啊,那这里其实还有另外一个问题,我们看到现在这个代码其实还是报错的,哎,那就是这里我们get之后得到的是一个特able类型,这是Java的特able类型啊,所以这里边如果我们直接使用scale的语法的话,它是没有办法直接做这样的转换的,没有办法直接调用对应的for方法,所以我们还需要引入相对应的饮食转换啊,那这里我们引入的是import scale。
10:17
Collection下边的convert下边的implicit conversions哎,我们把这个全部引入,接下来这个就可以调用了,好引入了这个之后,那这里其实我们就是直接输出嘛,直接使用这里的collector叫做out。调它的collect的方法,那我们这里输出就是一个字符串,那就简单一点吧,我们直接写一个VALUE61,然后我们做了一个draw。那就加一个这个箭头的符号。后边加上Y2这样就可以了,哎,这就是我们整体的逻辑,那所以后边第二条流理事件到来的时候,我们的处理逻辑呢,跟上面可以说是完全一样。直接可以抄过来。
11:00
直接copy过来,哎,那只不过这里我们只需要把输入数据的这个参数啊,我们叫做VALUE2,然后接下来呢,哎,那这里ADD是ADD到STREAM2LIST state里边,把VALUE2放进来,接下来。我们这里电力的这个变量名称叫做VALUE1。访问的是STREAM1LIST state里边的所有的数据啊,所以接下来我们还是把这个VALUE1VALUE62做一个输出就可以了,这就是我们完整的处理流程。好,把这一部分都已经做完之后,我们可以直接运行来看一看输出的结果到底是什么样的。这里我们的逻辑是完全对称啊,所以可以看到现在我们能够按照K进行连接的话,那其实就是每条流里边有每个K的一条数据,那所以最后的输出呢。做了in the draw,那就是ATREAM1里的一条数据到ATREAM2里的一条数据,BTREAM1里的一条数据到BTRE2里的一条数据,那这个时候比如说我们可以把STREAM2里的数据。
12:07
再追加一条。我们加一条A的数据,这是第五秒钟的数据,那如果运行的话,很显然关于A的输出就会多一条啊,STEM1到STEM2里的两条数据都有对应的配对关系啊,所以我们看到就变成了三条数据,那假如说在STREAM1里边我们也多了这样一条数据呢?诶,那很显然,现在我们最后的输出。A,这个键对应的数据就会变成二乘二,有四条对应的输出,这就是一个笛卡尔基最后输出配对的效果。这就是使用list state去进行两条流的in the drawing具体的操作。
我来说两句