00:00
好,同学们,我们接下来呢,给大家讲一下,这个叫广播变量啊,这是我们三大数据结构当中的第三个啊,咱们叫分布式的共享只读变量啊,这个是什么意思呢?给大家演示个小例子啊,来把这个关掉,呃,关掉以后呢,我们这个包咱们就不变了啊,咱们这里拷贝一个新的来,我们写上一个叫做零五,但是呢,我们这里就不叫ACC了,咱们叫BC啊,Broadcast广播的意思啊,咱们把这个放到这里面,然后呢,把这个呢我们去掉,去掉以后SC点啊,咱们这个地方SC咱们点,嗯,咱们叫做我想想啊,嗯,咱们叫做。Make r DD,然后呢,我们这里做这样的一个功能吧,好不好,同学们,我们写上一个叫list,嗯,放过来,咱们叫括号啊,咱们写个A,然后写个逗号一,然后再来一个,我们写上一个B,然后写上一个二啊,然后再来一个,写上一个我们的这个C啊,写上一个我们的三,嗯,可以了,好,这是我们的第一个RD,诶,好,然后我们再来给大家写我们的第二个RD,强拉,放过来,然后写上一个我们的嗯,四给个五,给个六。
01:15
那我们现在想做一个什么功能呢?我想把两个我们的RDD的什么操作呢?给它联合在一块儿啊,相同的T把一和四连在一块儿,相同的B把二和五,相同的C把三和六连接在一起,所以呢,我们有一个RDD,一点我们的join啊,咱们叫RDD2,这个咱们之前是给大家讲过的,对不对啊,咱们叫做join啊RDD,那么我们的join r DD的话,那么这个地方咱们来拷贝啊,拷贝之后点一下咱们叫做collect,嗯,然后呢,我们for it,然后呢,有一个叫print,好了啊,那写完之后我们现在来运行。还记不记得我们这里其实不推荐使用的,为什么不推荐使用,因为咱们的这个里面,它涉及到一个笛car乘级,而且还可能会有沙uffle,所以啊,在整体上来讲,它的数据会几何性的增长,而且还会有沙,当数据量变多,你的落盘就会变慢啊,你的磁盘IO的次数就会越多,性能就会这么变差,对不对?
02:25
所以啊,对于我们来讲,我们觉得呀,这个地方是我们需要给大家去解决的问题啊,所以呢,我们这里说一下啊来。我们的这个join呢,它会导致啊,咱们的数据量的几何啊增长啊几何增长,并且呢,我们说了啊,它会什么呢?会影响嗯,会影响我们叫suffle的性能啊,数据量越多啊,咱们叫性能诶就是这样的,所以其实不推荐使用啊,咱们叫做不推荐使用,那你不推荐使用,那我这儿有没有什么办法可以诶不用它还能实现团的功能呢。
03:04
其实是可以的,为什么呢?因为大家想想你最后的结果变成什么呢?来大家想想我们的最后结果是不是变成了我们的A,然后呢,我们的这个地方A写上一个我们的括号一和四啊,对不对,然后呢,再来一个哎,写上我们叫做B,然后括号我们叫做二和五啊,还有一个我们写上一个C,我们叫做三和六。那我就问你,你原来数据是什么样子的RDD,你原来的数据是不是就是A,我们写个一,然后呢,再来一个,我们写上一个B,写上一个R,然后再来一个,我们写上一个C,是不是一个三呢,同学们。也就意味着从这个角度来讲的话,大家想想,哎,咱把这个稍微的给它变一下啊,大家想想有没有什么大家好明白的,你看一想,是不是就等于把一变成了他。对不对,把这个B大家看一下来,把这个二变成二和五吧,把这个C当中的三变成了三和六,其实这个算不算一种结构的转换呀,同学们。
04:08
所以也就意味着你不使用join的话,你使用什么map values是不是也可以啊,所以啊,大家看一下我现在的这个地方来,我们我就别用这个join了,既然你的这个join它有问题对不对,那我就不用它了呗,所以把这个注掉,注掉以后干嘛呢?同学们看我就这么来。诶,把这个咱们也给他去了啊,来拷贝这个放上面去,我现在说过了,那既然有问题,那咱不用了呗,我就把这个RDD我们拿过来,咱们点,点了以后给他一个叫map values什么意思啊,我要把你的number干嘛呢,要变成。变成什么东西啊?同学们想想我是不是要变成我们的它呀?可是问题来了,你要想变成它的话,这个四呀五呀六啊,他从哪来呀?你是不是还得想办法把那个456拿到,而且还不光是要把456拿到,你还要知道他们俩是不是同一个P呀。
05:09
所以说你光靠value是不行的,你还得干嘛呀,需要知道它的key,所以大家看一下咱们不能用map values了,应该什么呢?用咱们的这个这个case咱们拿过来啊,咱们拿过来这个case呢,就是我们当前的这个啊,咱们叫做W,咱们叫word写成一个count,哎,就是这样的。好,那你这么写了以后,我得想办法把这个数据拿到,诶,所以我写上,写上什么东西呢?我们写上咱们叫做嗯,List,或者咱们叫做map,是不是会更好一些呢?咱们叫做map啊来,我们现在这么写,这么写了以后给它一个啊,咱们叫做muable.map然后给它加个括号,嗯。好了,那我现在这么写,那我现在大家想想你的这个W是不是就是我们的word呀,我就看看这个map里面有没有不就行了吗?所以拷贝拷贝以后来我们点一下,点点了以后大家看一下,我们写上叫get or else对不对,那这个时候我们把我们的word放进去,放进去以后干嘛呀,如果取不到怎么办?
06:16
对不对,如果取不到的话,我们给他一个零啊,给他一个零按理说是可以取到的啊,咱们这只是演示啊,同学们,咱们不考虑实际的情况,所以呢,我们这里点咱们叫VAR回车,回车以后记住这个值就是我们里面的四,就是它啊,那好,你最终想到什么,这是C是不是你的一,那么这个是我们的四,那你最后想要的结果是不是就是W,再加个括号,咱们叫做C和那个L啊。大家想想这是你最后想要的吗?所以啊,你这么写完了以后,咱们点叫collect,然后点我们叫做什么呢?For it,给它来个print好了,那我现在我们运行一下看结果吧,看一看我们用这种方式实现的效果,诶,嗯,我看看啊,它这个地方说了,我们的这个是返回的类型不一样啊。
07:12
嗯,我们确定一下哦,咱们的这个map我确认一下,它这里面应该是不太一样的,那这样把这个去掉吧,那咱们不用了啊,咱们给他一个int,嗯,咱们这样来啊,好,我们再来重新运行一下。好了,同学们会发现我们最终的结果是不是已经得到了,而且没有任何的问题吧,我问问同学们中间有沙吗?没有沙对不对,诶,就是这样,所以这个功能呢,就是非常不错了,为什么呢?采用了一种特殊的处理方式代替了咱们的卷,而且实现了同样的功能,并且没有杀手,性能非常的好。对不对,但是。但是啊,同学们,我们一提就会说提但是了,嗯。
08:00
如果咱们这个数据量很多怎么办?他很多他很多怎么办?诶老师多就多呗,这要比笛卡尔成绩不要少很多嘛,对不对。那他太多了怎么办?这个大家可能也体会不出来,这个太多有什么区别是吧,或者有什么问题,这个呢,咱们通过画图的方式给大家说一下啊来同学们看我现在呢,比方说我拿过来,咱们现在呢,有一个driver啊,这个driver好嘞,这个driver当中啊,有这么一个东西,咱们拿过来,这个就是那个数据集合呀,那个map啊,所以我就写上一个map,你写完map以后,诶往哪去了,咱们叫做map啊嗯。好,把这个呢map呢我放过来,然后呢,我们的executor我们放过来啊,咱们放过来放到我们的这边啊好了,那好我问同学们,如果假设呀,咱们假设我们这个我们的啊,我写错了,这个应该是这个地方应该是我们的driver啊,如果我们的driver当中有十个分区的话。
09:06
那么是不是应该是十个任务啊,那就意味着十个计算任务,那么十个计算任务的话,是不是每个任务它都应该去执行这个map的操作,对不对?那好,如果每个我们的任务都执行map操作的话,它里面是不是就用到了谁咱们的这个map吧,所以说每个task里面是不是都应该有什么我们的这个map,所以大家看一下啊,来把这个东西我们换一下放到这儿,那就意味着我们会什么呢?每个task里面它都需要干嘛呢?用到这个map,所以这个map呢,是一个共享的数据,所以拿过来给他,为什么呢?因为是一个B包嘛,他要把这个给它包进来,哎,就是这样啊,所以这是我们的一个task,好。我刚才说过了,我有十个分区,但是我问同学们,我如果我假设有十个分区的话,那么可是我的资源只有一个excute怎么办?我没有那么多的ECU,我现在就一个ECU,你现在有十个分区,最终会出现十个task。
10:13
那我就问问同学们了,那是不是意味着这十个task应该都在咱们的一个equ里面来执行啊,对不对,哎,就这样,但是呢,你的一个equor呢,可能只有一个CPU的和,所以他们其实就不能叫并行计算了,应该叫并发计算,对不对,这个倒不重要,重要的是什么,大家会不会发现。我们的这个B包的数据在每个task里面都会传一份,如果它里面有100万条数据,那么每个都一份的话,那这里面是不是放了400万的数据啊?但是你放那么多数据,你会发现什么,其实这些都是冗余的,在同一块内存当中放了什么400份儿数据,但其中300份是什么冗余的,你觉得合适吗?所以啊,咱们这里呢,给大家稍微的咱们描述一下这个现象啊,就是来我们的B包数据啊,咱们的B包数据它都是什么呢?都是以我们的task克为单位。
11:16
为单位啊来为单位发送的,说的简单点就是每个任务啊,每个task中包含咱们叫包含我们的B包数据,诶就是这个意思,那如果是这样的话,就存在了一个非常严重的问题啊来嗯。啊,这样啊,可能会导致,可能会导致我们的一个exec中啊,含有咱们叫大量重复的数据啊,并且啊,并且占用大量的内存,占用大量的内存,为什么呢?如果你的那个数据量非常大的话,那么每一个啊,它会非常大非常大非常大,那你的性能就会下降,所以它占了大量的内存,就会影响咱们的性能,那该怎么办呢?
12:07
既然我们已经明确知道了,如果你现在这么写的话,它能够实现功能,但是当你数据量大的情况下,你的每个task里面都会包含这样的东西,那这种情况下肯定性能不高。可是我明白了它的问题,那我该如何解决呢?诶,这时候就涉及到另外一个概念了,就是我们的executor啊,其实executor啊,它其实啊,其实就是一个咱们叫GVM,所以呢,他在我们启动时啊,启动时会自动的,什么叫分配内存。那既然是自动的分配内存的情况下,那就说明eor它有自己的内存对吗?他如果有自己的内存的话,那能不能咱们这样。咱们在我们的ex里面的这个内存当中啊,咱们把这个map呢,给它放进去,就是放到这里面去,不再是以我们的每个task为单位了,而是以每个以Q为单位来保存这个数据,所以啊,它会自动分配内存啊,那么我们完全。
13:15
完全可以啊,可以将我们什么呢?我们的这个任务中啊,任务中的这个B包数据,咱们叫B包数据,给它放置在我们的executor的内存中,达到啊,咱们写上叫做达到共享的目的,达到共享的目的,诶就是这样的,所以啊,大家会发现这个就不需要了,这个也不需要了,它也不需要了,你只需要去关联它就可以了,比方说咱们的task用到了这个map来去找他。同样道理,谁用谁去找嘛,所以来放过来啊,放这边,嗯,好,然后再来啊,咱们再来,嗯,好,放这边。然后呢,再来啊,嗯,好,这边。
14:02
好了啊,那这样的话,大家都访问一块内存中的map的话,数据就不会出现大量的冗余,对吗?而且我们的这个共享数据不也挺好的吗?对不对,但是有前提条件,它不能改,因为我们都是访问它,如果有人把它改的话,别的是不是也都会出问题,这就类似于我们现成的安全问题,对不对?所以咱们所谓的这个map放在内存的这个操作,我们靠Spark的广播变量就可以做到啊,所来。翻过来我们的Spark中啊,Spark中的广播啊,咱们的广播变量,它就可以将我们的B包的数据给它保存到我们的execuor的内存中,这个是没有任何问题的,但是要千万注意,它是不能改的啊来。所以来广播变量它不能够更改。
15:01
它不能够啊进行更改操作,所以我们之前说了什么叫广播变量,大家还记得吗?它叫分布式的共享的只读变量啊,所以我们说一下来,咱们叫分布式,嗯,分布式咱们叫做共享的只读啊,咱们的变量首先它就是个变量,第二个它是在分布式当中去执行的,还有一个叫做共享,就是大家一块多个task去共享它,而且它只能读取,而没有提供任何的方法去改变它,所以这是我们的广播变量啊好,这个呢,我们加上一个大的字体呢,给它拿过来,嗯。啊,咱们叫广播变量啊,诶,那这个广播变量我们该如何的去使用它呢?其实非常的简单啊,把这个呢给它关掉,关掉以后呢,我们拷贝,拷贝以后写上一个零六,嗯。然后呢,把这个放过来,放过来以后同学们看啊,这个不要了。
16:04
然后呢,我写上一个SC,咱们叫做点,我们叫做broadcast,就叫广播变量,把这个map呢,给它放进来,然后点叫VAR回车,回车以后你这个就是BC啊,这就是那个BC,这个我们就是广播变量,那么广播变量在咱们的这个位置就可以想办法把它取到,叫做点,咱们叫Y,这样的话从广播变量里面把值就取出来了,取出来以后跟之前的用法没有任何的区别啊,所以啊,这是我们的广播变量啊,在这个地方咱们写上叫封装啊广播变量。广播变量,把你之前的数据集给它封装一下,然后呢,在这个地方呢,写上啊,咱们叫获取或者叫访问都行啊,咱们叫访问,我们叫广播变量,嗯,好了,哎,你会发现代码非常的简单啊,现在我们运行一下,看看结果有没有问题啊。
17:08
好了,同学们看结果是不是完全一样啊?诶诶,这就是我们的广播变量啊。
我来说两句