Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Transformation 和 Action 常用算子

Transformation 和 Action 常用算子

作者头像
每天进步一点点
发布于 2022-07-27 02:05:04
发布于 2022-07-27 02:05:04
42300
代码可运行
举报
文章被收录于专栏:IfDataBigIfDataBig
运行总次数:0
代码可运行

一、Transformation

spark 常用的 Transformation 算子如下表:

Transformation 算子

Meaning(含义)

map(func)

对原 RDD 中每个元素运用 func 函数,并生成新的 RDD

filter(func)

对原 RDD 中每个元素使用func 函数进行过滤,并生成新的 RDD

flatMap(func)

与 map 类似,但是每一个输入的 item 被映射成 0 个或多个输出的 items( func 返回类型需要为 Seq )。

mapPartitions(func)

与 map 类似,但函数单独在 RDD 的每个分区上运行, func函数的类型为 Iterator<T> => Iterator<U> ,其中 T 是 RDD 的类型,即 RDD[T]

mapPartitionsWithIndex(func)

与 mapPartitions 类似,但 func 类型为 (Int, Iterator<T>) => Iterator<U> ,其中第一个参数为分区索引

sample(withReplacement, fraction, seed)

数据采样,有三个可选参数:设置是否放回(withReplacement)、采样的百分比(fraction)、随机数生成器的种子(seed);

union(otherDataset)

合并两个 RDD

intersection(otherDataset)

求两个 RDD 的交集

distinct([numTasks]))

去重

groupByKey([numTasks])

按照 key 值进行分区,即在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, Iterable<V>) Note: 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKey 或 aggregateByKey 性能会更好Note: 默认情况下,并行度取决于父 RDD 的分区数。可以传入 numTasks 参数进行修改。

reduceByKey(func, [numTasks])

按照 key 值进行分组,并对分组后的数据执行归约操作。

aggregateByKey(zeroValue,numPartitions)(seqOp, combOp, [numTasks])

当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和 zeroValue 聚合每个键的值。与 groupByKey 类似,reduce 任务的数量可通过第二个参数进行配置。

sortByKey([ascending], [numTasks])

按照 key 进行排序,其中的 key 需要实现 Ordered 特质,即可比较

join(otherDataset, [numTasks])

在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,等价于内连接操作。如果想要执行外连接,可以使用 leftOuterJoin, rightOuterJoin 和 fullOuterJoin 等算子。

cogroup(otherDataset, [numTasks])

在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) tuples 的 dataset。

cartesian(otherDataset)

在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) 类型的 dataset(即笛卡尔积)。

coalesce(numPartitions)

将 RDD 中的分区数减少为 numPartitions。

repartition(numPartitions)

随机重新调整 RDD 中的数据以创建更多或更少的分区,并在它们之间进行平衡。

repartitionAndSortWithinPartitions(partitioner)

根据给定的 partitioner(分区器)对 RDD 进行重新分区,并对分区中的数据按照 key 值进行排序。这比调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作所在的机器。

下面分别给出这些算子的基本使用示例:

1.1 map

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list = List(1,2,3)
sc.parallelize(list).map(_ * 10).foreach(println)

// 输出结果:10 20 30 (这里为了节省篇幅去掉了换行,后文亦同)

1.2 filter

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list = List(3, 6, 9, 10, 12, 21)
sc.parallelize(list).filter(_ >= 10).foreach(println)

// 输出:10 12 21

1.3 flatMap

flatMap(func)map 类似,但每一个输入的 item 会被映射成 0 个或多个输出的 items( func 返回类型需要为 Seq)。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list = List(List(1, 2), List(3), List(), List(4, 5))
sc.parallelize(list).flatMap(_.toList).map(_ * 10).foreach(println)

// 输出结果 :10 20 30 40 50

flatMap 这个算子在日志分析中使用概率非常高,这里进行一下演示:拆分输入的每行数据为单个单词,并赋值为 1,代表出现一次,之后按照单词分组并统计其出现总次数,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val lines = List("spark flume spark",
                 "hadoop flume hive")
sc.parallelize(lines).flatMap(line => line.split(" ")).
map(word=>(word,1)).reduceByKey(_+_).foreach(println)

// 输出:
(spark,2)
(hive,1)
(hadoop,1)
(flume,2)

1.4 mapPartitions

与 map 类似,但函数单独在 RDD 的每个分区上运行, func函数的类型为 Iterator<T> => Iterator<U> (其中 T 是 RDD 的类型),即输入和输出都必须是可迭代类型。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list, 3).mapPartitions(iterator => {
  val buffer = new ListBuffer[Int]
  while (iterator.hasNext) {
    buffer.append(iterator.next() * 100)
  }
  buffer.toIterator
}).foreach(println)
//输出结果
100 200 300 400 500 600

1.5 mapPartitionsWithIndex

与 mapPartitions 类似,但 func 类型为 (Int, Iterator<T>) => Iterator<U> ,其中第一个参数为分区索引。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list, 3).mapPartitionsWithIndex((index, iterator) => {
  val buffer = new ListBuffer[String]
  while (iterator.hasNext) {
    buffer.append(index + "分区:" + iterator.next() * 100)
  }
  buffer.toIterator
}).foreach(println)
//输出
0 分区:100
0 分区:200
1 分区:300
1 分区:400
2 分区:500
2 分区:600

1.6 sample

数据采样。有三个可选参数:设置是否放回 (withReplacement)、采样的百分比 (fraction)、随机数生成器的种子 (seed) :

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list).sample(withReplacement = false, fraction = 0.5).foreach(println)

1.7 union

合并两个 RDD:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list1 = List(1, 2, 3)
val list2 = List(4, 5, 6)
sc.parallelize(list1).union(sc.parallelize(list2)).foreach(println)
// 输出: 1 2 3 4 5 6

1.8 intersection

求两个 RDD 的交集:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list1 = List(1, 2, 3, 4, 5)
val list2 = List(4, 5, 6)
sc.parallelize(list1).intersection(sc.parallelize(list2)).foreach(println)
// 输出:  4 5

1.9 distinct

去重:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list = List(1, 2, 2, 4, 4)
sc.parallelize(list).distinct().foreach(println)
// 输出: 4 1 2

1.10 groupByKey

按照键进行分组:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).groupByKey().map(x => (x._1, x._2.toList)).foreach(println)

//输出:
(spark,List(3, 5))
(hadoop,List(2, 2))
(storm,List(6))

1.11 reduceByKey

按照键进行归约操作:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).reduceByKey(_ + _).foreach(println)

//输出
(spark,8)
(hadoop,4)
(storm,6)

1.12 sortBy & sortByKey

按照键进行排序:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list01 = List((100, "hadoop"), (90, "spark"), (120, "storm"))
sc.parallelize(list01).sortByKey(ascending = false).foreach(println)
// 输出
(120,storm)
(90,spark)
(100,hadoop)

按照指定元素进行排序:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list02 = List(("hadoop",100), ("spark",90), ("storm",120))
sc.parallelize(list02).sortBy(x=>x._2,ascending=false).foreach(println)
// 输出
(storm,120)
(hadoop,100)
(spark,90)

1.13 join

在一个 (K, V) 和 (K, W) 类型的 Dataset 上调用时,返回一个 (K, (V, W)) 的 Dataset,等价于内连接操作。如果想要执行外连接,可以使用 leftOuterJoin, rightOuterJoinfullOuterJoin 等算子。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list01 = List((1, "student01"), (2, "student02"), (3, "student03"))
val list02 = List((1, "teacher01"), (2, "teacher02"), (3, "teacher03"))
sc.parallelize(list01).join(sc.parallelize(list02)).foreach(println)

// 输出
(1,(student01,teacher01))
(3,(student03,teacher03))
(2,(student02,teacher02))

1.14 cogroup

在一个 (K, V) 对的 Dataset 上调用时,返回多个类型为 (K, (Iterable<V>, Iterable<W>)) 的元组所组成的 Dataset。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list01 = List((1, "a"),(1, "a"), (2, "b"), (3, "e"))
val list02 = List((1, "A"), (2, "B"), (3, "E"))
val list03 = List((1, "[ab]"), (2, "[bB]"), (3, "eE"),(3, "eE"))
sc.parallelize(list01).cogroup(sc.parallelize(list02),sc.parallelize(list03)).foreach(println)

// 输出:同一个 RDD 中的元素先按照 key 进行分组,然后再对不同 RDD 中的元素按照 key 进行分组
(1,(CompactBuffer(a, a),CompactBuffer(A),CompactBuffer([ab])))
(3,(CompactBuffer(e),CompactBuffer(E),CompactBuffer(eE, eE)))
(2,(CompactBuffer(b),CompactBuffer(B),CompactBuffer([bB])))

1.15 cartesian

计算笛卡尔积:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list1 = List("A", "B", "C")
val list2 = List(1, 2, 3)
sc.parallelize(list1).cartesian(sc.parallelize(list2)).foreach(println)

//输出笛卡尔积
(A,1)
(A,2)
(A,3)
(B,1)
(B,2)
(B,3)
(C,1)
(C,2)
(C,3)

1.16 aggregateByKey

当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和 zeroValue 聚合每个键的值。与 groupByKey 类似,reduce 任务的数量可通过第二个参数 numPartitions 进行配置。示例如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 为了清晰,以下所有参数均使用具名传参
val list = List(("hadoop", 3), ("hadoop", 2), ("spark", 4), ("spark", 3), ("storm", 6), ("storm", 8))
sc.parallelize(list,numSlices = 2).aggregateByKey(zeroValue = 0,numPartitions = 3)(
      seqOp = math.max(_, _),
      combOp = _ + _
    ).collect.foreach(println)
//输出结果:
(hadoop,3)
(storm,8)
(spark,7)

这里使用了 numSlices = 2 指定 aggregateByKey 父操作 parallelize 的分区数量为 2,其执行流程如下:

基于同样的执行流程,如果 numSlices = 1,则意味着只有输入一个分区,则其最后一步 combOp 相当于是无效的,执行结果为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
(hadoop,3)
(storm,8)
(spark,4)

同样的,如果每个单词对一个分区,即 numSlices = 6,此时相当于求和操作,执行结果为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
(hadoop,5)
(storm,14)
(spark,7)

aggregateByKey(zeroValue = 0,numPartitions = 3) 的第二个参数 numPartitions 决定的是输出 RDD 的分区数量,想要验证这个问题,可以对上面代码进行改写,使用 getNumPartitions 方法获取分区数量:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
sc.parallelize(list,numSlices = 6).aggregateByKey(zeroValue = 0,numPartitions = 3)(
  seqOp = math.max(_, _),
  combOp = _ + _
).getNumPartitions

二、Action

Spark 常用的 Action 算子如下:

Action(动作)

Meaning(含义)

reduce(func)

使用函数func执行归约操作

collect()

以一个 array 数组的形式返回 dataset 的所有元素,适用于小结果集。

count()

返回 dataset 中元素的个数。

first()

返回 dataset 中的第一个元素,等价于 take(1)。

take(n)

将数据集中的前 n 个元素作为一个 array 数组返回。

takeSample(withReplacement, num, [seed])

对一个 dataset 进行随机抽样

takeOrdered(n, [ordering])

按自然顺序(natural order)或自定义比较器(custom comparator)排序后返回前 n 个元素。只适用于小结果集,因为所有数据都会被加载到驱动程序的内存中进行排序。

saveAsTextFile(path)

将 dataset 中的元素以文本文件的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。Spark 将对每个元素调用 toString 方法,将元素转换为文本文件中的一行记录。

saveAsSequenceFile(path)

将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。该操作要求 RDD 中的元素需要实现 Hadoop 的 Writable 接口。对于 Scala 语言而言,它可以将 Spark 中的基本数据类型自动隐式转换为对应 Writable 类型。(目前仅支持 Java and Scala)

saveAsObjectFile(path)

使用 Java 序列化后存储,可以使用 SparkContext.objectFile() 进行加载。(目前仅支持 Java and Scala)

countByKey()

计算每个键出现的次数。

foreach(func)

遍历 RDD 中每个元素,并对其执行fun函数

2.1 reduce

使用函数func执行归约操作:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 val list = List(1, 2, 3, 4, 5)
sc.parallelize(list).reduce((x, y) => x + y)
sc.parallelize(list).reduce(_ + _)

// 输出 15

2.2 takeOrdered

按自然顺序(natural order)或自定义比较器(custom comparator)排序后返回前 n 个元素。需要注意的是 takeOrdered 使用隐式参数进行隐式转换,以下为其源码。所以在使用自定义排序时,需要继承 Ordering[T] 实现自定义比较器,然后将其作为隐式参数引入。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  .........
}

自定义规则排序:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 继承 Ordering[T],实现自定义比较器,按照 value 值的长度进行排序
class CustomOrdering extends Ordering[(Int, String)] {
    override def compare(x: (Int, String), y: (Int, String)): Int
    = if (x._2.length > y._2.length) 1 else -1
}

val list = List((1, "hadoop"), (1, "storm"), (1, "azkaban"), (1, "hive"))
//  引入隐式默认值
implicit val implicitOrdering = new CustomOrdering
sc.parallelize(list).takeOrdered(5)

// 输出:Array((1,hive), (1,storm), (1,hadoop), (1,azkaban)

2.3 countByKey

计算每个键出现的次数:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
sc.parallelize(list).countByKey()

// 输出:Map(hadoop -> 2, storm -> 2, azkaban -> 1)

2.4 saveAsTextFile

将 dataset 中的元素以文本文件的形式写入本地文件系统HDFS 或其它 Hadoop 支持的文件系统中。Spark 将对每个元素调用 toString 方法,将元素转换为文本文件中的一行记录。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
sc.parallelize(list).saveAsTextFile("/usr/file/temp")

参考资料

RDD Programming Guide

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 IfDataBig 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spark-Core核心算子
接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。
ha_lydms
2023/09/04
2870
Spark-Core核心算子
Spark Core快速入门系列(3) | <Transformation>转换算子
  从一个已知的 RDD 中创建出来一个新的 RDD 例如: map就是一个transformation.
不温卜火
2020/10/28
1.9K0
Spark Core快速入门系列(3) | <Transformation>转换算子
Spark算子总结
由于计算过程是在内存进行,然后spill出来,每到达一个checkpoint就会将内存中的数据写入到磁盘,这个功能就是手动使其到达checkpoint
vincentbbli
2021/08/18
8940
Spark系列 —— 各类算子详解(一)
本文主要是一篇总结性文章, 将列举绝大部分的 Spark Transformation算子及其使用方法 和一些使用场景。
solve
2019/10/30
1K0
Spark常用的算子以及Scala函数总结
上海站 | 高性能计算之GPU CUDA培训 4月13-15日 三天密集式学习 快速带你晋级 阅读全文 > 正文共11264个字,7张图,预计阅读时间28分钟。 Spark与Scala 首先,介绍一
用户1332428
2018/04/17
1.9K0
Spark常用的算子以及Scala函数总结
Spark之【RDD编程】详细讲解(No2)——《Transformation转换算子》
本篇博客是Spark之【RDD编程】系列第二篇,为大家带来的是RDD的转换的内容。
大数据梦想家
2021/01/27
2K0
Spark之【RDD编程】详细讲解(No2)——《Transformation转换算子》
BigData--大数据分析引擎Spark
(1)zeroValue:给每一个分区中的每一个key一个初始值; (2)seqOp:函数用于在每一个分区中用初始值逐步迭代value; (3)combOp:函数用于合并每个分区中的结果。
MiChong
2020/09/24
9660
BigData--大数据分析引擎Spark
【Spark常用算子合集】一文搞定spark中的常用转换与行动算子
转换算子是spark中的一种操作,用于从一个RDD转换成另一个RDD,它可以被用来创建新的RDD,也可以被用来转换已有的RDD。它们提供了一种通用的方法来完成RDD的转换,如map、filter、groupByKey等。
大数据小禅
2023/01/10
2.4K0
【Spark常用算子合集】一文搞定spark中的常用转换与行动算子
2018-11-17 Spark算子练习常用Transformation(即转换,延迟加载)通过并行化scala集合创建RDD查看该rdd的分区数量union求并集,注意类型要一致intersecti
###################################################################################################
Albert陈凯
2018/12/14
6520
2021年大数据Spark(十五):Spark Core的RDD常用算子
RDD中包含很多函数,主要可以分为两类:Transformation转换函数和Action函数。
Lansonli
2021/10/09
8430
Spark常用Transformations算子(一)
介绍以下Transformations算子: map flatMap mapPartitions mapPartitionsWithIndex filter sample union intersection sortBy sortByKey groupByKey reduceByKey distinct coalesce repartition
CoderJed
2018/09/13
4850
Spark常用Transformations算子(一)
Transformation转换算子之Key-Value类型
依样画葫芦娃 我们也许不知道怎么自定义一个分区,那么可以看看spark 自带的是怎么写的;如HashPartitioner
用户1483438
2022/04/27
7240
Action行动算子
在spark中,有两种算子,Transformation转换算子和 Action行动算子。Transformation转换算子在整个job任务中,都是一个懒加载,只有真正执行了 Action行动算子的时候,整个job任务才会得到正在的运行。 可以把Transformation转换算子理解成工厂中的流水线, Action行动算子相当于总闸,只有拉下总闸,整条流水线便开始了运行。
用户1483438
2022/05/09
9190
SparkCore快速入门系列(5)
铁铁们,博主前段时间在做一些项目加上找工作所以到现在才更新,(__) 嘻嘻…… 博主现在已经工作啦,后期会给你们更新一些关于数据库以及报表开发的文章哦! 接下来言归正传!!!!!!
刘浩的BigDataPath
2021/04/13
3710
SparkCore快速入门系列(5)
Spark Core 学习笔记
1:Spark Core:内核,也是Spark中最重要的部分,相当于Mapreduce                 SparkCore 和 Mapreduce都是进行离线数据分析                 SparkCore的核心:RDD(弹性分布式数据集),由分区组成 2:Spark Sql:相当于Hive                 支持Sql和DSL语句 -》Spark任务(RDD)-》运行
曼路
2018/10/18
2.2K0
大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Cor
  我们需要一个效率非常快,且能够支持迭代计算和有效数据共享的模型,Spark 应运而生。RDD 是基于工作集的工作模式,更多的是面向工作流。   但是无论是 MR 还是 RDD 都应该具有类似位置感知、容错和负载均衡等特性。
黑泽君
2019/05/10
2.5K0
大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Cor
Transformation转换算子之Value类型
思考一个问题?map算子并没有指定分区,但是却是还是4个分区? 首先 map的数据来源于rdd1;rdd1指定了分区。
用户1483438
2022/04/27
5920
Spark Core入门2【RDD的实质与RDD编程API】
所以RDD不过是对一个函数的封装,当一个函数对数据处理完成后,我们就得到一个RDD的数据集(是一个虚拟的,后续会解释)。
Java架构师必看
2021/05/14
1.1K0
Spark Core入门2【RDD的实质与RDD编程API】
Spark2.x学习笔记:7、Spark应用程序设计
本文介绍了Spark编程的一些基础概念和常用操作,包括RDD、DataFrame、DataSet、Transformations、Actions、Spark Streaming、GraphX和Machine Learning。同时,文章还探讨了Spark在不同领域的应用,包括互联网广告、推荐系统、数据挖掘和自然语言处理等。文章还介绍了Spark的生态系统,包括Spark SQL、MLlib、GraphX和Structured Streaming。
程裕强
2018/01/02
1.1K0
Spark2.x学习笔记:7、Spark应用程序设计
Spark的常用算子大总结
作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 2. 需求:创建一个1-10数组的RDD,将所有元素2形成新的RDD (1)创建 scala> var source = sc.parallelize(1 to 10) source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at :24 (2)打印 scala> source.collect() res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) (3)将所有元素2 scala> val mapadd = source.map(_ * 2) mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at :26 (4)打印最终结果 scala> mapadd.collect() res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
Maynor
2021/12/07
4620
推荐阅读
相关推荐
Spark-Core核心算子
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验