前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >4 spark入门键值对聚合操作combineByKey

4 spark入门键值对聚合操作combineByKey

作者头像
天涯泪小武
发布2019-01-17 12:00:45
1.7K0
发布2019-01-17 12:00:45
举报
文章被收录于专栏:SpringCloud专栏SpringCloud专栏

combineByKey是spark中一个核心的高级函数,其他多个键值对函数都是用它来实现的,如groupByKey,reduceByKey等等。

这是combineByKey的方法。可以看到主要有三个参数,后面还有分区等参数就不管了。主要来看前三个参数,分别是

createCombiner,mergeValue,mergeCombiners,参数类型是JFunction(接收一个参数,返回另一个类型的值),JFunction2(接收两个参数,返回另一个类型的值),JFunction2。

对一个PairRDD<K, V>做combineByKey操作的流程是这样:

  1. createCombiner[V, C] 将当前的值V作为参数,然后对其进行一些操作或者类型转换等,相当于进行一次map操作,并返回map后的结果C。
  2. mergeValue[C, V, C] 将createCombiner函数返回的结果C,再组合最初的PariRDD的V,将C和V作为输入参数,进行一些操作,并返回结果,类型也为C。
  3. mergeCombiners[C, C] 将mergeValue产生的结果C,进行组合。这里主要是针对不同的分区,各自分区执行完上面两步后得到的C进行组合,最终得到结果。如果只有一个分区,那这个函数执行的结果,其实就是第二步的结果。

看例子,假如有多个学生,每个学生有多门功课的成绩,我们要计算每个学生的成绩平均分。

["zhangsan":10,"zhangsan":15]

注意,Key我们不用管,全程都用不到Key。我们需要做的就是对value的一系列转换。

通过第一步createCombiner将V转为C,做法是将10转为Tuple2,即第一次碰到zhangsan这个key时,变成{zhangsan:(10, 1)},C就是Tuple2类型,目的是记录zhangsan这个key共出现了几次。

第二步mergeValue,输入是Tuple2和value,我们的做法就是将Tuple2的第一个参数加上value,然后将Tuple2的第二个参数加一。也就是又碰到zhangsan了,就用10+15,得到结果是{zhangsan:(25, 2)}.

第三步就是对第二步的结果进行合并,假设有另一个分区里,也有zhangsan的结果为{zhangsan:(30, 3)}.那么第三步就是将两个Tuple2分别相加。返回结果{zhangsan:(55, 5)}.

三步做完就可以collect了。

上代码

代码语言:javascript
复制
/**
 * @author wuweifeng wrote on 2018/4/18.
 */
public class ScoreDetail implements Serializable {
    private String studentName, subject;
    private int score;

    public ScoreDetail(String studentName, String subject, int score) {
        this.studentName = studentName;
        this.subject = subject;
        this.score = score;
    }

    public ScoreDetail(String studentName, int score) {
        this.studentName = studentName;
        this.score = score;
    }

    public ScoreDetail() {
    }

    public String getStudentName() {
        return studentName;
    }

    public void setStudentName(String studentName) {
        this.studentName = studentName;
    }

    public String getSubject() {
        return subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public int getScore() {
        return score;
    }

    public void setScore(int score) {
        this.score = score;
    }
}
代码语言:javascript
复制
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author wuweifeng wrote on 2018/4/18.
 */
public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        //spark对普通List的reduce操作
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<ScoreDetail> data = new ArrayList<>();
        data.add(new ScoreDetail("xiaoming", "Math", 98));
        data.add(new ScoreDetail("xiaoming", "English", 88));
        data.add(new ScoreDetail("wangwu", "Math", 75));
        data.add(new ScoreDetail("wangwu", "English", 78));
        data.add(new ScoreDetail("lihua", "Math", 90));
        data.add(new ScoreDetail("lihua", "English", 80));
        data.add(new ScoreDetail("zhangsan", "Math", 91));
        data.add(new ScoreDetail("zhangsan", "English", 80));

        JavaRDD<ScoreDetail> originRDD = javaSparkContext.parallelize(data);
        //转为name->score的键值对,[{"xiaoming":98, "xiaoming":88}]
        JavaPairRDD<String, Integer> pairRDD = originRDD.mapToPair(scoreDetail -> new Tuple2<>(scoreDetail
                .getStudentName(), scoreDetail.getScore()));

        Map<String, Tuple2> map = pairRDD.combineByKey(
            //第一个是createCombiner,也就是将pairRDD的value作为参数,经过操作,转为另一个value。
            //此处就是将score,转为<score, 1>。那么此时的key-value就变成了"xiaoming":(98, 1)
            new Function<Integer, Tuple2>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer score) throws Exception {
                //(98,1)代表1是计数器,代表已经累加了几个科目
                return new Tuple2<>(score, 1);
            }
        },
            //这3个参数第一个是上一个function的返回值,第二个是最早的pairRDD的value,第三个是该函数的返回值类型
            new Function2<Tuple2, Integer, Tuple2>() {
            //v1就是上一步操作返回的Tuple2,即(98,1)
            @Override
            public Tuple2 call(Tuple2 v1, Integer score) throws Exception {
                Tuple2<Integer, Integer> tuple2 = new Tuple2<>((int) v1._1 + score, (int) v1._2 + 1);
                System.out.println(tuple2);
                return tuple2;
            }
        },
            //前两个Tuple2是不同分区上的,通过上一个函数得到的返回值,即(score1+score2 : 2)
            new Function2<Tuple2, Tuple2, Tuple2>() {
            @Override
            public Tuple2 call(Tuple2 v1, Tuple2 v2) throws Exception {
                return new Tuple2((int)v1._1 + (int)v2._1, (int)v1._2 + (int)v2._2);
            }
        }).collectAsMap();

        System.out.println(map);
    }
}

运行结果是:

{zhangsan=(171,2), lihua=(170,2), xiaoming=(186,2), wangwu=(153,2)}

注释写的很清楚了。

参考https://blog.csdn.net/t1dmzks/article/details/70249743

https://blog.csdn.net/jiangpeng59/article/details/52538254

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年04月18日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档