我们首先用 IDEA 来搭建 Spark 项目,具体可以参考《提交第一个 Spark 统计文件单词数程序》,并配合 Hadoop HDFS;只不过我们现在用 Java 语言来编写,而不是 Scala。
二次排序问题解决方案
先用第一种方案来处理
public class SecondSort {

    /**
     * Secondary-sort driver: reads CSV lines of the form "name,time,value",
     * groups records by name, then sorts each group's (time, value) pairs by
     * time in memory on the executor.
     *
     * NOTE(review): the sort in {@code mapValues} materializes each key's whole
     * group as a List, so this approach only works when every group fits in
     * executor memory.
     *
     * @param args args[0] is the input path (local file or HDFS URI)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SecondSort");
        JavaSparkContext scc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = scc.textFile(args[0]);

        // Parse each "name,time,value" line into (name, (time, value)).
        JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = rdd.mapToPair(s -> {
            String[] tokens = s.split(",");
            System.out.println(tokens[0] + "," + tokens[1] + "," + tokens[2]);
            Integer time = Integer.parseInt(tokens[1]);
            Integer value = Integer.parseInt(tokens[2]);
            Tuple2<Integer, Integer> timeValue = new Tuple2<>(time, value);
            return new Tuple2<String, Tuple2<Integer, Integer>>(tokens[0], timeValue);
        });

        // Debug dump of the raw (name, (time, value)) pairs.
        List<Tuple2<String, Tuple2<Integer, Integer>>> outPut = pairs.collect();
        outPut.forEach(t -> {
            Tuple2<Integer, Integer> timeValue = t._2;
            // Bug fix: original printed timeValue._1 twice, dropping the value.
            System.out.println(t._1 + "," + timeValue._1 + "," + timeValue._2);
        });

        JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> groups = pairs.groupByKey();
        List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> outPut2 = groups.collect();
        outPut2.forEach(t -> {
            System.out.println(t._1);
            // Bug fix: the original called t._2.iterator() inside the while
            // condition, creating a fresh iterator on every check — an infinite
            // loop printing the first element forever. Use for-each instead.
            for (Tuple2<Integer, Integer> t2 : t._2) {
                System.out.println(t2._1 + "," + t2._2);
            }
        });

        // Sort each group's (time, value) list by time (secondary sort).
        JavaPairRDD<String, List<Tuple2<Integer, Integer>>> sorted = groups.mapValues(s -> {
            List<Tuple2<Integer, Integer>> newList = new ArrayList<>();
            // Bug fix: same fresh-iterator-per-check infinite loop as above.
            for (Tuple2<Integer, Integer> tv : s) {
                newList.add(tv);
            }
            newList.sort(SparkTupleComparator.INSTANCE);
            return newList;
        });

        List<Tuple2<String, List<Tuple2<Integer, Integer>>>> outPut3 = sorted.collect();
        outPut3.forEach(t -> {
            System.out.println(t._1);
            t._2.forEach(t2 -> System.out.println(t2._1 + "," + t2._2));
        });

        // Bug fix: release the SparkContext; the original leaked it.
        scc.close();
    }
}
/**
 * Orders (time, value) tuples by their first element (time), ascending.
 * Implements {@code Serializable} so Spark can ship the comparator to executors.
 */
public class SparkTupleComparator implements Comparator<Tuple2<Integer, Integer>>, Serializable {

    /** Shared stateless singleton; the comparator holds no state, so it is thread-safe. */
    public static final SparkTupleComparator INSTANCE = new SparkTupleComparator();

    private SparkTupleComparator() {
        // Singleton: obtain via INSTANCE.
    }

    @Override
    public int compare(Tuple2<Integer, Integer> first, Tuple2<Integer, Integer> second) {
        // Equivalent to first._1.compareTo(second._1) for non-null Integers.
        return Integer.compare(first._1, second._1);
    }
}