前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式事务_02_2PC框架raincat源码解析-启动过程

分布式事务_02_2PC框架raincat源码解析-启动过程

作者头像
shirayner
发布2022-03-10 16:42:04
2860
发布2022-03-10 16:42:04
举报
文章被收录于专栏:Java成神之路Java成神之路

一、前言

上一节已经将raincat demo工程运行起来了,这一节来分析下raincat启动过程的源码

主要包括:

事务协调者启动过程 事务参与者启动过程

二、协调者启动过程

主要就是在启动类中通过如下代码来启动 netty 服务端

代码语言:javascript
复制
nettyService.start()

三、参与者启动过程概览

参与者在启动过程中,主要做了如下5件事:

(1)保存SpringContext上下文 (2)通过加载spi,来使用用户自定义配置(序列化方式、日志存储方式) (3)启动Netty客户端,与txManager进行连接,并且维持心跳。 (4)启动事务补偿日志,建表,定时补偿。 (5)启动事务日志事件生产者。将事务补偿日志放入disruptor的环形队列中,由disruptor去异步执行。

时序图如下:

在这里插入图片描述
在这里插入图片描述

InitServiceImpl

代码语言:javascript
复制
    @Override
    public void initialization(final TxConfig txConfig) {
        try {
            loadSpi(txConfig);
            nettyClientService.start(txConfig);
            txCompensationService.start(txConfig);
            txTransactionEventPublisher.start(txConfig.getBufferSize());
        } catch (Exception e) {
            throw new TransactionRuntimeException("tx transaction ex:{}:" + e.getMessage());
        }
        LogUtil.info(LOGGER, () -> "tx transaction init success!");

    }

四、保存Spring上下文

源码见 SpringBeanUtils 类

  • 设置Spring 上下文
  • 提供spring bean 的注册与获取方法。
代码语言:javascript
复制
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.raincat.core.helper;

import org.springframework.context.ConfigurableApplicationContext;

/**
 * SpringBeanUtils.
 * @author xiaoyu
 */
public final class SpringBeanUtils {

    private static final SpringBeanUtils INSTANCE = new SpringBeanUtils();

    private ConfigurableApplicationContext cfgContext;

    private SpringBeanUtils() {
        if (INSTANCE != null) {
            throw new Error("error");
        }
    }

    /**
     * get SpringBeanUtils.
     * @return SpringBeanUtils
     */
    public static SpringBeanUtils getInstance() {
        return INSTANCE;
    }

    /**
     * acquire spring bean.
     *
     * @param type type
     * @param <T>  class
     * @return bean
     */
    public <T> T getBean(final Class<T> type) {
        return cfgContext.getBean(type);
    }

    /**
     * register bean in spring ioc.
     * @param beanName bean name
     * @param obj bean
     */
    public void registerBean(final String beanName, final Object obj) {
        cfgContext.getBeanFactory().registerSingleton(beanName, obj);
    }

    /**
     * set application context.
     * @param cfgContext application context
     */
    public void setCfgContext(final ConfigurableApplicationContext cfgContext) {
        this.cfgContext = cfgContext;
    }
}

五、加载spi

1.主要操作

  1. 获取序列化方式
  2. 获取 TransactionRecoverRepository(事务恢复的存储方式,示例中其实现是 JdbcTransactionRecoverRepository),并设置其序列化方式。
  3. 将TransactionRecoverRepository注入Spring容器,以便在事务补偿器中使用

InitServiceImpl

代码语言:javascript
复制
/**
     * load spi.
     *
     * @param txConfig {@linkplain TxConfig}
     */
    private void loadSpi(final TxConfig txConfig) {
        //spi  serialize
        final SerializeProtocolEnum serializeProtocolEnum
                = SerializeProtocolEnum.acquireSerializeProtocol(txConfig.getSerializer());
        final ServiceLoader<ObjectSerializer> objectSerializers
                = ServiceBootstrap.loadAll(ObjectSerializer.class);
        final ObjectSerializer serializer =
                StreamSupport.stream(objectSerializers.spliterator(), false)
                        .filter(s -> Objects.equals(s.getScheme(), serializeProtocolEnum.getSerializeProtocol()))
                        .findFirst().orElse(new KryoSerializer());

        //spi  RecoverRepository support
        final CompensationCacheTypeEnum compensationCacheTypeEnum
                = CompensationCacheTypeEnum.acquireCompensationCacheType(txConfig.getCompensationCacheType());

        final ServiceLoader<TransactionRecoverRepository> recoverRepositories
                = ServiceBootstrap.loadAll(TransactionRecoverRepository.class);
        final TransactionRecoverRepository repository =
                StreamSupport.stream(recoverRepositories.spliterator(), false)
                        .filter(r -> Objects.equals(r.getScheme(), compensationCacheTypeEnum.getCompensationCacheType()))
                        .findFirst().orElse(new JdbcTransactionRecoverRepository());
        //将compensationCache实现注入到spring容器
        repository.setSerializer(serializer);
        SpringBeanUtils.getInstance().registerBean(TransactionRecoverRepository.class.getName(), repository);
    }

2. 作用

SPI的全名为Service Provider Interface,该机制其实就是为接口寻找服务实现类

3. 如何使用

当服务的提供者,提供了服务接口的一种实现之后,在jar包的META-INF/services/目录里同时创建一个以服务接口命名的文件。 该文件里就是实现该服务接口的具体实现类。 而当外部程序装配这个模块的时候,就能通过该jar包META-INF/services/里的配置文件找到具体的实现类名,并装载实例化,完成模块的注入。

4. 优点

基于这样一个约定就能很好的找到服务接口的实现类,而不需要再代码里制定。

5. 示例

服务接口命名文件
服务接口命名文件
事务补偿存储
事务补偿存储

六、启动netty客户端

1 主要操作

Netty客户端启动过程中,主要做了以下几件事:

  • 定期更新TxMagager的信息

启动一个单线程的调度线程池,定期向TxMagager发送post请求,获取Eureka服务注册表中TxMagager的信息(appName,instanceId,homepageUrl)

  • 设置 Bootstrap

安装ChannelInitializer,并将一个ChannelHandler加入管道中

  • 连接Netty服务端

(1)获取前面更新的TxManager的信息(appName,instanceId,homepageUrl) (2)向TxManager发送Post请求,获取Netty服务器连接信息(host,port) (3)连接到Netty服务端 (4)通过ChannelFutureListener监听连接成功或失败的事件,若连接失败则定期重试。

2 事件监听

客户端除了连接服务端成功或失败事件监听,还有监听了以下事件。

前面向Netty管道中填充了一个ChannelHandler,这样就能通过ChannelHandler监控Netty生命周期中的消息入站事件:

channelRead exceptionCaught channelInactive channelActive userEventTriggered

3.Netty客户端启动时序图

在这里插入图片描述
在这里插入图片描述

七、事务补偿日志启动

1.主要操作

事务补偿日志启动过程中,主要做了以下几件事:

  • 事务补偿日志数据库准备

创建用来存储日志的数据表

  • 定时进行事务补偿

开启线程池,进行定时事务补偿 (1)获取到所有的事务补偿日志,并进行遍历 (2)根据每个日志的事务组ID,向协调者获取到对应的事务组信息 (3)如果整个事务组状态是提交的,而事务参与者自己不是提交的,则进行补偿。----不确定 (4)事务补偿: 反射执行事务参与者的事务,然后向事务协调者发送事务完成消息,最后事务参与者提交事务。

2.时序图

在这里插入图片描述
在这里插入图片描述

八、事件生产者启动

这里主要使用 disruptor 作为一个高性能环形缓存队列。 补偿日志是异步的,先把日志扔到环形队列,然后由disruptor 的事件消费者进行事务日志补偿的增删改和补偿操作

1.disruptor中的角色

角色

描述

raincat中对应角色

Event

事件

TxTransactionEvent

EventFactory

事件工厂

TxTransactionEventFactory

EventHandler

事件消费者

TxTransactionEventHandler

EventProducer

事件生产者

TxTransactionEventPublisher

EventTranslatorOneArg

3.0版本的Translator,可用来填充RingBuffer的事件槽

TxTransactionEventTranslator

2.主要操作

事件生产者启动过程是一个标准的 disruptor 启动过程,主要是设置事件工厂、事件消费者、设置线程池,然后启动disruptor

代码语言:javascript
复制
   /**
     * disruptor start.
     *
     * @param bufferSize this is disruptor buffer size.
     */
    public void start(final int bufferSize) {
        disruptor = new Disruptor<>(new TxTransactionEventFactory(), bufferSize, r -> {
            AtomicInteger index = new AtomicInteger(1);
            return new Thread(null, r, "disruptor-thread-" + index.getAndIncrement());
        }, ProducerType.MULTI, new YieldingWaitStrategy());
        disruptor.handleEventsWith(txTransactionEventHandler);
        disruptor.setDefaultExceptionHandler(new ExceptionHandler<TxTransactionEvent>() {
            @Override
            public void handleEventException(Throwable ex, long sequence, TxTransactionEvent event) {
                LogUtil.error(LOGGER, () -> "Disruptor handleEventException:"
                        + event.getType() + event.getTransactionRecover().toString());
            }

            @Override
            public void handleOnStartException(Throwable ex) {
                LogUtil.error(LOGGER, () -> "Disruptor start exception");
            }

            @Override
            public void handleOnShutdownException(Throwable ex) {
                LogUtil.error(LOGGER, () -> "Disruptor close Exception ");
            }
        });
        executor = new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                TxTransactionThreadFactory.create("raincat-log-disruptor", false),
                new ThreadPoolExecutor.AbortPolicy());
        disruptor.start();
    }

3.事件消费者

这里的事件消费者主要是监听事件,进行事务日志补偿的增删改和补偿操作。

代码语言:javascript
复制
/*
 *
 *  * Licensed to the Apache Software Foundation (ASF) under one or more
 *  * contributor license agreements.  See the NOTICE file distributed with
 *  * this work for additional information regarding copyright ownership.
 *  * The ASF licenses this file to You under the Apache License, Version 2.0
 *  * (the "License"); you may not use this file except in compliance with
 *  * the License.  You may obtain a copy of the License at
 *  *
 *  *     http://www.apache.org/licenses/LICENSE-2.0
 *  *
 *  * Unless required by applicable law or agreed to in writing, software
 *  * distributed under the License is distributed on an "AS IS" BASIS,
 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  * See the License for the specific language governing permissions and
 *  * limitations under the License.
 *
 */

package com.raincat.core.disruptor.handler;

import com.lmax.disruptor.EventHandler;
import com.raincat.common.enums.CompensationActionEnum;
import com.raincat.core.compensation.TxCompensationService;
import com.raincat.core.disruptor.event.TxTransactionEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Disroptor handler.
 *
 * @author xiaoyu(Myth)
 */
@Component
public class TxTransactionEventHandler implements EventHandler<TxTransactionEvent> {

    @Autowired
    private TxCompensationService txCompensationService;

    @Override
    public void onEvent(final TxTransactionEvent txTransactionEvent, final long sequence, final boolean endOfBatch) {
        if (txTransactionEvent.getType() == CompensationActionEnum.SAVE.getCode()) {
            txCompensationService.save(txTransactionEvent.getTransactionRecover());
        } else if (txTransactionEvent.getType() == CompensationActionEnum.DELETE.getCode()) {
            txCompensationService.remove(txTransactionEvent.getTransactionRecover().getId());
        } else if (txTransactionEvent.getType() == CompensationActionEnum.UPDATE.getCode()) {
            txCompensationService.update(txTransactionEvent.getTransactionRecover());
        } else if (txTransactionEvent.getType() == CompensationActionEnum.COMPENSATE.getCode()) {
            txCompensationService.compensation(txTransactionEvent.getTransactionRecover());
        }
        txTransactionEvent.clear();
    }
}

4.时序图

在这里插入图片描述
在这里插入图片描述

九、参考资料

  1. Raincat-github地址
  2. Raincat-源码解析视频
  3. Java之SPI机制
  4. Disruptor_学习_00_资源帖
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-09-27 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、协调者启动过程
  • 三、参与者启动过程概览
  • 四、保存Spring上下文
  • 五、加载spi
    • 1.主要操作
      • 2. 作用
        • 3. 如何使用
          • 4. 优点
            • 5. 示例
            • 六、启动netty客户端
              • 1 主要操作
                • 2 事件监听
                  • 3.Netty客户端启动时序图
                  • 七、事务补偿日志启动
                    • 1.主要操作
                      • 2.时序图
                      • 八、事件生产者启动
                        • 1.disruptor中的角色
                          • 2.主要操作
                            • 3.事件消费者
                              • 4.时序图
                              • 九、参考资料
                              相关产品与服务
                              文件存储
                              文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档