前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:dtm分布式事务(2)

golang源码分析:dtm分布式事务(2)

作者头像
golangLeetcode
发布2023-03-01 16:16:03
3970
发布2023-03-01 16:16:03
举报
文章被收录于专栏:golang算法架构leetcode技术php

在分析了qs的大致源码后golang源码分析:dtm分布式事务(1),我们分析下dtm-example的源码结构,每个例子都是类似的。

先看下main.go里面的main函数

代码语言:javascript
复制
func main() {
    hintExit("")
      
    busi.BusiConf = dtmimp.DBConf
    busi.ResetXaData()
    app, gsvr := busi.Startup()
    examples.AddRoutes(app)
    if cmd == "qs"
       go busi.RunHTTP(app)
       busi.QsMain()
    examples.IsExists(cmd)

如果没有知名具体实例参数,列出参数列表

代码语言:javascript
复制
func hintExit(msg string) {
   _, cmd := range examples.Commands

然后是将数据库里未提交的事务回滚掉,learn/dtm/dtm-examples/busi/utils.go

代码语言:javascript
复制
func ResetXaData() {
  db.Must().Raw("xa recover").Scan(&xas)
  for _, xa := range xas {
    db.Must().Exec(fmt.Sprintf("xa rollback '%s'", xa.Data))
  }

其中数据库连接参数定义在

busi/base_types.go

代码语言:javascript
复制
var StoreHost = "localhost"
    var BusiConf = dtmcli.DBConf{
  Driver: "mysql",
  Host:   StoreHost,
  Port:   3306,
  User:   "root",
}

获取db连接dtmutil/db.go

代码语言:javascript
复制
func DbGet(conf dtmcli.DBConf, ops ...func(*gorm.DB)) *DB {
      dsn := dtmimp.GetDsn(conf)
      db, ok := dbs.Load(dsn)
      db1, err := gorm.Open(getGormDialetor(conf.Driver, dsn), &gorm.Config{
      SkipDefaultTransaction: true,
    })
      dbs.Store(dsn, db)

然后启动业务服务,对外提供服务busi/startup.go

代码语言:javascript
复制
func Startup() (*gin.Engine, *grpc.Server) {
      svr := GrpcStartup()
      app := BaseAppStartup()

其中grpc服务

代码语言:javascript
复制
func GrpcStartup() *grpc.Server {
      conn, err := grpc.Dial(dtmutil.DefaultGrpcServer
      DtmClient = dtmgpb.NewDtmClient(conn)
      conn1, err := grpc.Dial(BusiGrpc,
      BusiCli = NewBusiClient(conn1)
      s := grpc.NewServer(
      RegisterBusiServer(s, &busiServer{})

服务实现位于busi/busi_grpc.pb.go

代码语言:javascript
复制
type busiServer struct {
  UnimplementedBusiServer
}

对外提供了所有业务接口

代码语言:javascript
复制
      func (s *busiServer) QueryPrepared
      func (s *busiServer) TransIn(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransOut(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransInRevert(ctx context.Context, in *ReqGrpc) 
      func (s *busiServer) TransOutRevert(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransInConfirm(ctx context.Context, in *ReqGrpc) 
      func (s *busiServer) TransOutConfirm(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransInTcc(ctx context.Context, in *ReqGrpc) 
      func (s *busiServer) TransOutTcc(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransInXa(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransOutXa(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransInTccNested(ctx context.Context, in *ReqGrpc) 
      func (s *busiServer) TransOutHeaderYes(ctx context.Context, in *ReqGrpc)
      func (s *busiServer) TransOutHeaderNo(ctx context.Context, in *ReqGrpc)

然后是注册http服务,busi/base_http.go

代码语言:javascript
复制
    func BaseAppStartup() *gin.Engine {
      app := dtmutil.GetGinApp()
      app.Use(func(c *gin.Context) {
      v := MainSwitch.NextResult.Fetch()
      BaseAddRoute(app)
      addJrpcRoute(app)
      for k, v := range setupFuncs {
        v(app)

其中,注册http路由和json rpc路由的实现如下

代码语言:javascript
复制
func BaseAddRoute(app *gin.Engine) {
      app.POST(BusiAPI+"/workflow/resume", dtmutil.WrapHandler(func(ctx *gin.Context) interface{}

busi/base_jrpc.go

代码语言:javascript
复制
func addJrpcRoute(app *gin.Engine) {
  app.POST("/api/json-rpc", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
代码语言:javascript
复制
var setupFuncs = map[string]setupFunc{}

注册了barrier,busi/barrier.go

代码语言:javascript
复制
  func init() {
      setupFuncs["BarrierSetup"] = func(app *gin.Engine) {
        app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
          barrier := MustBarrierFromGin(c)
            ti, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
              return BarrierFrom(dtmimp.EscapeGet(qs, "trans_type"), dtmimp.EscapeGet(qs, "gid"), dtmimp.EscapeGet(qs, "branch_id"), dtmimp.EscapeGet(qs, "op"))
          return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
            return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
代码语言:javascript
复制
func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error {
              tx, err := db.Begin()

接着是注册路由examples/startup.go

代码语言:javascript
复制
 var routes = []PostRoute{}
 func AddRoutes(app *gin.Engine) {
      for _, r := range routes {
       app.POST(r.Route, dtmutil.WrapHandler(r.Handler))

对于例子"qs"

代码语言:javascript
复制
    go busi.RunHTTP(app)
    time.Sleep(200 * time.Millisecond)
    busi.QsMain()

代码位于busi/base_http.go,仅仅启动了http服务进行监听

代码语言:javascript
复制
    func RunHTTP(app *gin.Engine) {
      err := app.Run(fmt.Sprintf(":%d", BusiPort))

busi/quick_start.go 启动了 QsStartSvr()供dtm回调用

代码语言:javascript
复制
func QsMain() {
  QsStartSvr()
  QsFireRequest()
  select {}
}
代码语言:javascript
复制
func QsStartSvr() {
  app := gin.New()
  qsAddRoute(app)
  log.Printf("quick start examples listening at %d", qsBusiPort)
  go func() {
    _ = app.Run(fmt.Sprintf(":%d", qsBusiPort))
代码语言:javascript
复制
func qsAddRoute(app *gin.Engine) {
  app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {

然后就是触发我们的业务请求,将我们的参数和回调地址注册给dtm,dtmutil/consts.go

代码语言:javascript
复制
func QsFireRequest() string {
        req := &gin.H{"amount": 30} 
        saga := dtmcli.NewSaga(dtmServer, shortuuid.New()).
        Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req).
        Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req)
        err := saga.Submit()
代码语言:javascript
复制
    DefaultHTTPServer = "http://localhost:36789/api/dtmsvr"
    DefaultJrpcServer = "http://localhost:36789/api/json-rpc"
    DefaultGrpcServer = "localhost:36790"

分析完qs例子后,其他例子是类似的,对于grpc请求,会初始化grpc workflow

代码语言:javascript
复制
workflow.InitGrpc(dtmutil.DefaultGrpcServer, busi.BusiGrpc, gsvr)
busi.BusiCli = busi.NewBusiClient(conn1)

http请求,对应的是http workflow

代码语言:javascript
复制
workflow.InitHTTP(dtmutil.DefaultHTTPServer, busi.Busi+"/workflow/resume")

然后分别启动grpc回调监听,http回调监听,最后通过Call触发请求

代码语言:javascript
复制
      go busi.RunGrpc(gsvr)
      go busi.RunHTTP(app)
      examples.Call(cmd)

对于每一个例子是如何注册进去的呢?examples/startup.go

代码语言:javascript
复制
func AddCommand(name string, fn func() string) {
      Commands = append(Commands, commandInfo{Arg: name, Action: fn})

其中的Action,就是注册的时候注册的执行方法,在Call方法里面会被调用:

代码语言:javascript
复制
func Call(name string) {
      c.Action()

例子的注入时机在init函数中Commands,以xa为例:examples/http_xa.go

代码语言:javascript
复制
func init() {
      AddCommand("http_xa", func() string {
        err := dtmcli.XaGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
        resp, err := xa.CallBranch(&busi.ReqHTTP{Amount: 30}, busi.Busi+"/TransOutXa")
        return xa.CallBranch(&busi.ReqHTTP{Amount: 30}, busi.Busi+"/TransInXa")
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-12-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档