前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka消息存储原理

Kafka消息存储原理

原创
作者头像
haimingli
发布2021-01-13 00:28:26
1.1K0
发布2021-01-13 00:28:26
举报
文章被收录于专栏:kafka消息队列kafka消息队列

Kafka消息存储格式

存储位置及存储文件划分

文件存储概述

  Kafka作为一个高性能的消息队列中间件,有着高效的消息存储方式。我们知道在Kafka中,消息是以topic的形式进行逻辑上的隔离,一个topic又可以分为多个分区,当我们发送消息的时候,会根据某种规则(可以是默认规则,也可以是自定义规则),把消息存储到某个分区当中,同时消息会被分配一个序列号,也就是我们常说的offset,这个offset是一个不断递增的数值。

  一个topic对应多个分区,一个分区对应一个日志目录,一个日志目录里面,又分为多个日志片段,日志片段存储的就是我们的消息内容,我们叫日志片段为LogSegment。那这里就有个问题了,为什么日志还要分为LogSegment呢,首先这么区分是为了方便清理数据,对于过期的数据清理,这样划分为一个个片段,比在一个大文件中去寻找过期的数据方便多了。其次还方便管理,比如我要查找消息,从片段中查找比一个大文件里查找容易多了。LogSegment并不是一个文件,而是指多个,在kafka中,每个LogSegment对应一个日志文件和两个索引文件,以及可能存在的其他文件,比如.txnindex后缀的事务日志索引文件。下面的图片描述了kafka的文件存储的构成:

举个例子说明一个,比如我们通过命令行创建了一个topic,名字叫做topic-log-format,这个topic有两个分区,那么就会在消息存储文件目录中,有两个文件夹,分别叫做topic-log-format-0和topic-log-format-1,命名规则其实就是${topic}-${partition},这两个文件夹存储的就是两个分区的消息,如果我们往topic生产了消息,那么这消息文件目录里就会有segment产生,这个segment包含三个文件,分别为日志文件,偏移量索引文件和时间索引文件。

当我们使用生产者不断完topic里面写数据的时候,消息数据就会不断往这几个文件里面写数据,这里的写操作是一个顺序写。segment文件是可能会有多个的,举个例子,如果当前segment的大小大于我们配置的最大大小,就会产生一个新的segment(当然产生新的segemnt不仅仅这一种情况),消息只会往最新一个segment的文件末尾写数据,这个segment我们叫做activeSegment(当前活跃分片),至于之前的segment就变为了只读的文件了。

  每个segment中,.log后缀表示的是日志文件,为了便于检索日志,会有两个配套的索引文件,分别为偏移量索引文件(.inde后缀)和时间戳索引文件(.timeindex后缀),这三个文件的文件名都是一样的,各位为基准偏移量+文件后缀,这个基准偏移量是一个64位的长整形,为什么叫做基准偏移量呢,因为文件里面会有相对偏移量,这个我们后面详细说明。

如果到这里对存储文件的划分还是不清楚也没关系,后面实际操作讲解中,看一遍就知道是怎么回事了。

消息文件存储示例展示

1.下载kafka,本文下载的是kafka_2.11-1.1.1,然后放置在/opt/目录。接着执行以下命令:

代码语言:txt
复制
[root@VM-232-122-centos /opt]# cd kafka_2.11-1.1.1/config/

// 修改启动配置
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/config]# vi server.properties 
// 修改log.segment.bytes,这个参数表上segment的大小,默认1G,我们这里为了观察,修改为1M
// 修改log.dirs,该参数表示日志文件存储路径,我们这里修改为/tmp/kafka-logs,这里可以配置多个根目录,如果配置多个的情况下,broker会选择分区数最小的根目录创建topic的日志存储文件。

// 启动kafka
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/bin]# ./kafka-server-start.sh ../config/server.properties &

// 创建topic
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/bin]# ./kafka-topics.sh --zookeeper localhost:2181 --create --topic lhm-log-format-test --partitions 2  --replication-factor 1

// 往topic写消息
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/bin]# ./kafka-console-producer.sh --broker-list localhost:9093 --topic lhm-log-format-test
>a
>b
>c
>d
>...// 尝试写入更多的消息

// 查看日志文件
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/bin]# cd /tmp/kafka-logs-1/

// 可以看到我们创建的topic下面有两个对应的分区的日志目录
[root@VM-232-122-centos /tmp/kafka-logs-1]# ls
lhm-log-format-test-1
lhm-log-format-test-0 

// 进入lhm-log-format-test-1就可以看到我们的segment文件,随着后面日志的文件增多,
// 会出现基础偏移量更大的segment文件
[root@VM-232-122-centos /tmp/kafka-logs-1/lhm-log-format-test-1]# ls
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint

消息文件存储格式

  注意这里我们讲解的是最新版的消息日志格式,老版本的消息格式这里我们先不关注。kafka_2.x版本使用的都是这种消息类型。消息的存储是以消息集为单位的,称为record batch,每个record batch含有一条或多条消息,这里的消息称为record,record batch和record都有自己的header。

  我们先来看看RecordBatch的数据结构,需要注意的是,即使开启消息压缩,header部分是不会被压缩的(baseOffset到baseSequence,被压缩部分只有records),生产者客户端中的ProducerBatch对应这里的RecordBatch,ProducerRecord对应这里的Record:

代码语言:txt
复制
		baseOffset: int64
		batchLength: int32
		partitionLeaderEpoch: int32
		magic: int8 (current magic value is 2)
		crc: int32
		attributes: int16
			bit 0~2:
				0: no compression
				1: gzip
				2: snappy
				3: lz4
				4: zstd
			bit 3: timestampType
			bit 4: isTransactional (0 means not transactional)
			bit 5: isControlBatch (0 means not a control batch)
			bit 6~15: unused
		lastOffsetDelta: int32
		firstTimestamp: int64
		maxTimestamp: int64
		producerId: int64
		producerEpoch: int16
		baseSequence: int32
		records: [Record]

这里看看RecordBatch的字段含义:

  • baseOffset:当前RecordBatch起始位移
  • batchLength:partitionLeaderEpoch开始的整个RecordBatch长度
  • firstTimestamp:第一条record的时间戳
  • producerId,producerEpoch,baseSequence:用于支持幂等和事务的字段

再来看看Record的格式:

代码语言:txt
复制
        length: varint
		attributes: int8
			bit 0~7: unused
		timestampDelta: varint
		offsetDelta: varint
		keyLength: varint
		key: byte[]
		valueLen: varint
		value: byte[]
		Headers => [Header]

再看看Record里面的Header:

代码语言:txt
复制
        headerKeyLength: varint
		headerKey: String
		headerValueLength: varint
		Value: byte[]

这里可以看到有大量的字段使用到了varint类型,这是一种可变长整型,这种类型就是Protocol Buffers使用的可变长整型,可以参考https://developers.google.com/protocol-buffers/docs/encoding#varints,或者查阅本系列后续文章。这里主要讲讲Record这个类的的字段:

  • length:消息总长度
  • attributes:保留字段,保留一个字节以备后续使用
  • timestampDelta:增量时间戳,这里的增量是和Record Batch的firstTimestamp相比的增量,只保存增量的目的是为了减少存储空间
  • offsetDelta:增量位移,这里的增量是和Record Batch的baseOffset相比的增量,为了节省占用空间
  • Headers:这个字段用于支持应用级别的扩展,一个Record可以包含0到多个Header

索引文件存储格式

偏移量索引

在.index为后缀的偏移量索引文件中,一个偏移量索引项占8个字节,偏移量索引的格式为:4字节相对偏移量(relativeOffset)+ 4字节消息在日志文件中的物理位置(position)。我们如何根据目标偏移targetOffset查找消息内容呢?通用的寻找办法是,先找到baseOffset不大于我们要查找的targetOffset的日志分片,这里kafka是通过一个跳跃表的数据结构查询的,kafka会在内部使用concurrentSkipListMap缓存了所有日志分片的数据,key为文件名(baseOffset)value为分片数据,这样查找的时候就可以快速找到需要的分片。然后通过targetOffset - baseOffset得到目标相对偏移量targetRelativeOffset,然后在偏移量索引文件中,使用二分查询,快速找到不大于targetRelativeOffset的最大索引项,然后就能得到一个position,根据position顺序查找日志分段文件,就能寻找到消息了。

举个例子,在我们上面演示的例子中,00000000000000000000.index,我们查看下内容:

代码语言:txt
复制
[root@VM-232-122-centos /tmp/kafka-logs-1/lhm-log-format-test-1]# hexdump -C 00000000000000000000.index 
00000000  00 00 00 81 00 00 10 32  00 00 00 00 00 00 00 00  |.......2........|

可以看到前4个字节是relativeOffset,转换为十进制为129,后面四个字节为position,转回为十进制为4146,那么假如我们需要查找offset为140的消息如何查找呢?首先找到不大于130的最大的baseOffset的日志分片,假如我们除了00000000000000000000.index的segment,还有00000000000000100000.index,那么不大于130的最大baseOffset分片就是00000000000000000000.index,然后计算目标相对偏移量:140 - 129 = 11,那么我们就可以从00000000000000000000.log定位到4146的position,然后顺序查找11条消息即可得到目标消息了。

时间戳索引

在.timeindex为后缀的时间戳索引文件中,一个时间戳索引项占用12个字节,格式为:8字节时间戳(timestamp)+ 4字节时间戳对应的消息的相对偏移量(relativeOffset)。消息发送到服务端的时候,可以指定时间戳,也可以使用服务端的时间戳,这个时间戳就会记录到时间戳索引当中,所以时间戳索引里面的索引项的时间戳是不断增大的。时间戳索引并没有像偏移量索引那样缓存在kafka的内存,所以需要遍历时间戳索引,时间戳索引定位消息的步骤如下:

1.遍历所有时间戳索引,查询时间戳索引文件最后的时间戳索引项,和目标时间戳对比,找到第一个大于目标时间戳的索引,得到relativeOffset

2.根据relativeOffset,去偏移量索引查找消息即可,步骤参考偏移量索引

根据时间戳索引查找消息的主要代码逻辑如下:

代码语言:txt
复制
def fetchOffsetsByTimestamp(targetTimestamp: Long): Option[TimestampOffset] = {
  val targetSeg = {
    val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
  }

  targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))

}

def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampOffset] = {
  // Get the index entry with a timestamp less than or equal to the target timestamp
  val timestampOffset = timeIndex.lookup(timestamp)
  val position = offsetIndex.lookup(math.max(timestampOffset.offset, startingOffset)).position

  // Search the timestamp
  Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
    TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
  }
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka消息存储格式
    • 存储位置及存储文件划分
      • 文件存储概述
      • 消息文件存储示例展示
    • 消息文件存储格式
      • 索引文件存储格式
相关产品与服务
消息队列 CKafka 版
消息队列 CKafka 版(TDMQ for CKafka)是一个分布式、高吞吐量、高可扩展性的消息系统,100%兼容开源 Kafka API 2.4、2.8、3.2 版本。CKafka 基于发布/订阅模式,通过消息解耦,使生产者和消费者异步交互,无需彼此等待。CKafka 具有高可用、数据压缩、同时支持离线和实时数据处理等优点,适用于日志压缩收集、监控数据聚合、流式数据集成等场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档