前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >创建RDD(集合,本地文件,HDFS文件)

创建RDD(集合,本地文件,HDFS文件)

作者头像
编程那点事
发布2023-02-25 15:32:26
8340
发布2023-02-25 15:32:26
举报
文章被收录于专栏:java编程那点事

进行Spark核心编程时,首先要做的第一件事,就是创建一个初始的RDD。该RDD中,通常就代表和包含了Spark应用程序的输入源数据。然后在创建了初始的RDD之后,才可以通过Spark Core提供的transformation算子,对该RDD进行转换,来获取其他的RDD。

Spark Core提供了三种创建RDD的方式,包括:使用程序中的集合创建RDD;使用本地文件创建RDD;使用HDFS文件创建RDD。

1、使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造测试数据,来测试后面的spark应用的流程。 2、使用本地文件创建RDD,主要用于临时性地处理一些存储了大量数据的文件。 3、使用HDFS文件创建RDD,应该是最常用的生产环境处理方式,主要可以针对HDFS上存储的大数据,进行离线批处理操作。

并行化集合创建RDD 如果要通过并行化集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD。相当于是,集合中的部分数据会到一个节点上,而另一部分数据会到其他节点上。然后就可以用并行的方式来操作这个分布式数据集合,即RDD。

// 案例:1到10累加求和

代码语言:javascript
复制
val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _)

调用parallelize()时,有一个重要的参数可以指定,就是要将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark官方的建议是,为集群中的每个CPU创建2~4个partition。Spark默认会根据集群的情况来设置partition的数量。但是也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量。比如parallelize(arr, 10) 案例: Java

代码语言:javascript
复制
/**
* 并行化创建RDD
* @author zhang
*
*/
public class ParallelizeCollection {

​public static void main(String[] args) {

//创建sparkConf
SparkConf conf = new SparkConf().setAppName("ParallelizeCollection").setMaster("local");

​​//创建javasparkcontext
JavaSparkContext sc = new JavaSparkContext(conf);

//要通过并行化集合的方式创建RDD,那么调用sparkContext以及子类的parallelize()方法
​​List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);

JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

//我们执行reduce算子操作,相当于先进行1+2 =3;3+3=6
​​int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {

​​​private static final long serialVersionUID = 1L;

​​​@Override
​​​public Integer call(Integer v1, Integer v2) throws Exception {
​​​​// TODO Auto-generated method stub
​​​​return v1 + v2;
​​​}
​​});

​​System.out.println("1-10的累计和=" + sum);

sc.close();
​}
}

Scala版本:

代码语言:javascript
复制
object parallelizeCollection {

def main(args: Array[String]) {

val conf = new SparkConf().setAppName("parallelizeCollection").setMaster("local")

val sc = new SparkContext(conf)  
val numbers = Array(1,2,3,4,5,6,7,8,9,10)  
val numberRDD = sc.parallelize(numbers, 5)
val sum = numberRDD.reduce(_ + _)
println("1到10的累计和="+sum)
}
}

使用本地文件和HDFS创建RDD Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的,比如说HDFS、Cassandra、HBase以及本地文件。通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。

有几个事项是需要注意的: 1、如果是针对本地文件的话,如果是在windows上本地测试,windows上有一份文件即可;如果是在spark集群上针对linux本地文件,那么需要将文件拷贝到所有worker节点上。 2、Spark的textFile()方法支持针对目录、压缩文件以及通配符进行RDD创建。 3、Spark默认会为hdfs文件的每一个block创建一个partition,但是也可以通过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比block数量少。

// 案例:文件字数统计

代码语言:javascript
复制
val rdd = sc.textFile("data.txt")
val wordCount = rdd.map(line => line.length).reduce(_ + _)

案例:统计单词个数 Java版本 创建一个文件LocalFile.java

代码语言:javascript
复制
/**
* 使用本地文件创建RDD
* 案例:统计文本字数
* @author zhang
*
*/
public class LocalFile {

​public static void main(String[] args) {

​​//1获取sparkConf
​​SparkConf conf = new SparkConf().setAppName("LocalFile").setMaster("local");

//2创建javaSparkContext
​​JavaSparkContext sc = new JavaSparkContext(conf);

​​//使用sparkContext以及其子类的textFile()方法,准对本地文件创建RDD
​​JavaRDD<String> lines = sc.textFile("C:\\Users\\zhang\\Desktop\\spark.txt");

JavaRDD<Integer> lineLength =​lines.map(new Function<String, Integer>() {

​​​private static final long serialVersionUID = 1L;

​​​@Override
​​​public Integer call(String v1) throws Exception {
​​​​// TODO Auto-generated method stub
​​​​return v1.length();
​​​}
​​});

int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() {

@Override
​​public Integer call(Integer v1, Integer v2) throws Exception {
​​​// TODO Auto-generated method stub
​​​return v1 + v2;
​​}
​});

System.out.println(count);
​} 
}

Scala 版本:

代码语言:javascript
复制
object LocalFile {

def main(args: Array[String]) {

val conf = new SparkConf().setAppName("LocalFile").setMaster("local")

val sc = new SparkContext(conf)

val lines = sc.textFile("C:\\Users\\zhang\\Desktop\\spark.txt", 1)

val count = lines.map { line => line.length() }.reduce(_ + _)

 println(count)
  }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-02-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档