00:00
可能会发现一个问题,就是我们这里边提供的这些滚动聚和算子呢,相对来讲都比较简单啊,Sum max命对吧,也就是求和或者是求最大最小值,那假如说我现在的需求比较特殊,诶,我我不能,呃,比方说我们我们想输出一个什么样的场景呢?就是我当前呢,想要获取当前的最小值的温度值,然后呢,我输出的这个时间戳呢,不要输出第一个时间的,也不要输出最小温度值的那个时间的,我输出什么呢?我要知道到目前为止对吧?最近时间为止最小的温度值是什么?那我要输出什么,最新的最大的那个时间处。这种情况下,我们到底应该呃用什么方法来做这个处理呢?这就是有有这样的一个呃,复杂需求的一个聚合的场景了啊,那所以对于这种场景而言,我们就不能用简单的这些滚动性和算子搞定了,那就得用更一般的聚合方式,那就是所谓的reduce了啊,这reduce其实整体来讲也非常的简单,大家可能知道,呃,就是之前我们做这个map操作的时候,其实它就是一个呃,相当于把这个数数据做规约,做做这个,呃,就相当于归集,其实就是一个聚合的过程嘛,就把很多个数据归集成一个结果,比方说求和啊,比方说这个最大最小,这不就是一个规约的过程吗?所以接下来更一般化的reduce,我们看一看怎么样去实现,好,那这里边我去定义一个呃。
01:38
Result stream啊,大家看我可以直接就在基于之前这个data stream基础上直接去做转换,跟这个h stream就没关系了啊,这里面的转换同样还是先做一个KBY对吧?啊,那我当前还是你既然是要做这个,按照不同的这个传感器温度去做分组嘛,那还是根据ID去做一个KBY分组,这里边把这个需求给大家写出来啊,我们当前需要输出,呃。
02:09
就是。当前最小的温度值,以及最大的时间戳,也就是最近对吧,最近的时间戳要用一般的这个聚合方式要用reduce啊,所以接下来KBY之后,诶,大家看到可以调用reduce方法。所以接下来我们调一个reduce,那这个reduce这就比较麻烦一点了啊,不像前面的那个some或者说next me那么简单,里边大家看要传一个什么,要传一个reduce function,大家还记得前面我们自定义source吗?自定义source的时候是ADD source里边要传一个source function,而现在如果是reduce的话,那里边要传一个reduce function。啊,那当然了,这个reduce它有这个重载的方法,还有另外一种实现是什么呢?啊,那就是拉姆达表达式了,这个大家更熟悉一点,对吧,反正是要传一个函数,那这里边呢,我写一个拉姆达表达式,大家注意啊,是两个参数的一个拉姆达表达式,它表示什么含义呢?这跟我们之前用用到的那个reduce是一样的啊,就是前边这里边就相当于是我们规约起来的那个状态,所以大家看我们说这个聚合肯定是有状态的,那它状态其实就在这儿吧,前面这个参数,第一个参数表示已经规约起来的那个结果,上次的那个结果放在下一次的第一个参数。
03:36
然后第二个参数表示当前处理的最新的这个数据,新的这个数据元素,然后呢,经过转换之后得到一个相同数据类型的啊,新的这个聚合结果,对吧?啊,这就是这样的一个表达啊,那我们就先给大家实现一个这个拉姆达表达式的写法吧,这个大家可能比较熟悉一点啊,呃,那我们前面两个元素嘛,呃,前面我们这个就叫做current state对吧,就是当前的这个result啊,一个state管它叫做状态,然后接下来呢,还有一个new data对吧?这两个参数传进来之后,最后我要得到一个什么东西呢?这个稍微的会有点儿麻烦,但是其实我们知道它的数据类型不变嘛。
04:22
当前的这个数据类型,你的数据我们当前都是那个sensor reading吧,大家知道啊,那接下来是不是这里边你聚合的这个结果,规约的结果也得是一个sensor reading啊啊,那前面的ID当然不变,那中间的这个。时间戳我们要用最新的那个时间戳,而后边的这个,呃,温度值呢,我们用最小的那个温度值,所以接下来要做的这个操作,其实就是包装成还是包装成一个s reading,然后就把对应的字段,每一个想要的东西填进去就完事了啊,那这里边我们可以用当前的这个状态里边的那个ID肯定还是一样的,对吧?因为是s reading嘛,分组之后ID都一样,然后呢,我就用new data的time stamp,最新的这个时间戳放在这里。
05:10
啊,但是前提假定我是认为所有按照时间戳从前到后排序输入输入进来的,对吧,这时间不能乱啊,然后后边呢,诶,它俩的那个温度值取一个最小值,我就用current state的temperature,然后调一个main方法,里边传进来的是new data的temperature,哎,这样的话就可以实现我们这样的一个功能了。啊,那后边这里边我可以把这个result STEM去做一个打印输出,大家可以看到就是最后得到的结果啊,呃,就是符合我们预期的这样的一个效果,来我们看一看运行的结果,哎,大家看还是啊,最小值一开始第一个数35.8,因为现在我们是全局并行度是一啊,按照顺序一个一个来啊,那后边呢,来了一个32的时候,你看这个后边啊,来了一个32这个数据的时候,它是206,所以是最小值32,然后当前时间206对吧,这个没问题,然后来了一个36.2的时候,我们说现在的最小值还是32.0,但是当前时间变成了208对吧?当前最新的这个时间戳放在这儿了。
06:21
那同样后边29.7来了之后,哎,更新时间说210,那最后呢,三十三十点九来了,最小值还是29.7,那这里边时间戳更新成213。这就是这个我们做了一个自定义的这个reduce这样的操作,那呃,当然了,这里边还有另外一种方式,就是你自己去实现一个reduce function对吧?在这里边比方说我给大家把这个做一个实现吧,比方说我管这个叫做my reduce function,就假如说我这里边不写这么一个拉姆达表达式的话,那这里面得传什么东西呢?哈,那大家想是不是就是可以啊,当然这个就是我得先把这个注掉,对吧?因为你这个reduce之后,数据结构已经不一样了,我去调这个,我要new一个my reduce function。
07:07
然后这里还报错,为什么你得去实现一个reduce function,这个才算数,对吧,Reduce function。好,我把这个类型大家注意引入,然后里边要有泛型,你这个reduce function处理的是什么数据呢?当然是sensor reading啊,里边大家看一下必须要实现的方法,就是一个reduce方法,对吧?然后这个reduce方法呢,两个参数同样都是三次reading的类型,VALUE1 value2,最后得到一个聚合之后的结果sensor reading,那再看这个跟我们实现的这个拉姆达表达式不是一模一样嘛,对吧,几乎就是一模一样,所以你这里边如果要去实现的话,怎么实现?哎,那不就是返回一个sensor reading,然后按照我们的标准,value1.id放在这儿不变对吧?然后VALUE2,最新的这个time,这个时间戳temperature sta放在这儿,那后边呢,是两者的temperature,取一个最小值放在这儿就完事了,对吧,所以诶写错了啊temperature。
08:12
所以说这个其实大家看到这用这个函数类啊,你去实现一个类的这种实现方式,其实它里边呢,你还是要重写这样的一个方法,跟我们直接写这个核心的这个reduce方法,写一个蓝格表达式放在这里本质是一模一样的啊,所以大家可以下来之后把这两种不同的方式再做一个测试啊,自己再好好的去练一练,呃,这就是这个弗林克里边给大家。就是比较常见的一种编程风格,后面我们还会频繁用到啊,就要不是你传一个拉姆表达式,要不这里边就直接去实现一个对应的这个啊,当然这里边这是一个接口了啊,就是我们自定义一个类,去实现一个要求的这个接口类型就可以了,这里边的本质都是一样的。然后我们再给大家简单的说一说,这里边数据转换的一个过程啊,前面大家说过啊,做了这个KBY之后呢,这里边得到的是一个K的stream,然后大家会发现,诶,你在这个大家看这个data stream里边啊,如果我去看他当前这个方法,我想要去找some找不到对吧,想要去找mean max啊,这里边是mean resources,这个显然不是我们做聚合的那个,对吧?啊,或者说这里边max,你看到这是什么set最大的那个并行度,这显然也不是做聚合的那个max,所以大家看到在data streamam本身的它下下面的这个API里边没有做聚合的方法。
09:37
但是呢,转换成KBY之后,转换成了K的stream,在这个里边大家看到有萨对吧,有max有mean,有mean by,有max by这些转换算子就都有了,然后我们还可以大概的看一看,还有哪些呢?好,大家看有reduce对吧?然后另外之前大家可能也也讲过,就是有这个for的方法对吧?像这个skyla的集合类型里边本身就有这个fo的折叠嘛,啊,它底层就有这个for的方法,那这里边这个fo的方法呢?啊,大家看到它。
10:07
这个被弃用了对吧?Depreated,所以以后的版本可能会直接把它移除掉,那推荐大家用什么呢?啊,当然就是推荐大家直接用这个reduce,一些普通的这些方法不就完事了吗?对吧?它它所能实现的这个方式啊,我们用一些更一般化的方法其实都能实现,那这是关于这个food,然后另外还有哎,这里边大家看到还有一个。更一般化的叫做aggregate,那当然了,在这个k stream里边的这个aggregate,它是一个私有的方法,就我们前面讲的,大家看这个max by对吧,讲的这个呃,Mean by,呃,各种各样的这些聚合的操作,它其实底层最后调用的都是一个私有的aggregate方法,这是真正的一个,就是我们真正做聚合的这个过程,调用的是这里啊,那那在这个调用的过程当中,大家就会想到它里边肯定就是需要有一个聚合状态的,对吧?啊,这个聚合状态我们保持在这里,这就实现了来一个数不停的叠加,不停的叠加这样一个过程。
11:13
这就是结合这个源码里边的一些内容,给大家再做一个讲解啊,下来之后大家要好好的把它做一个测试。
我来说两句