专栏首页大数据技术栈Spark系列 —— 算子详解(二)

Spark系列 —— 算子详解(二)

前言

本文接上一篇 Spark系列 —— 各类算子详解(一) 这篇主要来讲讲 Action 算子 以及 Cache 算子。

Action 算子

Spark 的执行算子,一个 Action算子 会触发一次 job 的生成。 这里需要注意的是, Action 算子要么没有返回值, 如果有返回值,那么这个值是会被拉取到driver端的, 如果数据过大,你就得考虑下你的driver端是否装的下了...

  1. reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. 将RDD的数据进行聚合,并返回聚合后的值。 执行逻辑类似于 reduceByKey
  2. collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. 返回RDD中所有的数据,即 将RDD 的所有数据原封不动的拉回到 Driver 端
  3. count() Return the number of elements in the dataset. 返回该 RDD 中的数据的条数。
  4. first() Return the first element of the dataset (similar to take(1)). 返回 RDD 中的第一条数据。
  5. take(n) Return an array with the first n elements of the dataset. 返回 RDD 中的前 N 条数据。 n: 需要拿取数据的条数
  6. takeSample(withReplacement, num, [seed]) Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed. 随机 返回 NUM 条数据。 withReplacement:是否有放回抽样 num: 抽取数据的条数 seed:随机种子,相同的种子会有相同的随机数据
  7. takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator. 根据 ordering 排序 ,然后返回 前 n 条数据。 n:返回的数据 ordering:排序函数
  8. saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. 将RDD的 数据 以 txt 格式 保存到指定路径 path, path:保存路径,该路径可以是 local filesystem 或者 HDFS 或者 any other Hadoop-supported file system
  9. saveAsSequenceFile(path) (Java and Scala) | Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). 类似于 saveAsTextFile,不过格式是 Sequence Sequence 这是一种特殊的压缩格式。
  10. saveAsObjectFile(path) (Java and Scala) Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile(). 同上,不过一把用做保存 RDD 的里面的数据是 object 类型的 数据, 这样加载的时候可以直接转换成对应的对象
  11. countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. 这个就是个 Wordcount,不赘述
  12. foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details. 遍历RDD的每一条数据。 func : 遍历数据的逻辑。
  13. treeReduce(f: (T, T) => T, depth: Int = 2) 这个算子是最近看优化的时候看到的,之前还真一直没怎么用过, 一般也是用来优化 Reduce 操作的, 不过因为是 Action 操作,会将数据拉回到Driver,所以用的地方一般不多, 不过目前网上好像也没有能把这个算子讲明白的帖子,所以这里多啰嗦一下。 f:这个是聚合逻辑,没什么好多说的。 depth:从字面意思看起来有点像树的深度的意思。默认值是 2 。 那么这个字段实际的作用是什么呢? 首先我们来说说 treeReduce的作用。 平时我们如果 reduceByKey 之后再 collectAsMap 将数据啦回Driver端, 会有一个问题,如果分区数很多,那么每个分区的数据都要啦回 Driver, 然后再进行聚合,Driver是单机的,所以很容易在数据拉回来之后直接把就OOM了, 就算不OOM,单机做聚合的效率也肯定是比较慢的。 所以这时候我们就应该用 treeReduce, 他会对我们的数据进行进行若干次 reduceByKey 操作,并慢慢减少分区数, 假设我们现在有 100 个分区,设置 depth = 3;那么他会 以 100/3 = 33(向下取整) 分区进行一次 reduceByKey, 再以 33/3 = 11(向下取整) 分区进行一次 reduceByKey, 再以 11/3 = 3(向下取整) 分区进行一次 reduceByKey, 这个时候才会将聚合好的数据拉回到Driver 端完成聚合。

Control 算子

控制类算子,也就是我们常说的 缓存类算子

  1. persist(StorageLevel) 缓存算子,懒执行,返回一个 缓存类型的 RDD。 当缓存 RDD 被 Action 算子执行后, 该缓存RDD 会被储存起来, 当再次需要该 RDD 执行其他job 的时候, 就可以通过缓存直接读取数据了。 StorageLevel:缓存级别
    • 关于缓存级别:我们可以来看下其构造函数, 可以看到,其分为: _useDisk:是否使用磁盘 _useMemory:是否使用内存 _useOffHeap:是否使用堆外内存 _deserialized:是否反序列化 _replication:副本数

    class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1)

    • 内置的缓存级别 val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) 可以根据构造函数来理解下这些内置的缓存级别。 补充: 一般我们常用的缓存级别是:DISK_ONLY,MEMORY_ONLY_SER。 副本数一般来说并没有很大的作用,当然如果你内存非常充足另说 使用磁盘储存的话,会对效率有比较大的影响, 当然如果你计算链确实很长,数据确实很多,那另说。
  2. cache 其实就是 persist(MEMORY_ONLY), 没什么好说的,一般缓存用这个就好了....
  3. checkpoint 这严格来说不算是一个算子, 不过这也是一种持久化的手段, 其特点就是不会因为程序的结束而终止, 需要人为的控制其生命周期, 一般都是用在流式处理当中, 这里稍微提一下,不做过多解释了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flume 高级 —— source 自定义

    前面我们已经说过了flume的简单入门,这篇文章继续深入,来熟悉下source,并通过自定义 source 来了解其工作原理,接下来的一系列文章都会以flume...

    solve
  • Spark系列 —— 各类算子详解(一)

    本文主要是一篇总结性文章, 将列举绝大部分的 Spark Transformation算子及其使用方法 和一些使用场景。

    solve
  • Java算法--堆排序

    solve
  • HDU 3032 Nim or not Nim?(Multi-Nim)

    Problem Description Nim is a two-player mathematic game of strategy in which ...

    attack
  • Codeforces 626E Simple Skewness(暴力枚举+二分)

    E. Simple Skewness time limit per test:3 seconds memory limit per test:256 megab...

    Angel_Kitty
  • hdu 2473 Junk-Mail Filter (并查集之点的删除)

    Junk-Mail Filter Time Limit: 15000/8000 MS (Java/Others)    Memory Limit: 32768/...

    Gxjun
  • 【Codeforces】1213B - Bad Prices

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    喜欢ctrl的cxk
  • POJ-1276-Cash Machine(多重背包)

    Cash Machine Time Limit: 1000MS Memory Limit: 10000K Total Submissions:...

    ShenduCC
  • 多目标化对解决单目标问题效益的实证研究(CS NE)

    在处理连续的单目标问题时,多模式是全局优化的最大困难之一。 局部最优往往会阻止算法取得进展,从而构成严重威胁。 在本文中,通过考虑其他目标来分析单目标优化如何从...

    用户7454091
  • CodeForces 666B World Tour(spfa+枚举)

    B. World Tour time limit per test 5 seconds memory limit per test 512 mega...

    ShenduCC

扫码关注云+社区

领取腾讯云代金券