前面说过,seata在做二阶段提交前会生成前镜像、执行sql、生成后镜像。那么首先需要做的是,有数据源进行连接,然后需要对表的元数据信息进行抽取。这样才可以进行前镜像以及后镜像的操作。
可以看到io.seata.rm.datasource.DataSourceProxy中的构造函数会执行初始化方法
public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
if (targetDataSource instanceof SeataDataSourceProxy) {
targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
}
this.targetDataSource = targetDataSource;
//执行初始化
init(targetDataSource, resourceGroupId);
}
执行初始化方法会提取相关信息:
//执行初始化
private void init(DataSource dataSource, String resourceGroupId) {
this.resourceGroupId = resourceGroupId;
//获取相关数据源信息
try (Connection connection = dataSource.getConnection()) {
jdbcUrl = connection.getMetaData().getURL();
dbType = JdbcUtils.getDbType(jdbcUrl);
if (JdbcConstants.ORACLE.equals(dbType)) {
userName = connection.getMetaData().getUserName();
}
} catch (SQLException e) {
throw new IllegalStateException("can not init dataSource", e);
}
//注册数据源
DefaultResourceManager.get().registerResource(this);
if (ENABLE_TABLE_META_CHECKER_ENABLE) {
tableMetaExcutor.scheduleAtFixedRate(() -> {
//获取数据远连接
try (Connection connection = dataSource.getConnection()) {
//执行刷新表元数据缓存
TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
.refresh(connection, DataSourceProxy.this.getResourceId());
} catch (Exception ignore) {
}
}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
}
//Set the default branch type to 'AT' in the RootContext.
//设置默认分支类型AT到root上下文中
RootContext.setDefaultBranchType(this.getBranchType());
}
可以看到 mysql 获取schema
// mysql 获取schema
@Override
protected TableMeta fetchSchema(Connection connection, String tableName) throws SQLException {
// 获取其中的一条,执行sql查询,然后设置元数据信息到schema中
String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.MYSQL) + " LIMIT 1";
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
//将结果集元数据设置到schema中
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData());
} catch (SQLException sqlEx) {
throw sqlEx;
} catch (Exception e) {
throw new SQLException(String.format("Failed to fetch schema of %s", tableName), e);
}
}
设置的结果集元数据中可以看到:schemaName、catalogName、tableName、TableMeta、ColumnMeta、IndexMeta。
同时将表信息放入到缓存中:
Cache<String, TableMeta> TABLE_META_CACHE = Cache<String, TableMeta> TABLE_META_CACHE = Caffeine.newBuilder().maximumSize(CACHE_SIZE)
.expireAfterWrite(EXPIRE_TIME, TimeUnit.MILLISECONDS).softValues().build();
TABLE_META_CACHE.put(entry.getKey(), tableMeta);
可以看到sql识别器会根据对应sql类型执行sql操作:
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
可以看到在io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse中会执行几个重要的操作
生成前镜像、执行sql、生成后镜像、准备undo log日志数据
/**
* Execute auto commit false t.
*
* @param args the args
* @return the t
* @throws Exception the exception
*/
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
LOGGER.info("----执行自动提交 false------");
LOGGER.info("----生成前镜像------");
TableRecords beforeImage = beforeImage();
LOGGER.info("----执行sql操作------");
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
LOGGER.info("----生成后镜像------");
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
之后执行二阶段处理提交
提交sql,如果没有发生异常,则删除undo log日志。否则,执行回滚操作,执行undo log日志,也即通过镜像sql执行复原数据操作。