前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊rocketmq-client-go的QueueSelector

聊聊rocketmq-client-go的QueueSelector

作者头像
code4it
发布2020-07-07 16:07:51
4900
发布2020-07-07 16:07:51
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下rocketmq-client-go的QueueSelector

QueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

代码语言:javascript
复制
type QueueSelector interface {
    Select(*primitive.Message, []*primitive.MessageQueue) *primitive.MessageQueue
}
  • QueueSelector接口,定义了Select方法

manualQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

代码语言:javascript
复制
type manualQueueSelector struct{}

func NewManualQueueSelector() QueueSelector {
    return new(manualQueueSelector)
}

func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
    return message.Queue
}
  • manualQueueSelector的select方法直接返回message.Queue

NewRandomQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

代码语言:javascript
复制
type randomQueueSelector struct {
    rander *rand.Rand
}

func NewRandomQueueSelector() QueueSelector {
    s := new(randomQueueSelector)
    s.rander = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
    return s
}

func (r randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
    i := r.rander.Intn(len(queues))
    return queues[i]
}
  • NewRandomQueueSelector方法先创建randomQueueSelector,然后设置其rander;Select方法通过r.rander.Intn(len(queues))随机选择index,然后从queue取值

roundRobinQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

代码语言:javascript
复制
type roundRobinQueueSelector struct {
    sync.Locker
    indexer map[string]*int32
}

func NewRoundRobinQueueSelector() QueueSelector {
    s := &roundRobinQueueSelector{
        Locker:  new(sync.Mutex),
        indexer: map[string]*int32{},
    }
    return s
}

func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
    t := message.Topic
    if _, exist := r.indexer[t]; !exist {
        r.Lock()
        if _, exist := r.indexer[t]; !exist {
            var v = int32(0)
            r.indexer[t] = &v
        }
        r.Unlock()
    }
    index := r.indexer[t]

    i := atomic.AddInt32(index, 1)
    if i < 0 {
        i = -i
        atomic.StoreInt32(index, 0)
    }
    qIndex := int(i) % len(queues)
    return queues[qIndex]
}
  • roundRobinQueueSelector的qIndex为int(i) % len(queues)

hashQueueSelector

rocketmq-client-go-v2.0.0/producer/selector.go

代码语言:javascript
复制
type hashQueueSelector struct {
    random QueueSelector
}

func NewHashQueueSelector() QueueSelector {
    return &hashQueueSelector{
        random: NewRandomQueueSelector(),
    }
}

// hashQueueSelector choose the queue by hash if message having sharding key, otherwise choose queue by random instead.
func (h *hashQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
    key := message.GetShardingKey()
    if len(key) == 0 {
        return h.random.Select(message, queues)
    }

    hasher := fnv.New32a()
    _, err := hasher.Write([]byte(key))
    if err != nil {
        return nil
    }
    queueId := int(hasher.Sum32()) % len(queues)
    if queueId < 0 {
        queueId = -queueId
    }
    return queues[queueId]
}
  • hashQueueSelector通过int(hasher.Sum32()) % len(queues)来计算queue的index

小结

rocketmq-client-go的selector.go定义了manualQueueSelector、roundRobinQueueSelector、hashQueueSelector

doc

  • selector.go
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-07-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • QueueSelector
  • manualQueueSelector
  • NewRandomQueueSelector
  • roundRobinQueueSelector
  • hashQueueSelector
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档