前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark之【RDD编程】详细讲解(No6)——《RDD缓存与CheckPoint》

Spark之【RDD编程】详细讲解(No6)——《RDD缓存与CheckPoint》

作者头像
大数据梦想家
发布2021-01-27 10:56:27
6860
发布2021-01-27 10:56:27
举报
文章被收录于专栏:大数据成长之路

本篇博客是Spark之【RDD编程】系列第六篇,为大家介绍的是RDD缓存与CheckPoint

该系列内容十分丰富,高能预警,先赞后看!

在这里插入图片描述
在这里插入图片描述

7.RDD缓存

RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。

但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用

在这里插入图片描述
在这里插入图片描述

通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

在这里插入图片描述
在这里插入图片描述

在存储级别的末尾加上“_2”来把持久化数据存为两份。

在这里插入图片描述
在这里插入图片描述

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

1)创建一个RDD

代码语言:javascript
复制
scala> val rdd = sc.makeRDD(Array("atguigu"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at makeRDD at <console>:25

2)将RDD转换为携带当前时间戳不做缓存

代码语言:javascript
复制
scala> val nocache = rdd.map(_.toString+System.currentTimeMillis)
nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at map at <console>:27

3)多次打印结果

代码语言:javascript
复制
scala> nocache.collect
res0: Array[String] = Array(atguigu1538978275359)

scala> nocache.collect
res1: Array[String] = Array(atguigu1538978282416)

scala> nocache.collect
res2: Array[String] = Array(atguigu1538978283199)

4)将RDD转换为携带当前时间戳并做缓存

代码语言:javascript
复制
scala> val cache =  rdd.map(_.toString+System.currentTimeMillis).cache
cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at map at <console>:27

5)多次打印做了缓存的结果

代码语言:javascript
复制
scala> cache.collect
res3: Array[String] = Array(atguigu1538978435705)                                   

scala> cache.collect
res4: Array[String] = Array(atguigu1538978435705)

scala> cache.collect
res5: Array[String] = Array(atguigu1538978435705)

8.RDD CheckPoint

Spark中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。

为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

案例实操:

1)设置检查点

代码语言:javascript
复制
scala> sc.setCheckpointDir("hdfs://hadoop102:9000/checkpoint")

2)创建一个RDD

代码语言:javascript
复制
scala> val rdd = sc.parallelize(Array("atguigu"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:24

3)将RDD转换为携带当前时间戳并做checkpoint

代码语言:javascript
复制
scala> val ch = rdd.map(_+System.currentTimeMillis)
ch: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:26

scala> ch.checkpoint

4)多次打印结果

代码语言:javascript
复制
scala> ch.collect
res55: Array[String] = Array(atguigu1538981860336)

scala> ch.collect
res56: Array[String] = Array(atguigu1538981860504)

scala> ch.collect
res57: Array[String] = Array(atguigu1538981860504)

scala> ch.collect
res58: Array[String] = Array(atguigu1538981860504)

本次的分享就到这里,受益的小伙伴或对大数据技术感兴趣的朋友可以点赞关注博主哟~至此,Spark的【RDD编程】系列先告一段落。后续还会根据实际情况对其进行更新补充,敬请期待!!!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/02/26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 7.RDD缓存
  • 8.RDD CheckPoint
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档