基于可靠消息方案的分布式事务(四):接入Lottor服务

上一篇文章中,通过Lottor Sample介绍了快速体验分布式事务Lottor。本文将会介绍如何将微服务中的生产方和消费方服务接入Lottor。

场景描述

  • 生产方:User服务
  • 消费方:Auth服务
  • 事务管理方:Lottor Server

Lottor-Samples中的场景为:客户端调用User服务创建一个用户,用户服务的user表中增加了一条用户记录。除此之外,还会调用Auth服务创建该用户对应的角色和权限信息。

我们通过上面的请求流程图入手,介绍接入Lottor服务。当您启动好docker-compose中的组件时,会创建好两个服务对应的user和auth数据库。其中User和Auth服务所需要的初始化数据已经准备好,放在各自的classpath下,服务在启动时会自动初始化数据库,所需要的预置数据(如角色、权限信息)也放在sql文件中。

Lottor客户端API

Lottor Client中提供了一个ExternalNettyService接口,用以发送三类消息到Lottor Server:

  • 预提交消息
  • 确认提交消息
  • 消费完成消息
 1public interface ExternalNettyService {
 2
 3    /**
 4     * pre-commit msgs
 5     *
 6     * @param preCommitMsgs
 7     */
 8    public Boolean preSend(List<TransactionMsg> preCommitMsgs);
 9
10    /**
11     * confirm msgs
12     *
13     * @param success
14     */
15    public void postSend(Boolean success, Object message);
16
17    /**
18     * msgs after consuming
19     *
20     * @param msg
21     * @param success
22     */
23    public void consumedSend(TransactionMsg msg, Boolean success);
24}

预发送#preSend的入参为预提交的消息列表,一个生产者可能有对应的多个消费者;确认提交#postSend的入参为生产方本地事务执行的状态,如果失败,第二个参数记录异常信息;#consumedSend为消费方消费成功的发送的异步消息,第一个入参为其接收到的事务消息,第二个为消费的状态。

事务消息TransactionMsg

 1public class TransactionMsg implements Serializable {
 2    /**
 3     * 用于消息的追溯
 4     */
 5    private String groupId;
 6
 7    /**
 8     * 事务消息id
 9     */
10    @NonNull
11    private String subTaskId;
12
13    /**
14     * 源服务,即调用发起方
15     */
16    private String source;
17
18    /**
19     * 目标方服务
20     */
21    private String target;
22
23    /**
24     * 执行的方法,适配成枚举
25     */
26    private String method;
27
28    /**
29     * 参数,即要传递的内容,可以为null
30     */
31    private Object args;
32
33    /**
34     * 创建时间
35     */
36    private Long createTime = Timestamp.valueOf(DateUtils.getCurrentDateTime()).getTime();
37
38    /**
39     * 操作结果信息
40     */
41    private String message;
42
43    /**
44     * 更新时间
45     */
46    private Long updateTime;
47
48    /**
49     * 是否消费,默认为否
50     *
51     * {@linkplain com.blueskykong.lottor.common.enums.ConsumedStatus}
52     */
53    private int consumed = ConsumedStatus.UNCONSUMED.getStatus();
54
55     ...
56}

在构建事务消息时,事务消息id、源服务、目标服务、目标方法和目标方法的传参args都是必不可少的。消费方消费完之后,将会设置consumed的状态,出现异常将会设置异常message信息。

生产方-User服务

创建用户时,需要创建对应的角色。生产方接入分为三步:

  • 发送预提交消息
  • 执行本地事务
  • 发送确认提交的消息

引入依赖

首先,需要引入Lottor客户端的依赖:

1    <dependency>
2        <groupId>com.blueskykong</groupId>
3        <artifactId>lottor-starter</artifactId>
4        <version>2.0.0-SNAPSHOT</version>
5    </dependency>

发起调用

UserService中定义了创建用户的方法,我们需要在执行本地事务之前,构造事务消息并预发送到Lottor Server(对应流程图中的步骤1)。如果遇到预发送失败,则直接停止本地事务的执行。如果本地事务执行成功(对应步骤3),则发送confirm消息,否则发送回滚消息到Lottor Server(对应步骤4)。

 1@Service
 2public class UserServiceImpl implements UserService {
 3
 4    private static final Logger LOGGER = LoggerFactory.getLogger(UserServiceImpl.class);
 5
 6     //注入ExternalNettyService
 7    @Autowired
 8    private ExternalNettyService nettyService;
 9
10    @Autowired
11    private UserMapper userMapper;
12
13    @Override
14    @Transactional
15    public Boolean createUser(UserEntity userEntity, StateEnum flag) {
16        UserRoleDTO userRoleDTO = new UserRoleDTO(RoleEnum.ADMIN, userEntity.getId());
17         //构造消费方的TransactionMsg
18        TransactionMsg transactionMsg = new TransactionMsg.Builder()
19                .setSource(ServiceNameEnum.TEST_USER.getServiceName())
20                .setTarget(ServiceNameEnum.TEST_AUTH.getServiceName())
21                .setMethod(MethodNameEnum.AUTH_ROLE.getMethod())
22                .setSubTaskId(IdWorkerUtils.getInstance().createUUID())
23                .setArgs(userRoleDTO)
24                .build();
25
26        if (flag == StateEnum.CONSUME_FAIL) {
27            userRoleDTO.setUserId(null);
28            transactionMsg.setArgs(userRoleDTO);
29        }
30
31        //发送预处理消息
32        if (!nettyService.preSend(Collections.singletonList(transactionMsg))) {
33            return false;//预发送失败,本地事务停止执行
34        }
35
36        //local transaction本地事务
37        try {
38            LOGGER.debug("执行本地事务!");
39            if (flag != StateEnum.PRODUCE_FAIL) {
40                userMapper.saveUser(userEntity);
41            } else {
42                userMapper.saveUserFailure(userEntity);
43            }
44        } catch (Exception e) {
45              //本地事务异常,发送回滚消息
46            nettyService.postSend(false, e.getMessage());
47            LOGGER.error("执行本地事务失败,cause is 【{}】", e.getLocalizedMessage());
48            return false;
49        }
50        //发送确认消息
51        nettyService.postSend(true, null);
52        return true;
53    }
54
55}

代码如上所示,实现不是很复杂。本地事务执行前,必然已经成功发送了预提交消息,当本地事务执行成功,Lottor Client将会记录本地事务执行的状态,避免异步发送的确认消息的丢失,便于后续的Lottor Server回查。

配置文件

 1lottor:
 2  enabled: true
 3  core:
 4    cache: true  
 5    cache-type: redis
 6    tx-redis-config:
 7      host-name: localhost
 8      port: 6379
 9    serializer: kryo
10    netty-serializer: kryo
11    tx-manager-id: lottor
12
13spring:
14  datasource:
15    url: jdbc:mysql://localhost:3306/user?autoReconnect=true&useSSL=false
16    continue-on-error: false
17    initialize: true
18    max-active: 50
19    max-idle: 10
20    max-wait: 10000
21    min-evictable-idle-time-millis: 60000
22    min-idle: 8
23    name: dbcp1
24    test-on-borrow: false
25    test-on-return: false
26    test-while-idle: false
27    time-between-eviction-runs-millis: 5000
28    username: root
29    password: _123456_
30    schema[0]: classpath:/user.sql

如上为User服务的部分配置文件,lottor.enabled: true开启Lottor 客户端服务。cache 开启本地缓存记录。cache-type指定了本地事务记录的缓存方式,可以为redis或者MongoDB。serializer为序列化和反序列化方式。tx-manager-id为对应的Lottor Server的服务名。

Lottor Server

多个微服务的接入,对Lottor Server其实没什么侵入性。这里需要注意的是,TransactionMsg中设置的sourcetarget字段来源于lottor-common中的com.blueskykong.lottor.common.enums.ServiceNameEnum

 1public enum ServiceNameEnum {
 2    TEST_USER("user", "tx-user"),
 3    TEST_AUTH("auth", "tx-auth");
 4    //服务名
 5    String serviceName;
 6    //消息中间件的topic
 7    String topic;
 8
 9    ...
10}

消息中间件的topic是在服务名的基础上,加上tx-前缀。消费方在设置订阅的topic时,需要按照这样的规则命名。Lottor Server完成的步骤为上面流程图中的2(成功收到预提交消息)和5(发送事务消息到指定的消费方),除此之外,还会定时轮询异常状态的事务组和事务消息。

消费方-Auth服务

引入依赖

 1    <dependency>
 2        <groupId>com.blueskykong</groupId>
 3        <artifactId>lottor-starter</artifactId>
 4        <version>2.0.0-SNAPSHOT</version>
 5    </dependency>
 6
 7    <dependency>
 8        <groupId>org.springframework.cloud</groupId>
 9        <artifactId>spring-cloud-stream</artifactId>
10    </dependency>
11    <dependency>
12        <groupId>org.springframework.cloud</groupId>
13        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
14    </dependency>

引入了Lottor客户端starter,spring-cloud-stream用于消费方接收来自Lottor Server的事务消息。

topic监听

 1@Component
 2@EnableBinding({TestSink.class})
 3public class ListenerStream extends InitStreamHandler {
 4    private static final Logger LOGGER = LoggerFactory.getLogger(ListenerStream.class);
 5
 6    @Autowired
 7    private RoleUserService roleUserService;
 8
 9    @Autowired
10    public ListenerStream(ExternalNettyService nettyService, ObjectSerializer objectSerializer) {
11        super(nettyService, objectSerializer);
12    }
13
14    @StreamListener(TestSink.INPUT)
15    public void processSMS(Message message) {
16        //解析接收到的TransactionMsg
17        process(init(message));
18    }
19
20    @Transactional
21    public void process(TransactionMsg message) {
22        try {
23            if (Objects.nonNull(message)) {
24                LOGGER.info("===============consume notification message: =======================" + message.toString());
25                if (StringUtils.isNotBlank(message.getMethod())) {
26                    MethodNameEnum method = MethodNameEnum.fromString(message.getMethod());
27                    LOGGER.info(message.getMethod());
28                    //根据目标方法进行处理,因为一个服务可以对应多个生产方,有多个目标方法
29                    switch (method) {
30                        case AUTH_ROLE:
31                            UserRoleDTO userRoleDTO = (UserRoleDTO) message.getArgs();
32                            RoleEntity roleEntity = roleUserService.getRole(userRoleDTO.getRoleEnum().getName());
33                            String roleId = "";
34                            if (Objects.nonNull(roleEntity)) {
35                                roleId = roleEntity.getId();
36                            }
37                            roleUserService.saveRoleUser(new UserRole(UUID.randomUUID().toString(), userRoleDTO.getUserId(), roleId));
38                            LOGGER.info("matched case {}", MethodNameEnum.AUTH_ROLE);
39
40                            break;
41                        default:
42                            LOGGER.warn("no matched consumer case!");
43                            message.setMessage("no matched consumer case!");
44                            nettyService.consumedSend(message, false);
45                            return;
46                    }
47                }
48            }
49        } catch (Exception e) {
50              //处理异常,发送消费失败的消息
51            LOGGER.error(e.getLocalizedMessage());
52            message.setMessage(e.getLocalizedMessage());
53            nettyService.consumedSend(message, false);
54            return;
55        }
56        //成功消费
57        nettyService.consumedSend(message, true);
58        return;
59    }
60}

消费方监听指定的topic(如上实现中,为test-input中指定的topic,spring-cloud-stream更加简便调用的接口),解析接收到的TransactionMsg。根据目标方法进行处理,因为一个服务可以对应多个生产方,有多个目标方法。执行本地事务时,Auth会根据TransactionMsg中提供的args作为入参执行指定的方法(对应步骤7),最后向Lottor Server发送消费的结果(对应步骤8)。

配置文件

 1---
 2spring:
 3  cloud:
 4    stream:
 5      bindings:
 6        test-input:
 7          group: testGroup
 8          content-type: application/x-java-object;type=com.blueskykong.lottor.common.entity.TransactionMsgAdapter
 9          destination: tx-auth
10          binder: rabbit1
11      binders:
12        rabbit1:
13          type: rabbit
14          environment:
15            spring:
16              rabbitmq:
17                host: localhost
18                port: 5672
19                username: guest
20                password: guest
21                virtual-host: /
22
23---
24lottor:
25  enabled: true
26  core:
27    cache: true
28    cache-type: redis
29    tx-redis-config:
30      host-name: localhost
31      port: 6379
32    serializer: kryo
33    netty-serializer: kryo
34    tx-manager-id: lottor

配置和User服务的差别在于增加了spring-cloud-stream的配置,配置了rabbitmq的相关信息,监听的topic为tx-auth。

小结

本文主要通过User和Auth的示例服务讲解了如何接入Lottor客户端。生产方构造涉及的事务消息,首先预发送事务消息到Lottor Server,成功预提交之后便执行本地事务;本地事务执行完则异步发送确认消息(可能成功,也可能失败)。Lottor Server根据接收到的确认消息决定是否将对应的事务组消息发送到对应的消费方。Lottor Server还会定时轮询异常状态的事务组和事务消息,以防因为异步的确认消息发送失败。消费方收到事务消息之后,将会根据目标方法执行对应的处理操作,最后将消费结果异步回写到Lottor Server。

Lottor项目地址:https://github.com/keets2012/Lottor

原文发布于微信公众号 - aoho求索(aohoBlog)

原文发表时间:2018-08-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏容器云生态

Openstack平台搭建之第二天

Openstack平台搭建之第二天 If you have any question ,please contact me by weichuangxxb@si...

47410
来自专栏FreeBuf

Kali下常用安全工具中文参数说明(160个)

*本文原创作者:屌丝绅士,属Freebuf原创奖励计划,转载请注明来自FreeBuf 由于篇幅有限,只列举部分,ps:第一次发有什么不对的 还望各位大大指正 n...

9589
来自专栏安恒网络空间安全讲武堂

Python编写渗透工具学习笔记二 | 0x02利用FTP与web批量抓肉鸡

0x02利用FTP与web批量抓肉鸡 脚本要实现的目标和思路: 先尝试匿名登录ftp,当匿名登录失败时再尝试用用户/密码爆破登录,登录成功后,脚本会搜索ftp中...

1.6K7
来自专栏IT技术精选文摘

JVM致命错误日志(hs_err_pid.log)分析

当jvm出现致命错误时,会生成一个错误文件 hs_err_pid<pid>.log,其中包括了导致jvm crash的重要信息,可以通过分析该文件定位到导致cr...

5915
来自专栏idba

ZanDB基于Celery定时任务的二次开发

ZanDB早期的任务需求中,大部分都是针对servant(跑在主机上的agent)做任务调度。也就是说,一期的任务系统,满足的是在特定时刻调用特定主机执行特定的...

1422
来自专栏JavaEE

Java实现把图片上传到图片服务器(nginx+vsftp)前言:需求:功能实现:总结:

2.1K3
来自专栏小白鼠

ZookeeperZNode基本命令四字命令SessionWatcherACLZookeeper集群Paxos算法ZAB协议Curator分布式锁

在Zookeeper中,ZNode可以分为持久节点和临时节点两类。所谓持久节点是指一旦这个ZNode被创建了,除非主动进行ZNode的移除操作,否则这个ZNod...

1053
来自专栏JackeyGao的博客

一个超级小的 Django 项目.

当用最简单的代码实现 Django 项目为最基本的要素的时候, 项目可以和微框架一样小.

1792
来自专栏技术博客

ExtJs十二(ExtJs Mvc图片管理之二)

接着图片管理一http://www.cnblogs.com/aehyok/archive/2013/04/27/3048278.html。

1331
来自专栏小狼的世界

Curl操作Elasticsearch的常用方法

Elasticsearch对于文档操作,提供了以下几种API,本文就说明如何使用curl方式来调用这些API。

1512

扫码关注云+社区

领取腾讯云代金券