在分布式场景下,有很多种情况都需要实现最终一致性。在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可以共享存储(领域模型和消息的持久化数据源),或者做全局XA事务(两阶段提交,数据源可分开),也可以借助消息中间件(消费者处理需要能幂等)。通过Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性。 本文主要探讨另外一种实现分布式最终一致性的解决方案——采用分布式锁。基于分布式锁的解决方案,比如zookeeper,redis都是相较于持久化(如利用InnoDB行锁,或事务,或version乐观锁)方案提供了高可用性,并且支持丰富化的使用场景。 本文通过Java版本的redis分布式锁开源框架——Redisson来解析一下实现分布式锁的思路。
如果是不跨限界上下文的情况,跟本地领域服务相关的数据一致性,尽量还是用事务来保证。但也有些无法用事务或者乐观锁来处理的情况,这些情况大多是对于一个共享型的数据源,有并发写操作的场景,但又不是对于单一领域的操作。 举个例子,还是用租书来比喻,A和B两个人都来租书,在查看图书的时候,发现自己想要看的书《大设计》库存仅剩一本。书店系统中,书作为一种商品,是在商品系统中,以Item表示出租商品的领域模型,同时每一笔交易都会产生一个订单,Order是在订单系统(交易限界上下文)中的领域模型。这里假设先不考虑跨系统通信的问题,也暂时不考虑支付环节,但是我们需要保证A,B两个人不会都对于《大设计》产生订单就可以,也就是其中一个人是可以成功下单,另外一个人只要提示库存已没即可。此时,书的库存就是一种共享的分布式资源,下订单,减库存就是一个需要保证一致性的写操作。但又因为两个操作不能在同一个本地事务,或者说,不共享持久化的数据源的情况,这时候就可以考虑用分布式锁来实现。本例子中,就需要对于共享资源——书的库存进行加锁,至于锁的key可以结合领域模型的唯一标识,如itemId,以及操作类型(如操作类型是RENT的)设计一个待加锁的资源标识。当然,这里还有一个并发性能的问题,如果是个库存很多的秒杀类型的业务,那么就不能单纯在itemId 加类型加锁,还需要设计排队队列以及合理的调度算法,防止超卖等等,那些就是题外话了。本文只是将这个场景作为一个切入点,具体怎么设计锁,什么场景用还要结合业务。
借用《Implementing Domain-driven Design》里面的对于领域服务的定义。领域的某个操作过程或转换过程不是实体或值对象的职责时,应该将操作放在一个单独的接口中,即领域服务,并且要和通用语言保持一致。这里的非实体或值对象操作会有很多种情况,比如某个操作需要对多个领域对象操作,输出一个值对象。在分层的架构中,有点类似于Manager。但是如果过渡抽象manager就会出现贫血,所以还需要确保领域服务是无状态的,并且做好和贫血模型的权衡。可能大多数情况,领域服务的参数都是比实际的领域模型小的,只有些关键属性的值对象。如果服务只操作领域的实体或值对象,则可以考虑下放到domain model中操作。 前面提到了Manager,但是很多应用中都会把Manager抽象成接口的形式,但大多数情况其实完全没有必要,可以通过服务Factory的方式解耦,或者用Spring的@Service注解来注入真正的服务实现类。对于一些简单的领域操作,还可以抽象一个迷你层,这个迷你层也可以称作是领域服务,只不过是无状态,无事务,安全的一个抽象层。 领域事件其实也可以归纳为领域服务,不过领域服务的事件是幂等的。因为领域服务是无事务的,所以事件也是无副作用的,这样在处理聚合依赖的时候,需要保证他们的最终一致性。
将领域中发生的活动建模成一系列的离散事件,每个事件都用领域对象来表示。简而言之,领域事件就是领域中发生的事件。还拿租书为例,一本书被借走了,那么需要产生一个借书订单,并且对于租书者来说,需要能查看自己租书的列表和书籍详情,同时这本书也需要被标记为不能再借出的状态(因为已经被借走了)。这里面bookRent就可以作为一个领域事件来发出。
对于上述的事件模型,我们可以创建具有聚合特性的领域事件。这里我们可以把这个事件本身建模成一个聚合(BookRentEvent 对象),并且有自己的持久化方式。唯一标识可以由一组属性决定,在客户方(Client)调用领域服务的时候创建这个领域事件{new bookRentEvent())},并添加到资源库中,然后再通过消息的方式进行发布。发布成功后再回调更新时间状态。但这里需要注意,消息发布最好和事件资源库在相同的上下文,或共享数据源,这样就可以保证事件的成功提交,在不同上下文系统,就需要做全局事务来保证。而唯一标识在这里的作用就是为了防止消息重发或者重复处理。所以订阅方需要检查重复消息,并且忽略。如果是本地上下文的事件,最好提供equals和hashcode 实现。 结合刚才的例子,在书籍管理上下文中,书被借走了,那么书籍唯一表示和书的状态(Rent被借出)就可以标识一个事件。这个事件中需要有借书人的信息(如id,nick等),那么在持久化这个事件后,可以post一个Eventbus的本地消息,由用户书籍领域服务监听,更新用户书籍列表等一系列操作。然后再Callback到事件源,更新事件状态,处理成功。如果需要处理事件都在本地上下文,处理起来并不麻烦。
领域事件的发布可以用Observer模式。在本地上下文,也要尽量减少对基础设施或者消息中间件暴露领域模型,所以,需要将本地模型(领域模型)封装成事件的聚合。比如我们不能直接发布一个BookRent聚合的事件,而是一个BookRentEvent,这个Event对象,还会持有一些事件特有的属性,比如可能根据需要,会有occurTime(发生时间),isConsumed(是否已经被处理)。事件发布时,所有订阅方都会同步收到通知。领域事件的主要组件就是publisher和subscriber了。 发送者 发送者本身并不表达一种领域概念,而是作为一种服务的形态。无论用什么技术方式实现,用什么框架,处理事件发送的思路也都可能不尽相同。比如,在web应用中,可以在启动应用的时候处理订阅者向发送者的事件注册(避免注册和处理发送的线程同步问题)。比如可以将关注的事件registe到本地的一个ThreadLocal的publisher List中。应用启动完成后,开始处理领域事件的时候,就可以发送一个事件的聚合。这个事件的聚合是一个事件对象,而不是领域模型中的实体,因为我们要暴露需要暴露的事件给其他上下文,而不是暴露完整的领域对象。如果使用EventBus,我们可以在post的时候,封装一个事件作为参数。 订阅者 事件的订阅者可以作为应用服务的一个独立的组件。因为应用服务是在领域逻辑的外层,如果是纯粹的事件驱动,那么订阅者作为一种应用服务,也可以定位成具有单一职责的,负责事件存储的应用服务组件。
在处理分布式事件中,最重要也是最难处理的就是一致性。消息的延迟,处理的不幂等就会影响领域模型状态的准确性和事件的处理。但是我们在系统间交互的过程中,可以用一些技术方式来达到最终一致性。这其中可能就需要进行事件模型的持久化。处理方式可以 1. 领域模型和消息设施共享持久存储的数据源。这种需要事件作为一种本地事件模型存储在和本地领域模型的同一个数据库中。这样保证了本地事务的一致,性能较好,但是不能和其他上下文共享持久化存储。 2. 全局XA事务(两阶段提交)来控制。模型和消息的持久化可以分开,但是全局事务性能差,成本高。 3. 在领域模型的持久化存储中,单独一块存储区域(单独一张事件表)来存储领域事件。也就是做本地的EventStore。但是需要有一个发布事件的消息机制,消息事件是完全私有的。消息的发送可以交给消息中间件来处理。如果可以的话,还可以将时间存储作为Rest资源。事件就可以以一种存档日志的形式对外发布事件(消息队列,通过消息设施或者中间件发送RabitMQ,MetaQ等)。这样还保证了时间的可追溯性。
我们使用事件来解耦,是为了考虑尽量避免RPC,简化系统依赖,减少外部服务不可用对系统模型带来的状态影响。所以领域事件强调的是高度自治,但是也需要斟酌,通过事件处理的情况必须是容许延时的,并且消息的接收方需要是一个幂等接收器(可以自幂等,或者对于重复消息的拒绝处理),因为消息是可能重复发送的。}
分布式的思路和线程同步锁ReentrantLock的思路是一样的。我们也要考虑如以下几个问题:
在Redisson介绍前,回顾下Redis的命令,以及不通过任何开源框架,可以基于redis怎么设计一个分布式锁。基于不同应用系统实现的语言,也可以通过其他一些如Jedis,或者Spring的RedisOperations 等,来执行Reids命令Redis command list https://github.com/antirez/redis/blob/93e7a130fc9594e41ccfc996b5eca7626ae5356a/src/redis.c#L119 。 分布式锁主要需要以下redis命令,这里列举一下。在实现部分可以继续参照命令的操作含义。
假设我们现在要给itemId 1234 和下单操作 OP_ORDER 加锁,key是OP_ORDER_1234,结合上面的redis命令,似乎加锁的时候只要一个SETNX OP_ORDER_1234 currentTimestamp ,如果返回1代表加锁成功,返回0 表示锁被占用着。然后再用DEL OP_ORDER_1234解锁,返回1表示解锁成功,0表示已经被解锁过。然而却还存在着很多问题:SETNX会存在锁竞争,如果在执行过程中客户端宕机,也会引起死锁问题,即锁资源无法释放。并且当一个资源解锁的时候,释放锁之后,其他之前等待的锁没有办法再次自动重试申请锁(除非重新申请锁)。解决死锁的问题其实可以可以向Mysql的死锁检测学习,设置一个失效时间,通过key的时间戳来判断是否需要强制解锁。但是强制解锁也存在问题,一个就是时间差问题,不同的机器的本地时间可能也存在时间差,在很小事务粒度的高并发场景下还是会存在问题,比如删除锁的时候,在判断时间戳已经超过时效,有可能删除了其他已经获取锁的客户端的锁。另外,如果设置了一个超时时间,但是确实执行时间超过了超时时间,那么锁会被自动释放,原来持锁的客户端再次解锁的时候会出现问题,而且最为严重的还是一致性没有得到保障。 所以设计的时候需要考虑以下几点:
由于时间戳的设计有很多问题,以及上述几个问题,所以再换一种思路。先回顾几个关于锁的概念和经典java API。通过一些java.util.concurrent的API来处理一些本地队列的同步以及等待信号量的处理。
并发API以及一些框架的使用主要是控制锁的进入和调度,加锁的流程以及锁的逻辑也是非常重要。因为redis支持hash结构,除了key作为锁的标识,还可以利用value的结构
下面参数的含义先说明下 :
以上的方法,当返回空是,说明获取到锁,如果返回一个long数值(pttl 命令的返回值),说明锁已被占用,通过返回剩余时间,外部可以做一些等待时间的判断和调整。
也还是先说明一下参数信息: - KEYS[1] :需要加锁的key,这里需要是字符串类型。 - KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lock__channel__{” + getName() + “}” - ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。 - ARGV[2] :锁的超时时间,防止死锁 - ARGV[3] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + “:” + threadId
这就是解锁过程,当然建议提供强制解锁的接口,直接删除key,以防一些紧急故障出现的时候,关键业务节点受到影响。这里还有一个关键点,就是publish命令,通过在锁的唯一通道发布解锁消息,可以减少其他分布式节点的等待或者空转,整体上能提高加锁效率。至于redis的消息订阅可以有多种方式,基于Jedis的订阅API或者Spring的MessageListener都可以实现订阅,这里就可以结合刚才说的Semaphore,在第一次申请锁失败后acquire,接收到分布式消息后release就可以控制申请锁流程的再次进入。下面结合Redisson源码,相信会有更清晰的认识。
Redisson使用起来很方便,但是需要redis环境支持eval命令,否则一切都是悲剧,比如me.结果还是要用RedisCommands去写一套。例子就如下,获得一个RLock锁对象,然后tryLock 和unlock。trylock方法提供了锁重入的实现,并且客户端一旦持有锁,就会在能正常运行期间一直持有锁,直到主动unlock或者节点故障,主动失效(超过默认的过期时间)释放锁。
Redisson还提供了设置最长等待时间以及设置释放锁时间的含参tryLock接口 boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; 。Redisson的lock 扩展了java.util.concurrent.locks.Lock的实现,也基本按照了Lock接口的实现方案。lock()方法会一直阻塞申请锁资源,直到有可用的锁释放。下面一部分会详细解析一部分关键实现的代码。
Redisson 的异步任务(Future,Promise,FutureListener API),任务计时器(Timeout,TimerTask),以及通过AbstractChannel连接redis以及写入执行批处理命令等很多都是基于netty框架的。po主因为不能使用eval,所以用Spring提供的redisApi ,RedisOperations来处理redis指令,异步调度等用了Spring的AsyncResult,MessageListener以及一些concurrent api。这里还是先看一下Redisson的实现。
这里以带参数的trylock解析一下,无参的trylock是一种默认参数的实现。先源码走读一下。
上述方法,调用加锁的逻辑就是在tryAcquire(long leaseTime, TimeUnit unit)中
tryAcquire(long leaseTime, TimeUnit unit)只是针对leaseTime的不同参数进行不同的转发处理,再提一下,trylock的无参方法就是直接调用了get(tryLockInnerAsync(Thread.currentThread().getId())); 所以下面再看核心的tryLockInnerAsync 基本命令已经在之前解析过,相信这里看起来应该比较轻松,返回的是一个future对象,是为了异步处理IO,提高系统吞吐量。
再说明一下,tryLock(long waitTime, long leaseTime, TimeUnit unit)有leaseTime参数的申请锁方法是会按照leaseTime时间来自动释放锁的。但是没有leaseTime参数的,比如tryLock()或者tryLock(long waitTime, TimeUnit unit)以及lock()是会一直持有锁的。再来看一下没有leaseTime参数的tryLockInnerAsync(Thread.currentThread().getId())
这里比有leaseTime参数的trylock就多了异步scheduleExpirationRenewal调度。可以继续看一下,这里的expirationRenewalMap就是之前降到的一个ConcurrentMap结构。下面的这个调度方式很精妙。除非被unlock的cancleTask方法触发,否则会一直循环重置过期时间。
这个任务,其实还有一个问题,个人觉得在expirationRenewalMap.containsKey判断时也加上isLocked判断会比较好,以防止unlock时出现redis节点异常的时候,任务没有办法自动停止,或者设置一个最大执行次数的限制也可以,否则极端情况下也会耗尽本地节点的CPU资源。
解锁的逻辑相对简单,如下,redis 命令相信看起来也会比较轻松了。
这里的 cancelExpirationRenewal对应着取消 scheduleExpirationRenewal的重置expire时间任务。
再看一下Redisson是如何处理unlock的redis消息的。这里的消息内容就是unlockMessage = 0L和unlock方法中publish的内容是对应的。
Redisson还支持Redis的多种集群配置,一主一备,一主多备,单机等等。也是通过netty的EventExecutorGroup,Promise,Future等API实现调度的。
在思考是否采用分布式锁以及采用哪种实现方案的时候,还是要基于业务,技术方案一定是基于业务基础,服务于业务,并且衡量过投入产出比的。所以如果有成熟的解决方案,在业务可承受规模肯定是不要重复造轮子,当然还要经过严谨的测试。在po主用Spring的redis api实现时,也遇到了一些问题。 比如hIncrBy 的字符集问题,在使用命令的时候,当然可以直接set a 1然后incr a 1,这个问题可以参考ERR value is not an integer or out of range 问题,但在使用RedisConnection的时候,需要通过转码,byte[] value =SafeEncoder.encode(String.valueOf(“1”)) 再 connection.hSet(key, field, value)这样才可以,或者自己通过String转成正确的编码也可以。 还有刚才说的调度pexpire任务,在unlock异常的时候,任务池中的任务无法自动结束。另外就是Spring的MessageListener的onMessage(Message message, byte[] pattern)回调方法message.getBody()是byte数组,消息内容转化的时候要处理一下。