前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Seata学习整理二

Seata学习整理二

作者头像
路行的亚洲
发布2023-02-28 13:30:11
2670
发布2023-02-28 13:30:11
举报
文章被收录于专栏:后端技术学习

前面说过,seata在做二阶段提交前会生成前镜像、执行sql、生成后镜像。那么首先需要做的是,有数据源进行连接,然后需要对表的元数据信息进行抽取。这样才可以进行前镜像以及后镜像的操作。

一、初始化数据源元数据信息

可以看到io.seata.rm.datasource.DataSourceProxy中的构造函数会执行初始化方法

代码语言:javascript
复制
  public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
        if (targetDataSource instanceof SeataDataSourceProxy) {
            targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
        }
        this.targetDataSource = targetDataSource;
        //执行初始化
        init(targetDataSource, resourceGroupId);
    }

执行初始化方法会提取相关信息:

代码语言:javascript
复制
    //执行初始化
    private void init(DataSource dataSource, String resourceGroupId) {
        this.resourceGroupId = resourceGroupId;
        //获取相关数据源信息
        try (Connection connection = dataSource.getConnection()) {
            jdbcUrl = connection.getMetaData().getURL();
            dbType = JdbcUtils.getDbType(jdbcUrl);
            if (JdbcConstants.ORACLE.equals(dbType)) {
                userName = connection.getMetaData().getUserName();
            }
        } catch (SQLException e) {
            throw new IllegalStateException("can not init dataSource", e);
        }
        //注册数据源
        DefaultResourceManager.get().registerResource(this);
        if (ENABLE_TABLE_META_CHECKER_ENABLE) {
            tableMetaExcutor.scheduleAtFixedRate(() -> {
                //获取数据远连接
                try (Connection connection = dataSource.getConnection()) {
                    //执行刷新表元数据缓存
                    TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
                        .refresh(connection, DataSourceProxy.this.getResourceId());
                } catch (Exception ignore) {
                }
            }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
        }

        //Set the default branch type to 'AT' in the RootContext.
        //设置默认分支类型AT到root上下文中
        RootContext.setDefaultBranchType(this.getBranchType());
    }

可以看到 mysql 获取schema

代码语言:javascript
复制
 // mysql 获取schema
    @Override
    protected TableMeta fetchSchema(Connection connection, String tableName) throws SQLException {
        // 获取其中的一条,执行sql查询,然后设置元数据信息到schema中
        String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.MYSQL) + " LIMIT 1";
        try (Statement stmt = connection.createStatement();
            ResultSet rs = stmt.executeQuery(sql)) {
            //将结果集元数据设置到schema中
            return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData());
        } catch (SQLException sqlEx) {
            throw sqlEx;
        } catch (Exception e) {
            throw new SQLException(String.format("Failed to fetch schema of %s", tableName), e);
        }
    }

设置的结果集元数据中可以看到:schemaName、catalogName、tableName、TableMeta、ColumnMeta、IndexMeta。

同时将表信息放入到缓存中:

代码语言:javascript
复制
Cache<String, TableMeta> TABLE_META_CACHE = Cache<String, TableMeta> TABLE_META_CACHE = Caffeine.newBuilder().maximumSize(CACHE_SIZE)
    .expireAfterWrite(EXPIRE_TIME, TimeUnit.MILLISECONDS).softValues().build();
TABLE_META_CACHE.put(entry.getKey(), tableMeta);    

二、sql识别器

可以看到sql识别器会根据对应sql类型执行sql操作:

代码语言:javascript
复制
  switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }

三、一阶段sql执行前后操作

可以看到在io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse中会执行几个重要的操作

生成前镜像、执行sql、生成后镜像、准备undo log日志数据

代码语言:javascript
复制
  /**
     * Execute auto commit false t.
     *
     * @param args the args
     * @return the t
     * @throws Exception the exception
     */
    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        LOGGER.info("----执行自动提交 false------");
        LOGGER.info("----生成前镜像------");
        TableRecords beforeImage = beforeImage();
        LOGGER.info("----执行sql操作------");
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        LOGGER.info("----生成后镜像------");
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

之后执行二阶段处理提交

四、二阶段提交

提交sql,如果没有发生异常,则删除undo log日志。否则,执行回滚操作,执行undo log日志,也即通过镜像sql执行复原数据操作。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-12-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、初始化数据源元数据信息
  • 二、sql识别器
  • 三、一阶段sql执行前后操作
  • 四、二阶段提交
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档