00:00
挨个实现啊,首先我们来看一下这个。自定义。预聚合函数。所以我们这里边实现一下这个count hg,它要去实现一个什么接口呢?聚聚合函数对吧?I agreegate function,诶大家看到刚才看到它里边的那个类型了吗。点的有点快啊,在这看一下in ACC out,这分别代表什么东西呢啊,In简单输入对吧?输入的数据类型在我们这里就是在我们这里是不是相当于就是user behavior啊,对吧?传入传进来的那个数据类型,那out也简单输出的数据类型,那中间这个ACC呢?给大家看type of the accumulator是不是就是那个累加器计数器啊,这里大家注意它其实代表的含义是什么?
01:05
代表的含义就是大家看中间的聚合状态,对,所以大家会想到在这个过程当中,我们做累加,那累加器是不是就是聚合中间的一个状态啊,对吧?来一个计数加一,来一个计数加一,我们要保存的其实就是这个状态,所以是这样的一个处理流程啊,好,所以这里边我们要给的那个印类型是user behavior。那累加器的类型是什么呢?状态的类型对吧?我们要保存的状态的类型,对,那是不是就是一个浪类型啊,大家想一想啊,大家想一想是不是这样,因为我中间的那个状态是不是就是count值啊,对,所以这里边是一个浪类型,最后那它的输出呢?哎,有有同学可能说你这里边他的输出应该是那个我们定义好的那个it count啊。但是大家注意,这里边它能这里是做预聚合对不对,它能直接拿到我们那个就是window相关啊,当然就是说你如果要是说想拿到window的这个,呃,相关的信息的话,那可以在其他地方拿对不对,那这里边是不是还拿不到window的相关信息啊,它只是来一个数据聚合一次做一次处理而已,我们这里就是直接累加而已,那大家想这里它能够输出的一个结果是什么啊,是不是就是最后它聚合好的那个计数的那个结果啊,然后他把这个结果要给谁,是不是就是给后边的这个window function啊,对,所以大家注意啊,它们俩之间的关系是什么呢?就是它这里边定义的out。
02:47
就是它这里边的E啊,就是它的结果会给到他去做最后的输出啊,所以这里边我们就直接把它也定义成了完事对吧。好,接下来我们实现一下。
03:03
哦,大家看到了啊,这里边实现是不是要实现这四个方法啊,啊首先大家看到有一个是ADDADD是什么意思呢?这里面它传进来的是不是就是当前的值,然后还有这个累加器accumulator,对吧?这是不是就是我们的状态,所以大家看这个聚合,我们说这个聚合算子就是一个有状态的算子,对不对,这个过程本身就是带状态的,它的状态其实就是这个累加器,那这里边来了一个值之后,我们怎么做操作呢?对,是不是就是直接。是不是直接累加器加一就完事啊,来一个计数器加一,来一个计数器加一对吧,数据相关的东西全不要无所谓,我就看你有几个就完事了啊,所以就这么做,就这么简单啊,那下面这个是create accumulator,这是不是相当于。初始状态给的初值啊,对吧,一开始的这个累加器,一开始是什么值,大家想是什么值呢?对,当然是零,然后接下来get。
04:08
Get result是不是相当于就是这里边输出的那个结果,我要把它拿到了,对吧,那大家说现在输出的结果应该是什么呢?是不是也就是这个这个累加器的值啊,所以大家看这个就。不要太简单啊,就就直接把这些东西一拿出来,把状态输出就可以了,那最后还有一个墨指墨GE指的是什么呢?就是对,就是如果遇到这个重分区或者其他的状态的时候,累加器,两个累加器怎么处理对吧,那大家想怎么处理啊,对呀,计数器嘛,那当然就是加起来就完事,A加B,所以大家看这个具体的实现就这么简单,这样就把这个自定义的语句和函数搞定了。这里边可能要给大家就是多讲一点啊,就是说大家会想到我们前面说在一些更加复杂的情况下,可能我们聚合的状态可能会更多,对不对。
05:05
那简单的聚合能不能用这个语句和函数实现呢?当然可以,你有很多个状态,怎么实现呢?大家想一想。那我这里边就一个累加器,那怎么那怎么处理啊。哎,这个累加器的数据结构不是我们自己定义的吗?那假如说现在我们有一个需求,让我们算一个平均数,上节课给大家提到很多指标要算平均数,对不对?平均数怎么算呢?给大家举个例子啊,自定义域聚合函数计算平均数,大家会想到你在过程当中是不是得保存两个状态,一个是累加起来的和另外一个是对,另外一个是当前的这个数量啊,那大家会想到我这里边写一个啊对,有同学已经想到了,是不是我可以去定义一个原组的这样的一个,呃,作为我的这个状态的类型啊,对吧,然后里边就可以有两个值了嘛,啊所以这里边比方说我定义一个叫呃,Average。
06:18
Agg对吧,平均数的一个聚合,那同样它还是要继承这个aggg方式,这个接口,这里边输入的数据类型啊,这就随便了,对不对啊,甚至我有可以就直接在这个例子上,我直接用这个user behavior也可以,对不对,我直接用user behavior后边的这个,对,这里边的状态是不是就应该是一个long,这是这是相当于我们要做计算求和的那个值对不对,比方说我们求什么和呢?就求,哎呀,这这里边好像没什么数字。User ID不太合适,我们直接把时间戳做一个求和得了,对吧?算一下时间戳的这个这段时间他们时间戳的平均数,呃,也算是有一点实际意义啊,所以这里边就是可以是一个浪类型做求和,然后对应的那个个数的话,那如果说我认为它的个数比较少,我是不是直接用一个int就可以啦,然后最后输出应该输出一个什么,大家注意对,最后是不是要做除法啊,这个是不是有可能出现小数啊,当然你如果定义浪也可以,那最后就相当于精度缺失嘛,对吧?所以这里边我们定义一个double,诶,这是我们定义好的这样的一个状态。
07:35
那具体的里边怎么去实现呢?大家看现在的这个accumulator,它就有两个值了,对吧,两个状态了,那现在来了一个数据的时候做什么操作呢?大家注意这里边我是不是得返回一个,大家看它的返回类型,就是我们的这个accumulator对吧?就是要返回状态的改变,所以来了一个数据,我要返回一个什么呢?
08:02
是不是还是一个元组啊,元组首先第一个是对,就是前面我们要加求和的那个,那那怎么求和呢?是不是当前的状态值,它的一要加上value的time stamp对吧?啊,当然这里边我们是加的是time stamp啊,那另外第二个这是个数对不对?个数的话是不是就是accumulator的二加一是不是就这样做啊,对,然后接下来初始值,初始是不是也是一个状态值啊,那这个状态是什么呢?前面是LONG0L,初始的和当然是零,对不对,那后边是不是也是零啊,个数也是零,对,所以就是这个样子,然后接下来大家再看这个拿结果,拿到的结果是什么呢?结果是不是就是我们最后输出的那个平均数啊。平均数怎么算啊,这个太简单了,那是不是就是accumulator的一,这是和是不是直接除以accumulator的二,是不是就这就是它的平均数啊啊,当然最后还有这个对两个这个累加器怎么样去聚合在怎怎么样合并在一起,那是不是合在一起就是A的一加B的一。
09:28
是吗?不是除对吧,最后是不是这里还是一个状态的改变啊,没有要算最后结果,所以还是状态啊,逗号是一个元组A的二对加B的二,大家看是不是非常简单。对吧?啊,所以说就是我们把这个例子给大家讲完之后,预计大家就是说之后做的基本的那些统计类的指标,基本上就是拿来只要有数据对吧,拿来直接直接一按,按这个标准一算就完事了,像这个为什么后面还麻烦一点,因为后边还要做那个做排序,所以就看起来好像还复杂一点,如果你要不用排序的话,直接统计的话,是不是就我们把这个做完就就已经搞定了呀,对吧?所以这个其实是非常简单的啊。
10:16
好,然后这个语句和函数就实现了,我们接下来要实现一个什么呢?还得实现一个window function,是不是包装成我们最后想要的一个结?呃,那个样例类的类型叫item will count,对吧?把它的那个数、count值和window and都包装在一起输出,那这个玩意儿要怎么实现呢?这里啊,自定义。窗口函数计算,呃,输出item will count。呃,当然这里边我们还是先把这个拿出来,叫window result,它需要去,它需要去实现一个什么接口呢?Window function对吧。
11:14
好,这里大家看到了它的类型,这个稍微有点多啊,它有它有四个类型,In outt,诶这个我们知道输入输出对不对,它还有一个key,对这里面涉及到我们当前那个key的类型,最后还有一个,呃,是不是还有一个这个呃,Window的类型啊,当然这里边这个小于冒号的,这个是表示对下界对吧,上下界的这个意思啊,这个应该是表示这个window类型是它的上界对吧。对,所以大家会看到这里边我们可以传当前的window应该是什么,应该是个time window,对不对,所以可以传一个time window,好,所以这里边它的印是什么类型,大家还记得吗?就是前面聚合预聚合函数输出的结果是它的输入对吧?所以这里边它的输入是了,输出呢,Item welcome样一类对吧,然后key key是什么类型呢?诶,这里边大家看到key是item item ID对吧?所以大家会想到这里边给一个long,然后最后是一个time window。
12:29
Window,好。好,我们上来看一看,但是大家看诶不对呀,正常来讲,我这里边如果要已经实现这样一个接口的话,这里是不是应该就不报错了,他这里报类型不匹配。Match,对吧,这个原因在哪里呢?大家仔细看他期待的这个window方式的类型是什么,是不是中间的那个K,他期待的是一个元组类型啊,他类型对不对?哎,为什么他期待的是一个ta类型啊,哎,我们这里边把这个先改过来。
13:14
对吧?哎,大家看一改,果然这个就没事了,是不是?哎,这里当时跟大家说过,做KBY的时候,如果我们直接传一个字段类型,字段传进去的时候,返回的结果类型,这个k stream的结果是不是它的类型就是这个,这是它的本身的那个数据类型,对不对?K的类型就是Java temple。那所以这里边是不是它就也是一个tabo啊,那大家想一想啊,当然我们看看它影响不影响吧,不影响的话我们就忍了,对吧?啊还好。我们看一下它要实现什么,他要实现一个APA方法,在这个里边我们最后是不是就是想要去大家看我们用什么可以去输出最后的结果,Out对,它是一个collector,我们是不是直接out.collect对里边是不是就包装成一个item will count啊,诶大家想这里边我们要的。
14:12
三个参数,Item ID window and和count。那count应该是什么?Count是不是就这里边input里边的值啊,当然它还是个inter类型,我们把它拿出来就可以了,那window and是不是根据window也能够也也能够拿到,哎,这两个都都不难,那当前的ID是什么呢?ID是不是得从K里边去拿,哎呀,这个就有点儿别扭了,是吧,大家想一想这个如果我要去拿的话,是不是你得从一个这个类型里边去拿,大家看一下这个需要怎么去做啊。我可以定义一下啊,Item ID它是一个浪类型对不对,怎么去拿呢?都知道它就是K里边去拿的,那K它本身又是一个呃,一个Java tempbo的类型,那怎么拿它对应里边那个值呢?而且这个是一个一个一个接口类对吧?这里边我还得去把它明确,它的这一个实例是什么,As instance of,那大家想它是一个什么的instance呢?当时我们说看那个那个呃,Java temple它下面有有什么样的实现来着,是不是TEMPLE0T1T2对吧,它应该是什么呢?它是一个TEMP1对吧?哎,所以这是我们。
15:33
具体的这个类型,所以它是as instance of temple1,这里大家还要注意,你这里面默认引入的可能会有问题,就是默认引入的是这个。呃,是这个skyla这里边实现的这个TEMPLE1对吧,所以我可能还得这样啊,把这个引入对不对。他一。
16:00
好,然后大家看跳转,诶,现在是跳到这儿了,对吧,那里边的值是什么呢?大家可以看到它里边是不是就一个值,它的类型是T0啊,这个值是什么?是不是就是它这里边叫做F0的这个玩意儿啊,对吧?但是这个玩意儿是不是TOP1还得指定它的类型啊。它的类型是类型是love,对,哎,所以大家看就就这么别扭,对吧,把这个都拿出来之后,我现在就可以拿到它下面的这个F0了,对不对,对吧,可以把这个拿到了,所以哎,这这个过程就这么麻烦对吧,这么别扭,有有没有方便点好使点的方法,大家自然就想到那是不是你当时的那个K不要写的这么别扭就行了,对吧。这怎么写呢?对,大家想到当时我们这个有一种方法,可以让它返回的这个k stream的K类型,不是temple,什么方法呢,是不是直接定义那个一个key select这样一个function就可以把它最后返回,就是它对应的那个数据类型啊,啊,所以这里边我们可以定义。
17:14
下划线点item ID对吧?呃,现在当然你直接这么改了之后,这个类型又不匹配了,对不对,所以这里面我们要改成了对吧,改了之后看一下没事了对不对?哎,现在这样才好嘛,哎,这个干干脆把它全删了,重新,因为那个写那个类型有点麻烦对吧?大家看现在这里边我们的这个类型是不是就变成浪了,所以现在是不是就简单很多,我直接就可以。out.collect一个item will。Item,呃,本身的这个item ID是不是本身就是key啊,对吧?然后接下来window的and window and是不是从window里边去拿,大家看它是不是有一个方法叫get and,哎,所以直接拿我们确定一下它是一个long类型,Long没问题,对不对,好,所以然后还有一个,还有一个count count是是不是要从input里边去拿,它是一个table类型,所以是不是要去。
18:18
点next把它拿出来啊,所以大家看,就这样把它包装成一个item count输出就完事了。这样看起来就就舒服多了,对吧?啊,省得我们整那么麻烦啊好,接下来。大家看这一步我们就已经搞定了,其实现在已经可以输出,可以看到这个,呃,当前处理完之后聚合窗口聚合的结果了,对不对啊,这这其实已经可以说是告一段落了。
我来说两句