前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PostgreSQL进程功能源码分析

PostgreSQL进程功能源码分析

作者头像
用户4700054
发布2022-08-17 13:23:21
4850
发布2022-08-17 13:23:21
举报
文章被收录于专栏:存储内核技术交流

Postgres服务端后台进程功能概览

  • 在PG14版本中定义了如上图中的进程基本的描述信息
代码语言:javascript
复制
// 这里定义了PG基本的进程类型const char *GetBackendTypeDesc(BackendType backendType){
	const char *backendDesc = "unknown process type";

	switch (backendType)
	{
		case B_INVALID:
			backendDesc = "not initialized";
			break;
		case B_AUTOVAC_LAUNCHER:
			backendDesc = "autovacuum launcher";
			break;
		case B_AUTOVAC_WORKER:
			backendDesc = "autovacuum worker";
			break;
		case B_BACKEND:
			backendDesc = "client backend";
			break;
		case B_BG_WORKER:
			backendDesc = "background worker";
			break;
		case B_BG_WRITER:
			backendDesc = "background writer";
			break;
		case B_CHECKPOINTER:
			backendDesc = "checkpointer";
			break;
		case B_STARTUP:
			backendDesc = "startup";
			break;
		case B_WAL_RECEIVER:
			backendDesc = "walreceiver";
			break;
		case B_WAL_SENDER:
			backendDesc = "walsender";
			break;
		case B_WAL_WRITER:
			backendDesc = "walwriter";
			break;
		case B_ARCHIVER:
			backendDesc = "archiver";
			break;
		case B_STATS_COLLECTOR:
			backendDesc = "stats collector";
			break;
		case B_LOGGER:
			backendDesc = "logger";
			break;
	}

	return backendDesc;}

后台进程启动流程

  • postgres数据库启动后会在ServerLoop中不断的监听来自客户端的第一次IO请求,然后在创建客户端进程,接着判断整个postgres中的辅助的后台进程是否存在,如果不存在会自动拉起这个进程。这种模式和oracle类似
代码语言:javascript
复制
// PG启动后的非常核心的监听循环static int ServerLoop(void){

	nSockets = initMasks(&readmask);
	// for循环监听是否有来自客户端的请求
	for (;;)
	{
		fd_set		rmask;
		int			selres;
		time_t		now;
		// 这里采用selelct模型,因为有客户端就有创建进程,select的模型是for循环遍历,可以优化为epoll模型
		selres = select(nSockets, &rmask, NULL, NULL, &timeout);
	
		if (selres > 0)
		{
			int			i;

			for (i = 0; i < MAXLISTEN; i++)
			{
				// 如果有新客户端进来则创建新的后台进程,专门为客户端服务
				if (FD_ISSET(ListenSocket[i], &rmask))
				{
					Port	   *port;
					// 创建为客户端服务的进程
					port = ConnCreate(ListenSocket[i]);
					if (port)
					{
						// 启动后台进程
						BackendStartup(port);
						StreamClose(port->sock);
						ConnFree(port);
					}
				}
			}
		}

		// 如果日志进程不存在,且开启了日志进程则进行创建
		if (SysLoggerPID == 0 && Logging_collector)
			SysLoggerPID = SysLogger_Start();

		// 检查checkpoint进程和后台磁盘写进程,如果不存在则创建
		if (pmState == PM_RUN || pmState == PM_RECOVERY ||
			pmState == PM_HOT_STANDBY)
		{
			if (CheckpointerPID == 0)
				CheckpointerPID = StartCheckpointer();
			if (BgWriterPID == 0)
				BgWriterPID = StartBackgroundWriter();
		}

		// 如果wal写进程不存在,则创建wal写进程
		if (WalWriterPID == 0 && pmState == PM_RUN)
			WalWriterPID = StartWalWriter();

		// 判断vacuum的守护进程,如果不存在则启动
		if (!IsBinaryUpgrade && AutoVacPID == 0 &&
			(AutoVacuumingActive() || start_autovac_launcher) &&
			pmState == PM_RUN)
		{
			AutoVacPID = StartAutoVacLauncher();
			if (AutoVacPID != 0)
				start_autovac_launcher = false; /* signal processed */
		}

		// 判断统计信息收集进程不存在,则启动统计信息收集进程
		if (PgStatPID == 0 &&
			(pmState == PM_RUN || pmState == PM_HOT_STANDBY))
			PgStatPID = pgstat_start();

		// 如果配置归档进程,如果不存在则启动归档进程
		if (PgArchPID == 0 && PgArchStartupAllowed())
			PgArchPID = StartArchiver();

		

		// 如果是备库,则启动wal receiver进程
		if (WalReceiverRequested)
			MaybeStartWalReceiver();

		// 启动其他的辅助进程,如果存在crash的情况
		if (StartWorkerNeeded || HaveCrashedWorker)
			maybe_start_bgworkers();

		// 计算时间的间隔
		now = time(NULL);
		if (now - last_touch_time >= 58 * SECS_PER_MINUTE)
		{
			TouchSocketFiles();
			TouchSocketLockFiles();
			last_touch_time = now;
		}
	}}

autovacuum launcher主要做了什么?

  • autovacuum launcher进程可以理解为vacuum进程的守护进程,根据参数配置和负载动态的创建vacuum进程,这个核心逻辑在AutoVacLauncherMain函数中
代码语言:javascript
复制
NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]){
	
	if (!AutoVacuumingActive())
	{
		// 如果么有stop的请求,就直接创建vacuum工作进程
		if (!ShutdownRequestPending)
			do_start_worker();
		proc_exit(0);			/* done */
	}}

background worker主要做了什么?

  • 后台刷脏进程核心工作是从shard buffer pool中把脏的page刷新到磁盘,目的是尽可能的利用好shard buffer pool的内存缓冲区。后台刷脏的核心工作定义在BackgroundWriterMain
代码语言:javascript
复制
void BackgroundWriterMain(void){
	// 核心的loop,不断的同步脏page到磁盘
	for (;;)
	{
		bool		can_hibernate;
		int			rc;
		ResetLatch(MyLatch);
		// 刷脏的动作
		can_hibernate = BgBufferSync(&wb_context);

	
		// 发送统计数据给统计信息的进程
		pgstat_send_bgwriter();

		// 如果没有脏page,就休眠一会
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   BgWriterDelay /* ms */ , WAIT_EVENT_BGWRITER_MAIN);
		prev_hibernate = can_hibernate;
	}}

autovacuum worker主要做了什么?

  • vacuum进程的主要目的是回收当前数据库中的已经废弃或者我用的数据,目前有两种模式,一种是回收无用page的空间并不归还给操作系统,这样设计为后续再插入新的page就不用分配空间;第二种是扫描表中无用的page,重新写到新的page中,然后把无用的page的空间归还给操作系统。
代码语言:javascript
复制
// 启动vacuum进程回收无用的page空间static Oid do_start_worker(void){


	// 获取当前的统计信息
	autovac_refresh_stats();

	// 获取数据库列表
	dblist = get_database_list();

	// 查找可以被vacuum的数据库
	foreach(cell, dblist)
	{
		avw_dbase  *tmp = lfirst(cell);
		dlist_iter	iter;
		skipit = false;
		dlist_reverse_foreach(iter, &DatabaseList)
		{
			avl_dbase  *dbp = dlist_container(avl_dbase, adl_node, iter.cur);

			if (dbp->adl_datid == tmp->adw_datid)
			{
			
				if (!TimestampDifferenceExceeds(dbp->adl_next_worker,
												current_time, 0) &&
					!TimestampDifferenceExceeds(current_time,
												dbp->adl_next_worker,
												autovacuum_naptime * 1000))
					skipit = true;

				break;
			}
		}
		if (skipit)
			continue;
		if (avdb == NULL ||
			tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
			avdb = tmp;
	}


	// 查找到数据库然后进行处理
	if (avdb != NULL)
	{
		WorkerInfo	worker;
		dlist_node *wptr;

		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);

	
		wptr = dlist_pop_head_node(&AutoVacuumShmem->av_freeWorkers);

		worker = dlist_container(WorkerInfoData, wi_links, wptr);
		worker->wi_dboid = avdb->adw_datid;
		worker->wi_proc = NULL;
		worker->wi_launchtime = GetCurrentTimestamp();

		AutoVacuumShmem->av_startingWorker = worker;

		LWLockRelease(AutovacuumLock);

		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);

		retval = avdb->adw_datid;
	}

	return retval;}

checkpointer主要做了什么?

  • 假设数据库不断的写wal日志,很多脏page都没有刷新到磁盘,一旦数据库crash,需要从头开始进行数据库恢复,这样数据库恢复的时间非常长;如果有了checkpointer进程定期涉及到wal日志文件中,缓存池中的脏page,定期刷新到磁盘,并在wal中做好记录说明脏page刷到哪个位置,即使数据库崩溃,可以从上一次的checkpointer点进行恢复,这样能大大减少数据库恢复的时间。
代码语言:javascript
复制
// checkpoint进程的工作入口void CheckpointerMain(void){

	/*
	 * Loop forever
	 */
	for (;;)
	{
		bool		do_checkpoint = false;
		int			flags = 0;
		pg_time_t	now;
		int			elapsed_secs;
		int			cur_timeout;

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		/*
		 * Process any requests or signals received recently.
		 */
		AbsorbSyncRequests();
		HandleCheckpointerInterrupts();

		/*
		 * Detect a pending checkpoint request by checking whether the flags
		 * word in shared memory is nonzero.  We shouldn't need to acquire the
		 * ckpt_lck for this.
		 */
		if (((volatile CheckpointerShmemStruct *) CheckpointerShmem)->ckpt_flags)
		{
			do_checkpoint = true;
			BgWriterStats.m_requested_checkpoints++;
		}

		/*
		 * Force a checkpoint if too much time has elapsed since the last one.
		 * Note that we count a timed checkpoint in stats only when this
		 * occurs without an external request, but we set the CAUSE_TIME flag
		 * bit even if there is also an external request.
		 */
		now = (pg_time_t) time(NULL);
		elapsed_secs = now - last_checkpoint_time;
		if (elapsed_secs >= CheckPointTimeout)
		{
			if (!do_checkpoint)
				BgWriterStats.m_timed_checkpoints++;
			do_checkpoint = true;
			flags |= CHECKPOINT_CAUSE_TIME;
		}

		/*
		 * Do a checkpoint if requested.
		 */
		if (do_checkpoint)
		{
			bool		ckpt_performed = false;
			bool		do_restartpoint;

			// 是否需要进行recovery
			do_restartpoint = RecoveryInProgress();

			// 执行检查点行为
			if (!do_restartpoint)
			{
				CreateCheckPoint(flags);
				ckpt_performed = true;
			}
			else
				ckpt_performed = CreateRestartPoint(flags);

			
			if (ckpt_performed)
			{
				last_checkpoint_time = now;
			}
			else
			{
				last_checkpoint_time = now - CheckPointTimeout + 15;
			}

			ckpt_active = false;
		}

		// 必要的情况下切换xlog
		CheckArchiveTimeout();

		// 发送统计信息给后台刷脏进程
		pgstat_send_bgwriter();


		// 发送wal的统计信息给统计信息进程
		pgstat_send_wal(true);

		// 完成后短暂休眠
		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 cur_timeout * 1000L /* convert to ms */ ,
						 WAIT_EVENT_CHECKPOINTER_MAIN);
	}}

walwriter主要做了什么?

  • postgre数据写到内存之前是先把数据写到wal日志,wal日志是write-ahead-log。目的是为了防止内存中的已提交或者未提交的page掉电而引起数据丢失。wal写进程是不断的把wal buffer中的日志数据不断的刷盘到wal日志文件中。
代码语言:javascript
复制
// wal日志写进程核心逻辑void WalWriterMain(void){
	// Wal进程的LOOP循环
	for (;;)
	{
		long		cur_timeout;
		// 后台刷新Wal buffer中的数据到Wal日志
		if (XLogBackgroundFlush())
			left_till_hibernate = LOOPS_UNTIL_HIBERNATE;
		else if (left_till_hibernate > 0)
			left_till_hibernate--;

		// 发送Wal的日志统计信息给统计信息进程
		pgstat_send_wal(false);
		// 休眠短暂时间
		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 cur_timeout,
						 WAIT_EVENT_WAL_WRITER_MAIN);
	}}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-07-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 存储内核技术交流 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Postgres服务端后台进程功能概览
  • 后台进程启动流程
  • autovacuum launcher主要做了什么?
  • background worker主要做了什么?
  • autovacuum worker主要做了什么?
  • checkpointer主要做了什么?
  • walwriter主要做了什么?
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档