A look at transactionProducer in rocketmq-client-go

Original article
Author: code4it
Last modified: 2020-07-07 10:20:18
Collected in the column: 码匠的流水账

This article takes a look at the transactionProducer in rocketmq-client-go.

transactionProducer

rocketmq-client-go-v2.0.0/producer/producer.go

type transactionProducer struct {
    producer *defaultProducer
    listener primitive.TransactionListener
}
  • transactionProducer has two fields: producer (a *defaultProducer) and listener (a primitive.TransactionListener)

NewTransactionProducer

rocketmq-client-go-v2.0.0/producer/producer.go

func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error) {
    producer, err := NewDefaultProducer(opts...)
    if err != nil {
        return nil, errors.Wrap(err, "NewDefaultProducer failed.")
    }
    return &transactionProducer{
        producer: producer,
        listener: listener,
    }, nil
}
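  • NewTransactionProducer builds a defaultProducer via NewDefaultProducer(opts...), wraps any error with errors.Wrap, and returns a transactionProducer holding that producer and the given listener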
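
NewTransactionProducer only stores the listener; its two callbacks are the ones the later sections invoke (ExecuteLocalTransaction in SendMessageInTransaction, CheckLocalTransaction in checkTransactionState). As a rough sketch of what such a listener could look like, the following uses stand-in types instead of the real primitive package so that it runs on the standard library alone. The names message, localState, transactionListener and memoryListener are hypothetical, and the rule "an empty body means rollback" is invented purely for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// localState is a hypothetical stand-in for the library's local transaction state.
type localState int

const (
	commitState localState = iota + 1
	rollbackState
	unknownState
)

// message is a hypothetical stand-in for the library's message types;
// only the fields this sketch needs are modeled.
type message struct {
	TransactionID string
	Body          string
}

// transactionListener mirrors the two callbacks used in the article's code.
// The real interface takes the library's own message types.
type transactionListener interface {
	ExecuteLocalTransaction(m *message) localState
	CheckLocalTransaction(m *message) localState
}

// memoryListener remembers the outcome of each local transaction so that a
// later check-back can answer with the same state.
type memoryListener struct {
	mu     sync.Mutex
	states map[string]localState
}

func newMemoryListener() *memoryListener {
	return &memoryListener{states: make(map[string]localState)}
}

// ExecuteLocalTransaction runs the (made-up) business rule and records the result.
func (l *memoryListener) ExecuteLocalTransaction(m *message) localState {
	state := commitState
	if m.Body == "" { // illustrative rule only
		state = rollbackState
	}
	l.mu.Lock()
	l.states[m.TransactionID] = state
	l.mu.Unlock()
	return state
}

// CheckLocalTransaction reports the recorded state, or unknownState if the
// transaction was never seen.
func (l *memoryListener) CheckLocalTransaction(m *message) localState {
	l.mu.Lock()
	defer l.mu.Unlock()
	if s, ok := l.states[m.TransactionID]; ok {
		return s
	}
	return unknownState
}

func main() {
	var l transactionListener = newMemoryListener()
	m := &message{TransactionID: "tx-1", Body: "order created"}
	fmt.Println(l.ExecuteLocalTransaction(m) == commitState)
	fmt.Println(l.CheckLocalTransaction(m) == commitState)
	fmt.Println(l.CheckLocalTransaction(&message{TransactionID: "tx-2"}) == unknownState)
}
```

Recording the outcome keyed by transaction ID is one possible design; it lets the check-back path answer consistently without re-running the local transaction.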

Start

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) Start() error {
    go primitive.WithRecover(func() {
        tp.checkTransactionState()
    })
    return tp.producer.Start()
}
  • Start first launches tp.checkTransactionState() in a goroutine (wrapped in primitive.WithRecover), then calls tp.producer.Start() and returns its result
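
The "launch a background loop, guard it against panics, then continue" shape of Start can be sketched with the standard library alone. Here withRecover is a hypothetical stand-in for primitive.WithRecover, assumed only to run a function and hand any panic value to a handler; startWorker and the sync.WaitGroup exist just so the example can wait for the goroutine and are not part of the library.

```go
package main

import (
	"fmt"
	"sync"
)

// withRecover runs fn and passes any panic value to onPanic instead of
// letting it crash the process. Hypothetical stand-in, not the library code.
func withRecover(fn func(), onPanic func(v interface{})) {
	defer func() {
		if r := recover(); r != nil {
			onPanic(r)
		}
	}()
	fn()
}

// startWorker launches loop in its own goroutine and returns immediately,
// the way Start returns without waiting for the check loop to finish.
// The returned WaitGroup lets the caller wait for the goroutine to exit.
func startWorker(loop func(), onPanic func(v interface{})) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		withRecover(loop, onPanic)
	}()
	return wg
}

func main() {
	var recovered interface{}
	wg := startWorker(
		func() { panic("listener blew up") },
		func(v interface{}) { recovered = v },
	)
	wg.Wait()
	fmt.Println("recovered:", recovered) // prints "recovered: listener blew up"
}
```

Without such a guard, a panic raised inside the listener callback would terminate the whole process, since an unrecovered panic in any goroutine does.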

checkTransactionState

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) checkTransactionState() {
    for ch := range tp.producer.callbackCh {
        switch callback := ch.(type) {
        case *internal.CheckTransactionStateCallback:
            localTransactionState := tp.listener.CheckLocalTransaction(callback.Msg)
            uniqueKey := callback.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
            if uniqueKey == "" {
                uniqueKey = callback.Msg.MsgId
            }
            header := &internal.EndTransactionRequestHeader{
                CommitLogOffset:      callback.Header.CommitLogOffset,
                ProducerGroup:        tp.producer.group,
                TranStateTableOffset: callback.Header.TranStateTableOffset,
                FromTransactionCheck: true,
                MsgID:                uniqueKey,
                TransactionId:        callback.Header.TransactionId,
                CommitOrRollback:     tp.transactionState(localTransactionState),
            }

            req := remote.NewRemotingCommand(internal.ReqENDTransaction, header, nil)
            req.Remark = tp.errRemark(nil)

            err := tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(), req,
                tp.producer.options.SendMsgTimeout)
            if err != nil {
                rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
                    "callback":               callback.Addr.String(),
                    "request":                req.String(),
                    rlog.LogKeyUnderlayError: err,
                })
            }
        default:
            rlog.Error(fmt.Sprintf("unknown type %v", ch), nil)
        }
    }
}
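  • checkTransactionState ranges over tp.producer.callbackCh and dispatches on the concrete type of each item. The only type currently handled is *internal.CheckTransactionStateCallback: it calls tp.listener.CheckLocalTransaction to get the local state, uses the unique client message ID as MsgID (falling back to callback.Msg.MsgId when that property is empty), builds an EndTransactionRequestHeader with FromTransactionCheck set to true, and sends it to the broker with tp.producer.client.InvokeOneWay. A send failure is only logged, and any other callback type is logged as unknown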
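
The loop above combines two Go idioms: ranging over a channel (which ends only when the channel is closed) and a type switch on each received value. A minimal sketch of that shape follows. The types checkCallback and endRequest and the function drain are hypothetical stand-ins for the internal package's types, and the state is modeled as a plain string for brevity.

```go
package main

import "fmt"

// checkCallback is a hypothetical stand-in for the broker's check-back callback.
type checkCallback struct {
	UniqueKey     string // client-side unique message ID, may be empty
	MsgID         string // fallback ID when UniqueKey is empty
	TransactionID string
}

// endRequest is a hypothetical stand-in for the end-transaction request header.
type endRequest struct {
	MsgID                string
	TransactionID        string
	FromTransactionCheck bool
	State                string
}

// drain consumes callbacks until the channel is closed. For each check
// callback it asks check for the local state and passes a request to send.
// It returns how many items had an unrecognized type.
func drain(callbacks <-chan interface{}, check func(*checkCallback) string, send func(endRequest)) int {
	unknown := 0
	for cb := range callbacks {
		switch c := cb.(type) {
		case *checkCallback:
			id := c.UniqueKey
			if id == "" {
				id = c.MsgID // same fallback as in the article's code
			}
			send(endRequest{
				MsgID:                id,
				TransactionID:        c.TransactionID,
				FromTransactionCheck: true,
				State:                check(c),
			})
		default:
			unknown++
		}
	}
	return unknown
}

func main() {
	callbacks := make(chan interface{}, 2)
	callbacks <- &checkCallback{MsgID: "broker-1", TransactionID: "tx-1"}
	callbacks <- "not a callback"
	close(callbacks)

	var sent []endRequest
	unknown := drain(callbacks,
		func(*checkCallback) string { return "commit" },
		func(r endRequest) { sent = append(sent, r) })
	fmt.Println(len(sent), sent[0].MsgID, unknown) // prints "1 broker-1 1"
}
```

Passing check and send as functions keeps the dispatch logic testable without a broker; the real method calls the listener and the remoting client directly.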

Shutdown

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) Shutdown() error {
    return tp.producer.Shutdown()
}
  • Shutdown simply delegates to tp.producer.Shutdown()

SendMessageInTransaction

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) SendMessageInTransaction(ctx context.Context, msg *primitive.Message) (*primitive.TransactionSendResult, error) {
    msg.WithProperty(primitive.PropertyTransactionPrepared, "true")
    msg.WithProperty(primitive.PropertyProducerGroup, tp.producer.options.GroupName)

    rsp, err := tp.producer.SendSync(ctx, msg)
    if err != nil {
        return nil, err
    }
    localTransactionState := primitive.UnknowState
    switch rsp.Status {
    case primitive.SendOK:
        if len(rsp.TransactionID) > 0 {
            msg.WithProperty("__transactionId__", rsp.TransactionID)
        }
        transactionId := msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
        if len(transactionId) > 0 {
            msg.TransactionId = transactionId
        }
        localTransactionState = tp.listener.ExecuteLocalTransaction(msg)
        if localTransactionState != primitive.CommitMessageState {
            rlog.Error("executeLocalTransaction but state unexpected", map[string]interface{}{
                "localState": localTransactionState,
                "message":    msg,
            })
        }

    case primitive.SendFlushDiskTimeout, primitive.SendFlushSlaveTimeout, primitive.SendSlaveNotAvailable:
        localTransactionState = primitive.RollbackMessageState
    default:
    }

    tp.endTransaction(*rsp, err, localTransactionState)

    transactionSendResult := &primitive.TransactionSendResult{
        SendResult: rsp,
        State:      localTransactionState,
    }

    return transactionSendResult, nil
}
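  • SendMessageInTransaction marks the message as a prepared (half) message and sets its producer group, then calls tp.producer.SendSync(ctx, msg); if that fails, it returns the error immediately without ending the transaction. Otherwise it branches on rsp.Status: for primitive.SendOK it calls tp.listener.ExecuteLocalTransaction to determine localTransactionState (logging an error if the result is not primitive.CommitMessageState); for primitive.SendFlushDiskTimeout, primitive.SendFlushSlaveTimeout and primitive.SendSlaveNotAvailable it sets localTransactionState to primitive.RollbackMessageState; any other status leaves it at primitive.UnknowState. Finally it calls tp.endTransaction and returns a TransactionSendResult carrying the send result and the state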
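
The decision at the heart of SendMessageInTransaction, mapping the send status to a local transaction state, can be isolated into a small pure function. The sketch below does that with stand-in types: sendStatus, localState and resolveState are hypothetical names, and the status constants only imitate the ones in the primitive package.

```go
package main

import "fmt"

// sendStatus is a hypothetical stand-in for the library's send status.
type sendStatus int

const (
	sendOK sendStatus = iota
	sendFlushDiskTimeout
	sendFlushSlaveTimeout
	sendSlaveNotAvailable
	sendOther // any status not listed above
)

// localState is a hypothetical stand-in for the local transaction state.
type localState string

const (
	commitState   localState = "commit"
	rollbackState localState = "rollback"
	unknownState  localState = "unknown"
)

// resolveState mirrors the switch in the article's code: the local
// transaction runs only when the half message was stored successfully,
// the three partial-failure statuses force a rollback, and anything else
// stays unknown.
func resolveState(status sendStatus, executeLocal func() localState) localState {
	switch status {
	case sendOK:
		return executeLocal()
	case sendFlushDiskTimeout, sendFlushSlaveTimeout, sendSlaveNotAvailable:
		return rollbackState
	default:
		return unknownState
	}
}

func main() {
	local := func() localState { return commitState }
	fmt.Println(resolveState(sendOK, local))               // prints "commit"
	fmt.Println(resolveState(sendFlushDiskTimeout, local)) // prints "rollback"
	fmt.Println(resolveState(sendOther, local))            // prints "unknown"
}
```

Note that executeLocal is never called unless the status is sendOK, which matches the original: the listener's ExecuteLocalTransaction only runs inside the primitive.SendOK branch.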

Summary

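transactionProducer wraps a defaultProducer together with a TransactionListener. It exposes NewTransactionProducer, Start, Shutdown and SendMessageInTransaction; Start also runs checkTransactionState in the background to answer the broker's transaction check-backs.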

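Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com to have it removed.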