00:00
那前面我们是实现了一个基本的滚动聚合,求取当前的最大温度值啊,那大家其实会发现这种方法是很简单啊,啊,就是比方说我这个滚动聚和要求最大最小值很方便啊,另外还有一个方法是sum对吧,要求取某一个值的求和,这个也非也非常的方便,但是有一些场景里边,好像我们就不是简简单单的只要求一个最大最小就完事了,那比方说像现在这个场景啊,我们前面一开始如果用max的话,输出的呃,最大值,然后别的那个字段都是都是一开始那个数据不变对吧?啊,这个显然是我们觉得不好的一个结果,我们于是把它改成了ma max by max SP的话,得到这个结果呢,是都是当前最大温度值对应的那个时间戳,对吧?那大家想实际我们在做这个处理的时候,其实这个需求其实也不应该输出,这个我应该输出什么呢。应该是到目前为止,这个传感器收集到的最大温度值,对不对,那所以我是不是应该用的是啊,就是当前最大的温度放在这儿,然后这个时间戳字段,是不是应该用当前最新的那个时间戳啊。
01:19
哎,那这就来了问题了,那你要这么说的话,我们对两个字段都有要求,这要直接做这个滚动聚合,我怎么指定呢。啊,难道说啊,当然有同学可能说你你这个要这么说的话,它也是当前最新的那个时间戳,应该也是最大时间戳嘛,我这里边直接max by2个字段行不行。大家看这里边有点点点吗。有那个可变参数的选项吗?没有对吧,你你是不是只能指定一个字段去取它最大值啊,对吧,没有那种两个字段的那种情况啊,因为大家知道取这个最大,你底层是要有一个比较气的对不对,最大最小,你如果要两个字段的话,那比较假如说我的温度比它大,但是那个时间戳比它小,你这个到底算谁大嘛,对吧?这这个显然就没没办法去提取了啊,这个跟那个K做选择是不一样的,K的话当然就是可以有这个组合K。
02:12
那所以接下来这个需求我们应该用什么来实现?哎,对,当然就是需要有一些自定义的功能了,那当前的这个接口就是更加一般化的reduce对吧?啊,Reduce这个方法其实大家非常熟悉,它从字面上理解的话是是规约对吧?Reduce是有一个简化规约的意思啊,那其实我们在做操作的时候,什么叫规约呢?那是不是就是之前来的数据我全给它规约到一起,那是不是就是一个聚合的操作啊,啊,对吧,整体来讲其实就是聚合啊,它是一个一般化的聚合,所以接下来我们在代码里边再实现一个reduce的这样的一个代,呃,一个测试的代码transform。
03:01
三当前我们测的是reduce。呃,那么前面的这个流程我就不详细去写了,那整体应该都一样对吧?啊,首先我这个main方法里边创建新环境读取数据啊,甚至后边我这个转换,转换当前的这个sensor reading的port类型也是一样的,对吧?啊,我先把这个都copy过来啊。然后呃,这里我就把这个先删掉了啊。代码稍微的看的简洁一点。呃,最后大家不要忘记还有那个env对吧,Env execute把它执行起来,这步操作不要忘了,然后中间做的这个转换呢,大家想前面我做那个分组要不要啊。Reduce的话,分组是不是也要啊,对吧,你还是根据这个当前同样的那个3ID来来做的这个聚合,对吧?啊,那肯定不能是把所有的都放在一起嘛,所以接下来我还是把这个分组先做过来,然后接下来是不是就是做对应的那个reduce聚合了,对吧?Reduce聚合我们取的是最大的温度值以及。
04:17
当前对最新的时间戳。好,那所以接下来我们是要真基于之前的这个k stream reduce这个操作,前面大家看到了,是不是也只有k stream才能调啊,啊data stream是没办法直接调的啊,必须先分组才能调啊,所以接下来我们直接就是reduce对吧?那reduce里面要传的是个什么东西呢?啊,这个大家也看到了,对,传的是一个reduce function。啊,那这个reduce function当然也是一个interface了,也是一个接口对吧?啊,当然它也可以用一个就是拉姆达表达式来来写啊,啊这个就是我们可以后面再给大家具体来说,那里面要实现的一个方法就是一个reduce方法,然后大家看这个写法,这就非常的呃,看起来非常的整齐啊。
05:10
它的参数是。T类型的VALUE1和T类型的VALUE2。最后对两个最后要得到一个T类型的输出,所以整体来讲reduceuse function是不是泛型只有一个T数据类型不变啊,啊,这跟我们前面那个滚动聚合是一样的,对吧?啊,不能变啊,然后另外呢,这里面的这个Y1Y2分别表示什么意思呢?第一个对,它其实就是表示我们当前如果要做规约的时候,前后两个数到底是怎么样去做计算对吧?哎,那其实呃,就之前大家在那个scla里边对吧,或者这个Spark里边对reduce的理解应该是很深刻了,那其实我们在平常在使用每一次调用这个reduce的时候啊,啊,就在SC里边的话,大家可以认为是我直接传了一个就是一个方法啊,直接传了一个函数要做什么操作,传进去就完事了,那在我们这里边流式处理的时候,那其实应该是每来一个数据,是不是就要调一次这个reduce方法啊。
06:12
那我调这个reduce方法的时候,这俩参数到底谁是当前的那个最新的数据呢?诶对,大家注意后边这个VALUE62是当前最新的数据。哎,那前面这个Y61又是什么呢?对,是之前聚合出来的那个结果,或者叫状态对吧?所以大家看我们不是说这个弗link是有状态的流失计算吗?就在这儿对吧,来一个处理一个,那当前最新的数据在这儿,当前的状态在这对吧,最后得到一个返回,那大家想这个返回这是个什么东西呢?这其实就是最新状态对不对,而且我们也要把它做一个输出对不对啊,就是得到一个,呃,当前这个对应结果的一个输出啊,啊,所以这就是类似于滚动聚合的那个状态一样啊,也是来一个就更新一次的啊,所以我们这里边直接去new一个reduce function啊,大家看这里面类型都已经给我们列出来了,是不是数据类型都是sensor reading啊啊数据类型不能变,那这里面我们要new一个sensor sensor reading。
07:18
里边怎么去包装这个字段呢?那首先第一个字段是那个34ID 34id的话,那大家想你现在既然分组就是按ID分组嘛,对,那是不是Y61Y62都一样啊,这没什么区别啊,我就直接y value61get ID放在这儿,然后第二个字段是最新的时间戳,那是不是应该用Y62.get time s把它获取到对吧?啊第三个参数稍微麻烦一点,我要用当前最大的温度值,温度值的话。哎,对,大家想到温度值是不是应该要做一个计算啊,我要用VALUE1的那个呃,那个temperature和VALUE2的temperature做一个比较,取出最大呀,所以我用一个呃,Ma里边的max方法做一个比较,对吧?用value一点get temperature,然后VALUE2点get temperature。
08:16
大家看是不是就是这样的一个实现啊,哎,这就是我们这个reduce方法的一个具体的实现啊,那当然了,这里我们也可以k stream reduce的时候直接写一个拉姆达表达式,大家知道这个拉姆达表达式怎么写吗?对,现在就不是一个参数了,是不是俩参数啊,啊俩参数我可以定一下啊,比方说我我叫当前的那个状态对吧,Current state,然后另外还有哎,当前是不是最新的这个数据啊,New data对吧?诶然后箭头过来做一个处理,这里边怎么处理,诶那是不是,其实这个也也不用这么麻烦,我是不是直接你有一个s reading包好就完事了对吧?哎,这里边得。另外这里边就是拉姆达表达式的话,你也可以,呃,就是对省省略对应的那个东西啊,我们这里边如果要是在Java代码里边,呃,就是大家一般习惯是想要加那个return的,对吧。
09:14
就是在这对吧,你有一个sensor reading,然后啊,大家看这里边提示是不是直接可以省略对吧?啊,这个就是无所谓啊,然后我们在这个sensor reading里面包装的话,那是不是跟前面还是一样啊,Current state去get ID,然后new data get time stamp,另外max.max取两者温度的最大值,对吧,New data.get temperature是不是就是这样去写啊,对吧?这这所以这个过程其实都是一样的啊,大家想把这个删了也是可以的,呃呃,当然这个如果删掉的话,对。这个里边哦,大家看这个这个还是有问题的,对吧,就这里边你如果要是说直接我们的这个括号是对的,对吧?啊,这里边你如果要是删掉那个return的话,还是需要,就尽管这里边我们知道啊,每因为因为在那个skyla代码里面大家知道。
10:20
是有这个默认的规定,就是说整个代码块的最后一行就是返回值的,对吧?呃,就是就是我们这里边其实是会有这样的一个一个状态,就是你既然是要得到一个返回值类形式sensor reading嘛,啊所以说这里边还是需要把这个呃,Return写出来的啊啊,那我把这个可以比方说啊,定义出一个这个叫做result stream,然后看一下这个最终最终执行的结果是什么。好,Result stream啊打印出来运行一下。大家知道这两个运行应该最终结果是一样的是吧?啊,就是得到的那个我们聚合出来的效果应该都是完全一致的啊。
11:10
稍微的等待一下运行结果,诶大家看一下当前的这个数据跟之前我们做滚动聚合的结果是不是就有所不同了,当前我这个聚合的结果是什么?诶大家看这个就看三四十一就行了,对吧,别的都是只有一条数据,36.3来了之后,这里边啊207这个没问题,因为是当前的这个最新的数据对吧,就是最大的数据,然后36.3没变的时候,这种情况它输出的是什么?不再是207,而是现在。最新最大的那个时间戳对吧?啊,是当前到209这个时候,当前最大的温度还是36.3,那31.7 37.1来了之候,当前是到212,最大的温度是37.1,对吧?啊,这就符合我们当前的这个需求要求输出的结果了。啊,这就是reduce的一个用法,更加一般化的聚合操作。
我来说两句