Spark Streaming: Writing Output Files with Custom File Names


Customize the output file name by overriding MultipleOutputFormat.

1. Background

At work I ran into a requirement to write the output of a Spark Streaming job into a Hive table. The first option, saveAsTextFiles, creates a new directory for every batch, which does not match the flat layout Hive expects when reading a table, and its parameter only controls the directory name, not the file names. The second option was foreachRDD, which hands each micro-batch to you as an RDD to operate on; but Spark Streaming processes many batches, i.e. many RDDs, and calling saveAsTextFile on each RDD with the same path makes every batch overwrite the data written by the previous one. The approach finally adopted was therefore to override the file name that saveAsTextFile uses when writing output.
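For illustration, here is a minimal sketch of the approaches that were ruled out. The object name, socket source, host/port, batch interval and output path are placeholders invented for this sketch, not the actual job.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NaiveStreamingOutput {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("NaiveStreamingOutput")
      .setMaster("local[2]")   // for local testing only; drop when submitting to a cluster
    val ssc = new StreamingContext(conf, Seconds(30))
    val lines = ssc.socketTextStream("localhost", 9999)

    // Option 1: saveAsTextFiles creates a directory per batch,
    // e.g. /warehouse/demo_table-1563950000000/, which Hive cannot treat as one table.
    // lines.saveAsTextFiles("/warehouse/demo_table")

    // Option 2: foreachRDD + saveAsTextFile writes every batch to the same fixed path,
    // so each micro-batch replaces the output of the previous one.
    lines.foreachRDD { rdd =>
      rdd.saveAsTextFile("/warehouse/demo_table")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}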

2. Analysis

2.1 Analyzing the code

Since the plan is to override saveAsTextFile's output logic, let's first look at how it implements its output.

def saveAsTextFile(path: String): Unit = withScope {
    // https://issues.apache.org/jira/browse/SPARK-2075
    //
    // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
    // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
    // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
    // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
    // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
    //
    // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
    // same bytecodes for `saveAsTextFile`.
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

As you can see, saveAsTextFile delegates its output to saveAsHadoopFile. Because saveAsHadoopFile expects a pair RDD, saveAsTextFile first converts the RDD into an RDD of (NullWritable, Text) via rddToPairRDDFunctions, and then writes it out with saveAsHadoopFile.

The output path is therefore still the classic Hadoop one, so we can solve the identical-file-name (and hence overwriting) problem by plugging in our own OutputFormat, a subclass of MultipleTextOutputFormat, in place of TextOutputFormat.

2.2 Writing the code
2.2.1 The saveAsHadoopFile operator

First, let's look at the saveAsHadoopFile overloads that Spark provides.

/**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD.
   */
  def saveAsHadoopFile[F <: OutputFormat[K, V]](
      path: String)(implicit fm: ClassTag[F]): Unit = self.withScope {
    saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
  }
/**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD. Compress the result with the
   * supplied codec.
   */
  def saveAsHadoopFile[F <: OutputFormat[K, V]](
      path: String,
      codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope {
    val runtimeClass = fm.runtimeClass
    saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
  }
 /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
   */
def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      codec: Class[_ <: CompressionCodec]): Unit = self.withScope {
    saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
      new JobConf(self.context.hadoopConfiguration), Some(codec))
  }
/**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD.
   *
   * Note that, we should make sure our tasks are idempotent when speculation is enabled, i.e. do
   * not use output committer that writes data directly.
   * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
   * result of using direct output committer with speculation enabled.
   */
  def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
      codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {...}

Here we use the overload def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None).

We pass in, in order: path (the output path), keyClass (the key type), valueClass (the value type), and outputFormatClass (the OutputFormat to use); the remaining two parameters, conf and codec, keep their default values.
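As a quick illustration of the call shape, here is a throwaway standalone batch example (not the streaming job): the app name, master, sample data and output path are made up, and RDDMultipleTextOutputFormat is the class defined below in section 2.2.3.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("SaveAsHadoopFileDemo").setMaster("local[2]"))

sc.parallelize(Seq("a", "b", "c"))
  .map(x => (x, ""))                       // saveAsHadoopFile needs a pair RDD
  .saveAsHadoopFile(
    "/tmp/demo_output",                    // path: the output directory
    classOf[String],                       // keyClass: type of the key
    classOf[String],                       // valueClass: type of the value
    classOf[RDDMultipleTextOutputFormat])  // outputFormatClass: our custom OutputFormat
// conf and codec are left at their defaults: new JobConf(sc.hadoopConfiguration) and None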

2.2.2 Analyzing MultipleTextOutputFormat
/**
 * This abstract class extends the FileOutputFormat, allowing to write the
 * output data to different output files. There are three basic use cases for
 * this class.
 * 
 * Case one: This class is used for a map reduce job with at least one reducer.
 * The reducer wants to write data to different files depending on the actual
 * keys. It is assumed that a key (or value) encodes the actual key (value)
 * and the desired location for the actual key (value).
 * 
 * Case two: This class is used for a map only job. The job wants to use an
 * output file name that is either a part of the input file name of the input
 * data, or some derivation of it.
 * 
 * Case three: This class is used for a map only job. The job wants to use an
 * output file name that depends on both the keys and the input file name,
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class MultipleOutputFormat<K, V>
extends FileOutputFormat<K, V> {

  /**
   * Create a composite record writer that can write key/value data to different
   * output files
   * 
   * @param fs
   *          the file system to use
   * @param job
   *          the job conf for the job
   * @param name
   *          the leaf file name for the output file (such as part-00000")
   * @param arg3
   *          a progressable for reporting progress.
   * @return a composite record writer
   * @throws IOException
   */
  public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable arg3) throws IOException {

    final FileSystem myFS = fs;
    final String myName = generateLeafFileName(name);
    final JobConf myJob = job;
    final Progressable myProgressable = arg3;

    return new RecordWriter<K, V>() {

      // a cache storing the record writers for different output files.
      TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();

      public void write(K key, V value) throws IOException {

        // get the file name based on the key
        String keyBasedPath = generateFileNameForKeyValue(key, value, myName);

        // get the file name based on the input file name
        String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);

        // get the actual key
        K actualKey = generateActualKey(key, value);
        V actualValue = generateActualValue(key, value);

        RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
        if (rw == null) {
          // if we don't have the record writer yet for the final path, create
          // one
          // and add it to the cache
          rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
          this.recordWriters.put(finalPath, rw);
        }
        rw.write(actualKey, actualValue);
      };

      public void close(Reporter reporter) throws IOException {
        Iterator<String> keys = this.recordWriters.keySet().iterator();
        while (keys.hasNext()) {
          RecordWriter<K, V> rw = this.recordWriters.get(keys.next());
          rw.close(reporter);
        }
        this.recordWriters.clear();
      };
    };
  }

  /**
   * Generate the leaf name for the output file name. The default behavior does
   * not change the leaf file name (such as part-00000)
   * 
   * @param name
   *          the leaf file name for the output file
   * @return the given leaf file name
   */
  protected String generateLeafFileName(String name) {
    return name;
  }

  /**
   * Generate the file output file name based on the given key and the leaf file
   * name. The default behavior is that the file name does not depend on the
   * key.
   * 
   * @param key
   *          the key of the output data
   * @param name
   *          the leaf file name
   * @return generated file name
   */
  protected String generateFileNameForKeyValue(K key, V value, String name) {
    return name;
  }

  /**
   * Generate the actual key from the given key/value. The default behavior is that
   * the actual key is equal to the given key
   * 
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @return the actual key derived from the given key/value
   */
  protected K generateActualKey(K key, V value) {
    return key;
  }
  
  /**
   * Generate the actual value from the given key and value. The default behavior is that
   * the actual value is equal to the given value
   * 
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @return the actual value derived from the given key/value
   */
  protected V generateActualValue(K key, V value) {
    return value;
  }
  

  /**
   * Generate the outfile name based on a given anme and the input file name. If
   * the {@link JobContext#MAP_INPUT_FILE} does not exists (i.e. this is not for a map only job),
   * the given name is returned unchanged. If the config value for
   * "num.of.trailing.legs.to.use" is not set, or set 0 or negative, the given
   * name is returned unchanged. Otherwise, return a file name consisting of the
   * N trailing legs of the input file name where N is the config value for
   * "num.of.trailing.legs.to.use".
   * 
   * @param job
   *          the job config
   * @param name
   *          the output file name
   * @return the outfile name based on a given anme and the input file name.
   */
  protected String getInputFileBasedOutputFileName(JobConf job, String name) {
    String infilepath = job.get(MRJobConfig.MAP_INPUT_FILE);
    if (infilepath == null) {
      // if the {@link JobContext#MAP_INPUT_FILE} does not exists,
      // then return the given name
      return name;
    }
    int numOfTrailingLegsToUse = job.getInt("mapred.outputformat.numOfTrailingLegs", 0);
    if (numOfTrailingLegsToUse <= 0) {
      return name;
    }
    Path infile = new Path(infilepath);
    Path parent = infile.getParent();
    String midName = infile.getName();
    Path outPath = new Path(midName);
    for (int i = 1; i < numOfTrailingLegsToUse; i++) {
      if (parent == null) break;
      midName = parent.getName();
      if (midName.length() == 0) break;
      parent = parent.getParent();
      outPath = new Path(midName, outPath);
    }
    return outPath.toString();
  }

  /**
   * 
   * @param fs
   *          the file system to use
   * @param job
   *          a job conf object
   * @param name
   *          the name of the file over which a record writer object will be
   *          constructed
   * @param arg3
   *          a progressable object
   * @return A RecordWriter object over the given file
   * @throws IOException
   */
  abstract protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
      JobConf job, String name, Progressable arg3) throws IOException;
}

As you can see, before writing each record MultipleOutputFormat calls generateFileNameForKeyValue to decide which file the record goes to, so all we need to do is override generateFileNameForKeyValue.
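For example, here is a hedged illustration (a hypothetical class, not the one this article ends up using in 2.2.3) of a subclass that routes each record to a file named after its key, and also overrides generateActualKey so the key itself is not written into the file.

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Hypothetical key-based variant: records with key "2019-07-24" end up in files
// named like 2019-07-24-part-00000, and only the value is written to each file.
class KeyBasedTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  // Prefix the default leaf name (e.g. part-00000) with the record's key.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "-" + name

  // Replace the key with NullWritable so TextOutputFormat writes only the value.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}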

2.2.3 Overriding MultipleTextOutputFormat
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  // Captured once, when the output format is instantiated for a batch's save job.
  private val start_time = System.currentTimeMillis()

  // Prefix the default leaf name (e.g. part-00000) with the timestamp so that
  // files from different batches no longer collide.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    start_time + "-" + name
  }
}

Changes to the Spark Streaming code

... // business logic
.map(x => (x, "")) // saveAsHadoopFile expects a pair RDD, so wrap each record as a (value, "") pair
.saveAsHadoopFile(finallpath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])

With this in place, each batch's files carry a distinct timestamp prefix and the overwrite problem is solved.
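Putting it all together, here is a minimal end-to-end sketch under assumed inputs: the object name, socket source, host/port, batch interval and output path are placeholders, and RDDMultipleTextOutputFormat is the class from 2.2.3. One additional assumption not covered above: since every batch writes into the same, already-existing directory, output-spec validation may need to be relaxed (spark.hadoop.validateOutputSpecs=false) depending on your Spark version.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingToHiveDir {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingToHiveDir")
      .setMaster("local[2]")   // for local testing only; drop when submitting to a cluster
      // Assumption: allow writing into an output directory that already exists.
      .set("spark.hadoop.validateOutputSpecs", "false")
    val ssc = new StreamingContext(conf, Seconds(30))

    ssc.socketTextStream("localhost", 9999)
      .foreachRDD { rdd =>
        rdd.map(x => (x, ""))                      // saveAsHadoopFile needs a pair RDD
          .saveAsHadoopFile(
            "/warehouse/demo_table",               // flat directory the Hive table points to
            classOf[String],
            classOf[String],
            classOf[RDDMultipleTextOutputFormat])  // prefixes each file name with a timestamp
      }

    ssc.start()
    ssc.awaitTermination()
  }
}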

References

Spark (Streaming): writing data to files
