MongoDB模拟多文档事务操作

Mongodb不支持多文档原子性操作,因此依据两阶段提交协议(Two Phase Commits protocol)来模拟事务。

以两个银行账户之间的转账行为为例,来说明如何实现多文档间的事务操作。

为实现多文档间的事务操作,定义一个事务文档TransactionDocument,储存在事务集合TransactionCollection中

public class TransactionDocument2
    {
        public object _id { set; get; }
        //原账户
        public string Source { set; get; }
        //目标账户
        public string Destination { set; get; }
        //转账金额
        public decimal Value { set; get; }
        //执行状态(初始化initial, 执行操作pending, 完成操作applied, 事务结束done, 正在取消操作canceling, 完成取消canceled)
        public string State { set; get; }
        //最后修改日期
        public DateTime LastModified { set; get; }
    }

银行账户的结构为:

public class Account
    {
        /// <summary>
        /// 账号
        /// </summary>
        public string _id { set; get; }
        /// <summary>
        /// 账户余额
        /// </summary>
        public decimal Balance { set; get; }
        /// <summary>
        /// 待处理事务链表
        /// </summary>
        public List<object> PendingTransactions { set; get; }
}

转账的主流程为:

第0步,为参与事务的两个实体创建唯一的事务文档。

为A、B两个账户创建唯一的事务文档,事务文档的_id值为A、B账户_id值的组合。

第1步,在TransactionCollection集合中找到状态为"initial"的事务文档。

对于A、B两个账户间的转账操作,只能有一个事务文档。这样做是为了防止多个客户端同时对一个账户执行修改操作,只有一个这种事务文档,那么当AB间的转账行为开始时,事务文档的状态为“pending”,而事务开始要查找的是状态为“initial”的事务文档,因此不会获得这样的事务文档,也就不会执行任何操作,只有当AB转账操作完成后才有可能再次执行类似的操作。

第2步,第1步执行成功的前提下,将事务文档状态由“initial”更改为“pending”。

第3步,第2步执行成功的前提下,对两个账户应用事务,执行转账。

对两个账户应用事务的具体操作就是向A、B两个账户的待处理事务链表中添加事务文档_id。

第4步,第3步执行成功的前提下,将事务文档状态由“pending”更改为“applied”。

第5步,第4步执行成功的前提下,移除事务标识。

具体操作是:移除第3步中向A、B两个账户的待处理事务链表中添加的事务文档_id。

第6步,第5步执行成功的前提下,将事务文档状态由“applied”更改为“done”。

第7步,不论第6步是否执行成功,将事务文档状态“done”更改为“initial”。

看似在第6步将“applied”更改为“initial”也是可以的,但是如果在这之间在加入一个“done”状态会带来更大的好处,例如,可以定时扫描TransactionCollection集合,批量将状态为“done”的事务文档状态改为“initial”,而不是在第6步执行完成以后立即执行第7步。

辅助流程

针对上述主流程的每一步加以分析,找出需要辅助流程介入的位置。

对于第0步:

如果创建不成功不会产生任何影响。

对于第1步:

如果没有找到,不会产生任何影响。

对于第2步:

如果事务文档状态修改不成功,不会产生任何影响。

对于第3步:

如果执行转账失败,A账户的钱已被扣除V,但B没有收到V,回滚到之前的状态。

如果在指定的超时时间内没有完成则,执行从错误中恢复策略。

对于第4步:

如果修改事务文档状态失败,设置执行超时时间Th4,重复执行此步骤,如果超时时间已到达,但未完成,执行从错误中恢复策略。

对于第5步:

如果移除事务标识失败,设置执行超时时间Th5,重复执行此步骤,如果超时时间已到达,但未完成,执行从错误中恢复策略。

对于第6步:

如果移除事务标识失败,设置执行超时时间Th6,重复执行此步骤,如果超时时间已到达,但未完成,执行从错误中恢复策略。

回滚的步骤为:

第1步,将事务文档状态由“pending”更改为“canceling”。

第2步,账户余额还原为操作之前的状态,删除两个账户的待处理事务链表中的事务文档_id.

第3步,将事务文档状态由“canceling”更改为“cancelled”。

从错误中恢复策略

通过重复执行需要此策略的那一步操作即可达到目的。可以选择异步执行错误恢复机制。

超时检测

比较事务文档的LastModified 与当前时间的值,如果二者差值超过设定的阈值,即判定超时。

示例

考虑了部分情形,实际情况比实例所考虑的情形要复杂。此外MongoDB从3.4版本开始支持decimal类型,不过在字段上添加BsonRepresentation(BsonType.Decimal128)特性

事务文档和账户文档相应地修改为

public class TransactionDocumentP
{
        .......
        //转账金额
        [BsonRepresentation(BsonType.Decimal128)]
        public decimal Value { set; get; }
        ......
}

public class AccountP
{
        ......
        [BsonRepresentation(BsonType.Decimal128)]
        public decimal Balance { set; get; }
        ......
}

操作的集合

        //事务文档集合
        private string TransactionCollectionName = "TransactionCollection";
        //账户集合
        private string AccountsCollectionName = "UserAccounts";
        private MongoDBService mongoDBService = new MongoDBService("mongodb://localhost:27017/TestDB?maxPoolSize=100&minPoolSize=10",
               "TestDB");

主流程方法:

1 为参与事务的两个实体创建唯一的事务文档

private void PrepareTransfer(decimal value, string source, string destination)
        {
            //创建事务文档
            TransactionDocumentP tDoc = new TransactionDocumentP
            {
                _id = string.Format("{0}For{1}", source, destination),
                State = "initial",
                LastModified = DateTime.Now,
                Value = value,
                Source = source,
                Destination = destination
            };
            FilterDefinitionBuilder<TransactionDocumentP> filterBuilder = Builders<TransactionDocumentP>.Filter;
            FilterDefinition<TransactionDocumentP> filter1 = filterBuilder.Eq(doc => doc._id, tDoc._id);
            if (mongoDBService.ExistDocument(TransactionCollectionName, filter1))
            {
                return;
            }
            //将事务文档插入事务集合
            mongoDBService.Insert(TransactionCollectionName, tDoc);
        }

2 找到状态为"initial"的事务文档

private TransactionDocumentP RetrieveTransaction()
        {
            FilterDefinitionBuilder<TransactionDocumentP> filterBuilder = Builders<TransactionDocumentP>.Filter;
            FilterDefinition<TransactionDocumentP> filter = filterBuilder.Eq(doc => doc.State, "initial");

            return mongoDBService.Single(TransactionCollectionName, filter);
        }

3 执行转账与应用事务

private bool ApplyTransaction(TransactionDocumentP t, decimal value, string source, string destination)
        {
            FilterDefinitionBuilder<AccountP> filterBuilderS = Builders<AccountP>.Filter;
            FilterDefinition<AccountP> filterS1 = filterBuilderS.Eq(doc => doc._id, source);
            var updateS = Builders<AccountP>.Update.Inc(m => m.Balance, -value).Push(m => m.PendingTransactions, t._id);
            UpdateResult updateResultS = mongoDBService.DocumentUpdate(AccountsCollectionName, filterS1, updateS);

            bool isSuss = updateResultS.ModifiedCount > 0 && updateResultS.ModifiedCount == updateResultS.MatchedCount;
            if(isSuss)
            {
                FilterDefinitionBuilder<AccountP> filterBuilderD = Builders<AccountP>.Filter;
                FilterDefinition<AccountP> filterD1 = filterBuilderD.Eq(doc => doc._id, destination);
                var updateD = Builders<AccountP>.Update.Inc(m => m.Balance, value).Push(m => m.PendingTransactions, t._id);
                UpdateResult updateResultD = mongoDBService.DocumentUpdate(AccountsCollectionName, filterD1, updateD);
                isSuss = updateResultD.ModifiedCount > 0 && updateResultD.ModifiedCount == updateResultD.MatchedCount;
            }

            return isSuss;
        }

4更新两个账户的待处理事务链表,移除事务标识,超时跳出

private bool UpdateAccount(TransactionDocumentP t, string source, string destination, TimeSpan maxTxnTime)
        {
            FilterDefinitionBuilder<AccountP> filterBuilderS = Builders<AccountP>.Filter;
            FilterDefinition<AccountP> filterS = filterBuilderS.Eq(doc => doc._id, source);
            var updateS = Builders<AccountP>.Update.Pull(doc => doc.PendingTransactions, t._id);
            bool isSucc = mongoDBService.UpdateOne(AccountsCollectionName, filterS, updateS);
            while (true)
            {
                if (isSucc) break;
                bool timeOut = CheckTimeOut(t, maxTxnTime);
                if (timeOut) break;
                isSucc = mongoDBService.UpdateOne(AccountsCollectionName, filterS, updateS);
            }
            if (!isSucc)
            {
                return isSucc;
            }

            FilterDefinitionBuilder<AccountP> filterBuilderD = Builders<AccountP>.Filter;
            FilterDefinition<AccountP> filterD = filterBuilderD.Eq(doc => doc._id, destination);
            var updateD = Builders<AccountP>.Update.Pull(doc => doc.PendingTransactions, t._id);
            isSucc = mongoDBService.UpdateOne(AccountsCollectionName, filterD, updateD);
            while (true)
            {
                if (isSucc) break;
                bool timeOut = CheckTimeOut(t, maxTxnTime);
                if (timeOut) break;
                isSucc = mongoDBService.UpdateOne(AccountsCollectionName, filterD, updateD);
            }
            return isSucc;
        }

5 更新事务文档状态

private bool UpdateTransactionState(TransactionDocumentP t, string oldState, string newState)
        {
            if (t == null)
            {
                return false;
            }
            FilterDefinitionBuilder<TransactionDocumentP> filterBuilder = Builders<TransactionDocumentP>.Filter;
            FilterDefinition<TransactionDocumentP> filter1 = filterBuilder.Eq(doc => doc._id, t._id);
            FilterDefinition<TransactionDocumentP> filter2 = filterBuilder.Eq(doc => doc.State, oldState);
            FilterDefinition<TransactionDocumentP> filter = filterBuilder.And(new FilterDefinition<TransactionDocumentP>[] { filter1, filter2 });

            var update = Builders<TransactionDocumentP>.Update.Set(m => m.State, newState).Set(m =>m.LastModified,DateTime.Now);
            UpdateResult updateResult = mongoDBService.DocumentUpdate(TransactionCollectionName, filter, update);

            return  updateResult.ModifiedCount > 0 && updateResult.ModifiedCount == updateResult.MatchedCount;
        }
检验超时版本:
private bool ReUpdateTransactionState(TransactionDocumentP t, string oldState, string newState,TimeSpan maxTxnTime)
        {
            bool isSucc = UpdateTransactionState(t, oldState, newState);
            while (true)
            {
                if (isSucc) break;
                bool timeOut = CheckTimeOut(t, maxTxnTime);
                if (timeOut) break;
                isSucc = UpdateTransactionState(t, oldState, newState);
            }
            return isSucc;
        }

辅助方法:

1 检测超时

超时只能应对一般的短时网络故障,对于长时间的故障这种办法行不通。

private bool CheckTimeOut(TransactionDocumentP t, TimeSpan maxTxnTime)
        {
            DateTime cutOff = DateTime.Now - maxTxnTime;
            FilterDefinitionBuilder<TransactionDocumentP> filterBuilder = Builders<TransactionDocumentP>.Filter;
            FilterDefinition<TransactionDocumentP> filter = filterBuilder.Lt(doc => doc.LastModified, cutOff);
            var tranDoc = mongoDBService.Single(TransactionCollectionName, filter);
            return tranDoc == null ? true : false;
        }

2 回滚操作

private void RollbackOperations(TransactionDocumentP t,string source, string destination)
        {
            //1 将事务文档状态由pending更新为canceling.
            ReUpdateTransactionState(t, "pending", "canceling", new TimeSpan(0,0,100));
            
            //2 账户余额回滚.
            FilterDefinitionBuilder<AccountP> filterBuilderS = Builders<AccountP>.Filter;
            FilterDefinition<AccountP> filterS1 = filterBuilderS.Eq(doc => doc._id, t.Source);//source
            FilterDefinition<AccountP> filterS2 = filterBuilderS.Where(doc => doc.PendingTransactions.Contains(t._id));
            FilterDefinition<AccountP> filterS = filterBuilderS.And(new FilterDefinition<AccountP>[] { filterS1, filterS2 });
            var updateS = Builders<AccountP>.Update.Inc(m => m.Balance, t.Value).Pull(m => m.PendingTransactions, t._id);
            bool isSuccess = mongoDBService.UpdateOne(AccountsCollectionName, filterS, updateS);

            if(isSuccess)
            {
                FilterDefinitionBuilder<AccountP> filterBuilderD = Builders<AccountP>.Filter;
                FilterDefinition<AccountP> filterD1 = filterBuilderD.Eq(doc => doc._id, t.Destination);//source
                FilterDefinition<AccountP> filterD2 = filterBuilderD.Where(doc => doc.PendingTransactions.Contains(t._id));
                FilterDefinition<AccountP> filterD = filterBuilderD.And(new FilterDefinition<AccountP>[] { filterD1, filterD2 });
                var updateD = Builders<AccountP>.Update.Inc(m => m.Balance, -t.Value).Pull(m => m.PendingTransactions, t._id);
                isSuccess = mongoDBService.UpdateOne(AccountsCollectionName, filterD, updateD);
            }

            if (isSuccess)
            {
                //3 将事务文档状态由canceling更新为cancelled.
                UpdateTransactionState(t, "canceling", "cancelled");
            }
        }

组织流程:

public void Process(decimal value, string source, string destination)
        {
            //超时时间
            TimeSpan tSpan = new TimeSpan(0,0,100);
            //0 为参与事务的两个实体创建唯一的事务文档
            PrepareTransfer(value,source,destination);

            //1 找到状态为"initial"的事务文档
            TransactionDocumentP t2 = RetrieveTransaction();

            //2 将事务文档状态由“initial”更改为“pending”,超时跳出
            bool initial_pending = ReUpdateTransactionState(t2, "initial", "pending", tSpan);
            if (!initial_pending)
            {
                return;
            }
            //3 执行转账
            bool isSuccessAp = ApplyTransaction(t2, value, source, destination);
            if (!isSuccessAp)
            {
                //回滚
                RollbackOperations(t2, source, destination);
                return;
            }

            //4 将事务文档状态由“pending”更改为“applied”
            bool pending_applied = ReUpdateTransactionState(t2, "pending", "applied", tSpan);
            if (!pending_applied)
            {
                return;
            }

            //5 更新两个账户的待处理事务链表,移除事务标识,超时跳出
            bool update = UpdateAccount(t2, source, destination, tSpan);
            if (!update)
            {
                return;
            }

            //6 将事务文档状态由“applied”更改为“done”
            bool applied_done = ReUpdateTransactionState(t2, "applied", "done", tSpan);
            if (!applied_done)
            {
                return;
            }

            //7 将事务文档状态由“done”更改为“initial”
            bool done_initial = ReUpdateTransactionState(t2, "done", "initial", tSpan);
            if (!done_initial)
            {
                return;
            }
        }

-----------------------------------------------------------------------------------------

时间仓促,水平有限,如有不当之处,欢迎指正。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏JackeyGao的博客

JPG2ASCII开发上线记录

刚开始做运维的时候喜欢在登录服务器的时候自动打印一些ASCII图像, 于是大量搜寻这种图片以做到自己的欢迎页独一无二。 想想有点不误正业, 现在虽说找到合适的A...

743
来自专栏happyJared

程序员神器,IntelliJ IDEA 2018.1 正式发布

3月27日,jetbrains正式发布期待已久的IntelliJ IDEA 2018.1,再次让人眼前一亮:什么,还能这么玩?

381
来自专栏养码场

最新全栈Python视频教程,全套完整版,从Day1-Day92,从入门到精通,决不放弃!

听很多Python大神聊过,Python非常适合初学者入门。因为,相比较其他不少主流编程语言,Python有更好的可读性,上手相对容易。它自带的各种模块加上丰富...

973
来自专栏Linux内核

Linux OOM机制分析

oom_killer(out of memory killer)是Linux内核的一种内存管理机制,在系统可用内存较少的情况下,内核为保证系统还能够继续运行下去...

2597
来自专栏皮振伟的专栏

​[linux][process]进程crash类问题处理方法

前言: 进程crash一般比较讨厌,尤其是segmentation fault,所谓的“踩内存”,是最讨厌的。 分析: 1,status 进程的状态,一般使...

3048
来自专栏Danny的专栏

【项目实战】——Java实现伪静态——urlrewrite

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/huyuyang6688/article/...

764
来自专栏数据和云

疑难解答:ORA-01555的场景模拟和解决方案

黄玮(Fuyuncat) 资深Oracle DBA,个人网站www.HelloDBA.com,致力于数据库底层技术的研究,其作品获得广大同行的高度评价. 前期...

2665
来自专栏申龙斌的程序人生

零基础学编程039:生成群文章目录(2)

每个月的月底,“分享与成长群”要汇总所有成员的原创文章,这次我改用了水滴微信平台把数据采集到一个电子表格文件中。在《零基础学编程019:生成群文章目录》这一节里...

2738
来自专栏后端之路

mysql的query end status

背景 由于业务中有备份某个业务表的定时任务 会在每天指定时间点做一次备份【使用quartz】 各位也都知道各种timeout的相关问题 socketTimeou...

4126
来自专栏*坤的Blog

redis性能测试

3122

扫码关注云+社区