00:00
好,同学们,我们刚才呢给大家介绍了一下u daf的实现原理,接下来我们准备呢,给大家演示具体的例子,那咱们以什么为例呢?就以刚才的这个年龄的平均值为例,其实我们的circle当中自带了求平均值的功能,对不对?但是我们呀,就是模仿一下这种udf的执行过程,给大家演示一下啊,因为平均值比较好理解嘛,对吧?所以我们就拿平均值来做这个事情,那首先我们第一步我们要先创建一个类来实现这个功能,我们写上零三,然后中间加上一个A,这个A叫aggregate,它就是聚合的意思,所以点击OK,然后把它拿过来,那么点击OK以后,首先我们会发现一个现象,什么现象呢?我们这个叫udf,这个呢,你是没有办法去把udf像我们这种写法直接写上,这个做不到,我们得需要自己创建的是么?我们的类,所以啊,咱们有自定义的类,那么这里写上啊来。咱们写上咱们叫自定义啊,我们的类,或者说自定义我们的聚合函数类,它的主要目的呢,就是计算年龄的平均值,哎,就可以了,那这里我就写上叫class,咱们叫做MY,嗯,Avg,平均值u daf就可以了啊好。
01:18
那你说自定义聚合函数类,你随便写个类,我们的Spark就能知道你是用来做聚合的吗?肯定不行,对不对啊,所以说你要明白啊,就是我们如果想让Spark能动态识别,你需要满足或者说遵循某个规则,对吗?那么这个规则就有了,咱们写上叫继承,它有一个叫U啊,咱们叫做有一个叫做defend啊,来咱们叫aggregate function在这里,它这里面标识了一条横线,表述的是已经不再推荐使用了,为什么呢?因为咱们现在这种写法,大家可以看到,我是准备在S文里面来使用一个聚合函数。可是你要知道S口纹它是没有类型的,它只有行和列,它表现的是我们的什么数据的结构,所以它是个弱类型的操作,那么弱类型的话,在我们当前最新的版本当中已经不怎么支持了,所以它这里呢,标识了我们的横线,但是没关系啊,我们先演示完,咱们再去做一些更新好不好,同学们。
02:19
那么首先第一个啊,咱们这里继承了它,所以我们写上一。嗯,咱们写成一啊来,咱们要继承我们的这个类啊,这个咱们可以看一看,点一下,点完以后它是一个抽象类啊,所以继承我们的它拷贝拷贝以后,但是你要记住它这里标红了,标红的话你要重写它的方法,快捷键ctrl I,然后呢,点击OK,这个时候呢,我们写上啊来写上咱们叫做什么呢?二咱们叫重写。咱们叫重写方法,那么你重写方法的话,你会发现有很多对吧,但是其实并不难理解,咱们一点一点看好不好,同学们来看第一个。呃,这里的第一个叫input scheme,那么scheme我们一般称之为叫结构,它表述的就是输入结构,那什么叫输入结构呢?就是说你对于我的聚合函数来讲的话,什么是输入的值,你输入的值的结构是什么样子的?比方说咱们说了年龄的平均值,那肯定是把年龄传给你啊,对不对?所以年龄就是我们的输入啊,那什么叫输出呢?那不就是我的计算结果嘛,对不对?所以啊,对于我们来讲,这个叫input scheme,就是输入的那个结构,那我们说了,咱们只有一个年龄。
03:32
那好,那只有一个年龄的话,咱们这该怎么用呢?首先第一个来,嗯,看他这个地方,他说了要求咱们返回一个叫stru type,就是一个结构的类型啊,这个叫结构对吧,这叫类型,那么这个我们怎么返回呢?我们点它一下,点点完以后大家有没有发现它是一个样例类,那么它如果是个样例类的话,在咱们选skyla的时候,咱们的它就可以直接构建对象,都不需要去new,直接就可以加个括号,对不对,为什么?因为样例类会自动生成它的伴生对象,以及它的apply方法对不对?那所以呢,这里就直接加括号,但你加括号的时候它要传参数对吗?它要传参数的话,它是个R瑞,那么大家就知道咱们的R瑞这个地方呀,也是可以直接写,对不对,所以我们写上一个R瑞,诶这么写是完全可以的啊,但是你R瑞里面都有东西啊,这个叫strut field,咱们叫做结构的属性,说的简单点啊,你的结构当中都有需要哪些值呀,那对于咱们来讲,咱们知道就需个年龄啊,所以点一下点。
04:32
点完以后它有一个name,有一个叫data type,有一个叫做是否为空啊,还有什么什么东西,这两个有默认值,那其实只要传这两个就够了,所以拷贝拷贝以后拿过来。大家看一下,呃,这个就叫ru fieldeld,诶把这个咱们导一下,导一下导一下传个名字,然后再传一个它的,嗯,这边呢,是一个叫做data type,咱们叫做类型对不对,咱们如果叫类型的话,那对于咱们大家来讲的话,我要看看我要传什么值,可是这个东西好像不是我们平常接触的到的,对不对。
05:05
所以啊,我们想看一个,我们点点它一下,点完以后它是个抽象类,所以我按CTRLH键导到它的一个关系,那么这个里面会有很多呀,那我们说了我们年龄嘛,它应该是一个数值类型的,并且应该都是一个整数吧,啊,当然了,也有可能是个浪类型,对不对,也有可能咱们这就不说了,所以我找一下这边有个原子性的操作点一下,点完有个数字类型,点里面有一个integer,再点,嗯,里面会有一个这个integer和这个long,其实啊,我们引teer就是我们整数了呗,但是你年龄相加以后是不可能很大,诶我们就说用long类型吧,所以拷贝。拷贝以后,那这个地方我们就知道了哦,给它一个long类型,其实诶不对,这个咱们叫做long啊,咱们的tap,嗯,有这么个东西啊,好了,那你这么写完以后,咱们这儿就OK了,同学们就现在啊,咱们输入的这个数据的结构就有了,叫输入的数据的结构啊,输入数据的结构好把这个去掉,那么去掉以后,那么我们接着往下啊,记住这是我的第一个方法的作用,第二个那其实你第一个能明白,第二个是不是就明白了,第二个叫bufferper,就意味着我们缓冲区做计算的那个结构是什么样子的,同学们看。
06:21
你回到这个图形当中,大家会发现这就是八分里面是不是要求得年龄的总和,以及它的总的数量啊,所以对于我们来讲,它的结构应该不是单一的了,所以拷贝拷贝以后放过来,放到这里,那我这里就写上了,叫做什么呢?叫缓冲区,这个缓冲区啊,就做临时计算的啊,所以我们说一下咱们叫缓冲啊,缓冲区数据的结构,那么这个结构啊,我们说了应该有多个,为什么呢?因为有一个叫做什么呢?叫total。就是总共的那个年龄啊,啊来总共的年龄,还有一个我们应该叫count啊,这个count呢,应该表述的含义是什么呢?表述的是数量,诶你到底有多少个我们的用户啊,诶你的总共的年龄是多少啊,所以把这个东西呢,给它写上就可以了啊,所以这样的话,缓冲区的结构也就有了,输入数据的结构也就有了,嗯,然后下面呢是data type,这个是什么意思呢?给大家解释一下这个字面含义可能不好理解啊,这咱们说一下,它应该叫做我们的函数。
07:23
啊,它的计算结果的数据类型,其实说白了就是输出啊,它就是那个out,这个呢,就是我们的in啊,同学们记住啊,输入啊,这个就是我们的out,那这个就是我中间的buffer,所以啊,它就是我们中间的缓冲区,对吧,用来做临时计算用的,这是我们的输出啊,那咱们知道吧,你的年龄是个浪,你的那个数量是个浪,你相处还是个浪啊,所以我们直接放过来。好了,翻过来以后接着往下,下面呢,叫做函数的稳定性啊,函数的稳定性,这个所谓的稳定性一般就指的是你传入相同的参数,你的结果是否相同,那咱们这个肯定是一样的,因为咱们这就是一个普通计算呢,里面有没有什么随机数之类的,对不对,所以啊,咱们有个稳定性的概念,还有下面呢,叫初始化,对吧,这个我们写上咱们叫做。
08:11
大家看啊,这边叫做八分,这个八分啊,其实就是我们中间缓冲区的那个操作,所以我们称之为叫缓冲区初始化是可以的。啊,缓冲区数字化,就是你到底怎么来对缓冲区做操作,它的初始状态是什么样子的,那咱们知道啊,咱们这个其实没什么可说的,就是什么呢?就是我们的全值零呗,因为缓冲区就是总和和数量,那默认情况下肯定都是零嘛,所以呢,我们这里来啊,咱们写上一个括号啊,写上一个零,哎,写上一个零,然后呢,它就写上一个0L就可以了,然后这边再写上一个一啊,然后再写上一个零。我们为什么这么写呢?大家看一下,我点点完以后,它这里面其实是有个叫update的方法的,这个update方法呢,叫做什么in,它是它的索引,这是它的值,所以啊,我们其实是可以通过什么呢?这种方式来拷贝,拷贝以后呢,我们点点了以后干嘛呢?我们就update给它一个零,给它一个我们叫0L,然后给他一个一啊,这个零表示的是位置啊,绊示的是你第一个结构当中的属性,这个一表示的是我们的第二个值,所以啊,它这个是靠位置来算呢,那么这两种方法其实是完全相同的,为什么呢?因为在盖语法当中,我们一旦集合中有update功能的话,它跟这种写法是没有任何区别的,所以啊,这个如果你看不懂的话,没关系,你看不懂,那咱们就用下面的啊,所以来这是盖LA语法当中的功能,然后呢,我们接着往下,下面是什么呢?Update,这个update大家可以看到,这叫input,这个叫buffer,顾名思义啊,当你的数据诶,你通。
09:49
这过来的时候,你如何更新我的缓冲区啊,来一条更新一个,来一条更新一个,对不对,那这就叫update,所以在这里呢,我们写上啊来根据我们叫做输入的值来更新我们缓冲区啊,缓冲区咱们叫缓冲啊缓冲区数据,哎,就是这意思了,好了,那我们拿过来。
10:13
拿过来以后,那这个八分咱就不说别的了,就直接点了,对吧,咱们就update了后给个零,这个零表述的是年龄的总和,然后这个呢,表述的是什么呢?表述的是我们的一,它表述的是那个数量,那么你要更新数量的话,那你之前的数量,你这回来了一个我们的,嗯,年龄的话,那我肯定加一呀,所以呢,应该是我们的buffer点,点了以后,它里面有一个叫get long啊,给他一个一表述的是它那个旧的值,然后再加什么,再加上一,这是我缓冲区的,同样道理,我的这个也是一样的啊来,咱们写上,咱们叫buffer,咱们叫get along。反过来给他一个零,为什么给他一个零呢?是因为我们最开始那个。年龄总和不是第一个位置嘛,对不对,所以它是零啊,然后再加上加什么,你输入的年龄,你输入的年龄是个肉,那就是按我们的顺序呗,所以来我们叫input,我们叫点叫get,咱们给他一个get along啊,给他一个零。
11:16
好了,你这么写完以后就可以了,再有的同学有疑问说老师你看,那你这也是零,它也是零,这啥意思啊?首先这个是我输入的数据,那我输入的数据我们就要求了,它只有一个结构对不对?所以我取零,取的就是你给我的年龄,而我们的这个buffer呢,它有两个值,一个叫total,一个叫count,那么这个total是零,所以那个count就是一,那么count为一,所以它更新加一,而这个呢,取的是我缓冲区之前的值,再加上我输入的值,那不就新的值吗?所以这样的话,零和一就被更新了,所以大家会发现这种我们的没有类型没有属性的这种操作,光靠顺序啊,它容易记混啊,但是没关系,咱们先这么做啊。好,接着往下同学们来。
12:03
呃,往下的话,那么下面是什么呢?叫墨子,咱们叫合并对吧?诶,咱们叫合并就是缓冲区啊,缓冲区数据合并,为什么呢?因为你分布式计算你的每一个缓冲区,其实啊是有多个的,对不对,那多个最终是要合在一块儿再做我们的处理的,所以啊,你会发现这就是两个什么呢?我们的八分一和八分二。那么buff份一和buff分二就是两个缓冲区,缓冲区啊,它里面的处理逻辑啊很简单,就是更新呢啊,把他们的数值给它相加,然后做更新就可以了,那好这里叫unit,那么到底是把哪个值更新呢?这个就要涉及到咱们该了一般的这种算法逻辑了,比方说同学们看举例子啊,来咱们比方说一个集合当中有四个元素,我们一般两两计算,大家会发现我们的前两个值,这个值是X吧,对不对,XY对不对,X和Y,它聚合之后得到一个结果。那么然后这个结果是不是就当X了,然后这个当成Y了,对不对,然后呢,X和Y又做计算,然后结果呢,又X,然后再跟我们这个Y再做计算,对吗?所以啊,我们的这个X其实就是第一个,那么它就在不断的去更新,不断的去更新,最后得到结果就还是它了,所以啊,我们这个八分的一点,咱们就update给它一个零啊,OK,然后再来,嗯,我们写上一个一就行了,那这个时候呢,我们应该是八分一,然后点一下,嗯。
13:30
表示一下,咱们叫get along啊get along给他一个零,然后呢,这边再加上我们的八分二。然后点叫get long啊,给他一个我们的一好了,你这么写是可以的啊,我这音也是零对不对啊,我刚才说错了,咱们这边再来拷贝,拷贝以后放过来,放过来以后这边应该都是一吧,嗯,应该都是一样,因为它都是缓冲区嘛,位置应该是相同的,好了,那你这么写完了以后,八分一就发生了改变,它做下去运算不又改变了对不对?诶,所以这样的话,缓冲区数据就合并了,那好,接下来我们来往下,往下以后同学们看下面是什么东西,下面就叫计算,说白了就是计算平均值。
14:14
啊,咱们前面说了什么缓冲啊,什么合并更新呢,包括初始化啊,其实都是为了做中间计算,那么你最后是要取得我们的什么?诶那个平均值的那个平均值buffer不就在这儿吗?你直接点我们叫get long,然后给个零,然后除以buffer.get long给他一个一,诶这样的话我们不就OK了吗?对不对,好了,那我现在呢,把这个我们的u daf函数就算是做完了,它里面的方法一二,然后三四。56788个啊,所以咱们这有八个方法也需要去重写一下,嗯,可能感觉挺多的啊,但是方法的含义还是比较好理解的,什么更新合并,初始化计算呢,什么,呃,我们的输入结构啊,输出结构包括什么稳定性这些东西对吧?诶你多写几遍其实就能够记得住,那好了,那我们现在自定义聚合函数类了,我该怎么去用它呢,对不对,那么所以啊,这个时候来。
15:12
把这个呢,我们就去掉了,因为咱们自己有自己的类了,所以我就拗一下,嗯,拗一下以后把这个拷贝啊,把它拷贝,拷贝以后呢,我这里啊,把它改个名字,咱们就叫年龄A,咱们叫啊,咱们叫HAVG,咱们叫做平均值,那如果你把这个我们的类当成了一个函数,那么这个函数就可以放在这儿来了,来放在这儿啊,放在年龄的这个位置,这个呢,咱们就不要了。好,那我现在呢,就把这个代码算是写完了,我接下来验证一下,看看跟我预想的结果是否是一样的,我回过头来打开优点Jason 20 30 40 20加30 50 50加四是90,那么三个用户90的年龄平均值应该是30,对不对?所以我们现在运行一下看结果。
16:05
好,同学们,结果出来了,看看跟我预想结果是完全相同的,对不对,哎,就是30了啊,行,那我们就先说到这里好吧。
我来说两句