After reading this article, you should come away with the knowledge covered below.
We covered the producer's ProducerBatch before, so how does this RecordBatch differ from a ProducerBatch?
A RecordBatch is the object inside a ProducerBatch that is dedicated to holding the messages themselves; on top of that, the ProducerBatch carries other state, such as retry and callback related fields.
Whenever a new ProducerBatch needs to be created, a MemoryRecordsBuilder has to be built along with it. Think of this object as the message builder: everything message-related ends up inside it. A rough sketch of the relationship follows, and then the MemoryRecordsBuilder constructor.
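The sketch below is simplified from RecordAccumulator#append in the Kafka clients source and is illustrative only; ProducerBatch is an internal client class, and exact signatures can differ between versions.
// Sketch only, not Kafka source. Classes live in org.apache.kafka.clients.producer.internals,
// org.apache.kafka.common and org.apache.kafka.common.record (kafka-clients jar).
ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);                  // comes from the BufferPool in the real code
MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(
        buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.LZ4,
        TimestampType.CREATE_TIME, 0L);                              // baseOffset = 0
ProducerBatch batch = new ProducerBatch(
        new TopicPartition("demo-topic", 0), recordsBuilder, System.currentTimeMillis());
// The batch owns the builder: every appended record goes through recordsBuilder,
// while retries, callbacks, etc. stay on the ProducerBatch itself.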
public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long baseOffset,
long logAppendTime,
long producerId,
short producerEpoch,
int baseSequence,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int writeLimit) {
// ... part omitted ...
this.magic = magic;
this.timestampType = timestampType;
this.compressionType = compressionType;
this.baseOffset = baseOffset;
this.logAppendTime = logAppendTime;
this.numRecords = 0;
this.uncompressedRecordsSizeInBytes = 0;
this.actualCompressionRatio = 1;
this.maxTimestamp = RecordBatch.NO_TIMESTAMP;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.baseSequence = baseSequence;
this.isTransactional = isTransactional;
this.isControlBatch = isControlBatch;
this.partitionLeaderEpoch = partitionLeaderEpoch;
this.writeLimit = writeLimit;
this.initialPosition = bufferStream.position();
this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType);
// Reserve 61 bytes at the start of the buffer for the RecordBatch header
bufferStream.position(initialPosition + batchHeaderSizeInBytes);
this.bufferStream = bufferStream;
// Pick the compression output stream implementation that matches compression.type
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}
The key takeaway from the constructor above: the configured compression.type decides which compressed output stream is wrapped around the buffer. With the lz4 compression type, for example, the returned output stream is a KafkaLZ4BlockOutputStream, which contains both the message-writing and the compression logic. A small, hedged illustration of that selection follows.
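This is a minimal sketch, assuming the kafka-clients classes quoted above are on the classpath; CompressionType.wrapForOutput is the same call the constructor uses, though newer Kafka versions may have reorganised this API.
// Sketch only, not Kafka source. ByteBufferOutputStream lives in org.apache.kafka.common.utils,
// CompressionType and RecordBatch in org.apache.kafka.common.record.
ByteBufferOutputStream bufferStream = new ByteBufferOutputStream(ByteBuffer.allocate(1024));
OutputStream compressed = CompressionType.LZ4.wrapForOutput(bufferStream, RecordBatch.CURRENT_MAGIC_VALUE);
// For LZ4 the concrete stream is a KafkaLZ4BlockOutputStream; CompressionType.NONE would just return bufferStream.
DataOutputStream appendStream = new DataOutputStream(compressed);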
Once the batch has been created, messages naturally need to be written into it. Source location:
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
Header[] headers) throws IOException {
ensureOpenForRecordAppend();
// Offset delta; offset is the current lastOffset + 1 (0 for the very first record), and baseOffset defaults to 0
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - firstTimestamp;
// Write the record into appendStream
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
// Book-keeping: record the offset, timestamp and how many bytes were written
recordWritten(offset, timestamp, sizeInBytes);
}
offsetDelta (offset - baseOffset): storing the delta instead of the absolute offset saves bytes once it is varint-encoded.
- offset equals the batch's last offset plus one; the logic is offset = lastOffset == null ? baseOffset : lastOffset + 1;
- baseOffset is the starting offset of the RecordBatch, normally 0.
timestampDelta (timestamp - firstTimestamp): again, the delta form saves bytes.
- timestamp follows timestamp = record.timestamp() == null ? nowMs : record.timestamp(), which means it can also be set explicitly on the record.
- firstTimestamp is simply the timestamp of the first record in the batch.
A small sketch of the byte savings follows this list.
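Here is a hedged sketch using Kafka's own ByteUtils (org.apache.kafka.common.utils.ByteUtils) to make the saving concrete; the delta values are made up for illustration.
// Sketch only: a small delta fits in one or two varint bytes, whereas the
// absolute offset/timestamp stored as a fixed int64 would always take 8 bytes.
int offsetDeltaSize = ByteUtils.sizeOfVarint(5);                  // 1 byte
long timestampDelta = 1_000L;                                     // 1 second after firstTimestamp
int timestampDeltaSize = ByteUtils.sizeOfVarlong(timestampDelta); // 2 bytes
int absoluteSize = Long.BYTES;                                    // 8 bytes for a raw int64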
When write() is called, the stream that actually runs is whichever implementation matches the chosen compression type, e.g. KafkaLZ4BlockOutputStream. For the exact layout of a Record, see the Record format below.
Note: the first record is written starting at buffer position 61 (the 62nd byte), because the first 61 bytes were reserved for the BatchHeader when the builder was initialized; the short sketch below shows where that 61 comes from.
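A tiny, hedged sketch of where the 61 comes from; the constant is public in DefaultRecordBatch and is what recordBatchHeaderSizeInBytes returns for the current (v2) magic value.
// Sketch only: the reserved gap equals the fixed v2 RecordBatch header size,
// so the first record bytes land right after it.
int reservedHeaderBytes = DefaultRecordBatch.RECORD_BATCH_OVERHEAD;       // 61
int firstRecordPosition = 0 /* initialPosition */ + reservedHeaderBytes;  // 61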
To understand the record format, let's first look at how a record is actually written.
DefaultRecord#writeTo
public static int writeTo(DataOutputStream out,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
ByteBuffer value,
Header[] headers) throws IOException {
int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
ByteUtils.writeVarint(sizeInBytes, out);
byte attributes = 0; // there are no used record attributes at the moment
out.write(attributes);
ByteUtils.writeVarlong(timestampDelta, out);
ByteUtils.writeVarint(offsetDelta, out);
if (key == null) {
ByteUtils.writeVarint(-1, out);
} else {
int keySize = key.remaining();
ByteUtils.writeVarint(keySize, out);
Utils.writeTo(out, key, keySize);
}
if (value == null) {
ByteUtils.writeVarint(-1, out);
} else {
int valueSize = value.remaining();
ByteUtils.writeVarint(valueSize, out);
Utils.writeTo(out, value, valueSize);
}
if (headers == null)
throw new IllegalArgumentException("Headers cannot be null");
ByteUtils.writeVarint(headers.length, out);
for (Header header : headers) {
String headerKey = header.key();
if (headerKey == null)
throw new IllegalArgumentException("Invalid null header key found in headers");
byte[] utf8Bytes = Utils.utf8(headerKey);
ByteUtils.writeVarint(utf8Bytes.length, out);
out.write(utf8Bytes);
byte[] headerValue = header.value();
if (headerValue == null) {
ByteUtils.writeVarint(-1, out);
} else {
ByteUtils.writeVarint(headerValue.length, out);
out.write(headerValue);
}
}
return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
}
From the source we can read off the wire format of a single Record, in write order:
length (varint), attributes (1 byte, currently unused), timestampDelta (varlong), offsetDelta (varint), keyLength (varint, -1 for a null key), key bytes, valueLength (varint, -1 for a null value), value bytes, headerCount (varint), headers.
Record field notes: the numeric fields are Varints, i.e. variable-length integers, which effectively saves space for small values.
Header field notes: each header uses the same varint-prefixed layout (key length, UTF-8 key bytes, value length, value bytes), so it is not repeated here.
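For completeness, a hedged example of how headers get attached on the producer side; ProducerRecord and RecordHeader are public client classes, and the topic and header names here are made up.
// Sketch only: the header key/value pairs attached here are exactly what
// DefaultRecord.writeTo above serializes as varint-prefixed byte sequences.
List<Header> headers = List.of(new RecordHeader("trace-id", "42".getBytes(StandardCharsets.UTF_8)));
ProducerRecord<String, String> record =
        new ProducerRecord<>("demo-topic", null, "key", "value", headers);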
When a ProducerBatch is about to be sent out (a ready batch), the batch is first closed via batch.close().
During that process appendStream, the output stream that holds the record bytes, is closed as well. When it calls out.flush(), the flush of the concrete stream implementation runs; if our compression type is lz4, that implementation is KafkaLZ4BlockOutputStream.
MemoryRecordsBuilder#closeForRecordAppends -> KafkaLZ4BlockOutputStream#flush
public void flush() throws IOException {
if (!finished) {
// compress the buffered record bytes and write them out as an lz4 block
writeBlock();
}
if (out != null) {
out.flush();
}
}
When does the compression actually happen? The writeBlock() call above is what performs it, so this is the moment the Records get compressed, and only the Records; the batch header is not compressed. A short end-to-end sketch of the append/close cycle follows.
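Putting the pieces together, a minimal sketch built on the public MemoryRecords.builder entry point (buffer size, key and value are arbitrary; build() closes the builder, which flushes the compressed stream).
// Sketch only; classes come from org.apache.kafka.common.record in the kafka-clients jar.
ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = MemoryRecords.builder(
        buffer, CompressionType.LZ4, TimestampType.CREATE_TIME, 0L);
builder.append(System.currentTimeMillis(), "key".getBytes(), "value".getBytes());
// build() -> close() -> closeForRecordAppends() -> appendStream.flush():
// writeBlock() compresses the records here, and the 61-byte batch header is then filled in.
MemoryRecords records = builder.build();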
Now that the Records message set has been compressed, remember that we started writing records only after position 61?
What is that 61-byte gap actually used for?
MemoryRecordsBuilder#writeDefaultBatchHeader
private int writeDefaultBatchHeader() {
ensureOpenForRecordBatchWrite();
ByteBuffer buffer = bufferStream.buffer();
// current position of the buffer, i.e. the end of the written records
int pos = buffer.position();
// move back to the initial position (initialPosition)
buffer.position(initialPosition);
// total size of everything written for this batch
int size = pos - initialPosition;
// size of the (compressed) records, i.e. excluding the batch header overhead
int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
// offset delta of the last record
int offsetDelta = (int) (lastOffset - baseOffset);
final long maxTimestamp;
if (timestampType == TimestampType.LOG_APPEND_TIME)
maxTimestamp = logAppendTime;
else
maxTimestamp = this.maxTimestamp;
// write the RecordBatch header into the buffer
DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
partitionLeaderEpoch, numRecords);
// restore the position to the end of the batch
buffer.position(pos);
return writtenCompressed;
}
This is where the header bytes are actually written:
DefaultRecordBatch#writeHeader
static void writeHeader(ByteBuffer buffer,
long baseOffset,
int lastOffsetDelta,
int sizeInBytes,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long firstTimestamp,
long maxTimestamp,
long producerId,
short epoch,
int sequence,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int numRecords) {
if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
throw new IllegalArgumentException("Invalid magic value " + magic);
if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);
short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);
int position = buffer.position();
buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);
buffer.putInt(position + LENGTH_OFFSET, sizeInBytes - LOG_OVERHEAD);
buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
buffer.put(position + MAGIC_OFFSET, magic);
buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
buffer.putLong(position + FIRST_TIMESTAMP_OFFSET, firstTimestamp);
buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
buffer.putShort(position + PRODUCER_EPOCH_OFFSET, epoch);
buffer.putInt(position + BASE_SEQUENCE_OFFSET, sequence);
buffer.putInt(position + RECORDS_COUNT_OFFSET, numRecords);
long crc = Crc32C.compute(buffer, ATTRIBUTES_OFFSET, sizeInBytes - ATTRIBUTES_OFFSET);
buffer.putInt(position + CRC_OFFSET, (int) crc);
buffer.position(position + RECORD_BATCH_OVERHEAD);
}
Notice that the CRC is computed at the very end and only then filled into the buffer, but that does not mean the crc32 field sits last in the layout: CRC_OFFSET is position 17, and the checksum covers everything from the attributes field through the end of the batch.
RecordBatchHeader field notes:
- length: the number of bytes from the partition leader epoch field to the end of the batch, computed as sizeInBytes - LOG_OVERHEAD, where sizeInBytes is the length of the whole RecordBatch and LOG_OVERHEAD = 12 (the 8-byte base offset plus the 4-byte length field itself).
- The final buffer.position(position + RECORD_BATCH_OVERHEAD) call leaves the position at 61, i.e. right past the header and at the start of the records.
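To tie the header layout to concrete byte offsets, here is a hedged sketch that builds a tiny uncompressed batch and reads a few header fields straight out of the buffer, using the same offsets that writeHeader writes to; the constants come from DefaultRecordBatch, so double-check them against your Kafka version.
// Sketch only; classes come from org.apache.kafka.common.record in the kafka-clients jar.
ByteBuffer buf = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = MemoryRecords.builder(
        buf, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
builder.append(System.currentTimeMillis(), "k".getBytes(), "v".getBytes());
ByteBuffer batch = builder.build().buffer();
long baseOffset = batch.getLong(0);  // BASE_OFFSET_OFFSET = 0
int length = batch.getInt(8);        // LENGTH_OFFSET = 8 -> sizeInBytes - LOG_OVERHEAD
byte magic = batch.get(16);          // MAGIC_OFFSET = 16 -> 2 for the v2 format
int crc = batch.getInt(17);          // CRC_OFFSET = 17, covering bytes 21 .. end
int recordCount = batch.getInt(57);  // RECORDS_COUNT_OFFSET = 57
// the records themselves begin at byte 61 (RECORD_BATCH_OVERHEAD)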