专栏首页大数据成神之路Kafka+Spark Streaming管理offset的几种方法

Kafka+Spark Streaming管理offset的几种方法

By 大数据技术与架构

场景描述:Kafka配合Spark Streaming是大数据领域常见的黄金搭档之一,主要是用于数据实时入库或分析。为了应对可能出现的引起Streaming程序崩溃的异常情况,我们一般都需要手动管理好Kafka的offset,而不是让它自动提交,即需要将enable.auto.commit设为false。只有管理好offset,才能使整个流式系统最大限度地接近exactly once语义。

关键词:offset Spark Streaming

Kafka+Spark Streaming主要用于实时流处理。到目前为止,在大数据领域中是一种非常常见的架构。Kafka在其中主要起着一个缓冲的作用,所有的实时数据都会经过kafka。所以对kafka offset的管理是其中至关重要的一环。

我们一般都需要手动管理好Kafka的offset,而不是让它自动提交,即需要将enable.auto.commit设为false。

一但管理不善,就会到导致数据丢失或重复消费。

offset的管理方式

一个简单的流程如下:

  • 在Kafka DirectStream初始化时,取得当前所有partition的存量offset,以让DirectStream能够从正确的位置开始读取数据。
  • 读取消息数据,处理并存储结果。
  • 提交offset,并将其持久化在可靠的外部存储中。 图中的“process and store results”及“commit offsets”两项,都可以施加更强的限制,比如存储结果时保证幂等性,或者提交offset时采用原子操作。

保存offset的方式

Checkpoint:

Spark Streaming的checkpoints是最基本的存储状态信息的方式,一般是保存在HDFS中。但是最大的问题是如果streaming程序升级的话,checkpoints的数据无法使用,所以几乎没人使用。

offset的三种管理方式:

自动提交offset:

  • enable.auto.commit=true。 一但consumer挂掉,就会导致数据丢失或重复消费。 offset不可控。

Kafka自身的offset管理:

  • (属于At-least-once语义,如果做好了幂等性,可以使用这种方式): 在Kafka 0.10+版本中,offset的默认存储由ZooKeeper移动到了一个自带的topic中,名为__consumer_offsets。 Spark Streaming也专门提供了commitAsync() API用于提交offset。 需要将参数修改为enable.auto.commit=false。 在我实际测试中发现,这种offset的管理方式,不会丢失数据,但会出现重复消费。 停掉streaming应用程序再次启动后,会再次消费停掉前最后的一个批次数据,应该是由于offset是异步提交的方式导致,offset更新不及时引起的。 因此需要做好数据的幂等性。 (修改源码将异步改为同步,应该是可以做到Exactly-once语义的)

自定义offset:

  • (推荐,采用这种方式,可以做到At-least-once语义): 可以将offset存放在第三方储中,包括RDBMS、Redis、ZK、ES等。 若消费数据存储在带事务的组件上,则强烈推荐将offset存储在一起,借助事务实现 Exactly-once 语义。

示例

Kafka自身管理offset:

在Kafka 0.10+版本中,offset的默认存储由ZooKeeper移动到了一个自带的topic中,名为__consumer_offsets。所以我们读写offset的对象正是这个topic,Spark Streaming也专门提供了commitAsync() API用于提交offset。实际上,一切都已经封装好了,直接调用相关API即可。

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 确保结果都已经正确且幂等地输出了
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ZooKeeper

在Spark Streaming连接Kafka应用中使用Zookeeper来存储offsets也是一种比较可靠的方式。

在这个方案中,Spark Streaming任务在启动时会去Zookeeper中读取每个分区的offsets。如果有新的分区出现,那么他的offset将会设置在最开始的位置。在每批数据处理完之后,用户需要可以选择存储已处理数据的一个offset或者最后一个offset。此外,新消费者将使用跟旧的Kafka 消费者API一样的格式将offset保存在ZooKeeper中。因此,任何追踪或监控Zookeeper中Kafka Offset的工具仍然生效的。

一个典型的工具类:

class ZkKafkaOffsetManager(zkUrl: String) {
    private val logger = LoggerFactory.getLogger(classOf[ZkKafkaOffsetManager])

    private val zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, 30000, 30000);
    private val zkUtils = new ZkUtils(zkClientAndConn._1, zkClientAndConn._2, false)

    def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
        val offsets = mutable.HashMap.empty[TopicPartition, Long]
        val partitionsForTopics = zkUtils.getPartitionsForTopics(topics)

        // /consumers/<groupId>/offsets/<topic>/<partition>
        partitionsForTopics.foreach(partitions => {
            val topic = partitions._1
            val groupTopicDirs = new ZKGroupTopicDirs(groupId, topic)

            partitions._2.foreach(partition => {
                val path = groupTopicDirs.consumerOffsetDir + "/" + partition
                try {
                    val data = zkUtils.readData(path)
                    if (data != null) {
                        offsets.put(new TopicPartition(topic, partition), data._1.toLong)
                        logger.info(
                            "Read offset - topic={}, partition={}, offset={}, path={}",
                            Seq[AnyRef](topic, partition.toString, data._1, path)
                        )
                    }
                } catch {
                    case ex: Exception =>
                        offsets.put(new TopicPartition(topic, partition), 0L)
                        logger.info(
                            "Read offset - not exist: {}, topic={}, partition={}, path={}",
                            Seq[AnyRef](ex.getMessage, topic, partition.toString, path)
                        )
                }
            })
        })

        offsets.toMap
    }

    def saveOffsets(offsetRanges: Seq[OffsetRange], groupId: String): Unit = {
        offsetRanges.foreach(range => {
            val groupTopicDirs = new ZKGroupTopicDirs(groupId, range.topic)
            val path = groupTopicDirs.consumerOffsetDir + "/" + range.partition
            zkUtils.updatePersistentPath(path, range.untilOffset.toString)
            logger.info(
                "Save offset - topic={}, partition={}, offset={}, path={}",
                Seq[AnyRef](range.topic, range.partition.toString, range.untilOffset.toString, path)
            )
        })
    }
}

这样,offset就会被存储在ZK的/consumers/[groupId]/offsets/[topic]/[partition]路径下。当初始化DirectStream时,调用readOffsets()方法获得offset。当数据处理完成后,调用saveOffsets()方法来更新ZK中的值。

其他介质

Hbase、Redis甚至Mysql也经常被用作进行offset的存储。方式和上面类似,代码可以去网上搜一搜。

需要注意的点

特别需要注意,在转换过程中不能破坏RDD分区与Kafka分区之间的映射关系。亦即像map()/mapPartitions()这样的算子是安全的,而会引起shuffle或者repartition的算子,如reduceByKey()/join()/coalesce()等等都是不安全的。

对Dstream进行窗口操作后就不能手动提交offset。

因为保存offset需要HasOffsetRanges这个类。而HasOffsetRangesKafkaRDD的一个trait,而CanCommitOffsetsDirectKafkaInputDStream的一个trait。

Scala Trait(特征) 相当于 Java 的接口,实际上它比接口还功能强大。

与接口不同的是,它还可以定义属性和方法的实现。

如下:

private[spark] class KafkaRDD[K, V](
    sc: SparkContext,
    val kafkaParams: ju.Map[String, Object],
    val offsetRanges: Array[OffsetRange],
    val preferredHosts: ju.Map[TopicPartition, String],
    useConsumerCache: Boolean
) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges

private[spark] class DirectKafkaInputDStream[K, V](
    _ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    ppc: PerPartitionConfig
  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {

不能对stream对象做transformation操作之后的结果进行强制转换(会直接报ClassCastException),因为RDD与DStream的类型都改变了。只有RDD或DStream的包含类型为ConsumerRecord才行。

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java新的时间API究竟怎么用

    Java新时间API中有三个特别重要的类,分别是Instant、LocalDateTime、ZonedDateTime,它们分别对应到上一篇文章中讲到的时间概念...

    wangyuntao
  • 通俗讲解 RESTful

    百度一下 RESTful,查到的资料很多都讲得不清楚,看完了都不知道说的是啥,导致很多人对 RESTful 不甚了解。来看一下常见的解释:

    丹枫无迹
  • 如何快速部署容器化应用

    摘要:容器化推行的过程中,研发、运维学习及使用成本都非常高,那有没有一款简单易用的平台呢?本文介绍基于Kubernetes的应用管理平台-开普勒云平台。

    宜信技术学院
  • JAVA中几种常用JSON库性能比较

    本篇通过JMH来测试一下Java中几种常见的JSON解析库的性能。每次都在网上看到别人说什么某某库性能是如何如何的好,碾压其他的库。但是百闻不如一见,只有自己亲...

    Java技术江湖
  • 中英翻译(基于百度翻译)

    市面上有名气的翻译公司就是有道和百度了,有道尝试了一下,分为API和SDK两种,但是demo下载下来跑不了

    用户3112896
  • Java虚拟机

    1.常说的JDK包含了Java语言、Java虚拟机和Java API类库这三部分,是Java程序开发的最小环境

    用户3112896
  • 产品动态 | 地点云发布、小程序插件更新、JS API GL优化、SDK升级

    【产品动态·导读】 地点云全新发布 - 用自己的数据,建自己的地图 微信小程序插件 - 新增地图选点插件 JavaScript API GL连发3版 - 常用...

    腾讯位置服务
  • 在几分钟内构建强大的可用于生产的深度学习视觉模型

    得益于更快的计算,更好的存储和易于使用的软件,基于深度学习的解决方案绝对可以看到从概念验证隧道进入现实世界的曙光!看到深度学习模型已广泛应用于该行业的各个领域,...

    代码医生工作室
  • Harbor企业级实践丨零侵入改造!

    ? 本文作者 / 阿杜 玩Docker,玩K8s,玩Harbor 爱技术,爱运动,爱生活 “K8s&云原生技术开放日”特邀讲师 在上一篇中,我们分享了Harb...

    腾讯云TStack
  • Selenium Webdriver 3.X源码分析之移动触摸动作touch_actions.py

    在selenium webdriver Python代码提供了完整的移动设备端触摸能力的支持,其代码定义实现在如下路径:

    苦叶子

扫码关注云+社区

领取腾讯云代金券