
A Look at PullConsumer in rocketmq-client-go

Original article
Author: code4it
Last modified: 2020-07-10 09:48:37
Collected in the column: 码匠的流水账

This article takes a look at the PullConsumer in rocketmq-client-go.

PullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

type PullConsumer interface {
    // Start
    Start()

    // Shutdown refuse all new pull operation, finish all submitted.
    Shutdown()

    // Pull pull message of topic,  selector indicate which queue to pull.
    Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error)

    // PullFrom pull messages of queue from the offset to offset + numbers
    PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

    // updateOffset update offset of queue in mem
    UpdateOffset(queue *primitive.MessageQueue, offset int64) error

    // PersistOffset persist all offset in mem.
    PersistOffset(ctx context.Context) error

    // CurrentOffset return the current offset of queue in mem.
    CurrentOffset(queue *primitive.MessageQueue) (int64, error)
}
  • PullConsumer defines the Start, Shutdown, Pull, PullFrom, UpdateOffset, PersistOffset, and CurrentOffset methods

defaultPullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

type defaultPullConsumer struct {
    *defaultConsumer

    option    consumerOptions
    client    internal.RMQClient
    GroupName string
    Model     MessageModel
    UnitMode  bool

    interceptor primitive.Interceptor
}
  • defaultPullConsumer embeds *defaultConsumer and defines the option, client, GroupName, Model, UnitMode, and interceptor fields

NewPullConsumer

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
    defaultOpts := defaultPullConsumerOptions()
    for _, apply := range options {
        apply(&defaultOpts)
    }

    srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
    if err != nil {
        return nil, errors.Wrap(err, "new Namesrv failed.")
    }

    dc := &defaultConsumer{
        client:        internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
        consumerGroup: defaultOpts.GroupName,
        cType:         _PullConsume,
        state:         int32(internal.StateCreateJust),
        prCh:          make(chan PullRequest, 4),
        model:         defaultOpts.ConsumerModel,
        option:        defaultOpts,

        namesrv: srvs,
    }

    c := &defaultPullConsumer{
        defaultConsumer: dc,
    }
    return c, nil
}
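  • NewPullConsumer applies the given options on top of defaultPullConsumerOptions(), creates the Namesrv, instantiates a defaultConsumer (with cType set to _PullConsume), and wraps it in a defaultPullConsumer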
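The `for _, apply := range options { apply(&defaultOpts) }` loop is the functional-options pattern. The following is a minimal, self-contained sketch of that pattern only; `pullOptions`, `WithGroupName`, `WithNameServer`, and `buildOptions` are hypothetical names invented for this illustration and are not the library's API.

```go
package main

import "fmt"

// pullOptions is a hypothetical stand-in for the library's consumerOptions.
type pullOptions struct {
	GroupName       string
	NameServerAddrs []string
}

// Option mutates the options in place, mirroring apply(&defaultOpts).
type Option func(*pullOptions)

// WithGroupName is a hypothetical option that overrides the group name.
func WithGroupName(name string) Option {
	return func(o *pullOptions) { o.GroupName = name }
}

// WithNameServer is a hypothetical option that sets the name server addresses.
func WithNameServer(addrs ...string) Option {
	return func(o *pullOptions) { o.NameServerAddrs = addrs }
}

// defaultPullOptions returns the defaults that options are applied on top of.
func defaultPullOptions() pullOptions {
	return pullOptions{GroupName: "DEFAULT_CONSUMER"}
}

// buildOptions starts from the defaults and applies each option in order.
func buildOptions(options ...Option) pullOptions {
	opts := defaultPullOptions()
	for _, apply := range options {
		apply(&opts)
	}
	return opts
}

func main() {
	opts := buildOptions(WithGroupName("demo-group"), WithNameServer("127.0.0.1:9876"))
	fmt.Println(opts.GroupName, opts.NameServerAddrs)
}
```

Because each option is just a function, later options override earlier ones, and callers that pass no options get the defaults unchanged.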

Start

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

func (c *defaultPullConsumer) Start() error {
    atomic.StoreInt32(&c.state, int32(internal.StateRunning))

    var err error
    c.once.Do(func() {
        err = c.start()
        if err != nil {
            return
        }
    })

    return err
}
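  • Start sets the state to StateRunning and then runs c.start() inside c.once.Do, so start executes at most once; a repeated call to Start skips c.start() and returns nil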

Pull

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error) {
    mq := c.getNextQueueOf(topic)
    if mq == nil {
        return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic)
    }

    data := buildSubscriptionData(mq.Topic, selector)
    result, err := c.pull(context.Background(), mq, data, c.nextOffsetOf(mq), numbers)

    if err != nil {
        return nil, err
    }

    c.processPullResult(mq, result, data)
    return result, nil
}
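  • Pull first picks a queue with c.getNextQueueOf(topic), builds the subscription data with buildSubscriptionData(mq.Topic, selector), then calls c.pull starting from c.nextOffsetOf(mq), and finally calls c.processPullResult(mq, result, data); note that c.pull is given context.Background() rather than the caller's ctx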

getNextQueueOf

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

func (c *defaultPullConsumer) getNextQueueOf(topic string) *primitive.MessageQueue {
    queues, err := c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic)
    if err != nil && len(queues) > 0 {
        rlog.Error("get next mq error", map[string]interface{}{
            rlog.LogKeyTopic:         topic,
            rlog.LogKeyUnderlayError: err.Error(),
        })
        return nil
    }
    var index int64
    v, exist := queueCounterTable.Load(topic)
    if !exist {
        index = -1
        queueCounterTable.Store(topic, 0)
    } else {
        index = v.(int64)
    }

    return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
}
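  • getNextQueueOf fetches the queues with c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic), loads the topic's index from queueCounterTable (using -1 when no entry exists), and returns queues[(index+1) % len(queues)]. As quoted, the error branch is taken only when err != nil and len(queues) > 0, and atomic.AddInt64 increments only the local index variable, so the value stored in queueCounterTable is not advanced here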
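For comparison, here is a minimal sketch of round-robin selection in which the shared counter itself is advanced on every call. It is not the library's implementation: `counters` and `nextIndex` are hypothetical names, and storing a `*int64` in the `sync.Map` is a design choice made for this sketch.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counters maps topic -> *int64 so the stored counter can be incremented in place.
var counters sync.Map

// nextIndex returns the next round-robin position for topic among n queues.
func nextIndex(topic string, n int) (int, error) {
	if n <= 0 {
		return 0, fmt.Errorf("topic %s has no queues", topic)
	}
	// LoadOrStore returns the existing counter, or stores a fresh zero counter.
	v, _ := counters.LoadOrStore(topic, new(int64))
	// AddInt64 returns the incremented value; subtract 1 to start from 0.
	next := atomic.AddInt64(v.(*int64), 1) - 1
	return int(next % int64(n)), nil
}

func main() {
	queues := []string{"q0", "q1", "q2"}
	for i := 0; i < 4; i++ {
		idx, _ := nextIndex("demo-topic", len(queues))
		fmt.Println(queues[idx])
	}
}
```

Keeping a pointer in the map means every caller increments the same counter atomically, and the empty-queue case is rejected before any indexing happens.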

PullFrom

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// PullFrom pull messages of queue from the offset to offset + numbers
func (c *defaultPullConsumer) PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
    if err := c.checkPull(ctx, queue, offset, numbers); err != nil {
        return nil, err
    }

    selector := MessageSelector{}
    data := buildSubscriptionData(queue.Topic, selector)

    return c.pull(ctx, queue, data, offset, numbers)
}
  • PullFrom first runs c.checkPull, then builds the subscriptionData with buildSubscriptionData using an empty MessageSelector, and finally pulls the messages with c.pull(ctx, queue, data, offset, numbers)

UpdateOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// updateOffset update offset of queue in mem
func (c *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, offset int64) error {
    return c.updateOffset(queue, offset)
}
  • UpdateOffset delegates to c.updateOffset(queue, offset) to update the queue's offset in memory

PersistOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// PersistOffset persist all offset in mem.
func (c *defaultPullConsumer) PersistOffset(ctx context.Context) error {
    return c.persistConsumerOffset()
}
  • PersistOffset delegates to c.persistConsumerOffset() to persist the in-memory offsets; the ctx parameter is not used

CurrentOffset

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// CurrentOffset return the current offset of queue in mem.
func (c *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int64, error) {
    v := c.queryOffset(queue)
    return v, nil
}
  • CurrentOffset returns the result of c.queryOffset(queue); the returned error is always nil
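UpdateOffset, PersistOffset, and CurrentOffset together describe an offset store: update in memory, flush on demand, read back. The sketch below illustrates that division of labour with hypothetical types (`queueKey`, `memOffsetStore`); it makes no claim about how the library's own offset store is built, and the `persisted` map merely stands in for whatever durable storage is used.

```go
package main

import (
	"fmt"
	"sync"
)

// queueKey is a hypothetical identifier for a message queue.
type queueKey struct {
	Topic   string
	QueueID int
}

// memOffsetStore keeps offsets in memory and flushes them on demand.
type memOffsetStore struct {
	mu        sync.Mutex
	offsets   map[queueKey]int64
	persisted map[queueKey]int64 // stands in for durable storage
}

func newMemOffsetStore() *memOffsetStore {
	return &memOffsetStore{
		offsets:   make(map[queueKey]int64),
		persisted: make(map[queueKey]int64),
	}
}

// Update records the offset of q in memory only.
func (s *memOffsetStore) Update(q queueKey, offset int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.offsets[q] = offset
}

// Current returns the in-memory offset of q and whether one is known.
func (s *memOffsetStore) Current(q queueKey) (int64, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.offsets[q]
	return v, ok
}

// Persist copies every in-memory offset to the persisted map and
// returns how many offsets were written.
func (s *memOffsetStore) Persist() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	for q, off := range s.offsets {
		s.persisted[q] = off
	}
	return len(s.offsets)
}

func main() {
	store := newMemOffsetStore()
	q := queueKey{Topic: "demo-topic", QueueID: 0}
	store.Update(q, 42)
	cur, ok := store.Current(q)
	fmt.Println(cur, ok, store.Persist())
}
```

Separating the in-memory update from the flush lets a caller update offsets after every batch while choosing independently how often to pay the cost of persisting them.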

Shutdown

rocketmq-client-go-v2.0.0/consumer/pull_consumer.go

// Shutdown close defaultConsumer, refuse new request.
func (c *defaultPullConsumer) Shutdown() error {
    return c.defaultConsumer.shutdown()
}
  • Shutdown calls c.defaultConsumer.shutdown()

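Summary

PullConsumer defines the Start, Shutdown, Pull, PullFrom, UpdateOffset, PersistOffset, and CurrentOffset methods; defaultPullConsumer provides them on top of the embedded defaultConsumer.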


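Original-content statement: this article is published in the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.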
