专栏首页SmartSiSpark 多文件输出

Spark 多文件输出

1. 自定义MultipleOutputFormat

Hadoop 多文件输出MultipleOutputFormat中介绍了如何在Hadoop中根据Key或者Value的值将属于不同的类型的记录写到不同的文件中。在这里面用到了MultipleOutputFormat这个类。

因为Spark内部写文件方式其实调用的是Hadoop相关API,所以我们也可以通过Spark实现多文件输出。不过遗憾的是,Spark内部没有多文件输出的函数供我们直接使用。我们可以通过调用saveAsHadoopFile函数并自定义MultipleOutputFormat类来实现多文件输出,如下所示:

public class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {
    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString();
    }
}

RDDMultipleTextOutputFormat类中的 generateFileNameForKeyValue 函数有三个参数,key和value是RDD对应的Key和Value,而name参数是每个Reduce的编号。上面例子中没有使用该参数,而是直接将同一个Key的数据输出到同一个文件中。我们来看看如何使用这个自定义的类:

String appName = "MultipleTextOutputExample";
SparkConf conf = new SparkConf().setAppName(appName);
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> source = sc.textFile(inputPath);
// 以platform为key
JavaPairRDD<String, String> result = source.mapToPair(new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String str) throws Exception {
        String[] params = str.split("\t");
        String platform = "other";
        if(params.length > 1 && StringUtils.isNotBlank(params[1])){
            platform = params[1];
        }
        return new Tuple2<>(platform, str);
    }
});
// 保存
result.saveAsHadoopFile(outputPath, String.class, String.class, RDDMultipleTextOutputFormat.class);

上面示例中通过调用 saveAsHadoopFile 函数并自定义 MultipleOutputFormat 类来实现多文件输出,如下所示输出:

[xiaosi@ying ~]$  sudo -uxiaosi hadoop fs -ls tmp/data_group/example/output/price
Found 3 items
-rw-r--r--   3 xiaosi xiaosi          0 2018-07-12 16:24 tmp/data_group/example/output/price/_SUCCESS
-rw-r--r--   3 xiaosi xiaosi     723754 2018-07-12 16:23 tmp/data_group/example/output/price/adr
-rw-r--r--   3 xiaosi xiaosi     799216 2018-07-12 16:23 tmp/data_group/example/output/price/ios

我们可以看到输出已经根据RDD的key将属于不同类型的记录写到不同的文件中,每个key对应一个文件,如果想每个key对应多个文件输出,需要修改一下我们自定义的RDDMultipleTextOutputFormat,如下代码所示:

public class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {
    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString() + "/" + name;
    }
}

输出如下所示:

[xiaosi@ying ~]$  sudo -uxiaosi hadoop fs -ls tmp/data_group/example/output/price/
Found 3 items
-rw-r--r--   3 xiaosi xiaosi          0 2018-07-16 10:00 tmp/data_group/example/output/price/_SUCCESS
drwxr-xr-x   - xiaosi xiaosi          0 2018-07-16 10:00 tmp/data_group/example/output/price/adr
drwxr-xr-x   - xiaosi xiaosi          0 2018-07-16 10:00 tmp/data_group/example/output/price/ios
[xiaosi@ying ~]$
[xiaosi@ying ~]$  sudo -uxiaosi hadoop fs -ls tmp/data_group/example/output/price/adr/
Found 2 items
-rw-r--r--   3 xiaosi xiaosi 23835 2018-07-16 10:00 tmp/data_group/example/output/price/adr/part-00000
-rw-r--r--   3 xiaosi xiaosi      22972 2018-07-16 10:00 tmp/data_group/example/output/price/adr/part-00001

2. DataFrame 方式

如果你使用的是Spark 1.4+,借助DataFrame API会变得更加容易。(DataFrames是在Spark 1.3中引入的,但我们需要的partitionBy()是在1.4中引入的。)

如果你使用的是RDD,首先需要将其转换为DataFrame。拥有DataFrame后,基于特定 key 输出到多个文件中就很简单了。

SparkSession sparkSession = SparkSession
  .builder()
  .appName("MultipleTextOutputExample")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

JavaRDD<Price> priceRDD = sparkSession.read().textFile(inputPath).javaRDD().map(new Function<String, Price>() {
  @Override
  public Price call(String str) throws Exception {
    String[] params = str.split("\t");
    Price price = new Price();
    price.setDate(params[0]);
    price.setPlatform(params[1]);
    price.setAdType(params[2]);
    price.setChannelId(params[3]);
    price.setUid(params[4]);
    price.setPrice(params[5]);
    return price;
  }
});
Dataset<Row> priceDataFrame = sparkSession.createDataFrame(priceRDD, Price.class);
priceDataFrame.write().partitionBy("platform").json(outputPath);

在这个示例中,Spark将为我们在DataFrame上分区的每个 key 创建一个子目录:

[xiaosi@ying ~]$  sudo -uxiaosi hadoop fs -ls tmp/data_group/example/output/price/
Found 3 items
-rw-r--r--   3 xiaosi xiaosi  0 2018-07-16 15:41 tmp/data_group/example/output/price/_SUCCESS
drwxr-xr-x   - xiaosi xiaosi  0 2018-07-16 15:41 tmp/data_group/example/output/price/platform=adr
drwxr-xr-x   - xiaosi xiaosi  0 2018-07-16 15:41 tmp/data_group/example/output/price/platform=ios

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Hexo 为Next主题添加评论功能

    之前使用的来必力,最近评论功能一直出不来,用户体验比较差,所以重新评估选择了Valine。

    smartsi
  • Hadoop中的Secondary Sort

    我们首先提出了一个查询问题,为了解决这个问题,需要在数据集的多个字段上进行排序。然后,我们将研究 MapReduce Shuff 阶段的工作原理,然后再实现我们...

    smartsi
  • Spark 第一个Spark程序WordCount

    使用上述命令打包后,会在项目根目录下的target目录生成jar包。打完jar包后,我们可以使用spark-submit提交任务:

    smartsi
  • 首发!DevOps@BOC — 器用之道,如琢如磨

    我来自中国银行软件中心的一个开发部门,中国银行软件中心从 2013年开始试点敏捷软件开发以及相关CI、CD等实践,而我们内部真正的提 DevOps 比这个要更晚...

    DevOps时代
  • Python把类当做字典来访问

    如果我们想直接通过字典的方式访问类,obj=A(),dict(obj)直接拿到类的所有实例变量,可以通过如下方式:

    py3study
  • Python把类当做字典来访问

    定义一个类将它实例化,我们可以通过obj.属性来访问类的属性,如果想获取类的所有实例变量,我们可以使用obj.__dict__来访问,如下:

    于小勇
  • shell脚本打印99乘法表

    用户1733462
  • 什么才算是真正的编程能力?

    三哥
  • 一张思维导图看懂《周易》基础要点——参考《易经杂说》

    本图对《易经》的基础部分做了归纳总结。主要参考南怀瑾先生的《易经杂说》及不同版本《周易》。

    一石匠人
  • 缓存穿透、缓存击穿、缓存雪崩概念及解决方案缓存穿透缓存雪崩缓存击穿

    缓存穿透 概念 访问一个不存在的key,缓存不起作用,请求会穿透到DB,流量大时DB会挂掉。 解决方案 采用布隆过滤器,使用一个足够大的bitmap,用于存储可...

    Clive

扫码关注云+社区

领取腾讯云代金券