00:00
呃,累加器咱们前面给大家演示的一些基本操作,那么现在回过头来咱们再看一看啊,说累加器它是一个分布式的共享的止血变量啊,同学们看一下啊,来咱们这边稍微的描述一下这个东西写上啊,咱们叫做什么呢?叫分布式共享,咱们叫只写变量好了,那么这个分布式呢,其实我相信到了现在这个时候应该好理解了吧,同学们对不对,为什么呢?这个累加器的这个值呀。它是分布式来统计执行的啊,还有个叫共享,那么就意味着driver当中的这个累加器被多个我们这边啊所共享,对不对,诶好,还有一个叫止血是什么概念。它表述的是我们的这个累加器的值,他们互相之间是不能访问的,是读不到的,比方说咱们这个累加器,它是读不到这个累加器的,他是读不到他的啊,但是你自己肯定能读自己的嘛,对不对,那肯定没问题,但是你想互相访问,那不行,那谁能访问呢?咱们driver,因为你最终的结果都是要返回到我们的driver这一边呢,对不对,所以driver可以真正的把它们每一个都读到,然后呢,给它合并在一起,所以呀,这就叫止血的意思啊,那么首先它是我们一个变量啊,咱们生命的一个变量,然后在这边传递,所以通过这个呢,把这个概念稍微的理解一下,诶老师呀,那咱们这个累加器是不是只能做一些事件数据的一个累加呢?其实不是,在某些情况下,我们也可以用累加器实现一些特殊的功能,而且比方说我们可以把一些需要杀Le的东西可以用累加去实现,这样的话就没有沙Le了,比方说word count,那咱们word count呢?
01:44
其实就可以想办法用累加器来实现,那好我们这里呢,一块给他演示一下,那首先我们先来把这个先关掉啊来。好,那我们写上一个啊,咱们叫零四啊,然后拿过来我们写上一个叫做什么呢?叫word count,嗯,来word count好了,咱们拿过来点击OK啊,放在这边,那么大家想一想,我们以前的word count我们一般都怎么写啊,比方说啊,咱们来咱们写上,当然咱们叫做hello啊hello,然后呢,我们再来一个,咱们叫Spark,然后再来一个hello,所以啊,咱们的数据呢,可能是现在这个样子,那咱们为了给他大家呢,什么实现这个word count我们怎么办?咱们RDD,点咱们的map,然后呢,括号下划线,逗号一变成我们渐值类型对不对,然后呢,点我们叫reduce by key没问题吧,下划线加下划线,所以大家可以想象一下,这边是肯定有沙否操作的,对不对,这个肯定是这样的吧,嗯,数据他们hello,放一块来做聚合。
02:44
但是其实我们换一个思路,如果我不需要沙,能不能实现呢?其实也能实现,为什么呢?就一个一个便利,一个一个统计班,比方说嗨来了,诶我现在呢,就找一个什么呢,找一个map,找一个map啊,我看看这个hello有没有,如果有把它的数量加一。
03:01
如果没有呢,把它添加进去对不对?诶,那这样的话,Hello是一次,Spark是一次,Hello之前出现过是两次,这样的话呢,其实就没有沙uffle了,那如果在数据量多的情况下,你没有沙uffle,那么你的性能不也就得到提高了吗?所以啊,咱们一些简单的数据的累加,其实都是可以用累加器的啊,像咱们这种word count用累加器肯定是没有问题的啊,那好,那我们该怎么做呢?首先第一个咱们是RDD点,我们叫for it。干嘛呀,把每个单词诶,我们要进行数据的累加,所以在这里呢,我们说一下叫数据的累加,大家想想你这就是一个循环便离,怎么可能会有沙,不可能有对不对,那么既然不可能有的话,你要想办法准备你自己的那个变量了,所以在这里我们写上啊,咱们叫累加器,叫累加器,这个累加器它应该会有一个什么呢?我们的诶呃,名字,这个名字我们就叫WC吧,对吧,咱们叫word,就这么写啊,咱们叫ACC,好,那问题来了,你后面写什么呀?
04:07
诶,老师,那我们后面就写咱们的这个sc.long好像不行,因为你想求的是word count,你肯定不是光一个浪,那么你用double。好像也不行,对不对,那好,还有一个叫collection,这个集,集合是可以吧,但是也不行,因为咱们集合当中它保存的是我们的list,那也就意味它保存的是list,这个list按理说你要保存个什么键值,对啊,什么word count也行,可是你再去匹配数据的时候就会感觉不方便,所以啊,咱们这个地方啊,好像浪不行,Double不行,那个collection好像也不行,所以在这种情况下,如果我们能够自己来定一个累加器,这不是一个好的选择吗?所以啊,大家看,我想实现一个功能,就叫word count,那么这个时候我就要自己来定义累加器了,所以在这我就写上叫class,叫MY,咱们叫ACC IQ military,哎,怎么写啊,好了,嗯,我们这么写上啊,嗯。
05:07
咱们叫做自定义啊,数据累加器,它要实现的功能呢,就是word count,嗯,好了,那你随便写个类就能叫累加器了吗?那肯定不行,对不对?所以我们要看一看,我们模仿一下,天下文章一大抄,别人怎么写,咱们抄别人的不就完了吗?那么好,咱们来看一看我们的SC点它不有一个叫做long accumulator吗?我们就看看它是怎么做的啊,咱们点一下点点完以后大家看一下它是这样的,它首先new了一个我们的类,然后呢,有个register,有个注册,这个就是上下文对象的注册功能,所以他就把累加器注册进去,并且起了个名,然后返回累加器,那所以啊,对于咱们来讲,咱们也可以这么做呀,大家看一下来,咱们叫创建累加器对象,好,然后呢,我们叫做向Spark进行注册,向Spark进行注册,诶就是这样的,然后呢,你这个数据的累加,其实说白了不就是。
06:08
使用累加气吗?哎,使用累加气,然后呢,最后这个地方来写上获取累加的结果,获取累加器累加的结果。对不对,哎,分这么几个步骤,其实就可以了,那好了,那我们现在就发现哦,我要创建累加期对象,那来吧,拷贝,所以呢,我放到这边,我们写上一个什么呢?咱们叫做WC的,然后呢,等于咱们叫做new,诶我就放过来了,放过来以后干什么呀,咱们要给它什么注册,诶所以呢,我们写的sc.register注册,注册的时候大家会发现他有要求了,你的这个累加器啊,不能随便给你要给他。那我的对象要想传进去的话,是不是要跟他有关系啊,所以这个叫iqul VR。它到底怎么回事,咱们不知道,所以呢,我们来再去看一看,大家看一下我们写上一个我们的long啊,来我们这边写上一个啊,咱们来放到这边,我们写上一个,嗯,SC点咱们的long,我现在点它一下,点完之后大家看一下我们的这个自定义的累加器,跟这个就的感觉差不多,所以我点点完以后你看这儿,这个不就是我们的IQ me VR吗?所以呀,咱们再去点它是一个抽象类,有两个泛型,所以后退,后退以后拷贝过来,那么我们现在就明白了,哦,原来如果你想要能够诶定义累加器,你要遵循它的要求,它的第一个要求就是继承。
07:41
继承咱们的AQVR,所以来咱们写上继承叫做它。好了,写完以后,那这个时候你会发现它里面会有什么,我们的红色的波浪线,咱们点它一下,因为它是一个抽象类,所以它其中的抽象方法其实啊是什么,是应该需要重写的,所以我们要重写它的方法,但是你重写方法的话,会有一个要求什么呢?它有一个输入和输出的一个类型的约束,大家看一下这个aqm,它里面有一个中国号in和out,这是两个泛型,这两个泛型它是需要给它事先定义好的,所以咱们继承IQ minute VR干嘛呢?然后定义我们的泛型,呃,这个泛型呢,首先有一个in啊,有一个我们的out啊,In和out,那么这个in和out呢,我们看看表示什么含义,那顾名思义啊,就是往累加器器里面增加了什么样的内容,就是我们叫累加器啊,累加器输入的内容啊,或者叫输入的数据。
08:48
类型,嗯,好,而这个out呢,就表示的是什么呢?累加器,它需要返回的数据的类型,咱们叫累加器啊,咱们叫做什么呢?嗯,返回吧,叫返回的数据类型,嗯。
09:03
那我们就想一想吧,你这个是用来做word count的,那么你就会把每一个单词给它往里面放,对不对,所以啊,它的输入类型应该是什么,每一个单词呀,所以我给他个string没有问题,然后呢,你的累加器的返回结果不就是应该是word count吗?就是我们的count所形成的一个集合,那我们应该是一个map吧,对不对?诶点我们写上一个map,然后写上一个string,咱们叫做long就可以了啊。好了,那我现在的这个地方呢,就把它准备好了啊,所以来我们这里写上咱们第一个叫做string类型,第二个是一个map类型,为什么是map,就是因为它里面放的是我们的K键字,对word count不就是键对吗?嗯,好,那这个如果没问题的话,我重写它的方法,所以来。大家看一下我现在呢,重写它的方法了,那我先不重写了,我先把上面准备好,同学们看,我在这个地方把它准备好了,那这个地方呢,我们就去掉,我们来写上,叫做点,我们叫resist,然后把它放进去。
10:08
放进去以后给它起个名,这个名称我们就叫做word count ACC,哎,就行了,好,那你注册完毕之后,那我不就可以累加了吗?所谓的累加就是把这个累加器咱们拿过来。点我们叫A,那这个A呢,就把每个单词放进去就行了,放进去以后,当所有的数据循环遍历结束之后,结果就应该出来了,所以说我们的来,我们的累加器,点它有个叫Y6,这个Y6就是累加器的结果,那么好,我们直接点咱们叫做打印就可以了啊,所以基本的操作咱们已经算是做完了啊,但是我们累加器的细节咱们还没有做,没关系,咱们慢慢来啊。
我来说两句