前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >1 Spark入门各种map的操作,java语言

1 Spark入门各种map的操作,java语言

作者头像
天涯泪小武
发布2019-01-17 12:01:04
7110
发布2019-01-17 12:01:04
举报
文章被收录于专栏:SpringCloud专栏

Spark基本操作主要就是各种map、reduce,这一篇从各种map开始。由于scala不熟悉,而且语法太精简,虽然代码量少了,但是可读性差了不少,就还是用Java来操作。

直接开始上代码了,注意,如果只是本地测试spark的各种api的使用,是不需要下载安装任何spark、Hadoop的。直接引入maven依赖就可以了。

新建一个java的maven项目,pom中引入spark的依赖。

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tianyalei</groupId>
    <artifactId>spark_learning</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.3.0</spark.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>
</project>

这里有spark-sql、spark机器学习、spark-hive等的依赖,目前是用不上的。

1 简单map

map(function) 

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

注意,map是一对一的。

代码语言:javascript
复制
package map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * @author wuweifeng wrote on 2018/4/10.
 */
public class SimpleMap {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        //spark对普通List的reduce操作
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> originRDD = javaSparkContext.parallelize(data);
        //计算所有元素的和
        System.out.println(originRDD.reduce((a, b) -> a + b));

        //******************map的使用***************//
        //将原始元素每个都乘以2
        JavaRDD<Integer> doubleRDD = originRDD.map(s -> s * 2);
        //将RDD收集起来,组成list
        List<Integer> doubleData = doubleRDD.collect();
        System.out.println(doubleData);
        int total = doubleRDD.reduce((a, b) -> a + b);
        System.out.println(total);

        //使用map将key变成key-value,添加value
        List<String> list = Arrays.asList("af", "bbbb", "c", "d", "e");
        JavaRDD<String> stringRDD = javaSparkContext.parallelize(list);
        //转为key-value形式
        JavaPairRDD pairRDD = stringRDD.mapToPair(k -> new Tuple2<>(k, 1));
        List list1 = pairRDD.collect();
        //[(af,1), (bbbb,1), (c,1), (d,1), (e,1)]
        System.out.println(list1);

        //转为key-value,value不变,修改key
        JavaPairRDD valueRDD = stringRDD.mapToPair(k -> new Tuple2<>(k.length(), k));
        List list2 = valueRDD.collect();
        //[(2, af),(4, bbbb),(1, c),(1, d),(1, e)]
        System.out.println(list2);
        //mapValues
        List list3 = valueRDD.mapValues(s -> s + "_tail").collect();
        //[(2,af_tail), (4,bbbb_tail), (1,c_tail), (1,d_tail), (1,e_tail)]
        System.out.println(list3);


    }
}

这里面注释比较完整了,有简单的将每个元素乘以2的,有将每个元素转成key-value的,有修改key-value的key或者value的。

2 MapPartition分区map

代码语言:javascript
复制
package map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.SparkSession;

import java.util.*;

/**
 * mapPartition
 * @author wuweifeng wrote on 2018/4/10.
 */
public class SimpleMapPartition {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        //spark对普通List的reduce操作
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

        //使用map将key变成key-value,添加value
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        //分为2个分区
        JavaRDD<Integer> stringRDD = javaSparkContext.parallelize(list, 2);
        //与map方法类似,map是对rdd中的每一个元素进行操作,而mapPartitions(foreachPartition)则是对rdd中的每个分区的迭代器进行操作。
        // 如果在map过程中需要频繁创建额外的对象,(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),
        // 则mapPartitions效率比map高的多。
        JavaRDD rdd = stringRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {

            @Override
            public Iterator<Integer> call(Iterator<Integer> integerIterator) throws Exception {
                int sum = 0;
                while (integerIterator.hasNext()) {{
                    sum += integerIterator.next();
                }}
                List<Integer> list1 = new LinkedList<>();
                list1.add(sum);
                return list1.iterator();
            }
        });

        List list1 = rdd.collect();
        //[3, 12]
        System.out.println(list1);


    }
}

将List分区然后map。这里将1,2,3,4,5分为2个区,然后对每个分区进行累加。

结果是1+2,3+4+5.如果是分3个区,则是1,2+3,4+5.

3 FlatMap

代码语言:javascript
复制
package map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

/**
 * @author wuweifeng wrote on 2018/4/10.
 */
public class SimpleFlatMap {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        //spark对普通List的reduce操作
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<String> data = Arrays.asList("hello world", "java spark", "hello spark");
        JavaRDD<String> originRDD = javaSparkContext.parallelize(data);

        //flatmap()是将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD
        //RDD经过map后元素数量不变,经过flatmap后,一个元素可以变成多个元素
        JavaRDD<String> flatMap = originRDD.flatMap(s -> Arrays.asList(s.split(" ")).iterator());

        System.out.println(flatMap.collect());
    }
}    

FlatMap则是将每个元素变成多个元素,像上面例子,最终结果["hello", "world", "java" ^^^^^^]

因为将每个元素按空格split了,最终再集合起来。

4 mapDouble

代码语言:javascript
复制
package map;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

/**
 * @author wuweifeng wrote on 2018/4/12.
 */
public class SimpleMapDouble {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        //spark对普通List的reduce操作
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

        List<Integer> random = Collections.unmodifiableList(
                new Random()
                        .ints(-100, 101).limit(100000000)
                        .boxed()
                        .collect(Collectors.toList())
        );

        //Add more than 100'000'000 of random integers using the Java 8 library for Stream and split them in 10 slides.
        JavaRDD<Integer> rdd = javaSparkContext.parallelize(random, 10);

        //The next step will compute for all the members the power 2 and then generate the JavaDoubleRDD with the statistics.
        JavaDoubleRDD result = rdd.mapToDouble(x -> (double) x * x);

        //Print the statistics for the given DoubleRDD: count, mean stdev, max and min
        System.out.println(result.stats().toString());

    }
}

这个则是将各元素计算平方,并转为double,最终打印结果的中位数、最大值、最小值、平均值等。可见JavaDoubleRDD用来做统计计算比较方便。

这就是map的几个最常用用法

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018年04月12日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 简单map
  • 2 MapPartition分区map
  • 3 FlatMap
  • 4 mapDouble
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档