summaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorRobert Haas2017-03-23 17:05:48 +0000
committerRobert Haas2017-03-23 17:14:36 +0000
commit691b8d59281b5177f16fe80858df921f77a8e955 (patch)
tree8708ae434bf120e73f93e3ef43779d761438ba79 /src/backend
parent218f51584d5a9fcdf702bcc7f54b5b65e255c187 (diff)
Allow for parallel execution whenever ExecutorRun() is done only once.
Previously, it was unsafe to execute a plan in parallel if ExecutorRun() might be called with a non-zero row count. However, it's quite easy to fix things up so that we can support that case, provided that it is known that we will never call ExecutorRun() a second time for the same QueryDesc. Add infrastructure to signal this, and cross-checks to make sure that a caller who claims this is true doesn't later reneg. While that pattern never happens with queries received directly from a client -- there's no way to know whether multiple Execute messages will be sent unless the first one requests all the rows -- it's pretty common for queries originating from procedural languages, which often limit the result to a single tuple or to a user-specified number of tuples. This commit doesn't actually enable parallelism in any additional cases, because currently none of the places that would be able to benefit from this infrastructure pass CURSOR_OPT_PARALLEL_OK in the first place, but it makes it much more palatable to pass CURSOR_OPT_PARALLEL_OK in places where we currently don't, because it eliminates some cases where we'd end up having to run the parallel plan serially. Patch by me, based on some ideas from Rafia Sabih and corrected by Rafia Sabih based on feedback from Dilip Kumar and myself. Discussion: http://postgr.es/m/CA+TgmobXEhvHbJtWDuPZM9bVSLiTj-kShxQJ2uM5GPDze9fRYA@mail.gmail.com
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/commands/copy.c2
-rw-r--r--src/backend/commands/createas.c2
-rw-r--r--src/backend/commands/explain.c2
-rw-r--r--src/backend/commands/extension.c2
-rw-r--r--src/backend/commands/matview.c2
-rw-r--r--src/backend/commands/portalcmds.c2
-rw-r--r--src/backend/commands/prepare.c2
-rw-r--r--src/backend/executor/execMain.c38
-rw-r--r--src/backend/executor/execParallel.c2
-rw-r--r--src/backend/executor/functions.c2
-rw-r--r--src/backend/executor/spi.c2
-rw-r--r--src/backend/tcop/postgres.c2
-rw-r--r--src/backend/tcop/pquery.c20
13 files changed, 54 insertions, 26 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index b0fd09f458a..ab59be84552 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2074,7 +2074,7 @@ CopyTo(CopyState cstate)
else
{
/* run the plan --- the dest receiver will send tuples */
- ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+ ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 646a88409f2..3daffc894a1 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -347,7 +347,7 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString,
ExecutorStart(queryDesc, GetIntoRelEFlags(into));
/* run the plan to completion */
- ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+ ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
/* save the rowcount if we're given a completionTag to fill */
if (completionTag)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index c9b55ead3dc..b4c7466666b 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -530,7 +530,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
dir = ForwardScanDirection;
/* run the plan */
- ExecutorRun(queryDesc, dir, 0L);
+ ExecutorRun(queryDesc, dir, 0L, true);
/* run cleanup too */
ExecutorFinish(queryDesc);
diff --git a/src/backend/commands/extension.c b/src/backend/commands/extension.c
index 86a84ee2346..5a84bedf467 100644
--- a/src/backend/commands/extension.c
+++ b/src/backend/commands/extension.c
@@ -742,7 +742,7 @@ execute_sql_string(const char *sql, const char *filename)
dest, NULL, 0);
ExecutorStart(qdesc, 0);
- ExecutorRun(qdesc, ForwardScanDirection, 0);
+ ExecutorRun(qdesc, ForwardScanDirection, 0, true);
ExecutorFinish(qdesc);
ExecutorEnd(qdesc);
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 8df3d1d81dd..9d41ad8fad2 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -424,7 +424,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
/* run the plan */
- ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+ ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
processed = queryDesc->estate->es_processed;
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 29d0430dd87..f57cf87e8c3 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -395,7 +395,7 @@ PersistHoldablePortal(Portal portal)
true);
/* Fetch the result set into the tuplestore */
- ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+ ExecutorRun(queryDesc, ForwardScanDirection, 0L, false);
(*queryDesc->dest->rDestroy) (queryDesc->dest);
queryDesc->dest = NULL;
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index 1cf0d2b971a..992ba1c9a2e 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -301,7 +301,7 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause,
*/
PortalStart(portal, paramLI, eflags, GetActiveSnapshot());
- (void) PortalRun(portal, count, false, dest, dest, completionTag);
+ (void) PortalRun(portal, count, false, true, dest, dest, completionTag);
PortalDrop(portal, false);
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 023ea0081a0..c28cf9c8eab 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -85,7 +85,8 @@ static void ExecutePlan(EState *estate, PlanState *planstate,
bool sendTuples,
uint64 numberTuples,
ScanDirection direction,
- DestReceiver *dest);
+ DestReceiver *dest,
+ bool execute_once);
static bool ExecCheckRTEPerms(RangeTblEntry *rte);
static bool ExecCheckRTEPermsModified(Oid relOid, Oid userid,
Bitmapset *modifiedCols,
@@ -288,17 +289,18 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
*/
void
ExecutorRun(QueryDesc *queryDesc,
- ScanDirection direction, uint64 count)
+ ScanDirection direction, uint64 count,
+ bool execute_once)
{
if (ExecutorRun_hook)
- (*ExecutorRun_hook) (queryDesc, direction, count);
+ (*ExecutorRun_hook) (queryDesc, direction, count, execute_once);
else
- standard_ExecutorRun(queryDesc, direction, count);
+ standard_ExecutorRun(queryDesc, direction, count, execute_once);
}
void
standard_ExecutorRun(QueryDesc *queryDesc,
- ScanDirection direction, uint64 count)
+ ScanDirection direction, uint64 count, bool execute_once)
{
EState *estate;
CmdType operation;
@@ -345,6 +347,11 @@ standard_ExecutorRun(QueryDesc *queryDesc,
* run plan
*/
if (!ScanDirectionIsNoMovement(direction))
+ {
+ if (execute_once && queryDesc->already_executed)
+ elog(ERROR, "can't re-execute query flagged for single execution");
+ queryDesc->already_executed = true;
+
ExecutePlan(estate,
queryDesc->planstate,
queryDesc->plannedstmt->parallelModeNeeded,
@@ -352,7 +359,9 @@ standard_ExecutorRun(QueryDesc *queryDesc,
sendTuples,
count,
direction,
- dest);
+ dest,
+ execute_once);
+ }
/*
* shutdown tuple receiver, if we started it
@@ -1595,7 +1604,8 @@ ExecutePlan(EState *estate,
bool sendTuples,
uint64 numberTuples,
ScanDirection direction,
- DestReceiver *dest)
+ DestReceiver *dest,
+ bool execute_once)
{
TupleTableSlot *slot;
uint64 current_tuple_count;
@@ -1611,12 +1621,12 @@ ExecutePlan(EState *estate,
estate->es_direction = direction;
/*
- * If a tuple count was supplied, we must force the plan to run without
- * parallelism, because we might exit early. Also disable parallelism
- * when writing into a relation, because no database changes are allowed
- * in parallel mode.
+ * If the plan might potentially be executed multiple times, we must force
+ * it to run without parallelism, because we might exit early. Also
+ * disable parallelism when writing into a relation, because no database
+ * changes are allowed in parallel mode.
*/
- if (numberTuples || dest->mydest == DestIntoRel)
+ if (!execute_once || dest->mydest == DestIntoRel)
use_parallel_mode = false;
if (use_parallel_mode)
@@ -1687,7 +1697,11 @@ ExecutePlan(EState *estate,
*/
current_tuple_count++;
if (numberTuples && numberTuples == current_tuple_count)
+ {
+ /* Allow nodes to release or shut down resources. */
+ (void) ExecShutdownNode(planstate);
break;
+ }
}
if (use_parallel_mode)
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 86db73be431..b91b663c46f 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -853,7 +853,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
ExecParallelInitializeWorker(queryDesc->planstate, toc);
/* Run the plan */
- ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+ ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
/* Shut down the executor */
ExecutorFinish(queryDesc);
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index 2d49a656502..12214f8a150 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -855,7 +855,7 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
/* Run regular commands to completion unless lazyEval */
uint64 count = (es->lazyEval) ? 1 : 0;
- ExecutorRun(es->qd, ForwardScanDirection, count);
+ ExecutorRun(es->qd, ForwardScanDirection, count, !fcache->returnsSet || !es->lazyEval);
/*
* If we requested run to completion OR there was no tuple returned,
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 55f97b14e6e..72c7b4d0689 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -2305,7 +2305,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
ExecutorStart(queryDesc, eflags);
- ExecutorRun(queryDesc, ForwardScanDirection, tcount);
+ ExecutorRun(queryDesc, ForwardScanDirection, tcount, true);
_SPI_current->processed = queryDesc->estate->es_processed;
_SPI_current->lastoid = queryDesc->estate->es_lastoid;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index ba41f907126..6258a14c390 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -1101,6 +1101,7 @@ exec_simple_query(const char *query_string)
(void) PortalRun(portal,
FETCH_ALL,
isTopLevel,
+ true,
receiver,
receiver,
completionTag);
@@ -1985,6 +1986,7 @@ exec_execute_message(const char *portal_name, long max_rows)
completed = PortalRun(portal,
max_rows,
true, /* always top level */
+ !execute_is_fetch && max_rows == FETCH_ALL,
receiver,
receiver,
completionTag);
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 371d7350b7a..f538b7787c3 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -90,6 +90,9 @@ CreateQueryDesc(PlannedStmt *plannedstmt,
qd->planstate = NULL;
qd->totaltime = NULL;
+ /* not yet executed */
+ qd->already_executed = false;
+
return qd;
}
@@ -152,7 +155,7 @@ ProcessQuery(PlannedStmt *plan,
/*
* Run the plan to completion.
*/
- ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+ ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
/*
* Build command completion status string, if caller wants one.
@@ -679,7 +682,7 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
* suspended due to exhaustion of the count parameter.
*/
bool
-PortalRun(Portal portal, long count, bool isTopLevel,
+PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
DestReceiver *dest, DestReceiver *altdest,
char *completionTag)
{
@@ -712,6 +715,10 @@ PortalRun(Portal portal, long count, bool isTopLevel,
*/
MarkPortalActive(portal);
+ /* Set run_once flag. Shouldn't be clear if previously set. */
+ Assert(!portal->run_once || run_once);
+ portal->run_once = run_once;
+
/*
* Set up global portal context pointers.
*
@@ -918,7 +925,8 @@ PortalRunSelect(Portal portal,
else
{
PushActiveSnapshot(queryDesc->snapshot);
- ExecutorRun(queryDesc, direction, (uint64) count);
+ ExecutorRun(queryDesc, direction, (uint64) count,
+ portal->run_once);
nprocessed = queryDesc->estate->es_processed;
PopActiveSnapshot();
}
@@ -957,7 +965,8 @@ PortalRunSelect(Portal portal,
else
{
PushActiveSnapshot(queryDesc->snapshot);
- ExecutorRun(queryDesc, direction, (uint64) count);
+ ExecutorRun(queryDesc, direction, (uint64) count,
+ portal->run_once);
nprocessed = queryDesc->estate->es_processed;
PopActiveSnapshot();
}
@@ -1394,6 +1403,9 @@ PortalRunFetch(Portal portal,
*/
MarkPortalActive(portal);
+ /* If supporting FETCH, portal can't be run-once. */
+ Assert(!portal->run_once);
+
/*
* Set up global portal context pointers.
*/