首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Kafka Eagle 源码解读

1.概述

在《Kafka 消息监控 - Kafka Eagle》一文中,简单的介绍了 Kafka Eagle这款监控工具的作用,截图预览,以及使用详情。今天笔者通过其源码来解读实现细节。目前该项目已托管于 Github 之上,作者编写了使用手册,告知使用者如何安装,部署,启动该系统。但对于实现的细节并未在参考手册中详细指出。这里,笔者通过本篇博文,来详细解读其实现细节。相关资料文献地址如下所示:

Kafka Eagle 源码地址:https://github.com/smartloli/kafka-eagle

Kafka Eagle 参考手册:https://ke.smartloli.org/

Kafka Eagle 安装包:http://download.smartloli.org/

2.内容

截止到版本 Kafka Eagle v1.1.1 支持监控0.8.2.x(存放消费信息于Zookeeper)以及 0.10.x(存放消费信息于Kafka的topic中)。对于前者,从Zookeeper中获取消息信息,难度不大,编写Zookeeper客户端实现代码即可,该版本在Zookeeper下的存储结构树如下图所示:

对于实现细节,可使用ZkUtils工具类来获取相关数据,以获取消费信息为例,代码如下所示:

/**Obtaining kafka consumer information from zookeeper.*/publicMap>getConsumers(String clusterAlias) { ZkClient zkc=zkPool.getZkClient(clusterAlias); Map> consumers =newHashMap>();try{ Seq subConsumerPaths =ZkUtils.getChildren(zkc, CONSUMERS_PATH); List groups =JavaConversions.seqAsJavaList(subConsumerPaths);for(String group : groups) { String path= CONSUMERS_PATH + "/" + group + "/owners";if(ZkUtils.pathExists(zkc, path)) { Seq owners =ZkUtils.getChildren(zkc, path); List ownersSerialize =JavaConversions.seqAsJavaList(owners); consumers.put(group, ownersSerialize); }else{ LOG.error("Consumer Path[" + path + "] is not exist."); } } }catch(Exception ex) { LOG.error(ex.getMessage()); }finally{if(zkc !=null) { zkPool.release(clusterAlias, zkc); zkc=null; } }returnconsumers; }

其他监控信息可以按照Zookeeper中结构树路径获取。如下图所示:

3.消费 Owner

当消费的信息存放于Zookeeper中,我们可以直接从consumer模块下直接获取对应的Owner,但是在Kafka的Topic中,我们需要编码来间接的获取。这里,我们需要知道 Kafka 的Owner的组成规则,其规则由 Group+ConusmerHostAddress+Timespan+UUID+PartitionId组成,实现细节可参考源码,界面展示如下图所示:

4.Kafka SQL

关于Kafka SQL,旨在使用SQL来快速可视化Topic的相关信息,目前 Kafka SQL 实现的功能包含有展示某一个Topic的Partition,Offset,以及其对应的消息记录,若不加limit条件限制,默认展示该Topic下最新的5000条记录,详细实现细节,可参看源码,预览截图如下所示:

查询结果,如下图所示:

5.多集群

Kafka Eagle 目前是支持多集群监控,所谓多集群,是指多个Zookeeper集群下的Kafka集群,通过切换Session来管理不同的Zookeeper集群下的Kafka集群,细节参看源码。管理界面如下图所示:

6.总结

Kafka Eagle总体实现思路基本如上所述。针对,Kafka 0.10.x版本,Kafka Eagle监控部分模块不展示的问题,这里在启动 Kafka Eagle之前,默认启动一个系统consumer来消费kafka.eagle该group下的__system.topic__,保证__consumer_offsets是有数据可供获取的。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180416G1UD2R00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券