"可靠消息最终一致性"是为了解决Producer端的消息发送与本地事务执行的原子性问题,是一种柔性事务,属于异步确保型,软状态,最终一致。
问题典型场景是:本地往DB中插入一条记录,同时往MQ中发送一条消息,必须保证二者同时成功或者同时失败。由于DB和MQ是不同的系统,可能插入DB成功,但是发消息到MQ中失败;也可能插入DB失败,但是发送消息到MQ成功。如何保证二者的一致性,就成为了我们要解决的问题。
本文深入讲解如何实现可靠消息一致性的各种实现方案,让你一次爽个够。包括:
要解决Producer端的消息发送与本地事务执行的原子性问题,一个典型的思路是,我们先将消息暂存到一个地方,在本地事务执行完成之前,这个消息对消费者是不可见的。只有当本地事务确认执行成功后,消费者才可以消费到这条消息。
下面用伪代码演示这个过程:
步骤说明:
显然,如果我们可以保证上述每个步骤都可以正确的执行,那么本地事务的执行与发送消息的行为将可以保持一致。然后事实总是残酷的,这一套流程充满了挑战。
大多数MQ,消息发送后,就可以直接被消费者消费了,然而我们并不想这样,需要等到本地事务也执行成功。
一种很直观的思路是,我们先将这个消息,找一个地方暂存起来。例如在数据库中建立一个表,将消息存入到这个表中,称之为”本地事务表”。在这个表中,可以有一个state字段表示消息的状态,在预发送阶段,我们将其标记为UNKONWN。
我们可以根据本地事务执行的结果,修改本地事务表中状态字段的值。如果本地事务执行成功,我们可以将本地事务表中的状态字段改为LOCAL_COMMIT;如果执行失败,我们可以将其改为LOCAL_ROLLBACK。
另外,我们通过一个异步的线程,不断的从这个表中,查询状态为LOCAL_COMMIT的消息,将其发送到MQ中。异步线程发送消息到MQ中,也可能成功,或者失败:
发送MQ成功:
此时,从发送端来说,整个事务已经结束,将其标记GLOBAL_COMMIT,接下来就是消费端进行消费。
发送MQ失败:
这个时候需要进行重试,直到成功。如果你想限制一个最大重试次数,可以在这个表中添加一个retries字段,每重试一次,就+1,当超过次数阈值后,就不再发送。你也可以指定一个消息的超时时间,当超过时间阈值后,也不再发送。对于发送失败的消息,将其状态标记为MESSAGE_ERROR。还可以事务表中添加一个cause字段,表示是什么原因导致的发送失败。
前面提到预发送消息阶段,会将本地事务表的状态字段设置为UNKONWN。在本地事务执行之后,将其改为LOCAL_COMMIT或者LOCAL_ROLLBACK。
然而,有可能本地事务执行之后,更改本地消息表中消息状态的行为失败了。这种情况下,消息就一直处于UNKONWN状态,而异步线程只会发送状态为LOCAL_COMMIT的消息到MQ中,这个消息会一直被忽略,也就是产生了消息状态丢失。解决方案
方案一:扩大事务边界
将预发送消息、执行本地事务、修改本地事务表消息状态三个操作,合并到一个事务中。
在第一步预发送消息之前就开启事务,在第三步执行结束之后提交或者回滚事务,因为所有操作位于同一个事务中,从而保证,本地事务表中的消息记录,与业务操作产生的记录,总是同时成功或者失败,且状态一致。
方案二:合并事务状态
显然,你还可以更进一步,消息不需要有预发送的状态,直接和正常的数据库操作合并到一个事务中写入到数据库,状态直接就是LOCAL_COMMIT,之后异步线程发送的逻辑不变。
方案三:对PREPARED状态消息也进行检查
方案一、二的特点在于,只在业务方法执行的时候,只进行一次判断事务是否可以提交,之后异步线程发送消息的时候,只检查LOCAL_SUCCESS状态的消息发送到MQ中,这可以满足大部分场景了。
然而,有时可能会有更复杂的场景。例如,有一个业务逻辑很复杂,业务的发起方A,除了操作本地数据库,可能需要进行RPC调用查询业务B,以获得一些MQ消息中必须要包含的一些信息。然而,B可能当前还不能提供这些信息,需要等待一段时间才能提供。A希望将这条消息保存下来,等到B可以提供足够多信息的时候再发送。这个时候,方案一、二就不满足了,我们需要继续进行改进。
具体策略是:在原始方案的基础上,让异步线程除了发送LOCAL_SUCCESS状态的消息之外,还对PREPARED状态的消息进行检查。当然你需要设置个过滤条件,如一个PREPARED状态的消息的创建时间,必须与当前时间比较的差值大于某个时间阈值时,才去尝试去查询这个消息的正确状态应该是什么。
设置阈值,主要是为了避免与新事务一开始插入消息的PREPARED消息状态混淆,这些新的PREPARED消息可能立即就会修改为LOCAL_SUCCESS。只有那些长时间处于PREPARED状态的消息,才有可能是因为本地事务执行成功,但是更新消息状态失败而导致的。
我们可以总结出,方案三的最大特点是:当前条件满足的情况下,立即判断可以发送消息;如果当前条件不满足,还可以异步的确定是否满足消息发送的条件。显然提供了极大的灵活性。而方案一二,只支持前者。
Apache RocketMQ 4.3版本中引入了事务消息。与我们前面使用本地事务表要解决的问题相同。都是为了为了解决Producer端发送消息与本地事务执行的原子性问题。
与本地事务表的思路一致,都是将消息先找一个地方暂存起来,只不过暂存的地方不一样,RocketMQ将消息暂存到了内部的主题中。
为了支持事务消息,RocketMQ引入了Half Topic 以及Operation Topic 两个内部队列来存储事务消息推进状态。其中:
关于这两个topic,我们在后文会详细介绍其作用。
下面是RocketMQ中一个事务消息的发送流程,不够清楚,可以放大看一下每个步骤:
图片来源:阿里巴巴中间件官微
接下来,对上述部分流程合并进行说明。
通过TransactionMQProducer发送事务消息,这个producer会在一条普通的Message中加上一些元数据,标识这是一条预先发送的事务消息。broker端在发现这是一条事务消息的时候,会将其存储到Half Topic中。另外必须要制定producer group,以便在发送者失败甚至宕机的情况下,回查其他同一个producer group中的实例查询事务状态。
在发送prepare消息成功后,需要执行本地事务。这是需要实现RocketMQ提供的一个TransactionListener接口的方法来完成(非Java读者不必在意,语言是次要的,关键是思路):
其中:
这两个方法都返回了一个表示本地事务消息的执行状态LocalTransactionState,事务生产者会将其上报给broker。状态总共有三种,见下文分析:
public enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW,}
不管客户端返回的是哪一种状态,生产者拿到这个状态,都会将这个状态报告给broker。broker在处理时,发现这是一个报告事务状态的消息,首先会判断状态值,进行相应的处理。
broker会把收到事务消息的状态后,会记录在内部主题Operation Topic中,消息体中则是prepare message对应在Half Topic中的offset。如下图所示:
图片来源:阿里巴巴中间件官微
此外,broker还会有一个内部服务,消费Operation Topic中的消息,具体来说:
细心的读者发现了,图中这两个队列的长度刻意画的不相等。其实是为了说明,在一些异常情况,可能上报事务消息状态失败,因此OperationTopic中没有记录,二者的差值可能就是UNKOWN未确认中间状态的消息,需要进行特殊处理。
如果消息是UNKNOW中间状态,那么说明目前还不能确定事务的状态,broker需要主动询问客户端producer。以下场景,可能会出现UNKNOW中间状态:
由于UNKNOW中间状态的消息,并不会提交到Operation Topic中,因此Half Topic与Operation Topic这两个内部主题中,服务端通过比对两个主题的差值来找到尚未提交的超时事务,进行回查。
回查意味着,业务方必须提供一个方法让rocketmq来回调。前面我们看到TransactionListener接口有2个方法,另外一个方法checkLocalTransaction就是用于回查。
我们需实现这个方法,rocketmq会把我们之前发送的消息当做参数传入。我们可以根据消息中的内容,反查之前的业务记录信息,确定状态。
最后小提一点,broker主动询问客户端producer事务状态,是依赖于broker与producer端的双向通信能力来完成的,也就是broker会主动给客户端producer发请求。双向通信能力是基于rocketmq-remoting模块基础上完成的。
从上述事务消息设计中可以看到,RocketMQ事务消息较好的解决了事务的最终一致性问题,事务发起方仅需要关注本地事务执行以及实现回查接口给出事务状态判定等实现,而且在上游事务峰值高时,可以通过消息队列,避免对下游服务产生过大压力。RocketMQ官网上提供了事务消息的完整使用案例,读者可以自行参考。
当然,事情不可能是那么的美好,以下是RocketMQ事务消息使用限制:
在之前的方案中,不论是本地事务表,还是RocketMQ中的事务消息,对业务都有一定的侵入性。
事实上,我们可以通过另外一种方案来实现可靠消息的发送。在这个方案中:
事实上,笔者认为这个方案更加优雅。目前开源的binlog订阅组件有很多,各种语言的实现都有:java、go、python等,首推的还是阿里巴巴开源的canal,服务端使用java编写,支持多语言客户端。
从Kafka 0.11开始,KafkaProducer支持另外两种模式:幂等生产者( idempotent producer)和事务生产者(transactional producer)。
这里我们看到了Kafka中的事务消息实际上与RocketMQ中的事务消息是截然不同的概念,类似于数据库事务的原子性。
如果你希望在Kafka中使用类似于RocketMQ的事务消息,那么只能自己做了,可以在Kafka之前加一个代理,由这个代理暂存事务消息,条件满足后,再发送到目标Topic中供业务方消费。