前文:https://cloud.tencent.com/developer/article/2000681
XLOG注册好数据后,开始执行组装(XLogRecordAssemble)和写入(XLogInsertRecord)
XLogInsertRecord
XLogRecordAssemble
XLogInsertRecord
XLogInsertRecord函数接受已经组装好的XLOG(rdt链),计算插入位置执行insert。
例如当前SQL为一条insert,当前rdt链的状态:
(gdb) p *rdata
$18 = {next = 0x2058108, data = 0x2058308 ";", len = 46}
(gdb) p *rdata->next
$19 = {next = 0x2058120, data = 0x7ffccf66fed0 "\001", len = 5}
(gdb) p *rdata->next->next
$20 = {next = 0x20580f0, data = 0x20b74a7 "", len = 5}
(gdb) p *rdata->next->next->next
$21 = {next = 0x0, data = 0x7ffccf66fee0 "L", len = 3}
rdt的第一个元素头部保存了header结构:XLogRecord
(gdb) p *(XLogRecord *) rdata->data
$22 = {xl_tot_len = 59, xl_xid = 1836094, xl_prev = 0, xl_info = 0 '\000', xl_rmid = 10 '\n', xl_crc = 446857048}
xl_tot_len = 59 = 46 + 5 + 5 + 3
// do not switch xlog
XLogInsertRecord
...
WALInsertLockAcquire
...
// size=59, StartPos=0x7ffccf66fe10, EndPos=0x7ffccf66fe08, PrevPtr=0x2058310
ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos, &rechdr->xl_prev);
...
/*
 * ReserveXLogInsertLocation (excerpt; "..." marks code elided by the article)
 *
 * Reserves room for a record of 'size' bytes by advancing the shared insert
 * position, and reports where the record goes.
 *
 *   size     - record length in bytes (the caller passes xl_tot_len)
 *   StartPos - out: XLogRecPtr of the start of the reserved region
 *   EndPos   - out: XLogRecPtr of the end of the reserved region
 *   PrevPtr  - out: XLogRecPtr of the start of the previously reserved record
 */
static void
ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
XLogRecPtr *PrevPtr)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
uint64 startbytepos;
uint64 endbytepos;
uint64 prevbytepos;
/*
 * Round the request with MAXALIGN.  NOTE(review): presumably rounds up to
 * the platform's maximum alignment; the article's example (59-byte record,
 * StartPos and EndPos 64 bytes apart) is consistent with 8-byte alignment.
 */
size = MAXALIGN(size);
...
/*
 * CurrBytePos and PrevBytePos are read and updated together between
 * SpinLockAcquire and SpinLockRelease on insertpos_lck.
 */
SpinLockAcquire(&Insert->insertpos_lck);
startbytepos = Insert->CurrBytePos;
endbytepos = startbytepos + size;
prevbytepos = Insert->PrevBytePos;
/* The next reservation starts where this one ends ... */
Insert->CurrBytePos = endbytepos;
/* ... and will see this record's start as its "previous" position. */
Insert->PrevBytePos = startbytepos;
SpinLockRelease(&Insert->insertpos_lck);
/*
 * Convert the byte positions to XLogRecPtr values after the spinlock has
 * been released.  The end position uses the dedicated End conversion.
 */
*StartPos = XLogBytePosToRecPtr(startbytepos);
*EndPos = XLogBytePosToEndRecPtr(endbytepos);
*PrevPtr = XLogBytePosToRecPtr(prevbytepos);
...
}
有关Insert->CurrBytePos的几点:
得到逻辑位置的起点和终点后,需要计算按段、页管理的XLOG物理位置:
https://www.interdb.jp/pg/pgsql09.html
逻辑位置转换为物理位置:即把(不包含XLogPHD的位置)转换为(包含XLogPHD的位置)
/*
 * XLogBytePosToEndRecPtr
 *
 * Convert a "usable byte position" (a logical offset that does not count WAL
 * page headers) into an XLogRecPtr (a physical offset that does count them),
 * for a position that marks the END of a record.
 *
 * The end-of-record conversion differs from the start-of-record conversion in
 * exactly one case: when the position falls precisely on a page boundary.  A
 * record that ends there ends at the boundary itself, so no page header must
 * be added; a record that starts there starts just after the next page's
 * header.  Both bytesleft == 0 branches below implement that distinction.
 *
 * bytepos: logical byte position, headers excluded.
 * Returns: the corresponding physical XLogRecPtr.
 */
static XLogRecPtr
XLogBytePosToEndRecPtr(uint64 bytepos)
{
    uint64      fullsegs;
    uint64      fullpages;
    uint64      bytesleft;
    uint32      seg_offset;
    XLogRecPtr  result;

    /* Segment number: how many whole segments' worth of usable bytes precede us. */
    fullsegs = bytepos / UsableBytesInSegment;
    /* Logical offset inside that segment; still needs header adjustment. */
    bytesleft = bytepos % UsableBytesInSegment;

    if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
    {
        /* Fits on the first page of the segment, which carries a long header. */
        if (bytesleft == 0)
        {
            /* Ends exactly on the segment boundary: do not skip the header. */
            seg_offset = 0;
        }
        else
        {
            /* Physical offset = logical offset shifted past the long header. */
            seg_offset = bytesleft + SizeOfXLogLongPHD;
        }
    }
    else
    {
        /*
         * Located on a later page, each of which carries a short header.
         * The first page is special (long header), so start counting from
         * the beginning of the second page and drop the first page's payload
         * from the logical offset.
         */
        seg_offset = XLOG_BLCKSZ;
        bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;

        /* Page index (relative to the second page) and offset within it. */
        fullpages = bytesleft / UsableBytesInPage;
        bytesleft = bytesleft % UsableBytesInPage;

        if (bytesleft == 0)
        {
            /* Ends exactly on a page boundary: do not skip the next header. */
            seg_offset += fullpages * XLOG_BLCKSZ + bytesleft;
        }
        else
        {
            /* Physical offset within the segment, past this page's short header. */
            seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
        }
    }

    /*
     * #define XLogSegNoOffsetToRecPtr(segno, offset, dest) (dest) = (segno) * XLOG_SEG_SIZE + (offset)
     * segment number * segment size + offset within segment = physical position
     */
    XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
    return result;
}
UsableBytesInSegment的含义:一个段内有效空间的大小
/* Payload bytes on an ordinary WAL page: the page size minus the short page header. */
#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
/*
 * Payload bytes in one WAL segment: every page contributes UsableBytesInPage,
 * except that the first page's long header costs the extra
 * (SizeOfXLogLongPHD - SizeOfXLogShortPHD) bytes.
 */
#define UsableBytesInSegment ((XLOG_SEG_SIZE / XLOG_BLCKSZ) * UsableBytesInPage - (SizeOfXLogLongPHD - SizeOfXLogShortPHD))
// UsableBytesInSegment = (XLOG_SEG_SIZE / XLOG_BLCKSZ) * UsableBytesInPage - (SizeOfXLogLongPHD - SizeOfXLogShortPHD)
// UsableBytesInSegment = (16 * 1024 * 1024 / 8192) * (8192 - SizeOfXLogShortPHD) - (XlogHeaderDiff)
// UsableBytesInSegment = number of pages * space per page excluding the header - the extra part of the first page's header
// UsableBytesInSegment = total space within one segment that can hold XLOG data
// UsableBytesInSegment = 16728048
// Without any headers, UsableBytesInSegment would be 16 * 1024 * 1024 = 16777216
UsableBytesInSegment = 下图中物理地址所有红色部分(除了每个8k带的PAGE HEADER之外,能保存XLOG的空间)
开始写入XLog
CopyXLogRecordToWAL (
write_len=59, isLogSwitch=0 '\000', rdata=0xf16ab0 <hdr_rdt>, StartPos=32430394688,EndPos=32430394752)
注意rdt链的状态没有任何变化,所以xlog组装后就挂在rdt链上直接等待写入,后面没有再加工的步骤了:
当前rdt链的状态:
(gdb) p *rdata
$18 = {next = 0x2058108, data = 0x2058308 ";", len = 46}
(gdb) p *rdata->next
$19 = {next = 0x2058120, data = 0x7ffccf66fed0 "\001", len = 5}
(gdb) p *rdata->next->next
$20 = {next = 0x20580f0, data = 0x20b74a7 "", len = 5}
(gdb) p *rdata->next->next->next
$21 = {next = 0x0, data = 0x7ffccf66fee0 "L", len = 3}
遍历rdt链写入
/*
 * CopyXLogRecordToWAL (excerpt; "..." marks code elided by the article)
 *
 * Walks the XLogRecData chain and copies each fragment into the WAL buffer,
 * starting at StartPos and crossing page boundaries as needed.
 *
 *   write_len   - total number of bytes expected to be copied; checked
 *                 against the running count by the final Assert
 *   isLogSwitch - not referenced in the visible part of the function
 *   rdata       - head of the fragment chain (data pointer + len + next)
 *   StartPos    - XLogRecPtr at which copying begins
 *   EndPos      - not referenced in the visible part of the function
 */
static void
CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
XLogRecPtr StartPos, XLogRecPtr EndPos)
{
char *currpos;
int freespace;
int written;
XLogRecPtr CurrPos;
XLogPageHeader pagehdr;
CurrPos = StartPos;
/* Locate the position in the WAL buffer where the data will be written */
currpos = GetXLogBuffer(CurrPos);
/* NOTE(review): presumably the bytes left on the current page - confirm against INSERT_FREESPACE */
freespace = INSERT_FREESPACE(CurrPos);
written = 0;
/* Copy the chain one fragment at a time. */
while (rdata != NULL)
{
char *rdata_data = rdata->data;
int rdata_len = rdata->len;
/*
 * The rest of this fragment does not fit in 'freespace': copy what fits,
 * then continue after the header of the next page.
 */
while (rdata_len > freespace)
{
Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0);
memcpy(currpos, rdata_data, freespace);
rdata_data += freespace;
rdata_len -= freespace;
written += freespace;
CurrPos += freespace;
/*
 * Fetch the buffer address for the advanced position and treat it as a
 * page header: store how many bytes of the record are still to be
 * written, and set the XLP_FIRST_IS_CONTRECORD flag on the page.
 */
currpos = GetXLogBuffer(CurrPos);
pagehdr = (XLogPageHeader) currpos;
pagehdr->xlp_rem_len = write_len - written;
pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
/* skip over the page header */
/* A position that is a multiple of XLogSegSize gets the long header size; any other gets the short one. */
if (CurrPos % XLogSegSize == 0)
{
CurrPos += SizeOfXLogLongPHD;
currpos += SizeOfXLogLongPHD;
}
else
{
CurrPos += SizeOfXLogShortPHD;
currpos += SizeOfXLogShortPHD;
}
freespace = INSERT_FREESPACE(CurrPos);
}
/* What remains of the fragment now fits: copy it and advance all counters. */
Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
memcpy(currpos, rdata_data, rdata_len);
currpos += rdata_len;
CurrPos += rdata_len;
freespace -= rdata_len;
written += rdata_len;
rdata = rdata->next;
}
/* Every byte of the record must have been copied. */
Assert(written == write_len);
...
}