00:00
前面我们已经了解了flink当中的简单聚合,那在做简单聚合的过程当中,其实就是先来做一部KBY按键分组,然后接下来就是直接调用系统内置的一些方法,比如说我们提到的some mean max mean和max。在这里我们所调用的这些简单聚合方法呢,其实每一个都只是代表了一个特定的场景,特定的需求,比如说some,那我们就是针对某个字段直接求和相加,那M呢,就是求最小值,Max就是求最大值。我们既然讲到了,那有一些在实际生产应用当中可能特殊的需求,是没有办法用这些现成的方法直接实现的。那这个时候呢,我们就需要用到一个一般化的聚合操作。在flink当中,一般化的聚合操作,那就是大名鼎鼎的reduce,这就是所谓的规约聚合啊,其实我们知道啊,对于大数据的处理计算,Reduce是一个可以说是非常经典非常基本的计算啊,那从哈杜的map reduce开始就是这样去定义的啊,那reduce的概念是什么呢?它其实就是说我们这里可能来了一组数据,很多个数据。
01:16
所谓的reduce,那就是按照顺序两两的进行一个合并计算啊,比如说我们这两个按照某种计算规则得到了一个新的值,然后接下来呢,得到这个值就再跟下一个两两的再去做一个合并,得到一个新的值,那我们知道。每一个新的数都跟之前的合并结果进行两两的合并,那最后就会把所有的数据全部合并成一个唯一的值。我们看这个过程就相当于是一个规约简化的过程,所以我们把这个过程英文就叫做reduce。这就是规约聚合的基本含义,所以在这个过程当中我们会发现啊,两两去做合并,那这个合并是叠加吗?不一定,当然我们可以是做some定义的,这个操作就是叠加,两两都是相加的操作,我们其实也可以定义两两是相减啊,每次都是前面这个数减后面这个数,最后也可以得到合并之后的一个结果,所以我们发现reduce,它灵活就灵活在这里。
02:23
他并没有给我们规定死到底应该用哪种方法去进行合并计算,而是给了我们一个通用的接口,我们可以自己去定义他们两两合并的操作啊,这就是reduce的一个基本概念,当然了,从这个基本概念我们也可以看得出来啊,Reduce它里边最核心的就是要定义我们规约合并的这个操作,然后呢,归结出来的结果跟我们原始的数据类型也应该是完全一样。啊,所以接下来我们可以在代码当中去做一个对应的实现,我们还是在当前的包下边去新建一个object,现在我们要测试的是下边的reduce test。
03:07
同样还是把那方法先写出来啊,那前面的这一部分其实都非常的类似了,我们可以直接照抄,先get execution environment,获取执行环境,然后接下来设置全局的并行度,然后读取数据。我们直接引入啊,那上边我们还是把这个下划线引进来,方便后边做影视转换。方便测试的话,我们直接把全局并行度还是设成一吧,然后这里我们直接给定了这么几条数据,接下来如果要做规约的话。规约聚合。那么首先既然是聚合嘛,我们知道你如果直接想调reduce,那是调不了的,基于data stream是不行的,想要聚合先做KBY,那reduce也不例外,所以它其实就是跟我们之前的简单聚合相比的话,它就是一个更加一般化的聚合操作。
04:01
那这里我们还是按照我们之前的想法,我可以按照每一个用户去做一个分组啊,那就是直接传入下划线点user,然后接下来那就可以直接调用reduce了。我们可以看一下这个reduce接口需要传入什么样的参数。啊,那这里就又来了,里边默认需要传的是一个。Reduce function。我们点进去可以看到reduce function本身也是一个接口啊,那这个接口呢,里边有唯一的抽象方法,就叫做reduce reduce有两个参数,它的数据类型都是T。然后分别叫做Y1和Y2,那得到的返回结果呢,还是一个T,所以看的很明显,这里的reduce就是要我们去定义怎么样合并这一组数据的那个具体操作啊,那传进来的两个数到底怎么做操作,合并之后返回一个唯一的值呢?这就是我们里边要实现的内容。
05:03
那这里的参数Y1和Y2分别指的是什么呢?啊,其实我们可以把它理解成。在这一组数据进行两两规约聚合的过程当中,哎,那每一步操作啊,要执行的这个操作就是所定义的这个reduce,那这里边每一次执行都应该有两个操作数啊,就是前面一个数和后边一个数,那或者说下一步的话,得到的这个规约聚合的结果又要跟下一个数据进行聚合,那当然就是这是第一个数,这是第二个数,所以这其实就是我们做规约的两个操作数。那如果说具体到我们这个聚规约聚合的过程来看的话,我们会发现,那其实它还是一步一步从左到右一个数一个数据聚合的呀,那聚合的过程当中,其实前边的聚合结果我们就会发现了,这就相当于是一个状态吧。就相当于是我们之前已经保存好的一个聚合状态。所以。
06:02
聚合状态就可以认为是这里的第一个操作数Y1。那跟着的后边的Y2是什么呢?当然就是我们接下来要去进一步规约的那个新的数据,诶,所以我们可以这样去理解,Y1就是当前已经规约好的聚合状态,那V2呢,就是接下来要做进一步规约的下一个数据。对于流式计算的话,那就更加的明显了,因为我们当前的数据是一个一个来的。那当前来一个数据的时候,我们要进行一个聚合,在什么基础上聚合呢?哎,我们说当然就是要在之前已经保存的那个聚合状态上。进行叠加聚合,所以这个时候当前的状态就是Y1。那么新输入的这个数据,那就是万二。所以我们可以理解成定义了一个reduce function之后,接下来每来一条数据就会调用这里的reduce方法,这里的传参,那Y1就是我们已经之前保存好的状态,V2就是新的数据。
07:09
最后得到的结果呢?哎,那当然就是更新我们当前的聚合结果得到的返回值,进一步去更新当前的状态。这就是reduce的整个处理的过程。当然了,这里是实现reduce function接口的这种方式同样在link里边也提供了另外一种写法,那就是可以直接传入一个拉姆达表达式。其实我们知道这个拉姆达表达式呢,里边的逻辑我们看它同样是有两个输入参数和一个返回值,类型都是T,那显然它具体的表现形式就跟这里的reduce方法的实现是完全一样的。那reduce function同样只有一个泛型,就是当前的数据类型T,它的数据类型是保持不变的,经过reduce计算之后得到的结果又变成了date three t,这跟我们之前所说的简单聚合转换其实还是完全一样。
08:06
所以接下来我们还是举一个具体的例子来看一看这个reduce到底怎么用吧。呃,我们来举一个比较复杂一点的例子,实际应用的时候我们知道啊,对于当前很多用户的点击数据,往往我们可能要做这样的一个统计,首先我们想统计一下每个用户。他到底访问了多少次啊,就是当前啊,这个用户的活跃度到底有多高,那我们可以统计他在这段时间内点击页面的数量啊,那首先我们可以做一个类似于count统计了啊,这个逻辑呢,跟之前我们所说的word count是很像的,而且这里边我们不需要再去做分词了。只要按照用户用户名去做分组,然后呢啊,每来一个用户的点击,其实我们知道后面就相当于数一个一嘛,那简单来讲的话,我们就可以做一个map转换来一个event,我们就把它转换成一个当前的用户名user后边一个一这样的一个二元组。
09:07
然后接下来呢,诶,那不就是之前我们的word count的方式吗?按照当前的用户名进行分组,然后把后边的一做一个叠加,这就是他的访问次数。而且呢,我们还可以更进一步啊,就是基于当前每一个用户统计出来的访问次数,我们可以做一个比较,筛选当前所有用户里边访问次数最大的那个,也就是当前的最活跃的用户到底是谁啊,那显然我们就知道接下来我们又要做一个聚合,这一次的聚合呢,是要找出当前的最大count数对应的那个用户是谁。所以接下来我们当前的需求就是要提取。当前最活跃用户。这里我们需要分成两步走,第一步操作我们是需要先对于当前的用户先做一个转换,转换成二元组,然后统计每一个用户的活跃度啊,那在做K之前,我们应该先做一个转换,那就是做一个map,把它转换成二元组,哎,那我们当前的这个event啊,或者说我们的data就应该直接转换成data.user这里的URL和time Sam已经不重要了,我们需要的就是当前的用户名称,然后再加一个一啊,那我们如果是长整形的一个统计的话,EL也是可以的。
10:32
然后接下来呢,哎,那就不能直接KBY下划线点user了,因为我们现在已经变成了二元组,元组里边当前它的名称。第一个位置的名称其实是。我们知道是叫下划线一。我们可以用这种方式啊,但是我们会发现啊,KBY的时候,对于直接传int类型的位置和传string类型的名称的这种方式呢,都已经要被弃用了,所以我们现在最好的方式还是。
11:03
直接传一个k select,或者说我们直接传一个拉的表达式啊,那就是直接用下划线,点下划线一表示选取当前二元组里边的第一个位置的字段作为P,也就是user作为P。接下来聚合呢啊,当然我们这里可以直接去做一个丧。SOME1,那这样的话就可以把它按照第二个位置的元素啊,像work count一样把它统计叠加起来,这里呢,我们用一个reduce来做一个一般化的实现,那这个reduce应该怎么实现呢?其实也非常的简单,在这里呢,我们可以直接来new一个自己实现的一个reduce function啊,我们这里想要实现的其实就是一个叠加嘛,所以我们直接把它叫做my son吧,呃,那具体的实现plus。那是需要实现reduce function这样一个接口。对应的数据类型啊,当然我们当前就是二元组类型了,直接写括号里边是string。
12:02
Long长整型。里边必须要实现的方法就是一个reduce方法,然后这里边我们到底返回什么呢?其实我们知道这两个数据传进来啊,T是当前已经聚合好的,已经规约好的一个状态,T1呢是接下来新到的数据,那我们新到一个数据在之前的基础上干什么呢?其实我们知道就是在之前基础上加一就可以了嘛,或者说加我们新到数据的第二个字段,这个一加上就可以了。所以我们这里的做法可以非常的简单,那就是直接返回一个二元组。当前的用户用户名称当然是不变的,那就还是T点下划线一。这是当前的user,那当前的这个数量到底是多少呢?那就是T点下划线二,在原先的基础上我们可以直接加一,或者我们可以加上T1,当前输入的T1的下划线二,也就是我们新增的这个数据后边的一。
13:05
这个效果都是一样,这就实现了我们类似于word count的统计每一个用户访问量的这样一个过程。所以我们可以注释一下,这是。统计每个用户的活跃度。然后接下来还有另外一步操作,我们需要再把所有用户的活跃度放在一起,提取出一个最活跃的,活跃度最大的用户。哎,那这里就会涉及到另外一个问题,那我们想到了接下来就是需要做一个max嘛,但是我们知道啊,Reduce之后得到的已经变成一个普通的data stream data stream是没有直接去调用聚合的方法的,那这个时候怎么办呢?哎,那我们说max它还是一个聚合嘛,要聚合先KBY,但是现在KBYKBY什么呢?哎,我们会发现现在其实我们根本不需要做分组,因为前面我们统计完了之后,那就应该是一个user。
14:03
一个它的数量,它的活跃度,一个user,一个数量。那所有的这些数据我们应该是不分组,全部要放在一起,提取出后边这个活跃度最大的那个数。哎,那这个时候该怎么办?怎么KY呢?哎,其实也很简单,我们只要把所有数据的K指定成同一个就可以了,比如说我这里可以直接不论来的是什么样的数据,我直接返回一个处。哎,当然了,我们直接返回一个default分组,这样一个K也是可以的啊,就是把所有的数据来了之后啊,无论是什么,返回一个唯一的值,那么所有数据不就都分到同一个组里边去了吗?哎,这就是统计所有数据的一种方式。这是。将所有。数据按照同样的K。
15:01
分到。同一个组中,当然了也就会分到同一个分区里面去进行处理了,那接下来我们再用一个reduce去进行最大的处理,当然我们这里可以用max或者max啊,接下来我们要做的是。选取当前。最。活跃的用户。哎,那么这里类似于max的一个操作,我们同样也可以用一个reduce function来实现,那前面我们使用了这个reduce function的形式,接下来呢,我们还可以用另外一种形式呢,就是直接写一个拉姆达表达式。这里怎么去写呢?呃,很简单,我们这里需要有两个参数啊,那前一个参数我们可以把它叫做当前的state,另外一个参数呢,我们可以叫做当前新进来的一个data,那这两个参数来了之后,接下来我们要判断的其实就是。Data新来的这个数据里边的它的第二个字段,也就是活跃度是否比当前状态里边最活跃用户的那个活跃度还要大,如果大的话就更新。
16:08
如果比它小或者相等的话,哎,那我们可以自己定义啊,是不更新,还是说做别的一些处理,所以这里边其实就是一个简单的一幅判断,我们可以去判断当前新来的贝塔,它的下划线二,它的活跃度如果说大于。之前state里边的下划线二的话。那么这个时候我们就直接更新啊,那就应该把当前data返回,也就是说诶,当前最新的这个数据,这是当前最活跃的用户。那如果说不是呢,Else呢?哎,那我们就依然返回之前的state,但是在这种情况下我们会发现啊,假如说相等的话,活跃度相等的数据,诶,那是相当于我们不更新当前的数据,好,这一部分我们都定义好了之后,最后就可以直接print打印输出了,最后不要忘记。
17:01
执行起来,这就是我们完整的处理流程。接下来我们可以运行测试一下,当然为了看得更加清楚的话,我们可以这样,后边有这个Mary的数据啊,所以前面我们可以加一个Bob的数据,比如说我们可以让Bob的数据多来几条哦,然后接下来我们就会发现,一开始第一条是Mary啊,Mary是最活跃的,然后接下来呢,会更新成Bob啊,那再往后呢,哎,那又会更新成Mary啊,所以我们接下来可以运行一下看看效果。我们可以看到,按照当前的逻辑,那就是第一条数据,Mary来了之后,最活跃的当然是MARY1。第二条数据Bob来了之后呢,诶Bob也是一,所以我们的原则是不更新,诶只有新数据更大的时候才更新嘛,所以这个时候呢,还是MAR1,这是最活跃的用户啊,那接下来Bob来了第二条的时候,那要更新了BOB2,这是当前最大的,然后接下来爱丽丝数据来了之候,不更新Mary来第第二条,Mary也是二,Bob也是二,不更新宝抱二啊,那如果Mary来了第三条数据的话,这个时候更新成MARY3。
18:09
再来一条更新成MARY4,这就是我们的基本的处理逻辑啊,那当然了,如果说我们希望比方说啊,呃,我们一开始MARY1这是最活跃的用户,然后Bob来了之后呢,诶,我们认为BOB1BOB比Mary这条数据来的更新一点吧,所以我们要把它更新成Bob,那怎么办呢?那也很简单,我们这里加一个大于等于不就完了吗?啊,那就相等的这种情况也就直接做更新了,我们同样可以运行一下,看到对应的效果,就是每一次都会更新啊,那后面也是啊,BOB2,接下来Mary有了第二条数据的时候,就直接更新成MARY2,后边是MARY3MARY4。这就是我们对于reduce一般化聚合的一个具体的应用。
我来说两句