Add a Gather Merge executor node.
authorRobert Haas <rhaas@postgresql.org>
Thu, 9 Mar 2017 12:40:36 +0000 (07:40 -0500)
committerRobert Haas <rhaas@postgresql.org>
Thu, 9 Mar 2017 12:49:29 +0000 (07:49 -0500)
Like Gather, we spawn multiple workers and run the same plan in each
one; however, Gather Merge is used when each worker produces the same
output ordering and we want to preserve that output ordering while
merging together the streams of tuples from various workers.  (In a
way, Gather Merge is like a hybrid of Gather and MergeAppend.)

This works out to a win if it saves us from having to perform an
expensive Sort.  In cases where only a small amount of data would need
to be sorted, it may actually be faster to use a regular Gather node
and then sort the results afterward, because Gather Merge sometimes
needs to wait synchronously for tuples whereas a pure Gather generally
doesn't.  But if this avoids an expensive sort then it's a win.

Rushabh Lathia, reviewed and tested by Amit Kapila, Thomas Munro,
and Neha Sharma, and reviewed and revised by me.

Discussion: http://postgr.es/m/CAGPqQf09oPX-cQRpBKS0Gq49Z+m6KBxgxd_p9gX8CKk_d75HoQ@mail.gmail.com

27 files changed:
doc/src/sgml/config.sgml
src/backend/commands/explain.c
src/backend/executor/Makefile
src/backend/executor/execProcnode.c
src/backend/executor/nodeGatherMerge.c [new file with mode: 0644]
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/path/allpaths.c
src/backend/optimizer/path/costsize.c
src/backend/optimizer/plan/createplan.c
src/backend/optimizer/plan/planner.c
src/backend/optimizer/plan/setrefs.c
src/backend/optimizer/plan/subselect.c
src/backend/optimizer/util/pathnode.c
src/backend/utils/misc/guc.c
src/include/executor/nodeGatherMerge.h [new file with mode: 0644]
src/include/nodes/execnodes.h
src/include/nodes/nodes.h
src/include/nodes/plannodes.h
src/include/nodes/relation.h
src/include/optimizer/cost.h
src/include/optimizer/pathnode.h
src/test/regress/expected/select_parallel.out
src/test/regress/expected/sysviews.out
src/test/regress/sql/select_parallel.sql
src/tools/pgindent/typedefs.list

index 1881236726a85d68f0821a27bd46ab62dc8ec8a2..69844e5b2995c3b0d8b35a507f5bbd2e36f908c0 100644 (file)
@@ -3497,6 +3497,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-enable-gathermerge" xreflabel="enable_gathermerge">
+      <term><varname>enable_gathermerge</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_gathermerge</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of gather
+        merge plan types. The default is <literal>on</>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-hashagg" xreflabel="enable_hashagg">
       <term><varname>enable_hashagg</varname> (<type>boolean</type>)
       <indexterm>
index 6fd82e9d52f4c57ad2fbdbdf28716b340bbc0e75..c9b55ead3dc35d8a4113b4a416a20e66be795193 100644 (file)
@@ -918,6 +918,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
        case T_Gather:
            pname = sname = "Gather";
            break;
+       case T_GatherMerge:
+           pname = sname = "Gather Merge";
+           break;
        case T_IndexScan:
            pname = sname = "Index Scan";
            break;
@@ -1411,6 +1414,26 @@ ExplainNode(PlanState *planstate, List *ancestors,
                    ExplainPropertyBool("Single Copy", gather->single_copy, es);
            }
            break;
+       case T_GatherMerge:
+           {
+               GatherMerge *gm = (GatherMerge *) plan;
+
+               show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
+               if (plan->qual)
+                   show_instrumentation_count("Rows Removed by Filter", 1,
+                                              planstate, es);
+               ExplainPropertyInteger("Workers Planned",
+                                      gm->num_workers, es);
+               if (es->analyze)
+               {
+                   int         nworkers;
+
+                   nworkers = ((GatherMergeState *) planstate)->nworkers_launched;
+                   ExplainPropertyInteger("Workers Launched",
+                                          nworkers, es);
+               }
+           }
+           break;
        case T_FunctionScan:
            if (es->verbose)
            {
index a9893c2b2208e624c0a3ae70134366cac3db0e83..d281906cd5cf496449f61cd358a1566c8ad9b5ef 100644 (file)
@@ -20,7 +20,7 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
        nodeBitmapHeapscan.o nodeBitmapIndexscan.o \
        nodeCustom.o nodeFunctionscan.o nodeGather.o \
        nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
-       nodeLimit.o nodeLockRows.o \
+       nodeLimit.o nodeLockRows.o nodeGatherMerge.o \
        nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
        nodeNestloop.o nodeProjectSet.o nodeRecursiveunion.o nodeResult.o \
        nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
index 468f50e6a6b6fa6b151db9b9b7bc454ba2433e87..80c77addb8e60311783109dc3675332e883ba342 100644 (file)
@@ -89,6 +89,7 @@
 #include "executor/nodeForeignscan.h"
 #include "executor/nodeFunctionscan.h"
 #include "executor/nodeGather.h"
+#include "executor/nodeGatherMerge.h"
 #include "executor/nodeGroup.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
@@ -326,6 +327,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
                                                  estate, eflags);
            break;
 
+       case T_GatherMerge:
+           result = (PlanState *) ExecInitGatherMerge((GatherMerge *) node,
+                                                      estate, eflags);
+           break;
+
        case T_Hash:
            result = (PlanState *) ExecInitHash((Hash *) node,
                                                estate, eflags);
@@ -535,6 +541,10 @@ ExecProcNode(PlanState *node)
            result = ExecGather((GatherState *) node);
            break;
 
+       case T_GatherMergeState:
+           result = ExecGatherMerge((GatherMergeState *) node);
+           break;
+
        case T_HashState:
            result = ExecHash((HashState *) node);
            break;
@@ -697,6 +707,10 @@ ExecEndNode(PlanState *node)
            ExecEndGather((GatherState *) node);
            break;
 
+       case T_GatherMergeState:
+           ExecEndGatherMerge((GatherMergeState *) node);
+           break;
+
        case T_IndexScanState:
            ExecEndIndexScan((IndexScanState *) node);
            break;
@@ -842,6 +856,9 @@ ExecShutdownNode(PlanState *node)
        case T_CustomScanState:
            ExecShutdownCustomScan((CustomScanState *) node);
            break;
+       case T_GatherMergeState:
+           ExecShutdownGatherMerge((GatherMergeState *) node);
+           break;
        default:
            break;
    }
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
new file mode 100644 (file)
index 0000000..62a6b18
--- /dev/null
@@ -0,0 +1,687 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeGatherMerge.c
+ *     Scan a plan in multiple workers, and do order-preserving merge.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *   src/backend/executor/nodeGatherMerge.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relscan.h"
+#include "access/xact.h"
+#include "executor/execdebug.h"
+#include "executor/execParallel.h"
+#include "executor/nodeGatherMerge.h"
+#include "executor/nodeSubplan.h"
+#include "executor/tqueue.h"
+#include "lib/binaryheap.h"
+#include "miscadmin.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+
+/*
+ * Tuple array for each worker
+ */
+typedef struct GMReaderTupleBuffer
+{
+   HeapTuple  *tuple;
+   int         readCounter;
+   int         nTuples;
+   bool        done;
+}  GMReaderTupleBuffer;
+
+/*
+ * When we read tuples from workers, it's a good idea to read several at once
+ * for efficiency when possible: this minimizes context-switching overhead.
+ * But reading too many at a time wastes memory without improving performance.
+ */
+#define MAX_TUPLE_STORE 10
+
+static int32 heap_compare_slots(Datum a, Datum b, void *arg);
+static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
+static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
+                 bool nowait, bool *done);
+static void gather_merge_init(GatherMergeState *gm_state);
+static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
+static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
+                     bool nowait);
+static void form_tuple_array(GatherMergeState *gm_state, int reader);
+
+/* ----------------------------------------------------------------
+ *     ExecInitGather
+ * ----------------------------------------------------------------
+ */
+GatherMergeState *
+ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
+{
+   GatherMergeState *gm_state;
+   Plan       *outerNode;
+   bool        hasoid;
+   TupleDesc   tupDesc;
+
+   /* Gather merge node doesn't have innerPlan node. */
+   Assert(innerPlan(node) == NULL);
+
+   /*
+    * create state structure
+    */
+   gm_state = makeNode(GatherMergeState);
+   gm_state->ps.plan = (Plan *) node;
+   gm_state->ps.state = estate;
+
+   /*
+    * Miscellaneous initialization
+    *
+    * create expression context for node
+    */
+   ExecAssignExprContext(estate, &gm_state->ps);
+
+   /*
+    * initialize child expressions
+    */
+   gm_state->ps.targetlist = (List *)
+       ExecInitExpr((Expr *) node->plan.targetlist,
+                    (PlanState *) gm_state);
+   gm_state->ps.qual = (List *)
+       ExecInitExpr((Expr *) node->plan.qual,
+                    (PlanState *) gm_state);
+
+   /*
+    * tuple table initialization
+    */
+   ExecInitResultTupleSlot(estate, &gm_state->ps);
+
+   /*
+    * now initialize outer plan
+    */
+   outerNode = outerPlan(node);
+   outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags);
+
+   /*
+    * Initialize result tuple type and projection info.
+    */
+   ExecAssignResultTypeFromTL(&gm_state->ps);
+   ExecAssignProjectionInfo(&gm_state->ps, NULL);
+
+   gm_state->gm_initialized = false;
+
+   /*
+    * initialize sort-key information
+    */
+   if (node->numCols)
+   {
+       int         i;
+
+       gm_state->gm_nkeys = node->numCols;
+       gm_state->gm_sortkeys =
+           palloc0(sizeof(SortSupportData) * node->numCols);
+
+       for (i = 0; i < node->numCols; i++)
+       {
+           SortSupport sortKey = gm_state->gm_sortkeys + i;
+
+           sortKey->ssup_cxt = CurrentMemoryContext;
+           sortKey->ssup_collation = node->collations[i];
+           sortKey->ssup_nulls_first = node->nullsFirst[i];
+           sortKey->ssup_attno = node->sortColIdx[i];
+
+           /*
+            * We don't perform abbreviated key conversion here, for the same
+            * reasons that it isn't used in MergeAppend
+            */
+           sortKey->abbreviate = false;
+
+           PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
+       }
+   }
+
+   /*
+    * store the tuple descriptor into gather merge state, so we can use it
+    * later while initializing the gather merge slots.
+    */
+   if (!ExecContextForcesOids(&gm_state->ps, &hasoid))
+       hasoid = false;
+   tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
+   gm_state->tupDesc = tupDesc;
+
+   return gm_state;
+}
+
+/* ----------------------------------------------------------------
+ *     ExecGatherMerge(node)
+ *
+ *     Scans the relation via multiple workers and returns
+ *     the next qualifying tuple.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecGatherMerge(GatherMergeState *node)
+{
+   TupleTableSlot *slot;
+   ExprContext *econtext;
+   int         i;
+
+   /*
+    * As with Gather, we don't launch workers until this node is actually
+    * executed.
+    */
+   if (!node->initialized)
+   {
+       EState     *estate = node->ps.state;
+       GatherMerge *gm = (GatherMerge *) node->ps.plan;
+
+       /*
+        * Sometimes we might have to run without parallelism; but if parallel
+        * mode is active then we can try to fire up some workers.
+        */
+       if (gm->num_workers > 0 && IsInParallelMode())
+       {
+           ParallelContext *pcxt;
+
+           /* Initialize data structures for workers. */
+           if (!node->pei)
+               node->pei = ExecInitParallelPlan(node->ps.lefttree,
+                                                estate,
+                                                gm->num_workers);
+
+           /* Try to launch workers. */
+           pcxt = node->pei->pcxt;
+           LaunchParallelWorkers(pcxt);
+           node->nworkers_launched = pcxt->nworkers_launched;
+
+           /* Set up tuple queue readers to read the results. */
+           if (pcxt->nworkers_launched > 0)
+           {
+               node->nreaders = 0;
+               node->reader = palloc(pcxt->nworkers_launched *
+                                     sizeof(TupleQueueReader *));
+
+               Assert(gm->numCols);
+
+               for (i = 0; i < pcxt->nworkers_launched; ++i)
+               {
+                   shm_mq_set_handle(node->pei->tqueue[i],
+                                     pcxt->worker[i].bgwhandle);
+                   node->reader[node->nreaders++] =
+                       CreateTupleQueueReader(node->pei->tqueue[i],
+                                              node->tupDesc);
+               }
+           }
+           else
+           {
+               /* No workers?  Then never mind. */
+               ExecShutdownGatherMergeWorkers(node);
+           }
+       }
+
+       /* always allow leader to participate */
+       node->need_to_scan_locally = true;
+       node->initialized = true;
+   }
+
+   /*
+    * Reset per-tuple memory context to free any expression evaluation
+    * storage allocated in the previous tuple cycle.
+    */
+   econtext = node->ps.ps_ExprContext;
+   ResetExprContext(econtext);
+
+   /*
+    * Get next tuple, either from one of our workers, or by running the
+    * plan ourselves.
+    */
+   slot = gather_merge_getnext(node);
+   if (TupIsNull(slot))
+       return NULL;
+
+   /*
+    * form the result tuple using ExecProject(), and return it --- unless
+    * the projection produces an empty set, in which case we must loop
+    * back around for another tuple
+    */
+   econtext->ecxt_outertuple = slot;
+   return ExecProject(node->ps.ps_ProjInfo);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecEndGatherMerge
+ *
+ *     frees any storage allocated through C routines.
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndGatherMerge(GatherMergeState *node)
+{
+   ExecEndNode(outerPlanState(node));      /* let children clean up first */
+   ExecShutdownGatherMerge(node);
+   ExecFreeExprContext(&node->ps);
+   ExecClearTuple(node->ps.ps_ResultTupleSlot);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecShutdownGatherMerge
+ *
+ *     Destroy the setup for parallel workers including parallel context.
+ *     Collect all the stats after workers are stopped, else some work
+ *     done by workers won't be accounted.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGatherMerge(GatherMergeState *node)
+{
+   ExecShutdownGatherMergeWorkers(node);
+
+   /* Now destroy the parallel context. */
+   if (node->pei != NULL)
+   {
+       ExecParallelCleanup(node->pei);
+       node->pei = NULL;
+   }
+}
+
+/* ----------------------------------------------------------------
+ *     ExecShutdownGatherMergeWorkers
+ *
+ *     Destroy the parallel workers.  Collect all the stats after
+ *     workers are stopped, else some work done by workers won't be
+ *     accounted.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecShutdownGatherMergeWorkers(GatherMergeState *node)
+{
+   /* Shut down tuple queue readers before shutting down workers. */
+   if (node->reader != NULL)
+   {
+       int         i;
+
+       for (i = 0; i < node->nreaders; ++i)
+           if (node->reader[i])
+               DestroyTupleQueueReader(node->reader[i]);
+
+       pfree(node->reader);
+       node->reader = NULL;
+   }
+
+   /* Now shut down the workers. */
+   if (node->pei != NULL)
+       ExecParallelFinish(node->pei);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecReScanGatherMerge
+ *
+ *     Re-initialize the workers and rescans a relation via them.
+ * ----------------------------------------------------------------
+ */
+void
+ExecReScanGatherMerge(GatherMergeState *node)
+{
+   /*
+    * Re-initialize the parallel workers to perform rescan of relation. We
+    * want to gracefully shutdown all the workers so that they should be able
+    * to propagate any error or other information to master backend before
+    * dying.  Parallel context will be reused for rescan.
+    */
+   ExecShutdownGatherMergeWorkers(node);
+
+   node->initialized = false;
+
+   if (node->pei)
+       ExecParallelReinitialize(node->pei);
+
+   ExecReScan(node->ps.lefttree);
+}
+
+/*
+ * Initialize the Gather merge tuple read.
+ *
+ * Pull at least a single tuple from each worker + leader and set up the heap.
+ */
+static void
+gather_merge_init(GatherMergeState *gm_state)
+{
+   int         nreaders = gm_state->nreaders;
+   bool        initialize = true;
+   int         i;
+
+   /*
+    * Allocate gm_slots for the number of worker + one more slot for leader.
+    * Last slot is always for leader. Leader always calls ExecProcNode() to
+    * read the tuple which will return the TupleTableSlot. Later it will
+    * directly get assigned to gm_slot. So just initialize leader gm_slot
+    * with NULL. For other slots below code will call
+    * ExecInitExtraTupleSlot() which will do the initialization of worker
+    * slots.
+    */
+   gm_state->gm_slots =
+       palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
+   gm_state->gm_slots[gm_state->nreaders] = NULL;
+
+   /* Initialize the tuple slot and tuple array for each worker */
+   gm_state->gm_tuple_buffers =
+       (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
+                                       (gm_state->nreaders + 1));
+   for (i = 0; i < gm_state->nreaders; i++)
+   {
+       /* Allocate the tuple array with MAX_TUPLE_STORE size */
+       gm_state->gm_tuple_buffers[i].tuple =
+           (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
+
+       /* Initialize slot for worker */
+       gm_state->gm_slots[i] = ExecInitExtraTupleSlot(gm_state->ps.state);
+       ExecSetSlotDescriptor(gm_state->gm_slots[i],
+                             gm_state->tupDesc);
+   }
+
+   /* Allocate the resources for the merge */
+   gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1,
+                                           heap_compare_slots,
+                                           gm_state);
+
+   /*
+    * First, try to read a tuple from each worker (including leader) in
+    * nowait mode, so that we initialize read from each worker as well as
+    * leader. After this, if all active workers are unable to produce a
+    * tuple, then re-read and this time use wait mode. For workers that were
+    * able to produce a tuple in the earlier loop and are still active, just
+    * try to fill the tuple array if more tuples are avaiable.
+    */
+reread:
+   for (i = 0; i < nreaders + 1; i++)
+   {
+       if (!gm_state->gm_tuple_buffers[i].done &&
+           (TupIsNull(gm_state->gm_slots[i]) ||
+            gm_state->gm_slots[i]->tts_isempty))
+       {
+           if (gather_merge_readnext(gm_state, i, initialize))
+           {
+               binaryheap_add_unordered(gm_state->gm_heap,
+                                        Int32GetDatum(i));
+           }
+       }
+       else
+           form_tuple_array(gm_state, i);
+   }
+   initialize = false;
+
+   for (i = 0; i < nreaders; i++)
+       if (!gm_state->gm_tuple_buffers[i].done &&
+           (TupIsNull(gm_state->gm_slots[i]) ||
+            gm_state->gm_slots[i]->tts_isempty))
+           goto reread;
+
+   binaryheap_build(gm_state->gm_heap);
+   gm_state->gm_initialized = true;
+}
+
+/*
+ * Clear out a slot in the tuple table for each gather merge
+ * slot and return the clear cleared slot.
+ */
+static TupleTableSlot *
+gather_merge_clear_slots(GatherMergeState *gm_state)
+{
+   int         i;
+
+   for (i = 0; i < gm_state->nreaders; i++)
+   {
+       pfree(gm_state->gm_tuple_buffers[i].tuple);
+       gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]);
+   }
+
+   /* Free tuple array as we don't need it any more */
+   pfree(gm_state->gm_tuple_buffers);
+   /* Free the binaryheap, which was created for sort */
+   binaryheap_free(gm_state->gm_heap);
+
+   /* return any clear slot */
+   return gm_state->gm_slots[0];
+}
+
+/*
+ * Read the next tuple for gather merge.
+ *
+ * Fetch the sorted tuple out of the heap.
+ */
+static TupleTableSlot *
+gather_merge_getnext(GatherMergeState *gm_state)
+{
+   int         i;
+
+   /*
+    * First time through: pull the first tuple from each participate, and set
+    * up the heap.
+    */
+   if (gm_state->gm_initialized == false)
+       gather_merge_init(gm_state);
+   else
+   {
+       /*
+        * Otherwise, pull the next tuple from whichever participant we
+        * returned from last time, and reinsert the index into the heap,
+        * because it might now compare differently against the existing
+        * elements of the heap.
+        */
+       i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
+
+       if (gather_merge_readnext(gm_state, i, false))
+           binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
+       else
+           (void) binaryheap_remove_first(gm_state->gm_heap);
+   }
+
+   if (binaryheap_empty(gm_state->gm_heap))
+   {
+       /* All the queues are exhausted, and so is the heap */
+       return gather_merge_clear_slots(gm_state);
+   }
+   else
+   {
+       i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
+       return gm_state->gm_slots[i];
+   }
+
+   return gather_merge_clear_slots(gm_state);
+}
+
+/*
+ * Read the tuple for given reader in nowait mode, and form the tuple array.
+ */
+static void
+form_tuple_array(GatherMergeState *gm_state, int reader)
+{
+   GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+   int         i;
+
+   /* Last slot is for leader and we don't build tuple array for leader */
+   if (reader == gm_state->nreaders)
+       return;
+
+   /*
+    * We here because we already read all the tuples from the tuple array, so
+    * initialize the counter to zero.
+    */
+   if (tuple_buffer->nTuples == tuple_buffer->readCounter)
+       tuple_buffer->nTuples = tuple_buffer->readCounter = 0;
+
+   /* Tuple array is already full? */
+   if (tuple_buffer->nTuples == MAX_TUPLE_STORE)
+       return;
+
+   for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
+   {
+       tuple_buffer->tuple[i] = heap_copytuple(gm_readnext_tuple(gm_state,
+                                                                 reader,
+                                                                 false,
+                                                      &tuple_buffer->done));
+       if (!HeapTupleIsValid(tuple_buffer->tuple[i]))
+           break;
+       tuple_buffer->nTuples++;
+   }
+}
+
+/*
+ * Store the next tuple for a given reader into the appropriate slot.
+ *
+ * Returns false if the reader is exhausted, and true otherwise.
+ */
+static bool
+gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
+{
+   GMReaderTupleBuffer *tuple_buffer;
+   HeapTuple   tup = NULL;
+
+   /*
+    * If we're being asked to generate a tuple from the leader, then we
+    * just call ExecProcNode as normal to produce one.
+    */
+   if (gm_state->nreaders == reader)
+   {
+       if (gm_state->need_to_scan_locally)
+       {
+           PlanState  *outerPlan = outerPlanState(gm_state);
+           TupleTableSlot *outerTupleSlot;
+
+           outerTupleSlot = ExecProcNode(outerPlan);
+
+           if (!TupIsNull(outerTupleSlot))
+           {
+               gm_state->gm_slots[reader] = outerTupleSlot;
+               return true;
+           }
+           gm_state->gm_tuple_buffers[reader].done = true;
+           gm_state->need_to_scan_locally = false;
+       }
+       return false;
+   }
+
+   /* Otherwise, check the state of the relevant tuple buffer. */
+   tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+
+   if (tuple_buffer->nTuples > tuple_buffer->readCounter)
+   {
+       /* Return any tuple previously read that is still buffered. */
+       tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+       tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
+   }
+   else if (tuple_buffer->done)
+   {
+       /* Reader is known to be exhausted. */
+       DestroyTupleQueueReader(gm_state->reader[reader]);
+       gm_state->reader[reader] = NULL;
+       return false;
+   }
+   else
+   {
+       /* Read and buffer next tuple. */
+       tup = heap_copytuple(gm_readnext_tuple(gm_state,
+                                              reader,
+                                              nowait,
+                                              &tuple_buffer->done));
+
+       /*
+        * Attempt to read more tuples in nowait mode and store them in
+        * the tuple array.
+        */
+       if (HeapTupleIsValid(tup))
+           form_tuple_array(gm_state, reader);
+       else
+           return false;
+   }
+
+   Assert(HeapTupleIsValid(tup));
+
+   /* Build the TupleTableSlot for the given tuple */
+   ExecStoreTuple(tup,         /* tuple to store */
+                  gm_state->gm_slots[reader],  /* slot in which to store the
+                                                * tuple */
+                  InvalidBuffer,       /* buffer associated with this tuple */
+                  true);       /* pfree this pointer if not from heap */
+
+   return true;
+}
+
+/*
+ * Attempt to read a tuple from given reader.
+ */
+static HeapTuple
+gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
+                 bool *done)
+{
+   TupleQueueReader *reader;
+   HeapTuple   tup = NULL;
+   MemoryContext oldContext;
+   MemoryContext tupleContext;
+
+   tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
+
+   if (done != NULL)
+       *done = false;
+
+   /* Check for async events, particularly messages from workers. */
+   CHECK_FOR_INTERRUPTS();
+
+   /* Attempt to read a tuple. */
+   reader = gm_state->reader[nreader];
+
+   /* Run TupleQueueReaders in per-tuple context */
+   oldContext = MemoryContextSwitchTo(tupleContext);
+   tup = TupleQueueReaderNext(reader, nowait, done);
+   MemoryContextSwitchTo(oldContext);
+
+   return tup;
+}
+
+/*
+ * We have one slot for each item in the heap array.  We use SlotNumber
+ * to store slot indexes.  This doesn't actually provide any formal
+ * type-safety, but it makes the code more self-documenting.
+ */
+typedef int32 SlotNumber;
+
+/*
+ * Compare the tuples in the two given slots.
+ */
+static int32
+heap_compare_slots(Datum a, Datum b, void *arg)
+{
+   GatherMergeState *node = (GatherMergeState *) arg;
+   SlotNumber  slot1 = DatumGetInt32(a);
+   SlotNumber  slot2 = DatumGetInt32(b);
+
+   TupleTableSlot *s1 = node->gm_slots[slot1];
+   TupleTableSlot *s2 = node->gm_slots[slot2];
+   int         nkey;
+
+   Assert(!TupIsNull(s1));
+   Assert(!TupIsNull(s2));
+
+   for (nkey = 0; nkey < node->gm_nkeys; nkey++)
+   {
+       SortSupport sortKey = node->gm_sortkeys + nkey;
+       AttrNumber  attno = sortKey->ssup_attno;
+       Datum       datum1,
+                   datum2;
+       bool        isNull1,
+                   isNull2;
+       int         compare;
+
+       datum1 = slot_getattr(s1, attno, &isNull1);
+       datum2 = slot_getattr(s2, attno, &isNull2);
+
+       compare = ApplySortComparator(datum1, isNull1,
+                                     datum2, isNull2,
+                                     sortKey);
+       if (compare != 0)
+           return -compare;
+   }
+   return 0;
+}
index ac8e50ef1dc98b89e71dbcd9d52d84fa987f3918..bfc2ac1716510ed6537fd4e378565376945931a3 100644 (file)
@@ -360,6 +360,31 @@ _copyGather(const Gather *from)
    return newnode;
 }
 
+/*
+ * _copyGatherMerge
+ */
+static GatherMerge *
+_copyGatherMerge(const GatherMerge *from)
+{
+   GatherMerge    *newnode = makeNode(GatherMerge);
+
+   /*
+    * copy node superclass fields
+    */
+   CopyPlanFields((const Plan *) from, (Plan *) newnode);
+
+   /*
+    * copy remainder of node
+    */
+   COPY_SCALAR_FIELD(num_workers);
+   COPY_SCALAR_FIELD(numCols);
+   COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
+   COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
+   COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
+   COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
+
+   return newnode;
+}
 
 /*
  * CopyScanFields
@@ -4594,6 +4619,9 @@ copyObject(const void *from)
        case T_Gather:
            retval = _copyGather(from);
            break;
+       case T_GatherMerge:
+           retval = _copyGatherMerge(from);
+           break;
        case T_SeqScan:
            retval = _copySeqScan(from);
            break;
index 825a7b283a348148e6935604eb95983e1bd57e17..7418fbededf91070d43aee088b78e1ded3c3918a 100644 (file)
@@ -457,6 +457,35 @@ _outGather(StringInfo str, const Gather *node)
    WRITE_BOOL_FIELD(invisible);
 }
 
+static void
+_outGatherMerge(StringInfo str, const GatherMerge *node)
+{
+   int     i;
+
+   WRITE_NODE_TYPE("GATHERMERGE");
+
+   _outPlanInfo(str, (const Plan *) node);
+
+   WRITE_INT_FIELD(num_workers);
+   WRITE_INT_FIELD(numCols);
+
+   appendStringInfoString(str, " :sortColIdx");
+   for (i = 0; i < node->numCols; i++)
+       appendStringInfo(str, " %d", node->sortColIdx[i]);
+
+   appendStringInfoString(str, " :sortOperators");
+   for (i = 0; i < node->numCols; i++)
+       appendStringInfo(str, " %u", node->sortOperators[i]);
+
+   appendStringInfoString(str, " :collations");
+   for (i = 0; i < node->numCols; i++)
+       appendStringInfo(str, " %u", node->collations[i]);
+
+   appendStringInfoString(str, " :nullsFirst");
+   for (i = 0; i < node->numCols; i++)
+       appendStringInfo(str, " %s", booltostr(node->nullsFirst[i]));
+}
+
 static void
 _outScan(StringInfo str, const Scan *node)
 {
@@ -2016,6 +2045,17 @@ _outLimitPath(StringInfo str, const LimitPath *node)
    WRITE_NODE_FIELD(limitCount);
 }
 
+static void
+_outGatherMergePath(StringInfo str, const GatherMergePath *node)
+{
+   WRITE_NODE_TYPE("GATHERMERGEPATH");
+
+   _outPathInfo(str, (const Path *) node);
+
+   WRITE_NODE_FIELD(subpath);
+   WRITE_INT_FIELD(num_workers);
+}
+
 static void
 _outNestPath(StringInfo str, const NestPath *node)
 {
@@ -3473,6 +3513,9 @@ outNode(StringInfo str, const void *obj)
            case T_Gather:
                _outGather(str, obj);
                break;
+           case T_GatherMerge:
+               _outGatherMerge(str, obj);
+               break;
            case T_Scan:
                _outScan(str, obj);
                break;
@@ -3809,6 +3852,9 @@ outNode(StringInfo str, const void *obj)
            case T_LimitPath:
                _outLimitPath(str, obj);
                break;
+           case T_GatherMergePath:
+               _outGatherMergePath(str, obj);
+               break;
            case T_NestPath:
                _outNestPath(str, obj);
                break;
index 8f39d93a123c99abe734da3dbda1b121446d206e..d3bbc02f24b5cb08e51f9ea8b16af563b6227e66 100644 (file)
@@ -2137,6 +2137,26 @@ _readGather(void)
    READ_DONE();
 }
 
+/*
+ * _readGatherMerge
+ */
+static GatherMerge *
+_readGatherMerge(void)
+{
+   READ_LOCALS(GatherMerge);
+
+   ReadCommonPlan(&local_node->plan);
+
+   READ_INT_FIELD(num_workers);
+   READ_INT_FIELD(numCols);
+   READ_ATTRNUMBER_ARRAY(sortColIdx, local_node->numCols);
+   READ_OID_ARRAY(sortOperators, local_node->numCols);
+   READ_OID_ARRAY(collations, local_node->numCols);
+   READ_BOOL_ARRAY(nullsFirst, local_node->numCols);
+
+   READ_DONE();
+}
+
 /*
  * _readHash
  */
@@ -2577,6 +2597,8 @@ parseNodeString(void)
        return_value = _readUnique();
    else if (MATCH("GATHER", 6))
        return_value = _readGather();
+   else if (MATCH("GATHERMERGE", 11))
+       return_value = _readGatherMerge();
    else if (MATCH("HASH", 4))
        return_value = _readHash();
    else if (MATCH("SETOP", 5))
index fbb2cda9d73bd89579ee0c2fa75c38b16ca1397e..b263359fdedd60aac7dcfc23ecb7bad70bf7405c 100644 (file)
@@ -2084,39 +2084,51 @@ set_worktable_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
 
 /*
  * generate_gather_paths
- *     Generate parallel access paths for a relation by pushing a Gather on
- *     top of a partial path.
+ *     Generate parallel access paths for a relation by pushing a Gather or
+ *     Gather Merge on top of a partial path.
  *
  * This must not be called until after we're done creating all partial paths
  * for the specified relation.  (Otherwise, add_partial_path might delete a
- * path that some GatherPath has a reference to.)
+ * path that some GatherPath or GatherMergePath has a reference to.)
  */
 void
 generate_gather_paths(PlannerInfo *root, RelOptInfo *rel)
 {
    Path       *cheapest_partial_path;
    Path       *simple_gather_path;
+   ListCell   *lc;
 
    /* If there are no partial paths, there's nothing to do here. */
    if (rel->partial_pathlist == NIL)
        return;
 
    /*
-    * The output of Gather is currently always unsorted, so there's only one
-    * partial path of interest: the cheapest one.  That will be the one at
-    * the front of partial_pathlist because of the way add_partial_path
-    * works.
-    *
-    * Eventually, we should have a Gather Merge operation that can merge
-    * multiple tuple streams together while preserving their ordering.  We
-    * could usefully generate such a path from each partial path that has
-    * non-NIL pathkeys.
+    * The output of Gather is always unsorted, so there's only one partial
+    * path of interest: the cheapest one.  That will be the one at the front
+    * of partial_pathlist because of the way add_partial_path works.
     */
    cheapest_partial_path = linitial(rel->partial_pathlist);
    simple_gather_path = (Path *)
        create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
                           NULL, NULL);
    add_path(rel, simple_gather_path);
+
+   /*
+    * For each useful ordering, we can consider an order-preserving Gather
+    * Merge.
+    */
+   foreach (lc, rel->partial_pathlist)
+   {
+       Path   *subpath = (Path *) lfirst(lc);
+       GatherMergePath   *path;
+
+       if (subpath->pathkeys == NIL)
+           continue;
+
+       path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
+                                       subpath->pathkeys, NULL, NULL);
+       add_path(rel, &path->path);
+   }
 }
 
 /*
index 627e3f1b954121337783db5834b7385f9b3c2750..e78f3a84c5041ffe00a1ca56bf9006e2160daf66 100644 (file)
@@ -126,6 +126,7 @@ bool        enable_nestloop = true;
 bool       enable_material = true;
 bool       enable_mergejoin = true;
 bool       enable_hashjoin = true;
+bool       enable_gathermerge = true;
 
 typedef struct
 {
@@ -372,6 +373,73 @@ cost_gather(GatherPath *path, PlannerInfo *root,
    path->path.total_cost = (startup_cost + run_cost);
 }
 
+/*
+ * cost_gather_merge
+ *   Determines and returns the cost of gather merge path.
+ *
+ * GatherMerge merges several pre-sorted input streams, using a heap that at
+ * any given instant holds the next tuple from each stream. If there are N
+ * streams, we need about N*log2(N) tuple comparisons to construct the heap at
+ * startup, and then for each output tuple, about log2(N) comparisons to
+ * replace the top heap entry with the next tuple from the same stream.
+ */
+void
+cost_gather_merge(GatherMergePath *path, PlannerInfo *root,
+                 RelOptInfo *rel, ParamPathInfo *param_info,
+                 Cost input_startup_cost, Cost input_total_cost,
+                 double *rows)
+{
+   Cost        startup_cost = 0;
+   Cost        run_cost = 0;
+   Cost        comparison_cost;
+   double      N;
+   double      logN;
+
+   /* Mark the path with the correct row estimate */
+   if (rows)
+       path->path.rows = *rows;
+   else if (param_info)
+       path->path.rows = param_info->ppi_rows;
+   else
+       path->path.rows = rel->rows;
+
+   if (!enable_gathermerge)
+       startup_cost += disable_cost;
+
+   /*
+    * Add one to the number of workers to account for the leader.  This might
+    * be overgenerous since the leader will do less work than other workers
+    * in typical cases, but we'll go with it for now.
+    */
+   Assert(path->num_workers > 0);
+   N = (double) path->num_workers + 1;
+   logN = LOG2(N);
+
+   /* Assumed cost per tuple comparison */
+   comparison_cost = 2.0 * cpu_operator_cost;
+
+   /* Heap creation cost */
+   startup_cost += comparison_cost * N * logN;
+
+   /* Per-tuple heap maintenance cost */
+   run_cost += path->path.rows * comparison_cost * logN;
+
+   /* small cost for heap management, like cost_merge_append */
+   run_cost += cpu_operator_cost * path->path.rows;
+
+   /*
+    * Parallel setup and communication cost.  Since Gather Merge, unlike
+    * Gather, requires us to block until a tuple is available from every
+    * worker, we bump the IPC cost up a little bit as compared with Gather.
+    * For lack of a better idea, charge an extra 5%.
+    */
+   startup_cost += parallel_setup_cost;
+   run_cost += parallel_tuple_cost * path->path.rows * 1.05;
+
+   path->path.startup_cost = startup_cost + input_startup_cost;
+   path->path.total_cost = (startup_cost + run_cost + input_total_cost);
+}
+
 /*
  * cost_index
  *   Determines and returns the cost of scanning a relation using an index.
index 8f8663c1e1408be5c10b5734046d988f6cfdd8d1..e18c634a7be86ff7dda5982d4f3f7ae4889d9b81 100644 (file)
@@ -277,6 +277,8 @@ static ModifyTable *make_modifytable(PlannerInfo *root,
                 List *resultRelations, List *subplans,
                 List *withCheckOptionLists, List *returningLists,
                 List *rowMarks, OnConflictExpr *onconflict, int epqParam);
+static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
+                        GatherMergePath *best_path);
 
 
 /*
@@ -475,6 +477,10 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
                                              (LimitPath *) best_path,
                                              flags);
            break;
+       case T_GatherMerge:
+           plan = (Plan *) create_gather_merge_plan(root,
+                                             (GatherMergePath *) best_path);
+           break;
        default:
            elog(ERROR, "unrecognized node type: %d",
                 (int) best_path->pathtype);
@@ -1451,6 +1457,86 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path)
    return gather_plan;
 }
 
+/*
+ * create_gather_merge_plan
+ *
+ *   Create a Gather Merge plan for 'best_path' and (recursively)
+ *   plans for its subpaths.
+ */
+static GatherMerge *
+create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
+{
+   GatherMerge *gm_plan;
+   Plan       *subplan;
+   List       *pathkeys = best_path->path.pathkeys;
+   int         numsortkeys;
+   AttrNumber *sortColIdx;
+   Oid        *sortOperators;
+   Oid        *collations;
+   bool       *nullsFirst;
+
+   /* As with Gather, it's best to project away columns in the workers. */
+   subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
+
+   /* See create_merge_append_plan for why there's no make_xxx function */
+   gm_plan = makeNode(GatherMerge);
+   gm_plan->plan.targetlist = subplan->targetlist;
+   gm_plan->num_workers = best_path->num_workers;
+   copy_generic_path_info(&gm_plan->plan, &best_path->path);
+
+   /* Gather Merge is pointless with no pathkeys; use Gather instead. */
+   Assert(pathkeys != NIL);
+
+   /* Compute sort column info, and adjust GatherMerge tlist as needed */
+   (void) prepare_sort_from_pathkeys(&gm_plan->plan, pathkeys,
+                                     best_path->path.parent->relids,
+                                     NULL,
+                                     true,
+                                     &gm_plan->numCols,
+                                     &gm_plan->sortColIdx,
+                                     &gm_plan->sortOperators,
+                                     &gm_plan->collations,
+                                     &gm_plan->nullsFirst);
+
+
+   /* Compute sort column info, and adjust subplan's tlist as needed */
+   subplan = prepare_sort_from_pathkeys(subplan, pathkeys,
+                                        best_path->subpath->parent->relids,
+                                        gm_plan->sortColIdx,
+                                        false,
+                                        &numsortkeys,
+                                        &sortColIdx,
+                                        &sortOperators,
+                                        &collations,
+                                        &nullsFirst);
+
+   /* As for MergeAppend, check that we got the same sort key information. */
+   Assert(numsortkeys == gm_plan->numCols);
+   if (memcmp(sortColIdx, gm_plan->sortColIdx,
+              numsortkeys * sizeof(AttrNumber)) != 0)
+       elog(ERROR, "GatherMerge child's targetlist doesn't match GatherMerge");
+   Assert(memcmp(sortOperators, gm_plan->sortOperators,
+                 numsortkeys * sizeof(Oid)) == 0);
+   Assert(memcmp(collations, gm_plan->collations,
+                 numsortkeys * sizeof(Oid)) == 0);
+   Assert(memcmp(nullsFirst, gm_plan->nullsFirst,
+                 numsortkeys * sizeof(bool)) == 0);
+
+   /* Now, insert a Sort node if subplan isn't sufficiently ordered */
+   if (!pathkeys_contained_in(pathkeys, best_path->subpath->pathkeys))
+       subplan = (Plan *) make_sort(subplan, numsortkeys,
+                                    sortColIdx, sortOperators,
+                                    collations, nullsFirst);
+
+   /* Now insert the subplan under GatherMerge. */
+   gm_plan->plan.lefttree = subplan;
+
+   /* use parallel mode for parallel plans. */
+   root->glob->parallelModeNeeded = true;
+
+   return gm_plan;
+}
+
 /*
  * create_projection_plan
  *
index 1636a69dba49328a926ef7f3c1582e907ce7d131..209f76963263e4c5895239e59e6cb845b4b177d8 100644 (file)
@@ -3663,8 +3663,7 @@ create_grouping_paths(PlannerInfo *root,
 
        /*
         * Now generate a complete GroupAgg Path atop of the cheapest partial
-        * path. We need only bother with the cheapest path here, as the
-        * output of Gather is never sorted.
+        * path.  We can do this using either Gather or Gather Merge.
         */
        if (grouped_rel->partial_pathlist)
        {
@@ -3711,6 +3710,70 @@ create_grouping_paths(PlannerInfo *root,
                                           parse->groupClause,
                                           (List *) parse->havingQual,
                                           dNumGroups));
+
+           /*
+            * The point of using Gather Merge rather than Gather is that it
+            * can preserve the ordering of the input path, so there's no
+            * reason to try it unless (1) it's possible to produce more than
+            * one output row and (2) we want the output path to be ordered.
+            */
+           if (parse->groupClause != NIL && root->group_pathkeys != NIL)
+           {
+               foreach(lc, grouped_rel->partial_pathlist)
+               {
+                   Path       *subpath = (Path *) lfirst(lc);
+                   Path       *gmpath;
+                   double      total_groups;
+
+                   /*
+                    * It's useful to consider paths that are already properly
+                    * ordered for Gather Merge, because those don't need a
+                    * sort.  It's also useful to consider the cheapest path,
+                    * because sorting it in parallel and then doing Gather
+                    * Merge may be better than doing an unordered Gather
+                    * followed by a sort.  But there's no point in
+                    * considering non-cheapest paths that aren't already
+                    * sorted correctly.
+                    */
+                   if (path != subpath &&
+                       !pathkeys_contained_in(root->group_pathkeys,
+                                              subpath->pathkeys))
+                       continue;
+
+                   total_groups = subpath->rows * subpath->parallel_workers;
+
+                   gmpath = (Path *)
+                       create_gather_merge_path(root,
+                                                grouped_rel,
+                                                subpath,
+                                                NULL,
+                                                root->group_pathkeys,
+                                                NULL,
+                                                &total_groups);
+
+                   if (parse->hasAggs)
+                       add_path(grouped_rel, (Path *)
+                                create_agg_path(root,
+                                                grouped_rel,
+                                                gmpath,
+                                                target,
+                                parse->groupClause ? AGG_SORTED : AGG_PLAIN,
+                                                AGGSPLIT_FINAL_DESERIAL,
+                                                parse->groupClause,
+                                                (List *) parse->havingQual,
+                                                &agg_final_costs,
+                                                dNumGroups));
+                   else
+                       add_path(grouped_rel, (Path *)
+                                create_group_path(root,
+                                                  grouped_rel,
+                                                  gmpath,
+                                                  target,
+                                                  parse->groupClause,
+                                                  (List *) parse->havingQual,
+                                                  dNumGroups));
+               }
+           }
        }
    }
 
@@ -3808,6 +3871,16 @@ create_grouping_paths(PlannerInfo *root,
    /* Now choose the best path(s) */
    set_cheapest(grouped_rel);
 
+   /*
+    * We've been using the partial pathlist for the grouped relation to hold
+    * partially aggregated paths, but that's actually a little bit bogus
+    * because it's unsafe for later planning stages -- like ordered_rel ---
+    * to get the idea that they can use these partial paths as if they didn't
+    * need a FinalizeAggregate step.  Zap the partial pathlist at this stage
+    * so we don't get confused.
+    */
+   grouped_rel->partial_pathlist = NIL;
+
    return grouped_rel;
 }
 
@@ -4275,6 +4348,56 @@ create_ordered_paths(PlannerInfo *root,
        }
    }
 
+   /*
+    * generate_gather_paths() will have already generated a simple Gather
+    * path for the best parallel path, if any, and the loop above will have
+    * considered sorting it.  Similarly, generate_gather_paths() will also
+    * have generated order-preserving Gather Merge plans which can be used
+    * without sorting if they happen to match the sort_pathkeys, and the loop
+    * above will have handled those as well.  However, there's one more
+    * possibility: it may make sense to sort the cheapest partial path
+    * according to the required output order and then use Gather Merge.
+    */
+   if (ordered_rel->consider_parallel && root->sort_pathkeys != NIL &&
+       input_rel->partial_pathlist != NIL)
+   {
+       Path       *cheapest_partial_path;
+
+       cheapest_partial_path = linitial(input_rel->partial_pathlist);
+
+       /*
+        * If cheapest partial path doesn't need a sort, this is redundant
+        * with what's already been tried.
+        */
+       if (!pathkeys_contained_in(root->sort_pathkeys,
+                                  cheapest_partial_path->pathkeys))
+       {
+           Path       *path;
+           double      total_groups;
+
+           path = (Path *) create_sort_path(root,
+                                            ordered_rel,
+                                            cheapest_partial_path,
+                                            root->sort_pathkeys,
+                                            limit_tuples);
+
+           total_groups = cheapest_partial_path->rows *
+               cheapest_partial_path->parallel_workers;
+           path = (Path *)
+               create_gather_merge_path(root, ordered_rel,
+                                        path,
+                                        target, root->sort_pathkeys, NULL,
+                                        &total_groups);
+
+           /* Add projection step if needed */
+           if (path->pathtarget != target)
+               path = apply_projection_to_path(root, ordered_rel,
+                                               path, target);
+
+           add_path(ordered_rel, path);
+       }
+   }
+
    /*
     * If there is an FDW that's responsible for all baserels of the query,
     * let it consider adding ForeignPaths.
index 3d2c12433d3d52282fd6a397c2737af0bd9d09fe..5f3027e96f981e7b19a17d4bf4edcc089818fe08 100644 (file)
@@ -616,6 +616,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
            break;
 
        case T_Gather:
+       case T_GatherMerge:
            set_upper_references(root, plan, rtoffset);
            break;
 
index da9a84be3d76bbbbc5c403fbfbd5094d6b873dcf..6fa654066247cf58d4928ba305d0c4eb6a45dcdb 100644 (file)
@@ -2700,6 +2700,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
        case T_Sort:
        case T_Unique:
        case T_Gather:
+       case T_GatherMerge:
        case T_SetOp:
        case T_Group:
            /* no node-type-specific fields need fixing */
index 0d925c6fcbf816690e1cd32604ace7cdda9b946b..8ce772d27437fb3bee7e679db4fd31ac17d4c761 100644 (file)
@@ -1627,6 +1627,66 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
    return pathnode;
 }
 
+/*
+ * create_gather_merge_path
+ *
+ *   Creates a path corresponding to a gather merge scan, returning
+ *   the pathnode.
+ */
+GatherMergePath *
+create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
+                        PathTarget *target, List *pathkeys,
+                        Relids required_outer, double *rows)
+{
+   GatherMergePath *pathnode = makeNode(GatherMergePath);
+   Cost             input_startup_cost = 0;
+   Cost             input_total_cost = 0;
+
+   Assert(subpath->parallel_safe);
+   Assert(pathkeys);
+
+   pathnode->path.pathtype = T_GatherMerge;
+   pathnode->path.parent = rel;
+   pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
+                                                         required_outer);
+   pathnode->path.parallel_aware = false;
+
+   pathnode->subpath = subpath;
+   pathnode->num_workers = subpath->parallel_workers;
+   pathnode->path.pathkeys = pathkeys;
+   pathnode->path.pathtarget = target ? target : rel->reltarget;
+   pathnode->path.rows += subpath->rows;
+
+   if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
+   {
+       /* Subpath is adequately ordered, we won't need to sort it */
+       input_startup_cost += subpath->startup_cost;
+       input_total_cost += subpath->total_cost;
+   }
+   else
+   {
+       /* We'll need to insert a Sort node, so include cost for that */
+       Path        sort_path;      /* dummy for result of cost_sort */
+
+       cost_sort(&sort_path,
+                 root,
+                 pathkeys,
+                 subpath->total_cost,
+                 subpath->rows,
+                 subpath->pathtarget->width,
+                 0.0,
+                 work_mem,
+                 -1);
+       input_startup_cost += sort_path.startup_cost;
+       input_total_cost += sort_path.total_cost;
+   }
+
+   cost_gather_merge(pathnode, root, rel, pathnode->path.param_info,
+                     input_startup_cost, input_total_cost, rows);
+
+   return pathnode;
+}
+
 /*
  * translate_sub_tlist - get subquery column numbers represented by tlist
  *
index f8b073d8a97249f661318e54125d108e5e1d6866..811ea5153b298b79c8cb08d7e635484aa0004a55 100644 (file)
@@ -902,6 +902,15 @@ static struct config_bool ConfigureNamesBool[] =
        true,
        NULL, NULL, NULL
    },
+   {
+       {"enable_gathermerge", PGC_USERSET, QUERY_TUNING_METHOD,
+           gettext_noop("Enables the planner's use of gather merge plans."),
+           NULL
+       },
+       &enable_gathermerge,
+       true,
+       NULL, NULL, NULL
+   },
 
    {
        {"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
diff --git a/src/include/executor/nodeGatherMerge.h b/src/include/executor/nodeGatherMerge.h
new file mode 100644 (file)
index 0000000..3c8b42b
--- /dev/null
@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeGatherMerge.h
+ *     prototypes for nodeGatherMerge.c
+ *
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/nodeGatherMerge.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef NODEGATHERMERGE_H
+#define NODEGATHERMERGE_H
+
+#include "nodes/execnodes.h"
+
+extern GatherMergeState *ExecInitGatherMerge(GatherMerge * node,
+                   EState *estate,
+                   int eflags);
+extern TupleTableSlot *ExecGatherMerge(GatherMergeState * node);
+extern void ExecEndGatherMerge(GatherMergeState * node);
+extern void ExecReScanGatherMerge(GatherMergeState * node);
+extern void ExecShutdownGatherMerge(GatherMergeState * node);
+
+#endif   /* NODEGATHERMERGE_H */
index 6a0d590ef2974c55e937ca694fda94e01cec40a7..f856f6036f664f0b341106593a8d1eb77a6a3bf3 100644 (file)
@@ -2094,6 +2094,35 @@ typedef struct GatherState
    bool        need_to_scan_locally;
 } GatherState;
 
+/* ----------------
+ * GatherMergeState information
+ *
+ *     Gather merge nodes launch 1 or more parallel workers, run a
+ *     subplan which produces sorted output in each worker, and then
+ *     merge the results into a single sorted stream.
+ * ----------------
+ */
+struct GMReaderTuple;
+
+typedef struct GatherMergeState
+{
+   PlanState   ps;             /* its first field is NodeTag */
+   bool        initialized;
+   struct ParallelExecutorInfo *pei;
+   int         nreaders;
+   int         nworkers_launched;
+   struct TupleQueueReader **reader;
+   TupleDesc   tupDesc;
+   TupleTableSlot **gm_slots;
+   struct binaryheap *gm_heap; /* binary heap of slot indices */
+   bool        gm_initialized; /* gather merge initilized ? */
+   bool        need_to_scan_locally;
+   int         gm_nkeys;
+   SortSupport gm_sortkeys;    /* array of length ms_nkeys */
+   struct GMReaderTupleBuffer *gm_tuple_buffers;       /* tuple buffer per
+                                                        * reader */
+} GatherMergeState;
+
 /* ----------------
  *  HashState information
  * ----------------
index 49fa9447556436dfc2868999d528d5fb2d46dd38..2bc7a5df11be76d8a22f4796d21d8afb7e26d1b8 100644 (file)
@@ -77,6 +77,7 @@ typedef enum NodeTag
    T_WindowAgg,
    T_Unique,
    T_Gather,
+   T_GatherMerge,
    T_Hash,
    T_SetOp,
    T_LockRows,
@@ -127,6 +128,7 @@ typedef enum NodeTag
    T_WindowAggState,
    T_UniqueState,
    T_GatherState,
+   T_GatherMergeState,
    T_HashState,
    T_SetOpState,
    T_LockRowsState,
@@ -249,6 +251,7 @@ typedef enum NodeTag
    T_MaterialPath,
    T_UniquePath,
    T_GatherPath,
+   T_GatherMergePath,
    T_ProjectionPath,
    T_ProjectSetPath,
    T_SortPath,
index 7fbb0c2c77e550321fc25fef293fd1244fd80256..b880dc16cf0b900e88fb43deb1135f7f9f1474c3 100644 (file)
@@ -797,6 +797,22 @@ typedef struct Gather
    bool        invisible;      /* suppress EXPLAIN display (for testing)? */
 } Gather;
 
+/* ------------
+ *     gather merge node
+ * ------------
+ */
+typedef struct GatherMerge
+{
+   Plan        plan;
+   int         num_workers;
+   /* remaining fields are just like the sort-key info in struct Sort */
+   int         numCols;        /* number of sort-key columns */
+   AttrNumber *sortColIdx;     /* their indexes in the target list */
+   Oid        *sortOperators;  /* OIDs of operators to sort them by */
+   Oid        *collations;     /* OIDs of collations */
+   bool       *nullsFirst;     /* NULLS FIRST/LAST directions */
+} GatherMerge;
+
 /* ----------------
  *     hash build node
  *
index f7ac6f600fe8da0fc3ca861ca50dd3f9eb0f8c4e..05d6f07aea9de3d4baacf288e743dbcdf6449037 100644 (file)
@@ -1203,6 +1203,19 @@ typedef struct GatherPath
    bool        single_copy;    /* don't execute path more than once */
 } GatherPath;
 
+/*
+ * GatherMergePath runs several copies of a plan in parallel and
+ * collects the results. For gather merge parallel leader always execute the
+ * plan.
+ */
+typedef struct GatherMergePath
+{
+   Path        path;
+   Path       *subpath;        /* path for each worker */
+   int         num_workers;    /* number of workers sought to help */
+} GatherMergePath;
+
+
 /*
  * All join-type paths share these fields.
  */
index 2b386835e3728810925f9a56249a39e3f4920a63..d9a9b12a06cc33946e5a2b51f326913c601ec8e0 100644 (file)
@@ -66,6 +66,7 @@ extern bool enable_nestloop;
 extern bool enable_material;
 extern bool enable_mergejoin;
 extern bool enable_hashjoin;
+extern bool enable_gathermerge;
 extern int constraint_exclusion;
 
 extern double clamp_row_est(double nrows);
@@ -205,5 +206,9 @@ extern Selectivity clause_selectivity(PlannerInfo *root,
                   int varRelid,
                   JoinType jointype,
                   SpecialJoinInfo *sjinfo);
+extern void cost_gather_merge(GatherMergePath *path, PlannerInfo *root,
+                             RelOptInfo *rel, ParamPathInfo *param_info,
+                             Cost input_startup_cost, Cost input_total_cost,
+                             double *rows);
 
 #endif   /* COST_H */
index f0fe8307224218bbf9042ea7746b3550e1c556c5..373c7221a842f8c7a539cf6b124aa06d9d046c3c 100644 (file)
@@ -78,6 +78,13 @@ extern UniquePath *create_unique_path(PlannerInfo *root, RelOptInfo *rel,
 extern GatherPath *create_gather_path(PlannerInfo *root,
                   RelOptInfo *rel, Path *subpath, PathTarget *target,
                   Relids required_outer, double *rows);
+extern GatherMergePath *create_gather_merge_path(PlannerInfo *root,
+                                                RelOptInfo *rel,
+                                                Path *subpath,
+                                                PathTarget *target,
+                                                List *pathkeys,
+                                                Relids required_outer,
+                                                double *rows);
 extern SubqueryScanPath *create_subqueryscan_path(PlannerInfo *root,
                         RelOptInfo *rel, Path *subpath,
                         List *pathkeys, Relids required_outer);
index 290b735b6b59527156b546e4f15b694fef0ee19c..038a62efd76dabd6f94bf4df7e16635ced5121d4 100644 (file)
@@ -213,6 +213,33 @@ select  count(*) from tenk1, tenk2 where tenk1.unique1 = tenk2.unique1;
 
 reset enable_hashjoin;
 reset enable_nestloop;
+--test gather merge
+set enable_hashagg to off;
+explain (costs off)
+   select  string4, count((unique2)) from tenk1 group by string4 order by string4;
+                     QUERY PLAN                     
+----------------------------------------------------
+ Finalize GroupAggregate
+   Group Key: string4
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Partial GroupAggregate
+               Group Key: string4
+               ->  Sort
+                     Sort Key: string4
+                     ->  Parallel Seq Scan on tenk1
+(9 rows)
+
+select  string4, count((unique2)) from tenk1 group by string4 order by string4;
+ string4 | count 
+---------+-------
+ AAAAxx  |  2500
+ HHHHxx  |  2500
+ OOOOxx  |  2500
+ VVVVxx  |  2500
+(4 rows)
+
+reset enable_hashagg;
 set force_parallel_mode=1;
 explain (costs off)
   select stringu1::int2 from tenk1 where unique1 = 1;
index d48abd7e09281f80d72bbbb29c2fb154d2bfbea7..568b783f5edacbec24585071435b5ebd177ad68d 100644 (file)
@@ -73,6 +73,7 @@ select name, setting from pg_settings where name like 'enable%';
          name         | setting 
 ----------------------+---------
  enable_bitmapscan    | on
+ enable_gathermerge   | on
  enable_hashagg       | on
  enable_hashjoin      | on
  enable_indexonlyscan | on
@@ -83,7 +84,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan       | on
  enable_sort          | on
  enable_tidscan       | on
-(11 rows)
+(12 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
index 80412b990d24c8cd1dd683e1a2a3d72a35989dd5..9311a775af606b8e9e36fbcd30a0f18e9bc4b81b 100644 (file)
@@ -84,6 +84,17 @@ select  count(*) from tenk1, tenk2 where tenk1.unique1 = tenk2.unique1;
 
 reset enable_hashjoin;
 reset enable_nestloop;
+
+--test gather merge
+set enable_hashagg to off;
+
+explain (costs off)
+   select  string4, count((unique2)) from tenk1 group by string4 order by string4;
+
+select  string4, count((unique2)) from tenk1 group by string4 order by string4;
+
+reset enable_hashagg;
+
 set force_parallel_mode=1;
 
 explain (costs off)
index 3155ec6d5b4440446c6597c3b064faf62432e909..296552e3948b58f67920714501082e7dda1964b5 100644 (file)
@@ -779,6 +779,9 @@ GV
 Gather
 GatherPath
 GatherState
+GatherMerge
+GatherMergePath
+GatherMergeState
 Gene
 GenericCosts
 GenericExprState