前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Structured Streaming 源码剖析(一)- Source

Structured Streaming 源码剖析(一)- Source

作者头像
codingforfun
发布2019-03-04 10:45:12
1K0
发布2019-03-04 10:45:12
举报

一、trait Source

Source 必须不断地到达数据以进行流式查询。 Source 必须具有单调递增的进度概念,用 offset 表示。 Spark 将定期查询每个 Source 以查看是否有更多数据可用

代码语言:javascript
复制
// 返回此 Source 的数据的 schema
def schema: StructType

// 返回此 Source 的最大可用 offset
// 如果此 Source 从未接收过任何数据,则返回 None
def getOffset: Option[Offset]


// 返回 (start,end] 偏移量之间的数据。当 start 为 None 时,批处理应以第一个记录开头。此方法必须始终为特定的 start 和 end 对返回相同的数据; 即使在另一个节点上重新启动 Source 之后也是如此。
// 更上层总是调用此方法,其值 start 大于或等于传递给 commit 的最后一个值,而 end 值小于或等于 getOffset 返回的最后一个值
// 当从日志中获取数据时,offset 的类型可能是 SerializedOffset。此外,StreamExecution 仅比较 Offset JSON 以确定两个对象是否相等。修改 Offset JSON 格式时可能会产生冲突,在这种情况下,Source应该返回一个空的DataFrame
def getBatch(start: Option[Offset], end: Offset): DataFrame

// 通知 Source 已完成处理到 end 偏移量的所有数据,并且将来只会请求大于 end 的偏移量
def commit(end: Offset) : Unit

// 停止此 Source 并释放它已分配的所有资源
def stop(): Unit

1.1、Offset

代码语言:javascript
复制
// Offset 的 JSON 序列化表示,用于将偏移量保存到 offsetLog
// 注意:我们假设 等效/相等 offset 序列化为相同的 JSON 字符串
public abstract String json();

@Override
public boolean equals(Object obj) {
    if (obj instanceof Offset) {
        return this.json().equals(((Offset) obj).json());
    } else {
        return false;
    }
}

1.2、SerializedOffset

代码语言:javascript
复制
case class SerializedOffset(override val json: String) extends Offset

用于从外部存储加载 JSON 序列化偏移时使用。 目前不会将 JSON 序列化数据转换为特定的 offset 对象。Source 应在其对应的 Offset 伴生 object 中定义工厂方法,该对象接受 SerializedOffset 进行转换。

比如,object KafkaSourceOffsetdef apply(offset: SerializedOffset): KafkaSourceOffset 方法将从 hdfs 文件上读取并转化为 KafkaSourceOffset。

二、KafkaSource(extends Source)

使用以下设计从 Kafka 读取数据的 Source

  • KafkaSourceOffset 是为此 Source 定义的自定义偏移量,其包含 TopicPartition 到 offset 的映射。

KafkaSource 主要流程如下:

  1. 创建 Source 后,预配置的 KafkaOffsetReader 将用于查询此 Source 应开始读取的初始 offset。这用于创建 first batch。
  2. getOffset() 使用 KafkaOffsetReader 查询最新的可用 offset,以 KafkaSourceOffset 类型返回
  3. getBatch() 返回一个 DF,它从 start offset 读取,直到每个分区的 end offset 为止。排除 end offset,以与 KafkaConsumer.position()的语义一致
  4. 返回的 DF 基于 KafkaSourceRDD

删除 topic 时无法保证不丢失数据。如果丢失零数据至关重要,则用户必须确保在删除 topic 时已处理 topic 中的所有消息

2.1、KafkaSource#schema

代码语言:javascript
复制
def kafkaSchema: StructType = StructType(Seq(
    StructField("key", BinaryType),
    StructField("value", BinaryType),
    StructField("topic", StringType),
    StructField("partition", IntegerType),
    StructField("offset", LongType),
    StructField("timestamp", TimestampType),
    StructField("timestampType", IntegerType)
  ))

2.2、KafkaSource#getOffset

若有新增的 kafka partitonis,getOffset 中也会返回要 fetch 新增的 partitions 的数据到哪个 end offset

返回该 Source 可用的最大的 offset(会考虑到设置的单个 batch 允许消费的最大 offset 数),kafka 的 offset 类型如下:

代码语言:javascript
复制
// 包含各个 topic partition 对应的 offset
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
  override val json = JsonUtils.partitionOffsets(partitionToOffsets)
}

KafkaSource#getOffset 实现如下:

代码语言:javascript
复制
override def getOffset: Option[Offset] = {
    // 获取 init offsets
    initialPartitionOffsets
    
    // 获取 lastest offsets
    val latest = kafkaReader.fetchLatestOffsets()
    // maxOffsetsPerTrigger 为每次 trigger 拉取的 offset 数
    val offsets = maxOffsetsPerTrigger match {
      case None =>
        // 若为指定,则拉取到 lastest
        latest
      // currentPartitionOffsets 上一次消费到的 offsets
      case Some(limit) if currentPartitionOffsets.isEmpty =>
        rateLimit(limit, initialPartitionOffsets, latest)
      case Some(limit) =>
        rateLimit(limit, currentPartitionOffsets.get, latest)
    }

    currentPartitionOffsets = Some(offsets)
    logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
    Some(KafkaSourceOffset(offsets))
  }

核心逻辑:

  1. 获取 init offsets
  2. 获取 latest offsets
  3. 根据 maxOffsetsPerTrigger 以及上次消费到的 offsets,来确定本次消费的 end offsets:
    • 若 maxOffsetsPerTrigger 为 None => end offsets 为 latest offsets
    • 若 maxOffsetsPerTrigger 为 limit 且上次消费到 currentPartitionOffsets => rateLimit(limit, initialPartitionOffsets, latest)
    • 若 maxOffsetsPerTrigger 为 limit 且本次为第一次消费 => rateLimit(limit, currentPartitionOffsets.get, latest)
2.2.1、获取 init offsets

KafkaSource#initialPartitionOffsets 如下:

代码语言:javascript
复制
private lazy val initialPartitionOffsets = {
  val metadataLog =
    // metadataPath 为该 KafkaSource 对应的 meta 持久化到 hdfs 的地址
    new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
      // 将 KafkaSourceOffset 写到 hdfs 
      override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
        out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
        val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
        writer.write("v" + VERSION + "\n")
        writer.write(metadata.json)
        writer.flush
      }

      // 从 hdfs 中读出 KafkaSourceOffset
      override def deserialize(in: InputStream): KafkaSourceOffset = {
        in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
        val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
        // HDFSMetadataLog guarantees that it never creates a partial file.
        assert(content.length != 0)
        if (content(0) == 'v') {
          val indexOfNewLine = content.indexOf("\n")
          if (indexOfNewLine > 0) {
            val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
            KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
          } else {
            throw new IllegalStateException(
              s"Log file was malformed: failed to detect the log file version line.")
          }
        } else {
          // The log was generated by Spark 2.1.0
          KafkaSourceOffset(SerializedOffset(content))
        }
      }
    }

  // EarliestOffsetRangeLimit: 希望返回最小的 offset
  // LatestOffsetRangeLimit: 希望返回最大的 offset
  // SpecificOffsetRangeLimit: 希望返回指定的 offset
  metadataLog.get(0).getOrElse {
    val offsets = startingOffsets match {
      case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
      case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
      case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
    }
    metadataLog.add(0, offsets)
    logInfo(s"Initial offsets: $offsets")
    offsets
  }.partitionToOffsets
}

核心逻辑即:

  1. 定义一个用于读写 meta 持久化在 hdfs 上文件的 metadataLog(持久化文件路径在 KafkaSource 构造函数中传入)
  2. 读取持久化 meta 文件:
    • 若存在,则以读取到的 offsets 为 init offsets
    • 若不存在,则根据 KafaSource 构造函数中的 startingOffsets 类型来决定使用最小最大还是指定的 offsets 作为 init offsets
2.2.2、rateLimit
代码语言:javascript
复制
private def rateLimit(
      limit: Long,
      from: Map[TopicPartition, Long],
      until: Map[TopicPartition, Long]): Map[TopicPartition, Long]

根据 from 到 until 每个 topic partition 的 offset diff 来计算每个 topic partition 应该占用 limit 的比例,算出比例后,结合 from 就能得到 end offsets 了

2.3、KafkaSource#getBatch

返回 [start.get.partitionToOffsets, end.partitionToOffsets) 之间的数据

代码语言:javascript
复制
def getBatch(start: Option[Offset], end: Offset): DataFrame

核心流程如下:

上面的流程图中,以下几个点需要额外关注:

  • 对于可能的数据丢失,是否需要抛异常来中止,如:新增的 partitions 被删除,新增的 partitions 的起始 offsets 不为 0

2.4、KafkaSource#commit

do nothing

2.4、KafkaSource#stop

代码语言:javascript
复制
override def stop(): Unit = synchronized {
  kafkaReader.close()
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019.02.11 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、trait Source
    • 1.1、Offset
      • 1.2、SerializedOffset
      • 二、KafkaSource(extends Source)
        • 2.1、KafkaSource#schema
          • 2.2、KafkaSource#getOffset
            • 2.2.1、获取 init offsets
            • 2.2.2、rateLimit
          • 2.3、KafkaSource#getBatch
            • 2.4、KafkaSource#commit
              • 2.4、KafkaSource#stop
              相关产品与服务
              批量计算
              批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档