</listitem>
</varlistentry>
+ <varlistentry id="guc-enable-gathermerge" xreflabel="enable_gathermerge">
+ <term><varname>enable_gathermerge</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>enable_gathermerge</> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Enables or disables the query planner's use of gather
+ merge plan types. The default is <literal>on</>.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-enable-hashagg" xreflabel="enable_hashagg">
<term><varname>enable_hashagg</varname> (<type>boolean</type>)
<indexterm>
case T_Gather:
pname = sname = "Gather";
break;
+ case T_GatherMerge:
+ pname = sname = "Gather Merge";
+ break;
case T_IndexScan:
pname = sname = "Index Scan";
break;
ExplainPropertyBool("Single Copy", gather->single_copy, es);
}
break;
+ case T_GatherMerge:
+ {
+ GatherMerge *gm = (GatherMerge *) plan;
+
+ show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
+ if (plan->qual)
+ show_instrumentation_count("Rows Removed by Filter", 1,
+ planstate, es);
+ ExplainPropertyInteger("Workers Planned",
+ gm->num_workers, es);
+ if (es->analyze)
+ {
+ int nworkers;
+
+ nworkers = ((GatherMergeState *) planstate)->nworkers_launched;
+ ExplainPropertyInteger("Workers Launched",
+ nworkers, es);
+ }
+ }
+ break;
case T_FunctionScan:
if (es->verbose)
{
nodeBitmapHeapscan.o nodeBitmapIndexscan.o \
- nodeCustom.o nodeFunctionscan.o nodeGather.o \
+ nodeCustom.o nodeFunctionscan.o nodeGather.o nodeGatherMerge.o \
nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
nodeLimit.o nodeLockRows.o \
nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
nodeNestloop.o nodeProjectSet.o nodeRecursiveunion.o nodeResult.o \
nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
#include "executor/nodeForeignscan.h"
#include "executor/nodeFunctionscan.h"
#include "executor/nodeGather.h"
+#include "executor/nodeGatherMerge.h"
#include "executor/nodeGroup.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
estate, eflags);
break;
+ case T_GatherMerge:
+ result = (PlanState *) ExecInitGatherMerge((GatherMerge *) node,
+ estate, eflags);
+ break;
+
case T_Hash:
result = (PlanState *) ExecInitHash((Hash *) node,
estate, eflags);
result = ExecGather((GatherState *) node);
break;
+ case T_GatherMergeState:
+ result = ExecGatherMerge((GatherMergeState *) node);
+ break;
+
case T_HashState:
result = ExecHash((HashState *) node);
break;
ExecEndGather((GatherState *) node);
break;
+ case T_GatherMergeState:
+ ExecEndGatherMerge((GatherMergeState *) node);
+ break;
+
case T_IndexScanState:
ExecEndIndexScan((IndexScanState *) node);
break;
case T_CustomScanState:
ExecShutdownCustomScan((CustomScanState *) node);
break;
+ case T_GatherMergeState:
+ ExecShutdownGatherMerge((GatherMergeState *) node);
+ break;
default:
break;
}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * nodeGatherMerge.c
+ * Scan a plan in multiple workers, and do order-preserving merge.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/nodeGatherMerge.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relscan.h"
+#include "access/xact.h"
+#include "executor/execdebug.h"
+#include "executor/execParallel.h"
+#include "executor/nodeGatherMerge.h"
+#include "executor/nodeSubplan.h"
+#include "executor/tqueue.h"
+#include "lib/binaryheap.h"
+#include "miscadmin.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+
+/*
+ * Tuple array for each worker
+ */
+typedef struct GMReaderTupleBuffer
+{
+ HeapTuple *tuple; /* array of up to MAX_TUPLE_STORE buffered tuples */
+ int readCounter; /* index of next buffered tuple to return */
+ int nTuples; /* number of tuples currently buffered */
+ bool done; /* reader exhausted? */
+} GMReaderTupleBuffer;
+
+/*
+ * When we read tuples from workers, it's a good idea to read several at once
+ * for efficiency when possible: this minimizes context-switching overhead.
+ * But reading too many at a time wastes memory without improving performance.
+ */
+#define MAX_TUPLE_STORE 10
+
+static int32 heap_compare_slots(Datum a, Datum b, void *arg);
+static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
+static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
+ bool nowait, bool *done);
+static void gather_merge_init(GatherMergeState *gm_state);
+static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
+static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
+ bool nowait);
+static void form_tuple_array(GatherMergeState *gm_state, int reader);
+
+/* ----------------------------------------------------------------
+ * ExecInitGatherMerge
+ * ----------------------------------------------------------------
+ */
+GatherMergeState *
+ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
+{
+ GatherMergeState *gm_state;
+ Plan *outerNode;
+ bool hasoid;
+ TupleDesc tupDesc;
+
+ /* A Gather Merge node doesn't have an innerPlan. */
+ Assert(innerPlan(node) == NULL);
+
+ /*
+ * create state structure
+ */
+ gm_state = makeNode(GatherMergeState);
+ gm_state->ps.plan = (Plan *) node;
+ gm_state->ps.state = estate;
+
+ /*
+ * Miscellaneous initialization
+ *
+ * create expression context for node
+ */
+ ExecAssignExprContext(estate, &gm_state->ps);
+
+ /*
+ * initialize child expressions
+ */
+ gm_state->ps.targetlist = (List *)
+ ExecInitExpr((Expr *) node->plan.targetlist,
+ (PlanState *) gm_state);
+ gm_state->ps.qual = (List *)
+ ExecInitExpr((Expr *) node->plan.qual,
+ (PlanState *) gm_state);
+
+ /*
+ * tuple table initialization
+ */
+ ExecInitResultTupleSlot(estate, &gm_state->ps);
+
+ /*
+ * now initialize outer plan
+ */
+ outerNode = outerPlan(node);
+ outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags);
+
+ /*
+ * Initialize result tuple type and projection info.
+ */
+ ExecAssignResultTypeFromTL(&gm_state->ps);
+ ExecAssignProjectionInfo(&gm_state->ps, NULL);
+
+ gm_state->gm_initialized = false;
+
+ /*
+ * initialize sort-key information
+ */
+ if (node->numCols)
+ {
+ int i;
+
+ gm_state->gm_nkeys = node->numCols;
+ gm_state->gm_sortkeys =
+ palloc0(sizeof(SortSupportData) * node->numCols);
+
+ for (i = 0; i < node->numCols; i++)
+ {
+ SortSupport sortKey = gm_state->gm_sortkeys + i;
+
+ sortKey->ssup_cxt = CurrentMemoryContext;
+ sortKey->ssup_collation = node->collations[i];
+ sortKey->ssup_nulls_first = node->nullsFirst[i];
+ sortKey->ssup_attno = node->sortColIdx[i];
+
+ /*
+ * We don't perform abbreviated key conversion here, for the same
+ * reasons that it isn't used in MergeAppend
+ */
+ sortKey->abbreviate = false;
+
+ PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
+ }
+ }
+
+ /*
+ * store the tuple descriptor into gather merge state, so we can use it
+ * later while initializing the gather merge slots.
+ */
+ if (!ExecContextForcesOids(&gm_state->ps, &hasoid))
+ hasoid = false;
+ tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
+ gm_state->tupDesc = tupDesc;
+
+ return gm_state;
+}
+
+/* ----------------------------------------------------------------
+ * ExecGatherMerge(node)
+ *
+ * Scans the relation via multiple workers and returns
+ * the next qualifying tuple.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecGatherMerge(GatherMergeState *node)
+{
+ TupleTableSlot *slot;
+ ExprContext *econtext;
+ int i;
+
+ /*
+ * As with Gather, we don't launch workers until this node is actually
+ * executed.
+ */
+ if (!node->initialized)
+ {
+ EState *estate = node->ps.state;
+ GatherMerge *gm = (GatherMerge *) node->ps.plan;
+
+ /*
+ * Sometimes we might have to run without parallelism; but if parallel
+ * mode is active then we can try to fire up some workers.
+ */
+ if (gm->num_workers > 0 && IsInParallelMode())
+ {
+ ParallelContext *pcxt;
+
+ /* Initialize data structures for workers. */
+ if (!node->pei)
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ gm->num_workers);
+
+ /* Try to launch workers. */
+ pcxt = node->pei->pcxt;
+ LaunchParallelWorkers(pcxt);
+ node->nworkers_launched = pcxt->nworkers_launched;
+
+ /* Set up tuple queue readers to read the results. */
+ if (pcxt->nworkers_launched > 0)
+ {
+ node->nreaders = 0;
+ node->reader = palloc(pcxt->nworkers_launched *
+ sizeof(TupleQueueReader *));
+
+ Assert(gm->numCols);
+
+ for (i = 0; i < pcxt->nworkers_launched; ++i)
+ {
+ shm_mq_set_handle(node->pei->tqueue[i],
+ pcxt->worker[i].bgwhandle);
+ node->reader[node->nreaders++] =
+ CreateTupleQueueReader(node->pei->tqueue[i],
+ node->tupDesc);
+ }
+ }
+ else
+ {
+ /* No workers? Then never mind. */
+ ExecShutdownGatherMergeWorkers(node);
+ }
+ }
+
+ /* always allow leader to participate */
+ node->need_to_scan_locally = true;
+ node->initialized = true;
+ }
+
+ /*
+ * Reset per-tuple memory context to free any expression evaluation
+ * storage allocated in the previous tuple cycle.
+ */
+ econtext = node->ps.ps_ExprContext;
+ ResetExprContext(econtext);
+
+ /*
+ * Get next tuple, either from one of our workers, or by running the
+ * plan ourselves.
+ */
+ slot = gather_merge_getnext(node);
+ if (TupIsNull(slot))
+ return NULL;
+
+ /*
+ * form the result tuple using ExecProject(), and return it --- unless
+ * the projection produces an empty set, in which case we must loop
+ * back around for another tuple
+ */
+ econtext->ecxt_outertuple = slot;
+ return ExecProject(node->ps.ps_ProjInfo);
+}
+
+/* ----------------------------------------------------------------
+ * ExecEndGatherMerge
+ *
+ * frees any storage allocated through C routines.
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndGatherMerge(GatherMergeState *node)
+{
+ ExecEndNode(outerPlanState(node)); /* let children clean up first */
+ ExecShutdownGatherMerge(node);
+ ExecFreeExprContext(&node->ps);
+ ExecClearTuple(node->ps.ps_ResultTupleSlot);
+}
+
+/* ----------------------------------------------------------------
+ * ExecShutdownGatherMerge
+ *
+ * Destroy the setup for parallel workers including parallel context.
+ * Collect all the stats after the workers are stopped, else some work
+ * done by the workers won't be accounted for.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGatherMerge(GatherMergeState *node)
+{
+ ExecShutdownGatherMergeWorkers(node);
+
+ /* Now destroy the parallel context. */
+ if (node->pei != NULL)
+ {
+ ExecParallelCleanup(node->pei);
+ node->pei = NULL;
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecShutdownGatherMergeWorkers
+ *
+ * Destroy the parallel workers. Collect all the stats after
+ * the workers are stopped, else some work done by the workers won't
+ * be accounted for.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecShutdownGatherMergeWorkers(GatherMergeState *node)
+{
+ /* Shut down tuple queue readers before shutting down workers. */
+ if (node->reader != NULL)
+ {
+ int i;
+
+ for (i = 0; i < node->nreaders; ++i)
+ if (node->reader[i])
+ DestroyTupleQueueReader(node->reader[i]);
+
+ pfree(node->reader);
+ node->reader = NULL;
+ }
+
+ /* Now shut down the workers. */
+ if (node->pei != NULL)
+ ExecParallelFinish(node->pei);
+}
+
+/* ----------------------------------------------------------------
+ * ExecReScanGatherMerge
+ *
+ * Re-initialize the workers and rescan the relation via them.
+ * ----------------------------------------------------------------
+ */
+void
+ExecReScanGatherMerge(GatherMergeState *node)
+{
+ /*
+ * Re-initialize the parallel workers to perform a rescan of the
+ * relation. We want to shut down all the workers gracefully, so that
+ * they can propagate any error or other information to the master
+ * backend before dying. The parallel context will be reused for the
+ * rescan.
+ */
+ ExecShutdownGatherMergeWorkers(node);
+
+ node->initialized = false;
+
+ if (node->pei)
+ ExecParallelReinitialize(node->pei);
+
+ ExecReScan(node->ps.lefttree);
+}
+
+/*
+ * Initialize the Gather Merge tuple read.
+ *
+ * Pull at least a single tuple from each worker plus the leader, and set up
+ * the heap.
+ */
+static void
+gather_merge_init(GatherMergeState *gm_state)
+{
+ int nreaders = gm_state->nreaders;
+ bool initialize = true;
+ int i;
+
+ /*
+ * Allocate gm_slots, one slot per worker plus one more for the leader.
+ * The last slot is always the leader's. The leader reads tuples by
+ * calling ExecProcNode(), which returns a TupleTableSlot that gets
+ * assigned to its gm_slot directly, so just initialize the leader's slot
+ * to NULL. The worker slots are initialized below via
+ * ExecInitExtraTupleSlot().
+ */
+ gm_state->gm_slots =
+ palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
+ gm_state->gm_slots[gm_state->nreaders] = NULL;
+
+ /* Initialize the tuple slot and tuple array for each worker */
+ gm_state->gm_tuple_buffers =
+ (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
+ (gm_state->nreaders + 1));
+ for (i = 0; i < gm_state->nreaders; i++)
+ {
+ /* Allocate the tuple array with MAX_TUPLE_STORE size */
+ gm_state->gm_tuple_buffers[i].tuple =
+ (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
+
+ /* Initialize slot for worker */
+ gm_state->gm_slots[i] = ExecInitExtraTupleSlot(gm_state->ps.state);
+ ExecSetSlotDescriptor(gm_state->gm_slots[i],
+ gm_state->tupDesc);
+ }
+
+ /* Allocate the binary heap used for the merge, one entry per participant */
+ gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1,
+ heap_compare_slots,
+ gm_state);
+
+ /*
+ * First, try to read a tuple from each participant (workers and leader)
+ * in nowait mode, so that every read gets started. Then, if any active
+ * participant was unable to produce a tuple, re-read from it, this time
+ * in wait mode. For participants that produced a tuple in the earlier
+ * loop and are still active, just try to fill their tuple arrays if more
+ * tuples are available.
+ */
+reread:
+ for (i = 0; i < nreaders + 1; i++)
+ {
+ if (!gm_state->gm_tuple_buffers[i].done &&
+ (TupIsNull(gm_state->gm_slots[i]) ||
+ gm_state->gm_slots[i]->tts_isempty))
+ {
+ if (gather_merge_readnext(gm_state, i, initialize))
+ {
+ binaryheap_add_unordered(gm_state->gm_heap,
+ Int32GetDatum(i));
+ }
+ }
+ else
+ form_tuple_array(gm_state, i);
+ }
+ initialize = false;
+
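+ /* Re-read, in wait mode this time, if any worker still lacks a tuple */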
+ for (i = 0; i < nreaders; i++)
+ if (!gm_state->gm_tuple_buffers[i].done &&
+ (TupIsNull(gm_state->gm_slots[i]) ||
+ gm_state->gm_slots[i]->tts_isempty))
+ goto reread;
+
+ binaryheap_build(gm_state->gm_heap);
+ gm_state->gm_initialized = true;
+}
+
+/*
+ * Clear out the tuple table slot for each gather merge input,
+ * and return one of the cleared slots.
+ */
+static TupleTableSlot *
+gather_merge_clear_slots(GatherMergeState *gm_state)
+{
+ int i;
+
+ for (i = 0; i < gm_state->nreaders; i++)
+ {
+ pfree(gm_state->gm_tuple_buffers[i].tuple);
+ gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]);
+ }
+
+ /* Free tuple array as we don't need it any more */
+ pfree(gm_state->gm_tuple_buffers);
+ /* Free the binaryheap, which was created for sort */
+ binaryheap_free(gm_state->gm_heap);
+
+ /* return any clear slot */
+ return gm_state->gm_slots[0];
+}
+
+/*
+ * Read the next tuple for gather merge.
+ *
+ * Fetch the next tuple in sort order from the heap.
+ */
+static TupleTableSlot *
+gather_merge_getnext(GatherMergeState *gm_state)
+{
+ int i;
+
+ /*
+ * First time through: pull the first tuple from each participant, and
+ * set up the heap.
+ */
+ if (!gm_state->gm_initialized)
+ gather_merge_init(gm_state);
+ else
+ {
+ /*
+ * Otherwise, pull the next tuple from whichever participant we
+ * returned from last time, and reinsert the index into the heap,
+ * because it might now compare differently against the existing
+ * elements of the heap.
+ */
+ i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
+
+ if (gather_merge_readnext(gm_state, i, false))
+ binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
+ else
+ (void) binaryheap_remove_first(gm_state->gm_heap);
+ }
+
+ if (binaryheap_empty(gm_state->gm_heap))
+ {
+ /* All the queues are exhausted, and so is the heap */
+ return gather_merge_clear_slots(gm_state);
+ }
+ else
+ {
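+ /* Return next tuple from whichever participant has the leading one */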
+ i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
+ return gm_state->gm_slots[i];
+ }
+}
+
+/*
+ * Read tuples for the given reader in nowait mode and fill its tuple array.
+ */
+static void
+form_tuple_array(GatherMergeState *gm_state, int reader)
+{
+ GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+ int i;
+
+ /* The last slot is for the leader, which needs no tuple array */
+ if (reader == gm_state->nreaders)
+ return;
+
+ /*
+ * If we've read all the tuples previously stored in the array, reset
+ * the counters so the array can be refilled.
+ */
+ if (tuple_buffer->nTuples == tuple_buffer->readCounter)
+ tuple_buffer->nTuples = tuple_buffer->readCounter = 0;
+
+ /* Tuple array is already full? */
+ if (tuple_buffer->nTuples == MAX_TUPLE_STORE)
+ return;
+
+ for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
+ {
+ /* Read in nowait mode, giving up as soon as the queue is empty */
+ tuple_buffer->tuple[i] = heap_copytuple(gm_readnext_tuple(gm_state,
+ reader,
+ true,
+ &tuple_buffer->done));
+ if (!HeapTupleIsValid(tuple_buffer->tuple[i]))
+ break;
+ tuple_buffer->nTuples++;
+ }
+}
+
+/*
+ * Store the next tuple for a given reader into the appropriate slot.
+ *
+ * Returns false if the reader is exhausted, and true otherwise.
+ */
+static bool
+gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
+{
+ GMReaderTupleBuffer *tuple_buffer;
+ HeapTuple tup = NULL;
+
+ /*
+ * If we're being asked to generate a tuple from the leader, then we
+ * just call ExecProcNode as normal to produce one.
+ */
+ if (gm_state->nreaders == reader)
+ {
+ if (gm_state->need_to_scan_locally)
+ {
+ PlanState *outerPlan = outerPlanState(gm_state);
+ TupleTableSlot *outerTupleSlot;
+
+ outerTupleSlot = ExecProcNode(outerPlan);
+
+ if (!TupIsNull(outerTupleSlot))
+ {
+ gm_state->gm_slots[reader] = outerTupleSlot;
+ return true;
+ }
+ gm_state->gm_tuple_buffers[reader].done = true;
+ gm_state->need_to_scan_locally = false;
+ }
+ return false;
+ }
+
+ /* Otherwise, check the state of the relevant tuple buffer. */
+ tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+
+ if (tuple_buffer->nTuples > tuple_buffer->readCounter)
+ {
+ /* Return any tuple previously read that is still buffered. */
+ tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
+ }
+ else if (tuple_buffer->done)
+ {
+ /* Reader is known to be exhausted. */
+ DestroyTupleQueueReader(gm_state->reader[reader]);
+ gm_state->reader[reader] = NULL;
+ return false;
+ }
+ else
+ {
+ /* Read and buffer next tuple. */
+ tup = heap_copytuple(gm_readnext_tuple(gm_state,
+ reader,
+ nowait,
+ &tuple_buffer->done));
+
+ /*
+ * Attempt to read more tuples in nowait mode and store them in
+ * the tuple array.
+ */
+ if (HeapTupleIsValid(tup))
+ form_tuple_array(gm_state, reader);
+ else
+ return false;
+ }
+
+ Assert(HeapTupleIsValid(tup));
+
+ /* Build the TupleTableSlot for the given tuple */
+ ExecStoreTuple(tup, /* tuple to store */
+ gm_state->gm_slots[reader], /* slot in which to store the
+ * tuple */
+ InvalidBuffer, /* buffer associated with this tuple */
+ true); /* pfree this pointer if not from heap */
+
+ return true;
+}
+
+/*
+ * Attempt to read a tuple from the given reader.
+ */
+static HeapTuple
+gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
+ bool *done)
+{
+ TupleQueueReader *reader;
+ HeapTuple tup = NULL;
+ MemoryContext oldContext;
+ MemoryContext tupleContext;
+
+ tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
+
+ if (done != NULL)
+ *done = false;
+
+ /* Check for async events, particularly messages from workers. */
+ CHECK_FOR_INTERRUPTS();
+
+ /* Attempt to read a tuple. */
+ reader = gm_state->reader[nreader];
+
+ /* Run TupleQueueReaders in per-tuple context */
+ oldContext = MemoryContextSwitchTo(tupleContext);
+ tup = TupleQueueReaderNext(reader, nowait, done);
+ MemoryContextSwitchTo(oldContext);
+
+ return tup;
+}
+
+/*
+ * We have one slot for each item in the heap array. We use SlotNumber
+ * to store slot indexes. This doesn't actually provide any formal
+ * type-safety, but it makes the code more self-documenting.
+ */
+typedef int32 SlotNumber;
+
+/*
+ * Compare the tuples in the two given slots.
+ */
+static int32
+heap_compare_slots(Datum a, Datum b, void *arg)
+{
+ GatherMergeState *node = (GatherMergeState *) arg;
+ SlotNumber slot1 = DatumGetInt32(a);
+ SlotNumber slot2 = DatumGetInt32(b);
+
+ TupleTableSlot *s1 = node->gm_slots[slot1];
+ TupleTableSlot *s2 = node->gm_slots[slot2];
+ int nkey;
+
+ Assert(!TupIsNull(s1));
+ Assert(!TupIsNull(s2));
+
+ for (nkey = 0; nkey < node->gm_nkeys; nkey++)
+ {
+ SortSupport sortKey = node->gm_sortkeys + nkey;
+ AttrNumber attno = sortKey->ssup_attno;
+ Datum datum1,
+ datum2;
+ bool isNull1,
+ isNull2;
+ int compare;
+
+ datum1 = slot_getattr(s1, attno, &isNull1);
+ datum2 = slot_getattr(s2, attno, &isNull2);
+
+ compare = ApplySortComparator(datum1, isNull1,
+ datum2, isNull2,
+ sortKey);
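+
+ /*
+ * Invert the result, since binaryheap.c sorts into decreasing order;
+ * we want the participant with the smallest key at the heap's top.
+ */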
+ if (compare != 0)
+ return -compare;
+ }
+ return 0;
+}
return newnode;
}
+/*
+ * _copyGatherMerge
+ */
+static GatherMerge *
+_copyGatherMerge(const GatherMerge *from)
+{
+ GatherMerge *newnode = makeNode(GatherMerge);
+
+ /*
+ * copy node superclass fields
+ */
+ CopyPlanFields((const Plan *) from, (Plan *) newnode);
+
+ /*
+ * copy remainder of node
+ */
+ COPY_SCALAR_FIELD(num_workers);
+ COPY_SCALAR_FIELD(numCols);
+ COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
+ COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
+ COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
+ COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
+
+ return newnode;
+}
/*
* CopyScanFields
case T_Gather:
retval = _copyGather(from);
break;
+ case T_GatherMerge:
+ retval = _copyGatherMerge(from);
+ break;
case T_SeqScan:
retval = _copySeqScan(from);
break;
WRITE_BOOL_FIELD(invisible);
}
+static void
+_outGatherMerge(StringInfo str, const GatherMerge *node)
+{
+ int i;
+
+ WRITE_NODE_TYPE("GATHERMERGE");
+
+ _outPlanInfo(str, (const Plan *) node);
+
+ WRITE_INT_FIELD(num_workers);
+ WRITE_INT_FIELD(numCols);
+
+ appendStringInfoString(str, " :sortColIdx");
+ for (i = 0; i < node->numCols; i++)
+ appendStringInfo(str, " %d", node->sortColIdx[i]);
+
+ appendStringInfoString(str, " :sortOperators");
+ for (i = 0; i < node->numCols; i++)
+ appendStringInfo(str, " %u", node->sortOperators[i]);
+
+ appendStringInfoString(str, " :collations");
+ for (i = 0; i < node->numCols; i++)
+ appendStringInfo(str, " %u", node->collations[i]);
+
+ appendStringInfoString(str, " :nullsFirst");
+ for (i = 0; i < node->numCols; i++)
+ appendStringInfo(str, " %s", booltostr(node->nullsFirst[i]));
+}
+
static void
_outScan(StringInfo str, const Scan *node)
{
WRITE_NODE_FIELD(limitCount);
}
+static void
+_outGatherMergePath(StringInfo str, const GatherMergePath *node)
+{
+ WRITE_NODE_TYPE("GATHERMERGEPATH");
+
+ _outPathInfo(str, (const Path *) node);
+
+ WRITE_NODE_FIELD(subpath);
+ WRITE_INT_FIELD(num_workers);
+}
+
static void
_outNestPath(StringInfo str, const NestPath *node)
{
case T_Gather:
_outGather(str, obj);
break;
+ case T_GatherMerge:
+ _outGatherMerge(str, obj);
+ break;
case T_Scan:
_outScan(str, obj);
break;
case T_LimitPath:
_outLimitPath(str, obj);
break;
+ case T_GatherMergePath:
+ _outGatherMergePath(str, obj);
+ break;
case T_NestPath:
_outNestPath(str, obj);
break;
READ_DONE();
}
+/*
+ * _readGatherMerge
+ */
+static GatherMerge *
+_readGatherMerge(void)
+{
+ READ_LOCALS(GatherMerge);
+
+ ReadCommonPlan(&local_node->plan);
+
+ READ_INT_FIELD(num_workers);
+ READ_INT_FIELD(numCols);
+ READ_ATTRNUMBER_ARRAY(sortColIdx, local_node->numCols);
+ READ_OID_ARRAY(sortOperators, local_node->numCols);
+ READ_OID_ARRAY(collations, local_node->numCols);
+ READ_BOOL_ARRAY(nullsFirst, local_node->numCols);
+
+ READ_DONE();
+}
+
/*
* _readHash
*/
return_value = _readUnique();
else if (MATCH("GATHER", 6))
return_value = _readGather();
+ else if (MATCH("GATHERMERGE", 11))
+ return_value = _readGatherMerge();
else if (MATCH("HASH", 4))
return_value = _readHash();
else if (MATCH("SETOP", 5))
/*
* generate_gather_paths
- * Generate parallel access paths for a relation by pushing a Gather on
- * top of a partial path.
+ * Generate parallel access paths for a relation by pushing a Gather or
+ * Gather Merge on top of a partial path.
*
* This must not be called until after we're done creating all partial paths
* for the specified relation. (Otherwise, add_partial_path might delete a
- * path that some GatherPath has a reference to.)
+ * path that some GatherPath or GatherMergePath has a reference to.)
*/
void
generate_gather_paths(PlannerInfo *root, RelOptInfo *rel)
{
Path *cheapest_partial_path;
Path *simple_gather_path;
+ ListCell *lc;
/* If there are no partial paths, there's nothing to do here. */
if (rel->partial_pathlist == NIL)
return;
/*
- * The output of Gather is currently always unsorted, so there's only one
- * partial path of interest: the cheapest one. That will be the one at
- * the front of partial_pathlist because of the way add_partial_path
- * works.
- *
- * Eventually, we should have a Gather Merge operation that can merge
- * multiple tuple streams together while preserving their ordering. We
- * could usefully generate such a path from each partial path that has
- * non-NIL pathkeys.
+ * The output of Gather is always unsorted, so there's only one partial
+ * path of interest: the cheapest one. That will be the one at the front
+ * of partial_pathlist because of the way add_partial_path works.
*/
cheapest_partial_path = linitial(rel->partial_pathlist);
simple_gather_path = (Path *)
create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
NULL, NULL);
add_path(rel, simple_gather_path);
+
+ /*
+ * For each useful ordering, we can consider an order-preserving Gather
+ * Merge.
+ */
+ foreach(lc, rel->partial_pathlist)
+ {
+ Path *subpath = (Path *) lfirst(lc);
+ GatherMergePath *path;
+
+ if (subpath->pathkeys == NIL)
+ continue;
+
+ path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
+ subpath->pathkeys, NULL, NULL);
+ add_path(rel, &path->path);
+ }
}
/*
bool enable_material = true;
bool enable_mergejoin = true;
bool enable_hashjoin = true;
+bool enable_gathermerge = true;
typedef struct
{
path->path.total_cost = (startup_cost + run_cost);
}
+/*
+ * cost_gather_merge
+ * Determines and returns the cost of a gather merge path.
+ *
+ * GatherMerge merges several pre-sorted input streams, using a heap that at
+ * any given instant holds the next tuple from each stream. If there are N
+ * streams, we need about N*log2(N) tuple comparisons to construct the heap at
+ * startup, and then for each output tuple, about log2(N) comparisons to
+ * replace the top heap entry with the next tuple from the same stream.
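+ *
+ * For a rough sense of scale (illustrative numbers, not part of the model):
+ * with 4 workers plus the leader, N = 5 and log2(N) is about 2.32, so heap
+ * construction is charged about 11.6 comparisons and each output tuple about
+ * 2.32 comparisons, at the assumed 2.0 * cpu_operator_cost per comparison.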
+ */
+void
+cost_gather_merge(GatherMergePath *path, PlannerInfo *root,
+ RelOptInfo *rel, ParamPathInfo *param_info,
+ Cost input_startup_cost, Cost input_total_cost,
+ double *rows)
+{
+ Cost startup_cost = 0;
+ Cost run_cost = 0;
+ Cost comparison_cost;
+ double N;
+ double logN;
+
+ /* Mark the path with the correct row estimate */
+ if (rows)
+ path->path.rows = *rows;
+ else if (param_info)
+ path->path.rows = param_info->ppi_rows;
+ else
+ path->path.rows = rel->rows;
+
+ if (!enable_gathermerge)
+ startup_cost += disable_cost;
+
+ /*
+ * Add one to the number of workers to account for the leader. This might
+ * be overgenerous since the leader will do less work than other workers
+ * in typical cases, but we'll go with it for now.
+ */
+ Assert(path->num_workers > 0);
+ N = (double) path->num_workers + 1;
+ logN = LOG2(N);
+
+ /* Assumed cost per tuple comparison */
+ comparison_cost = 2.0 * cpu_operator_cost;
+
+ /* Heap creation cost */
+ startup_cost += comparison_cost * N * logN;
+
+ /* Per-tuple heap maintenance cost */
+ run_cost += path->path.rows * comparison_cost * logN;
+
+ /* small cost for heap management, like cost_merge_append */
+ run_cost += cpu_operator_cost * path->path.rows;
+
+ /*
+ * Parallel setup and communication cost. Since Gather Merge, unlike
+ * Gather, requires us to block until a tuple is available from every
+ * worker, we bump the IPC cost up a little bit as compared with Gather.
+ * For lack of a better idea, charge an extra 5%.
+ */
+ startup_cost += parallel_setup_cost;
+ run_cost += parallel_tuple_cost * path->path.rows * 1.05;
+
+ path->path.startup_cost = startup_cost + input_startup_cost;
+ path->path.total_cost = (startup_cost + run_cost + input_total_cost);
+}
+
/*
* cost_index
* Determines and returns the cost of scanning a relation using an index.
List *resultRelations, List *subplans,
List *withCheckOptionLists, List *returningLists,
List *rowMarks, OnConflictExpr *onconflict, int epqParam);
+static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
+ GatherMergePath *best_path);
/*
(LimitPath *) best_path,
flags);
break;
+ case T_GatherMerge:
+ plan = (Plan *) create_gather_merge_plan(root,
+ (GatherMergePath *) best_path);
+ break;
default:
elog(ERROR, "unrecognized node type: %d",
(int) best_path->pathtype);
return gather_plan;
}
+/*
+ * create_gather_merge_plan
+ *
+ * Create a Gather Merge plan for 'best_path' and (recursively)
+ * plans for its subpaths.
+ */
+static GatherMerge *
+create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
+{
+ GatherMerge *gm_plan;
+ Plan *subplan;
+ List *pathkeys = best_path->path.pathkeys;
+ int numsortkeys;
+ AttrNumber *sortColIdx;
+ Oid *sortOperators;
+ Oid *collations;
+ bool *nullsFirst;
+
+ /* As with Gather, it's best to project away columns in the workers. */
+ subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
+
+ /* See create_merge_append_plan for why there's no make_xxx function */
+ gm_plan = makeNode(GatherMerge);
+ gm_plan->plan.targetlist = subplan->targetlist;
+ gm_plan->num_workers = best_path->num_workers;
+ copy_generic_path_info(&gm_plan->plan, &best_path->path);
+
+ /* Gather Merge is pointless with no pathkeys; use Gather instead. */
+ Assert(pathkeys != NIL);
+
+ /* Compute sort column info, and adjust GatherMerge tlist as needed */
+ (void) prepare_sort_from_pathkeys(&gm_plan->plan, pathkeys,
+ best_path->path.parent->relids,
+ NULL,
+ true,
+ &gm_plan->numCols,
+ &gm_plan->sortColIdx,
+ &gm_plan->sortOperators,
+ &gm_plan->collations,
+ &gm_plan->nullsFirst);
+
+ /* Compute sort column info, and adjust subplan's tlist as needed */
+ subplan = prepare_sort_from_pathkeys(subplan, pathkeys,
+ best_path->subpath->parent->relids,
+ gm_plan->sortColIdx,
+ false,
+ &numsortkeys,
+ &sortColIdx,
+ &sortOperators,
+ &collations,
+ &nullsFirst);
+
+ /* As for MergeAppend, check that we got the same sort key information. */
+ Assert(numsortkeys == gm_plan->numCols);
+ if (memcmp(sortColIdx, gm_plan->sortColIdx,
+ numsortkeys * sizeof(AttrNumber)) != 0)
+ elog(ERROR, "GatherMerge child's targetlist doesn't match GatherMerge");
+ Assert(memcmp(sortOperators, gm_plan->sortOperators,
+ numsortkeys * sizeof(Oid)) == 0);
+ Assert(memcmp(collations, gm_plan->collations,
+ numsortkeys * sizeof(Oid)) == 0);
+ Assert(memcmp(nullsFirst, gm_plan->nullsFirst,
+ numsortkeys * sizeof(bool)) == 0);
+
+ /* Now, insert a Sort node if subplan isn't sufficiently ordered */
+ if (!pathkeys_contained_in(pathkeys, best_path->subpath->pathkeys))
+ subplan = (Plan *) make_sort(subplan, numsortkeys,
+ sortColIdx, sortOperators,
+ collations, nullsFirst);
+
+ /* Now insert the subplan under GatherMerge. */
+ gm_plan->plan.lefttree = subplan;
+
+ /* use parallel mode for parallel plans. */
+ root->glob->parallelModeNeeded = true;
+
+ return gm_plan;
+}
+
/*
* create_projection_plan
*
/*
* Now generate a complete GroupAgg Path atop of the cheapest partial
- * path. We need only bother with the cheapest path here, as the
- * output of Gather is never sorted.
+ * path. We can do this using either Gather or Gather Merge.
*/
if (grouped_rel->partial_pathlist)
{
parse->groupClause,
(List *) parse->havingQual,
dNumGroups));
+
+ /*
+ * The point of using Gather Merge rather than Gather is that it
+ * can preserve the ordering of the input path, so there's no
+ * reason to try it unless (1) it's possible to produce more than
+ * one output row and (2) we want the output path to be ordered.
+ */
+ if (parse->groupClause != NIL && root->group_pathkeys != NIL)
+ {
+ foreach(lc, grouped_rel->partial_pathlist)
+ {
+ Path *subpath = (Path *) lfirst(lc);
+ Path *gmpath;
+ double total_groups;
+
+ /*
+ * It's useful to consider paths that are already properly
+ * ordered for Gather Merge, because those don't need a
+ * sort. It's also useful to consider the cheapest path,
+ * because sorting it in parallel and then doing Gather
+ * Merge may be better than doing an unordered Gather
+ * followed by a sort. But there's no point in
+ * considering non-cheapest paths that aren't already
+ * sorted correctly.
+ */
+ if (path != subpath &&
+ !pathkeys_contained_in(root->group_pathkeys,
+ subpath->pathkeys))
+ continue;
+
+ total_groups = subpath->rows * subpath->parallel_workers;
+
+ gmpath = (Path *)
+ create_gather_merge_path(root,
+ grouped_rel,
+ subpath,
+ NULL,
+ root->group_pathkeys,
+ NULL,
+ &total_groups);
+
+ if (parse->hasAggs)
+ add_path(grouped_rel, (Path *)
+ create_agg_path(root,
+ grouped_rel,
+ gmpath,
+ target,
+ parse->groupClause ? AGG_SORTED : AGG_PLAIN,
+ AGGSPLIT_FINAL_DESERIAL,
+ parse->groupClause,
+ (List *) parse->havingQual,
+ &agg_final_costs,
+ dNumGroups));
+ else
+ add_path(grouped_rel, (Path *)
+ create_group_path(root,
+ grouped_rel,
+ gmpath,
+ target,
+ parse->groupClause,
+ (List *) parse->havingQual,
+ dNumGroups));
+ }
+ }
}
}
/* Now choose the best path(s) */
set_cheapest(grouped_rel);
+ /*
+ * We've been using the partial pathlist for the grouped relation to hold
+ * partially aggregated paths, but that's actually a little bit bogus
+ * because it's unsafe for later planning stages -- like ordered_rel --
+ * to get the idea that they can use these partial paths as if they didn't
+ * need a FinalizeAggregate step. Zap the partial pathlist at this stage
+ * so we don't get confused.
+ */
+ grouped_rel->partial_pathlist = NIL;
+
return grouped_rel;
}
}
}
+ /*
+ * generate_gather_paths() will have already generated a simple Gather
+ * path for the best parallel path, if any, and the loop above will have
+ * considered sorting it. Similarly, generate_gather_paths() will also
+ * have generated order-preserving Gather Merge plans which can be used
+ * without sorting if they happen to match the sort_pathkeys, and the loop
+ * above will have handled those as well. However, there's one more
+ * possibility: it may make sense to sort the cheapest partial path
+ * according to the required output order and then use Gather Merge.
+ */
+ if (ordered_rel->consider_parallel && root->sort_pathkeys != NIL &&
+ input_rel->partial_pathlist != NIL)
+ {
+ Path *cheapest_partial_path;
+
+ cheapest_partial_path = linitial(input_rel->partial_pathlist);
+
+ /*
+ * If the cheapest partial path doesn't need a sort, this is redundant
+ * with what's already been tried.
+ */
+ if (!pathkeys_contained_in(root->sort_pathkeys,
+ cheapest_partial_path->pathkeys))
+ {
+ Path *path;
+ double total_groups;
+
+ path = (Path *) create_sort_path(root,
+ ordered_rel,
+ cheapest_partial_path,
+ root->sort_pathkeys,
+ limit_tuples);
+
+ total_groups = cheapest_partial_path->rows *
+ cheapest_partial_path->parallel_workers;
+ path = (Path *)
+ create_gather_merge_path(root, ordered_rel,
+ path,
+ target, root->sort_pathkeys, NULL,
+ &total_groups);
+
+ /* Add projection step if needed */
+ if (path->pathtarget != target)
+ path = apply_projection_to_path(root, ordered_rel,
+ path, target);
+
+ add_path(ordered_rel, path);
+ }
+ }
+
/*
* If there is an FDW that's responsible for all baserels of the query,
* let it consider adding ForeignPaths.
break;
case T_Gather:
+ case T_GatherMerge:
set_upper_references(root, plan, rtoffset);
break;
case T_Sort:
case T_Unique:
case T_Gather:
+ case T_GatherMerge:
case T_SetOp:
case T_Group:
/* no node-type-specific fields need fixing */
return pathnode;
}
+/*
+ * create_gather_merge_path
+ *
+ * Creates a path corresponding to a gather merge scan, returning
+ * the pathnode.
+ */
+GatherMergePath *
+create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
+ PathTarget *target, List *pathkeys,
+ Relids required_outer, double *rows)
+{
+ GatherMergePath *pathnode = makeNode(GatherMergePath);
+ Cost input_startup_cost = 0;
+ Cost input_total_cost = 0;
+
+ Assert(subpath->parallel_safe);
+ Assert(pathkeys);
+
+ pathnode->path.pathtype = T_GatherMerge;
+ pathnode->path.parent = rel;
+ pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
+ required_outer);
+ pathnode->path.parallel_aware = false;
+
+ pathnode->subpath = subpath;
+ pathnode->num_workers = subpath->parallel_workers;
+ pathnode->path.pathkeys = pathkeys;
+ pathnode->path.pathtarget = target ? target : rel->reltarget;
+ pathnode->path.rows += subpath->rows;
+
+ if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
+ {
+ /* Subpath is adequately ordered, we won't need to sort it */
+ input_startup_cost += subpath->startup_cost;
+ input_total_cost += subpath->total_cost;
+ }
+ else
+ {
+ /* We'll need to insert a Sort node, so include cost for that */
+ Path sort_path; /* dummy for result of cost_sort */
+
+ cost_sort(&sort_path,
+ root,
+ pathkeys,
+ subpath->total_cost,
+ subpath->rows,
+ subpath->pathtarget->width,
+ 0.0,
+ work_mem,
+ -1);
+ input_startup_cost += sort_path.startup_cost;
+ input_total_cost += sort_path.total_cost;
+ }
+
+ cost_gather_merge(pathnode, root, rel, pathnode->path.param_info,
+ input_startup_cost, input_total_cost, rows);
+
+ return pathnode;
+}
+
/*
* translate_sub_tlist - get subquery column numbers represented by tlist
*
true,
NULL, NULL, NULL
},
+ {
+ {"enable_gathermerge", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables the planner's use of gather merge plans."),
+ NULL
+ },
+ &enable_gathermerge,
+ true,
+ NULL, NULL, NULL
+ },
{
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * nodeGatherMerge.h
+ * prototypes for nodeGatherMerge.c
+ *
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/nodeGatherMerge.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef NODEGATHERMERGE_H
+#define NODEGATHERMERGE_H
+
+#include "nodes/execnodes.h"
+
+extern GatherMergeState *ExecInitGatherMerge(GatherMerge *node,
+ EState *estate,
+ int eflags);
+extern TupleTableSlot *ExecGatherMerge(GatherMergeState *node);
+extern void ExecEndGatherMerge(GatherMergeState *node);
+extern void ExecReScanGatherMerge(GatherMergeState *node);
+extern void ExecShutdownGatherMerge(GatherMergeState *node);
+
+#endif /* NODEGATHERMERGE_H */
bool need_to_scan_locally;
} GatherState;
+/* ----------------
+ * GatherMergeState information
+ *
+ * Gather merge nodes launch 1 or more parallel workers, run a
+ * subplan which produces sorted output in each worker, and then
+ * merge the results into a single sorted stream.
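+ *
+ * A typical plan shape (illustrative) is
+ *   Gather Merge
+ *     -> Sort
+ *       -> Parallel Seq Scan
+ * where each worker sorts its share of the input and the leader merges
+ * the sorted streams.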
+ * ----------------
+ */
+struct GMReaderTupleBuffer; /* private in nodeGatherMerge.c */
+
+typedef struct GatherMergeState
+{
+ PlanState ps; /* its first field is NodeTag */
+ bool initialized; /* workers launched? */
+ struct ParallelExecutorInfo *pei; /* parallel execution support */
+ int nreaders; /* number of active tuple queue readers */
+ int nworkers_launched; /* number of workers actually launched */
+ struct TupleQueueReader **reader; /* one reader per launched worker */
+ TupleDesc tupDesc; /* descriptor for subplan result tuples */
+ TupleTableSlot **gm_slots; /* one slot per reader, plus one for leader */
+ struct binaryheap *gm_heap; /* binary heap of slot indices */
+ bool gm_initialized; /* gather merge initialized? */
+ bool need_to_scan_locally; /* leader executes the subplan too? */
+ int gm_nkeys; /* number of sort keys */
+ SortSupport gm_sortkeys; /* array of length gm_nkeys */
+ struct GMReaderTupleBuffer *gm_tuple_buffers; /* tuple buffer per reader */
+} GatherMergeState;
+
/* ----------------
* HashState information
* ----------------
T_WindowAgg,
T_Unique,
T_Gather,
+ T_GatherMerge,
T_Hash,
T_SetOp,
T_LockRows,
T_WindowAggState,
T_UniqueState,
T_GatherState,
+ T_GatherMergeState,
T_HashState,
T_SetOpState,
T_LockRowsState,
T_MaterialPath,
T_UniquePath,
T_GatherPath,
+ T_GatherMergePath,
T_ProjectionPath,
T_ProjectSetPath,
T_SortPath,
bool invisible; /* suppress EXPLAIN display (for testing)? */
} Gather;
+/* ------------
+ * gather merge node
+ * ------------
+ */
+typedef struct GatherMerge
+{
+ Plan plan;
+ int num_workers;
+ /* remaining fields are just like the sort-key info in struct Sort */
+ int numCols; /* number of sort-key columns */
+ AttrNumber *sortColIdx; /* their indexes in the target list */
+ Oid *sortOperators; /* OIDs of operators to sort them by */
+ Oid *collations; /* OIDs of collations */
+ bool *nullsFirst; /* NULLS FIRST/LAST directions */
+} GatherMerge;
+
/* ----------------
* hash build node
*
bool single_copy; /* don't execute path more than once */
} GatherPath;
+/*
+ * GatherMergePath runs several copies of a plan in parallel and collects
+ * the results, preserving the input sort order. The parallel leader
+ * always executes the plan as well.
+ */
+typedef struct GatherMergePath
+{
+ Path path;
+ Path *subpath; /* path for each worker */
+ int num_workers; /* number of workers sought to help */
+} GatherMergePath;
+
+
/*
* All join-type paths share these fields.
*/
extern bool enable_material;
extern bool enable_mergejoin;
extern bool enable_hashjoin;
+extern bool enable_gathermerge;
extern int constraint_exclusion;
extern double clamp_row_est(double nrows);
int varRelid,
JoinType jointype,
SpecialJoinInfo *sjinfo);
+extern void cost_gather_merge(GatherMergePath *path, PlannerInfo *root,
+ RelOptInfo *rel, ParamPathInfo *param_info,
+ Cost input_startup_cost, Cost input_total_cost,
+ double *rows);
#endif /* COST_H */
extern GatherPath *create_gather_path(PlannerInfo *root,
RelOptInfo *rel, Path *subpath, PathTarget *target,
Relids required_outer, double *rows);
+extern GatherMergePath *create_gather_merge_path(PlannerInfo *root,
+ RelOptInfo *rel,
+ Path *subpath,
+ PathTarget *target,
+ List *pathkeys,
+ Relids required_outer,
+ double *rows);
extern SubqueryScanPath *create_subqueryscan_path(PlannerInfo *root,
RelOptInfo *rel, Path *subpath,
List *pathkeys, Relids required_outer);
reset enable_hashjoin;
reset enable_nestloop;
+-- test gather merge
+set enable_hashagg to off;
+explain (costs off)
+ select string4, count((unique2)) from tenk1 group by string4 order by string4;
+ QUERY PLAN
+----------------------------------------------------
+ Finalize GroupAggregate
+ Group Key: string4
+ -> Gather Merge
+ Workers Planned: 4
+ -> Partial GroupAggregate
+ Group Key: string4
+ -> Sort
+ Sort Key: string4
+ -> Parallel Seq Scan on tenk1
+(9 rows)
+
+select string4, count((unique2)) from tenk1 group by string4 order by string4;
+ string4 | count
+---------+-------
+ AAAAxx | 2500
+ HHHHxx | 2500
+ OOOOxx | 2500
+ VVVVxx | 2500
+(4 rows)
+
+reset enable_hashagg;
set force_parallel_mode=1;
explain (costs off)
select stringu1::int2 from tenk1 where unique1 = 1;
name | setting
----------------------+---------
enable_bitmapscan | on
+ enable_gathermerge | on
enable_hashagg | on
enable_hashjoin | on
enable_indexonlyscan | on
enable_seqscan | on
enable_sort | on
enable_tidscan | on
-(11 rows)
+(12 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail
reset enable_hashjoin;
reset enable_nestloop;
+
+-- test gather merge
+set enable_hashagg to off;
+
+explain (costs off)
+ select string4, count((unique2)) from tenk1 group by string4 order by string4;
+
+select string4, count((unique2)) from tenk1 group by string4 order by string4;
+
+reset enable_hashagg;
+
set force_parallel_mode=1;
explain (costs off)
Gather
+GatherMerge
+GatherMergePath
+GatherMergeState
GatherPath
GatherState
Gene
GenericCosts
GenericExprState