Kafka's Request and Response

  • Understanding how Requests and Responses are structured will help us analyze how the various requests are processed;
  • Kafka requests fall broadly into two categories: client->server and server->server;

Basic data structure classes:

The Type class:

  • Source file: clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
  • An abstract class that mainly defines serialization and deserialization between a ByteBuffer and various kinds of Object;
public abstract void write(ByteBuffer buffer, Object o);
public abstract Object read(ByteBuffer buffer);
public abstract Object validate(Object o);
public abstract int sizeOf(Object o);
public boolean isNullable();
  • It also defines a number of concrete Type implementations (see the sketch after this list):
public static final Type INT8
public static final Type INT16
public static final Type INT32
public static final Type INT64
public static final Type STRING
public static final Type BYTES
public static final Type NULLABLE_BYTES
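
To make these serialization roles concrete, here is a minimal sketch that round-trips two values through a ByteBuffer (the demo class is made up; it relies only on the read/write/sizeOf signatures listed above):

import java.nio.ByteBuffer;

import org.apache.kafka.common.protocol.types.Type;

public class TypeDemo {
    public static void main(String[] args) {
        // Each Type singleton knows how to size, write and read its kind of value
        ByteBuffer buffer = ByteBuffer.allocate(Type.STRING.sizeOf("hello") + Type.INT32.sizeOf(42));
        Type.STRING.write(buffer, "hello"); // a 2-byte length prefix followed by UTF-8 bytes
        Type.INT32.write(buffer, 42);
        buffer.flip();
        System.out.println(Type.STRING.read(buffer)); // hello
        System.out.println(Type.INT32.read(buffer));  // 42
    }
}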

The ArrayOf class:

  • Source file: clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
  • A concrete implementation of Type that represents an array whose elements are all of a given Type;

The Field class:

  • Source file: clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
  • Defines a single field within a schema;
  • Members:
    final int index;
    public final String name;
    public final Type type;
    public final Object defaultValue;
    public final String doc;
    final Schema schema;

The Schema class:

  • Source file: clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
  • The Schema class is itself an implementation of Type and contains an array of Field objects; together they make up the schema of a record;

The Struct class:

  • Source file: clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
  • Holds a Schema object plus an Object[] values array that stores the value of every Field described by the Schema;
    private final Schema schema;
    private final Object[] values;
  • Defines a series of getXXX methods for reading the value of a given Field in the schema;
  • Defines set methods for assigning the value of a given Field in the schema;
  • writeTo serializes the Struct object into a ByteBuffer;
  • In short, the Schema is a template and the Struct specializes it: filling data into the template produces a concrete request object, which can then be converted to and from a ByteBuffer, as the sketch below shows;
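
A minimal sketch of this template relationship, using a made-up two-field schema (the field names and demo class are illustrative, not taken from Protocol.java):

import java.nio.ByteBuffer;

import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;

public class SchemaStructDemo {
    public static void main(String[] args) {
        // The template: an ordered list of named, typed fields
        Schema schema = new Schema(
                new Field("client_id", Type.STRING, "Identifier of the client."),
                new Field("timeout", Type.INT32, "Request timeout in ms."));

        // The specialization: a Struct filled with a value for each Field
        Struct struct = new Struct(schema)
                .set("client_id", "demo-client")
                .set("timeout", 30000);

        // Struct -> ByteBuffer
        ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
        struct.writeTo(buffer);
        buffer.flip();

        // ByteBuffer -> Struct, driven by the same Schema
        Struct parsed = (Struct) schema.read(buffer);
        System.out.println(parsed.getString("client_id")); // demo-client
        System.out.println(parsed.getInt("timeout"));      // 30000
    }
}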

Protocol-related classes:

The Protocol class:

  • Source file: clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
  • Defines all the request/response Schemas, for example:
public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
                                                           new Field("api_version", INT16, "The version of the API."),
                                                           new Field("correlation_id",
                                                                     INT32,
                                                                     "A user-supplied integer value that will be passed back with the response"),
                                                           new Field("client_id",
                                                                     STRING,
                                                                     "A user specified identifier for the client making the request."));
...
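
These schemas are collected into per-API, per-version tables (Protocol.REQUESTS and Protocol.RESPONSES). A hedged sketch of looking one up through the ProtoUtils helper (the requestSchema signature is an assumption based on this version of the client code):

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Schema;

public class SchemaLookupDemo {
    public static void main(String[] args) {
        // Protocol.REQUESTS is indexed first by api key, then by api version
        Schema produceV0 = ProtoUtils.requestSchema(ApiKeys.PRODUCE.id, 0);
        System.out.println(produceV0); // prints the field layout of Produce v0
    }
}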

The ApiKeys class:

  • Source file: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
  • Defines the id and name of every Kafka API, as follows:
    PRODUCE(0, "Produce"),
    FETCH(1, "Fetch"),
    LIST_OFFSETS(2, "Offsets"),
    METADATA(3, "Metadata"),
    LEADER_AND_ISR(4, "LeaderAndIsr"),
    STOP_REPLICA(5, "StopReplica"),
    UPDATE_METADATA_KEY(6, "UpdateMetadata"),
    CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
    OFFSET_COMMIT(8, "OffsetCommit"),
    OFFSET_FETCH(9, "OffsetFetch"),
    GROUP_COORDINATOR(10, "GroupCoordinator"),
    JOIN_GROUP(11, "JoinGroup"),
    HEARTBEAT(12, "Heartbeat"),
    LEAVE_GROUP(13, "LeaveGroup"),
    SYNC_GROUP(14, "SyncGroup"),
    DESCRIBE_GROUPS(15, "DescribeGroups"),
    LIST_GROUPS(16, "ListGroups");
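
The id is what travels in the api_key field of the request header; a small sketch of the reverse lookup used during dispatch (the demo class is made up, ApiKeys.forId appears in the getRequest code quoted below):

import org.apache.kafka.common.protocol.ApiKeys;

public class ApiKeysDemo {
    public static void main(String[] args) {
        // Map the api_key carried in a request header back to the enum constant
        ApiKeys key = ApiKeys.forId(0);
        System.out.println(key.name + " (id=" + key.id + ")"); // Produce (id=0)
    }
}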

Request and Response related classes

Every Request and Response consists of a RequestHeader (or ResponseHeader) plus a concrete message body;

The AbstractRequestResponse class:

  • Source file: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
  • The abstract base class of all Requests and Responses;
  • Main data member: protected final Struct struct
  • Main interface:
public int sizeOf()
public void writeTo(ByteBuffer buffer)
public String toString()
public int hashCode()
public boolean equals(Object obj)
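
These methods are thin wrappers around the embedded Struct; a sketch of the likely delegation (an assumption based on the class's role, not a verbatim quote of the source):

public int sizeOf() {
    return struct.sizeOf();
}

public void writeTo(ByteBuffer buffer) {
    struct.writeTo(buffer);
}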

The AbstractRequest class:

  • Source file: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
  • Inherits from AbstractRequestResponse and adds one interface method: public abstract AbstractRequestResponse getErrorResponse(int versionId, Throwable e)
  • Most importantly, it provides a factory method that produces the various concrete Request types from a ByteBuffer;
public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
        switch (ApiKeys.forId(requestId)) {
            case PRODUCE:
                return ProduceRequest.parse(buffer, versionId);
            case FETCH:
                return FetchRequest.parse(buffer, versionId);
            case LIST_OFFSETS:
                return ListOffsetRequest.parse(buffer, versionId);
            case METADATA:
                return MetadataRequest.parse(buffer, versionId);
            case OFFSET_COMMIT:
                return OffsetCommitRequest.parse(buffer, versionId);
            case OFFSET_FETCH:
                return OffsetFetchRequest.parse(buffer, versionId);
            case GROUP_COORDINATOR:
                return GroupCoordinatorRequest.parse(buffer, versionId);
            case JOIN_GROUP:
                return JoinGroupRequest.parse(buffer, versionId);
            case HEARTBEAT:
                return HeartbeatRequest.parse(buffer, versionId);
            case LEAVE_GROUP:
                return LeaveGroupRequest.parse(buffer, versionId);
            case SYNC_GROUP:
                return SyncGroupRequest.parse(buffer, versionId);
            case STOP_REPLICA:
                return StopReplicaRequest.parse(buffer, versionId);
            case CONTROLLED_SHUTDOWN_KEY:
                return ControlledShutdownRequest.parse(buffer, versionId);
            case UPDATE_METADATA_KEY:
                return UpdateMetadataRequest.parse(buffer, versionId);
            case LEADER_AND_ISR:
                return LeaderAndIsrRequest.parse(buffer, versionId);
            case DESCRIBE_GROUPS:
                return DescribeGroupsRequest.parse(buffer, versionId);
            case LIST_GROUPS:
                return ListGroupsRequest.parse(buffer, versionId);
            default:
                return null;
        }
    }

Under the hood this calls each concrete Request class's parse method, which builds the concrete Request object from the ByteBuffer and the version id;
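
For example, ProduceRequest.parse follows this pattern: look up the versioned schema, read the buffer into a Struct, and wrap it (a sketch; ProtoUtils.parseRequest is assumed from this version of the client code):

public static ProduceRequest parse(ByteBuffer buffer, int versionId) {
    // deserialize the buffer into a Struct using the Produce schema for this version,
    // then let the constructor pull the fields out of the Struct
    return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer));
}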

The ProduceRequest class:

  • Let's take one of these, the ProduceRequest class, for a closer look; this is the request a client uses to publish messages to a broker;
  • Source file: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
  • A ProduceRequest contains the following fields:
    private final short acks;
    private final int timeout;
    private final Map<TopicPartition, ByteBuffer> partitionRecords;
  • The constructor public ProduceRequest(Struct struct) pulls these fields out of a Struct that was deserialized from the ByteBuffer according to the Schema defined in Protocol;
public ProduceRequest(Struct struct) {
        super(struct);
        partitionRecords = new HashMap<TopicPartition, ByteBuffer>();
        // walk the nested topic_data -> partition_data arrays in the Struct
        for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
            Struct topicData = (Struct) topicDataObj;
            String topic = topicData.getString(TOPIC_KEY_NAME);
            for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
                Struct partitionResponse = (Struct) partitionResponseObj;
                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
                // the record set for this partition is kept as raw bytes
                ByteBuffer records = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
                partitionRecords.put(new TopicPartition(topic, partition), records);
            }
        }
        acks = struct.getShort(ACKS_KEY_NAME);
        timeout = struct.getInt(TIMEOUT_KEY_NAME);
    }

The RequestHeader class:

  • Source file: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
  • The message header of a Request;
  • Main members:
    private static final Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
    private static final Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
    private static final Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
    private static final Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");
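
A round-trip sketch for the header (the three-argument constructor, which defaults to the latest api version, and the parse/accessor signatures are assumptions based on this version of the client code):

import java.nio.ByteBuffer;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.RequestHeader;

public class RequestHeaderDemo {
    public static void main(String[] args) {
        // Build a header, serialize it, then parse it back
        RequestHeader header = new RequestHeader(ApiKeys.METADATA.id, "demo-client", 123);
        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf());
        header.writeTo(buffer);
        buffer.flip();

        RequestHeader parsed = RequestHeader.parse(buffer);
        System.out.println(parsed.apiKey());        // 3
        System.out.println(parsed.clientId());      // demo-client
        System.out.println(parsed.correlationId()); // 123
    }
}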

Relationship diagram:

[figure: request_response.png]

Various Requests and Responses are in fact also defined under core/src/main/scala/kafka/api:

  • A comment from that code:

NOTE: this map only includes the server-side request/response handlers. Newer request types should only use the client-side versions which are parsed with o.a.k.common.requests.AbstractRequest.getRequest()

     val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)] =
       Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
           FetchKey -> ("Fetch", FetchRequest.readFrom),
           OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
           MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
           LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
           StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
           UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
           ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
           OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
           OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom))
  • This parsing does not use the schema mechanism; the fields are read directly from the buffer instead:
def readFrom(buffer: ByteBuffer): ProducerRequest = {
    val versionId: Short = buffer.getShort
    val correlationId: Int = buffer.getInt
    val clientId: String = readShortString(buffer)
    val requiredAcks: Short = buffer.getShort
    val ackTimeoutMs: Int = buffer.getInt
    // build the topic structure
    val topicCount = buffer.getInt
    val partitionDataPairs = (1 to topicCount).flatMap(_ => {
      // process topic
      val topic = readShortString(buffer)
      val partitionCount = buffer.getInt
      (1 to partitionCount).map(_ => {
        val partition = buffer.getInt
        val messageSetSize = buffer.getInt
        val messageSetBuffer = new Array[Byte](messageSetSize)
        buffer.get(messageSetBuffer, 0, messageSetSize)
        (TopicAndPartition(topic, partition), new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
      })
    })
    // assemble the request from the parsed fields (closing lines added for completeness)
    ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs,
      collection.mutable.Map(partitionDataPairs: _*))
  }

Request creation and storage

  • All incoming requests are eventually converted into RequestChannel.Request objects and stored in the RequestChannel's ArrayBlockingQueue[RequestChannel.Request], as covered in an earlier chapter; the sketch below shows how a raw buffer becomes a header plus a body;
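
A sketch of that conversion on the receive path (a hypothetical helper; the real work happens inside RequestChannel.Request on the broker side):

import java.nio.ByteBuffer;

import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.RequestHeader;

public class IncomingRequestSketch {
    // Parsing the header consumes the leading header fields, leaving the
    // buffer positioned at the request body for getRequest to decode.
    public static AbstractRequest parseBody(ByteBuffer buffer) {
        RequestHeader header = RequestHeader.parse(buffer);
        return AbstractRequest.getRequest(header.apiKey(), header.apiVersion(), buffer);
    }
}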

Official Kafka protocol documentation

Next: Kafka initialization flow and request handling

Kafka Source Code Analysis - Index
