commit
或rollback
.如下是有限状态机中状态说明// 事务的执行的状态
typedef enum TransState
{
// 没有事务运行时候的状态
TRANS_DEFAULT, /* idle */
// 事务开始时候的状态
TRANS_START, /* transaction starting */
// 事务进入处理函数时候的状态
TRANS_INPROGRESS, /* inside a valid transaction */
// 事务提交处理函数的状态
TRANS_COMMIT, /* commit in progress */
// 事务进入回滚阶段,调用资源清理处理函数的状态
TRANS_ABORT, /* abort in progress */
// 进入两阶段提交时候的事务 状态
TRANS_PREPARE /* prepare in progress */
} TransState;
// 事务的语句块的状态的定义,这些语句块的状态变更依赖于事务的事务执行的状态
typedef enum TBlockState
{
// 事务块的默认状态,事务开始之前或者结束之后都是出于这个状态
TBLOCK_DEFAULT, /* idle */
// 开始进入事务块的状态
TBLOCK_STARTED, /* running single-query transaction */
// 事务块是以begin命令开始
TBLOCK_BEGIN, /* starting transaction block */
// 事务块命令执行完毕后的状态
TBLOCK_INPROGRESS, /* live transaction */
// 隐式事务命令执行完毕的状态
TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN */
// 并行事务命令执行完毕的状态
TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker */
// 事务提交时候的状态
TBLOCK_END, /* COMMIT received */
// 事务执行发生SQL错误,停止后面事务命令执行,设置状态为abort
TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */
// 事务出于abort状态后,后续有不断的SQL语句,这些语句不会执行成功,这时候设置abort end状态
TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */
// 事务执行过程中,显式执行rollback,事务是由TBLOCK_INPROGRESS更改为TBLOCK_ABORT_PENDING
TBLOCK_ABORT_PENDING, /* live xact, ROLLBACK received */
// 执行pg的二阶段提交的事务prepare_transaction命令,进入事务块时候的状态
TBLOCK_PREPARE, /* live xact, PREPARE received */
/* 如下是子事务的状态 */
TBLOCK_SUBBEGIN, /* starting a subtransaction */
TBLOCK_SUBINPROGRESS, /* live subtransaction */
TBLOCK_SUBRELEASE, /* RELEASE received */
TBLOCK_SUBCOMMIT, /* COMMIT received while TBLOCK_SUBINPROGRESS */
TBLOCK_SUBABORT, /* failed subxact, awaiting ROLLBACK */
TBLOCK_SUBABORT_END, /* failed subxact, ROLLBACK received */
TBLOCK_SUBABORT_PENDING, /* live subxact, ROLLBACK received */
TBLOCK_SUBRESTART, /* live subxact, ROLLBACK TO received */
TBLOCK_SUBABORT_RESTART /* failed subxact, ROLLBACK TO received */
} TBlockState;
只读事务
不会去申请事务ID
,但是会在涉及更改操作的情况下才会申请事务ID
,只读事务
通过快照机制判断判断元组的可见性,也不需要为只读事务
产生事务日志。事务的ID的分配是在GetNewTransactionId
中进行,事务ID的全局计数器保存在struct VariableCacheData
中,每次申请成功都会自增。PG的事务ID是一个无符号32位的整数,当整个事务执行过程中,事务ID不断的消耗,当消耗到一定的程度事务ID就会回卷
。简单的可以理解为事务ID是一个环,使用PG的vacuum
命令进行回收事务ID,被回收的可以被二次使用。
struct VariableCacheData
中保存对个限制变量,在事务ID分配时候会去和这些变量比较,比较时候达到一定的条件就会触发vacuum
来回收事务ID.typedef struct VariableCacheData
{
// 下一个可用的事务ID
Oid nextOid; /* next OID to assign */
uint32 oidCount; /* OIDs available before must do XLOG work */
/*
* These fields are protected by XidGenLock.
*/
FullTransactionId nextXid; /* next XID to assign */
// 集群维度最小的冻结的事务id
TransactionId oldestXid;
// 当事务ID超过这个变量的时候,事务可能执行一次vaccum,这个变量的是一个告警的作用,告诉PG事务ID的回卷已经非常靠近了。
TransactionId xidVacLimit;
// 当xidWarnLimit - xidVacLimit =1000000时候会产生告警需要手动执行vacuum,此时无法执行事务ID的申请
TransactionId xidWarnLimit;
// 当xidStopLimit - xidWarnLimit =1000000,产生告警,需要手动执行vacuum,刺水也是分配事务ID的
TransactionId xidStopLimit;
// 事务ID回卷的上限
TransactionId xidWrapLimit; /* where the world ends */
Oid oldestXidDB; /* database with minimum datfrozenxid */
/*
* These fields are protected by CommitTsLock
*/
TransactionId oldestCommitTsXid;
TransactionId newestCommitTsXid;
/*
* These fields are protected by ProcArrayLock.
*/
FullTransactionId latestCompletedXid; /* newest full XID that has
* committed or aborted */
/*
* Number of top-level transactions with xids (i.e. which may have
* modified the database) that completed in some form since the start of
* the server. This currently is solely used to check whether
* GetSnapshotData() needs to recompute the contents of the snapshot, or
* not. There are likely other users of this. Always above 1.
*/
uint64 xactCompletionCount;
/*
* These fields are protected by XactTruncationLock
*/
TransactionId oldestClogXid; /* oldest it's safe to look up in clog */
} VariableCacheData;
GetNewTransactionId
中,参照如下// 函数去除了部分代码,保留了需要说明的代码
FullTransactionId GetNewTransactionId(bool isSubXact)
{
FullTransactionId full_xid;
TransactionId xid;
// 并行操作阶段是不允许分配事务ID
if (IsInParallelMode())
elog(ERROR, "cannot assign TransactionIds during a parallel operation");
// 事务的崩溃恢复中也不能分配事务ID
if (RecoveryInProgress())
elog(ERROR, "cannot assign TransactionIds during recovery");
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
full_xid = ShmemVariableCache->nextXid;
xid = XidFromFullTransactionId(full_xid);
if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
{
// 从共享内存中获取数据
TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
Oid oldest_datoid = ShmemVariableCache->oldestXidDB;
LWLockRelease(XidGenLock);
// 当下一个事务ID被65536取余等于0,则开启vacuum
if (IsUnderPostmaster && (xid % 65536) == 0)
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);
// 如果事务ID 等于xidStopLimit-xid,告警事务ID不足,需要手动执行vacuum
if (IsUnderPostmaster &&
TransactionIdFollowsOrEquals(xid, xidStopLimit))
{
char *oldest_datname = get_database_name(oldest_datoid);
/* complain even if that DB has disappeared */
if (oldest_datname)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",
oldest_datname),
errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
"You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
else
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("database is not accepting commands to avoid wraparound data loss in database with OID %u",
oldest_datoid),
errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
"You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
}
// 如果事务ID 等于xidWarnLimit-xid,告警事务ID不足,需要手动执行vacuum
else if (TransactionIdFollowsOrEquals(xid, xidWarnLimit))
{
char *oldest_datname = get_database_name(oldest_datoid);
/* complain even if that DB has disappeared */
if (oldest_datname)
ereport(WARNING,
(errmsg("database \"%s\" must be vacuumed within %u transactions",
oldest_datname,
xidWrapLimit - xid),
errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
"You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
else
ereport(WARNING,
(errmsg("database with OID %u must be vacuumed within %u transactions",
oldest_datoid,
xidWrapLimit - xid),
errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
"You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
}
/* Re-acquire lock and start over */
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
full_xid = ShmemVariableCache->nextXid;
xid = XidFromFullTransactionId(full_xid);
}
//. 省略代码 //
LWLockRelease(XidGenLock);
return full_xid;
}