前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka源码系列之kafka如何实现高性能读写的

Kafka源码系列之kafka如何实现高性能读写的

作者头像
Spark学习技巧
发布2018-01-30 18:10:43
3.4K0
发布2018-01-30 18:10:43
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

本文依然是以kafka 0.8.2.2的源码为例进行讲解。

一,kafka高性能的原因

Kafka吞吐量是大家公认的高,那么这是为什么呢?个人总结为以下三点:

1,Broker NIO异步消息处理,实现了IO线程与业务线程分离。

2,磁盘顺序写。

3,零拷贝。

本文主要是讲解,磁盘顺序写和零拷贝。关于Broker的消息处理体系,请大家阅读我的另一篇文章<Kafka源码系列之Broker的IO服务及业务处理>。

二,本文牵涉到的类

由于本文要从生产者生产消息到消费者消费到消息整个流水讲起,那么会导致源码量比较大,这里只会截取部分关键步骤的重要代码,不会全部贴出,影响大家阅读效果。

1,ByteBufferMessageSet

代表着存储在byte buffer的一系列的消息。有两种创建的方式

方式一,使用一个已经包含了序列化好的Message的ByteBuffer。消费者使用的是这种方式。消费者端代码

new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer))

方式二,给它一个消息列表,以及关于序列化的指令。生产者会使用这种方式。

new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)

2,FileMessageSet

这个是在SimpleConsumer请求消息的时候封装的,然后通过自身的writeTo方法,将数据高效的传输给SimpleConusmer。在SimpleConsumer端,接收后又封装成了ByteBufferMessageSet。

主要作用,表示带磁盘上的消息集合,可以指定起始位置,并且允许对文件的子集进行切片。

3,LogSegment

表示一段日志。每段日志有两个元素:日志文件和索引。数据就是FileMessageSet的包含的实际消息。索引文件就是从逻辑偏移到物理文件位置的映射。

4,Partition

代表一个topic的分区。Leader分区管理着:AR,ISR,CUR,RAR.

AR: replicas assigned to this partition

ISR:In-sync replica set, maintained at the leader

CUR:Catch-up replica set, maintained at the leader

RAR:being reassigned to other brokers., maintained at the leader

数据追加和读写都会有leader 分区获取,交由Replica进行操作。

这里会有个关系是:AR=ISR+CUR

5,Log

仅仅支持追加的方式存储消息,代表了一些列的LogSegment的集合,每个都包含一个基准偏移,代表在segment中的第一个消息。根据我们的指定的策略创建新的logsegments,比如按照大小或者时间周期。

6,LogOffsetMetadata

这表示一个日志偏移的数据结构,包含以下几个部分:

A),消息偏移;B),定位的segment的起始偏移;C),在磁盘上的物理位置。

7,Replica

主要是代表着一个topic的分区的副本。主要是维护一个log对象,用于数据读写,和log的结束偏移等信息。由我们的Partition对象获取和维护。

8,ReplicaManager

负责管理当前机器的所有副本,处理读写具体动作。

读写:写获取partition对象,再获取Replica对象,再获取Log对象,采用其管理的Segment对象将数据写入、读出。

三,kafka存储的结构及读取过程

1,topic的文件树

每个topic有n个partition,每个partition有n个segmentfile组成。一个segment有*.index和*.log组成

2,存储目录

Partition的位置由broker参数log.dirs=/tmp/kafka-logs,可以使目录列表,逗号分开。

3,存储时间及大小

broker 级别的日志文件的存储时间控制:

log.retention.hours=168

log.retention.minutes=168

log.retention.ms=168

更高优先级topic 级别

--config retention.ms=?

一个segmentfile大小的控制

log.segment.bytes=1073741824

--config segment.bytes=?

4,kafka Segment读取

假如消息偏移为4255133,先通过二分查找找到segmentfile的00000000000004255130.index,然后计算在该文件的第几个位置n=4255133-(4255130+1)=2,获取偏移1220,然后去00000000000004255130.log取出偏移为1220的msg4255132,结束。

四,kafka源码接触

本处总共分四个部分:

a,Producer压缩块发送消息

b,Broker顺序写入

c,Broker zero-copy 数据给SimpleConsumer

d,消费者,Broker消息大小的限制。

1,Producer压缩块发送消息

在<Kafka源码系列之通过源码分析Producer性能瓶颈>中提到过,producer会为每个Broker创建一个链接,然后使用该链接将数据发出。那么为了Broker端能将消息进行准确到识别并写入各个topic的分区副本文件中,producer在生产消息的时候就需要对消息进行分类:根据key和topic,按照topicAndpartition进行分类。

源代码是在DefaultEventHandler的partitionAndCollate进行归类。

代码语言:js
复制
def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
 val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
 try {
 for (message <- messages) {
 //返回值是指定topic的所有分区在broker分布 (brokerId, numPartitions)
 val topicPartitionsList = getPartitionListForTopic(message)
 //假如指定了分区方法和key,会调用我们的分区方法,获取key指定的分区
 val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
 //获取去指定分区的PartitionAndLeader
 val brokerPartition = topicPartitionsList(partitionIndex)

 // postpone the failure until the send operation, so that requests for other brokers are handled correctly
 val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)

 var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null

 //[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
      // 获取数组[TopicAndPartition, Seq[KeyedMessage[K,Message]]]],不存在的话创建
 ret.get(leaderBrokerId) match {
 case Some(element) =>
          dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
 case None =>
          dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
          ret.put(leaderBrokerId, dataPerBroker)
      }

 val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
 var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null

 //取出[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]中的消息数组Seq[KeyedMessage[K,Message]]],将消息加入
 dataPerBroker.get(topicAndPartition) match {
 case Some(element) =>
          dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
 case None =>
          dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
          dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
      }
      dataPerTopicPartition.append(message)
    }
 Some(ret)

归类结束之后,会针对每个分区的消息封装成ByteBufferMessageSet进行压缩序列化是在groupMessagesToSet方法中

代码语言:js
复制
val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
 val rawMessages = messages.map(_.message)
  ( topicAndPartition,
 config.compressionCodec match {
 case NoCompressionCodec =>
        debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
 new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
 case _ =>
        config.compressedTopics.size match {
 case 0 =>
            debug("Sending %d messages with compression codec %d to %s"
 .format(messages.size, config.compressionCodec.codec, topicAndPartition))
 new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
 case _ =>
 if(config.compressedTopics.contains(topicAndPartition.topic)) {
              debug("Sending %d messages with compression codec %d to %s"
 .format(messages.size, config.compressionCodec.codec, topicAndPartition))
 new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
            }
 else {
              debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
 .format(messages.size, topicAndPartition, config.compressedTopics.toString))
 new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
            }
        }
    }
    )
}
messagesPerTopicPartition

目前支持的压缩方式是

代码语言:js
复制
compressionCodec match {
 case DefaultCompressionCodec => new GZIPOutputStream(stream)
 case GZIPCompressionCodec => new GZIPOutputStream(stream)
 case SnappyCompressionCodec => 
 import org.xerial.snappy.SnappyOutputStream
 new SnappyOutputStream(stream)
 case LZ4CompressionCodec =>
 new KafkaLZ4BlockOutputStream(stream)
 case _ =>
 throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
}

2,Broker端的顺序写入

此处,代码量相当大,我们只会捡重要的函数进行讲解。

首先是Broker接受到消息后,会执行kafkaApis的

代码语言:js
复制
case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)

下面是会针对不同topic的不同分区进行消息区分写入

代码语言:js
复制
partitionAndData.map {case (topicAndPartition, messages) =>
 try {
 if (Topic.InternalTopics.contains(topicAndPartition.topic) &&
        !(isOffsetCommit && topicAndPartition.topic == OffsetManager.OffsetsTopicName)) {
 throw new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic))
    }
 //主要是获取到分区
 val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
 val info = partitionOpt match {
 case Some(partition) =>
 //开始追加到日志
 partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet],producerRequest.requiredAcks)

其中,appendMessagesToLeader方法中会通过Partition取出replica对象,调用其内部log的append方法

代码语言:js
复制
val leaderReplicaOpt = leaderReplicaIfLocal()
leaderReplicaOpt match {
 case Some(leaderReplica) =>
 val log = leaderReplica.log.get
 

剩下的就是在append方法中,首先验证消息的有效性,比如单条消息是不是超过了Broker所接受的消息的大小等。

代码语言:js
复制
val appendInfo = analyzeAndValidateMessageSet(messages)

然后,给消息分配偏移

代码语言:js
复制
val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
 try {
//            给我们的数据块赋值偏移
 validMessages = validMessages.assignOffsets(offset, appendInfo.codec)

最后,判断Segment是否需要roll up,消息正式追加写入和最大偏移更新

代码语言:js
复制
// maybe roll the log if this segment is full
val segment = maybeRoll(validMessages.sizeInBytes)
// now append to the log 并未拆解 初始偏移和消息块
segment.append(appendInfo.firstOffset, validMessages)
// increment the log end offset //更新修大偏移
updateLogEndOffset(appendInfo.lastOffset + 1)

通过FileMessageSet顺序追加到Segment文件中,是对整个Messageset进行写入的

代码语言:js
复制
def append(messages: ByteBufferMessageSet) {
 val written = messages.writeTo(channel, 0, messages.sizeInBytes)
 _size.getAndAdd(written)
}

3,Broker zero-copy 数据给SimpleConsumer

Broker接收到数据请求后,执行KafkaApis的

代码语言:js
复制
case RequestKeys.FetchKey => handleFetchRequest(request)
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
val dataRead = replicaManager.readMessageSets(fetchRequest)
在readMessageSets方法中
//读取指定topic 分区 offset 最大获取大小 MessageSet
val (fetchInfo, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)

获取副本replica对象,获取log的最大偏移,然后获取log对象,调用其read方法

代码语言:js
复制
val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)
 getReplicaOrException(topic, partition)
else
 getLeaderReplicaIfLocal(topic, partition)
trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
val maxOffsetOpt =
 if (Request.isValidBrokerId(fromReplicaId))
    None
 else
 Some(localReplica.highWatermark.messageOffset) //最大消息偏移
val fetchInfo = localReplica.log match {
 case Some(log) =>
    log.read(offset, maxSize, maxOffsetOpt)

根据其实offset找到LogSegment。调用其read方法读取消息,根据最大偏移,起始偏移,和读取的最大bytes。

代码语言:js
复制
var entry = segments.floorEntry(startOffset)
val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)

计算起始偏移和最大偏移对应的实际物理position

代码语言:js
复制
val logSize = log.sizeInBytes // this may change, need to save a consistent copy
val startPosition = translateOffset(startOffset) //根据offset找到物理存储的偏移的位置

// if the start position is already off the end of the log, return null
if(startPosition == null)
 return null

val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

// if the size is zero, still return a log segment but with zero size
if(maxSize == 0)
 return FetchDataInfo(offsetMetadata, MessageSet.Empty)

// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val length = 
  maxOffset match {
 case None =>
 // no max offset, just use the max size they gave unmolested 无麻烦的
 maxSize
 case Some(offset) => {
 // there is a max offset, translate it to a file position and use that to calculate the max read size
 if(offset < startOffset)
 throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))

 val mapping = translateOffset(offset, startPosition.position)
 val endPosition = 
 if(mapping == null) //说明消息最大偏移超过了当前块,所以最大偏移就是当前块大小
 logSize // the max offset is off the end of the log, use the end of the file
 else
 mapping.position //否则就是最大偏移
 min(endPosition - startPosition.position, maxSize) //文件剩余数据的大小和最大索取大小哪个小,取最小的
 }
  }
FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))

最终调用FileMessageSet的read方法,根据我们提供的信息封装成了

代码语言:js
复制
new FileMessageSet(file,
 channel,
 start = this.start + position,
 end = math.min(this.start + position + size, sizeInBytes()))

至此,都是在讲我们Broker 在接收到数据读取请求后的处理业务线程。他处理结束之后会在handleFetchRequest方法中

代码语言:js
复制
val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data))
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))

zero-copy发生在socketServer回复SimpleConsumer的过程中

Processor的write方法中

代码语言:js
复制
val response = key.attachment().asInstanceOf[RequestChannel.Response]
val responseSend = response.responseSend
if(responseSend == null)
 throw new IllegalStateException("Registered for write interest but no response attached to key.")
val written = responseSend.writeTo(socketChannel)

下面,嵌套比较深,这里只列出了函数名称

FetchResponseSend.writeTo

MultiSend.writeTo

TopicDataSend.writeTo

MultiSend.writeTo

PartitionDataSend.writeTo

FileMessageSet.writeTo

此时我们就可以看到我们久违transferTo了。这就是整个zero-copy的过程

代码语言:js
复制
def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
 // Ensure that the underlying size has not changed.
 val newSize = math.min(channel.size().toInt, end) - start
 if (newSize < _size.get()) {
 throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
 .format(file.getAbsolutePath, _size.get(), newSize))
  }
 val bytesTransferred = channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt
  trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
    + " bytes requested for transfer : " + math.min(size, sizeInBytes))
 bytesTransferred
}

4,消费者,Broker,消息大小的限制。

Broker接收到Producer消息后会对消息的有效性进行验证,是在Log的append方法中调用了

代码语言:js
复制
val appendInfo = analyzeAndValidateMessageSet(messages)

在该方法内部,进行了大小的验证

代码语言:js
复制
// Check if the message sizes are valid.
val messageSize = MessageSet.entrySize(m) //单个消息的大小
if(messageSize > config.maxMessageSize) {
  BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
  BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
 throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
 .format(messageSize, config.maxMessageSize))
}

Broker端接受的最大消息大小的配置为

message.max.bytes

1000000

要结合消费者获取最大消息尺寸设计,否则有可能导致生产者生产的消息消费者无法消费。

消费者,对消息尺寸进行验证。看过去前面的文章应该知道,SimpleConsumer获取消息后最终是由kafkaStream(ConsumerIterator),将消息从对列里取出使用的。实际是在ConsumerIterator的makeNext方法中进行消息有效性验证

代码语言:js
复制
// if we just updated the current chunk and it is empty that means the fetch size is too small!
if(currentDataChunk.messages.validBytes == 0)
 throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
 "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
 .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))

不满足要求就会抛出一个异常。关键就是第一次迭代取消息的时候,假如消息消费者消费时指定的最大消息大小,小于生产者生产的,此时会导致消息获取不完整,然后进一步导致,shallowValidBytes返回值为零,以此来判断消息是否超过消费者所能消费消息大小,然后抛出相关异常。

代码语言:js
复制
 private def shallowValidBytes: Int = {
 if(shallowValidByteCount < 0) {
 var bytes = 0
 val iter = this.internalIterator(true)
 while(iter.hasNext) {
 val messageAndOffset = iter.next
        bytes += MessageSet.entrySize(messageAndOffset.message)
      }
//      bytes=0,代表消息不完整,当时第一次迭代就读取的消息不完整的话就会出现bytes为零,这时就需要调整消费者的消息大小了。
 this.shallowValidByteCount = bytes
    }
 shallowValidByteCount
 }

消费者所能获取的最大消息大小的配置

fetch.message.max.bytes

1024 * 1024

消费者所能获取的最大消息大小

五,总结

本节主要目的是分析kafka的磁盘顺序写和zero-copy源码。这个也是kafka只所以高效的最关键步骤,在这里浪尖给出了一下使用时总结。

1,生产者生产消息时,采用异步

异步,并发

消息块方式减少网络io请求次数

可以更加好的利用Broker端的磁盘顺序写

2,生产者生产消息是指定压缩

节省网络传输带宽

配置方式

compression.codec

none

This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy".

3,注意消息大小限制

结合自己的消息类型特点,进行相关配置。

最后赘述一个kafka高性能的原因:

1,Broker NIO异步消息处理,实现了IO线程与业务线程分离。

2,磁盘顺序写。

3,零拷贝。

读完这篇文章希望大家对kafka的高性能有更具体的认识,也希望能用到我们自己设计的系统中去。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-06-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档