Related: "PostgreSQL Source Code (60): Transaction System Summary", "PostgreSQL Source Code (75): Execution Flow of NOTIFY and LISTEN"
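While reading the transaction commit path, I noticed that PG has an asynchronous message queue. This post analyzes and summarizes how it works.
In short: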
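Listen:
CommitTransaction --> PreCommit_Notify
At commit time the LISTEN takes effect: the backend registers itself in a slot of the AsyncQueueControl->backend array, which records the queue position {page, offset} it has read up to.
Notify:
CommitTransaction --> PreCommit_Notify
At commit time the notifications recorded by the transaction are appended to the message queue.
CommitTransaction --> AtCommit_Notify
At commit time SIGUSR1 is sent (kill) to the listening processes to notify them.
(Figures: the message queue; the Listen flow; the Notify flow.)
A quick test with three sessions: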
-- session 1
postgres=# listen ch1;
LISTEN
-- session 2
postgres=# listen ch1;
LISTEN
-- session 3
postgres=# notify ch1;
NOTIFY
postgres=# notify ch1;
NOTIFY
-- session 1
postgres=# select 1;
?column?
----------
1
(1 row)
Asynchronous notification "ch1" received from server process with PID 1837.
Asynchronous notification "ch1" received from server process with PID 1837.
-- session 2
postgres=# select 1;
?column?
----------
1
(1 row)
Asynchronous notification "ch1" received from server process with PID 1837.
Asynchronous notification "ch1" received from server process with PID 1837.
For a client-side LISTEN/NOTIFY example using libpq, see: https://www.postgresql.org/docs/14/libpq-example.html#LIBPQ-EXAMPLE-2
Call stack of the LISTEN statement:
exec_simple_query
  PortalRun
    PortalRunMulti
      PortalRunUtility
        ProcessUtility
          standard_ProcessUtility
            Async_Listen
              queue_listen
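LISTEN is a utility statement that, like DDL, only takes effect when the transaction commits, so its real work is hooked into the transaction system.
The LISTEN statement calls Async_Listen, which only records the action in pendingActions (via queue_listen). There are three action types: LISTEN, UNLISTEN and UNLISTEN ALL.
At the end of the statement the transaction state machine is driven forward; if the transaction is committing, it enters CommitTransaction to do the actual commit work.
During commit, PreCommit_Notify is called: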
void
PreCommit_Notify(void)
{
    ListCell   *p;
    ...
    /* Preflight for any pending listen/unlisten actions */
    if (pendingActions != NULL)
    {
        foreach(p, pendingActions->actions)
        {
            ListenAction *actrec = (ListenAction *) lfirst(p);

            switch (actrec->action)
            {
                case LISTEN_LISTEN:
                    Exec_ListenPreCommit();
                    break;
                case LISTEN_UNLISTEN:
                    /* there is no Exec_UnlistenPreCommit() */
                    break;
                case LISTEN_UNLISTEN_ALL:
                    /* there is no Exec_UnlistenAllPreCommit() */
                    break;
            }
        }
    }
    /* ... then queue any pending notifies via asyncQueueAddEntries (see below) */
    ...
}
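The "record now, apply at commit" pattern above can be sketched in isolation. This is a simplified standalone model, not PostgreSQL code: the fixed-size array stands in for the pendingActions List, and pending_actions_record and precommit_count_listens are hypothetical helpers playing the roles of queue_listen and the PreCommit_Notify loop.

```c
#include <assert.h>
#include <stddef.h>

/* Mirrors the three action kinds recorded by queue_listen() */
typedef enum
{
    LISTEN_LISTEN,
    LISTEN_UNLISTEN,
    LISTEN_UNLISTEN_ALL
} ListenActionKind;

/* Hypothetical fixed-capacity stand-in for pendingActions->actions */
#define MAX_PENDING 16

typedef struct
{
    ListenActionKind action[MAX_PENDING];
    size_t           n;
} PendingActions;

/* Like queue_listen(): just remember the request, touch no shared state */
static void
pending_actions_record(PendingActions *pa, ListenActionKind kind)
{
    assert(pa->n < MAX_PENDING);   /* the sketch does not grow the list */
    pa->action[pa->n++] = kind;
}

/*
 * Like the loop in PreCommit_Notify: only LISTEN has pre-commit work.
 * Returns how many times Exec_ListenPreCommit() would be called.
 */
static int
precommit_count_listens(const PendingActions *pa)
{
    int calls = 0;

    for (size_t i = 0; i < pa->n; i++)
    {
        switch (pa->action[i])
        {
            case LISTEN_LISTEN:
                calls++;           /* Exec_ListenPreCommit() */
                break;
            case LISTEN_UNLISTEN:
            case LISTEN_UNLISTEN_ALL:
                break;             /* nothing to do before commit */
        }
    }
    return calls;
}
```

Recording LISTEN, UNLISTEN, LISTEN makes the loop visit Exec_ListenPreCommit twice; in the real code the second call returns immediately because amRegisteredListener is already set.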
Exec_ListenPreCommit registers the backend in the shared control structure asyncQueueControl, defined as follows:
typedef struct QueuePosition
{
    int         page;           /* SLRU page number */
    int         offset;         /* byte offset within page */
} QueuePosition;

typedef struct QueueBackendStatus
{
    int32       pid;            /* either a PID or InvalidPid */
    Oid         dboid;          /* backend's database OID, or InvalidOid */
    BackendId   nextListener;   /* id of next listener, or InvalidBackendId */
    QueuePosition pos;          /* backend has read queue up to here */
} QueueBackendStatus;

typedef struct AsyncQueueControl
{
    QueuePosition head;         /* head points to the next free location */
    QueuePosition tail;         /* tail must be <= the queue position of every
                                 * listening backend */
    int         stopPage;       /* oldest unrecycled page; must be <=
                                 * tail.page */
    BackendId   firstListener;  /* id of first listener, or InvalidBackendId */
    TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
    QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
    /* backend[0] is not used; used entries are from [1] to [MaxBackends] */
} AsyncQueueControl;

static AsyncQueueControl *asyncQueueControl;

#define QUEUE_HEAD                  (asyncQueueControl->head)
#define QUEUE_TAIL                  (asyncQueueControl->tail)
#define QUEUE_STOP_PAGE             (asyncQueueControl->stopPage)
#define QUEUE_FIRST_LISTENER        (asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i)        (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i)      (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i)      (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i)        (asyncQueueControl->backend[i].pos)
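Exec_ListenPreCommit combines positions with QUEUE_POS_MAX, which picks the logically later of two QueuePositions: the later page wins, and on the same page the larger offset wins. A minimal sketch of that comparison, assuming page numbers never wrap around (the real macro goes through asyncQueuePagePrecedes to cope with wraparound, which is left out here); queue_pos_max is a hypothetical name.

```c
#include <assert.h>

typedef struct QueuePosition
{
    int page;   /* SLRU page number */
    int offset; /* byte offset within page */
} QueuePosition;

/*
 * Simplified QUEUE_POS_MAX: the later page wins; on the same page the
 * larger offset wins. Assumes no page-number wraparound.
 */
static QueuePosition
queue_pos_max(QueuePosition x, QueuePosition y)
{
    assert(x.page >= 0 && y.page >= 0);
    if (x.page != y.page)
        return (x.page < y.page) ? y : x;
    return (x.offset > y.offset) ? x : y;
}
```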
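Note that the starting read position a new listener gets is max(QUEUE_TAIL recorded in the control structure, the furthest position already read by any other listener in the same database).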
static void
Exec_ListenPreCommit(void)
{
    QueuePosition head;
    QueuePosition max;
    BackendId   prevListener;

    /* Nothing to do if this backend is already registered as a listener */
    if (amRegisteredListener)
        return;
    ...
    LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    head = QUEUE_HEAD;
    max = QUEUE_TAIL;
    prevListener = InvalidBackendId;
    for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    {
        /*
         * Starting read position: QUEUE_TAIL from the global state, or the
         * furthest position read by any listener in the same database.
         */
        if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
            max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
        /* Also find last listening backend before this one */
        if (i < MyBackendId)
            prevListener = i;
    }
    QUEUE_BACKEND_POS(MyBackendId) = max;
    QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
    QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
    /* Insert into the listener list, which is kept sorted by BackendId */
    if (prevListener > 0)
    {
        QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener);
        QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
    }
    else
    {
        QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
        QUEUE_FIRST_LISTENER = MyBackendId;
    }
    LWLockRelease(NotifyQueueLock);
    ...
}
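The registration step can be modeled with plain arrays. This sketch rests on simplifying assumptions: positions are single byte counts instead of {page, offset}, there is no NotifyQueueLock, and ListenerTable and register_listener are hypothetical names. It keeps the two ideas from the code above: the start position is max(tail, positions of same-database listeners), and the listener list stays sorted by backend id.

```c
#include <assert.h>

#define MAX_BACKENDS    8   /* hypothetical; slot 0 unused, as in PG */
#define INVALID_BACKEND 0

/* Simplified shared state: positions are plain byte counts */
typedef struct
{
    long     tail;
    int      first_listener;
    int      next_listener[MAX_BACKENDS + 1];
    unsigned dboid[MAX_BACKENDS + 1];
    long     pos[MAX_BACKENDS + 1];
} ListenerTable;

/*
 * Sketch of Exec_ListenPreCommit: start reading at max(tail, position of
 * every same-database listener), then link ourselves into the listener
 * list right after the last listener with a smaller id.
 */
static void
register_listener(ListenerTable *t, int my_id, unsigned my_db)
{
    long max = t->tail;
    int  prev = INVALID_BACKEND;

    assert(my_id > 0 && my_id <= MAX_BACKENDS);

    for (int i = t->first_listener; i > 0; i = t->next_listener[i])
    {
        if (t->dboid[i] == my_db && t->pos[i] > max)
            max = t->pos[i];
        if (i < my_id)
            prev = i;
    }

    t->pos[my_id] = max;
    t->dboid[my_id] = my_db;

    if (prev > 0)
    {
        t->next_listener[my_id] = t->next_listener[prev];
        t->next_listener[prev] = my_id;
    }
    else
    {
        t->next_listener[my_id] = t->first_listener;
        t->first_listener = my_id;
    }
}
```

Registering backend 3, then 1, then 2 yields the list 1 -> 2 -> 3. Backend 1 (same database as 3) starts at backend 3's position, while backend 2 (another database) starts at the tail.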
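The NOTIFY statement only records the notification in backend-local memory:
Async_Notify
    // Builds Notification n = {channel_len = 3, payload_len = 0, data = 0x2a0ab84 "ch1"}
    // and appends it to pendingNotifies->events (the first NOTIFY in a transaction creates the list with list_make1(n))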
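At commit, each Notification is turned into a queue entry by asyncQueueNotificationToEntry: channel and payload are stored back to back as "channel\0payload\0", and the entry length is the fixed header plus both strings plus two terminators, rounded up for alignment. A minimal sketch, assuming an 8-byte alignment in place of QUEUEALIGN/MAXALIGN and made-up size limits; SketchQueueEntry and sketch_make_entry are hypothetical, the lengths come from strlen rather than stored channel_len/payload_len, and only the data and length fields are filled.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical limits; PG uses NAMEDATALEN and NOTIFY_PAYLOAD_MAX_LENGTH */
#define SKETCH_NAMEDATALEN 64
#define SKETCH_PAYLOAD_MAX 256
#define SKETCH_ALIGN       8    /* stands in for MAXALIGN */

typedef struct
{
    int      length;    /* total allocated length of entry */
    uint32_t dboid;     /* sender's database OID (not filled here) */
    uint32_t xid;       /* sender's XID (not filled here) */
    int32_t  srcPid;    /* sender's PID (not filled here) */
    char     data[SKETCH_NAMEDATALEN + SKETCH_PAYLOAD_MAX];
} SketchQueueEntry;

/*
 * Pack "channel\0payload\0" into qe->data and return the aligned length:
 * header + channel + payload + 2 terminators, rounded up to SKETCH_ALIGN.
 */
static int
sketch_make_entry(SketchQueueEntry *qe, const char *channel, const char *payload)
{
    size_t clen = strlen(channel);
    size_t plen = strlen(payload);
    size_t len;

    assert(clen < SKETCH_NAMEDATALEN && plen < SKETCH_PAYLOAD_MAX);

    memcpy(qe->data, channel, clen + 1);            /* copies the '\0' too */
    memcpy(qe->data + clen + 1, payload, plen + 1); /* copies the '\0' too */

    len = offsetof(SketchQueueEntry, data) + clen + plen + 2;
    len = (len + SKETCH_ALIGN - 1) & ~(size_t) (SKETCH_ALIGN - 1);
    qe->length = (int) len;
    return qe->length;
}
```

For NOTIFY ch1 with no payload this gives 16 + 3 + 0 + 2 = 21 bytes, rounded up to 24, on a typical platform where the four header fields take 4 bytes each.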
At commit the pending notifications are written to the shared queue: CommitTransaction --> PreCommit_Notify --> asyncQueueAddEntries
Each queue entry has the following layout; asyncQueueAddEntries builds one in the local variable qe and copies it into the queue page as-is:
typedef struct AsyncQueueEntry
{
    int         length;         /* total allocated length of entry */
    Oid         dboid;          /* sender's database OID */
    TransactionId xid;          /* sender's XID */
    int32       srcPid;         /* sender's PID */
    char        data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;

static ListCell *
asyncQueueAddEntries(ListCell *nextNotify)
{
    AsyncQueueEntry qe;
    QueuePosition queue_head;
    int         pageno;
    int         offset;
    int         slotno;

    /* We hold both NotifyQueueLock and NotifySLRULock during this operation */
    LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);

    /* Get the current queue page through the standard SLRU interface */
    queue_head = QUEUE_HEAD;
    pageno = QUEUE_POS_PAGE(queue_head);
    if (QUEUE_POS_IS_ZERO(queue_head))
        slotno = SimpleLruZeroPage(NotifyCtl, pageno);
    else
        slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
                                   InvalidTransactionId);

    /* Standard SLRU page: once marked dirty, it will be flushed to disk */
    NotifyCtl->shared->page_dirty[slotno] = true;

    while (nextNotify != NULL)
    {
        Notification *n = (Notification *) lfirst(nextNotify);

        /* Construct a valid queue entry in local variable qe */
        asyncQueueNotificationToEntry(n, &qe);

        offset = QUEUE_POS_OFFSET(queue_head);

        if (offset + qe.length <= QUEUE_PAGESIZE)
        {
            /* The entry fits on the current page: advance nextNotify past this item */
            nextNotify = lnext(pendingNotifies->events, nextNotify);
        }
        else
        {
            /*
             * It does not fit: write a dummy entry whose length fills the rest
             * of the page; dboid = InvalidOid marks it as invalid, so readers
             * skip it.
             */
            qe.length = QUEUE_PAGESIZE - offset;
            qe.dboid = InvalidOid;
            qe.data[0] = '\0';  /* empty channel */
            qe.data[1] = '\0';  /* empty payload */
        }

        /* Copy qe into the queue page */
        memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
               &qe,
               qe.length);

        /* Advance queue_head appropriately, and detect if page is full */
        if (asyncQueueAdvance(&(queue_head), qe.length))
        {
            ...
        }
    }

    /* Success, so update the global QUEUE_HEAD (asyncQueueControl->head = queue_head) */
    QUEUE_HEAD = queue_head;

    LWLockRelease(NotifySLRULock);

    return nextNotify;
}
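The page-filling rule in asyncQueueAddEntries, together with the matching reader behavior, can be simulated with a tiny in-memory queue. This is a hedged sketch, not the SLRU code: the 64-byte pages, the four-page ring, EntryHeader, SketchQueue, advance, append_entry and count_entries_for_db are all hypothetical, and there is no locking and no check that the head runs into the tail. advance mirrors asyncQueueAdvance: it steps past an entry and moves to the next page when not even a header-only entry would still fit.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define SKETCH_PAGESIZE 64   /* hypothetical; the real QUEUE_PAGESIZE is BLCKSZ */
#define SKETCH_NUMPAGES 4

/* Simplified entry header; dboid 0 plays the role of InvalidOid */
typedef struct
{
    int      length;   /* bytes occupied by the entry, header included */
    unsigned dboid;
} EntryHeader;

#define MIN_ENTRY ((int) sizeof(EntryHeader))

typedef struct
{
    char pages[SKETCH_NUMPAGES][SKETCH_PAGESIZE];
    int  head_page;
    int  head_offset;
} SketchQueue;

/* Step over 'len' bytes; jump to the next page if no minimal entry fits */
static bool
advance(int *page, int *offset, int len)
{
    *offset += len;
    assert(*offset <= SKETCH_PAGESIZE);
    if (*offset + MIN_ENTRY > SKETCH_PAGESIZE)
    {
        *page = (*page + 1) % SKETCH_NUMPAGES;   /* wrap around */
        *offset = 0;
        return true;
    }
    return false;
}

/* Append one entry; if it does not fit, pad the page with a dummy entry first */
static void
append_entry(SketchQueue *q, int len, unsigned dboid)
{
    bool written = false;

    assert(len >= MIN_ENTRY && len <= SKETCH_PAGESIZE && dboid != 0);
    while (!written)
    {
        EntryHeader h = { len, dboid };

        if (q->head_offset + len <= SKETCH_PAGESIZE)
            written = true;
        else
        {
            h.length = SKETCH_PAGESIZE - q->head_offset;  /* fill the page */
            h.dboid = 0;                                  /* readers skip it */
        }
        memcpy(&q->pages[q->head_page][q->head_offset], &h, sizeof h);
        advance(&q->head_page, &q->head_offset, h.length);
    }
}

/* Reader side: count entries for 'mydb' from (page, offset) up to the head */
static int
count_entries_for_db(const SketchQueue *q, int page, int offset, unsigned mydb)
{
    int count = 0;

    while (page != q->head_page || offset != q->head_offset)
    {
        EntryHeader h;

        memcpy(&h, &q->pages[page][offset], sizeof h);
        if (h.dboid == mydb)
            count++;
        advance(&page, &offset, h.length);
    }
    return count;
}
```

With three 24-byte entries, the third does not fit at offset 48, so a 16-byte dummy entry with dboid 0 pads page 0 and the real entry lands at the start of page 1. A reader for database 1 then sees two entries and silently skips the dummy.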
AtCommit_Notify
    SignalBackends
        // Walk the listener list in asyncQueueControl->backend to find the listeners
        // e.g. with two listeners:
        // count = 2
        // pids[0] = 15446
        // pids[1] = 23101
        // SendProcSignal(15446, PROCSIG_NOTIFY_INTERRUPT)
        // SendProcSignal(23101, PROCSIG_NOTIFY_INTERRUPT)
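Which listeners get a signal? In the versions I have read (PG 13 and later), SignalBackends signals listeners in the sender's database unless they have already caught up with QUEUE_HEAD, and listeners in other databases only when they have fallen far behind. A simplified sketch of that selection, assuming linear positions and a caller-supplied far_behind threshold (the real code compares page distances against QUEUE_CLEANUP_DELAY); Listeners and collect_pids_to_signal are hypothetical names, and the actual SendProcSignal calls are left out.

```c
#include <assert.h>

#define MAX_BACKENDS 8   /* hypothetical; slot 0 unused */

typedef struct
{
    int      first_listener;
    int      next_listener[MAX_BACKENDS + 1];
    int      pid[MAX_BACKENDS + 1];
    unsigned dboid[MAX_BACKENDS + 1];
    long     pos[MAX_BACKENDS + 1];   /* simplified linear read position */
} Listeners;

/*
 * Collect PIDs to signal: same-database listeners that have not caught up
 * with 'head', plus other-database listeners lagging by at least
 * 'far_behind'. Returns the number of PIDs written to 'out'.
 */
static int
collect_pids_to_signal(const Listeners *l, unsigned my_db, long head,
                       long far_behind, int out[MAX_BACKENDS])
{
    int count = 0;

    for (int i = l->first_listener; i > 0; i = l->next_listener[i])
    {
        if (l->dboid[i] == my_db)
        {
            if (l->pos[i] == head)
                continue;                   /* already caught up */
        }
        else if (head - l->pos[i] < far_behind)
            continue;                       /* other db, not far behind */
        assert(count < MAX_BACKENDS);
        out[count++] = l->pid[i];
    }
    return count;
}
```

The sender would then signal each collected PID; the two-listener trace above corresponds to the first call in the test below.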
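On the receiving side, SIGUSR1 is handled by:
procsignal_sigusr1_handler
    if (CheckProcSignal(PROCSIG_NOTIFY_INTERRUPT))
        HandleNotifyInterrupt();
HandleNotifyInterrupt only sets a flag (notifyInterruptPending = true) and sets the process latch, then returns.
The notifications are actually processed later (ProcessNotifyInterrupt), once the backend's normal control flow reaches a point where it checks this flag.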
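The "set a flag in the handler, do the work later" pattern is standard practice, because very little is safe to do inside a signal handler. A minimal POSIX sketch of the same idea; notify_pending, on_sigusr1, install_handler and process_pending_notifies are hypothetical stand-ins for notifyInterruptPending, HandleNotifyInterrupt and ProcessNotifyInterrupt, and unlike PostgreSQL it neither sets a latch nor reads the queue.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <string.h>

/* Like notifyInterruptPending: the handler only sets this flag */
static volatile sig_atomic_t notify_pending = 0;
static int notifications_processed = 0;

static void
on_sigusr1(int signo)
{
    (void) signo;
    notify_pending = 1;   /* async-signal-safe: just record the event */
}

static int
install_handler(void)
{
    struct sigaction sa;

    memset(&sa, 0, sizeof sa);
    sa.sa_handler = on_sigusr1;
    sigemptyset(&sa.sa_mask);
    return sigaction(SIGUSR1, &sa, NULL);
}

/* Called from the main flow at a safe point, like ProcessNotifyInterrupt */
static void
process_pending_notifies(void)
{
    if (!notify_pending)
        return;
    notify_pending = 0;
    notifications_processed++;   /* real code: read the queue, tell the client */
}
```

raise(SIGUSR1) only sets the flag; the counter moves only when the main flow calls process_pending_notifies, and a second call without a new signal does nothing.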