前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:dtm分布式事务(6)

golang源码分析:dtm分布式事务(6)

作者头像
golangLeetcode
发布2023-03-01 16:17:25
3080
发布2023-03-01 16:17:25
举报

在分析完saga模式golang源码分析:dtm分布式事务(5),其它模式就是类似的。我们依次看下tcc,msg,workflow和xa

1,tcc

tcc的核心代码位于dtmsvr/trans_type_tcc.go,代码目前并没有完全实现

代码语言:javascript
复制
registorProcessorCreator("tcc", func(trans *TransGlobal) transProcessor { return &transTccProcessor{TransGlobal: trans} })

GenBranches返回的是空的。

代码语言:javascript
复制
func (t *transTccProcessor) GenBranches() []TransBranch {
  return []TransBranch{}
}

如果已经处理了,就不再处理保证幂等,如果是就绪状态但是超时了,也改成中断状态,然后从前往后依次处理每个分支事务,如果是提交请求,那就提交所有分支,如果是回滚请求,就依次回滚所有分支,如果所有分支都执行成功,将全局状态改为执行成功,否则将全局状态改为失败。

代码语言:javascript
复制
func (t *transTccProcessor) ProcessOnce(branches []TransBranch) error {
      if !t.needProcess() {
    return nil
  }
      if t.Status == dtmcli.StatusPrepared && t.isTimeout() {
    t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))
  }
      op := dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmimp.OpConfirm, dtmimp.OpCancel).(string)
    for current := len(branches) - 1; current >= 0; current-- {
      if branches[current].Op == op && branches[current].Status == dtmcli.StatusPrepared {
        err := t.execBranch(&branches[current], current)
    t.changeStatus(dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.StatusSucceed, dtmcli.StatusFailed).(string))

执行分支的逻辑位于dtmsvr/trans_status.go

先获取分支目前的状态,然后把当前位置的分支改成同样的状态,修改cron job执行的时间,如果重试超过最大次数发出告警。

代码语言:javascript
复制
func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {
      status, err := t.getBranchResult(branch)
      t.changeBranchStatus(branch, status, branchPos)
      t.touchCronTime(cronKeep, 0)
          if retryCount >= conf.AlertRetryLimit && conf.AlertWebHook != "" {
      _, err2 := dtmcli.GetRestyClient().R().SetBody(gin.H{
        "gid":         t.Gid,
        "status":      t.Status,
        "branch":      branch.URL,
        "error":       err.Error(),
        "retry_count": retryCount,
      }).Post(conf.AlertWebHook)
代码语言:javascript
复制
func (t *TransGlobal) touchCronTime(ctype cronType, delay uint64) {
      GetStore().TouchCronTime(&t.TransGlobalStore, nextCronInterval, nextCronTime)

2,msg

消息表事务的源码位于 dtmsvr/trans_type_msg.go

代码语言:javascript
复制
registorProcessorCreator("msg", func(trans *TransGlobal) transProcessor { return &transMsgProcessor{TransGlobal: trans} })

它将所有step中的消息中的执行url解析到分支事务中。

代码语言:javascript
复制
func (t *transMsgProcessor) GenBranches() []TransBranch {
    for i, step := range t.Steps {
      mayTopic := strings.TrimPrefix(step[dtmimp.OpAction], dtmimp.MsgTopicPrefix)
      for j, url := range urls {

依次执行每一个分支,执行完毕后修改全局状态为成功状态,如果是异步执行的场景,通过channel来接收执行结果。

代码语言:javascript
复制
func (t *transMsgProcessor) ProcessOnce(branches []TransBranch) error {
    t.mayQueryPrepared()
    for i := range branches {
      b := &branches[i]
          if t.Concurrent {
      started++
      go func(pos int) {
        resultsChan <- t.execBranch(b, pos)
      }(i)
      err = t.execBranch(b, i)
      for i := 0; i < started && err == nil; i++ {
    err = <-resultsChan
  }
    t.changeStatus(dtmcli.StatusSucceed)

执行分支的逻辑和tcc的代码是复用的dtmsvr/trans_status.go

代码语言:javascript
复制
func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {
      status, err := t.getBranchResult(branch)
      t.changeBranchStatus(branch, status, branchPos)
            _, err2 := dtmcli.GetRestyClient().R().SetBody(gin.H{
        "gid":         t.Gid,
        "status":      t.Status,
        "branch":      branch.URL,
        "error":       err.Error(),
        "retry_count": retryCount,
      }).Post(conf.AlertWebHook)

3,workflow

工作流的实现位于dtmsvr/trans_type_workflow.go

代码语言:javascript
复制
registorProcessorCreator("workflow", func(trans *TransGlobal) transProcessor { return &transWorkflowProcessor{TransGlobal: trans} })

这个实现也并不完全

代码语言:javascript
复制
func (t *transWorkflowProcessor) GenBranches() []TransBranch {
  return []TransBranch{}
}
代码语言:javascript
复制
func (t *transWorkflowProcessor) ProcessOnce(branches []TransBranch) error 
    cmc := cWorkflowCustom{}
    dtmimp.MustUnmarshalString(t.CustomData, &cmc)
    return t.getURLResult(t.QueryPrepared, "00", cmc.Name, data)

4,xa

xa的实现位于dtmsvr/trans_type_xa.go

代码语言:javascript
复制
registorProcessorCreator("xa", func(trans *TransGlobal) transProcessor { return &transXaProcessor{TransGlobal: trans} })
代码语言:javascript
复制
func (t *transXaProcessor) GenBranches() []TransBranch {
  return []TransBranch{}
}

依次执行每一个分支,根据最后分支的状态是成功还是失败来决定是提交还是回滚。

代码语言:javascript
复制
func (t *transXaProcessor) ProcessOnce(branches []TransBranch) error {
    if t.Status == dtmcli.StatusPrepared && t.isTimeout() {
    t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))
    for i, branch := range branches {
      if branch.Op == currentType && branch.Status != dtmcli.StatusSucceed {
      err := t.execBranch(&branch, i)
    t.changeStatus(dtmimp.If(t.Status == dtmcli.StatusSubmitted, dtmcli.StatusSucceed, dtmcli.StatusFailed).(string))

执行分支逻辑同上dtmsvr/trans_status.go

代码语言:javascript
复制
func (t *TransGlobal) execBranch(branch *TransBranch, branchPos int) error {
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-01-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档