abstract class
, 主要是定义了ByteBuffer与各种Object之间的序列化和反序列化;public abstract void write(ByteBuffer buffer, Object o);
public abstract Object read(ByteBuffer buffer);
public abstract Object validate(Object o);
public abstract int sizeOf(Object o);
public boolean isNullable();
public static final Type INT8
public static final Type INT16
public static final Type INT32
public static final Type INT64
public static final Type STRING
public static final Type BYTES
public static final Type NULLABLE_BYTES
final int index;
public final String name;
public final Type type;
public final Object defaultValue;
public final String doc;
final Schema schema;
Type类
, 又包含了一个Field类
对象的数组, 构成了记录的Schema;Schema
对象; 一个Object[] values
数组,用于存放Schema描述的所有Field对应的值; private final Schema schema;
private final Object[] values;
getXXX
方法, 用来获取schema中某个Field对应的值;set
方法, 用来设置schema中某个Field对应的值;writeTo
用来将Struct对象序列化到ByteBuffer;Schema
就是模板,Struct
负责特化这个模板,向模板里添数据,构造出具体的request对象, 并可以将这个对象与ByteBuffer互相转化;public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
new Field("api_version", INT16, "The version of the API."),
new Field("correlation_id",
INT32,
"A user-supplied integer value that will be passed back with the response"),
new Field("client_id",
STRING,
"A user specified identifier for the client making the request."));
...
PRODUCE(0, "Produce"),
FETCH(1, "Fetch"),
LIST_OFFSETS(2, "Offsets"),
METADATA(3, "Metadata"),
LEADER_AND_ISR(4, "LeaderAndIsr"),
STOP_REPLICA(5, "StopReplica"),
UPDATE_METADATA_KEY(6, "UpdateMetadata"),
CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
OFFSET_COMMIT(8, "OffsetCommit"),
OFFSET_FETCH(9, "OffsetFetch"),
GROUP_COORDINATOR(10, "GroupCoordinator"),
JOIN_GROUP(11, "JoinGroup"),
HEARTBEAT(12, "Heartbeat"),
LEAVE_GROUP(13, "LeaveGroup"),
SYNC_GROUP(14, "SyncGroup"),
DESCRIBE_GROUPS(15, "DescribeGroups"),
LIST_GROUPS(16, "ListGroups");
每个Request和Response都由RequestHeader(ResponseHeader) + 具体的消息体构成;
protected final Struct struct
public int sizeOf()
public void writeTo(ByteBuffer buffer)
public String toString()
public int hashCode()
public boolean equals(Object obj)
AbstractRequestResponse
类, 增加了接口:
public abstract AbstractRequestResponse getErrorResponse(int versionId, Throwable e)
/**
 * Factory that maps a wire-level API key to its concrete request object.
 *
 * @param requestId the numeric API key taken from the request header
 * @param versionId the API version taken from the request header
 * @param buffer    the ByteBuffer positioned at the start of the request body
 * @return the parsed AbstractRequest subclass, or null when the API key has
 *         no client-side parser registered here (callers must handle null)
 */
public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
    // Resolve the numeric id to the ApiKeys enum first, then dispatch.
    // Cases are listed in ascending API-id order (0..16) for readability.
    ApiKeys apiKey = ApiKeys.forId(requestId);
    switch (apiKey) {
        case PRODUCE:
            return ProduceRequest.parse(buffer, versionId);
        case FETCH:
            return FetchRequest.parse(buffer, versionId);
        case LIST_OFFSETS:
            return ListOffsetRequest.parse(buffer, versionId);
        case METADATA:
            return MetadataRequest.parse(buffer, versionId);
        case LEADER_AND_ISR:
            return LeaderAndIsrRequest.parse(buffer, versionId);
        case STOP_REPLICA:
            return StopReplicaRequest.parse(buffer, versionId);
        case UPDATE_METADATA_KEY:
            return UpdateMetadataRequest.parse(buffer, versionId);
        case CONTROLLED_SHUTDOWN_KEY:
            return ControlledShutdownRequest.parse(buffer, versionId);
        case OFFSET_COMMIT:
            return OffsetCommitRequest.parse(buffer, versionId);
        case OFFSET_FETCH:
            return OffsetFetchRequest.parse(buffer, versionId);
        case GROUP_COORDINATOR:
            return GroupCoordinatorRequest.parse(buffer, versionId);
        case JOIN_GROUP:
            return JoinGroupRequest.parse(buffer, versionId);
        case HEARTBEAT:
            return HeartbeatRequest.parse(buffer, versionId);
        case LEAVE_GROUP:
            return LeaveGroupRequest.parse(buffer, versionId);
        case SYNC_GROUP:
            return SyncGroupRequest.parse(buffer, versionId);
        case DESCRIBE_GROUPS:
            return DescribeGroupsRequest.parse(buffer, versionId);
        case LIST_GROUPS:
            return ListGroupsRequest.parse(buffer, versionId);
        default:
            // Unknown or server-only API key: no parser registered here.
            return null;
    }
}
实现上是调用各个具体Request对象的parse
方法根据bytebuffer和versionid来产生具体的Request对象;
private final short acks;
private final int timeout;
private final Map<TopicPartition, ByteBuffer> partitionRecords;
public ProduceRequest(Struct struct)
, 利用Struct里定义的Schema来从ByteBuffer反序列化出ProduceRequest对象;public ProduceRequest(Struct struct) {
super(struct);
// Backing map: (topic, partition) -> serialized record-set bytes for that partition.
partitionRecords = new HashMap<TopicPartition, ByteBuffer>();
// Walk the nested topic_data -> partition_data arrays laid out by the request schema.
for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
Struct topicData = (Struct) topicDataObj;
String topic = topicData.getString(TOPIC_KEY_NAME);
for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
Struct partitionResponse = (Struct) partitionResponseObj;
int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
// Record-set bytes stay opaque here; they are not deserialized into messages.
ByteBuffer records = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
partitionRecords.put(new TopicPartition(topic, partition), records);
}
}
// Scalar request fields read directly from the struct.
acks = struct.getShort(ACKS_KEY_NAME);
timeout = struct.getInt(TIMEOUT_KEY_NAME);
}
private static final Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
private static final Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
private static final Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
private static final Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");
request_response.png
NOTE: this map only includes the server-side request/response handlers. Newer request types should only use the client-side versions which are parsed with o.a.k.common.requests.AbstractRequest.getRequest()
val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) =>
RequestOrResponse)]=
Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
FetchKey -> ("Fetch", FetchRequest.readFrom),
OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
ControlledShutdownKey -> ("ControlledShutdown",
ControlledShutdownRequest.readFrom),
OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom)
def readFrom(buffer: ByteBuffer): ProducerRequest = {
val versionId: Short = buffer.getShort
val correlationId: Int = buffer.getInt
val clientId: String = readShortString(buffer)
val requiredAcks: Short = buffer.getShort
val ackTimeoutMs: Int = buffer.getInt
//build the topic structure
val topicCount = buffer.getInt
val partitionDataPairs = (1 to topicCount).flatMap(_ => {
// process topic
val topic = readShortString(buffer)
val partitionCount = buffer.getInt
(1 to partitionCount).map(_ => {
val partition = buffer.getInt
val messageSetSize = buffer.getInt
val messageSetBuffer = new Array[Byte](messageSetSize)
buffer.get(messageSetBuffer,0,messageSetSize)
(TopicAndPartition(topic, partition), new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
})
})
RequestChannel::Request
, 保存在RequestChannel
的ArrayBlockingQueue[RequestChannel.Request]
中, 这个前面章节已经讲过;