00:00
我们已经了解了在flink当中按键分区KBY的具体操作,当然了这一部分我们并没有把它运行起来去进行测试,那如果说想要测试KPI的效果,应该怎么样去测呢?呃,其实这个也非常简单,我们当前全局并行度设的是一,我可以把它做一个调整,我把它调大变成四,那我们知道当前就默认相当于有四个slot资源可以并行执行任务。假如说我当前后边什么都不做的话,直接在这里我们把stream做一个打印输出,因为execute执行起来,后面这一部分我们都全部都注掉,直接运行,我们可以看一下效果是什么样子。运行一下,我们可以看到当前的四条数据其实会平均的均匀的分布在四个分区上面,1234啊,每一个分区上面都有一条数据打印输出了,那假如说在上边我们做了分区操作又会怎么样呢?我们把上边这一句KBY加进来,然后就不要基于stream去进行print了,直接KBY之后做一个打印输出,因为我们知道KBY之后得到的k stream也是data stream,当然也可以直接打印了。接下来我们再看一下现在的状态是什么样。
01:19
运行。我们可以看得非常明显,跟刚才就有所不同了,我们得到的这四条数据里边,Bob、爱丽丝这两条数据还是分别处在第三和第四个分区上面,由不同的slot去执行刚才的任务,而Mary呢,Mary的两条数据就不会再分开了,而是全部都放到了第二个lot上做了打印输出。这就是我们说的KBY,之后所有的数据会按照当前K的情况。进行分组,分配到不同的分区,如果是相同P的话,是一定会被分配到同一个分区里面去。
02:02
而不同的K的话,那就有可能分配到不同分区,也有可能分配到相同分区啊,这个就看的非常的明显。当然了,对于P而言,我们的目的并不是要做这样的一个分区的测试啊,我们的关键还是要做后边的聚合操作,所以接下来呢,我们可以基于当前的分区去做更多的聚合操作,那首先接下来我们要介绍的就是所谓的简单聚合操作啊,什么叫简单聚合操作呢?其实就是link里边给我们已经提供了的最基本的,最简单的直接调用就可以的一些API。比如说之前我们在word count里边已经用过的some,针对某个字段直接进行求和,另外呢,还有mean max mean by max,这个顾名思义,这就是基于某个字段去求取一个最大值,最小值嘛,啊,那这里我们需要区分的就是这个命和mean by看起来都是求最小值,那么它们之间有什么区别呢?啊,简单来讲就是对于命而言,我们针对某一个字段啊,比方说啊,我们这里边有一个count。
03:10
这个字段是一个数量,我们可以求一个当前的最小值,针对这个字段求出最小值来之后,那假如说我们还有其他的字段,User URL,假如还有这些字段,那怎么办呢?对于命而言,它的聚合结果是,其他字段都以最初的第一条数据的值为准。哎,那我们就知道了,在有些场景下,我们可能不希望这样去做,我们希望的是把当前的这个count值求出一个最小值来,之后呢,是要把它对应的其他的那些字段也要完整的提取出来,就是那条完整的数据我们要拿出来,那怎么办呢?那就用明白明白所提取的是包含了字段最小值在内的完整整条数据啊,这个其实我们也是在代码里边做一个简单测试就可以看得很清楚。接下来我们还是啊,基于之前的这段代码可以做一个简单的测试,比如说我们前面已经针对当前的user作为K进行了一个分组啊,这里我们需要注意的是当前如果说啊,前面我们没有做分组,基于data stream这样一个类可以直接调用对应的sum方法吗?我们看到如果搜some的话,没有这样的方法。
04:23
或者我们搜max mean同样也是没有这样的方法的,那怎么样才可以真正的调用呢?诶,那就是我们说的必须要做KBYKBY之后得到的key stream这样一个类型里边,接下来我们去搜max me。这些方法就全部出现了啊,所以接下来我们真正要做的肯定就是先做KBY,然后调用对应的简单聚合方法啊,那KBY和聚合这两步操作,我们可以认为它就是成对出现的啊,要做聚合先做KBY,那我们现在根据user做了一个分组,那比方说我们就按照每一个用户最后一次点击统计每一个用户最近的一次点击事件吧,那我们应该调用什么样的方法呢?其实非常简单,就是那就是截取最大的时间戳,这个如果我们看的不是很明显的话,也可以在多加一些数据,比如说Mary的这个数据。
05:22
我们可以继续在后边增加,这里可以啊,点击一个ID为三的,ID为二的,我们可以给更多的时间戳,而且这个时间的也可以打乱,我们来看一看当前的最大最近一次的点击到底是以谁为标准的,所以接下来我们自然就可以给一个max。统计当前的最大时间戳,这里有一个问题,就是说,呃,到底是用max还是max by呢?两种我们都可以使用来进行一个测试,比如说我们先用max,后边我们看到它要传参,这里的传参,诶之前我们KBY给的是一个p select,那这里的max呢?它其实没有对应的这种接口的实现,它只有两种传参方式,我们看到一个是传一个int类型的position。
06:11
也就是字段的索引位置,另外一个是string类型的field,也就是某一个字段的名称。所以这其实就是我们所说的简单聚合所要传递字段的两种方式,一种是指定位置,另外一种是指定名称。啊,那像我们这里对于一个case class,一个样例类而言,显然里边我们可以直接指定每一个字段的名称,好,所以接下来我们这里边指定的就应该是max,给一个string类型,我们要的是比方说time step。这样传进去就可以了,好,然后接下来我们可以直接运行一下,看一看输出的结果是什么样子。当然了,现在我们当前的并行度设置的还是四啊,如果我们想要看的更加清楚一点的话,也可以把并行度再调回一啊。现在我们可以看到当前测试得到的结果,首先Bob,爱丽丝他们两个的话都只有一条数据,那当然输出的就是自身了,我们关键看Mary Mary的数据比较多。
07:16
Mary的第一条数据来了之后,很显然诶,得到当前的最大时间戳是1000,然后呢,第二条数据来了之后,当前最大是4000,后边第三条来了之后最大是6000,第四条来了之后最大还是6000不变啊,最后一条这个5000,第五秒钟到来的点击事件并没有更新我们当前的最近一次点击事件啊,那所以这就看的非常的明显,而且对于这个max呢,还有一个特点,我们也可以看到除了最后一个最大时间戳之外,前边的内容。它输出的都是我们第一条数据对应的内容啊,都是maryry后其他点击的URL都没有出现过啊,所以有些场景下,我们可能不喜欢这样的一个输出啊,我们可能希望就是假如说哎,我统计出来了,他最近一次点击,我还得知道他最近一次点击到底点击的是谁,诶,所以这样的话我们应该用谁呢?那这样的话我们就可以用一个MAX8啊,那用max by,同样它里边所要传的参数也是可以传一个string类型的当前的字段名,也可以传一个int类型的位置。
08:25
我们当前如果是样例类或者是坡类的话,直接传它的名称就可以了,那这样的话传进来我们可以运行一下,看一看当前的结果又是什么样子。我们可以看到现在就有所不同了,同样我们还是第一条数据来了之后,Mary后就是1000,然后第二条数据,第四秒的数据来了之后,我们这里就更新成了Mary pro的IDE4秒钟,这就是当前最近一次点击的完整数据啊,那当然了,六秒钟来了之后,同样要更新五秒的数据,最后来了之后呢,不会更新之前的最近一次的点击数据,它还保持着之前的模样。
09:06
这就是我们所说的简单聚合啊,那这里需要多说一句的是,我们刚才只讲了样例类里边直接按照字段名称去指定要针对哪个字段进行聚合,那假如说不是样例类,不是破柱类型,那又怎么办呢?我们说在实际应用的时候,另外一种常见的应用类型是元组类型,假如说我们这里的数据是元组的话,那就涉及到这样一个问题,后边每一个元组它对应的名称到底是什么呢?我们注意啊,在flink当中,它针对scla里边的元组,其实每一个字段名称就是以下划线后面加上当前的字段的位置,字段的索引位置来进行一个表示,比如说我们当前一个二元组的话。就是A1这样的一个二元组的话,那么我们如果要是表示第一个字段这个a string类型的A的话,那么我们就可以直接用下划线一来作为它的名称。
10:09
那如果说我们当前要上啊,要统计的话,显然是要统计一个数字啊,或者是max命啊,统计一个数字,那要统计后面这个一三这样一个数字的话,那么我们就可以用下划线二来表示它的名称。这里需要注意的就是说下划线二是它的名称,那如果说我们要用int类型的数字来表示呢?注意这个数字位置是对应的是一。而前面的第一个字段啊A,它用下划线一作为自己的名称,那么它对应的位置是什么呢?索引位置是零。哎,这个我们需要特别的注意啊,这就是为什么之前我们做word的时候是S1啊,就是当前如果是int类型的索引位置的话,是从零开始的,而如果是元组类型,我们用下划线的这种形式啊,表示它的这一个名称的时候,是以一开始的,下划线一,下划线二。
11:03
这种方式我们可以自己去测试一下啊,其实是非常简单的,只要自己from elements定义一个元组,就可以看到它对应的聚合情况是什么样的。所以我们也可以简单的总结一下,就是对于聚合计算而言,在flink当中它的规律是什么呢?其实就是分成两步走,先做一个KBY。得到一个kid street。然后基于这个kid stream再调用一个聚合方法。那调用完聚合方法之后,得到的又是什么呢?哎,那其实得到的就又变成了data stream就又回去了。所以我们说整个的这个聚合操作也是一个标准的data stream的转换操作,呃,那在源码里边我们也可以看到非常的明显啊,比方说这里我们调用了max之后,得到的就是一个data stream t,当前的泛型key就又不存在了,因为我们得到的当前其实就是一个简单的聚合结果啊,那之前的数据类型是什么,现在聚合的样子也还是什么,只要返回一个T类型的数据就可以了。
12:14
那么对于聚合操作的底层呢,其实我们可以看到啊。在源码里边是调用了aggregate这样一个private私有的方法去进行聚合的,它内部其实是创建了一个对应的aggregator,哦,这其实就是flink内部给我们提供的聚合算子啊,当然就是有可能啊,是一个some aggregator,也有可能是一个比较的comparable aator,这就是看我们调用的到底是some还是mean max max败了。最终其实都是一个聚合算子,那在这个算子里边呢,其实是为我们当前的每一个分组的K。保存了对应的一个中间聚合状态啊,所以在flink里边我们说它是有状态的流处理,什么意思呢?就是像当前我们针对每一个K进行分组聚合的时候,来一个数据就把当前的这个状态做一个叠加更新,保存来一个数据就做一个叠加更新,所以之后我们就可以不停的更新,实时的更新这个状态,也可以实时的获取到当前的聚合结果。
13:23
这里我们也可以看到啊,对于无限流处理而言,当前的数据源源不断,无休无止,那中间我们得到的这个聚合状态其实就永远不会被清除掉,它一直要保存在这里啊,那每一个K都要对应的保存这样一个状态的话,那显然我们这个状态其实是比较多的啊,啊,所以我们如果要使用聚合算子的话,要使用在只含有有限个K的数据流上,如果说我们的K无限多的话,那相当于最后我们的内存资源就会被耗尽啊,这个是我们在实际应用过程当中一定需要注意的一点,这就是关于简单聚合的操作。
我来说两句