接上一篇:Postgresql源码(39)备库startup启动和redo流程分析
提到备机startup进程等在这个堆栈里面:
#0 0x00007f66aef20913 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1 0x000000000084d7f2 in WaitEventSetWaitBlock (set=0x1ca0dc0, cur_timeout=5000, occurred_events=0x7ffc088b3130, nevents=1) at latch.c:1048
#2 0x000000000084d6cd in WaitEventSetWait (set=0x1ca0dc0, timeout=5000, occurred_events=0x7ffc088b3130, nevents=1, wait_event_info=83886088) at latch.c:1000
#3 0x000000000084ce76 in WaitLatchOrSocket (latch=0x2aaaaac0d254, wakeEvents=25, sock=-1, timeout=5000, wait_event_info=83886088) at latch.c:385
#4 0x000000000084cd57 in WaitLatch (latch=0x2aaaaac0d254, wakeEvents=25, timeout=5000, wait_event_info=83886088) at latch.c:339
#5 0x0000000000551751 in WaitForWALToBecomeAvailable (RecPtr=206024220672, randAccess=0 '\000', fetching_ckpt=0 '\000', tliRecPtr=206029051848) at xlog.c:12238
#6 0x0000000000550d79 in XLogPageRead (xlogreader=0x1cc9df0, targetPagePtr=206024212480, reqLen=8192, targetRecPtr=206029051848, readBuf=0x1ccae08 "\227\320\005", readTLI=0x1cca69c) at xlog.c:11707
#7 0x0000000000556c1f in ReadPageInternal (state=0x1cc9df0, pageptr=206029045760, reqLen=6112) at xlogreader.c:557
#8 0x0000000000556492 in XLogReadRecord (state=0x1cc9df0, RecPtr=206029051848, errormsg=0x7ffc088b3428) at xlogreader.c:276
#9 0x0000000000542c23 in ReadRecord (xlogreader=0x1cc9df0, RecPtr=0, emode=15, fetching_ckpt=0 '\000') at xlog.c:4232
#10 0x00000000005494eb in StartupXLOG () at xlog.c:7350
#11 0x00000000007e1b8b in StartupProcessMain () at startup.c:230
#12 0x000000000055d935 in AuxiliaryProcessMain (argc=2, argv=0x7ffc088b3fd0) at bootstrap.c:426
#13 0x00000000007e0a7c in StartChildProcess (type=StartupProcess) at postmaster.c:5463
#14 0x00000000007db927 in PostmasterMain (argc=1, argv=0x1c9fd80) at postmaster.c:1377
#15 0x0000000000719962 in main (argc=1, argv=0x1c9fd80) at main.c:228
备机使用latch机制等待新日志到来唤醒、处理。
本篇重点分析Wait这一系列Latch相关的函数。(latch.c)
本篇需要一点背景支持:《Postgresql的latch实现中self-pipe trick解决什么问题》
整体总结:
1、latch的实现(如果支持epoll的话)就是epoll_wait的封装 + 利用self-pipe,实现等锁唤醒的机制。
2、备机startup会等待recoveryWakeupLatch、POSTMASTER_FD_WATCH两个事件,两个事件其实都是管道的read端,然后用epoll_wait等待。
初始化过程总结:
初始化过程中会涉及三把latch锁:MyLatch、MyProc->procLatch、XLogCtl->recoveryWakeupLatch
等锁相关总结:
1、WaitForWALToBecomeAvailable会循环调用WaitLatch等锁,具体等三件事情:recoveryWakeupLatch、postmaster_alive_fds[POSTMASTER_FD_WATCH]、超时(请见2.1 WaitLatchOrSocket)
2、唤醒后把锁信息全部清理掉,并把epoll_create创建的fd关掉close(set->epoll_fd)。注意不会close监听的那两个fd。
3、再进入新一轮循环WaitLatch。
4、recoveryWakeupLatch在epoll_wait的时候,等的是Pipe的读端,应用了上面提到的self-pipe trick。
等锁WaitLatchOrSocket流程总结:
0、(补充)WL_LATCH_SET时在AddWaitEventToSet中要监听的fd是selfpipe_readfd,也就是上面创建的管道读端,应用self-pipe trick
1、WaitLatchOrSocket完成了epoll的配置和等待
2、WaitLatchOrSocket中增加对&XLogCtl->recoveryWakeupLatch的等待,记录为一个wakeEvents
3、WaitLatchOrSocket中增加对postmaster_alive_fds[POSTMASTER_FD_WATCH]的等待,记录为一个wakeEvents
4、wakeEvents汇总到WaitEventSet中
5、调用epoll_wait等上面两把锁 或 超时唤醒
6、清理WaitEventSet
/* typedef in latch.h */
/*
 * The set of events one wait call blocks on, as laid out when epoll is the
 * underlying wait primitive.
 */
struct WaitEventSet
{
    int nevents; /* number of registered events */
    int nevents_space; /* maximum number of events in this set */
    /*
     * Array, of nevents_space length, storing the definition of events this
     * set is waiting for.
     */
    WaitEvent *events;
    /*
     * If WL_LATCH_SET is specified in any wait event, latch is a pointer to
     * said latch, and latch_pos the offset in the ->events array. This is
     * useful because we check the state of the latch before doing
     * syscalls related to waiting.
     */
    Latch *latch; /* the single latch this set waits on (one pointer, not an array); NULL until a WL_LATCH_SET event is added */
    int latch_pos; /* index of the latch's entry in ->events */
    int epoll_fd; /* descriptor obtained from epoll_create1() when the set is created */
    /* epoll_wait returns events in a user provided arrays, allocate once */
    struct epoll_event *epoll_ret_events;
};
/*
 * One entry per event registered in a WaitEventSet. With epoll, each entry's
 * fd is what gets handed to epoll_ctl(). Only the WL_LATCH_SET entry is tied
 * to a latch; the other entries are registered with a NULL latch.
 */
typedef struct WaitEvent
{
    int pos; /* position in the event data structure */
    uint32 events; /* triggered events */
    pgsocket fd; /* socket fd associated with event */
    void *user_data; /* pointer provided in AddWaitEventToSet */
} WaitEvent;
/*
 * A latch: a flag a process can wait on until it (or another process) sets it.
 */
typedef struct Latch
{
    sig_atomic_t is_set; /* false after InitLatch/InitSharedLatch; SetLatch turns it to true */
    bool is_shared; /* false for a latch set up by InitLatch, true for InitSharedLatch */
    int owner_pid; /* pid of the owning process; 0 while a shared latch has no owner (InitSharedLatch/DisownLatch) */
} Latch;
位置:
(gdb) bt
#0 InitializeLatchSupport () at latch.c:152
#1 0x00000000009ee63b in InitPostmasterChild () at miscinit.c:198
#2 0x00000000007e0a38 in StartChildProcess (type=StartupProcess) at postmaster.c:5453
#3 0x00000000007db927 in PostmasterMain (argc=1, argv=0xf80d40) at postmaster.c:1377
#4 0x0000000000719962 in main (argc=1, argv=0xf80d40) at main.c:228
在InitPostmasterChild中依次执行三步初始化:
InitializeLatchSupport();
MyLatch = &LocalLatchData;
InitLatch(MyLatch);
源码走读
第一步:初始化PIPE,信号到来时用管道唤醒io wait函数(为什么建非阻塞管道参考《Postgresql的latch实现中self-pipe trick解决什么问题》)
InitializeLatchSupport
pipe(pipefd)
fcntl(pipefd[0], F_SETFL, O_NONBLOCK)
fcntl(pipefd[1], F_SETFL, O_NONBLOCK)
fcntl(pipefd[0], F_SETFD, FD_CLOEXEC)
fcntl(pipefd[1], F_SETFD, FD_CLOEXEC)
selfpipe_readfd = pipefd[0]
selfpipe_writefd = pipefd[1]
selfpipe_owner_pid = MyProcPid
第二步:MyLatch = &LocalLatchData,内存指向私有全局变量
第三步:InitLatch
/*
 * Initialize a process-local latch: it starts out not set, is owned by the
 * current process, and is marked as not shared.
 */
void
InitLatch(volatile Latch *latch)
{
    latch->is_set = false;
    latch->owner_pid = MyProcPid; /* a local latch is owned right away, unlike InitSharedLatch which leaves owner_pid at 0 */
    latch->is_shared = false;
}
(第三步的另一种方式)
InitSharedLatch
latch->is_set = false;
latch->owner_pid = 0;
latch->is_shared = true;
OwnLatch
latch->owner_pid = MyProcPid;
DisownLatch
latch->owner_pid = 0;
#0 OwnLatch (latch=0x2aaab4df6ea4) at latch.c:291
#1 0x0000000000863e47 in InitAuxiliaryProcess () at proc.c:574
#2 0x000000000055d8a7 in AuxiliaryProcessMain (argc=2, argv=0x7fffffffe0b0) at bootstrap.c:372
#3 0x00000000007e0a7c in StartChildProcess (type=StartupProcess) at postmaster.c:5463
#4 0x00000000007db927 in PostmasterMain (argc=1, argv=0xf80d50) at postmaster.c:1377
#5 0x0000000000719962 in main (argc=1, argv=0xf80d50) at main.c:228
【MyProc->procLatch】在辅助进程初始化中做两步latch初始化:
OwnLatch(&MyProc->procLatch);
latch->owner_pid = MyProcPid;
SwitchToSharedLatch();
MyLatch = &MyProc->procLatch;
/* Sets a latch and wakes up anyone waiting on it */
/* This is cheap if the latch is already set, otherwise not so much */
SetLatch(MyLatch);
pg_memory_barrier();
if (latch->is_set) // 已经SET了直接返回
return;
latch->is_set = true; // 没SET给SET进去
如果是自己进程owner:给Pipe发1字节
如果是其他进程owner:给其他进程发sigusr1
如果是0进程owner:返回
将共享latch配置上pid
#0 OwnLatch (latch=0x2aaaaac0d254) at latch.c:291
#1 0x000000000054797a in StartupXLOG () at xlog.c:6425
#2 0x00000000007e1b8b in StartupProcessMain () at startup.c:230
#3 0x000000000055d935 in AuxiliaryProcessMain (argc=2, argv=0x7fffffffe0b0) at bootstrap.c:426
#4 0x00000000007e0a7c in StartChildProcess (type=StartupProcess) at postmaster.c:5463
#5 0x00000000007db927 in PostmasterMain (argc=1, argv=0xf80d50) at postmaster.c:1377
#6 0x0000000000719962 in main (argc=1, argv=0xf80d50) at main.c:228
总结:
1、WaitForWALToBecomeAvailable会循环调用WaitLatch等锁,具体等三件事情:recoveryWakeupLatch、postmaster_alive_fds[POSTMASTER_FD_WATCH]、超时(请见2.1 WaitLatchOrSocket)
2、唤醒后把锁信息全部清理掉,并把epoll_create创建的fd关掉close(set->epoll_fd)。注意不会close监听的那两个fd。
3、再进入新一轮循环WaitLatch。
4、recoveryWakeupLatch在epoll_wait的时候,等的是Pipe的读端,应用了上面提到的self-pipe trick。
分析:
第一次进入等锁堆栈:
#0 WaitLatch (latch=0x2aaaaac0d254, wakeEvents=25, timeout=5000, wait_event_info=83886088) at latch.c:339
#1 0x0000000000551751 in WaitForWALToBecomeAvailable (RecPtr=206024220672, randAccess=0 '\000', fetching_ckpt=0 '\000',
tliRecPtr=206029093456) at xlog.c:12238
#2 0x0000000000550d79 in XLogPageRead (xlogreader=0xfaade0, targetPagePtr=206024212480, reqLen=8192, targetRecPtr=206029093456,
readBuf=0xfabdf8 "\227\320\005", readTLI=0xfab68c) at xlog.c:11707
#3 0x0000000000556c1f in ReadPageInternal (state=0xfaade0, pageptr=206029086720, reqLen=6760) at xlogreader.c:557
#4 0x0000000000556492 in XLogReadRecord (state=0xfaade0, RecPtr=206029093456, errormsg=0x7fffffffd508) at xlogreader.c:276
#5 0x0000000000542c23 in ReadRecord (xlogreader=0xfaade0, RecPtr=0, emode=15, fetching_ckpt=0 '\000') at xlog.c:4232
#6 0x00000000005494eb in StartupXLOG () at xlog.c:7350
#7 0x00000000007e1b8b in StartupProcessMain () at startup.c:230
#8 0x000000000055d935 in AuxiliaryProcessMain (argc=2, argv=0x7fffffffe0b0) at bootstrap.c:426
#9 0x00000000007e0a7c in StartChildProcess (type=StartupProcess) at postmaster.c:5463
#10 0x00000000007db927 in PostmasterMain (argc=1, argv=0xf80d50) at postmaster.c:1377
#11 0x0000000000719962 in main (argc=1, argv=0xf80d50) at main.c:228
WaitForWALToBecomeAvailable函数进入等待事件中:
这里等待的是recoveryWakeupLatch:
WaitLatch(
&XLogCtl->recoveryWakeupLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
5000L,
WAIT_EVENT_RECOVERY_WAL_ALL
)
继续调用:
WaitLatchOrSocket(
latch = &XLogCtl->recoveryWakeupLatch,
wakeEvents = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
PGINVALID_SOCKET,
timeout = 5000L,
wait_event_info = WAIT_EVENT_RECOVERY_WAL_ALL
)
注意传入的锁:&XLogCtl->recoveryWakeupLatch
先回忆下epoll怎么用?
1、epoll_create(size)创建epollfd,给的size只是参考值(只要求大于0);epoll_create1(flags)没有size参数,本文代码用的是epoll_create1(EPOLL_CLOEXEC)。注意create会占用一个fd
2、epoll_ctl(epollfd(上面创建的fd), 行为ADD, 要监听的FD, epoll_event(监听什么事件))
3、唤醒的nfds = epoll_wait(epollfd(上面创建的fd), 返回唤醒的events, 一次最多返回的事件数maxevents, timeout)
总结:
0、(补充)WL_LATCH_SET时在AddWaitEventToSet中要监听的fd是selfpipe_readfd,也就是上面创建的管道读端,应用self-pipe trick
1、WaitLatchOrSocket完成了epoll的配置和等待
2、WaitLatchOrSocket中增加对&XLogCtl->recoveryWakeupLatch的等待,记录为一个wakeEvents
3、WaitLatchOrSocket中增加对postmaster_alive_fds[POSTMASTER_FD_WATCH]的等待,记录为一个wakeEvents
4、wakeEvents汇总到WaitEventSet中
5、调用epoll_wait等上面两把锁 或 超时唤醒
6、清理WaitEventSet
wakeEvents = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH
/*
 * Wait for any of the conditions selected in wakeEvents: the latch being set
 * (WL_LATCH_SET), postmaster death (WL_POSTMASTER_DEATH), socket readiness
 * (WL_SOCKET_MASK bits, on sock), or the timeout expiring (WL_TIMEOUT).
 *
 * A temporary WaitEventSet is built for the call, waited on once, and freed
 * again before returning.
 *
 * Returns a bitmask of the WL_* conditions that ended the wait. Only one
 * occurred event is requested from WaitEventSetWait.
 *
 * In the walkthrough below the caller passes
 * wakeEvents = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH and
 * timeout = 5000 (the 5 second wait mentioned further down).
 */
int
WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
long timeout, uint32 wait_event_info)
{
    int ret = 0;
    int rc;
    WaitEvent event;
    /* room for up to 3 registered events: latch, postmaster death, socket */
    WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
    /******************************************************************
    CreateWaitEventSet expanded: builds the WaitEventSet
    WaitEventSet *set
    // memory layout:
    set->
    sz += MAXALIGN(sizeof(WaitEventSet)) // one WaitEventSet header for the whole set
    set->events->
    sz += MAXALIGN(sizeof(WaitEvent) * nevents) // one WaitEvent per event
    set->epoll_ret_events->
    sz += MAXALIGN(sizeof(struct epoll_event) * nevents) // the 3 events to watch
    set->latch = NULL
    set->nevents_space = nevents
    set->epoll_fd = epoll_create1(EPOLL_CLOEXEC) // original note: "200w" -- NOTE(review): meaning unclear (2 million?), confirm
    ******************************************************************/
    if (wakeEvents & WL_TIMEOUT)
        Assert(timeout >= 0);
    else
        timeout = -1; /* no WL_TIMEOUT requested; presumably -1 means "no timeout", as with epoll_wait -- confirm */
    if (wakeEvents & WL_LATCH_SET)
        AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET,
        (Latch *) latch, NULL);
    /******************************************************************
    WL_LATCH_SET takes this branch:
    AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
    1. The latch at this point = {is_set = 0, is_shared = 1 '\001', owner_pid = 30877}; 30877 is the pid of the startup process
    2. Start filling in WaitEvent *event;
    event = &set->events[set->nevents]
    ...
    event->fd = selfpipe_readfd  <-- NOTE: the fd being watched is the read end of the self-pipe
    ...
    //set: {nevents = 1, nevents_space = 3, events = 0xf81dd8, latch = 0x2aaaaac0d254, latch_pos = 0, epoll_fd = 7, epoll_ret_events = 0xf81e20}
    //event: {pos = 0, events = 1, fd = 13, user_data = 0x0}
    WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD)
    epoll_event epoll_ev:
    EPOLLERR: an error occurred on the file descriptor;
    EPOLLHUP: the file descriptor was hung up;
    EPOLLIN: the file descriptor is readable (including an orderly close by the peer socket);
    epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev)
    ******************************************************************/
    if (wakeEvents & WL_POSTMASTER_DEATH && IsUnderPostmaster)
        AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
        NULL, NULL);
    /******************************************************************
    WL_POSTMASTER_DEATH takes this branch.
    Same flow as above, except that event->fd = postmaster_alive_fds[POSTMASTER_FD_WATCH]
    ******************************************************************/
    if (wakeEvents & WL_SOCKET_MASK)
    {
        int ev;
        /* keep only the socket-related bits; these are registered against sock */
        ev = wakeEvents & WL_SOCKET_MASK;
        AddWaitEventToSet(set, ev, sock, NULL, NULL);
    }
    rc = WaitEventSetWait(set, timeout, &event, 1, wait_event_info);
    /******************************************************************
    Start waiting:
    The call is shaped like epoll: pass in the set built above (which may hold several events); the event that caused the wakeup is handed back through &event.
    It goes into
    rc = WaitEventSetWaitBlock(set, cur_timeout,occurred_events, nevents);
    epoll_wait(set->epoll_fd, set->epoll_ret_events, nevents, cur_timeout)
    Woken up after waiting 5 seconds: rc == 0 return -1;
    ******************************************************************/
    /* rc == 0 is reported to the caller as a timeout */
    if (rc == 0)
        ret |= WL_TIMEOUT;
    else
    {
        /* report only the latch / postmaster-death / socket bits of the occurred event */
        ret |= event.events & (WL_LATCH_SET |
        WL_POSTMASTER_DEATH |
        WL_SOCKET_MASK);
    }
    FreeWaitEventSet(set);
    /******************************************************************
    Releases the epoll_fd that epoll_create1 created above
    close(set->epoll_fd)
    Frees the set as a whole
    pfree(set)
    ******************************************************************/
    return ret;
}
Startup唤醒
/* SIGUSR2: set flag to finish recovery */
StartupProcTriggerHandler
/* SIGHUP: set flag to re-read config file at next convenient time */
StartupProcSigHupHandler
/* SIGTERM: set flag to abort redo and exit */
StartupProcShutdownHandler
WakeupRecovery
wal receiver唤醒
// Wait for startup process to set receiveStart and receiveStartTLI.
WalRcvWaitForStartPosition
// Mark us as STOPPED in shared memory at exit.
WalRcvDie
// Flush the log to disk.
XLogWalRcvFlush
WakeupRecovery
SetLatch(&XLogCtl->recoveryWakeupLatch)