00:00
继续讲一下河流的操作,有分就有合嘛,前面我们是用测输出流,把一条流拆成了多个,那假如说现在我们有多条流的话,能不能把它合成一条流呢?啊,当然是可以的,那首先我们讲一个最简单的合并合流操作,那就是所谓的联合UNI啊,Union的操作呢,就是非常简单粗暴啊,就是两条流,你不是有数据来吗?我们这条流来了一堆数据,这条流来了一个,一堆数据来一个,处理一个,那河流操作是啥意思呢?就是你这边来一个,我就处理一个,然后你这边也是一样,你们谁先来,我就处理谁,所有的数据都放在一条流里边,我就当成是一条流处理就完事了。啊,那它这里面有一个要求,有一个要求就是数据类型必须相同,那大家自然就知道了,既然数据类型都相同,当然他就是直接把它简单粗暴的合并放在一起不就完事了吗?所以这个算子它就是把两条流汇到了一起而已,没有做任何别的操作,那你要想做别的操作,那就基于它之后你在该map map,该开窗开窗,该process process就可以了,啊,所以这个union操作是非常非常简单的。
01:09
那这里面大家需要注意的就是特点呢,一个是数据类型必须相同,另外一个特点是它可以同时合并多条流,调用的时候呢,是基于一个流,这好像这个是主流一样是吧,但其实他们之间是完全平等的啊,因为类型都一样嘛,然后。点调点UN方法,然后传入其他的流,可以传入多个。可以把多条数据类型相同的data stream合并成一个,哎,这是这个幽宁的具体用法,这个非常非常的简单啊,但是大家这里需要注意一点,就是那合并之后数据类型一样是合在一起了,这个简单了。那如果要是那还有水位线啊,我们如果是事件时间语义的话,之前两条流它基于自己的数据去提取时间戳啊,那有可能它的那个延迟时间都不一样,对吧?啊,它的生成机制都不一样,那产生出来水位线也不一样啊,那传到这里幽宁之后,河流了之后,水位线又是什么样子的呢?
02:12
那接下来我们可以给大家测一测这个啊啊,就幽宁本身没什么好说的,但是这个水位线比较有趣。我们在第八章里边再去创建一个新的Java class,当前我们测的是河流union的河流操作。Union。啊,那前面的这个过程其实都差不多了啊,我们就直接把这个先先列在这儿啊。下边是env execute,执行起来括括号,那这里大家需要注意我们需要有两条流,哎,那其实就是应该有STREAM1 stream2对吧?诶,那这里还有一个问题,就是说如果我们都直接啊在这里去,呃,用这个click source去做这个发送数据的话,可能看的不太明显,诶,因为他们的数据都是都是按顺序来的嘛,我们可能更想测这个乱序数据,这里还有这个对应的延迟时间设置的时候,当前的water mark到底是怎么去增长,这个可能会,呃,就涉及到这个问题可能会更多一点啊,那所以这里边我们干脆就还是用socket吧。
03:20
呃,这里给一个HARD102。七七。然后接下来大家知道啊,到这个数据那就没那么简单了,要做一个map了啊,那那这里我们map的时候,可以你有一个map function也可以直接。每一个对应的表达式,那这里边我们可以当前的这个string先把它拆分开,这是fields,还记得这个写法吧,每一个data来了之后,先做一个split。当前,我们用这个逗号做一个分割。然后接下来,呃,那就是直接return又一个event。
04:02
边用fields。零,然后FS1。Him,最后是一个长整形any build2,哎,这就是。哎,我们最简单的这种处理方式啊,那然后接下来既然有了STREAM1了,那大家想我还得有一个理由。全类似的,来一个STEM2啊,那这里我们不要102了,103啊,或者是把这个NC都起到一个机器上也行啊,你来一个那个102的88889999都可以啊,我们这里边就用两个机器吧,都是777,然后接下来还是一样的数据,那这里边区别在于呢?哎,我们来一个。不同的延迟,比方说这里边我们第一个of second seconds延迟两秒,后面这个呢,延迟的多一点,我们延迟五秒。这样的话,后边这个水位线显然就。显然就不同了,对吧?啊,接下来我们做一个合并,合并两条流。
05:06
那这里我们用一个。STREAM1接union stream2。得到对,然后这里边大家会发现,如果我直接打印,那肯定看不到什么东西啊,我想看到里边的水位线到底到哪了,你要不就是用窗口之类的操作去测,对吧,看它到时间了没有,呃,或者定时器啊去去测,那我们这里面其实不用那么麻烦,我们直接去打印出来水位线不就完了吗?那要打印的话,不是应该调这个process方法呀,调一个process,在这个process function里边啊,就可以直接打印当前的水位线信息啊,所以里边我们用一个process function,那这其实里边也不需要这个真正输出什么东西啊,那那干脆我们就把这个水位线当成它的输出吧,不用再去控制台直接打印了,我们最后直接把这个流打印出来就完事了嘛。
06:01
那在这里边我们就需要去实现这个process element的方法,然后里边呢,关键的当然就是out点。一个输出,输出什么呢?啊,就是水位线。水位线是多少?好,这里我们写成了一个string了,对吧,那就改成string吧。既然要加上信息的话,就还是STEM方便一点。后面来一个,所以大家注意啊,Collect out的类型也要改。后边就要加上当前的。有边线,从timer service里边去取current water mark,哎,它做一个一个输出。这就是这个完整的过程啊,当然前面这里大家也可以就是看清楚我们当前到底输入了什么数据,对吧。一。然后下边来一个STEM2也来一个打印。
07:03
STREAM2。就是我们完整的一个测试啊,接下来我们开启哈杜102NC7777骑起来,然后另外哈杜103 103自己启动一下。之前我们提过水位线的传输机制,就是如果水位线在病情,就是当前有很多并行子任务,哎,这个上下游之间传递的时候。而下游传递并行子任务传递不要是广播,直接把水位线广播出去就可以了,而上游的不同,并行分区的水位线传递过来之后,当前任务会干个什么事呢?呃,取一个分区水位线对不对?然后以他们最小的那个为准,诶,那大家自然想到了,你看这个是并行子任务嘛,而我们现在呢?现在只不过是两条流而已啊。不过是两个不同的S任务,然后现在要合并到后边同一个access这个任务而已嘛,哎,那大家就会想到了,那当前他是不是也可以认为是从两个并行的上游任务接收水位线呢?
08:11
那它对应的这个操作应该是什么样的呢?自然就是跟我们之前讲到的,取他们两者之间最小的那个木桶原理啊,以最小的那个为准,这样是比较合理的一种方式啊,所以接下来我们可以看一下这个具体测试的结果是不是符合我们预期,好接下来爬度103,接下来NC-LK。7777,我们定义的这个端口都是7777好,然后接下来我们既然都已经起了,把这边的代码也提起来,做一个简单的测试。好,启动了,我们先来一条数据,大家看到STREAM1只有一条数据,水位线是长整形的最小值,负的一个很大的数,那我们再推进一次呢,1001没变化,大家看到完全没变化,对不对,1002没变化啊,我们再来一个Bob。
09:04
Copy一下。2000没变化啊,其实大家知道这个跟呃,就是当前的user和URL没关系啊,我们这里边也没有K代码,所以没有任何关系,你直接改最后的这一个时间戳就可以了,大家看你更新到多少,它这都水位线都是这个负的,呃都是这个长整形最小值,为什么呢?啊,就我们说的啊,SWIM122条流要以最小的为准。那。Stream这里没有任何数据过来,那是不是它必须要以我们当前的那个最小值为准啊,啊,所以是这样的啊,那这个时候如果我这儿来一个BOB1000的话。大家看。当前的水位线还是负的对吧?哎,那这个是我们之前说的那个原因,就是当前数据来了之后,它其实水位线还只是在之前的那个状态,对不对啊,其实我们知道这条数据来了之后,现在真实的这个系统场景里边水位线已经变了,但是我们当时打印的时候,水位线还没变,200毫秒触发一次嘛,呃,它还没更新呢,好,为了确认这个啊,我们还给一个1000大家看一下。
10:12
还给一个1000。大家看现在他这里边给的是一个负的4001。我们这里边输入了一个一秒钟的数据,为什么它是负四秒,而且零一毫秒呢?啊,这个主要我们当时还设置了延迟对不对,这里还有五秒钟的延迟呢,呃,第二个流里边的数据有五秒钟的延迟,所以我们当前的水位线来了一个一秒钟的数据的时候,要减掉延迟负四,然后还要减一,大家还记得还有个负一嘛,所以这里边它是负4001。啊,因为之前我们第一条流的这个水位线显然比它要高嘛,啊这里它是2000啊,那大家可能想到它的水位线是多少呢?啊,这个我们其实要看它的延迟是多少,延迟是两秒,所以现在减了之后变成零了,零再减一,那应该是负一。
11:06
所以这里边我们可以根据这个啊,直接看这个哈,杜普三这里的。我们不停的来推进它的时间,比方说我们把它推进到2000。大家说现在的水位线啊,那现在水位水位线不变,还是之前的那个效果嘛,对吧,前一个数据的效果,所以我们继续来继续贴一个2000。大家看现在是不是变成了3001啊,负的3001就是相当于在1000的补上加了,好,我们继续啊,再大一点啊,我们直接到5000。当然第一条数据这个没效果对吧?啊,这这个第一条数据是没用的啊。我们还得我们再多推进一点吧,6000。大家看现在是刚才5000的效果,5000的话,现在的。当前的这一个时间就已经减五变成了零,再减一变成了负一,所以就推进到了负一,而且现在来了6000,那我们现在再再复制一下啊。
12:06
我们再来一个7000。大家看到现在的水位线,诶,怎么还是负一呢。我即使是盐不是滞后一个,也应该是以之前的这个6000为准啊,他如果减就减五秒减5000是1000,再减一,那也应该999啊,那为什么还是负一呢。最小的为准,是不是现在要看这边了呀,呃,这里的它的水位线减了两秒之后啊,再减一,它是负一,所以那当然不行了,那所以我们这里边比方说。Pass一下,如果这里给6000。啊,当然这个还用的是这个2000对吧,大家看还是2000,这这个数据的话,这个没什么影响啊,所以我们接下来再配一下。大家看现在。水位线直接涨到了1999,为什么呢?因为它的这个六零零零两秒之后是四秒,那应该是3999,而这边呢,啊,这边还停留在。
13:01
就是7000,然后减去五秒之后再减一的那个1999,诶不是你应该看它的上一个数据吗?注意啊,看上一个数据是因为在打印这个数据的时候,当前的数据还没有生效,所以200毫秒的那个水位线没有体现出来,那现在200毫秒过了没有,肯定过了对吧?那之前这个7000的这个数据体现是不是就肯定体现出来了呀,他自己当前是体现不出来,那你在另外一个流输入数据的时候,当然可以体现这边这个7000的效果呀,所以现在两个流结合起来,它的水位线就是1999。本身优很简单,但是根据这个大家可以看一下两条流合并之后,我们说的以最小的那个水位线为准,到底是什么样的含义。
我来说两句