前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka-生产者发送流程

kafka-生产者发送流程

作者头像
一点博客
发布2023-02-20 17:24:36
5020
发布2023-02-20 17:24:36
举报
文章被收录于专栏:一点博客

生产者整体架构:

image.png

发送之前会经历 拦截器, 序列化器, 分区器.

发送过程: 由两个线程完成. 主线程和sender线程.

主线程: 负责将消息发送到消息累加器(RecordAccumulator) .

Sender线程: 负责将消息累加器(RecordAccumulator)中获取消息并发送到Broker.

RecordAccumulator: 主要用来缓存消息, Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能 RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B ,即 32M, 如果生产者发送消息的速度超过发送到服务器的速度 ,则会导致生产者空间不足,这个时候 KafkaProducer send() 方法调用要么 被阻塞,要么抛出异常,这个取决于参数 max .block.ms 的配置,此参数的默认值为60000, 即60秒.

主线程中发送过来的消息都会被加到 RecordAccumulator 的某个双端队列( Deque )中, RecordAccumulator 的内部为每个分区都维护了 个双端队列,队列中的内容就是 Producer Batch ,即 Deque ProducerBatch。消息写入缓存时,追加到双端队列的尾部:Sender读取消息时, 从队列头部读取.

在 RecordAccumulator 的内部还有一个 BufferPool, 它主要用来实现 ByteBuffer 的复用,以实现缓存的高效利用 。不 BufferPool 只针对特定大ByteBuffer 进行管理,而其他大小的 ByteBuffer 不会缓存进 BufferPool 中,这个特定的大小 batch.size 参数来指定,默认值为 16384B ,即 16KB 我们可以适当地调大 batch.size 参数 以便多缓存一些消息。

总结:kafka是微批发送消息的,不是实时发送。每个批次的大小为batch.size;

rocketmq是实时发送.

重要参数

参数名

说明

max.request.size

客户端能发送消息的最大值, 默认1 M

retries

重试次数

retry.backoff.ms

两次重试之间的间隔

compression.type

消息压缩, 默认为none, 压缩后减少IO, 但是会加大时延.

liner.ms

生产者客户端会在 ProducerBatch 填满或等待时间超过 linger.ms 值时发送出去。

receive.buffer.bytes

socket接收消息的缓冲区, 默认32Kb, producer与broker处于不同的机房,适当调高该值

send.buffer.bytes

发送消息的socket缓冲区, 默认128KB.

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-12-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 一点博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 重要参数
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档