中国民生银行大数据团队Kafka1.X管控实践

一、前述

我行自2016年开始使用Kafka,主要用于两大类应用:一是用于在应用间构建实时的数据流通道,二是用于构建传输或处理数据流的实时流式应用。Kafka自身没有监控管理页面,我们调研并对比了市面主要的Kafka管理工具,得到的结论是各个产品有自身的设计局限性,并且开源产品缺乏稳定性,因此在满足实际需求的前提下,集合众多产品的优点基础上,设计并研发了一套适合通用的Kafka监控管理平台。以下是调研的市面主要的Kafka管理工具:

工具

优点

缺点

Kafka Manager

功能较全面,有集群管理,topic管理,consumer offset和lag监控,支持JMX监控

无ACL功能,部署较为复杂

Kafka-monitor

实时监控服务的可用性、消息丢失率、延迟率

偏重监控,无集群管理

KafkaOffsetMonitor

监控offset,并可存储历史offset;jar包部署,较方便

功能单一,无topic管理,集群管理

Confluent Control Center

监控集群、数据流程,管理Kafka Connect和topic

开源版功能较少

二、民生Kafka PAAS介绍

Kafka Paas平台是我行自研的一套平台,提供Kafka接入和Kafka运维管控解决方案,集成集群监控管理、Topic管理、Consumer Group管理、Schema Registry管理和ZooKeeper管理功能,帮助Kafka运维人员和接入人员快速使用和管控Kafka集群。该平台提供了基于Kafka 0.10.X和1.1.X版本的REST API,在第三部分我们会详细介绍。以下是部分功能截图展示:

三、开放的Kafka REST API

该API分为Kafka管理、ZooKeeper管理、SchemaRegistry管理、JMX Metric采集、用户管理等几大功能,接下来将分别介绍该API的主要功能:

3.1 Kafka管理

这是API的核心功能,包括Cluster/Broker管理、Topic管理、Consumer管理三大功能,以下分别介绍:

A. Cluster/Broker管理

Cluster/Broker管理包含以下功能:

  • (1)查看集群的id、controller和集群中每个broker信息;
  • (2)查看broker的信息,包括id,端口号,在zk节点注册时间、JMX端口、endpoints、listener security等信息;
  • (3)查看broker上日志目录信息,根据broker、topic信息查找副本所在的日志目录;
  • (4)查看broker的配置信息和动态配置信息,对于动态配置信息可以修改和删除。这部分主要是利用clients包里的KafkaAdminClient实现的,KafkaAdminClient是0.11.0.0版本开始提供的管理类,是用来替代之前core包里AdminUtils,AdminClient,ZKUtils等功能,但是这个类功能目前还不完善,比如没有管理consumer相关的方法,下面列举查看cluster的信息为例说明KafkaAdminClient的使用:
  DescribeClusterOptions describeClusterOptions =
      new DescribeClusterOptions().timeoutMs((int) kafkaAdminClientGetTimeoutMs);

  DescribeClusterResult describeClusterResult =
      kafkaAdminClient.describeCluster(describeClusterOptions);

  KafkaFuture<String> clusterIdFuture = describeClusterResult.clusterId();
  KafkaFuture<Node> controllerFuture = describeClusterResult.controller();
  KafkaFuture<Collection<Node>> nodesFuture = describeClusterResult.nodes();
  String clusterId = "";
  Node controller = null;
  Collection<Node> nodes = new ArrayList<>();

  try {
    clusterId = clusterIdFuture.get(kafkaAdminClientGetTimeoutMs, TimeUnit.MILLISECONDS);
    controller = controllerFuture.get(kafkaAdminClientGetTimeoutMs, TimeUnit.MILLISECONDS);
    nodes = nodesFuture.get(kafkaAdminClientGetTimeoutMs, TimeUnit.MILLISECONDS);
  } catch (Exception exception) {
    log.warn("Describe cluster exception:" + exception);
    throw new ApiException("Describe cluster exception:" + exception);
  } finally {
    if (clusterIdFuture.isDone() && !clusterIdFuture.isCompletedExceptionally()) {
      //        clusterDetail.put("clusterId", clusterId);
      clusterInfo.setClusterId(clusterId);
    }
    if (controllerFuture.isDone() && !controllerFuture.isCompletedExceptionally()) {
      //        clusterDetail.put("controllerId", controller);
      clusterInfo.setController(controller);
    }
    if (nodesFuture.isDone() && !nodesFuture.isCompletedExceptionally()) {
      //        clusterDetail.put("nodes", nodes);
      clusterInfo.setNodes(nodes);
    }
  }

除了用到KafkaAdminClient,broker的JMX端口,在zk注册的时间等这些需要获取zk上/brokers/ids的信息,其实在KafkaZkClient里有这类似的实现方法,但是返回的Broker信息里只有id,endpoints,rack信息,没有timestamp,jmxPort,endpoints这些信息,因此我们使用ZooKeeper的CuratorClient获取该路径的数据,然后通过反序列化读取全部数据,实现细节截取如下:

  String brokerInfoStr = null;
  try {
    // TODO replace zkClient with kafkaZKClient
    brokerInfoStr =
        new String(
            zkClient
                .getData()
                .forPath(ZkUtils.BrokerIdsPath() + "/" + entry.getKey()));
  } catch (Exception e) {
    e.printStackTrace();
  }
  BrokerInfo brokerInfo;
  try {
    ObjectMapper objectMapper = new ObjectMapper();
    brokerInfo = objectMapper.readValue(brokerInfoStr, BrokerInfo.class);
  } catch (Exception exception) {
    throw new ApiException("List broker exception." + exception);
}

需要注意的是不同版本的Broker在zk上注册的字段信息不同,字段信息可通过版本区分,在0.10.0.1版本是3,在1.1.1版本是4,以下是version=3和4的一个示例,在version 4里多一个字段listenersecurityprotocol_map,详细可参考类ZkData.scala:

    * Version 3 JSON schema for a broker is:
    * {
    *   "version":3,
    *   "host":"localhost",
    *   "port":9092,
    *   "jmx_port":9999,
    *   "timestamp":"2233345666",
    *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
    *   "rack":"dc1"
    * }
    *
    * Version 4 (current) JSON schema for a broker is:
    * {
    *   "version":4,
    *   "host":"localhost",
    *   "port":9092,
    *   "jmx_port":9999,
    *   "timestamp":"2233345666",
    *   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
    *   "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
    *   "rack":"dc1"
    * }

另外KafkaAdminClient有获取broker配置信息的方法describeConfigs,但是该方法获取不到broker动态的配置信息,在重分区的时候,可以设置限速,但是该方法获取不到限速的值(值为null),因此我们改进了获取broker配置信息的方法,将describeConfigs方法获取的配置和从/config/brokers/id上获取的信息进行merge,形成最全最准确的配置信息。

B. Topic管理

Topic管理包含以下功能:

  • (1)Topic的查看、创建、删除(删除是异步的)、配置查看/修改、schema查看;KafkaAdminClient提供了listTopics()、describeTopics()、createTopics()、deleteTopics()等topic管理方法,配置的查看修改部分是利用describeConfigs()、alterConfigs()实现,和Broker配置管理类似,需要整合从zookeeper上获取的topic动态配置信息。schema的查看是根据topic名称和schema名称的对应关系获取。
  • (2)分区查看,分区重分布生成计划、执行和检查(支持跨broker和broker内重分布,带限速功能) 分区信息查看,包括查看分区id、分区的leader、副本分本、ISR、开始offset、结束offset、消息数,前4个字段信息是通过KafkaAdminClient的describeTopics()实现,开始offset和结束offset是通过KafkaConsumer的beginningOffsets()和endoffsets()方法获取,消息数是这两个值的差值。在kafka中有脚本kafka-reassign-partitions提供了分区重分布的功能,该脚本可用于增加分区、增加副本、分区迁移,在1.1版本开始支持不同路径间的迁移;增加分区是采用KafkaAdminClient.createPartitions()方法实现的,其他功能该客户端还未实现,我们是调用ReassignPartitionsCommand.scala类实现的,Scala被编译为Java字节码,运行在JVM之上,因此在Java里可以直接调用Scala,对于Java和Scala之间类型转换,使用Scala提供的JavaConverters.scala,以下生成分区重分布计划的代码,里面涉及到了Java和Scala对象的转化:
  // Return <Current partition replica assignment, Proposed partition reassignment>
  public List<ReassignModel> generateReassignPartition(ReassignWrapper reassignWrapper) {
    KafkaZkClient kafkaZkClient = zookeeperUtils.getKafkaZkClient();
    List<ReassignModel> result = new ArrayList<>();

    Seq brokerSeq =
        JavaConverters.asScalaBufferConverter(reassignWrapper.getBrokers()).asScala().toSeq();
    // <Proposed partition reassignment,Current partition replica assignment>
    Tuple2 resultTuple2;
    try {
      resultTuple2 =
          ReassignPartitionsCommand.generateAssignment(
              kafkaZkClient, brokerSeq, reassignWrapper.generateReassignJsonString(), false);
    } catch (Exception exception) {
      throw new ApiException("Generate reassign plan exception." + exception);
    }
    HashMap<TopicPartitionReplica, String> emptyMap = new HashMap<>();
    ObjectMapper objectMapper = new ObjectMapper();
    try {
      result.add(
          objectMapper.readValue(
              ReassignPartitionsCommand.formatAsReassignmentJson(
                  (scala.collection.Map<TopicPartition, Seq<Object>>) resultTuple2._2(),
                  JavaConverters.mapAsScalaMapConverter(emptyMap).asScala()),
              ReassignModel.class));
      result.add(
          objectMapper.readValue(
              ReassignPartitionsCommand.formatAsReassignmentJson(
                  (scala.collection.Map<TopicPartition, Seq<Object>>) resultTuple2._1(),
                  JavaConverters.mapAsScalaMapConverter(emptyMap).asScala()),
              ReassignModel.class));
      Collections.sort(result.get(0).getPartitions());
      Collections.sort(result.get(1).getPartitions());
    } catch (Exception exception) {
      throw new ApiException("Generate reassign plan exception." + exception);
    }

    return result;
  }
  • (3)消息查看,消息查看支持根据offset和timestamp两种查询方式,对于消息中的key和value支持StringDeserializer、ByteArrayDeserializer、FloatDeserializer、DoubleDeserializer、KafkaAvroDeserializer等方式进行反序列化解码,实现的原理是根据传入的Key和Value的解码方式创建consumer,根据反序列化类型的不同,我们分两种情况:a.如果解码方式是ByteArrayDeserializer或者是KafkaAvroDeserializer,拉取消息返回的类型是ConsumerRecords,这里需要注意的是Producer端如果是用KafkaAvroSerializer编码方式发送的消息,那么每条消息里从第6个字节(第1个字节是MAGIC_BYTE,第2-6字节是一个整数,是schema的ID)才是真正数据,部分代码细节如下:
  private Object avroDeserialize(byte[] bytes, String avroSchema, boolean isInSchemaRegistry) {
    Schema schema = new Schema.Parser().parse(avroSchema);
    DatumReader reader = new GenericDatumReader<GenericRecord>(schema);
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    Object object = null;

    if (isInSchemaRegistry) {
      try {
        //这里利用confluent提供的io.confluent.kafka.serializers.KafkaAvroDeserializer进行解码
        object = confluentSchemaService.deserializeBytesToObject("", bytes, schema);
      } catch (SerializationException serializationException) {
        throw new ApiException("Avro Deserialize exception. " + serializationException);
      }
    } else {
      try {
        object =
            reader.read(
                null,
                DecoderFactory.get().binaryDecoder(buffer.array(), 0, bytes.length, null));
      } catch (IOException exception) {
        throw new ApiException("Avro Deserialize exception. " + exception);
      }
    }

    return object;
  }

b.其他方式下,拉取消息时返回的ConsumerRecords,然后根据Java的反射机制判断每条ConsumerRecord内数据的类型,然后转化成String类型

这部分代码细节请参考:

    ConsumerRecords<Object, Object> crs = consumer.poll(timeoutMs);
    if (crs.count() != 0) {
      Iterator<ConsumerRecord<Object, Object>> it = crs.iterator();
      while (it.hasNext()) {
        Record record = Record.builder().topic(topic).keyDecoder(keyDecoder)
            .valueDecoder(valueDecoder).build();
        ConsumerRecord<Object, Object> initCr = it.next();
        record.setOffset(initCr.offset());
        record.setTimestamp(initCr.timestamp());
        record.setKey(initCr.key());
        record.setValue(initCr.value());
        recordList.add(record);
      }
    }

    public String getValueByDecoder(String decoder, Object value) {
    if (value == null) return null;
    Class<?> type = KafkaUtils.DESERIALIZER_TYPE_MAP.get(decoder);
    try {
      if (String.class.isAssignableFrom(type)) {
        return value.toString();
      }

      if (Short.class.isAssignableFrom(type)) {
        return value.toString();
      }
      ...

      if (Bytes.class.isAssignableFrom(type)) {
        Bytes bytes = (Bytes) value;
        return bytes.toString();
      }

      if (byte[].class.isAssignableFrom(type)) {
        if (decoder.contains("AvroDeserializer")) {
          return value.toString();
        } else {
          byte[] byteArray = (byte[]) value;
          return new String(byteArray);
        }
      }

      if (ByteBuffer.class.isAssignableFrom(type)) {
        ByteBuffer byteBuffer = (ByteBuffer) value;
        return new String(byteBuffer.array());
      }
    }
    ...

C. Consumer管理

Consumer管理包含以下功能:

  • (1)new/old Consumer的查看,包括state、组内consumer分配策略、group所在协调节点、组内所有consumer成员信息、consumer消费的topic信息和消费位移/lag信息,另外可以根据consumer名称/topic名称查询消费consumer信息和消费位移/lag信息

这部分实现是利用kafka.admin.AdminClient实现的,describeConsumerGroup方法可以获取state、组内consumer分配策略、协调节点和组内consumer信息,关于获取消费的位移/lag信息,利用adminclient.listGroupOffsets()方法获取消费过的所有分区信息和位移信息,

再结合describeConsumerGroups()获取到的Consumer分配信息获取consumer的消费位移/lag信息,注意的是listGroupOffsets中获取的分区信息可能比describeConsumerGroup()获取到的ConsumerSummary里分区的总和,也就是存在没有consumer的消费信息,这些分区对应的consumerId,clientId,host用“-”代替,此处是参考ConsumerGroupCommand.scala类里的collectGroupOffsets()方法实现

  • (2)重置consumer的offset,支持重置到earliest、latest、指定offset,也可以重置到指定时间点的offset

实现是创建一个同group下的KafkaConsumer,然后用assign()方法指定消费分区,然后再利用KafkaConsumer提供的seekToBeginning(),seekToEnd(),seek()方法实现重置,如果需要重置到指定时间点,先用offsetsForTimes()方法获取不早于该时间戳的分区位移,然后再用seek()方法重置,需要注意的是为了不影响该group的消费位移,重置之前需要将处于active状态的group先停掉。

  • (3)new/old Consumer删除

虽然之后kafka不再支持old consumer,但是在我行0.10.0.1环境中,还是有使用old consumer的客户端,因此对于删除old consumer的功能仍保留,实现原理是删除zookeeper路径/consumers/{groupName}的数据;对于new consumer,使用kafka.admin.AdminClient.deleteConsumerGroups()实现。

3.2 Zookeeper管理

ZooKeeper管理包含以下功能:

(1)查看zk的环境信息

实现原理是通过zookeeper的四字命令envi获取数据,然后解析。

(2)查看服务器的详细信息:服务器的详细信息:接收/发送包数量、连接数、模式(leader/follower)、节点总数、延迟,以及所有客户端的列表

实现原理也是通过zookeeper的四字命令stat获取数据,然后解析,细节代码如下:

public Map<HostAndPort, ZkServerStat> stat() {
List<HostAndPort> hostAndPortList = zookeeperUtils.getZookeeperConfig().getHostAndPort();
Map<HostAndPort, ZkServerStat> result = new HashMap<>();
for (int i = 0; i < hostAndPortList.size(); i++) {
  HostAndPort hp = hostAndPortList.get(i);
  try {
    result.put(
        hp,
        zookeeperUtils.parseStatResult(
            zookeeperUtils.executeCommand(
                hp.getHostText(), hp.getPort(), ZkServerCommand.stat.toString())));
  } catch (ServiceNotAvailableException serviceNotAvailbleException) {
    log.warn(
        "Execute "
            + ZkServerCommand.stat.toString()
            + " command failed. Exception:"
            + serviceNotAvailbleException);
    result.put(
        hp,
        ZkServerStat.builder()
            .mode(serviceNotAvailbleException.getServiceState())
            .msg(serviceNotAvailbleException.getMessage())
            .build());
  }
}
return result;
}

(3)查看指定路径下节点信息,获取指定路径的节点数据

实现效果和ls/get命令一样,是通过curatorClient提供的getChildren()方和getData()方法实现。

3.3 Schema Registry管理

在我行实际使用过程中,Kafka的数据来源之一是CDC(Change Data Capture),这些数据是Avro格式的,且包含schema,因此我们部署了confluent版本Schema Registry,Schema Registry服务在CDC同步架构中作为Avro Schema的存储库。Confluent本身也提供了RESTful的接口,为了将这些管理功能集成,在我们的API中封装了主要接口,包括以下功能:(1)获取所有subjects,在原有的接口上进行扩展,不仅获取了subject名称,还获取了最新的schema的Id,版本,schema内容,代码细节如下:

 public List<SchemaRegistryMetadata> getAllSubjects() {
    try {
      Collection<String> subjects = this.schemaRegistryClient.getAllSubjects();
      List<SchemaRegistryMetadata> allSubjects = new ArrayList<>();
      SchemaMetadata schemaMetadata;
      SchemaRegistryMetadata schemaRegistryMetadata;

      for (String subject : subjects) {
        schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(subject);
        schemaRegistryMetadata = SchemaRegistryMetadata.builder().subject(subject)
            .id(schemaMetadata.getId())
            .version(schemaMetadata.getVersion()).schema(schemaMetadata.getSchema()).build();
        allSubjects.add(schemaRegistryMetadata);
      }
      return allSubjects;
    } catch (Exception exception) {
      throw new ApiException("ConfluentSchemaService getAllSubjects exception : " + exception);
    }
  }

另外提供获取指定subject的shema所有的版本信息,删除指定的subject

(2)注册schema;根据schemaId获取shema信息,在原有的接口上进行了扩展,不仅获取了shema的详细内容,还能获取对应的subject名称,版本信息;根据subject名称和schema版本获取schema信息

3.4 User管理和JMX metric采集

由于该API是管理Kafka、ZooKeeper、Schema Registry等众多产品的API,且API操作中有很多修改,删除等接口,为了增加接口的安全性,我们增加了身份认证(是否添加认证可通过参数配置),采用HTTP basic方式进行认证。实现上使用Spring Boot集成Spring Security方式,为了减少API对其他产品的依赖,我们将用户名和密码存储在文件security.yml中,同时启动一个线程定期load该文件的数据,添加用户后不需要重启应用程序。JMX metric采集是可以根据传入的jmx url和筛选指标,从对应的jmx端口获取指标信息,由于后续行内有统一的jmx指标采集规划,因此该功能后续不再更新。

四、后续计划

随着业务的不断发展,我行使用Kafka的应用不断增多,Kafka变得越来越重要。后续我们会重点进行Kafka PaaS平台建设,近期我们会先将kafka升级到1.1.1,同时增加ACL权限管控,关于ACL的管控接口会逐步开发并开源出来,另外就是规划多集群的分级管理,欢迎各行各业的朋友和我们交流和联系。

作者介绍

文乔,工作于中国民生银行总行信息技术部大数据基础产品平台组,天眼日志平台主要参与人。

王健,民生银行信息科技部 dba ,2011年加入民生银行,主要从事数据库和kafka,数据复制等运维工作。

本文转载自公众号民生运维(ID:CMBCOP)

原文链接

https://mp.weixin.qq.com/s?__biz=MzUxNzEwOTUyMg==&mid=2247484934&idx=1&sn=63336e5fd0a55eada2d82f085ec84253&chksm=f99c647bceebed6d1b2de88aad8b7552e8cbe886f1d929e92d5e43f863dbd6931a465db9f4f4&scene=27#wechat_redirect

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/XWDViAGo1H3zYmzhqegw
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券