前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊rocketmq-client-go的strategy

聊聊rocketmq-client-go的strategy

原创
作者头像
code4it
修改2020-07-14 10:06:30
3880
修改2020-07-14 10:06:30
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下rocketmq-client-go的strategy

AllocateStrategy

rocketmq-client-go-v2.0.0/consumer/strategy.go

代码语言:javascript
复制
type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
  • AllocateStrategy定义了一个func

AllocateByAveragely

rocketmq-client-go-v2.0.0/consumer/strategy.go

代码语言:javascript
复制
func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
    cidAll []string) []*primitive.MessageQueue {
    if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
        return nil
    }
​
    var (
        find  bool
        index int
    )
    for idx := range cidAll {
        if cidAll[idx] == currentCID {
            find = true
            index = idx
            break
        }
    }
    if !find {
        rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
            rlog.LogKeyConsumerGroup: consumerGroup,
            "consumerId":             currentCID,
            "cidAll":                 cidAll,
        })
        return nil
    }
​
    mqSize := len(mqAll)
    cidSize := len(cidAll)
    mod := mqSize % cidSize
​
    var averageSize int
    if mqSize <= cidSize {
        averageSize = 1
    } else {
        if mod > 0 && index < mod {
            averageSize = mqSize/cidSize + 1
        } else {
            averageSize = mqSize / cidSize
        }
    }
​
    var startIndex int
    if mod > 0 && index < mod {
        startIndex = index * averageSize
    } else {
        startIndex = index*averageSize + mod
    }
​
    num := utils.MinInt(averageSize, mqSize-startIndex)
    result := make([]*primitive.MessageQueue, 0)
    for i := 0; i < num; i++ {
        result = append(result, mqAll[(startIndex+i)%mqSize])
    }
    return result
}
  • AllocateByAveragely方法会计算averageSize,然后再根据averageSize计算startIndex,最后取mqAll[(startIndex+i)%mqSize]

AllocateByAveragelyCircle

rocketmq-client-go-v2.0.0/consumer/strategy.go

代码语言:javascript
复制
func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
    cidAll []string) []*primitive.MessageQueue {
    if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
        return nil
    }
​
    var (
        find  bool
        index int
    )
    for idx := range cidAll {
        if cidAll[idx] == currentCID {
            find = true
            index = idx
            break
        }
    }
    if !find {
        rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
            rlog.LogKeyConsumerGroup: consumerGroup,
            "consumerId":             currentCID,
            "cidAll":                 cidAll,
        })
        return nil
    }
​
    result := make([]*primitive.MessageQueue, 0)
    for i := index; i < len(mqAll); i++ {
        if i%len(cidAll) == index {
            result = append(result, mqAll[i])
        }
    }
    return result
}
  • AllocateByAveragelyCircle方法取i%len(cidAll) == index的下标

AllocateByConfig

rocketmq-client-go-v2.0.0/consumer/strategy.go

代码语言:javascript
复制
func AllocateByConfig(list []*primitive.MessageQueue) AllocateStrategy {
    return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
        return list
    }
}
  • AllocateByConfig直接返回配置的list

AllocateByMachineRoom

rocketmq-client-go-v2.0.0/consumer/strategy.go

代码语言:javascript
复制
func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy {
    return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
        if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
            return nil
        }
​
        var (
            find  bool
            index int
        )
        for idx := range cidAll {
            if cidAll[idx] == currentCID {
                find = true
                index = idx
                break
            }
        }
        if !find {
            rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
                rlog.LogKeyConsumerGroup: consumerGroup,
                "consumerId":             currentCID,
                "cidAll":                 cidAll,
            })
            return nil
        }
​
        var premqAll []*primitive.MessageQueue
        for _, mq := range mqAll {
            temp := strings.Split(mq.BrokerName, "@")
            if len(temp) == 2 {
                for _, idc := range consumeridcs {
                    if idc == temp[0] {
                        premqAll = append(premqAll, mq)
                    }
                }
            }
        }
​
        mod := len(premqAll) / len(cidAll)
        rem := len(premqAll) % len(cidAll)
        startIndex := mod * index
        endIndex := startIndex + mod
​
        result := make([]*primitive.MessageQueue, 0)
        for i := startIndex; i < endIndex; i++ {
            result = append(result, mqAll[i])
        }
        if rem > index {
            result = append(result, premqAll[index+mod*len(cidAll)])
        }
        return result
    }
}
  • AllocateByMachineRoom方法对于startIndex与endIndex之间的取对应的mqAll[i],若rem大于index,则取premqAll[index+mod*len(cidAll)]

AllocateByConsistentHash

rocketmq-client-go-v2.0.0/consumer/strategy.go

代码语言:javascript
复制
func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy {
    return func(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
        if currentCID == "" || len(mqAll) == 0 || len(cidAll) == 0 {
            return nil
        }
​
        var (
            find bool
        )
        for idx := range cidAll {
            if cidAll[idx] == currentCID {
                find = true
                break
            }
        }
        if !find {
            rlog.Warning("[BUG] ConsumerId not in cidAll", map[string]interface{}{
                rlog.LogKeyConsumerGroup: consumerGroup,
                "consumerId":             currentCID,
                "cidAll":                 cidAll,
            })
            return nil
        }
​
        c := consistent.New()
        c.NumberOfReplicas = virtualNodeCnt
        for _, cid := range cidAll {
            c.Add(cid)
        }
​
        result := make([]*primitive.MessageQueue, 0)
        for _, mq := range mqAll {
            clientNode, err := c.Get(mq.String())
            if err != nil {
                rlog.Warning("[BUG] AllocateByConsistentHash err: %s", map[string]interface{}{
                    rlog.LogKeyUnderlayError: err,
                })
            }
            if currentCID == clientNode {
                result = append(result, mq)
            }
        }
        return result
    }
}
  • AllocateByConsistentHash方法会使用consistent.New()来创建Consistent,然后根据virtualNodeCnt设置其NumberOfReplicas属性,然后通过c.Get(mq.String())获取clientNode

小结

AllocateStrategy定义了一个func;strategy.go提供了AllocateByAveragely、AllocateByAveragelyCircle、AllocateByConfig、AllocateByMachineRoom、AllocateByConsistentHash等方法

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • AllocateStrategy
  • AllocateByAveragely
  • AllocateByAveragelyCircle
  • AllocateByConfig
  • AllocateByMachineRoom
  • AllocateByConsistentHash
  • 小结
  • doc
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档