00:00
我们已经了解了分流操作,那就是将一条流拆分成多条完全独立的流啊,那当然了,有分就有合嘛。接下来我们要介绍的就是flink当中的河流操作,其实整体来说呢,在实际应用过程当中,河流操作会更加的频繁,更加的普遍,因为可能我们经常就会遇到来源收集的数据源完全不同的两条流,他们的数据呢又有相应的一些联系,我们需要把它们结合在一起来进行处理,这个时候就需要涉及到河流的操作了。那在flink当中呢,其实给我们提供了非常丰富的河流对应的API,那最为简单的就是这里的联合操作union啊union直观上来讲的话。就是最为简单粗暴的把两条流捏在一起,捏合在一起啊,那它有一个非常严苛的要求,就是流里边的数据类型必须相同啊,所以我们可以看到这张图的示意就是不需要做其他任何的操作,两条流本身的数据类型就是完全一样的,然后呢,经过幽操作之后,就汇合在一起,变成了一条新的流啊,那接下来就把它当成一条完整的流,然后进行接下来的转换操作就可以了。
01:19
在代码当中也非常简单,那就是基于一条data stream,调一个点union方法,然后他需要传一个参数,传入另外一个data stream,注意他俩的数据类型必须要是一样的,然后得到的就变成了一个新的data stream啊,那其实我们知道这个所谓的新的data stream本质上还是相同类型的流,哎,那其实这个就没多大区别嘛,就是把两条流里边的元素按照顺序又重新排到了一条流里边而已。所以在代码上呢,其实实现也非常的简单,那接下来呢,我们其实是想用这种方式啊,利用UNI的方式来测试一下,当我们出现多条流合并的时候,水位线到底是怎么样去推进的啊,因为我们之前说过啊,水位线在事件时间语义下,它就是时间进展的标志,那假如说我们现在有两条流要合并,那他们的数据来源有可能完全不同嘛。
02:18
自然水位线进展也是不同的,那假如说我们一条流快一条流慢,他们合并之后,接下来的这个水位线又会变成什么样呢?诶,那数据的话,我可以说是谁先来,哎,我接下来在下一个流里边,合并之后的流里边就先处理谁,但是水位线到底以谁的为准呢?哎,所以这其实就涉及到了水位线在不同任务之间传递的规则,之前我们就曾经说过,水位线传递的规则其实是有一个所谓的木桶原理的啊,就是假如说啊是上游有多个并行子任务的话,向下游去传递水位线,那么下游任务就会以他们当中时间最慢的那个为准啊,这就是我们说的啊,为每一个上游分区设置一个分区水位线,然后取他们当中最小的那个作为自己的事件之中。
03:13
这是上游不同分区子任务朝下游传递水平线的规则,那现在如果说我们变成了上游是两个不同的原算子呢?两条不同的流呢?河流的操作又是怎么样呢?啊,其实跟我们之前说的这个传递规则是完全一样的。接下来我们就可以在代码里边去做一个对应的测试,刚好也可以把UN做一个测试啊,那所以接下来我们在这儿新建一个SC的object,这个就叫union test。没方法先写出来啊,那首先当然还是stream execution environment,我们先把它get获取到。同样叫做env,上面改成下划线,那全局的并行度还是直接设置成一,方便我们进行打印输出,看到测试的结果。接下来我们要做的当然就是读取。
04:07
两条流。进行合并。我们就先定义一个STREAM1ENV,我们可以直接ADD source啊,那当然了,ADD source如果我们传入自己定义的测试数据源和一个S的话,它是隔一秒产生一个这个,而且是完全按照顺序的话,我们可能也看不清楚到底这个水位线是怎么变化的,我们干脆就直接用一个so文本流手动的去发送数据,这样的话我们就能够更加清楚的看到水线的变化了,对它有更好的控制啊。那所以我们现在调用的方法是socket type stream。传入哈杜普102。7777。当然了,如果说我们是直接读取文本流的话,得到的是一个字符串,我们还得对它进行一个map转换,包装成我们想要的类型啊,因为我们接下来要处理的时候,肯定还是event类型更加的方便一些啊。那我们首先做一个字段的提取,叫做fields,从data里边split。
05:16
按照逗号进行一个分割,然后要返回的当然就是一个event。我们要引入。CHAPTER05下边的event。包装对应的字段,这个我们非常熟悉了,F0本身就是user字符串类型,直接返回啊,那1URL也是字符串类型,Tri之后直接返回,最后一个是F2,这是一个长整型的时间戳,所以要做一个too long转换啊,这样的话就得到我们想要的数据了,那当然了,后面我们要做水位线的测试,所以还应该有一个分配时间戳,提取时间戳,生成水位线这样的一个方法调用啊,那我们这里边直接就是使用升序的时间戳提取就可以了,我们主要是测试水位线传递嘛,不需要去测试乱序,所以直接下划线点TIME3提取出来。哎,这就是我们对于第一条流STREAM1的定义,那其实我们想到第二条流应该也是完全一样的啊,完全类似。
06:17
直接copy过来就可以了,STEM,那后边我们读取的端口可以改一下,把它改成8888啊,那下边的map转换以及提取时间戳和生成水位线的策略都是完全一样的,有了这两条流之后,接下来就可以进行合并了,合并的操作非常的简单直接,UN stream2。然后接下来啊呢,可以去定义一步转换计算,然后在里面我们关键是想看一看当前的水位线到底是多少,哎,那如果想看当前水位线的话。当然就需要使用一个process function,因为在process function里边有timer service,我们可以直接捕获current water啊,那这里就调用一个点process方法里边我们要实现一个自定义的process方式。
07:09
哎,那这个process function里边的数据类型呢,当然就是event,最终我们输出的还是直接用一个string。来看一看里面的信息就可以了,好,那当前我们直接做一个process element,每一个数据来了之后调用的都是这个方法,我们这里希望做的其实就是要输出当前的一个水位线信息啊,那当然了,如果我们想输出的话是alt.C然后把水位线信息包装在一个字符串里面就可以了啊,那我们可以写一下当前水位线。到底是什么样的?那这里我们就可以直接调用ctx的timer service,然后去获取当前的current water mark做一个打印输出。啊,那当然了,这里我们只是作为process的处理结果,得到这样一个string类型的数据,那最后想要看到的话,还应该要加一个think任务,直接print到当前的控制台。
08:07
最后env执行起来,这就是我们完整的测试流程。接下来我们要做测试的话,那当然应该先到哈图102上去把NC要提起来啊,那我们NC-LK7777。然后我们可以新建一个terminal控制台。NC-LK8888。两个都提起来啊,那当然了,我们可以看的清楚一点啊,把它们并排的窗口放在这里,然后我们可以分别输入数据,就可以进行测试了。接下来我们把当前的flink代码运行起来。然后分别去输入对应的数据。好,接下来我们在7777这里先来做一个输入。输入一条数据之后,我们看一下当前的水位线是什么呢?诶,注意这里的水位线,如果我们还记得话,这就是最初没有数据到来的时候,默认水位线的那个最小值,哎,那所以我们看到啊,一条数据来了之后,现在输出的当前水位线是最小值。
09:13
好,那接下来我们可以继续输入。比如我们可以复制Bob的这条数据啊,接下来我们同样还是在777这边把第一条流里边再去输入一条数据,在之前我们进行测试的时候,其实会发现啊,第一条数据输入的时候,因为当前的水位线200毫秒周期性的生成一次第一条数据的时间戳,还没有引发当前水平线的变化,这个是正常的。但是第二条数据来的时候,哎,我们说那第一条数据它的这个最大时间戳应该已经影响到水位线了呀,那我们现在这个水位线如果减一毫秒的话,不应该是999吗?为什么第二条数据来了之后,当前的水位线还是之前的那个最小值呢?这就是我们所说的当前如果有多条流进行合并的话,那合并之后的水位线是按照之前最慢的那个时钟来计算,哎,那所以说我们当前是有分区水位线的吧,整个的这个过程的话,我们可以用一个图来做一个描述,哎,那可以看到我们现在有两条流要进行合并,最初的时候呢,水位线都是长整形的最小值,哎,那接下来我们当前算子啊,进行合并之后,UN之后进行process处理的时候,它的水位线当然也就是最小值了。
10:34
我们可以把它认为就是一个负无穷大,然后接下来呢,如果第一条流里边来了一条数据,它是第一秒的数据,这个时候分区水位线推进到了999,哎,那这个时候本身算子的水位线啊,我们当前合并之后,Process的水位线根本没有变化,因为它是以最小的这个为准的。假如说第一条流的数据继续到来,又来了一个2000毫秒的数据,第二秒的数据水位线推进到1999,诶,那其实还是没有变化,我们这里的瓶颈是第二条流这里的最小值啊,所以这个话就解释了我们这里能够看到的这样一个结果。
11:17
所以怎么样我们当前输出的水位线可以发生变化呢?哎,那其实非常简单。我们只要。继续推进这里的水位线。如果说我们给STREAM2这边来一个水平线的推进,比如说我们先推进它到一秒钟吧。如果这里到一秒钟的话,我们可以看到当前的水位线并没有发生变化,那这个也很好理解,因为现在STREAM1这里的水位线应该已经到了1999,但是STREAM2这里的水位线在我们输入这条数据的时候,它还没有发生变化,它还是之前的最小值,所以如果我们在这里要直接输出当前的水位线的话,那肯定还是以之前的最小值为准的啊。但是我们既然讲到了如果。
12:09
继续输入一条数据的话。再给一条两秒钟的数据,这个时候效果就会有所不同了,因为在输入这条数据的时候,第二条流STEM2的水位线本身已经到达了999,而第一条流STREAM1的水位线呢,已经是1999,哎,那所以我们这个时候如果要想输出当前的水位线的话,那肯定就已经是999了。其实我们输入了这条数据之后,接下来再去过200毫秒时间周期之后发出的水位线,那就应该已经推进到了1999才对,所以说如果我们在STREAM1这边再发出一条。三秒钟的数据的话,很显然这里当前的水平线就已经进展到了1999啊,那如果说我们只是在STREAM1第一条流这里不停的推进。
13:05
那它就不会有任何的进展,当前的水位线还是1999。只有在STEM2这边也有了进展之候,两边齐头并进的时候,当前的水位线才会发生变化,当然了,在我们刚刚输入这条数据的时候,STREAM2的水位线还处在1999,所以这个时候呢,我们如果看的话,还不会看到它的变化,那什么时候会变化呢?其实我们就知道了,只要是下一次它能够输出水位线信息的时候,我们当前就应该已经变化,因为第二条流已经推进到2999了啊啊,那所以只要有一个机会啊,让它输出明显,它就会变化,比如说我们这里直接输出一个没有推进水位线的信息的一条数据,甚至你来一个乱序数据都可以啊,只要触发了输出水位线的信息,来一个4000。那我们就可以看到现在的水位线已经变成了2999。
14:04
哎,所以这就是我们所说的多流转换,多条流进行合并的时候,水位线的传递规则还是符合木桶原理,它会以所有合并的数据流里边最慢的那个水位线为准啊,而且我们在测试的时候需要注意的就是每一个数据触发调用这里的process element,然后输出当前水位线的时候呢,当前数据时间戳对水位线的影响还没有办法体现出来啊,这个我们一定要考虑到。这就是关于union以及水位线传递规则的测试。
我来说两句