Kafka源码系列之通过源码分析Producer性能瓶颈

Kafka源码系列之通过源码分析Producer性能瓶颈

本文,kafka源码是以0.8.2.2,原因是浪尖一直没对kafka系统进行升级。主要是java的kafka生产者源码,Broker接收到producer请求之后处理的相关源码。估计源码内容是比较多的,只给出大致逻辑,主类和函数名称。本文的目的是让大家,彻底了解发送消息到kafka的过程及如何对producer进行调优。

一,kafka的producer基本介绍及主要类

1,基本介绍

Kafka的Producer,主要负责将消息发送给kafka集群。主要核心特性有两点:

1),异步 or 同步。

可以通过配置producer.type(1或async和2或sync)

设置为异步的话,其实就是启动了一个后台线程,负责从队列里面取数据发送给kafka,我们的主程序负责往队列了写数据。支持批发送。

2),消息的key决定分区。

可以采用分区器,来决定我们的消息发往哪个partition。实现Partitioner可以实现自定义分区器。默认的分区策略是对key取hash值,然后对总的分区数取余,得到的余数作为我们发送消息到哪个分区的依据。

2,主要类

Producer

kafka.producer.Producer该类异常重要,负责对DefaultEventHandler进行初始化并且在此过程也初始化真正的发送者池ProducerPool。

异步发送消息的策略情况下会对初始化我们后台发送线程。

DefaultEventHandler

该类主要是为消息发送做准备,比如更新broker信息,找到分区的leader等,最终通过SyncProducer将消息发送给Broker。

ProducerSendThread

这个是异步发送情况下才会有的,一个后台发送线程。这种模式下,生产端实际是形成了一个生产消费模型,用户调用Producer.send实际是将消息添加到了一个消息队列里面LinkedBlockingQueue,然后由ProducerSendThread的processEvents,批量处理后交给了DefaultEventHandler。

ProducerPool

维护了Broker数目个SyncProducer,会在每次发送消息前更新。

SyncProducer

每个SyncProducer对象,包含了一个到一个broker链接,我们通过获取该对象来发消息到特定的broker。

BrokerPartitionInfo

该类主要是在获取topic元数据信息的时候会用到。在更新元数据的时候会同时更新我们ProducerPool,避免Borker挂掉之后SyncProducer对象不可用。

Partitioner

主要是分区器。对我们的消息按照key进行分区。

KafkaApis

Kafka的各种请求的逻辑处理类。

Processor

负责应答请求。

二,源码讲解

producer与Broker通信骨干

由上图可以看出,生产者于消费者通讯的骨干分两类:

1,TopicMetadataRequest

RequestId:RequestKeys.MetadataKey。主要是负责请求topic的元数据,更新Producer里面syncProducers链接信息,也获取了topic的具体信息。

在DefaultEventHandler的Handle方法里,开启了元数据的更新

brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)

def updateInfo(topics: Set[String], correlationId: Int) {
 var topicsMetadata: Seq[TopicMetadata] = Nil
 val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)

在ClientUtils.fetchTopicMetadata方法里,会根据我们给定的broker列表,随机选出一个创建一个SyncProducer,去获取元数据。获取到就会推出(一台Broker会包含所有的元数据)。

val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
var topicMetadataResponse: TopicMetadataResponse = null
var t: Throwable = null
// shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the
// same broker
val shuffledBrokers = Random.shuffle(brokers)
while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
 val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
  info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
 try {
    topicMetadataResponse = producer.send(topicMetadataRequest)
    fetchMetaDataSucceeded = true
 }
 catch {
 case e: Throwable =>
      warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
 .format(correlationId, topics, shuffledBrokers(i).toString), e)
      t = e
  } finally {
    i = i + 1
 producer.close()
  }
}

ducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))

TopicMetadataRequest被KafkaApis识别为获取topic元数据的请求,最终调用的处理函数是handleTopicMetadataRequest(request)。

2,ProducerRequest

RequestId:RequestKeys.ProduceKey。被KafkaApis识别为消息发送请求。会调用的处理函数handleProducerOrOffsetCommitRequest(request)。

具体过程如下:

根据上图可知,异步通讯这种方式其实涵盖了,同步通讯。所以,这块源码呢,我们主要关注异步通讯。

1),初始化

前面已经说过了在构建kafka.producer.Producer对象的时候会初始化ProducerPool和DefaultEventHandler。

//构建DefaultEventHandler的时候会通过反射构建分区器,和key-value的序列化方式
def this(config: ProducerConfig) =
 this(config,
 new DefaultEventHandler[K,V](config,
 Utils.createObject[Partitioner](config.partitionerClass, config.props),
 Utils.createObject[Encoder[V]](config.serializerClass, config.props),
 Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
 new ProducerPool(config)))

假如为异步消息发送也会构建一个后台发送线程

config.producerType match {
 case "sync" =>
 case "async" =>
 sync = false
 producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
 queue,
 eventHandler,
 config.queueBufferingMaxMs,
 config.batchNumMessages,
 config.clientId)
 producerSendThread.start()
}

2),具体数据发送过程

A),消息生产到消息队列

producer.send(new KeyedMessage<Integer, String>(topic, messageStr));

调用kafka.producer.Producer对象asyncSend将消息存储到消息队列,自己的。

asyncSend(messages: Seq[KeyedMessage[K,V]])

LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)

B),后台线程从消息队列取出消息

此过程,牵涉到发送调优的一个策略:

(1)满足消息发送批大小发送

(2)消息等待超时也会将消息发送

首先是构建清空队列的流

Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS)) .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach

然后里面会进行判断

// check if the batch size is reached
full = events.size >= batchSize

if(full || expired) {
 if(expired)
    debug(elapsed + " ms elapsed. Queue time reached. Sending..")
 if(full)
    debug("Batch full. Sending..")
 // if either queue time has reached or batch size has reached, dispatch to event handler
 tryToHandle(events)
  lastSend = SystemTime.milliseconds
 events = new ArrayBuffer[KeyedMessage[K,V]]
}

C),DefaultEventHandler消息发送前的准备

在handle方法里,首先是会判断是否满足topic元数据更新的时间间隔,间隔设置方式:

topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000)

满足则进行topic元数据更新

brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)

然后呢?获取leader,按照key进行分区,构建发送的消息块,发送消息。具体由DefaultEventHandler的dispatchSerializedData方法完成

首先,用DefaultEventHandler的partitionAndCollate方法获取

Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]]

其中,其中map的key是分区leader的brokerId

Map的Value,又是一个map,key是TopicAndPartition,value是该分区的具体消息。

接着,正式进入消息封装与发送

val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
val failedTopicPartitions = send(brokerid, messageSetPerBroker)

在send方法中构建了

val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
 config.requestTimeoutMs, messagesPerTopic)

从Producer池中获取一个SyncProducer,发送消息

val syncProducer = producerPool.getProducer(brokerId)
val response = syncProducer.send(producerRequest)

Broker接受到消息根据requestId,进行逻辑处理,最终是交给了KafkaApis的方法

case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)

三,总结

由于kafka在我们整个生产环境系统中的重要性,主要体现在,kafka集群一旦垮了,会导致真个业务系统断了,系统瘫痪。所以,保证kafka集群的存活,高效运转,是我们大数据工作者一个重要责任。这次主要是从生产者的角度强调我们的优化过程。后面会陆续出文章讲多种消费者角色的源码和优化策略和kafka本身的优化策略。

1,采用异步发送策略

异步策略,增加了一个后台发送线程,增加并发度。

异步策略支持批量发送和超时发送,提升了性能。

2,设置合理的批大小和超时时间(异步处理情况)

配置

默认

作用

queue.buffering.max.ms

5000

异步发送消息超时发送时间

batch.num.messages

200

异步消息批量发送的阈值

3,设置合适的kafka分区数

分区数一方面决定了我们写消息的并发度,由此也影响着吞吐量。也决定后端处理线程的并发度。

分区并发度决定因素:数据量,后端业务处理复杂度,磁盘数量。

并不是分区数越多就越好,磁盘竞争也很影响性能的。

4,尽量使数据均匀分布

重要等级高,可以使我们后端处理线程负载均匀。

1),key随机或者轮训分区进行发送

2),自定义分区策略

5,如何保证消息顺序性

将需要保证顺序的消息,采用同步的方式发送发送到同一个分区里。

6,高级优化策略

自己使用kafka client的api实现自己的生产者,减少中间环节,尤其针对生产者跟kafka集群在同一台主机的时候,我们可以只发送数据到当前的主机的分区,减少了流量跨主机传输,节省带宽。

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2017-06-07

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏大学生计算机视觉学习DeepLearning

c++ 网络编程(二)TCP/IP linux 下多进程socket通信 多个客户端与单个服务端交互代码实现回声服务器

原文链接:https://www.cnblogs.com/DOMLX/p/9612820.html

50990
来自专栏JavaEdge

操作系统之文件管理一、文件与文件系统二、文件控制块和文件目录三、文件的物理结构四、文件系统的实现五、文件系统实例(UNIX)六、UNIX文件系统一、文件系统实例(FAT)二、文件操作的实现三、文件系统

62160
来自专栏MessageQueue

消息中间件核心实体(0)

从之前讨论的架构来说,消息中间件也是有主从复制这个模块的,像Rocket就支持主从模式。

10940
来自专栏小狼的世界

在Codeigniter框架中使用NuSOAP

NuSOAP 是一组功能强大的PHP类,这个工具的发布让使用和创建SOAP消息变得相当简单。 NuSOAP有Dirtrich Ayala编写,可以无缝的与许多最...

9710
来自专栏前端杂货铺

Nodejs cluster模块深入探究

由表及里 HTTP服务器用于响应来自客户端的请求,当客户端请求数逐渐增大时服务端的处理机制有多种,如tomcat的多线程、nginx的事件循环等。而对于nod...

579100
来自专栏ImportSource

并发编程-到处都是线程!

即使是你在自己的program中从来没有显式的创建线程,框架也依然会代表你偷偷的创建一些线程。所以呢,被这些线程调用的代码也必须是线程安全的。这样的设计和实现给...

34370
来自专栏黄Java的地盘

提高代码质量——使用Jest和Sinon给已有的代码添加单元测试

在日常的功能开发中,我们的代码测试都依赖于自己或者QA进行测试。这些操作不仅费时费力,而且还依赖开发者自身的驱动。在开发一些第三方依赖的库时,我们也没有办法给第...

20800
来自专栏小怪聊职场

爬虫架构|利用Kafka处理数据推送问题(2)

604120
来自专栏专注于主流技术和业务

axios2教程

axios 是一个基于 promise 的 HTTP 库,用于浏览器和node.js的http客户端,支持拦截请求和响应,自动转换 JSON 数据, 客户端支持...

1.1K20
来自专栏Seebug漏洞平台

Joomla 权限提升漏洞(CVE-2016-9838)分析

0x00 漏洞概述 1.漏洞简介 Joomla 于12月13日发布了3.6.5的升级公告,此次升级修复了三个安全漏洞,其中 CVE-2016-9838 被官方定...

382100

扫码关注云+社区

领取腾讯云代金券