00:00
那接下来呢,我们就讲第二部分的转换算子,那就是我们所谓的键控流转换啊,为什么叫键控流转换,就是先要做一个K啊,其实就是分组之后要做聚合,你先得分组,然后再做聚合,这里边就首先涉及到了一个所谓的这个K败的,呃,当前的这个算子啊,它其实比较特殊,我们前面也已经看到了,你如果把它那个画成画出直行图来的话。其实这个KY并不是一部具体的执行操作,对吧?它并不是一个真正意义上的operator,那它属于什么呢?它就是定义了一下我们两个任务之间数据传输的模式啊,我们说当前的这个K是基于你定义的K,然后做分组,那怎么样做数据传输呢?那就是基于每个K的哈希code进行一个重分区,对吧?呃,类似于就是这样的一个分组操作。所以比如说这里边我们做了一个KY,比方说就按照这个方块的颜色来做,做作为K,对吧?那分组的结果是什么呢?大家这个一定要注意啊,就是分组的结果是如果我当前是同一个K的话,比方说都是黑色的或者都是白色的,那么当前所有的数据一定都会放到同一个分区里边去做处理,对吧?啊,就当前这个分区里边一定有当这个K的所有的数据。
01:23
但是呢,同一个分区里边未必所有的数据都是一一个K,都是一种颜色,不是的啊,因为我们这个你的颜色可能很多对吧,K的这个取值可能很多,但我们的分区数有限啊,你的那个并行度,它不可能有那么多无限个呃,这样的并行子任务,所以说最后我们肯定是有一个取模,然后做做分配这样一个计算的过程的,所以最终就有可能出现一个分区里边有不同K的元素啊,但是你你会发现只要它是白色,那肯定都在这儿,对吧,但这个分区里边可能除了白色还有绿色啊,那下面这个只要是黑色,它都在下面这个分区,这个大家就搞清楚了。
02:04
这里面大家需要注意一下,就是说呃,这个KBY的这个转换呢,它涉及到了一个数据结构的转换,就是点KBY做了这个操作之后,由data stream就转换成了一个k stream。啊,那有同学可能就就会疑惑了啊,你说的不是data stream这个API吗?对吧,不是要做这个就是数据流的转换操作API调用吗?怎么又变了呢?怎么又变成了这个KSTEM了呢?哎,这个大家其实不用着急,我们这里边简单的给大家看一眼啊,我们找那个之前的workout,大家看这个KBY之后得到了一个stream对吧?那么这个k stream大家看它其实还是继承了data stream。所以本质上还是data string对吧,只不过就是加了一个K一个键定义的data string啊,所以说呃,我们说它是data string API还是没没问题的,还是没错的啊,这是关于这个k buy的一个操作,然后后边的话,K之后K本身并不是目的,我们并不是只是想把它分组就完了,肯定是分组之后要做真正的这个分组操作计算。
03:17
主要做什么计算呢?那一般常见的就是一些统计计算,这里边的统计方方法,我们常见的啊,列举在这里就有一些滚动聚合算子,对吧?啊,这个为什么叫滚动聚合算子呢?因为在整个我们在调动的过程当中,数据是源源不断的来,对吧,就不停的来,不停的来,所以这里边我们聚合出来的结果呢,它是在之前的基础上不停的去改变,不停的去啊,就是相当于更新啊,就有点像这个滚雪球一样的一个状态,所以有时候我们管它叫做rolling aggregation,就是滚动聚合算子,这里边常见的算子,那就是some mean max mean by max by,一看名字大家就知道什么意思,对吧?啊,就是求和,当前这个这个组里边的所有元素求和,就像我们前面做work count的时候还有什么呢?呃,命max就是做这个最大,求最大最小值,另外还有命by max by。
04:13
那这里边这个mean by和命max和max到底又有什么区别呢?呃,其实整体来讲差不多啊,就是一些微小的区别,大家从字面上去理解,什么叫败呢?命败,什么叫败哈,呃,其实他说的就是你里边相当于是要指定一个。相当于你要去做最小值选取的那个字段,因为我们当前的数据,大家看到你有可能你像我们这个sensor reading啊,这就不是一个简单的数据了,对吧,这已经是一个比较复杂的数据,里边有很多具体的这个,呃,具体的这个属性了,那这里边呢,我可能是以某一个属性作为比较的基准,然后去提取它的最小值,那你这个明白当然就是BY某一个属性,对吧?啊,那这里边这个明白指的是什么呢?是按照这个属性来选最小值。
05:07
那最后我输出的是什么呢?大家要注意,输出的其实还是最后的,就是最小值对应的那一条,整个的sensor reading数据输出的是这个东西。啊,那与之对应就是命的话,命是什么呢?命就相当于是只是选取我们当前的这个最小值,对吧,只是把这个最小值拿出来,但是大家注意啊,呃,早些时候的时候,它确实是只输出一个最小值出来就完了,但是呢,现在这个输出的一,假如说我们这个例子啊,它就也还还是3READING类型了。那那别的字段它输出什么呢?它输出的是第一条数据的对应的那个字段。啊,这个就是大家先大概的了解一下,后面主要还是在代码里边去做一个测试,做一个实现就好了啊,所以接下来我们还是在代码里边给大家写一写这一部分啊。
06:01
还是在API test下边,我们新建一个object。当前这个我们就叫做transform。Transform test。然后接下来,呃,这个整体流程呢,其实都大同小异,差不多对吧?首先我们先定义流式的处理环境,Stream execution environment,然后get,后面我们做转换操作的时候,需要用到影视定义,把这个要呃影视转换对吧?把这个定义,所有的这个定义要引入,包要引入,然后接下来呢,诶,我们去读取。读取对应的这个数据源,这个数据源我就简单说吧,比方说我叫input STEM,我们干脆就简单一点,直接从文件去读取就算了,对吧?啊,那这里边的这个对应的路径呢,我就直接copy这里好了。哎,直接用这个啊,Copy过来完事。
07:02
从文件里边把它定义出来,这里边我还是改一下,这个叫做input stream。读取出这个sensor reading这个数据来之后,接下来我们要做一些包装转换,诶大家看到你这里边本来读出来的应该是就是逗号分割的一堆字段,对吧?哎,那我们怎么样去,呃,我们首先要后续要处理的话,你这个字符串肯定不好处理嘛,所以基本的一个转换操作,你你想要去比方说做聚合,做取这个最小值什么的,那肯定还是需要,呃,基于这个某种特殊的数据结构,然后提取它的字段啊,所以先把它转换成,先转换成。样例类类型对吧,先做一个这样的转换啊,前面我们这个是读取数据。接下来转换成样一类啊,那比方说这个我叫做data stream,对吧,这是接下来我们真正要操作的这个data stream它的类型,那就基于input stream做转换啊,那当然是一个map了啊,做一个简单的转换啊,对于我们当前的每一条data塔这条数据,那接下来是不是要按照逗号做一个分割,把每一个字段提取出来,然后包成一个样例类类型对吧?啊所以这里边呢,我可以定义一个array,这个arra就直接用data里边调一个位的方法,用逗号做分割来得到了这样的一个字符串的数组,然后对于当前每一个字符串数组里边的元素,我都可以把它提取出来,按照字段位置提取出来,包装成sensor reading样例类型返回啊,那这里边我可以定义这个ARRAY0。
08:54
就是当前的ID对吧?啊,它本身就是这个string类型,所以说我都不用管它了啊,然后ARRAY1,呃,那大家知道它本身是一个string类型,然后我们要的这个A1呢,这里边啊,大家看到是一个长整型的时间戳time stamp,所以接下来我还要做一个to long的一个场景性转换。
09:18
最后还有A2这个温度值,它本身是一个字符串转成一个double对吧?哎,这这就是一个非常简单的一个转换的过程,包装成一个样例类,大家看这就有点像ETL一样是吧,有点像做做这种转换一样啊,然后接下来呢,就可以基于这个样例类类型的数据,我们去做就是分组聚合了,对吧?分组聚合我们主要就是求。每个啊,就是不用求了啊,就是像解数学题一样,我们输出每个传感器当前最小值温度,温度最小值对吧?啊,那这里边我们定义一个,呃,最终的这个结果啊,我们叫a j stream聚合的结果,那么基于data stream去做转换,首先要做K对吧。
10:18
啊,然后这里边大家会发现KY这里边呢,其实它可以传不同的参数,之前我们比较熟悉的这个方式呢,是传一个int,表示它的那个下标位置,那这里边我们如果说要以第一个元素,也就是ID去作为这个分组的K的话,那我传一个零是不是也是可以的,就像我们前面做word count的那个二元组一样,对吧?啊,这个是完全没有问题的,这个样例类也是一样的,那另外还有一种是什么呢?我们看到它下边可以直接传一个F的。名字传一个string进来,而且你可以传多个,那这种方式,这这这又是怎么样一个一个情景呢?这其实就是如果说我们在这个样例类里边,你这不是每一个属性都有它的字段名称吗?诶,所以你可以把这个字段的name,当前属性的name直接传进来,K by ID,大家看这个可读性就比我们KBY0好多了,对吧?你一看就知道我当前是以ID作为K进行分组,哎,所以接下来因为每个传感器嘛,当然是按照ID来分组了啊,所以根据ID进行分组,然后接下来求最小值嘛,那当然就是命了啊,那这里边大家可能会有疑惑,到底是命还是命呢?我们挨个去看一下就完了,然后这个命这里边大家也看到了啊,跟sum一样,一种方式是可以去给一个position,就是当前的那个下标索引值,给一个int类型,那如果要是按照当前的这个温度值的话,那应该。
11:49
还是给个命二对吧,第二个字段温度值嘛,那或者还有一种方式,跟刚才一样,传一个字段名称。穿一个string进来啊,所以接下来我们这里边就给一个temperature哎,这样的话看的就会更加的语义更加的明确一点,程序的可读性就会呃更加的舒服了,对吧?啊,那接下来我们看一眼这个当前的。
12:18
Print啊,把这个打印输出,当前的这个AJSTEM,最后的结果做一个打印输出,大家不要忘记,最后还有一个env execute,对吧,把它执行起来,当前我们这个叫做。Test对吧?好,这是当前的这个代码,好,我们运行一下看看结果吧。啊,当然这里边你如果要直接这么执行这个最小值的话,大家会发现没有意义对吧?因这里边我们本身这个数据每一个sens啊,大家知道你现在如果要是分组去做统计的话,那应该怎么样,是不是每个组针对每个K都应该有一个统计结果,而且是只只统计自己这个组内部的,对吧?呃,不同的组之间它是互不影响的,互相不会干扰,那你这里边只有一个数据,那最后最小值肯定就是只有他自己啊,那这就没有意义了,所以接下来呢,我们把这个数据稍微给的多一点,比方说我就简单说了啊,就是341,我多给几个,这个时间戳我给到后面啊206,然后比方说这里给个32对吧。
13:24
呃,然后接下来我们这个208,然后随便给一个36.2。同样我们后边再给一个210,给一个,呃,29.7啊,这个都是可以去做这个转换操作的,对吧,我多给几个吧,212。也也不要总是隔两秒啊,213这里给一个30.7 30.9随便啊,然后接下来我们测一下,看看现在的这个最小值的效果是什么样的,大家看一看这个结果啊。
14:03
呃,现在我们已经得到了运行结果,大家看看输出的效果是什么样的呢?啊大家看到啊首先啊341诶大家看到这个341这里边这感觉有点不对呀,我们这里边输出的时候,你看这这里边这个这个过程啊,它为什么直接就是29.7呢,对吧,难道不应该是按照顺序应该是35.8吗。这个大家要注意一下啊,有同学可能觉得这个这个理解不了对吧?啊觉得很奇怪,你难道不不应该是一开始35.8,后边然后再去更新,变成一个小一点的数,后面才是29.7嘛。难道说这里边是先把29.7读进来了吗?诶,还真是的,这里边为什么会出现这种场景呢?因为我们现在并行计算,大家看默认并行度是四对吧?啊,我这里面没有做过做过更改,按照CPU的核心数,那当前我就是所有的这个任务都有四个,也就是说前边我读取数据做map的时候。
15:01
就有这样的四个。也就是说大家会想到啊,我前边29.7啊,比方说有好几个啊,比方说我前边这个,呃,32.5,我我我随便说啊,然后32.1。然后29.7大家想,前边我没有做KY之前做map的时候,是不是这些数据是分别进来的呀,对吧,它是可以并行的呀,可以在这个四个并行的子任务上同时处理的,然后接下来怎么做呢?然后是做KBY,然后去有一个这个命的一个。这样的一个操作,对吧,做最小值的一个提取,那所以说接下来呢,呃,这个命这里边当然本来也是四个啊,本来也是四个,但是我们因为做了K,那是不是现在341的数据就全部会集中到一个里边来,对吧,全部集中过来,那大家看一下,这就会涉及到一个问题。之前他们几乎都是同时进来的这些数据,那接下来到命的时候,到底谁在前,谁在后呢?这就没准了。
16:07
对吧,这当然就就说不定了,你有可能前面我这个29.7,前面这处理的快,然后这个网络传输的时候,它也快一点点,然后之后他就第一个来了。哎,所以就容易出现这样的情况,甚至有可能你这里边啊,就大家记得刚才我们那个第一个是29.7对吧?啊,就是先进来的第一个数是29.7,你如果再重新运行一下的话,甚至有可能我们得到的结果是不一样的啊,就是你有可能每一次运行这个并行的结果都不一样,因为并行处理的时候,他的这个大家看现在就变成什么了。第一个是35.8了,对吧,然后29.7看起来就好像排到后边了,跟我们真实的那个数据这个顺序好像差不多了啊,所以说这个其实是没准的,所以如果这个时候你想要让它的这个顺序啊,当然了,就是你到最后统计出来的时候,最后一个输出它当然是就是当前最小的值对吧,这个正确性是能够保证的,但如果说你想对它的这个处理的顺序还要保证是一致的话,哎,那那这个怎么办呢?你这里边好像没别的方法,只有是全局的去给一个并行度是一了。
17:18
啊,这样的话,你如果给了全局并行度是一,我们再去执行一下,就可以看到跟我们的预期就会完全一样了,我看一下现在的这个并行度是一的时候的执行效果,诶大家看这个就是顺序完全保持了,对吧?哎,现在的话你看第一个数据341 35.8进来了,然后后边是六七十,它只有一条数据,这个就不用说了,最小值肯定是这个,然后接下来呢,第二条数据341 32进来的时候,最小值变成了32,第三条数据36.2进来的时候呢,最小值还是32,对吧?然后29.7进来的时候,最小值就变成了29.7,然后这里大家需要注意一个点就是什么,现在我们用的是命,用命的话,它去求最小值是什么呢?就只是找到这个当前temperature的最小值。
18:07
那别的这个字段呢,他不考虑,所以大家看到前面这个时间戳全部都是199对吧?啊,全部都是这样的一个第一个输入数据的这个对应的这个字段啊,所以说它这个是保持不变的,只是当前的这个温度值变成了当前最小的这个温度值啊,那有些时候这种场景我们就觉得很奇怪,对吧,我想要的是什么呢?你就把当前我们输入的最小的那条数据,它是什么时间点发生的,你都给我完整的提取出来,完整的那条数据给我拿到啊,那这种情况怎么办,那就是用明白对吧。呃,用明的话,你再执行一下,得到的结果就是啊,大家想要的那种情况了,大家看这个明的执行结果,这是不是就是199对吧,后面就是206,诶当前32那个206,这是32那条数据当前最小,然后29.7来了之后呢,就变成210了,我输出的就是当前这个,呃,本身最小的温度值对应的那条完整的数据。
19:08
这就是关于这个简单的一些滚动聚合的这个操作啊,分组之后再聚合。
我来说两句