前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark 多文件输出

Spark 多文件输出

作者头像
smartsi
发布2019-08-07 14:09:44
2.1K0
发布2019-08-07 14:09:44
举报
文章被收录于专栏:SmartSiSmartSi

1. 自定义MultipleOutputFormat

Hadoop 多文件输出MultipleOutputFormat中介绍了如何在Hadoop中根据Key或者Value的值将属于不同的类型的记录写到不同的文件中。在这里面用到了MultipleOutputFormat这个类。

因为Spark内部写文件方式其实调用的是Hadoop相关API,所以我们也可以通过Spark实现多文件输出。不过遗憾的是,Spark内部没有多文件输出的函数供我们直接使用。我们可以通过调用saveAsHadoopFile函数并自定义MultipleOutputFormat类来实现多文件输出,如下所示:

代码语言:javascript
复制
public class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {
    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString();
    }
}

RDDMultipleTextOutputFormat类中的 generateFileNameForKeyValue 函数有三个参数,key和value是RDD对应的Key和Value,而name参数是每个Reduce的编号。上面例子中没有使用该参数,而是直接将同一个Key的数据输出到同一个文件中。我们来看看如何使用这个自定义的类:

代码语言:javascript
复制
String appName = "MultipleTextOutputExample";
SparkConf conf = new SparkConf().setAppName(appName);
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> source = sc.textFile(inputPath);
// 以platform为key
JavaPairRDD<String, String> result = source.mapToPair(new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String str) throws Exception {
        String[] params = str.split("\t");
        String platform = "other";
        if(params.length > 1 && StringUtils.isNotBlank(params[1])){
            platform = params[1];
        }
        return new Tuple2<>(platform, str);
    }
});
// 保存
result.saveAsHadoopFile(outputPath, String.class, String.class, RDDMultipleTextOutputFormat.class);

上面示例中通过调用 saveAsHadoopFile 函数并自定义 MultipleOutputFormat 类来实现多文件输出,如下所示输出:

代码语言:javascript
复制
[xiaosi@ying ~]$  sudo -uxiaosi hadoop fs -ls tmp/data_group/example/output/price
Found 3 items
-rw-r--r--   3 xiaosi xiaosi          0 2018-07-12 16:24 tmp/data_group/example/output/price/_SUCCESS
-rw-r--r--   3 xiaosi xiaosi     723754 2018-07-12 16:23 tmp/data_group/example/output/price/adr
-rw-r--r--   3 xiaosi xiaosi     799216 2018-07-12 16:23 tmp/data_group/example/output/price/ios

我们可以看到输出已经根据RDD的key将属于不同类型的记录写到不同的文件中,每个key对应一个文件,如果想每个key对应多个文件输出,需要修改一下我们自定义的RDDMultipleTextOutputFormat,如下代码所示:

代码语言:javascript
复制
public class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {
    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString() + "/" + name;
    }
}

输出如下所示:

代码语言:javascript
复制
[xiaosi@ying ~]$  sudo -uxiaosi hadoop fs -ls tmp/data_group/example/output/price/
Found 3 items
-rw-r--r--   3 xiaosi xiaosi          0 2018-07-16 10:00 tmp/data_group/example/output/price/_SUCCESS
drwxr-xr-x   - xiaosi xiaosi          0 2018-07-16 10:00 tmp/data_group/example/output/price/adr
drwxr-xr-x   - xiaosi xiaosi          0 2018-07-16 10:00 tmp/data_group/example/output/price/ios
[xiaosi@ying ~]$
[xiaosi@ying ~]$  sudo -uxiaosi hadoop fs -ls tmp/data_group/example/output/price/adr/
Found 2 items
-rw-r--r--   3 xiaosi xiaosi 23835 2018-07-16 10:00 tmp/data_group/example/output/price/adr/part-00000
-rw-r--r--   3 xiaosi xiaosi      22972 2018-07-16 10:00 tmp/data_group/example/output/price/adr/part-00001

2. DataFrame 方式

如果你使用的是Spark 1.4+,借助DataFrame API会变得更加容易。(DataFrames是在Spark 1.3中引入的,但我们需要的partitionBy()是在1.4中引入的。)

如果你使用的是RDD,首先需要将其转换为DataFrame。拥有DataFrame后,基于特定 key 输出到多个文件中就很简单了。

代码语言:javascript
复制
SparkSession sparkSession = SparkSession
  .builder()
  .appName("MultipleTextOutputExample")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

JavaRDD<Price> priceRDD = sparkSession.read().textFile(inputPath).javaRDD().map(new Function<String, Price>() {
  @Override
  public Price call(String str) throws Exception {
    String[] params = str.split("\t");
    Price price = new Price();
    price.setDate(params[0]);
    price.setPlatform(params[1]);
    price.setAdType(params[2]);
    price.setChannelId(params[3]);
    price.setUid(params[4]);
    price.setPrice(params[5]);
    return price;
  }
});
Dataset<Row> priceDataFrame = sparkSession.createDataFrame(priceRDD, Price.class);
priceDataFrame.write().partitionBy("platform").json(outputPath);

在这个示例中,Spark将为我们在DataFrame上分区的每个 key 创建一个子目录:

代码语言:javascript
复制
[xiaosi@ying ~]$  sudo -uxiaosi hadoop fs -ls tmp/data_group/example/output/price/
Found 3 items
-rw-r--r--   3 xiaosi xiaosi  0 2018-07-16 15:41 tmp/data_group/example/output/price/_SUCCESS
drwxr-xr-x   - xiaosi xiaosi  0 2018-07-16 15:41 tmp/data_group/example/output/price/platform=adr
drwxr-xr-x   - xiaosi xiaosi  0 2018-07-16 15:41 tmp/data_group/example/output/price/platform=ios
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-07-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 自定义MultipleOutputFormat
  • 2. DataFrame 方式
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档