首页
学习
活动
专区
工具
TVP
发布

4、深潜KafkaProducer —— RecordAccumulator

通过上一课时的介绍我们了解到,业务线程使用 KafkaProducer.send() 方法发送 message 的时候,会先将其写入RecordAccumulator 中进行缓冲,当 RecordAccumulator...本课时我们就重点来看一下 RecordAccumulator 这个缓冲区的结构。...首先,我们从上图中可以看出,RecordAccumulator 会由业务线程写入、Sender 线程读取,这是一个非常明显的生产者-消费者模式,所以我们需要保证 RecordAccumulator 是线程安全的...RecordAccumulator 、ProducerBatch、MemoryRecordsBuilder 这三个核心类的关系如下图所示: message 格式 既然我们准备深入 KafkaProducer...、ProducerBatch、BufferPool 等底层组件,以及 RecordAccumulator 的核心方法。

1.1K00

(五)Kafka系列:一文了解Kafka的消息收集器RecordAccumulator

〇、前言 在上一篇文章《连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka》中,我们介绍了Main Thread的工作原理,那么在本篇文章中,我们继续介绍第二部分内容:RecordAccumulator...一、RecordAccumulator 在上文中,我们介绍了主线程(Main Thread)的执行流程,当我们使用KafkaProducer发送消息的时候,消息会经过拦截器(Interceptor)、序列化器...(Serializer)和分区器(Partitioner),最后会暂存到消息收集器(RecordAccumulator)中,那么,本节就来针对其进行介绍。...RecordAccumulator的主要作用是暂存Main Thread发送过来的消息,然后Sender Thread就可以从RecordAccumulator中批量的获取到消息,减少单个消息获取的请求次数...properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60*1000); 在RecordAccumulator中,我们通过getOrCreateDeque

21320
您找到你想要的搜索结果了吗?
是的
没有找到

Kafka技术知识总结之八——Kafka生产者结构

Kafka 生产者结构 Kafka Producer 主要有三个部分组成:主线程、Sender 线程、RecordAccumulator。...主线程:执行序列化、分区、拦截器处理等主要操作,并将消息缓存到 RecordAccumulator 中; Sender 线程:从 RecordAccumulator 中拉数据,发送到 broker; RecordAccumulator...:缓存主线程的消息,并提供给 Sender 线程; 生产者执行流程: 主线程处理数据,包括处理拦截器操作、序列化、分区; 主线程将数据发送给 RecordAccumulatorRecordAccumulator...ProducerRecord) 压缩成 (ProducerBatch),并以分区 (TopicPartition) - 消息队列 (Deque(ProducerBatch)) 的消息存储主线程发来的消息; Sender 从 RecordAccumulator

49310

一文了解Kafka的消息收集器RecordAccumulate

〇、前言 在上一篇文章《连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka》中,我们介绍了Main Thread的工作原理,那么在本篇文章中,我们继续介绍第二部分内容:RecordAccumulator...一、RecordAccumulator 在上文中,我们介绍了主线程(Main Thread)的执行流程,当我们使用KafkaProducer发送消息的时候,消息会经过拦截器(Interceptor)、序列化器...(Serializer)和分区器(Partitioner),最后会暂存到消息收集器(RecordAccumulator)中,那么,本节就来针对其进行介绍。...RecordAccumulator的主要作用是暂存Main Thread发送过来的消息,然后Sender Thread就可以从RecordAccumulator中批量的获取到消息,减少单个消息获取的请求次数...properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60*1000); 在RecordAccumulator中,我们通过getOrCreateDeque

16120

KafkaProducer源码分析

,主要用于存放消息(KafkaProducer主线程往RecordAccumulator中写消息,Sender线程从RecordAccumulator中读消息并发送到Kafka中) 6.解析Broker...// 唤醒Sender线程 Sender.wakeup RecordAccumulator RecordAccumulator是消息队列用于缓存消息,根据TopicPartition对消息分组 重点看下...RecordAccumulator.applend追加消息的流程 // 记录进行applend的线程数 appendsInProgress.incrementAndGet(); // 根据TopicPartition...// 根据准备好的节点信息从缓冲区中获取topicPartion对应的Deque队列中取出ProducerBatch信息 RecordAccumulator.drain // 将消息转移到每个节点的生产请求队列中...通过上面的介绍,我们梳理出了Kafka生产消息的主要流程,涉及到主线程往RecordAccumulator中写入消息,同时后台的Sender线程从RecordAccumulator中获取消息,使用NIO

57110

更好的理解kafka,快来学习kafka的架构设计

在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。...Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。...RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗提升性能。...RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory的配置,默认值为32MB。...主线程中发送过来的消息都会被追回到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch

40620

kafka-2-生产者-流程

2、RecordAccumulator:是一个记录收集器,用于收集客户端发送的消息,并将收集到的消息暂存到客户端缓存中。...流程详解:消息发送的过程中,涉及到两个线程协同工作:1、主线程首先将业务数据封装成ProducerRecord对象,2、之后调用send()方法将消息放入RecordAccumulator(消息收集器,...也可以理解为主线程与Sender线程直接的缓冲区)中暂存,3、Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulator中取出消息并批量发送出去, 需要注意的是...hash,然后对分区数取模(Utils.murmur2(key) % numPartitions) ~ 3、如未指定且没key,则轮询发送给分区(低版本采用随机)5、临时缓存(存储) RecordAccumulator...采用了双端队列(Deque)数据结构来临时存储 目的:提高发送数据的吞吐量 确定消息发送的分区后,会在RecordAccumulator寻找对应的Deque,找不到对应的Deque则新建

7410
领券