00:00
了解了简单聚合的用法。那其实我们发现。简单聚合,整体来讲就是两步,首先先要基于一个data stream做一个K,得到k stream,那stream本质上当然也还是data stream了,只不过它是增加了一个key的信息,根据当前指定的键去进行分组操作,那。这里所用的分组操作是什么呢?当然就是分组聚合了,所以基于我们可以去直接调用flink帮我们内嵌进去的一系列简单句合运算,那这里面最常见的就是some以及max me。这样的几种操作啊,那max可以认为是特殊化的max。这些操作比较简单,但是我们会发现它的应用场景其实非常局限的,就只有求和、求最大最小值这样几种需求,那对于真正的应用场景而言,需求可能是五花八门的,这个时候我们怎么样去自定义更加灵活的聚合操作呢?那当然这就需要有一个更加一般化、更加底层的接口定义了。
01:13
这就是所谓的里边的方法。其实这样一个操作是可以说是大名鼎鼎啊,我们知道从map开始,我们对它就非常熟悉了啊,那简单的一些转换,我们都可以把它归结成map操作,而对于规约聚合,类似这样一个统计计算的过程呢,我们都可把它归结成reduce操作啊,那所以这里边flink里边定义的reduce呢,跟我们所理解的reduce也没有太大的区别。那整体来讲,这就是一个规约聚合的过程。源码里面很明显的可以看到它要传入的就是一个规约函数,一个reduce方式啊,那这个操作很显然跟之前map filter map就又一致了,就调一个reduce方法里边传入一个实现了reduce接口的一个类的对象啊。
02:11
这样的话,我们就可以定义到底怎么样去做对应的句号。我们可以看到这样一个reduce接口里边,同样它有唯一的抽象方法,也就叫做reduce,而且我们看到它的泛型呢,也只有一个T,说明经过聚合转换之后,当前的数据类型是不变的,这和之前我们看到的呃,简单聚合运算是一样的。那我们看一看当前这个reduce方法到底是个什么呢?看起来非常简单,它只有两个T类型的参数,Y61 Y62,返回值也是一个T类型。那这到底是要干什么事儿呢?简单来说的话,我们可以认为这个方法接收到的是两个T类型的事件。
03:01
那这里边的两个事件分别是什么意思呢?呃,那从规约的本意上来看的话,我们可以认为就是把所有收集到的集合里边的每一个数据拿出来,按照一定的规则,不停的规约,不停的规约,不停的规约。那我们知道规约得到的结果又是一个相同类型的数据,所以我们就会发现了两两规约又得到一个相同类型的数据,接下来又跟下一个数据,两两规约又得到一个新的值,再去两两规约,那最终按照同一个规约方法。不停的迭代调用下去,最终就会得到一个唯一的规约聚合的结果,这就是所谓reduce的基本的含义。那当然了,在处理的过程当中,我们应该怎么看待这里的Y61和Y62呢?我们可以认为,VALUE1就是我们进行规约操作这两两规约的第一个数。
04:02
来二,那当然就是第二个数了。那我们会想到,如果当前真的就是所有数据里边的第一个数和第二个数的话,这个没问题,那假如说已经处理到了后边的数据的话,新来了一个数据的话,那当前的VALUE1 value2又表示什么呢?很显然。VALUE1表示的是当前已经规约好的结果,这在规约的时候它就变成第一个数了,那同样接下来如果我们是跟第四个数据进行规约的话,那VALUE1就是当前前三个数去调用reduce方法,然后进行规约之后得到的结果。那VALUE2呢,就是当前新输入的第四个数,所以我们会看到,本质上来讲,Reduce其实是针对一个列表去进行两两规约的啊,那它的内部呢,很明显我们应该对之前已经规约好的结果要进行一个保存。
05:03
接下来我们输入一个新的数据的时候,要把当前数据和之前的数据两两之前数据已经规约好的结果,他们俩做一个规约啊,那所以当前的VALUE1其实是之前所有数据规约出来得到的一个结果值。在flink流处理的底层实现过程当中,实际上呢,我们这里就要把之前所有规约聚合的结果保存下来,这个东西就叫做。状态。这就是我们所说的flink是有状态的流处理,那在做规约聚合的过程当中就使用到了状态。其实如果如果我们去较真的话,会发现前面提到的简单聚合操作,不管是some还是max,很显然,在当前数据某一个数据新的数据传进来的时候,也应该要结合之前所有数据保存下来的一个状态去进行对比,不管是做叠加some,还是做最大最小的一个对比判断之前,我们都应该保存某一个数据,这个数据也是当前聚合运算的状态。
06:20
就是所说的state。关于状态这一部分呢,我们还是会在后续章节里边做详细的介绍,现在我们至少就可以知道在flink当中确实是有状态的操作啊,接下来呢,我们就可以来用一个具体的案例来看一看。Reduce进行规约聚合,到底怎么做?所以接下来我们可以在当前下边去重新创建一个测试的va。我们当前是。Reduce test。同样也是做聚合的测试,它比前面我们讲的简单聚合就要更加一般化,更加底层,更加灵活一些了。啊,那整个的测试的框架其实跟之前还是一样的,我们把异常抛出,然后前面的内容呢,我们可以直接创建流式执行环境和数据的读取,直接copy简单聚合的这一部分。
07:22
接下来我们要做的是一个具体的需求了。呃,我们可以做一个比较复杂的设想,我们现在想要统计的是当前访问量最大,也就是点击次数最多的那个用户,最活跃的用户到底是谁?诶,那这个就稍微复杂一点了,什么叫做最活跃,我们已经说明了,那就是当前它的访问量最大,那访问量最大的话,我们当前数据输入进来之后,并没有访问量这样一个参数啊,所以我们首先要做一个访问量的统计。这个统计其实跟前面的work count就非常类似了,很简单,那就是来一条Mary的数据,我们就记一个MARY1好,那接下来如果再来一个Mary数据的话,那就应该是把它叠加起来,Count加一变成MARY2啊,那我们类似之前work count的做法,就可以每来一条数据,把它map成一个二元组,就一个user一个一,User一个一,后边直接做一个简单聚合some起来就可以了。
08:25
那当然了,我们也可以用reduce去实现一个类似sum的功能。在接下来呢,接下来就涉及到我们还需要对当前所有的用户,不同的用户做一个最大访问值的筛选,所以还应该实现一个类似于max的功能。我们应该把对应的。访问量最大的那个用户,把他的数据提取出来。所以这里我们的第一步。应该分两步走。第一步是要。
09:01
每个用户的访问频次。这是我们的第一步,就是把每一个用户的count数先统计出来啊,那这个的话其实比较简单,基于stream,我们可以先做一个map,把它转换成二元组啊,那这里边我们可以直接使用拉姆达表达式转换成一个啊,TEMP2也可以,因为如果是拉姆达表达式的话,我们知道后边涉及到泛型擦除还需要有一个returns啊,那这里我们也可以直接去new一个方式,匿名类的方式写在这里也是一样的啊,那当然了,这里我们转换之后的类型。就应该变成了TEMP2。对应的两个元素,一个是string类型的user,另外一个是长整型的count值,这是我们能够想到的转换里边必须要实现一个map方法,诶,那就直接return。直接一个TEMP2了,那我们知道调用的是temp2.2方法里边value.user以及一个1EL,哎,这就是我们简单的一个转换过程。
10:11
后,接下来我们应该要基于当前的二元组数据统计每一个user对应它出现的频次,那所以呃,如果直接调简单聚合方法的话,那就直接K之后some就可以了。而现在呢,我们希望实现的是一个reduce聚合,我们可以看看reduce怎么样去实现那些简单聚合方法啊。所以这里边reduce首先也要K,我们说flink里边必须先分按键分组,然后才能做聚合操作,Reduce也不例外,所以先,那这里面有一个问题什么呢?我们当然可以,因为现在是二元组嘛,我们可以KY0,按照当前的这个字段位置的索引来进行一个定义,但是我们发现这种方式其实要被弃用了,那更好的方式是什么呢?很明显,我们应该传入当前的字段的名称或者。
11:10
传入一个key select,这里其实最推荐的方法就是select,我们其实直接写一个拉表达式就可以了,那我们要提取的是什么,其实很明显就是。就是当前。二元组里边的它的两个元素分别叫做F0和F1嘛,所以我们要选取的就是F0。分组之后定义了K,那接下来就可以直接reduce去做聚合了。reduce里边要传入一个reduce function,所以我们可以看到当前的reduce function就是传入的这个泛型参数,就是当前的数据类型。因为我们已经做过了map转换,当前的数据类型当然就是二元组类型了,那这里边我们要实现的方法,那就是基于之前规约聚合得到的结果,然后再结合当前新传入的数据Y62,怎么样得到一个新的聚合结果呢?啊,其实我们知道最后得到的新的规约结结果既要更新成下一次的VALUE1啊,既要更新我们内部的状态,也要作为返回数据直接输出出去。
12:27
交给下一步去进行进一步的操作,那所以这里面我们想要去返回的也应该是一个二元组点二。那里面的字段。User当然不变了,Y61.F0,其实我们知道Y62也是对应的,也是同一个user,因为我们K的话。接下来关键就是后边的第二个字段怎么去定义count值怎么办?诶,那其实我们知道count值的话,不能直接去做加一啊,当然如果要加一的话,基于VALUE1的count值去加一是可以的,但是一定不能基于VALUE2的count值去加一。
13:04
因为当前的2Y2是新输入的当前数据,它的第二个字段一定是一。而VALUE1呢,是之前的规约结果,它的字段其实是之前的一个count累积的状态。所以接下来我们最为稳妥的方式其实就是一点F1,也不要直接加一,而是加上VALUE2点F1。这是最为稳妥的方式,把两个要规约的值,它的第二个字段叠加起来。这样的话,我们就实现了每个用户访问频次的统计,那这样得到的结果呢?经过聚合之后,当然就又得到了single output stream operator啊,这本质上就就是一个新的data stream,注意它和stream是不同的,他们都继承自data stream。但是。当前的这个算子啊,Single output stream operator,它并不是K,当前是没有K的定义,它就是普普通通的get啊,所以当前我们可以把它叫成。
14:12
比方说叫LIS by user,每一个用户的点击访问事件统计的个数。然后接下来的第二步,那就是要根据当前的个数选取出最活跃的用户了。所以我们是。选取。当前最活跃。活跃的用户。那这个过程如果我们直接调用简单聚合方法的话,那就是直接把所有数据里边对应的后边这一个count值最大的提取出来就可以了。而呢,现在我们是想要用reduce的方式来做一个实现,但这里面又涉及到另外一个问题,因为我们知道基于这个by user这样一个数据。
15:03
想要做max统计,或者说我们用一个reduce做一个规约聚合的话。首先我们得。得按键做分组分区啊。那现在我们基于哪一个可以做做分分区呢。很显然,我们现在不能基于user再做分区了。因为当前我们要统计的是所有用户里边访问次数最大,那当前既然所有用户都得包含在里边,User不能作为K字段,那我们当前这个KY是不是就没有办法做了呢?其实也不是。因为当前我们在API调用的过程当中,是要求必须基于KSTEM才能调用聚合方法,但是没有限制我们k stream里边它的K到底指定成什么样,哎,那我们当前呢,完全可以不从data里边去提取字段,我直接就定义一个string类型的字段,就叫做K。
16:00
那这代表的含义就是。所有的数据都分配到了相同的一个K,都分配到相,所有的数据都会有相同的一个K,就叫做K,那么他们就都会被分配到同一个分组,同一个分区去。当然了,在实际应用过程当中,我们要慎用这种方式,因为它会造成我们所有的数据没有办法并行,全部扔到一个slot上去进行聚合计算。但是在有些场景下,如果我们前面已经对于分区的数据进行统计了,最后得到的是少量的数据,那最后我们要全局统计出一个最活跃的话,那最后这一步也是不可缺少的了,必须把它整合在一起啊,那这个时候就相当于分到同一组去做计算。后边我们同样还是来一个啊,那这里我们还是直接reduce,你有一个reduce方式,这里边我们要做的计算呢,是提取第二个字段最大的那个元素。
17:04
那这个时候又怎么办呢?啊,那其实我们会发现最终返回的值,前边我们已经规约聚合得到的这个结果,它就是之前某个最活跃的用户相关的那个数据。所以我们最后返回的呢,要不就是Y61,要么就是Y62。而这里我们其实就是要做一个判断了,如果value一点,它的抗值F一看谁大,如果要大于。Y62.1的话。那么我们就直接返回。VALUE1的。直接返回Y61整个这个事件,这个二元组,因为我们的数据类型不能变啊,那当然了,谁大就返回谁嘛,否则的话,那就直接返回VALUE2。这就是我们定义的这个过程。
18:00
那我们可以把这个叫叫做最后的result。我们可以把做一个打印输出,下边不要忘记env execute执行起来。接下来我们就可以看到当前最活跃的用户了。当然这里面涉及到一个问题,就是如果两者相等怎么办呢?按照我们当前的逻辑,显然如果两者相等的话,那就是直接输出Y62,用最新的这个数据来替代我们之前已经规约聚合好的数据。那就是只要。跟之前一致了的话,我们就用最新的这个用户啊,最最活跃的用户里边选一个最新还在活跃的用户,把它做一个输出,那具体到底要不要相等。这个逻辑是由业务需求来指定。为了更加明确的看到一个测试结果,我们这里还是调整一下测试的数据,因为我们看到这里边很显然啊,一直访问次数最大的就是Bob啊,那前面是Mary,后面是Bob,来了之后就更新成Bob啊,在后面是爱丽丝,然后又是Bob,那我们知道如果当前Alice丝的数据。
19:08
其实我们也不用统计更多。只要把它放在。提前一条放在这个Bob的第二条后边的话,那很明显爱丽丝的第二条数据来了之后是会更新当前最大的,最最活跃,而且最新的用户是Alice,所以接下来我们可以运行一下,看一看聚合出来的结果是否符合预期。我们可以看到Mary第一条数据来了之后,哎,当然当前。访问次数最多的只有Mary一个嘛,是MARY1,鲍B来了之后,鲍B就把它更新了,是BOB1,因为我们是最活跃和最新最近访问的这样一个原则啊,那另外ice丝来了之后,爱ice丝一,那鲍B第二条数据来了之后,诶,那就更新成鲍B最活跃了吗?鲍B是二,然后同样爱丽丝来了第二条数据之后,同样会替代掉Bob是爱ice丝二,接下来就全都是Bob是当前最活跃的用户了。
20:06
BB3 BB4b5。这就是我们能够看到的reduce规约聚合的、一般化的、更加灵活的应用。在实际应用场景里边,其实使用去进行规约聚合,就可以实现大部分的需求了。
我来说两句