专栏首页LhWorld哥陪你聊算法【Spark篇】---Spark中Action算子

【Spark篇】---Spark中Action算子

一、前述

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序(就是我们编写的一个应用程序)中有几个Action类算子执行,就有几个job运行。

二、具体

 原始数据集:

  1、count

返回数据集中的元素数会在结果计算完成后回收到Driver端。返回行数

package com.spark.spark.actions;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * count
 * 返回结果集中的元素数,会将结果回收到Driver端。
 *
 */
public class Operator_count {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("collect");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> lines = jsc.textFile("./words.txt");
        long count = lines.count();
        System.out.println(count);
        jsc.stop();
    }
}

 结果:返回行数即元素数

2、take(n)

       first=take(1) 返回数据集中的第一个元素

      返回一个包含数据集前n个元素的集合。是一个(array)有几个partiotion 会有几个job触发

package com.spark.spark.actions;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
 * take
 * 
 * @author root
 *
 */
public class Operator_takeAndFirst {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("take");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        
 
        JavaRDD<String> parallelize = jsc.parallelize(Arrays.asList("a","b","c","d"));
        List<String> take = parallelize.take(2);
        String first = parallelize.first();
        for(String s:take){
            System.out.println(s);
        }
        jsc.stop();
    }
}

结果:

3、foreach

      循环遍历数据集中的每个元素,运行相应的逻辑。

4、collect

     将计算结果回收到Driver端。当数据量很大时就不要回收了,会造成oom.

     一般在使用过滤算子或者一些能返回少量数据集的算子后

package com.spark.spark.actions;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/**
 * collect 
 * 将计算的结果作为集合拉回到driver端,一般在使用过滤算子或者一些能返回少量数据集的算子后,将结果回收到Driver端打印显示。
 *
 */
public class Operator_collect {
    public static void main(String[] args) {
        /**
         * SparkConf对象中主要设置Spark运行的环境参数。
         * 1.运行模式
         * 2.设置Application name
         * 3.运行的资源需求
         */
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("collect");
        /**
         * JavaSparkContext对象是spark运行的上下文,是通往集群的唯一通道。
         */
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> lines = jsc.textFile("./words.txt");
        JavaRDD<String> resultRDD = lines.filter(new Function<String, Boolean>() {

            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String line) throws Exception {
                return !line.contains("hadoop");
            }
            
        });
        List<String> collect = resultRDD.collect();
        for(String s :collect){
            System.out.println(s);
        }
        
        jsc.stop();
    }
}

结果:

  • countByKey

作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。(也就是个数)

java代码:

package com.spark.spark.actions;

import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;
/**
 * countByKey
 * 
 * 作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。返回一个Map<K,Object>
 * @author root
 *
 */
public class Operator_countByKey {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("countByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<Integer,String>(1,"a"),
                new Tuple2<Integer,String>(2,"b"),
                new Tuple2<Integer,String>(3,"c"),
                new Tuple2<Integer,String>(4,"d"),
                new Tuple2<Integer,String>(4,"e")
        ));
        
        Map<Integer, Object> countByKey = parallelizePairs.countByKey();
        for(Entry<Integer,Object>  entry : countByKey.entrySet()){
            System.out.println("key:"+entry.getKey()+"value:"+entry.getValue());
        }
        
        
    }
}

结果:

  • countByValue

根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。

java代码:

package com.spark.spark.actions;

import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;
/**
 * countByValue
 * 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
 * 
 * @author root
 *
 */
public class Operator_countByValue {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("countByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<Integer,String>(1,"a"),
                new Tuple2<Integer,String>(2,"b"),
                new Tuple2<Integer,String>(2,"c"),
                new Tuple2<Integer,String>(3,"c"),
                new Tuple2<Integer,String>(4,"d"),
                new Tuple2<Integer,String>(4,"d")
        ));
        
        Map<Tuple2<Integer, String>, Long> countByValue = parallelizePairs.countByValue();
        
        for(Entry<Tuple2<Integer, String>, Long> entry : countByValue.entrySet()){
            System.out.println("key:"+entry.getKey()+",value:"+entry.getValue());
        }
    }
}

 scala代码:

package com.bjsxt.spark.actions

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * countByValue
 * 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
 */
object Operator_countByValue {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("countByValue")
    val sc = new SparkContext(conf)
     val rdd1 = sc.makeRDD(List("a","a","b"))
    val rdd2 = rdd1.countByValue()
    rdd2.foreach(println)
    sc.stop()
  }
}

 代码结果:

java:

scala:

  • reduce

       根据聚合逻辑聚合数据集中的每个元素。(reduce里面需要具体的逻辑,根据里面的逻辑对相同分区的数据进行计算)

java代码:

package com.spark.spark.actions;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
/**
 * reduce
 * 
 * 根据聚合逻辑聚合数据集中的每个元素。
 * @author root
 *
 */
public class Operator_reduce {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("reduce");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3,4,5));
        
        Integer reduceResult = parallelize.reduce(new Function2<Integer, Integer, Integer>() {
            
            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        System.out.println(reduceResult);
        sc.stop();
    }
}

scala代码:

package com.bjsxt.spark.actions

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * reduce
 * 
 * 根据聚合逻辑聚合数据集中的每个元素。
 */
object Operator_reduce {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("reduce")
    
    val sc = new SparkContext(conf)
    val rdd1 = sc.makeRDD(Array(1,2))
    
    val result = rdd1.reduce(_+_)
    
    println(result)
    sc.stop()
  }
}

 结果:

java:

scala:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 【Spark篇】---SparkStreaming算子操作transform和updateStateByKey

    今天分享一篇SparkStreaming常用的算子transform和updateStateByKey。

    LhWorld哥陪你聊算法
  • 【Spark篇】---SparkStream初始与应用

    SparkStreaming是流式处理框架,是Spark API的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来源可以是:Kafka, Flum...

    LhWorld哥陪你聊算法
  • 【Spark篇】---Spark中Transformations转换算子

    Spark中默认有两大类算子,Transformation(转换算子),懒执行。action算子,立即执行,有一个action算子 ,就有一个job。

    LhWorld哥陪你聊算法
  • Java爬虫

    jsoup-1.7.3.jar 个人认为爬虫的实现机制: 获取Docume对象—>获取节点—>输出或者持久化

    用户1518699
  • Spark 第一个Spark程序WordCount

    使用上述命令打包后,会在项目根目录下的target目录生成jar包。打完jar包后,我们可以使用spark-submit提交任务:

    smartsi
  • Spark入门第一步:WordCount之java版、Scala版

    趣学程序-shaofeer
  • springboot+mybatis+dubbo+aop日志第三篇

    AOP称为面向切面编程,在程序开发中主要用来解决一些系统层面上的问题,比如日志,事务,权限等等。

    写代码的猿
  • JSP上传文件与导出Excel表

    我们可以通过Apache的fileupload组件来实现jsp上传文件,这样就不需要自己去写具体的实现了,首先需要配置如下依赖:

    端碗吹水
  • Java---基于TCP协议的相互即时通讯小程序

    这是几年前,新浪的一个面试题~要求是3天之内实现~ 通过TCP 协议,建立一个服务器端。

    谙忆
  • 重启或关闭 Linux 系统的 6 个终端命令

    Linux 系统在重启或关闭之前,会通知所有已登录的用户和进程。如果在命令中加入了时间参数,系统还将拒绝新的用户登入请求。

    苏易北

扫码关注云+社区

领取腾讯云代金券