When consuming from Kafka in Go, the commonly used libraries are the sarama package and its extension sarama-cluster:
https://github.com/Shopify/sarama
https://github.com/bsm/sarama-cluster
We recently ran into a consumption failure with this error:
consumer/broker/1002 disconnecting due to error processing FetchRequest: kafka: error decoding packet: message of length 115804741 too large or too small
The corresponding sarama source:
func (r *responseHeader) decode(pd packetDecoder) (err error) {
	r.length, err = pd.getInt32()
	if err != nil {
		return err
	}
	if r.length <= 4 || r.length > MaxResponseSize {
		return PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", r.length)}
	}
	r.correlationID, err = pd.getInt32()
	return err
}
So the length of the packet being decoded exceeded MaxResponseSize (100 MB). But we had already configured the fetch size in our program: Fetch.Default = 40 MB and Fetch.Max = 60 MB. The relevant fields in sarama's consumer config:
Fetch struct {
	// The minimum number of message bytes to fetch in a request - the broker
	// will wait until at least this many are available. The default is 1,
	// as 0 causes the consumer to spin when no messages are available.
	// Equivalent to the JVM's `fetch.min.bytes`.
	Min int32
	// The default number of message bytes to fetch from the broker in each
	// request (default 32768). This should be larger than the majority of
	// your messages, or else the consumer will spend a lot of time
	// negotiating sizes and not actually consuming. Similar to the JVM's
	// `fetch.message.max.bytes`.
	Default int32
	// The maximum number of message bytes to fetch from the broker in a
	// single request. Messages larger than this will return
	// ErrMessageTooLarge and will not be consumable, so you must be sure
	// this is at least as large as your largest message. Defaults to 0
	// (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
	// global `sarama.MaxResponseSize` still applies.
	Max int32
}
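For reference, our configuration looked roughly like this (a minimal sketch; the surrounding consumer setup is omitted):

package main

import "github.com/Shopify/sarama"

func main() {
	config := sarama.NewConfig()
	// Bytes fetched per partition in each request.
	config.Consumer.Fetch.Default = 40 * 1024 * 1024 // 40 MB
	// Upper bound when the consumer grows the fetch size for large messages.
	config.Consumer.Fetch.Max = 60 * 1024 * 1024 // 60 MB
	_ = config // then passed on to the consumer, e.g. sarama-cluster's NewConsumer
}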
Why didn't the setting take effect? What do Default and Max actually mean here?
After lowering Fetch.Default to 20 MB the error disappeared, i.e. responses stayed under 100 MB. So the field does take effect, it just isn't a precise limit?
Reading further, here is where the fetch request is assembled:
func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	request := &FetchRequest{
		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
		request.Version = 2
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 3
		request.MaxBytes = MaxResponseSize
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 4
		request.Isolation = ReadUncommitted // We don't support yet transactions.
	}
	for child := range bc.subscriptions {
		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
	}
	return bc.broker.Fetch(request)
}
Each subscribed partition is added with child.fetchSize, i.e. this sets the per-partition fetch size. Where is that value assigned? In the parseResponse function:
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
	// ...
	if nRecs == 0 {
		partialTrailingMessage, err := block.Records.isPartial()
		if err != nil {
			return nil, err
		}
		// We got no messages. If we got a trailing one then we need to ask for more data.
		// Otherwise we just poll again and wait for one to be produced...
		if partialTrailingMessage {
			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
				// we can't ask for more data, we've hit the configured limit
				child.sendError(ErrMessageTooLarge)
				child.offset++ // skip this one so we can keep processing future messages
			} else {
				child.fetchSize *= 2
				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
					child.fetchSize = child.conf.Consumer.Fetch.Max
				}
			}
		}
		return nil, nil
	}
	// we got messages, reset our fetch size in case it was increased for a previous request
	child.fetchSize = child.conf.Consumer.Fetch.Default
	// ...
}
Now the meaning of the two fields is clear:
Default: the number of bytes fetched per partition in each request (it should be larger than most of your messages).
Max: when a message doesn't fit, fetchSize is doubled repeatedly up to this cap (mainly to cope with oversized messages).
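To make this concrete, here is a small standalone sketch (illustration only, mirroring the doubling logic in parseResponse) of how fetchSize evolves under our 40 MB / 60 MB settings when a single message is too large to fit:

package main

import "fmt"

func main() {
	fetchSize := int32(40 << 20) // Fetch.Default = 40 MB
	max := int32(60 << 20)       // Fetch.Max = 60 MB
	for attempt := 1; ; attempt++ {
		fmt.Printf("attempt %d: fetchSize = %d MB\n", attempt, fetchSize>>20)
		if max > 0 && fetchSize == max {
			// parseResponse would send ErrMessageTooLarge and skip the message here.
			break
		}
		fetchSize *= 2
		if fetchSize > max {
			fetchSize = max
		}
	}
	// Prints 40 MB for the first attempt and 60 MB for the second, then gives up.
}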
Next, check the Kafka protocol: https://kafka.apache.org/protocol#The_Messages_Fetch
Fetch Request (Version: 1) => replica_id max_wait_time min_bytes [topics]
  replica_id => INT32
  max_wait_time => INT32
  min_bytes => INT32
  topics => topic [partitions]
    topic => STRING
    partitions => partition fetch_offset partition_max_bytes
      partition => INT32
      fetch_offset => INT64
      partition_max_bytes => INT32

Fetch Request (Version: 3) => replica_id max_wait_time min_bytes max_bytes [topics]
  replica_id => INT32
  max_wait_time => INT32
  min_bytes => INT32
  max_bytes => INT32
  topics => topic [partitions]
    topic => STRING
    partitions => partition fetch_offset max_bytes
      partition => INT32
      fetch_offset => INT64
      max_bytes => INT32
Here v3 adds a request-level max_bytes. The partition_max_bytes above (max_bytes in v3's per-partition block) applies to each individual partition, so if a client consumes 100 partitions, even with fetchSize = 1 MB the worst case is a 100 MB response. That is why the v3 protocol introduced the higher-level max_bytes. See:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
https://issues.apache.org/jira/browse/KAFKA-2063
This also explains why lowering the value to 20 MB earlier made the error go away: multiplied by the number of partitions we consume, the total stayed under 100 MB. But the risk of exceeding 100 MB was still there. The safe fix is to fetch with the v3 protocol.
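In sarama that means declaring a broker version of at least 0.10.1.0, so that fetchNewMessages (above) builds a v3 FetchRequest with the request-level MaxBytes. A minimal sketch:

package main

import "github.com/Shopify/sarama"

func main() {
	config := sarama.NewConfig()
	// With Version >= V0_10_1_0, fetchNewMessages sets request.Version = 3
	// and request.MaxBytes = MaxResponseSize (see the code above).
	config.Version = sarama.V0_10_1_0
	config.Consumer.Fetch.Default = 20 * 1024 * 1024 // 20 MB per partition
	_ = config
}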
So we switched sarama to the v3 protocol. Yet after running for a while, responses still exceeded 100 MB, though only by a few dozen bytes.
Look at the code above again: request.MaxBytes = MaxResponseSize. The request asks the broker for up to MaxResponseSize bytes of message data, but the length checked when decoding the response also includes the header and other protocol overhead. So the protocol itself is fine; this is something the library should improve: MaxBytes should be set a bit below MaxResponseSize, subtracting the header overhead.
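The kind of adjustment meant here would look roughly like this inside fetchNewMessages (a sketch; the concrete margin is an illustrative assumption, not sarama's actual fix):

	// Leave headroom below MaxResponseSize so that the response header and
	// per-topic/partition protocol overhead cannot push the decoded length
	// over the limit. The 64 KB margin is an arbitrary illustrative choice.
	const responseOverheadMargin = 64 * 1024
	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 3
		request.MaxBytes = MaxResponseSize - responseOverheadMargin
	}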