前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka protocol 版本适应的通信协议

Kafka protocol 版本适应的通信协议

作者头像
平凡的学生族
发布2020-06-29 16:09:50
8720
发布2020-06-29 16:09:50
举报
文章被收录于专栏:后端技术

序言

在Kafka客户端与服务端通信的过程中,为了正确地发出、读取不同命令,需要定义通信的格式。org.apache.kafka.common.protocol包就负责该功能。

包内有以下成员:

  1. ApiKeys定义了不少Api,每个ApiKeys都包含多个Schema。
  2. types包首先定义了Type抽象类,暴露读写方法供子类实现。Schema继承了Type[1]
  3. CommonFields定义了很多公共使用的Field对象。

比较重要的几个类是ApiKeys、Schema、struct、Field,我们逐个理解。

Type

Type是抽象类,定义了多个接口,主要是write和read,对ByteBuffer进行读写。Type内定义了多个内部静态类,比如Type.BOOLEAN, Type.INT8, Type.INT16。 我们看看Type.BOOLEAN的实现,write操作就是简单地往ByteBuffer写入byte类型的0或1,read操作就是读取一个byte并转换为Boolean类型。

如图

ArrayOf 继承了Type,但本身表示type的数组。其write方法首先为数组的每个元素调用write,再写入数组长度;read方法首先读取数组长度,再依次读取数组的每个元素。

代码语言:javascript
复制
/**
 * Represents a type for an array of a particular type
 */
public class ArrayOf extends Type {

  private final Type type;
  private final boolean nullable;

  public ArrayOf(Type type) {
      this(type, false);
  }

  public static ArrayOf nullable(Type type) {
      return new ArrayOf(type, true);
  }

  private ArrayOf(Type type, boolean nullable) {
      this.type = type;
      this.nullable = nullable;
  }

  @Override
  public boolean isNullable() {
      return nullable;
  }

  @Override
  public void write(ByteBuffer buffer, Object o) {
      if (o == null) {
          buffer.putInt(-1);
          return;
      }

      Object[] objs = (Object[]) o;
      int size = objs.length;
      buffer.putInt(size);

      for (Object obj : objs)
          type.write(buffer, obj);
  }

  @Override
  public Object read(ByteBuffer buffer) {
      int size = buffer.getInt();
      if (size < 0 && isNullable())
          return null;
      else if (size < 0)
          throw new SchemaException("Array size " + size + " cannot be negative");

      if (size > buffer.remaining())
          throw new SchemaException("Error reading array of size " + size + ", only " + buffer.remaining() + " bytes available");
      Object[] objs = new Object[size];
      for (int i = 0; i < size; i++)
          objs[i] = type.read(buffer);
      return objs;
  }

Field

Field类的意思是"值域"

  • 它有几个属性: name、docString、type, 如果hasDefaultValue为真,则还有defaultValue属性。name和docString分别表示该值域的名字和描述。type表示该值域的类型。
  • 内部有几个子类,Str, Int8, Int32等,表示不同类型的值域。从Int8和Int32类可知,它们的type分别为Type.INT8和Type.INT32。

Field类的结构图示如下,其中defaultValue为虚线,因为该属性在hasDefaultValue为false时不存在。

各种Field的继承类与type类型的对应关系如下:

Schema

Schema顾名思义,就是格式的意思,按顺序定义了一个格式中多个值域的顺序。它继承了Type类,可对ByteBuffer进行读写操作。

  • 从构造函数中可知,传入多个Field,按顺序包装为BoundField,添加到fields。再生成按值域名查找的fieldsByName,便于查找。
  • 从read方法可知
    • Schema向ByteBuffer读取,其实就是依次让其每个值域(的type)进行读取。write的行为同理,不赘述。
    • read最后会返回一个Struct变量,下文分析。构建Struct传入了自己和objects,只要有Schema,就能按顺序再取出来。 每个BoundField都有变量def,记录它保存的Field;还有index变量,记录它在一个Schema中的位置。
代码语言:javascript
复制
/**
 * The schema for a compound record definition
 */
public class Schema extends Type {

    private final BoundField[] fields;
    private final Map<String, BoundField> fieldsByName;

    /**
     * Construct the schema with a given list of its field values
     *
     * @throws SchemaException If the given list have duplicate fields
     */
    public Schema(Field... fs) {
        this.fields = new BoundField[fs.length];
        this.fieldsByName = new HashMap<>();
        for (int i = 0; i < this.fields.length; i++) {
            Field def = fs[i];
            if (fieldsByName.containsKey(def.name))
                throw new SchemaException("Schema contains a duplicate field: " + def.name);
            this.fields[i] = new BoundField(def, this, i);
            this.fieldsByName.put(def.name, this.fields[i]);
        }
    }

    ...

    /**
     * Read a struct from the buffer
     */
    @Override
    public Struct read(ByteBuffer buffer) {
        Object[] objects = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            try {
                objects[i] = fields[i].def.type.read(buffer);
            } catch (Exception e) {
                throw new SchemaException("Error reading field '" + fields[i].def.name + "': " +
                                          (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
            }
        }
        return new Struct(this, objects);
    }
代码语言:javascript
复制
/**
 * A field definition bound to a particular schema.
 */
public class BoundField {
    public final Field def;
    final int index;
    final Schema schema;

    public BoundField(Field def, Schema schema, int index) {
        this.def = def;
        this.schema = schema;
        this.index = index;
    }

    @Override
    public String toString() {
        return def.name + ":" + def.type;
    }
}

图示如下:

Struct

维护了values和schema两种变量,分别代表一组值域和可以解析它的格式。

代码语言:javascript
复制
/**
 * A record that can be serialized and deserialized according to a pre-defined schema
 */
public class Struct {
    private final Schema schema;
    private final Object[] values;

提供各种setter和getter,都是按照Schema填入values,和从values取出值域。

  • getter中get和getArray调用较多,行为都是先根据name找到BoundField,再根据其保存index从values读取出来。
代码语言:javascript
复制
/**
     * Get the record value for the field with the given name by doing a hash table lookup (slower!)
     *
     * @param name The name of the field
     * @return The value in the record
     * @throws SchemaException If no such field exists
     */
    public Object get(String name) {
        BoundField field = schema.get(name);
        if (field == null)
            throw new SchemaException("No such field: " + name);
        return getFieldOrDefault(field);
    }

    public Object[] getArray(String name) {
        return (Object[]) get(name);
    }

   /**
     * Return the value of the given pre-validated field, or if the value is missing return the default value.
     *
     * @param field The field for which to get the default value
     * @throws SchemaException if the field has no value and has no default.
     */
    private Object getFieldOrDefault(BoundField field) {
        Object value = this.values[field.index];
        if (value != null)
            return value;
        else if (field.def.hasDefaultValue)
            return field.def.defaultValue;
        else if (field.def.type.isNullable())
            return null;
        else
            throw new SchemaException("Missing value for field '" + field.def.name + "' which has no default value.");
    }

AbstractRequest/AbstractResponse

AbstractResponse提供toStruct和parseResponse,负责Struct与Abstract之间的转换。

  • toStruct会按照Schema构建Struct
  • 各种Response在构建时,会按照其定义的名字从定义了Schema的Struct中取出各个值域

AbstractRequest同理,此处不赘述

ApiKeys

ApiKeys是enum类型,有很多个实例。它为很多组Api的不同版本,定义了请求和响应的格式。每个Api,比如PRODUCE、FETCH等,都分为请求和响应两部分,它们各自有一个格式,在不同版本下的格式还不同。

代码语言:javascript
复制
ApiKeys(int id, String name, Schema[] requestSchemas, Schema[] responseSchemas) {
    this(id, name, false, requestSchemas, responseSchemas);
}

ApiKeys(int id, String name, boolean clusterAction, Schema[] requestSchemas, Schema[] responseSchemas) {
    this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0, requestSchemas, responseSchemas);
}

ApiKeys(int id, String name, boolean clusterAction, byte minRequiredInterBrokerMagic,
        Schema[] requestSchemas, Schema[] responseSchemas) {
    if (id < 0)
        throw new IllegalArgumentException("id must not be negative, id: " + id);
    this.id = (short) id;
    this.name = name;
    

    ...
    this.requestSchemas = requestSchemas;
    this.responseSchemas = responseSchemas;
}

外界经常调用的是parseRequest和parseResponse,根据版本来解析请求/响应。 parseResponse的行为:

  1. 根据version从responseSchemas取出Schema
  2. 调用其read方法

parseRequest同理。因此ApiKeys下的每个实例(PRODUCE、FETCH等)都能根据版本解析请求/响应

代码语言:javascript
复制
public Schema requestSchema(short version) {
    return schemaFor(requestSchemas, version);
}

public Schema responseSchema(short version) {
    return schemaFor(responseSchemas, version);
}

public Struct parseRequest(short version, ByteBuffer buffer) {
    return requestSchema(version).read(buffer);
}

public Struct parseResponse(short version, ByteBuffer buffer) {
    return responseSchema(version).read(buffer);
}

protected Struct parseResponse(short version, ByteBuffer buffer, short fallbackVersion) {
    int bufferPosition = buffer.position();
    try {
        return responseSchema(version).read(buffer);
    } catch (SchemaException e) {
        if (version != fallbackVersion) {
            buffer.position(bufferPosition);
            return responseSchema(fallbackVersion).read(buffer);
        } else
            throw e;
    }
}

private Schema schemaFor(Schema[] versions, short version) {
    if (!isVersionSupported(version))
        throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version);
    return versions[version];
}

Response中Struct的生成

  1. 调用inFlightRequests.completeNext取出头部的请求,暗示当前收到的响应就对应该请求,因为Kafka服务端会保证按照顺序响应请求。
  2. parseStruct...会将请求的ByteBuffer转换为Struct。注意传入了对应请求的header。
  3. AbstractResponse.parseResponse将Struct转换为响应。

parseStruct... 调用ApiKeys::parseResponse将ByteBuffer解析为Struct。调用中有两个细节:

  • 请求与响应,应当属于同一对apiKey。因此用与请求相同的apiKey解析响应
  • 请求与响应,应当属于同一api版本。因此传入的api版本为resquestHeader.apiVersion(),请求的api版本。
  • responseSchema根据版本取出Api在该版本下的Schema,然后调用read读取ByteBuffer。
  • Schema::read方法会按顺序读取每一个Field,作为Object类型存储。(话说fields[i].def.type.read这么长的调用,违反迪米特法则了啊)
代码语言:javascript
复制
// ApiKeys.java
public Struct parseResponse(short version, ByteBuffer buffer) {
  return responseSchema(version).read(buffer);
}

// Schema.java
/**
 * Read a struct from the buffer
 */
@Override
public Struct read(ByteBuffer buffer) {
    Object[] objects = new Object[fields.length];
    for (int i = 0; i < fields.length; i++) {
        try {
            objects[i] = fields[i].def.type.read(buffer);
        } catch (Exception e) {
            throw new SchemaException("Error reading field '" + fields[i].def.name + "': " +
                                      (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
        }
    }
    return new Struct(this, objects);
}

AbstractResponse.parseResponse 不赘述,可选择某个Response查看实现。


  1. Schema继承了Type,却可能包含了多个Type,所以此处用了组合设计模式,但不够严格,因为ApiKeys引用了Schema
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 序言
  • Type
  • Field
  • Schema
  • Struct
  • AbstractRequest/AbstractResponse
  • ApiKeys
  • Response中Struct的生成
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档