00:00
呃,讲完了S,接下来我们继续看这个传送,下一步有了数据源读进来数据之后,接下来我们就可以给它做各种各样的转换,做计算,做处理了,对吧?诶,好,那大家看一下我们常用的转换算子有哪些呢?诶,这里边大家大家可以简单的分几类吧,呃,有时候大家可以看到一些这个文档或者说文章里边会对呃,Get stream API里边的这个转换算子做一些分类,比方说大家看前面的这个map flat map filter这些东西,它很简单对吧?呃,而且大家都很熟悉了,这个在Spark里边也都有啊,往往会把它叫做基本转换算子,呃,它确实很基本对吧?啊,这个就是不用讲,大家也应该知道,然后下边的这个像KBY开始之后。那大家会想这个就开始是不是要把它做分组了呀,要开始做转换了,对不对,然后从KBY开始,呃这个这呃往往会把这个东西叫做一个监控流转换算子啊,当然也有也有文章会把它叫做分组转换算子,那这个就看定义了啊,这个所谓的键控流其实就说的就是就是它它要key by嘛,就是大家也已经看到它转换之后,是不是是把一个data stream转换成了一个叫做kid stream呀。
01:25
啊,有时候会翻译这个K的stream,把它翻译成键控流,就是用一个键一个K控制的一个流,对吧?啊是这两个字啊,建控流,所以有时候就把这个KBY叫做键控流转换算子,然后下边的这个后面有一些聚合操作,对不对?大家会想到有sum啊,有这个max啊,有命啊,这些操作往往就叫做滚动聚合算子,或者跟这个reduce这些算子联合起来,我们就统一把它们都叫做聚合算子,对吧?啊,这是又是一大类,就是KBY之后做聚合,分组之后做聚合。
02:02
然后后边呢,还有一大类就是大家看有什么呢?Split select connect,还有UN,从名字上大家就可以看得出来,这是干什么呀,可能要做连接对吧,有可能还要做做分割对不对?哎,这可能就是对多条流做做转换了,所以下面这些往往又会叫做多流转换算啊,就是有有不同的定义啊啊,当然了就是这里我们介绍的只是data streamam上的一些基本的操作,还没有牵扯到跟时间和窗口相关的操作啊,所以后面的话大家就会看到还有上面是不是可以开窗口啊啊,所以就这个API里边还有这个窗口算子啊,开窗啊,还有跟时间相关的一些算子,所以这个我们后续再讲,这里讲的是跟时间还没有牵扯上关系。好,接下来我们就一个一个说吧。呃,前面的这个比较简单啊,大家会看到这个,呃,我们还是先另外先新建一个,新建一个类吧,这就不是south test了,对吧,我们同样在这个下面新建了一个object,我们这个叫做呃,对,Transport。
03:18
Transform。然后在这里边我们把这个main定义出来,前面是不是先要去创建执行环境啊啊,这个是没办法没没办法少的啊,没办法偷懒get,然后我们把这个引入进来,后面不要忘记要有exeq的,对吧,这个是transform。Test啊,这是我们的这个,呃,整个程序的框架啊,那这个里边的数据呢,我们正常来讲就还用之前的那个,呃,Sensor sensor这个数据好了,对吧?啊,然后大家想这个样类我是不是在一个包下面,我也不用重新定义样利类啊,啊,我就还用这个,呃,SS下面的那个样例类就可以了,所以后面我们用到什么算子再再来去做介绍啊。
04:16
呃,首先大家看一下这个map map这里做做的操作非常简单了,就是转换,大家一看这个图是不是来的时候每个数据是方的,最后可能输出的数据就不一样了,对吧?变成圆的了对不对啊,所以非常简单啊,也非常直观,这个有点像我们的一个生产线流水线对不对啊,进来的时候是原料,出去之后是产品对吧,非常简单啊,都很熟悉,不讲了啊,然后flat map这个大家也很熟悉了啊,这是跟map的区别,就是相当于我们可以是传进去的,本身就是一个list对吧,本身就是一个数组这样的一个类型,然后最后结果是不是相当于就。全部打散了呀,啊,全部打散了拆开,然后最后合并成一个大的粒子,所以大家看我们这个结果是什么呢?把每个元素都要都要复制一份,然后变成一个list对不对啊,它是要我们说是要把它打散了,变成一个list,是不是不打散也可以啊,只要最后是给的,给成一个例子就可以,对不对?哎,所以这里边就是把它复制了一份,然后最后变成了一个例子,最后的结果就是什么呢?是不是把这个list最后再合并在一起啊,大家看就变成了112233对不对。
05:35
啊,所以这是这个Fla map的一种用法啊,另外我们更常见的其实是本身传进来的就是就是一个list,然后之后是不是就相当于我们把它split之后,哎,就相当于打散了啊,每一组本身是一个list,把它split之后,最后再合并在一起啊大家看这就相当于我们work的时候先做了分词。最后就是abcd分开啊,这是常用的这个Fla map操作也很熟悉啊,不详细讲了啊,然后下面一个基本转换算子filter啊,这个大家就更熟悉了,它是不是里边可以传进来一个filter函数,对吧?呃,根据这样的一个这个这个函数是什么呢?是我们的这个元素,然后它的对应的这个应该是一个什么?是一个布尔的。
06:28
这个表达式对不对,布尔类型的表达式,所以说大家看这个判断是不是就是假如是一的话,把这样的元素都给filter出来啊,啊这个大家也很熟悉,我们就不详细讲了,所以它的效果是什么呢?其实就是一组元素进来。本身这个元素的形式和这个模样都不变,最后是按照它的某个特征,某个特性,最后筛了一部分出来,对不对?比方说大家看按照颜色是不是只把白色的选出来了啊,这个就比较直观好,这就是这个秀的一部分啊。
07:04
呃,基本转化算是我们就是结合后边我们做做这个事例代码的时候,给大家一起再去实现就好了,接下来我们给大家看这个KBY。K,大家要注意的是,它理论上来讲应该是做了一个什么操作呢。大家看它其实是做了一个分组或者说分区的操作,对吧?啊在前面我们讲到这个运行架构的时候,其实也说了,其实遇到KBY算子的时候,数据是要做重分区的,它是基于什么去做重分区呢?哈西扣对吧?啊,所以大家看它最后的效果其实是什么啊,有点像是把一个流就拆成两个了,对不对啊,但是大家看注意这里要注意啊,我们的流其实还是一个。它只是分组,只是分区,没有分流,它是变成了一个什么样的流呢?啊,变成了一个分组之后的流,或者说变成了一个监控流,对吧,就是我们所谓的这个k stream,所以在这个K的操作过程当中,它其实是把data stream。
08:16
转换成了一个k string,这里就涉及到这个数据类型的转换了,大家要注意一下啊,所以它是从逻辑上将一个流拆成不同的分区了啊,并没有实际把这个流拆开,那么大家看一下它的这个效果啊,它相当于是什么呢?按照P,那大家看,比方说这里边我们如果认为它这个颜色就是表示一个P的话。大家看是不是就相当于按照不同的颜色要给它分区啊,诶那大家看到这里黑色的诶全放到下面这一个区了,那为什么上面这里边不全是白色的,还有绿色的呢。对,大家要想到它是按照哈西扣来分区的,那是不是有可能有一些不同的K会分到同一区区啊,哎,所以这个大家是很好理解的啊,所以这就是KBY的这个操作啊好呃,然后接下来再给大家看一下这个滚动聚合,滚动聚合哎,其实这个都不用说了啊,要强调一点的是link里边的滚动聚合算子。
09:21
必须得针对k stream来做操作,也就是说你必须得KBY之后再去撒。你必须得分组之后再sum,你直接在一个data stream上做sum不行,Data stream没有这个方法,没有这个API啊,这个给大家强调一下,所以大家看比较简单的方式就是说,哎,Sum,那就是针对我们KY之后每一个直流,是不是你可以指定某一个字段去做一个求和啊,啊聚合,这是一个求和操作,或者我们可以怎么样,是不是可以命可以max,可以mean by max by最大最小值是不是也是一个聚合操作啊呃,非常简单的这个滚动聚合算子啊呃,这里边讲到这儿的话,我们还是结合这个代码给大家做一个做一个实现吧,就把前面提到的这些简单的这些操作都做一个实现吧。呃,比方说这里边我们。
10:18
要做转换,首先先得从先得读取数据对不对?哎,我们这里边的数据从这个,呃,Stream from file吧。从那个文件里边sensor这个TXT文件里面去读取吧,这个简单一些啊,En nv,呃,我们是RARA的text file对吧。路径copy过来。读学读进来之后,诶,大家会想到这个时候它的数据类型是什么呢。大家看read text file返回,对,它就是一个string类型的data stream对不对啊,固定是一个date string类型的data stream,然后接下来我们是不是就应该把这个stream类型的data streamam要做转换啊,对,得转换成我们想要的sensor reading对不对?所以这里边我们就是我们的data stream了啊做转换,那它是不是可以基于stream from fire哎,这个来做转换对吧?诶,这里边首先可以做什么转换?
11:25
像对像我们之前的那个word count一样,是不是直接就可以做map操作了,Map这里边我们怎么怎么做呢?是不是每一个data是不是都要。先分词对不对,分词把每一个字段提出来,然后包装成sensor reading对吧?哎,所以大家看一下这个,呃,常规的这个操作流程啊,是不是我们定义一个data RA,从data里边是不是要做一个分词操作啊,我们这里边根据什么去分词呢?
12:02
呃,逗号这诶不是word的框啊,Sensor没有打开,用逗号去做分词对不对,所以这里边我们直接给一个逗号,然后接下来诶,大家看这个map完了之后,它的数据类型是什么。数据类型就是一个还是一个data stream,但是它的里边的这个泛型。发生了变化对不对,那这个类型你就可以自己去定义了,你是不是转换成什么都可以啊,啊大家还记得我们当时那个map的那个方式,是不是你可以本来方的进来,你可以把它诶揉一揉,捏成一个圆的呀,哎,所以这个map算子对这个输出的类型是可以做改变的啊,那当然了,本身这个data streamam这个大类型不能改,对不对,我们说的类型改变是说它里边每一条数据的那个类型,对吧?或者说这个data streamam它的那个泛型是不是改变了,哎,好,所以这里边大家注意啊,是这样的一个过程,那看这个map里边是不是就是诶某一个这个T类型是不是转换成了这个R啊,所以这是我们返回的这个数据类型啊好,那接下来我们把这个data RA把它拆开,还没完,还得怎么样?最后我们想要返回的应该是一个sensor reading,对不对?包成一个ssor reading。
13:22
好,那首先我们是不是应该第一个是那个idd,就是第一个字段,那应该是data ra0,对啊,这个大家要注意一下啊,本身Sen ID就是那个string,大家想是不是就不用转换了啊,但是这里边大家可能注意一下,我们的这个本身这里边还带了空格的。所以一般有一个好习惯,就是对拿出来之后是不是先trim一下呀啊去掉空格对吧?好,然后后边是那个时间戳一对吧,也是先trim一下,那个时间戳是一个long类型,我是不是要涂涂long啊这样对不对,然后最后一个字段贝瑞二。
14:09
还是tri一下,它是温度值,我们是一个double类型,这里是不是要图double啊,诶这样一个转换就完了,所以这里大家要注意,经过这一个转换之后,这里面我们没有写啊,因为SKY拉有这个类型推断对不对,我们这里面没有写,如果这里面我们写出来它的数据类型的话,应该是什么?Data swim对不对,应该是一个data stream,诶,那大家会想到我这里边大家看直接data stream这里边报错对吧?你还得写这个泛型得指定好,呃,大家看这里边你如果要直接写stream的话,大家知道前面那个不是一个bit stream stream吗?这里写stream肯定就报错了,对不对?大家看这个报的错是什么是不是?Type Miss match啊对吧,本来你这里边是string,我们这里边转换的是一个sensor reading对不对啊,当然就type Miss match,所以这里边其实我们要的是一个sensor啊,这里就不报错了,对吧?啊啊,然后接下来大家会想到我有了这个data streamam之后啊,当然当然就是说我可以就是直接在这里边做操作啊,也可以在下边我们单独再去再去做一些操作,比方说我是不是接下来可以去KBY了。
15:30
呃,我现在假如说我想做一个聚合,按照每一个呃这个ID,然后把它所有的温度,历史的温度做一个做一个sum,做一个加吧,累加吧,尽管这个好像不是一个实际的应用的需求啊啊,我们就直接做一个聚合,做一个累加,那我是不是可以TBYT拜什么呀。KYID对吧,ID是不是零之前我们就是这么做的,对吧,然后然后再做什么。
16:02
是不是可以直接上对吧。Sum哪个?哎,我们呃之前我们是KBY0SUM1,那个是我count对吧,这里你要SUM1是不是就把时间戳给sum起来了呀,我们这里呃,不把时间戳加起来,我们把那个对SUM2对吧?哎,我们把那个温度值sum起来,所以这样就完成了这样的一个转换,那这里大家会发现经过KBY之后得到的就是一个kid stream对吧?啊,那这里边就提到了,它里边的泛型是什么呢?对,大家发现这里边其实这个T类型是什么呢?这是不是就是我们本身的这个data stream,它本身的那个泛型啊,所以当前我我们的这个T应该是什么?这T应该是什么?对,所以然后接下来我如果这里边这个不是K了吗。
17:08
KY后边还有一个Java temple对不对,这个是表示什么呢?哎,这是kid stream本身要求的,它里边是有泛型,是有两个类型,一个是我们本身数据的类型,另外一个是K那个字段的类型,对吧?哎,所以大家看是T和K2个类型,那这里边我们那个K那个字段的类型是什么呢?是一个Java Java temple对吧?Java temple,所以这里面大家大家会发现我如果说用这个一个一个int啊,这里边传一个int,指定一个字段去去指定的话,那大家会发现flik它可能没那么智能,对不对。他在编译的时候,是不是发现你这里边只传了一个int,指定了一个位置,它是不是并没有直接解析出来,你这里边这个位置到底是什么东西啊。
18:07
所以他诶,我这里边并不知道你这里边具体是什么东西的时候,他就把它包装成了一个。元组类型,而且是Java temple啊,Java的元组类型,哎,所以这是它的统一的一个处理,然后接下来大家会发现,那sum之后得到的又是什么呢。每一个分区的kid是吧。哎,大家点进去看一眼啊,它得到的是一个又回归到了一个data stream啊,所以大家要注意啊,简单聚合算子,呃,就是像我们这些所谓的滚动聚合算子,它其实最后返回的结果呢,还是就是在之前基础上,相当于就是一个呃一个一个累计对不对对吧,一个一个做一个聚合,聚合完了之后最终得到的结果跟之前还是一样。
19:04
还是一个data stream,然后呢,还是最初的那个数据类型,所以这里返回的是什么,是不是还是sensor reading啊,啊,所以大家会想到,如果我们要直接用前面的这一个data stream,然后sensor reading这样来写的话,我是不是直接这样把它删掉,这样接上是不是也不会报错?最后返回的还是一个sensor对吧,所以是这样的一个过程,好,那呃,这里我们做了这些操作之后,最后可以把这个打印输出,大家看一下这个效果啊print。我直接先看一眼这个结果吧。让一下啊,大家看一下。好,诶大家现在看到我们已经正常输出了这个结果对不对,诶这里边这个输出的结果,呃,我们没有做sum,为什么呢。
20:02
因为我们这里边是不是每一个ID对里面只有一个数啊,哎,所以为了想让他做一个sum,我们多给几个吧,这个就很简单啊,比方说我这里边这个,呃,这里边也也稍微变一变206,比方说这里边就35.1,对不对,或者说我再给一个二九九三十一点零。是不是可以啊,好,我重新再跑一下看一下。诶,大家看到这就是我们这个流处理的一个效果,流处理的效果就是它不是一下子给我们输出一个最后聚合的sum一个结果101点几对吧,而是什么呢?它是一个一个的输出的,对不对,是来一个输出一个,来第二个的时候,它是不是就相当于两个做了一个叠加啊,哎,我们想要的这个流式处理的这个聚合是不是就是这样来一个处理一次对不对?哎,然后最后是三个叠加的一个结果,但是大家发现这个好像有点奇怪啊。
21:05
有有什么问题,这里边他对大家发现,诶它为什么是35.1,这个是,诶大家看到是这里边的这个数据先来了,对吧,他他为什么会这么奇怪的,这个这个先来呢。接下来还有一个什么状态呢?下一个来的是什么?下一个来的是31.0对不对,我们本身第一个那个35.8,它是最后才来的,对吧?诶有些同学已经想到了,这是不是因为我们现在是多线程的一个操作啊,对吧,大家看到它其实是。呃,就是我们现在是有那个,呃,并行度不同的那个操作,不同的那个处理的,大家其实会发现就是在这里边我们可以以怎么样,我是不是可以在在这里直接去把这个并行度设成一啊对吧?输出的时候设这个并行度是一,然后大家看一眼。
22:08
诶,大家看到现在是现在是一个线程了,对吧,现现在就是一个slot对不对,哎输出,然后大家会看到,诶还不是啊第一个,而且跟刚才还不一样了。第一次读出来的是31,然后后边加的是这个35,最后加的是第一条的那个数据,呃,那个35.8多少多少对吧,这是为什么呢。其实大家仔细考虑的话,前面如果大家仔细去看的话,当时我们的那个多线程不同的slot那个并发对这个有影响吗。我们当时的所有经过kba之后,所有的341,它的数据是是在一个slot上保,呃,那个运行的呢,还是说分发到不同的lo上去,它是一个区还是多个区啊。
23:03
KBY之后是不是按照它的这个3ID不同,是不是放到一个区里面执的呀,所以按道理来讲,它是不是就应该是都在一个呃县城里面执行的呀,那为什么还有这个问题呢?哦,对,其实已经有同学想到了,这里边我们设置的这个并行度,是给谁设置的并行度啊。只针对print对不对,前面我们所有的那些重新分区啊K对吧,Sum,还有我们前面的这一个读取数据源,呃,这些我们给他设置并行度了吗?没有设的话,它的并行度应该是几默认的,我们现在开发环境我这边默认是四对不对,哎,对核数,所以大家会发现,如果说我现在前面的这些步骤还是以并行度四去运行的话,那会有一个什么效果呢?
24:01
那是不是大家会会想到我前面做分区做做各种这个k by sum的过程当中,是不是还是乱的呀,对吧,有可能并行,那就有可能顺序就乱了,哎,所以大家发现你尽管最后输出的时候它是哎要求你就按照一个这个最后一个lo去输出,但前面顺序就乱了,那你这里边平行度一其实没什么效果,对吧。那那怎么去改呢?全局,哎,对大家自然就想到,其实我可以把它改成全局的并行度,对吧?啊,所以这里边我们有一个方法,就是直接把这个放到前面来,Dv.set对PARALLELISM1,然后全局给一个并行路,我们跑一下看看这个是不是符合我们的预期。好,大家看现在这个结果符合我们的预期吗?是不是非常符合是吧,这跟跟我们那个这里边这个顺序是不是严格一致啊,啊就是一六七十一一,而且这个第一条读进来就是35.8几对吧,第二个累加的是不是就在它基础上加了那个35.1啊,大家看一下这个数,35.8加35.1 70.9对吧?啊,然后最后又加了31啊101.9,所以这就是我们这个,呃,并行度对这个输出结果的一个影响,大家其实看到确实是有影响的,对不对?呃,所以我们后边在做测试的时候,为了方便啊,方便给大家演示,所以往往就直接用前面定行度都设统一设成一就完事了,这个不影响我们结果正确性对不对,大家看最后的这个累加101.9,是不是每次算出来都一样啊,是不是我们最终的这个状态一致性是能够保持的啊。所以大家这。
25:50
看到其实是没有问题的啊,只不过在中间的一些这个某一个节点上,因为它来的这个顺序数据相当于出现乱序了,对不对啊,数据如果有乱序的话,中间可能会有所不同,但是呃,在这个最终的这个结果上是一致的,这是我们呃这个。
26:11
前面给大家讲到了这个并行度和我们最终这个输出结果的一个关系。
我来说两句