相关 《Postgresql源码(76)执行器专用元组格式TupleTableSlot》 《Postgresql源码(82)SPI模块拆解分析一:执行简单SQL获取结果》 《Postgresql源码(83)执行器的结果接收系统——DestReceiver》
执行器的工作包括:work、get result,之前work的内容已经介绍过了,这里分析下执行器如何拿到执行结果。
PG的结果接收器提供了四个接口:
/*
 * Table of callbacks through which result tuples are handed to a
 * destination. It carries four hooks (per-tuple receive, startup,
 * shutdown, destroy) plus a CommandDest tag. Concrete receivers may embed
 * this struct as their first member and keep private state after it.
 */
struct _DestReceiver
{
/* Called for each tuple to be output: */
/* NOTE(review): the meaning of the bool result is not visible in this excerpt */
bool (*receiveSlot) (TupleTableSlot *slot,
DestReceiver *self);
/* Per-executor-run initialization and shutdown: */
/* rStartup receives the operation code and a TupleDesc (typeinfo) */
void (*rStartup) (DestReceiver *self,
int operation,
TupleDesc typeinfo);
void (*rShutdown) (DestReceiver *self);
/* Destroy the receiver object itself (if dynamically allocated) */
void (*rDestroy) (DestReceiver *self);
/* CommandDest code for this receiver */
CommandDest mydest;
/* Private fields might appear beyond this point... */
};
这一组函数接口由printtup_create_DR配置:
/*
 * printtup_create_DR
 *		Allocate a zero-filled DR_printtup, wire up the four printtup
 *		callbacks and return it through its embedded public DestReceiver.
 *
 * dest is recorded as the receiver's CommandDest tag.
 */
DestReceiver *
printtup_create_DR(CommandDest dest)
{
	DR_printtup *receiver = (DR_printtup *) palloc0(sizeof(*receiver));

	/* Public part: callback table and destination tag. */
	receiver->pub.mydest = dest;
	receiver->pub.rStartup = printtup_startup;
	receiver->pub.receiveSlot = printtup;	/* might get changed later */
	receiver->pub.rShutdown = printtup_shutdown;
	receiver->pub.rDestroy = printtup_destroy;

	/*
	 * The T message goes out automatically only for DestRemote, not for
	 * DestRemoteExecute.
	 */
	receiver->sendDescrip = (dest == DestRemote);

	/* Private part: nothing is set up until startup time. */
	receiver->attrinfo = NULL;
	receiver->myinfo = NULL;
	receiver->nattrs = 0;
	receiver->buf.data = NULL;
	receiver->tmpcontext = NULL;

	/* pub is the first member, so this is the same address as receiver. */
	return &receiver->pub;
}
注意:这里的数据结构DR_printtup把DestReceiver又包装了一层:
/*
 * Private state of the printtup receiver. The public DestReceiver is the
 * first member, which is what lets the code cast between a DR_printtup
 * pointer and a DestReceiver pointer.
 */
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
Portal portal; /* the Portal we are printing from */
bool sendDescrip; /* send RowDescription at startup? */
TupleDesc attrinfo; /* The attr info we are set up for */
int nattrs; /* NOTE(review): presumably the attribute count matching attrinfo -- confirm */
PrinttupAttrInfo *myinfo; /* Cached info about each attr */
StringInfoData buf; /* output buffer (*not* in tmpcontext) */
MemoryContext tmpcontext; /* Memory context for per-row workspace */
} DR_printtup;
来看下这几个函数的工作位置和流程,例如:
select s::int, left(random()::text,4) l from generate_series(1,2) s;
-- 输出:两行、两列
s | l
---+------
1 | 0.55
2 | 0.28
位置
#0 printtup_startup (self=0x106dd50, operation=1, typeinfo=0x100d190) at printtup.c:113
#1 0x0000000000733ddf in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:350
#2 0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3 0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#4 0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50, qc=0x7ffd6b492890) at pquery.c:765
#5 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s\n;") at postgres.c:1213
#6 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#7 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#8 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#9 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#10 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#11 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程
static void
printtup_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
DR_printtup *myState = (DR_printtup *) self;
Portal portal = myState->portal;
/*
* Create I/O buffer to be used for all messages. This cannot be inside
* tmpcontext, since we want to re-use it across rows.
*/
initStringInfo(&myState->buf);
申请"printtup"内存上下文(tmpcontext),作为逐行处理的临时工作区;注意输出缓冲区buf并不在这个上下文里:
myState->tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
"printtup",
ALLOCSET_DEFAULT_SIZES);
启动时就需要发送行描述符:
if (myState->sendDescrip)
SendRowDescriptionMessage(&myState->buf,
typeinfo,
FetchPortalTargetList(portal),
portal->formats);
}
SendRowDescriptionMessage发送行描述符,入参:
List *targetlist
TargetEntry:
{xpr = {type = T_TargetEntry},
expr = 0x1086570,
resno = 1,
resname = 0xf5af88 "s",
ressortgroupref = 0,
resorigtbl = 0,
resorigcol = 0,
resjunk = false}
TargetEntry:
{xpr = {type = T_TargetEntry},
expr = 0x1086868,
resno = 2,
resname = 0xf5b5a0 "l",
ressortgroupref = 0,
resorigtbl = 0,
resorigcol = 0,
resjunk = false}
typeinfo
{natts = 2, tdtypeid = 2249, tdtypmod = -1, tdrefcount = -1, constr = 0x0, attrs = 0x100d1a8}
流程:拼接输出串
/*
 * SendRowDescriptionMessage
 *		Build and send the 'T' message describing every column of typeinfo.
 *
 * buf        - message buffer handed to the pq_*_reuse routines
 * typeinfo   - descriptor of the tuples that will be sent
 * targetlist - optional target list; its non-junk entries supply the
 *              originating table/column of each attribute
 * formats    - optional per-column format codes; 0 is sent when absent
 */
void
SendRowDescriptionMessage(StringInfo buf, TupleDesc typeinfo,
						  List *targetlist, int16 *formats)
{
	const int	ncols = typeinfo->natts;
	ListCell   *cell = list_head(targetlist);
	int			col;

	/* tuple descriptor message type, followed by the attribute count */
	pq_beginmessage_reuse(buf, 'T');
	pq_sendint16(buf, ncols);

	/*
	 * Reserve room for the whole message up front, so the cheaper inline
	 * pq_write* routines can be used below without any reallocation.
	 *
	 * Column names are overestimated to allow for character set overhead.
	 */
	enlargeStringInfo(buf, (NAMEDATALEN * MAX_CONVERSION_GROWTH /* attname */
							+ sizeof(Oid)	/* resorigtbl */
							+ sizeof(AttrNumber)	/* resorigcol */
							+ sizeof(Oid)	/* atttypid */
							+ sizeof(int16) /* attlen */
							+ sizeof(int32) /* attypmod */
							+ sizeof(int16) /* format */
							) * ncols);

	for (col = 0; col < ncols; col++)
	{
		Form_pg_attribute attr = TupleDescAttr(typeinfo, col);
		int32		typmod = attr->atttypmod;
		Oid			basetype;
		Oid			origtbl = 0;	/* zeroes are sent when no info exists */
		AttrNumber	origcol = 0;
		int16		fmt = formats ? formats[col] : 0;

		/*
		 * For a domain column the base type and typmod are reported.  The
		 * lookup is done before any field is written, for efficiency.
		 */
		basetype = getBaseTypeAndTypmod(attr->atttypid, &typmod);

		/* Advance past resjunk entries to the next real tlist item. */
		while (cell != NULL &&
			   ((TargetEntry *) lfirst(cell))->resjunk)
			cell = lnext(targetlist, cell);

		if (cell != NULL)
		{
			TargetEntry *entry = (TargetEntry *) lfirst(cell);

			origtbl = entry->resorigtbl;
			origcol = entry->resorigcol;
			cell = lnext(targetlist, cell);
		}

		/* Field order below is the on-the-wire order; do not rearrange. */
		pq_writestring(buf, NameStr(attr->attname));
		pq_writeint32(buf, origtbl);
		pq_writeint16(buf, origcol);
		pq_writeint32(buf, basetype);
		pq_writeint16(buf, attr->attlen);
		pq_writeint32(buf, typmod);
		pq_writeint16(buf, fmt);
	}

	pq_endmessage_reuse(buf);
}
pq_endmessage_reuse
socket_putmessage(char msgtype, const char *s, size_t len)
(gdb) p s
$18 = 0x10724e0 ""
(gdb) p len
$19 = 42
(gdb) x/32bx 0x10724e0
0x10724e0: 0x00 0x02 0x73(s) 0x00 0x00 0x00 0x00 0x00
0x10724e8: 0x00 0x00 0x00 0x00 0x00 0x17 0x00 0x04
0x10724f0: 0xff 0xff 0xff 0xff 0x00 0x00 0x6c(l) 0x00
0x10724f8: 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00
0x1072500: 0x00 0x19 0xff 0xff 0xff 0xff 0xff 0xff
0x1072508: 0x00 0x00
位置
#0 printtup (slot=0x100d2a8, self=0x106dd50) at printtup.c:303
#1 0x0000000000736102 in ExecutePlan (estate=0x100b630, planstate=0x100b868, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0,direction=ForwardScanDirection, dest=0x106dd50, execute_once=true) at execMain.c:1582
#2 0x0000000000733e7d in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:361
#3 0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#4 0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#5 0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50,qc=0x7ffd6b492890) at pquery.c:765
#6 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1213
#7 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#8 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#9 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#10 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#11 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#12 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程
printtup输入为tts包装的元组,和上面初始化后的DestReceiver。
static bool
printtup(TupleTableSlot *slot, DestReceiver *self)
printtup_prepare_info // 拼接DR_printtup中的信息,准备发送
MemoryContextSwitchTo // 切换到"printtup"
pq_beginmessage_reuse // 调用libpq开始发数据
pq_sendint16
...
...
pq_endmessage_reuse
位置
#0 printtup_shutdown (self=0x106dd50) at printtup.c:388
#1 0x0000000000733e98 in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:376
#2 0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3 0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#4 0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50, qc=0x7ffd6b492890) at pquery.c:765
#5 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1213
#6 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#7 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#8 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#9 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#10 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#11 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程:清理工作
/*
 * printtup_shutdown
 *		rShutdown callback: release whatever per-run state the receiver
 *		holds and reset the corresponding fields, leaving the receiver
 *		object itself allocated.
 */
static void
printtup_shutdown(DestReceiver *self)
{
	DR_printtup *receiver = (DR_printtup *) self;

	/* cached per-attribute info */
	if (receiver->myinfo != NULL)
	{
		pfree(receiver->myinfo);
		receiver->myinfo = NULL;
	}

	/* only the pointer is dropped here; the descriptor is not freed */
	receiver->attrinfo = NULL;

	/* output buffer */
	if (receiver->buf.data != NULL)
	{
		pfree(receiver->buf.data);
		receiver->buf.data = NULL;
	}

	/* per-row workspace context */
	if (receiver->tmpcontext != NULL)
	{
		MemoryContextDelete(receiver->tmpcontext);
		receiver->tmpcontext = NULL;
	}
}
位置
#0 printtup_destroy (self=0x106dd50) at printtup.c:412
#1 0x0000000000976650 in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1221
#2 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#3 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#4 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#5 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#6 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#7 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程:清理动态申请的:外层数据结构DR_printtup
/*
 * printtup_destroy
 *		rDestroy callback: free the receiver object itself.
 */
static void
printtup_destroy(DestReceiver *self)
{
/* self is the DR_printtup allocated in printtup_create_DR; pub is its first member */
pfree(self);
}
这一组函数接口由CreateCopyDestReceiver配置:
/*
 * CreateCopyDestReceiver
 *		Allocate a DR_copy, wire up the copy_dest_* callbacks and return it
 *		through its embedded public DestReceiver.
 *
 * The allocation is not zero-filled, so every field is assigned here.
 */
DestReceiver *
CreateCopyDestReceiver(void)
{
	DR_copy    *receiver = (DR_copy *) palloc(sizeof(*receiver));

	/* Public part: destination tag and callback table. */
	receiver->pub.mydest = DestCopyOut;
	receiver->pub.rStartup = copy_dest_startup;
	receiver->pub.receiveSlot = copy_dest_receive;
	receiver->pub.rShutdown = copy_dest_shutdown;
	receiver->pub.rDestroy = copy_dest_destroy;

	/* Private part. */
	receiver->processed = 0;
	receiver->cstate = NULL;	/* will be set later */

	/* pub is the first member, so this is the same address as receiver. */
	return &receiver->pub;
}
注意copy也给DestReceiver包了一层:DR_copy
/*
 * Private state of the COPY receiver. The public DestReceiver comes first,
 * which is what lets the code cast between a DR_copy pointer and a
 * DestReceiver pointer.
 */
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
CopyToState cstate; /* CopyToStateData for the command */
uint64 processed; /* # of tuples processed */
} DR_copy;
来看下这几个函数的工作位置和流程,例如:
copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';
-- 输出:两行两列
1 0.74
2 0.09
空
位置
#0 copy_dest_receive (slot=0x1048538, self=0x10861c0) at copyto.c:1259
#1 0x0000000000736102 in ExecutePlan (estate=0x10468c0, planstate=0x1046af8, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x10861c0, execute_once=true) at execMain.c:1582
#2 0x0000000000733e7d in standard_ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:361
#3 0x0000000000733ca8 in ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#4 0x0000000000685e87 in DoCopyTo (cstate=0xf7da60) at copyto.c:905
#5 0x000000000067bdfa in DoCopy (pstate=0xf7d910, stmt=0xf5bb88, stmt_location=0, stmt_len=86, processed=0x7ffd6b4924e8) at copy.c:309
#6 0x000000000097ec16 in standard_ProcessUtility (pstmt=0xf5bef8, queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false, context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:739
#7 0x000000000097e69b in ProcessUtility (pstmt=0xf5bef8, queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false, context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:527
#8 0x000000000097d297 in PortalRunUtility (portal=0xff5050, pstmt=0xf5bef8, isTopLevel=true, setHoldSnapshot=false, dest=0xf5bfe8, qc=0x7ffd6b492890)at pquery.c:1155
#9 0x000000000097d4fb in PortalRunMulti (portal=0xff5050, isTopLevel=true, setHoldSnapshot=false, dest=0xf5bfe8, altdest=0xf5bfe8, qc=0x7ffd6b492890) at pquery.c:1312
#10 0x000000000097ca27 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0xf5bfe8, altdest=0xf5bfe8, qc=0x7ffd6b492890) at pquery.c:788
#11 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';")at postgres.c:1213
#12 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#13 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#14 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#15 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#16 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#17 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程
/*
 * copy_dest_receive
 *		receiveSlot callback for COPY: write one row and bump the progress
 *		counter.  Always returns true.
 */
static bool
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{
	DR_copy    *receiver = (DR_copy *) self;

	/* Emit this row through the COPY state attached to the receiver. */
	CopyOneRowTo(receiver->cstate, slot);

	/* Count the row, then publish the running total as COPY progress. */
	receiver->processed += 1;
	pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
								 receiver->processed);

	return true;
}
位置
#0 copy_dest_shutdown (self=0x10861c0) at copyto.c:1279
#1 0x0000000000733e98 in standard_ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:376
#2 0x0000000000733ca8 in ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3 0x0000000000685e87 in DoCopyTo (cstate=0xf7da60) at copyto.c:905
#4 0x000000000067bdfa in DoCopy (pstate=0xf7d910, stmt=0xf5bb88, stmt_location=0, stmt_len=86, processed=0x7ffd6b4924e8) at copy.c:309
#5 0x000000000097ec16 in standard_ProcessUtility (pstmt=0xf5bef8, queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false, context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:739
无操作
不执行,因为DestReceiver外面包的数据结构DR_copy没有什么需要释放的。
这一组函数接口由CreateDestReceiver分发函数直接配置,注意前面两种都是走CreateDestReceiver入口进入自己的配置函数,但是SPI不同,直接在CreateDestReceiver里面配置:
/*
 * CreateDestReceiver
 *		Return the DestReceiver that matches the given CommandDest.
 *
 * Some destinations are built by a dedicated constructor call; others
 * (simple remote, none, debug, SPI) are served by a shared static const
 * object whose constness is cast away with unconstify().
 */
DestReceiver *
CreateDestReceiver(CommandDest dest)
{
/*
 * It's ok to cast the constness away as any modification of the none
 * receiver would be a bug (which gets easier to catch this way).
 */
switch (dest)
{
case DestRemote:
case DestRemoteExecute:
return printtup_create_DR(dest);
case DestRemoteSimple:
return unconstify(DestReceiver *, &printsimpleDR);
case DestNone:
return unconstify(DestReceiver *, &donothingDR);
case DestDebug:
return unconstify(DestReceiver *, &debugtupDR);
// the SPI receiver is picked right here, with no separate constructor <-<-<-----------------------------
case DestSPI:
return unconstify(DestReceiver *, &spi_printtupDR);
case DestTuplestore:
return CreateTuplestoreDestReceiver();
case DestIntoRel:
return CreateIntoRelDestReceiver(NULL);
case DestCopyOut:
return CreateCopyDestReceiver();
case DestSQLFunction:
return CreateSQLFunctionDestReceiver();
case DestTransientRel:
return CreateTransientRelDestReceiver(InvalidOid);
case DestTupleQueue:
return CreateTupleQueueDestReceiver(NULL);
}
/* should never get here */
pg_unreachable();
}
spi_printtupDR带四个函数:
static const DestReceiver spi_printtupDR = {
spi_printtup, spi_dest_startup, donothingCleanup, donothingCleanup,
DestSPI
};
SPI的结果不是直接返回给客户端的!SPI有自己的三个全局变量来指向结果集,SPI的接口函数会从全局变量中取值,组织后返回给客户端。(使用全局变量当接口的设计很差!)
/* Global variables through which SPI exposes a result set to its callers. */
uint64 SPI_processed = 0; // number of rows
SPITupleTable *SPI_tuptable = NULL; // the result data
int SPI_result = 0; // execution result
直接执行:
```c
cat << EOF > spitest.c
#include "postgres.h"
#include "executor/spi.h"
#include "utils/builtins.h"
#include "fmgr.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(sptest1);
Datum
sptest1(PG_FUNCTION_ARGS)
{
char *sql10 = "select s::int, left(random()::text,4) l from generate_series(1,10) s";
int ret;
int proc;
SPI_connect();
ret = SPI_exec(sql10, 0);
proc = SPI_processed;
if (ret > 0 && SPI_tuptable != NULL)
{
SPITupleTable *tuptable = SPI_tuptable;
TupleDesc tupdesc = tuptable->tupdesc;
char buf[8192];
uint64 j;
for (j = 0; j < tuptable->numvals; j++)
{
HeapTuple tuple = tuptable->vals[j];
int i;
for (i = 1, buf[0] = 0; i <= tupdesc->natts; i++)
snprintf(buf + strlen(buf),
sizeof(buf) - strlen(buf), " %4s%4s",
SPI_getvalue(tuple, tupdesc, i),
(i == tupdesc->natts) ? " " : " |");
elog(INFO, "%s", buf);
}
}
SPI_finish();
return (proc);
}
EOF
gcc -O0 -Wall -I /home/mingjiegao/dev/src/postgres/src/include -g -shared -fpic -o spitest.so spitest.c
psql执行:
```javascript
postgres=# select sptest1();
INFO: 1 | 0.10
INFO: 2 | 0.18
INFO: 3 | 0.01
INFO: 4 | 0.78
INFO: 5 | 0.60
INFO: 6 | 0.76
INFO: 7 | 0.18
INFO: 8 | 0.86
INFO: 9 | 0.19
INFO: 10 | 0.99
sptest1
10
(1 row)
### 1 rStartup = spi\_dest\_startup
位置
```javascript
sptest1
SPI_exec
SPI_execute
_SPI_execute_plan
_SPI_pquery
ExecutorRun
standard_ExecutorRun
spi_dest_startup
流程
```javascript
void
spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
SPITupleTable *tuptable;
MemoryContext oldcxt;
MemoryContext tuptabcxt;
if (_SPI_current == NULL)
elog(ERROR, "spi_dest_startup called while not connected to SPI");
if (_SPI_current->tuptable != NULL)
elog(ERROR, "improper call to spi_dest_startup");
- 从"ExecutorState"切换到"SPI Proc"
- 创建"SPI TupTable",切换到"SPI TupTable"
```javascript
oldcxt = _SPI_procmem(); /* switch to procedure memory context */
tuptabcxt = AllocSetContextCreate(CurrentMemoryContext,
"SPI TupTable",
ALLOCSET_DEFAULT_SIZES);
MemoryContextSwitchTo(tuptabcxt);
在"SPI TupTable"中申请`SPITupleTable`结构,由\_SPI\_current->tuptable记录:
SPITupleTable结构中有:
- `TupleDesc tupdesc;`
- `HeapTuple *vals;`
- `uint64 numvals;`
记录结果集数据。
```javascript
_SPI_current->tuptable = tuptable = (SPITupleTable *)
palloc0(sizeof(SPITupleTable));
tuptable->tuptabcxt = tuptabcxt;
tuptable->subid = GetCurrentSubTransactionId();
/*
* The tuptable is now valid enough to be freed by AtEOSubXact_SPI, so put
* it onto the SPI context's tuptables list. This will ensure it's not
* leaked even in the unlikely event the following few lines fail.
*/
- \_SPI\_connection中保存了`slist_head tuptables;`,即所有活跃tuptable的链表。
- 申请128个HeapTupleData指针位置保存结果数据。
```javascript
slist_push_head(&_SPI_current->tuptables, &tuptable->next);
/* set up initial allocations */
tuptable->alloced = 128;
tuptable->vals = (HeapTuple *) palloc(tuptable->alloced * sizeof(HeapTuple));
tuptable->numvals = 0;
tuptable->tupdesc = CreateTupleDescCopy(typeinfo);
MemoryContextSwitchTo(oldcxt);
}
### 2 receiveSlot = spi\_printtup
位置
```javascript
sptest1
SPI_exec
SPI_execute
_SPI_execute_plan
_SPI_pquery
ExecutorRun
standard_ExecutorRun
ExecutePlan
spi_printtup
流程
```javascript
bool
spi_printtup(TupleTableSlot *slot, DestReceiver *self)
{
SPITupleTable *tuptable;
MemoryContext oldcxt;
if (_SPI_current == NULL)
elog(ERROR, "spi_printtup called while not connected to SPI");
tuptable还没赋值的状态:
`{tupdesc = 0x107ea90, vals = 0x107e678, numvals = 0, alloced = 128, tuptabcxt = 0x107e500, next = {next = 0x0}, subid = 1}`
```javascript
tuptable = _SPI_current->tuptable;
if (tuptable == NULL)
elog(ERROR, "improper call to spi_printtup");
- 切到"SPI TupTable"
- 分配的128个位置不够用了?不够时把指针数组的容量翻倍(第一次扩容即128→256)。
```javascript
oldcxt = MemoryContextSwitchTo(tuptable->tuptabcxt);
if (tuptable->numvals >= tuptable->alloced)
{
/* Double the size of the pointer array */
uint64 newalloced = tuptable->alloced * 2;
tuptable->vals = (HeapTuple *) repalloc_huge(tuptable->vals,
newalloced * sizeof(HeapTuple));
tuptable->alloced = newalloced;
}
- 调用tts接口函数ExecCopySlotHeapTuple做元组拷贝,这里实际使用的是tts\_virtual\_copy\_heap\_tuple,参考《Postgresql源码(76)执行器专用元组格式TupleTableSlot》。
- ExecCopySlotHeapTuple输入tts输出标准存储格式HeapTuple。
```javascript
tuptable->vals[tuptable->numvals] = ExecCopySlotHeapTuple(slot);
(tuptable->numvals)++;
MemoryContextSwitchTo(oldcxt);
return true;
}
### 3 rShutdown = donothingCleanup
无
### 4 rDestroy = donothingCleanup
无