前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark你一定学得会(二)No.8

Spark你一定学得会(二)No.8

作者头像
大蕉
发布2018-02-05 17:25:21
6310
发布2018-02-05 17:25:21
举报

第一次分享的妥妥就是入门的干货,小伙伴们最好可以自己敲一敲代码,不然只看我的分享一点用都木有。但还是有很多小伙伴表示看不懂,没关系,慢慢来自己操作一遍有什么问题后台问我就可以了。

啥也不说了,就是干货,首先祭上今天的关键代码。今天主要跟大家讲讲Spark里面RDD的持久化机制。首先持久化机制有什么用呢?一个作用是保存到硬盘给其他小伙伴查看,另外一个作用是重用,我们都知道RDD是不可变的,所以当RDD有重用的时候,如果没有持久化,RDD都会乖乖地重新算。。

代码语言:js
复制
object RDDPersist {   
def main(args: Array[String]): Unit = {     
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("helloWorld")     
val sc: SparkContext = new SparkContext(conf);     
sc.setCheckpointDir("~/checkpoint");     
var someList = List[String]("hello","world")      
//我们先假设这里有一百万的数据     
val defaultPartition = 5;     
val wordsRDD: RDD[String] = sc.parallelize(someList,defaultPartition);     
val someThingSenceless = wordsRDD.map(x => x+",").map(x => ((Math.floor(Math.random()*100)) ,x)).filter(x => x._1 <50)     
someThingSenceless.count();     
someThingSenceless.groupByKey().count();     
wordsRDD.repartition(defaultPartition*2);     
wordsRDD.coalesce(defaultPartition , false) 
         
wordsRDD.cache()
wordsRDD.persist(StorageLevel.MEMORY_ONLY);     

/*     
StorageLevel.DISK_ONLY     
StorageLevel.DISK_ONLY_2     
StorageLevel.MEMORY_ONLY     
StorageLevel.MEMORY_ONLY_2     
StorageLevel.MEMORY_ONLY_SER     
StorageLevel.MEMORY_ONLY_SER_2     
StorageLevel.MEMORY_AND_DISK     
StorageLevel.MEMORY_AND_DISK_2     
StorageLevel.MEMORY_AND_DISK_SER     
StorageLevel.MEMORY_AND_DISK_SER_2     
*/     
wordsRDD.saveAsTextFile("~/")     
wordsRDD.unpersist();     
wordsRDD.checkpoint();   
}

从前面的分享我们可以知道,Spark快的根本原因就是Spark的中间过程都存储在内存里面,而这一切的基础就是RDD的持久化,RDD默认是根据partition按分区持久化在内存里边的。啥意思呢,一份数据就好像一个钥匙串,而这钥匙串会分成好多根钥匙保存到集群机器上。

代码语言:js
复制
 var someList = List[String]("hello","world")     
 //我们先假设这里有一百万的数据  
 val defaultPartition = 5;  
 val wordsRDD: RDD[String] = sc.parallelize(someList,defaultPartition);

就是初始化了一个有一百万数据的RDD,分布在5个分区里面。

五个分区好像有点少,想分配到更多的分区咋办?

代码语言:js
复制
wordsRDD.repartition(defaultPartition*2); wordsRDD.coalesce(defaultPartition , false)

coalesce是一个重新分区操作,第一个参数是目标的分区数,第二个参数是是否进行suffle,也就是混淆重新洗牌。如果不重新洗牌,那么就会按照分区数一对一分配到更多的分区上,比如一个分区拆分成两个分区这样,而不是把两个分区进行混淆重新洗牌然后平均分配到三个分区里边。怎么理解这个概念呢,看下面这个图,文字可以忽略,左边就是没有suffle的,右边是有的。

repartition是coalesce的简单实现,就是把第二个参数默认为true,就是repartition了。

代码语言:js
复制
val someThingSenceless = wordsRDD.map(x => x+",").map(x => ((Math.floor(Math.random()*100)) ,x)).filter(x => x._1 <50)     
someThingSenceless.count();     
someThingSenceless.groupByKey().count();

这里的操作都没什么意义,就是做一些复杂的操作而已。如果我们不对wordsRDD 进行持久化,那么一切的操作都会从源头开始,一步一步往下算,不会复用原始的数据。这明显不是我们想要的,如果我们不想这样,那要怎么办呢?

代码语言:js
复制
wordsRDD.cache()     
wordsRDD.persist(StorageLevel.MEMORY_ONLY);     
/*     
StorageLevel.DISK_ONLY     
StorageLevel.DISK_ONLY_2     
StorageLevel.MEMORY_ONLY     
StorageLevel.MEMORY_ONLY_2     
StorageLevel.MEMORY_ONLY_SER     
StorageLevel.MEMORY_ONLY_SER_2     
StorageLevel.MEMORY_AND_DISK     
StorageLevel.MEMORY_AND_DISK_2     
StorageLevel.MEMORY_AND_DISK_SER     
StorageLevel.MEMORY_AND_DISK_SER_2     
*/

虽然说Spark的东西都尽量放到内存里边,但是对于非常非常巨大的数据集,Spark能拿它怎么办呢?总不能全丢进去然后丢弃一大批吧?所以就有了以下的策略选择了。

cache()方法很好理解,就是全都放在内存里面,也就是MEMORY_ONLY。persist方法的默认参数也是仅内存。MEMORY_AND_DISK的意思就是,内存能放得下就放内存,放不下就放硬盘。如何理解这几个参数的搭配呢?从三个要素来搭配就行了,存储位置、存储份数,是否序列化。三个的所有搭配都在上面的。

通常建议是使用StorageLevel.MEMORY_AND_DISK_SER。为什么使用这个呢?比较明显的原因,就是可以通过序列化压缩的方式,保证内存利用最大化。

代码语言:js
复制
wordsRDD.saveAsTextFile("~/")

细心的观众可能会发现,这又是什么玩意。这就是把目前的数据集直接保存到Driver硬盘上的一个API而已。

我们都知道RDD是有parent的,也就是RDD是一个有向无环图,那么如果调用链太长太长了,那么最后的一个操作引起机器失联,会导致整个计算从头开始,这样的开销非常非常大,特别是在深度非常深的迭代操作中,那怎么解决这个问题呢?

代码语言:js
复制
    sc.setCheckpointDir("~/checkpoint");
    wordsRDD.checkpoint();

答案就是使用checkpoint,这个是啥意思呢,就是把RDD保存到磁盘或者HDFS上,直接把当前RDD当成顶级RDD,也就没有所谓RDD的依赖链了,可以缓解这个多次迭代导致的问题。但是世界上哪有这么好的事,checkpoint的代价就是RDD保存在硬盘上,效率总是要降低的。

数据没用了咋办?删掉呗。无论是什么方式,都可以使用unpersist这个API进行删除所有的持久化。

代码语言:javascript
复制
 wordsRDD.unpersist();

好了今天的分享就到这里,RDD的管理只是Spark的内存管理的一部分,优先理解这部分内容,对开发Spark应用优化有非常重要的作用,就酱。

如果分享的话,我会很开心的。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-04-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 一名叫大蕉的程序员 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档