前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark算法整理(Java版) 顶

Spark算法整理(Java版) 顶

作者头像
算法之名
发布2019-08-26 16:36:53
4870
发布2019-08-26 16:36:53
举报
文章被收录于专栏:算法之名算法之名

我们首先用idea来搭建Spark项目,具体可以参考提交第一个Spark统计文件单词数程序,配合hadoop hdfs ,只不过我们现在用java语言来编写,而不是Scala.

  • 问题描述:二次排序

二次排序问题解决方案

  1. 让归约器读取和缓存给定键的所有值(例如使用一个集合),然后对这些值完成一个归约器中排序,这种方法不具有可伸缩性,因为归约器要接收一个给定键的所有值,这种方法可能导致归约器耗尽内存。另一方面,如果值数量很少,不会导致内存溢出错误,那么这种方法就是适用的。
  2. 使用Spark框架对规约器值排序(这种做法不需要对传入归约器的值完成归约器中排序)。这种方法“会为自然键增加部分或整个值来创建一个组合键以实现排序目标”。这种方法是可伸缩的(不会受商用服务器内存的限制)。

先用第一种方案来处理

代码语言:javascript
复制
public class SecondSort {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SecondSort");
        JavaSparkContext scc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = scc.textFile(args[0]);
        JavaPairRDD<String, Tuple2<Integer,Integer>> pairs = rdd.mapToPair(s -> {
            String[] tokens = s.split(",");
            System.out.println(tokens[0] + "," + tokens[1] + "," + tokens[2]);
            Integer time = Integer.parseInt(tokens[1]);
            Integer value = Integer.parseInt(tokens[2]);
            Tuple2<Integer, Integer> timeValue = new Tuple2<>(time, value);
            return new Tuple2<String, Tuple2 < Integer, Integer >> (tokens[0], timeValue);
        });
        List<Tuple2<String, Tuple2<Integer, Integer>>> outPut = pairs.collect();
        outPut.stream().forEach(t -> {
            Tuple2<Integer, Integer> timeValue = t._2;
            System.out.println(t._1 + "," + timeValue._1 + "," + timeValue._1);
        });
        JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> groups = pairs.groupByKey();
        List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> outPut2 = groups.collect();
        outPut2.stream().forEach(t -> {
            Iterable<Tuple2<Integer, Integer>> list = t._2;
            System.out.println(t._1);
            while (list.iterator().hasNext()) {
                Tuple2<Integer, Integer> t2 = list.iterator().next();
                System.out.println(t2._1 + "," + t2._2);
            }
        });
        JavaPairRDD<String, List<Tuple2<Integer, Integer>>> sorted = groups.mapValues(s -> {
            List<Tuple2<Integer, Integer>> newList = new ArrayList<>();
            while (s.iterator().hasNext()) {
                newList.add(s.iterator().next());
            }
            Collections.sort(newList, SparkTupleComparator.INSTANCE);
            return newList;
        });
        List<Tuple2<String, List<Tuple2<Integer, Integer>>>> outPut3 = sorted.collect();
        outPut3.stream().forEach(t -> {
            List<Tuple2<Integer, Integer>> list = t._2;
            System.out.println(t._1);
            list.stream().forEach(t2 -> System.out.println(t2._1 + "," + t2._2));
        });
    }
}
代码语言:javascript
复制
public class SparkTupleComparator implements Comparator<Tuple2<Integer, Integer>>, Serializable {

    public static final SparkTupleComparator INSTANCE = new SparkTupleComparator();

    private SparkTupleComparator() {
    }

    @Override
    public int compare(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2){
        return t1._1.compareTo(t2._1);
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档