前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >sparkstreaming遇到的问题

sparkstreaming遇到的问题

作者头像
soundhearer
发布2020-12-18 14:29:07
1.4K0
发布2020-12-18 14:29:07
举报
文章被收录于专栏:数据湖数据湖

这篇文章介绍sparkstreaming对接kafka时遇到的两个offset的问题,首选我们介绍下offset的存储。

sparkstreaming offset存储

sparkstreaming采用kafkaUtils的createDirectStream()处理kafka数据的方式,会直接从kafka的broker的分区中读取数据,跳过zookeeper,并且没有receiver,是spark的task直接对接kafka topic partition。

由于这种方式没有经过ZK,topic的offset没有保存,当job重启后只能从最新的offset开始消费数据,造成重启过程中的消息丢失。

如果spark自动提交,会在sparkstreaming刚运行时就立马提交offset,如果这个时候Spark streaming消费信息失败了,那么offset也就错误提交了。

所以要在sparkstreaming中实现exactly-once恰好一次,必须

1.手动提交偏移量

2.处理完业务数据后再提交offset

手动维护偏移量 需设置kafka参数enable.auto.commit改为false

手动维护提交offset有两种选择:

1.处理完业务数据后手动提交到Kafka

2.处理完业务数据后手动提交到本地库 如MySql、HBase

也可以将offset提交到zookeeper,但是经过我们测试,发现zookeeper不适合存储大量数据,在大数据量的情况下很容易崩溃。

我们来看下如何将offset存储到mysql中:

代码语言:javascript
复制
/ 处理完 业务逻辑后,手动提交offset偏移量到本地Mysql中
stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })

HBase中也是类似的

代码语言:javascript
复制
inputDStream.foreachRDD((rdd, batchTime) => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
      val newRDD = rdd.map(message => processMessage(message))
      newRDD.count()
      //save the offsets to HBase  批量处理把数据存储到Hbase当中
      saveOffsets(topic, consumerGroupID, offsetRanges, hbaseTableName, batchTime)
    })
    ssc
  }


  /**
    * 对数据进行处理
    * @param message
    * @return
    */
  def processMessage(message: ConsumerRecord[String, String]): ConsumerRecord[String, String] = {
    message
  }


  /*
Save Offsets into HBase
 */
  def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange], hbaseTableName: String,
                  batchTime: org.apache.spark.streaming.Time) = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.addResource("src/main/resources/hbase-site.xml")
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val table = conn.getTable(TableName.valueOf(hbaseTableName))
    val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
    val put = new Put(rowKey.getBytes)
    for (offset <- offsetRanges) {
      put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(offset.partition.toString),
        Bytes.toBytes(offset.untilOffset.toString))
    }
    table.put(put)
    conn.close()
  }
  /*
Returns last committed offsets for all the partitions of a given topic from HBase in following cases.
  - CASE 1: SparkStreaming job is started for the first time. This function gets the number of topic partitions from
    Zookeeper and for each partition returns the last committed offset as 0
  - CASE 2: SparkStreaming is restarted and there are no changes to the number of partitions in a topic. Last
    committed offsets for each topic-partition is returned as is from HBase.
  - CASE 3: SparkStreaming is restarted and the number of partitions in a topic increased. For old partitions, last
    committed offsets for each topic-partition is returned as is from HBase as is. For newly added partitions,
    function returns last committed offsets as 0
 */
  def getLastCommittedOffsets(TOPIC_NAME: String, GROUP_ID: String, hbaseTableName: String, zkQuorum: String,
                              zkRootDir: String, sessionTimeout: Int, connectionTimeOut: Int): Map[TopicPartition, Long] = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.addResource("src/main/resources/hbase-site.xml")
    val zkUrl = zkQuorum + "/" + zkRootDir
    val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeOut)
    val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
    val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size
    //Connect to HBase to retrieve last committed offsets
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val table = conn.getTable(TableName.valueOf(hbaseTableName))
    val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(System.currentTimeMillis())
    val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
    val scan = new Scan()
    val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(stopRow.getBytes).setReversed(true))
    val result = scanner.next()
    var hbaseNumberOfPartitionsForTopic = 0 //Set the number of partitions discovered for a topic in HBase to 0
    if (result != null) {
      //If the result from hbase scanner is not null, set number of partitions from hbase to the number of cells
      hbaseNumberOfPartitionsForTopic = result.listCells().size()
    }
    val fromOffsets = collection.mutable.Map[TopicPartition, Long]()
    if (hbaseNumberOfPartitionsForTopic == 0) {
      // initialize fromOffsets to beginning
      for (partition <- 0 to zKNumberOfPartitionsForTopic - 1) {
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
      }
    } else if (zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic) {
      // handle scenario where new partitions have been added to existing kafka topic
      for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
        val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
      }
      for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic - 1) {
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
      }
    } else {
      //initialize fromOffsets from last run
      for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
        val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
      }
    }
    scanner.close()
    conn.close()
    fromOffsets.toMap
  }
}

第一个问题 numRecords must not be negative

当*删除已经使用过的kafka topic,然后新建同名topic*,用spark streaming Kakfa createDirectStream方法时出现了"numRecords must not be negative"异常,这个异常是不合法的参数异常,RDD的记录数目必须不能是负数

异常分析

首先我们看异常打印出现问题的位置

org.apache.spark.streaming.scheduler.StreamInputInfo.InputInfoTracker的第38行

代码语言:javascript
复制
/**
 * :: DeveloperApi ::
 * Track the information of input stream at specified batch time.
 *
 * @param inputStreamId the input stream id
 * @param numRecords the number of records in a batch
 * @param metadata metadata for this batch. It should contain at least one standard field named
 *                 "Description" which maps to the content that will be shown in the UI.
 */
@DeveloperApi
case class StreamInputInfo(
    inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty) {
  require(numRecords >= 0, "numRecords must not be negative")

  def metadataDescription: Option[String] =
    metadata.get(StreamInputInfo.METADATA_KEY_DESCRIPTION).map(_.toString)
}

代码38行,判断了numRecords是否大于等于0,当不满足条件时抛出异常,可判断此时numRecords<0。 numRecords的解释: numRecords: the number of records in a batch 应该是当前rdd中records 数目计算出了问题。

offsetRanges的计算逻辑

offsetRanges的定义

代码语言:javascript
复制
offsetRanges: offset ranges that define the Kafka data belonging to this RDD

在KafkaRDDPartition 40行找到kafka partition offsetRange的计算逻辑:

代码语言:javascript
复制
def count(): Long = untilOffset - fromOffset`
`fromOffset: per-topic/partition Kafka offset defining the (inclusive) starting point of the batch`
`untilOffset: per-topic/partition Kafka offset defining the (inclusive) ending point of the batch

fromOffset来自zk中保存; untilOffset通过DirectKafkaInputDStream第145行:

代码语言:javascript
复制
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))

计算得到,计算过程得到最新的offset,然后使用spark.streaming.kafka.maxRatePerPartition做clamp,得到允许的最大untilOffsets,##而此时新建的topic,如果topic中没有数据,untilOffsets应该为0##

原因总结

当删除一个topic时,zk中的offset信息并没有被清除,因此KafkaDirectStreaming再次启动时仍会得到旧的topic offset为old_offset,作为fromOffset。 当新建了topic后,使用untiloffset计算逻辑,得到untilOffset为0(如果topic已有数据则>0); 再次被启动的KafkaDirectStreaming Job通过异常的计算逻辑得到的rdd numRecords值为可计算为: numRecords = untilOffset - fromOffset(old_offset)\ 当untilOffset < old_offset时,此异常会出现,对于新建的topic这种情况的可能性很大

解决方法

根据以上分析,可在确定KafkaDirectStreaming 的fromOffsets时判断fromOffset与untiloffset的大小关系,当untilOffset < fromOffset时,矫正fromOffset为offset初始值0。

•从zk获取topic/partition 的fromOffset•利用SimpleConsumer获取每个partiton的lastOffset(untilOffset )•判断每个partition lastOffset与fromOffset的关系•当lastOffset < fromOffset时,将fromOffset赋值为0 通过以上步骤完成fromOffset的值矫正。

矫正offset的核心代码如下:

代码语言:javascript
复制
 /** 以下 矫正 offset */
    // 得到Topic/partition 的lastOffsets
    Map&lt;TopicAndPartition, Long&gt; topicAndPartitionLongMap =
        KafkaOffsetTool.getInstance().getLastOffset(kafkaParams.get("metadata.broker.list"),
            topicList, "my.group.id");

    // 遍历每个Topic.partition
    for (Map.Entry&lt;TopicAndPartition, Long&gt; topicAndPartitionLongEntry : fromOffsets.entrySet()) {
      // fromOffset &gt; lastOffset时
      if (topicAndPartitionLongEntry.getValue() &gt;
          topicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey())) {
         //矫正fromoffset为offset初始值0
        topicAndPartitionLongEntry.setValue(0L);
      }
    }
    /** 以上 矫正 offset */

第二个问题 Offsets out of range

Kafka DirectStream 读取topic中数据做测试,停止了一段时间,再次启动时出现了kafka.common.OffsetOutOfRangeException

异常如下:

代码语言:javascript
复制
0/12/16 11:08:33 WARN TaskSetManager: Lost task 2.0 in stage 105.0 (TID 85, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {register_topic-5=23550}

异常分析

offset越界分为头越界和尾越界,头部越界是zookeeper中保存的offset在topic中仍然存在的最老message的offset之前时(zk_offset < earliest_offset);尾越界是zookeeper中保存的offset在topic中最新message的offset之后(zk_offset > last_offset),我们在前面遇到了这个问题,并做了处理,因此这个问题应该是头部越界导致。

经过分析,我们有一段时间没有消费topic中的数据了,大概已经过了七天,而kafka broker中我们设置的log保存时间为七天

因此,应该是kafka 中未被消费的数据被broker清除了,使得从zookeeper中读取到的offset落在仍存在的最老message offset的左侧,本来合法的offset变得不非法了。

解决方法

首先想到的方法就是 streaming job要及时消费掉topic中的数据,消费延迟不得大于log.retention.ms的配置。 但是更好的办法是在遇到该问题时,依然能让job正常运行,因此就需要在发现zk_offset时矫正zk_offset为合法值

矫正offset的核心的代码如下:

代码语言:javascript
复制
/** 以下 矫正 offset */

    // lastest offsets
    Map&lt;TopicAndPartition, Long&gt; lastestTopicAndPartitionLongMap =
        KafkaOffsetTool.getInstance().getLastOffset(kafkaParams.get("metadata.broker.list"),
            Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID));

    // earliest offsets
    Map&lt;TopicAndPartition, Long&gt; earliestTopicAndPartitionLongMap =
        KafkaOffsetTool.getInstance().getEarliestOffset(kafkaParams.get("metadata.broker.list"),
            Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID));


    for (Map.Entry&lt;TopicAndPartition, Long&gt; topicAndPartitionLongEntry : fromOffsets.entrySet()) {

      long zkOffset = topicAndPartitionLongEntry.getValue();
      long lastestOffset = lastestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey());
      long earliestOffset = earliestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey());
      // zkoffset 不在可用message offset区间内
      if (zkOffset &gt; lastestOffset || zkOffset &lt; earliestOffset) {
        // set offset = earliestOffset
        logger.warn("矫正offset: " + zkOffset +" -&gt; "+ earliestOffset);
        topicAndPartitionLongEntry.setValue(earliestOffset);
      }
    }
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据湖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • sparkstreaming offset存储
  • 第一个问题 numRecords must not be negative
    • 异常分析
      • offsetRanges的计算逻辑
        • 原因总结
          • 解决方法
          • 第二个问题 Offsets out of range
            • 异常分析
              • 解决方法
              相关产品与服务
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档