首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kafka发送消息时提示请求数据过大是怎么回事?

Kafka发送消息时提示请求数据过大是怎么回事?

作者头像
张乘辉
发布2019-09-25 14:09:07
发布2019-09-25 14:09:07
3.8K00
代码可运行
举报
文章被收录于专栏:后端进阶后端进阶
运行总次数:0
代码可运行
今天有个小伙伴跟我反馈,在 Kafka 客户端他明明设置了 batch.size 参数,以提高 producer 的吞吐量,但他发现报了如下错误:

然后我去服务器查看了下 producer 的配置,发现没有配置 max.request.size,默认值为 1048576,而他发送的消息大小为 1575543,因此报了这个异常。

然后接下来他跟我讲他已经在客户端配置了 batch.size 的值为 512000,按照这个值的作用,应该是大于这个值才会进行批量发送消息到 broker:

于是我又得去撸源码,搞清楚 Kafka 发送消息实现细节:

org.apache.kafka.clients.producer.KafkaProducer#doSend:

代码语言:javascript
代码运行次数:0
运行
复制
// ...

// 估算消息的字节大小
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                                                                   compressionType, serializedKey, serializedValue, headers);

// 确保消息大小不超过发送请求最大值 max.request.size,或者发送缓冲池发小 buffer.memory
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

if (transactionManager != null && transactionManager.isTransactional())
  transactionManager.maybeAddPartitionToTransaction(tp);

// 向 RecordAccumulator 追加消息
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                                                                 serializedValue, headers, interceptCallback, remainingWaitMs);

// 如果 batch 已经满了,那么就会唤醒sender线程发送批量消息
if (result.batchIsFull || result.newBatchCreated) {
  log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
  this.sender.wakeup();
}
return result.future;
// ...

org.apache.kafka.clients.producer.KafkaProducer#ensureValidRecordSize:

代码语言:javascript
代码运行次数:0
运行
复制
private void ensureValidRecordSize(int size) {
  // 如果发送消息的大小比maxRequestSize大,就会抛异常
  if (size > this.maxRequestSize)
    throw new RecordTooLargeException("The message is " + size +
                                      " bytes when serialized which is larger than the maximum request size you have configured with the " +
                                      ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
                                      " configuration.");
  if (size > this.totalMemorySize)
    throw new RecordTooLargeException("The message is " + size +
                                      " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                                      ProducerConfig.BUFFER_MEMORY_CONFIG +
                                      " configuration.");
}

从以上源码得出结论,Kafka 会首先判断本次消息大小是否大于 maxRequestSize,如果本次消息大小 maxRequestSize,则直接抛出异常,不会继续执行追加消息到 batch。

batch.size 是 Kafka producer 非常重要的参数,它的值对 Producer 的吞吐量有着非常大的影响,因为我们知道,收集到一批消息再发送到 broker,比每条消息都请求一次 broker,性能会有显著的提高,但 batch.size 设置得非常大又会给机器内存带来极大的压力,因此需要在项目中合理地增减 batch.size 值,才能提高 producer 的吞吐量。

这里来个扩展性的问题:

可能有人会问,如果 producer 发送的消息量非常少,少到不足以填满 batch,因此不足以触发 Sender 线程执行发送消息,那这时怎么办,其实这里还有一个参数与 batch.size 配合使用,叫 linger.ms,这个参数的作用是当达到了 linger.ms 时长后,不管 batch 有没有填满,都会立即发送消息。linger.ms 参数默认值为 0,即默认消息无需批量发送,这时就需要看项目需求来权衡了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-09-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端进阶 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档