相关: 《Postgresql源码(18)PGPROC相关结构》 《Postgresql源码(65)新快照体系Globalvis工作原理分析》 《Postgresql快照优化Globalvis新体系分析(性能大幅增强)》 《Postgresql源码(23)Clog使用的Slru页面淘汰机制》
(这篇从PG视角看GTM,后面再总结一篇GTM内部逻辑)
(前面是一些概念,后面是GDB走读)
单机PG的可见性判断依赖:
- 运行中的事务:snapshot
- 非运行事务的状态:clog 或 元组标志位(hint bits,快速路径 fast path)

PG-XL的可见性判断需要GTM提供:
- 需要有全局统一分发的事务ID
- 全局统一的快照
连接GTM
获取全局事务ID
事务通知
获取快照、GID
事务处理函数的基础功能请参考:《Postgresql源码(60)事务系统总结》,下面是pg-xl在分布式场景下对事务处理函数的修改。
事务状态机函数:差异点在PGXC不支持可串行化的隔离级别。
PrepareTransaction
执行两阶段提交(2PC)的第一阶段。prepare阶段:
PreCommit_Remote
以 2PC 方式将commit传播到dn。
FinishPreparedTransaction
结束2PC。
CallGTMCallbacks
调用回调函数通知GTM,例如全局序列管理器。
正常事务提交:
RecordTransactionCommit
写提交XLOG。
事务结束阶段:
AtEOXact_GlobalTxn
请求GTM提交事务。
AtEOXact_Remote
清理事务信息。
// PG
/*
 * Vanilla PostgreSQL MVCC snapshot, plus the PGXC-only max_xcnt field.
 * In PG-XL, GetSnapshotFromGlobalSnapshot fills xmin/xmax/xcnt of this
 * struct from a snapshot obtained from GTM instead of scanning local procs.
 */
typedef struct SnapshotData
{
SnapshotSatisfiesFunc satisfies; /* tuple test function */
TransactionId xmin; /* all XID < xmin are visible to me */
TransactionId xmax; /* all XID >= xmax are invisible to me */
TransactionId *xip; /* array of xact IDs, xcnt entries (presumably those in progress when taken) */
uint32 xcnt; /* # of xact ids in xip[] */
#ifdef PGXC /* PGXC_COORD */
uint32 max_xcnt; /* Max # of xact in xip[] (allocated capacity; NOTE(review): presumably sized for GTM snapshots -- confirm) */
#endif
TransactionId *subxip;
int32 subxcnt; /* # of xact ids in subxip[] */
bool suboverflowed; /* has the subxip array overflowed? */
bool takenDuringRecovery; /* recovery-shaped snapshot? */
bool copied; /* false if it's a static snapshot */
CommandId curcid; /* in my xact, CID < curcid are visible */
/*
 * An extra return value for HeapTupleSatisfiesDirty, not used in MVCC
 * snapshots.
 */
uint32 speculativeToken;
/*
 * Book-keeping information, used by the snapshot manager
 */
uint32 active_count; /* refcount on ActiveSnapshot stack */
uint32 regd_count; /* refcount on RegisteredSnapshots */
pairingheap_node ph_node; /* link in the RegisteredSnapshots heap */
TimestampTz whenTaken; /* timestamp when snapshot was taken */
XLogRecPtr lsn; /* position in the WAL stream when taken */
} SnapshotData;
// PGXL
/*
 * Snapshot as returned by GTM (GetSnapshotGTM). Its sn_xmin / sn_xmax /
 * sn_xcnt / sn_xip are passed to SetGlobalSnapshotData and then copied into
 * a local SnapshotData by GetSnapshotFromGlobalSnapshot.
 * Example value seen in GDB:
 *   {sn_snapid = 14190, sn_xmin = 20251, sn_xmax = 20260, sn_xcnt = 1, ...}
 */
typedef struct GTM_SnapshotData
{
uint64 sn_snapid; /* snapshot identifier (presumably assigned by GTM -- confirm) */
GlobalTransactionId sn_xmin; /* global counterpart of SnapshotData.xmin */
GlobalTransactionId sn_xmax; /* global counterpart of SnapshotData.xmax */
uint32 sn_xcnt; /* # of xact ids in sn_xip[] */
GlobalTransactionId *sn_xip; /* global counterpart of SnapshotData.xip */
} GTM_SnapshotData;
数据准备
-- Run on cn1 (coordinator, port 50854)
psql -p50854 -h127.0.0.1 -Upgxc postgres
drop table clstr_tst;
-- Rows are distributed across datanodes by a hash of column b.
CREATE TABLE clstr_tst (a SERIAL, b INT, c TEXT, d TEXT) DISTRIBUTE BY HASH (b);
INSERT INTO clstr_tst (b, c) VALUES (1, 'once');
INSERT INTO clstr_tst (b, c) VALUES (2, 'diez');
INSERT INTO clstr_tst (b, c) VALUES (3, 'treinta y uno');
INSERT INTO clstr_tst (b, c) VALUES (4, 'veintidos');
INSERT INTO clstr_tst (b, c) VALUES (5, 'tres');
INSERT INTO clstr_tst (b, c) VALUES (6, 'veinte');
INSERT INTO clstr_tst (b, c) VALUES (7, 'veintitres');
-- Data distribution
-- Query each datanode (dn) directly. The numbers listed after each command
-- are an abbreviated result: presumably the b values of the rows stored there.
psql -p50856 -h127.0.0.1 -Upgxc postgres -c 'select * from clstr_tst'
1
2
5
6
psql -p50857 -h127.0.0.1 -Upgxc postgres -c 'select * from clstr_tst'
3
4
7
调试
-- Run on cn1: open a transaction and update the row with b = 5
-- (no COMMIT is shown, so the transaction stays open).
psql -p50854 -h127.0.0.1 -Upgxc postgres
begin;
update clstr_tst set c = 'updated' where b = 5;
select txid_current();
-- output:
20215
-- Run on cn2 (port 50855): no explicit BEGIN, so with psql's default
-- autocommit each statement runs in its own transaction.
psql -p50855 -h127.0.0.1 -Upgxc postgres
update clstr_tst set c = 'updated' where b = 7;
select txid_current();
-- output:
20221
-- 在cn1上调试
global_snapshot_source=coordinator(由CN本地生成快照,不向GTM请求)
// Annotated GDB walk-through excerpt of GetSnapshotData on cn1 with
// global_snapshot_source=coordinator; "..." marks omitted code.
GetSnapshotData
// [1] The snapshot source is user-configurable. The default is GTM; a snapshot
// can also be built locally, at the cost of losing global consistency. If the
// GTM path (GetPGXCSnapshotData) succeeds, its snapshot is returned; otherwise
// execution falls through to the local path below.
if (GlobalSnapshotSource == GLOBAL_SNAPSHOT_SOURCE_GTM)
if (GetPGXCSnapshotData(snapshot, latest))
return snapshot;
// [2] Local snapshot construction: scan every proc.
for (index = 0; index < numProcs; index++)
...
// [3] Read each backend's PGXACT->xmin (shared memory that other backends
// write, see step [4]).
xid = pgxact->xmin; /* fetch just once */
if (TransactionIdIsNormal(xid) && NormalTransactionIdPrecedes(xid, globalxmin))
globalxmin = xid;
...
if (NormalTransactionIdPrecedes(xid, xmin))
xmin = xid;
// [4] The loop has finished building the snapshot. If this backend does not
// advertise an xmin yet, publish it in MyPgXact->xmin (and TransactionXmin).
if (!TransactionIdIsValid(MyPgXact->xmin))
MyPgXact->xmin = TransactionXmin = xmin;
// [5] After xmin is adjusted, it is used as the global minimum cleanup horizon.
...
RecentGlobalDataXmin = RecentGlobalXmin;
// Cache line size on the test machine, in bytes (relevant to the cache-line
// invalidation cost of steps [3] and [4]):
// cat /sys/devices/system/cpu/cpu1/cache/index0/coherency_line_size
// 64
其中【3】(遍历读取各进程的PGXACT->xmin)和【4】(写MyPgXact->xmin)会带来大量cacheline失效,导致高并发场景下性能急剧下降,参见《Postgresql快照优化Globalvis新体系分析(性能大幅增强)》。
global_snapshot_source=gtm(默认值,从GTM获取全局快照)
场景
s1: -----------begin(20251)---------------------------------------------
s2: ---------------------------------begin(20260)-----------------------
s3: --------------------------------------------------------debug---------
// Annotated GDB walk-through excerpt of GetSnapshotData with
// global_snapshot_source=gtm (scenario: s1 began 20251, s2 began 20260,
// s3 is being debugged). Indented-by-meaning lines are nested calls.
GetSnapshotData
// [1] The snapshot source is user-configurable. The default is GTM; a snapshot
// can also be built locally, at the cost of losing global consistency.
if (GlobalSnapshotSource == GLOBAL_SNAPSHOT_SOURCE_GTM)
if (GetPGXCSnapshotData(snapshot, latest))
return snapshot;
GetPGXCSnapshotData(Snapshot snapshot, bool latest)
GetSnapshotDataFromGTM(snapshot)
// Read ClusterMonitorCtl->reporting_recent_global_xmin
reporting_xmin = ClusterMonitorGetReportingGlobalXmin();
// Upper-level GTM client interface. Value observed in GDB:
// {sn_snapid = 14190, sn_xmin = 20251, sn_xmax = 20260, sn_xcnt = 1, sn_xip = 0x13db6e0}
// i.e. one XID (presumably 20251, from s1) is in sn_xip; 20260 (s2) is
// >= sn_xmax and therefore invisible anyway.
gtm_snapshot = GetSnapshotGTM(GetCurrentTransactionIdIfAny(), canbe_grouped);
// Taking the snapshot also refreshes the global cleanup horizon:
// ClusterMonitorCtl->gtm_recent_global_xmin = 20251
RecentGlobalXmin = ClusterMonitorGetGlobalXmin(false);
RecentGlobalDataXmin = RecentGlobalXmin;
// Build the global snapshot from the GTM snapshot
SetGlobalSnapshotData(gtm_snapshot->sn_xmin, gtm_snapshot->sn_xmax,gtm_snapshot->sn_xcnt, gtm_snapshot->sn_xip, SNAPSHOT_DIRECT);
// Build the PG snapshot from the global snapshot
GetSnapshotFromGlobalSnapshot(snapshot);
// sets snapshot->xmin
// sets snapshot->xmax
// sets snapshot->xcnt
// computes global_xmin (PG scans the xmin and xid of every PGXACT; PGXL
// uses ClusterMonitorCtl->gtm_recent_global_xmin directly)
// updates RecentGlobalXmin and RecentGlobalDataXmin
// Sync the cluster monitor state with this snapshot
ClusterMonitorSyncGlobalStateUsingSnapshot(gtm_snapshot);
关于globalxmin的计算(即gtm_recent_global_xmin的来源):
cluster monitor进程每隔5秒唤醒一次,执行如下循环:
// Excerpt of the cluster monitor process main loop; "..." marks omitted code.
while (!got_SIGTERM)
...
// Oldest xmin observed on this node.
oldestXmin = GetOldestXminInternal(NULL, 0, true, lastGlobalXmin);
// [Important] Report our locally observed oldestXmin to GTM and receive the
// cluster-wide minimum (newOldestXmin) back.
// NOTE(review): the excerpt is truncated; the trailing "))" presumably closes
// an enclosing condition that is not shown.
ReportGlobalXmin(oldestXmin, &newOldestXmin, &latestCompletedXid)));
ClusterMonitorSetGlobalXmin(newOldestXmin)
...
// Extend CLOG so that any later operation relying on RecentGlobalXmin can
// find the CLOG page (slot) it needs; explained in detail below.
ExtendLogs(newOldestXmin);
...
ClusterMonitorCtl->gtm_recent_global_xmin = newOldestXmin;
PG中CLOG的SLRU缓冲页数量由CLOGShmemBuffers计算为 Min(128, Max(4, NBuffers / 512)),即最多128个、最少4个。

PG单机的ExtendCLOG:
/*
 * ExtendCLOG (vanilla single-node PostgreSQL) -- annotated excerpt.
 *
 * Zeroes the CLOG page that will hold newestXact when newestXact is the
 * first XID on its page. The bare identifiers after ZeroCLOGPage
 * (SimpleLruZeroPage, SlruSelectLRUPage, ...) are a call-tree annotation
 * from the walk-through, not real statements.
 */
void
ExtendCLOG(TransactionId newestXact)
{
int pageno;
// TransactionIdToPgIndex is newestXact % CLOG_XACTS_PER_PAGE. A non-zero
// offset means newestXact is not the first XID of a page, so its page has
// already been zeroed and there is nothing to do.
if (TransactionIdToPgIndex(newestXact) != 0 &&
// FirstNormalTransactionId (3) is special-cased: right after XID
// wraparound it is the first XID used on page 0, so that page must be
// zeroed even though its offset within the page is not 0.
!TransactionIdEquals(newestXact, FirstNormalTransactionId))
return;
// TransactionIdToPage is newestXact / CLOG_XACTS_PER_PAGE: the CLOG page
// number (not a buffer slot).
pageno = TransactionIdToPage(newestXact);
// NOTE(review): XactSLRULock is the lock name in newer PG releases; the
// PG-XL version of this function uses CLogControlLock instead.
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
ZeroCLOGPage(pageno, true);
SimpleLruZeroPage
// Pick a buffer slot for the page (see the earlier article on CLOG's SLRU
// page eviction, "Postgresql源码(23)").
SlruSelectLRUPage
// If every buffer is in use, write one out first (the one with the
// smallest page_lru_count).
SlruInternalWritePage
// Write a CLOG_ZEROPAGE WAL record.
WriteZeroPageXlogRec
LWLockRelease(XactSLRULock);
}
PGXL:
/*
 * ExtendCLOG (Postgres-XL) -- excerpt.
 *
 * In PG-XL, XIDs can be assigned on other nodes, so the XIDs this node passes
 * in are not contiguous. Vanilla PostgreSQL only zeroes a page when
 * newestXact is the first XID of that page, which relies on every XID being
 * seen in order. This version instead compares newestXact with the last XID
 * covered by the most recently zeroed CLOG page. It then zeroes every page
 * from the one after that up to the page holding newestXact.
 */
void
ExtendCLOG(TransactionId newestXact)
{
int pageno;
// Because XIDs may be assigned on other nodes, the XIDs this node gets are
// not contiguous, whereas vanilla PG assigns XIDs contiguously and calls
// ExtendCLOG for each one. latestXid is therefore the last XID that fits on
// the most recently zeroed CLOG page (latest_page_number).
TransactionId latestXid;
// Page number of newestXact (newestXact / CLOG_XACTS_PER_PAGE).
pageno = TransactionIdToPage(newestXact);
// Last XID covered by the latest zeroed page; read without holding the lock.
latestXid = (ClogCtl->shared->latest_page_number * CLOG_XACTS_PER_PAGE)
+ CLOG_XACTS_PER_PAGE - 1;
// Fast path: e.g. if existing pages already cover XIDs up to 10000 and we
// need XID 5000, the CLOG pages suffice and we return immediately.
if (TransactionIdPrecedesOrEquals(newestXact, latestXid))
return;
// The pages appear insufficient, but another backend may be extending
// concurrently: take the lock and re-check before doing any work.
LWLockAcquire(CLogControlLock, LW_EXCLUSIVE);
latestXid = (ClogCtl->shared->latest_page_number * CLOG_XACTS_PER_PAGE)
+ CLOG_XACTS_PER_PAGE - 1;
if (TransactionIdPrecedesOrEquals(newestXact, latestXid))
{
LWLockRelease(CLogControlLock);
return;
}
// Really short of pages: zero pages one at a time, starting right after
// latest_page_number and wrapping to page 0 after the page that holds
// MaxTransactionId, until the page holding newestXact has been zeroed.
// NOTE(review): termination relies on ZeroCLOGPage advancing
// ClogCtl->shared->latest_page_number on every call (presumably via
// SimpleLruZeroPage) -- confirm.
for (;;)
{
/* Zero the page and make an XLOG entry about it */
int target_pageno = ClogCtl->shared->latest_page_number + 1;
if (target_pageno > TransactionIdToPage(MaxTransactionId))
target_pageno = 0;
ZeroCLOGPage(target_pageno, true);
if (target_pageno == pageno)
break;
}
LWLockRelease(CLogControlLock);
}