
Postgresql Source Code (101): An In-Depth Analysis of CLOG Group Commit (clog group updates)

Author: mingjie
Published 2023-03-01 13:33:54
This article is part of the column: Postgresql Source Code Analysis

1 Background

1.1 XactLastRecEnd, a global variable used by CLOG

Code language: C
XLogRecPtr	ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr	XactLastRecEnd = InvalidXLogRecPtr;
XLogRecPtr	XactLastCommitEnd = InvalidXLogRecPtr;

ProcLastRecPtr and XactLastRecEnd are updated in XLogInsertRecord:

Code language: C
XLogInsertRecord(...)
{
	...
	/*
	 * Update our global variables
	 */
	ProcLastRecPtr = StartPos;
	XactLastRecEnd = EndPos;
	...
}
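
  • XactLastRecEnd is the end position of the last WAL record written by the current transaction.
    • It is reset to 0 when the top-level transaction commits or aborts.
    • Whether it is nonzero tells you whether the current transaction has generated any WAL.
  • ProcLastRecPtr is the start position of the last WAL record inserted by the current backend.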
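To make the roles of the two variables concrete, here is a minimal single-backend model. It assumes a toy WAL whose insert position starts at a hypothetical 100; mock_insert_record stands in for XLogInsertRecord and mock_commit for the relevant part of RecordTransactionCommit. Neither is a PostgreSQL function; they only reproduce how the two globals move.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Backend-private globals, as declared in xlog.c. */
static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
static XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;

/* Hypothetical insert position of a toy WAL. */
static XLogRecPtr insert_pos = 100;

/* Stand-in for XLogInsertRecord: reserve len bytes and update the globals. */
static void mock_insert_record(uint32_t len)
{
    XLogRecPtr StartPos = insert_pos;
    XLogRecPtr EndPos = StartPos + len;

    assert(len > 0);
    insert_pos = EndPos;
    ProcLastRecPtr = StartPos;   /* start of the last record this backend inserted */
    XactLastRecEnd = EndPos;     /* end of the last record this backend inserted */
}

/* Stand-in for the relevant part of RecordTransactionCommit. Returns
 * wrote_xlog; *flush_upto receives XactLastRecEnd after the commit record. */
static bool mock_commit(bool has_xid, XLogRecPtr *flush_upto)
{
    bool wrote_xlog = (XactLastRecEnd != 0);   /* captured before the commit record */

    if (has_xid)
        mock_insert_record(40);                /* the commit record itself */
    *flush_upto = XactLastRecEnd;
    XactLastRecEnd = 0;                        /* reset for the next transaction */
    return wrote_xlog;
}
```

A 60-byte record moves XactLastRecEnd to 160; the commit record then extends it to 200, which is the position a synchronous commit would flush to. ProcLastRecPtr is not reset at commit.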

1.2 group_lsn in CLOG

In PostgreSQL, XLOG (WAL) and CLOG are stored separately, so the order in which they reach disk matters:

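  • Scenario 1: WAL is on disk, CLOG is not
    • No problem: the WAL contains the commit record, and redo rebuilds the CLOG entry.
  • Scenario 2: WAL is not on disk, CLOG is
    • Problem: CLOG says the transaction committed, but its WAL is lost, so redo cannot recover the transaction. The transaction should therefore be treated as uncommitted, which contradicts CLOG: the data becomes inconsistent.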

With synchronous commit, WAL is guaranteed to reach disk before CLOG. With asynchronous commit, as the figure below shows, there is no XLogFlush step, so scenario 2 can occur.

So for asynchronous commit, PostgreSQL adds another layer of protection, group_lsn, which ensures that WAL reaches disk before CLOG:

Code language: C
/* We store the latest async LSN for each group of transactions */
#define CLOG_XACTS_PER_LSN_GROUP	32	/* keep this a power of 2 */
#define CLOG_LSNS_PER_PAGE	(CLOG_XACTS_PER_PAGE / CLOG_XACTS_PER_LSN_GROUP)

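Each logical group covers the statuses of 32 transactions, and each page has CLOG_LSNS_PER_PAGE groups; for each group, CLOG remembers the latest asynchronous-commit LSN.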

[Figure]
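The arithmetic can be checked with a small model. It assumes the default BLCKSZ of 8192, so a page holds 8192 × 4 = 32768 transactions (2 status bits each) and CLOG_LSNS_PER_PAGE = 32768 / 32 = 1024 groups. GetLSNIndex mirrors the clog.c macro of the same name; record_async_lsn mimics the "keep the larger LSN" update done in TransactionIdSetStatusBit; page_flush_lsn mimics the check in SlruPhysicalWritePage, which calls XLogFlush up to the page's largest group LSN before writing the page out. NUM_SLOTS and the two function names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

#define BLCKSZ                   8192   /* default block size (assumption) */
#define CLOG_XACTS_PER_BYTE      4      /* 2 status bits per xact */
#define CLOG_XACTS_PER_PAGE      (BLCKSZ * CLOG_XACTS_PER_BYTE)
#define CLOG_XACTS_PER_LSN_GROUP 32
#define CLOG_LSNS_PER_PAGE       (CLOG_XACTS_PER_PAGE / CLOG_XACTS_PER_LSN_GROUP)

/* Index of xid's LSN group within buffer slot slotno (mirrors GetLSNIndex). */
#define GetLSNIndex(slotno, xid) \
    ((slotno) * CLOG_LSNS_PER_PAGE + \
     ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_LSN_GROUP)

#define NUM_SLOTS 2   /* toy buffer pool size (hypothetical) */
static XLogRecPtr group_lsn[NUM_SLOTS * CLOG_LSNS_PER_PAGE];

/* Async commit: remember the largest commit LSN seen in the xid's group. */
static void record_async_lsn(int slotno, TransactionId xid, XLogRecPtr lsn)
{
    int idx = GetLSNIndex(slotno, xid);

    assert(slotno >= 0 && slotno < NUM_SLOTS);
    if (group_lsn[idx] < lsn)
        group_lsn[idx] = lsn;
}

/* Before slot slotno is written to disk, WAL must be flushed up to this LSN. */
static XLogRecPtr page_flush_lsn(int slotno)
{
    XLogRecPtr max_lsn = 0;

    for (int i = 0; i < CLOG_LSNS_PER_PAGE; i++)
    {
        XLogRecPtr this_lsn = group_lsn[slotno * CLOG_LSNS_PER_PAGE + i];

        if (max_lsn < this_lsn)
            max_lsn = this_lsn;
    }
    return max_lsn;
}
```

Because only the maximum per group is kept, one 8-byte LSN covers 32 transactions; the cost is that a page write may flush slightly more WAL than strictly necessary.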

1.3 The single global PROC_HDR structure

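The field used by CLOG group commit is clogGroupFirst, an atomic uint32 holding an index into allProcs: the head of the list of procs waiting for a group status update. The head is the most recent arrival; the group leader is the first process to arrive, which sits at the tail of the list (see 4.2).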

Code language: C
typedef struct PROC_HDR
{
	/* Array of PGPROC structures (not including dummies for prepared txns) */
	PGPROC	   *allProcs;

	/* Array mirroring PGPROC.xid for each PGPROC currently in the procarray */
	TransactionId *xids;

	/*
	 * Array mirroring PGPROC.subxidStatus for each PGPROC currently in the
	 * procarray.
	 */
	XidCacheStatus *subxidStates;
    ...
    ...
	/* First pgproc waiting for group XID clear */
	pg_atomic_uint32 procArrayGroupFirst;
	/* First pgproc waiting for group transaction status update */
	pg_atomic_uint32 clogGroupFirst;    /* <-- used by CLOG group commit */
	...
	...
} PROC_HDR;

1.4 The per-process PGPROC structure

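  • clogGroupMember: bool; whether this proc is currently taking part in a group update.
  • clogGroupNext: atomic uint32; the next group member (its index in the allProcs array).
  • clogGroupMemberXid: the xid whose status is to be set.
  • clogGroupMemberXidStatus: the status to set for that xid.
  • clogGroupMemberPage: the CLOG page that xid belongs to.
  • clogGroupMemberLsn: the LSN of the xid's commit record (set for asynchronous commit; InvalidXLogRecPtr for synchronous commit).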

Code language: C
struct PGPROC
{
	...
	...

	TransactionId xid;			/* id of top-level transaction currently being
								 * executed by this proc, if running and XID
								 * is assigned; else InvalidTransactionId.
								 * mirrored in ProcGlobal->xids[pgxactoff] */

	TransactionId xmin;			/* minimal running XID as it was when we were
								 * starting our xact, excluding LAZY VACUUM:
								 * vacuum must not remove tuples deleted by
								 * xid >= xmin ! */

	LocalTransactionId lxid;	/* local id of top-level transaction currently
								 * being executed by this proc, if running;
								 * else InvalidLocalTransactionId */
	int			pid;			/* Backend's process ID; 0 if prepared xact */

	int			pgxactoff;		/* offset into various ProcGlobal->arrays with
								 * data mirrored from this PGPROC */

	int			pgprocno;		/* Number of this PGPROC in
								 * ProcGlobal->allProcs array. This is set
								 * once by InitProcGlobal().
								 * ProcGlobal->allProcs[n].pgprocno == n */

	...
	...
	/* Support for group transaction status update. */
	bool                clogGroupMember;          /* true, if member of clog group */
	pg_atomic_uint32    clogGroupNext;            /* next clog group member */
	TransactionId       clogGroupMemberXid;       /* transaction id of clog group member */
	XidStatus           clogGroupMemberXidStatus; /* transaction status of clog
	                                               * group member */
	int                 clogGroupMemberPage;      /* clog page corresponding to
                                                   * transaction id of clog group member */
	XLogRecPtr          clogGroupMemberLsn;       /* WAL location of commit record for clog
	                                               * group member */
    ...
    ...
};

1.5 CLOG write path at commit

[Figure]

2 Transaction commit: RecordTransactionCommit

RecordTransactionCommit does the following:

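  1. Check whether the committing XID is valid (markXidCommitted; the if (!markXidCommitted) branch handles transactions without one).
  2. Use the global XactLastRecEnd to tell whether the transaction wrote any WAL: wrote_xlog = (XactLastRecEnd != 0).
  3. If the XID is valid, write a commit WAL record with XactLogCommitRecord. Note that wrote_xlog (a local variable) was captured in step 2, so it is not affected by this record.
  4. Record the commit timestamp: TransactionTreeSetCommitTsData.
  5. Check three conditions:
    1. the transaction wrote WAL (wrote_xlog);
    2. the XID is valid (markXidCommitted);
    3. synchronous_commit is not off.
  6. If all three hold (or forceSyncCommit is set, or relations are dropped at commit, nrels > 0): synchronous commit
    1. XLogFlush(XactLastRecEnd)
    2. TransactionIdCommitTree(xid, nchildren, children)
  7. Otherwise: asynchronous commit
    1. XLogSetAsyncXactLSN(XactLastRecEnd)
    2. TransactionIdAsyncCommitTree(xid, nchildren, children, XactLastRecEnd)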

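Apart from the XLogFlush, the difference between synchronous and asynchronous commit is whether XactLastRecEnd is passed down to CLOG: the asynchronous path passes it as the LSN to record in group_lsn, while the synchronous path passes InvalidXLogRecPtr. XactLastRecEnd is the end position of the last WAL record the current transaction generated, including the commit record. Note that it is a backend-private variable.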

3 Writing CLOG: TransactionIdSetPageStatus

In older PostgreSQL versions the CLOG write path was simple and can be summarized in three steps:

  1. Take the big lock: LWLockAcquire(CLogControlLock, LW_EXCLUSIVE)
  2. Write: TransactionIdSetStatusBit; ClogCtl->shared->page_dirty[slotno] = true;
  3. Release the lock: LWLockRelease(CLogControlLock)
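The old path is easy to model. In this sketch a pthread mutex stands in for CLogControlLock and a hypothetical 64-byte array for a CLOG page; the bit arithmetic is the one TransactionIdSetStatusBit uses (2 bits per transaction, 4 transactions per byte), and the status constants match clog.h. Every committer serializes on the single lock, which is exactly the contention the newer code tries to reduce.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef int XidStatus;

#define TRANSACTION_STATUS_IN_PROGRESS   0x00
#define TRANSACTION_STATUS_COMMITTED     0x01
#define TRANSACTION_STATUS_ABORTED       0x02
#define TRANSACTION_STATUS_SUB_COMMITTED 0x03

#define CLOG_BITS_PER_XACT  2
#define CLOG_XACTS_PER_BYTE 4
#define CLOG_XACT_BITMASK   ((1 << CLOG_BITS_PER_XACT) - 1)
#define TOY_PAGE_BYTES      64          /* hypothetical tiny page */
#define TOY_XACTS_PER_PAGE  (TOY_PAGE_BYTES * CLOG_XACTS_PER_BYTE)

static uint8_t page[TOY_PAGE_BYTES];    /* one toy CLOG page */
static bool page_dirty;
static pthread_mutex_t clog_control_lock = PTHREAD_MUTEX_INITIALIZER;  /* the "big lock" */

/* Same bit arithmetic as TransactionIdSetStatusBit. */
static void set_status_bit(TransactionId xid, XidStatus status)
{
    int byteno = (xid % TOY_XACTS_PER_PAGE) / CLOG_XACTS_PER_BYTE;
    int bshift = (xid % CLOG_XACTS_PER_BYTE) * CLOG_BITS_PER_XACT;

    assert(status >= 0 && status <= CLOG_XACT_BITMASK);
    page[byteno] = (uint8_t) ((page[byteno] & ~(CLOG_XACT_BITMASK << bshift)) |
                              (status << bshift));
}

static XidStatus get_status(TransactionId xid)
{
    int byteno = (xid % TOY_XACTS_PER_PAGE) / CLOG_XACTS_PER_BYTE;
    int bshift = (xid % CLOG_XACTS_PER_BYTE) * CLOG_BITS_PER_XACT;

    return (page[byteno] >> bshift) & CLOG_XACT_BITMASK;
}

/* Old path: every committer serializes on the one lock. */
static void old_set_status(TransactionId xid, XidStatus status)
{
    pthread_mutex_lock(&clog_control_lock);    /* 1. take the big lock */
    set_status_bit(xid, status);               /* 2. write the bits ... */
    page_dirty = true;                         /*    ... and mark the page dirty */
    pthread_mutex_unlock(&clog_control_lock);  /* 3. release it */
}
```

For example, xids 5 and 6 share byte 1 of the page (shifts 2 and 4), so committing 5 and aborting 6 leaves that byte at (0x02 << 4) | (0x01 << 2).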

Newer versions substantially reduce contention on this lock, which is now called XactSLRULock (the new XactSLRULock is the old CLogControlLock) (commit baaf272ac908ea27c09076e34f62c45fa7d1e448).

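The design is modeled on the earlier ProcArray optimization, in which, under high concurrency, a single process clears XIDs on behalf of many backends to avoid fighting over ProcArrayLock (commit 0e141c0fbb211bdd23783afa731e3eef95c9ad7a).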

[Figure]

Here is the new TransactionIdSetPageStatus flow:

Code language: C
static void
TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
						   TransactionId *subxids, XidStatus status,
						   XLogRecPtr lsn, int pageno,
						   bool all_xact_same_page)
{
	/* Can't use group update when PGPROC overflows. */
	StaticAssertDecl(THRESHOLD_SUBTRANS_CLOG_OPT <= PGPROC_MAX_CACHED_SUBXIDS,
					 "group clog threshold less than PGPROC cached subxids");

	/*
	 * When there is contention on XactSLRULock, we try to group multiple
	 * updates; a single leader process will perform transaction status
	 * updates for multiple backends so that the number of times XactSLRULock
	 * needs to be acquired is reduced.
	 *
	 * For this optimization to be safe, the XID and subxids in MyProc must be
	 * the same as the ones for which we're setting the status.  Check that
	 * this is the case.
	 *
	 * For this optimization to be efficient, we shouldn't have too many
	 * sub-XIDs and all of the XIDs for which we're adjusting clog should be
	 * on the same page.  Check those conditions, too.
	 */
	if (all_xact_same_page && xid == MyProc->xid &&
		nsubxids <= THRESHOLD_SUBTRANS_CLOG_OPT &&
		nsubxids == MyProc->subxidStatus.count &&
		(nsubxids == 0 ||
		 memcmp(subxids, MyProc->subxids.xids,
				nsubxids * sizeof(TransactionId)) == 0))

Group update is attempted only if:

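  1. All the CLOG entries are on the same page. (A commit may also have to set the status of several sub-XIDs, which can be spread over multiple pages.)
  2. For efficiency, there are at most THRESHOLD_SUBTRANS_CLOG_OPT (5) sub-XIDs.
  3. The XID and sub-XIDs are exactly the ones cached in MyProc (xid == MyProc->xid and the subxids match MyProc->subxids), because the leader reads them from each member's PGPROC.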
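The eligibility test above can be written as a standalone predicate. ToyProc is a hypothetical slice of PGPROC holding only the fields the check reads; the constants 5 and 64 are the values of THRESHOLD_SUBTRANS_CLOG_OPT in clog.c and PGPROC_MAX_CACHED_SUBXIDS in proc.h, and the static assertion mirrors the one at the top of TransactionIdSetPageStatus.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

typedef uint32_t TransactionId;

#define THRESHOLD_SUBTRANS_CLOG_OPT 5   /* value used in clog.c */
#define PGPROC_MAX_CACHED_SUBXIDS   64  /* value used in proc.h */

/* Hypothetical slice of PGPROC: just what the check reads. */
typedef struct
{
    TransactionId xid;
    int           subxid_count;                       /* subxidStatus.count */
    TransactionId subxids[PGPROC_MAX_CACHED_SUBXIDS]; /* subxids.xids */
} ToyProc;

_Static_assert(THRESHOLD_SUBTRANS_CLOG_OPT <= PGPROC_MAX_CACHED_SUBXIDS,
               "group clog threshold less than PGPROC cached subxids");

/* Mirrors the if-condition guarding the group path in TransactionIdSetPageStatus. */
static bool group_update_allowed(const ToyProc *me, bool all_xact_same_page,
                                 TransactionId xid, int nsubxids,
                                 const TransactionId *subxids)
{
    assert(me != NULL);
    return all_xact_same_page && xid == me->xid &&
        nsubxids <= THRESHOLD_SUBTRANS_CLOG_OPT &&
        nsubxids == me->subxid_count &&
        (nsubxids == 0 ||
         memcmp(subxids, me->subxids, nsubxids * sizeof(TransactionId)) == 0);
}
```

The memcmp matters because a follower does not hand its sub-XIDs to the leader explicitly; the leader reads them straight out of the follower's PGPROC, so they must be the same ones being committed.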

Code language: C
	{
		/*
		 * If we can immediately acquire XactSLRULock, we update the status of
		 * our own XID and release the lock.  If not, try use group XID
		 * update.  If that doesn't work out, fall back to waiting for the
		 * lock to perform an update for this transaction only.
		 */
		if (LWLockConditionalAcquire(XactSLRULock, LW_EXCLUSIVE))
		{
			/* Got the lock without waiting!  Do the update. */
			TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
											   lsn, pageno);
			LWLockRelease(XactSLRULock);
			return;
		}
		else if (TransactionGroupUpdateXidStatus(xid, status, lsn, pageno))
		{
			/* Group update mechanism has done the work. */
			return;
		}

		/* Fall through only if update isn't done yet. */
	}

	/* Group update not applicable, or couldn't accept this page number. */
	LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
	TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
									   lsn, pageno);
	LWLockRelease(XactSLRULock);
}

When the group conditions hold, every process that cannot get the lock immediately (LWLockConditionalAcquire fails) enters TransactionGroupUpdateXidStatus.

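TransactionGroupUpdateXidStatus makes the first process to arrive the leader, and the leader performs the status updates for the whole group. If the group already being formed is for a different page, the function returns false and the caller falls back to taking XactSLRULock itself.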

4 Group update: TransactionGroupUpdateXidStatus

4.1 Overview

Code language: C
static bool
TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
								XLogRecPtr lsn, int pageno)
{
	volatile PROC_HDR *procglobal = ProcGlobal;
	PGPROC	   *proc = MyProc;
	uint32		nextidx;
	uint32		wakeidx;

	/* We should definitely have an XID whose status needs to be updated. */
	Assert(TransactionIdIsValid(xid));

	/*
	 * Add ourselves to the list of processes needing a group XID status
	 * update.
	 */
	proc->clogGroupMember = true;
	proc->clogGroupMemberXid = xid;
	proc->clogGroupMemberXidStatus = status;
	proc->clogGroupMemberPage = pageno;
	proc->clogGroupMemberLsn = lsn;

	nextidx = pg_atomic_read_u32(&procglobal->clogGroupFirst);

	while (true)
	{
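		/*
		 * If the group currently being formed is for a different page, don't
		 * hand our update to its leader. The head may change right after
		 * this check, so a group can still end up spanning pages; that is
		 * still correct, just possibly less efficient.
		 */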
		if (nextidx != INVALID_PGPROCNO &&
			ProcGlobal->allProcs[nextidx].clogGroupMemberPage != proc->clogGroupMemberPage)
		{
			/*
			 * Ensure that this proc is not a member of any clog group that
			 * needs an XID status update.
			 */
			proc->clogGroupMember = false;
			pg_atomic_write_u32(&proc->clogGroupNext, INVALID_PGPROCNO);
			return false;
		}

		pg_atomic_write_u32(&proc->clogGroupNext, nextidx);

		if (pg_atomic_compare_exchange_u32(&procglobal->clogGroupFirst,
										   &nextidx,
										   (uint32) proc->pgprocno))
			break;
	}

	/*
	 * If the list was not empty, the leader will update the status of our
	 * XID. It is impossible to have followers without a leader because the
	 * first process that has added itself to the list will always have
	 * nextidx as INVALID_PGPROCNO.
	 */
	if (nextidx != INVALID_PGPROCNO)
	{
		int			extraWaits = 0;

		/* Sleep until the leader updates our XID status. */
		pgstat_report_wait_start(WAIT_EVENT_XACT_GROUP_UPDATE);
		for (;;)
		{
			/* acts as a read barrier */
			PGSemaphoreLock(proc->sem);
			if (!proc->clogGroupMember)
				break;
			extraWaits++;
		}
		pgstat_report_wait_end();

		Assert(pg_atomic_read_u32(&proc->clogGroupNext) == INVALID_PGPROCNO);

		/* Fix semaphore count for any absorbed wakeups */
		while (extraWaits-- > 0)
			PGSemaphoreUnlock(proc->sem);
		return true;
	}

	/* We are the leader.  Acquire the lock on behalf of everyone. */
	LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);

	/*
	 * Now that we've got the lock, clear the list of processes waiting for
	 * group XID status update, saving a pointer to the head of the list.
	 * Trying to pop elements one at a time could lead to an ABA problem.
	 */
	nextidx = pg_atomic_exchange_u32(&procglobal->clogGroupFirst,
									 INVALID_PGPROCNO);

	/* Remember head of list so we can perform wakeups after dropping lock. */
	wakeidx = nextidx;

	/* Walk the list and update the status of all XIDs. */
	while (nextidx != INVALID_PGPROCNO)
	{
		PGPROC	   *nextproc = &ProcGlobal->allProcs[nextidx];

		/*
		 * Transactions with more than THRESHOLD_SUBTRANS_CLOG_OPT sub-XIDs
		 * should not use group XID status update mechanism.
		 */
		Assert(nextproc->subxidStatus.count <= THRESHOLD_SUBTRANS_CLOG_OPT);

		TransactionIdSetPageStatusInternal(nextproc->clogGroupMemberXid,
										   nextproc->subxidStatus.count,
										   nextproc->subxids.xids,
										   nextproc->clogGroupMemberXidStatus,
										   nextproc->clogGroupMemberLsn,
										   nextproc->clogGroupMemberPage);

		/* Move to next proc in list. */
		nextidx = pg_atomic_read_u32(&nextproc->clogGroupNext);
	}

	/* We're done with the lock now. */
	LWLockRelease(XactSLRULock);

	/*
	 * Now that we've released the lock, go back and wake everybody up.  We
	 * don't do this under the lock so as to keep lock hold times to a
	 * minimum.
	 */
	while (wakeidx != INVALID_PGPROCNO)
	{
		PGPROC	   *wakeproc = &ProcGlobal->allProcs[wakeidx];

		wakeidx = pg_atomic_read_u32(&wakeproc->clogGroupNext);
		pg_atomic_write_u32(&wakeproc->clogGroupNext, INVALID_PGPROCNO);

		/* ensure all previous writes are visible before follower continues. */
		pg_write_barrier();

		wakeproc->clogGroupMember = false;

		if (wakeproc != MyProc)
			PGSemaphoreUnlock(wakeproc->sem);
	}

	return true;
}
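To see the whole protocol run end to end, here is a minimal model with C11 atomics and POSIX threads and semaphores. It is a sketch under stated assumptions, not PostgreSQL code: ToyProc, group_update, backend and run_simulation are hypothetical; a pthread mutex stands in for XactSLRULock, sem_t for the PGPROC semaphore, and sequentially consistent atomics for pg_atomic_* plus pg_write_barrier(). The different-page check and the fallback path are left out (every XID is treated as being on one page), the follower does not count absorbed wakeups because nothing else posts its semaphore here, and "updating CLOG" just sets clog_status[xid] = 1.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define NPROCS 8
#define ROUNDS 200
#define INVALID_PGPROCNO UINT32_MAX           /* sentinel chosen for this model */

typedef struct
{
    atomic_bool      clogGroupMember;         /* true while waiting in a group */
    _Atomic uint32_t clogGroupNext;           /* next member's index in allProcs */
    uint32_t         clogGroupMemberXid;      /* xid the leader must mark */
    sem_t            sem;                     /* per-process wakeup semaphore */
} ToyProc;

static ToyProc allProcs[NPROCS];
static _Atomic uint32_t clogGroupFirst = INVALID_PGPROCNO;   /* list head */
static pthread_mutex_t xact_slru_lock = PTHREAD_MUTEX_INITIALIZER;
static uint8_t clog_status[NPROCS * ROUNDS];  /* 1 = committed */

static void group_update(uint32_t procno, uint32_t xid)
{
    ToyProc *proc = &allProcs[procno];
    uint32_t nextidx, wakeidx;

    proc->clogGroupMemberXid = xid;
    atomic_store(&proc->clogGroupMember, true);
    nextidx = atomic_load(&clogGroupFirst);
    for (;;)
    {
        atomic_store(&proc->clogGroupNext, nextidx);     /* link to the old head */
        /* Succeeds only if the head is still nextidx; otherwise nextidx is reloaded. */
        if (atomic_compare_exchange_strong(&clogGroupFirst, &nextidx, procno))
            break;
    }

    if (nextidx != INVALID_PGPROCNO)
    {
        /* Follower: the list was not empty, so sleep until the leader is done. */
        do
            sem_wait(&proc->sem);
        while (atomic_load(&proc->clogGroupMember));
        return;
    }

    /* Leader: lock once, detach the whole list, update every member. */
    pthread_mutex_lock(&xact_slru_lock);
    nextidx = atomic_exchange(&clogGroupFirst, INVALID_PGPROCNO);
    wakeidx = nextidx;
    while (nextidx != INVALID_PGPROCNO)
    {
        ToyProc *p = &allProcs[nextidx];
        clog_status[p->clogGroupMemberXid] = 1;
        nextidx = atomic_load(&p->clogGroupNext);
    }
    pthread_mutex_unlock(&xact_slru_lock);

    /* Wake everyone only after dropping the lock. */
    while (wakeidx != INVALID_PGPROCNO)
    {
        ToyProc *p = &allProcs[wakeidx];
        wakeidx = atomic_load(&p->clogGroupNext);
        atomic_store(&p->clogGroupNext, INVALID_PGPROCNO);
        atomic_store(&p->clogGroupMember, false);   /* seq_cst store publishes our writes */
        if (p != proc)
            sem_post(&p->sem);
    }
}

static void *backend(void *arg)
{
    uint32_t procno = (uint32_t) (uintptr_t) arg;

    for (uint32_t r = 0; r < ROUNDS; r++)
        group_update(procno, procno * ROUNDS + r);
    return NULL;
}

/* Runs NPROCS backends concurrently; returns how many XIDs were marked committed. */
static int run_simulation(void)
{
    pthread_t th[NPROCS];
    int committed = 0;
    for (uintptr_t i = 0; i < NPROCS; i++)
    {
        sem_init(&allProcs[i].sem, 0, 0);
        pthread_create(&th[i], NULL, backend, (void *) i);
    }
    for (int i = 0; i < NPROCS; i++)
        pthread_join(th[i], NULL);
    for (int i = 0; i < NPROCS * ROUNDS; i++)
        committed += clog_status[i];
    assert(atomic_load(&clogGroupFirst) == INVALID_PGPROCNO);
    return committed;
}
```

Whatever the interleaving, every XID ends up marked exactly once and the list head is empty at the end; how many groups form, and how large they are, varies from run to run.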

4.2 Leader election and list construction

The list is built with pg_atomic_compare_exchange_u32:

Code language: C
	uint32		nextidx;

	nextidx = pg_atomic_read_u32(&procglobal->clogGroupFirst);
	while (true)
	{
		pg_atomic_write_u32(&proc->clogGroupNext, nextidx);
		if (pg_atomic_compare_exchange_u32(&procglobal->clogGroupFirst,
										   &nextidx,
										   (uint32) proc->pgprocno))
			break;
	}

What pg_atomic_compare_exchange_u32(ptr, expected, newval) does:

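  • After the call, nextidx always holds the value procglobal->clogGroupFirst had at the time of the compare (on failure it is overwritten with the current value).
  • procglobal->clogGroupFirst is set to proc->pgprocno only if it equals *expected (the 1st and 2nd arguments compare equal).
  • If the swap happened, the function returns true and the loop exits; otherwise the loop retries with the refreshed nextidx.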

Example:

| Process | Global pointer: procglobal->clogGroupFirst | nextidx | List pointer: proc->clogGroupNext |
|---|---|---|---|
| 1st to arrive: 1997 (☆) | proc->pgprocno = 1997 | invalid | invalid |
| 2nd to arrive: 2001 (△) | proc->pgprocno = 2001 | 1997 | 1997 |
| 3rd to arrive: 1998 (▢) | proc->pgprocno = 1998 | 2001 | 2001 |

[Figure]

Summary:

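  • nextidx picks up the old value of the global clogGroupFirst, i.e. the previous list head; it is stored in clogGroupNext, which links the list.
  • clogGroupFirst is updated to the current process's pgprocno, so the newest arrival is always the head and the leader (the first arrival) is the tail.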
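The example table can be reproduced with a single-threaded model of the push loop. INVALID_PGPROCNO is given an arbitrary sentinel value here and MAX_PROCS is a hypothetical size for allProcs; push returns nextidx, so a return of INVALID_PGPROCNO means the caller became the leader.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define INVALID_PGPROCNO UINT32_MAX   /* sentinel chosen for this model */
#define MAX_PROCS 4096                /* hypothetical size of allProcs */

static _Atomic uint32_t clogGroupFirst = INVALID_PGPROCNO;
static _Atomic uint32_t clogGroupNext[MAX_PROCS];   /* one per PGPROC */

/* The push loop of TransactionGroupUpdateXidStatus. Returns the old head
 * (nextidx); INVALID_PGPROCNO means the caller is the leader. */
static uint32_t push(uint32_t pgprocno)
{
    uint32_t nextidx = atomic_load(&clogGroupFirst);

    assert(pgprocno < MAX_PROCS);
    for (;;)
    {
        atomic_store(&clogGroupNext[pgprocno], nextidx);   /* link to old head */
        if (atomic_compare_exchange_strong(&clogGroupFirst, &nextidx, pgprocno))
            return nextidx;
    }
}
```

After pushing 1997, 2001 and 1998, walking from clogGroupFirst through clogGroupNext visits 1998, 2001, 1997: newest first, with the leader 1997 at the tail.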

4.3 Followers (non-leaders)

If nextidx is valid, the list was not empty when this process joined, so it is not the leader.

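  • Wait on proc->sem until the leader wakes it up.
  • Nothing else: the leader sets its status. Wakeups that arrive while clogGroupMember is still true are counted in extraWaits and re-posted afterwards so they are not lost.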

Code language: C
	if (nextidx != INVALID_PGPROCNO)
	{
		int			extraWaits = 0;

		/* Sleep until the leader updates our XID status. */
		pgstat_report_wait_start(WAIT_EVENT_XACT_GROUP_UPDATE);
		for (;;)
		{
			/* acts as a read barrier */
			PGSemaphoreLock(proc->sem);
			if (!proc->clogGroupMember)
				break;
			extraWaits++;
		}
		pgstat_report_wait_end();

		Assert(pg_atomic_read_u32(&proc->clogGroupNext) == INVALID_PGPROCNO);

		/* Fix semaphore count for any absorbed wakeups */
		while (extraWaits-- > 0)
			PGSemaphoreUnlock(proc->sem);
		return true;
	}
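The extraWaits bookkeeping exists because the per-process semaphore is also used by other waits (LWLock waits, for example), so PGSemaphoreLock can return for a post that was meant for something else. The sketch below models this with a POSIX sem_t standing in for the PGPROC semaphore and a thread standing in for the leader; all names are hypothetical. The semaphore starts with one unrelated wakeup already pending.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdbool.h>

static sem_t sem;                              /* follower's wakeup semaphore */
static atomic_bool clogGroupMember = true;     /* cleared by the leader */

/* Stand-in leader: clear the flag first, then post once (PGSemaphoreUnlock). */
static void *leader(void *arg)
{
    (void) arg;
    atomic_store(&clogGroupMember, false);
    sem_post(&sem);
    return NULL;
}

/* Follower loop in the style of TransactionGroupUpdateXidStatus: wakeups that
 * arrive while we are still a member are counted and re-posted afterwards. */
static void follower_wait(void)
{
    int extraWaits = 0;

    for (;;)
    {
        sem_wait(&sem);                        /* PGSemaphoreLock */
        if (!atomic_load(&clogGroupMember))
            break;
        extraWaits++;                          /* absorbed someone else's wakeup */
    }
    while (extraWaits-- > 0)
        sem_post(&sem);                        /* give it back */
}

/* Starts with one unrelated wakeup already pending; returns the final count. */
static int run_absorbed_wakeup_demo(void)
{
    pthread_t th;
    int value = -1;

    sem_init(&sem, 0, 1);
    pthread_create(&th, NULL, leader, NULL);
    follower_wait();
    pthread_join(th, NULL);
    sem_getvalue(&sem, &value);
    assert(value >= 0);
    return value;
}
```

Whatever the interleaving, the semaphore count ends at 1, the same as before the follower started waiting: exactly one post is consumed on the follower's behalf and any extra one is handed back.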

4.4 Leader

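  1. Take XactSLRULock and do the work. Only one leader at a time can hold the lock; it is taken to avoid races with other paths that modify CLOG, such as non-group updates or ExtendCLOG.
  2. While the leader holds the lock, a newly arriving process ends up in one of two ways:
    • clogGroupFirst has not been cleared yet: it joins the current list;
    • clogGroupFirst has been cleared: it becomes a new leader.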

Code language: C
	LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);

A process arriving at this point still joins the current list:

Code language: C
	nextidx = pg_atomic_exchange_u32(&procglobal->clogGroupFirst,
									 INVALID_PGPROCNO);

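A process arriving after this point becomes a new leader. The new list does not overwrite the old one: the list is made of each process's proc->clogGroupNext, and no process can be on both lists, because every member of the old list is still asleep waiting for the old leader.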

Code language: C
	/* Remember head of list so we can perform wakeups after dropping lock. */
	wakeidx = nextidx;
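Why the old and new lists cannot interfere is easiest to see in a single-threaded model. push mirrors the CAS loop from 4.2 and detach_all mirrors the pg_atomic_exchange_u32 the leader performs; MAX_PROCS, detach_all and list_length are hypothetical, and INVALID_PGPROCNO is an arbitrary sentinel. Detaching the whole list in one exchange, rather than popping nodes one by one, is also what avoids the ABA problem mentioned in the source comment.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define INVALID_PGPROCNO UINT32_MAX   /* sentinel chosen for this model */
#define MAX_PROCS 8                   /* hypothetical size of allProcs */

static _Atomic uint32_t clogGroupFirst = INVALID_PGPROCNO;
static _Atomic uint32_t clogGroupNext[MAX_PROCS];

/* Push as in TransactionGroupUpdateXidStatus; returns the old head. */
static uint32_t push(uint32_t procno)
{
    uint32_t nextidx = atomic_load(&clogGroupFirst);

    assert(procno < MAX_PROCS);
    for (;;)
    {
        atomic_store(&clogGroupNext[procno], nextidx);
        if (atomic_compare_exchange_strong(&clogGroupFirst, &nextidx, procno))
            return nextidx;
    }
}

/* Leader step: take the whole list in one atomic exchange, leaving it empty. */
static uint32_t detach_all(void)
{
    return atomic_exchange(&clogGroupFirst, INVALID_PGPROCNO);
}

/* Number of nodes reachable from head via clogGroupNext. */
static int list_length(uint32_t head)
{
    int n = 0;

    for (uint32_t i = head; i != INVALID_PGPROCNO; i = atomic_load(&clogGroupNext[i]))
        n++;
    return n;
}
```

After the leader detaches the list 3 → 2 → 1, the next arrival sees an empty head and becomes a new leader, while the detached list stays intact for the old leader to walk.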

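Now do the work, walking from the list head (the most recent arrival) through clogGroupNext down to the leader at the tail.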

Code language: C
	while (nextidx != INVALID_PGPROCNO)
	{
		PGPROC	   *nextproc = &ProcGlobal->allProcs[nextidx];

		/*
		 * Transactions with more than THRESHOLD_SUBTRANS_CLOG_OPT sub-XIDs
		 * should not use group XID status update mechanism.
		 */
		Assert(nextproc->subxidStatus.count <= THRESHOLD_SUBTRANS_CLOG_OPT);

		TransactionIdSetPageStatusInternal(nextproc->clogGroupMemberXid,
										   nextproc->subxidStatus.count,
										   nextproc->subxids.xids,
										   nextproc->clogGroupMemberXidStatus,
										   nextproc->clogGroupMemberLsn,
										   nextproc->clogGroupMemberPage);

		/* Move to next proc in list. */
		nextidx = pg_atomic_read_u32(&nextproc->clogGroupNext);
	}

	/* We're done with the lock now. */
	LWLockRelease(XactSLRULock);

Once every update is done and the lock has been released, wake up the waiting processes.

Code language: C
	while (wakeidx != INVALID_PGPROCNO)
	{
		PGPROC	   *wakeproc = &ProcGlobal->allProcs[wakeidx];

		wakeidx = pg_atomic_read_u32(&wakeproc->clogGroupNext);
		pg_atomic_write_u32(&wakeproc->clogGroupNext, INVALID_PGPROCNO);

		/* ensure all previous writes are visible before follower continues. */
		pg_write_barrier();

		wakeproc->clogGroupMember = false;

		if (wakeproc != MyProc)
			PGSemaphoreUnlock(wakeproc->sem);
	}

	return true;
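
This article is shared from the author's personal site/blog as part of the Tencent Cloud self-media sync program.
Originally published: 2023-02-27. For infringement concerns, contact cloudcommunity@tencent.com for removal.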
