我行自2016年开始使用Kafka,主要用于两大类应用:一是用于在应用间构建实时的数据流通道,二是用于构建传输或处理数据流的实时流式应用。Kafka自身没有监控管理页面,我们调研并对比了市面主要的Kafka管理工具,得到的结论是各个产品有自身的设计局限性,并且开源产品缺乏稳定性,因此在满足实际需求的前提下,集合众多产品的优点基础上,设计并研发了一套适合通用的Kafka监控管理平台。以下是调研的市面主要的Kafka管理工具:
工具 | 优点 | 缺点 |
---|---|---|
Kafka Manager | 功能较全面,有集群管理,topic管理,consumer offset和lag监控,支持JMX监控 | 无ACL功能,部署较为复杂 |
Kafka-monitor | 实时监控服务的可用性、消息丢失率、延迟率 | 偏重监控,无集群管理 |
KafkaOffsetMonitor | 监控offset,并可存储历史offset;jar包部署,较方便 | 功能单一,无topic管理,集群管理 |
Confluent Control Center | 监控集群、数据流程,管理Kafka Connect和topic | 开源版功能较少 |
Kafka Paas平台是我行自研的一套平台,提供Kafka接入和Kafka运维管控解决方案,集成集群监控管理、Topic管理、Consumer Group管理、Schema Registry管理和ZooKeeper管理功能,帮助Kafka运维人员和接入人员快速使用和管控Kafka集群。该平台提供了基于Kafka 0.10.X和1.1.X版本的REST API,在第三部分我们会详细介绍。以下是部分功能截图展示:
该API分为Kafka管理、ZooKeeper管理、SchemaRegistry管理、JMX Metric采集、用户管理等几大功能,接下来将分别介绍该API的主要功能:
这是API的核心功能,包括Cluster/Broker管理、Topic管理、Consumer管理三大功能,以下分别介绍:
Cluster/Broker管理包含以下功能:
DescribeClusterOptions describeClusterOptions =
new DescribeClusterOptions().timeoutMs((int) kafkaAdminClientGetTimeoutMs);
DescribeClusterResult describeClusterResult =
kafkaAdminClient.describeCluster(describeClusterOptions);
KafkaFuture<String> clusterIdFuture = describeClusterResult.clusterId();
KafkaFuture<Node> controllerFuture = describeClusterResult.controller();
KafkaFuture<Collection<Node>> nodesFuture = describeClusterResult.nodes();
String clusterId = "";
Node controller = null;
Collection<Node> nodes = new ArrayList<>();
try {
clusterId = clusterIdFuture.get(kafkaAdminClientGetTimeoutMs, TimeUnit.MILLISECONDS);
controller = controllerFuture.get(kafkaAdminClientGetTimeoutMs, TimeUnit.MILLISECONDS);
nodes = nodesFuture.get(kafkaAdminClientGetTimeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception exception) {
log.warn("Describe cluster exception:" + exception);
throw new ApiException("Describe cluster exception:" + exception);
} finally {
if (clusterIdFuture.isDone() && !clusterIdFuture.isCompletedExceptionally()) {
// clusterDetail.put("clusterId", clusterId);
clusterInfo.setClusterId(clusterId);
}
if (controllerFuture.isDone() && !controllerFuture.isCompletedExceptionally()) {
// clusterDetail.put("controllerId", controller);
clusterInfo.setController(controller);
}
if (nodesFuture.isDone() && !nodesFuture.isCompletedExceptionally()) {
// clusterDetail.put("nodes", nodes);
clusterInfo.setNodes(nodes);
}
}
除了用到KafkaAdminClient,broker的JMX端口,在zk注册的时间等这些需要获取zk上/brokers/ids的信息,其实在KafkaZkClient里有这类似的实现方法,但是返回的Broker信息里只有id,endpoints,rack信息,没有timestamp,jmxPort,endpoints这些信息,因此我们使用ZooKeeper的CuratorClient获取该路径的数据,然后通过反序列化读取全部数据,实现细节截取如下:
String brokerInfoStr = null;
try {
// TODO replace zkClient with kafkaZKClient
brokerInfoStr =
new String(
zkClient
.getData()
.forPath(ZkUtils.BrokerIdsPath() + "/" + entry.getKey()));
} catch (Exception e) {
e.printStackTrace();
}
BrokerInfo brokerInfo;
try {
ObjectMapper objectMapper = new ObjectMapper();
brokerInfo = objectMapper.readValue(brokerInfoStr, BrokerInfo.class);
} catch (Exception exception) {
throw new ApiException("List broker exception." + exception);
}
需要注意的是不同版本的Broker在zk上注册的字段信息不同,字段信息可通过版本区分,在0.10.0.1版本是3,在1.1.1版本是4,以下是version=3和4的一个示例,在version 4里多一个字段listenersecurityprotocol_map,详细可参考类ZkData.scala:
* Version 3 JSON schema for a broker is:
* {
* "version":3,
* "host":"localhost",
* "port":9092,
* "jmx_port":9999,
* "timestamp":"2233345666",
* "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
* "rack":"dc1"
* }
*
* Version 4 (current) JSON schema for a broker is:
* {
* "version":4,
* "host":"localhost",
* "port":9092,
* "jmx_port":9999,
* "timestamp":"2233345666",
* "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
* "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
* "rack":"dc1"
* }
另外KafkaAdminClient有获取broker配置信息的方法describeConfigs,但是该方法获取不到broker动态的配置信息,在重分区的时候,可以设置限速,但是该方法获取不到限速的值(值为null),因此我们改进了获取broker配置信息的方法,将describeConfigs方法获取的配置和从/config/brokers/id上获取的信息进行merge,形成最全最准确的配置信息。
Topic管理包含以下功能:
// Return <Current partition replica assignment, Proposed partition reassignment>
public List<ReassignModel> generateReassignPartition(ReassignWrapper reassignWrapper) {
KafkaZkClient kafkaZkClient = zookeeperUtils.getKafkaZkClient();
List<ReassignModel> result = new ArrayList<>();
Seq brokerSeq =
JavaConverters.asScalaBufferConverter(reassignWrapper.getBrokers()).asScala().toSeq();
// <Proposed partition reassignment,Current partition replica assignment>
Tuple2 resultTuple2;
try {
resultTuple2 =
ReassignPartitionsCommand.generateAssignment(
kafkaZkClient, brokerSeq, reassignWrapper.generateReassignJsonString(), false);
} catch (Exception exception) {
throw new ApiException("Generate reassign plan exception." + exception);
}
HashMap<TopicPartitionReplica, String> emptyMap = new HashMap<>();
ObjectMapper objectMapper = new ObjectMapper();
try {
result.add(
objectMapper.readValue(
ReassignPartitionsCommand.formatAsReassignmentJson(
(scala.collection.Map<TopicPartition, Seq<Object>>) resultTuple2._2(),
JavaConverters.mapAsScalaMapConverter(emptyMap).asScala()),
ReassignModel.class));
result.add(
objectMapper.readValue(
ReassignPartitionsCommand.formatAsReassignmentJson(
(scala.collection.Map<TopicPartition, Seq<Object>>) resultTuple2._1(),
JavaConverters.mapAsScalaMapConverter(emptyMap).asScala()),
ReassignModel.class));
Collections.sort(result.get(0).getPartitions());
Collections.sort(result.get(1).getPartitions());
} catch (Exception exception) {
throw new ApiException("Generate reassign plan exception." + exception);
}
return result;
}
private Object avroDeserialize(byte[] bytes, String avroSchema, boolean isInSchemaRegistry) {
Schema schema = new Schema.Parser().parse(avroSchema);
DatumReader reader = new GenericDatumReader<GenericRecord>(schema);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
Object object = null;
if (isInSchemaRegistry) {
try {
//这里利用confluent提供的io.confluent.kafka.serializers.KafkaAvroDeserializer进行解码
object = confluentSchemaService.deserializeBytesToObject("", bytes, schema);
} catch (SerializationException serializationException) {
throw new ApiException("Avro Deserialize exception. " + serializationException);
}
} else {
try {
object =
reader.read(
null,
DecoderFactory.get().binaryDecoder(buffer.array(), 0, bytes.length, null));
} catch (IOException exception) {
throw new ApiException("Avro Deserialize exception. " + exception);
}
}
return object;
}
b.其他方式下,拉取消息时返回的ConsumerRecords,然后根据Java的反射机制判断每条ConsumerRecord内数据的类型,然后转化成String类型
这部分代码细节请参考:
ConsumerRecords<Object, Object> crs = consumer.poll(timeoutMs);
if (crs.count() != 0) {
Iterator<ConsumerRecord<Object, Object>> it = crs.iterator();
while (it.hasNext()) {
Record record = Record.builder().topic(topic).keyDecoder(keyDecoder)
.valueDecoder(valueDecoder).build();
ConsumerRecord<Object, Object> initCr = it.next();
record.setOffset(initCr.offset());
record.setTimestamp(initCr.timestamp());
record.setKey(initCr.key());
record.setValue(initCr.value());
recordList.add(record);
}
}
public String getValueByDecoder(String decoder, Object value) {
if (value == null) return null;
Class<?> type = KafkaUtils.DESERIALIZER_TYPE_MAP.get(decoder);
try {
if (String.class.isAssignableFrom(type)) {
return value.toString();
}
if (Short.class.isAssignableFrom(type)) {
return value.toString();
}
...
if (Bytes.class.isAssignableFrom(type)) {
Bytes bytes = (Bytes) value;
return bytes.toString();
}
if (byte[].class.isAssignableFrom(type)) {
if (decoder.contains("AvroDeserializer")) {
return value.toString();
} else {
byte[] byteArray = (byte[]) value;
return new String(byteArray);
}
}
if (ByteBuffer.class.isAssignableFrom(type)) {
ByteBuffer byteBuffer = (ByteBuffer) value;
return new String(byteBuffer.array());
}
}
...
Consumer管理包含以下功能:
这部分实现是利用kafka.admin.AdminClient实现的,describeConsumerGroup方法可以获取state、组内consumer分配策略、协调节点和组内consumer信息,关于获取消费的位移/lag信息,利用adminclient.listGroupOffsets()方法获取消费过的所有分区信息和位移信息,
再结合describeConsumerGroups()获取到的Consumer分配信息获取consumer的消费位移/lag信息,注意的是listGroupOffsets中获取的分区信息可能比describeConsumerGroup()获取到的ConsumerSummary里分区的总和,也就是存在没有consumer的消费信息,这些分区对应的consumerId,clientId,host用“-”代替,此处是参考ConsumerGroupCommand.scala类里的collectGroupOffsets()方法实现
实现是创建一个同group下的KafkaConsumer,然后用assign()方法指定消费分区,然后再利用KafkaConsumer提供的seekToBeginning(),seekToEnd(),seek()方法实现重置,如果需要重置到指定时间点,先用offsetsForTimes()方法获取不早于该时间戳的分区位移,然后再用seek()方法重置,需要注意的是为了不影响该group的消费位移,重置之前需要将处于active状态的group先停掉。
虽然之后kafka不再支持old consumer,但是在我行0.10.0.1环境中,还是有使用old consumer的客户端,因此对于删除old consumer的功能仍保留,实现原理是删除zookeeper路径/consumers/{groupName}的数据;对于new consumer,使用kafka.admin.AdminClient.deleteConsumerGroups()实现。
ZooKeeper管理包含以下功能:
(1)查看zk的环境信息
实现原理是通过zookeeper的四字命令envi获取数据,然后解析。
(2)查看服务器的详细信息:服务器的详细信息:接收/发送包数量、连接数、模式(leader/follower)、节点总数、延迟,以及所有客户端的列表
实现原理也是通过zookeeper的四字命令stat获取数据,然后解析,细节代码如下:
public Map<HostAndPort, ZkServerStat> stat() {
List<HostAndPort> hostAndPortList = zookeeperUtils.getZookeeperConfig().getHostAndPort();
Map<HostAndPort, ZkServerStat> result = new HashMap<>();
for (int i = 0; i < hostAndPortList.size(); i++) {
HostAndPort hp = hostAndPortList.get(i);
try {
result.put(
hp,
zookeeperUtils.parseStatResult(
zookeeperUtils.executeCommand(
hp.getHostText(), hp.getPort(), ZkServerCommand.stat.toString())));
} catch (ServiceNotAvailableException serviceNotAvailbleException) {
log.warn(
"Execute "
+ ZkServerCommand.stat.toString()
+ " command failed. Exception:"
+ serviceNotAvailbleException);
result.put(
hp,
ZkServerStat.builder()
.mode(serviceNotAvailbleException.getServiceState())
.msg(serviceNotAvailbleException.getMessage())
.build());
}
}
return result;
}
(3)查看指定路径下节点信息,获取指定路径的节点数据
实现效果和ls/get命令一样,是通过curatorClient提供的getChildren()方和getData()方法实现。
在我行实际使用过程中,Kafka的数据来源之一是CDC(Change Data Capture),这些数据是Avro格式的,且包含schema,因此我们部署了confluent版本Schema Registry,Schema Registry服务在CDC同步架构中作为Avro Schema的存储库。Confluent本身也提供了RESTful的接口,为了将这些管理功能集成,在我们的API中封装了主要接口,包括以下功能:(1)获取所有subjects,在原有的接口上进行扩展,不仅获取了subject名称,还获取了最新的schema的Id,版本,schema内容,代码细节如下:
public List<SchemaRegistryMetadata> getAllSubjects() {
try {
Collection<String> subjects = this.schemaRegistryClient.getAllSubjects();
List<SchemaRegistryMetadata> allSubjects = new ArrayList<>();
SchemaMetadata schemaMetadata;
SchemaRegistryMetadata schemaRegistryMetadata;
for (String subject : subjects) {
schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(subject);
schemaRegistryMetadata = SchemaRegistryMetadata.builder().subject(subject)
.id(schemaMetadata.getId())
.version(schemaMetadata.getVersion()).schema(schemaMetadata.getSchema()).build();
allSubjects.add(schemaRegistryMetadata);
}
return allSubjects;
} catch (Exception exception) {
throw new ApiException("ConfluentSchemaService getAllSubjects exception : " + exception);
}
}
另外提供获取指定subject的shema所有的版本信息,删除指定的subject
(2)注册schema;根据schemaId获取shema信息,在原有的接口上进行了扩展,不仅获取了shema的详细内容,还能获取对应的subject名称,版本信息;根据subject名称和schema版本获取schema信息
由于该API是管理Kafka、ZooKeeper、Schema Registry等众多产品的API,且API操作中有很多修改,删除等接口,为了增加接口的安全性,我们增加了身份认证(是否添加认证可通过参数配置),采用HTTP basic方式进行认证。实现上使用Spring Boot集成Spring Security方式,为了减少API对其他产品的依赖,我们将用户名和密码存储在文件security.yml中,同时启动一个线程定期load该文件的数据,添加用户后不需要重启应用程序。JMX metric采集是可以根据传入的jmx url和筛选指标,从对应的jmx端口获取指标信息,由于后续行内有统一的jmx指标采集规划,因此该功能后续不再更新。
随着业务的不断发展,我行使用Kafka的应用不断增多,Kafka变得越来越重要。后续我们会重点进行Kafka PaaS平台建设,近期我们会先将kafka升级到1.1.1,同时增加ACL权限管控,关于ACL的管控接口会逐步开发并开源出来,另外就是规划多集群的分级管理,欢迎各行各业的朋友和我们交流和联系。
作者介绍:
文乔,工作于中国民生银行总行信息技术部大数据基础产品平台组,天眼日志平台主要参与人。
王健,民生银行信息科技部 dba ,2011年加入民生银行,主要从事数据库和kafka,数据复制等运维工作。
本文转载自公众号民生运维(ID:CMBCOP)。
原文链接:
领取专属 10元无门槛券
私享最新 技术干货