首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
2021年大数据Spark(一):框架概述
2
2021年大数据Spark(二):四大特点
3
2021年大数据Spark(三):框架模块初步了解
4
2021年大数据Spark(四):三种常见的运行模式
5
2021年大数据Spark(五):大环境搭建本地模式 Local
6
2021年大数据Spark(六):环境搭建集群模式 Standalone
7
2021年大数据Spark(七):应用架构基本了解
8
2021年大数据Spark(八):环境搭建集群模式 Standalone HA
9
2021年大数据Spark(九):Spark On Yarn两种模式总结
10
2021年大数据Spark(十):环境搭建集群模式 Spark on YARN
11
2021年大数据Spark(十一):应用开发基于IDEA集成环境
12
2021年大数据Spark(十二):Spark Core的RDD详解
13
2021年大数据Spark(十三):Spark Core的RDD创建
14
2021年大数据Spark(十四):Spark Core的RDD操作
15
2021年大数据Spark(十五):Spark Core的RDD常用算子
16
2021年大数据Spark(十六):Spark Core的RDD算子练习
17
2021年大数据Spark(十七):Spark Core的RDD持久化
18
2021年大数据Spark(十八):Spark Core的RDD Checkpoint
19
2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量
20
2021年大数据Spark(二十):Spark Core外部数据源引入
21
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
22
2021年大数据Spark(二十二):内核原理
23
2021年大数据Spark(二十三):SparkSQL 概述
24
2021年大数据Spark(二十四):SparkSQL数据抽象
25
2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
26
2021年大数据Spark(二十六):SparkSQL数据处理分析
27
2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount
28
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
29
2021年大数据Spark(二十九):SparkSQL案例四开窗函数
30
2021年大数据Spark(三十):SparkSQL自定义UDF函数
31
2021年大数据Spark(三十一):Spark On Hive
32
2021年大数据Spark(三十二):SparkSQL的External DataSource
33
2021年大数据Spark(三十三):SparkSQL分布式SQL引擎
34
2021年大数据Spark(三十四):Spark Streaming概述
35
2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream
36
2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount
37
2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
38
2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展
39
2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
40
2021年大数据Spark(四十):SparkStreaming实战案例五 TopN-transform
41
2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
42
2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
43
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
44
2021年大数据Spark(四十四):Structured Streaming概述
45
2021年大数据Spark(四十五):Structured Streaming Sources 输入源
46
2021年大数据Spark(四十六):Structured Streaming Operations 操作
47
2021年大数据Spark(四十七):Structured Streaming Sink 输出
48
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
49
2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
50
2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

2021年大数据Spark(十七):Spark Core的RDD持久化


RDD 持久化

引入

在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。

API

缓存/持久化函数

可以将RDD数据直接缓存到内存中,函数声明如下:

但是实际项目中,不会直接使用上述的缓存函数,RDD数据量往往很多,内存放不下的。在实际的项目中缓存RDD数据时,往往使用如下函数,依据具体的业务和数据量,指定缓存的级别

缓存/持久化级别

在Spark框架中对数据缓存可以指定不同的级别,对于开发来说至关重要,如下所示:

持久化级别

说明

MEMORY_ONLY(默认)

将RDD以非序列化的Java对象存储在JVM中。 如果没有足够的内存存储RDD,则某些分区将不会被缓存,每次需要时都会重新计算。 这是默认级别。

MEMORY_AND_DISK (开发中可以使用这个)

将RDD以非序列化的Java对象存储在JVM中。如果数据在内存中放不下,则溢写到磁盘上.需要时则会从磁盘上读取

MEMORY_ONLY_SER (Java and Scala)

将RDD以序列化的Java对象(每个分区一个字节数组)的方式存储.这通常比非序列化对象(deserialized objects)更具空间效率,特别是在使用快速序列化的情况下,但是这种方式读取数据会消耗更多的CPU。

MEMORY_AND_DISK_SER (Java and Scala)

与MEMORY_ONLY_SER类似,但如果数据在内存中放不下,则溢写到磁盘上,而不是每次需要重新计算它们。

DISK_ONLY

将RDD分区存储在磁盘上。

MEMORY_ONLY_2, MEMORY_AND_DISK_2等

与上面的储存级别相同,只不过将持久化数据存为两份,备份每个分区存储在两个集群节点上。

OFF_HEAP(实验中)

与MEMORY_ONLY_SER类似,但将数据存储在堆外内存中。 (即不是直接存储在JVM内存中) 如:Tachyon-分布式内存存储系统、Alluxio - Open Source Memory Speed Virtual Distributed Storage

实际项目中缓存数据时,往往选择MEMORY_AND_DISK

缓存函数与Transformation函数一样,都是Lazy操作,需要Action函数触发,通常使用count函数触发。

释放缓存/持久化

当缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:

此函数属于eager,立即执行。

代码演示

代码语言:javascript
复制
package cn.itcast.core

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
 * RDD中缓存函数,将数据缓存到内存或磁盘、释放缓存
 */
object SparkCacheTest {
    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf()
          .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
          .setMaster("local[*]")
        val sc: SparkContext = new SparkContext(sparkConf)
        sc.setLogLevel("WARN")
        
        // 读取文本文件数据
        val inputRDD: RDD[String] = sc.textFile("data/input/words.txt", minPartitions = 2)

        // 缓存数据
        inputRDD.persist(StorageLevel.MEMORY_AND_DISK)
        // 使用Action函数触发缓存
        println(s"Count = ${inputRDD.count()}")
        println(s"Count = ${inputRDD.count()}")

        // 释放缓存
        inputRDD.unpersist()
        // 应用程序运行结束,关闭资源
        sc.stop()
    }
}

或使用spark-shell演示

代码语言:javascript
复制
// 启动集群和spark-shell

/export/servers/spark/sbin/start-all.sh



// 将一个RDD持久化,后续操作该RDD就可以直接从缓存中拿

val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)//WordCount

rdd2.cache //缓存/持久化

rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行缓存/持久化

rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了

总结:何时使用缓存/持久化

在实际项目开发中,什么时候缓存RDD数据,最好呢???

 第一点:某个RDD被使用多次的时候,建议缓存此RDD数据

比如,从HDFS上读取网站行为日志数据,进行多维度的分析,最好缓存数据

第二点:某个RDD来之不易,并且使用不止一次,建议缓存此RDD数据

比如,从HBase表中读取历史订单数据,与从MySQL表中商品和用户维度信息数据,进行关联Join等聚合操作,获取RDD:etlRDD,后续的报表分析使用此RDD,此时建议缓存RDD数据

案例: etlRDD.persist(StoageLeval.MEMORY_AND_DISK_2)

下一篇
举报
领券