生产者整体架构:
image.png
发送之前会经历 拦截器, 序列化器, 分区器.
发送过程: 由两个线程完成. 主线程和sender线程.
主线程: 负责将消息发送到消息累加器(RecordAccumulator) .
Sender线程: 负责将消息累加器(RecordAccumulator)中获取消息并发送到Broker.
RecordAccumulator: 主要用来缓存消息, Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能 RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B ,即 32M, 如果生产者发送消息的速度超过发送到服务器的速度 ,则会导致生产者空间不足,这个时候 KafkaProducer send() 方法调用要么 被阻塞,要么抛出异常,这个取决于参数 max .block.ms 的配置,此参数的默认值为60000, 即60秒.
主线程中发送过来的消息都会被加到 RecordAccumulator 的某个双端队列( Deque )中, RecordAccumulator 的内部为每个分区都维护了 个双端队列,队列中的内容就是 Producer Batch ,即 Deque ProducerBatch。消息写入缓存时,追加到双端队列的尾部:Sender读取消息时, 从队列头部读取.
在 RecordAccumulator 的内部还有一个 BufferPool, 它主要用来实现 ByteBuffer 的复用,以实现缓存的高效利用 。不 BufferPool 只针对特定大ByteBuffer 进行管理,而其他大小的 ByteBuffer 不会缓存进 BufferPool 中,这个特定的大小 batch.size 参数来指定,默认值为 16384B ,即 16KB 我们可以适当地调大 batch.size 参数 以便多缓存一些消息。
总结:kafka是微批发送消息的,不是实时发送。每个批次的大小为batch.size;
rocketmq是实时发送.
参数名 | 说明 |
---|---|
max.request.size | 客户端能发送消息的最大值, 默认1 M |
retries | 重试次数 |
retry.backoff.ms | 两次重试之间的间隔 |
compression.type | 消息压缩, 默认为none, 压缩后减少IO, 但是会加大时延. |
liner.ms | 生产者客户端会在 ProducerBatch 填满或等待时间超过 linger.ms 值时发送出去。 |
receive.buffer.bytes | socket接收消息的缓冲区, 默认32Kb, producer与broker处于不同的机房,适当调高该值 |
send.buffer.bytes | 发送消息的socket缓冲区, 默认128KB. |