00:00
好,然后上面这一部分是算是一个基本呃,转换算子。算子和简单聚合算子,好,然后接下来我们可能就要讲一些比较复杂的东西了啊,那比方说大家从文档上看啊,接下来我们要给大家讲多条流之间的关系了,所以这些东西我们给大家讲的是叫做多流转换算子,对吧。多流转换算子,首先第一个是split,那split大家从字面上理解应该是什么呢?对,Split不就是那个分割切分、拆分嘛,所以split它的作用其实是把一条流要。拆分成两个对吧?啊,它是这样的一个作用,那那大家可以想到,呃,它的这个状态是什么呢?大家从这个图上可以看到,它是本来原本的一个data,一个data stream啊,一条stream,通过split操作之后他得到的,其实大家看它其实并没有直接拆开。
01:19
它得到的是什么呢?是一个split stream,对吧,是只不过相当于就是说在这个SP SP stream里边给不同的按照我们的那个定义的标准拆开,是不是分了不同的组啊啊有有一个这样的一个一个感觉对吧,所以它的这种呃,拆分有点像是什么呢?有点像是这个就是就是我们前面给大家讲的那个KY的那种感觉有点像,对不对啊,但是不一样。因为大家看前面得到了PI stream之后,是不是我们直接做聚合就可以了,这里不一样,这里得到split之后。你没有办法直接去做其他的各种操作,而是要要怎么样呢?而是要从这个speed stream里边通过select的方法。
02:11
把你想要的单独的那个流再剪出来,所以大家看这个剪出来之后,是不是就真的变成了一个单独的data stream呀?哎,所以这两个结合起来用,就真的是把一个stream真正意义上分成了两条流,是这样的一个转换过程,只不过在数据类型上呢,其实是分成了两步,对吧?是先通过split,呃,Data stream转换成了一个split stream,然后呢,再通过sla的方法又回去分成了两个不同的data stream,对不对啊,是这样的一个操作啊,所以大家看这个,呃,接下来我们就具体来实现一下吧,这里有一个具体的需求了,比方说我们这个传感器数据,那是不是可以按照温度高低来做一个区分啊,把拆分成两个流,大家又会想到这两个流可能分别有不同的用途了,比方说诶,有可能你这个高温的那个流有可能就要做高温。
03:11
紧了对不对,然后低温那个瘤有可能就正常处理对不对啊,所以这就是一个比较,呃,比较那个常见的一个需求啊,在在他们的这个过程当中,大家其实可以发现就是这个呃,Select的时候,是不是他可根据某个标志去把这个我们当时那个分开的那个流把它选取出来啊,呃,所以大家会看到到时候实现的时候,呃,其实是会对对应的有一个标志的,相当于前面是有一个标签的,对吧。呃,大家就会看到,其实这个split这个操作,它其实并不是真正的切开了,对吧?并不是真正把这个切开了,而是干什么呢?而是相当于只是在同一个流里边给不同的,呃,按照我们这个分组的标准,给他贴了不同的标签对不对?盖了不同的戳,相当于给它分组分开了对不对?呃,这个这个过程其实有点像,呃大家想一下那个flu拦截器里边是不是也有类似的那个,呃拦截器和选择器那个过程对不对?大家想它是不是也是先盖戳,然后根据盖上那个戳再分拣出来啊,所以大家可以类似的去比对啊,捎带的大家把这个之前的内容也可以复习一下啊,在实际应用的过程当中,其实这种分流还挺常见的,为什么呢?因为大家会想到就是有可能是什么呢?我们一开始进来的那个数据,卡夫卡里边要消费的数据有可能是笼统的很复杂的一堆。
04:43
数据对吧,就是各个地方的那个数据全在一起,都在那个一个日志里边,然后提取出来,在这个卡夫卡里边了,那有可能我们这个部门要去处理的可能是里边的一部分数据,对吧?然后另外一个部分呢,有可能是其他其他部门要去要去用的其他的一部分数据,那这个时候我们提前去做处理的时候,就可以给他先做一个切分对吧?然后切分完了之后,诶,一部分那个流你还正常,再放到那个卡夫卡里边去让别人去消费,那我这边要处理的这一部分呢,自己去做,去做运算对吧?然后做对应的这个输出啊,这是实际工作当中还是经常用到的啊。好,然后这里边我们就来给大家实现一下这个spli的操作吧,这里边定义一个split stream啊。
05:36
然后大家会想到就基于这个data stream去做操作了,呃,当然这里边我就还是把这个reduce也先注掉吧,对吧?啊,这个我就直接基于我们前边的这个,呃。哦,这个其实我不应该基于这个KBY之后啊,大家想这个speed的stream应该是在在什么上面去做操作呢?Sweet操作直接在data stream上做操作,对吧,因为大家看到这个API。
06:09
它其实直接就是在这个data stream里边的,对吧?呃,而且大家会看到这个其实现在已经弃用了。Depreated,对吧?那现在他推荐的这个方式是什么呢?推荐的是side output instead对吧?用这种方式来取代,这又是一个什么玩意儿呢?啊,这是一个所谓的测输出流的方式,而这种测试出流的输出的这种形式啊,就要用到我们后边给大家讲的可能比较底层的API process function啊,这这些里边可以去输出测试数流啊,所以大家看这个现在flink的这种发展过程,它其实是想要把这个使用的方式更加统一起来,甚至是往更底层去统一,对吧?啊,所以我们更上层的那些反而就更多的不去介绍了,但其实现在至少这个speed还能用,所以还是给大家做一个介绍吧,这个直观上比较好理解一些啊。呃,那前面我们还是做一个划分吧,比方说我们这里边的这个就叫呃呃,我们叫聚合对吧,AJSTEM。
07:19
然后他就等于data stream去做K对吧,然后后边再去做做这个reduce,这个就没问题了,对不对,然后这里边呢,我们就还是基于这个data stream去做一个转换,它就可以直接sweet,大家看画了横线的对吧?啊,已经弃用了啊,这里边大家注意这个里边要传什么东西呢。大家看一眼,这里面要传什么东西呢?它里边要传的是一个output select,诶这个大家可能一看这个东西是不是我又要去自定义了呀,对吧,又要去实现这样的一个类了,对不对,这一看这就是一个类嘛,啊那有没有简单的方式呢?可以有比较。
08:05
诶,去哪里了啊?可以有更加简单的一个方式是大家看我可以直接去传一个什么呢?传一个函数对吧?哎,就是直接传这样的一个函数,然后这个输入的是我们当前的这个数据格式,返回的是一个传播one的这样一个数据类型,对吧。那这样一个数据类型,大家可能知道这个像我们的那个sick是不是就是一个传播的一个数据类型啊,对吧,所以我们直接用那个sick,然后对应的传一个标签,盖一个戳进去是不是就可以了,所以它其实是这样的一个标准啊,具体来看,大家其实就是实现一下,大家知道了啊,这里边split我们要传一个函数进去,比方说这里边是那个三四的data,对吧。
09:00
这里边怎么做操操作呢?如果贝塔的按照那个温度的高低大于30度来划分对不对,那所以这里边temperature如果要大于30的话,我就返回,返回一个传播ones的一个数据类型对不对,那这里边我就直接给一个S好了。然后里边是不是还要给一个string啊。这个string是不是就代表了我当前盖的那个戳,所以这个戳比方说大于30度叫氦高温对吧?然后当然这里面还报错啊,是不是还得else啊else,那我就返回一个派对应就是漏对不对。诶,那这里面不应该是方括号是吧。好,呃,我们这个把这个返回就可以了,这个大家知道这个C数据类型对吧,SKY里边的这个collection.s它其实是,呃,大家看它其实是不是就是也是一种传播类型啊,对吧?啊,所以在这个里边就是大家知道大概的这种这种应用就可以了,我们这里呢,你就可以直观的传一个,大家只要知道这个S里边你包上它对应的那个戳就可以了,这就相当于我们要到时候分拣的时候,是不是就按照这个戳去去分拣它到底属于哪条流啊,好,这是这个sweet,好,这里已经做完了之后,接下来那我们就要分拣啦。
10:40
有一条流叫hi对吧,那就是Li stream里边是不是要select呀,Select怎么去select呢?大家看它要传一个output names是一个string,那是不是就是我们传的那个戳啊,诶把hi拿出来,同样我是不是可以把。
11:00
Long剪出来啊好哎,这这样直接把它拿出来就可以了,大家会看到在这里边啊,这个sled,它这个string是不是后面还带一个星号,带一个星号什么意思。面是不是可以有多个啊,对吧?这个参数可以有多个,所以大大家会想到这个hi和low后边是不是可以跟跟多个这样的string,那代表什么?是不是我可以把两个都剪出来啊,不做成的戳可以同时把它们都剪出来,所以如果要是氦和漏都有的话,那是不是相当于就是R了,所以这没有R这个戳对吧?我要减的是氦和漏,大家看是不是这样去减的。好,哎,所以接下来我们在这个print的时候,我把这个data stream就先输掉了啊,我print的时候可以怎么样呢?high.print诶大家还记得这里可以给一个那个标记吗?对吧,这个是high,然后漏点print,这个是漏,对吧,然后我们r.print这个是R,好接下来我们来看一下这个输出的结果是什么样的。
12:15
看一下这个输出结果,诶大家看前面,幸好我们有前面这个东西对不对,要不然就看不清楚了啊,大家看这是第一条数据来了之后,他同时输出了两条流,有输出对不对。第一条数据来了之后,第一条数数据它属于哪两条流呢?它的温度在30度以上,它是不是同时属于氦和R啊,哎,所以大家看氦一条,R1条,然后六呃346的这条数据15度,它30度以下,它是不是同时属于漏和R啊,所以大家看啊,这个数据就是这样来的,二里边每一条数据都有30度以上的数据,是不是都在氦里边啊,然后30度以下的数据都在漏里边,哎,这就是我们这个分流的这个效果。
13:02
大家可以下去之后再好好的试验一下啊好,讲完了这个分流。Li啊,那当然接下来就会有这个合并对吧。呃,那大家看一下这个接下来的这个合并操作,我们可能会给大家讲两种,一种是connect,另外一种是UN,诶那说明这两种不一样对不对?好我们分别来看啊,先看connect,大家看一下这张图。Connect做这个操作是干了什么事情呢?诶,大家看这其实是不是就是把两条流,两条不同的流包了一层,然后放在形式上放在了一起,放在了一条流里边,对不对,哎,所以这个状态其实有点这个,这个应该叫什么呢?他们确实是合到一条流里了,是同流了,但是同流不合污,对吧?呃,就是就是有点像这个意思,所以就是他们其实还是各自是各自的一个状态,只不过外面包了一层,这个叫做connected。
14:16
Data stream对吧,不是connected stream啊,就叫connected stream啊,所以这是这样的一个数据类型,所以大家看,呃,这个直观上理解的话,就有点像我们说这个,呃,我们说这个一国两制之类的这种状态,对不对,就是我们整体来讲,哎是是在一起是一个流,但是呢,事实上里边我们还是各管各,那具体他们怎么各管各呢?这就要讲到一个算子叫Co map。对,其实这个Co map并不是单独的一个算子啊,它其实是什么呢?其实就是在connected stream的基础上直接去应用map和flat map这样的操作,那对应的效果是什么呢?其实就是实现了一个Co map,或者叫Co flat map。
15:08
然后他的操作是干什么事情呢?就是同时针对这一个流里边的两部分,同时分别对他们做操作,然后得到的结果大家看,最后又合到一起了,又变成一个data stream。哎,所以是这样的一个一国两制的方法啊,所以大家看这就相当于又是绕了一圈,是不是又转回到data stream啊,对吧,最后又把他们合在一起了,好,那接下来我们来看一看这个东西怎么样去处理啊,这个转换的过程当中,大家就看就会看到就是呃,Connect的这个操作是把两个不同的这个data stream合到了一起,对吧,变成了一个connected stream而。Co map这个操作,或者Co flat map,相当于把collected stream又转换成了一个data stream对吧?这里大家要注意的是,他们这里面的这些数据类型可以一样吗?呃,可以不一样吗?
16:08
可以不一样对吧,一国两制嘛,既然两制了,是不是你这里边数据类型也可以不一样,后边的那个操作也可以不一样啊,哎,所以这里边就是呃,这个特点啊,Connect和com操作的两条流是可以数据类型不一样的,然后最后合并到一条流里边去,好接下来我们来看一看吧,看一看这个合并两条流啊。呃,首先是这个,呃,我们这个的操作这里边比方说既然是数据类型可以不一样,那我们给大家定义一个不同的类型吧,比方说定一条叫warning的这样的一个一个流啊,我在hi的基础上,因为之前我们这个分拣的high和low是不是数据类型还一样啊,对吧,我在这个基础上,比方说这个高温我觉得要报警了,所以我在它基础上再做一个map操作,那数据类型应该就变了,对不对啊,那比方说我把它这个data转成一个什么类型呢?呃,比方说我我就要。
17:11
做一个简单转换啊,只要它里边的ID和温度时间说不重要,报警就对了,对吧,就是说这个温这个温度传感器现在有问题了,报警了,那我这里面要的是不是就是data塔点ID对吧?和data.temperature包成一个圆组是不是就可以了,现在warning的数据类型是不是就跟漏不一样了?好,接下来我们做一个操作。Connected stream对吧?Connected不要streams了,就叫stream吧。呃,那这里边我们做的操作其实就是直接warning可以去connect对吧,大家看有connect这个方法,然后直接点漏对吧。
18:00
哎,直接这样去做操作就可以了,然后接下来得到的这个connect,是不是就得到的是一个connected streams啊,大家看它有两个泛型了,对吧,T和T2,那是这是不是就代表一个是我们之前的那个data stream的类型,一个是就connect的连接上的这个这个类型对不对?哎,就是这两个类型都在里边了,那接下来我们怎么样再把它们合在一起呢?要扣map了,对吧?啊,Co map data string。他就应该用connected data stream要做一个大家看map或者Fla map操作对吧?啊,这个操作就相当于是Co map collected的STEM基础上去做map,就是Co map好,那么这里大家会发现它需要传什么东西呢?我们点进去看吧,啊,大家看它需要一个Co map方式,当然这又是一个自定义类了,有没有简便的实现方式呢?我们看一看啊。
19:06
诶很好,有一种很简单的实现,我是不是可以传两个函数啊,那这个是不是非常直白对吧,前一个函数是不是就是针对我第一个啊那个流去做处理对不对,然后后一个函数针对第二个流去做处理,所以这里边我们也是这样啊,大家直接传两个函数,比方说第一个我指定一个warning data。呃,它转换成一个什么呢。Warning点,呃。诶,这里边不能,因为已经是远组了,没有ID了,对吧?所以只能是它点一,然后warning data点下划线二,这是我们的第一个函数,对不对?诶,啊,当然这里边你直接这样的话,相当于数据类型没变,我们变一个数据类型吧,有一个报警信息,比方说warning,对吧,数据类型就变了,做了转换,然后下面load data的话,那相当于是不是也要做一个转换啊,对吧?比方说我用low data的ID,然后比方说我输出一个它没事,对吧?Healthy。
20:19
诶健康正常,哎,输出这样一个信息,大家看做了这个转换之后,就可以得到一个得到一个什么。是不是就可以得到一个data stream了啊,而且大家看这个data stream r类型,这是一个type information,所以它其实里边的类型是不是并不一定非得一样啊,什么样的类型都可以,对不对?所以大家看我这里边处理最后得到类型还是不一样,没关系,Map嘛,Map本身做转换是不是就可以转换成不同的数据类型啊,哎,所以这里边它就还是这样是没问题的,这里边我们把这个助调给大家看一眼。
21:00
Co map,它的这一个print出来是一个什么效果?大家看到这个输出的结果啊,哎,是不是就把我们的几条数据就相当于分门别类的做了处理,然后输出了大于30 30度的那些是不是都在那个氦那个流里边,然后转换到了warning里边,对不对,最后的输出结果是不是三三项,然后后面加了一个warning啊,而如果是在漏那个流里面的数据,它的转换最后是不是就是一个二元组啊,后面跟了一个healthy,六和七是healthy,而这个一它的一的三项数据,还有十的数据都是都是warning,对吧?每一条数据来了都要报警,所以大家看,这就是我们最终的这个输出结果。呃,这样我们就讲完了这个connect,另外除了connect之外呢,大家看到还有另外一种方法叫union union是什么呢?UN从字面理解大家知道它是一个联合操作,对不对啊,就是直接就就合并合并在一起了,那大家就会想到UN和connect看起来都是合并多条流,那它他们又有什么区别呢?
22:20
诶,这里边可能就得提到,首先connect这里边我们的两条流是不是可以数据类型不同啊,对吧,这个数据类型可以不同,然后我们这个一国一国两制,这个其实操作的灵活度是非常非常高的啊,那大家就会想到,如果要是说数据结构相同的话。那是不是你用这个也可以啊,对吧,这个其实是没有什么问题的,那大家想要这么说的话,Connect就非常好啊,它有什么缺陷吗?哎,这里大家要注意,我们说他他们所有的这些,现在讲的这些算子都是多流转换算子,我们的刚才讲的是两条流合并。
23:07
他能三条流合并啊。怎么安装,哎,对,大家可能就会想到这里边,那那我connect它里边可以传第三个参数吗?不能对吧?诶那大家又会想到这个东西,如果要不能就是不能在后面跟上更多的这个流作为它的参数的话,那是不是每一次点connect只能是两条流合在一起啊。那如果要在和第三条流怎么办好,有些同学说那我后边再去connect可以吗?因为这里边点connect之后,是不是得到的已经是一个connecting stream了,对,所以这个时候如果要去再调connect的方法,你是不是必须得把它对先转换成一个data stream之后是不是才能再调connect方法合并第三条流啊,那这个过程其实我们就觉得有点有点不爽了,对不对?那这个过程就好像不是我们一下子把药的这个流全合并到一块儿,然后去做操作了。
24:12
哎,所以大家注意connected connect和。com这个操作呢,它有局限,就是一方面它非常的灵活,数据结构可以不一样,不同的流数据结构可以不一样,另外一方面呢,它一次操作只能操作合并两条流,对吧?啊,那呃,大家自然就会想到,那我如果想合并多条流怎么办呢?想合并多条流的话也可以来,大家会看到我可以用另外一个方法,一个算子,就是这里提到的UNI,它可以直接合并多条流啊,那但是这个UN呢,合并多条流也不能说好处都让你一个人占了呀,它有限制。它的限制是什么呢?对,它的数据结构必须要一样,那数据结构一样这个就简单了,对不对,那你直接所有的流数据结构都一样嘛,直接合到一起是不是最后还是同样的一个data stream啊,啊,直接就放到一起就可以了,所以大家看这个union就非常简单,它其实就是还是data stream转换到data stream,没有对应的另外一个操作了,对吧?啊,所以这个其实是很简单的,我们可以在代码里边简单的实现一下啊。
25:23
呃,那这里边代表大家会发现,假如我们定义一个union stream,大家看假如说我用这个warning STEM,是不是直接可以调这个union方法啊,我如果想直接用这个long temple stream的话,大家看它是不是会报错啊。为什么报错呢?类型不匹配type missmatch对吧?哎,所以我们说它必须要是同样的数据类型两条流才能够拥在一起,哎,那这里边如果我不用warning stream,如果用什么跟它用就可以啊对,如果我用那个high time three来看,是不是这个就完全没问题啊,而且大家这个UNI这个点进去,大家看这里边它的这个参数是不是有一个星号表示,对我是不是可以一次传很多个data stream进来啊,只要你类型相同,你随便传对吧?而且另外大家也知道,你既然得到的这个也是一个data stream。
26:29
那就代表什么,是不是我后面可以继续去约定啊啊对吧,所以这个就就是符合我们的预期了,就是多条流都可以合并到一块儿啊,当然了,这个就是如果这两个合并到一起,我们这里边要去再打印一下的话,那这个就不用打印了吧,大家知道他们应该输出什么,对吧。那我还是来看一眼吧。啊,大家看这个合并在一起之后,这个状态就就非常简单,其实就相当于我们的那个所有的数据又重新输输出了一遍,对不对啊,但是大家看这个数据是不是就跟我们之前,尽管这里我们指定了那个并行度是一,但是这个数据数据输出的这个顺序跟我们数据读进来的顺序是不是就不一样了啊,因为大家看到我们既然是已经把它拆成了两个流,两条流对不对?哎,后面你不同的这个算子任务它是可以并行的,那哪一条数据先进入到后边的这一个print去打印,这个其实是没准的,对不对,每一条流先先结合到这个最后的这个union stream里边,这个是没准的,所以大家看后面的这个顺序有可能是会有一些变化。
27:40
这是我们这个union这一部分,那么当然大家也可以看一下文档里边的总结,就是connect和union的区别啊,这个其实也是非常明显了,它俩其实就是相当于是各有优劣,对吧?那么connect它的优势在于啊,一国两制可以是不同的数据类型都能够处理,都能够联合在一起,对吧,但是他就只能够操作两条流。
28:06
而union的话,是不是就可以操作多条流啊,啊对,那但是union就有局限,是不是它的数据类型必须是一样的呀,而且大家就知道UN是不是就没有对应的那个另外的一个匹配的那个操作符了,因为它数据结构都数据类型都一样,一直都是data stream,而connect就不行,Connect必须得对应一个哦,对Co map或者Co Fla map这样的一个操作,把connected stream再转换成返回到我们熟悉的这个data stream,这就是connect UN联合的这部分内容。
我来说两句