前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Action操作开发实战

Action操作开发实战

作者头像
编程那点事
发布2023-02-25 15:43:54
2350
发布2023-02-25 15:43:54
举报
文章被收录于专栏:java编程那点事

1.Reduce 2.collect 3.count 4.take 5.saveAsTextTile 6.countByKey 7.foreach

Reduce案例:

代码语言:javascript
复制
private static void reduce() {
// 创建SparkConf和JavaSparkContext
SparkConf conf = new SparkConf().setAppName("reduce").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

​​// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
​​List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);

​​// 使用reduce操作对集合中的数字进行累加
​​// reduce操作的原理:
​​​// 首先将第一个和第二个元素,传入call()方法,进行计算,会获取一个结果,比如1 + 2 = 3
​​​// 接着将该结果与下一个元素传入call()方法,进行计算,比如3 + 3 = 6
// 以此类推
​​// 所以reduce操作的本质,就是聚合,将多个元素聚合成一个元素
​​int sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override
​​​public Integer call(Integer v1, Integer v2) throws Exception {

  ​​​​return v1 + v2;
​​​}
​​});

System.out.println(sum);  

​​// 关闭JavaSparkContext
​​sc.close();
​}

Scala版本

代码语言:javascript
复制
def reduce(){
   val conf = new SparkConf().setAppName("reduce").setMaster("local")

val sc = new SparkContext(conf)

val numberArray = Array(1,2,3,4,5,6,7,8,9,10)
val numbers = sc.parallelize(numberArray, 1)
val sum = numbers.reduce(_ + _)
println(sum)
}

Collect java版本

代码语言:javascript
复制
​private static void collect(){

// 创建SparkConf和JavaSparkContext
​​SparkConf conf = new SparkConf().setAppName("collect").setMaster("local");
​​JavaSparkContext sc = new JavaSparkContext(conf);

​​// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
​​List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
​​JavaRDD<Integer> numbers =   sc.parallelize(numberList);

//s使用map操作将集合中所有数字乘以2
​​JavaRDD<Integer> doubleNumbers = numbers.map(

new Function<Integer, Integer>() {

​​​​​private static final long serialVersionUID = 1L;

@Override
​​​​​public Integer call(Integer v1) throws Exception {

​​​​​​return v1 * 2;
​​​​​}
​​​​});

​​// 不用foreach action操作,在远程集群上遍历rdd中的元素
​​​​// 而使用collect操作,将分布在远程集群上的doubleNumbers RDD的数据拉取到本地
// 这种方式,一般不建议使用,因为如果rdd中的数据量比较大的话,比如超过1万条
​​​​​// 那么性能会比较差,因为要从远程走大量的网络传输,将数据获取到本地
​​​​​// 此外,除了性能差,还可能在rdd中数据量特别大的情况下,发生oom异常,内存溢出
// 因此,通常,还是推荐使用foreach action操作,来对最终的rdd元素进行处理
​​​​List<Integer> doubleNumberList = doubleNumbers.collect();
​​​​for(Integer num : doubleNumberList) {
​​​​​System.out.println(num);  
​​​​}

// 关闭JavaSparkContext
​​​​sc.close();
}

Scala版本

代码语言:javascript
复制
def collect(){

val conf = new SparkConf().setAppName("collect").setMaster("local")
val sc = new SparkContext(conf)

val numberArray = Array(1,2,3,4,5,6,7,8,9,10)
val numbers = sc.parallelize(numberArray, 1)
val doubleNumbers = numbers.map { num => num * 2 }
val doubleNumberArray = doubleNumbers.collect()

for(num <- doubleNumberArray){
  println(num)
}
}

Count java版本

代码语言:javascript
复制
private static void count() {
​​// 创建SparkConf和JavaSparkContext
SparkConf conf = new SparkConf()​​​​.setAppName("count")​​​​.setMaster("local");  
​​JavaSparkContext sc = new JavaSparkContext(conf);

​​// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
​​List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);

// 对rdd使用count操作,统计它有多少个元素
long count = numbers.count();
System.out.println(count);  
​​// 关闭JavaSparkContext
sc.close();
}

Scala版本

代码语言:javascript
复制
 def count(){

val conf = new SparkConf().setAppName("collect").setMaster("local")
val sc = new SparkContext(conf)

val numberArray = Array(1,2,3,4,5,6,7,8,9,10)
val numbers = sc.parallelize(numberArray, 1)

val count = numbers.count()

println(count)
}

Take操作 java版本

代码语言:javascript
复制
​private static void take() {
​​// 创建SparkConf和JavaSparkContext
SparkConf conf = new SparkConf()​​​​.setAppName("take").setMaster("local");  
JavaSparkContext sc = new JavaSparkContext(conf);

​​// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
​​List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
​​JavaRDD<Integer> numbers = sc.parallelize(numberList);

​​// 对rdd使用count操作,统计它有多少个元素
​​// take操作,与collect类似,也是从远程集群上,获取rdd的数据
​​// 但是collect是获取rdd的所有数据,take只是获取前n个数据
​​List<Integer> top3Numbers = numbers.take(3);
for(Integer num : top3Numbers) {
 ​​​System.out.println(num);  
​​}

// 关闭JavaSparkContext
 ​​sc.close();
}

Scala版本:

代码语言:javascript
复制
def take(){

val conf = new SparkConf().setAppName("take").setMaster("local")
val sc = new SparkContext(conf)
val numberArray = Array(1,2,3,4,5,6,7,8,9,10)
val numbers = sc.parallelize(numberArray, 1)
val top3Numbers = numbers.take(3)

for(num <- top3Numbers){
 println(num)
}
}

saveAsTextFile

java版本

代码语言:javascript
复制
​private static void saveAsTextFile() {
// 创建SparkConf和JavaSparkContext
SparkConf conf = new SparkConf().setAppName("saveAsTextFile");  
​​JavaSparkContext sc = new JavaSparkContext(conf);

​​// 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
​​JavaRDD<Integer> numbers = sc.parallelize(numberList);

​​// 使用map操作将集合中所有数字乘以2
​​JavaRDD<Integer> doubleNumbers = numbers.map(

​​​​new Function<Integer, Integer>() {

private static final long serialVersionUID = 1L;

​​​​​@Override
​​​​​public Integer call(Integer v1) throws Exception {
​​​​​​return v1 * 2;
​​​​​}
​​​​});

​​// 直接将rdd中的数据,保存在HFDS文件中
​​// 但是要注意,我们这里只能指定文件夹,也就是目录
​​// 那么实际上,会保存为目录中的/double_number.txt/part-00000文件

doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt");  

​​// 关闭JavaSparkContext
​​sc.close();
​}

查看Hadoop fs -ls /double_number.txt 列表 hadoop fs -text /double_number.txt/part-0000

Scala版本

代码语言:javascript
复制
def saveAsTextFile(){
val conf = new SparkConf().setAppName("collect").setMaster("local")
val sc = new SparkContext(conf)

val numberArray = Array(1,2,3,4,5,6,7,8,9,10)
val numbers = sc.parallelize(numberArray, 1)
val doubleNumbers = numbers.map { num => num * 2 }
   doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt")
}

countByKey

java版本

代码语言:javascript
复制
​private static void countByKey() {
​​// 创建SparkConf
SparkConf conf = new SparkConf().setAppName("countByKey").setMaster("local");
​​// 创建JavaSparkContext
​​JavaSparkContext sc = new JavaSparkContext(conf);

​​// 模拟集合
​​List<Tuple2<String, String>> scoreList = Arrays.asList(
​​​​new Tuple2<String, String>("class1", "leo"),
​​​​new Tuple2<String, String>("class2", "jack"),
​​​​new Tuple2<String, String>("class1", "marry"),
​​​​new Tuple2<String, String>("class2", "tom"),
new Tuple2<String, String>("class2", "david"));  

​​// 并行化集合,创建JavaPairRDD
JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList);

// 对rdd应用countByKey操作,统计每个班级的学生人数,也就是统计每个key对应的元素个数
​​// 这就是countByKey的作用
​​// countByKey返回的类型,直接就是Map<String, Object>
Map<String, Object> studentCounts = students.countByKey();
​​for(Map.Entry<String, Object> studentCount : studentCounts.entrySet()) {
​​​System.out.println(studentCount.getKey() + ": " + studentCount.getValue());  
​​}

​​// 关闭JavaSparkContext
sc.close();
​}

Scala版本

代码语言:javascript
复制
def countByKey(){

val conf = new SparkConf().setAppName("countByKey").setMaster("local")
val sc = new SparkContext(conf)

val studentList = Array(Tuple2("class1","leao"),Tuple2("class2", "jack"),
   Tuple2("class1", "tom"), Tuple2("class2", "jen"), Tuple2("class2", "marry"))
val student = sc.parallelize(studentList, 1)
val studentCounts = student.countByKey()
studentCounts.foreach(sudent => println(sudent._1 + " 有 " + sudent._2))
println(studentCounts)
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-02-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档