00:00
我们的主要任务当然是基于一个KSTEM要去做聚合操作了啊,那接下来首先我们先来讲一讲简单聚合,所谓的简单聚合其实就是基于KBY之后的k flink给我们内置的一些非常基本最为简单的聚合操作啊,直接调用一个方法就可以实现啊,那这就是之前我们已经用过的啊,那或者刚才看到的max me,或者这里还有max by等等不同的方法调用,接下来我们就来做一个简单的代码测试。在当前包下边创建一个新的Java类,我们把这个叫做单聚合的测试啊,那我们同样当前还是。Transform这个大类下边,接下来我们做的是一个aggregate,我们叫简写a test。整个的流程还是类似。首先throws exception。
01:02
然后接下来创建当前的流市执行环境。Get。Execution environment把它叫做env,不是一般性全局的并行度。一。接下来我们还是需要去。读取数据源,获取当前我们的数据啊,那这个这一部分数据呢。我们还是跟之前类似,直接copy过来。但是能够想到当前我们既然是要做聚合,而且我们是按键进行了分区之后的聚合,或者说就类似于在Spark里边的group by k啊,做这个按键分组之后的聚合,那很显然我们这里边如果用user来做一个分组的话,只有一条数据,这个聚合看起来好像就没有什么意义了,所以我们这里可以去增加一些数据,比如说。当前的这条数据。再复制一份啊,那当然了,我们可以比方说我们把这个Bob的数据做一个扩展吧,啊,增加一个Bob的对应的数据啊,当前它的product ID,比方说访问的是ID1的这个商品在3000。
02:15
300秒。做了一个访问。同样接下来啊。呃,后面的话,我们当然还可以继续去扩展Bob的访问数据,它可以又访问了一下home页面。那在在接下来我们就继续让他多访问几次。商品页面。访问I2I商品对应的时间,我们也做一个更,比如说在3500秒的时候。这是3500毫秒,3800毫秒。4200毫秒,这样的话我们就扩展出了更多的数据,接下来我们就可以针对它做一个聚合计算了。那在聚合计算。之前首先先要做key按键分组。
03:05
案件分组或者说分区之后进行聚合。那我们得想一个具体的应用场景,我们到底要计算什么呢?呃,我们就非常简单的来做一个统计吧。因为当前做。简单聚合的测试的话,当前flink提供的简单合接口就只有some max max by,那接下来我们干脆就sum比较熟悉了,就是做了一个求和啊,我们之前在world代码里边已经用过了sum进行。累加求和,把所有的一累加起来,这样一个计数的操作,那现在呢,我们干脆就来看一看max和命怎么样去用,比如说现在我们可以直接选取当前每个用户。访问所有访问数据里边最大时间戳的那条数据,那其实就是当前的最后一次访问了,当前用户的最后一次访问了。所以我们就。
04:04
提取。当前用户最近一次。访问数据。那就是直接针对当前的stream去调用一个,先调用一个K里面我们要想要去实现一个k select。这是使用匿名类的方式。自己实现了一个key select接口啊,那么当前的数据类型是event,我们当前K要选择user作为K,那当前的类型就是string了。里边必须要实现的是一个get key方法,我们掉直接返回,就是选取数据里面的key,把它返回就可以了,所以当然就是value.user。定义了K之后,接下来就可以直接调用简单的基本的这些聚合方法,那我们这里边要要的是max,想要选取当前的最大时间戳,这里边我们发现有max,有max,那max和max分别有什么区别呢?我们接下来可以单独的去测试一下,看一看到底是什么样啊,那首先我们来看一下max。
05:18
Max,这里我们发现有两种不同的传参方式,一个是传入一个string,哎,那当然就是按照当前的属性字段去表示,针对哪个字段取一个最大值了,而下边呢,还有一种就是传入一个int类型的position position当然就是当前的字段的位置索引号了。其实我们也不陌生,之前我们在work代码里边就是直接传入了一个引,比如说我们传入一个一,就是对当前二元的第二个元素,针对它来做一个萨那如果说我们是max的话,当然就是针对第二个元素。提取它的一个最大值了,所以接下来我们在当前的处理过程当中应该用哪种方法呢?啊,这里需要注意,当前我们是一个抛类型。
06:07
对于po类型而言,很显然,直接去传入一个string,指定当前的字段名称,代码的可读性会更好,更加明确。而且对于类型而言。如果我们直接传入一个整数表示当前的字段的索引位置的话,那运行其实是会报异常的,所以这里面我们只能传入当前的字段名称。我们要截取的是最大的时间戳。接下来我们已经得到结果之后,干脆直接就把它做一个输出就可以了,这是我们的第一种方式。当然,我们这是使用的是max,我们可以使用max作为前缀做一个打印输出。因为execute执行起来。然后除了这种方式之外呢,我们还可以定义。我们还可以来测试一下max by是什么样的效果,来这里我们就一定把它先写出来了,STEMK,这个时候K我们可以用另外一种方式,前面我们是使用了匿名类的方式,当然我们也可以单独的把这个类在外边定义出来,声明出来。
07:15
那更加常用也更加简洁的一种方式。应该是传入拉姆达表达式,因为我们发现这个单一抽象方法的接口,它里面的get key这种方法,它的返回值类型应该是确定的,我们当前就是一个简单类型string,所以直接使用拉姆达表达式把对应的user返回就可以了,所以这里边可以直接写一个be提取它的user,这个就非常简洁而且一目了然。我们以当前数据里边的user字段作为K进行分组聚合。啊,那接下来当然也可以max了,我们现在来测试一下max啊,那同样里边也是有传string和两种方式,我们现在传入字段名称。
08:03
接下来做一个打印。这个是MAX8。接下来我们就可以运行做一个测试了。当然为了更加清晰的看到测试的结果,我们不仅把Bob的数据增多一点,我们也可以把比方说Alice的数据也增加一些。在两条Bob的数据访问数据之后,再来一条Alice的访问数据,比方说他访问的是ID为200的这个商品啊,那这里呢,我们会想到不同用户的点击数据,在我们真实的处理场景下,它其实来的顺序到来处理的顺序未必和它发生的时间戳的顺序是一致的,那所以这个也有可能,比方说在Bob3.5秒访问数据来了之后,才来了一个爱丽丝。3200毫秒3.2秒的访问数据,这也是完全有可能的啊,所以接下来我们可以直接运行一下,看一看得到的结果是什么样的。
09:03
我们可以看到直接已经输出对应的结果了,针对当前的每一条输入数据,很显然max和max这两个流的处理都应该有对应的一条输出啊,那因为我们当前是流式处处理吗?来一条数据对应的就应该有它的结果输出。我们首先看第一条数据,就是Mary在第一秒的一个点击访问事件。首先我们可以看到这个显然只有一条数据嘛,最大的最近一次访问肯定就是自己了,就原封不动的做了一个输出,Max,和max没有任何的区别。然后同样Bob的第一条数据来了之后,我们会看到也是原封不动的输出,没有任何区别。然后接下来Alice第一条数据到来之后啊,这个这个都没有任何的区别,因为我们当前是以user作为K进行了分组聚合,诶,所以其实每一个用户他的最近一次点击的数据其实各自互不干扰,那所以当然是每一个用户就统计自己就可以了,只来一条数据的话,那就是自身嘛。
10:07
接下来我们看Bob的第二条,数据来了之后。往下看。当前Bob的对应数据就更新了。我们看到输出的是Bob,他的最后一次访问时间变成了3.3秒。而这里边max就有了区别,我们看到max输出的URL还是cart,这个cart是什么呢?其实是他第一条数据对应的URL访问的那个页面,当前max呢。输出的是商品ID为一的商品对应的URL,这就是当前它真正的访问的数据,所以这里边max和max的区别也就看得非常明显了,Max只针对当前我们定义的这个字段。想要去。
11:00
提取的这个字段去截一个最大值,那其他字段怎么办呢?因为我们当前看到当前做聚合的这个结果啊,它的数据类型,我们最后得到的数据类型。我们可以看到它是single output stream operator得到的,还是一个类型的。对应的data,所以经过转换之后,我们的输入输出的数据类型是不变的,它是只是做了一个聚合操作,把所有的数据聚合成了一个值进行输出而已。背后输出的依然还是事件,那除了当前这个字段变成最大之后,那前面的字段怎么办呢?Maps的做法是直接采用第一条数据对应的字段。而max做的,他的做法是。相当于直接把当前最大时间戳的一条完整的数据保存下来,做了输出。
12:03
这就是他俩的区别,当然在实际应用的过程当中,可能各有各的用途,这就看我们到底想要什么。继续看的话,后边我们看到如果再来Bob的对应数据的话,那其实也就直接把。Bob的最近一次访问时间改成3.5秒啊,那我们看同样还是max URL是不变的,还是第一条的,而max呢,更新成当前最新的一次访问数据。然后再接下来,我们看到爱丽丝又来了一条数据之后,很显然爱丽丝的第二条访问数据是3.2秒,他显然没有之前Bob已经访问到的这个时间更近,但是我们发现更新这条数据来了之后,更新之后并没有受到Bob3.5秒最近一次访问数据的影响,它直接更新的只是爱丽丝的最近一次访问。还是3.2啊。所以后面再来报的数据,我们会看到他就更新最近一次访问3.8秒,4.2秒,按照每来一条数据进行一次聚合,更新一次当前的最近一次访问的聚合结果。
13:12
这就是简单聚合算子的使用。
我来说两句