前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊rocketmq-client-go的pushConsumer

聊聊rocketmq-client-go的pushConsumer

原创
作者头像
code4it
修改2020-07-13 09:55:10
1K0
修改2020-07-13 09:55:10
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下rocketmq-client-go的pushConsumer

pushConsumer

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

代码语言:javascript
复制
// pushConsumer is the push-style consumer implementation. It embeds
// defaultConsumer and layers flow control, subscription bookkeeping, and
// dispatch of pulled messages to user callbacks on top of it.
type pushConsumer struct {
    *defaultConsumer
    // queueFlowControlTimes counts how often pulling was throttled because a
    // queue's cached message count or size exceeded the configured thresholds
    // (see the flow-control branches in pullMessage).
    queueFlowControlTimes        int
    // queueMaxSpanFlowControlTimes is checked when the cached offset span grows
    // too large during concurrent consumption.
    queueMaxSpanFlowControlTimes int
    // consumeFunc holds the PushConsumerCallback registered per topic via Subscribe.
    consumeFunc                  utils.Set
    // submitToConsume is set once in NewPushConsumer to either
    // consumeMessageOrderly or consumeMessageCurrently.
    submitToConsume              func(*processQueue, *primitive.MessageQueue)
    // subscribedTopic records every subscribed topic; the value is unused ("").
    subscribedTopic              map[string]string
    interceptor                  primitive.Interceptor
    queueLock                    *QueueLock
    // done is closed by Shutdown to stop all background goroutines.
    done                         chan struct{}
    // closeOnce guarantees Shutdown's teardown runs at most once.
    closeOnce                    sync.Once
}
  • pushConsumer定义了queueFlowControlTimes、queueMaxSpanFlowControlTimes、consumeFunc、submitToConsume、subscribedTopic、interceptor、queueLock、done、closeOnce属性

NewPushConsumer

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

代码语言:javascript
复制
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
    defaultOpts := defaultPushConsumerOptions()
    for _, apply := range opts {
        apply(&defaultOpts)
    }
    srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
    if err != nil {
        return nil, errors.Wrap(err, "new Namesrv failed.")
    }
    if !defaultOpts.Credentials.IsEmpty() {
        srvs.SetCredentials(defaultOpts.Credentials)
    }
    defaultOpts.Namesrv = srvs
​
    if defaultOpts.Namespace != "" {
        defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
    }
​
    dc := &defaultConsumer{
        client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
        consumerGroup:  defaultOpts.GroupName,
        cType:          _PushConsume,
        state:          int32(internal.StateCreateJust),
        prCh:           make(chan PullRequest, 4),
        model:          defaultOpts.ConsumerModel,
        consumeOrderly: defaultOpts.ConsumeOrderly,
        fromWhere:      defaultOpts.FromWhere,
        allocate:       defaultOpts.Strategy,
        option:         defaultOpts,
        namesrv:        srvs,
    }
​
    p := &pushConsumer{
        defaultConsumer: dc,
        subscribedTopic: make(map[string]string, 0),
        queueLock:       newQueueLock(),
        done:            make(chan struct{}, 1),
        consumeFunc:     utils.NewSet(),
    }
    dc.mqChanged = p.messageQueueChanged
    if p.consumeOrderly {
        p.submitToConsume = p.consumeMessageOrderly
    } else {
        p.submitToConsume = p.consumeMessageCurrently
    }
​
    p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)
​
    return p, nil
}
  • NewPushConsumer方法实例化defaultConsumer及pushConsumer

Start

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

代码语言:javascript
复制
// Start registers the consumer with the client instance, starts the embedded
// defaultConsumer, and spawns the background goroutines that serve pull
// requests and (for orderly consumption only) periodically re-lock queues on
// the broker. The once-guarded section runs a single time; the route/heartbeat
// section after it runs on every call.
func (pc *pushConsumer) Start() error {
    var err error
    pc.once.Do(func() {
        rlog.Info("the consumer start beginning", map[string]interface{}{
            rlog.LogKeyConsumerGroup: pc.consumerGroup,
            "messageModel":           pc.model,
            "unitMode":               pc.unitMode,
        })
        // Pessimistically mark the state as start-failed up front; NOTE(review):
        // presumably defaultConsumer.start() advances it to running — confirm.
        atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
        pc.validate()

        err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
        if err != nil {
            rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
                rlog.LogKeyConsumerGroup: pc.consumerGroup,
            })
            err = ErrCreated
            return
        }

        err = pc.defaultConsumer.start()
        if err != nil {
            return
        }

        // Dispatcher goroutine: every PullRequest arriving on prCh gets its own
        // long-running pullMessage loop. NOTE(review): requests are presumably
        // enqueued by the rebalance machinery — confirm against defaultConsumer.
        go func() {
            // todo start clean msg expired
            for {
                select {
                case pr := <-pc.prCh:
                    go func() {
                        pc.pullMessage(&pr)
                    }()
                case <-pc.done:
                    rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{
                        rlog.LogKeyConsumerGroup: pc.consumerGroup,
                    })
                    return
                }
            }
        }()

        // Orderly consumption only: after a 1s delay, lock all assigned queues
        // once, then keep re-locking on every RebalanceLockInterval tick until
        // Shutdown closes pc.done. Non-orderly consumers return immediately.
        go primitive.WithRecover(func() {
            // initial lock.
            if !pc.consumeOrderly {
                return
            }

            time.Sleep(1000 * time.Millisecond)
            pc.lockAll()

            lockTicker := time.NewTicker(pc.option.RebalanceLockInterval)
            defer lockTicker.Stop()
            for {
                select {
                case <-lockTicker.C:
                    pc.lockAll()
                case <-pc.done:
                    rlog.Info("push consumer close tick.", map[string]interface{}{
                        rlog.LogKeyConsumerGroup: pc.consumerGroup,
                    })
                    return
                }
            }
        })
    })

    if err != nil {
        return err
    }

    // Refresh routes and verify every subscribed topic is resolvable; an
    // unknown topic aborts the start and shuts the client down.
    pc.client.UpdateTopicRouteInfo()
    for k := range pc.subscribedTopic {
        _, exist := pc.topicSubscribeInfoTable.Load(k)
        if !exist {
            pc.client.Shutdown()
            return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
        }
    }
    pc.client.CheckClientInBroker()
    pc.client.SendHeartbeatToAllBrokerWithLock()
    pc.client.RebalanceImmediately()

    return err
}
  • Start方法执行pc.client.RegisterConsumer及pc.defaultConsumer.start(),然后异步执行pc.pullMessage(&pr);对于consumeOrderly(顺序消费)则通过time.NewTicker创建lockTicker,定时执行pc.lockAll()(非consumeOrderly时该goroutine直接返回);之后执行pc.client.UpdateTopicRouteInfo()、pc.client.CheckClientInBroker()、pc.client.SendHeartbeatToAllBrokerWithLock()及pc.client.RebalanceImmediately()

Shutdown

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

代码语言:javascript
复制
func (pc *pushConsumer) Shutdown() error {
    var err error
    pc.closeOnce.Do(func() {
        close(pc.done)
​
        pc.client.UnregisterConsumer(pc.consumerGroup)
        err = pc.defaultConsumer.shutdown()
    })
​
    return err
}
  • Shutdown方法则执行pc.client.UnregisterConsumer及pc.defaultConsumer.shutdown()

Subscribe

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

代码语言:javascript
复制
func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
    f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
    if atomic.LoadInt32(&pc.state) != int32(internal.StateCreateJust) {
        return errors.New("subscribe topic only started before")
    }
    if pc.option.Namespace != "" {
        topic = pc.option.Namespace + "%" + topic
    }
    data := buildSubscriptionData(topic, selector)
    pc.subscriptionDataTable.Store(topic, data)
    pc.subscribedTopic[topic] = ""
​
    pc.consumeFunc.Add(&PushConsumerCallback{
        f:     f,
        topic: topic,
    })
    return nil
}
  • Subscribe方法先通过buildSubscriptionData构建data,之后执行pc.subscriptionDataTable.Store(topic, data)及pc.consumeFunc.Add

pullMessage

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

代码语言:javascript
复制
// pullMessage runs the long-lived pull loop for one PullRequest (one message
// queue). It spawns a companion goroutine that continuously submits the
// processQueue to the consume path, while the main loop applies flow control,
// builds PullMessageRequestHeader, pulls from the broker, and feeds results
// into the processQueue. Both loops exit when pc.done is closed or the
// processQueue is dropped.
func (pc *pushConsumer) pullMessage(request *PullRequest) {
    rlog.Debug("start a new Pull Message task for PullRequest", map[string]interface{}{
        rlog.LogKeyPullRequest: request.String(),
    })
    var sleepTime time.Duration
    pq := request.pq
    // Consume side: hand the process queue to consumeMessageOrderly /
    // consumeMessageCurrently (whichever submitToConsume points at) in a tight
    // loop until shutdown.
    go primitive.WithRecover(func() {
        for {
            select {
            case <-pc.done:
                rlog.Info("push consumer close pullMessage.", map[string]interface{}{
                    rlog.LogKeyConsumerGroup: pc.consumerGroup,
                })
                return
            default:
                pc.submitToConsume(request.pq, request.mq)
            }
        }
    })

    // Pull side: each iteration may jump back to NEXT with sleepTime set to a
    // backoff; sleepTime is reset to PullInterval at the top of every pass.
    for {
    NEXT:
        select {
        case <-pc.done:
            rlog.Info("push consumer close message handle.", map[string]interface{}{
                rlog.LogKeyConsumerGroup: pc.consumerGroup,
            })
            return
        default:
        }

        if pq.IsDroppd() {
            rlog.Debug("the request was dropped, so stop task", map[string]interface{}{
                rlog.LogKeyPullRequest: request.String(),
            })
            return
        }
        if sleepTime > 0 {
            rlog.Debug(fmt.Sprintf("pull MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId, sleepTime/time.Millisecond, request.mq), nil)
            time.Sleep(sleepTime)
        }
        // reset time
        sleepTime = pc.option.PullInterval
        pq.lastPullTime = time.Now()
        err := pc.makeSureStateOK()
        if err != nil {
            rlog.Warning("consumer state error", map[string]interface{}{
                rlog.LogKeyUnderlayError: err.Error(),
            })
            sleepTime = _PullDelayTimeWhenError
            goto NEXT
        }

        if pc.pause {
            rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was paused, execute pull request [%s] later",
                pc.option.InstanceName, pc.consumerGroup, request.String()), nil)
            sleepTime = _PullDelayTimeWhenSuspend
            goto NEXT
        }

        // Flow control #1: too many cached messages in this queue.
        cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
        if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
            if pc.queueFlowControlTimes%1000 == 0 {
                rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
                    "PullThresholdForQueue": pc.option.PullThresholdForQueue,
                    "minOffset":             pq.Min(),
                    "maxOffset":             pq.Max(),
                    "count":                 pq.msgCache,
                    "size(MiB)":             cachedMessageSizeInMiB,
                    "flowControlTimes":      pc.queueFlowControlTimes,
                    rlog.LogKeyPullRequest:  request.String(),
                })
            }
            pc.queueFlowControlTimes++
            sleepTime = _PullDelayTimeWhenFlowControl
            goto NEXT
        }

        // Flow control #2: cached messages too large in total size.
        if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
            if pc.queueFlowControlTimes%1000 == 0 {
                rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{
                    "PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue,
                    "minOffset":                 pq.Min(),
                    "maxOffset":                 pq.Max(),
                    "count":                     pq.msgCache,
                    "size(MiB)":                 cachedMessageSizeInMiB,
                    "flowControlTimes":          pc.queueFlowControlTimes,
                    rlog.LogKeyPullRequest:      request.String(),
                })
            }
            pc.queueFlowControlTimes++
            sleepTime = _PullDelayTimeWhenFlowControl
            goto NEXT
        }

        if !pc.consumeOrderly {
            // Flow control #3 (concurrent only): offset span of cached messages
            // too wide. NOTE(review): queueMaxSpanFlowControlTimes is tested
            // %1000 but never incremented here, and the log reports
            // queueFlowControlTimes instead — looks like an upstream bug; confirm.
            if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {
                if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
                    rlog.Warning("the queue's messages span too long, so do flow control", map[string]interface{}{
                        "ConsumeConcurrentlyMaxSpan": pc.option.ConsumeConcurrentlyMaxSpan,
                        "minOffset":                  pq.Min(),
                        "maxOffset":                  pq.Max(),
                        "maxSpan":                    pq.getMaxSpan(),
                        "flowControlTimes":           pc.queueFlowControlTimes,
                        rlog.LogKeyPullRequest:       request.String(),
                    })
                }
                sleepTime = _PullDelayTimeWhenFlowControl
                goto NEXT
            }
        } else {
            // Orderly: only pull while the queue is locked on the broker; on
            // the first locked pull, re-sync the next offset from the broker.
            if pq.IsLock() {
                if !request.lockedFirst {
                    offset := pc.computePullFromWhere(request.mq)
                    brokerBusy := offset < request.nextOffset
                    rlog.Info("the first time to pull message, so fix offset from broker, offset maybe changed", map[string]interface{}{
                        rlog.LogKeyPullRequest:      request.String(),
                        rlog.LogKeyValueChangedFrom: request.nextOffset,
                        rlog.LogKeyValueChangedTo:   offset,
                        "brokerBusy":                brokerBusy,
                    })
                    if brokerBusy {
                        rlog.Info("[NOTIFY_ME] the first time to pull message, but pull request offset larger than "+
                            "broker consume offset", map[string]interface{}{"offset": offset})
                    }
                    request.lockedFirst = true
                    request.nextOffset = offset
                }
            } else {
                rlog.Info("pull message later because not locked in broker", map[string]interface{}{
                    rlog.LogKeyPullRequest: request.String(),
                })
                sleepTime = _PullDelayTimeWhenError
                goto NEXT
            }
        }

        v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
        if !exist {
            rlog.Info("find the consumer's subscription failed", map[string]interface{}{
                rlog.LogKeyPullRequest: request.String(),
            })
            sleepTime = _PullDelayTimeWhenError
            goto NEXT
        }
        beginTime := time.Now()
        var (
            commitOffsetEnable bool
            commitOffsetValue  int64
            subExpression      string
        )

        // Clustering mode piggybacks the locally stored offset on the pull so
        // the broker can commit it.
        if pc.model == Clustering {
            commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory)
            if commitOffsetValue > 0 {
                commitOffsetEnable = true
            }
        }

        sd := v.(*internal.SubscriptionData)
        classFilter := sd.ClassFilterMode
        if pc.option.PostSubscriptionWhenPull && classFilter {
            subExpression = sd.SubString
        }

        sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)

        // NOTE(review): subExpression only influences sysFlag above; the header
        // always sends _SubAll / TAG regardless — confirm against upstream intent.
        pullRequest := &internal.PullMessageRequestHeader{
            ConsumerGroup:        pc.consumerGroup,
            Topic:                request.mq.Topic,
            QueueId:              int32(request.mq.QueueId),
            QueueOffset:          request.nextOffset,
            MaxMsgNums:           pc.option.PullBatchSize,
            SysFlag:              sysFlag,
            CommitOffset:         commitOffsetValue,
            SubExpression:        _SubAll,
            ExpressionType:       string(TAG),
            SuspendTimeoutMillis: 20 * time.Second,
        }
        //
        //if data.ExpType == string(TAG) {
        //  pullRequest.SubVersion = 0
        //} else {
        //  pullRequest.SubVersion = data.SubVersion
        //}

        brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
        if brokerResult == nil {
            rlog.Warning("no broker found for mq", map[string]interface{}{
                rlog.LogKeyPullRequest: request.mq.String(),
            })
            sleepTime = _PullDelayTimeWhenError
            goto NEXT
        }

        // Slaves cannot commit offsets, so strip the commit flag.
        if brokerResult.Slave {
            pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)
        }

        result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
        if err != nil {
            rlog.Warning("pull message from broker error", map[string]interface{}{
                rlog.LogKeyBroker:        brokerResult.BrokerAddr,
                rlog.LogKeyUnderlayError: err.Error(),
            })
            sleepTime = _PullDelayTimeWhenError
            goto NEXT
        }

        if result.Status == primitive.PullBrokerTimeout {
            rlog.Warning("pull broker timeout", map[string]interface{}{
                rlog.LogKeyBroker: brokerResult.BrokerAddr,
            })
            sleepTime = _PullDelayTimeWhenError
            goto NEXT
        }

        switch result.Status {
        case primitive.PullFound:
            // Messages found: record stats, decode, and cache them in the
            // process queue for the consume goroutine to pick up.
            rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil)
            prevRequestOffset := request.nextOffset
            request.nextOffset = result.NextBeginOffset

            rt := time.Now().Sub(beginTime) / time.Millisecond
            increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))

            pc.processPullResult(request.mq, result, sd)

            msgFounded := result.GetMessageExts()
            firstMsgOffset := int64(math.MaxInt64)
            if msgFounded != nil && len(msgFounded) != 0 {
                firstMsgOffset = msgFounded[0].QueueOffset
                increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
                pq.putMessage(msgFounded...)
            }
            if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
                rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{
                    "nextBeginOffset":   result.NextBeginOffset,
                    "firstMsgOffset":    firstMsgOffset,
                    "prevRequestOffset": prevRequestOffset,
                })
            }
        case primitive.PullNoNewMsg:
            rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more msg, current offset: %d, next offset: %d",
                request.mq.Topic, request.mq.QueueId, pullRequest.QueueOffset, result.NextBeginOffset), nil)
        case primitive.PullNoMsgMatched:
            request.nextOffset = result.NextBeginOffset
            pc.correctTagsOffset(request)
        case primitive.PullOffsetIllegal:
            // Illegal offset: drop the queue, persist the corrected offset, and
            // remove the process queue so rebalance can recreate it.
            rlog.Warning("the pull request offset illegal", map[string]interface{}{
                rlog.LogKeyPullRequest: request.String(),
                "result":               result.String(),
            })
            request.nextOffset = result.NextBeginOffset
            pq.WithDropped(true)
            time.Sleep(10 * time.Second)
            pc.storage.update(request.mq, request.nextOffset, false)
            pc.storage.persist([]*primitive.MessageQueue{request.mq})
            pc.processQueueTable.Delete(request.mq)
            rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
        default:
            rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
            sleepTime = _PullDelayTimeWhenError
        }
    }
}
  • pullMessage方法会创建internal.PullMessageRequestHeader,之后通过pc.defaultConsumer.tryFindBroker获取brokerResult,之后执行pc.client.PullMessage获取result;对于result.Status为primitive.PullFound执行pc.processPullResult、pq.putMessage提交到processQueue;pc.submitToConsume(request.pq, request.mq)对于p.consumeOrderly执行的是p.consumeMessageOrderly,否则执行的是p.consumeMessageCurrently,他们都会执行pc.consumeInner

consumeInner

rocketmq-client-go-v2.0.0/consumer/push_consumer.go

代码语言:javascript
复制
func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.MessageExt) (ConsumeResult, error) {
    if len(subMsgs) == 0 {
        return ConsumeRetryLater, errors.New("msg list empty")
    }
​
    f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic)
​
    // fix lost retry message
    if !exist && strings.HasPrefix(subMsgs[0].Topic, internal.RetryGroupTopicPrefix) {
        f, exist = pc.consumeFunc.Contains(subMsgs[0].GetProperty(primitive.PropertyRetryTopic))
    }
​
    if !exist {
        return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic)
    }
​
    callback, ok := f.(*PushConsumerCallback)
    if !ok {
        return ConsumeRetryLater, fmt.Errorf("the consume callback assert failed for topic: %s", subMsgs[0].Topic)
    }
    if pc.interceptor == nil {
        return callback.f(ctx, subMsgs...)
    } else {
        var container ConsumeResultHolder
        err := pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error {
            msgs := req.([]*primitive.MessageExt)
            r, e := callback.f(ctx, msgs...)
​
            realReply := reply.(*ConsumeResultHolder)
            realReply.ConsumeResult = r
​
            msgCtx, _ := primitive.GetConsumerCtx(ctx)
            msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess
            if realReply.ConsumeResult == ConsumeSuccess {
                msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
            } else {
                msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
            }
            return e
        })
        return container.ConsumeResult, err
    }
}
  • consumeInner方法会触发f.(*PushConsumerCallback)

小结

pushConsumer是对pull模式的封装,拉到消息之后若consumeOrderly则执行consumeMessageOrderly,否则执行的是consumeMessageCurrently,他们内部调用了consumeInner,会触发PushConsumerCallback回调

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • pushConsumer
  • NewPushConsumer
  • Start
  • Shutdown
  • Subscribe
  • pullMessage
  • consumeInner
  • 小结
  • doc
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档