【Spark篇】---Spark中控制算子

一、前述

Spark中控制算子也是懒执行的,需要Action算子触发才能执行,主要是为了对数据进行缓存。

控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partitioncache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

二、具体算子

1、 cache

默认将RDD的数据持久化到内存中cache是懒执行

chche () = persist()=persist(StorageLevel.Memory_Only)

 SparkConf conf = new SparkConf();
 conf.setMaster("local").setAppName("CacheTest");
 JavaSparkContext jsc = new JavaSparkContext(conf);
 JavaRDD<String> lines = jsc.textFile("./NASA_access_log_Aug95");

 lines = lines.cache();
 long startTime = System.currentTimeMillis();
 long count = lines.count();//count是action算子,到这里才能触发cache执行,所以这一次coun加载是从磁盘读数据,然后拉回到drive端。
 long endTime = System.currentTimeMillis();
 System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+ 
          (endTime-startTime));
        
 long countStartTime = System.currentTimeMillis();
 long countrResult = lines.count();//这一次是从内存种中读数据
 long countEndTime = System.currentTimeMillis();
 System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime-
           countStartTime));
        
 jsc.stop();

2、persist(可以指定持久化的级别)

解释:

<!-- li { list-style: none; margin: 0; } p { margin: 0; } span.l { color: red; font-weight: bold; } a.mapnode:link {text-decoration: none; color: black; } a.mapnode:visited {text-decoration: none; color: black; } a.mapnode:active {text-decoration: none; color: black; } a.mapnode:hover {text-decoration: none; color: black; background: #eeeee0; } -->

1、MEMORY_AND_DISK 意思是先往内存中放数据,内存不够再放磁盘

2、最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。

3、选择的原则是:首先考虑内存,然后考虑序列化之后再放入内存,最后考虑内存加磁盘。

4、尽量避免使用“_2”和DISK_ONLY级别。

5、deserialized是不序列化的意思。

注意事项:

  1. 1、cache和persist都是懒执行,必须有一个action类算子触发执行。
  2. 2、cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
  3. 3、cache和persist算子后不能立即紧跟action算子。

错误:

rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了

3、 Checkpoint(对Lineage非常长时使用)

<!-- li { list-style: none; margin: 0; } p { margin: 0; } span.l { color: red; font-weight: bold; } a.mapnode:link {text-decoration: none; color: black; } a.mapnode:visited {text-decoration: none; color: black; } a.mapnode:active {text-decoration: none; color: black; } a.mapnode:hover {text-decoration: none; color: black; background: #eeeee0; } -->

     1、概念和特征:

           不仅可以将数据持久化到磁盘,还可以切断RDD之间的依赖关系,checkpoint也是懒执行。

<!-- li { list-style: none; margin: 0; } p { margin: 0; } span.l { color: red; font-weight: bold; } a.mapnode:link {text-decoration: none; color: black; } a.mapnode:visited {text-decoration: none; color: black; } a.mapnode:active {text-decoration: none; color: black; } a.mapnode:hover {text-decoration: none; color: black; background: #eeeee0; } -->

Checkpoin不仅存储结果,还会存储逻辑,还可以存储元数据。

            Persisit切断不了RDD的依赖关系。

  2、checkpoint 的执行原理:

<!-- li { list-style: none; margin: 0; } p { margin: 0; } span.l { color: red; font-weight: bold; } a.mapnode:link {text-decoration: none; color: black; } a.mapnode:visited {text-decoration: none; color: black; } a.mapnode:active {text-decoration: none; color: black; } a.mapnode:hover {text-decoration: none; color: black; background: #eeeee0; } -->

           2.1.Spark job执行完之后,spark会从finalRDD从后往前回溯。

2.2.当回溯到对某个RDD进行了checkpoint,会对这个RDD标记。

           2.3.回溯完成之后,Spark会重新计算标记RDD的结果,然后将结果保存到Checkpint目录中。

   3、优化checekpoint

  • 因为最后是要触发当前application的action算子,所以在触发之前加一层cache操作,一样会往前执行cache操作,实现对数据的cache ,所以考虑将cache优化到checkpoin的优化流程里。
  • 对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job(回溯完成之后重新开的job)只需要将内存中的数据(cache缓存好的checkpoint那个点的数据)拷贝到HDFS上就可以。
  • 省去了重新计算这一步,不需要重头开始来走到checkpoint这个点了。

<!-- li { list-style: none; margin: 0; } p { margin: 0; } span.l { color: red; font-weight: bold; } a.mapnode:link {text-decoration: none; color: black; } a.mapnode:visited {text-decoration: none; color: black; } a.mapnode:active {text-decoration: none; color: black; } a.mapnode:hover {text-decoration: none; color: black; background: #eeeee0; } - --><!-- li { list-style: none; margin: 0; } p { margin: 0; } span.l { color: red; font-weight: bold; } a.mapnode:link {text-decoration: none; color: black; } a.mapnode:visited {text-decoration: none; color: black; } a.mapnode:active {text-decoration: none; color: black; } a.mapnode:hover {text-decoration: none; color: black; background: #eeeee0; } -- -->

总结:

<!-- li { list-style: none; margin: 0; } p { margin: 0; } span.l { color: red; font-weight: bold; } a.mapnode:link {text-decoration: none; color: black; } a.mapnode:visited {text-decoration: none; color: black; } a.mapnode:active {text-decoration: none; color: black; } a.mapnode:hover {text-decoration: none; color: black; background: #eeeee0; } -->

持久化的最小单位是partition!!!

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏个人分享

SparkStreaming(源码阅读十二)

  要完整去学习spark源码是一件非常不容易的事情,但是咱可以积少成多嘛~那么,Spark Streaming是怎么搞的呢?

1242
来自专栏Hadoop实操

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

在前面的文章Fayson介绍了一些关于Spark2Streaming的示例如《Spark2Streaming读Kerberos环境的Kafka并写数据到HBas...

2741
来自专栏简单聊聊Spark

Spark内核分析之DAGScheduler划分算法实现原理讲解(重要)

        接着上一篇,我们接着来分析下一个非常重要的组建DAGScheduler的运行原理是怎么实现的;通过之前对Spark的分析讲解,我们的Spark作...

1492
来自专栏Jed的技术阶梯

Spark-RDD持久化

使用不同参数的组合构造的实例被预先定义为一些值,比如MEMORY_ONLY代表着不存入磁盘,存入内存,不使用堆外内存,不进行序列化,副本数为1,使用persis...

2113
来自专栏Java 源码分析

SparkStreaming 入门

2418
来自专栏大数据-Hadoop、Spark

Spark Streaming + Kafka整合

2615
来自专栏行者悟空

Spark RDD的Action

1516
来自专栏Jed的技术阶梯

Spark性能调优02-代码调优

代码调优,就是要让大家了解以下一些Spark基本开发原则,包括:RDD lineage设计、算子的合理使用、特殊操作的优化等。在开发过程中,时时刻刻都应该注意以...

1762
来自专栏行者悟空

SparkContext初始化过程

2124
来自专栏个人分享

Spark作业调度

    Spark在任务提交时,主要存在于Driver和Executor的两个节点.

1891

扫码关注云+社区

领取腾讯云代金券