Spark 会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是形成一个 RDD。也就是说,集合中的部分数据会到一个节点上,另一部分的数据会到其他节点上,这样就可以用并行的方式来操作分布式数据集合。
① parallelize()
和 makeRDD()
从集合中创建 RDD,Spark 主要提供了两个方法:parallelize()
和 makeRDD()
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(List(1,2,3,4))
val rdd2 = sparkContext.makeRDD(List(1,2,3,4))
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sparkContext.stop()
从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法。
② parallelize()
的 partition 数量
1、Spark 默认会根据集群的情况来设置 partition 的数量,也可以在调用 parallelize 方法时,传入第二个参数,来设置 RDD 的 partition 数量,比如 parallelize(arr,10)
。
2、Spark 会为每一个 partition 运行一个 task 来进行处理,通过 WebUI 可以查看。
Spark 支持使用任何 Hadoop 所支持的存储系统上的文件创建 RDD,例如 HDFS、HBase 等文件。通过 调用 SparkContext 的 textFile()
方法,可以针对本地文件或 HDFS 文件创建 RDD。通过读取文件来创建 RDD,文件中的每一行就是 RDD 中的一个元素。
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()
将会在 RDD 的转换中讲解。