Handling Kafka offset out-of-range errors in Spark Streaming


Background

The project uses Spark Streaming + Kafka for real-time data analysis. From time to time the job hits an offset out-of-range error (OffsetOutOfRangeException) when reading from Kafka, as shown below:

[Figure: Kafka consumer offset out-of-range error]

Analysis

Taken literally, this is an offset out-of-range exception on a Kafka topic. The job consumes through a Kafka DirectStream and, after each successfully processed batch, persists the corresponding offsets locally. Much like an array index going out of bounds, an offset can go out of range at either end, head or tail, as the diagram below illustrates.

[Figure: consumer offset out-of-range diagram]

Head out-of-range: the locally saved offset falls before the offset of the oldest message still present in the topic (local_offset < earliest_offset). Tail out-of-range: the locally saved offset falls after the offset of the newest message in the topic (local_offset > last_offset).
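
Put differently, a saved offset is valid only while it stays inside the interval [earliest_offset, last_offset]. A minimal sketch of the two checks, with illustrative names not taken from the job's code:

def classify_offset(local_offset, earliest_offset, last_offset):
    # Valid offsets lie in the closed interval [earliest_offset, last_offset].
    if local_offset < earliest_offset:
        return 'head out-of-range'  # the data at local_offset was already deleted
    if local_offset > last_offset:
        return 'tail out-of-range'  # e.g. the topic was recreated or truncated
    return 'in range'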

So what causes head out-of-range? The Kafka broker configuration had been changed to keep messages for only 24 hours:

# The minimum age of a log file to be eligible for deletion
log.retention.hours=24

The likely cause, then, is that unconsumed data in Kafka was deleted by the broker, leaving the saved consumer offset to the left of the oldest surviving message's offset: an offset that used to be valid became invalid. For example, if the job last recorded offset 100 but retention has already removed everything below offset 500, the next fetch at offset 100 is out of range.

Experiment

1. Change the Kafka broker's retention time to 2 minutes.

2. Restart Kafka after the change.

3. Use zk shell commands to read the zk_offset saved for the consumer (a Python equivalent is sketched after this list).

4. Stop the Spark Streaming Kafka DirectStream job.

5. Send data to the Kafka topic and wait for a while (more than two minutes).

6. Restart the streaming job; the exception reproduces.
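
For step 3, the offsets were read with the zk shell; as a Python equivalent (an assumption on my part, not the tooling used in the article), the kazoo library can read the same znode. The ZooKeeper address and znode path below are placeholders and depend on where the job stores its offsets:

from kazoo.client import KazooClient

zk = KazooClient(hosts='zk-host:2181')  # placeholder ZooKeeper address
zk.start()
# Placeholder znode path; adjust it to wherever the job keeps its offsets.
data, _stat = zk.get('/consumers/my-group/offsets/my-topic/0')
print('saved zk_offset: %s' % data.decode('utf-8'))
zk.stop()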

The experiment confirms the cause: because of the log.retention.hours setting, the broker deleted part of the topic's data, and within that retention window the streaming job never consumed the messages that were about to be removed. The offset stored in ZooKeeper therefore fell to the left of earliest_offset, raising the exception.

Solution

The first fix that comes to mind is for the streaming job to consume the topic's data promptly, keeping consumption lag below the log.retention.hours setting. A better approach, though, is to let the job keep running when the problem does occur, which means correcting local_offset to a valid value whenever local_offset < earliest_offset is detected (and symmetrically for the tail case).
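
The correction itself is just a clamp into the valid range; a minimal sketch with illustrative names:

def correct_offset(local_offset, earliest_offset, last_offset):
    # Snap a head out-of-range offset up to earliest_offset and a tail
    # out-of-range offset down to last_offset; in-range offsets pass through.
    return min(max(local_offset, earliest_offset), last_offset)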

Core code for automatic offset correction

import os
import pickle

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
from pyspark.storagelevel import StorageLevel
from kafka import SimpleClient
from kafka.common import OffsetRequestPayload

# config, groupid, topics, brokers, params_name and alarm_opt come from the
# surrounding job; this listing is an excerpt.

# Load the offsets persisted locally by the last successful batch
localOffsetRanges = []
if os.path.isfile('%s/%s_offset.txt' % (config.offset_store_location, groupid)):
    with open('%s/%s_offset.txt' % (config.offset_store_location, groupid), 'rb') as f:
        localOffsetRanges = pickle.load(f)
offsets = {}

cur_kafka_topic_offset_map = {}
for temp_topic in topics:
    # Fetch the topic's current min and max offsets from Kafka, to compare against
    # the consumed offsets and automatically fix potential head/tail out-of-range
    # problems without manual intervention.
    temp_offset_map = get_cur_kafka_topic_offset_map(brokers, temp_topic)
    if temp_offset_map:
        cur_kafka_topic_offset_map[temp_topic] = temp_offset_map

fix_offset_content = u""
total_fix_num = 0
alert_fix_max_num = 5
alert_fix_num = 0
for offsetRange in localOffsetRanges:
    temp_topic = offsetRange.topic
    partition_idx = offsetRange.partition
    if temp_topic in topics:
        topicPartition = TopicAndPartition(temp_topic, partition_idx)
        cur_consumer_offset = offsetRange.untilOffset
        temp_offset_map = cur_kafka_topic_offset_map.get(temp_topic)
        if temp_offset_map:
            cur_kafka_topic_offset_infos = temp_offset_map.get(partition_idx)
            if cur_kafka_topic_offset_infos:
                cur_kafka_topic_min_offset = cur_kafka_topic_offset_infos[0]
                cur_kafka_topic_max_offset = cur_kafka_topic_offset_infos[1]
                if cur_kafka_topic_min_offset > 0 and cur_consumer_offset < cur_kafka_topic_min_offset:

                    total_fix_num += 1
                    alert_fix_num += 1
                    if alert_fix_num <= alert_fix_max_num:
                        if fix_offset_content == "":
                            fix_offset_content = "consumer_offset(%s)<min_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic)
                        else:
                            fix_offset_content += "\nconsumer_offset(%s)<min_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic)
                    print(
                        "cur_consumer_offset(%s)<cur_kafka_topic_min_offset(%s),need fix,partition=%s,topic=%s,brokers=%s" % (
                            cur_consumer_offset, cur_kafka_topic_min_offset, partition_idx, temp_topic, brokers))
                    cur_consumer_offset = cur_kafka_topic_min_offset

                if cur_kafka_topic_max_offset > 0 and cur_consumer_offset > cur_kafka_topic_max_offset:

                    total_fix_num += 1
                    alert_fix_num += 1
                    if alert_fix_num <= alert_fix_max_num:
                        if fix_offset_content == "":
                            fix_offset_content = "consumer_offset(%s)>max_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic)
                        else:
                            fix_offset_content += "\nconsumer_offset(%s)>max_offset(%s),need fix,partition=%s,topic=%s" % (
                                cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic)

                    print(
                        "cur_consumer_offset(%s)>cur_kafka_topic_max_offset(%s),need fix,partition=%s,topic=%s,brokers=%s" % (
                            cur_consumer_offset, cur_kafka_topic_max_offset, partition_idx, temp_topic, brokers))
                    cur_consumer_offset = cur_kafka_topic_max_offset

        offsets[topicPartition] = cur_consumer_offset

if total_fix_num > 0:
    receivers = config.recvers.split(',')
    alarmopt = alarm_opt.AlarmOpt(receivers)
    alert_content = u"[%s][消费offset和最新offset有出入(共修正%s次)]:\n%s" % (params_name, total_fix_num, fix_offset_content)
    alarmopt.alarm(alarm_opt.WX_SMS, alert_content, u'spark告警')
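
For context, here is a minimal sketch of how the corrected offsets dict might be wired into the job: it seeds KafkaUtils.createDirectStream through fromOffsets, and each batch's offset ranges are persisted in the same pickle format the code above reads back. ssc, brokers, topics, config and groupid are assumed to come from the surrounding job, and the actual batch processing is elided:

kafkaParams = {"metadata.broker.list": brokers}
stream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams, fromOffsets=offsets)

def process_and_save(rdd):
    # ... process the batch here, then persist its offset ranges so the next
    # restart resumes (and, if necessary, re-corrects) from them.
    offset_ranges = rdd.offsetRanges()
    with open('%s/%s_offset.txt' % (config.offset_store_location, groupid), 'wb') as f:
        pickle.dump(offset_ranges, f)

stream.foreachRDD(process_and_save)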

The helper below fetches each partition's current [min_offset, max_offset] from Kafka:
def get_cur_kafka_topic_offset_map(brokers,topic):
    cur_kafka_offset_map={}
    try:
        client = SimpleClient(brokers)
        LATEST = -1
        EARLIEST = -2
        # Number of partitions in the topic
        partitions = client.topic_partitions[topic]
        partition_num=len(partitions.keys())
        print("partition_num=%s,topic=%s" % (partition_num,topic))
        # Fetch each partition's earliest (minimum) offset
        min_offset_requests = [OffsetRequestPayload(topic, p, EARLIEST, 1) for p in partitions.keys()]
        min_offsets_responses = client.send_offset_request(min_offset_requests)
        if not min_offsets_responses or len(min_offsets_responses)!=partition_num:
            print("min_offsets_responses is illegal,topic=%s,brokers=%s" % (topic,brokers))
            return None
        print("len(min_offsets_responses)=%s,topic=%s" % (len(min_offsets_responses),topic))
        for r in min_offsets_responses:
            cur_kafka_offset_map[r.partition] = [r.offsets[0]]
        # Fetch each partition's latest (maximum) offset
        max_offset_requests = [OffsetRequestPayload(topic, p, LATEST, 1) for p in partitions.keys()]
        max_offsets_responses = client.send_offset_request(max_offset_requests)
        if not max_offsets_responses or len(max_offsets_responses)!=partition_num:
            print("max_offsets_responses is illegal,topic=%s,brokers=%s" % (topic,brokers))
            return None
        print("len(max_offsets_responses)=%s,topic=%s" % (len(max_offsets_responses),topic))
        cur_kafka_offset_str=""
        for r in max_offsets_responses:
            if r.partition in cur_kafka_offset_map:
                cur_kafka_offset_map[r.partition].append(r.offsets[0])
            else:
                cur_kafka_offset_map[r.partition] = [-1, r.offsets[0]]
            partition_info_str="[%s,%s,%s]"%(r.partition,cur_kafka_offset_map[r.partition][0],cur_kafka_offset_map[r.partition][1])
            if cur_kafka_offset_str=="":
                cur_kafka_offset_str=partition_info_str
            else:
                cur_kafka_offset_str += ",%s" % (partition_info_str)
        print("cur_kafka_offset_str=%s,topic=%s,brokers=%s" % (cur_kafka_offset_str, topic,brokers))
        return cur_kafka_offset_map
    except Exception as e:
        print("get_cur_kafka_topic_offset_map Exception: %s,topic=%s,brokers=%s"%(str(e),topic,brokers))
        return None
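
A hypothetical call, with placeholder broker addresses and topic name, showing the per-partition [min_offset, max_offset] structure the function returns:

offset_map = get_cur_kafka_topic_offset_map('broker1:9092,broker2:9092', 'my_topic')
if offset_map:
    for partition, (min_offset, max_offset) in offset_map.items():
        print('partition=%s, min_offset=%s, max_offset=%s' % (partition, min_offset, max_offset))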

Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, contact cloudcommunity@tencent.com for removal.

