A Kafka protocol version issue when consuming with sarama

Original article
Author: 用户1073218
Last modified: 2020-07-08 10:08:23
Part of the column: 技术交流学习

Kafka is widely used as a message queue. In Go, the libraries commonly used to consume from it are sarama and its extension sarama-cluster: https://github.com/Shopify/sarama and https://github.com/bsm/sarama-cluster

Problem

We recently hit a consumer failure with the following error: consumer/broker/1002 disconnecting due to error processing FetchRequest: kafka: error decoding packet: message of length 115804741 too large or too small

Diagnosis

The message comes from this code in the sarama source:

func (r *responseHeader) decode(pd packetDecoder) (err error) {
    r.length, err = pd.getInt32()
    if err != nil {
        return err
    }
    if r.length <= 4 || r.length > MaxResponseSize {
        return PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", r.length)}
    }

    r.correlationID, err = pd.getInt32()
    return err
}
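
To make the check above concrete, here is a minimal sketch that plugs the length from the error message into the same bounds test. It assumes sarama's default MaxResponseSize of 100 MiB; the names maxResponseSize and lengthInBounds are illustrative only and are not part of sarama.

```go
package main

import "fmt"

// maxResponseSize mirrors sarama's default MaxResponseSize (100 MiB).
// Assumption: the program did not override the default.
const maxResponseSize int32 = 100 * 1024 * 1024

// lengthInBounds reproduces the bounds test from responseHeader.decode.
// Hypothetical helper, named here for illustration.
func lengthInBounds(length int32) bool {
	return length > 4 && length <= maxResponseSize
}

func main() {
	const reported int32 = 115804741 // length taken from the error message
	fmt.Printf("limit=%d reported=%d over by %d bytes, ok=%v\n",
		maxResponseSize, reported, reported-maxResponseSize, lengthInBounds(reported))
	// prints: limit=104857600 reported=115804741 over by 10947141 bytes, ok=false
}
```

The reported length is roughly 10 MiB above the limit, so this is a genuinely oversized response rather than an off-by-a-few-bytes case.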

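So the length of the packet being read exceeds the configured buffer limit MaxResponseSize (100 MB). Yet our program sets the fetch sizes explicitly: Fetch.Default = 40 MB and Fetch.Max = 60 MB.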

Fetch struct {
            // The minimum number of message bytes to fetch in a request - the broker
            // will wait until at least this many are available. The default is 1,
            // as 0 causes the consumer to spin when no messages are available.
            // Equivalent to the JVM's `fetch.min.bytes`.
            Min int32
            // The default number of message bytes to fetch from the broker in each
            // request (default 32768). This should be larger than the majority of
            // your messages, or else the consumer will spend a lot of time
            // negotiating sizes and not actually consuming. Similar to the JVM's
            // `fetch.message.max.bytes`.
            Default int32
            // The maximum number of message bytes to fetch from the broker in a
            // single request. Messages larger than this will return
            // ErrMessageTooLarge and will not be consumable, so you must be sure
            // this is at least as large as your largest message. Defaults to 0
            // (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
            // global `sarama.MaxResponseSize` still applies.
            Max int32
        }

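Why did these settings not take effect? What do Default and Max actually mean?

Later we lowered Fetch.Default to 20 MB and the error went away, i.e. responses no longer exceeded 100 MB. So the field does have an effect, but apparently not a precise one?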

Solution

Reading further, we find the place where the fetch request is assembled:

func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
    request := &FetchRequest{
        MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
        MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
    }
    if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
        request.Version = 2
    }
    if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
        request.Version = 3
        request.MaxBytes = MaxResponseSize
    }
    if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
        request.Version = 4
        request.Isolation = ReadUncommitted // We don't support yet transactions.
    }

    for child := range bc.subscriptions {
        request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
    }

    return bc.broker.Fetch(request)
}

1. First, note child.fetchSize here

This is the size requested per fetch for each partition. Where is it assigned? The parseResponse function shows:

func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
    ........
    if nRecs == 0 {
        partialTrailingMessage, err := block.Records.isPartial()
        if err != nil {
            return nil, err
        }
        // We got no messages. If we got a trailing one then we need to ask for more data.
        // Otherwise we just poll again and wait for one to be produced...
        if partialTrailingMessage {
            if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
                // we can't ask for more data, we've hit the configured limit
                child.sendError(ErrMessageTooLarge)
                child.offset++ // skip this one so we can keep processing future messages
            } else {
                child.fetchSize *= 2
                if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
                    child.fetchSize = child.conf.Consumer.Fetch.Max
                }
            }
        }

        return nil, nil
    }

    // we got messages, reset our fetch size in case it was increased for a previous request
    child.fetchSize = child.conf.Consumer.Fetch.Default
    ........
}

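This gives us the meaning of the two fields:
Default: the default fetch size (it should be larger than most messages).
Max: when a fetch returns only a partial message, fetchSize is doubled on each retry, but it never exceeds Max (this mainly handles oversized messages).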
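
The growth rule can be isolated into a small sketch. The function nextFetchSize below is a hypothetical helper that mimics only the partial-message branch of parseResponse shown above; it is not sarama's API. The 40 MiB and 60 MiB values are the Fetch.Default and Fetch.Max from this article.

```go
package main

import "fmt"

// nextFetchSize mimics how parseResponse adjusts the per-partition fetch size
// after a response that held only a partial trailing message.
// It returns the new size and whether the consumer gives up on the message
// (the ErrMessageTooLarge branch). max == 0 means "no limit".
func nextFetchSize(current, max int32) (next int32, tooLarge bool) {
	if max > 0 && current == max {
		// Already at the configured limit: cannot ask for more data.
		return current, true
	}
	next = current * 2
	if max > 0 && next > max {
		next = max // clamp to Fetch.Max
	}
	return next, false
}

func main() {
	const mib = 1024 * 1024
	size, max := int32(40*mib), int32(60*mib)
	for i := 0; i < 3; i++ {
		next, tooLarge := nextFetchSize(size, max)
		fmt.Printf("fetchSize=%d MiB -> next=%d MiB tooLarge=%v\n", size/mib, next/mib, tooLarge)
		if tooLarge {
			break
		}
		size = next
	}
}
```

With these values the size goes from 40 MiB to 60 MiB (80 MiB clamped to Max), and the next partial message triggers the too-large branch. Note that all of this is per partition.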

2. Next, note the version check: from v3 on there is an extra request.MaxBytes = MaxResponseSize

Look it up in the Kafka protocol guide: https://kafka.apache.org/protocol#The_Messages_Fetch

Fetch Request (Version: 1) => replica_id max_wait_time min_bytes [topics] 
  replica_id => INT32
  max_wait_time => INT32
  min_bytes => INT32
  topics => topic [partitions] 
    topic => STRING
    partitions => partition fetch_offset partition_max_bytes 
      partition => INT32
      fetch_offset => INT64
      partition_max_bytes => INT32

Fetch Request (Version: 3) => replica_id max_wait_time min_bytes max_bytes [topics]
  replica_id => INT32 
  max_wait_time => INT32
  min_bytes => INT32
  max_bytes => INT32
  topics => topic [partitions]
    topic => STRING
    partitions => partition fetch_offset max_bytes
      partition => INT32
      fetch_offset => INT64
      max_bytes => INT32

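v3 adds a top-level max_bytes. The per-partition limit shown above (partition_max_bytes in v1, the inner max_bytes in v3) applies to each partition separately. If a client consumes 100 partitions, then even with fetchSize = 1 MB the worst case is a response of 100 MB. That is why the v3 protocol introduced the request-level max_bytes parameter. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes and https://issues.apache.org/jira/browse/KAFKA-2063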

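This also explains why lowering the value to 20 MB avoided the problem: multiplied by the number of partitions consumed, the total stayed under 100 MB. The risk of exceeding 100 MB remains, though. The safe approach is to fetch using the v3 protocol.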
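
The arithmetic behind this can be sketched as follows. The partition counts are hypothetical (the article does not say how many partitions the consumer was reading), worstCaseBytes is an illustrative helper, and protocol headers are ignored, so the real response would be slightly larger than the figures printed.

```go
package main

import "fmt"

// maxResponseSize mirrors sarama's default MaxResponseSize (100 MiB).
const maxResponseSize int64 = 100 * 1024 * 1024

// worstCaseBytes is an upper bound on the message payload of one fetch
// response when only per-partition limits apply (fetch request versions
// before v3): every partition may return up to fetchSize bytes.
func worstCaseBytes(partitions int, fetchSize int32) int64 {
	return int64(partitions) * int64(fetchSize)
}

func main() {
	const mib = 1024 * 1024
	for _, fetchSize := range []int32{20 * mib, 40 * mib} {
		for _, partitions := range []int{2, 3, 4, 6} { // hypothetical counts
			total := worstCaseBytes(partitions, fetchSize)
			fmt.Printf("fetchSize=%d MiB partitions=%d worst case=%d MiB exceeds limit=%v\n",
				fetchSize/mib, partitions, total/mib, total > maxResponseSize)
		}
	}
}
```

Under these assumptions, 40 MiB per partition already crosses the limit at three partitions, while 20 MiB stays under it up to four, which is consistent with the behaviour observed above.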

Fix

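We switched sarama to the v3 fetch protocol, which, per the code above, requires the configured Version to be at least V0_10_1_0. After running for a while, however, responses larger than 100 MB appeared again, this time only a few dozen bytes over the limit.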

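Look again at the code above: request.MaxBytes = MaxResponseSize. The requested size is exactly MaxResponseSize, but the response body that comes back also includes headers. So the protocol itself is fine; this is something the library should improve. It should request somewhat less than MaxResponseSize, leaving room for the headers.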
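
A minimal sketch of that idea follows. It is not sarama's implementation: fetchMaxBytes is a hypothetical helper, and the 1 KiB headroom is an assumed value chosen only for illustration, since the actual header overhead depends on how many topics and partitions the response carries.

```go
package main

import "fmt"

// maxResponseSize mirrors sarama's default MaxResponseSize (100 MiB).
const maxResponseSize int32 = 100 * 1024 * 1024

// fetchMaxBytes returns the value to send as the request-level max_bytes so
// that the payload plus response headers still fits within limit.
// headroom is the number of bytes reserved for headers (an assumption).
func fetchMaxBytes(limit, headroom int32) int32 {
	if headroom < 0 || headroom >= limit {
		// Nonsensical headroom: fall back to the unadjusted limit.
		return limit
	}
	return limit - headroom
}

func main() {
	const assumedHeadroom int32 = 1024 // hypothetical, not a measured value
	fmt.Println("request max_bytes:", fetchMaxBytes(maxResponseSize, assumedHeadroom))
}
```

Even with headroom, max_bytes is not a hard cap in every case: under KIP-74 the broker still returns the first message batch when that batch alone is larger than max_bytes, so the consumer can make progress.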

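Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.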
