前面对于分布式事务也讲了好几篇了(可靠消息最终一致性 分布式事务 - TCC 分布式事务 - 2PC、3PC),但是还没有实战过。那么本篇我们就来演示下如何在 .NET 环境下实现一个基于可靠消息的分布式事务。基于可靠消息的分布式事务流程上还是比较清晰明了的,但是要用代码去一个个实现还是比较费事的。通过分析可以发现这个事务的关键点就是要在真正的业务逻辑的前面、后面插入对应的流程。很明显这种流程是可以通过 AOP 技术来简化操作的。于是就有了 AgileDT 。AgileDT 使用 Natasha 在启动的时候动态生成代理类,来为你完成跟消息部分的操作,使用者只需关心核心业务逻辑就可以了。 https://github.com/kklldog/AgileDT 开源不易,大家多多 ✨✨✨
前面一篇文章(可靠消息最终一致性 )我们详细介绍了基于可靠消息的分布式事务。为了更好的理解 AgileDT 的代码,我们还是有必要简单的来回顾下。
该方案总体流程上可分为以下步骤:
废话不多说了,下面让我们演示下如何使用 AgileDT 来快速实现一个基于可靠消息的分布式事务。 以下我们还是以经典的订单下单完成给会员赠送积分的场景来演示。
目前支持 mysql 数据库,但是数据访问组件使用的是 freesql 所以后续要实现支持别的数据库也很简单。目前框架使用的可靠消息服务为 rabbitmq 。
在服务新建一个数据库并且新建一张表
// crate event_message table on mysql
create table if not exists event_message
(
event_id varchar(36) not null
primary key,
biz_msg varchar(4000) null,
status enum('Prepare', 'Done', 'WaitSend', 'Sent', 'Finish', 'Cancel') not null,
create_time datetime(3) null,
event_name varchar(255) null
);
使用docker-compose运行服务端
version: "3" # optional since v1.27.0
services:
agile_dt:
image: "kklldog/agile_dt"
ports:
- "5000:5000"
environment:
- db:provider=mysql
- db:conn= Database=agile_dt;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306
- mq:userName=admin
- mq:password=123456
- mq:host=192.168.0.115
- mq:port=5672
在主动方跟被动方都需要安装AgileDT的客户端库
Install-Package AgileDT.Client
// crate event_message table on mysql
create table if not exists event_message
(
event_id varchar(36) not null
primary key,
biz_msg varchar(4000) null,
status enum('Prepare', 'Done', 'WaitSend', 'Sent', 'Finish', 'Cancel') not null,
create_time datetime(3) null,
event_name varchar(255) null
);
在appsettings.json文件添加以下节点:
"agiledt": {
"server": "http://localhost:5000",
"db": {
"provider": "mysql",
"conn": "Database=agile_order;Data Source=192.168.0.125;User Id=dev;Password=dev@123f;port=13306"
//"conn": "Database=agile_order;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306"
},
"mq": {
"host": "192.168.0.125",
//"host": "192.168.0.115",
"userName": "admin",
"password": "123456",
"port": 5672
}
}
public void ConfigureServices(IServiceCollection services)
{
services.AddAgileDT();
...
}
注意:业务方法最终一定要使用事务来同步修改消息表的status字段为done状态,这个操作框架没办法帮你实现 注意:业务方法如果失败请抛出Exception,如果不抛异常框架一律认为执行成功
public interface IAddOrderService:IEventService
{
bool AddOrder(Order order);
}
[DtEventName("orderservice.order_added")]
public class AddOrderService : IAddOrderService
{
private readonly ILogger<AddOrderService> _logger;
public AddOrderService(ILogger<AddOrderService> logger)
{
_logger = logger;
}
public string EventId {
get;
set;
}
[DtEventBizMethod]
public virtual bool AddOrder(Order order)
{
var ret = false;
//3. 写 Order 跟 修改 event 的状态必选写在同一个事务内
FreeSQL.Instance.Ado.Transaction(() =>
{
order.EventId = EventId;//在订单表新增一个eventid字段,使order跟event_message表关联起来
var ret0 = FreeSQL.Instance.Insert(order).ExecuteAffrows();
var ret1 = FreeSQL.Instance.Update<OrderService.Data.entities.EventMessage>()
.Set(x => x.Status, MessageStatus.Done)
.Where(x => x.EventId == EventId)
.ExecuteAffrows();
ret = ret0 > 0 && ret1 > 0;
});
return ret;
}
/// <summary>
/// 构造后续业务处理需要的消息内容
/// </summary>
/// <returns></returns>
public string GetBizMsg()
{
//这里可以构造传递到MQ的业务消息的内容,比如传递订单编号啊 ,以便后续的被动方处理业务时候使用
var order = FreeSQL.Instance.Select<Order>().Where(x => x.EventId == EventId).First();
return order?.Id;
}
}
在实现好 IAddOrderService 接口后,你可以像平常一样使用 IAddOrderService 来注入实现类。比如在 Controller 的构造函数注入进去。因为 AgileDT 在启动的时候会自动帮你注册。
注意:IAddOrderService 跟实现类的生命周期是 Scoped 。
在appsettings.json文件添加以下节点:
"agiledt": {
"server": "http://localhost:5000",
"db": {
"provider": "mysql",
"conn": "Database=agile_order;Data Source=192.168.0.125;User Id=dev;Password=dev@123f;port=13306"
//"conn": "Database=agile_order;Data Source=192.168.0.115;User Id=root;Password=mdsd;port=3306"
},
"mq": {
"host": "192.168.0.125",
//"host": "192.168.0.115",
"userName": "admin",
"password": "123456",
"port": 5672
}
}
public void ConfigureServices(IServiceCollection services)
{
services.AddAgileDT();
...
}
public interface IOrderAddedMessageHandler: IEventMessageHandler
{
}
[DtEventName("orderservice.order_added")]
public class OrderAddedMessageHandler: IOrderAddedMessageHandler
{
static object _lock = new object();
public bool Receive(EventMessage message)
{
var bizMsg = message.BizMsg;
var eventId = message.EventId;
string orderId = bizMsg;
lock (_lock)
{
var entity = FreeSQL.Instance.Select<PointHistory>().Where(x => x.EventId == eventId).First();
if (entity == null)
{
var ret = FreeSQL.Instance.Insert(new PointHistory
{
Id = Guid.NewGuid().ToString(),
EventId = message.EventId,
OrderId = orderId,
Points = 20,
CreateTime = DateTime.Now
}).ExecuteAffrows();
return ret > 0;
}
else
{
return true;
}
}
}
}
通过以上演示,我们快速的实现了一个订单下单会员赠送积分的服务。可以看到使用 AgileDT 可以很快速的实现一个分布式事务。特别是在实现过一个分布式事务后,后面实现起来就特别简单,只要实现几个接口就可以了。AgileDT 才刚刚起步,希望大家多多支持,多多✨✨✨ ,多多 PR 。
https://github.com/kklldog/AgileDT