专栏首页后端技术Kafka protocol 版本适应的通信协议

Kafka protocol 版本适应的通信协议

序言

在Kafka客户端与服务端通信的过程中,为了正确地发出、读取不同命令,需要定义通信的格式。org.apache.kafka.common.protocol包就负责该功能。

包内有以下成员:

  1. ApiKeys定义了不少Api,每个ApiKeys都包含多个Schema。
  2. types包首先定义了Type抽象类,暴露读写方法供子类实现。Schema继承了Type[1]
  3. CommonFields定义了很多公共使用的Field对象。

比较重要的几个类是ApiKeys、Schema、struct、Field,我们逐个理解。

Type

Type是抽象类,定义了多个接口,主要是write和read,对ByteBuffer进行读写。Type内定义了多个内部静态类,比如Type.BOOLEAN, Type.INT8, Type.INT16。 我们看看Type.BOOLEAN的实现,write操作就是简单地往ByteBuffer写入byte类型的0或1,read操作就是读取一个byte并转换为Boolean类型。

如图

ArrayOf 继承了Type,但本身表示type的数组。其write方法首先为数组的每个元素调用write,再写入数组长度;read方法首先读取数组长度,再依次读取数组的每个元素。

/**
 * Represents a type for an array of a particular type
 */
public class ArrayOf extends Type {

  private final Type type;
  private final boolean nullable;

  public ArrayOf(Type type) {
      this(type, false);
  }

  public static ArrayOf nullable(Type type) {
      return new ArrayOf(type, true);
  }

  private ArrayOf(Type type, boolean nullable) {
      this.type = type;
      this.nullable = nullable;
  }

  @Override
  public boolean isNullable() {
      return nullable;
  }

  @Override
  public void write(ByteBuffer buffer, Object o) {
      if (o == null) {
          buffer.putInt(-1);
          return;
      }

      Object[] objs = (Object[]) o;
      int size = objs.length;
      buffer.putInt(size);

      for (Object obj : objs)
          type.write(buffer, obj);
  }

  @Override
  public Object read(ByteBuffer buffer) {
      int size = buffer.getInt();
      if (size < 0 && isNullable())
          return null;
      else if (size < 0)
          throw new SchemaException("Array size " + size + " cannot be negative");

      if (size > buffer.remaining())
          throw new SchemaException("Error reading array of size " + size + ", only " + buffer.remaining() + " bytes available");
      Object[] objs = new Object[size];
      for (int i = 0; i < size; i++)
          objs[i] = type.read(buffer);
      return objs;
  }

Field

Field类的意思是"值域"

  • 它有几个属性: name、docString、type, 如果hasDefaultValue为真,则还有defaultValue属性。name和docString分别表示该值域的名字和描述。type表示该值域的类型。
  • 内部有几个子类,Str, Int8, Int32等,表示不同类型的值域。从Int8和Int32类可知,它们的type分别为Type.INT8和Type.INT32。

Field类的结构图示如下,其中defaultValue为虚线,因为该属性在hasDefaultValue为false时不存在。

各种Field的继承类与type类型的对应关系如下:

Schema

Schema顾名思义,就是格式的意思,按顺序定义了一个格式中多个值域的顺序。它继承了Type类,可对ByteBuffer进行读写操作。

  • 从构造函数中可知,传入多个Field,按顺序包装为BoundField,添加到fields。再生成按值域名查找的fieldsByName,便于查找。
  • 从read方法可知
    • Schema向ByteBuffer读取,其实就是依次让其每个值域(的type)进行读取。write的行为同理,不赘述。
    • read最后会返回一个Struct变量,下文分析。构建Struct传入了自己和objects,只要有Schema,就能按顺序再取出来。 每个BoundField都有变量def,记录它保存的Field;还有index变量,记录它在一个Schema中的位置。
/**
 * The schema for a compound record definition
 */
public class Schema extends Type {

    private final BoundField[] fields;
    private final Map<String, BoundField> fieldsByName;

    /**
     * Construct the schema with a given list of its field values
     *
     * @throws SchemaException If the given list have duplicate fields
     */
    public Schema(Field... fs) {
        this.fields = new BoundField[fs.length];
        this.fieldsByName = new HashMap<>();
        for (int i = 0; i < this.fields.length; i++) {
            Field def = fs[i];
            if (fieldsByName.containsKey(def.name))
                throw new SchemaException("Schema contains a duplicate field: " + def.name);
            this.fields[i] = new BoundField(def, this, i);
            this.fieldsByName.put(def.name, this.fields[i]);
        }
    }

    ...

    /**
     * Read a struct from the buffer
     */
    @Override
    public Struct read(ByteBuffer buffer) {
        Object[] objects = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            try {
                objects[i] = fields[i].def.type.read(buffer);
            } catch (Exception e) {
                throw new SchemaException("Error reading field '" + fields[i].def.name + "': " +
                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
            }
        }
        return new Struct(this, objects);
    }
/**
 * A field definition bound to a particular schema.
 */
public class BoundField {
    public final Field def;
    final int index;
    final Schema schema;

    public BoundField(Field def, Schema schema, int index) {
        this.def = def;
        this.schema = schema;
        this.index = index;
    }

    @Override
    public String toString() {
        return def.name + ":" + def.type;
    }
}

图示如下:

Struct

维护了values和schema两种变量,分别代表一组值域和可以解析它的格式。

/**
 * A record that can be serialized and deserialized according to a pre-defined schema
 */
public class Struct {
    private final Schema schema;
    private final Object[] values;

提供各种setter和getter,都是按照Schema填入values,和从values取出值域。

  • getter中get和getArray调用较多,行为都是先根据name找到BoundField,再根据其保存index从values读取出来。
/**
     * Get the record value for the field with the given name by doing a hash table lookup (slower!)
     *
     * @param name The name of the field
     * @return The value in the record
     * @throws SchemaException If no such field exists
     */
    public Object get(String name) {
        BoundField field = schema.get(name);
        if (field == null)
            throw new SchemaException("No such field: " + name);
        return getFieldOrDefault(field);
    }

    public Object[] getArray(String name) {
        return (Object[]) get(name);
    }

   /**
     * Return the value of the given pre-validated field, or if the value is missing return the default value.
     *
     * @param field The field for which to get the default value
     * @throws SchemaException if the field has no value and has no default.
     */
    private Object getFieldOrDefault(BoundField field) {
        Object value = this.values[field.index];
        if (value != null)
            return value;
        else if (field.def.hasDefaultValue)
            return field.def.defaultValue;
        else if (field.def.type.isNullable())
            return null;
        else
            throw new SchemaException("Missing value for field '" + field.def.name + "' which has no default value.");
    }

AbstractRequest/AbstractResponse

AbstractResponse提供toStruct和parseResponse,负责Struct与Abstract之间的转换。

  • toStruct会按照Schema构建Struct
  • 各种Response在构建时,会按照其定义的名字从定义了Schema的Struct中取出各个值域

AbstractRequest同理,此处不赘述

ApiKeys

ApiKeys是enum类型,有很多个实例。它为很多组Api的不同版本,定义了请求和响应的格式。每个Api,比如PRODUCE、FETCH等,都分为请求和响应两部分,它们各自有一个格式,在不同版本下的格式还不同。

ApiKeys(int id, String name, Schema[] requestSchemas, Schema[] responseSchemas) {
    this(id, name, false, requestSchemas, responseSchemas);
}

ApiKeys(int id, String name, boolean clusterAction, Schema[] requestSchemas, Schema[] responseSchemas) {
    this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0, requestSchemas, responseSchemas);
}

ApiKeys(int id, String name, boolean clusterAction, byte minRequiredInterBrokerMagic,
        Schema[] requestSchemas, Schema[] responseSchemas) {
    if (id < 0)
        throw new IllegalArgumentException("id must not be negative, id: " + id);
    this.id = (short) id;
    this.name = name;
    

    ...
    this.requestSchemas = requestSchemas;
    this.responseSchemas = responseSchemas;
}

外界经常调用的是parseRequest和parseResponse,根据版本来解析请求/响应。 parseResponse的行为:

  1. 根据version从responseSchemas取出Schema
  2. 调用其read方法

parseRequest同理。因此ApiKeys下的每个实例(PRODUCE、FETCH等)都能根据版本解析请求/响应

public Schema requestSchema(short version) {
    return schemaFor(requestSchemas, version);
}

public Schema responseSchema(short version) {
    return schemaFor(responseSchemas, version);
}

public Struct parseRequest(short version, ByteBuffer buffer) {
    return requestSchema(version).read(buffer);
}

public Struct parseResponse(short version, ByteBuffer buffer) {
    return responseSchema(version).read(buffer);
}

protected Struct parseResponse(short version, ByteBuffer buffer, short fallbackVersion) {
    int bufferPosition = buffer.position();
    try {
        return responseSchema(version).read(buffer);
    } catch (SchemaException e) {
        if (version != fallbackVersion) {
            buffer.position(bufferPosition);
            return responseSchema(fallbackVersion).read(buffer);
        } else
            throw e;
    }
}

private Schema schemaFor(Schema[] versions, short version) {
    if (!isVersionSupported(version))
        throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version);
    return versions[version];
}

Response中Struct的生成

  1. 调用inFlightRequests.completeNext取出头部的请求,暗示当前收到的响应就对应该请求,因为Kafka服务端会保证按照顺序响应请求。
  2. parseStruct...会将请求的ByteBuffer转换为Struct。注意传入了对应请求的header。
  3. AbstractResponse.parseResponse将Struct转换为响应。

parseStruct... 调用ApiKeys::parseResponse将ByteBuffer解析为Struct。调用中有两个细节:

  • 请求与响应,应当属于同一对apiKey。因此用与请求相同的apiKey解析响应
  • 请求与响应,应当属于同一api版本。因此传入的api版本为resquestHeader.apiVersion(),请求的api版本。
  • responseSchema根据版本取出Api在该版本下的Schema,然后调用read读取ByteBuffer。
  • Schema::read方法会按顺序读取每一个Field,作为Object类型存储。(话说fields[i].def.type.read这么长的调用,违反迪米特法则了啊)
// ApiKeys.java
public Struct parseResponse(short version, ByteBuffer buffer) {
  return responseSchema(version).read(buffer);
}

// Schema.java
/**
 * Read a struct from the buffer
 */
@Override
public Struct read(ByteBuffer buffer) {
    Object[] objects = new Object[fields.length];
    for (int i = 0; i < fields.length; i++) {
        try {
            objects[i] = fields[i].def.type.read(buffer);
        } catch (Exception e) {
            throw new SchemaException("Error reading field '" + fields[i].def.name + "': " +
                                      (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
        }
    }
    return new Struct(this, objects);
}

AbstractResponse.parseResponse 不赘述,可选择某个Response查看实现。


  1. Schema继承了Type,却可能包含了多个Type,所以此处用了组合设计模式,但不够严格,因为ApiKeys引用了Schema

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • class.getResource和classLoader.getResource 区别

    平凡的学生族
  • jrebel远程部署

    平凡的学生族
  • Java8 异步编程

    提供空构造函数,complete, completeExceptionally,用于手动完成future

    平凡的学生族
  • Linux命令ping,nc的学习(r10笔记第20天)

    今天看了下《Linux大棚命令百篇》网络和系统篇,发现了几个很不错的命令,我是看着目录然后根据自己的需要选了3个命令,没想到3个命令都让人眼前一亮,刷新了我原本...

    jeanron100
  • 8个有用的JavaScript技巧

    这些技巧可能大家大部分都用过了,如果用过就当作加深点映像,如果没有遇到过,就当作学会了几个技巧。

    行云博客
  • 8个有用的JS技巧

    这些技巧可能大家大部分都用过了,如果用过就当作加深点映像,如果没有遇到过,就当作学会了几个技巧。

    前端小智@大迁世界
  • spring cloud笔记 oauth2授权服务 clientDetails配置源码

    路过君
  • 8 个实用的 JavaScript 技巧

    每种编程语言都它独特的技巧。其中很多都是为开发人员所熟知的,但其中一些相当的 hackish。在这边篇文章中,我将向你展示一些我觉得有用的技巧。其中一些我在实践...

    ConardLi
  • 特别的字节对齐问题

    有如下一个结构体: struct X {         uint32_t a;         char* b[0]; }; sizeof(X)...

    一见
  • 650. 只有两个键的键盘 Krains 2020-08-02 09:39:39 动态规划DFS

    Krains

扫码关注云+社区

领取腾讯云代金券