前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Postgresql源码(75)notify与listen执行流程分析

Postgresql源码(75)notify与listen执行流程分析

作者头像
mingjie
发布2022-09-23 15:36:47
1K0
发布2022-09-23 15:36:47
举报
文章被收录于专栏:Postgresql源码分析

相关 《Postgresql源码(60)事务系统总结》 《Postgresql源码(75)notify与listen执行流程分析》

顺着看事务提交时发现PG有异步消息队列的功能,这里试着分析总结。

0 总结速查

两句话总结:

  • notify将msg追加到slru消息队列,发信号通知。
  • listen注册监听人backend到监听队列,每个监听者消费,并自己记录消费位置。

Listen监听:

  • CommitTransaction->PreCommit_Notifybackend事务提交时执行listen,把自己注册进入AsyncQueueControl->backend数组中的一个位置。
  • 在数组中表示自己已在监听队列中,且在监听队列的结构会记录自己当前消费到的位置。
  • 一个后端进程占用队列一个位置,多次执行Listen不会占用新的位置,同一个backend+db,只能使用一个位置。
  • 监听队列是SLRU结构,所以指向监听队列的指针为{page, offset}

notify通知:

  • DDL记录通知信息(不通知)。
  • CommitTransaction --> PreCommit_Notify事务提交时将记录的notify追加到消息队列。
  • CommitTransaction --> AtCommit_Notify事务提交时kill sigusr1通知其他进程。

消息队列:

  • 使用通用SLRU结构,也会标记为脏、也会正常淘汰落盘。参考之前写过的SLRU页面分析(CLOG、SUBTRANS等)。
  • 总控结构AsyncQueueControl->head端新增,AsyncQueueControl->tail端消费。
  • 注意:消息队列虽然使用SLRU结构,但不持久化,只是在内存页面不够用的时候,用LRU换出到磁盘。

1 背景

Listen:

  1. 监听语句如果在事务内,listen执行后不能拿到通知信息,必须等待事务提交;注意事务提交后,会拿到所有listen语句后的通知。
  2. 监听必须在notify之前,如果notify时没有监听,消息收不到。
  3. 监听如果在psql执行,只在任何语句执行完时收到通知,如没有语句执行不会收到通知。
  4. 监听如果使用API,例如libpq的PQnotifies函数,可以立即收到通知进行处理。

Notify:

  1. 通知语句的事务必须提交才会生效。
  2. 通知是异步的,记录在队列中,每次监听会收到队列中累加的所有消息,PG保证收到的顺序和发送顺序一致。

2 使用案例

2.1 PSQL

代码语言:javascript
复制
-- session 1
postgres=# listen ch1;
LISTEN

-- session 2
postgres=# listen ch1;
LISTEN

-- session 3
postgres=# notify ch1;
NOTIFY
postgres=# notify ch1;
NOTIFY

-- session 1
postgres=# select 1;
 ?column? 
----------
        1
(1 row)

Asynchronous notification "ch1" received from server process with PID 1837.
Asynchronous notification "ch1" received from server process with PID 1837.

-- session 2
postgres=# select 1;
 ?column? 
----------
        1
(1 row)

Asynchronous notification "ch1" received from server process with PID 1837.
Asynchronous notification "ch1" received from server process with PID 1837.

2.2 LIBPQ使用案例

https://www.postgresql.org/docs/14/libpq-example.html#LIBPQ-EXAMPLE-2

3 内核代码分析

3.1 listen监听

先放总结
  • backend在事务提交时执行listen,把自己注册进入AsyncQueueControl->backend数组中的一个位置。
  • 在数组中表示自己已在监听队列中,且在监听队列的结构会记录自己当前消费到的位置。
  • 一个后端进程占用队列一个位置,多次执行Listen不会占用新的位置,同一个backend+db,只能使用一个位置。
  • 监听队列是SLRU结构,所以指向监听队列的指针为{page, offset}
Async_Listen进入位置
代码语言:javascript
复制
exec_simple_query
  PortalRun
    PortalRunMulti
      PortalRunUtility
        ProcessUtility
          standard_ProcessUtility
            Async_Listen
              queue_listen

listen属于DDL,也是跟着事务提交才会生效,所以函数调用嵌在事务系统中。

listen调用Async_Listen登记Listen信息,只把action(三种类型:listen、unlisten、unlisten all)记录在pendingActions中。

在语句结尾的事务状态机流转函数中,如果是事务提交状态,会走入CommitTransaction进行事务提交的具体工作。

在事务提交时,调用PreCommit_Notify函数:

代码语言:javascript
复制
void
PreCommit_Notify(void)
{
    ...
	if (pendingActions != NULL)
	{
		foreach(p, pendingActions->actions)
		{
			ListenAction *actrec = (ListenAction *) lfirst(p);

			switch (actrec->action)
			{
				case LISTEN_LISTEN:
					Exec_ListenPreCommit();
					break;
				case LISTEN_UNLISTEN:
					/* there is no Exec_UnlistenPreCommit() */
					break;
				case LISTEN_UNLISTEN_ALL:
					/* there is no Exec_UnlistenAllPreCommit() */
					break;
			}
		}
	}
异步队列的数据结构
代码语言:javascript
复制
typedef struct AsyncQueueControl
{
	QueuePosition head;			/* head points to the next free location */
	QueuePosition tail;			/* tail must be <= the queue position of every
								 * listening backend */
	int			stopPage;		/* oldest unrecycled page; must be <=
								 * tail.page */
	BackendId	firstListener;	/* id of first listener, or InvalidBackendId */
	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
	/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
} AsyncQueueControl;

typedef struct QueueBackendStatus
{
	int32		pid;			/* either a PID or InvalidPid */
	Oid			dboid;			/* backend's database OID, or InvalidOid */
	BackendId	nextListener;	/* id of next listener, or InvalidBackendId */
	QueuePosition pos;			/* backend has read queue up to here */
} QueueBackendStatus;

typedef struct QueuePosition
{
	int			page;			/* SLRU page number */
	int			offset;			/* byte offset within page */
} QueuePosition;

static AsyncQueueControl *asyncQueueControl;

#define QUEUE_HEAD					(asyncQueueControl->head)
#define QUEUE_TAIL					(asyncQueueControl->tail)
#define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
#define QUEUE_FIRST_LISTENER		(asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i)		(asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
完成监听过程

注意拿到的消费起点位置是:max(控制结构记录的TAIL,其他所有进程消费到的最新位置)

代码语言:javascript
复制
Exec_ListenPreCommit
  if (amRegisteredListener)
    return;

	head = QUEUE_HEAD;
	max = QUEUE_TAIL;
	prevListener = InvalidBackendId;
	for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
	{
		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
		    // 拿到消费位置,从全局信息取QUEUE_TAIL,或从每个backend消费到的最大位置取。
			max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
		/* Also find last listening backend before this one */
		if (i < MyBackendId)
			prevListener = i;
	}
	QUEUE_BACKEND_POS(MyBackendId) = max;
	QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
	QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
	// 后插入监听队列
	if (prevListener > 0)
	{
		QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener);
		QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
	}
	else
	{
		QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
		QUEUE_FIRST_LISTENER = MyBackendId;
	}
	LWLockRelease(NotifyQueueLock);

3.2 notify通知

第一步:DDL记录通知信息(不通知)
代码语言:javascript
复制
Async_Notify
  // 拼接 Notification n = {channel_len = 3, payload_len = 0, data = 0x2a0ab84 "ch1"}
  // 挂在 pendingNotifies->events后 = list_make1(n)
第二步:PreCommit_Notify事务提交时append to 消息队列

CommitTransaction --> PreCommit_Notify --> asyncQueueAddEntries

代码语言:javascript
复制
static ListCell *
asyncQueueAddEntries(ListCell *nextNotify)
{
	AsyncQueueEntry qe;
	QueuePosition queue_head;
	int			pageno;
	int			offset;
	int			slotno;

	/* We hold both NotifyQueueLock and NotifySLRULock during this operation */
	LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);

SLRU标准接口拿消息队列页面SimpleLruZeroPage

代码语言:javascript
复制
	queue_head = QUEUE_HEAD;
	pageno = QUEUE_POS_PAGE(queue_head);
	if (QUEUE_POS_IS_ZERO(queue_head))
		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
	else
		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
								   InvalidTransactionId);

使用slru标准结构,会刷脏落盘。

代码语言:javascript
复制
	NotifyCtl->shared->page_dirty[slotno] = true;

	while (nextNotify != NULL)
	{
		Notification *n = (Notification *) lfirst(nextNotify);

		/* Construct a valid queue entry in local variable qe */
		asyncQueueNotificationToEntry(n, &qe);

		offset = QUEUE_POS_OFFSET(queue_head);

当前页面能装得下,可以把nextNotify指向下一条了。

代码语言:javascript
复制
		if (offset + qe.length <= QUEUE_PAGESIZE)
		{
			/* OK, so advance nextNotify past this item */
			nextNotify = lnext(pendingNotifies->events, nextNotify);
		}

当前页面装不下,length把剩下的装满,dboid=InvalidOid用于标记无效。

代码语言:javascript
复制
		else
		{
			qe.length = QUEUE_PAGESIZE - offset;
			qe.dboid = InvalidOid;
			qe.data[0] = '\0';	/* empty channel */
			qe.data[1] = '\0';	/* empty payload */
		}

拷贝qe到消息队列中:

代码语言:javascript
复制
typedef struct AsyncQueueEntry
{
	int			length;			/* total allocated length of entry */
	Oid			dboid;			/* sender's database OID */
	TransactionId xid;			/* sender's XID */
	int32		srcPid;			/* sender's PID */
	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;

开始拷贝

代码语言:javascript
复制
		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
			   &qe,
			   qe.length);

推进head指针

代码语言:javascript
复制
		/* Advance queue_head appropriately, and detect if page is full */
		if (asyncQueueAdvance(&(queue_head), qe.length))
		{
			...
		}
	}

控制结构记录head指针asyncQueueControl->head = queue_head

代码语言:javascript
复制
	/* Success, so update the global QUEUE_HEAD */
	QUEUE_HEAD = queue_head;

	LWLockRelease(NotifySLRULock);

	return nextNotify;
}
第三步:AtCommit_Notify事务提交时通知其他进程
代码语言:javascript
复制
AtCommit_Notify
  SignalBackends
    // 查询asyncQueueControl->backend监听数组,找到监听者
    // 例如两个监听者: 
    // count = 2
    // p pids[0] = 15446
    // p pids[1] = 23101

    // SendProcSignal(15446, PROCSIG_NOTIFY_INTERRUPT)
    // SendProcSignal(23101, PROCSIG_NOTIFY_INTERRUPT)  
第四步:(监听进程)被信号SIGUSR1中断,进入procsignal_sigusr1_handler
代码语言:javascript
复制
procsignal_sigusr1_handler
  if (CheckProcSignal(PROCSIG_NOTIFY_INTERRUPT))
    HandleNotifyInterrupt();

HandleNotifyInterrupt函数配置标志位后就退出。notifyInterruptPending = true;

等流程走到信号处理函数在做处理。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-08-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0 总结速查
  • 1 背景
  • 2 使用案例
    • 2.1 PSQL
      • 2.2 LIBPQ使用案例
      • 3 内核代码分析
        • 3.1 listen监听
          • 3.2 notify通知
          相关产品与服务
          消息队列 CMQ 版
          消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档