前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Spark篇】---Spark中Action算子

【Spark篇】---Spark中Action算子

作者头像
LhWorld哥陪你聊算法
发布2018-09-13 14:09:18
9480
发布2018-09-13 14:09:18
举报

一、前述

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序(就是我们编写的一个应用程序)中有几个Action类算子执行,就有几个job运行。

二、具体

 原始数据集:

  1、count

返回数据集中的元素数会在结果计算完成后回收到Driver端。返回行数

代码语言:javascript
复制
package com.spark.spark.actions;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * count
 * 返回结果集中的元素数,会将结果回收到Driver端。
 *
 */
public class Operator_count {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("collect");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> lines = jsc.textFile("./words.txt");
        long count = lines.count();
        System.out.println(count);
        jsc.stop();
    }
}

 结果:返回行数即元素数

2、take(n)

       first=take(1) 返回数据集中的第一个元素

      返回一个包含数据集前n个元素的集合。是一个(array)有几个partiotion 会有几个job触发

代码语言:javascript
复制
package com.spark.spark.actions;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
 * take
 * 
 * @author root
 *
 */
public class Operator_takeAndFirst {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("take");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        
 
        JavaRDD<String> parallelize = jsc.parallelize(Arrays.asList("a","b","c","d"));
        List<String> take = parallelize.take(2);
        String first = parallelize.first();
        for(String s:take){
            System.out.println(s);
        }
        jsc.stop();
    }
}

结果:

3、foreach

      循环遍历数据集中的每个元素,运行相应的逻辑。

4、collect

     将计算结果回收到Driver端。当数据量很大时就不要回收了,会造成oom.

     一般在使用过滤算子或者一些能返回少量数据集的算子后

代码语言:javascript
复制
package com.spark.spark.actions;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/**
 * collect 
 * 将计算的结果作为集合拉回到driver端,一般在使用过滤算子或者一些能返回少量数据集的算子后,将结果回收到Driver端打印显示。
 *
 */
public class Operator_collect {
    public static void main(String[] args) {
        /**
         * SparkConf对象中主要设置Spark运行的环境参数。
         * 1.运行模式
         * 2.设置Application name
         * 3.运行的资源需求
         */
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("collect");
        /**
         * JavaSparkContext对象是spark运行的上下文,是通往集群的唯一通道。
         */
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> lines = jsc.textFile("./words.txt");
        JavaRDD<String> resultRDD = lines.filter(new Function<String, Boolean>() {

            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String line) throws Exception {
                return !line.contains("hadoop");
            }
            
        });
        List<String> collect = resultRDD.collect();
        for(String s :collect){
            System.out.println(s);
        }
        
        jsc.stop();
    }
}

结果:

  • countByKey

作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。(也就是个数)

java代码:

代码语言:javascript
复制
package com.spark.spark.actions;

import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;
/**
 * countByKey
 * 
 * 作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。返回一个Map<K,Object>
 * @author root
 *
 */
public class Operator_countByKey {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("countByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<Integer,String>(1,"a"),
                new Tuple2<Integer,String>(2,"b"),
                new Tuple2<Integer,String>(3,"c"),
                new Tuple2<Integer,String>(4,"d"),
                new Tuple2<Integer,String>(4,"e")
        ));
        
        Map<Integer, Object> countByKey = parallelizePairs.countByKey();
        for(Entry<Integer,Object>  entry : countByKey.entrySet()){
            System.out.println("key:"+entry.getKey()+"value:"+entry.getValue());
        }
        
        
    }
}

结果:

  • countByValue

根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。

java代码:

代码语言:javascript
复制
package com.spark.spark.actions;

import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;
/**
 * countByValue
 * 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
 * 
 * @author root
 *
 */
public class Operator_countByValue {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("countByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<Integer,String>(1,"a"),
                new Tuple2<Integer,String>(2,"b"),
                new Tuple2<Integer,String>(2,"c"),
                new Tuple2<Integer,String>(3,"c"),
                new Tuple2<Integer,String>(4,"d"),
                new Tuple2<Integer,String>(4,"d")
        ));
        
        Map<Tuple2<Integer, String>, Long> countByValue = parallelizePairs.countByValue();
        
        for(Entry<Tuple2<Integer, String>, Long> entry : countByValue.entrySet()){
            System.out.println("key:"+entry.getKey()+",value:"+entry.getValue());
        }
    }
}

 scala代码:

代码语言:javascript
复制
package com.bjsxt.spark.actions

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * countByValue
 * 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
 */
object Operator_countByValue {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("countByValue")
    val sc = new SparkContext(conf)
     val rdd1 = sc.makeRDD(List("a","a","b"))
    val rdd2 = rdd1.countByValue()
    rdd2.foreach(println)
    sc.stop()
  }
}

 代码结果:

java:

scala:

  • reduce

       根据聚合逻辑聚合数据集中的每个元素。(reduce里面需要具体的逻辑,根据里面的逻辑对相同分区的数据进行计算)

java代码:

代码语言:javascript
复制
package com.spark.spark.actions;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
/**
 * reduce
 * 
 * 根据聚合逻辑聚合数据集中的每个元素。
 * @author root
 *
 */
public class Operator_reduce {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("reduce");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3,4,5));
        
        Integer reduceResult = parallelize.reduce(new Function2<Integer, Integer, Integer>() {
            
            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        System.out.println(reduceResult);
        sc.stop();
    }
}

scala代码:

代码语言:javascript
复制
package com.bjsxt.spark.actions

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * reduce
 * 
 * 根据聚合逻辑聚合数据集中的每个元素。
 */
object Operator_reduce {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("reduce")
    
    val sc = new SparkContext(conf)
    val rdd1 = sc.makeRDD(Array(1,2))
    
    val result = rdd1.reduce(_+_)
    
    println(result)
    sc.stop()
  }
}

 结果:

java:

scala:

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-02-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档