Structured Streaming如何实现Parquet存储目录按时间分区

缘由

StreamingPro现在支持以SQL脚本的形式写Structured Streaming流式程序了: mlsql-stream。不过期间遇到个问题,我希望按天进行分区,但是这个分区比较特殊,就是是按接收时间来落地进行分区,而不是记录产生的时间。

当然,我可以新增一个时间字段,然后使用partitionBy动态分区的方式解决这个问题,但是使用动态分区有一个麻烦的地方是,删除数据并不方便。流式程序会不断地写入数据,我们需要将七天前的数据清理掉,因为采用partitionBy后,parquet的meta信息是会在同一个目录里,然后里面的文件记录了当前批次数据分布在那些文件里。这样导致删除数据不方便了。

所以最好的方式是类似这样的:

set today="select current_date..." options type=sql;
load kafka9....;

save append table21  
as parquet.`/tmp/abc2/hp_date=${today}` 
options mode="Append"
and duration="10"
and checkpointLocation="/tmp/cpl2";

这种方式的好处就是,删除分区直接删除就可以,坏处是,通过上面的方式,由于Structured Streaming的目录地址是不允许变化的,也就是他拿到一次值之后,后续就固定了,所以数据都会写入到服务启动的那天。

解决方案

解决办法是自己实现一个parquet sink,改造的地方并不多。新添加一个类:

class NewFileStreamSink(
                         sparkSession: SparkSession,
                         _path: String,
                         fileFormat: FileFormat,
                         partitionColumnNames: Seq[String],
                         options: Map[String, String]) extends Sink with Logging {
 // 使用velocity模板引擎,方便实现复杂的模板渲染
  def evaluate(value: String, context: Map[String, AnyRef]) = {
    RenderEngine.render(value, context)
  }

// 将路径获取改成一个方法调用,这样每次写入时,都会通过方法调用
//从而获得一个新值
  def path = {
    evaluate(_path, Map("date" -> new DateTime()))
  }
-- 这些路径获取都需要变成方法
  private def basePath = new Path(path)

  private def logPath = new Path(basePath, FileStreamSink.metadataDir)

  private def fileLog =
    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)

  private val hadoopConf = sparkSession.sessionState.newHadoopConf()

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
      logInfo(s"Skipping already committed batch $batchId")
    } else {
      val committer = FileCommitProtocol.instantiate(
        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
        jobId = batchId.toString,
        outputPath = path,
        isAppend = false)

      committer match {
        case manifestCommitter: ManifestFileCommitProtocol =>
          manifestCommitter.setupManifestOptions(fileLog, batchId)
        case _ => // Do nothing
      }

      FileFormatWriter.write(
        sparkSession = sparkSession,
        queryExecution = data.queryExecution,
        fileFormat = fileFormat,
        committer = committer,
        outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
        hadoopConf = hadoopConf,
        partitionColumnNames = partitionColumnNames,
        bucketSpec = None,
        refreshFunction = _ => (),
        options = options)
    }
  }

  override def toString: String = s"FileSink[$path]"
}

实现sink之后,我们还需要一个DataSource 以便我们能让这个新的Sink集成进Spark里并被外部使用:

package org.apache.spark.sql.execution.streaming.newfile

import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming. Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

class DefaultSource extends StreamSinkProvider {
  override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
    val path = parameters.getOrElse("path", {
      throw new IllegalArgumentException("'path' is not specified")
    })
    if (outputMode != OutputMode.Append) {
      throw new AnalysisException(
        s"Data source ${getClass.getCanonicalName} does not support $outputMode output mode")
    }
    new NewFileStreamSink(sqlContext.sparkSession, parameters("path"), new ParquetFileFormat(), partitionColumns, parameters)
  }
}

这个是标准的datasource API。 现在使用时可以这样:

save append table21  
-- 使用jodatime的语法
as parquet.`/tmp/jack/hp_date=${date.toString("yyyy-MM-dd")}` 
options mode="Append"
and duration="10"
-- 指定实现类
and implClass="org.apache.spark.sql.execution.streaming.newfile"
and checkpointLocation="/tmp/cpl2";

是不是很方便?

额外的问题

在spark 2.2.0 之后,对meta文件合并,Spark做了些调整,如果合并过程中,发现之前的某个checkpoint点 文件会抛出异常。在spark 2.2.0则不存在这个问题。其实spark团队应该把这个作为可选项比较好,允许抛出或者保持安静。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Jed的技术阶梯

SparkStreaming 写数据到 HBase,由于共用连接造成的数据丢失问题

有如下程序,SparkStreaming 读取 Kafka 中的数据,经过处理后,把数据写入到 Hbase 中

8182
来自专栏文渊之博

pyspark 内容介绍(一)

pyspark 包介绍 子包 pyspark.sql module pyspark.streaming module pyspark.ml package py...

6256
来自专栏安恒网络空间安全讲武堂

nox&amp;CSAW部分pwn题解

暑假的时候遇到了一群一起学习安全的小伙伴,在他们的诱劝下,开始接触国外的CTF比赛,作为最菜的pwn选手就试着先打两场比赛试试水,结果发现国外比赛真有意思哎嘿。

1873
来自专栏为数不多的Android技巧

Android 插件化原理解析——插件加载机制

上文 Activity生命周期管理 中我们地完成了『启动没有在AndroidManifest.xml中显式声明的Activity』的任务;通过Hook AMS和...

2091
来自专栏Java3y

JDBC【数据库连接池、DbUtils框架、分页】

1.数据库连接池 什么是数据库连接池 简单来说:数据库连接池就是提供连接的。。。 为什么我们要使用数据库连接池 数据库的连接的建立和关闭是非常消耗资源的 频繁地...

3754
来自专栏郭霖

Android Volley完全解析(三),定制自己的Request

经过前面两篇文章的学习,我们已经掌握了Volley各种Request的使用方法,包括StringRequest、JsonRequest、ImageRequest...

2286
来自专栏与神兽党一起成长

jFinal路由解析源码分析

jFinal的路由解析是在JFinalFilter中做的,这个Filter也需要在web.xml中配置。JFinalFilter实现了javax.servlet...

1562
来自专栏我是攻城师

Apache Pig和Solr问题笔记(一)

3746
来自专栏数据科学与人工智能

【Spark研究】Spark编程指南(Python版)

Spark编程指南 译者说在前面:最近在学习Spark相关的知识,在网上没有找到比较详细的中文教程,只找到了官网的教程。出于自己学习同时也造福其他初学者的目的,...

1.5K5
来自专栏lzj_learn_note

阿里ARouter拦截器使用及源码解析(二)

关于ARouter基本跳转的用法以及源码解析在上篇文章阿里阿里ARouter使用及源码解析(一)已经有过分析,有不清楚的同学可以去看看。本篇文章主要是关于ARo...

2363

扫码关注云+社区

领取腾讯云代金券