本文接上一篇 Spark系列 —— 各类算子详解(一) 这篇主要来讲讲 Action 算子 以及 Cache 算子。
Spark 的执行算子,一个 Action算子 会触发一次 job 的生成。 这里需要注意的是, Action 算子要么没有返回值, 如果有返回值,那么这个值是会被拉取到driver端的, 如果数据过大,你就得考虑下你的driver端是否装的下了...
reduceByKey
local filesystem
或者 HDFS
或者 any other Hadoop-supported file system
saveAsTextFile
,不过格式是 Sequence
Sequence 这是一种特殊的压缩格式。SparkContext.objectFile()
.
同上,不过一把用做保存 RDD 的里面的数据是 object 类型的 数据,
这样加载的时候可以直接转换成对应的对象foreach()
may result in undefined behavior. See Understanding closures for more details.
遍历RDD的每一条数据。
func : 遍历数据的逻辑。treeReduce
的作用。
平时我们如果 reduceByKey
之后再 collectAsMap
将数据啦回Driver端,
会有一个问题,如果分区数很多,那么每个分区的数据都要啦回 Driver,
然后再进行聚合,Driver是单机的,所以很容易在数据拉回来之后直接把就OOM了,
就算不OOM,单机做聚合的效率也肯定是比较慢的。
所以这时候我们就应该用 treeReduce
,
他会对我们的数据进行进行若干次 reduceByKey
操作,并慢慢减少分区数,
假设我们现在有 100 个分区,设置 depth = 3;那么他会
以 100/3 = 33(向下取整) 分区进行一次 reduceByKey
,
再以 33/3 = 11(向下取整) 分区进行一次 reduceByKey
,
再以 11/3 = 3(向下取整) 分区进行一次 reduceByKey
,
这个时候才会将聚合好的数据拉回到Driver 端完成聚合。控制类算子,也就是我们常说的 缓存类算子
class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1)
persist(MEMORY_ONLY)
,
没什么好说的,一般缓存用这个就好了....