专栏首页系统高可用spark streaming访问kafka出现offset越界问题处理
原创

spark streaming访问kafka出现offset越界问题处理

背景

项目中使用了spark streaming + kafka来做实时数据分析,有的时候在访问kafka时会报offset越界错误(OffsetOutOfRangeException),如下:

消费kafka offset越界错误

分析

从字面意思上,说是kafka topic的offset越界异常;在job中使用的是Kafka DirectStream,每成功处理一批数据,就把对应的offset更新到本地中;和数组越界异常一样,offset越界应该分为头越界和尾越界,如下图所示。 越界示意图

消费offset越界示意图

头部越界: 本地保存的offset在topic中仍然存在的最老message的offset之前时(local_offset < earliest_offset); 尾部越界: 本地保存的offset在topic中最新message的offset之后时(local_offset > last_offset)

是什么导致头部越界呢? 考虑到kafka broker配置中修改了message的保持时间为24小时:

log.retention.hours=24(The minimum age of a log file to be eligible for deletion)

因此,应该是kafka 中未被消费的数据被broker清除了,使得消费的offset落在仍存在的最老message offset的左侧,本来合法的offset变得不非法了。

试验

1、改kafka broker 的retention time 为2分钟

2、修改完成后重启kafka

3、使用zk shell 命令得到解析器所保存的zk_offset

4、停止spark streaming kafka DirectStream job

5、发送数据到kafka topic,等待一段时间(超过两分钟)

6、启动streaming job,复现该异常。

通过异常验证可以导致异常的原因为:kafka broker因为log.retention.hours的配置,导致topic中有些数据被清除,而在retention时间范围内streaming job都没有把将要被清除的message消费掉,因此zk中offset落在了earliest_offset的左侧,引发异常。

解决方法

首先想到的方法就是 streaming job要及时消费掉topic中的数据,消费延迟不得大于log.retention.time的配置。 但是更好的办法是在遇到该问题时,依然能让job正常运行,因此就需要在发现local_offset<earliest_offset时矫正local_offset为合法值。

自动修正offset核心代码

from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
from pyspark.storagelevel import StorageLevel
from kafka import SimpleClient
from kafka.common import OffsetRequestPayload

# 获取 offset
localOffsetRanges = []
if os.path.isfile('%s/%s_offset.txt' % (config.offset_store_location, groupid)):
    with open('%s/%s_offset.txt' % (config.offset_store_location, groupid), 'rb') as f:
        localOffsetRanges = pickle.load(f)
offsets = {}

cur_kafka_topic_offset_map = {}
for temp_topic in topics:
    # 获取kafka当前最小和最大的offset信息,用于跟当前消费到的offset进行对比,以便自动修复潜在的消费kafka offset头尾越界问题,避免人工干预。
    temp_offset_map = get_cur_kafka_topic_offset_map(brokers, temp_topic)
    if temp_offset_map:
        cur_kafka_topic_offset_map[temp_topic] = temp_offset_map

fix_offset_content = u""
total_fix_num = 0
alert_fix_max_num = 5
alert_fix_num = 0
for offsetRange in localOffsetRanges:
    temp_topic = offsetRange.topic
    partition_idx = offsetRange.partition
    if temp_topic in topics:
        topicPartition = TopicAndPartition(temp_topic, partition_idx)
        cur_consumer_offset = offsetRange.untilOffset
        temp_offset_map = cur_kafka_topic_offset_map.get(temp_topic)
        if temp_offset_map:
            cur_kafka_topic_offset_infos = temp_offset_map.get(partition_idx)
            if cur_kafka_topic_offset_infos:
                cur_kafka_topic_min_offset = cur_kafka_topic_offset_infos[0]
                cur_kafka_topic_max_offset = cur_kafka_topic_offset_infos[1]
                if cur_kafka_topic_min_offset > 0 and cur_consumer_offset < cur_kafka_topic_min_offset:

                    total_fix_num += 1
                    alert_fix_num += 1
                    if alert_fix_num <= alert_fix_max_num:
                        if fix_offset_content == "":
                            fix_offset_content = "consumer_offset(%s)<min_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic)
                        else:
                            fix_offset_content += "\nconsumer_offset(%s)<min_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic)
                    print(
                        "cur_consumer_offset(%s)<cur_kafka_topic_min_offset(%s),need fix,partition=%s,topic=%s,brokers=%s" % (
                            cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic, brokers))
                    cur_consumer_offset = cur_kafka_topic_min_offset

                if cur_kafka_topic_max_offset > 0 and cur_consumer_offset > cur_kafka_topic_max_offset:

                    total_fix_num += 1
                    alert_fix_num += 1
                    if alert_fix_num <= alert_fix_max_num:
                        if fix_offset_content == "":
                            fix_offset_content = "consumer_offset(%s)>max_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic)
                        else:
                            fix_offset_content += "\nconsumer_offset(%s)>max_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic)

                    print(
                        "cur_consumer_offset(%s)>cur_kafka_topic_max_offset(%s),need fix,partition=%s,topic=%s,brokers=%s" % (
                            cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic, brokers))
                    cur_consumer_offset = cur_kafka_topic_max_offset

        offsets[topicPartition] = cur_consumer_offset

if total_fix_num > 0:
    receivers = config.recvers.split(',')
    alarmopt = alarm_opt.AlarmOpt(receivers)
    alert_content = u"[%s][消费offset和最新offset有出入(共修正%s次)]:\n%s" % (params_name, total_fix_num, fix_offset_content)
    alarmopt.alarm(alarm_opt.WX_SMS, alert_content, u'spark告警')

def get_cur_kafka_topic_offset_map(brokers,topic):
    cur_kafka_offset_map={}
    try:
        client = SimpleClient(brokers)
        LATEST = -1
        EARLIEST = -2
        # 获取topic分区数
        partitions = client.topic_partitions[topic]
        partition_num=len(partitions.keys())
        print("partition_num=%s,topic=%s" % (partition_num,topic))
        # 获取每个分区的最小offset
        min_offset_requests = [OffsetRequestPayload(topic, p, EARLIEST, 1) for p in partitions.keys()]
        min_offsets_responses = client.send_offset_request(min_offset_requests)
        if not min_offsets_responses or len(min_offsets_responses)!=partition_num:
            print("min_offsets_responses is illegal,topic=%s,brokers=%s" % (topic,brokers))
            return None
        print("len(min_offsets_responses)=%s,topic=%s" % (len(min_offsets_responses),topic))
        for r in min_offsets_responses:
            cur_kafka_offset_map[r.partition] = [r.offsets[0]]
        # 获取每个分区的最大offset
        max_offset_requests = [OffsetRequestPayload(topic, p, LATEST, 1) for p in partitions.keys()]
        max_offsets_responses = client.send_offset_request(max_offset_requests)
        if not max_offsets_responses or len(max_offsets_responses)!=partition_num:
            print("max_offsets_responses is illegal,topic=%s,brokers=%s" % (topic,brokers))
            return None
        print("len(max_offsets_responses)=%s,topic=%s" % (len(max_offsets_responses),topic))
        cur_kafka_offset_str=""
        for r in max_offsets_responses:
            if cur_kafka_offset_map.has_key(r.partition):
                cur_kafka_offset_map[r.partition].append(r.offsets[0])
            else:
                cur_kafka_offset_map[r.partition] = [-1, r.offsets[0]]
            partition_info_str="[%s,%s,%s]"%(r.partition,cur_kafka_offset_map[r.partition][0],cur_kafka_offset_map[r.partition][1])
            if cur_kafka_offset_str=="":
                cur_kafka_offset_str=partition_info_str
            else:
                cur_kafka_offset_str += ",%s" % (partition_info_str)
        print("cur_kafka_offset_str=%s,topic=%s,brokers=%s" % (cur_kafka_offset_str, topic,brokers))
        return cur_kafka_offset_map
    except Exception as e:
        print("get_cur_kafka_topic_offset_map Exception: %s,topic=%s,brokers=%s"%(str(e),topic,brokers))
        return None

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

相关文章

  • Spark Streaming应用与实战全攻略

    有一块业务主要是做爬虫抓取与数据输出,通过大数据这边提供的SOA服务入库到HBase,架构大致如下:

    加米谷大数据
  • Spark Streaming应用与实战全攻略

    一、背景与架构改造 1.1 问题描述 有一块业务主要是做爬虫抓取与数据输出,通过大数据这边提供的SOA服务入库到HBase,架构大致如下: ? 架构改造之前 以...

    CSDN技术头条
  • 论Spark Streaming的数据可靠性和一致性

    摘要:Spark Streaming自发布起就得到了广泛的关注,然而作为一个年轻的项目,需要提升的地方同样很多,比如1.2之前版本driver挂掉可能会丢失数据...

    Spark学习技巧
  • sparkstreaming遇到的问题

    这篇文章介绍sparkstreaming对接kafka时遇到的两个offset的问题,首选我们介绍下offset的存储。

    soundhearer
  • Spark Streaming的优化之路——从Receiver到Direct模式

    随着大数据的快速发展,业务场景越来越复杂,离线式的批处理框架MapReduce已经不能满足业务,大量的场景需要实时的数据处理结果来进行分析、决策。Spark S...

    个推君
  • Spark Streaming的优化之路——从Receiver到Direct模式

    随着大数据的快速发展,业务场景越来越复杂,离线式的批处理框架MapReduce已经不能满足业务,大量的场景需要实时的数据处理结果来进行分析、决策。Spark S...

    个推
  • 关于Spark Streaming感知kafka动态分区的问题

    本文主要是讲解Spark Streaming与kafka结合的新增分区检测的问题。读本文前关于kafka与Spark Streaming结合问题请参考下面两篇文...

    Spark学习技巧
  • 【Spark】Spark Streaming的程序运行原理及与Kafka的集成

    (1)Streaming Streaming:是一种数据传送技术,它把客户机收到的数据变成一个稳定连续的流,源源不断地送出,使用户听到的声音或看到的图象十分平...

    魏晓蕾
  • Spark踩坑记:Spark Streaming+kafka应用及调优

    作者:肖力涛 前言 在WeTest舆情项目中,需要对每天千万级的游戏评论信息进行词频统计,在生产者一端,我们将数据按照每天的拉取时间存入了Kafka当中,而在消...

    企鹅号小编
  • Spark踩坑记:Spark Streaming+kafka应用及调优

    本文首先对spark streaming嵌入kafka的方式进行归纳总结,之后简单阐述Spark streaming+kafka 在舆情项目中的应用,最后将自己...

    肖力涛
  • Kafka+Spark Streaming管理offset的几种方法

    场景描述:Kafka配合Spark Streaming是大数据领域常见的黄金搭档之一,主要是用于数据实时入库或分析。为了应对可能出现的引起Streaming程序...

    王知无-import_bigdata
  • Kafka+Spark Streaming管理offset的几种方法

    场景描述:Kafka配合Spark Streaming是大数据领域常见的黄金搭档之一,主要是用于数据实时入库或分析。为了应对可能出现的引起Streaming程序...

    大数据真好玩
  • Spark Streaming快速入门系列(7)

    一般的大型集群和平台, 都需要对其进行监控的需求。 要针对各种数据库, 包括 MySQL, HBase 等进行监控 要针对应用进行监控, 例如 Tomcat...

    刘浩的BigDataPath
  • 如何管理Spark Streaming消费Kafka的偏移量(二)

    我是攻城师
  • 如何管理Spark Streaming消费Kafka的偏移量(一)

    我是攻城师
  • 解析SparkStreaming和Kafka集成的两种方式

    spark streaming是基于微批处理的流式计算引擎,通常是利用spark core或者spark core与spark sql一起来处理数据。在企业实时...

    大数据学习与分享
  • 面试注意点 | Spark&Flink的区别拾遗

    场景描述:Flink是标准的实时处理引擎,而且Spark的两个模块Spark Streaming和Structured Streaming都是基于微批处理的,不...

    王知无-import_bigdata
  • Spark Streaming VS Flink

    本文从编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面对比 Spark Stream 与 Flink,希望对有实时处理...

    美图数据技术团队
  • Spark Streaming Direct Approach (No Receivers) 分析

    这个算是Spark Streaming 接收数据相关的第三篇文章了。 前面两篇是:

    用户2936994

扫码关注云+社区

领取腾讯云代金券