在单个进程中,数据库一般提供了数据一致性的保证。但如果数据分布到了不同的服务中,数据为单个服务所私有。问题会更为复杂。
比如用户需要提现,简化流程如下: 1. 提现服务收到用户请求提现1元 2. 提现服务插入一条本地数据,记录提现请求 3. 提现服务向money服务发起数据请求,对该用户扣款一元
代码如下:
# 预先插入数据
withdraw_log = WithDrawLog()
withdraw_log.Uid = uid
withdraw_log.Money = str(money)
withdraw_log.FlowNo = flow_no
withdraw_log.Aliname = username
withdraw_log.Aliaccount = useraccount
withdraw_log.Type = withdraw_type
wid = await withdraw_log.save()
ret = await request_api('money_svc', uid, str(money))
if ret['err'] != 0:
return Exception('账户没有足够的余额'), 1
然而因为网络延时,request_api
并不能总是成功。此时,提现服务记录了提现,而用户没有扣款。数据将产生不一致。
目前市面上有很多分布式事务框架,可以维护分布式事务的acid。但是我们这个小应用,引入这样的框架就太重量了。 要保持数据一致,首先要有以下几个保证: 1. api必须有幂等性(说白了,同样数据调用一次api和调用N次没什么影响) 2. 服务内部的实现保证acid。否则可靠性就无从谈起。 3. 对异常设计补偿是一种较为简单的方式
money_svc不具有幂等性,对它进行改造最简单的方式是,加一个唯一时间戳(flow_no)。每次调用都进行检查,如果库中有这个flow_no,则拒绝这次请求。
# request_api('money_svc',flow_no, uid, str(money))
# money_svc实现
ret = await db.select('money_flow', flow_no=args.flow_no)
if ret:
return Exception("这笔交易已存在"), 1
...
predo
。done
predo
的订单,对它们之前的流水号进行重试。由于money_svc有幂等性,重试并不会造成副作用。withdraw服务
flow_no = get_rnd_flow_no()
withdraw_log = WithDrawLog()
withdraw_log.Uid = uid
withdraw_log.Money = str(money)
withdraw_log.FlowNo = flow_no
withdraw_log.Aliname = username
withdraw_log.Aliaccount = useraccount
withdraw_log.Type = withdraw_type
withdraw_log.FlowNo = flow_no
withdraw_log.Status = 'predo'
wid = await withdraw_log.save()
ret = await request_api('money_svc',flow_no, uid, str(money))
if ret['err'] != 0:
return Exception('账户没有足够的余额'), 1
withdraw_log.Status = 'done'
await withdraw_log.update()
补偿服务
while 1:
withdraw_log = await db.select('withdraw', Status='predo')
flow_no = withdraw_log.FlowNo
# 大胆放心的重试
ret = await request_api('money_svc',flow_no, uid, str(money))
withdraw_log.Status = 'done'
await withdraw_log.update()
如果业务过于复杂,则需要考虑对其进行抽象。大概流程: 1. 业务调用 服务1, 服务2, 服务3,并注册事件监控 2. 事件监控发现 服务1,服务3 成功,服务2失败 3. 对业务调用补偿服务,补偿的方式可以是撤销服务1,服务3,也可以是对服务2进行重试。
其实很简单,目前异常大大下降。就酱。