前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Zabbix监控之从Kafka中获取消费进度和lag

Zabbix监控之从Kafka中获取消费进度和lag

作者头像
我是李超人
发布2020-08-21 10:34:10
1.6K0
发布2020-08-21 10:34:10
举报

在0.9及之后的版本,kafka自身提供了存放消费进度的功能。本文讲解的是如何从kafka自身获取消费进度。从zookeeper中获取消费进度请阅读我的另一片文章传送门

https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka 这是官网上的教程,提供了scala版本的获取消费状态和提交消费状态的代码。仅供参考。

http://pykafka.readthedocs.io/en/latest/api/broker.html 这是pykafka官网提供获取消费状态的API,试过不知道怎么用,网上也找不到相关代码。

http://kafka-python.readthedocs.io/en/latest/usage.html 这是python-kafka官网,找不到想要的API,没试过。

获取消费进度之前,一定要先弄明白kafka的存储结构以及消费进度是存放在zookeeper中还是kafka中,否则可能会发现到头来,自己都不知道自己在干什么。以上几种方式我都试过,但是都没成功,最后选择命令行的方式获取到消费状态,将消费状态写入文件中,再解析文件。 Kafka管理工具 https://www.iteblog.com/archives/1605.html http://orchome.com/454

使用指令可以获取该组下每个consumer的消费进度

代码语言:javascript
复制
/data/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.12.11.131:9092 --group kafkaTestGroup --describe

然后再将其中的数据取出来,echo到文件中,可以使用crontab来执行指令,定时更新文件。

代码语言:javascript
复制
/data/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.1.8.74:9092 --group datasync.server.10.1.2.118 --describe |grep datasync.server.10.1.2.118 | awk '{print $1,$2,$3,$4,$5,$6,$7}'

将消费状态存放在kafka.log文件中,再解析文件,我这里监控阀值设置为1000,将lag值大于1000的数据取出来并输出。下面是解析文件的python脚本。

代码语言:javascript
复制
#!/usr/bin/env python
#coding=utf-8

import os.path
import time
import pdb
from fileStatus import File


if __name__=="__main__":
    filePath='/data/python-scripts/inspector/AccountInspector/otherInspector/kafka.log'
    f=open(filePath,"r")
    columnNameList=['GROUP','TOPIC','PARTITION','CURRENT-OFFSET','LOG-END-OFFSET','LAG','OWNER']
    result='no topicPartition lag is over allowedRange'
    resultDic={}
    overAllowedLagDic={}
    for line in f:
        #使用命令行处理时有时会得到Consumer group is not exists 或者Consumer group is rebalancing等不正常的结果,这种数据忽略不处理
        if 'Consumer group' not in line: 
            line=line.strip('\n')
            lineSplit=line.split(' ')
            dicKey=lineSplit[0]+'_'+lineSplit[1]+'_'+lineSplit[2]
            dicValue={}
            for i in range(0,len(lineSplit),1):
                dicValue[columnNameList[i]]=lineSplit[i]
            #由于我设置的阀值时lag值为1000时就告警,此处LOG-END-OFFSET就是logsize,当logsize小于1000时可以忽略(因为lag总是小于logsize的)
            if dicValue['LOG-END-OFFSET']<='1000':
                dicValue['CURRENT-OFFSET']='0'
                dicValue['LAG']='0'
            resultDic[dicKey]=dicValue
            #使用命令行有缺陷,经常会出现取出来的值为unknown的情况,出现这种情况也当作告警处理
            if dicValue['LAG'] == 'unknown':
                overAllowedLagDic[dicKey]=dicValue
            else:
                if int(dicValue['LAG'])>1000:
                    overAllowedLagDic[dicKey]=dicValue
    if len(overAllowedLagDic)>0:
        result=''
        for key in overAllowedLagDic:
            dicValue=overAllowedLagDic[key]
            lag=dicValue['LAG']
            result=key+':'+lag+'; '+result
    print result

方式很low,而且还有漏洞,后面有时间研究下使用API的方式获取消费进度。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-07-20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档