Enable parallel SELECT for "INSERT INTO ... SELECT ...".
authorAmit Kapila <akapila@postgresql.org>
Wed, 10 Mar 2021 02:08:58 +0000 (07:38 +0530)
committerAmit Kapila <akapila@postgresql.org>
Wed, 10 Mar 2021 02:08:58 +0000 (07:38 +0530)
Parallel SELECT can't be utilized for INSERT in the following cases:
- INSERT statement uses the ON CONFLICT DO UPDATE clause
- Target table has a parallel-unsafe: trigger, index expression or
  predicate, column default expression or check constraint
- Target table has a parallel-unsafe domain constraint on any column
- Target table is a partitioned table with a parallel-unsafe partition key
  expression or support function

The planner is updated to perform additional parallel-safety checks for
the cases listed above, for determining whether it is safe to run INSERT
in parallel-mode with an underlying parallel SELECT. The planner will
consider using parallel SELECT for "INSERT INTO ... SELECT ...", provided
nothing unsafe is found from the additional parallel-safety checks, or
from the existing parallel-safety checks for SELECT.

While checking parallel-safety, we need to check it for all the partitions
on the table which can be costly especially when we decide not to use a
parallel plan. So, in a separate patch, we will introduce a GUC and or a
reloption to enable/disable parallelism for Insert statements.

Prior to entering parallel-mode for the execution of INSERT with parallel
SELECT, a TransactionId is acquired and assigned to the current
transaction state. This is necessary to prevent the INSERT from attempting
to assign the TransactionId whilst in parallel-mode, which is not allowed.
This approach has a disadvantage in that if the underlying SELECT does not
return any rows, then the TransactionId is not used, however that
shouldn't happen in practice in many cases.

Author: Greg Nancarrow, Amit Langote, Amit Kapila
Reviewed-by: Amit Langote, Hou Zhijie, Takayuki Tsunakawa, Antonin Houska, Bharath Rupireddy, Dilip Kumar, Vignesh C, Zhihong Yu, Amit Kapila
Tested-by: Tang, Haiying
Discussion: https://postgr.es/m/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpFJrR3AcrTS3g@mail.gmail.com
Discussion: https://postgr.es/m/CAJcOf-fAdj=nDKMsRhQzndm-O13NY4dL6xGcEvdX5Xvbbi0V7g@mail.gmail.com

17 files changed:
doc/src/sgml/parallel.sgml
src/backend/access/transam/xact.c
src/backend/executor/execMain.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/plan/planner.c
src/backend/optimizer/util/clauses.c
src/backend/utils/cache/plancache.c
src/include/access/xact.h
src/include/nodes/pathnodes.h
src/include/nodes/plannodes.h
src/include/optimizer/clauses.h
src/test/regress/expected/insert_parallel.out [new file with mode: 0644]
src/test/regress/parallel_schedule
src/test/regress/serial_schedule
src/test/regress/sql/insert_parallel.sql [new file with mode: 0644]

index c81abff48d3736e0fe62d206a14c5396edb1d478..cec1329e259237ce02a9cc36f2d8caff3fc52556 100644 (file)
@@ -146,7 +146,9 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
         a CTE, no parallel plans for that query will be generated.  As an
         exception, the commands <literal>CREATE TABLE ... AS</literal>, <literal>SELECT
         INTO</literal>, and <literal>CREATE MATERIALIZED VIEW</literal> which create a new
-        table and populate it can use a parallel plan.
+        table and populate it can use a parallel plan. Another exception is the command
+        <literal>INSERT INTO ... SELECT ...</literal> which can use a parallel plan for
+        the underlying <literal>SELECT</literal> part of the query.
       </para>
     </listitem>
 
index c83aa16f2ce74b293f67ad318dfb78ea47b4fc15..6395a9b2408edfd7e68da96c5318fa8649e69af1 100644 (file)
@@ -1014,6 +1014,32 @@ IsInParallelMode(void)
        return CurrentTransactionState->parallelModeLevel != 0;
 }
 
+/*
+ *     PrepareParallelModePlanExec
+ *
+ * Prepare for entering parallel mode plan execution, based on command-type.
+ */
+void
+PrepareParallelModePlanExec(CmdType commandType)
+{
+       if (IsModifySupportedInParallelMode(commandType))
+       {
+               Assert(!IsInParallelMode());
+
+               /*
+                * Prepare for entering parallel mode by assigning a TransactionId.
+                * Failure to do this now would result in heap_insert() subsequently
+                * attempting to assign a TransactionId whilst in parallel-mode, which
+                * is not allowed.
+                *
+                * This approach has a disadvantage in that if the underlying SELECT
+                * does not return any rows, then the TransactionId is not used,
+                * however that shouldn't happen in practice in many cases.
+                */
+               (void) GetCurrentTransactionId();
+       }
+}
+
 /*
  *     CommandCounterIncrement
  */
index c74ce36ffbab6a0807621c53ac3b23c53de95c06..0648dd82ba0cf82bc54694e8917403445b099392 100644 (file)
@@ -1512,7 +1512,10 @@ ExecutePlan(EState *estate,
 
        estate->es_use_parallel_mode = use_parallel_mode;
        if (use_parallel_mode)
+       {
+               PrepareParallelModePlanExec(estate->es_plannedstmt->commandType);
                EnterParallelMode();
+       }
 
        /*
         * Loop until we've processed the proper number of tuples from the plan.
index aaba1ec2c4a9947aa5547882d4af8c68bbde848a..da91cbd2b1e582af5a521d6d0ad435ad8de4c84b 100644 (file)
@@ -96,6 +96,7 @@ _copyPlannedStmt(const PlannedStmt *from)
        COPY_BITMAPSET_FIELD(rewindPlanIDs);
        COPY_NODE_FIELD(rowMarks);
        COPY_NODE_FIELD(relationOids);
+       COPY_NODE_FIELD(partitionOids);
        COPY_NODE_FIELD(invalItems);
        COPY_NODE_FIELD(paramExecTypes);
        COPY_NODE_FIELD(utilityStmt);
index 8fc432bfe17b8ed31f9a8007bf83c77fd4ba83c0..6493a03ff80c599f435dea6971e4e5b0088ad074 100644 (file)
@@ -314,6 +314,7 @@ _outPlannedStmt(StringInfo str, const PlannedStmt *node)
        WRITE_BITMAPSET_FIELD(rewindPlanIDs);
        WRITE_NODE_FIELD(rowMarks);
        WRITE_NODE_FIELD(relationOids);
+       WRITE_NODE_FIELD(partitionOids);
        WRITE_NODE_FIELD(invalItems);
        WRITE_NODE_FIELD(paramExecTypes);
        WRITE_NODE_FIELD(utilityStmt);
@@ -2221,6 +2222,7 @@ _outPlannerGlobal(StringInfo str, const PlannerGlobal *node)
        WRITE_NODE_FIELD(resultRelations);
        WRITE_NODE_FIELD(appendRelations);
        WRITE_NODE_FIELD(relationOids);
+       WRITE_NODE_FIELD(partitionOids);
        WRITE_NODE_FIELD(invalItems);
        WRITE_NODE_FIELD(paramExecTypes);
        WRITE_UINT_FIELD(lastPHId);
index 718fb58e865a327c19bdb206572c45f051e409c2..c5e136e9c3c1805ae679bb4178f717939e302518 100644 (file)
@@ -1590,6 +1590,7 @@ _readPlannedStmt(void)
        READ_BITMAPSET_FIELD(rewindPlanIDs);
        READ_NODE_FIELD(rowMarks);
        READ_NODE_FIELD(relationOids);
+       READ_NODE_FIELD(partitionOids);
        READ_NODE_FIELD(invalItems);
        READ_NODE_FIELD(paramExecTypes);
        READ_NODE_FIELD(utilityStmt);
index 545b56bcafd53f3654abd17d099a45db3cc8abdf..424d25cbd514ce2f07b165c314c19cf8c934b5d4 100644 (file)
@@ -305,6 +305,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
        glob->resultRelations = NIL;
        glob->appendRelations = NIL;
        glob->relationOids = NIL;
+       glob->partitionOids = NIL;
        glob->invalItems = NIL;
        glob->paramExecTypes = NIL;
        glob->lastPHId = 0;
@@ -316,16 +317,16 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
        /*
         * Assess whether it's feasible to use parallel mode for this query. We
         * can't do this in a standalone backend, or if the command will try to
-        * modify any data, or if this is a cursor operation, or if GUCs are set
-        * to values that don't permit parallelism, or if parallel-unsafe
-        * functions are present in the query tree.
+        * modify any data (except for Insert), or if this is a cursor operation,
+        * or if GUCs are set to values that don't permit parallelism, or if
+        * parallel-unsafe functions are present in the query tree.
         *
-        * (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE
-        * MATERIALIZED VIEW to use parallel plans, but as of now, only the leader
-        * backend writes into a completely new table.  In the future, we can
-        * extend it to allow workers to write into the table.  However, to allow
-        * parallel updates and deletes, we have to solve other problems,
-        * especially around combo CIDs.)
+        * (Note that we do allow CREATE TABLE AS, INSERT INTO...SELECT, SELECT
+        * INTO, and CREATE MATERIALIZED VIEW to use parallel plans. However, as
+        * of now, only the leader backend writes into a completely new table. In
+        * the future, we can extend it to allow workers to write into the table.
+        * However, to allow parallel updates and deletes, we have to solve other
+        * problems, especially around combo CIDs.)
         *
         * For now, we don't try to use parallel mode if we're running inside a
         * parallel worker.  We might eventually be able to relax this
@@ -334,13 +335,14 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
         */
        if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
                IsUnderPostmaster &&
-               parse->commandType == CMD_SELECT &&
+               (parse->commandType == CMD_SELECT ||
+                is_parallel_allowed_for_modify(parse)) &&
                !parse->hasModifyingCTE &&
                max_parallel_workers_per_gather > 0 &&
                !IsParallelWorker())
        {
                /* all the cheap tests pass, so scan the query tree */
-               glob->maxParallelHazard = max_parallel_hazard(parse);
+               glob->maxParallelHazard = max_parallel_hazard(parse, glob);
                glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE);
        }
        else
@@ -521,6 +523,19 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
        result->rewindPlanIDs = glob->rewindPlanIDs;
        result->rowMarks = glob->finalrowmarks;
        result->relationOids = glob->relationOids;
+
+       /*
+        * Register the Oids of parallel-safety-checked partitions as plan
+        * dependencies. This is only really needed in the case of a parallel plan
+        * so that if parallel-unsafe properties are subsequently defined on the
+        * partitions, the cached parallel plan will be invalidated, and a
+        * non-parallel plan will be generated.
+        *
+        * We also use this list to acquire locks on partitions before executing
+        * cached plan. See AcquireExecutorLocks().
+        */
+       if (glob->partitionOids != NIL && glob->parallelModeNeeded)
+               result->partitionOids = glob->partitionOids;
        result->invalItems = glob->invalItems;
        result->paramExecTypes = glob->paramExecTypes;
        /* utilityStmt should be null, but we might as well copy it */
index f3786dd2b63bc1ae5e4609ca872a63dbd82f4349..7ecdc783d579dcd86553da59b34e729798e086b3 100644 (file)
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/htup_details.h"
+#include "access/table.h"
+#include "access/xact.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_class.h"
+#include "catalog/pg_constraint.h"
 #include "catalog/pg_language.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/functions.h"
 #include "funcapi.h"
@@ -43,6 +48,8 @@
 #include "parser/parse_agg.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_func.h"
+#include "parser/parsetree.h"
+#include "partitioning/partdesc.h"
 #include "rewrite/rewriteManip.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
@@ -51,6 +58,8 @@
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/partcache.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
@@ -88,6 +97,9 @@ typedef struct
        char            max_hazard;             /* worst proparallel hazard found so far */
        char            max_interesting;        /* worst proparallel hazard of interest */
        List       *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */
+       RangeTblEntry *target_rte;      /* query's target relation if any */
+       CmdType         command_type;   /* query's command type */
+       PlannerGlobal *planner_global;  /* global info for planner invocation */
 } max_parallel_hazard_context;
 
 static bool contain_agg_clause_walker(Node *node, void *context);
@@ -98,6 +110,20 @@ static bool contain_volatile_functions_walker(Node *node, void *context);
 static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context);
 static bool max_parallel_hazard_walker(Node *node,
                                                                           max_parallel_hazard_context *context);
+static bool target_rel_max_parallel_hazard(max_parallel_hazard_context *context);
+static bool target_rel_max_parallel_hazard_recurse(Relation relation,
+                                                                                                  CmdType command_type,
+                                                                                                  max_parallel_hazard_context *context);
+static bool target_rel_trigger_max_parallel_hazard(TriggerDesc *trigdesc,
+                                                                                                  max_parallel_hazard_context *context);
+static bool target_rel_index_max_parallel_hazard(Relation rel,
+                                                                                                max_parallel_hazard_context *context);
+static bool target_rel_domain_max_parallel_hazard(Oid typid,
+                                                                                                 max_parallel_hazard_context *context);
+static bool target_rel_partitions_max_parallel_hazard(Relation rel,
+                                                                                                         max_parallel_hazard_context *context);
+static bool target_rel_chk_constr_max_parallel_hazard(Relation rel,
+                                                                                                         max_parallel_hazard_context *context);
 static bool contain_nonstrict_functions_walker(Node *node, void *context);
 static bool contain_exec_param_walker(Node *node, List *param_ids);
 static bool contain_context_dependent_node(Node *clause);
@@ -545,14 +571,19 @@ contain_volatile_functions_not_nextval_walker(Node *node, void *context)
  * later, in the common case where everything is SAFE.
  */
 char
-max_parallel_hazard(Query *parse)
+max_parallel_hazard(Query *parse, PlannerGlobal *glob)
 {
        max_parallel_hazard_context context;
 
        context.max_hazard = PROPARALLEL_SAFE;
        context.max_interesting = PROPARALLEL_UNSAFE;
        context.safe_param_ids = NIL;
+       context.target_rte = parse->resultRelation > 0 ?
+               rt_fetch(parse->resultRelation, parse->rtable) : NULL;
+       context.command_type = parse->commandType;
+       context.planner_global = glob;
        (void) max_parallel_hazard_walker((Node *) parse, &context);
+
        return context.max_hazard;
 }
 
@@ -583,6 +614,9 @@ is_parallel_safe(PlannerInfo *root, Node *node)
        context.max_hazard = PROPARALLEL_SAFE;
        context.max_interesting = PROPARALLEL_RESTRICTED;
        context.safe_param_ids = NIL;
+       context.command_type = node != NULL && IsA(node, Query) ?
+               castNode(Query, node)->commandType : CMD_UNKNOWN;
+       context.planner_global = root->glob;
 
        /*
         * The params that refer to the same or parent query level are considered
@@ -655,14 +689,20 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
         * opclass support functions are generally parallel-safe.  XmlExpr is a
         * bit more dubious but we can probably get away with it.  We err on the
         * side of caution by treating CoerceToDomain as parallel-restricted.
-        * (Note: in principle that's wrong because a domain constraint could
-        * contain a parallel-unsafe function; but useful constraints probably
-        * never would have such, and assuming they do would cripple use of
-        * parallel query in the presence of domain types.)  SQLValueFunction
-        * should be safe in all cases.  NextValueExpr is parallel-unsafe.
+        * However, for table modification statements, we check the parallel
+        * safety of domain constraints as that could contain a parallel-unsafe
+        * function, and executing that in parallel mode will lead to error.
+        * SQLValueFunction should be safe in all cases.  NextValueExpr is
+        * parallel-unsafe.
         */
        if (IsA(node, CoerceToDomain))
        {
+               if (context->target_rte != NULL)
+               {
+                       if (target_rel_domain_max_parallel_hazard(((CoerceToDomain *) node)->resulttype, context))
+                               return true;
+               }
+
                if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
                        return true;
        }
@@ -687,6 +727,27 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
                        return true;
        }
 
+       /*
+        * ModifyingCTE expressions are treated as parallel-unsafe.
+        *
+        * XXX Normally, if the Query has a modifying CTE, the hasModifyingCTE
+        * flag is set in the Query tree, and the query will be regarded as
+        * parallel-usafe. However, in some cases, a re-written query with a
+        * modifying CTE does not have that flag set, due to a bug in the query
+        * rewriter.
+        */
+       else if (IsA(node, CommonTableExpr))
+       {
+               CommonTableExpr *cte = (CommonTableExpr *) node;
+               Query      *ctequery = castNode(Query, cte->ctequery);
+
+               if (ctequery->commandType != CMD_SELECT)
+               {
+                       context->max_hazard = PROPARALLEL_UNSAFE;
+                       return true;
+               }
+       }
+
        /*
         * As a notational convenience for callers, look through RestrictInfo.
         */
@@ -757,6 +818,19 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
                }
                return false;                   /* nothing to recurse to */
        }
+       else if (IsA(node, RangeTblEntry))
+       {
+               RangeTblEntry *rte = (RangeTblEntry *) node;
+
+               /* Nothing interesting to check for SELECTs */
+               if (context->target_rte == NULL)
+                       return false;
+
+               if (rte == context->target_rte)
+                       return target_rel_max_parallel_hazard(context);
+
+               return false;
+       }
 
        /*
         * When we're first invoked on a completely unplanned tree, we must
@@ -777,7 +851,9 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
                /* Recurse into subselects */
                return query_tree_walker(query,
                                                                 max_parallel_hazard_walker,
-                                                                context, 0);
+                                                                context,
+                                                                context->target_rte != NULL ?
+                                                                QTW_EXAMINE_RTES_BEFORE : 0);
        }
 
        /* Recurse to check arguments */
@@ -786,6 +862,466 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
                                                                  context);
 }
 
+/*
+ * target_rel_max_parallel_hazard
+ *
+ * Determines the maximum parallel-mode hazard level for modification
+ * of a specified relation.
+ */
+static bool
+target_rel_max_parallel_hazard(max_parallel_hazard_context *context)
+{
+       bool            max_hazard_found;
+
+       Relation        targetRel;
+
+       /*
+        * The target table is already locked by the caller (this is done in the
+        * parse/analyze phase), and remains locked until end-of-transaction.
+        */
+       targetRel = table_open(context->target_rte->relid, NoLock);
+       max_hazard_found = target_rel_max_parallel_hazard_recurse(targetRel,
+                                                                                                                         context->command_type,
+                                                                                                                         context);
+       table_close(targetRel, NoLock);
+
+       return max_hazard_found;
+}
+
+static bool
+target_rel_max_parallel_hazard_recurse(Relation rel,
+                                                                          CmdType command_type,
+                                                                          max_parallel_hazard_context *context)
+{
+       /* Currently only CMD_INSERT is supported */
+       Assert(command_type == CMD_INSERT);
+
+       /*
+        * We can't support table modification in a parallel worker if it's a
+        * foreign table/partition (no FDW API for supporting parallel access) or
+        * a temporary table.
+        */
+       if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
+               RelationUsesLocalBuffers(rel))
+       {
+               if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
+                       return true;
+       }
+
+       /*
+        * If a partitioned table, check that each partition is safe for
+        * modification in parallel-mode.
+        */
+       if (target_rel_partitions_max_parallel_hazard(rel, context))
+               return true;
+
+       /*
+        * If there are any index expressions or index predicate, check that they
+        * are parallel-mode safe.
+        */
+       if (target_rel_index_max_parallel_hazard(rel, context))
+               return true;
+
+       /*
+        * If any triggers exist, check that they are parallel-safe.
+        */
+       if (target_rel_trigger_max_parallel_hazard(rel->trigdesc, context))
+               return true;
+
+       /*
+        * Column default expressions are only applicable to INSERT and UPDATE.
+        * For columns in the target-list, these are already being checked for
+        * parallel-safety in the max_parallel_hazard() scan of the query tree in
+        * standard_planner(), so there's no need to do it here. Note that even
+        * though column defaults may be specified separately for each partition
+        * in a partitioned table, a partition's default value is not applied when
+        * inserting a tuple through a partitioned table.
+        */
+
+       /*
+        * CHECK constraints are only applicable to INSERT and UPDATE. If any
+        * CHECK constraints exist, determine if they are parallel-safe.
+        */
+       if (target_rel_chk_constr_max_parallel_hazard(rel, context))
+               return true;
+
+       return false;
+}
+
+/*
+ * target_rel_trigger_max_parallel_hazard
+ *
+ * Finds the maximum parallel-mode hazard level for the specified trigger data.
+ */
+static bool
+target_rel_trigger_max_parallel_hazard(TriggerDesc *trigdesc,
+                                                                          max_parallel_hazard_context *context)
+{
+       int                     i;
+
+       if (trigdesc == NULL)
+               return false;
+
+       for (i = 0; i < trigdesc->numtriggers; i++)
+       {
+               int                     trigtype;
+               Trigger    *trigger = &trigdesc->triggers[i];
+
+               if (max_parallel_hazard_test(func_parallel(trigger->tgfoid), context))
+                       return true;
+
+               /*
+                * If the trigger type is RI_TRIGGER_FK, this indicates a FK exists in
+                * the relation, and this would result in creation of new CommandIds
+                * on insert/update and this isn't supported in a parallel worker (but
+                * is safe in the parallel leader).
+                */
+               trigtype = RI_FKey_trigger_type(trigger->tgfoid);
+               if (trigtype == RI_TRIGGER_FK)
+               {
+                       if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
+                               return true;
+               }
+       }
+
+       return false;
+}
+
+/*
+ * target_rel_index_max_parallel_hazard
+ *
+ * Finds the maximum parallel-mode hazard level for any existing index
+ * expressions or index predicate of a specified relation.
+ */
+static bool
+target_rel_index_max_parallel_hazard(Relation rel,
+                                                                        max_parallel_hazard_context *context)
+{
+       List       *index_oid_list;
+       ListCell   *lc;
+       bool            found_max_hazard = false;
+       LOCKMODE        lockmode = AccessShareLock;
+
+       index_oid_list = RelationGetIndexList(rel);
+       foreach(lc, index_oid_list)
+       {
+               Relation        index_rel;
+               Form_pg_index indexStruct;
+               List       *ii_Expressions;
+               List       *ii_Predicate;
+               Oid                     index_oid = lfirst_oid(lc);
+
+               index_rel = index_open(index_oid, lockmode);
+
+               indexStruct = index_rel->rd_index;
+               ii_Expressions = RelationGetIndexExpressions(index_rel);
+
+               if (ii_Expressions != NIL)
+               {
+                       int                     i;
+                       ListCell   *index_expr_item = list_head(ii_Expressions);
+
+                       for (i = 0; i < indexStruct->indnatts; i++)
+                       {
+                               int                     keycol = indexStruct->indkey.values[i];
+
+                               if (keycol == 0)
+                               {
+                                       /* Found an index expression */
+
+                                       Node       *index_expr;
+
+                                       Assert(index_expr_item != NULL);
+                                       if (index_expr_item == NULL)    /* shouldn't happen */
+                                       {
+                                               elog(WARNING, "too few entries in indexprs list");
+                                               context->max_hazard = PROPARALLEL_UNSAFE;
+                                               found_max_hazard = true;
+                                               break;
+                                       }
+
+                                       index_expr = (Node *) lfirst(index_expr_item);
+
+                                       if (max_parallel_hazard_walker(index_expr, context))
+                                       {
+                                               found_max_hazard = true;
+                                               break;
+                                       }
+
+                                       index_expr_item = lnext(ii_Expressions, index_expr_item);
+                               }
+                       }
+               }
+
+               if (!found_max_hazard)
+               {
+                       ii_Predicate = RelationGetIndexPredicate(index_rel);
+                       if (ii_Predicate != NIL)
+                       {
+                               if (max_parallel_hazard_walker((Node *) ii_Predicate, context))
+                                       found_max_hazard = true;
+                       }
+               }
+
+               /*
+                * XXX We don't need to retain lock on index as index expressions
+                * can't be changed later.
+                */
+               index_close(index_rel, lockmode);
+       }
+       list_free(index_oid_list);
+
+       return found_max_hazard;
+}
+
+/*
+ * target_rel_domain_max_parallel_hazard
+ *
+ * Finds the maximum parallel-mode hazard level for the specified DOMAIN type.
+ * Only any CHECK expressions are examined for parallel-safety.
+ */
+static bool
+target_rel_domain_max_parallel_hazard(Oid typid, max_parallel_hazard_context *context)
+{
+       Relation        con_rel;
+       ScanKeyData key[1];
+       SysScanDesc scan;
+       HeapTuple       tup;
+       bool            found_max_hazard = false;
+
+       LOCKMODE        lockmode = AccessShareLock;
+
+       con_rel = table_open(ConstraintRelationId, lockmode);
+
+       ScanKeyInit(&key[0],
+                               Anum_pg_constraint_contypid, BTEqualStrategyNumber,
+                               F_OIDEQ, ObjectIdGetDatum(typid));
+       scan = systable_beginscan(con_rel, ConstraintTypidIndexId, true,
+                                                         NULL, 1, key);
+
+       while (HeapTupleIsValid((tup = systable_getnext(scan))))
+       {
+               Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
+
+               if (con->contype == CONSTRAINT_CHECK)
+               {
+                       char       *conbin;
+                       Datum           val;
+                       bool            isnull;
+                       Expr       *check_expr;
+
+                       val = SysCacheGetAttr(CONSTROID, tup,
+                                                                 Anum_pg_constraint_conbin, &isnull);
+                       Assert(!isnull);
+                       if (isnull)
+                       {
+                               /*
+                                * This shouldn't ever happen, but if it does, log a WARNING
+                                * and return UNSAFE, rather than erroring out.
+                                */
+                               elog(WARNING, "null conbin for constraint %u", con->oid);
+                               context->max_hazard = PROPARALLEL_UNSAFE;
+                               found_max_hazard = true;
+                               break;
+                       }
+                       conbin = TextDatumGetCString(val);
+                       check_expr = stringToNode(conbin);
+                       pfree(conbin);
+                       if (max_parallel_hazard_walker((Node *) check_expr, context))
+                       {
+                               found_max_hazard = true;
+                               break;
+                       }
+               }
+       }
+
+       systable_endscan(scan);
+       table_close(con_rel, lockmode);
+       return found_max_hazard;
+}
+
+/*
+ * target_rel_partitions_max_parallel_hazard
+ *
+ * Finds the maximum parallel-mode hazard level for any partitions of a
+ * of a specified relation.
+ */
+static bool
+target_rel_partitions_max_parallel_hazard(Relation rel,
+                                                                                 max_parallel_hazard_context *context)
+{
+       int                     i;
+       PartitionDesc pdesc;
+       PartitionKey pkey;
+       ListCell   *partexprs_item;
+       int                     partnatts;
+       List       *partexprs;
+       PlannerGlobal *glob;
+
+
+       if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+               return false;
+
+       pkey = RelationGetPartitionKey(rel);
+
+       partnatts = get_partition_natts(pkey);
+       partexprs = get_partition_exprs(pkey);
+
+       partexprs_item = list_head(partexprs);
+       for (i = 0; i < partnatts; i++)
+       {
+               /* Check parallel-safety of partition key support functions */
+               if (OidIsValid(pkey->partsupfunc[i].fn_oid))
+               {
+                       if (max_parallel_hazard_test(func_parallel(pkey->partsupfunc[i].fn_oid), context))
+                               return true;
+               }
+
+               /* Check parallel-safety of any expressions in the partition key */
+               if (get_partition_col_attnum(pkey, i) == 0)
+               {
+                       Node       *check_expr = (Node *) lfirst(partexprs_item);
+
+                       if (max_parallel_hazard_walker(check_expr, context))
+                               return true;
+
+                       partexprs_item = lnext(partexprs, partexprs_item);
+               }
+       }
+
+       /* Recursively check each partition ... */
+
+       /* Create the PartitionDirectory infrastructure if we didn't already */
+       glob = context->planner_global;
+       if (glob->partition_directory == NULL)
+               glob->partition_directory =
+                       CreatePartitionDirectory(CurrentMemoryContext);
+
+       pdesc = PartitionDirectoryLookup(glob->partition_directory, rel);
+
+       for (i = 0; i < pdesc->nparts; i++)
+       {
+               bool            max_hazard_found;
+               Relation        part_rel;
+
+               /*
+                * The partition needs to be locked, and remain locked until
+                * end-of-transaction to ensure its parallel-safety state is not
+                * hereafter altered.
+                */
+               part_rel = table_open(pdesc->oids[i], context->target_rte->rellockmode);
+               max_hazard_found = target_rel_max_parallel_hazard_recurse(part_rel,
+                                                                                                                                 context->command_type,
+                                                                                                                                 context);
+               table_close(part_rel, NoLock);
+
+               /*
+                * Remember partitionOids to record the partition as a potential plan
+                * dependency.
+                */
+               glob->partitionOids = lappend_oid(glob->partitionOids, pdesc->oids[i]);
+
+               if (max_hazard_found)
+                       return true;
+       }
+
+       return false;
+}
+
+/*
+ * target_rel_chk_constr_max_parallel_hazard
+ *
+ * Finds the maximum parallel-mode hazard level for any CHECK expressions or
+ * CHECK constraints related to the specified relation.
+ */
+static bool
+target_rel_chk_constr_max_parallel_hazard(Relation rel,
+                                                                                 max_parallel_hazard_context *context)
+{
+       TupleDesc       tupdesc;
+
+       tupdesc = RelationGetDescr(rel);
+
+       /*
+        * Determine if there are any CHECK constraints which are not
+        * parallel-safe.
+        */
+       if (tupdesc->constr != NULL && tupdesc->constr->num_check > 0)
+       {
+               int                     i;
+
+               ConstrCheck *check = tupdesc->constr->check;
+
+               for (i = 0; i < tupdesc->constr->num_check; i++)
+               {
+                       Expr       *check_expr = stringToNode(check->ccbin);
+
+                       if (max_parallel_hazard_walker((Node *) check_expr, context))
+                               return true;
+               }
+       }
+
+       return false;
+}
+
+/*
+ * is_parallel_allowed_for_modify
+ *
+ * Check at a high-level if parallel mode is able to be used for the specified
+ * table-modification statement. Currently, we support only Inserts.
+ *
+ * It's not possible in the following cases:
+ *
+ *  1) INSERT...ON CONFLICT...DO UPDATE
+ *  2) INSERT without SELECT
+ *
+ * (Note: we don't do in-depth parallel-safety checks here, we do only the
+ * cheaper tests that can quickly exclude obvious cases for which
+ * parallelism isn't supported, to avoid having to do further parallel-safety
+ * checks for these)
+ */
+bool
+is_parallel_allowed_for_modify(Query *parse)
+{
+       bool            hasSubQuery;
+       RangeTblEntry *rte;
+       ListCell   *lc;
+
+       if (!IsModifySupportedInParallelMode(parse->commandType))
+               return false;
+
+       /*
+        * UPDATE is not currently supported in parallel-mode, so prohibit
+        * INSERT...ON CONFLICT...DO UPDATE...
+        *
+        * In order to support update, even if only in the leader, some further
+        * work would need to be done. A mechanism would be needed for sharing
+        * combo-cids between leader and workers during parallel-mode, since for
+        * example, the leader might generate a combo-cid and it needs to be
+        * propagated to the workers.
+        */
+       if (parse->commandType == CMD_INSERT &&
+               parse->onConflict != NULL &&
+               parse->onConflict->action == ONCONFLICT_UPDATE)
+               return false;
+
+       /*
+        * If there is no underlying SELECT, a parallel insert operation is not
+        * desirable.
+        */
+       hasSubQuery = false;
+       foreach(lc, parse->rtable)
+       {
+               rte = lfirst_node(RangeTblEntry, lc);
+               if (rte->rtekind == RTE_SUBQUERY)
+               {
+                       hasSubQuery = true;
+                       break;
+               }
+       }
+
+       return hasSubQuery;
+}
 
 /*****************************************************************************
  *             Check clauses for nonstrict functions
index 1a0950489d741dfc7eae3d3f6b5f37f773ac8be8..c1f4128445bd3c1e1ac16c20854f51a8635fd70d 100644 (file)
@@ -1735,6 +1735,23 @@ QueryListGetPrimaryStmt(List *stmts)
        return NULL;
 }
 
+static void
+AcquireExecutorLocksOnPartitions(List *partitionOids, int lockmode,
+                                                                bool acquire)
+{
+       ListCell   *lc;
+
+       foreach(lc, partitionOids)
+       {
+               Oid                     partOid = lfirst_oid(lc);
+
+               if (acquire)
+                       LockRelationOid(partOid, lockmode);
+               else
+                       UnlockRelationOid(partOid, lockmode);
+       }
+}
+
 /*
  * AcquireExecutorLocks: acquire locks needed for execution of a cached plan;
  * or release them if acquire is false.
@@ -1748,6 +1765,8 @@ AcquireExecutorLocks(List *stmt_list, bool acquire)
        {
                PlannedStmt *plannedstmt = lfirst_node(PlannedStmt, lc1);
                ListCell   *lc2;
+               Index           rti,
+                                       resultRelation = 0;
 
                if (plannedstmt->commandType == CMD_UTILITY)
                {
@@ -1765,6 +1784,9 @@ AcquireExecutorLocks(List *stmt_list, bool acquire)
                        continue;
                }
 
+               rti = 1;
+               if (plannedstmt->resultRelations)
+                       resultRelation = linitial_int(plannedstmt->resultRelations);
                foreach(lc2, plannedstmt->rtable)
                {
                        RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc2);
@@ -1782,6 +1804,14 @@ AcquireExecutorLocks(List *stmt_list, bool acquire)
                                LockRelationOid(rte->relid, rte->rellockmode);
                        else
                                UnlockRelationOid(rte->relid, rte->rellockmode);
+
+                       /* Lock partitions ahead of modifying them in parallel mode. */
+                       if (rti == resultRelation &&
+                               plannedstmt->partitionOids != NIL)
+                               AcquireExecutorLocksOnPartitions(plannedstmt->partitionOids,
+                                                                                                rte->rellockmode, acquire);
+
+                       rti++;
                }
        }
 }
@@ -1990,7 +2020,8 @@ PlanCacheRelCallback(Datum arg, Oid relid)
                                if (plannedstmt->commandType == CMD_UTILITY)
                                        continue;       /* Ignore utility statements */
                                if ((relid == InvalidOid) ? plannedstmt->relationOids != NIL :
-                                       list_member_oid(plannedstmt->relationOids, relid))
+                                       (list_member_oid(plannedstmt->relationOids, relid) ||
+                                        list_member_oid(plannedstmt->partitionOids, relid)))
                                {
                                        /* Invalidate the generic plan only */
                                        plansource->gplan->is_valid = false;
index f49a57b35e15d671cc2dfc9ee46ea5e8d47188b7..34cfaf542c6bc5960d7e43ffb2148dad38f1bfe2 100644 (file)
@@ -466,5 +466,20 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse
 extern void EnterParallelMode(void);
 extern void ExitParallelMode(void);
 extern bool IsInParallelMode(void);
+extern void PrepareParallelModePlanExec(CmdType commandType);
+
+/*
+ * IsModifySupportedInParallelMode
+ *
+ * Indicates whether execution of the specified table-modification command
+ * (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain
+ * parallel-safety conditions.
+ */
+static inline bool
+IsModifySupportedInParallelMode(CmdType commandType)
+{
+       /* Currently only INSERT is supported */
+       return (commandType == CMD_INSERT);
+}
 
 #endif                                                 /* XACT_H */
index b8a6e0fc9f4e9caef8bc773678a48e7fbf50100a..86405a274ed75a55f41c24db1b9e414d6d0dce6a 100644 (file)
@@ -120,6 +120,8 @@ typedef struct PlannerGlobal
 
        List       *relationOids;       /* OIDs of relations the plan depends on */
 
+       List       *partitionOids;      /* OIDs of partitions the plan depends on */
+
        List       *invalItems;         /* other dependencies, as PlanInvalItems */
 
        List       *paramExecTypes; /* type OIDs for PARAM_EXEC Params */
index 6e62104d0b77b4f75278b33b8bebeaed7ae12001..95292d75735f3f032e8f303aac24f324288a3631 100644 (file)
@@ -79,6 +79,8 @@ typedef struct PlannedStmt
 
        List       *relationOids;       /* OIDs of relations the plan depends on */
 
+       List       *partitionOids;      /* OIDs of partitions the plan depends on */
+
        List       *invalItems;         /* other dependencies, as PlanInvalItems */
 
        List       *paramExecTypes; /* type OIDs for PARAM_EXEC Params */
index 0673887a852c054f6ddac1e55d2593a6975de20a..8d85b02514ca82bd076446bfe6385803e0e0a5db 100644 (file)
@@ -32,7 +32,7 @@ extern double expression_returns_set_rows(PlannerInfo *root, Node *clause);
 
 extern bool contain_subplans(Node *clause);
 
-extern char max_parallel_hazard(Query *parse);
+extern char max_parallel_hazard(Query *parse, PlannerGlobal *glob);
 extern bool is_parallel_safe(PlannerInfo *root, Node *node);
 extern bool contain_nonstrict_functions(Node *clause);
 extern bool contain_exec_param(Node *clause, List *param_ids);
@@ -52,5 +52,6 @@ extern void CommuteOpExpr(OpExpr *clause);
 
 extern Query *inline_set_returning_function(PlannerInfo *root,
                                                                                        RangeTblEntry *rte);
+extern bool is_parallel_allowed_for_modify(Query *parse);
 
 #endif                                                 /* CLAUSES_H */
diff --git a/src/test/regress/expected/insert_parallel.out b/src/test/regress/expected/insert_parallel.out
new file mode 100644 (file)
index 0000000..d5fae79
--- /dev/null
@@ -0,0 +1,536 @@
+--
+-- PARALLEL
+--
+--
+-- START: setup some tables and data needed by the tests.
+--
+-- Setup - index expressions test
+-- For testing purposes, we'll mark this function as parallel-unsafe
+create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$
+    begin
+        return f || l;
+    end;
+$$ language plpgsql immutable parallel unsafe;
+create or replace function fullname_parallel_restricted(f text, l text) returns text as $$
+    begin
+        return f || l;
+    end;
+$$ language plpgsql immutable parallel restricted;
+create table names(index int, first_name text, last_name text);
+create table names2(index int, first_name text, last_name text);
+create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name));
+create table names4(index int, first_name text, last_name text);
+create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name));
+insert into names values
+       (1, 'albert', 'einstein'),
+       (2, 'niels', 'bohr'),
+       (3, 'erwin', 'schrodinger'),
+       (4, 'leonhard', 'euler'),
+       (5, 'stephen', 'hawking'),
+       (6, 'isaac', 'newton'),
+       (7, 'alan', 'turing'),
+       (8, 'richard', 'feynman');
+-- Setup - column default tests
+create or replace function bdefault_unsafe ()
+returns int language plpgsql parallel unsafe as $$
+begin
+       RETURN 5;
+end $$;
+create or replace function cdefault_restricted ()
+returns int language plpgsql parallel restricted as $$
+begin
+       RETURN 10;
+end $$;
+create or replace function ddefault_safe ()
+returns int language plpgsql parallel safe as $$
+begin
+       RETURN 20;
+end $$;
+create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe());
+create table test_data(a int);
+insert into test_data select * from generate_series(1,10);
+--
+-- END: setup some tables and data needed by the tests.
+--
+-- Serializable isolation would disable parallel query, so explicitly use an
+-- arbitrary other level.
+begin isolation level repeatable read;
+-- encourage use of parallel plans
+set parallel_setup_cost=0;
+set parallel_tuple_cost=0;
+set min_parallel_table_scan_size=0;
+set max_parallel_workers_per_gather=4;
+create table para_insert_p1 (
+       unique1         int4    PRIMARY KEY,
+       stringu1        name
+);
+create table para_insert_f1 (
+       unique1         int4    REFERENCES para_insert_p1(unique1),
+       stringu1        name
+);
+--
+-- Test INSERT with underlying query.
+-- (should create plan with parallel SELECT, Gather parent node)
+--
+explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
+               QUERY PLAN               
+----------------------------------------
+ Insert on para_insert_p1
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Seq Scan on tenk1
+(4 rows)
+
+insert into para_insert_p1 select unique1, stringu1 from tenk1;
+-- select some values to verify that the parallel insert worked
+select count(*), sum(unique1) from para_insert_p1;
+ count |   sum    
+-------+----------
+ 10000 | 49995000
+(1 row)
+
+-- verify that the same transaction has been used by all parallel workers
+select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
+ count 
+-------
+     1
+(1 row)
+
+--
+-- Test INSERT with ordered underlying query.
+-- (should create plan with parallel SELECT, GatherMerge parent node)
+--
+truncate para_insert_p1 cascade;
+NOTICE:  truncate cascades to table "para_insert_f1"
+explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
+                  QUERY PLAN                  
+----------------------------------------------
+ Insert on para_insert_p1
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Sort
+               Sort Key: tenk1.unique1
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
+-- select some values to verify that the parallel insert worked
+select count(*), sum(unique1) from para_insert_p1;
+ count |   sum    
+-------+----------
+ 10000 | 49995000
+(1 row)
+
+-- verify that the same transaction has been used by all parallel workers
+select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
+ count 
+-------
+     1
+(1 row)
+
+--
+-- Test INSERT with RETURNING clause.
+-- (should create plan with parallel SELECT, Gather parent node)
+--
+create table test_data1(like test_data);
+explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data;
+                 QUERY PLAN                 
+--------------------------------------------
+ Insert on test_data1
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on test_data
+               Filter: (a = 10)
+(5 rows)
+
+insert into test_data1 select * from test_data where a = 10 returning a as data;
+ data 
+------
+   10
+(1 row)
+
+--
+-- Test INSERT into a table with a foreign key.
+-- (Insert into a table with a foreign key is parallel-restricted,
+--  as doing this in a parallel worker would create a new commandId
+--  and within a worker this is not currently supported)
+--
+explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1;
+               QUERY PLAN               
+----------------------------------------
+ Insert on para_insert_f1
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Seq Scan on tenk1
+(4 rows)
+
+insert into para_insert_f1 select unique1, stringu1 from tenk1;
+-- select some values to verify that the insert worked
+select count(*), sum(unique1) from para_insert_f1;
+ count |   sum    
+-------+----------
+ 10000 | 49995000
+(1 row)
+
+--
+-- Test INSERT with ON CONFLICT ... DO UPDATE ...
+-- (should not create a parallel plan)
+--
+create table test_conflict_table(id serial primary key, somedata int);
+explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data;
+                 QUERY PLAN                 
+--------------------------------------------
+ Insert on test_conflict_table
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on test_data
+(4 rows)
+
+insert into test_conflict_table(id, somedata) select a, a from test_data;
+explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1;
+                      QUERY PLAN                      
+------------------------------------------------------
+ Insert on test_conflict_table
+   Conflict Resolution: UPDATE
+   Conflict Arbiter Indexes: test_conflict_table_pkey
+   ->  Seq Scan on test_data
+(4 rows)
+
+--
+-- Test INSERT with parallel-unsafe index expression
+-- (should not create a parallel plan)
+--
+explain (costs off) insert into names2 select * from names;
+       QUERY PLAN        
+-------------------------
+ Insert on names2
+   ->  Seq Scan on names
+(2 rows)
+
+--
+-- Test INSERT with parallel-restricted index expression
+-- (should create a parallel plan)
+--
+explain (costs off) insert into names4 select * from names;
+               QUERY PLAN               
+----------------------------------------
+ Insert on names4
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on names
+(4 rows)
+
+--
+-- Test INSERT with underlying query - and RETURNING (no projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names5 (like names);
+explain (costs off) insert into names5 select * from names returning *;
+               QUERY PLAN               
+----------------------------------------
+ Insert on names5
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on names
+(4 rows)
+
+--
+-- Test INSERT with underlying ordered query - and RETURNING (no projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names6 (like names);
+explain (costs off) insert into names6 select * from names order by last_name returning *;
+                  QUERY PLAN                  
+----------------------------------------------
+ Insert on names6
+   ->  Gather Merge
+         Workers Planned: 3
+         ->  Sort
+               Sort Key: names.last_name
+               ->  Parallel Seq Scan on names
+(6 rows)
+
+insert into names6 select * from names order by last_name returning *;
+ index | first_name |  last_name  
+-------+------------+-------------
+     2 | niels      | bohr
+     1 | albert     | einstein
+     4 | leonhard   | euler
+     8 | richard    | feynman
+     5 | stephen    | hawking
+     6 | isaac      | newton
+     3 | erwin      | schrodinger
+     7 | alan       | turing
+(8 rows)
+
+--
+-- Test INSERT with underlying ordered query - and RETURNING (with projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names7 (like names);
+explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
+                  QUERY PLAN                  
+----------------------------------------------
+ Insert on names7
+   ->  Gather Merge
+         Workers Planned: 3
+         ->  Sort
+               Sort Key: names.last_name
+               ->  Parallel Seq Scan on names
+(6 rows)
+
+insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
+ last_name_then_first_name 
+---------------------------
+ bohr, niels
+ einstein, albert
+ euler, leonhard
+ feynman, richard
+ hawking, stephen
+ newton, isaac
+ schrodinger, erwin
+ turing, alan
+(8 rows)
+
+--
+-- Test INSERT into temporary table with underlying query.
+-- (Insert into a temp table is parallel-restricted;
+-- should create a parallel plan; parallel SELECT)
+--
+create temporary table temp_names (like names);
+explain (costs off) insert into temp_names select * from names;
+               QUERY PLAN               
+----------------------------------------
+ Insert on temp_names
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on names
+(4 rows)
+
+insert into temp_names select * from names;
+--
+-- Test INSERT with column defaults
+--
+--
+--
+-- Parallel unsafe column default, should not use a parallel plan
+--
+explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data;
+         QUERY PLAN          
+-----------------------------
+ Insert on testdef
+   ->  Seq Scan on test_data
+(2 rows)
+
+--
+-- Parallel restricted column default, should use parallel SELECT
+--
+explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
+                 QUERY PLAN                 
+--------------------------------------------
+ Insert on testdef
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on test_data
+(4 rows)
+
+insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
+select * from testdef order by a;
+ a  | b  | c  | d  
+----+----+----+----
+  1 |  2 | 10 |  8
+  2 |  4 | 10 | 16
+  3 |  6 | 10 | 24
+  4 |  8 | 10 | 32
+  5 | 10 | 10 | 40
+  6 | 12 | 10 | 48
+  7 | 14 | 10 | 56
+  8 | 16 | 10 | 64
+  9 | 18 | 10 | 72
+ 10 | 20 | 10 | 80
+(10 rows)
+
+truncate testdef;
+--
+-- Parallel restricted and unsafe column defaults, should not use a parallel plan
+--
+explain (costs off) insert into testdef(a,d) select a,a*8 from test_data;
+         QUERY PLAN          
+-----------------------------
+ Insert on testdef
+   ->  Seq Scan on test_data
+(2 rows)
+
+--
+-- Test INSERT into partition with underlying query.
+--
+create table parttable1 (a int, b name) partition by range (a);
+create table parttable1_1 partition of parttable1 for values from (0) to (5000);
+create table parttable1_2 partition of parttable1 for values from (5000) to (10000);
+explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1;
+               QUERY PLAN               
+----------------------------------------
+ Insert on parttable1
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Seq Scan on tenk1
+(4 rows)
+
+insert into parttable1 select unique1,stringu1 from tenk1;
+select count(*) from parttable1_1;
+ count 
+-------
+  5000
+(1 row)
+
+select count(*) from parttable1_2;
+ count 
+-------
+  5000
+(1 row)
+
+--
+-- Test INSERT into table with parallel-unsafe check constraint
+-- (should not create a parallel plan)
+--
+create or replace function check_b_unsafe(b name) returns boolean as $$
+    begin
+        return (b <> 'XXXXXX');
+    end;
+$$ language plpgsql parallel unsafe;
+create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name);
+explain (costs off) insert into table_check_b(a,b,c) select unique1, unique2, stringu1 from tenk1;
+       QUERY PLAN        
+-------------------------
+ Insert on table_check_b
+   ->  Seq Scan on tenk1
+(2 rows)
+
+--
+-- Test INSERT into table with parallel-safe after stmt-level triggers
+-- (should create a parallel SELECT plan; triggers should fire)
+--
+create table names_with_safe_trigger (like names);
+create or replace function insert_after_trigger_safe() returns trigger as $$
+    begin
+        raise notice 'hello from insert_after_trigger_safe';
+               return new;
+    end;
+$$ language plpgsql parallel safe;
+create trigger insert_after_trigger_safe after insert on names_with_safe_trigger
+    for each statement execute procedure insert_after_trigger_safe();
+explain (costs off) insert into names_with_safe_trigger select * from names;
+               QUERY PLAN               
+----------------------------------------
+ Insert on names_with_safe_trigger
+   ->  Gather
+         Workers Planned: 3
+         ->  Parallel Seq Scan on names
+(4 rows)
+
+insert into names_with_safe_trigger select * from names;
+NOTICE:  hello from insert_after_trigger_safe
+--
+-- Test INSERT into table with parallel-unsafe after stmt-level triggers
+-- (should not create a parallel plan; triggers should fire)
+--
+create table names_with_unsafe_trigger (like names);
+create or replace function insert_after_trigger_unsafe() returns trigger as $$
+    begin
+        raise notice 'hello from insert_after_trigger_unsafe';
+               return new;
+    end;
+$$ language plpgsql parallel unsafe;
+create trigger insert_after_trigger_unsafe after insert on names_with_unsafe_trigger
+    for each statement execute procedure insert_after_trigger_unsafe();
+explain (costs off) insert into names_with_unsafe_trigger select * from names;
+             QUERY PLAN              
+-------------------------------------
+ Insert on names_with_unsafe_trigger
+   ->  Seq Scan on names
+(2 rows)
+
+insert into names_with_unsafe_trigger select * from names;
+NOTICE:  hello from insert_after_trigger_unsafe
+--
+-- Test INSERT into partition with parallel-unsafe trigger
+-- (should not create a parallel plan)
+--
+create table part_unsafe_trigger (a int4, b name) partition by range (a);
+create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000);
+create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000);
+create trigger part_insert_after_trigger_unsafe after insert on part_unsafe_trigger_1
+    for each statement execute procedure insert_after_trigger_unsafe();
+explain (costs off) insert into part_unsafe_trigger select unique1, stringu1 from tenk1;
+          QUERY PLAN           
+-------------------------------
+ Insert on part_unsafe_trigger
+   ->  Seq Scan on tenk1
+(2 rows)
+
+--
+-- Test that parallel-safety-related changes to partitions are detected and
+-- plan cache invalidation is working correctly.
+--
+create table rp (a int) partition by range (a);
+create table rp1 partition of rp for values from (minvalue) to (0);
+create table rp2 partition of rp for values from (0) to (maxvalue);
+create table foo (a) as select unique1 from tenk1;
+prepare q as insert into rp select * from foo where a%2 = 0;
+-- should create a parallel plan
+explain (costs off) execute q;
+              QUERY PLAN              
+--------------------------------------
+ Insert on rp
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Seq Scan on foo
+               Filter: ((a % 2) = 0)
+(5 rows)
+
+create or replace function make_table_bar () returns trigger language
+plpgsql as $$ begin create table bar(); return null; end; $$ parallel unsafe;
+create trigger ai_rp2 after insert on rp2 for each row execute
+function make_table_bar();
+-- should create a non-parallel plan
+explain (costs off) execute q;
+          QUERY PLAN           
+-------------------------------
+ Insert on rp
+   ->  Seq Scan on foo
+         Filter: ((a % 2) = 0)
+(3 rows)
+
+--
+-- Test INSERT into table having a DOMAIN column with a CHECK constraint
+--
+create function sql_is_distinct_from_u(anyelement, anyelement)
+returns boolean language sql parallel unsafe
+as 'select $1 is distinct from $2 limit 1';
+create domain inotnull_u int
+  check (sql_is_distinct_from_u(value, null));
+create table dom_table_u (x inotnull_u, y int);
+-- Test INSERT into table having a DOMAIN column with parallel-unsafe CHECK constraint
+explain (costs off) insert into dom_table_u select unique1, unique2 from tenk1;
+       QUERY PLAN        
+-------------------------
+ Insert on dom_table_u
+   ->  Seq Scan on tenk1
+(2 rows)
+
+rollback;
+--
+-- Clean up anything not created in the transaction
+--
+drop table names;
+drop index names2_fullname_idx;
+drop table names2;
+drop index names4_fullname_idx;
+drop table names4;
+drop table testdef;
+drop table test_data;
+drop function bdefault_unsafe;
+drop function cdefault_restricted;
+drop function ddefault_safe;
+drop function fullname_parallel_unsafe;
+drop function fullname_parallel_restricted;
index c77b0d7342f5e911bf4c1f580f5a6b81caa020ee..e280198b17978ae0e04ff559c61cf86d1b4f8ee7 100644 (file)
@@ -90,6 +90,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
 # run by itself so it can run parallel workers
 test: select_parallel
 test: write_parallel
+test: insert_parallel
 
 # no relation related tests can be put in this group
 test: publication subscription
index 0264a97324c219c9c1d2f2e0577e8521bf180f9d..6a57e889a15014155a047ecd8bbbf14d3fda0423 100644 (file)
@@ -148,6 +148,7 @@ test: stats_ext
 test: collate.linux.utf8
 test: select_parallel
 test: write_parallel
+test: insert_parallel
 test: publication
 test: subscription
 test: select_views
diff --git a/src/test/regress/sql/insert_parallel.sql b/src/test/regress/sql/insert_parallel.sql
new file mode 100644 (file)
index 0000000..70ad31a
--- /dev/null
@@ -0,0 +1,335 @@
+--
+-- PARALLEL
+--
+
+--
+-- START: setup some tables and data needed by the tests.
+--
+
+-- Setup - index expressions test
+
+-- For testing purposes, we'll mark this function as parallel-unsafe
+create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$
+    begin
+        return f || l;
+    end;
+$$ language plpgsql immutable parallel unsafe;
+
+create or replace function fullname_parallel_restricted(f text, l text) returns text as $$
+    begin
+        return f || l;
+    end;
+$$ language plpgsql immutable parallel restricted;
+
+create table names(index int, first_name text, last_name text);
+create table names2(index int, first_name text, last_name text);
+create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name));
+create table names4(index int, first_name text, last_name text);
+create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name));
+
+insert into names values
+       (1, 'albert', 'einstein'),
+       (2, 'niels', 'bohr'),
+       (3, 'erwin', 'schrodinger'),
+       (4, 'leonhard', 'euler'),
+       (5, 'stephen', 'hawking'),
+       (6, 'isaac', 'newton'),
+       (7, 'alan', 'turing'),
+       (8, 'richard', 'feynman');
+
+-- Setup - column default tests
+
+create or replace function bdefault_unsafe ()
+returns int language plpgsql parallel unsafe as $$
+begin
+       RETURN 5;
+end $$;
+
+create or replace function cdefault_restricted ()
+returns int language plpgsql parallel restricted as $$
+begin
+       RETURN 10;
+end $$;
+
+create or replace function ddefault_safe ()
+returns int language plpgsql parallel safe as $$
+begin
+       RETURN 20;
+end $$;
+
+create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe());
+
+create table test_data(a int);
+insert into test_data select * from generate_series(1,10);
+
+--
+-- END: setup some tables and data needed by the tests.
+--
+
+-- Serializable isolation would disable parallel query, so explicitly use an
+-- arbitrary other level.
+begin isolation level repeatable read;
+
+-- encourage use of parallel plans
+set parallel_setup_cost=0;
+set parallel_tuple_cost=0;
+set min_parallel_table_scan_size=0;
+set max_parallel_workers_per_gather=4;
+
+create table para_insert_p1 (
+       unique1         int4    PRIMARY KEY,
+       stringu1        name
+);
+
+create table para_insert_f1 (
+       unique1         int4    REFERENCES para_insert_p1(unique1),
+       stringu1        name
+);
+
+
+--
+-- Test INSERT with underlying query.
+-- (should create plan with parallel SELECT, Gather parent node)
+--
+explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1;
+insert into para_insert_p1 select unique1, stringu1 from tenk1;
+-- select some values to verify that the parallel insert worked
+select count(*), sum(unique1) from para_insert_p1;
+-- verify that the same transaction has been used by all parallel workers
+select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
+
+--
+-- Test INSERT with ordered underlying query.
+-- (should create plan with parallel SELECT, GatherMerge parent node)
+--
+truncate para_insert_p1 cascade;
+explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
+insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1;
+-- select some values to verify that the parallel insert worked
+select count(*), sum(unique1) from para_insert_p1;
+-- verify that the same transaction has been used by all parallel workers
+select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt;
+
+--
+-- Test INSERT with RETURNING clause.
+-- (should create plan with parallel SELECT, Gather parent node)
+--
+create table test_data1(like test_data);
+explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data;
+insert into test_data1 select * from test_data where a = 10 returning a as data;
+
+--
+-- Test INSERT into a table with a foreign key.
+-- (Insert into a table with a foreign key is parallel-restricted,
+--  as doing this in a parallel worker would create a new commandId
+--  and within a worker this is not currently supported)
+--
+explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1;
+insert into para_insert_f1 select unique1, stringu1 from tenk1;
+-- select some values to verify that the insert worked
+select count(*), sum(unique1) from para_insert_f1;
+
+--
+-- Test INSERT with ON CONFLICT ... DO UPDATE ...
+-- (should not create a parallel plan)
+--
+create table test_conflict_table(id serial primary key, somedata int);
+explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data;
+insert into test_conflict_table(id, somedata) select a, a from test_data;
+explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1;
+
+
+--
+-- Test INSERT with parallel-unsafe index expression
+-- (should not create a parallel plan)
+--
+explain (costs off) insert into names2 select * from names;
+
+--
+-- Test INSERT with parallel-restricted index expression
+-- (should create a parallel plan)
+--
+explain (costs off) insert into names4 select * from names;
+
+--
+-- Test INSERT with underlying query - and RETURNING (no projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names5 (like names);
+explain (costs off) insert into names5 select * from names returning *;
+
+--
+-- Test INSERT with underlying ordered query - and RETURNING (no projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names6 (like names);
+explain (costs off) insert into names6 select * from names order by last_name returning *;
+insert into names6 select * from names order by last_name returning *;
+
+--
+-- Test INSERT with underlying ordered query - and RETURNING (with projection)
+-- (should create a parallel plan; parallel SELECT)
+--
+create table names7 (like names);
+explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
+insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name;
+
+
+--
+-- Test INSERT into temporary table with underlying query.
+-- (Insert into a temp table is parallel-restricted;
+-- should create a parallel plan; parallel SELECT)
+--
+create temporary table temp_names (like names);
+explain (costs off) insert into temp_names select * from names;
+insert into temp_names select * from names;
+
+--
+-- Test INSERT with column defaults
+--
+--
+
+--
+-- Parallel unsafe column default, should not use a parallel plan
+--
+explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data;
+
+--
+-- Parallel restricted column default, should use parallel SELECT
+--
+explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
+insert into testdef(a,b,d) select a,a*2,a*8 from test_data;
+select * from testdef order by a;
+truncate testdef;
+
+--
+-- Parallel restricted and unsafe column defaults, should not use a parallel plan
+--
+explain (costs off) insert into testdef(a,d) select a,a*8 from test_data;
+
+--
+-- Test INSERT into partition with underlying query.
+--
+create table parttable1 (a int, b name) partition by range (a);
+create table parttable1_1 partition of parttable1 for values from (0) to (5000);
+create table parttable1_2 partition of parttable1 for values from (5000) to (10000);
+
+explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1;
+insert into parttable1 select unique1,stringu1 from tenk1;
+select count(*) from parttable1_1;
+select count(*) from parttable1_2;
+
+--
+-- Test INSERT into table with parallel-unsafe check constraint
+-- (should not create a parallel plan)
+--
+create or replace function check_b_unsafe(b name) returns boolean as $$
+    begin
+        return (b <> 'XXXXXX');
+    end;
+$$ language plpgsql parallel unsafe;
+
+create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name);
+explain (costs off) insert into table_check_b(a,b,c) select unique1, unique2, stringu1 from tenk1;
+
+--
+-- Test INSERT into table with parallel-safe after stmt-level triggers
+-- (should create a parallel SELECT plan; triggers should fire)
+--
+create table names_with_safe_trigger (like names);
+create or replace function insert_after_trigger_safe() returns trigger as $$
+    begin
+        raise notice 'hello from insert_after_trigger_safe';
+               return new;
+    end;
+$$ language plpgsql parallel safe;
+create trigger insert_after_trigger_safe after insert on names_with_safe_trigger
+    for each statement execute procedure insert_after_trigger_safe();
+explain (costs off) insert into names_with_safe_trigger select * from names;
+insert into names_with_safe_trigger select * from names;
+
+--
+-- Test INSERT into table with parallel-unsafe after stmt-level triggers
+-- (should not create a parallel plan; triggers should fire)
+--
+create table names_with_unsafe_trigger (like names);
+create or replace function insert_after_trigger_unsafe() returns trigger as $$
+    begin
+        raise notice 'hello from insert_after_trigger_unsafe';
+               return new;
+    end;
+$$ language plpgsql parallel unsafe;
+create trigger insert_after_trigger_unsafe after insert on names_with_unsafe_trigger
+    for each statement execute procedure insert_after_trigger_unsafe();
+explain (costs off) insert into names_with_unsafe_trigger select * from names;
+insert into names_with_unsafe_trigger select * from names;
+
+--
+-- Test INSERT into partition with parallel-unsafe trigger
+-- (should not create a parallel plan)
+--
+
+create table part_unsafe_trigger (a int4, b name) partition by range (a);
+create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000);
+create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000);
+create trigger part_insert_after_trigger_unsafe after insert on part_unsafe_trigger_1
+    for each statement execute procedure insert_after_trigger_unsafe();
+
+explain (costs off) insert into part_unsafe_trigger select unique1, stringu1 from tenk1;
+
+--
+-- Test that parallel-safety-related changes to partitions are detected and
+-- plan cache invalidation is working correctly.
+--
+
+create table rp (a int) partition by range (a);
+create table rp1 partition of rp for values from (minvalue) to (0);
+create table rp2 partition of rp for values from (0) to (maxvalue);
+create table foo (a) as select unique1 from tenk1;
+prepare q as insert into rp select * from foo where a%2 = 0;
+-- should create a parallel plan
+explain (costs off) execute q;
+
+create or replace function make_table_bar () returns trigger language
+plpgsql as $$ begin create table bar(); return null; end; $$ parallel unsafe;
+create trigger ai_rp2 after insert on rp2 for each row execute
+function make_table_bar();
+-- should create a non-parallel plan
+explain (costs off) execute q;
+
+--
+-- Test INSERT into table having a DOMAIN column with a CHECK constraint
+--
+create function sql_is_distinct_from_u(anyelement, anyelement)
+returns boolean language sql parallel unsafe
+as 'select $1 is distinct from $2 limit 1';
+
+create domain inotnull_u int
+  check (sql_is_distinct_from_u(value, null));
+
+create table dom_table_u (x inotnull_u, y int);
+
+
+-- Test INSERT into table having a DOMAIN column with parallel-unsafe CHECK constraint
+explain (costs off) insert into dom_table_u select unique1, unique2 from tenk1;
+
+
+rollback;
+
+--
+-- Clean up anything not created in the transaction
+--
+
+drop table names;
+drop index names2_fullname_idx;
+drop table names2;
+drop index names4_fullname_idx;
+drop table names4;
+drop table testdef;
+drop table test_data;
+
+drop function bdefault_unsafe;
+drop function cdefault_restricted;
+drop function ddefault_safe;
+drop function fullname_parallel_unsafe;
+drop function fullname_parallel_restricted;