summaryrefslogtreecommitdiff
path: root/src/backend/nodes
diff options
context:
space:
mode:
authorRobert Haas2017-03-09 12:40:36 +0000
committerRobert Haas2017-03-09 12:49:29 +0000
commit355d3993c53ed62c5b53d020648e4fbcfbf5f155 (patch)
tree9a439084995c6553dd035fe218d9864011192b36 /src/backend/nodes
parenta72f0365db4168e7d720608fe3ac0ca3fe9355df (diff)
Add a Gather Merge executor node.
Like Gather, we spawn multiple workers and run the same plan in each one; however, Gather Merge is used when each worker produces the same output ordering and we want to preserve that output ordering while merging together the streams of tuples from various workers. (In a way, Gather Merge is like a hybrid of Gather and MergeAppend.) This works out to a win if it saves us from having to perform an expensive Sort. In cases where only a small amount of data would need to be sorted, it may actually be faster to use a regular Gather node and then sort the results afterward, because Gather Merge sometimes needs to wait synchronously for tuples whereas a pure Gather generally doesn't. But if this avoids an expensive sort then it's a win. Rushabh Lathia, reviewed and tested by Amit Kapila, Thomas Munro, and Neha Sharma, and reviewed and revised by me. Discussion: http://postgr.es/m/CAGPqQf09oPX-cQRpBKS0Gq49Z+m6KBxgxd_p9gX8CKk_d75HoQ@mail.gmail.com
Diffstat (limited to 'src/backend/nodes')
-rw-r--r--src/backend/nodes/copyfuncs.c28
-rw-r--r--src/backend/nodes/outfuncs.c46
-rw-r--r--src/backend/nodes/readfuncs.c22
3 files changed, 96 insertions, 0 deletions
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index ac8e50ef1dc..bfc2ac17165 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -360,6 +360,31 @@ _copyGather(const Gather *from)
return newnode;
}
+/*
+ * _copyGatherMerge
+ */
+static GatherMerge *
+_copyGatherMerge(const GatherMerge *from)
+{
+ GatherMerge *newnode = makeNode(GatherMerge);
+
+ /*
+ * copy node superclass fields
+ */
+ CopyPlanFields((const Plan *) from, (Plan *) newnode);
+
+ /*
+ * copy remainder of node
+ */
+ COPY_SCALAR_FIELD(num_workers);
+ COPY_SCALAR_FIELD(numCols);
+ COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
+ COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
+ COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
+ COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
+
+ return newnode;
+}
/*
* CopyScanFields
@@ -4594,6 +4619,9 @@ copyObject(const void *from)
case T_Gather:
retval = _copyGather(from);
break;
+ case T_GatherMerge:
+ retval = _copyGatherMerge(from);
+ break;
case T_SeqScan:
retval = _copySeqScan(from);
break;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 825a7b283a3..7418fbededf 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -458,6 +458,35 @@ _outGather(StringInfo str, const Gather *node)
}
static void
+_outGatherMerge(StringInfo str, const GatherMerge *node)
+{
+ int i;
+
+ WRITE_NODE_TYPE("GATHERMERGE");
+
+ _outPlanInfo(str, (const Plan *) node);
+
+ WRITE_INT_FIELD(num_workers);
+ WRITE_INT_FIELD(numCols);
+
+ appendStringInfoString(str, " :sortColIdx");
+ for (i = 0; i < node->numCols; i++)
+ appendStringInfo(str, " %d", node->sortColIdx[i]);
+
+ appendStringInfoString(str, " :sortOperators");
+ for (i = 0; i < node->numCols; i++)
+ appendStringInfo(str, " %u", node->sortOperators[i]);
+
+ appendStringInfoString(str, " :collations");
+ for (i = 0; i < node->numCols; i++)
+ appendStringInfo(str, " %u", node->collations[i]);
+
+ appendStringInfoString(str, " :nullsFirst");
+ for (i = 0; i < node->numCols; i++)
+ appendStringInfo(str, " %s", booltostr(node->nullsFirst[i]));
+}
+
+static void
_outScan(StringInfo str, const Scan *node)
{
WRITE_NODE_TYPE("SCAN");
@@ -2017,6 +2046,17 @@ _outLimitPath(StringInfo str, const LimitPath *node)
}
static void
+_outGatherMergePath(StringInfo str, const GatherMergePath *node)
+{
+ WRITE_NODE_TYPE("GATHERMERGEPATH");
+
+ _outPathInfo(str, (const Path *) node);
+
+ WRITE_NODE_FIELD(subpath);
+ WRITE_INT_FIELD(num_workers);
+}
+
+static void
_outNestPath(StringInfo str, const NestPath *node)
{
WRITE_NODE_TYPE("NESTPATH");
@@ -3473,6 +3513,9 @@ outNode(StringInfo str, const void *obj)
case T_Gather:
_outGather(str, obj);
break;
+ case T_GatherMerge:
+ _outGatherMerge(str, obj);
+ break;
case T_Scan:
_outScan(str, obj);
break;
@@ -3809,6 +3852,9 @@ outNode(StringInfo str, const void *obj)
case T_LimitPath:
_outLimitPath(str, obj);
break;
+ case T_GatherMergePath:
+ _outGatherMergePath(str, obj);
+ break;
case T_NestPath:
_outNestPath(str, obj);
break;
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 8f39d93a123..d3bbc02f24b 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -2138,6 +2138,26 @@ _readGather(void)
}
/*
+ * _readGatherMerge
+ */
+static GatherMerge *
+_readGatherMerge(void)
+{
+ READ_LOCALS(GatherMerge);
+
+ ReadCommonPlan(&local_node->plan);
+
+ READ_INT_FIELD(num_workers);
+ READ_INT_FIELD(numCols);
+ READ_ATTRNUMBER_ARRAY(sortColIdx, local_node->numCols);
+ READ_OID_ARRAY(sortOperators, local_node->numCols);
+ READ_OID_ARRAY(collations, local_node->numCols);
+ READ_BOOL_ARRAY(nullsFirst, local_node->numCols);
+
+ READ_DONE();
+}
+
+/*
* _readHash
*/
static Hash *
@@ -2577,6 +2597,8 @@ parseNodeString(void)
return_value = _readUnique();
else if (MATCH("GATHER", 6))
return_value = _readGather();
+ else if (MATCH("GATHERMERGE", 11))
+ return_value = _readGatherMerge();
else if (MATCH("HASH", 4))
return_value = _readHash();
else if (MATCH("SETOP", 5))