前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >图解Kafka的RecordBatch结构

图解Kafka的RecordBatch结构

作者头像
石臻臻的杂货铺[同名公众号]
发布2022-05-23 11:03:28
8450
发布2022-05-23 11:03:28
举报
文章被收录于专栏:kafka专栏

文章目录

阅读完本文你大概会获得以下知识

  1. 什么时候执行消息的压缩操作
  2. RecordBatch结构图

RecordBatch

我们之前有讲过生产者的ProducerBatch, 这个RecordBatch跟ProducerBatch的区别是什么呢?

RecordBatch是在ProducerBatch里面的一个专门存放消息的对象, 除此之外ProducerBatch还有其他相关属性,例如还有重试、回调等等相关属性。

RecordBatch初始化

在创建一个需要创建一个新的ProducerBatch的时候,同时需要构建一个MemoryRecordsBuilder, 这个对象我们可以理解为消息构造器,所有的消息相关都存放到这个里面。

代码语言:javascript
复制
    public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
                                byte magic,
                                CompressionType compressionType,
                                TimestampType timestampType,
                                long baseOffset,
                                long logAppendTime,
                                long producerId,
                                short producerEpoch,
                                int baseSequence,
                                boolean isTransactional,
                                boolean isControlBatch,
                                int partitionLeaderEpoch,
                                int writeLimit) {
   		// 省略部分....
        this.magic = magic;
        this.timestampType = timestampType;
        this.compressionType = compressionType;
        this.baseOffset = baseOffset;
        this.logAppendTime = logAppendTime;
        this.numRecords = 0;
        this.uncompressedRecordsSizeInBytes = 0;
        this.actualCompressionRatio = 1;
        this.maxTimestamp = RecordBatch.NO_TIMESTAMP;
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
        this.baseSequence = baseSequence;
        this.isTransactional = isTransactional;
        this.isControlBatch = isControlBatch;
        this.partitionLeaderEpoch = partitionLeaderEpoch;
        this.writeLimit = writeLimit;
        this.initialPosition = bufferStream.position();
        this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType);
		// Buffer一开始就需要预留61B的位置用于 存放消息投 RecordHeader
        bufferStream.position(initialPosition + batchHeaderSizeInBytes);
        this.bufferStream = bufferStream;
        //选择合适的压缩器实现类
        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
    }

上面的源码可知重点:

  1. bufferStream 一开始的时候就需要预留 61B 的位置给 消息头使用,也就是RecordHeader。batchHeaderSizeInBytes = 61
  2. 根据配置的压缩类型compression.type,选择对应的压缩输出流。例如假设使用lz4压缩类型,返回的输出流实体对象为KafkaLZ4BlockOutputStream , 这里面有写入消息的方法和压缩方法。

写入消息

创建了Batch之后,自然需要写入消息

源码位置:

代码语言:javascript
复制
   private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
                                     Header[] headers) throws IOException {
        ensureOpenForRecordAppend();
        // 位移偏移量 ;offset 是当前lastOffset+1, 如果是最开始的时候,它是0; baseOffset 默认是0
        int offsetDelta = (int) (offset - baseOffset);
        long timestampDelta = timestamp - firstTimestamp;
        //将数据 写到appendStream中。
        int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
        // 记录一下 写入了多少数据
        recordWritten(offset, timestamp, sizeInBytes);
    }
  1. offsetDelta:表示该条消息的相对整个RecordBatch的位移偏移量, 计算逻辑是(offset - baseOffset); 使用偏移量可以节省字节数 offset 值等于当前RecordBatch的最后一个offset+1,计算逻辑是(offset = lastOffset == null ? baseOffset : lastOffset + 1;) baseOffset 值是RecordBatch的起始偏移量,一般值为0 ;
  2. timestampDelta : 表示该条消息的相对整个RecordBatch的时间戳的偏移量,计算逻辑(timestamp - firstTimestamp) ,使用偏移量可以节省字节数 timestamp 值逻辑timestamp = record.timestamp() == null ? nowMs : record.timestamp() ,意思是这个值也是可以通过设置record属性来设置的。 firstTimestamp 值就是timestamp第一次的值。
  3. 得到了上面的基础值之后, 就将消息写入到Buffer中, 这里的写入涉及到变长字段Varints,一定程度节省空间。这里写入write()的时候,底层执行的是根据你选择的压缩类型决定使用哪个实现类,例如KafkaLZ4BlockOutputStream。 具体的Record的格式请看下面的 Record格式

注意: 这里写入消息的时候,第一条消息,是从第62位写入的,因为前面的61B已经被BatchHeader先预定了(初始化的时候)。

Record结构图

要了解消息的格式,我们先看看消息是怎么写入的

DefaultRecord#writeTo

代码语言:javascript
复制
 public static int writeTo(DataOutputStream out,
                              int offsetDelta,
                              long timestampDelta,
                              ByteBuffer key,
                              ByteBuffer value,
                              Header[] headers) throws IOException {
        int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
        ByteUtils.writeVarint(sizeInBytes, out);

        byte attributes = 0; // there are no used record attributes at the moment
        out.write(attributes);

        ByteUtils.writeVarlong(timestampDelta, out);
        ByteUtils.writeVarint(offsetDelta, out);

        if (key == null) {
            ByteUtils.writeVarint(-1, out);
        } else {
            int keySize = key.remaining();
            ByteUtils.writeVarint(keySize, out);
            Utils.writeTo(out, key, keySize);
        }

        if (value == null) {
            ByteUtils.writeVarint(-1, out);
        } else {
            int valueSize = value.remaining();
            ByteUtils.writeVarint(valueSize, out);
            Utils.writeTo(out, value, valueSize);
        }

        if (headers == null)
            throw new IllegalArgumentException("Headers cannot be null");

        ByteUtils.writeVarint(headers.length, out);

        for (Header header : headers) {
            String headerKey = header.key();
            if (headerKey == null)
                throw new IllegalArgumentException("Invalid null header key found in headers");

            byte[] utf8Bytes = Utils.utf8(headerKey);
            ByteUtils.writeVarint(utf8Bytes.length, out);
            out.write(utf8Bytes);

            byte[] headerValue = header.value();
            if (headerValue == null) {
                ByteUtils.writeVarint(-1, out);
            } else {
                ByteUtils.writeVarint(headerValue.length, out);
                out.write(headerValue);
            }
        }

        return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
    }

从源码可以得知消息格式为:

Record属性解释:

  1. length:整个Record的消息总大小, 使用可变字段。
  2. attributes:已经弃用,默认为0,固定占用了1B
  3. timestampDelta: 时间戳的增量,使用可变字段。使用增量可以有效节约内存
  4. offsetDelta: 位移的增量,使用可变字段, 使用增量可以有效节约内存
  5. keyLength: key的长度,使用可变字段, 如果没有key,该值为-1。
  6. key: key的信息,正常存储。如果key==null,则该值不存在。
  7. valueLength:value的长度,使用可变字段, 如果没有key,改值为-1.
  8. value: value的信息,正常存储,如果value==null,则该值也不存在。
  9. headers:消息头,这个字段用于支持应用级别的扩展,可以携带很多信息,例如你带一个TraceId也不过分。
  10. header counts : 消息头的数量,使用可变字段

Varints 是可变长自动,可以有效的节省空间

Header属性解释:

类似,就不再赘述了。

关闭ProducerBatch

当一个ProducerBatch即将发送出去的时候(ReadyBatch), 会先将Batch关闭掉batch.close()

关闭输出流appendStream并压缩数据

在这个过程中,也会将appendStream关闭掉, 也就是用于存储消息体的输出流,那么在它调用 out.flush()的时候就会调用对应的实现类流,比如我们的压缩类型是lz4, 那么这里实现类就是 KafkaLZ4BlockOutputStream

MemoryRecordsBuilder#closeForRecordAppends KafkaLZ4BlockOutputStream#flush

代码语言:javascript
复制
    public void flush() throws IOException {
        if (!finished) {
            writeBlock();
        }
        if (out != null) {
            out.flush();
        }
    }

什么时候执行压缩操作 其中的 writeBlock()就是在执行压缩操作, 所以你应该知道, 这个时候压缩了Records。并且只是Records。

填充RecordBatchHeader数据

上面我们已经给Records消息集压缩过了, 还记得我们在写入消息的时候是从 position 61 后面开始写的吗?

这个61B的空间是用来干嘛的呢?

MemoryRecordsBuilder#writeDefaultBatchHeader

代码语言:javascript
复制
    private int writeDefaultBatchHeader() {
        ensureOpenForRecordBatchWrite();
        ByteBuffer buffer = bufferStream.buffer();
        //当前buffer的位置
        int pos = buffer.position();
        //将位置移动到初始位置0
        buffer.position(initialPosition);
        // 大小
        int size = pos - initialPosition;
        //已压缩的大小
        int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
        // 偏移量增量 
        int offsetDelta = (int) (lastOffset - baseOffset);

        final long maxTimestamp;
        if (timestampType == TimestampType.LOG_APPEND_TIME)
            maxTimestamp = logAppendTime;
        else
            maxTimestamp = this.maxTimestamp;
        //讲RecordBatch 消息头写入buffer
        DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
                firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
                partitionLeaderEpoch, numRecords);
		//重新定位
        buffer.position(pos);
        return writtenCompressed;
    }

真正写入数据的地方的

DefaultRecordBatch#writeHeader

代码语言:javascript
复制
    static void writeHeader(ByteBuffer buffer,
                            long baseOffset,
                            int lastOffsetDelta,
                            int sizeInBytes,
                            byte magic,
                            CompressionType compressionType,
                            TimestampType timestampType,
                            long firstTimestamp,
                            long maxTimestamp,
                            long producerId,
                            short epoch,
                            int sequence,
                            boolean isTransactional,
                            boolean isControlBatch,
                            int partitionLeaderEpoch,
                            int numRecords) {
        if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
            throw new IllegalArgumentException("Invalid magic value " + magic);
        if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
            throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);

        short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);

        int position = buffer.position();
        buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);
        buffer.putInt(position + LENGTH_OFFSET, sizeInBytes - LOG_OVERHEAD);
        buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
        buffer.put(position + MAGIC_OFFSET, magic);
        buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
        buffer.putLong(position + FIRST_TIMESTAMP_OFFSET, firstTimestamp);
        buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
        buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
        buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
        buffer.putShort(position + PRODUCER_EPOCH_OFFSET, epoch);
        buffer.putInt(position + BASE_SEQUENCE_OFFSET, sequence);
        buffer.putInt(position + RECORDS_COUNT_OFFSET, numRecords);
        long crc = Crc32C.compute(buffer, ATTRIBUTES_OFFSET, sizeInBytes - ATTRIBUTES_OFFSET);
        buffer.putInt(position + CRC_OFFSET, (int) crc);
        buffer.position(position + RECORD_BATCH_OVERHEAD);
    }

可以看到CRC的计算,是在最后面的时候计算,然后填充到buffer里面的,但是这个并不意味着crc32是放在最后一个, CRC_OFFSET的位置是17的位置。

RecordBatchHeader结构图

RecordBatchHeader属性解释:

  1. baseOffset: 当然RecordBatch的起始位移,一般默认为0
  2. length:计算从partition leader epoch 字段开始到整体末尾的长度,计算的逻辑是(sizeInBytes - LOG_OVERHEAD), 这个sizeInBytes就是整个RecordBatch的长度。LOG_OVERHEAD = 12
  3. partition leader epoch: 分区的Leader纪元,也就是版本号
  4. magic: 消息格式版本号, V2版本 该值为2
  5. crc32: 该RecordBatch的校验值, 计算该值是从attributes的位置开始计算的。
  6. attributes:消息的属性,这里用了2个字节, 低3位表示压缩格式,第4位表示时间戳,第5位表示事务标识,第6位表示是否控制消息。如下图
  1. last offset delta : RecordBatch中最后一个Record的offset与first offset的差值。
  2. first timestamp: 第一条Record的时间戳。对于Record的时间戳的值 ,如果在构造待发送的ProducerRecord的时候设置了timestamp,那么就是这个设置的值,如果没有设置那就是当前时间戳的值。
  3. max timestamp: RecordBatch中最大时间戳。
  4. producer id : 用于支持幂等和事务的属性。
  5. producer epoch :用于支持幂等和事务的属性。
  6. base sequence :用于支持幂等和事务的属性。
  7. record count : 消息数量

RecordBatch整体结构图

  1. 在创建RecordBatch的时候,会先预留61B的位置给BatchHeader, 实现方式就是让buffer的位置移动到61位 buffer.possition(61)
  2. 消息写入的时候并不会压缩,只有等到即将发送这个Batch的时候,会关闭Batch,从而进行压缩(如果配置了压缩策略的话), 压缩的知识Records, 不包含 RecordBatchHeader
  3. 填充RecordBatchHeader
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/05/22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • RecordBatch
    • RecordBatch初始化
      • 写入消息
        • Record结构图
          • 关闭ProducerBatch
            • 关闭输出流appendStream并压缩数据
            • 填充RecordBatchHeader数据
          • RecordBatchHeader结构图
          • RecordBatch整体结构图
          相关产品与服务
          文件存储
          文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档