首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark系列--OutputFormat 详解

Spark系列--OutputFormat 详解

作者头像
solve
发布2019-10-30 13:15:55
9440
发布2019-10-30 13:15:55
举报
文章被收录于专栏:大数据技术栈大数据技术栈

前言

本文主要内容

  1. 什么是OutputFormat及其运行机制?
  2. 如何自定义自己的OutputFormat?
  3. 实战自定义mysql OutputFormat。

一丶什么是OutputFormat?

定义了 spark 的输出规则的类。这也许会让你想到 Hadoop Mapreduce 的 OutputFormat,没错,其实他们是一个东西,嗯,完全一样。Spark 本身只是一个计算框架,其输入和输出都是依赖于 Hadoop 的 OutputFormat,但是因为 Spark 本身自带 Hadoop 相关 Jar 包,所以不需要我们额外考虑这些东西,下面我们以saveAsTextFile源码来验证我们的结论

 def saveAsTextFile(path: String): Unit = withScope {
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    //最后调用的 saveAsHadoopFile()  并且泛型是 org.apache.hadoop.mapred.TextOutputFormat,
    //是属于 hadoop 包下的一个outputformat,以此简单来验证我们的结论
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

二丶OutputFormat运行机制?

我们知道 Spark 是分布式计算框架,其计算是一个个 Executor 为单元进行的,当运行到 类似于 saveAsTextFile等输出型算子时,会根据其定义的 Outputformat 规则进行输出,在每个Executor 单元内的每个task有且只有一个 Outputformat 实例

三丶自定义 OutputFormat 解析

首先我们来看一下 OutputFormat 接口

public interface OutputFormat<K, V> {

  /** 
   * 根据给予的参数返回一个 RecordWriter 对象
   *
   * @param ignored 基本没什么用
   * @param job 可以用来获取各种配置,定制特别的 RecordWriter
   * @param name 一个唯一的名字,比如:part-0001
   * @param progress mechanism for reporting progress while writing to file.
   */
  RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                     String name, Progressable progress)
  throws IOException;

  /** 
   * 用来做输出前的各种检查
   */
  void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;

//获取一个 OutputCommitter,用来保证输出的正确执行
  public abstract  OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
}

checkOutputSpecs很好理解,用来做输出前的检查,比如 Spark 会对输出路径做检查,如果存在就抛出异常,那么接下来我们先理解下 RecordWriter 和 OutputCommitter

  • RecordWriter
public abstract class RecordWriter<K, V> {
  /** 
   * outputformat 是针对于 kv格式的RDD的,
   * Rdd数据的每条记录都会调用一次 write 方法 用来写入数据
   */      
  public abstract void write(K key, V value
                             ) throws IOException, InterruptedException;

  /** 
   * 在数据写完之后,会进行调用,一般执行一些 IO 的 close 操作
   */ 
  public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException;
}

这里我们可以发现,如果你不是 KV 格式的 Rdd,那么能调用的只有有限的几个输出型算子,比如saveAsTextFile,其实底层是给你加格式化成了 kv 格式 Rdd 的,其 key 为 NullWritable,这块一般是我们自定义的重点。

  • OutputCommitter
package com.inveno.data.analysis.user.statistical;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

public abstract class OutputCommitter {
    /**
     * 每个job执行之前都会调用一次或者多次,用来进行一些初始化操作
     */
    public abstract void setupJob(JobContext jobContext) throws IOException;

    /**
     * 每个job执行之后都会调用一次或者多次,用来进行一些初始化操作
     */
    @Deprecated
    public void cleanupJob(JobContext jobContext) throws IOException {
    }

    /**
     * 每个job执行完成都会调用一次
     */
    public void commitJob(JobContext jobContext) throws IOException {
        cleanupJob(jobContext);
    }


    /**
     * 每个job中断执行会调用一次或者多次
     */
    public void abortJob(JobContext jobContext, JobStatus.State state)
            throws IOException {
        cleanupJob(jobContext);
    }

    /**
     * 每个 task 执行之前都会调用一次或者多次,用来进行一些初始化操作
     */
    public abstract void setupTask(TaskAttemptContext taskContext)
            throws IOException;

    /**
     * 需要输出到 hdfs 上的 task 用来检测是否有输出需要提交
     */
    public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
            throws IOException;

    /**
     * 每个 needsTaskCommit 为 true 的 task 执行完成都会调用一次或者多次
     */
    public abstract void commitTask(TaskAttemptContext taskContext)
            throws IOException;

    /**
     * task 中断会被调用一次或多次
     */
    public abstract void abortTask(TaskAttemptContext taskContext)
            throws IOException;

    /**
     * 是否支持输出恢复
     */
    public boolean isRecoverySupported() {
        return false;
    }

    /**
     * 恢复task输出
     */
    public void recoverTask(TaskAttemptContext taskContext)
            throws IOException {
    }
}

其中代码注释说的调用多次,一般都是因为重试机制导致的,一般只会调用一次,这个我们一般使用系统自带的实现类,然后在各个生命周期添加一些自定义操作。

四丶实战---定义一个自己的 MysqlOutputFormat

  1. 每当你想自定义一个东西,第一步应该想的是:我有这个需求,别人有没有?我是不是在重复造轮子?别人的轮子适合我吗?我可以做的更好吗?
  2. 有了上面的思考,我们果断在源码包里面找到了一个叫做 DBOutputFormat的类,轮子果然是有的,那么好不好用呢?能不能优化一下呢?
  3. ok,废话不多说了,我们来看看今天我们自定义的 MysqlOutputFormat,因为要用在 Spark 上 所以我们使用的是 Scala 语言
abstract class MysqlOutputFormat[K, V]() extends OutputFormat[K, V] {

  val logger = LoggerFactory.getLogger(getClass)
  //直接返回一个 MysqlWriter 对象
  override def getRecordWriter(taskAttemptContext: TaskAttemptContext): RecordWriter[K, V] = {
    new MysqlWriter[K, V](getDBFlag(), getValueConvert(), taskAttemptContext)
  }

//空实现,这里可以根据你的需求实现,比如删除一些老旧数据
  override def checkOutputSpecs(jobContext: JobContext): Unit = {
  }
//因为我们数据读入的KV格式,这里定义了一个 SQLValueConvert trait,来让使用者自定义输入规则
  def getValueConvert(): SQLValueConvert[K, V]
//用于给 mysqlwriter 获取mysql相关参数的 flag
  def getDBFlag(): String

  //我们这里直接使用系统自带的就ok了,可以根据你的需求来做相关修改
  override def getOutputCommitter(taskAttemptContext: TaskAttemptContext): OutputCommitter = {
    new FileOutputCommitter(null, taskAttemptContext)
  }
}

实现比较简单,值得注意的是,在 Spark 中 OutputFormat 是通过反射生产的实例,所以需要提供一个无参的构造方法。那么接下来我们看看最重要的部分 MysqlWriter

class MysqlWrite[K, V](db_flag: String, converter: SQLValueConvert[K, V], context: TaskAttemptContext) extends RecordWriter[K, V] {
  val logger = LoggerFactory.getLogger(getClass)
  
//加载resource mysql配置文件
  val conf: Configuration = context.getConfiguration
  conf.addResource("mysql.xml")

//根据传入的 flag 读取resource mysql 相应的配置文件
  val table: String = conf.get(String.format(JDBCManager.JDBC_TABLE_NAME, db_flag))//table name
  private val batch_size = conf.get(String.format(JDBCManager.BATCH, db_flag)).toInt// batch size

  var count = 0

  var committerStatement: PreparedStatement = _
  var conn: Connection = _

//执行批量写入 mysql
  def commit(): Unit = {
    if (conn == null || committerStatement == null) {
      return
    }
    try {
      committerStatement.executeBatch()
      conn.commit()

      committerStatement.clearBatch()
      count = 0
    } catch {
      case e: Exception =>
        //出错回滚 并抛出异常
        conn.rollback()
        logger.error("在writer中写数据出现异常", e.printStackTrace())
        throw e
    }
  }

//相关资源释放
  override def close(taskAttemptContext: TaskAttemptContext): Unit = {
    try {
    //提交剩余的数据
      commit()
    } catch {
      case e: Throwable =>
        throw new SQLException()
    } finally {
      if (committerStatement != null) {
        committerStatement.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }


  override def write(key: K, value: V): Unit = {
    if (key == null || value == null) {
        return
     }
    try {
      //根据自定义规则 将KV转换成 array(),
      val values = converter.convert(key, value)
    
      //创建数据库链接
      if (conn == null) {
        conn = JDBCManager.getConnection(conf, db_flag)
        conn.setAutoCommit(false)
      }
      //创建Statement
      if (committerStatement == null) {
        committerStatement = conn.prepareStatement(
        //"INSERT INTO %s VALUES(%s)" 创建 sql 语句
          MysqlOperation.insertByParameter(table, values.length))
      }
    //添加参数
      for (i <- values.indices) {
        committerStatement.setObject(i + 1, values.apply(i))
      }
      committerStatement.addBatch()

      count = count + 1
      //大于batch_size进行提交
      if (count >= batch_size) {
        commit()
      }
    } catch {
      case e: Throwable =>
        println("在writer中写数据出现异常", e.printStackTrace())
        throw new Exception(e)
    }
  }

上面的代码都比较简单,这里读者可以思考一下,数据库的连接是否可以放到 setupTask?提交任务是否可以放到 commitTask ? 这边 mysql.xml 相关配置就不贴了,项目实际应用过程我们一般都需要将配置属性写到额外的文件,方便管理和维护。

五丶额外的思考

能否自定义一个outputformat来实现控制spark 文件的输出数量呢?这里主要考虑的多个task同时写入一个文件,必然涉及到文件的追加,而我们知道 hdfs虽然支持文件的追加,但是性能并不是很好,至于效率到底怎么样?笔者也没验证过。。。如果你有好的想法,欢迎留言。。。一起讨论!!!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.08.23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
    • 一丶什么是OutputFormat?
      • 二丶OutputFormat运行机制?
        • 三丶自定义 OutputFormat 解析
          • 四丶实战---定义一个自己的 MysqlOutputFormat
            • 五丶额外的思考
            相关产品与服务
            云数据库 SQL Server
            腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档