前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:dtm分布式事务(5)

golang源码分析:dtm分布式事务(5)

作者头像
golangLeetcode
发布2023-03-01 16:17:07
4460
发布2023-03-01 16:17:07
举报

在介绍完服务端的整体框架后,可以开始saga模式的介绍。saga模式是将一个大事务拆分成几个小的分支事务,然后依次执行每一个事务,如果出现异常,逆序回滚每一个分支事务。核心代码位于dtmsvr/trans_type_saga.go

代码语言:javascript
复制
func init() {
  registorProcessorCreator("saga", func(trans *TransGlobal) transProcessor {
     return &transSagaProcessor{TransGlobal: trans}

在init的时候将saga模式注册到全局的processor工厂里面

代码语言:javascript
复制
var processorFac = map[string]processorCreator{}

processor核心有两个接口

代码语言:javascript
复制
type transProcessor interface {
  GenBranches() []TransBranch
  ProcessOnce(branches []TransBranch) error
}

首先看下阐述分支的接口:

代码语言:javascript
复制
func (t *transSagaProcessor) GenBranches() []TransBranch {
      for i, step := range t.Steps {
        for _, op := range []string{dtmimp.OpCompensate, dtmimp.OpAction} {

针对分支事务的所有步骤,依次创建补偿分支和执行分支。 然后看下事务处理的接口 ,这里实现了saga模式的核心逻辑。

代码语言:javascript
复制
func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {

1, 如果已经提交并且超时了,修改状态为中断

代码语言:javascript
复制
 if t.Status == dtmcli.StatusSubmitted && t.isTimeout() {
       t.changeStatus(dtmcli.StatusAborting
   csc := cSagaCustom{Orders: map[int][]int{}, cOrders: map[int][]int{}}

2,然后定义一个对象存储,事务执行过程中的数据。用数组branchResults存储 每一个分支的执行结果,需要注意的是2i+1是action,2i是compensation,这个奇偶位置关系很重要,后面逻辑都是围绕这个顺序展开的

代码语言:javascript
复制
branchResults[i] = branchResult{index: i, status: branches[i].Status, op: branches[i].Op}

3,定义了过滤应该执行的分支的函数shouldRun

代码语言:javascript
复制
if branchResults[pre*2+1].status != dtmcli.StatusSucceed {
        return false

4,判断分支是否应该回滚shouldRollback,首先定义了一个子函数rollbacked判断分支是否已经回滚了:

代码语言:javascript
复制
branchResults[i].status == dtmcli.StatusSucceed || branchResults[i+1].status == dtmcli.StatusPrepared

后面一步必须回滚完成,当前这一步才能回滚

代码语言:javascript
复制
if !csc.Concurrent && current < n-2 && !rollbacked(current+2) {
      return false
    }

从后往前找,直到找到第一个没有回滚的分支

代码语言:javascript
复制
for _, next := range csc.cOrders[current/2] {
      if !rollbacked(2 * next) {
        return false
      }
    }
    return true

5,定义函数asyncExecBranch异步执行分支,开始执行分支 ,并将执行结果写入通道:

代码语言:javascript
复制
defer resultChan <- branchResult{index: i, status: branches[i].Status, op: branches[i].Op, err: branches[i].Error}
 err = t.execBranch(&branches[i], i)

6,pickToRunActions找出即将执行的动作列表,注意这里是奇数位置,也就是正向执行的动作列表

代码语言:javascript
复制
for current := 1; current < n; current += 2 {
      br := &branchResults[current]
      if !br.started && br.status == dtmcli.StatusPrepared && shouldRun(current) {
        toRun = append(toRun, current)

7,选择需要补偿的动作列表,和上面的函数类似,不过是偶数位置,并且是逆序:

代码语言:javascript
复制
pickToRunCompensates := func() []int {
  for current := n - 2; current >= 0; current -= 2 {
      br := &branchResults[current]
      if !br.started && br.status == dtmcli.StatusPrepared && shouldRollback(current) {
        toRun = append(toRun, current)

8,执行需要执行的分支列表runBranches

代码语言:javascript
复制
for _, b := range toRun {
          go asyncExecBranch(b)

9,等待分支的执行结果waitDoneOnce,根据结果修改分支的状态

代码语言:javascript
复制
select {
    case r := <-resultChan:
          go asyncExecBranch(r.index)
          t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("RetryCount is greater than RetryLimit, RetryLimit: %v", t.RetryLimit)))

10,prepareToCompensate为执行补偿操作做准备,获取正向执行的结果,统计执行成功和失败的分支

代码语言:javascript
复制
prepareToCompensate := func() {
    toRun := pickToRunActions()
    for i := 1; i < len(branchResults); i += 2 {
      if branchResults[i].started && branchResults[i].status == dtmcli.StatusPrepared {
      branchResults[i].status = dtmcli.StatusSucceed
    for i, b := range branchResults {
      if b.op == dtmimp.OpCompensate && b.status != dtmcli.StatusSucceed &&
        branchResults[i+1].status != dtmcli.StatusPrepared {

11,执行所有的分支,如果遇到失败或者超时终止

代码语言:javascript
复制
for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 {
        toRun := pickToRunActions()
        runBranches(toRun)
        waitDoneOnce()

12,如果没有失败和超时的,修改事务为成功状态

代码语言:javascript
复制
t.changeStatus(dtmcli.StatusSucceed)

13,如果有失败或者超时的,修改状态为中断

代码语言:javascript
复制
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(msg))
代码语言:javascript
复制
t.changeStatus(dtmcli.StatusAborting, withRollbackReason(fmt.Sprintf("Timeout after %d seconds", t.TimeoutToFail)))

14,如果是终止状态,开始执行回滚操作,直到超时

代码语言:javascript
复制
if t.Status == dtmcli.StatusAborting {
    prepareToCompensate()
      for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusAborting {
        toRun := pickToRunCompensates()
        runBranches(toRun)
        waitDoneOnce()

15,如果所有的补偿分支都执行成功,修改全局状态为失败

代码语言:javascript
复制
if t.Status == dtmcli.StatusAborting && rsCToStart == rsCSucceed {
    t.changeStatus(dtmcli.StatusFailed)

以上就是全部流程,中间我们用到的存储状态的结构体定义如下

代码语言:javascript
复制
type cSagaCustom struct {
  Orders     map[int][]int `json:"orders"`
  Concurrent bool          `json:"concurrent"`
  cOrders    map[int][]int
}

对于saga的客户端,代码定义位于client/dtmcli/trans_saga.go

代码语言:javascript
复制
type Saga struct {
  dtmimp.TransBase
  orders map[int][]int
}

它封装了各个操作,比如添加分支

代码语言:javascript
复制
func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga {

按照指定顺序添加分支

代码语言:javascript
复制
func (s *Saga) AddBranchOrder(branch int, preBranches []int) *Saga {

指定当前分支

代码语言:javascript
复制
func (s *Saga) SetConcurrent() *Saga {

提交分支事务

代码语言:javascript
复制
  func (s *Saga) Submit() error {
      return dtmimp.TransCallDtm(&s.TransBase, "submit")

自定义参数

代码语言:javascript
复制
func (s *Saga) BuildCustomOptions() {

这些操作都是通过http,grpc,grpc-json传递给服务端的client/dtmcli/dtmimp/trans_base.go

代码语言:javascript
复制
func TransCallDtm(tb *TransBase, operation string) error {
      _, err := TransCallDtmExt(tb, tb, operation)

http和jrpc-json定义如下

代码语言:javascript
复制
func TransCallDtmExt(tb *TransBase, body interface{}, operation string) (*resty.Response, error) {
   if tb.Protocol == Jrpc {
    return transCallDtmJrpc(tb, body, operation)
  rc := GetRestyClient2(time.Duration(tb.RequestTimeout) * time.Second)
  resp, err := rc.R().
    SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))
代码语言:javascript
复制
func transCallDtmJrpc(tb *TransBase, body interface{}, operation string) (*resty.Response, error) {
      rc := GetRestyClient2(time.Duration(tb.RequestTimeout) * time.Second)
      resp, err := rc.R().
    SetBody(map[string]interface{}{
      "jsonrpc": "2.0",
      "id":      "no-use",
      "method":  operation,
      "params":  body,
    }).
    SetResult(&result).
    Post(tb.Dtm)

dtmsvr/trans_status.go状态修改,在服务端最终都会落盘的

代码语言:javascript
复制
func (t *TransGlobal) changeStatus(status string, opts ...changeStatusOption) {
      t.UpdateTime = &now
  GetStore().ChangeGlobalStatus(&t.TransGlobalStore, status, updates, status == dtmcli.StatusSucceed || status == dtmcli.StatusFailed)
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-01-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档