前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用强大的DBPack处理分布式事务(PHP使用教程)

使用强大的DBPack处理分布式事务(PHP使用教程)

原创
作者头像
用户2627641
发布2022-07-04 17:18:10
4790
发布2022-07-04 17:18:10
举报
文章被收录于专栏:dbmeshdbmesh

主流的分布式事务的处理方案

近些年,随着微服务的广泛使用,业务对系统的分布式事务处理能力的要求越来越高。

早期的基于XA协议的二阶段提交方案,将分布式事务的处理放在数据库驱动层,实现了对业务的无侵入,但是对数据的锁定时间很长,性能较低。

现在主流的TCC事务方案和SAGA事务方案,都是基于业务补偿机制,虽然没有全局锁,性能很高,但是一定程度上入侵了业务逻辑,增加了业务开发人员的开发时间和系统维护成本。

新兴的AT事务解决方案,例如SeataSeata-golang,通过数据源代理层的资源管理器RM记录SQL回滚日志,跟随本地事务一起提交,大幅减少了数据的锁定时间,性能好且对业务几乎没有侵入。其缺点是支持的语言比较单一,例如Seata只支持Java语言类型的微服务,Seata-golang只支持Go语言类型的微服务。

为了突破AT事务对业务编程语言的限制,现在业界正在往DB Mesh的方向发展,通过将事务中间件部署在SideCar的方式,达到任何编程语言都能使用分布式事务中间件的效果。

DBPack是一个处理分布式事务的数据库代理,其能够拦截MySQL流量,生成对应的事务回滚镜像,通过与ETCD协调完成分布式事务,性能很高,且对业务没有入侵,能够自动补偿SQL操作,支持接入任何编程语言。DBPack还支持TCC事务模式,能够自动补偿HTTP请求。目前其demo已经有Java、Go、Python和PHP,TCC的sample也已经在路上了,demo示例可以关注dbpack-samples

最新版DBPack不仅支持预处理的sql语句,还支持text类型的sql。DBPack最新版还兼容了php8的pdo_mysql扩展。Mysql 客户端在给用户发送 sql 执行结果时,如果执行没有异常,发送的第一个包为 OKPacket,该包中有一个标志位可以标识 sql 请求是否在一个事务中。如下图所示

image-20220629161325409.png
image-20220629161325409.png

这个包的内容为:

代码语言:txt
复制
07 00 00 // 前 3 个字节表示 payload 的长度为 7 个字节

01 // sequence 响应的序号,前 4 个字节一起构成了 OKPacket 的 heade

00 // 标识 payload 为 OKPacket

00 // affected row

00 // last insert id

03 00 // 状态标志位

00 00 // warning 数量

dbpack 之前的版本将标志位设置为 0,java、golang、.net core、php 8.0 之前的 mysql driver 都能正确协调事务,php 8.0 的 pdo driver 会对标志位进行校验,所以 php 8.0 以上版本在使用 dbpack 协调分布式事务时,会抛出 transaction not active 异常。最新版本已经修复了这个问题。

下图是具体的DBPack事务流程图。

dbpack-workflow.png
dbpack-workflow.png

其事务流程简要描述如下:

  1. 客户端向聚合层服务的DBPack代理发起HTTP请求。注意请求的地址和端口指向DBPack,并不直接指向实际API。
  2. DBPack生成全局唯一的XID,存储到ETCD中。
  3. 如果开启全局事务成功(如果失败则直接结束事务),聚合层服务就可以通过HTTP header(X-Dbpack-Xid)拿到XID了。此时,聚合服务调用服务1的接口,并传递XID。
  4. 服务1拿到XID,通过DBPack代理,注册分支事务(生成BranchID等信息,并存储到ETCD)。
  5. 服务1的分支事务注册成功后,DBPack自动生成本地事务的回滚镜像,随着本地事务一起commit。
  6. 服务2进行与服务1相同的步骤4和5。
  7. 聚合层服务根据服务1和服务2的结果,决定是全局事务提交还是回滚。如果是提交,则返回HTTP 200给DBPack(除200以外的状态码都会被DBPack认为是失败)。DBPack更新ETCD中的全局事务状态为全局提交中或回滚中。
  8. 服务1和服务2的DBPack,通过ETCD的watch机制,得知本地的分支事务是该提交还是回滚(如果是提交,则删除回滚日志;如果是回滚,则执行通过回滚日志回滚到事务前镜像)。
  9. 所有的分支事务提交或回滚完成后,ETCD里的分支事务状态将更新为已提交或已回滚,聚合层服务的DBPack的协程会检测到全局事务已经完成,将从ETCD删除XID和BranchID等事务信息。

本文将以PHP语言为例,详细介绍如何使用PHP对接DBPack完成分布式事务。实际使用其他语言时,对接过程也是类似的。

使用PHP对接DBPack实现分布式事务

前置条件

  • 业务数据库为mysql数据库
  • 业务数据表为innodb类型
  • 业务数据表必须有主键

Step0: 安装ETCD

代码语言:shell
复制
ETCD\_VER=v3.5.3



# choose either URL

GOOGLE\_URL=https://storage.googleapis.com/etcd

GITHUB\_URL=https://github.com/etcd-io/etcd/releases/download

DOWNLOAD\_URL=${GOOGLE\_URL}



rm -f /tmp/etcd-${ETCD\_VER}-linux-amd64.tar.gz

rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test



curl -L ${DOWNLOAD\_URL}/${ETCD\_VER}/etcd-${ETCD\_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD\_VER}-linux-amd64.tar.gz

tar xzvf /tmp/etcd-${ETCD\_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1

rm -f /tmp/etcd-${ETCD\_VER}-linux-amd64.tar.gz



/tmp/etcd-download-test/etcd --version

/tmp/etcd-download-test/etcdctl version

/tmp/etcd-download-test/etcdutl version

Step1: 在业务数据库中创建undo_log表

undo_log表用于存储本地事务的回滚镜像。

代码语言:sql
复制
-- ----------------------------

-- Table structure for undo\_log

-- ----------------------------

DROP TABLE IF EXISTS `undo\_log`;

CREATE TABLE `undo\_log` (

  `id` bigint(20) NOT NULL AUTO\_INCREMENT,

  `branch\_id` bigint(20) NOT NULL,

  `xid` varchar(100) NOT NULL,

  `context` varchar(128) NOT NULL,

  `rollback\_info` longblob NOT NULL,

  `log\_status` int(11) NOT NULL,

  `log\_created` datetime NOT NULL,

  `log\_modified` datetime NOT NULL,

  `ext` varchar(100) DEFAULT NULL,

  PRIMARY KEY (`id`),

  KEY `idx\_unionkey` (`xid`,`branch\_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Step2: 编写配置文件,对接DBPack

代码语言:shell
复制
# 更新distributed\_transaction.etcd\_config.endpoints

# 更新listeners配置项,调整为实际聚合层服务的地址和端口

# 更新filters配置项,配置聚合层服务的API endpoint

vim /path/to/your/aggregation-service/config-aggregation.yaml



# 更新distributed\_transaction.etcd\_config.endpoints

# 更新listeners配置项,配置业务数据库信息,包括dbpack代理的端口

# 更新data\_source\_cluster.dsn

vim /path/to/your/business-service/config-service.yaml

Step3: 运行DBPack

代码语言:shell
复制
git clone git@github.com:cectc/dbpack.git



cd dbpack

# build on local env

make build-local

# build on production env

make build



./dist/dbpack start --config /path/to/your/config-aggregation.yaml



./dist/dbpack start --config /path/to/your/config-service.yaml

Step4: 配置vhost,监听php项目端口

以Nginx为例,配置如下

代码语言:text
复制
server {

    listen 3001; # 暴露的服务端口

    index index.php index.html;

    root /var/www/code/; # 业务代码根目录



    location / {

        try\_files $uri /index.php?$args;

    }



    location ~ \.php$ {

        fastcgi\_split\_path\_info ^(.+\.php)(/.+)$;

        fastcgi\_pass order-svc-app:9000; # php-fpm 端口

        fastcgi\_index index.php;

        include fastcgi\_params;

        fastcgi\_param SCRIPT\_FILENAME $document\_root$fastcgi\_script\_name;

        fastcgi\_param PATH\_INFO $fastcgi\_path\_info;

    }

}

Step5: 编写应用程序

aggregation service example
代码语言:php
复制
class AggregationSvc

{



    public function CreateSo(string $xid, bool $rollback): bool

    {

        $createSoSuccess = $this->createSoRequest($xid);

        if (!$createSoSuccess) {

            return false;

        }

        $allocateInventorySuccess = $this->allocateInventoryRequest($xid);

        if (!$allocateInventorySuccess) {

            return false;

        }

        if ($rollback) {

            return false;

        }

        return true;

    }



    // private function createSoRequest(string $xid) ...

    // private function allocateInventoryRequest(string $xid) ...

}



$reqPath = strtok($\_SERVER["REQUEST\_URI"], '?');

$reaHeaders = getallheaders();



$xid = $reaHeaders['X-Dbpack-Xid'] ?? '';



if (empty($xid)) {

    die('xid is not provided!');

}



$aggregationSvc = new AggregationSvc();



if ($\_SERVER['REQUEST\_METHOD'] === 'POST') {

    switch ($reqPath) {

        case '/v1/order/create':

            if ($aggregationSvc->CreateOrder($xid, false)) {

                responseOK();

            } else {

                responseError();

            }

        case '/v1/order/create2':

            if ($aggregationSvc->CreateSo($xid, true)) {

                responseOK();

            } else {

                responseError();

            }

            break;

        default:

            die('api not found');

    }

}



function responseOK() {

    http\_response\_code(200);

    echo json\_encode([

        'success' => true,

        'message' => 'success',

    ]);

}



function responseError() {

    http\_response\_code(400);

    echo json\_encode([

        'success' => false,

        'message' => 'fail',

    ]);

}
order service example
代码语言:php
复制
class OrderDB

{

    private PDO $\_connection;

    private static OrderDB $\_instance;

    private string $\_host = 'dbpack-order';

    private int $\_port = 13308;

    private string $\_username = 'dksl';

    private string $\_password = '123456';

    private string $\_database = 'order';



    const insertSoMaster = "INSERT /\*+ XID('%s') \*/ INTO order.so\_master (sysno, so\_id, buyer\_user\_sysno, seller\_company\_code, 

        receive\_division\_sysno, receive\_address, receive\_zip, receive\_contact, receive\_contact\_phone, stock\_sysno, 

        payment\_type, so\_amt, status, order\_date, appid, memo) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,now(),?,?)";



    const insertSoItem = "INSERT /\*+ XID('%s') \*/ INTO order.so\_item(sysno, so\_sysno, product\_sysno, product\_name, cost\_price, 

        original\_price, deal\_price, quantity) VALUES (?,?,?,?,?,?,?,?)";



    public static function getInstance(): OrderDB

    {

        if (empty(self::$\_instance)) {

            self::$\_instance = new self();

        }

        return self::$\_instance;

    }



    private function \_\_construct()

    {

        try {

            $this->\_connection = new PDO(

                "mysql:host=$this->\_host;port=$this->\_port;dbname=$this->\_database;charset=utf8",

                $this->\_username,

                $this->\_password,

                [

                    PDO::ATTR\_PERSISTENT => true,

                    PDO::ATTR\_EMULATE\_PREPARES => false, // to let DBPack handle prepread sql

                ]

            );

        } catch (PDOException $e) {

            die($e->getMessage());

        }

    }



    private function \_\_clone()

    {

    }



    public function getConnection(): PDO

    {

        return $this->\_connection;

    }



    public function createSo(string $xid, array $soMasters): bool

    {

        $this->getConnection()->setAttribute(PDO::ATTR\_ERRMODE, PDO::ERRMODE\_EXCEPTION);

        try {

            $this->getConnection()->beginTransaction();

            foreach ($soMasters as $master) {

                if (!$this->insertSo($xid, $master)) {

                    throw new PDOException("failed to insert soMaster");

                }

            }

            $this->getConnection()->commit();

        } catch (PDOException $e) {

            $this->getConnection()->rollBack();

            return false;

        }

        return true;

    }



    private function insertSo(string $xid, array $soMaster): bool

    {

        // insert into so\_master, so\_item ...

    }

}



$reqPath = strtok($\_SERVER["REQUEST\_URI"], '?');

$reqHeaders = getallheaders();



$xid = $reqHeaders['Xid'] ?? '';



if (empty($xid)) {

    die('xid is not provided!');

}



if ($\_SERVER['REQUEST\_METHOD'] === 'POST') {

    if ($reqPath === '/createSo') {

        $reqBody = file\_get\_contents('php://input');

        $soMasters = json\_decode($reqBody, true);



        $orderDB = OrderDB::getInstance();

        $result = $orderDB->createSo($xid, $soMasters);



        if ($result) {

            responseOK();

        } else {

            responseError();

        }

    }

}



function responseOK() {

    http\_response\_code(200);

    echo json\_encode([

        'success' => true,

        'message' => 'success',

    ]);

}



function responseError() {

    http\_response\_code(400);

    echo json\_encode([

        'success' => false,

        'message' => 'fail',

    ]);

}

Step6: 访问聚合层业务接口

代码语言:txt
复制
curl -X{HTTP Method} http://localhost:{DBPack监听的聚合层服务端口}/{聚合层服务的API endpoint}

注意的点

  • 无论是使用mysqli驱动、pdo_mysql驱动,还是通过mysql\_connect()连接数据库(<=php5.4),在start transaction;开始之后,后续的业务操作必须在同一个数据库连接上进行。
  • DBPack通过xid(全局事务唯一ID)在事务上下文中传播,业务数据库执行的业务SQL语句中,需要加入xid注释,这样DBPack才能根据xid处理对应的事务。例如insert /\*+ XID('%s') \*/ into xx ...;

参考链接

作者简介:

卜贺贺。就职于日本楽天 Rakuten CNTD,任 Application Engineer,熟悉 AT 事务、Seata-golang 和 DBPack。GitHub:https://github.com/bohehe

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 主流的分布式事务的处理方案
  • 使用PHP对接DBPack实现分布式事务
    • 前置条件
      • Step0: 安装ETCD
        • Step1: 在业务数据库中创建undo_log表
          • Step2: 编写配置文件,对接DBPack
            • Step3: 运行DBPack
              • Step4: 配置vhost,监听php项目端口
                • Step5: 编写应用程序
                  • aggregation service example
                  • order service example
                • Step6: 访问聚合层业务接口
                • 注意的点
                • 参考链接
                • 作者简介:
                相关产品与服务
                数据库
                云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档