专栏首页扎心了老铁大数据算法设计模式(2) - 左外连接(leftOuterJoin) spark实现

大数据算法设计模式(2) - 左外连接(leftOuterJoin) spark实现

左外连接(leftOuterJoin) spark实现

package com.kangaroo.studio.algorithms.join;


import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Emulates "transactions LEFT OUTER JOIN users ON userId" with the Spark Java API,
 * using a union + groupByKey instead of the built-in {@code leftOuterJoin}.
 *
 * <p>For every product it prints the set of distinct user locations the product was
 * bought from, together with the size of that set.
 */
public class LeftOuterJoinSpark {

    private JavaSparkContext jsc;
    private String usersInputFile;
    private String transactionsInputFile;

    /**
     * @param usersInputFile        path to the users file; one {@code userId \t location} per line
     * @param transactionsInputFile path to the transactions file; one
     *                              {@code transactionId \t productId \t userId \t price} per line
     */
    public LeftOuterJoinSpark(String usersInputFile, String transactionsInputFile) {
        // Configuration (master, app name) is expected to come from spark-submit.
        this.jsc = new JavaSparkContext();
        this.usersInputFile = usersInputFile;
        this.transactionsInputFile = transactionsInputFile;
    }

    /**
     * Runs the join and prints {@code (productId, (locationSet, locationCount))} to stdout.
     * The SparkContext is stopped when the job finishes, even on failure.
     */
    public void run() {
        try {
            /*
            *   Read the users file. It has two tab-separated columns, userId and location, e.g.:
            *   u1  UT
            *   u2  GA
            *   u3  GA
            * */
            JavaRDD<String> users = jsc.textFile(usersInputFile, 1);

            /*
            *   Split each line into a key/value pair.
            *   Input:  one raw line
            *   Output: (userId, ("L", location)) — "L" tags the value as a location;
            *           later "P" tags a product.
            *   ("u1", ("L", "UT"))
            *   ("u2", ("L", "GA"))
            *   ("u3", ("L", "GA"))
            * */
            JavaPairRDD<String, Tuple2<String, String>> usersRDD = users.mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
                public Tuple2<String, Tuple2<String, String>> call(String s) throws Exception {
                    String[] userRecord = s.split("\t");
                    String userId = userRecord[0];
                    Tuple2<String, String> location = new Tuple2<String, String>("L", userRecord[1]);
                    return new Tuple2<String, Tuple2<String, String>>(userId, location);
                }
            });

            /*
            *   Read the transactions file. It has four tab-separated columns:
            *   transactionId / productId / userId / price, e.g.:
            *   t1  p3  u1  300
            *   t2  p1  u2  400
            *   t3  p1  u3  200
            * */
            JavaRDD<String> transactions = jsc.textFile(transactionsInputFile, 1);

            /*
            *   Split each line into a key/value pair.
            *   Input:  one raw line
            *   Output: (userId, ("P", productId)) — "P" tags the value as a product.
            *   ("u1", ("P", "p3"))
            *   ("u2", ("P", "p1"))
            *   ("u3", ("P", "p1"))
            * */
            JavaPairRDD<String, Tuple2<String, String>> transactionsRDD = transactions.mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
                public Tuple2<String, Tuple2<String, String>> call(String s) throws Exception {
                    String[] transactionRecord = s.split("\t");
                    String userId = transactionRecord[2];
                    Tuple2<String, String> product = new Tuple2<String, String>("P", transactionRecord[1]);
                    return new Tuple2<String, Tuple2<String, String>>(userId, product);
                }
            });

            /*
            *   Union the users and transactions RDDs.
            *   Input:
            *       transactions ("u1", ("P", "p3"))
            *       users        ("u1", ("L", "UT"))
            *   Output (mixed records keyed by userId):
            *       (userId, ("L", location))
            *       (userId, ("P", product))
            * */
            JavaPairRDD<String, Tuple2<String, String>> allRDD = transactionsRDD.union(usersRDD);

            /*
            *   Group by userId.
            *   Input:
            *       (userId, ("L", location))
            *       (userId, ("P", product))
            *    Output:
            *       (userId, List[
            *                 ("L", location),
            *                 ("P", p1),
            *                 ("P", p2),
            *                 ... ])
            * */
            JavaPairRDD<String, Iterable<Tuple2<String, String>>> groupedRDD = allRDD.groupByKey();

            /*
            *   Drop the userId and pair each product with the user's location.
            *   This is the left-outer-join step: a user with transactions but no
            *   users-file record gets the sentinel location "UNKNOWN".
            *   Input:
            *       (userId, List[
            *                 ("L", location),
            *                 ("P", p1),
            *                 ("P", p2),
            *                 ... ])
            *   Output:
            *       (product1, location1)
            *       (product1, location2)
            *       (product2, location1)
            * */
            JavaPairRDD<String, String> productLocationRDD = groupedRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<Tuple2<String, String>>>, String, String>() {
                public Iterable<Tuple2<String, String>> call(Tuple2<String, Iterable<Tuple2<String, String>>> s) throws Exception {
                    Iterable<Tuple2<String, String>> pairs = s._2;
                    // Sentinel used when the userId has no matching users-file record.
                    String location = "UNKNOWN";
                    List<String> products = new ArrayList<String>();
                    for (Tuple2<String, String> t2 : pairs) {
                        if (t2._1.equals("L")) {
                            location = t2._2;
                        } else if (t2._1.equals("P")) {
                            products.add(t2._2);
                        }
                    }
                    List<Tuple2<String, String>> kvList = new ArrayList<Tuple2<String, String>>();
                    for (String product : products) {
                        kvList.add(new Tuple2<String, String>(product, location));
                    }
                    return kvList;
                }
            });

            /*
            *   Group by productId.
            *   Input:
            *       (product1, location1)
            *       (product1, location2)
            *       (product2, location1)
            *   Output:
            *       (product1, List[
            *           location1,
            *           location2,
            *           ... ])
            * */
            JavaPairRDD<String, Iterable<String>> productByLocations = productLocationRDD.groupByKey();

            /*
            *   De-duplicate the locations for each product and count them.
            *   Input:
            *       (product1, List[
            *           location1,
            *           location2,
            *           location2,
            *           ... ])
            *   Output:
            *       (product1, (Set[location1, location2, ...], setSize))
            * */
            JavaPairRDD<String, Tuple2<Set<String>, Integer>> productByUniqueLocations = productByLocations.mapValues(new Function<Iterable<String>, Tuple2<Set<String>, Integer>>() {
                public Tuple2<Set<String>, Integer> call(Iterable<String> strings) throws Exception {
                    Set<String> uniqueLocations = new HashSet<String>();
                    for (String location : strings) {
                        uniqueLocations.add(location);
                    }
                    return new Tuple2<Set<String>, Integer>(uniqueLocations, uniqueLocations.size());
                }
            });

            /*
            *   Collect and print the result.
            * */
            List<Tuple2<String, Tuple2<Set<String>, Integer>>> result = productByUniqueLocations.collect();
            for (Tuple2<String, Tuple2<Set<String>, Integer>> t : result) {
                // productId
                System.out.println(t._1);
                // location set and its size
                System.out.println(t._2);
            }
        } finally {
            // Release cluster resources; without this the driver may not exit cleanly.
            jsc.stop();
        }
    }

    public static void main(String[] args) {
        // Fail fast with a usage message instead of ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: LeftOuterJoinSpark <usersInputFile> <transactionsInputFile>");
            System.exit(1);
        }
        String usersInputFile = args[0];
        String transactionsInputFile = args[1];
        LeftOuterJoinSpark leftOuterJoinSpark = new LeftOuterJoinSpark(usersInputFile, transactionsInputFile);
        leftOuterJoinSpark.run();
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • java使用spark/spark-sql处理schema数据

    1、spark是什么? Spark是基于内存计算的大数据并行计算框架。 1.1 Spark基于内存计算 相比于MapReduce基于IO计算,提高了在大数据环境...

    用户1225216
  • java spark-streaming接收TCP/Kafka数据

     本文将展示 1、如何使用spark-streaming接入TCP数据并进行过滤; 2、如何使用spark-streaming接入TCP数据并进行wordcou...

    用户1225216
  • 分布式锁的实现(redis)

    1、单机锁 考虑在并发场景并且存在竞态的状况下,我们就要实现同步机制了,最简单的同步机制就是加锁。 加锁可以帮我们锁住资源,如内存中的变量,或者锁住临界区(线程...

    用户1225216
  • 这样规范写代码,同事直呼“666”

    乔戈里
  • java中两个map比较

    ydymz
  • QQ小程序支付

    首先是配置类,设置为包内访问权限,其实应该放于properties文件,或者直接配置在xml中,偷了个懒直接写在了代码中

    WindrunnerMax
  • 关于SpringMVC中如何把查询数据全转成String类型

    上帝
  • Java做爬虫也很牛

    首先我们封装一个Http请求的工具类,用HttpURLConnection实现,当然你也可以用HttpClient, 或者直接用Jsoup来请求(下面会讲到Js...

    猿天地
  • 当我遵循了这 16 条规范写代码,同事只对我说了三个字: 666

    Many of the happiest people are those who own the least. But are we really so ha...

    良月柒
  • 这样规范写代码,同事直呼“666”

    java思维导图

扫码关注云+社区

领取腾讯云代金券