前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark系列 —— 算子详解(二)

Spark系列 —— 算子详解(二)

作者头像
solve
发布2019-10-30 18:41:17
6250
发布2019-10-30 18:41:17
举报
文章被收录于专栏:大数据技术栈大数据技术栈

前言

本文接上一篇 Spark系列 —— 各类算子详解(一) 这篇主要来讲讲 Action 算子 以及 Cache 算子。

Action 算子

Spark 的执行算子,一个 Action算子 会触发一次 job 的生成。 这里需要注意的是, Action 算子要么没有返回值, 如果有返回值,那么这个值是会被拉取到driver端的, 如果数据过大,你就得考虑下你的driver端是否装的下了...

  1. reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. 将RDD的数据进行聚合,并返回聚合后的值。 执行逻辑类似于 reduceByKey
  2. collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. 返回RDD中所有的数据,即 将RDD 的所有数据原封不动的拉回到 Driver 端
  3. count() Return the number of elements in the dataset. 返回该 RDD 中的数据的条数。
  4. first() Return the first element of the dataset (similar to take(1)). 返回 RDD 中的第一条数据。
  5. take(n) Return an array with the first n elements of the dataset. 返回 RDD 中的前 N 条数据。 n: 需要拿取数据的条数
  6. takeSample(withReplacement, num, [seed]) Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed. 随机 返回 NUM 条数据。 withReplacement:是否有放回抽样 num: 抽取数据的条数 seed:随机种子,相同的种子会有相同的随机数据
  7. takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator. 根据 ordering 排序 ,然后返回 前 n 条数据。 n:返回的数据 ordering:排序函数
  8. saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. 将RDD的 数据 以 txt 格式 保存到指定路径 path, path:保存路径,该路径可以是 local filesystem 或者 HDFS 或者 any other Hadoop-supported file system
  9. saveAsSequenceFile(path) (Java and Scala) | Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). 类似于 saveAsTextFile,不过格式是 Sequence Sequence 这是一种特殊的压缩格式。
  10. saveAsObjectFile(path) (Java and Scala) Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile(). 同上,不过一把用做保存 RDD 的里面的数据是 object 类型的 数据, 这样加载的时候可以直接转换成对应的对象
  11. countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. 这个就是个 Wordcount,不赘述
  12. foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details. 遍历RDD的每一条数据。 func : 遍历数据的逻辑。
  13. treeReduce(f: (T, T) => T, depth: Int = 2) 这个算子是最近看优化的时候看到的,之前还真一直没怎么用过, 一般也是用来优化 Reduce 操作的, 不过因为是 Action 操作,会将数据拉回到Driver,所以用的地方一般不多, 不过目前网上好像也没有能把这个算子讲明白的帖子,所以这里多啰嗦一下。 f:这个是聚合逻辑,没什么好多说的。 depth:从字面意思看起来有点像树的深度的意思。默认值是 2 。 那么这个字段实际的作用是什么呢? 首先我们来说说 treeReduce的作用。 平时我们如果 reduceByKey 之后再 collectAsMap 将数据啦回Driver端, 会有一个问题,如果分区数很多,那么每个分区的数据都要啦回 Driver, 然后再进行聚合,Driver是单机的,所以很容易在数据拉回来之后直接把就OOM了, 就算不OOM,单机做聚合的效率也肯定是比较慢的。 所以这时候我们就应该用 treeReduce, 他会对我们的数据进行进行若干次 reduceByKey 操作,并慢慢减少分区数, 假设我们现在有 100 个分区,设置 depth = 3;那么他会 以 100/3 = 33(向下取整) 分区进行一次 reduceByKey, 再以 33/3 = 11(向下取整) 分区进行一次 reduceByKey, 再以 11/3 = 3(向下取整) 分区进行一次 reduceByKey, 这个时候才会将聚合好的数据拉回到Driver 端完成聚合。

Control 算子

控制类算子,也就是我们常说的 缓存类算子

  1. persist(StorageLevel) 缓存算子,懒执行,返回一个 缓存类型的 RDD。 当缓存 RDD 被 Action 算子执行后, 该缓存RDD 会被储存起来, 当再次需要该 RDD 执行其他job 的时候, 就可以通过缓存直接读取数据了。 StorageLevel:缓存级别
    • 关于缓存级别:我们可以来看下其构造函数, 可以看到,其分为: _useDisk:是否使用磁盘 _useMemory:是否使用内存 _useOffHeap:是否使用堆外内存 _deserialized:是否反序列化 _replication:副本数

    class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1)

    • 内置的缓存级别 val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) 可以根据构造函数来理解下这些内置的缓存级别。 补充: 一般我们常用的缓存级别是:DISK_ONLY,MEMORY_ONLY_SER。 副本数一般来说并没有很大的作用,当然如果你内存非常充足另说 使用磁盘储存的话,会对效率有比较大的影响, 当然如果你计算链确实很长,数据确实很多,那另说。
  2. cache 其实就是 persist(MEMORY_ONLY), 没什么好说的,一般缓存用这个就好了....
  3. checkpoint 这严格来说不算是一个算子, 不过这也是一种持久化的手段, 其特点就是不会因为程序的结束而终止, 需要人为的控制其生命周期, 一般都是用在流式处理当中, 这里稍微提一下,不做过多解释了。
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.07.31 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Action 算子
  • Control 算子
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档