前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >6 spark入门键值对操作sortByKey、groupByKey、groupBy、cogroup

6 spark入门键值对操作sortByKey、groupByKey、groupBy、cogroup

作者头像
天涯泪小武
发布2019-01-17 12:00:33
2.3K0
发布2019-01-17 12:00:33
举报
文章被收录于专栏:SpringCloud专栏SpringCloud专栏

SortByKey

从名字就能看到,是将Key排序用的。如一个PariRDD-["A":1, "C":4, "B":3, "B":5],按Key排序的话就是A、B、C。注意,这个方法只是对Key进行排序,value不排序。

上代码

代码语言:javascript
复制
/**
 * 用于对pairRDD按照key进行排序
 * @author wuweifeng wrote on 2018/4/18.
 */
public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        //spark对普通List的reduce操作
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Tuple2<String, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>("A", 10));
        data.add(new Tuple2<>("B", 1));
        data.add(new Tuple2<>("A", 6));
        data.add(new Tuple2<>("C", 5));
        data.add(new Tuple2<>("B", 3));

        JavaPairRDD<String, Integer> originRDD = javaSparkContext.parallelizePairs(data);
        //true为升序,false为倒序
        System.out.println(originRDD.sortByKey(true).collect());
        System.out.println(originRDD.sortByKey(false).collect());
    }
}

结果是

[(A,10), (A,6), (B,1), (B,3), (C,5)]

[(C,5), (B,1), (B,3), (A,10), (A,6)]

GroupByKey

类似于mysql中的groupBy,是按key进行分组,形成结果为RDD[key,Iterable[value]],即value变成了集合。

代码语言:javascript
复制
/**
 * 
 * @author wuweifeng wrote on 2018/4/18.
 */
public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        //spark对普通List的reduce操作
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Tuple2<String, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>("A", 10));
        data.add(new Tuple2<>("B", 1));
        data.add(new Tuple2<>("A", 6));
        data.add(new Tuple2<>("C", 5));
        data.add(new Tuple2<>("B", 3));

        JavaPairRDD<String, Integer> originRDD = javaSparkContext.parallelizePairs(data);
        
        System.out.println(originRDD.groupByKey().collect());
    }
}

结果是[(B,[1, 3]), (A,[10, 6]), (C,[5])]

GroupBy

和GroupByKey类似,只不过groupByKey是指明了按照Key进行分组,所以作用对象必须是PairRDD型的。而GroupBy明显是不知道该按什么进行分组,即分组规则需要我们自己设定。所以groupBy的参数是接收一个函数,该函数的返回值将作为Key。

代码语言:javascript
复制
public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        //spark对普通List的reduce操作
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> data = new ArrayList<>();
        data.add(10);
        data.add(1);
        data.add(6);
        data.add(5);
        data.add(3);

        JavaRDD<Integer> originRDD = javaSparkContext.parallelize(data);
        Map map = originRDD.groupBy(x -> {
            if (x % 2 == 0) {
                return "even";
            } else {
                return "odd";
            }
        }).collectAsMap();

        System.out.println(map);
    }
}

结果是{odd=[1, 5, 3], even=[10, 6]}

参数里的算法就是判断奇数偶数。

cogroup

这个是groupByKey的升级版,groupByKey是对一个RDD里key相同的value进行组合成一个集合。

cogroup则是对多个RDD里key相同的,合并成集合的集合,例如RDD1.cogroup(RDD2,RDD3,…RDDN), 可以得到(key,Iterable[value1],Iterable[value2],Iterable[value3],…,Iterable[valueN]) 

看代码

代码语言:javascript
复制
public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        //spark对普通List的reduce操作
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

        JavaRDD<Tuple2<String, Integer>> rdd1 = javaSparkContext.parallelize(Arrays.asList(
                new Tuple2<>("A", 10),
                new Tuple2<>("B", 20),
                new Tuple2<>("A", 30),
                new Tuple2<>("B", 40)));
        JavaRDD<Tuple2<String, Integer>> rdd2 = javaSparkContext.parallelize(Arrays.asList(
                new Tuple2<>("A", 100),
                new Tuple2<>("B", 200),
                new Tuple2<>("A", 300),
                new Tuple2<>("B", 400)));
        JavaRDD<Tuple2<String, Integer>> rdd3 = javaSparkContext.parallelize(Arrays.asList(
                new Tuple2<>("A", 1000),
                new Tuple2<>("B", 2000),
                new Tuple2<>("A", 3000),
                new Tuple2<>("B", 4000)));

        JavaPairRDD<String, Integer> pairRDD1 = JavaPairRDD.fromJavaRDD(rdd1);
        JavaPairRDD<String, Integer> pairRDD2 = JavaPairRDD.fromJavaRDD(rdd2);
        JavaPairRDD<String, Integer> pairRDD3 = JavaPairRDD.fromJavaRDD(rdd3);
        JavaPairRDD<String, Tuple3<Iterable<Integer>, Iterable<Integer>, Iterable<Integer>>> pairRDD = pairRDD1.cogroup(pairRDD2, pairRDD3);
        System.out.println(pairRDD.collect());
    }
}

结果是:

[(B,([20, 40],[200, 400],[2000, 4000])), (A,([10, 30],[100, 300],[1000, 3000]))]

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年04月19日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SortByKey
  • GroupByKey
  • GroupBy
  • cogroup
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档