前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊rocketmq-client-go的strategy

聊聊rocketmq-client-go的strategy

作者头像
code4it
发布2020-07-16 10:27:11
3810
发布2020-07-16 10:27:11
举报

本文主要研究一下rocketmq-client-go的strategy

AllocateStrategy

rocketmq-client-go-v2.0.0/consumer/strategy.go

type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
  • AllocateStrategy定义了一个func

AllocateByAveragely

rocketmq-client-go-v2.0.0/consumer/strategy.go

func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
    cidAll []string) []*primitive.MessageQueue {
    if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
        return nil
    }

    var (
        find  bool
        index int
    )
    for idx := range cidAll {
        if cidAll[idx] == currentCID {
            find = true
            index = idx
            break
        }
    }
    if !find {
        rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
            rlog.LogKeyConsumerGroup: consumerGroup,
            "consumerId":             currentCID,
            "cidAll":                 cidAll,
        })
        return nil
    }

    mqSize := len(mqAll)
    cidSize := len(cidAll)
    mod := mqSize % cidSize

    var averageSize int
    if mqSize <= cidSize {
        averageSize = 1
    } else {
        if mod > 0 && index < mod {
            averageSize = mqSize/cidSize + 1
        } else {
            averageSize = mqSize / cidSize
        }
    }

    var startIndex int
    if mod > 0 && index < mod {
        startIndex = index * averageSize
    } else {
        startIndex = index*averageSize + mod
    }

    num := utils.MinInt(averageSize, mqSize-startIndex)
    result := make([]*primitive.MessageQueue, 0)
    for i := 0; i < num; i++ {
        result = append(result, mqAll[(startIndex+i)%mqSize])
    }
    return result
}
  • AllocateByAveragely方法会计算averageSize,然后再根据averageSize计算startIndex,最后取mqAll[(startIndex+i)%mqSize]

AllocateByAveragelyCircle

rocketmq-client-go-v2.0.0/consumer/strategy.go

func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
    cidAll []string) []*primitive.MessageQueue {
    if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
        return nil
    }

    var (
        find  bool
        index int
    )
    for idx := range cidAll {
        if cidAll[idx] == currentCID {
            find = true
            index = idx
            break
        }
    }
    if !find {
        rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
            rlog.LogKeyConsumerGroup: consumerGroup,
            "consumerId":             currentCID,
            "cidAll":                 cidAll,
        })
        return nil
    }

    result := make([]*primitive.MessageQueue, 0)
    for i := index; i < len(mqAll); i++ {
        if i%len(cidAll) == index {
            result = append(result, mqAll[i])
        }
    }
    return result
}
  • AllocateByAveragelyCircle方法取i%len(cidAll) == index的下标

AllocateByConfig

rocketmq-client-go-v2.0.0/consumer/strategy.go

func AllocateByConfig(list []*primitive.MessageQueue) AllocateStrategy {
    return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
        return list
    }
}
  • AllocateByConfig直接返回配置的list

AllocateByMachineRoom

rocketmq-client-go-v2.0.0/consumer/strategy.go

func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy {
    return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
        if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
            return nil
        }

        var (
            find  bool
            index int
        )
        for idx := range cidAll {
            if cidAll[idx] == currentCID {
                find = true
                index = idx
                break
            }
        }
        if !find {
            rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
                rlog.LogKeyConsumerGroup: consumerGroup,
                "consumerId":             currentCID,
                "cidAll":                 cidAll,
            })
            return nil
        }

        var premqAll []*primitive.MessageQueue
        for _, mq := range mqAll {
            temp := strings.Split(mq.BrokerName, "@")
            if len(temp) == 2 {
                for _, idc := range consumeridcs {
                    if idc == temp[0] {
                        premqAll = append(premqAll, mq)
                    }
                }
            }
        }

        mod := len(premqAll) / len(cidAll)
        rem := len(premqAll) % len(cidAll)
        startIndex := mod * index
        endIndex := startIndex + mod

        result := make([]*primitive.MessageQueue, 0)
        for i := startIndex; i < endIndex; i++ {
            result = append(result, mqAll[i])
        }
        if rem > index {
            result = append(result, premqAll[index+mod*len(cidAll)])
        }
        return result
    }
}
  • AllocateByMachineRoom方法对于startIndex与endIndex之间的取对应的mqAll[i],若rem大于index,则取premqAll[index+mod*len(cidAll)]

AllocateByConsistentHash

rocketmq-client-go-v2.0.0/consumer/strategy.go

func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy {
    return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
        if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
            return nil
        }

        var (
            find bool
        )
        for idx := range cidAll {
            if cidAll[idx] == currentCID {
                find = true
                break
            }
        }
        if !find {
            rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
                rlog.LogKeyConsumerGroup: consumerGroup,
                "consumerId":             currentCID,
                "cidAll":                 cidAll,
            })
            return nil
        }

        c := consistent.New()
        c.NumberOfReplicas = virtualNodeCnt
        for _, cid := range cidAll {
            c.Add(cid)
        }

        result := make([]*primitive.MessageQueue, 0)
        for _, mq := range mqAll {
            clientNode, err := c.Get(mq.String())
            if err != nil {
                rlog.Warning("[BUG] AllocateByConsistentHash err: %s", map[string]interface{}{
                    rlog.LogKeyUnderlayError: err,
                })
            }
            if currentCID == clientNode {
                result = append(result, mq)
            }
        }
        return result
    }
}
  • AllocateByConsistentHash方法会使用consistent.New()来创建Consistent,然后根据virtualNodeCnt设置其NumberOfReplicas属性,然后通过c.Get(mq.String())获取clientNode

小结

AllocateStrategy定义了一个func;strategy.go提供了AllocateByAveragely、AllocateByAveragelyCircle、AllocateByConfig、AllocateByMachineRoom、AllocateByConsistentHash等方法

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-07-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • AllocateStrategy
  • AllocateByAveragely
  • AllocateByAveragelyCircle
  • AllocateByConfig
  • AllocateByMachineRoom
  • AllocateByConsistentHash
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档