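After writing each page, BufferSync calls CheckpointWriteDelay to throttle its own write rate. PostgreSQL exposes the checkpoint_completion_target parameter to control how fast a checkpoint flushes dirty pages. How is that actually implemented?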
checkpoint_completion_target (floating point) Specifies the target of checkpoint completion, as a fraction of total time between checkpoints. The default is 0.5. This parameter can only be set in the postgresql.conf file or on the server command line.
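The scheduling decision is implemented in IsCheckpointOnSchedule, and the deliberate delay itself is implemented in CheckpointWriteDelay. The rest of this article analyzes these two functions.
Call path:
CreateCheckPoint: starts the checkpoint
|
CheckPointGuts: entry point of the checkpoint work
|
CheckPointBuffers: entry point for flushing buffer pages
|
BufferSync: the function that actually writes the buffer pages
|
CheckpointWriteDelay: the deliberate-delay function <-------- covered here
|
IsCheckpointOnSchedule: the decision algorithm behind the delay <-------- covered here
How is the rate controlled? Start with CheckpointWriteDelay.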
/*
* CheckpointWriteDelay -- control rate of checkpoint
*
* This function is called after each page write performed by BufferSync().
* It is responsible for throttling BufferSync()'s write rate to hit
* checkpoint_completion_target.
*
* The checkpoint request flags should be passed in; currently the only one
* examined is CHECKPOINT_IMMEDIATE, which disables delays between writes.
*
* 'progress' is an estimate of how much of the work has been done, as a
* fraction between 0.0 meaning none, and 1.0 meaning all done.
*/
void
CheckpointWriteDelay(int flags, double progress)
{
static int absorb_counter = WRITES_PER_ABSORB;
/* Do nothing if checkpoint is being executed by non-checkpointer process */
if (!AmCheckpointerProcess())
return;
/*
* Perform the usual duties and take a nap, unless we're behind schedule,
* in which case we just try to catch up as quickly as possible.
*/
if (!(flags & CHECKPOINT_IMMEDIATE) &&
!shutdown_requested &&
!ImmediateCheckpointRequested() &&
IsCheckpointOnSchedule(progress))
IsCheckpointOnSchedule decides whether the delay branch is taken; the algorithm lives inside that function.
{
if (got_SIGHUP)
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
/* update shmem copies of config variables */
UpdateSharedMemoryConfig();
}
AbsorbFsyncRequests();
absorb_counter = WRITES_PER_ABSORB;
CheckArchiveTimeout();
/*
* Report interim activity statistics to the stats collector.
*/
pgstat_send_bgwriter();
/*
* This sleep used to be connected to bgwriter_delay, typically 200ms.
* That resulted in more frequent wakeups if not much work to do.
* Checkpointer and bgwriter are no longer related so take the Big
* Sleep.
*/
pg_usleep(100000L);
Each time this branch is taken, the checkpointer sleeps for 100,000 us = 100 ms.
}
else if (--absorb_counter <= 0)
{
/*
* Absorb pending fsync requests after each WRITES_PER_ABSORB write
* operations even when we don't sleep, to prevent overflow of the
* fsync request queue.
*/
AbsorbFsyncRequests();
absorb_counter = WRITES_PER_ABSORB;
}
}
CheckPointCompletionTarget is the C variable behind the checkpoint_completion_target parameter, as its GUC table entry shows:
{
{"checkpoint_completion_target", PGC_SIGHUP, WAL_CHECKPOINTS,
gettext_noop("Time spent flushing dirty buffers during checkpoint, as fraction of checkpoint interval."),
NULL
},
&CheckPointCompletionTarget,
0.5, 0.0, 1.0,
NULL, NULL, NULL
},
IsCheckpointOnSchedule returns true when the progress made so far is at or ahead of the expected progress, which means the caller may sleep.
/*
* IsCheckpointOnSchedule -- are we on schedule to finish this checkpoint
* (or restartpoint) in time?
*
* Compares the current progress against the time/segments elapsed since last
* checkpoint, and returns true if the progress we've made this far is greater
* than the elapsed time/segments.
*/
static bool
IsCheckpointOnSchedule(double progress)
{
XLogRecPtr recptr;
struct timeval now;
double elapsed_xlogs,
elapsed_time;
Assert(ckpt_active);
/* Scale progress according to checkpoint_completion_target. */
progress *= CheckPointCompletionTarget;
/*
* Check against the cached value first. Only do the more expensive
* calculations once we reach the target previously calculated. Since
* neither time or WAL insert pointer moves backwards, a freshly
* calculated value can only be greater than or equal to the cached value.
*/
if (progress < ckpt_cached_elapsed)
return false;
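ckpt_cached_elapsed holds the result of the previous calculation, that is, the progress that should have been reached as of the last time it was computed. If the current progress is still below that cached value, the checkpoint is behind schedule and must not sleep.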
/*
* Check progress against WAL segments written and CheckPointSegments.
*
* We compare the current WAL insert location against the location
* computed before calling CreateCheckPoint. The code in XLogInsert that
* actually triggers a checkpoint when CheckPointSegments is exceeded
* compares against RedoRecptr, so this is not completely accurate.
* However, it's good enough for our purposes, we're only calculating an
* estimate anyway.
*
* During recovery, we compare last replayed WAL record's location with
* the location computed before calling CreateRestartPoint. That maintains
* the same pacing as we have during checkpoints in normal operation, but
* we might exceed max_wal_size by a fair amount. That's because there can
* be a large gap between a checkpoint's redo-pointer and the checkpoint
* record itself, and we only start the restartpoint after we've seen the
* checkpoint record. (The gap is typically up to CheckPointSegments *
* checkpoint_completion_target where checkpoint_completion_target is the
* value that was in effect when the WAL was generated).
*/
if (RecoveryInProgress())
recptr = GetXLogReplayRecPtr(NULL);
else
recptr = GetInsertRecPtr();
elapsed_xlogs = (((double) (recptr - ckpt_start_recptr)) / XLogSegSize) / CheckPointSegments;
if (progress < elapsed_xlogs)
{
ckpt_cached_elapsed = elapsed_xlogs;
return false;
}
/*
* Check progress against time elapsed and checkpoint_timeout.
*/
gettimeofday(&now, NULL);
elapsed_time = ((double) ((pg_time_t) now.tv_sec - ckpt_start_time) +
now.tv_usec / 1000000.0) / CheckPointTimeout;
Now the core comparison:
if (progress < elapsed_time)
{
ckpt_cached_elapsed = elapsed_time;
return false;
}
/* It looks like we're on schedule. */
return true;
}
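Viewed along the time dimension: if the current progress is less than the expected progress, the function returns false, meaning the checkpointer must not sleep. The progress argument is the fraction that BufferSync passes in:
progress = (double) num_processed / num_to_scan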