00:00
那接下来呢,我们就来讲一点啊,除了基本转换算子之外,其他更加复杂一些的操作,那其实大家想到除了这个map Fla map啊这样简单的一些转换之外,那是不是还应该我可以对于这个数据做一些聚合统计啊,可以做聚合啊,你像我们之前这个非常经典的就是map reduce这样的一个操作了,那reduce在这个data stream里边可不可以直接做呢?哎,这个大家要注意一下啊,我们在代码里边可以稍微的看一眼,像当前我们这个data stream啊。大家看之前我们这里边是有map,有flat map对吧,这些都是都有的啊,然后有这个filter,这些都能看到,那有没有reduce呢。大家看非常的尴尬,没有reduce。那或者说之前我们不是有那个直接求和sum吗?有没有some呢?也没有萨。哎,那为什么这里边设计的这么奇怪,这个data sum,呃,Data stream没有办法直接做类似于这样的一些聚合操作呢?哎,这就是因为在flink设计理念里边,所有的聚合操作必须要先进行对分组之后,然后才能做聚合,大家想一下,这个其实跟大家之前呃学过这个CQ啊,其他的那些定义其实都是一样的,对吧?哎,你说就包括在这个呃,Spark里边,你是不是也要reduce by k啊?
01:27
那你如果reduce by key的话,是不是也相当于针对不同的K做了一个分组,类似于这样的一个操作啊,啊,所以这里边其实是,呃,就是说我们针对某一个K,针对某一个关,呃,所谓的这个键值啊,要去做对应的统计,这才是本身才是有意义的啊,所以在flink本身API设计的时候,Data stream没有办法去调用一些相关的一些聚合操作,而是必须得怎么怎么样呢?就是先做一个。
02:00
分组啊,那我们说这个在呃流里边没有group by,只有KBY,所以是不是就是先做KBY得到大家看啊,得到的是一个kid stream,得到这个kid stream之后。它里边就可以有对应的,大家看是不是有sum操作啊,啊,该sum sum,该reduce reduce,这是不是都有了啊,所以这其实就是这样的一个定义的方式啊,那所以接下来我们就给大家专门的讲一讲关于聚合这方面的操作,那所以接下来这个KBY,还有这个所谓的滚动聚合算子,还有reduce这样的算子,这三个合起来都可以,大家都可以理解成就是跟聚合操作相关的啊,这这些算子啊,那这个KBY呢,比较特殊一点,它严格意义上来讲并不算一步计算,之前我们也说过了。它其实应该算是数据传输对不对啊,它应该是属于这个数据传输类的啊,它的特点其实就是说要根据当前数据里边的某一个某一个字段,或者说我们自己定义的某一个K啊,相当前的这个流相当于拆分成不同的分区。
03:14
所以大家看这里边就呃,有点像分流操作,但是它不是分流对吧?大家注意啊,这不是把一条流分成了两条流,而是当前这个流,因为我们是一个分布式的处理架构嘛,我要把它分配到不同的分区里面去,比方说现在我就是按照当前数据的这个颜色,对吧?大家看这个方块里边的颜色做一个KPI,做一个分区,那我真实的这个呃,处理方式其实是什么呢?那就是要基于它的。当前K的哈希code,哈希code算一个哈希值,但大家会想到,那我这里面K可能有很多啊。那很多的K求了哈code之后,呃,那那最后只有对应的接下来的分区数可能只有只有个很少的几个,那怎么分配呢。
04:04
那是不是一定还要类似于有一个取模运算的过程啊啊,所以就是求了哈code之后再做一个取模操作,对应的分配到相应的分区里面去,那最终得到结果,这会带来一个什么结果,就是所有相同的K。对应的数据是不是一定会被分到同一个分区里面去?那另外有一个问题,就是同一个分区里边只有一个K对应的数据吗?也有可能有有多个对吧,因为大家想是哈希值求完了之后有可能不一样,但是呢,取模之后有可能就分配到一样的地方了,对吧?啊,所以大家看这里边就有可能出现这种情况,绿色和白色的数据都分配到了第一个分区,而黑色的数据呢,都分分分配到了第二个分区。啊,就是说我能保证的是黑色数据要在第二个分分区,就一定都在第二个分区,它不会出现在上面。
05:03
但是呢,我不能保证第二个分区里边只有它对吧?啊,就还有可能有别的颜色的数据还会来,这就是这个K的一个特点啊,那KBY之后大家就会发现啊,我们不光是对数据做了一个重分区啊,基于哈code的重分区的一个操作,另外。整个处理过程的这个数据结构也变了,由data stream变成了一个k stream,对吧?啊,那当然了,就是在本质上来讲,大家其实发现了k stream是不是也是一个data stream啊。啊,那所以整体来讲,我们这个还是叫做data stream API整体来讲没什么变化,但是呢,基于k stream和基于单独的data stream能调的API就不一样了,对吧?啊,所以大家会发现啊,就是基于这个k stream,接下来我们就可以做各种各样的聚合操作啊,所以说KBY就往往会跟这个聚合放在一起给大家讲啊,那我们看一看到底能做什么样的聚合操作呢?
06:04
最简单的聚合操作,这里边统一叫做滚动聚合算子rolling obligation啊,那这些滚动聚合算子大家看其实就是非常简单直白的这种操作,比方说so。那就是针对当前这个K对应的啊,所有数据是不是做一个求和啊,来一个就求和一次,来一个就求和一次,那另外还有就是m max,这是不是求当前所有数据里边的最小最大值对吧?那另外还有这个m by max,这个其实跟那个m max是一样的啊,它的特点是就是区别又在哪里呢?呃,命和max的话,顾名思义嘛,大家看他就是只选取最小最大的那个那个字段,对不对,就是他只关心的是最小最大那个字段,但是大家注意啊,就是它这里边返回的那个数据类型呢,跟我们之前整个数据流里的数据类型还是一样的。
07:00
啊,大家会想,那我这里边取的那个字段只有一个呀,那别的字段怎么办呢。他用的就都是第一个数据对应的那个字段。而这个m byi max by呢,哎,它就是啊,对,取到最小值那个字,比方说我们举那个温度值啊,光这么说大家可能有点迷惑,我们举那个例子的话,就是温度值啊,那是不是我这个明白取当前这个最温度值的话,我取出来的的那个数据就不仅仅是最小的温度值,是不是还有当前最小温度值的时间撮合SID啊对吧,所有的那个数据都是那同一条数据的对应的ID,那那那那些数据,而如果是命的话,呃,那就是当前的那个温度值是最小的温度值,那前面的时间戳呢?那它就还是前面第一条数据的时间戳对吧?啊,这个我们在代码里面给大家测一测,大家就知道了啊,所以接下来我们还是在transform下边新建一个class transform test2接下来我们这个是滚动聚合的测试,测试rolling aggreg。
08:14
好,我们把这个先创建出来,呃,然后前面的这个流程,那就跟之前我们做的那个基本转换没什么区别是吧,我就直接把这个先copy过来吧,没方法啊。哦,当然前面的这些我就把这个环境创建出来,然后后边从文件读取数据啊,先做到这就可以了。好,引入。呃,然后后边大家不要忘记有这个执行对吧,XQ的执行起来后边应该还有一个划括号,这就是我们当前整个这个程序的架构啊,呃,然后这里边有了这个input stream之后,大家想我后面要做那个分组,做做聚合嘛,那是不是首先我应该把对应的那些字段全拆出来啊。
09:05
然后包装成一个我们想要的那个po,呃,Sensor reading类型,对吧,然后接下来我就可以指定,哎,我按照ID去做一个分组,然后我统计的是啊,比方说最大的温度值,最小的温度值,这就可以做了啊,所以接下来我们首先啊,就结合前面的这个基本转换,我们做一个具体的需求,就是把它map成一个po类型,我们自己定义出来的那个po类型啊,转换成sensor reading类型。哎,那这里边自然就会得到一个data stream,当前的泛型就不是stream了,那应该是sensor reading,把它先列在列在这啊,这个我叫做data stream,基于前面的input stream去做一个什么什么操作,对,大家自然想到是不是直接做一个map操作啊啊然后这里边我是不是直接可以new一个map function啊里边这里边这个这个map里边怎么写,是不是直接去return new一个sensor reading里面包好就可以了,对吧?哎,那当然这里面这个操作我们不能这么简单粗暴啊,是不是前面应该先要做一个切分啊,对吧,我定义一个string类型的数组,把那个所有的fields要拿出来啊,就基于呃,这这里边就是value是不是要做一个这个split呀,逗号做一个分割,然后接下来这个3READING里边,是不是就是从fields里边一个一个去取就完事了,这个好像拼错了啊,Fields对吧?Fields,然后接下来我这里边取的。
10:38
其实就是FS0,这是ID对吧,就是string类型不用转,然后FS1是那个对长整型的那个时间戳,那这里边我是不是string得转成一个长整型啊啊,所以这里边我可以直接new一个long对吧,那这个也是可以的啊,或者是那个呃,Long long那个value对吧?呃,Value at啊这这个都是可以的,这里我把FIELDS1要传进去。
11:06
然后最后还有一个double类型的温度值,这是F2是不是就是这样这样做啊啊,这个其实整体来讲是非常简单的啊呃,那这里面我要给大家介绍另外一种写法。这种写法其实之前大家在这个学习的过程当中应该也接触过,那就是啊,我先把这个先copy过来啊,呃,那就是传说中的拉姆达表达式啊,因为其实大家会看到在这里我们这个map方啊,它本身是继承自function,而function这里边大家看到它就是一个接口,就是实现了这个,呃,继承了这个civil liable的一个接口,对吧?这里面没有任何的呃,需要去实现的方法,那它主要是用来干什么的呢?啊,大家看它,它其实就本身是一个empty啊,它主要就是为了作为啊,大家看Java吧,里边拉姆达表达式的这样实现的一个扩展,对吧,主要就是为了这样一个来定义的,所以你这里边呢,可以去实现这样的一个呃,Function类的一个接口,呃,你可以自定义实现这样的一个类,也可以直接在这写一个,哎,对JAVA8里边支持的这个拉姆达表达式去实现就就完全没问题啊,那所以这里边怎么写这个拉姆达表达式呢?
12:33
把这个就去了对吧?这里面可以直接写一个拉姆达表达式哦,那大家知道拉姆达表达式在SKY拉里边的写法是,就是我要定义一个当前的那个输入参数对吧?那比方说当前我的输入参数,这其实有一行一行读进来的嘛,啊那我定义一个line对吧?然后在SKY拉里边,大家记得这个拉姆表达式是不是这样写啊,Java里边没有这样的符号,Java里边JAVA8的拉姆达表达式是对,是一个横杠,一个一个大于号对吧?啊,类似于那个看起来有点像双箭头,我们现在是一个单箭头啊啊就是Java跟这个skyla的不同的写法,然后后面当然就是说接下来要做的这个操作,该怎么做怎么做就完事了,对吧?啊,就是这是参数,然后后面是这个函数体嘛。
13:18
那接下来我们函数体是什么呢?啊,其实大家发现这样做的操作是不是还是这些啊,对吧,就就直接就是把这个string这里边先拆开,然后接下来。对应的这个,呃,所有的这些操作都都把它这个。包装成我们想要的一个sensor reading,这不就完了吗?对吧,其实就是这样很简单的一个过程啊,那这里面需要注意,这就不是value了,而是传进来的line,对吧?然后接下来呢,是不是就是fields直接包成这个就完事了啊,所以大家看这个就看你喜欢啊,就是可能map function这个是它自动会给我们补全很多东西啊,类型也不用担心,直接就写出来就完事了,那lada表达式的话,可能会相对来讲呃,更加的容易,更加简单一些。
14:07
这里面需要给大家多多提一句的是,有同学可能想到了,你要这么说的话,我们之前那个word count是不是也可以这么写呢?哎,大家想那个work work这里边这个,呃,二元组的这个转换,我这边这个Fla map是不是也可以直接很简单的写一个拉表达式就把它搞定了呢。这大家要注意啊,你直接写那个拉姆达表达式的话还不行,为什么呢?哎,这主要是涉及到这里边我们返回的这个类型,这是不是一个带着呃,带着这个泛型的这样的一个包装类型啊,二元组对吧?而在Java g vm里面大家知道有这个泛型擦除的影响,对不对。呃,是有泛型擦除的,所以对于我们当前的编译器来讲,你如果直接在里边写这个拉姆达表达式,你写成这个样子的话,它是推断不出来你当前的这个类型的啊,啊,所以这就会有有对应的那个,呃,就是变编译的时候有这个类型的问题了,对吧?啊,那当然也有解决方案,就是说你可以写了那个拉姆达表达式之后,后边呢,再用一个所谓的那个returns那个写法啊,指定当前的一个啊,就是叫做type hint hint这样的一个东西,就相当于追加一个。
15:22
类型的标签啊,但是这种方法说实话呃,有时候可能大家会觉得不太好用是吧,就很麻烦,那你如果要觉得那个很麻烦的话,是不是你直接实现一个这个,呃,像这里实现一个Fla function是就完事了呀,对吧?你如果定义直接去实现这个接口的话,这就没有任何类型的定义的问题,因为这里边定义清清楚楚,对吧,输入什么类型,输出什么类型,你直接把它如果有泛型的话,你直接定义在这里边啊,就完全不会出现这个歧义啊,所以大家如果觉得这个拉姆达表达式不是很好掌握,或者说又不太好用的话,直接写,直接会用这个就是所谓的这个函数类啊,实现这个接口的方式就可以了。
16:02
我们这里边是给大家提供另外一种思路啊,好,做完这个转换之后,接下来是不是就可以。是不是就可以啊,做一个分组,然后去做聚合了,对吧?啊,所以接下来是分组。啊,我把这个拆开啊,先分组,然后做滚动聚合。滚动聚合,哎,我们这里边就取当前最大的温度值。好,那大家看一下这个分组操作的话,是不是就基于data stream直接去KBY就完事了呀?诶那这里面就有一个问题,KBY的话,大家还记得之前我们是怎么K的吗?诶对,之前大家是不是看到我们这个word count里边直接就KBY0啊,直接就取他当前那个位置就完了,那像现在我们怎么K。大家是不是觉得也是直接KY0就完事了,我们可以先试一下啊。
17:03
我们把这这个写出来,大家看这里边得到的这个类型就是一个kid string对吧?啊,然后接下来我这里边随便给一个名称啊,比方说这个我叫做呃,K the stream对吧,把这个先定义出来,呃,那比方说我我这里边就先不做聚合啊,我直接在这里边先先执行一下,大家看看这个效果怎么样。大家看直接报错了对吧。非常快报错了,这个报错说的是什么呢?哎,就是你这里边如果要是确定这个key的时候啊,要通过这个位置,大家看field的这个position位置去确定的时候,只能用在什么地方呢?只能用在原组的数据类型里边,对吧?那所以现在我们这是一个抛啊,类似于Java b,那就不能用这种方式了,嗯,那大家想,那我接下来应该怎么样KBY呢?哎,大家看一下这个KBY重载的方法还有哪些对吧?这是首先是一个可以传int类型的这个位置,然后另外还可以传,大家看是不是还可以传字符串啊,还可以传字符串对吧?那字符串这其实指的是什么?就指的是当前每一个字段的名称。
18:16
所以前面我们为什么要把它包装成一个那个特定类型的那个po种呢?就像一个抓va病一样呢,里边实现那么多东西对吧,就是这里我只要实现了那些get set对吧?啊,有这个无参的构造函数,这里边我就可以直接把它的那个属性名作为这里边KBY的字段写在这里提取出来就完事了啊,那所以这里边。可以有一个什么简单的写法,直接给一个ID来看,我直接诶直接这么写。这也是正确的,这样的话就是按照sensor的ID把我们当前的数据做了一个分组啊。啊,这就是这样的一个操作啊,那大家看到这个K外之后,得到这个数据类型是什么呢?K stream对吧?K stream是有泛型的呀,泛型它是两个,我们看一眼啊k swim的泛型是T和K。
19:14
T,大家知道是对,就是当前输入,其实大家知道分组的话是不是不改变数据类型啊,那只是做一个重分区嘛,所以这就是本身输入的数据是什么类型还是什么类型,那后面这个key是啥呢?哎,这就是当前我们指定的分组的那个键的类型对不对?那大家看一下现在我们建的类型是什么。对,大家注意,这里面是一个temple类型。为什么是个Java元组类型呢?我这里边明明定义的ID不是string吗?那为什么这里是个元组呢?我要把这个改成string行不行?这样行不行?直接报错对吧?哎,那所以大家要注意啊,这是跟我们这里边调用的这个方法有关的,你如果传所有的这个字段名称进来的话,大家看这是不是写死的呀,得到的这个k stream类型必须是T,这是原原始的数据类型,然后K的类型就是一个专外元素。
20:13
啊,那所以这里面的这个为什么它是一个抓法元组呢?大家仔细看的话,是不是我这里面有点点点是不是可以传多个呀,也就是可以有多个字段作为一个组合K来进行分组,那你说如果我要有多个字段的话,那你说我的这个K的类型到底是啥?那是不是啊,多个字段是吧,那我能传多个类型吗?那就只能是一包成一个元组,元组里边是不是具体的类型可以不一样啊,哎,所以大家看我就包成了这样一个Java元组类型,诶,这就是这样的一个使用方法啊呃,那另外还有一些这个key by的时候,大家看到这个调用的方式啊。我们看这个KY,其实还有后面还有什么呢?还有这个kids,大家看就是下面这个啊,Kids的话,这里边就是有一个抽,就是特别的这样一个抽象类,专门就叫case对吧?哎,你就指定这么的,呃,就是这里边指定好我们的这个case到底是什么,然后按照这个来来去分配就可以了,这个可能稍微用的会稍微少一点,这里面得到的这个结果也是一个temple类型,对吧?啊,也是一个元组类型啊,而且就是这这个不是用的稍微少,这个主要是因为它是一个private方法,这是底层调用的时候会掉到这儿,对吧?啊,我们平常这个调用的时候是不会掉这个方法的啊。
21:33
那另外大家看这个还有什么样的方法呢?还有一个叫这这也是public啊,这个KBY传的是一个key select。哦,那家看到这是一个K的选择器,那这个选择器又是个什么东西呢?大家看接口。它继承自function这样的一个啊,这个function这样的一个接口,对吧,那大家就知道这是不是又可以传那个拉姆达表达式啊,对吧,又可以实现一个这样的一个选择器的这个方法啊,那你如果要单独实现的话,它有一个方法就是get key。
22:10
就是输入当前的一个呃,元素的类型,大家注意返回的就是当前你提取出来K的那个类型对不对,所以这里大家看到我最终得到的这个类型是不是跟我提取出来的那个类型是一模一样的呀,所以大家看这种调用最后得到的这个还是temp吗?哎,这里是不是就可以得到一个是什么类型就是什么类型啊啊,所以在这儿我也可以有另外一种写法。大家看啊,我这个不写前面的啊,我直接data stream直接去KBY,我还可以怎么写呢?拉姆达表达式,那我就直接用,比方说我当前的data塔来不来表达式,对吧,我去是不是取当前的ID啊。大家看直接可以这么去写啊,那这个得到的对应的这个k stream1。
23:06
前面的K的类型是什么?对,大家看这是不是就变成string了啊,这就是我们这个前面KY啊,其他的一些不同的用法,大家下来之后都可以试一试啊,当然大家如果觉得这种方法最最舒服的话,其实你这么写就好了,对吧?呃,这个看起来是非常直直白啊,就按照这个呃字段的名称直接选取分组就可以了。呃,这里大家看到它还就是有一个黄色的这个报警,就是说我们可以用其他的写法把这个替代,用什么写法呢。哦对,有同学想之前我们那个scla里面是不是可以直接下划线点这样去调啊,呃,但是现在不行对吧,大家不要把这个当成scla啊,这是Java,那Java里面有没有直接去比方说呃,获取我们当前的某一个,呃,某某一个方法或者某一个字段传一个方法进来直接就搞定的呢?诶,可以的啊,这个Java吧里边是有这样的一个方法引用的,说呃一个一个概念的啊,我这里边可以直接传一个方法引用,然后就相当于告诉我,呃,我们这里边我提取K的那个方法就是get ID对吧?哎,那那这里面就涉及到get ID是谁的方法呢?哎,是sor reading的方法对吧?然后方法引用必须要对冒号,冒号get,然后这个get ID大家注意后面没有括号。
24:28
因为括号是方法调用对吧,哎,这里边不加括号,有点像skyda里边我们直接传那个,呃,函数体方法体的那个过程对吧?哎,直接这里边不加那个冒号啊,这里边这就是一个方法引用啊。大家感兴趣的话可以可以试一试啊,但是平常如果要是直观看的话,这种方法可能是代码可读性最强的啊,所以我们还是以这个为例啊。好,这是分组的基本的一个操作,然后接下来那就是要做这个滚动聚合了,滚动聚合的话,我们要找这个当前最大的温度值对吧?哎,那所以这里边我就直接可以基于k stream,诶大家看啊,那这sum我们现在不想要对吧,是不是直接max啊max诶大家看这个max。
25:13
它是不是对也是,呃,你看这个max是不是也是可以传一个当前的这个位置,Int类型的一个位置,对吧?Position,另外也可以传一个当前的字段名string啊啊,所以你现在既然是不是原组不能传位置,但是不是我传那个string就可以了啊,那所以当前我要的是那个temperature最大的温度值,按照这个做的一个选取。所以最后我得到的这个比方说我这个叫呃,Result stream,对吧,把这个获取出来,最后再做一个打印输出,这就完事了啊,但是大家如果觉得这个中间它这个这个single output swim operator看着不舒服的话,你把它换成data stream是不是也一样啊,啊对吧,这个是它都是继承字date stream嘛。
26:04
好,那我们接下来给大家运行一下,看看结果啊,这里面要运行的话,其实大家能知道这个最后的效果应该是什么样啊。大家看这个数据。我们当前统计的时候,是不是分组之后是各自统计各自的呀,那你现在这个每一个3ID对应只有一条数据,那这个最大值是不是一点意思都没有啊,对吧,那都是自己嘛,原原样输出就完事了,那所以接下来我再多加几条数据啊,比方说这里面341,我这给一个呃207的时间,来一个36.3对吧,然后比方说再来一个209的时间。呃,来一个32.8,再来一个212的时间,来一个37.1啊,这样的话,接下来我们就可以看一看,到底输出的是什么样子的啊,运行一下。
27:05
好,大家看到这里的运行结果,我们看一下,哎,这里运行结果为什么有这么多啊,有这么多最大值吗?大家注意,我们当前是什么滚动,为什么叫滚动聚合呢?是不是就相当于是一个啊,就是相当于一个这个这个滚滚轮一样,是不是不停的在滚动,不停的在更新啊,所以当前我其实是来一条数据就要更新一次,取到一个最大值对不对啊,所以大家看我们当前即即使是读取的这个文件数据,它其实也是一条一条更新的,这还是流,这个流失处理的特点嘛啊所以大家看啊,首先第一条数据三四十一进来之后,35.8没问题,对吧,三十六七十这它只有一条数据,是不是都是各自自己啊啊,这个没问题啊,分组每个组是不是各自统计各自的,互不影响,然后接下来三四十一又来了一条数据,第二条是36.3,大家看这里是不是我就输出了一个36.3啊,然后大家需要注意滚动聚合的过程当中,大家看一下这个max啊。
28:09
Max得到的结果是不是就是这样的一个,呃,Simple output stream operator,注意数据类型变不变不变对不对,所以是不是之前是34READING得到还是34READING啊,哎,那所以它只是把当前的这个34READING里边的那个温度值更新成了现在最大的那个温度值,对吧?哎,那这里面就有一个问题,那别的别的数据呢?大家看别的数据是不是不变啊,341199,这是不是跟之前都一样啊啊,所以它是这样的一个特点啊,当前的温度值变成了36.3,然后接下来下一个数据32.8来了,这是小了对吧?哎,所以最大值是不是还是36.3啊,再来一个37.1,诶这个更新了37.1。这是这个啊,Max做滚动聚合的一个过程啊,啊,那可能有同学就想,那我至少你这个不应该说这个别的数据都不变对吧,我至少想想知道当前这个最大温度值到底是哪个嘛,哎,所以这怎么办呢?哎,对,这里边直接把这个改一下,我不要用max,用一个max by再来执行,大家会看到结果跟刚才就会有所不同了。
29:19
啊,那接下来这个就是它会把当前最大的那个温度值提取出来,而且其他字段用谁呢?用什么呢。是不是都用的是当前最大温度值对应的这个时间戳啊啊大家看这个36.3,这是我们,呃,这个207这个时间来的这条数据,所以大家看输出最大数据的时候都是二零七三十六点三,二零七三十六点三更新之后二幺二三十七点一对吧?诶这是我们当前处理的一个操作啊,滚动聚合。
我来说两句