diff options
author | Andres Freund | 2018-05-23 20:41:53 +0000 |
---|---|---|
committer | Andres Freund | 2018-05-23 21:01:59 +0000 |
commit | 88483119950fe5460c2355b508a21a7d404433bc (patch) | |
tree | e5dcd0f91801a07da30e046d2002dba7c88962e5 | |
parent | b929614f5e867c70721b3db31d3dec6cb35e1eb5 (diff) |
WIP: Protoype of new, non-recursive, executor.executor-rewrite
Author: Andres Freund
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
26 files changed, 2556 insertions, 188 deletions
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index cc09895fa5..cc59ed42df 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -29,6 +29,7 @@ OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \ nodeCtescan.o nodeNamedtuplestorescan.o nodeWorktablescan.o \ nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \ nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o tqueue.o spi.o \ - nodeTableFuncscan.o + nodeTableFuncscan.o \ + execProgram.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 3d12f9c76f..c24dfd1c7c 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -46,6 +46,7 @@ #include "commands/matview.h" #include "commands/trigger.h" #include "executor/execdebug.h" +#include "executor/execProgram.h" #include "foreign/fdwapi.h" #include "jit/jit.h" #include "mb/pg_wchar.h" @@ -66,6 +67,9 @@ #include "utils/tqual.h" +/* GUCs */ +bool use_linearized_plan; + /* Hooks for plugins to get control in ExecutorStart/Run/Finish/End */ ExecutorStart_hook_type ExecutorStart_hook = NULL; ExecutorRun_hook_type ExecutorRun_hook = NULL; @@ -80,8 +84,7 @@ static void InitPlan(QueryDesc *queryDesc, int eflags); static void CheckValidRowMarkRel(Relation rel, RowMarkType markType); static void ExecPostprocessPlan(EState *estate); static void ExecEndPlan(PlanState *planstate, EState *estate); -static void ExecutePlan(EState *estate, PlanState *planstate, - bool use_parallel_mode, +static void ExecutePlan(QueryDesc *desc, CmdType operation, bool sendTuples, uint64 numberTuples, @@ -360,9 +363,7 @@ standard_ExecutorRun(QueryDesc *queryDesc, elog(ERROR, "can't re-execute query flagged for single execution"); queryDesc->already_executed = true; - ExecutePlan(estate, - queryDesc->planstate, - queryDesc->plannedstmt->parallelModeNeeded, + ExecutePlan(queryDesc, operation, sendTuples, count, @@ -1089,6 +1090,20 @@ InitPlan(QueryDesc *queryDesc, int eflags) queryDesc->tupDesc = tupType; queryDesc->planstate = planstate; + queryDesc->prog = NULL; + + if (use_linearized_plan) + { + queryDesc->prog = ExecBuildProgram(planstate, estate, eflags); + if (queryDesc->prog) + { + StringInfo s = ExecDescribeProgram(queryDesc->prog); + + ereport(LOG, (errmsg("pp:\n%s", s->data), + errhidestmt(true), + errhidecontext(true))); + } + } } /* @@ -1678,9 +1693,7 @@ ExecEndPlan(PlanState *planstate, EState *estate) * ---------------------------------------------------------------- */ static void -ExecutePlan(EState *estate, - PlanState *planstate, - bool use_parallel_mode, +ExecutePlan(QueryDesc *queryDesc, CmdType operation, bool sendTuples, uint64 numberTuples, @@ -1688,6 +1701,9 @@ ExecutePlan(EState *estate, DestReceiver *dest, bool execute_once) { + bool use_parallel_mode = queryDesc->plannedstmt->parallelModeNeeded; + EState *estate = queryDesc->estate; + PlanState *planstate = queryDesc->planstate; TupleTableSlot *slot; uint64 current_tuple_count; @@ -1723,7 +1739,10 @@ ExecutePlan(EState *estate, /* * Execute the plan and obtain a tuple */ - slot = ExecProcNode(planstate); + if (queryDesc->prog) + slot = ExecExecProgram(queryDesc->prog, estate); + else + slot = ExecProcNode(planstate); /* * if the tuple is null, then we assume there is nothing more to @@ -1733,6 +1752,9 @@ ExecutePlan(EState *estate, { /* Allow nodes to release or shut down resources. */ (void) ExecShutdownNode(planstate); + if (queryDesc->prog) + ExecShutdownProgram(queryDesc->prog); + break; } @@ -1780,6 +1802,9 @@ ExecutePlan(EState *estate, { /* Allow nodes to release or shut down resources. */ (void) ExecShutdownNode(planstate); + if (queryDesc->prog) + ExecShutdownProgram(queryDesc->prog); + break; } } diff --git a/src/backend/executor/execProgram.c b/src/backend/executor/execProgram.c new file mode 100644 index 0000000000..0c55eb14d0 --- /dev/null +++ b/src/backend/executor/execProgram.c @@ -0,0 +1,1033 @@ +/*------------------------------------------------------------------------- + * + * execProgram.c + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/executor/execProgram.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "executor/execProgram.h" + +#include "access/relscan.h" +#include "executor/executor.h" +#include "executor/nodeAgg.h" +#include "executor/nodeAppend.h" +#include "executor/nodeBitmapAnd.h" +#include "executor/nodeBitmapHeapscan.h" +#include "executor/nodeBitmapIndexscan.h" +#include "executor/nodeBitmapOr.h" +#include "executor/nodeCtescan.h" +#include "executor/nodeCustom.h" +#include "executor/nodeForeignscan.h" +#include "executor/nodeFunctionscan.h" +#include "executor/nodeGather.h" +#include "executor/nodeGatherMerge.h" +#include "executor/nodeGroup.h" +#include "executor/nodeHash.h" +#include "executor/nodeHashjoin.h" +#include "executor/nodeIndexonlyscan.h" +#include "executor/nodeIndexscan.h" +#include "executor/nodeLimit.h" +#include "executor/nodeLockRows.h" +#include "executor/nodeMaterial.h" +#include "executor/nodeMergeAppend.h" +#include "executor/nodeMergejoin.h" +#include "executor/nodeModifyTable.h" +#include "executor/nodeNamedtuplestorescan.h" +#include "executor/nodeNestloop.h" +#include "executor/nodeProjectSet.h" +#include "executor/nodeRecursiveunion.h" +#include "executor/nodeResult.h" +#include "executor/nodeSamplescan.h" +#include "executor/nodeSeqscan.h" +#include "executor/nodeSetOp.h" +#include "executor/nodeSort.h" +#include "executor/nodeSubplan.h" +#include "executor/nodeSubqueryscan.h" +#include "executor/nodeTableFuncscan.h" +#include "executor/nodeTidscan.h" +#include "executor/nodeUnique.h" +#include "executor/nodeValuesscan.h" +#include "executor/nodeWindowAgg.h" +#include "executor/nodeWorktablescan.h" +#include "nodes/nodeFuncs.h" +#include "miscadmin.h" + + +/* + * Add another expression evaluation step to ExprState->steps. + * + * Note that this potentially re-allocates es->steps, therefore no pointer + * into that array may be used while the expression is still being built. + */ +ExecStep * +ExecProgramAddStep(ExecProgramBuild *b) +{ + ExecProgram *p = b->program; + + if (b->steps_alloc == 0) + { + b->steps_alloc = 16; + b->program->steps = palloc(sizeof(ExecStep) * b->steps_alloc); + } + else if (b->steps_alloc == p->steps_len) + { + b->steps_alloc *= 2; + p->steps = repalloc(p->steps, + sizeof(ExecStep) * b->steps_alloc); + } + + return &p->steps[p->steps_len++]; +} + +int +ExecProgramAddSlot(ExecProgramBuild *b) +{ + ExecProgram *p = b->program; + + if (b->slots_alloc == 0) + { + b->slots_alloc = 16; + b->program->slots = palloc(sizeof(TupleTableSlot *) * b->slots_alloc); + } + else if (b->slots_alloc == p->slots_len) + { + b->slots_alloc *= 2; + p->slots = repalloc(p->slots, + sizeof(TupleTableSlot *) * b->slots_alloc); + } + + return p->slots_len++; +} + +void +ExexProgramAssignJump(ExecProgramBuild *b, ExecStep *s, int *a, int t) +{ + *a = t; + + /* if it's a not yet known */ + if (t < 0) + { + ExecRelocatableJump *j; + + if (b->reloc_jumps_alloc == 0) + { + b->reloc_jumps_alloc = 16; + b->reloc_jumps = palloc(sizeof(ExecRelocatableJump) * b->reloc_jumps_alloc); + } + else if (b->reloc_jumps_alloc == b->reloc_jumps_len) + { + b->reloc_jumps_alloc *= 2; + b->reloc_jumps = repalloc(b->reloc_jumps, + sizeof(ExecRelocatableJump) * b->reloc_jumps_alloc); + } + + j = &b->reloc_jumps[b->reloc_jumps_len++]; + + j->varno = t; + j->step = s - b->program->steps; + j->offset = (char *) a - (char *) s; + } +} + +void +ExecProgramDefineJump(ExecProgramBuild *b, int var, int target) +{ + int i; + + Assert(var < 0); + Assert(target >= 0); + + for (i = 0; i < b->reloc_jumps_len; i++) + { + ExecRelocatableJump *j = &b->reloc_jumps[i]; + ExecStep *step = &b->program->steps[j->step]; + int *val = (int*)(((char *) step) + j->offset); + + if (*val != var) + continue; + *val = target; + } +} + +ExecProgram * +ExecBuildProgram(PlanState *node, EState *estate, int eflags) +{ + ExecProgramBuild build = {}; + ExecStep *step; + int resvar = --build.varno; + EmitForPlanNodeData nodeData = {}; + + build.program = (ExecProgram *) palloc0(sizeof(ExecProgram)); + build.estate = estate; + + ExecProgramBuildForNode(&build, node, eflags, resvar, &nodeData); + if (build.failed) + return NULL; + + step = ExecProgramAddStep(&build); + step->opcode = XO_RETURN; + step->d.ret.slotno = nodeData.resslot; + step->d.ret.next = nodeData.jumpret; + + step = ExecProgramAddStep(&build); + step->opcode = XO_DONE; + + ExecProgramDefineJump(&build, resvar, build.program->steps_len - 1); + + return build.program; +} + +void +ExecProgramBuildForNode(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d) +{ + switch (nodeTag(node)) + { + case T_SeqScanState: + ExecProgramBuildForSeqScan(b, node, eflags, jumpfail, d); + break; + + case T_IndexScanState: + ExecProgramBuildForIndexScan(b, node, eflags, jumpfail, d); + break; + + case T_IndexOnlyScanState: + ExecProgramBuildForIndexOnlyScan(b, node, eflags, jumpfail, d); + break; + + case T_AggState: + ExecProgramBuildForAgg(b, node, eflags, jumpfail, d); + break; + + case T_NestLoopState: + ExecProgramBuildForNestloop(b, node, eflags, jumpfail, d); + break; + + case T_HashJoinState: + ExecProgramBuildForHashJoin(b, node, eflags, jumpfail, d); + break; + + case T_HashState: + pg_unreachable(); + break; + + case T_SortState: + ExecProgramBuildForSort(b, node, eflags, jumpfail, d); + break; + + case T_LimitState: + ExecProgramBuildForLimit(b, node, eflags, jumpfail, d); + break; + + default: + b->failed = true; + break; + } +} + + + +extern TupleTableSlot * +ExecExecProgram(ExecProgram *p, EState *estate) +{ + ExecStep *step = &p->steps[p->cur_step]; + TupleTableSlot **slots = p->slots; + +eval: + switch(step->opcode) + { + case XO_SEQSCAN_FIRST: + { + SeqScanState *state = step->d.seqscan.state; + HeapScanDesc scandesc = state->ss.ss_currentScanDesc; + + if (scandesc == NULL) + { + /* + * We reach here if the scan is not parallel, or if we're executing a + * scan that was intended to be parallel serially. + */ + scandesc = heap_beginscan(state->ss.ss_currentRelation, + estate->es_snapshot, + 0, NULL); + state->ss.ss_currentScanDesc = scandesc; + } + + step++; + } + /* FALLTHROUGH */ + + case XO_SEQSCAN: + { + TupleTableSlot *slot = slots[step->d.seqscan.slot]; + SeqScanState *state = step->d.seqscan.state; + HeapScanDesc scandesc = state->ss.ss_currentScanDesc; + HeapTuple tuple; + + /* + * get the next tuple from the table + */ + tuple = heap_getnext(scandesc, ForwardScanDirection); + if (unlikely(!tuple)) + { + ExecClearTuple(slot); + step = &p->steps[step->d.seqscan.jumpempty]; + } + else + { + ExecStoreTuple(tuple, /* tuple to store */ + slot, /* slot to store in */ + scandesc->rs_cbuf, /* buffer associated with this + * tuple */ + false); /* don't pfree this pointer */ + step++; + } + + goto eval; + } + + case XO_INDEX_SCAN_FIRST: + { + IndexScanState *state = step->d.indexscan.state; + IndexScanDesc scandesc = state->iss_ScanDesc; + + if (scandesc == NULL) + { + /* + * We reach here if the index scan is not parallel, or if we're + * executing a index scan that was intended to be parallel serially. + */ + scandesc = index_beginscan(state->ss.ss_currentRelation, + state->iss_RelationDesc, + estate->es_snapshot, + state->iss_NumScanKeys, + state->iss_NumOrderByKeys); + + state->iss_ScanDesc = scandesc; + + /* + * If no run-time keys to calculate or they are ready, go ahead and + * pass the scankeys to the index AM. + */ + if (state->iss_NumRuntimeKeys == 0 || state->iss_RuntimeKeysReady) + index_rescan(scandesc, + state->iss_ScanKeys, state->iss_NumScanKeys, + state->iss_OrderByKeys, state->iss_NumOrderByKeys); + + } + + step++; + } + /* FALLTHROUGH */ + + case XO_INDEX_SCAN: + { + TupleTableSlot *slot = slots[step->d.indexscan.slot]; + IndexScanState *state = step->d.indexscan.state; + IndexScanDesc scandesc = state->iss_ScanDesc; + HeapTuple tuple; + + /* + * get the next tuple from the table + */ + tuple = index_getnext(scandesc, ForwardScanDirection); + if (unlikely(!tuple)) + { + ExecClearTuple(slot); + step = &p->steps[step->d.indexscan.jumpempty]; + } + else + { + ExecStoreTuple(tuple, /* tuple to store */ + slot, /* slot to store in */ + scandesc->xs_cbuf, /* buffer associated with this + * tuple */ + false); /* don't pfree this pointer */ + + /* FIXME: recheck */ + + step++; + } + + goto eval; + } + + + case XO_INDEXONLY_SCAN_FIRST: + { + IndexOnlyScanState *state = step->d.ioscan.state; + IndexScanDesc scandesc = state->ioss_ScanDesc; + + if (scandesc == NULL) + { + /* + * We reach here if the index scan is not parallel, or if we're + * executing a index scan that was intended to be parallel serially. + */ + scandesc = index_beginscan(state->ss.ss_currentRelation, + state->ioss_RelationDesc, + estate->es_snapshot, + state->ioss_NumScanKeys, + state->ioss_NumOrderByKeys); + + state->ioss_ScanDesc = scandesc; + + /* + * If no run-time keys to calculate or they are ready, go ahead and + * pass the scankeys to the index AM. + */ + if (state->ioss_NumRuntimeKeys == 0 || state->ioss_RuntimeKeysReady) + index_rescan(scandesc, + state->ioss_ScanKeys, state->ioss_NumScanKeys, + state->ioss_OrderByKeys, state->ioss_NumOrderByKeys); + + } + + step++; + } + /* FALLTHROUGH */ + + case XO_INDEXONLY_SCAN: + { + TupleTableSlot *slot = slots[step->d.ioscan.slot]; + IndexOnlyScanState *state = step->d.ioscan.state; + + /* + * get the next tuple from the table + * + * XXX: qual invocations should be moved to here. + */ + if (!indexonly_getnext(state, slot, ForwardScanDirection)) + { + ExecClearTuple(slot); + step = &p->steps[step->d.indexscan.jumpempty]; + } + else + { + step++; + } + + goto eval; + } + + case XO_QUAL_SCAN: + { + TupleTableSlot *scanslot = slots[step->d.qual.scanslot]; + ExprContext *econtext = step->d.qual.econtext; + + econtext->ecxt_scantuple = scanslot; + + if (ExecQualAndReset(step->d.qual.qual, econtext)) + { + step++; + } + else + { + step = &p->steps[step->d.qual.jumpfail]; + } + + goto eval; + } + + case XO_QUAL_JOIN: + { + TupleTableSlot *outerslot = slots[step->d.qual.outerslot]; + TupleTableSlot *innerslot = slots[step->d.qual.innerslot]; + ExprContext *econtext = step->d.qual.econtext; + + econtext->ecxt_outertuple = outerslot; + econtext->ecxt_innertuple = innerslot; + + if (ExecQualAndReset(step->d.qual.qual, econtext)) + { + step++; + } + else + { + step = &p->steps[step->d.qual.jumpfail]; + } + + goto eval; + } + + case XO_PROJECT_SCAN: + { + TupleTableSlot *scanslot = slots[step->d.project.scanslot]; + ProjectionInfo *project = step->d.project.project; + + project->pi_exprContext->ecxt_scantuple = scanslot; + + ExecProject(project); + + step++; + goto eval; + } + + case XO_PROJECT_JOIN: + { + TupleTableSlot *innerslot = slots[step->d.project.innerslot]; + TupleTableSlot *outerslot = slots[step->d.project.outerslot]; + ProjectionInfo *project = step->d.project.project; + + project->pi_exprContext->ecxt_outertuple = outerslot; + project->pi_exprContext->ecxt_innertuple = innerslot; + + ExecProject(project); + + step++; + goto eval; + } + + case XO_SORTAGG_TUPLE: + { + AggState *state = step->d.sortagg.state; + TupleTableSlot *inputslot = slots[step->d.sortagg.inputslot]; + AggStateBoundaryState action; + + action = agg_fill_direct_onetup(state, inputslot); + + switch (action) + { + case AGGBOUNDARY_NEXT: + step = &p->steps[step->d.sortagg.jumpnext]; + break; + case AGGBOUNDARY_REACHED: + step = &p->steps[step->d.sortagg.jumpgroup]; + break; + case AGGBOUNDARY_FINISHED: + step = &p->steps[step->d.sortagg.jumpempty]; + break; + } + goto eval; + } + + case XO_HASHAGG_TUPLE: + { + TupleTableSlot *slot = slots[step->d.hashagg.inputslot]; + AggState *state = step->d.hashagg.state; + + agg_fill_hash_table_onetup(state, slot); + + step = &p->steps[step->d.hashagg.jumpnext]; + goto eval; + } + + case XO_DRAIN_HASHAGG: + { + TupleTableSlot *slot = slots[step->d.drain_hashagg.outputslot]; + AggState *state = step->d.drain_hashagg.state; + TupleTableSlot *slot2; + + /* + * XXX: Good chunks of this should be moved here, so that when + * JITing the function calls to expressions happen here. As + * they are in turn JITed, that'd allow them to be inlined + * (without full inlining, which'd not recognize the + * opportunity anway). + */ + slot2 = agg_retrieve_hash_table(state); + Assert(slot2 == NULL || slot == slot2); + if (slot2 == NULL) + { + step = &p->steps[step->d.drain_hashagg.jumpempty]; + } + else + { + step++; + } + goto eval; + } + + case XO_INIT_SORT: + { + SortState *state = step->d.sort.state; + PlanState *outerNode; + TupleDesc tupDesc; + Sort *plannode = (Sort *) state->ss.ps.plan; + + outerNode = outerPlanState(state); + tupDesc = ExecGetResultType(outerNode); + + state->tuplesortstate = + tuplesort_begin_heap(tupDesc, + plannode->numCols, + plannode->sortColIdx, + plannode->sortOperators, + plannode->collations, + plannode->nullsFirst, + work_mem, + NULL, state->randomAccess); + if (state->bounded) + tuplesort_set_bound(state->tuplesortstate, state->bound); + + step++; + goto eval; + } + + case XO_SORT_TUPLE: + { + TupleTableSlot *slot = slots[step->d.sort.inputslot]; + SortState *state = step->d.sort.state; + + tuplesort_puttupleslot(state->tuplesortstate, slot); + + step = &p->steps[step->d.sort.jumpnext]; + goto eval; + } + + case XO_SORT: + { + SortState *state = step->d.sort.state; + Tuplesortstate *tuplesortstate = state->tuplesortstate; + + if (!state->sort_Done) + { + tuplesort_performsort(tuplesortstate); + state->sort_Done = true; + } + + /* could fall through? */ + step++; + goto eval; + } + + case XO_DRAIN_SORT: + { + TupleTableSlot *slot = slots[step->d.sort.outputslot]; + SortState *state = step->d.sort.state; + Tuplesortstate *tuplesortstate = state->tuplesortstate; + + Assert(state->sort_Done); + + if (tuplesort_gettupleslot(tuplesortstate, + ForwardScanDirection, + false, slot, NULL)) + step++; + else + step = &p->steps[step->d.sort.jumpempty]; + + goto eval; + } + + case XO_INIT_HASH: + { + HashJoinState *hjstate = step->d.hash.hjstate; + HashState *hstate = step->d.hash.hstate; + + if (hjstate->hj_HashTable) + { + step = &p->steps[step->d.hash.jumpbuilt]; + } + else + { + HashJoinTable hashtable; + + hashtable = ExecHashTableCreate(hstate, + hjstate->hj_HashOperators, + false); + hjstate->hj_HashTable = hashtable; + + step++; + } + + goto eval; + } + + case XO_HASH_TUPLE: + { + HashJoinState *hjstate = step->d.hash.hjstate; + HashState *hstate = step->d.hash.hstate; + TupleTableSlot *slot = p->slots[step->d.hash.inputslot]; + + ExecHashDoInsert(hjstate, hstate, slot); + + step = &p->steps[step->d.hash.jumpnext]; + + goto eval; + } + + case XO_PROBE_HASH: + { + HashJoinState *hjstate = step->d.hj.state; + ExprContext *econtext = hjstate->js.ps.ps_ExprContext; + HashJoinTable hashtable = hjstate->hj_HashTable; + uint32 hashvalue; + int batchno; + + if (!hjstate->hj_OuterNotEmpty) + { + TupleTableSlot *slot = p->slots[step->d.hash.inputslot]; + + econtext->ecxt_outertuple = slot; + + if (!ExecHashGetHashValue(hashtable, econtext, + hjstate->hj_OuterHashKeys, + true, /* outer tuple */ + HJ_FILL_OUTER(hjstate), + &hashvalue)) + { + + hjstate->hj_OuterNotEmpty = false; + step = &p->steps[step->d.hj.jumpmiss]; + goto eval; + } + + /* remember outer relation is not empty for possible rescan */ + hjstate->hj_OuterNotEmpty = true; + + hjstate->hj_CurHashValue = hashvalue; + ExecHashGetBucketAndBatch(hashtable, hashvalue, + &hjstate->hj_CurBucketNo, &batchno); + hjstate->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable, + hashvalue); + hjstate->hj_CurTuple = NULL; + + } + + if (ExecScanHashBucket(hjstate, econtext)) + { + step++; + } + else + { + hjstate->hj_OuterNotEmpty = false; + + /* out of matches; check for possible outer-join fill */ + hjstate->hj_JoinState = HJ_FILL_OUTER_TUPLE; + + step = &p->steps[step->d.hj.jumpmiss]; + } + + goto eval; + } + + case XO_RESCAN: + { + int i; + + for (i = 0; i < step->d.rescan.num_nodes; i++) + { + ExecReScan(step->d.rescan.nodes[i]); + } + + step++; + goto eval; + } + + case XO_PARAM: + { + int nparam = step->d.param.nparams; + NestLoopParam *params = step->d.param.params; + ExprContext *econtext = step->d.param.econtext; + TupleTableSlot *slot = slots[step->d.param.slot]; + PlanState *plan = step->d.param.ps; + int i; + + for (i = 0; i < nparam; i++) + { + NestLoopParam *nlp = ¶ms[i]; + int paramno = nlp->paramno; + ParamExecData *prm = &econtext->ecxt_param_exec_vals[paramno]; + + + prm = &(econtext->ecxt_param_exec_vals[paramno]); + prm->value = slot_getattr(slot, + nlp->paramval->varattno, + &(prm->isnull)); + plan->chgParam = bms_add_member(plan->chgParam, paramno); + } + + step++; + goto eval; + } + + case XO_COMPUTE_LIMIT: + ExecComputeLimit(step->d.limit.state); + step++; + goto eval; + + case XO_CHECK_LIMIT: + if (ExecCheckLimit(step->d.limit.state)) + step = &p->steps[step->d.limit.jumpout]; + else if (unlikely(!step->d.limit.initialized)) + { + step->d.limit.initialized = true;; + step++; + } + else + { + step = &p->steps[step->d.limit.jumpnext]; + } + goto eval; + + case XO_UPDATE_LIMIT: + if (ExecUpdateLimit(step->d.limit.state)) + step = &p->steps[step->d.limit.jumpnext]; + else + step++; + + goto eval; + + case XO_RETURN: + { + TupleTableSlot *slot = slots[step->d.ret.slotno]; + + /* setup for next call */ + p->cur_step = step->d.ret.next; + + return slot; + } + + case XO_DONE: + { + return NULL; + } + + default: + pg_unreachable(); + break; + } + + return NULL; +} + +extern void +ExecShutdownProgram(ExecProgram *p) +{ +} + +extern StringInfo +ExecDescribeProgram(ExecProgram *p) +{ + StringInfo s = makeStringInfo(); + int i = 0; + + for (i = 0; i < p->steps_len; i++) + { + ExecStep *step = &p->steps[i]; + + appendStringInfo(s, "%d: ", i); + + switch(step->opcode) + { + case XO_SEQSCAN_FIRST: + { + appendStringInfo(s, "seqscan_first\n"); + break; + } + + case XO_SEQSCAN: + { + appendStringInfo(s, "seqscan [j empty %d] > s%d\n", + step->d.seqscan.jumpempty, + step->d.seqscan.slot); + break; + } + + case XO_INDEX_SCAN_FIRST: + { + appendStringInfo(s, "indexscan_first\n"); + break; + } + + case XO_INDEX_SCAN: + { + appendStringInfo(s, "indexscan [j empty %d] > s%d\n", + step->d.indexscan.jumpempty, + step->d.indexscan.slot); + break; + } + + case XO_INDEXONLY_SCAN_FIRST: + { + appendStringInfo(s, "indexonlyscan_first\n"); + break; + } + + case XO_INDEXONLY_SCAN: + { + appendStringInfo(s, "indexonlyscan [j empty %d] > s%d\n", + step->d.ioscan.jumpempty, + step->d.ioscan.slot); + break; + } + + case XO_QUAL_SCAN: + { + appendStringInfo(s, "qual [j fail %d] < scan s%d\n", + step->d.qual.jumpfail, + step->d.qual.scanslot); + break; + } + + case XO_QUAL_JOIN: + { + appendStringInfo(s, "qual [j fail %d] < outer s%d inner s%d\n", + step->d.qual.jumpfail, + step->d.qual.outerslot, + step->d.qual.innerslot); + break; + } + + case XO_PROJECT_SCAN: + { + appendStringInfo(s, "project < scan s%d > s%d\n", + step->d.project.scanslot, + step->d.project.result); + break; + } + + case XO_PROJECT_JOIN: + { + appendStringInfo(s, "project < outer s%d inner s%d > s%d\n", + step->d.project.outerslot, + step->d.project.innerslot, + step->d.project.result); + break; + } + + case XO_HASHAGG_TUPLE: + { + appendStringInfo(s, "hashagg_tuple [j %d] < s%d\n", + step->d.hashagg.jumpnext, + step->d.hashagg.inputslot); + break; + } + + case XO_HASH_TUPLE: + { + appendStringInfo(s, "hash_tuple [j %d] < s%d\n", + step->d.hash.jumpnext, + step->d.hash.inputslot); + break; + } + + case XO_SORTAGG_TUPLE: + { + appendStringInfo(s, "sortagg_tuple [j next %d, j group %d, j empty %d] < s%d > s%d\n", + step->d.sortagg.jumpnext, + step->d.sortagg.jumpgroup, + step->d.sortagg.jumpempty, + step->d.sortagg.inputslot, + step->d.sortagg.outputslot); + break; + } + + case XO_DRAIN_HASHAGG: + { + appendStringInfo(s, "drain_hashagg [j empty %d] > s%d\n", + step->d.drain_hashagg.jumpempty, + step->d.drain_hashagg.outputslot); + break; + } + + case XO_INIT_HASH: + { + appendStringInfo(s, "init_hash [j built %d]\n", + step->d.hash.jumpbuilt); + break; + } + + case XO_PROBE_HASH: + { + appendStringInfo(s, "probe_hash [j miss %d] < s%d > s%d\n", + step->d.hj.jumpmiss, + step->d.hj.inputslot, + step->d.hj.probeslot); + break; + } + + case XO_INIT_SORT: + { + appendStringInfo(s, "init_sort\n"); + break; + } + + case XO_SORT_TUPLE: + { + appendStringInfo(s, "sort_tuple [j %d] < s%d\n", + step->d.sort.jumpnext, + step->d.sort.inputslot); + break; + } + + case XO_SORT: + { + appendStringInfo(s, "sort\n"); + break; + } + + case XO_DRAIN_SORT: + { + appendStringInfo(s, "drain_sort [j empty %d] > s%d\n", + step->d.sort.jumpempty, + step->d.sort.outputslot); + break; + } + + case XO_RESCAN: + { + appendStringInfo(s, "rescan #%d\n", + step->d.rescan.num_nodes); + break; + } + + case XO_PARAM: + { + appendStringInfo(s, "param #%d < s%d\n", + step->d.param.nparams, + step->d.param.slot); + break; + } + + case XO_COMPUTE_LIMIT: + { + appendStringInfo(s, "compute_limit\n"); + break; + } + + case XO_CHECK_LIMIT: + { + appendStringInfo(s, "check_limit [j empty %d, j reenter %d]\n", + step->d.limit.jumpout, + step->d.limit.jumpnext); + break; + } + + case XO_UPDATE_LIMIT: + { + appendStringInfo(s, "update_limit [j next %d]\n", + step->d.limit.jumpnext); + break; + } + + case XO_RETURN: + { + appendStringInfo(s, "return < s%d [next %d]\n", + step->d.ret.slotno, + step->d.ret.next); + break; + } + + case XO_DONE: + { + appendStringInfo(s, "done\n"); + break; + } + + default: + appendStringInfo(s, "unknown\n"); + break; + } + } + + return s; +} diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 0fe0c22c1e..55bbf1aa60 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -277,7 +277,7 @@ static TupleHashEntryData *lookup_hash_entry(AggState *aggstate); static void lookup_hash_entries(AggState *aggstate); static TupleTableSlot *agg_retrieve_direct(AggState *aggstate); static void agg_fill_hash_table(AggState *aggstate); -static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); +//static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); static Datum GetAggInitVal(Datum textInitVal, Oid transtype); static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggstate, EState *estate, @@ -1315,7 +1315,7 @@ static void find_hash_columns(AggState *aggstate) { Bitmapset *base_colnos; - List *outerTlist = outerPlanState(aggstate)->plan->targetlist; + List *outerTlist = outerPlan(aggstate->ss.ps.plan)->targetlist; int numHashes = aggstate->num_hashes; EState *estate = aggstate->ss.ps.state; int j; @@ -1908,7 +1908,6 @@ static void agg_fill_hash_table(AggState *aggstate) { TupleTableSlot *outerslot; - ExprContext *tmpcontext = aggstate->tmpcontext; /* * Process each outer-plan tuple, and then fetch the next one, until we @@ -1920,20 +1919,7 @@ agg_fill_hash_table(AggState *aggstate) if (TupIsNull(outerslot)) break; - /* set up for lookup_hash_entries and advance_aggregates */ - tmpcontext->ecxt_outertuple = outerslot; - - /* Find or build hashtable entries */ - lookup_hash_entries(aggstate); - - /* Advance the aggregates (or combine functions) */ - advance_aggregates(aggstate); - - /* - * Reset per-input-tuple context after each tuple, but note that the - * hash lookups do this too - */ - ResetExprContext(aggstate->tmpcontext); + agg_fill_hash_table_onetup(aggstate, outerslot); } aggstate->table_filled = true; @@ -1944,9 +1930,34 @@ agg_fill_hash_table(AggState *aggstate) } /* + * ExecAgg for hashed case: read input and build hash table + */ +void +agg_fill_hash_table_onetup(AggState *aggstate, TupleTableSlot *slot) +{ + ExprContext *tmpcontext = aggstate->tmpcontext; + + /* set up for lookup_hash_entries and advance_aggregates */ + tmpcontext->ecxt_outertuple = slot; + + /* Find or build hashtable entries */ + lookup_hash_entries(aggstate); + + /* Advance the aggregates (or combine functions) */ + advance_aggregates(aggstate); + + /* + * Reset per-input-tuple context after each tuple, but note that the + * hash lookups do this too + */ + ResetExprContext(aggstate->tmpcontext); +} + + +/* * ExecAgg for hashed case: retrieving groups from hash table */ -static TupleTableSlot * +TupleTableSlot * agg_retrieve_hash_table(AggState *aggstate) { ExprContext *econtext; @@ -3666,3 +3677,243 @@ aggregate_dummy(PG_FUNCTION_ARGS) fcinfo->flinfo->fn_oid); return (Datum) 0; /* keep compiler quiet */ } + +static bool +agg_project_current_group(AggState *aggstate) +{ + int currentSet; + AggStatePerAgg peragg = aggstate->peragg; + AggStatePerGroup *pergroups = aggstate->pergroups; + TupleTableSlot *firstSlot = aggstate->ss.ss_ScanTupleSlot; + ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; + + /* + * Clear the per-output-tuple context for each group, as well as + * aggcontext (which contains any pass-by-ref transvalues of the old + * group). Some aggregate functions store working state in child + * contexts; those now get reset automatically without us needing to + * do anything special. + * + * We use ReScanExprContext not just ResetExprContext because we want + * any registered shutdown callbacks to be called. That allows + * aggregate functions to ensure they've cleaned up any non-memory + * resources. + */ + ReScanExprContext(econtext); + + econtext->ecxt_outertuple = firstSlot; + + Assert(aggstate->projected_set >= 0); + + currentSet = aggstate->projected_set; + + prepare_projection_slot(aggstate, firstSlot, currentSet); + + select_current_set(aggstate, currentSet, false); + + finalize_aggregates(aggstate, + peragg, + pergroups[currentSet]); + + return !TupIsNull(project_aggregates(aggstate)); +} + +AggStateBoundaryState +agg_fill_direct_onetup(AggState *aggstate, TupleTableSlot *slot) +{ + Agg *node = (Agg *) aggstate->ss.ps.plan; + AggStatePerGroup *pergroups = aggstate->pergroups; + bool hasGroupingSets = aggstate->phase->numsets > 0; + int numGroupingSets = Max(aggstate->phase->numsets, 1); + int numReset = numGroupingSets; + ExprContext *tmpcontext = aggstate->tmpcontext; + TupleTableSlot *firstSlot = aggstate->ss.ss_ScanTupleSlot; + AggStateBoundaryState state; + + /* set up for next advance_aggregates call */ + tmpcontext->ecxt_innertuple = firstSlot; + tmpcontext->ecxt_outertuple = slot; + + if (unlikely(TupIsNull(slot)) || unlikely(aggstate->agg_done)) + { + /* no more outer-plan tuples available */ + if (hasGroupingSets) + { + aggstate->input_done = true; + } + else + { + aggstate->agg_done = true; + } + + if (!aggstate->at_boundary) + { + aggstate->at_boundary = true; + /* + * First project current group (using previous representative + * tuple), then use the input tuple to start a new group. + */ + if (agg_project_current_group(aggstate)) + return AGGBOUNDARY_REACHED; + } + + return AGGBOUNDARY_FINISHED; + } + + if (unlikely(aggstate->at_boundary)) + { + initialize_aggregates(aggstate, pergroups, numReset); + + /* + * Store the copied first input tuple in the tuple table slot + * reserved for it. The tuple will be deleted when it is + * cleared from the slot. + */ + ExecStoreTuple(ExecCopySlotTuple(slot), + firstSlot, + InvalidBuffer, + true); + + aggstate->at_boundary = false; + + state = AGGBOUNDARY_NEXT; + } + else + { + state = AGGBOUNDARY_NEXT; + + /* + * If we are grouping, check whether we've crossed a group + * boundary. + */ + if (node->aggstrategy != AGG_PLAIN) + { + if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1], + tmpcontext)) + { + /* + * First project current group (using previous representative + * tuple), then use the input tuple to start a new group. + */ + if (agg_project_current_group(aggstate)) + { + ExecMaterializeSlot(aggstate->ss.ps.ps_ProjInfo->pi_state.resultslot); + state = AGGBOUNDARY_REACHED; + } + + initialize_aggregates(aggstate, pergroups, numReset); + + /* + * Store the copied first input tuple in the tuple table slot + * reserved for it. The tuple will be deleted when it is + * cleared from the slot. + */ + ExecStoreTuple(ExecCopySlotTuple(slot), + firstSlot, + InvalidBuffer, + true); + } + } + } + + /* Advance the aggregates (or combine functions) */ + advance_aggregates(aggstate); + + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(tmpcontext); + + return state; +} + +void +ExecProgramBuildForAgg(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d) +{ + AggState *state = castNode(AggState, node); + Agg *agg = (Agg *) state->ss.ps.plan; + EmitForPlanNodeData subPlanData = {}; + + if (agg->aggstrategy == AGG_MIXED) + { + b->failed = true; + return; + } + + d->resetnodes = lappend(d->resetnodes, state); + + if (agg->aggstrategy == AGG_HASHED) + { + /* add it to hash table */ + ExecStep *step; + int drain_off; + int drainslot; + + subPlanData.jumpret = b->program->steps_len - 1; + drain_off = --b->varno; + + /* get one input tuple */ + ExecProgramBuildForNode(b, outerPlanState(state), eflags, + drain_off, &subPlanData); + + step = ExecProgramAddStep(b); + step->opcode = XO_HASHAGG_TUPLE; + step->d.hashagg.state = state; + ExexProgramAssignJump(b, step, &step->d.hashagg.jumpnext, subPlanData.jumpret); + + step->d.hashagg.inputslot = subPlanData.resslot; + + /* once input has been emptied, drain the hash table */ + ExecProgramDefineJump(b, drain_off, b->program->steps_len); + + drainslot = ExecProgramAddSlot(b); + + { + ExecStep *step = ExecProgramAddStep(b); + + step->opcode = XO_DRAIN_HASHAGG; + step->d.drain_hashagg.state = state; + ExexProgramAssignJump(b, step, &step->d.drain_hashagg.jumpempty, jumpfail); + step->d.drain_hashagg.outputslot = drainslot; + b->program->slots[drainslot] = state->ss.ps.ps_ProjInfo->pi_state.resultslot; + } + + d->resslot = drainslot; + d->jumpret = b->program->steps_len - 1; + } + else + { + int agg_off = --b->varno; + int result_off = --b->varno; + int resultslot; + + /* get one input tuple */ + ExecProgramBuildForNode(b, outerPlanState(state), eflags, + agg_off, &subPlanData); + + ExecProgramDefineJump(b, agg_off, b->program->steps_len); + + resultslot = ExecProgramAddSlot(b); + + { + ExecStep *step = ExecProgramAddStep(b); + + step->opcode = XO_SORTAGG_TUPLE; + step->d.sortagg.state = state; + ExexProgramAssignJump(b, step, &step->d.sortagg.jumpempty, jumpfail); + ExexProgramAssignJump(b, step, &step->d.sortagg.jumpgroup, result_off); + ExexProgramAssignJump(b, step, &step->d.sortagg.jumpnext, subPlanData.jumpret); + + step->d.sortagg.inputslot = subPlanData.resslot; + step->d.sortagg.outputslot = resultslot; + } + + /* once input has been emptied, drain the hash table */ + ExecProgramDefineJump(b, result_off, b->program->steps_len); + + d->resslot = resultslot; + d->jumpret = subPlanData.jumpret; + b->program->slots[resultslot] = state->ss.ps.ps_ProjInfo->pi_state.resultslot; + + state->at_boundary = true; + state->projected_set = 0; + } +} diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 4f069d17fd..d858f8e8fc 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -3307,3 +3307,37 @@ ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size) return true; } + +void +ExecHashDoInsert(HashJoinState *hjstate, HashState *hstate, TupleTableSlot *slot) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + ExprContext *econtext = hstate->ps.ps_ExprContext; + List *hashkeys = hstate->hashkeys; + uint32 hashvalue; + + /* We have to compute the hash value */ + econtext->ecxt_innertuple = slot; + + if (ExecHashGetHashValue(hashtable, econtext, hashkeys, + false, hashtable->keepNulls, + &hashvalue)) + { + int bucketNumber; + + bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue); + if (bucketNumber != INVALID_SKEW_BUCKET_NO) + { + /* It's a skew tuple, so put it into that hash table */ + ExecHashSkewTableInsert(hashtable, slot, hashvalue, + bucketNumber); + hashtable->skewTuples += 1; + } + else + { + /* Not subject to skew optimization, so insert normally */ + ExecHashTableInsert(hashtable, slot, hashvalue); + } + hashtable->totalTuples += 1; + } +} diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index dd94cffbd1..2bfc3cedd5 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -118,21 +118,6 @@ #include "utils/sharedtuplestore.h" -/* - * States of the ExecHashJoin state machine - */ -#define HJ_BUILD_HASHTABLE 1 -#define HJ_NEED_NEW_OUTER 2 -#define HJ_SCAN_BUCKET 3 -#define HJ_FILL_OUTER_TUPLE 4 -#define HJ_FILL_INNER_TUPLES 5 -#define HJ_NEED_NEW_BATCH 6 - -/* Returns true if doing null-fill on outer relation */ -#define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL) -/* Returns true if doing null-fill on inner relation */ -#define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL) - static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue); @@ -1530,3 +1515,135 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } + + + +extern void +ExecProgramBuildForHashJoin(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d) +{ + HashJoinState *hjstate = castNode(HashJoinState, node); + HashState *hashstate = castNode(HashState, innerPlanState(hjstate)); + HashJoin *hj = (HashJoin *) hjstate->js.ps.plan; + EmitForPlanNodeData outerPlanData = {}; + EmitForPlanNodeData innerPlanData = {}; + int built_off = --b->varno; + int probeslot; + + if (hj->join.jointype != JOIN_INNER) + { + b->failed = true; + return; + } + + d->resetnodes = lappend(d->resetnodes, hjstate); + + /* + * Create hashtable / skip over steps building it if already built. + */ + { + ExecStep *step; + + step = ExecProgramAddStep(b); + step->opcode = XO_INIT_HASH; + step->d.hash.hstate = hashstate; + step->d.hash.hjstate = hjstate; + ExexProgramAssignJump(b, step, &step->d.hash.jumpbuilt, built_off); + } + + /* retrieve content of hashtable */ + + /* get one input tuple */ + ExecProgramBuildForNode(b, outerPlanState(hashstate), eflags, built_off, + &innerPlanData); + { + ExecStep *step; + + step = ExecProgramAddStep(b); + step->opcode = XO_HASH_TUPLE; + step->d.hash.hstate = hashstate; + step->d.hash.hjstate = hjstate; + step->d.hash.inputslot = innerPlanData.resslot; + step->d.hash.jumpnext = innerPlanData.jumpret; + } + + /* done building content of hashjoin */ + /* FIXME: not correct when reentering, because we might be in process of probing */ + ExecProgramDefineJump(b, built_off, b->program->steps_len); + + ExecProgramBuildForNode(b, outerPlanState(hjstate), eflags, jumpfail, + &outerPlanData); + + d->jumpret = b->program->steps_len; + + { + ExecStep *step; + + step = ExecProgramAddStep(b); + probeslot = ExecProgramAddSlot(b); + + step->opcode = XO_PROBE_HASH; + step->d.hj.state = hjstate; + step->d.hj.jumpmiss = outerPlanData.jumpret; + step->d.hj.inputslot = outerPlanData.resslot; + step->d.hj.probeslot = probeslot; + + b->program->slots[probeslot] = hjstate->hj_HashTupleSlot; + } + + + /* currently executed inside ExecScanHashBucket */ + if (hjstate->hashclauses && false) + { + ExecStep *step; + + step = ExecProgramAddStep(b); + step->opcode = XO_QUAL_JOIN; + step->d.qual.jumpfail = d->jumpret; + step->d.qual.outerslot = outerPlanData.resslot; + step->d.qual.innerslot = probeslot; + step->d.qual.qual = hjstate->hashclauses; + step->d.qual.econtext = hjstate->js.ps.ps_ExprContext; + } + + if (hjstate->js.joinqual) + { + ExecStep *step; + + step = ExecProgramAddStep(b); + step->opcode = XO_QUAL_JOIN; + step->d.qual.jumpfail = d->jumpret; + step->d.qual.outerslot = outerPlanData.resslot; + step->d.qual.innerslot = probeslot; + step->d.qual.qual = hjstate->js.joinqual; + step->d.qual.econtext = hjstate->js.ps.ps_ExprContext; + } + + if (hjstate->js.ps.qual) + { + ExecStep *step; + + step = ExecProgramAddStep(b); + step->opcode = XO_QUAL_JOIN; + step->d.qual.jumpfail = d->jumpret; + step->d.qual.outerslot = outerPlanData.resslot; + step->d.qual.innerslot = probeslot; + step->d.qual.qual = hjstate->js.ps.qual; + step->d.qual.econtext = hjstate->js.ps.ps_ExprContext; + } + + { + ProjectionInfo *project = hjstate->js.ps.ps_ProjInfo; + int projslot = ExecProgramAddSlot(b); + ExecStep *step; + + step = ExecProgramAddStep(b); + step->opcode = XO_PROJECT_JOIN; + step->d.project.outerslot = outerPlanData.resslot; + step->d.project.innerslot = probeslot; + step->d.project.project = project; + step->d.project.result = projslot; + + b->program->slots[projslot] = project->pi_state.resultslot; + d->resslot = projslot; + } +} diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c index 3a02a99621..a12aa60b6f 100644 --- a/src/backend/executor/nodeIndexonlyscan.c +++ b/src/backend/executor/nodeIndexonlyscan.c @@ -57,11 +57,9 @@ static TupleTableSlot * IndexOnlyNext(IndexOnlyScanState *node) { EState *estate; - ExprContext *econtext; ScanDirection direction; IndexScanDesc scandesc; TupleTableSlot *slot; - ItemPointer tid; /* * extract necessary information from index scan node @@ -77,7 +75,6 @@ IndexOnlyNext(IndexOnlyScanState *node) direction = ForwardScanDirection; } scandesc = node->ioss_ScanDesc; - econtext = node->ss.ps.ps_ExprContext; slot = node->ss.ss_ScanTupleSlot; if (scandesc == NULL) @@ -112,149 +109,10 @@ IndexOnlyNext(IndexOnlyScanState *node) node->ioss_NumOrderByKeys); } - /* - * OK, now that we have what we need, fetch the next tuple. - */ - while ((tid = index_getnext_tid(scandesc, direction)) != NULL) - { - HeapTuple tuple = NULL; - - CHECK_FOR_INTERRUPTS(); - - /* - * We can skip the heap fetch if the TID references a heap page on - * which all tuples are known visible to everybody. In any case, - * we'll use the index tuple not the heap tuple as the data source. - * - * Note on Memory Ordering Effects: visibilitymap_get_status does not - * lock the visibility map buffer, and therefore the result we read - * here could be slightly stale. However, it can't be stale enough to - * matter. - * - * We need to detect clearing a VM bit due to an insert right away, - * because the tuple is present in the index page but not visible. The - * reading of the TID by this scan (using a shared lock on the index - * buffer) is serialized with the insert of the TID into the index - * (using an exclusive lock on the index buffer). Because the VM bit - * is cleared before updating the index, and locking/unlocking of the - * index page acts as a full memory barrier, we are sure to see the - * cleared bit if we see a recently-inserted TID. - * - * Deletes do not update the index page (only VACUUM will clear out - * the TID), so the clearing of the VM bit by a delete is not - * serialized with this test below, and we may see a value that is - * significantly stale. However, we don't care about the delete right - * away, because the tuple is still visible until the deleting - * transaction commits or the statement ends (if it's our - * transaction). In either case, the lock on the VM buffer will have - * been released (acting as a write barrier) after clearing the bit. - * And for us to have a snapshot that includes the deleting - * transaction (making the tuple invisible), we must have acquired - * ProcArrayLock after that time, acting as a read barrier. - * - * It's worth going through this complexity to avoid needing to lock - * the VM buffer, which could cause significant contention. - */ - if (!VM_ALL_VISIBLE(scandesc->heapRelation, - ItemPointerGetBlockNumber(tid), - &node->ioss_VMBuffer)) - { - /* - * Rats, we have to visit the heap to check visibility. - */ - InstrCountTuples2(node, 1); - tuple = index_fetch_heap(scandesc); - if (tuple == NULL) - continue; /* no visible tuple, try next index entry */ - - /* - * Only MVCC snapshots are supported here, so there should be no - * need to keep following the HOT chain once a visible entry has - * been found. If we did want to allow that, we'd need to keep - * more state to remember not to call index_getnext_tid next time. - */ - if (scandesc->xs_continue_hot) - elog(ERROR, "non-MVCC snapshots are not supported in index-only scans"); - - /* - * Note: at this point we are holding a pin on the heap page, as - * recorded in scandesc->xs_cbuf. We could release that pin now, - * but it's not clear whether it's a win to do so. The next index - * entry might require a visit to the same heap page. - */ - } - - /* - * Fill the scan tuple slot with data from the index. This might be - * provided in either HeapTuple or IndexTuple format. Conceivably an - * index AM might fill both fields, in which case we prefer the heap - * format, since it's probably a bit cheaper to fill a slot from. - */ - if (scandesc->xs_hitup) - { - /* - * We don't take the trouble to verify that the provided tuple has - * exactly the slot's format, but it seems worth doing a quick - * check on the number of fields. - */ - Assert(slot->tts_tupleDescriptor->natts == - scandesc->xs_hitupdesc->natts); - ExecStoreTuple(scandesc->xs_hitup, slot, InvalidBuffer, false); - } - else if (scandesc->xs_itup) - StoreIndexTuple(slot, scandesc->xs_itup, scandesc->xs_itupdesc); - else - elog(ERROR, "no data returned for index-only scan"); - - /* - * If the index was lossy, we have to recheck the index quals. - * (Currently, this can never happen, but we should support the case - * for possible future use, eg with GiST indexes.) - */ - if (scandesc->xs_recheck) - { - econtext->ecxt_scantuple = slot; - if (!ExecQualAndReset(node->indexqual, econtext)) - { - /* Fails recheck, so drop it and loop back for another */ - InstrCountFiltered2(node, 1); - continue; - } - } - - /* - * We don't currently support rechecking ORDER BY distances. (In - * principle, if the index can support retrieval of the originally - * indexed value, it should be able to produce an exact distance - * calculation too. So it's not clear that adding code here for - * recheck/re-sort would be worth the trouble. But we should at least - * throw an error if someone tries it.) - */ - if (scandesc->numberOfOrderBys > 0 && scandesc->xs_recheckorderby) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("lossy distance functions are not supported in index-only scans"))); - - /* - * Predicate locks for index-only scans must be acquired at the page - * level when the heap is not accessed, since tuple-level predicate - * locks need the tuple's xmin value. If we had to visit the tuple - * anyway, then we already have the tuple-level lock and can skip the - * page lock. - */ - if (tuple == NULL) - PredicateLockPage(scandesc->heapRelation, - ItemPointerGetBlockNumber(tid), - estate->es_snapshot); - + if (indexonly_getnext(node, slot, direction)) return slot; - } - - /* - * if we get here it means the index scan failed so we are at the end of - * the scan.. - */ - return ExecClearTuple(slot); + else + return ExecClearTuple(slot); } /* @@ -738,3 +596,211 @@ ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, node->ioss_ScanKeys, node->ioss_NumScanKeys, node->ioss_OrderByKeys, node->ioss_NumOrderByKeys); } + + +void +ExecProgramBuildForIndexOnlyScan(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d) +{ + IndexOnlyScanState *state = castNode(IndexOnlyScanState, node); + IndexOnlyScan *ioscan = (IndexOnlyScan *) state->ss.ps.plan; + ExecStep *step; + int continue_at; + int indexslot = ExecProgramAddSlot(b); + + d->resetnodes = lappend(d->resetnodes, state); + + step = ExecProgramAddStep(b); + step->opcode = XO_INDEXONLY_SCAN_FIRST; + step->d.ioscan.state = state; + b->program->slots[indexslot] = state->ss.ss_ScanTupleSlot; + + /* only execute initialization once */ + d->jumpret = continue_at = b->program->steps_len; + + step = ExecProgramAddStep(b); + step->opcode = XO_INDEXONLY_SCAN; + step->d.ioscan.state = state; + step->d.ioscan.slot = indexslot; + ExexProgramAssignJump(b, step, &step->d.indexscan.jumpempty, jumpfail); + + /* FIXME: recheck? */ + + if (ioscan->scan.plan.qual) + { + /* consider combining to INDEXSCAN_QUAL w/ tight loop */ + step = ExecProgramAddStep(b); + step->opcode = XO_QUAL_SCAN; + step->d.qual.jumpfail = continue_at; + step->d.qual.scanslot = indexslot; + step->d.qual.qual = state->ss.ps.qual; + step->d.qual.econtext = state->ss.ps.ps_ExprContext; + } + + if (state->ss.ps.ps_ProjInfo) + { + ProjectionInfo *project = state->ss.ps.ps_ProjInfo; + int projslot = ExecProgramAddSlot(b); + + step = ExecProgramAddStep(b); + step->opcode = XO_PROJECT_SCAN; + step->d.project.scanslot = indexslot; + step->d.project.project = project; + step->d.project.result = projslot; + + b->program->slots[projslot] = project->pi_state.resultslot; + d->resslot = projslot; + } + else + { + d->resslot = indexslot; + } +} + +bool +indexonly_getnext(IndexOnlyScanState *node, TupleTableSlot *slot, ScanDirection direction) +{ + EState *estate = node->ss.ps.state; + ExprContext *econtext = node->ss.ps.ps_ExprContext; + IndexScanDesc scandesc = node->ioss_ScanDesc; + ItemPointer tid; + + /* + * OK, now that we have what we need, fetch the next tuple. + */ + while ((tid = index_getnext_tid(scandesc, direction)) != NULL) + { + HeapTuple tuple = NULL; + + CHECK_FOR_INTERRUPTS(); + + /* + * We can skip the heap fetch if the TID references a heap page on + * which all tuples are known visible to everybody. In any case, + * we'll use the index tuple not the heap tuple as the data source. + * + * Note on Memory Ordering Effects: visibilitymap_get_status does not + * lock the visibility map buffer, and therefore the result we read + * here could be slightly stale. However, it can't be stale enough to + * matter. + * + * We need to detect clearing a VM bit due to an insert right away, + * because the tuple is present in the index page but not visible. The + * reading of the TID by this scan (using a shared lock on the index + * buffer) is serialized with the insert of the TID into the index + * (using an exclusive lock on the index buffer). Because the VM bit + * is cleared before updating the index, and locking/unlocking of the + * index page acts as a full memory barrier, we are sure to see the + * cleared bit if we see a recently-inserted TID. + * + * Deletes do not update the index page (only VACUUM will clear out + * the TID), so the clearing of the VM bit by a delete is not + * serialized with this test below, and we may see a value that is + * significantly stale. However, we don't care about the delete right + * away, because the tuple is still visible until the deleting + * transaction commits or the statement ends (if it's our + * transaction). In either case, the lock on the VM buffer will have + * been released (acting as a write barrier) after clearing the bit. + * And for us to have a snapshot that includes the deleting + * transaction (making the tuple invisible), we must have acquired + * ProcArrayLock after that time, acting as a read barrier. + * + * It's worth going through this complexity to avoid needing to lock + * the VM buffer, which could cause significant contention. + */ + if (!VM_ALL_VISIBLE(scandesc->heapRelation, + ItemPointerGetBlockNumber(tid), + &node->ioss_VMBuffer)) + { + /* + * Rats, we have to visit the heap to check visibility. + */ + InstrCountTuples2(node, 1); + tuple = index_fetch_heap(scandesc); + if (tuple == NULL) + continue; /* no visible tuple, try next index entry */ + + /* + * Only MVCC snapshots are supported here, so there should be no + * need to keep following the HOT chain once a visible entry has + * been found. If we did want to allow that, we'd need to keep + * more state to remember not to call index_getnext_tid next time. + */ + if (scandesc->xs_continue_hot) + elog(ERROR, "non-MVCC snapshots are not supported in index-only scans"); + + /* + * Note: at this point we are holding a pin on the heap page, as + * recorded in scandesc->xs_cbuf. We could release that pin now, + * but it's not clear whether it's a win to do so. The next index + * entry might require a visit to the same heap page. + */ + } + + /* + * Fill the scan tuple slot with data from the index. This might be + * provided in either HeapTuple or IndexTuple format. Conceivably an + * index AM might fill both fields, in which case we prefer the heap + * format, since it's probably a bit cheaper to fill a slot from. + */ + if (scandesc->xs_hitup) + { + /* + * We don't take the trouble to verify that the provided tuple has + * exactly the slot's format, but it seems worth doing a quick + * check on the number of fields. + */ + Assert(slot->tts_tupleDescriptor->natts == + scandesc->xs_hitupdesc->natts); + ExecStoreTuple(scandesc->xs_hitup, slot, InvalidBuffer, false); + } + else if (scandesc->xs_itup) + StoreIndexTuple(slot, scandesc->xs_itup, scandesc->xs_itupdesc); + else + elog(ERROR, "no data returned for index-only scan"); + + /* + * If the index was lossy, we have to recheck the index quals. + * (Currently, this can never happen, but we should support the case + * for possible future use, eg with GiST indexes.) + */ + if (scandesc->xs_recheck) + { + econtext->ecxt_scantuple = slot; + if (!ExecQualAndReset(node->indexqual, econtext)) + { + /* Fails recheck, so drop it and loop back for another */ + InstrCountFiltered2(node, 1); + continue; + } + } + + /* + * We don't currently support rechecking ORDER BY distances. (In + * principle, if the index can support retrieval of the originally + * indexed value, it should be able to produce an exact distance + * calculation too. So it's not clear that adding code here for + * recheck/re-sort would be worth the trouble. But we should at least + * throw an error if someone tries it.) + */ + if (scandesc->numberOfOrderBys > 0 && scandesc->xs_recheckorderby) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("lossy distance functions are not supported in index-only scans"))); + + /* + * Predicate locks for index-only scans must be acquired at the page + * level when the heap is not accessed, since tuple-level predicate + * locks need the tuple's xmin value. If we had to visit the tuple + * anyway, then we already have the tuple-level lock and can skip the + * page lock. + */ + if (tuple == NULL) + PredicateLockPage(scandesc->heapRelation, + ItemPointerGetBlockNumber(tid), + estate->es_snapshot); + + return true; + } + + return false; +} diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c index d6012192a1..67fbe9608a 100644 --- a/src/backend/executor/nodeIndexscan.c +++ b/src/backend/executor/nodeIndexscan.c @@ -1775,3 +1775,59 @@ ExecIndexScanInitializeWorker(IndexScanState *node, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys, node->iss_NumOrderByKeys); } + +void +ExecProgramBuildForIndexScan(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d) +{ + IndexScanState *state = castNode(IndexScanState, node); + IndexScan *indexscan = (IndexScan *) state->ss.ps.plan; + ExecStep *step; + int continue_at; + int indexslot = ExecProgramAddSlot(b); + + d->resetnodes = lappend(d->resetnodes, state); + + step = ExecProgramAddStep(b); + step->opcode = XO_INDEX_SCAN_FIRST; + step->d.indexscan.state = state; + b->program->slots[indexslot] = state->ss.ss_ScanTupleSlot; + + /* only execute initialization once */ + d->jumpret = continue_at = b->program->steps_len; + + step = ExecProgramAddStep(b); + step->opcode = XO_INDEX_SCAN; + step->d.indexscan.state = state; + step->d.indexscan.slot = indexslot; + ExexProgramAssignJump(b, step, &step->d.indexscan.jumpempty, jumpfail); + + if (indexscan->scan.plan.qual) + { + /* consider combining to INDEXSCAN_QUAL w/ tight loop */ + step = ExecProgramAddStep(b); + step->opcode = XO_QUAL_SCAN; + step->d.qual.jumpfail = continue_at; + step->d.qual.scanslot = indexslot; + step->d.qual.qual = state->ss.ps.qual; + step->d.qual.econtext = state->ss.ps.ps_ExprContext; + } + + if (state->ss.ps.ps_ProjInfo) + { + ProjectionInfo *project = state->ss.ps.ps_ProjInfo; + int projslot = ExecProgramAddSlot(b); + + step = ExecProgramAddStep(b); + step->opcode = XO_PROJECT_SCAN; + step->d.project.scanslot = indexslot; + step->d.project.project = project; + step->d.project.result = projslot; + + b->program->slots[projslot] = project->pi_state.resultslot; + d->resslot = projslot; + } + else + { + d->resslot = indexslot; + } +} diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c index 56d98b4490..a8a1d82c19 100644 --- a/src/backend/executor/nodeLimit.c +++ b/src/backend/executor/nodeLimit.c @@ -414,3 +414,176 @@ ExecReScanLimit(LimitState *node) if (node->ps.lefttree->chgParam == NULL) ExecReScan(node->ps.lefttree); } + +void +ExecComputeLimit(LimitState *node) +{ + /* + * First call for this node, so compute limit/offset. (We can't do + * this any earlier, because parameters from upper nodes will not + * be set during ExecInitLimit.) This also sets position = 0 and + * changes the state to LIMIT_RESCAN. + */ + + if (node->lstate == LIMIT_INITIAL) + { + recompute_limits(node); + + /* + * Check for empty window; if so, treat like empty subplan. + */ + if (node->count <= 0 && !node->noCount) + { + node->lstate = LIMIT_EMPTY; + } + } +} + +/* + * Returns true if limit has been reached. + */ +bool +ExecCheckLimit(LimitState *node) +{ + /* + * The main logic is a simple state machine. + */ + switch (node->lstate) + { + case LIMIT_INITIAL: + pg_unreachable(); + return true; + + case LIMIT_EMPTY: + case LIMIT_SUBPLANEOF: + case LIMIT_WINDOWEND: + return true; + + case LIMIT_RESCAN: + case LIMIT_INWINDOW: + case LIMIT_WINDOWSTART: + return false; + + default: + elog(ERROR, "impossible LIMIT state: %d", + (int) node->lstate); + return false; + break; + } +} + +/* + * Return true if input tuple should be disregarded. + */ +bool +ExecUpdateLimit(LimitState *node) +{ + node->position++; + + /* + * The main logic is a simple state machine. + */ + switch (node->lstate) + { + case LIMIT_INITIAL: + case LIMIT_EMPTY: + case LIMIT_SUBPLANEOF: + case LIMIT_WINDOWEND: + pg_unreachable(); + return false; + + case LIMIT_RESCAN: + if (!node->noCount && + node->position - node->offset >= node->count) + { + node->lstate = LIMIT_WINDOWEND; + return false; + } + else if (node->position > node->offset) + { + node->lstate = LIMIT_INWINDOW; + return false; + } + return true; + + case LIMIT_INWINDOW: + /* + * Forwards scan, so check for stepping off end of window. If + * we are at the end of the window, return NULL without + * advancing the subplan or the position variable; but change + * the state machine state to record having done so. + */ + if (!node->noCount && + node->position - node->offset >= node->count) + { + node->lstate = LIMIT_WINDOWEND; + } + return false; + + case LIMIT_WINDOWSTART: + pg_unreachable(); + + node->lstate = LIMIT_INWINDOW; + + return false; + + default: + elog(ERROR, "impossible LIMIT state: %d", + (int) node->lstate); + return false; + break; + } +} + + +void +ExecProgramBuildForLimit(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d) +{ + EmitForPlanNodeData outerPlanData = {}; + LimitState *limitstate = castNode(LimitState, node); + int jump_check; + int post_init_off = --b->varno; + + Assert(limitstate->lstate == LIMIT_INITIAL); + + d->resetnodes = lappend(d->resetnodes, limitstate); + + /* first compute how many tuples should be returned */ + { + ExecStep *step = ExecProgramAddStep(b); + + step->opcode = XO_COMPUTE_LIMIT; + step->d.limit.state = limitstate; + } + + jump_check = b->program->steps_len; + + /* check whether another tuple is relevant */ + { + ExecStep *step = ExecProgramAddStep(b); + + step->opcode = XO_CHECK_LIMIT; + step->d.limit.state = limitstate; + ExexProgramAssignJump(b, step, &step->d.limit.jumpout, jumpfail); + ExexProgramAssignJump(b, step, &step->d.limit.jumpnext, post_init_off); + step->d.limit.initialized = false; + } + + /* get one input tuple */ + ExecProgramBuildForNode(b, outerPlanState(limitstate), eflags, jumpfail, + &outerPlanData); + + ExecProgramDefineJump(b, post_init_off, outerPlanData.jumpret); + + { + ExecStep *step = ExecProgramAddStep(b); + + step->opcode = XO_UPDATE_LIMIT; + step->d.limit.state = limitstate; + step->d.limit.jumpout = -1; + ExexProgramAssignJump(b, step, &step->d.limit.jumpnext, outerPlanData.jumpret); + } + + d->jumpret = jump_check; + d->resslot = outerPlanData.resslot; +} diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c index 9ae9863226..cf4f0fc1dd 100644 --- a/src/backend/executor/nodeNestloop.c +++ b/src/backend/executor/nodeNestloop.c @@ -408,3 +408,127 @@ ExecReScanNestLoop(NestLoopState *node) node->nl_NeedNewOuter = true; node->nl_MatchedOuter = false; } + +void +ExecProgramBuildForNestloop(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d) +{ + NestLoopState *state = castNode(NestLoopState, node); + NestLoop *nestloop = (NestLoop *) state->js.ps.plan; + EmitForPlanNodeData outerPlanData = {}; + EmitForPlanNodeData innerPlanData = {}; + int i; + ListCell *lc; + int rescan_step; + + if (nestloop->join.jointype != JOIN_INNER) + { + b->failed = true; + return; + } + + ExecProgramBuildForNode(b, outerPlanState(state), eflags, jumpfail, + &outerPlanData); + + d->resetnodes = outerPlanData.resetnodes; + + if (nestloop->nestParams != NIL) + { + ExecStep *param_step = NULL; + int nparams = list_length(nestloop->nestParams); + ListCell *lc; + int i; + + param_step = ExecProgramAddStep(b); + param_step->opcode = XO_PARAM; + param_step->d.param.slot = outerPlanData.resslot; + param_step->d.param.nparams = nparams; + param_step->d.param.econtext = state->js.ps.ps_ExprContext; + param_step->d.param.params = (NestLoopParam *) palloc(sizeof(NestLoopParam) * nparams); + param_step->d.param.ps = innerPlanState(state); + + i = 0; + foreach(lc, nestloop->nestParams) + { + param_step->d.param.params[i++] = *(NestLoopParam *) lfirst(lc); + } + } + + /* reset inner plan */ + { + ExecStep *step = ExecProgramAddStep(b); + + rescan_step = b->program->steps_len - 1; + + step->opcode = XO_RESCAN; + + d->jumpret = outerPlanData.jumpret; + } + + ExecProgramBuildForNode(b, innerPlanState(state), eflags, + outerPlanData.jumpret, + &innerPlanData); + + /* XXX: otherwise assertions below could fail */ + if (b->failed) + return; + + /* update data for reset (only determined in ExecProgramBuildForNode()) */ + Assert(innerPlanData.resetnodes != NIL); + { + ExecStep *step = &b->program->steps[rescan_step]; + + step->d.rescan.num_nodes = list_length(innerPlanData.resetnodes); + Assert(step->d.rescan.num_nodes > 0); + step->d.rescan.nodes = palloc(sizeof(PlanState*) * + step->d.rescan.num_nodes); + i = 0; + foreach(lc, innerPlanData.resetnodes) + { + step->d.rescan.nodes[i++] = (PlanState *) lfirst(lc); + } + } + + if (state->js.joinqual) + { + ExecStep *step; + + step = ExecProgramAddStep(b); + step->opcode = XO_QUAL_JOIN; + step->d.qual.jumpfail = innerPlanData.jumpret; + step->d.qual.outerslot = outerPlanData.resslot; + step->d.qual.innerslot = innerPlanData.resslot; + step->d.qual.qual = state->js.joinqual; + step->d.qual.econtext = state->js.ps.ps_ExprContext; + } + + if (state->js.ps.qual) + { + ExecStep *step; + + step = ExecProgramAddStep(b); + step->opcode = XO_QUAL_JOIN; + step->d.qual.jumpfail = innerPlanData.jumpret; + step->d.qual.outerslot = outerPlanData.resslot; + step->d.qual.innerslot = innerPlanData.resslot; + step->d.qual.qual = state->js.ps.qual; + step->d.qual.econtext = state->js.ps.ps_ExprContext; + } + + { + ProjectionInfo *project = state->js.ps.ps_ProjInfo; + int projslot = ExecProgramAddSlot(b); + ExecStep *step; + + step = ExecProgramAddStep(b); + step->opcode = XO_PROJECT_JOIN; + step->d.project.outerslot = outerPlanData.resslot; + step->d.project.innerslot = innerPlanData.resslot; + step->d.project.project = project; + step->d.project.result = projslot; + + b->program->slots[projslot] = project->pi_state.resultslot; + d->resslot = projslot; + } + + d->jumpret = innerPlanData.jumpret; +} diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 9db368922a..c70f943a2f 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -335,3 +335,65 @@ ExecSeqScanInitializeWorker(SeqScanState *node, node->ss.ss_currentScanDesc = heap_beginscan_parallel(node->ss.ss_currentRelation, pscan); } + +void +ExecProgramBuildForSeqScan(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d) +{ + ExecProgram *p = b->program; + SeqScanState *state = (SeqScanState *) node; + SeqScan *seqscan = (SeqScan *) state->ss.ps.plan; + int continue_at; + int seqslot = ExecProgramAddSlot(b); + + d->resetnodes = lappend(d->resetnodes, state); + + { + ExecStep *step = ExecProgramAddStep(b); + step->opcode = XO_SEQSCAN_FIRST; + step->d.seqscan.state = state; + p->slots[seqslot] = state->ss.ss_ScanTupleSlot; + } + + /* only execute initialization once */ + d->jumpret = continue_at = b->program->steps_len; + + { + ExecStep *step = ExecProgramAddStep(b); + + step->opcode = XO_SEQSCAN; + step->d.seqscan.state = state; + step->d.seqscan.slot = seqslot; + ExexProgramAssignJump(b, step, &step->d.seqscan.jumpempty, jumpfail); + } + + if (seqscan->plan.qual) + { + /* consider combining to SEQSCAN_QUAL w/ tight loop */ + ExecStep *step = ExecProgramAddStep(b); + + step->opcode = XO_QUAL_SCAN; + step->d.qual.jumpfail = continue_at; + step->d.qual.scanslot = seqslot; + step->d.qual.qual = state->ss.ps.qual; + step->d.qual.econtext = state->ss.ps.ps_ExprContext; + } + + if (state->ss.ps.ps_ProjInfo) + { + ExecStep *step = ExecProgramAddStep(b); + ProjectionInfo *project = state->ss.ps.ps_ProjInfo; + int projslot = ExecProgramAddSlot(b); + + step->opcode = XO_PROJECT_SCAN; + step->d.project.scanslot = seqslot; + step->d.project.project = project; + step->d.project.result = projslot; + + p->slots[projslot] = project->pi_state.resultslot; + d->resslot = projslot; + } + else + { + d->resslot = seqslot; + } +} diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c index 0d2acb665a..efcb1bba61 100644 --- a/src/backend/executor/nodeSort.c +++ b/src/backend/executor/nodeSort.c @@ -427,3 +427,63 @@ ExecSortRetrieveInstrumentation(SortState *node) memcpy(si, node->shared_info, size); node->shared_info = si; } + +void +ExecProgramBuildForSort(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d) +{ + SortState *sortstate = castNode(SortState, node); + EmitForPlanNodeData outerPlanData = {}; + int drain_off; + int drainslot; + + drain_off = --b->varno; + d->resetnodes = lappend(d->resetnodes, sortstate); + + { + ExecStep *step = ExecProgramAddStep(b); + + step->opcode = XO_INIT_SORT; + step->d.sort.state = sortstate; + } + + /* get one input tuple */ + ExecProgramBuildForNode(b, outerPlanState(sortstate), eflags, drain_off, + &outerPlanData); + + /* add it to sort */ + { + ExecStep *step = ExecProgramAddStep(b); + + step->opcode = XO_SORT_TUPLE; + step->d.sort.state = sortstate; + step->d.sort.inputslot = outerPlanData.resslot; + ExexProgramAssignJump(b, step, &step->d.sort.jumpnext, outerPlanData.jumpret); + } + + /* once input has been emptied, drain the sort */ + ExecProgramDefineJump(b, drain_off, b->program->steps_len); + + drainslot = ExecProgramAddSlot(b); + + { + ExecStep *step = ExecProgramAddStep(b); + + step->opcode = XO_SORT; + step->d.sort.state = sortstate; + ExexProgramAssignJump(b, step, &step->d.sort.jumpempty, jumpfail); + step->d.sort.outputslot = drainslot; + } + + { + ExecStep *step = ExecProgramAddStep(b); + + step->opcode = XO_DRAIN_SORT; + step->d.sort.state = sortstate; + ExexProgramAssignJump(b, step, &step->d.sort.jumpempty, jumpfail); + step->d.sort.outputslot = drainslot; + } + b->program->slots[drainslot] = sortstate->ss.ps.ps_ResultTupleSlot; + + d->resslot = drainslot; + d->jumpret = b->program->steps_len - 1; +} diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index ee1444c427..6292d0e9ea 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1823,6 +1823,17 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"use_linearized_plan", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Master of magic."), + NULL, + GUC_NOT_IN_SAMPLE + }, + &use_linearized_plan, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/include/executor/execProgram.h b/src/include/executor/execProgram.h new file mode 100644 index 0000000000..baada8ea40 --- /dev/null +++ b/src/include/executor/execProgram.h @@ -0,0 +1,284 @@ +/*------------------------------------------------------------------------- + * + * execProgram.h + * Build program to evaluate query + * + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/executor/execProgram.h + * + *------------------------------------------------------------------------- + */ +#ifndef EXEC_PROGRAM_H +#define EXEC_PROGRAM_H + +#include "nodes/execnodes.h" + + +typedef struct ExecProgram ExecProgram; +typedef struct ExecStep ExecStep; + +typedef struct ExecRelocatableJump +{ + int varno; /* id to relocate*/ + int step; /* step */ + size_t offset; /* offset in that step */ +} ExecRelocatableJump; + +typedef struct ExecRelocation +{ + int varno; /* id to relocate */ + int target; /* resolved relocation */ +} ExecRelocation; + +typedef struct ExecProgramBuild +{ + ExecProgram *program; + + EState *estate; + + ExecRelocatableJump *reloc_jumps; + ExecRelocation *reloc; + + int steps_alloc; + + int slots_alloc; + + int reloc_jumps_alloc; + int reloc_jumps_len; + + int varno; + + bool failed; +} ExecProgramBuild; + +struct ExecProgram +{ + TupleTableSlot **slots; + ExecStep *steps; + + PlanState *top; + + int cur_step; + int steps_len; + int slots_len; +}; + + +typedef enum ExecEvalOp +{ + /* actual tuple scan sources */ + XO_SEQSCAN_FIRST, + XO_SEQSCAN, + + XO_INDEX_SCAN_FIRST, + XO_INDEX_SCAN, + + XO_INDEXONLY_SCAN_FIRST, + XO_INDEXONLY_SCAN, + + /* scan like tuple sources */ + XO_DRAIN_HASHAGG, + XO_DRAIN_SORTAGG, + XO_DRAIN_SORT, + + XO_INIT_HASH, + + /* */ + XO_INIT_SORT, + XO_SORT, + + XO_COMPUTE_LIMIT, + XO_CHECK_LIMIT, + XO_UPDATE_LIMIT, + + /* joins */ + XO_PROBE_HASH, + + /* tuple sinks */ + XO_HASH_TUPLE, + XO_HASHAGG_TUPLE, + XO_SORTAGG_TUPLE, + XO_SORT_TUPLE, + + /* helper steps referenced by multiple executor nodes */ + XO_QUAL_SCAN, + XO_QUAL_JOIN, + XO_PROJECT_SCAN, + XO_PROJECT_JOIN, + XO_PARAM, + XO_RESCAN, + XO_RETURN, + XO_DONE +} ExecEvalOp; + +struct ExecStep +{ + /* XXX: convert to jump threading */ + ExecEvalOp opcode; + + union + { + /* for XO_SEQSCAN */ + struct + { + SeqScanState *state; + int slot; + int jumpempty; + } seqscan; + + /* for XO_INDEX_SCAN[_FIRST] */ + struct + { + IndexScanState *state; + int slot; + int jumpempty; + } indexscan; + + /* for XO_INDEXONLY_SCAN[_FIRST] */ + struct + { + IndexOnlyScanState *state; + int slot; + int jumpempty; + } ioscan; + + /* for XO_INIT_HASH, XO_HASH_TUPLE */ + struct + { + int jumpbuilt; + int jumpnext; + int inputslot; + struct HashJoinState *hjstate; + struct HashState *hstate; + } hash; + + /* for XO_PROBE_HASH */ + struct + { + struct HashJoinState *state; + int inputslot; + int jumpmiss; + int probeslot; + } hj; + + /* for XO_QUAL_* */ + struct + { + ExprState *qual; + ExprContext *econtext; + int scanslot; + int innerslot; + int outerslot; + int jumpfail; + } qual; + + /* for XO_PROJECT_* */ + struct + { + ProjectionInfo *project; + int scanslot; + int innerslot; + int outerslot; + int result; + } project; + + /* for XO_RESCAN */ + struct + { + int num_nodes; + PlanState **nodes; + } rescan; + + /* for XO_PARAM */ + struct + { + int slot; + int nparams; + /* FIXME: shouldn't be nestloop specific */ + NestLoopParam *params; + ExprContext *econtext; + PlanState *ps; + } param; + + /* for XO_RETURN */ + struct + { + int next; + int slotno; + } ret; + + /* for XO_SORTAGG_TUPLE */ + struct + { + int jumpnext; + int jumpgroup; + int jumpempty; + int inputslot; + int outputslot; + struct AggState *state; + } sortagg; + + /* for XO_HASHAGG_TUPLE */ + struct + { + int jumpnext; + int inputslot; + struct AggState *state; + } hashagg; + + /* for XO_DRAIN_HASHAGG */ + struct + { + int jumpempty; + int outputslot; + struct AggState *state; + } drain_hashagg; + + + /* for XO_SORT_TUPLE and XO_DRAIN_SORT */ + struct + { + int jumpempty; + int jumpnext; + int inputslot; + int outputslot; + struct SortState *state; + } sort; + + /* for XO_LIMIT* */ + struct + { + int jumpnext; + int jumpout; + bool initialized; + struct LimitState *state; + } limit; + + } d; +}; + +extern ExecProgram *ExecBuildProgram(PlanState *node, EState *estate, int eflags); +extern TupleTableSlot *ExecExecProgram(ExecProgram *p, EState *estate); +extern StringInfo ExecDescribeProgram(ExecProgram *p); +extern void ExecShutdownProgram(ExecProgram *p); + + + +typedef struct EmitForPlanNodeData +{ + int jumpret; + int resslot; + List *resetnodes; +} EmitForPlanNodeData; + +extern void ExecProgramBuildForNode(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d); +extern ExecStep *ExecProgramAddStep(ExecProgramBuild *b); +extern int ExecProgramAddSlot(ExecProgramBuild *b); +extern void ExexProgramAssignJump(ExecProgramBuild *b, ExecStep *s, int *a, int t); +extern void ExecProgramDefineJump(ExecProgramBuild *b, int var, int target); + + +#endif /* EXEC_PROGRAM_H */ diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h index 10e9ded246..ffc16a5a2a 100644 --- a/src/include/executor/execdesc.h +++ b/src/include/executor/execdesc.h @@ -48,6 +48,8 @@ typedef struct QueryDesc EState *estate; /* executor's query-wide state */ PlanState *planstate; /* tree of per-plan-node state */ + struct ExecProgram *prog; + /* This field is set by ExecutorRun */ bool already_executed; /* true if previously executed */ diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index a7ea3c7d10..1bebefb2b0 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -64,6 +64,7 @@ #define EXEC_FLAG_WITHOUT_OIDS 0x0040 /* force no OIDs in returned tuples */ #define EXEC_FLAG_WITH_NO_DATA 0x0080 /* rel scannability doesn't matter */ +extern PGDLLIMPORT bool use_linearized_plan; /* Hook for plugins to get control in ExecutorStart() */ typedef void (*ExecutorStart_hook_type) (QueryDesc *queryDesc, int eflags); @@ -214,6 +215,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate); /* * functions in execProcnode.c */ +extern PlanState *ExecInitNodeTop(Plan *node, EState *estate, int eflags); extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags); extern void ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function); extern Node *MultiExecProcNode(PlanState *node); diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h index 8fb8c8fe80..49bd7b5b14 100644 --- a/src/include/executor/nodeAgg.h +++ b/src/include/executor/nodeAgg.h @@ -15,6 +15,7 @@ #define NODEAGG_H #include "nodes/execnodes.h" +#include "executor/execProgram.h" /* @@ -313,4 +314,21 @@ extern Size hash_agg_entry_size(int numAggs); extern Datum aggregate_dummy(PG_FUNCTION_ARGS); + +extern void agg_fill_hash_table_onetup(AggState *aggstate, TupleTableSlot *slot); +extern TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); + + +typedef enum AggStateBoundaryState +{ + AGGBOUNDARY_NEXT, + AGGBOUNDARY_REACHED, + AGGBOUNDARY_FINISHED +} AggStateBoundaryState; + +extern AggStateBoundaryState agg_fill_direct_onetup(AggState *aggstate, TupleTableSlot *slot); + + +extern void ExecProgramBuildForAgg(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d); + #endif /* NODEAGG_H */ diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 8d700c06c5..86e19cc57a 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -76,4 +76,6 @@ extern void ExecShutdownHash(HashState *node); extern void ExecHashGetInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable); +void ExecHashDoInsert(HashJoinState *hjstate, HashState *hstate, TupleTableSlot *slot); + #endif /* NODEHASH_H */ diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h index 4086dd5382..5d1145bd05 100644 --- a/src/include/executor/nodeHashjoin.h +++ b/src/include/executor/nodeHashjoin.h @@ -15,9 +15,26 @@ #define NODEHASHJOIN_H #include "access/parallel.h" +#include "executor/execProgram.h" #include "nodes/execnodes.h" #include "storage/buffile.h" + +/* + * States of the ExecHashJoin state machine + */ +#define HJ_BUILD_HASHTABLE 1 +#define HJ_NEED_NEW_OUTER 2 +#define HJ_SCAN_BUCKET 3 +#define HJ_FILL_OUTER_TUPLE 4 +#define HJ_FILL_INNER_TUPLES 5 +#define HJ_NEED_NEW_BATCH 6 + +/* Returns true if doing null-fill on outer relation */ +#define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL) +/* Returns true if doing null-fill on inner relation */ +#define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL) + extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate, int eflags); extern void ExecEndHashJoin(HashJoinState *node); extern void ExecReScanHashJoin(HashJoinState *node); @@ -31,4 +48,7 @@ extern void ExecHashJoinInitializeWorker(HashJoinState *state, extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr); +extern void ExecProgramBuildForHashJoin(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d); + + #endif /* NODEHASHJOIN_H */ diff --git a/src/include/executor/nodeIndexonlyscan.h b/src/include/executor/nodeIndexonlyscan.h index 8f6c5a8d09..7192000dfa 100644 --- a/src/include/executor/nodeIndexonlyscan.h +++ b/src/include/executor/nodeIndexonlyscan.h @@ -15,6 +15,7 @@ #define NODEINDEXONLYSCAN_H #include "nodes/execnodes.h" +#include "executor/execProgram.h" #include "access/parallel.h" extern IndexOnlyScanState *ExecInitIndexOnlyScan(IndexOnlyScan *node, EState *estate, int eflags); @@ -33,4 +34,9 @@ extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node, extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, ParallelWorkerContext *pwcxt); + +extern bool indexonly_getnext(IndexOnlyScanState *node, TupleTableSlot *slot, ScanDirection direction); + +extern void ExecProgramBuildForIndexOnlyScan(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d); + #endif /* NODEINDEXONLYSCAN_H */ diff --git a/src/include/executor/nodeIndexscan.h b/src/include/executor/nodeIndexscan.h index 822a9c9fad..82c899f3b8 100644 --- a/src/include/executor/nodeIndexscan.h +++ b/src/include/executor/nodeIndexscan.h @@ -15,6 +15,7 @@ #define NODEINDEXSCAN_H #include "access/parallel.h" +#include "executor/execProgram.h" #include "nodes/execnodes.h" extern IndexScanState *ExecInitIndexScan(IndexScan *node, EState *estate, int eflags); @@ -43,4 +44,6 @@ extern bool ExecIndexEvalArrayKeys(ExprContext *econtext, IndexArrayKeyInfo *arrayKeys, int numArrayKeys); extern bool ExecIndexAdvanceArrayKeys(IndexArrayKeyInfo *arrayKeys, int numArrayKeys); +extern void ExecProgramBuildForIndexScan(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d); + #endif /* NODEINDEXSCAN_H */ diff --git a/src/include/executor/nodeLimit.h b/src/include/executor/nodeLimit.h index 160ae5b026..995f054621 100644 --- a/src/include/executor/nodeLimit.h +++ b/src/include/executor/nodeLimit.h @@ -14,10 +14,17 @@ #ifndef NODELIMIT_H #define NODELIMIT_H +#include "executor/execProgram.h" #include "nodes/execnodes.h" extern LimitState *ExecInitLimit(Limit *node, EState *estate, int eflags); extern void ExecEndLimit(LimitState *node); extern void ExecReScanLimit(LimitState *node); +extern void ExecComputeLimit(LimitState *node); +extern bool ExecCheckLimit(LimitState *node); +extern bool ExecUpdateLimit(LimitState *node); + +extern void ExecProgramBuildForLimit(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d); + #endif /* NODELIMIT_H */ diff --git a/src/include/executor/nodeNestloop.h b/src/include/executor/nodeNestloop.h index 06b90d150e..4d6a0f7f3a 100644 --- a/src/include/executor/nodeNestloop.h +++ b/src/include/executor/nodeNestloop.h @@ -15,9 +15,12 @@ #define NODENESTLOOP_H #include "nodes/execnodes.h" +#include "executor/execProgram.h" extern NestLoopState *ExecInitNestLoop(NestLoop *node, EState *estate, int eflags); extern void ExecEndNestLoop(NestLoopState *node); extern void ExecReScanNestLoop(NestLoopState *node); +extern void ExecProgramBuildForNestloop(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d); + #endif /* NODENESTLOOP_H */ diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h index 020b40c6b7..0a7fcb01f0 100644 --- a/src/include/executor/nodeSeqscan.h +++ b/src/include/executor/nodeSeqscan.h @@ -15,6 +15,7 @@ #define NODESEQSCAN_H #include "access/parallel.h" +#include "executor/execProgram.h" #include "nodes/execnodes.h" extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags); @@ -28,4 +29,6 @@ extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt extern void ExecSeqScanInitializeWorker(SeqScanState *node, ParallelWorkerContext *pwcxt); +extern void ExecProgramBuildForSeqScan(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d); + #endif /* NODESEQSCAN_H */ diff --git a/src/include/executor/nodeSort.h b/src/include/executor/nodeSort.h index 22f69ee1ea..1a60d74607 100644 --- a/src/include/executor/nodeSort.h +++ b/src/include/executor/nodeSort.h @@ -15,6 +15,7 @@ #define NODESORT_H #include "access/parallel.h" +#include "executor/execProgram.h" #include "nodes/execnodes.h" extern SortState *ExecInitSort(Sort *node, EState *estate, int eflags); @@ -29,4 +30,6 @@ extern void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt); extern void ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt); extern void ExecSortRetrieveInstrumentation(SortState *node); +extern void ExecProgramBuildForSort(ExecProgramBuild *b, PlanState *node, int eflags, int jumpfail, EmitForPlanNodeData *d); + #endif /* NODESORT_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index da7f52cab0..e13a5be2d6 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1938,6 +1938,8 @@ typedef struct AggState AggStatePerGroup *all_pergroups; /* array of first ->pergroups, than * ->hash_pergroup */ ProjectionInfo *combinedproj; /* projection machinery */ + + bool at_boundary; } AggState; /* ---------------- |