在RDD中,通常就代表和包含了Spark应用程序的输入源数据。 当我们,在创建了初始的RDD之后,才可以通过Spark Core提供的transformation算子,对该RDD进行transformation(转换)操作,来获取其他的RDD。 Spark Core为我们提供了三种创建RDD的方式,包括:
如果要通过并行化集合来创建RDD,需要针对程序中的集合,调用SparkContext中的parallelize()方法。Spark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD。即:集合中的部分数据会到一个节点上,而另一部分数据会到其它节点上。然后就可以采用并行的方式来操作这个分布式数据集合。
// 并行化创建RDD部分代码
// 实现1到5的累加求和
val arr = Array(1,2,3,4,5)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _)
在调用parallelize()方法时,有一个重要的参数可以指定,就是要将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark官方的建议是,为集群中的每个CPU创建2-4个partition。Spark默认会根据集群的情况来设置partition的数量。但是也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量。比如,parallelize(arr, 10)
Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的,比如说HDFS、Cassandra、HBase以及本地文件。通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。
// 实现文件字数统计
// textFile()方法中,输入本地文件路径或是HDFS路径
// HDFS:hdfs://spark1:9000/data.txt
// local:/home/hadoop/data.txt
val rdd = sc.textFile(“/home/hadoop/data.txt”)
val wordCount = rdd.map(line => line.length).reduce(_ + _)
通过本地文件或HDFS创建RDD的几个注意点 1、如果是针对本地文件的话: * 如果是在Windows上进行本地测试,windows上有一份文件即可; * 如果是在Spark集群上针对Linux本地文件,那么需要将文件拷贝到所有worker节点上(就是在spark-submit上使用--master指定了master节点,使用standlone模式进行运行,而textFile()方法内仍然使用的是Linux本地文件,在这种情况下,是需要将文件拷贝到所有worker节点上的); 2、Spark的textFile()方法支持针对目录、压缩文件以及通配符进行RDD创建 3、Spark默认会为hdfs文件的每一个block创建一个partition,但是也可以通过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比block数量少
SparkContext的textFile()除了可以针对上述几种普通的文件创建RDD之外,还有一些特例的方法来创建RDD:
本文转自:https://blog.csdn.net/lemonZhaoTao/article/details/77923337