Diffstat (limited to 'src')
-rw-r--r--   src/backend/access/common/heaptuple.c     100
-rw-r--r--   src/backend/access/common/printtup.c        13
-rw-r--r--   src/backend/commands/copy.c                 19
-rw-r--r--   src/backend/executor/execTuples.c          122
-rw-r--r--   src/backend/pgxc/plan/planner.c            541
-rw-r--r--   src/backend/pgxc/pool/Makefile               2
-rw-r--r--   src/backend/pgxc/pool/combiner.c           652
-rw-r--r--   src/backend/pgxc/pool/datanode.c          1571
-rw-r--r--   src/backend/pgxc/pool/execRemote.c        2453
-rw-r--r--   src/backend/tcop/postgres.c                238
-rw-r--r--   src/backend/utils/misc/guc.c                37
-rw-r--r--   src/backend/utils/mmgr/mcxt.c                8
-rw-r--r--   src/backend/utils/sort/tuplesort.c         209
-rw-r--r--   src/include/executor/tuptable.h             15
-rw-r--r--   src/include/nodes/nodes.h                    6
-rw-r--r--   src/include/pgxc/combiner.h                 73
-rw-r--r--   src/include/pgxc/datanode.h                 46
-rw-r--r--   src/include/pgxc/execRemote.h              101
-rw-r--r--   src/include/pgxc/planner.h                  42
-rw-r--r--   src/include/utils/tuplesort.h               10
20 files changed, 3862 insertions, 2396 deletions
diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c
index ac5749c713..cae5794103 100644
--- a/src/backend/access/common/heaptuple.c
+++ b/src/backend/access/common/heaptuple.c
@@ -57,6 +57,9 @@
#include "postgres.h"
+#ifdef PGXC
+#include "funcapi.h"
+#endif
#include "access/heapam.h"
#include "access/sysattr.h"
#include "access/tuptoaster.h"
@@ -1157,6 +1160,80 @@ slot_deform_tuple(TupleTableSlot *slot, int natts)
slot->tts_slow = slow;
}
+#ifdef PGXC
+/*
+ * slot_deform_datarow
+ * Extract data from the DataRow message into Datum/isnull arrays.
+ * We always extract all attributes, as specified in tts_tupleDescriptor,
+ * because there is no easy way to find a random attribute in the DataRow.
+ */
+static void
+slot_deform_datarow(TupleTableSlot *slot)
+{
+ int attnum = slot->tts_tupleDescriptor->natts;
+ int i;
+ int col_count;
+ char *cur = slot->tts_dataRow;
+ StringInfo buffer;
+ uint16 n16;
+ uint32 n32;
+
+ /* fastpath: exit if values already extracted */
+ if (slot->tts_nvalid == attnum)
+ return;
+
+ Assert(slot->tts_dataRow);
+
+ memcpy(&n16, cur, 2);
+ cur += 2;
+ col_count = ntohs(n16);
+
+ if (col_count != attnum)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Tuple does not match the descriptor")));
+
+ if (slot->tts_attinmeta == NULL)
+ slot->tts_attinmeta = TupleDescGetAttInMetadata(slot->tts_tupleDescriptor);
+
+ buffer = makeStringInfo();
+ for (i = 0; i < attnum; i++)
+ {
+ Form_pg_attribute attr = slot->tts_tupleDescriptor->attrs[i];
+ int len;
+
+ /* get size */
+ memcpy(&n32, cur, 4);
+ cur += 4;
+ len = ntohl(n32);
+
+ /* get data */
+ if (len == -1)
+ {
+ slot->tts_values[i] = (Datum) 0;
+ slot->tts_isnull[i] = true;
+ }
+ else
+ {
+ appendBinaryStringInfo(buffer, cur, len);
+ cur += len;
+
+ slot->tts_values[i] = InputFunctionCall(slot->tts_attinmeta->attinfuncs + i,
+ buffer->data,
+ slot->tts_attinmeta->attioparams[i],
+ slot->tts_attinmeta->atttypmods[i]);
+ slot->tts_isnull[i] = false;
+
+ resetStringInfo(buffer);
+ }
+ }
+ pfree(buffer->data);
+ pfree(buffer);
+
+ slot->tts_nvalid = attnum;
+}
+#endif
+
/*
* slot_getattr
* This function fetches an attribute of the slot's current tuple.
@@ -1250,6 +1327,11 @@ slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
/*
* Extract the attribute, along with any preceding attributes.
*/
+#ifdef PGXC
+ if (slot->tts_dataRow)
+ slot_deform_datarow(slot);
+ else
+#endif
slot_deform_tuple(slot, attnum);
/*
@@ -1276,6 +1358,15 @@ slot_getallattrs(TupleTableSlot *slot)
if (slot->tts_nvalid == tdesc_natts)
return;
+#ifdef PGXC
+ /* Handle the DataRow tuple case */
+ if (slot->tts_dataRow)
+ {
+ slot_deform_datarow(slot);
+ return;
+ }
+#endif
+
/*
* otherwise we had better have a physical tuple (tts_nvalid should equal
* natts in all virtual-tuple cases)
@@ -1319,6 +1410,15 @@ slot_getsomeattrs(TupleTableSlot *slot, int attnum)
if (slot->tts_nvalid >= attnum)
return;
+#ifdef PGXC
+ /* Handle the DataRow tuple case */
+ if (slot->tts_dataRow)
+ {
+ slot_deform_datarow(slot);
+ return;
+ }
+#endif
+
/* Check for caller error */
if (attnum <= 0 || attnum > slot->tts_tupleDescriptor->natts)
elog(ERROR, "invalid attribute number %d", attnum);
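The DataRow layout that slot_deform_datarow() walks is the PostgreSQL v3 wire format for a 'D' message body: a 16-bit big-endian column count, then for each column a 32-bit big-endian length (-1 denoting NULL) followed by that many bytes of the type's text output. A minimal standalone sketch of packing and re-parsing such a body (hypothetical helper names, no backend dependencies; the parse loop mirrors the function above):

    #include <arpa/inet.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    /* Pack two text columns (NULL allowed) into a DataRow-style body. */
    static size_t
    pack_datarow(char *buf, const char *c1, const char *c2)
    {
        const char *cols[2] = {c1, c2};
        char       *cur = buf;
        uint16_t    n16 = htons(2);     /* column count */
        int         i;

        memcpy(cur, &n16, 2);
        cur += 2;
        for (i = 0; i < 2; i++)
        {
            uint32_t len = cols[i] ? (uint32_t) strlen(cols[i]) : (uint32_t) -1;
            uint32_t n32 = htonl(len);

            memcpy(cur, &n32, 4);
            cur += 4;
            if (cols[i])
            {
                memcpy(cur, cols[i], len);
                cur += len;
            }
        }
        return cur - buf;
    }

    int
    main(void)
    {
        char        buf[64];
        char       *cur = buf;
        uint16_t    n16;
        int         i, col_count;

        pack_datarow(buf, "42", NULL);

        /* the same walk slot_deform_datarow() performs */
        memcpy(&n16, cur, 2);
        cur += 2;
        col_count = ntohs(n16);
        for (i = 0; i < col_count; i++)
        {
            uint32_t    n32;
            int         len;

            memcpy(&n32, cur, 4);
            cur += 4;
            len = (int32_t) ntohl(n32);
            if (len == -1)
                printf("col %d: NULL\n", i);
            else
            {
                printf("col %d: %.*s\n", i, len, cur);
                cur += len;
            }
        }
        return 0;
    }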
diff --git a/src/backend/access/common/printtup.c b/src/backend/access/common/printtup.c
index 13a09dd9b3..3c77ab4ff4 100644
--- a/src/backend/access/common/printtup.c
+++ b/src/backend/access/common/printtup.c
@@ -292,6 +292,19 @@ printtup(TupleTableSlot *slot, DestReceiver *self)
int natts = typeinfo->natts;
int i;
+#ifdef PGXC
+ /*
+ * If we have a DataRow-based tuple we do not have to encode attribute
+ * values; just send the DataRow message as we received it from the
+ * data node
+ */
+ if (slot->tts_dataRow)
+ {
+ pq_putmessage('D', slot->tts_dataRow, slot->tts_dataLen);
+ return;
+ }
+#endif
+
/* Set or update my derived attribute info, if needed */
if (myState->attrinfo != typeinfo || myState->nattrs != natts)
printtup_prepare_info(myState, typeinfo, natts);
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 92f5db85f2..50650190ee 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -35,7 +35,7 @@
#include "parser/parse_relation.h"
#ifdef PGXC
#include "pgxc/pgxc.h"
-#include "pgxc/datanode.h"
+#include "pgxc/execRemote.h"
#include "pgxc/locator.h"
#include "pgxc/poolmgr.h"
#endif
@@ -1511,8 +1511,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
DataNodeCopyFinish(
cstate->connections,
primary_data_node,
- COMBINE_TYPE_NONE,
- whereToSendOutput);
+ COMBINE_TYPE_NONE);
pfree(cstate->connections);
pfree(cstate->query_buf.data);
FreeRelationLocInfo(cstate->rel_loc);
@@ -1526,14 +1525,12 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
cstate->processed = DataNodeCopyFinish(
cstate->connections,
primary_data_node,
- COMBINE_TYPE_SAME,
- whereToSendOutput);
+ COMBINE_TYPE_SAME);
else
cstate->processed = DataNodeCopyFinish(
cstate->connections,
0,
- COMBINE_TYPE_SUM,
- whereToSendOutput);
+ COMBINE_TYPE_SUM);
pfree(cstate->connections);
pfree(cstate->query_buf.data);
FreeRelationLocInfo(cstate->rel_loc);
@@ -1775,10 +1772,10 @@ CopyTo(CopyState cstate)
#ifdef PGXC
if (IS_PGXC_COORDINATOR && !cstate->on_coord)
{
- DataNodeCopyOut(GetRelationNodes(cstate->rel_loc, NULL, true),
- cstate->connections,
- whereToSendOutput,
- cstate->copy_file);
+ cstate->processed = DataNodeCopyOut(
+ GetRelationNodes(cstate->rel_loc, NULL, true),
+ cstate->connections,
+ cstate->copy_file);
}
else
{
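DataNodeCopyFinish() no longer takes a client destination; it returns the combined processed-row count according to CombineType: COMBINE_TYPE_SAME for writes to replicated tables (every node must report the same count) and COMBINE_TYPE_SUM for partitioned tables (per-node counts add up). A sketch of those semantics (hypothetical helper with a locally sketched enum; the real definitions live in the pgxc headers, and the real combiner additionally raises an error when replicated counts disagree):

    #include <stdint.h>

    typedef enum { COMBINE_TYPE_NONE, COMBINE_TYPE_SAME, COMBINE_TYPE_SUM } CombineType;

    /* Combine per-node row counts into one client-visible count. */
    static uint64_t
    combine_row_counts(CombineType type, const uint64_t *counts, int n)
    {
        uint64_t    result = 0;
        int         i;

        for (i = 0; i < n; i++)
        {
            if (type == COMBINE_TYPE_SUM)
                result += counts[i];    /* partitioned: add them up */
            else if (type == COMBINE_TYPE_SAME)
                result = counts[i];     /* replicated: all should match */
        }
        return result;                  /* COMBINE_TYPE_NONE: stays 0 */
    }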
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 7b5ba41f5f..a6c496eee9 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -149,6 +149,12 @@ ExecCreateTupleTable(int tableSize)
slot->tts_shouldFreeMin = false;
slot->tts_tuple = NULL;
slot->tts_tupleDescriptor = NULL;
+#ifdef PGXC
+ slot->tts_shouldFreeRow = false;
+ slot->tts_dataRow = NULL;
+ slot->tts_dataLen = -1;
+ slot->tts_attinmeta = NULL;
+#endif
slot->tts_mcxt = CurrentMemoryContext;
slot->tts_buffer = InvalidBuffer;
slot->tts_nvalid = 0;
@@ -228,6 +234,12 @@ MakeSingleTupleTableSlot(TupleDesc tupdesc)
slot->tts_shouldFreeMin = false;
slot->tts_tuple = NULL;
slot->tts_tupleDescriptor = NULL;
+#ifdef PGXC
+ slot->tts_shouldFreeRow = false;
+ slot->tts_dataRow = NULL;
+ slot->tts_dataLen = -1;
+ slot->tts_attinmeta = NULL;
+#endif
slot->tts_mcxt = CurrentMemoryContext;
slot->tts_buffer = InvalidBuffer;
slot->tts_nvalid = 0;
@@ -334,6 +346,12 @@ ExecSetSlotDescriptor(TupleTableSlot *slot, /* slot to change */
if (slot->tts_tupleDescriptor)
ReleaseTupleDesc(slot->tts_tupleDescriptor);
+#ifdef PGXC
+ /* XXX there is no routine to release an AttInMetadata instance */
+ if (slot->tts_attinmeta)
+ slot->tts_attinmeta = NULL;
+#endif
+
if (slot->tts_values)
pfree(slot->tts_values);
if (slot->tts_isnull)
@@ -415,6 +433,14 @@ ExecStoreTuple(HeapTuple tuple,
heap_freetuple(slot->tts_tuple);
if (slot->tts_shouldFreeMin)
heap_free_minimal_tuple(slot->tts_mintuple);
+#ifdef PGXC
+ if (slot->tts_shouldFreeRow)
+ pfree(slot->tts_dataRow);
+
+ slot->tts_shouldFreeRow = false;
+ slot->tts_dataRow = NULL;
+ slot->tts_dataLen = -1;
+#endif
/*
* Store the new tuple into the specified slot.
@@ -476,6 +502,14 @@ ExecStoreMinimalTuple(MinimalTuple mtup,
heap_freetuple(slot->tts_tuple);
if (slot->tts_shouldFreeMin)
heap_free_minimal_tuple(slot->tts_mintuple);
+#ifdef PGXC
+ if (slot->tts_shouldFreeRow)
+ pfree(slot->tts_dataRow);
+
+ slot->tts_shouldFreeRow = false;
+ slot->tts_dataRow = NULL;
+ slot->tts_dataLen = -1;
+#endif
/*
* Drop the pin on the referenced buffer, if there is one.
@@ -504,6 +538,62 @@ ExecStoreMinimalTuple(MinimalTuple mtup,
return slot;
}
+#ifdef PGXC
+/* --------------------------------
+ * ExecStoreDataRowTuple
+ *
+ * Store a buffer in DataRow message format into the slot.
+ *
+ * --------------------------------
+ */
+TupleTableSlot *
+ExecStoreDataRowTuple(char *msg, size_t len, TupleTableSlot *slot, bool shouldFree)
+{
+ /*
+ * sanity checks
+ */
+ Assert(msg != NULL);
+ Assert(len > 0);
+ Assert(slot != NULL);
+ Assert(slot->tts_tupleDescriptor != NULL);
+
+ /*
+ * Free any old physical tuple belonging to the slot.
+ */
+ if (slot->tts_shouldFree)
+ heap_freetuple(slot->tts_tuple);
+ if (slot->tts_shouldFreeMin)
+ heap_free_minimal_tuple(slot->tts_mintuple);
+ if (slot->tts_shouldFreeRow)
+ pfree(slot->tts_dataRow);
+
+ /*
+ * Drop the pin on the referenced buffer, if there is one.
+ */
+ if (BufferIsValid(slot->tts_buffer))
+ ReleaseBuffer(slot->tts_buffer);
+
+ slot->tts_buffer = InvalidBuffer;
+
+ /*
+ * Store the new tuple into the specified slot.
+ */
+ slot->tts_isempty = false;
+ slot->tts_shouldFree = false;
+ slot->tts_shouldFreeMin = false;
+ slot->tts_shouldFreeRow = shouldFree;
+ slot->tts_tuple = NULL;
+ slot->tts_mintuple = NULL;
+ slot->tts_dataRow = msg;
+ slot->tts_dataLen = len;
+
+ /* Mark extracted state invalid */
+ slot->tts_nvalid = 0;
+
+ return slot;
+}
+#endif
+
/* --------------------------------
* ExecClearTuple
*
@@ -527,6 +617,14 @@ ExecClearTuple(TupleTableSlot *slot) /* slot in which to store tuple */
heap_freetuple(slot->tts_tuple);
if (slot->tts_shouldFreeMin)
heap_free_minimal_tuple(slot->tts_mintuple);
+#ifdef PGXC
+ if (slot->tts_shouldFreeRow)
+ pfree(slot->tts_dataRow);
+
+ slot->tts_shouldFreeRow = false;
+ slot->tts_dataRow = NULL;
+ slot->tts_dataLen = -1;
+#endif
slot->tts_tuple = NULL;
slot->tts_mintuple = NULL;
@@ -634,7 +732,13 @@ ExecCopySlotTuple(TupleTableSlot *slot)
return heap_copytuple(slot->tts_tuple);
if (slot->tts_mintuple)
return heap_tuple_from_minimal_tuple(slot->tts_mintuple);
-
+#ifdef PGXC
+ /*
+ * Ensure values are extracted from data row to the Datum array
+ */
+ if (slot->tts_dataRow)
+ slot_getallattrs(slot);
+#endif
/*
* Otherwise we need to build a tuple from the Datum array.
*/
@@ -667,7 +771,13 @@ ExecCopySlotMinimalTuple(TupleTableSlot *slot)
return heap_copy_minimal_tuple(slot->tts_mintuple);
if (slot->tts_tuple)
return minimal_tuple_from_heap_tuple(slot->tts_tuple);
-
+#ifdef PGXC
+ /*
+ * Ensure values are extracted from data row to the Datum array
+ */
+ if (slot->tts_dataRow)
+ slot_getallattrs(slot);
+#endif
/*
* Otherwise we need to build a tuple from the Datum array.
*/
@@ -861,6 +971,14 @@ ExecMaterializeSlot(TupleTableSlot *slot)
if (!slot->tts_shouldFreeMin)
slot->tts_mintuple = NULL;
+#ifdef PGXC
+ if (!slot->tts_shouldFreeRow)
+ {
+ slot->tts_dataRow = NULL;
+ slot->tts_dataLen = -1;
+ }
+#endif
+
return slot->tts_tuple;
}
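Taken together, the slot changes above give the coordinator a lazy decode path: a raw DataRow message can be parked in a slot, forwarded verbatim by printtup() when the client is the only consumer, and decoded by slot_deform_datarow() only when an attribute is actually inspected. A hypothetical usage sketch (the wrapper name is invented for illustration):

    TupleTableSlot *
    store_remote_row(TupleTableSlot *slot, char *msg, size_t len)
    {
        /* Slot takes ownership of msg; it is pfree'd on ExecClearTuple. */
        ExecStoreDataRowTuple(msg, len, slot, true);

        /*
         * Nothing is decoded yet: tts_nvalid is 0 and tts_dataRow points at
         * the message.  The first slot_getattr()/slot_getallattrs() call
         * pays the InputFunctionCall() cost, and only then.
         */
        return slot;
    }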
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index ae537e5f1f..1bbbb75848 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -22,8 +22,10 @@
#include "catalog/pg_type.h"
#include "lib/stringinfo.h"
#include "nodes/nodeFuncs.h"
+#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "optimizer/clauses.h"
+#include "optimizer/tlist.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "pgxc/locator.h"
@@ -123,12 +125,10 @@ typedef struct XCWalkerContext
int varno;
bool within_or;
bool within_not;
+ List *join_list; /* A list of PGXC_Join's, one for each joined relation pair. */
} XCWalkerContext;
-/* A list of List*'s, one for each relation. */
-List *join_list = NULL;
-
/* Forbid unsafe SQL statements */
bool StrictStatementChecking = true;
@@ -185,12 +185,12 @@ new_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2)
* Look up the join struct for a particular join
*/
static PGXC_Join *
-find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2)
+find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2, XCWalkerContext *context)
{
ListCell *lc;
/* return if list is still empty */
- if (join_list == NULL)
+ if (context->join_list == NULL)
return NULL;
/* in the PGXC_Join struct, we always sort with relid1 < relid2 */
@@ -209,7 +209,7 @@ find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2)
* there should be a small number, so we just search linearly, although
* long term a hash table would be better.
*/
- foreach(lc, join_list)
+ foreach(lc, context->join_list)
{
PGXC_Join *pgxcjoin = (PGXC_Join *) lfirst(lc);
@@ -225,16 +225,16 @@ find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2)
* Find or create a join between 2 relations
*/
static PGXC_Join *
-find_or_create_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2)
+find_or_create_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2, XCWalkerContext *context)
{
PGXC_Join *pgxcjoin;
- pgxcjoin = find_pgxc_join(relid1, aliasname1, relid2, aliasname2);
+ pgxcjoin = find_pgxc_join(relid1, aliasname1, relid2, aliasname2, context);
if (pgxcjoin == NULL)
{
pgxcjoin = new_pgxc_join(relid1, aliasname1, relid2, aliasname2);
- join_list = lappend(join_list, pgxcjoin);
+ context->join_list = lappend(context->join_list, pgxcjoin);
}
return pgxcjoin;
@@ -277,7 +277,7 @@ free_special_relations(Special_Conditions *special_conditions)
* frees join_list
*/
static void
-free_join_list(void)
+free_join_list(List *join_list)
{
if (join_list == NULL)
return;
@@ -368,13 +368,13 @@ get_base_var(Var *var, XCWalkerContext *context)
}
else if (rte->rtekind == RTE_SUBQUERY)
{
- /*
+ /*
* Handle views like select * from v1 where col1 = 1
* where col1 is partition column of base relation
*/
/* the varattno corresponds with the subquery's target list (projections) */
TargetEntry *tle = list_nth(rte->subquery->targetList, var->varattno - 1); /* or varno? */
-
+
if (!IsA(tle->expr, Var))
return NULL; /* not a column-based expression, return */
else
@@ -684,7 +684,7 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
/* get data struct about these two relations joining */
pgxc_join = find_or_create_pgxc_join(column_base->relid, column_base->relalias,
- column_base2->relid, column_base2->relalias);
+ column_base2->relid, column_base2->relalias, context);
if (rel_loc_info1->locatorType == LOCATOR_TYPE_REPLICATED)
{
@@ -914,7 +914,7 @@ contains_only_pg_catalog (List *rtable)
{
if (get_rel_namespace(rte->relid) != PG_CATALOG_NAMESPACE)
return false;
- } else if (rte->rtekind == RTE_SUBQUERY &&
+ } else if (rte->rtekind == RTE_SUBQUERY &&
!contains_only_pg_catalog (rte->subquery->rtable))
return false;
}
@@ -967,7 +967,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
{
/* May be complicated. Before giving up, just check for pg_catalog usage */
if (contains_only_pg_catalog (query->rtable))
- {
+ {
/* just pg_catalog tables */
context->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
@@ -1018,7 +1018,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
/* We compare to make sure that the subquery is safe to execute with previous-
* we may have multiple ones in the FROM clause.
- * We handle the simple case of allowing multiple subqueries in the from clause,
+ * We handle the simple case of allowing multiple subqueries in the from clause,
* but only allow one of them to not contain replicated tables
*/
if (!from_query_nodes)
@@ -1028,20 +1028,20 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
/* ok, safe */
if (!from_query_nodes)
from_query_nodes = current_nodes;
- }
+ }
else
{
if (from_query_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED)
from_query_nodes = current_nodes;
else
{
- /* Allow if they are both using one node, and the same one */
+ /* Allow if they are both using one node, and the same one */
if (!same_single_node (from_query_nodes->nodelist, current_nodes->nodelist))
/* Complicated */
return true;
}
}
- }
+ }
else if (rte->rtekind == RTE_RELATION)
{
/* Look for pg_catalog tables */
@@ -1049,7 +1049,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
current_usage_type = TABLE_USAGE_TYPE_PGCATALOG;
else
current_usage_type = TABLE_USAGE_TYPE_USER;
- }
+ }
else if (rte->rtekind == RTE_FUNCTION)
{
/* See if it is a catalog function */
@@ -1095,9 +1095,9 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
return true;
/* Examine join conditions, see if each join is single-node safe */
- if (join_list != NULL)
+ if (context->join_list != NULL)
{
- foreach(lc, join_list)
+ foreach(lc, context->join_list)
{
PGXC_Join *pgxcjoin = (PGXC_Join *) lfirst(lc);
@@ -1254,22 +1254,28 @@ static Exec_Nodes *
get_plan_nodes(Query *query, bool isRead)
{
Exec_Nodes *result_nodes;
- XCWalkerContext *context = palloc0(sizeof(XCWalkerContext));
-
- context->query = query;
- context->isRead = isRead;
-
- context->conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions));
- context->rtables = lappend(context->rtables, query->rtable);
-
- join_list = NULL;
-
- if (get_plan_nodes_walker((Node *) query, context))
+ XCWalkerContext context;
+
+
+ context.query = query;
+ context.isRead = isRead;
+ context.exec_nodes = NULL;
+ context.conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions));
+ context.rtables = NIL;
+ context.rtables = lappend(context.rtables, query->rtable);
+ context.multilevel_join = false;
+ context.varno = 0;
+ context.within_or = false;
+ context.within_not = false;
+ context.join_list = NIL;
+
+ if (get_plan_nodes_walker((Node *) query, &context))
result_nodes = NULL;
else
- result_nodes = context->exec_nodes;
+ result_nodes = context.exec_nodes;
- free_special_relations(context->conditions);
+ free_special_relations(context.conditions);
+ free_join_list(context.join_list);
return result_nodes;
}
@@ -1304,7 +1310,6 @@ get_plan_nodes_command(Query *query)
return NULL;
}
- free_join_list();
return exec_nodes;
}
@@ -1345,17 +1350,17 @@ static List *
get_simple_aggregates(Query * query)
{
List *simple_agg_list = NIL;
-
+
/* Check for simple multi-node aggregate */
if (query->hasAggs)
{
ListCell *lc;
int column_pos = 0;
-
+
foreach (lc, query->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
-
+
if (IsA(tle->expr, Aggref))
{
/*PGXC borrowed this code from nodeAgg.c, see ExecInitAgg()*/
@@ -1422,7 +1427,7 @@ get_simple_aggregates(Query * query)
get_func_name(finalfn_oid));
}
}
-
+
/* resolve actual type of transition state, if polymorphic */
aggcollecttype = aggform->aggcollecttype;
@@ -1468,7 +1473,7 @@ get_simple_aggregates(Query * query)
get_typlenbyval(aggcollecttype,
&simple_agg->transtypeLen,
&simple_agg->transtypeByVal);
-
+
/*
* initval is potentially null, so don't try to access it as a struct
* field. Must do it the hard way with SysCacheGetAttr.
@@ -1534,6 +1539,427 @@ get_simple_aggregates(Query * query)
/*
+ * add_sort_column --- utility subroutine for building sort info arrays
+ *
+ * We need this routine because the same column might be selected more than
+ * once as a sort key column; if so, the extra mentions are redundant.
+ *
+ * Caller is assumed to have allocated the arrays large enough for the
+ * max possible number of columns. Return value is the new column count.
+ *
+ * PGXC: copied from optimizer/plan/planner.c
+ */
+static int
+add_sort_column(AttrNumber colIdx, Oid sortOp, bool nulls_first,
+ int numCols, AttrNumber *sortColIdx,
+ Oid *sortOperators, bool *nullsFirst)
+{
+ int i;
+
+ Assert(OidIsValid(sortOp));
+
+ for (i = 0; i < numCols; i++)
+ {
+ /*
+ * Note: we check sortOp because it's conceivable that "ORDER BY foo
+ * USING <, foo USING <<<" is not redundant, if <<< distinguishes
+ * values that < considers equal. We need not check nulls_first
+ * however because a lower-order column with the same sortop but
+ * opposite nulls direction is redundant.
+ */
+ if (sortColIdx[i] == colIdx && sortOperators[i] == sortOp)
+ {
+ /* Already sorting by this col, so extra sort key is useless */
+ return numCols;
+ }
+ }
+
+ /* Add the column */
+ sortColIdx[numCols] = colIdx;
+ sortOperators[numCols] = sortOp;
+ nullsFirst[numCols] = nulls_first;
+ return numCols + 1;
+}
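+/*
+ * For example (illustrative): for ORDER BY a, b, a the first call adds
+ * (a, <), the second adds (b, <), and the third finds an existing
+ * (colIdx, sortOp) pair and returns numCols unchanged.
+ */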
+
+/*
+ * add_distinct_column - utility subroutine to remove redundant columns, just
+ * like add_sort_column
+ */
+static int
+add_distinct_column(AttrNumber colIdx, Oid eqOp, int numCols,
+ AttrNumber *sortColIdx, Oid *eqOperators)
+{
+ int i;
+
+ Assert(OidIsValid(eqOp));
+
+ for (i = 0; i < numCols; i++)
+ {
+ if (sortColIdx[i] == colIdx && eqOperators[i] == eqOp)
+ {
+ /* Already sorting by this col, so extra sort key is useless */
+ return numCols;
+ }
+ }
+
+ /* Add the column */
+ sortColIdx[numCols] = colIdx;
+ eqOperators[numCols] = eqOp;
+ return numCols + 1;
+}
+
+
+/*
+ * Reconstruct the step query
+ */
+static void
+reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort,
+ RemoteQuery *step)
+{
+ List *context;
+ bool useprefix;
+ List *sub_tlist = step->plan.targetlist;
+ ListCell *l;
+ StringInfo buf = makeStringInfo();
+ char *sql;
+ char *cur;
+ char *sql_from;
+
+ context = deparse_context_for_plan((Node *) step, NULL, rtable, NIL);
+ useprefix = list_length(rtable) > 1;
+
+ foreach(l, sub_tlist)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(l);
+ char *exprstr = deparse_expression((Node *) tle->expr, context,
+ useprefix, false);
+
+ if (buf->len == 0)
+ {
+ appendStringInfo(buf, "SELECT ");
+ if (step->distinct)
+ appendStringInfo(buf, "DISTINCT ");
+ }
+ else
+ appendStringInfo(buf, ", ");
+
+ appendStringInfoString(buf, exprstr);
+ }
+
+ /*
+ * A kind of a shortcut: do not reconstruct the remaining query, just
+ * search the original statement for " FROM " and append the remainder to
+ * the target list we just generated. We do not handle the case where the
+ * " FROM " we found is not the FROM keyword but, for example, part of a
+ * string constant.
+ */
+ sql = pstrdup(step->sql_statement); /* mutable copy */
+ /* string to upper case, for comparing */
+ cur = sql;
+ while (*cur)
+ {
+ /* replace whitespace with a space, upper-case the rest */
+ if (isspace((unsigned char) *cur))
+ *cur = ' ';
+ else
+ *cur = toupper((unsigned char) *cur);
+ cur++;
+ }
+
+ /* find the keyword */
+ sql_from = strstr(sql, " FROM ");
+ if (sql_from)
+ {
+ /* the same offset in the original string */
+ int offset = sql_from - sql;
+ /* remove terminating semicolon, if any */
+ char *end = strrchr(step->sql_statement, ';');
+
+ if (end)
+ *end = '\0';
+
+ appendStringInfoString(buf, step->sql_statement + offset);
+ }
+
+ if (extra_sort)
+ {
+ foreach(l, extra_sort)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(l);
+ char *exprstr = deparse_expression((Node *) tle->expr, context,
+ useprefix, false);
+
+ if (has_order_by)
+ appendStringInfo(buf, ", ");
+ else
+ {
+ appendStringInfo(buf, " ORDER BY ");
+ has_order_by = true;
+ }
+
+ appendStringInfoString(buf, exprstr);
+ }
+ }
+
+ /* do not need the copy */
+ pfree(sql);
+
+ /* free previous query */
+ pfree(step->sql_statement);
+ /* get a copy of new query */
+ step->sql_statement = pstrdup(buf->data);
+ /* free the query buffer */
+ pfree(buf->data);
+ pfree(buf);
+}
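+/*
+ * For illustration (assumed behavior of the function above): given a junk
+ * sort key, as in
+ *     SELECT b FROM t ORDER BY a;
+ * the deparsed target list becomes "SELECT b, a", the original text from
+ * " FROM " onward is appended, and the data nodes receive
+ *     SELECT b, a FROM t ORDER BY a
+ * so the coordinator can merge rows on column a.
+ */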
+
+
+/*
+ * Plan to sort step tuples
+ * PGXC: copied and adapted from optimizer/plan/planner.c
+ */
+static void
+make_simple_sort_from_sortclauses(Query *query, RemoteQuery *step)
+{
+ List *sortcls = query->sortClause;
+ List *distinctcls = query->distinctClause;
+ List *sub_tlist = step->plan.targetlist;
+ SimpleSort *sort;
+ SimpleDistinct *distinct;
+ ListCell *l;
+ int numsortkeys;
+ int numdistkeys;
+ AttrNumber *sortColIdx;
+ AttrNumber *distColIdx;
+ Oid *sortOperators;
+ Oid *eqOperators;
+ bool *nullsFirst;
+ bool need_reconstruct = false;
+ /*
+ * List of target list entries from DISTINCT which are not in the ORDER BY.
+ * The expressions should be appended to the ORDER BY clause of the remote query
+ */
+ List *extra_distincts = NIL;
+
+ Assert(step->sort == NULL);
+ Assert(step->distinct == NULL);
+
+ /*
+ * We will need at most list_length(sortcls) sort columns, possibly fewer.
+ * We also need room for extra distinct expressions if we need to append them.
+ */
+ numsortkeys = list_length(sortcls) + list_length(distinctcls);
+ sortColIdx = (AttrNumber *) palloc(numsortkeys * sizeof(AttrNumber));
+ sortOperators = (Oid *) palloc(numsortkeys * sizeof(Oid));
+ nullsFirst = (bool *) palloc(numsortkeys * sizeof(bool));
+
+ numsortkeys = 0;
+ sort = (SimpleSort *) palloc(sizeof(SimpleSort));
+
+ if (sortcls)
+ {
+ foreach(l, sortcls)
+ {
+ SortGroupClause *sortcl = (SortGroupClause *) lfirst(l);
+ TargetEntry *tle = get_sortgroupclause_tle(sortcl, sub_tlist);
+
+ if (tle->resjunk)
+ need_reconstruct = true;
+
+ /*
+ * Check for the possibility of duplicate order-by clauses --- the
+ * parser should have removed 'em, but no point in sorting
+ * redundantly.
+ */
+ numsortkeys = add_sort_column(tle->resno, sortcl->sortop,
+ sortcl->nulls_first,
+ numsortkeys,
+ sortColIdx, sortOperators, nullsFirst);
+ }
+ }
+
+ if (distinctcls)
+ {
+ /*
+ * Validate the DISTINCT clause.
+ * We have to sort tuples to filter out duplicates, and if an ORDER BY
+ * clause is already present the sort order specified there may be
+ * incompatible with the order needed for DISTINCT.
+ *
+ * To be compatible, all expressions from DISTINCT must appear at the
+ * beginning of the ORDER BY list. If the list of DISTINCT expressions is
+ * longer than the ORDER BY list, we can make ORDER BY compatible by
+ * appending the remaining expressions from DISTINCT to it. Obviously
+ * ORDER BY must not contain expressions outside the DISTINCT list in
+ * this case.
+ *
+ * For validation purposes we use column indexes (AttrNumber) to
+ * identify expressions. Maybe this is not enough and we should revisit
+ * the algorithm.
+ *
+ * We validate compatibility as follows:
+ * 1. Make a working copy of DISTINCT
+ * 1a. Remove possible duplicates when copying: do not add an expression
+ * that is already in the working copy
+ * 2. If ORDER BY is empty they are already compatible, skip step 3
+ * 3. Iterate over the ORDER BY items
+ * 3a. If the item is in the working copy, delete it from the working
+ * copy. If the working copy is empty after deletion, DISTINCT and
+ * ORDER BY are compatible, so break the loop. If the working copy is
+ * not empty, continue iterating
+ * 3b. The ORDER BY clause may contain duplicates, so if we cannot find
+ * the expression in the remainder of DISTINCT, it has probably already
+ * been removed because of a duplicate ORDER BY entry. Check the
+ * original DISTINCT clause; if the expression is there, continue
+ * iterating
+ * 3c. Otherwise DISTINCT and ORDER BY are not compatible, emit an error
+ * 4. DISTINCT and ORDER BY are compatible; if we have remaining items
+ * in the working copy we should append them to the ORDER BY list
+ */
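+ /*
+ * Worked example (illustrative):
+ *     SELECT DISTINCT a, b FROM t ORDER BY a;
+ * The working copy starts as {a, b}; ORDER BY item a removes a (step
+ * 3a), leaving {b}, so b ends up in extra_distincts and is appended to
+ * ORDER BY (step 4); the remote query sorts by (a, b), satisfying both
+ * clauses. By contrast,
+ *     SELECT DISTINCT a FROM t ORDER BY b;
+ * fails at step 3c, since b is in neither the working copy nor the
+ * original DISTINCT list.
+ */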
+ /*
+ * Create the list of unique DISTINCT clause expressions
+ */
+ foreach(l, distinctcls)
+ {
+ SortGroupClause *distinctcl = (SortGroupClause *) lfirst(l);
+ TargetEntry *tle = get_sortgroupclause_tle(distinctcl, sub_tlist);
+ bool found = false;
+
+ if (extra_distincts)
+ {
+ ListCell *xl;
+
+ foreach(xl, extra_distincts)
+ {
+ TargetEntry *xtle = (TargetEntry *) lfirst(xl);
+ if (xtle->resno == tle->resno)
+ {
+ found = true;
+ break;
+ }
+ }
+ }
+
+ if (!found)
+ extra_distincts = lappend(extra_distincts, tle);
+ }
+
+ if (sortcls)
+ {
+ foreach(l, sortcls)
+ {
+ SortGroupClause *sortcl = (SortGroupClause *) lfirst(l);
+ TargetEntry *tle = get_sortgroupclause_tle(sortcl, sub_tlist);
+ bool found = false;
+ ListCell *xl;
+ ListCell *prev = NULL;
+
+ /* Search for the expression in the DISTINCT clause */
+ foreach(xl, extra_distincts)
+ {
+ TargetEntry *xtle = (TargetEntry *) lfirst(xl);
+ if (xtle->resno == tle->resno)
+ {
+ extra_distincts = list_delete_cell(extra_distincts, xl,
+ prev);
+ found = true;
+ break;
+ }
+ prev = xl;
+ }
+
+ /* Probably we're done */
+ if (found && list_length(extra_distincts) == 0)
+ break;
+
+ /* Ensure sort expression is not a duplicate */
+ if (!found)
+ {
+ foreach(xl, distinctcls)
+ {
+ SortGroupClause *xcl = (SortGroupClause *) lfirst(xl);
+ TargetEntry *xtle = get_sortgroupclause_tle(xcl, sub_tlist);
+ if (xtle->resno == tle->resno)
+ {
+ /* it is a duplicate then */
+ found = true;
+ break;
+ }
+ }
+ }
+
+ /* Give up, we do not support it */
+ if (!found)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("Such combination of ORDER BY and DISTINCT is not yet supported"))));
+ }
+ }
+ }
+ /* need to append to the ORDER BY */
+ if (list_length(extra_distincts) > 0)
+ need_reconstruct = true;
+
+ /*
+ * End of validation; the expressions to append to ORDER BY are in the
+ * extra_distincts list
+ */
+
+ distinct = (SimpleDistinct *) palloc(sizeof(SimpleDistinct));
+
+ /*
+ * We will need at most list_length(distinctcls) sort columns
+ */
+ numdistkeys = list_length(distinctcls);
+ distColIdx = (AttrNumber *) palloc(numdistkeys * sizeof(AttrNumber));
+ eqOperators = (Oid *) palloc(numdistkeys * sizeof(Oid));
+
+ numdistkeys = 0;
+
+ foreach(l, distinctcls)
+ {
+ SortGroupClause *distinctcl = (SortGroupClause *) lfirst(l);
+ TargetEntry *tle = get_sortgroupclause_tle(distinctcl, sub_tlist);
+
+ /*
+ * Check for the possibility of duplicate order-by clauses --- the
+ * parser should have removed 'em, but no point in sorting
+ * redundantly.
+ */
+ numdistkeys = add_distinct_column(tle->resno,
+ distinctcl->eqop,
+ numdistkeys,
+ distColIdx,
+ eqOperators);
+ /* append also extra sort operator, if not already there */
+ numsortkeys = add_sort_column(tle->resno,
+ distinctcl->sortop,
+ distinctcl->nulls_first,
+ numsortkeys,
+ sortColIdx,
+ sortOperators,
+ nullsFirst);
+ }
+
+ Assert(numdistkeys > 0);
+
+ distinct->numCols = numdistkeys;
+ distinct->uniqColIdx = distColIdx;
+ distinct->eqOperators = eqOperators;
+
+ step->distinct = distinct;
+ }
+
+
+ Assert(numsortkeys > 0);
+
+ sort->numCols = numsortkeys;
+ sort->sortColIdx = sortColIdx;
+ sort->sortOperators = sortOperators;
+ sort->nullsFirst = nullsFirst;
+
+ step->sort = sort;
+
+ if (need_reconstruct)
+ reconstruct_step_query(query->rtable, sortcls != NULL, extra_distincts,
+ step);
+}
+
+/*
* Build up a QueryPlan to execute on.
*
* For the prototype, there will only be one step,
@@ -1543,17 +1969,16 @@ Query_Plan *
GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
{
Query_Plan *query_plan = palloc(sizeof(Query_Plan));
- Query_Step *query_step = palloc(sizeof(Query_Step));
+ RemoteQuery *query_step = makeNode(RemoteQuery);
Query *query;
-
- query_plan->force_autocommit = false;
-
query_step->sql_statement = (char *) palloc(strlen(sql_statement) + 1);
strcpy(query_step->sql_statement, sql_statement);
query_step->exec_nodes = NULL;
query_step->combine_type = COMBINE_TYPE_NONE;
query_step->simple_aggregates = NULL;
+ query_step->read_only = false;
+ query_step->force_autocommit = false;
query_plan->query_step_list = lappend(NULL, query_step);
@@ -1565,11 +1990,16 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
switch (nodeTag(parsetree))
{
case T_SelectStmt:
+ /* Optimize multi-node handling */
+ query_step->read_only = true;
+ /* fallthru */
case T_InsertStmt:
case T_UpdateStmt:
case T_DeleteStmt:
/* just use first one in querytree_list */
query = (Query *) linitial(querytree_list);
+ /* should we copy instead? */
+ query_step->plan.targetlist = query->targetList;
/* Perform some checks to make sure we can support the statement */
if (nodeTag(parsetree) == T_SelectStmt)
@@ -1633,6 +2063,12 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
}
/*
+ * Add sorting to the step
+ */
+ if (query->sortClause || query->distinctClause)
+ make_simple_sort_from_sortclauses(query, query_step);
+
+ /*
* PG-XC cannot yet support some variations of SQL statements.
* We perform some checks to at least catch common cases
*/
@@ -1658,15 +2094,6 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
ereport(ERROR,
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
(errmsg("Multi-node LIMIT not yet supported"))));
- if (query->sortClause && StrictSelectChecking)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Multi-node ORDER BY not yet supported"))));
- /* PGXCTODO - check if first column partitioning column */
- if (query->distinctClause)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Multi-node DISTINCT`not yet supported"))));
}
}
break;
@@ -1686,7 +2113,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
case T_DropdbStmt:
case T_VacuumStmt:
query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES;
- query_plan->force_autocommit = true;
+ query_step->force_autocommit = true;
break;
case T_DropPropertyStmt:
@@ -1864,7 +2291,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
* Free Query_Step struct
*/
static void
-free_query_step(Query_Step *query_step)
+free_query_step(RemoteQuery *query_step)
{
if (query_step == NULL)
return;
@@ -1894,7 +2321,7 @@ FreeQueryPlan(Query_Plan *query_plan)
return;
foreach(item, query_plan->query_step_list)
- free_query_step((Query_Step *) lfirst(item));
+ free_query_step((RemoteQuery *) lfirst(item));
pfree(query_plan->query_step_list);
pfree(query_plan);
diff --git a/src/backend/pgxc/pool/Makefile b/src/backend/pgxc/pool/Makefile
index 7143af5d97..e8753031c2 100644
--- a/src/backend/pgxc/pool/Makefile
+++ b/src/backend/pgxc/pool/Makefile
@@ -14,6 +14,6 @@ subdir = src/backend/pgxc/pool
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = combiner.o datanode.o poolmgr.o poolcomm.o
+OBJS = datanode.o execRemote.o poolmgr.o poolcomm.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/pgxc/pool/combiner.c b/src/backend/pgxc/pool/combiner.c
deleted file mode 100644
index 53e5dfb11c..0000000000
--- a/src/backend/pgxc/pool/combiner.c
+++ /dev/null
@@ -1,652 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * combiner.c
- *
- * Combine responses from multiple Data Nodes
- *
- *
- * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
- * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation
- *
- * IDENTIFICATION
- * $$
- *
- *-------------------------------------------------------------------------
- */
-
-#include "postgres.h"
-#include "pgxc/combiner.h"
-#include "pgxc/planner.h"
-#include "catalog/pg_type.h"
-#include "libpq/libpq.h"
-#include "libpq/pqformat.h"
-#include "utils/builtins.h"
-#include "utils/datum.h"
-
-
-/*
- * Create a structure to store parameters needed to combine responses from
- * multiple connections as well as state information
- */
-ResponseCombiner
-CreateResponseCombiner(int node_count, CombineType combine_type,
- CommandDest dest)
-{
- ResponseCombiner combiner;
-
- /* ResponseComber is a typedef for pointer to ResponseCombinerData */
- combiner = (ResponseCombiner) palloc(sizeof(ResponseCombinerData));
- if (combiner == NULL)
- {
- /* Out of memory */
- return combiner;
- }
-
- combiner->node_count = node_count;
- combiner->combine_type = combine_type;
- combiner->dest = dest;
- combiner->command_complete_count = 0;
- combiner->row_count = 0;
- combiner->request_type = REQUEST_TYPE_NOT_DEFINED;
- combiner->description_count = 0;
- combiner->copy_in_count = 0;
- combiner->copy_out_count = 0;
- combiner->inErrorState = false;
- combiner->initAggregates = true;
- combiner->simple_aggregates = NULL;
- combiner->copy_file = NULL;
-
- return combiner;
-}
-
-/*
- * Parse out row count from the command status response and convert it to integer
- */
-static int
-parse_row_count(const char *message, size_t len, int *rowcount)
-{
- int digits = 0;
- int pos;
-
- *rowcount = 0;
- /* skip \0 string terminator */
- for (pos = 0; pos < len - 1; pos++)
- {
- if (message[pos] >= '0' && message[pos] <= '9')
- {
- *rowcount = *rowcount * 10 + message[pos] - '0';
- digits++;
- }
- else
- {
- *rowcount = 0;
- digits = 0;
- }
- }
- return digits;
-}
-
-/*
- * Extract a transition value from data row. Invoke the Input Function
- * associated with the transition data type to represent value as a Datum.
- * Output parameters value and val_null, receive extracted value and indicate
- * whether it is null.
- */
-static void
-parse_aggregate_value(SimpleAgg *simple_agg, char *col_data, size_t datalen, Datum *value, bool *val_null)
-{
- /* Check NULL */
- if (datalen == -1)
- {
- *value = (Datum) 0;
- *val_null = true;
- }
- else
- {
- resetStringInfo(&simple_agg->valuebuf);
- appendBinaryStringInfo(&simple_agg->valuebuf, col_data, datalen);
- *value = InputFunctionCall(&simple_agg->arginputfn, simple_agg->valuebuf.data, simple_agg->argioparam, -1);
- *val_null = false;
- }
-}
-
-/*
- * Initialize the collection value, when agregation is first set up, or for a
- * new group (grouping support is not implemented yet)
- */
-static void
-initialize_collect_aggregates(SimpleAgg *simple_agg)
-{
- if (simple_agg->initValueIsNull)
- simple_agg->collectValue = simple_agg->initValue;
- else
- simple_agg->collectValue = datumCopy(simple_agg->initValue,
- simple_agg->transtypeByVal,
- simple_agg->transtypeLen);
- simple_agg->noCollectValue = simple_agg->initValueIsNull;
- simple_agg->collectValueNull = simple_agg->initValueIsNull;
-}
-
-/*
- * Finalize the aggregate after current group or entire relation is processed
- * (grouping support is not implemented yet)
- */
-static void
-finalize_collect_aggregates(SimpleAgg *simple_agg, Datum *resultVal, bool *resultIsNull)
-{
- /*
- * Apply the agg's finalfn if one is provided, else return collectValue.
- */
- if (OidIsValid(simple_agg->finalfn_oid))
- {
- FunctionCallInfoData fcinfo;
-
- InitFunctionCallInfoData(fcinfo, &(simple_agg->finalfn), 1,
- (void *) simple_agg, NULL);
- fcinfo.arg[0] = simple_agg->collectValue;
- fcinfo.argnull[0] = simple_agg->collectValueNull;
- if (fcinfo.flinfo->fn_strict && simple_agg->collectValueNull)
- {
- /* don't call a strict function with NULL inputs */
- *resultVal = (Datum) 0;
- *resultIsNull = true;
- }
- else
- {
- *resultVal = FunctionCallInvoke(&fcinfo);
- *resultIsNull = fcinfo.isnull;
- }
- }
- else
- {
- *resultVal = simple_agg->collectValue;
- *resultIsNull = simple_agg->collectValueNull;
- }
-}
-
-/*
- * Given new input value(s), advance the transition function of an aggregate.
- *
- * The new values (and null flags) have been preloaded into argument positions
- * 1 and up in fcinfo, so that we needn't copy them again to pass to the
- * collection function. No other fields of fcinfo are assumed valid.
- *
- * It doesn't matter which memory context this is called in.
- */
-static void
-advance_collect_function(SimpleAgg *simple_agg, FunctionCallInfoData *fcinfo)
-{
- Datum newVal;
-
- if (simple_agg->transfn.fn_strict)
- {
- /*
- * For a strict transfn, nothing happens when there's a NULL input; we
- * just keep the prior transValue.
- */
- if (fcinfo->argnull[1])
- return;
- if (simple_agg->noCollectValue)
- {
- /*
- * result has not been initialized
- * We must copy the datum into result if it is pass-by-ref. We
- * do not need to pfree the old result, since it's NULL.
- */
- simple_agg->collectValue = datumCopy(fcinfo->arg[1],
- simple_agg->transtypeByVal,
- simple_agg->transtypeLen);
- simple_agg->collectValueNull = false;
- simple_agg->noCollectValue = false;
- return;
- }
- if (simple_agg->collectValueNull)
- {
- /*
- * Don't call a strict function with NULL inputs. Note it is
- * possible to get here despite the above tests, if the transfn is
- * strict *and* returned a NULL on a prior cycle. If that happens
- * we will propagate the NULL all the way to the end.
- */
- return;
- }
- }
-
- /*
- * OK to call the transition function
- */
- InitFunctionCallInfoData(*fcinfo, &(simple_agg->transfn), 2, (void *) simple_agg, NULL);
- fcinfo->arg[0] = simple_agg->collectValue;
- fcinfo->argnull[0] = simple_agg->collectValueNull;
- newVal = FunctionCallInvoke(fcinfo);
-
- /*
- * If pass-by-ref datatype, must copy the new value into aggcontext and
- * pfree the prior transValue. But if transfn returned a pointer to its
- * first input, we don't need to do anything.
- */
- if (!simple_agg->transtypeByVal &&
- DatumGetPointer(newVal) != DatumGetPointer(simple_agg->collectValue))
- {
- if (!fcinfo->isnull)
- {
- newVal = datumCopy(newVal,
- simple_agg->transtypeByVal,
- simple_agg->transtypeLen);
- }
- if (!simple_agg->collectValueNull)
- pfree(DatumGetPointer(simple_agg->collectValue));
- }
-
- simple_agg->collectValue = newVal;
- simple_agg->collectValueNull = fcinfo->isnull;
-}
-
-/*
- * Handle response message and update combiner's state.
- * This function contains main combiner logic
- */
-int
-CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t len)
-{
- int digits = 0;
-
- /* Ignore anything if we have encountered error */
- if (combiner->inErrorState)
- return EOF;
-
- switch (msg_type)
- {
- case 'c': /* CopyOutCommandComplete */
- if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
- combiner->request_type = REQUEST_TYPE_COPY_OUT;
- if (combiner->request_type != REQUEST_TYPE_COPY_OUT)
- /* Inconsistent responses */
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("Unexpected response from the data nodes")));
- /* Just do nothing, close message is managed by the coordinator */
- combiner->copy_out_count++;
- break;
- case 'C': /* CommandComplete */
- /*
- * If we did not receive description we are having rowcount or OK
- * response
- */
- if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
- combiner->request_type = REQUEST_TYPE_COMMAND;
- /* Extract rowcount */
- if (combiner->combine_type != COMBINE_TYPE_NONE)
- {
- int rowcount;
- digits = parse_row_count(msg_body, len, &rowcount);
- if (digits > 0)
- {
- /* Replicated write, make sure they are the same */
- if (combiner->combine_type == COMBINE_TYPE_SAME)
- {
- if (combiner->command_complete_count)
- {
- if (rowcount != combiner->row_count)
- /* There is a consistency issue in the database with the replicated table */
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("Write to replicated table returned different results from the data nodes")));
- }
- else
- /* first result */
- combiner->row_count = rowcount;
- }
- else
- combiner->row_count += rowcount;
- }
- else
- combiner->combine_type = COMBINE_TYPE_NONE;
- }
- if (++combiner->command_complete_count == combiner->node_count)
- {
-
- if (combiner->simple_aggregates
- /*
- * Aggregates has not been initialized - that means
- * no rows received from data nodes, nothing to send
- * It is possible if HAVING clause is present
- */
- && !combiner->initAggregates)
- {
- /* Build up and send a datarow with aggregates */
- StringInfo dataRowBuffer = makeStringInfo();
- ListCell *lc;
-
- /* Number of fields */
- pq_sendint(dataRowBuffer, list_length(combiner->simple_aggregates), 2);
-
- foreach (lc, combiner->simple_aggregates)
- {
- SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc);
- Datum resultVal;
- bool resultIsNull;
-
- finalize_collect_aggregates(simple_agg, &resultVal, &resultIsNull);
- /* Aggregation result */
- if (resultIsNull)
- {
- pq_sendint(dataRowBuffer, -1, 4);
- }
- else
- {
- char *text = OutputFunctionCall(&simple_agg->resoutputfn, resultVal);
- size_t len = strlen(text);
- pq_sendint(dataRowBuffer, len, 4);
- pq_sendtext(dataRowBuffer, text, len);
- }
- }
- pq_putmessage('D', dataRowBuffer->data, dataRowBuffer->len);
- pfree(dataRowBuffer->data);
- pfree(dataRowBuffer);
- }
- if (combiner->dest == DestRemote
- || combiner->dest == DestRemoteExecute)
- {
- if (combiner->combine_type == COMBINE_TYPE_NONE)
- {
- pq_putmessage(msg_type, msg_body, len);
- }
- else
- {
- char command_complete_buffer[256];
-
- /* Truncate msg_body to get base string */
- msg_body[len - digits - 1] = '\0';
- len = sprintf(command_complete_buffer, "%s%d", msg_body, combiner->row_count) + 1;
- pq_putmessage(msg_type, command_complete_buffer, len);
- }
- }
- }
- break;
- case 'T': /* RowDescription */
- if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
- combiner->request_type = REQUEST_TYPE_QUERY;
- if (combiner->request_type != REQUEST_TYPE_QUERY)
- {
- /* Inconsistent responses */
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("Unexpected response from the data nodes")));
- }
- /* Proxy first */
- if (combiner->description_count++ == 0)
- {
- if (combiner->dest == DestRemote
- || combiner->dest == DestRemoteExecute)
- pq_putmessage(msg_type, msg_body, len);
- }
- break;
- case 'S': /* ParameterStatus (SET command) */
- if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
- combiner->request_type = REQUEST_TYPE_QUERY;
- if (combiner->request_type != REQUEST_TYPE_QUERY)
- {
- /* Inconsistent responses */
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("Unexpected response from the data nodes")));
- }
- /* Proxy last */
- if (++combiner->description_count == combiner->node_count)
- {
- if (combiner->dest == DestRemote
- || combiner->dest == DestRemoteExecute)
- pq_putmessage(msg_type, msg_body, len);
- }
- break;
- case 'G': /* CopyInResponse */
- if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
- combiner->request_type = REQUEST_TYPE_COPY_IN;
- if (combiner->request_type != REQUEST_TYPE_COPY_IN)
- {
- /* Inconsistent responses */
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("Unexpected response from the data nodes")));
- }
- /* Proxy first */
- if (combiner->copy_in_count++ == 0)
- {
- if (combiner->dest == DestRemote
- || combiner->dest == DestRemoteExecute)
- pq_putmessage(msg_type, msg_body, len);
- }
- break;
- case 'H': /* CopyOutResponse */
- if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
- combiner->request_type = REQUEST_TYPE_COPY_OUT;
- if (combiner->request_type != REQUEST_TYPE_COPY_OUT)
- {
- /* Inconsistent responses */
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("Unexpected response from the data nodes")));
- }
- /*
- * The normal PG code will output an H message when it runs in the
- * coordinator, so do not proxy message here, just count it.
- */
- combiner->copy_out_count++;
- break;
- case 'd': /* CopyOutDataRow */
- if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
- combiner->request_type = REQUEST_TYPE_COPY_OUT;
-
- /* Inconsistent responses */
- if (combiner->request_type != REQUEST_TYPE_COPY_OUT)
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("Unexpected response from the data nodes")));
-
- /* If there is a copy file, data has to be sent to the local file */
- if (combiner->copy_file)
- {
- /* write data to the copy file */
- char *data_row;
- data_row = (char *) palloc0(len);
- memcpy(data_row, msg_body, len);
-
- fwrite(data_row, 1, len, combiner->copy_file);
- break;
- }
- /*
- * In this case data is sent back to the client
- */
- if (combiner->dest == DestRemote
- || combiner->dest == DestRemoteExecute)
- {
- StringInfo data_buffer;
-
- data_buffer = makeStringInfo();
-
- pq_sendtext(data_buffer, msg_body, len);
- pq_putmessage(msg_type,
- data_buffer->data,
- data_buffer->len);
-
- pfree(data_buffer->data);
- pfree(data_buffer);
- }
- break;
- case 'D': /* DataRow */
- if (!combiner->simple_aggregates)
- {
- if (combiner->dest == DestRemote
- || combiner->dest == DestRemoteExecute)
- pq_putmessage(msg_type, msg_body, len);
- }
- else
- {
- ListCell *lc;
- char **col_values;
- int *col_value_len;
- uint16 col_count;
- int i, cur = 0;
-
- /* Get values from the data row into array to speed up access */
- memcpy(&col_count, msg_body, 2);
- col_count = ntohs(col_count);
- cur += 2;
-
- col_values = (char **) palloc0(col_count * sizeof(char *));
- col_value_len = (int *) palloc0(col_count * sizeof(int));
- for (i = 0; i < col_count; i++)
- {
- int n32;
-
- memcpy(&n32, msg_body + cur, 4);
- col_value_len[i] = ntohl(n32);
- cur += 4;
-
- if (col_value_len[i] != -1)
- {
- col_values[i] = msg_body + cur;
- cur += col_value_len[i];
- }
- }
-
- if (combiner->initAggregates)
- {
- foreach (lc, combiner->simple_aggregates)
- initialize_collect_aggregates((SimpleAgg *) lfirst(lc));
-
- combiner->initAggregates = false;
- }
-
- foreach (lc, combiner->simple_aggregates)
- {
- SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc);
- FunctionCallInfoData fcinfo;
-
- parse_aggregate_value(simple_agg,
- col_values[simple_agg->column_pos],
- col_value_len[simple_agg->column_pos],
- fcinfo.arg + 1,
- fcinfo.argnull + 1);
-
- advance_collect_function(simple_agg, &fcinfo);
- }
- pfree(col_values);
- pfree(col_value_len);
- }
- break;
- case 'E': /* ErrorResponse */
- combiner->inErrorState = true;
- /* fallthru */
- case 'A': /* NotificationResponse */
- case 'N': /* NoticeResponse */
- /* Proxy error message back if specified,
- * or if doing internal primary copy
- */
- if (combiner->dest == DestRemote
- || combiner->dest == DestRemoteExecute)
- pq_putmessage(msg_type, msg_body, len);
- break;
- case 'I': /* EmptyQuery */
- default:
- /* Unexpected message */
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("Unexpected response from the data nodes")));
- }
- return 0;
-}
-
-/*
- * Examine the specified combiner state and determine if command was completed
- * successfully
- */
-static bool
-validate_combiner(ResponseCombiner combiner)
-{
- /* There was error message while combining */
- if (combiner->inErrorState)
- return false;
- /* Check if state is defined */
- if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
- return false;
- /* Check all nodes completed */
- if ((combiner->request_type == REQUEST_TYPE_COMMAND
- || combiner->request_type == REQUEST_TYPE_QUERY)
- && combiner->command_complete_count != combiner->node_count)
- return false;
-
- /* Check count of description responses */
- if (combiner->request_type == REQUEST_TYPE_QUERY
- && combiner->description_count != combiner->node_count)
- return false;
-
- /* Check count of copy-in responses */
- if (combiner->request_type == REQUEST_TYPE_COPY_IN
- && combiner->copy_in_count != combiner->node_count)
- return false;
-
- /* Check count of copy-out responses */
- if (combiner->request_type == REQUEST_TYPE_COPY_OUT
- && combiner->copy_out_count != combiner->node_count)
- return false;
-
- /* Add other checks here as needed */
-
- /* All is good if we are here */
- return true;
-}
-
-/*
- * Validate combiner and release storage freeing allocated memory
- */
-bool
-ValidateAndCloseCombiner(ResponseCombiner combiner)
-{
- bool valid = validate_combiner(combiner);
-
- pfree(combiner);
-
- return valid;
-}
-
-/*
- * Validate combiner and reset storage
- */
-bool
-ValidateAndResetCombiner(ResponseCombiner combiner)
-{
- bool valid = validate_combiner(combiner);
-
- combiner->command_complete_count = 0;
- combiner->row_count = 0;
- combiner->request_type = REQUEST_TYPE_NOT_DEFINED;
- combiner->description_count = 0;
- combiner->copy_in_count = 0;
- combiner->copy_out_count = 0;
- combiner->inErrorState = false;
- combiner->simple_aggregates = NULL;
- combiner->copy_file = NULL;
-
- return valid;
-}
-
-/*
- * Close combiner and free allocated memory, if it is not needed
- */
-void
-CloseCombiner(ResponseCombiner combiner)
-{
- if (combiner)
- pfree(combiner);
-}
-
-/*
- * Assign combiner aggregates
- */
-void
-AssignCombinerAggregates(ResponseCombiner combiner, List *simple_aggregates)
-{
- combiner->simple_aggregates = simple_aggregates;
-}
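combiner.c is removed here; judging from the Makefile change, its logic presumably moves into the new execRemote.c listed in the diffstat. The core idea it implemented is two-phase aggregation: each data node ships a partial transition value in a DataRow, and the coordinator parses it with the aggregate's input function, folds it in with the collect function, and applies the final function once every node has completed. A simplified numeric sketch of that flow for count(*) across two nodes (plain C, illustrative values only):

    #include <stdio.h>

    int
    main(void)
    {
        long        per_node[] = {10, 15};  /* parsed from each node's 'D' row */
        long        collect = 0;            /* initialize_collect_aggregates() */
        int         i;

        for (i = 0; i < 2; i++)
            collect += per_node[i];         /* advance_collect_function() */

        /* finalize_collect_aggregates(): count(*) has no finalfn */
        printf("combined count: %ld\n", collect);   /* prints 25 */
        return 0;
    }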
diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c
index 6a1aba8190..517b1e4d78 100644
--- a/src/backend/pgxc/pool/datanode.c
+++ b/src/backend/pgxc/pool/datanode.c
@@ -15,6 +15,7 @@
*-------------------------------------------------------------------------
*/
+#include "postgres.h"
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
@@ -22,166 +23,33 @@
#include <string.h>
#include <unistd.h>
#include <errno.h>
-#include "pgxc/poolmgr.h"
#include "access/gtm.h"
#include "access/transam.h"
#include "access/xact.h"
-#include "postgres.h"
-#include "utils/snapmgr.h"
-#include "pgxc/pgxc.h"
#include "gtm/gtm_c.h"
#include "pgxc/datanode.h"
#include "pgxc/locator.h"
-#include "../interfaces/libpq/libpq-fe.h"
+#include "pgxc/pgxc.h"
+#include "pgxc/poolmgr.h"
+#include "tcop/dest.h"
#include "utils/elog.h"
#include "utils/memutils.h"
-
+#include "utils/snapmgr.h"
+#include "../interfaces/libpq/libpq-fe.h"
#define NO_SOCKET -1
-/*
- * Buffer size does not affect performance significantly, just do not allow
- * connection buffer grows infinitely
- */
-#define COPY_BUFFER_SIZE 8192
-#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024
-
static int node_count = 0;
static DataNodeHandle *handles = NULL;
-static bool autocommit = true;
-static DataNodeHandle **write_node_list = NULL;
-static int write_node_count = 0;
-static DataNodeHandle **get_handles(List *nodelist);
-static int get_transaction_nodes(DataNodeHandle **connections);
-static void release_handles(void);
-
-static void data_node_init(DataNodeHandle *handle, int sock);
+static void data_node_init(DataNodeHandle *handle, int sock, int nodenum);
static void data_node_free(DataNodeHandle *handle);
-static int data_node_begin(int conn_count, DataNodeHandle **connections, CommandDest dest, GlobalTransactionId gxid);
-static int data_node_commit(int conn_count, DataNodeHandle **connections, CommandDest dest);
-static int data_node_rollback(int conn_count, DataNodeHandle **connections, CommandDest dest);
-
-static int ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle);
-static int ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle);
-
-static int data_node_send_query(DataNodeHandle *handle, const char *query);
-static int data_node_send_gxid(DataNodeHandle *handle, GlobalTransactionId gxid);
-static int data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot);
-
-static void add_error_message(DataNodeHandle *handle, const char *message);
-
-static int data_node_read_data(DataNodeHandle *conn);
-static int handle_response(DataNodeHandle *conn, ResponseCombiner combiner);
-
-static int get_int(DataNodeHandle *conn, size_t len, int *out);
-static int get_char(DataNodeHandle *conn, char *out);
-
-static void clear_write_node_list();
-
-#define MAX_STATEMENTS_PER_TRAN 10
-
-/* Variables to collect statistics */
-static int total_transactions = 0;
-static int total_statements = 0;
-static int total_autocommit = 0;
-static int nonautocommit_2pc = 0;
-static int autocommit_2pc = 0;
-static int current_tran_statements = 0;
-static int *statements_per_transaction = NULL;
-static int *nodes_per_transaction = NULL;
-
-/*
- * statistics collection: count a statement
- */
-static void
-stat_statement()
-{
- total_statements++;
- current_tran_statements++;
-}
-
-/*
- * To collect statistics: count a transaction
- */
-static void
-stat_transaction(int node_count)
-{
- total_transactions++;
- if (autocommit)
- total_autocommit++;
-
- if (!statements_per_transaction)
- {
- statements_per_transaction = (int *) malloc((MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int));
- memset(statements_per_transaction, 0, (MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int));
- }
-
- if (current_tran_statements > MAX_STATEMENTS_PER_TRAN)
- statements_per_transaction[MAX_STATEMENTS_PER_TRAN]++;
- else
- statements_per_transaction[current_tran_statements]++;
-
- current_tran_statements = 0;
- if (node_count > 0 && node_count <= NumDataNodes)
- {
- if (!nodes_per_transaction)
- {
- nodes_per_transaction = (int *) malloc(NumDataNodes * sizeof(int));
- memset(nodes_per_transaction, 0, NumDataNodes * sizeof(int));
- }
- nodes_per_transaction[node_count - 1]++;
- }
-}
-
-
-/*
- * To collect statistics: count a two-phase commit on nodes
- */
-static void
-stat_2pc(void)
-{
- if (autocommit)
- autocommit_2pc++;
- else
- nonautocommit_2pc++;
-}
+static int get_int(DataNodeHandle * conn, size_t len, int *out);
+static int get_char(DataNodeHandle * conn, char *out);
/*
- * Output collected statistics to the log
- */
-static void
-stat_log(void)
-{
- elog(DEBUG1, "Total Transactions: %d Total Statements: %d", total_transactions, total_statements);
- elog(DEBUG1, "Autocommit: %d 2PC for Autocommit: %d 2PC for non-Autocommit: %d",
- total_autocommit, autocommit_2pc, nonautocommit_2pc);
- if (total_transactions)
- {
- if (statements_per_transaction)
- {
- int i;
-
- for (i = 0; i < MAX_STATEMENTS_PER_TRAN; i++)
- elog(DEBUG1, "%d Statements per Transaction: %d (%d%%)",
- i, statements_per_transaction[i], statements_per_transaction[i] * 100 / total_transactions);
- }
- elog(DEBUG1, "%d+ Statements per Transaction: %d (%d%%)",
- MAX_STATEMENTS_PER_TRAN, statements_per_transaction[MAX_STATEMENTS_PER_TRAN], statements_per_transaction[MAX_STATEMENTS_PER_TRAN] * 100 / total_transactions);
- if (nodes_per_transaction)
- {
- int i;
-
- for (i = 0; i < NumDataNodes; i++)
- elog(DEBUG1, "%d Nodes per Transaction: %d (%d%%)",
- i + 1, nodes_per_transaction[i], nodes_per_transaction[i] * 100 / total_transactions);
- }
- }
-}
-
-/*
* Allocate and initialize memory to store DataNode handles.
*/
void
@@ -325,8 +193,9 @@ data_node_free(DataNodeHandle *handle)
* Structure stores state info and I/O buffers
*/
static void
-data_node_init(DataNodeHandle *handle, int sock)
+data_node_init(DataNodeHandle *handle, int sock, int nodenum)
{
+ handle->nodenum = nodenum;
handle->sock = sock;
handle->transaction_status = 'I';
handle->state = DN_CONNECTION_STATE_IDLE;
@@ -339,136 +208,99 @@ data_node_init(DataNodeHandle *handle, int sock)
/*
- * Handle responses from the Data node connections
+ * Wait until at least one of the specified connections has data available,
+ * then read the data into the buffer
*/
-static void
-data_node_receive_responses(const int conn_count, DataNodeHandle **connections,
- struct timeval *timeout, ResponseCombiner combiner)
+void
+data_node_receive(const int conn_count,
+ DataNodeHandle ** connections, struct timeval * timeout)
{
- int count = conn_count;
- DataNodeHandle *to_receive[conn_count];
-
- /* make a copy of the pointers to the connections */
- memcpy(to_receive, connections, conn_count * sizeof(DataNodeHandle *));
+ int i,
+ res_select,
+ nfds = 0;
+ fd_set readfds;
- /*
- * Read results.
- * Note we try and read from data node connections even if there is an error on one,
- * so as to avoid reading incorrect results on the next statement.
- * It might be better to just destroy these connections and tell the pool manager.
- */
- while (count > 0)
+ FD_ZERO(&readfds);
+ for (i = 0; i < conn_count; i++)
{
- int i,
- res_select,
- nfds = 0;
- fd_set readfds;
-
- FD_ZERO(&readfds);
- for (i = 0; i < count; i++)
- {
- /* note if a connection has error */
- if (!to_receive[i]
- || to_receive[i]->state == DN_CONNECTION_STATE_ERROR_FATAL)
- {
- /* Handling is done, do not track this connection */
- count--;
-
- /* Move last connection in its place */
- if (i < count)
- {
- to_receive[i] = to_receive[count];
- /* stay on the current position */
- i--;
- }
- continue;
- }
+ /* If the connection has finished sending, do not wait for input from it */
+ if (connections[i]->state == DN_CONNECTION_STATE_IDLE
+ || connections[i]->state == DN_CONNECTION_STATE_HAS_DATA)
+ continue;
- /* prepare select params */
- if (nfds < to_receive[i]->sock)
- nfds = to_receive[i]->sock;
+ /* prepare select params */
+ if (nfds < connections[i]->sock)
+ nfds = connections[i]->sock;
- FD_SET (to_receive[i]->sock, &readfds);
- }
+ FD_SET(connections[i]->sock, &readfds);
+ }
- /* Make sure we still have valid connections */
- if (count == 0)
- break;
+ /*
+ * Return if there are no connections to receive input from
+ */
+ if (nfds == 0)
+ return;
retry:
- res_select = select(nfds + 1, &readfds, NULL, NULL, timeout);
- if (res_select < 0)
- {
- /* error - retry if EINTR or EAGAIN */
- if (errno == EINTR || errno == EAGAIN)
- goto retry;
-
- /*
- * PGXCTODO - we may want to close the connections and notify the
- * pooler that these are invalid.
- */
- if (errno == EBADF)
- {
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("select() bad file descriptor set")));
- }
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("select() error: %d", errno)));
- }
+ res_select = select(nfds + 1, &readfds, NULL, NULL, timeout);
+ if (res_select < 0)
+ {
+ /* error - retry if EINTR or EAGAIN */
+ if (errno == EINTR || errno == EAGAIN)
+ goto retry;
- if (res_select == 0)
+ /*
+ * PGXCTODO - we may want to close the connections and notify the
+ * pooler that these are invalid.
+ */
+ if (errno == EBADF)
{
- /* Handle timeout */
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("timeout while waiting for response")));
+ errmsg("select() bad file descriptor set")));
}
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("select() error: %d", errno)));
+ }
+
+ if (res_select == 0)
+ {
+ /* Handle timeout */
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("timeout while waiting for response")));
+ }
- /* read data */
- for (i = 0; i < count; i++)
+ /* read data */
+ for (i = 0; i < conn_count; i++)
+ {
+ DataNodeHandle *conn = connections[i];
+
+ if (FD_ISSET(conn->sock, &readfds))
{
- DataNodeHandle *conn = to_receive[i];
+ int read_status = data_node_read_data(conn);
- if (FD_ISSET(conn->sock, &readfds))
+ if (read_status == EOF || read_status < 0)
{
- int read_status = data_node_read_data(conn);
-
- if (read_status == EOF || read_status < 0)
- {
- /* PGXCTODO - we should notify the pooler to destroy the connections */
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("unexpected EOF on datanode connection")));
- }
+ /* PGXCTODO - we should notify the pooler to destroy the connections */
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("unexpected EOF on datanode connection")));
}
-
- if (conn->inStart < conn->inEnd)
+ else
{
- if (handle_response(conn, combiner) == 0
- || conn->state == DN_CONNECTION_STATE_ERROR_READY
- || conn->state == DN_CONNECTION_STATE_ERROR_FATAL)
- {
- /* Handling is done, do not track this connection */
- count--;
- /* Move last connection in place */
- if (i < count)
- {
- to_receive[i] = to_receive[count];
- /* stay on the current position */
- i--;
- }
- }
+ conn->state = DN_CONNECTION_STATE_HAS_DATA;
}
}
}
}
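+
+/*
+ * Illustrative usage sketch (editorial example, not executable patch code):
+ * a caller typically blocks until at least one connection is readable and
+ * then scans for buffered data, e.g.
+ *
+ *     struct timeval timeout = { 30, 0 };    (a hypothetical 30-second limit)
+ *
+ *     data_node_receive(conn_count, connections, &timeout);
+ *     for (i = 0; i < conn_count; i++)
+ *         if (connections[i]->state == DN_CONNECTION_STATE_HAS_DATA)
+ *             the buffered bytes can now be parsed, e.g. with get_message()
+ */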
+
/*
 * Read incoming messages from the Data node connection
*/
-static int
+int
data_node_read_data(DataNodeHandle *conn)
{
int someread = 0;
@@ -601,11 +433,12 @@ retry:
return 0;
}
+
/*
* Get one character from the connection buffer and advance cursor
*/
static int
-get_char(DataNodeHandle *conn, char *out)
+get_char(DataNodeHandle * conn, char *out)
{
if (conn->inCursor < conn->inEnd)
{
@@ -647,379 +480,62 @@ get_int(DataNodeHandle *conn, size_t len, int *out)
return 0;
}
-/*
- * Read next message from the connection and update the combiner accordingly
- * If we are in an error state we just consume the messages, and do not proxy
- * Long term, we should look into cancelling executing statements
- * and closing the connections.
- */
-static int
-handle_response(DataNodeHandle *conn, ResponseCombiner combiner)
-{
- char msg_type;
- int msg_len;
-
- for (;;)
- {
- /* try to read the message, return if not enough data */
- conn->inCursor = conn->inStart;
-
- /*
- * Make sure we receive a complete message, otherwise, return EOF, and
- * we will be called again after more data has been read.
- */
- if (conn->inEnd - conn->inCursor < 5)
- return EOF;
-
- if (get_char(conn, &msg_type))
- return EOF;
-
- if (get_int(conn, 4, &msg_len))
- return EOF;
-
- msg_len -= 4;
-
- if (conn->inEnd - conn->inCursor < msg_len)
- {
- ensure_in_buffer_capacity(conn->inCursor + (size_t) msg_len, conn);
- return EOF;
- }
-
- /* TODO handle other possible responses */
- switch (msg_type)
- {
- case 'c': /* CopyToCommandComplete */
- /* no need to parse, just move cursor */
- conn->inCursor += msg_len;
- conn->state = DN_CONNECTION_STATE_COMPLETED;
- CombineResponse(combiner, msg_type,
- conn->inBuffer + conn->inStart + 5,
- conn->inCursor - conn->inStart - 5);
- break;
- case 'C': /* CommandComplete */
- /* no need to parse, just move cursor */
- conn->inCursor += msg_len;
- conn->state = DN_CONNECTION_STATE_COMPLETED;
- CombineResponse(combiner, msg_type,
- conn->inBuffer + conn->inStart + 5,
- conn->inCursor - conn->inStart - 5);
- break;
- case 'T': /* RowDescription */
- case 'D': /* DataRow */
- case 'S': /* ParameterStatus */
- /* no need to parse, just move cursor */
- conn->inCursor += msg_len;
- CombineResponse(combiner, msg_type,
- conn->inBuffer + conn->inStart + 5,
- conn->inCursor - conn->inStart - 5);
- break;
- case 'G': /* CopyInResponse */
- /* no need to parse, just move cursor */
- conn->inCursor += msg_len;
- conn->state = DN_CONNECTION_STATE_COPY_IN;
- CombineResponse(combiner, msg_type,
- conn->inBuffer + conn->inStart + 5,
- conn->inCursor - conn->inStart - 5);
- /* Done, return to caller to let it know the data can be passed in */
- conn->inStart = conn->inCursor;
- conn->state = DN_CONNECTION_STATE_COPY_IN;
- return 0;
- case 'H': /* CopyOutResponse */
- /* no need to parse, just move cursor */
- conn->inCursor += msg_len;
- conn->state = DN_CONNECTION_STATE_COPY_OUT;
- CombineResponse(combiner, msg_type,
- conn->inBuffer + conn->inStart + 5,
- conn->inCursor - conn->inStart - 5);
- conn->inStart = conn->inCursor;
- conn->state = DN_CONNECTION_STATE_COPY_OUT;
- return 0;
- case 'd': /* CopyOutDataRow */
- /* No need to parse, just send a row back to client */
- conn->inCursor += msg_len;
- conn->state = DN_CONNECTION_STATE_COPY_OUT;
- CombineResponse(combiner, msg_type,
- conn->inBuffer + conn->inStart + 5,
- conn->inCursor - conn->inStart - 5);
- break;
- case 'E': /* ErrorResponse */
- /* no need to parse, just move cursor */
- conn->inCursor += msg_len;
- CombineResponse(combiner, msg_type,
- conn->inBuffer + conn->inStart + 5,
- conn->inCursor - conn->inStart - 5);
- conn->inStart = conn->inCursor;
- conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
- /*
- * Do not return with an error, we still need to consume Z,
- * ready-for-query
- */
- break;
- case 'A': /* NotificationResponse */
- case 'N': /* NoticeResponse */
- conn->inCursor += msg_len;
-
- /*
- * Ignore these to prevent multiple messages, one from each
- * node. Coordinator will send one for DDL anyway
- */
- break;
- case 'Z': /* ReadyForQuery */
- get_char(conn, &conn->transaction_status);
- conn->inStart = conn->inCursor;
- if (conn->state == DN_CONNECTION_STATE_ERROR_NOT_READY)
- {
- conn->state = DN_CONNECTION_STATE_ERROR_READY;
- return EOF;
- } else
- conn->state = DN_CONNECTION_STATE_IDLE;
- return 0;
- case 'I': /* EmptyQuery */
- default:
- /* sync lost? */
- conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
- return EOF;
- }
- conn->inStart = conn->inCursor;
- }
- /* Keep compiler quiet */
- return EOF;
-}
/*
- * Send BEGIN command to the Data nodes and receive responses
+ * get_message
+ * If the connection has enough buffered data, read an entire message from
+ * the connection buffer and return its message type. Message data and data
+ * length are returned via the out parameters.
+ * If the buffer does not contain a complete message, leave the cursor
+ * unchanged, change the connection state to DN_CONNECTION_STATE_QUERY to
+ * indicate that more data needs to be received, and return '\0'.
+ * conn - connection to read from
+ * len - returned length of the data msg points to
+ * msg - returns a pointer into the incoming buffer. The buffer is likely to
+ * be overwritten by the next receive, so a caller that wants to refer to the
+ * message later should make a copy.
-static int
-data_node_begin(int conn_count, DataNodeHandle **connections, CommandDest dest, GlobalTransactionId gxid)
+char
+get_message(DataNodeHandle *conn, int *len, char **msg)
{
- int i;
- struct timeval *timeout = NULL;
- ResponseCombiner combiner;
+ char msgtype;
- /* Send BEGIN */
- for (i = 0; i < conn_count; i++)
+ if (get_char(conn, &msgtype) || get_int(conn, 4, len))
{
- if (GlobalTransactionIdIsValid(gxid) && data_node_send_gxid(connections[i], gxid))
- return EOF;
-
- if (data_node_send_query(connections[i], "BEGIN"))
- return EOF;
+ /* A successful get_char would have moved the cursor; restore the position */
+ conn->inCursor = conn->inStart;
+ return '\0';
}
- combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE, dest);
-
- /* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
-
- /* Verify status */
- return ValidateAndCloseCombiner(combiner) ? 0 : EOF;
-}
-
-/* Clears the write node list */
-static void
-clear_write_node_list(void)
-{
- /* we just malloc once and use counter */
- if (write_node_list == NULL)
- write_node_list = (DataNodeHandle **) malloc(NumDataNodes * sizeof(DataNodeHandle *));
-
- write_node_count = 0;
-}
-
-
-/*
- * Switch autocommmit mode off, so all subsequent statements will be in the same transaction
- */
-void
-DataNodeBegin(void)
-{
- autocommit = false;
- clear_write_node_list();
-}
-
-
-/*
- * Commit current transaction, use two-phase commit if necessary
- */
-int
-DataNodeCommit(CommandDest dest)
-{
- int res;
- int tran_count;
- DataNodeHandle *connections[node_count];
- /* Quick check to make sure we have connections */
- if (node_count == 0)
- goto finish;
-
- /* gather connections to commit */
- tran_count = get_transaction_nodes(connections);
-
- /*
- * If we do not have open transactions we have nothing to commit, just
- * report success
- */
- if (tran_count == 0)
- goto finish;
-
- res = data_node_commit(tran_count, connections, dest);
-
-finish:
- /* In autocommit mode statistics is collected in DataNodeExec */
- if (!autocommit)
- stat_transaction(node_count);
- if (!PersistentConnections)
- release_handles();
- autocommit = true;
- clear_write_node_list();
- return res;
-}
-
+ *len -= 4;
-/*
- * Send COMMIT or PREPARE/COMMIT PREPARED down to the Data nodes and handle responses
- */
-static int
-data_node_commit(int conn_count, DataNodeHandle **connections, CommandDest dest)
-{
- int i;
- struct timeval *timeout = NULL;
- char buffer[256];
- GlobalTransactionId gxid = InvalidGlobalTransactionId;
- int result = 0;
- ResponseCombiner combiner = NULL;
-
-
- /* can set this to false to disable temporarily */
- /* bool do2PC = conn_count > 1; */
-
- /*
- * Only use 2PC if more than one node was written to. Otherwise, just send
- * COMMIT to all
- */
- bool do2PC = write_node_count > 1;
-
- /* Extra XID for Two Phase Commit */
- GlobalTransactionId two_phase_xid = 0;
-
- if (do2PC)
+ if (conn->inCursor + *len > conn->inEnd)
{
- stat_2pc();
-
/*
- * Formally we should be using GetCurrentGlobalTransactionIdIfAny() here,
- * but since we need 2pc, we surely have sent down a command and got
- * gxid for it. Hence GetCurrentGlobalTransactionId() just returns
- * already allocated gxid
+ * Not enough data in the buffer; we need to read more.
+ * The reading function discards data already consumed from the buffer,
+ * up to conn->inStart. We then want the message that is now partly in
+ * the buffer to be read in completely, to avoid extra read/handle
+ * cycles. The space needed is 1 byte for the message type, 4 bytes for
+ * the message length, and the message body itself, whose size is
+ * currently in *len. The buffer may already be large enough, in which
+ * case ensure_in_buffer_capacity() returns immediately.
*/
- gxid = GetCurrentGlobalTransactionId();
-
- sprintf(buffer, "PREPARE TRANSACTION 'T%d'", gxid);
- /* Send PREPARE */
- for (i = 0; i < conn_count; i++)
- {
- if (data_node_send_query(connections[i], buffer))
- return EOF;
- }
-
- combiner = CreateResponseCombiner(conn_count,
- COMBINE_TYPE_NONE, dest);
- /* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
-
- /* Reset combiner */
- if (!ValidateAndResetCombiner(combiner))
- {
- result = EOF;
- }
- }
-
- if (!do2PC)
- strcpy(buffer, "COMMIT");
- else
- {
- if (result)
- sprintf(buffer, "ROLLBACK PREPARED 'T%d'", gxid);
- else
- sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid);
-
- /* We need to use a new xid, the data nodes have reset */
- two_phase_xid = BeginTranGTM();
- for (i = 0; i < conn_count; i++)
- {
- if (data_node_send_gxid(connections[i], two_phase_xid))
- {
- add_error_message(connections[i], "Can not send request");
- result = EOF;
- goto finish;
- }
- }
- }
-
- /* Send COMMIT */
- for (i = 0; i < conn_count; i++)
- {
- if (data_node_send_query(connections[i], buffer))
- {
- result = EOF;
- goto finish;
- }
+ ensure_in_buffer_capacity(5 + (size_t) *len, conn);
+ conn->state = DN_CONNECTION_STATE_QUERY;
+ conn->inCursor = conn->inStart;
+ return '\0';
}
- if (!combiner)
- combiner = CreateResponseCombiner(conn_count,
- COMBINE_TYPE_NONE, dest);
- /* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
- result = ValidateAndCloseCombiner(combiner) ? result : EOF;
-
-finish:
- if (do2PC)
- CommitTranGTM((GlobalTransactionId) two_phase_xid);
-
- return result;
-}
-
-
-/*
- * Rollback current transaction
- */
-int
-DataNodeRollback(CommandDest dest)
-{
- int res = 0;
- int tran_count;
- DataNodeHandle *connections[node_count];
-
- /* Quick check to make sure we have connections */
- if (node_count == 0)
- goto finish;
-
- /* gather connections to rollback */
- tran_count = get_transaction_nodes(connections);
-
- /*
- * If we do not have open transactions we have nothing to rollback just
- * report success
- */
- if (tran_count == 0)
- goto finish;
-
- res = data_node_rollback(tran_count, connections, dest);
-
-finish:
- /* In autocommit mode statistics is collected in DataNodeExec */
- if (!autocommit)
- stat_transaction(node_count);
- if (!PersistentConnections)
- release_handles();
- autocommit = true;
- clear_write_node_list();
- return res;
+ *msg = conn->inBuffer + conn->inCursor;
+ conn->inCursor += *len;
+ conn->inStart = conn->inCursor;
+ return msgtype;
}
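+
+/*
+ * Illustrative sketch of the receive/parse cycle get_message() is designed
+ * for (editorial example; the dispatch shown is hypothetical):
+ *
+ *     for (;;)
+ *     {
+ *         int   len;
+ *         char *msg;
+ *         char  msgtype = get_message(conn, &len, &msg);
+ *
+ *         if (msgtype == '\0')
+ *         {
+ *             data_node_receive(1, &conn, NULL);    (incomplete, read more)
+ *             continue;
+ *         }
+ *         if (msgtype == 'Z')                       (ReadyForQuery, done)
+ *             break;
+ *         dispatch on other message types ('T', 'D', 'C', ...) here
+ *     }
+ */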
/* Release all data node connections back to pool and release occupied memory */
-static void
+void
release_handles(void)
{
int i;
@@ -1041,320 +557,11 @@ release_handles(void)
/*
- * Send ROLLBACK command down to the Data nodes and handle responses
- */
-static int
-data_node_rollback(int conn_count, DataNodeHandle **connections, CommandDest dest)
-{
- int i;
- struct timeval *timeout = NULL;
- int result = 0;
- ResponseCombiner combiner;
-
- /* Send ROLLBACK - */
- for (i = 0; i < conn_count; i++)
- {
- if (data_node_send_query(connections[i], "ROLLBACK"))
- result = EOF;
- }
-
- combiner = CreateResponseCombiner(conn_count,
- COMBINE_TYPE_NONE, dest);
- /* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
-
- /* Verify status */
- return ValidateAndCloseCombiner(combiner) ? 0 : EOF;
-}
-
-
-/*
- * Execute specified statement on specified Data nodes, combine responses and
- * send results back to the client
- *
- * const char *query - SQL string to execute
- * List *primarynode - if a write operation on a replicated table, the primary node
- * List *nodelist - the nodes to execute on (excludes primary, if set in primarynode
- * CommandDest dest - destination for results
- * Snapshot snapshot - snapshot to use
- * bool force_autocommit - force autocommit
- * List *simple_aggregates - list of simple aggregates to execute
- * bool is_read_only - if this is a read-only query
- */
-int
-DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CombineType combine_type,
- CommandDest dest, Snapshot snapshot, bool force_autocommit,
- List *simple_aggregates, bool is_read_only)
-{
- int i;
- int j;
- int regular_conn_count;
- int total_conn_count;
- struct timeval *timeout = NULL; /* wait forever */
- ResponseCombiner combiner = NULL;
- int row_count = 0;
- int new_count = 0;
- bool need_tran;
- bool found;
- GlobalTransactionId gxid = InvalidGlobalTransactionId;
- DataNodeHandle *new_connections[NumDataNodes];
- DataNodeHandle **connections = NULL;
- DataNodeHandle **primaryconnection = NULL;
- List *nodelist = NIL;
- List *primarynode = NIL;
- /* add up affected row count by default, override for replicated writes */
-
- if (exec_nodes)
- {
- nodelist = exec_nodes->nodelist;
- primarynode = exec_nodes->primarynodelist;
- }
-
- if (list_length(nodelist) == 0)
- {
- if (primarynode)
- regular_conn_count = NumDataNodes - 1;
- else
- regular_conn_count = NumDataNodes;
- }
- else
- {
- regular_conn_count = list_length(nodelist);
- }
-
- total_conn_count = regular_conn_count;
-
- /* Get connection for primary node, if used */
- if (primarynode)
- {
- primaryconnection = get_handles(primarynode);
- total_conn_count++;
- }
-
- /* Get other connections (non-primary) */
- if (regular_conn_count == 0)
- return EOF;
- else
- {
- connections = get_handles(nodelist);
- if (!connections)
- return EOF;
- }
-
- if (force_autocommit)
- need_tran = false;
- else
- need_tran = !autocommit || total_conn_count > 1;
-
- elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false");
-
- stat_statement();
- if (autocommit)
- stat_transaction(total_conn_count);
-
- /* We normally clear for transactions, but if autocommit, clear here, too */
- if (autocommit == true)
- {
- clear_write_node_list();
- }
-
- /* Check status of connections */
-
- /*
- * We want to track new "write" nodes, and new nodes in the current
- * transaction whether or not they are write nodes.
- */
- if (!is_read_only && write_node_count < NumDataNodes)
- {
- if (primaryconnection)
- {
- found = false;
- for (j = 0; j < write_node_count && !found; j++)
- {
- if (write_node_list[j] == primaryconnection[0])
- found = true;
- }
- if (!found)
- {
- /* Add to transaction wide-list */
- write_node_list[write_node_count++] = primaryconnection[0];
- /* Add to current statement list */
- new_connections[new_count++] = primaryconnection[0];
- }
- }
- for (i = 0; i < regular_conn_count; i++)
- {
- found = false;
- for (j = 0; j < write_node_count && !found; j++)
- {
- if (write_node_list[j] == connections[i])
- found = true;
- }
- if (!found)
- {
- /* Add to transaction wide-list */
- write_node_list[write_node_count++] = connections[i];
- /* Add to current statement list */
- new_connections[new_count++] = connections[i];
- }
- }
- /* Check connection state is DN_CONNECTION_STATE_IDLE */
- }
-
- gxid = GetCurrentGlobalTransactionId();
-
- if (!GlobalTransactionIdIsValid(gxid))
- {
- pfree(connections);
- return EOF;
- }
- if (new_count > 0 && need_tran)
- {
- /* Start transaction on connections where it is not started */
- if (data_node_begin(new_count, new_connections, DestNone, gxid))
- {
- pfree(connections);
- return EOF;
- }
- }
-
- /* See if we have a primary nodes, execute on it first before the others */
- if (primaryconnection)
- {
- /* If explicit transaction is needed gxid is already sent */
- if (!need_tran && data_node_send_gxid(primaryconnection[0], gxid))
- {
- add_error_message(primaryconnection[0], "Can not send request");
- if (connections)
- pfree(connections);
- if (primaryconnection)
- pfree(primaryconnection);
- return EOF;
- }
- if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot))
- {
- add_error_message(primaryconnection[0], "Can not send request");
- if (connections)
- pfree(connections);
- pfree(primaryconnection);
- return EOF;
- }
- if (data_node_send_query(primaryconnection[0], query) != 0)
- {
- add_error_message(primaryconnection[0], "Can not send request");
- if (connections)
- pfree(connections);
- if (primaryconnection)
- pfree(primaryconnection);
- return EOF;
- }
-
- Assert(combine_type == COMBINE_TYPE_SAME);
-
- /*
- * Create combiner.
- * Note that we use the same combiner later with the secondary nodes,
- * so that we do not prematurely send a response to the client
- * until all nodes have completed execution.
- */
- combiner = CreateResponseCombiner(total_conn_count, combine_type, dest);
- AssignCombinerAggregates(combiner, simple_aggregates);
-
- /* Receive responses */
- data_node_receive_responses(1, primaryconnection, timeout, combiner);
- /* If we got an error response return immediately */
- if (DN_CONNECTION_STATE_ERROR(primaryconnection[0]))
- {
- /* We are going to exit, so release combiner */
- CloseCombiner(combiner);
- if (autocommit)
- {
- if (need_tran)
- DataNodeRollback(DestNone);
- else if (!PersistentConnections)
- release_handles();
- }
-
- if (primaryconnection)
- pfree(primaryconnection);
- if (connections)
- pfree(connections);
- return EOF;
- }
- }
-
- /* Send query to nodes */
- for (i = 0; i < regular_conn_count; i++)
- {
- /* If explicit transaction is needed gxid is already sent */
- if (!need_tran && data_node_send_gxid(connections[i], gxid))
- {
- add_error_message(connections[i], "Can not send request");
- pfree(connections);
- return EOF;
- }
- if (snapshot && data_node_send_snapshot(connections[i], snapshot))
- {
- add_error_message(connections[i], "Can not send request");
- pfree(connections);
- return EOF;
- }
- if (data_node_send_query(connections[i], query) != 0)
- {
- add_error_message(connections[i], "Can not send request");
- pfree(connections);
- return EOF;
- }
- }
-
- /* We may already have combiner if it is replicated case with primary data node */
- if (!combiner)
- {
- combiner = CreateResponseCombiner(regular_conn_count, combine_type, dest);
- AssignCombinerAggregates(combiner, simple_aggregates);
- }
-
- /* Receive responses */
- data_node_receive_responses(regular_conn_count, connections, timeout, combiner);
- row_count = combiner->row_count;
-
- /* Check for errors and if primary nodeMake sure primary and secondary nodes were updated the same */
- if (!ValidateAndCloseCombiner(combiner))
- {
- if (autocommit)
- {
- if (need_tran)
- DataNodeRollback(DestNone);
- else if (!PersistentConnections)
- release_handles();
- }
-
- pfree(connections);
-
- return EOF;
- }
-
- if (autocommit)
- {
- if (need_tran)
- DataNodeCommit(DestNone); /* PGXCTODO - call CommitTransaction()
- * instead? */
- else if (!PersistentConnections)
- release_handles();
- }
-
- /* Verify status? */
- pfree(connections);
- return 0;
-}
-
-
-/*
 * Ensure the specified amount of data can fit into the incoming buffer,
 * growing it if necessary
*/
-static int
-ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle)
+int
+ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle)
{
int newsize = handle->inSize;
char *newbuf;
@@ -1405,8 +612,8 @@ ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle)
 * Ensure the specified amount of data can fit into the outgoing buffer,
 * growing it if necessary
*/
-static int
-ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle)
+int
+ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle)
{
int newsize = handle->outSize;
char *newbuf;
@@ -1456,8 +663,8 @@ ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle)
/*
* Send specified amount of data from the outgoing buffer over the connection
*/
-static int
-send_some(DataNodeHandle * handle, int len)
+int
+send_some(DataNodeHandle *handle, int len)
{
char *ptr = handle->outBuffer;
int remaining = handle->outEnd;
@@ -1556,7 +763,7 @@ send_some(DataNodeHandle * handle, int len)
 * This method does not return until the connection buffer is empty or an
 * error occurs, ensuring all data are on the wire before we wait for a response
*/
-static int
+int
data_node_flush(DataNodeHandle *handle)
{
while (handle->outEnd)
@@ -1573,8 +780,8 @@ data_node_flush(DataNodeHandle *handle)
/*
* Send specified statement down to the Data node
*/
-static int
-data_node_send_query(DataNodeHandle *handle, const char *query)
+int
+data_node_send_query(DataNodeHandle * handle, const char *query)
{
int strLen = strlen(query) + 1;
@@ -1595,7 +802,7 @@ data_node_send_query(DataNodeHandle *handle, const char *query)
memcpy(handle->outBuffer + handle->outEnd, query, strLen);
handle->outEnd += strLen;
- handle->state = DN_CONNECTION_STATE_BUSY;
+ handle->state = DN_CONNECTION_STATE_QUERY;
return data_node_flush(handle);
}
@@ -1604,7 +811,7 @@ data_node_send_query(DataNodeHandle *handle, const char *query)
/*
* Send the GXID down to the Data node
*/
-static int
+int
data_node_send_gxid(DataNodeHandle *handle, GlobalTransactionId gxid)
{
int msglen = 8;
@@ -1632,7 +839,7 @@ data_node_send_gxid(DataNodeHandle *handle, GlobalTransactionId gxid)
/*
* Send the snapshot down to the Data node
*/
-static int
+int
data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot)
{
int msglen;
@@ -1686,11 +893,11 @@ data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot)
 * Add another message to the list of errors to be returned to the client
 * at a convenient time
*/
-static void
+void
add_error_message(DataNodeHandle *handle, const char *message)
{
handle->transaction_status = 'E';
- handle->state = DN_CONNECTION_STATE_ERROR_READY;
+ handle->state = DN_CONNECTION_STATE_IDLE;
if (handle->error)
{
/* PGXCTODO append */
@@ -1706,16 +913,19 @@ add_error_message(DataNodeHandle *handle, const char *message)
 * A special case is an empty or NIL node list; in this case return all the nodes.
* The returned list should be pfree'd when no longer needed.
*/
-static DataNodeHandle **
+DataNodeHandle **
get_handles(List *nodelist)
{
DataNodeHandle **result;
ListCell *node_list_item;
List *allocate = NIL;
+ MemoryContext old_context;
/* index of the result array */
int i = 0;
+ /* Handles must remain valid for the duration of the transaction */
+ old_context = MemoryContextSwitchTo(TopTransactionContext);
/* If node list is empty execute request on current nodes */
if (list_length(nodelist) == 0)
{
@@ -1757,8 +967,8 @@ get_handles(List *nodelist)
{
int node = lfirst_int(node_list_item);
- if (node > NumDataNodes || node <= 0)
- elog(ERROR, "Node number: %d passed is not a known node", node);
+ Assert(node > 0 && node <= NumDataNodes);
+
result[i++] = &handles[node - 1];
if (handles[node - 1].sock == NO_SOCKET)
allocate = lappend_int(allocate, node);
@@ -1781,13 +991,16 @@ get_handles(List *nodelist)
int node = lfirst_int(node_list_item);
int fdsock = fds[j++];
- data_node_init(&handles[node - 1], fdsock);
+ data_node_init(&handles[node - 1], fdsock, node);
node_count++;
}
pfree(fds);
list_free(allocate);
}
+ /* restore context */
+ MemoryContextSwitchTo(old_context);
+
return result;
}
@@ -1802,8 +1015,8 @@ get_handles(List *nodelist)
* The function returns number of pointers written to the connections array.
* Remaining items in the array, if any, will be kept unchanged
*/
-static int
-get_transaction_nodes(DataNodeHandle ** connections)
+int
+get_transaction_nodes(DataNodeHandle **connections)
{
int tran_count = 0;
int i;
@@ -1819,503 +1032,3 @@ get_transaction_nodes(DataNodeHandle ** connections)
return tran_count;
}
-
-
-/*
- * Called when the backend is ending.
- */
-void
-DataNodeCleanAndRelease(int code, Datum arg)
-{
- /* Rollback on Data Nodes */
- if (IsTransactionState())
- {
- DataNodeRollback(DestNone);
-
- /* Rollback on GTM if transaction id opened. */
- RollbackTranGTM((GlobalTransactionId) GetCurrentTransactionIdIfAny());
- }
-
- /* Release data node connections */
- release_handles();
-
- /* Close connection with GTM */
- CloseGTM();
-
- /* Dump collected statistics to the log */
- stat_log();
-}
-
-/*
- * Begin COPY command
- * The copy_connections array must have room for NumDataNodes items
- */
-DataNodeHandle**
-DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from)
-{
- int i, j;
- int conn_count = list_length(nodelist) == 0 ? NumDataNodes : list_length(nodelist);
- struct timeval *timeout = NULL;
- DataNodeHandle **connections;
- DataNodeHandle **copy_connections;
- DataNodeHandle *newConnections[conn_count];
- int new_count = 0;
- ListCell *nodeitem;
- bool need_tran;
- GlobalTransactionId gxid;
- ResponseCombiner combiner;
-
- if (conn_count == 0)
- return NULL;
-
- /* Get needed datanode connections */
- connections = get_handles(nodelist);
- if (!connections)
- return NULL;
-
- need_tran = !autocommit || conn_count > 1;
-
- elog(DEBUG1, "autocommit = %s, conn_count = %d, need_tran = %s", autocommit ? "true" : "false", conn_count, need_tran ? "true" : "false");
-
- /*
- * We need to be able quickly find a connection handle for specified node number,
- * So store connections in an array where index is node-1.
- * Unused items in the array should be NULL
- */
- copy_connections = (DataNodeHandle **) palloc0(NumDataNodes * sizeof(DataNodeHandle *));
- i = 0;
- foreach(nodeitem, nodelist)
- copy_connections[lfirst_int(nodeitem) - 1] = connections[i++];
-
- /* Gather statistics */
- stat_statement();
- if (autocommit)
- stat_transaction(conn_count);
-
- /* We normally clear for transactions, but if autocommit, clear here, too */
- if (autocommit)
- clear_write_node_list();
-
- /* Check status of connections */
- /* We want to track new "write" nodes, and new nodes in the current transaction
- * whether or not they are write nodes. */
- if (write_node_count < NumDataNodes)
- {
- for (i = 0; i < conn_count; i++)
- {
- bool found = false;
- for (j=0; j<write_node_count && !found; j++)
- {
- if (write_node_list[j] == connections[i])
- found = true;
- }
- if (!found)
- {
- /*
- * Add to transaction wide-list if COPY FROM
- * CopyOut (COPY TO) is not a write operation, no need to update
- */
- if (is_from)
- write_node_list[write_node_count++] = connections[i];
- /* Add to current statement list */
- newConnections[new_count++] = connections[i];
- }
- }
- // Check connection state is DN_CONNECTION_STATE_IDLE
- }
-
- gxid = GetCurrentGlobalTransactionId();
-
- /* elog(DEBUG1, "Current gxid = %d", gxid); */
-
- if (!GlobalTransactionIdIsValid(gxid))
- {
- pfree(connections);
- pfree(copy_connections);
- return NULL;
- }
-
- if (new_count > 0 && need_tran)
- {
- /* Start transaction on connections where it is not started */
- if (data_node_begin(new_count, newConnections, DestNone, gxid))
- {
- pfree(connections);
- pfree(copy_connections);
- return NULL;
- }
- }
-
- /* Send query to nodes */
- for (i = 0; i < conn_count; i++)
- {
- /* If explicit transaction is needed gxid is already sent */
- if (!need_tran && data_node_send_gxid(connections[i], gxid))
- {
- add_error_message(connections[i], "Can not send request");
- pfree(connections);
- pfree(copy_connections);
- return NULL;
- }
- if (snapshot && data_node_send_snapshot(connections[i], snapshot))
- {
- add_error_message(connections[i], "Can not send request");
- pfree(connections);
- pfree(copy_connections);
- return NULL;
- }
- if (data_node_send_query(connections[i], query) != 0)
- {
- add_error_message(connections[i], "Can not send request");
- pfree(connections);
- pfree(copy_connections);
- return NULL;
- }
- }
-
- /*
- * We are expecting CopyIn response, but do not want to send it to client,
- * caller should take care about this, because here we do not know if
- * client runs console or file copy
- */
- combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE, DestNone);
-
- /* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
- if (!ValidateAndCloseCombiner(combiner))
- {
- if (autocommit)
- {
- if (need_tran)
- DataNodeCopyFinish(connections, 0, COMBINE_TYPE_NONE, DestNone);
- else if (!PersistentConnections)
- release_handles();
- }
-
- pfree(connections);
- pfree(copy_connections);
- return NULL;
- }
-
- pfree(connections);
- return copy_connections;
-}
-
-/*
- * Send a data row to the specified nodes
- */
-int
-DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections)
-{
- DataNodeHandle *primary_handle = NULL;
- ListCell *nodeitem;
- /* size + data row + \n */
- int msgLen = 4 + len + 1;
- int nLen = htonl(msgLen);
-
- if (exec_nodes->primarynodelist)
- primary_handle = copy_connections[lfirst_int(list_head(exec_nodes->primarynodelist)) - 1];
-
- if (primary_handle)
- {
- if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN)
- {
- /* precalculate to speed up access */
- int bytes_needed = primary_handle->outEnd + 1 + msgLen;
-
- /* flush buffer if it is almost full */
- if (bytes_needed > COPY_BUFFER_SIZE)
- {
- /* First look if data node has sent a error message */
- int read_status = data_node_read_data(primary_handle);
- if (read_status == EOF || read_status < 0)
- {
- add_error_message(primary_handle, "failed to read data from data node");
- return EOF;
- }
-
- if (primary_handle->inStart < primary_handle->inEnd)
- {
- ResponseCombiner combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE, DestNone);
- handle_response(primary_handle, combiner);
- if (!ValidateAndCloseCombiner(combiner))
- return EOF;
- }
-
- if (DN_CONNECTION_STATE_ERROR(primary_handle))
- return EOF;
-
- if (send_some(primary_handle, primary_handle->outEnd) < 0)
- {
- add_error_message(primary_handle, "failed to send data to data node");
- return EOF;
- }
- }
-
- if (ensure_out_buffer_capacity(bytes_needed, primary_handle) != 0)
- {
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
- }
-
- primary_handle->outBuffer[primary_handle->outEnd++] = 'd';
- memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4);
- primary_handle->outEnd += 4;
- memcpy(primary_handle->outBuffer + primary_handle->outEnd, data_row, len);
- primary_handle->outEnd += len;
- primary_handle->outBuffer[primary_handle->outEnd++] = '\n';
- }
- else
- {
- add_error_message(primary_handle, "Invalid data node connection");
- return EOF;
- }
- }
-
- foreach(nodeitem, exec_nodes->nodelist)
- {
- DataNodeHandle *handle = copy_connections[lfirst_int(nodeitem) - 1];
- if (handle && handle->state == DN_CONNECTION_STATE_COPY_IN)
- {
- /* precalculate to speed up access */
- int bytes_needed = handle->outEnd + 1 + msgLen;
-
- /* flush buffer if it is almost full */
- if ((primary_handle && bytes_needed > PRIMARY_NODE_WRITEAHEAD)
- || (!primary_handle && bytes_needed > COPY_BUFFER_SIZE))
- {
- int to_send = handle->outEnd;
-
- /* First look if data node has sent a error message */
- int read_status = data_node_read_data(handle);
- if (read_status == EOF || read_status < 0)
- {
- add_error_message(handle, "failed to read data from data node");
- return EOF;
- }
-
- if (handle->inStart < handle->inEnd)
- {
- ResponseCombiner combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE, DestNone);
- handle_response(handle, combiner);
- if (!ValidateAndCloseCombiner(combiner))
- return EOF;
- }
-
- if (DN_CONNECTION_STATE_ERROR(handle))
- return EOF;
-
- /*
- * Allow primary node to write out data before others.
- * If primary node was blocked it would not accept copy data.
- * So buffer at least PRIMARY_NODE_WRITEAHEAD at the other nodes.
- * If primary node is blocked and is buffering, other buffers will
- * grow accordingly.
- */
- if (primary_handle)
- {
- if (primary_handle->outEnd + PRIMARY_NODE_WRITEAHEAD < handle->outEnd)
- to_send = handle->outEnd - primary_handle->outEnd - PRIMARY_NODE_WRITEAHEAD;
- else
- to_send = 0;
- }
-
- /*
- * Try to send down buffered data if we have
- */
- if (to_send && send_some(handle, to_send) < 0)
- {
- add_error_message(handle, "failed to send data to data node");
- return EOF;
- }
- }
-
- if (ensure_out_buffer_capacity(bytes_needed, handle) != 0)
- {
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
- }
-
- handle->outBuffer[handle->outEnd++] = 'd';
- memcpy(handle->outBuffer + handle->outEnd, &nLen, 4);
- handle->outEnd += 4;
- memcpy(handle->outBuffer + handle->outEnd, data_row, len);
- handle->outEnd += len;
- handle->outBuffer[handle->outEnd++] = '\n';
- }
- else
- {
- add_error_message(handle, "Invalid data node connection");
- return EOF;
- }
- }
-
- return 0;
-}
-
-int
-DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle **copy_connections, CommandDest dest, FILE *copy_file)
-{
- ResponseCombiner combiner;
- int conn_count = list_length(exec_nodes->nodelist) == 0 ? NumDataNodes : list_length(exec_nodes->nodelist);
- int count = 0;
- bool need_tran;
- List *nodelist;
- ListCell *nodeitem;
-
- nodelist = exec_nodes->nodelist;
- need_tran = !autocommit || conn_count > 1;
-
- combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_SUM, dest);
- /* If there is an existing file where to copy data, pass it to combiner */
- if (copy_file)
- combiner->copy_file = copy_file;
-
- foreach(nodeitem, exec_nodes->nodelist)
- {
- DataNodeHandle *handle = copy_connections[count];
- count++;
-
- if (handle && handle->state == DN_CONNECTION_STATE_COPY_OUT)
- {
- int read_status = 0;
- /* H message has been consumed, continue to manage data row messages */
- while (read_status >= 0 && handle->state == DN_CONNECTION_STATE_COPY_OUT) /* continue to read as long as there is data */
- {
- if (handle_response(handle,combiner) == EOF)
- {
- /* read some extra-data */
- read_status = data_node_read_data(handle);
- if (read_status < 0)
- return EOF;
- }
- /* There is no more data that can be read from connection */
- }
- }
- }
-
- if (!ValidateAndCloseCombiner(combiner))
- {
- if (autocommit && !PersistentConnections)
- release_handles();
- pfree(copy_connections);
- return EOF;
- }
-
- return 0;
-}
-
-/*
- * Finish copy process on all connections
- */
-uint64
-DataNodeCopyFinish(DataNodeHandle **copy_connections, int primary_data_node,
- CombineType combine_type, CommandDest dest)
-{
- int i;
- int nLen = htonl(4);
- ResponseCombiner combiner = NULL;
- bool need_tran;
- bool res = 0;
- struct timeval *timeout = NULL; /* wait forever */
- DataNodeHandle *connections[NumDataNodes];
- DataNodeHandle *primary_handle = NULL;
- int conn_count = 0;
-
- for (i = 0; i < NumDataNodes; i++)
- {
- DataNodeHandle *handle = copy_connections[i];
-
- if (!handle)
- continue;
-
- if (i == primary_data_node - 1)
- primary_handle = handle;
- else
- connections[conn_count++] = handle;
- }
-
- if (primary_handle)
- {
- if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN || primary_handle->state == DN_CONNECTION_STATE_COPY_OUT)
- {
- /* msgType + msgLen */
- if (ensure_out_buffer_capacity(primary_handle->outEnd + 1 + 4, primary_handle) != 0)
- {
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
- }
-
- primary_handle->outBuffer[primary_handle->outEnd++] = 'c';
- memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4);
- primary_handle->outEnd += 4;
-
- /* We need response right away, so send immediately */
- if (data_node_flush(primary_handle) < 0)
- res = EOF;
- }
- else
- res = EOF;
-
- combiner = CreateResponseCombiner(conn_count + 1, combine_type, dest);
- data_node_receive_responses(1, &primary_handle, timeout, combiner);
- }
-
- for (i = 0; i < conn_count; i++)
- {
- DataNodeHandle *handle = connections[i];
-
- if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT)
- {
- /* msgType + msgLen */
- if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0)
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
-
- handle->outBuffer[handle->outEnd++] = 'c';
- memcpy(handle->outBuffer + handle->outEnd, &nLen, 4);
- handle->outEnd += 4;
-
- /* We need response right away, so send immediately */
- if (data_node_flush(handle) < 0)
- res = EOF;
- }
- else
- res = EOF;
- }
-
- need_tran = !autocommit || primary_handle || conn_count > 1;
-
- if (!combiner)
- combiner = CreateResponseCombiner(conn_count, combine_type, dest);
-
- data_node_receive_responses(conn_count, connections, timeout, combiner);
-
- if (!ValidateAndCloseCombiner(combiner) || res)
- {
- if (autocommit)
- {
- if (need_tran)
- DataNodeRollback(DestNone);
- else if (!PersistentConnections)
- release_handles();
- }
-
- return 0;
- }
-
- if (autocommit)
- {
- if (need_tran)
- DataNodeCommit(DestNone);
- else if (!PersistentConnections)
- release_handles();
- }
-
- // Verify status?
- return combiner->row_count;
-}
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
new file mode 100644
index 0000000000..1ee1d59e99
--- /dev/null
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -0,0 +1,2453 @@
+/*-------------------------------------------------------------------------
+ *
+ * execRemote.c
+ *
+ * Functions to execute commands on remote data nodes
+ *
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation
+ *
+ * IDENTIFICATION
+ * $$
+ *
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "access/gtm.h"
+#include "access/xact.h"
+#include "executor/executor.h"
+#include "gtm/gtm_c.h"
+#include "libpq/libpq.h"
+#include "miscadmin.h"
+#include "pgxc/execRemote.h"
+#include "pgxc/poolmgr.h"
+#include "utils/datum.h"
+#include "utils/memutils.h"
+#include "utils/tuplesort.h"
+#include "utils/snapmgr.h"
+
+/*
+ * Buffer size does not affect performance significantly; just do not allow
+ * the connection buffer to grow without bound
+ */
+#define COPY_BUFFER_SIZE 8192
+#define PRIMARY_NODE_WRITEAHEAD (1024 * 1024)
+
+static bool autocommit = true;
+static DataNodeHandle **write_node_list = NULL;
+static int write_node_count = 0;
+
+static int data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid);
+static int data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest);
+static int data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest);
+
+static void clear_write_node_list();
+
+#define MAX_STATEMENTS_PER_TRAN 10
+
+/* Variables to collect statistics */
+static int total_transactions = 0;
+static int total_statements = 0;
+static int total_autocommit = 0;
+static int nonautocommit_2pc = 0;
+static int autocommit_2pc = 0;
+static int current_tran_statements = 0;
+static int *statements_per_transaction = NULL;
+static int *nodes_per_transaction = NULL;
+
+/*
+ * statistics collection: count a statement
+ */
+static void
+stat_statement()
+{
+ total_statements++;
+ current_tran_statements++;
+}
+
+/*
+ * To collect statistics: count a transaction
+ */
+static void
+stat_transaction(int node_count)
+{
+ total_transactions++;
+ if (autocommit)
+ total_autocommit++;
+ if (!statements_per_transaction)
+ {
+ statements_per_transaction = (int *) malloc((MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int));
+ memset(statements_per_transaction, 0, (MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int));
+ }
+ if (current_tran_statements > MAX_STATEMENTS_PER_TRAN)
+ statements_per_transaction[MAX_STATEMENTS_PER_TRAN]++;
+ else
+ statements_per_transaction[current_tran_statements]++;
+ current_tran_statements = 0;
+ if (node_count > 0 && node_count <= NumDataNodes)
+ {
+ if (!nodes_per_transaction)
+ {
+ nodes_per_transaction = (int *) malloc(NumDataNodes * sizeof(int));
+ memset(nodes_per_transaction, 0, NumDataNodes * sizeof(int));
+ }
+ nodes_per_transaction[node_count - 1]++;
+ }
+}
+
+
+/*
+ * To collect statistics: count a two-phase commit on nodes
+ */
+static void
+stat_2pc()
+{
+ if (autocommit)
+ autocommit_2pc++;
+ else
+ nonautocommit_2pc++;
+}
+
+
+/*
+ * Output collected statistics to the log
+ */
+static void
+stat_log()
+{
+ elog(DEBUG1, "Total Transactions: %d Total Statements: %d", total_transactions, total_statements);
+ elog(DEBUG1, "Autocommit: %d 2PC for Autocommit: %d 2PC for non-Autocommit: %d",
+ total_autocommit, autocommit_2pc, nonautocommit_2pc);
+ if (total_transactions)
+ {
+ if (statements_per_transaction)
+ {
+ int i;
+
+ for (i = 0; i < MAX_STATEMENTS_PER_TRAN; i++)
+ elog(DEBUG1, "%d Statements per Transaction: %d (%d%%)",
+ i, statements_per_transaction[i], statements_per_transaction[i] * 100 / total_transactions);
+ }
+ elog(DEBUG1, "%d+ Statements per Transaction: %d (%d%%)",
+ MAX_STATEMENTS_PER_TRAN, statements_per_transaction[MAX_STATEMENTS_PER_TRAN], statements_per_transaction[MAX_STATEMENTS_PER_TRAN] * 100 / total_transactions);
+ if (nodes_per_transaction)
+ {
+ int i;
+
+ for (i = 0; i < NumDataNodes; i++)
+ elog(DEBUG1, "%d Nodes per Transaction: %d (%d%%)",
+ i + 1, nodes_per_transaction[i], nodes_per_transaction[i] * 100 / total_transactions);
+ }
+ }
+}
+
+
+/*
+ * Create a structure to store parameters needed to combine responses from
+ * multiple connections as well as state information
+ */
+static RemoteQueryState *
+CreateResponseCombiner(int node_count, CombineType combine_type)
+{
+ RemoteQueryState *combiner;
+
+ /* A RemoteQueryState node holds the state needed to combine responses */
+ combiner = makeNode(RemoteQueryState);
+ if (combiner == NULL)
+ {
+ /* Out of memory */
+ return combiner;
+ }
+
+ combiner->node_count = node_count;
+ combiner->connections = NULL;
+ combiner->conn_count = 0;
+ combiner->combine_type = combine_type;
+ combiner->dest = NULL;
+ combiner->command_complete_count = 0;
+ combiner->row_count = 0;
+ combiner->request_type = REQUEST_TYPE_NOT_DEFINED;
+ combiner->tuple_desc = NULL;
+ combiner->description_count = 0;
+ combiner->copy_in_count = 0;
+ combiner->copy_out_count = 0;
+ combiner->errorMessage = NULL;
+ combiner->query_Done = false;
+ combiner->completionTag = NULL;
+ combiner->msg = NULL;
+ combiner->msglen = 0;
+ combiner->initAggregates = true;
+ combiner->simple_aggregates = NULL;
+ combiner->copy_file = NULL;
+
+ return combiner;
+}
+
+/*
+ * Parse out row count from the command status response and convert it to integer
+ */
+static int
+parse_row_count(const char *message, size_t len, uint64 *rowcount)
+{
+ int digits = 0;
+ int pos;
+
+ *rowcount = 0;
+ /* skip \0 string terminator */
+ for (pos = 0; pos < len - 1; pos++)
+ {
+ if (message[pos] >= '0' && message[pos] <= '9')
+ {
+ *rowcount = *rowcount * 10 + message[pos] - '0';
+ digits++;
+ }
+ else
+ {
+ *rowcount = 0;
+ digits = 0;
+ }
+ }
+ return digits;
+}
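+
+/*
+ * For example, given the command tag "INSERT 0 25" (len = 12, counting the
+ * terminating '\0'), every non-digit resets the accumulator, so the "0"
+ * after INSERT is discarded and only the trailing count survives:
+ *
+ *     uint64 rowcount;
+ *     int digits = parse_row_count("INSERT 0 25", 12, &rowcount);
+ *
+ * leaves digits == 2 and rowcount == 25.
+ */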
+
+/*
+ * Initialize the collection value when aggregation is first set up, or for a
+ * new group (grouping support is not implemented yet)
+ */
+static void
+initialize_collect_aggregates(SimpleAgg *simple_agg)
+{
+ if (simple_agg->initValueIsNull)
+ simple_agg->collectValue = simple_agg->initValue;
+ else
+ simple_agg->collectValue = datumCopy(simple_agg->initValue,
+ simple_agg->transtypeByVal,
+ simple_agg->transtypeLen);
+ simple_agg->noCollectValue = simple_agg->initValueIsNull;
+ simple_agg->collectValueNull = simple_agg->initValueIsNull;
+}
+
+/*
+ * Finalize the aggregate after current group or entire relation is processed
+ * (grouping support is not implemented yet)
+ */
+static void
+finalize_collect_aggregates(SimpleAgg *simple_agg, Datum *resultVal, bool *resultIsNull)
+{
+ /*
+ * Apply the agg's finalfn if one is provided, else return collectValue.
+ */
+ if (OidIsValid(simple_agg->finalfn_oid))
+ {
+ FunctionCallInfoData fcinfo;
+
+ InitFunctionCallInfoData(fcinfo, &(simple_agg->finalfn), 1,
+ (void *) simple_agg, NULL);
+ fcinfo.arg[0] = simple_agg->collectValue;
+ fcinfo.argnull[0] = simple_agg->collectValueNull;
+ if (fcinfo.flinfo->fn_strict && simple_agg->collectValueNull)
+ {
+ /* don't call a strict function with NULL inputs */
+ *resultVal = (Datum) 0;
+ *resultIsNull = true;
+ }
+ else
+ {
+ *resultVal = FunctionCallInvoke(&fcinfo);
+ *resultIsNull = fcinfo.isnull;
+ }
+ }
+ else
+ {
+ *resultVal = simple_agg->collectValue;
+ *resultIsNull = simple_agg->collectValueNull;
+ }
+}
+
+/*
+ * Given new input value(s), advance the transition function of an aggregate.
+ *
+ * The new values (and null flags) have been preloaded into argument positions
+ * 1 and up in fcinfo, so that we needn't copy them again to pass to the
+ * collection function. No other fields of fcinfo are assumed valid.
+ *
+ * It doesn't matter which memory context this is called in.
+ */
+static void
+advance_collect_function(SimpleAgg *simple_agg, FunctionCallInfoData *fcinfo)
+{
+ Datum newVal;
+
+ if (simple_agg->transfn.fn_strict)
+ {
+ /*
+ * For a strict transfn, nothing happens when there's a NULL input; we
+ * just keep the prior transValue.
+ */
+ if (fcinfo->argnull[1])
+ return;
+ if (simple_agg->noCollectValue)
+ {
+ /*
+ * result has not been initialized
+ * We must copy the datum into result if it is pass-by-ref. We
+ * do not need to pfree the old result, since it's NULL.
+ */
+ simple_agg->collectValue = datumCopy(fcinfo->arg[1],
+ simple_agg->transtypeByVal,
+ simple_agg->transtypeLen);
+ simple_agg->collectValueNull = false;
+ simple_agg->noCollectValue = false;
+ return;
+ }
+ if (simple_agg->collectValueNull)
+ {
+ /*
+ * Don't call a strict function with NULL inputs. Note it is
+ * possible to get here despite the above tests, if the transfn is
+ * strict *and* returned a NULL on a prior cycle. If that happens
+ * we will propagate the NULL all the way to the end.
+ */
+ return;
+ }
+ }
+
+ /*
+ * OK to call the transition function
+ */
+ InitFunctionCallInfoData(*fcinfo, &(simple_agg->transfn), 2, (void *) simple_agg, NULL);
+ fcinfo->arg[0] = simple_agg->collectValue;
+ fcinfo->argnull[0] = simple_agg->collectValueNull;
+ newVal = FunctionCallInvoke(fcinfo);
+
+ /*
+ * If pass-by-ref datatype, must copy the new value into aggcontext and
+ * pfree the prior transValue. But if transfn returned a pointer to its
+ * first input, we don't need to do anything.
+ */
+ if (!simple_agg->transtypeByVal &&
+ DatumGetPointer(newVal) != DatumGetPointer(simple_agg->collectValue))
+ {
+ if (!fcinfo->isnull)
+ {
+ newVal = datumCopy(newVal,
+ simple_agg->transtypeByVal,
+ simple_agg->transtypeLen);
+ }
+ if (!simple_agg->collectValueNull)
+ pfree(DatumGetPointer(simple_agg->collectValue));
+ }
+
+ simple_agg->collectValue = newVal;
+ simple_agg->collectValueNull = fcinfo->isnull;
+}
+
+/*
+ * Convert RowDescription message to a TupleDesc
+ */
+static TupleDesc
+create_tuple_desc(char *msg_body, size_t len)
+{
+ TupleDesc result;
+ int i, nattr;
+ uint16 n16;
+
+ /* get number of attributes */
+ memcpy(&n16, msg_body, 2);
+ nattr = ntohs(n16);
+ msg_body += 2;
+
+ result = CreateTemplateTupleDesc(nattr, false);
+
+ /* decode attributes */
+ for (i = 1; i <= nattr; i++)
+ {
+ AttrNumber attnum;
+ char *attname;
+ Oid oidtypeid;
+ int32 typmod;
+
+ uint32 n32;
+
+ attnum = (AttrNumber) i;
+
+ /* attribute name */
+ attname = msg_body;
+ msg_body += strlen(attname) + 1;
+
+ /* table OID, ignored */
+ msg_body += 4;
+
+ /* column no, ignored */
+ msg_body += 2;
+
+ /* data type */
+ memcpy(&n32, msg_body, 4);
+ oidtypeid = ntohl(n32);
+ msg_body += 4;
+
+ /* type len, ignored */
+ msg_body += 2;
+
+ /* type mod */
+ memcpy(&n32, msg_body, 4);
+ typmod = ntohl(n32);
+ msg_body += 4;
+
+ /* PGXCTODO text/binary flag? */
+ msg_body += 2;
+
+ TupleDescInitEntry(result, attnum, attname, oidtypeid, typmod, 0);
+ }
+ return result;
+}
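+
+/*
+ * For reference, the RowDescription payload walked above is laid out as
+ * defined by the frontend/backend protocol:
+ *
+ *     int16   number of fields
+ *     then, per field:
+ *         String  field name (null-terminated)
+ *         int32   table OID          (skipped here)
+ *         int16   column number      (skipped here)
+ *         int32   data type OID
+ *         int16   data type size     (skipped here)
+ *         int32   type modifier
+ *         int16   format code        (PGXCTODO above)
+ */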
+
+static void
+exec_simple_aggregates(RemoteQueryState *combiner, TupleTableSlot *slot)
+{
+ ListCell *lc;
+
+ Assert(combiner->simple_aggregates);
+ Assert(!TupIsNull(slot));
+
+ if (combiner->initAggregates)
+ {
+ foreach (lc, combiner->simple_aggregates)
+ initialize_collect_aggregates((SimpleAgg *) lfirst(lc));
+
+ combiner->initAggregates = false;
+ }
+
+ foreach (lc, combiner->simple_aggregates)
+ {
+ SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc);
+ FunctionCallInfoData fcinfo;
+ int attr = simple_agg->column_pos;
+
+ slot_getsomeattrs(slot, attr + 1);
+ fcinfo.arg[1] = slot->tts_values[attr];
+ fcinfo.argnull[1] = slot->tts_isnull[attr];
+
+ advance_collect_function(simple_agg, &fcinfo);
+ }
+}
+
+static void
+finish_simple_aggregates(RemoteQueryState *combiner, TupleTableSlot *slot)
+{
+ ListCell *lc;
+ ExecClearTuple(slot);
+
+ /*
+ * Aggregates may not have been initialized if no rows have been received
+ * from the data nodes because of a HAVING clause.
+ * In this case finish_simple_aggregates() should return an empty slot
+ */
+ if (!combiner->initAggregates)
+ {
+ foreach (lc, combiner->simple_aggregates)
+ {
+ SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc);
+ int attr = simple_agg->column_pos;
+
+ finalize_collect_aggregates(simple_agg,
+ slot->tts_values + attr,
+ slot->tts_isnull + attr);
+ }
+ ExecStoreVirtualTuple(slot);
+ /* Prevent the aggregates from being finalized again */
+ combiner->initAggregates = true;
+ }
+}
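+
+/*
+ * Illustrative flow of the collection phases above, e.g. for a count(*)
+ * combined from several data nodes (editorial sketch):
+ *
+ *     exec_simple_aggregates(combiner, slot)     called per row received;
+ *                                                the first call initializes
+ *                                                the collect values, then the
+ *                                                transfn folds in each node's
+ *                                                partial count
+ *     finish_simple_aggregates(combiner, slot)   called once at end of input;
+ *                                                applies the finalfn and
+ *                                                stores the combined result
+ *                                                in the slot
+ */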
+
+/*
+ * Handle CopyOutCommandComplete ('c') message from a data node connection
+ */
+static void
+HandleCopyOutComplete(RemoteQueryState *combiner)
+{
+ if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+ combiner->request_type = REQUEST_TYPE_COPY_OUT;
+ if (combiner->request_type != REQUEST_TYPE_COPY_OUT)
+ /* Inconsistent responses */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
+ /* Just do nothing, close message is managed by the coordinator */
+ combiner->copy_out_count++;
+}
+
+/*
+ * Handle CommandComplete ('C') message from a data node connection
+ */
+static void
+HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+ int digits = 0;
+
+ /*
+ * If we did not receive a description, we have a rowcount or OK response
+ */
+ if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+ combiner->request_type = REQUEST_TYPE_COMMAND;
+ /* Extract rowcount */
+ if (combiner->combine_type != COMBINE_TYPE_NONE)
+ {
+ uint64 rowcount;
+ digits = parse_row_count(msg_body, len, &rowcount);
+ if (digits > 0)
+ {
+ /* Replicated write, make sure they are the same */
+ if (combiner->combine_type == COMBINE_TYPE_SAME)
+ {
+ if (combiner->command_complete_count)
+ {
+ if (rowcount != combiner->row_count)
+ /* There is a consistency issue in the database with the replicated table */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Write to replicated table returned different results from the data nodes")));
+ }
+ else
+ /* first result */
+ combiner->row_count = rowcount;
+ }
+ else
+ combiner->row_count += rowcount;
+ }
+ else
+ combiner->combine_type = COMBINE_TYPE_NONE;
+ }
+ if (++combiner->command_complete_count == combiner->node_count)
+ {
+ if (combiner->completionTag)
+ {
+ if (combiner->combine_type == COMBINE_TYPE_NONE)
+ {
+ /* ensure we do not go beyond buffer bounds */
+ if (len > COMPLETION_TAG_BUFSIZE)
+ len = COMPLETION_TAG_BUFSIZE;
+ memcpy(combiner->completionTag, msg_body, len);
+ }
+ else
+ {
+ /* Truncate msg_body to get base string */
+ msg_body[len - digits - 1] = '\0';
+ snprintf(combiner->completionTag,
+ COMPLETION_TAG_BUFSIZE,
+ "%s" UINT64_FORMAT,
+ msg_body,
+ combiner->row_count);
+ }
+ }
+ }
+}
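+
+/*
+ * For example, with combine_type = COMBINE_TYPE_SUM and three data nodes
+ * each answering "INSERT 0 5", parse_row_count() extracts 5 from every tag,
+ * row_count accumulates to 15, and once command_complete_count reaches
+ * node_count the completion tag is rebuilt as "INSERT 0 15". With
+ * COMBINE_TYPE_SAME (a replicated write) a node answering a different row
+ * count raises the data-corruption error instead.
+ */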
+
+/*
+ * Handle RowDescription ('T') message from a data node connection
+ */
+static bool
+HandleRowDescription(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+ if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+ combiner->request_type = REQUEST_TYPE_QUERY;
+ if (combiner->request_type != REQUEST_TYPE_QUERY)
+ {
+ /* Inconsistent responses */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
+ }
+ /* Increment counter and check if it was first */
+ if (combiner->description_count++ == 0)
+ {
+ combiner->tuple_desc = create_tuple_desc(msg_body, len);
+ return true;
+ }
+ return false;
+}
+
+/*
+ * Handle ParameterStatus ('S') message from a data node connection (SET command)
+ */
+static void
+HandleParameterStatus(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+ if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+ combiner->request_type = REQUEST_TYPE_QUERY;
+ if (combiner->request_type != REQUEST_TYPE_QUERY)
+ {
+ /* Inconsistent responses */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
+ }
+	/* Proxy only the last message to the client */
+ if (++combiner->description_count == combiner->node_count)
+ {
+ pq_putmessage('S', msg_body, len);
+ }
+}
+
+/*
+ * Handle CopyInResponse ('G') message from a data node connection
+ */
+static void
+HandleCopyIn(RemoteQueryState *combiner)
+{
+ if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+ combiner->request_type = REQUEST_TYPE_COPY_IN;
+ if (combiner->request_type != REQUEST_TYPE_COPY_IN)
+ {
+ /* Inconsistent responses */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
+ }
+ /*
+	 * The normal PG code will output a G message when it runs in the
+	 * coordinator, so do not proxy the message here, just count it.
+ */
+ combiner->copy_in_count++;
+}
+
+/*
+ * Handle CopyOutResponse ('H') message from a data node connection
+ */
+static void
+HandleCopyOut(RemoteQueryState *combiner)
+{
+ if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+ combiner->request_type = REQUEST_TYPE_COPY_OUT;
+ if (combiner->request_type != REQUEST_TYPE_COPY_OUT)
+ {
+ /* Inconsistent responses */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
+ }
+ /*
+ * The normal PG code will output an H message when it runs in the
+	 * coordinator, so do not proxy the message here, just count it.
+ */
+ combiner->copy_out_count++;
+}
+
+/*
+ * Handle CopyOutDataRow ('d') message from a data node connection
+ */
+static void
+HandleCopyDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+ if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+ combiner->request_type = REQUEST_TYPE_COPY_OUT;
+
+ /* Inconsistent responses */
+ if (combiner->request_type != REQUEST_TYPE_COPY_OUT)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
+
+ /* If there is a copy file, data has to be sent to the local file */
+ if (combiner->copy_file)
+ /* write data to the copy file */
+ fwrite(msg_body, 1, len, combiner->copy_file);
+ else
+ pq_putmessage('d', msg_body, len);
+}
+
+/*
+ * Handle DataRow ('D') message from a data node connection
+ * Copy the message into the combiner's buffer; the caller must not read
+ * further rows from this connection until the buffered one is consumed.
+ */
+static void
+HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+	/* We expect the previous message to have been consumed */
+ Assert(combiner->msg == NULL);
+
+ if (combiner->request_type != REQUEST_TYPE_QUERY)
+ {
+ /* Inconsistent responses */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
+ }
+
+ /*
+	 * If we already got an error, ignore incoming data rows from other nodes.
+	 * Still we want to continue reading until we get CommandComplete
+ */
+ if (combiner->errorMessage)
+ return;
+
+ /*
+	 * We are copying the message because it points into the connection
+	 * buffer, which will be overwritten by the next socket read
+ */
+ combiner->msg = (char *) palloc(len);
+ memcpy(combiner->msg, msg_body, len);
+ combiner->msglen = len;
+}
+
+/*
+ * Handle ErrorResponse ('E') message from a data node connection
+ */
+static void
+HandleError(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+ /* parse error message */
+ char *severity = NULL;
+ char *code = NULL;
+ char *message = NULL;
+ char *detail = NULL;
+ char *hint = NULL;
+ char *position = NULL;
+ char *int_position = NULL;
+ char *int_query = NULL;
+ char *where = NULL;
+ char *file = NULL;
+ char *line = NULL;
+ char *routine = NULL;
+ int offset = 0;
+
+ /*
+	 * Scan the error fields until we reach the terminating \0
+ */
+ while (offset + 1 < len)
+ {
+ /* pointer to the field message */
+ char *str = msg_body + offset + 1;
+
+ switch (msg_body[offset])
+ {
+ case 'S':
+ severity = str;
+ break;
+ case 'C':
+ code = str;
+ break;
+ case 'M':
+ message = str;
+ break;
+ case 'D':
+ detail = str;
+ break;
+ case 'H':
+ hint = str;
+ break;
+ case 'P':
+ position = str;
+ break;
+ case 'p':
+ int_position = str;
+ break;
+ case 'q':
+ int_query = str;
+ break;
+ case 'W':
+ where = str;
+ break;
+ case 'F':
+ file = str;
+ break;
+ case 'L':
+ line = str;
+ break;
+ case 'R':
+ routine = str;
+ break;
+ }
+
+		/* field type byte, string and its terminating \0 */
+ offset += strlen(str) + 2;
+ }
+
+ /*
+	 * We may have special handling for some errors, but the default is to
+	 * re-throw the error with the same message. We can not ereport
+	 * immediately because we should keep reading from this and the other
+	 * connections until ReadyForQuery is received, so we just store the
+	 * error message. If multiple connections return errors, only the first
+	 * one is reported.
+ */
+ if (!combiner->errorMessage)
+ {
+ combiner->errorMessage = pstrdup(message);
+ /* Error Code is exactly 5 significant bytes */
+ memcpy(combiner->errorCode, code, 5);
+ }
+
+ /*
+	 * If a data node has sent ErrorResponse it will never send CommandComplete.
+ * Increment the counter to prevent endless waiting for it.
+ */
+ combiner->command_complete_count++;
+}
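+
+/*
+ * The ErrorResponse body parsed above is a sequence of fields, each a
+ * one-byte type code followed by a '\0'-terminated string, closed by a
+ * final '\0'. A typical body looks, schematically, like:
+ *
+ *   'S' "ERROR" '\0'  'C' "42P01" '\0'  'M' "relation does not exist" '\0'  '\0'
+ *
+ * which sets severity, code (exactly five bytes, as copied into errorCode)
+ * and message, leaving the remaining pointers NULL.
+ */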
+
+/*
+ * Examine the specified combiner state and determine if command was completed
+ * successfully
+ */
+static bool
+validate_combiner(RemoteQueryState *combiner)
+{
+ /* There was error message while combining */
+ if (combiner->errorMessage)
+ return false;
+ /* Check if state is defined */
+ if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+ return false;
+ /* Check all nodes completed */
+ if ((combiner->request_type == REQUEST_TYPE_COMMAND
+ || combiner->request_type == REQUEST_TYPE_QUERY)
+ && combiner->command_complete_count != combiner->node_count)
+ return false;
+
+ /* Check count of description responses */
+ if (combiner->request_type == REQUEST_TYPE_QUERY
+ && combiner->description_count != combiner->node_count)
+ return false;
+
+ /* Check count of copy-in responses */
+ if (combiner->request_type == REQUEST_TYPE_COPY_IN
+ && combiner->copy_in_count != combiner->node_count)
+ return false;
+
+ /* Check count of copy-out responses */
+ if (combiner->request_type == REQUEST_TYPE_COPY_OUT
+ && combiner->copy_out_count != combiner->node_count)
+ return false;
+
+ /* Add other checks here as needed */
+
+ /* All is good if we are here */
+ return true;
+}
+
+/*
+ * Close combiner and free allocated memory, if it is not needed
+ */
+static void
+CloseCombiner(RemoteQueryState *combiner)
+{
+ if (combiner)
+ {
+ if (combiner->connections)
+ pfree(combiner->connections);
+ if (combiner->tuple_desc)
+ FreeTupleDesc(combiner->tuple_desc);
+ if (combiner->errorMessage)
+ pfree(combiner->errorMessage);
+ pfree(combiner);
+ }
+}
+
+/*
+ * Validate combiner and release storage freeing allocated memory
+ */
+static bool
+ValidateAndCloseCombiner(RemoteQueryState *combiner)
+{
+ bool valid = validate_combiner(combiner);
+
+ CloseCombiner(combiner);
+
+ return valid;
+}
+
+/*
+ * Validate combiner and reset storage
+ */
+static bool
+ValidateAndResetCombiner(RemoteQueryState *combiner)
+{
+ bool valid = validate_combiner(combiner);
+
+ if (combiner->connections)
+ pfree(combiner->connections);
+ if (combiner->tuple_desc)
+ FreeTupleDesc(combiner->tuple_desc);
+ if (combiner->msg)
+ pfree(combiner->msg);
+ if (combiner->errorMessage)
+ pfree(combiner->errorMessage);
+
+ combiner->command_complete_count = 0;
+ combiner->connections = NULL;
+ combiner->conn_count = 0;
+ combiner->row_count = 0;
+ combiner->request_type = REQUEST_TYPE_NOT_DEFINED;
+ combiner->tuple_desc = NULL;
+ combiner->description_count = 0;
+ combiner->copy_in_count = 0;
+ combiner->copy_out_count = 0;
+ combiner->errorMessage = NULL;
+ combiner->query_Done = false;
+ combiner->msg = NULL;
+ combiner->msglen = 0;
+ combiner->simple_aggregates = NULL;
+ combiner->copy_file = NULL;
+
+ return valid;
+}
+
+/*
+ * Get the next data row from the combiner's buffer into the provided slot.
+ * If the buffer is empty, just clear the slot and return false, meaning more
+ * data should be read
+ */
+bool
+FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot)
+{
+ /* have messages in the buffer, consume them */
+ if (combiner->msg)
+ {
+ ExecStoreDataRowTuple(combiner->msg, combiner->msglen, slot, true);
+ combiner->msg = NULL;
+ combiner->msglen = 0;
+ return true;
+ }
+ /* inform caller that buffer is empty */
+ ExecClearTuple(slot);
+ return false;
+}
+
+
+/*
+ * Handle responses from the Data node connections
+ */
+static void
+data_node_receive_responses(const int conn_count, DataNodeHandle ** connections,
+ struct timeval * timeout, RemoteQueryState *combiner)
+{
+ int count = conn_count;
+ DataNodeHandle *to_receive[conn_count];
+
+ /* make a copy of the pointers to the connections */
+ memcpy(to_receive, connections, conn_count * sizeof(DataNodeHandle *));
+
+ /*
+ * Read results.
+	 * Note we try to read from the data node connections even if there is an error on one,
+ * so as to avoid reading incorrect results on the next statement.
+ * It might be better to just destroy these connections and tell the pool manager.
+ */
+ while (count > 0)
+ {
+ int i = 0;
+
+ data_node_receive(count, to_receive, timeout);
+ while (i < count)
+ {
+ switch (handle_response(to_receive[i], combiner))
+ {
+ case RESPONSE_EOF: /* have something to read, keep receiving */
+ i++;
+ break;
+ case RESPONSE_COMPLETE:
+ case RESPONSE_COPY:
+ /* Handling is done, do not track this connection */
+ count--;
+ /* Move last connection in place */
+ if (i < count)
+ to_receive[i] = to_receive[count];
+ break;
+ default:
+ /* Inconsistent responses */
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Unexpected response from the data nodes")));
+ }
+ }
+ }
+}
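+
+/*
+ * The to_receive[] bookkeeping above shrinks the poll set in place: when a
+ * connection finishes, the last tracked connection is moved into its slot
+ * and count is decremented, so no shifting is needed. For instance, with
+ * to_receive = {A, B, C} and B completing at i = 1, the array becomes
+ * {A, C} with count = 2, and slot 1 (now C) is examined next.
+ */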
+
+/*
+ * Read next message from the connection and update the combiner accordingly
+ * If we are in an error state we just consume the messages, and do not proxy
+ * Long term, we should look into cancelling executing statements
+ * and closing the connections.
+ * Return values:
+ * RESPONSE_EOF - need to receive more data for the connection
+ * RESPONSE_COMPLETE - done with the connection
+ * RESPONSE_TUPDESC - got the row description
+ * RESPONSE_DATAROW - got a data row
+ * RESPONSE_COPY - got a copy response
+ */
+int
+handle_response(DataNodeHandle * conn, RemoteQueryState *combiner)
+{
+ char *msg;
+ int msg_len;
+
+ for (;;)
+ {
+ /* No data available, exit */
+ if (conn->state == DN_CONNECTION_STATE_QUERY)
+ return RESPONSE_EOF;
+
+ /* TODO handle other possible responses */
+ switch (get_message(conn, &msg_len, &msg))
+ {
+ case '\0': /* Not enough data in the buffer */
+ conn->state = DN_CONNECTION_STATE_QUERY;
+ return RESPONSE_EOF;
+ case 'c': /* CopyToCommandComplete */
+ conn->state = DN_CONNECTION_STATE_COMPLETED;
+ HandleCopyOutComplete(combiner);
+ break;
+ case 'C': /* CommandComplete */
+ conn->state = DN_CONNECTION_STATE_COMPLETED;
+ HandleCommandComplete(combiner, msg, msg_len);
+ break;
+ case 'T': /* RowDescription */
+ if (HandleRowDescription(combiner, msg, msg_len))
+ return RESPONSE_TUPDESC;
+ break;
+ case 'D': /* DataRow */
+ HandleDataRow(combiner, msg, msg_len);
+ return RESPONSE_DATAROW;
+ case 'G': /* CopyInResponse */
+ conn->state = DN_CONNECTION_STATE_COPY_IN;
+ HandleCopyIn(combiner);
+ /* Done, return to caller to let it know the data can be passed in */
+ return RESPONSE_COPY;
+ case 'H': /* CopyOutResponse */
+ conn->state = DN_CONNECTION_STATE_COPY_OUT;
+ HandleCopyOut(combiner);
+ return RESPONSE_COPY;
+ case 'd': /* CopyOutDataRow */
+ conn->state = DN_CONNECTION_STATE_COPY_OUT;
+ HandleCopyDataRow(combiner, msg, msg_len);
+ break;
+ case 'E': /* ErrorResponse */
+ HandleError(combiner, msg, msg_len);
+ conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
+ /*
+ * Do not return with an error, we still need to consume Z,
+ * ready-for-query
+ */
+ break;
+ case 'A': /* NotificationResponse */
+ case 'N': /* NoticeResponse */
+ /*
+ * Ignore these to prevent multiple messages, one from each
+ * node. Coordinator will send one for DDL anyway
+ */
+ break;
+ case 'Z': /* ReadyForQuery */
+ conn->transaction_status = msg[0];
+ conn->state = DN_CONNECTION_STATE_IDLE;
+ return RESPONSE_COMPLETE;
+ case 'I': /* EmptyQuery */
+ default:
+ /* sync lost? */
+ conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
+ return RESPONSE_EOF;
+ }
+ }
+ /* Keep compiler quiet */
+ return RESPONSE_EOF;
+}
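+
+/*
+ * A typical exchange for a SELECT returning two rows on one connection, as
+ * seen by the loop above:
+ *
+ *   'T' RowDescription   -> RESPONSE_TUPDESC (first description only)
+ *   'D' DataRow          -> RESPONSE_DATAROW
+ *   'D' DataRow          -> RESPONSE_DATAROW
+ *   'C' CommandComplete  -> loop continues
+ *   'Z' ReadyForQuery    -> RESPONSE_COMPLETE, connection is idle
+ *
+ * The caller re-enters handle_response() until it sees RESPONSE_COMPLETE
+ * (or RESPONSE_EOF, in which case it must receive more data first).
+ */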
+
+/*
+ * Send BEGIN command to the Data nodes and receive responses
+ */
+static int
+data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid)
+{
+ int i;
+ struct timeval *timeout = NULL;
+ RemoteQueryState *combiner;
+
+ /* Send BEGIN */
+ for (i = 0; i < conn_count; i++)
+ {
+ if (GlobalTransactionIdIsValid(gxid) && data_node_send_gxid(connections[i], gxid))
+ return EOF;
+
+ if (data_node_send_query(connections[i], "BEGIN"))
+ return EOF;
+ }
+
+ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+ combiner->dest = None_Receiver;
+
+ /* Receive responses */
+ data_node_receive_responses(conn_count, connections, timeout, combiner);
+
+ /* Verify status */
+ return ValidateAndCloseCombiner(combiner) ? 0 : EOF;
+}
+
+/* Clears the write node list */
+static void
+clear_write_node_list()
+{
+	/* we malloc just once and reuse the array, resetting the counter */
+ if (write_node_list == NULL)
+ {
+ write_node_list = (DataNodeHandle **) malloc(NumDataNodes * sizeof(DataNodeHandle *));
+ }
+ write_node_count = 0;
+}
+
+
+/*
+ * Switch autocommit mode off, so all subsequent statements will be in the same transaction
+ */
+void
+DataNodeBegin(void)
+{
+ autocommit = false;
+ clear_write_node_list();
+}
+
+
+/*
+ * Commit current transaction, use two-phase commit if necessary
+ */
+int
+DataNodeCommit(CommandDest dest)
+{
+ int res;
+ int tran_count;
+ DataNodeHandle *connections[NumDataNodes];
+
+ /* gather connections to commit */
+ tran_count = get_transaction_nodes(connections);
+
+ /*
+ * If we do not have open transactions we have nothing to commit, just
+ * report success
+ */
+ if (tran_count == 0)
+ goto finish;
+
+ res = data_node_commit(tran_count, connections, dest);
+
+finish:
+	/* In autocommit mode statistics are collected in DataNodeExec */
+ if (!autocommit)
+ stat_transaction(tran_count);
+ if (!PersistentConnections)
+ release_handles();
+ autocommit = true;
+ clear_write_node_list();
+ return res;
+}
+
+
+/*
+ * Send COMMIT or PREPARE/COMMIT PREPARED down to the Data nodes and handle responses
+ */
+static int
+data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest)
+{
+ int i;
+ struct timeval *timeout = NULL;
+ char buffer[256];
+ GlobalTransactionId gxid = InvalidGlobalTransactionId;
+ int result = 0;
+ RemoteQueryState *combiner = NULL;
+
+
+ /* can set this to false to disable temporarily */
+ /* bool do2PC = conn_count > 1; */
+
+ /*
+ * Only use 2PC if more than one node was written to. Otherwise, just send
+ * COMMIT to all
+ */
+ bool do2PC = write_node_count > 1;
+
+ /* Extra XID for Two Phase Commit */
+ GlobalTransactionId two_phase_xid = 0;
+
+ if (do2PC)
+ {
+ stat_2pc();
+
+ /*
+ * Formally we should be using GetCurrentGlobalTransactionIdIfAny() here,
+ * but since we need 2pc, we surely have sent down a command and got
+	 * a gxid for it. Hence GetCurrentGlobalTransactionId() just returns the
+	 * already allocated gxid
+ */
+ gxid = GetCurrentGlobalTransactionId();
+
+ sprintf(buffer, "PREPARE TRANSACTION 'T%d'", gxid);
+ /* Send PREPARE */
+ for (i = 0; i < conn_count; i++)
+ {
+ if (data_node_send_query(connections[i], buffer))
+ return EOF;
+ }
+
+ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+ combiner->dest = None_Receiver;
+ /* Receive responses */
+ data_node_receive_responses(conn_count, connections, timeout, combiner);
+
+ /* Reset combiner */
+ if (!ValidateAndResetCombiner(combiner))
+ {
+ result = EOF;
+ }
+ }
+
+ if (!do2PC)
+ strcpy(buffer, "COMMIT");
+ else
+ {
+ if (result)
+ sprintf(buffer, "ROLLBACK PREPARED 'T%d'", gxid);
+ else
+ sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid);
+
+		/* We need to use a new xid: after PREPARE the data nodes' transaction has ended */
+ two_phase_xid = BeginTranGTM();
+ for (i = 0; i < conn_count; i++)
+ {
+ if (data_node_send_gxid(connections[i], two_phase_xid))
+ {
+ add_error_message(connections[i], "Can not send request");
+ result = EOF;
+ goto finish;
+ }
+ }
+ }
+
+ /* Send COMMIT */
+ for (i = 0; i < conn_count; i++)
+ {
+ if (data_node_send_query(connections[i], buffer))
+ {
+ result = EOF;
+ goto finish;
+ }
+ }
+
+ if (!combiner)
+ {
+ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+ combiner->dest = None_Receiver;
+ }
+ /* Receive responses */
+ data_node_receive_responses(conn_count, connections, timeout, combiner);
+ result = ValidateAndCloseCombiner(combiner) ? result : EOF;
+
+finish:
+ if (do2PC)
+ CommitTranGTM((GlobalTransactionId) two_phase_xid);
+
+ return result;
+}
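+
+/*
+ * With two written nodes the function above produces this conversation
+ * (the gxid values are illustrative):
+ *
+ *   both nodes: PREPARE TRANSACTION 'T100'   -- under the current gxid 100
+ *   fresh gxid 101 obtained from GTM, then
+ *   both nodes: COMMIT PREPARED 'T100'       -- or ROLLBACK PREPARED 'T100'
+ *                                               if the PREPARE step failed
+ *
+ * With at most one written node it degenerates to a plain COMMIT.
+ */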
+
+
+/*
+ * Rollback current transaction
+ */
+int
+DataNodeRollback(CommandDest dest)
+{
+ int res = 0;
+ int tran_count;
+ DataNodeHandle *connections[NumDataNodes];
+
+ /* gather connections to rollback */
+ tran_count = get_transaction_nodes(connections);
+
+ /*
+	 * If we do not have open transactions we have nothing to rollback, just
+	 * report success
+ */
+ if (tran_count == 0)
+ goto finish;
+
+ res = data_node_rollback(tran_count, connections, dest);
+
+finish:
+	/* In autocommit mode statistics are collected in DataNodeExec */
+ if (!autocommit)
+ stat_transaction(tran_count);
+ if (!PersistentConnections)
+ release_handles();
+ autocommit = true;
+ clear_write_node_list();
+ return res;
+}
+
+
+/*
+ * Send ROLLBACK command down to the Data nodes and handle responses
+ */
+static int
+data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest)
+{
+ int i;
+ struct timeval *timeout = NULL;
+ int result = 0;
+ RemoteQueryState *combiner;
+
+	/* Send ROLLBACK */
+ for (i = 0; i < conn_count; i++)
+ {
+ if (data_node_send_query(connections[i], "ROLLBACK"))
+ result = EOF;
+ }
+
+ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+ combiner->dest = None_Receiver;
+ /* Receive responses */
+ data_node_receive_responses(conn_count, connections, timeout, combiner);
+
+ /* Verify status */
+ return ValidateAndCloseCombiner(combiner) ? 0 : EOF;
+}
+
+
+/*
+ * Begin COPY command
+ * The copy_connections array must have room for NumDataNodes items
+ */
+DataNodeHandle**
+DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from)
+{
+ int i, j;
+ int conn_count = list_length(nodelist) == 0 ? NumDataNodes : list_length(nodelist);
+ struct timeval *timeout = NULL;
+ DataNodeHandle **connections;
+ DataNodeHandle **copy_connections;
+ DataNodeHandle *newConnections[conn_count];
+ int new_count = 0;
+ ListCell *nodeitem;
+ bool need_tran;
+ GlobalTransactionId gxid;
+ RemoteQueryState *combiner;
+
+ if (conn_count == 0)
+ return NULL;
+
+ /* Get needed datanode connections */
+ connections = get_handles(nodelist);
+ if (!connections)
+ return NULL;
+
+ need_tran = !autocommit || conn_count > 1;
+
+ elog(DEBUG1, "autocommit = %s, conn_count = %d, need_tran = %s", autocommit ? "true" : "false", conn_count, need_tran ? "true" : "false");
+
+ /*
+	 * We need to be able to quickly find a connection handle for a specified
+	 * node number, so store connections in an array where the index is node-1.
+ * Unused items in the array should be NULL
+ */
+ copy_connections = (DataNodeHandle **) palloc0(NumDataNodes * sizeof(DataNodeHandle *));
+ i = 0;
+ foreach(nodeitem, nodelist)
+ copy_connections[lfirst_int(nodeitem) - 1] = connections[i++];
+
+ /* Gather statistics */
+ stat_statement();
+ if (autocommit)
+ stat_transaction(conn_count);
+
+ /* We normally clear for transactions, but if autocommit, clear here, too */
+ if (autocommit)
+ {
+ clear_write_node_list();
+ }
+
+ /* Check status of connections */
+ /* We want to track new "write" nodes, and new nodes in the current transaction
+ * whether or not they are write nodes. */
+ if (write_node_count < NumDataNodes)
+ {
+ for (i = 0; i < conn_count; i++)
+ {
+ bool found = false;
+ for (j=0; j<write_node_count && !found; j++)
+ {
+ if (write_node_list[j] == connections[i])
+ found = true;
+ }
+ if (!found)
+ {
+ /*
+ * Add to transaction wide-list if COPY FROM
+ * CopyOut (COPY TO) is not a write operation, no need to update
+ */
+ if (is_from)
+ write_node_list[write_node_count++] = connections[i];
+ /* Add to current statement list */
+ newConnections[new_count++] = connections[i];
+ }
+ }
+		/* PGXCTODO: check that the connection state is DN_CONNECTION_STATE_IDLE */
+ }
+
+ gxid = GetCurrentGlobalTransactionId();
+
+ /* elog(DEBUG1, "Current gxid = %d", gxid); */
+
+ if (!GlobalTransactionIdIsValid(gxid))
+ {
+ pfree(connections);
+ pfree(copy_connections);
+ return NULL;
+ }
+ if (new_count > 0 && need_tran)
+ {
+ /* Start transaction on connections where it is not started */
+ if (data_node_begin(new_count, newConnections, DestNone, gxid))
+ {
+ pfree(connections);
+ pfree(copy_connections);
+ return NULL;
+ }
+ }
+
+ /* Send query to nodes */
+ for (i = 0; i < conn_count; i++)
+ {
+ /* If explicit transaction is needed gxid is already sent */
+ if (!need_tran && data_node_send_gxid(connections[i], gxid))
+ {
+ add_error_message(connections[i], "Can not send request");
+ pfree(connections);
+ pfree(copy_connections);
+ return NULL;
+ }
+ if (snapshot && data_node_send_snapshot(connections[i], snapshot))
+ {
+ add_error_message(connections[i], "Can not send request");
+ pfree(connections);
+ pfree(copy_connections);
+ return NULL;
+ }
+ if (data_node_send_query(connections[i], query) != 0)
+ {
+ add_error_message(connections[i], "Can not send request");
+ pfree(connections);
+ pfree(copy_connections);
+ return NULL;
+ }
+ }
+
+ /*
+	 * We are expecting a CopyIn response, but do not want to send it to the
+	 * client; the caller should take care of this, because here we do not
+	 * know if the client runs a console or file copy
+ */
+ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+ combiner->dest = None_Receiver;
+
+ /* Receive responses */
+ data_node_receive_responses(conn_count, connections, timeout, combiner);
+ if (!ValidateAndCloseCombiner(combiner))
+ {
+ if (autocommit)
+ {
+ if (need_tran)
+ DataNodeCopyFinish(connections, 0, COMBINE_TYPE_NONE);
+ else
+ if (!PersistentConnections) release_handles();
+ }
+
+ pfree(connections);
+ pfree(copy_connections);
+ return NULL;
+ }
+
+ pfree(connections);
+ return copy_connections;
+}
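+
+/*
+ * Note the shape of the returned array: it is indexed by (node number - 1)
+ * across all NumDataNodes slots rather than densely packed. For
+ * NumDataNodes = 4 and nodelist = (2, 4) the caller gets
+ * {NULL, conn2, NULL, conn4}, which lets DataNodeCopyIn() look up the
+ * handle for a node in O(1).
+ */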
+
+/*
+ * Send a data row to the specified nodes
+ */
+int
+DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections)
+{
+ DataNodeHandle *primary_handle = NULL;
+ ListCell *nodeitem;
+ /* size + data row + \n */
+ int msgLen = 4 + len + 1;
+ int nLen = htonl(msgLen);
+
+ if (exec_nodes->primarynodelist)
+ {
+ primary_handle = copy_connections[lfirst_int(list_head(exec_nodes->primarynodelist)) - 1];
+ }
+
+ if (primary_handle)
+ {
+ if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN)
+ {
+ /* precalculate to speed up access */
+ int bytes_needed = primary_handle->outEnd + 1 + msgLen;
+
+ /* flush buffer if it is almost full */
+ if (bytes_needed > COPY_BUFFER_SIZE)
+ {
+			/* First check whether the data node has sent an error message */
+ int read_status = data_node_read_data(primary_handle);
+ if (read_status == EOF || read_status < 0)
+ {
+ add_error_message(primary_handle, "failed to read data from data node");
+ return EOF;
+ }
+
+ if (primary_handle->inStart < primary_handle->inEnd)
+ {
+ RemoteQueryState *combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE);
+ combiner->dest = None_Receiver;
+ handle_response(primary_handle, combiner);
+ if (!ValidateAndCloseCombiner(combiner))
+ return EOF;
+ }
+
+ if (DN_CONNECTION_STATE_ERROR(primary_handle))
+ return EOF;
+
+ if (send_some(primary_handle, primary_handle->outEnd) < 0)
+ {
+ add_error_message(primary_handle, "failed to send data to data node");
+ return EOF;
+ }
+ }
+
+ if (ensure_out_buffer_capacity(bytes_needed, primary_handle) != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ primary_handle->outBuffer[primary_handle->outEnd++] = 'd';
+ memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4);
+ primary_handle->outEnd += 4;
+ memcpy(primary_handle->outBuffer + primary_handle->outEnd, data_row, len);
+ primary_handle->outEnd += len;
+ primary_handle->outBuffer[primary_handle->outEnd++] = '\n';
+ }
+ else
+ {
+ add_error_message(primary_handle, "Invalid data node connection");
+ return EOF;
+ }
+ }
+
+ foreach(nodeitem, exec_nodes->nodelist)
+ {
+ DataNodeHandle *handle = copy_connections[lfirst_int(nodeitem) - 1];
+ if (handle && handle->state == DN_CONNECTION_STATE_COPY_IN)
+ {
+ /* precalculate to speed up access */
+ int bytes_needed = handle->outEnd + 1 + msgLen;
+
+ /* flush buffer if it is almost full */
+ if ((primary_handle && bytes_needed > PRIMARY_NODE_WRITEAHEAD)
+ || (!primary_handle && bytes_needed > COPY_BUFFER_SIZE))
+ {
+ int to_send = handle->outEnd;
+
+			/* First check whether the data node has sent an error message */
+ int read_status = data_node_read_data(handle);
+ if (read_status == EOF || read_status < 0)
+ {
+ add_error_message(handle, "failed to read data from data node");
+ return EOF;
+ }
+
+ if (handle->inStart < handle->inEnd)
+ {
+ RemoteQueryState *combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE);
+ combiner->dest = None_Receiver;
+ handle_response(handle, combiner);
+ if (!ValidateAndCloseCombiner(combiner))
+ return EOF;
+ }
+
+ if (DN_CONNECTION_STATE_ERROR(handle))
+ return EOF;
+
+ /*
+				 * Allow the primary node to write out data before the others.
+				 * If the primary node is blocked it will not accept copy data,
+				 * so buffer at least PRIMARY_NODE_WRITEAHEAD bytes at the
+				 * other nodes. If the primary node is blocked and buffering,
+				 * the other buffers will grow accordingly.
+ */
+ if (primary_handle)
+ {
+ if (primary_handle->outEnd + PRIMARY_NODE_WRITEAHEAD < handle->outEnd)
+ to_send = handle->outEnd - primary_handle->outEnd - PRIMARY_NODE_WRITEAHEAD;
+ else
+ to_send = 0;
+ }
+
+ /*
+			 * Try to send down the buffered data if we have any
+ */
+ if (to_send && send_some(handle, to_send) < 0)
+ {
+ add_error_message(handle, "failed to send data to data node");
+ return EOF;
+ }
+ }
+
+ if (ensure_out_buffer_capacity(bytes_needed, handle) != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'd';
+ memcpy(handle->outBuffer + handle->outEnd, &nLen, 4);
+ handle->outEnd += 4;
+ memcpy(handle->outBuffer + handle->outEnd, data_row, len);
+ handle->outEnd += len;
+ handle->outBuffer[handle->outEnd++] = '\n';
+ }
+ else
+ {
+ add_error_message(handle, "Invalid data node connection");
+ return EOF;
+ }
+ }
+
+ return 0;
+}
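+
+/*
+ * The message built above is a standard CopyData frame:
+ *
+ *   'd' | int32 length | row text | '\n'
+ *
+ * where length counts itself plus the payload, hence msgLen = 4 + len + 1.
+ * For a 10-byte row the frame is 'd', 00 00 00 0f, <10 bytes>, '\n'.
+ */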
+
+uint64
+DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* copy_file)
+{
+ RemoteQueryState *combiner;
+ int conn_count = list_length(exec_nodes->nodelist) == 0 ? NumDataNodes : list_length(exec_nodes->nodelist);
+ int count = 0;
+ bool need_tran;
+ List *nodelist;
+ ListCell *nodeitem;
+ uint64 processed = 0;
+
+ nodelist = exec_nodes->nodelist;
+ need_tran = !autocommit || conn_count > 1;
+
+ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_SUM);
+ combiner->dest = None_Receiver;
+ /* If there is an existing file where to copy data, pass it to combiner */
+ if (copy_file)
+ combiner->copy_file = copy_file;
+
+ foreach(nodeitem, exec_nodes->nodelist)
+ {
+ DataNodeHandle *handle = copy_connections[count];
+ count++;
+
+ if (handle && handle->state == DN_CONNECTION_STATE_COPY_OUT)
+ {
+ int read_status = 0;
+ /* H message has been consumed, continue to manage data row messages */
+ while (read_status >= 0 && handle->state == DN_CONNECTION_STATE_COPY_OUT) /* continue to read as long as there is data */
+ {
+ if (handle_response(handle,combiner) == RESPONSE_EOF)
+ {
+					/* read more data from the connection */
+ read_status = data_node_read_data(handle);
+ if (read_status < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("unexpected EOF on datanode connection")));
+ }
+ /* There is no more data that can be read from connection */
+ }
+ }
+ }
+
+ processed = combiner->row_count;
+
+ if (!ValidateAndCloseCombiner(combiner))
+ {
+ if (autocommit && !PersistentConnections)
+ release_handles();
+ pfree(copy_connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
+ }
+
+ return processed;
+}
+
+/*
+ * Finish copy process on all connections
+ */
+uint64
+DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
+ CombineType combine_type)
+{
+ int i;
+ int nLen = htonl(4);
+ RemoteQueryState *combiner = NULL;
+ bool need_tran;
+ bool error = false;
+ struct timeval *timeout = NULL; /* wait forever */
+ DataNodeHandle *connections[NumDataNodes];
+ DataNodeHandle *primary_handle = NULL;
+ int conn_count = 0;
+ uint64 processed;
+
+ for (i = 0; i < NumDataNodes; i++)
+ {
+ DataNodeHandle *handle = copy_connections[i];
+
+ if (!handle)
+ continue;
+
+ if (i == primary_data_node - 1)
+ primary_handle = handle;
+ else
+ connections[conn_count++] = handle;
+ }
+
+ if (primary_handle)
+ {
+ if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN || primary_handle->state == DN_CONNECTION_STATE_COPY_OUT)
+ {
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(primary_handle->outEnd + 1 + 4, primary_handle) != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ primary_handle->outBuffer[primary_handle->outEnd++] = 'c';
+ memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4);
+ primary_handle->outEnd += 4;
+
+ /* We need response right away, so send immediately */
+ if (data_node_flush(primary_handle) < 0)
+ {
+ error = true;
+ }
+ }
+ else
+ {
+ error = true;
+ }
+
+ combiner = CreateResponseCombiner(conn_count + 1, combine_type);
+ combiner->dest = None_Receiver;
+ data_node_receive_responses(1, &primary_handle, timeout, combiner);
+ }
+
+ for (i = 0; i < conn_count; i++)
+ {
+ DataNodeHandle *handle = connections[i];
+
+ if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT)
+ {
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'c';
+ memcpy(handle->outBuffer + handle->outEnd, &nLen, 4);
+ handle->outEnd += 4;
+
+ /* We need response right away, so send immediately */
+ if (data_node_flush(handle) < 0)
+ {
+ error = true;
+ }
+ }
+ else
+ {
+ error = true;
+ }
+ }
+
+ need_tran = !autocommit || primary_handle || conn_count > 1;
+
+ if (!combiner)
+ {
+ combiner = CreateResponseCombiner(conn_count, combine_type);
+ combiner->dest = None_Receiver;
+ }
+ data_node_receive_responses(conn_count, connections, timeout, combiner);
+
+ processed = combiner->row_count;
+
+ if (!ValidateAndCloseCombiner(combiner) || error)
+ {
+ if (autocommit)
+ {
+ if (need_tran)
+ DataNodeRollback(DestNone);
+ else
+ if (!PersistentConnections) release_handles();
+ }
+
+ return 0;
+ }
+
+ if (autocommit)
+ {
+ if (need_tran)
+ DataNodeCommit(DestNone);
+ else
+ if (!PersistentConnections) release_handles();
+ }
+
+ return processed;
+}
+
+RemoteQueryState *
+ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
+{
+ RemoteQueryState *remotestate;
+
+ remotestate = CreateResponseCombiner(0, node->combine_type);
+ remotestate->ss.ps.plan = (Plan *) node;
+ remotestate->ss.ps.state = estate;
+ remotestate->simple_aggregates = node->simple_aggregates;
+
+ ExecInitResultTupleSlot(estate, &remotestate->ss.ps);
+ if (node->plan.targetlist)
+ {
+ TupleDesc typeInfo = ExecCleanTypeFromTL(node->plan.targetlist, false);
+ ExecSetSlotDescriptor(remotestate->ss.ps.ps_ResultTupleSlot, typeInfo);
+ }
+
+ ExecInitScanTupleSlot(estate, &remotestate->ss);
+ /*
+	 * Tuple description for the scan slot will be set at runtime from
+ * a RowDescription message
+ */
+
+ if (node->distinct)
+ {
+		/* prepare equality functions */
+ remotestate->eqfunctions =
+ execTuplesMatchPrepare(node->distinct->numCols,
+ node->distinct->eqOperators);
+ /* create memory context for execTuplesMatch */
+ remotestate->tmp_ctx =
+ AllocSetContextCreate(CurrentMemoryContext,
+ "RemoteUnique",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ }
+ return remotestate;
+}
+
+
+static void
+copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst)
+{
+ if (src->tts_dataRow
+ && dst->tts_tupleDescriptor->natts == src->tts_tupleDescriptor->natts)
+ {
+ if (src->tts_mcxt == dst->tts_mcxt)
+ {
+ /* now dst slot controls the backing message */
+ ExecStoreDataRowTuple(src->tts_dataRow, src->tts_dataLen, dst, src->tts_shouldFreeRow);
+ src->tts_shouldFreeRow = false;
+ }
+ else
+ {
+ /* have to make a copy */
+ MemoryContext oldcontext = MemoryContextSwitchTo(dst->tts_mcxt);
+ int len = src->tts_dataLen;
+ char *msg = (char *) palloc(len);
+
+ memcpy(msg, src->tts_dataRow, len);
+ ExecStoreDataRowTuple(msg, len, dst, true);
+ MemoryContextSwitchTo(oldcontext);
+ }
+ }
+ else
+ {
+ int i;
+ ExecClearTuple(dst);
+ slot_getallattrs(src);
+ /* PGXCTODO revisit: probably incorrect */
+ for (i = 0; i < dst->tts_tupleDescriptor->natts; i++)
+ {
+ dst->tts_values[i] = src->tts_values[i];
+ dst->tts_isnull[i] = src->tts_isnull[i];
+ }
+ ExecStoreVirtualTuple(dst);
+ }
+}
+
+/*
+ * Execute a step of a PGXC plan.
+ * The step specifies a command to be executed on the specified nodes.
+ * On the first invocation connections to the data nodes are initialized and
+ * the command is executed. Then, within this and subsequent invocations,
+ * responses are received until the step is completed or there is a tuple to
+ * emit. If there is a tuple it is returned, otherwise NULL is returned; a
+ * NULL result indicates the step has completed.
+ * The function returns at most one tuple per invocation.
+ */
+TupleTableSlot *
+ExecRemoteQuery(RemoteQueryState *node)
+{
+ RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan;
+ EState *estate = node->ss.ps.state;
+ TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot;
+ TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot;
+
+ if (!node->query_Done)
+ {
+ /* First invocation, initialize */
+ Exec_Nodes *exec_nodes = step->exec_nodes;
+ bool force_autocommit = step->force_autocommit;
+ bool is_read_only = step->read_only;
+ GlobalTransactionId gxid = InvalidGlobalTransactionId;
+ Snapshot snapshot = GetActiveSnapshot();
+ DataNodeHandle **connections = NULL;
+ DataNodeHandle **primaryconnection = NULL;
+ List *nodelist = NIL;
+ List *primarynode = NIL;
+ int i;
+ int j;
+ int regular_conn_count;
+ int total_conn_count;
+ bool need_tran;
+
+ if (exec_nodes)
+ {
+ nodelist = exec_nodes->nodelist;
+ primarynode = exec_nodes->primarynodelist;
+ }
+
+ if (list_length(nodelist) == 0)
+ {
+ if (primarynode)
+ regular_conn_count = NumDataNodes - 1;
+ else
+ regular_conn_count = NumDataNodes;
+ }
+ else
+ {
+ regular_conn_count = list_length(nodelist);
+ }
+
+ total_conn_count = regular_conn_count;
+ node->node_count = total_conn_count;
+
+ /* Get connection for primary node, if used */
+ if (primarynode)
+ {
+ primaryconnection = get_handles(primarynode);
+ if (!primaryconnection)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not obtain connection from pool")));
+ total_conn_count++;
+ }
+
+ /* Get other connections (non-primary) */
+ connections = get_handles(nodelist);
+ if (!connections)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not obtain connection from pool")));
+
+ if (force_autocommit)
+ need_tran = false;
+ else
+ need_tran = !autocommit || total_conn_count > 1;
+
+ elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false");
+
+ stat_statement();
+ if (autocommit)
+ {
+ stat_transaction(total_conn_count);
+ /* We normally clear for transactions, but if autocommit, clear here, too */
+ clear_write_node_list();
+ }
+
+ /* Check status of connections */
+ /*
+		 * We would want to run 2PC if the current transaction modified more
+		 * than one node. So optimize a little bit and do not look further if
+		 * we already have two.
+ */
+ if (!is_read_only && write_node_count < 2)
+ {
+ bool found;
+
+ if (primaryconnection)
+ {
+ found = false;
+ for (j = 0; j < write_node_count && !found; j++)
+ {
+ if (write_node_list[j] == primaryconnection[0])
+ found = true;
+ }
+ if (!found)
+ {
+ /* Add to transaction wide-list */
+ write_node_list[write_node_count++] = primaryconnection[0];
+ }
+ }
+ for (i = 0; i < regular_conn_count && write_node_count < 2; i++)
+ {
+ found = false;
+ for (j = 0; j < write_node_count && !found; j++)
+ {
+ if (write_node_list[j] == connections[i])
+ found = true;
+ }
+ if (!found)
+ {
+ /* Add to transaction wide-list */
+ write_node_list[write_node_count++] = connections[i];
+ }
+ }
+ }
+
+ gxid = GetCurrentGlobalTransactionId();
+
+ if (!GlobalTransactionIdIsValid(gxid))
+ {
+ if (primaryconnection)
+ pfree(primaryconnection);
+ pfree(connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to get next transaction ID")));
+ }
+
+ if (need_tran)
+ {
+ /*
+ * Check if data node connections are in transaction and start
+ * transactions on nodes where it is not started
+ */
+ DataNodeHandle *new_connections[total_conn_count];
+ int new_count = 0;
+
+ if (primaryconnection && primaryconnection[0]->transaction_status != 'T')
+ new_connections[new_count++] = primaryconnection[0];
+ for (i = 0; i < regular_conn_count; i++)
+ if (connections[i]->transaction_status != 'T')
+ new_connections[new_count++] = connections[i];
+
+ if (new_count)
+ data_node_begin(new_count, new_connections, DestNone, gxid);
+ }
+
+		/* If we have a primary node, execute on it first, before the others */
+ if (primaryconnection)
+ {
+ /* If explicit transaction is needed gxid is already sent */
+ if (!need_tran && data_node_send_gxid(primaryconnection[0], gxid))
+ {
+ pfree(connections);
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot))
+ {
+ pfree(connections);
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (data_node_send_query(primaryconnection[0], step->sql_statement) != 0)
+ {
+ pfree(connections);
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+
+ Assert(node->combine_type == COMBINE_TYPE_SAME);
+
+ while (node->command_complete_count < 1)
+ {
+ PG_TRY();
+ {
+ data_node_receive(1, primaryconnection, NULL);
+ while (handle_response(primaryconnection[0], node) == RESPONSE_EOF)
+ data_node_receive(1, primaryconnection, NULL);
+ if (node->errorMessage)
+ {
+ char *code = node->errorCode;
+ ereport(ERROR,
+ (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
+ errmsg("%s", node->errorMessage)));
+ }
+ }
+ /* If we got an error response return immediately */
+ PG_CATCH();
+ {
+ /* We are going to exit, so release combiner */
+ if (autocommit)
+ {
+ if (need_tran)
+ DataNodeRollback(DestNone);
+ else if (!PersistentConnections)
+ release_handles();
+ }
+
+ pfree(primaryconnection);
+ pfree(connections);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+ pfree(primaryconnection);
+ }
+
+ for (i = 0; i < regular_conn_count; i++)
+ {
+ /* If explicit transaction is needed gxid is already sent */
+ if (!need_tran && data_node_send_gxid(connections[i], gxid))
+ {
+ pfree(connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (snapshot && data_node_send_snapshot(connections[i], snapshot))
+ {
+ pfree(connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (data_node_send_query(connections[i], step->sql_statement) != 0)
+ {
+ pfree(connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ }
+
+ PG_TRY();
+ {
+ /*
+ * Stop if all commands are completed or we got a data row and
+			 * initialized the state node for subsequent invocations
+ */
+ while (regular_conn_count > 0 && node->connections == NULL)
+ {
+ int i = 0;
+
+ data_node_receive(regular_conn_count, connections, NULL);
+ /*
+ * Handle input from the data nodes.
+ * If we got a RESPONSE_DATAROW we can break handling to wrap
+ * it into a tuple and return. Handling will be continued upon
+ * subsequent invocations.
+			 * If we got RESPONSE_COMPLETE, we exclude the connection from
+			 * the list; we do not expect more input from it. In case of a
+			 * non-SELECT query we quit the loop when all nodes finish their
+			 * work and send ReadyForQuery, leaving an empty connections
+			 * array.
+			 * If we got RESPONSE_EOF, move to the next connection; we will
+			 * receive more data on the next iteration.
+ */
+ while (i < regular_conn_count)
+ {
+ int res = handle_response(connections[i], node);
+ if (res == RESPONSE_EOF)
+ {
+ i++;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ if (i < --regular_conn_count)
+ connections[i] = connections[regular_conn_count];
+ }
+ else if (res == RESPONSE_TUPDESC)
+ {
+ ExecSetSlotDescriptor(scanslot, node->tuple_desc);
+ /*
+ * we should send to client not the tuple_desc we just
+ * received, but tuple_desc from the planner.
+ * Data node may be sending junk columns for sorting
+ */
+ (*node->dest->rStartup) (node->dest, CMD_SELECT,
+ resultslot->tts_tupleDescriptor);
+ if (step->sort)
+ {
+ SimpleSort *sort = step->sort;
+
+ node->connections = connections;
+ node->conn_count = regular_conn_count;
+ /*
+							 * The first message is already in the buffer.
+							 * Further fetches will be under tuplesort control.
+							 * If the query does not produce rows, tuplesort
+							 * will not be initialized
+ */
+ node->tuplesortstate = tuplesort_begin_merge(
+ node->tuple_desc,
+ sort->numCols,
+ sort->sortColIdx,
+ sort->sortOperators,
+ sort->nullsFirst,
+ node,
+ work_mem);
+ /*
+ * Break the loop, do not wait for first row.
+							 * The tuplesort module wants to control which node
+							 * it is fetching rows from, while in this loop the
+							 * first row would come from a random node
+ */
+ break;
+ }
+ }
+ else if (res == RESPONSE_DATAROW)
+ {
+ /*
+ * Got first data row, quit the loop
+ */
+ node->connections = connections;
+ node->conn_count = regular_conn_count;
+ node->current_conn = i;
+ break;
+ }
+ }
+ }
+ }
+ /* If we got an error response return immediately */
+ PG_CATCH();
+ {
+ /* We are going to exit, so release combiner */
+ if (autocommit)
+ {
+ if (need_tran)
+ DataNodeRollback(DestNone);
+ else if (!PersistentConnections)
+ release_handles();
+ }
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ node->query_Done = true;
+ node->need_tran = need_tran;
+ }
+
+ PG_TRY();
+ {
+ bool have_tuple = false;
+
+ if (node->tuplesortstate)
+ {
+ while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate,
+ true, scanslot))
+ {
+ have_tuple = true;
+ /*
+					 * If DISTINCT is specified and the current tuple matches
+					 * the previous one, skip it and get the next one.
+					 * Otherwise return the current tuple
+ */
+ if (step->distinct)
+ {
+ /*
+					 * Always accept the very first tuple, and skip to the next
+					 * if the scan slot matches the previous one (result slot)
+ */
+ if (!TupIsNull(resultslot) &&
+ execTuplesMatch(scanslot,
+ resultslot,
+ step->distinct->numCols,
+ step->distinct->uniqColIdx,
+ node->eqfunctions,
+ node->tmp_ctx))
+ {
+ have_tuple = false;
+ continue;
+ }
+ }
+ copy_slot(node, scanslot, resultslot);
+ (*node->dest->receiveSlot) (resultslot, node->dest);
+ break;
+ }
+ if (!have_tuple)
+ ExecClearTuple(resultslot);
+ }
+ else
+ {
+ while (node->conn_count > 0 && !have_tuple)
+ {
+ int i;
+
+ /*
+				 * If the combiner already has a tuple go ahead and return it,
+				 * otherwise the slot will be cleared
+ */
+ if (FetchTuple(node, scanslot) && !TupIsNull(scanslot))
+ {
+ if (node->simple_aggregates)
+ {
+ /*
+						 * Advance the aggregate functions, then allow the next
+						 * data row message to be read and a tuple formed in
+						 * the same slot on the next iteration
+ */
+ exec_simple_aggregates(node, scanslot);
+ }
+ else
+ {
+ /*
+						 * Pass the current slot to the receiver and read up
+						 * the next data row message before exiting the loop.
+						 * Next time this function is invoked we will have
+						 * either a data row message ready or EOF
+ */
+ copy_slot(node, scanslot, resultslot);
+ (*node->dest->receiveSlot) (resultslot, node->dest);
+ have_tuple = true;
+ }
+ }
+
+ /*
+				 * Handle input to get the next row or ensure the command is
+				 * completed, starting from the connection next after the
+				 * current one. If a connection has no message buffered we
+				 * move on to the next one
+ */
+ if ((i = node->current_conn + 1) == node->conn_count)
+ i = 0;
+
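+				/*
+				 * Round-robin example: with conn_count = 3 and
+				 * current_conn = 1 the scan order is 2, 0, 1; only after a
+				 * full cycle without a buffered message do we block in
+				 * data_node_receive() below.
+				 */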
+ for (;;)
+ {
+ int res = handle_response(node->connections[i], node);
+ if (res == RESPONSE_EOF)
+ {
+ /* go to next connection */
+ if (++i == node->conn_count)
+ i = 0;
+ /* if we cycled over all connections we need to receive more */
+ if (i == node->current_conn)
+ data_node_receive(node->conn_count, node->connections, NULL);
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ if (--node->conn_count == 0)
+ break;
+ if (i == node->conn_count)
+ i = 0;
+ else
+ node->connections[i] = node->connections[node->conn_count];
+ if (node->current_conn == node->conn_count)
+ node->current_conn = i;
+ }
+ else if (res == RESPONSE_DATAROW)
+ {
+ node->current_conn = i;
+ break;
+ }
+ }
+ }
+
+ /*
+ * We may need to finalize aggregates
+ */
+ if (!have_tuple && node->simple_aggregates)
+ {
+ finish_simple_aggregates(node, resultslot);
+ if (!TupIsNull(resultslot))
+ {
+ (*node->dest->receiveSlot) (resultslot, node->dest);
+ have_tuple = true;
+ }
+ }
+
+ if (!have_tuple) /* report end of scan */
+ ExecClearTuple(resultslot);
+
+ }
+
+ if (node->errorMessage)
+ {
+ char *code = node->errorCode;
+ ereport(ERROR,
+ (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
+ errmsg("%s", node->errorMessage)));
+ }
+
+ /*
+ * If command is completed we should commit work.
+ */
+ if (node->conn_count == 0 && autocommit && node->need_tran)
+ DataNodeCommit(DestNone);
+ }
+ /* If we got an error response return immediately */
+ PG_CATCH();
+ {
+ /* We are going to exit, so release combiner */
+ if (autocommit)
+ {
+ if (node->need_tran)
+ DataNodeRollback(DestNone);
+ else if (!PersistentConnections)
+ release_handles();
+ }
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ return resultslot;
+}
+
+void
+ExecEndRemoteQuery(RemoteQueryState *node)
+{
+ (*node->dest->rShutdown) (node->dest);
+ if (node->tmp_ctx)
+ MemoryContextDelete(node->tmp_ctx);
+ CloseCombiner(node);
+}
+
+
+/*
+ * Called when the backend is ending.
+ */
+void
+DataNodeCleanAndRelease(int code, Datum arg)
+{
+ /* Rollback on Data Nodes */
+ if (IsTransactionState())
+ {
+ DataNodeRollback(DestNone);
+
+		/* Rollback on GTM if a transaction id was opened. */
+ RollbackTranGTM((GlobalTransactionId) GetCurrentTransactionIdIfAny());
+ }
+
+ /* Release data node connections */
+ release_handles();
+
+ /* Close connection with GTM */
+ CloseGTM();
+
+ /* Dump collected statistics to the log */
+ stat_log();
+}
+
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 8c58b62940..077e8f9190 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -77,6 +77,7 @@
#include "pgxc/pgxc.h"
#include "access/gtm.h"
/* PGXC_COORD */
+#include "pgxc/execRemote.h"
#include "pgxc/planner.h"
#include "pgxc/datanode.h"
#include "commands/copy.h"
@@ -198,7 +199,6 @@ static void log_disconnections(int code, Datum arg);
#ifdef PGXC /* PGXC_DATANODE */
static void pgxc_transaction_stmt (Node *parsetree);
-static List * pgxc_execute_direct (Node *parsetree, List *querytree_list, CommandDest dest, bool snapshot_set, bool *exec_on_coord);
/* ----------------------------------------------------------------
* PG-XC routines
@@ -900,11 +900,9 @@ exec_simple_query(const char *query_string)
DestReceiver *receiver;
int16 format;
#ifdef PGXC
- Query_Plan *query_plan;
- Query_Step *query_step;
+ Query_Plan *query_plan;
+ RemoteQuery *query_step;
bool exec_on_coord;
- int data_node_error = 0;
-
/*
* By default we do not want data nodes to contact GTM directly,
@@ -977,12 +975,78 @@ exec_simple_query(const char *query_string)
pgxc_transaction_stmt(parsetree);
else if (IsA(parsetree, ExecDirectStmt))
- querytree_list = pgxc_execute_direct(parsetree, querytree_list, dest, snapshot_set, &exec_on_coord);
+ {
+ ExecDirectStmt *execdirect = (ExecDirectStmt *) parsetree;
+ List *inner_parse_tree_list;
+
+ Assert(IS_PGXC_COORDINATOR);
+
+ exec_on_coord = execdirect->coordinator;
+
+ /*
+ * Switch to appropriate context for constructing parse and
+ * query trees (these must outlive the execution context).
+ */
+ oldcontext = MemoryContextSwitchTo(MessageContext);
+
+ inner_parse_tree_list = pg_parse_query(execdirect->query);
+ /*
+ * we do not support complex commands (expanded to multiple
+ * parse trees) within EXEC DIRECT
+ */
+			if (list_length(inner_parse_tree_list) != 1)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("Can not execute %s with EXECUTE DIRECT",
+ execdirect->query)));
+ }
+ parsetree = linitial(inner_parse_tree_list);
+ /*
+ * Set up a snapshot if parse analysis/planning will need
+ * one.
+ */
+ if (analyze_requires_snapshot(parsetree))
+ {
+ PushActiveSnapshot(GetTransactionSnapshot());
+ snapshot_set = true;
+ }
+
+ querytree_list = pg_analyze_and_rewrite(parsetree,
+ query_string,
+ NULL,
+ 0);
+
+ if (execdirect->nodes)
+ {
+ ListCell *lc;
+ Query *query = (Query *) linitial(querytree_list);
+
+ query_plan = (Query_Plan *) palloc0(sizeof(Query_Plan));
+ query_step = makeNode(RemoteQuery);
+ query_step->plan.targetlist = query->targetList;
+ query_step->sql_statement = pstrdup(execdirect->query);
+ query_step->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
+ foreach (lc, execdirect->nodes)
+ {
+ int node = intVal(lfirst(lc));
+ query_step->exec_nodes->nodelist = lappend_int(query_step->exec_nodes->nodelist, node);
+ }
+ query_step->combine_type = COMBINE_TYPE_SAME;
+
+ query_plan->query_step_list = lappend(NULL, query_step);
+ query_plan->exec_loc_type = EXEC_ON_DATA_NODES;
+ }
+
+ /* Restore context */
+ MemoryContextSwitchTo(oldcontext);
+
+ }
else if (IsA(parsetree, CopyStmt))
{
- CopyStmt *copy = (CopyStmt *) parsetree;
- bool done;
+ CopyStmt *copy = (CopyStmt *) parsetree;
+ uint64 processed;
/* Snapshot is needed for the Copy */
if (!snapshot_set)
{
@@ -994,13 +1058,15 @@ exec_simple_query(const char *query_string)
* Datanode or on Coordinator.
* If a table has no locator data, then IsCoordPortalCopy returns false and copy is launched
* on Coordinator instead (e.g., using pg_catalog tables).
- * If a table has some locator data (user tables), then copy is launched normally
+ * If a table has some locator data (user tables), then copy was launched normally
* in Datanodes
*/
if (!IsCoordPortalCopy(copy))
{
- DoCopy(copy, query_string, false);
exec_on_coord = false;
+ processed = DoCopy(copy, query_string, false);
+ snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
+ "COPY " UINT64_FORMAT, processed);
}
else
exec_on_coord = true;
@@ -1015,6 +1081,7 @@ exec_simple_query(const char *query_string)
/* First execute on the coordinator, if involved (DDL), then data nodes */
}
+ plantree_list = NIL;
if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE)
#endif
plantree_list = pg_plan_queries(querytree_list, 0, NULL);
@@ -1036,10 +1103,11 @@ exec_simple_query(const char *query_string)
if (IS_PGXC_DATANODE && IsA(parsetree, VacuumStmt) && IsPostmasterEnvironment)
SetForceXidFromGTM(true);
- /* PGXC_COORD */
- /* Force getting Xid from GTM if not autovacuum, but a vacuum */
- /* Skip the Portal stuff on coordinator if command only executes on data nodes */
- if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE)
+ /*
+ * Create and run Portal only if it is needed.
+ * In some special cases we have nothing to run at this point
+ */
+ if (plantree_list || query_plan)
{
#endif
@@ -1102,6 +1170,11 @@ exec_simple_query(const char *query_string)
*/
MemoryContextSwitchTo(oldcontext);
+#ifdef PGXC
+ /* Skip the Portal stuff on coordinator if command only executes on data nodes */
+ if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE)
+ {
+#endif
/*
* Run the portal to completion, and then drop it (and the receiver).
*/
@@ -1112,10 +1185,6 @@ exec_simple_query(const char *query_string)
receiver,
completionTag);
- (*receiver->rDestroy) (receiver);
-
- PortalDrop(portal, false);
-
#ifdef PGXC
}
@@ -1125,36 +1194,45 @@ exec_simple_query(const char *query_string)
{
if (query_plan && (query_plan->exec_loc_type & EXEC_ON_DATA_NODES))
{
+ RemoteQueryState *state;
+ TupleTableSlot *slot;
+ EState *estate = CreateExecutorState();
+ oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
query_step = linitial(query_plan->query_step_list);
-
- data_node_error = DataNodeExec(query_step->sql_statement,
- query_step->exec_nodes,
- query_step->combine_type,
- dest,
- snapshot_set ? GetActiveSnapshot() : GetTransactionSnapshot(),
- query_plan->force_autocommit,
- query_step->simple_aggregates,
- IsA(parsetree, SelectStmt));
-
- if (data_node_error)
+ estate->es_tupleTable = ExecCreateTupleTable(2);
+ state = ExecInitRemoteQuery(query_step, estate, 0);
+ state->dest = receiver;
+ state->completionTag = completionTag;
+ if (!snapshot_set)
{
- /* An error occurred, change status on Coordinator, too,
- * even if no statements ran on it.
- * We want to only allow COMMIT/ROLLBACK
- */
- AbortCurrentTransaction();
- xact_started = false;
- /* AT() clears active snapshot */
- snapshot_set = false;
+ PushActiveSnapshot(GetTransactionSnapshot());
+ snapshot_set = true;
+ }
+ do
+ {
+ slot = ExecRemoteQuery(state);
}
+ while (!TupIsNull(slot));
+
+ ExecEndRemoteQuery(state);
+ /* Restore context */
+ MemoryContextSwitchTo(oldcontext);
}
FreeQueryPlan(query_plan);
}
+#endif /* PGXC_COORD */
+
+ (*receiver->rDestroy) (receiver);
+
+ PortalDrop(portal, false);
+
+#ifdef PGXC
+ }
if (snapshot_set)
PopActiveSnapshot();
-#endif /* PGXC_COORD */
+#endif
if (IsA(parsetree, TransactionStmt))
{
@@ -1186,11 +1264,6 @@ exec_simple_query(const char *query_string)
*/
CommandCounterIncrement();
}
-#ifdef PGXC /* PGXC_COORD */
- /* In case of PGXC handling client already received a response */
- if ((IS_PGXC_COORDINATOR && exec_on_coord && !data_node_error) || IS_PGXC_DATANODE)
- {
-#endif
/*
* Tell client that we're done with this query. Note we emit exactly
@@ -1199,9 +1272,6 @@ exec_simple_query(const char *query_string)
* aborted by error will not send an EndCommand report at all.)
*/
EndCommand(completionTag, dest);
-#ifdef PGXC /* PGXC_COORD */
- }
-#endif
} /* end loop over parsetrees */
/*
@@ -4328,80 +4398,4 @@ pgxc_transaction_stmt (Node *parsetree)
}
}
}
-
-
-/*
- * Handle EXECUTE DIRECT
- */
-List *
-pgxc_execute_direct (Node *parsetree, List *querytree_list, CommandDest dest, bool snapshot_set, bool *exec_on_coord)
-{
- List *parsetree_list;
- ListCell *node_cell;
- ExecDirectStmt *execdirect = (ExecDirectStmt *) parsetree;
- bool on_coord = execdirect->coordinator;
- Exec_Nodes *exec_nodes;
-
-
- Assert(IS_PGXC_COORDINATOR);
- Assert(IsA(parsetree, ExecDirectStmt));
-
-
- exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
-
- foreach (node_cell, execdirect->nodes)
- {
- int node_int = intVal(lfirst(node_cell));
- exec_nodes->nodelist = lappend_int(exec_nodes->nodelist, node_int);
- }
- if (exec_nodes->nodelist)
- if (DataNodeExec(execdirect->query,
- exec_nodes,
- COMBINE_TYPE_SAME,
- dest,
- snapshot_set ? GetActiveSnapshot() : GetTransactionSnapshot(),
- FALSE,
- FALSE,
- FALSE) != 0)
- on_coord = false;
-
- if (on_coord)
- {
- /*
- * Parse inner statement, like at the begiining of the function
- * We do not have to release wrapper trees, the message context
- * will be deleted later
- * Also, no need to switch context - current is already
- * the MessageContext
- */
- parsetree_list = pg_parse_query(execdirect->query);
-
- /* We do not want to log or display the inner command */
-
- /*
- * we do not support complex commands (expanded to multiple
- * parse trees) within EXEC DIRECT
- */
- if (list_length(parsetree_list) != 1)
- {
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("Can not execute %s with EXECUTE DIRECT",
- execdirect->query)));
- }
-
- /*
- * Get parse tree from the list
- */
- parsetree = (Node *) lfirst(list_head(parsetree_list));
-
- /*
- * Build new query tree */
- querytree_list = pg_analyze_and_rewrite(parsetree,
- execdirect->query, NULL, 0);
- }
- *exec_on_coord = on_coord;
-
- return querytree_list;
-}
#endif /* PGXC */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 268fe5a8c5..0676f0da0d 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -55,6 +55,7 @@
#include "parser/scansup.h"
#include "pgstat.h"
#ifdef PGXC
+#include "pgxc/execRemote.h"
#include "pgxc/locator.h"
#include "pgxc/planner.h"
#include "pgxc/poolmgr.h"
@@ -1261,7 +1262,7 @@ static struct config_bool ConfigureNamesBool[] =
{"strict_select_checking", PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("Forbid if SELECT has ORDER BY"),
gettext_noop("and is not safe for the cluster"),
- GUC_NOT_IN_SAMPLE
+ GUC_NOT_IN_SAMPLE
},
&StrictSelectChecking,
false, NULL, NULL
@@ -1301,7 +1302,7 @@ static struct config_int ConfigureNamesInt[] =
gettext_noop("This applies to table columns that have not had a "
"column-specific target set via ALTER TABLE SET STATISTICS.")
},
- &default_statistics_target,
+ &default_statistics_target,
100, 1, 10000, NULL, NULL
},
{
@@ -2008,8 +2009,8 @@ static struct config_int ConfigureNamesInt[] =
NULL
},
&NumDataNodes,
- 2, 1, 65535, NULL, NULL
- },
+ 2, 1, 65535, NULL, NULL
+ },
{
{"min_pool_size", PGC_POSTMASTER, DATA_NODES,
@@ -2018,8 +2019,8 @@ static struct config_int ConfigureNamesInt[] =
"new connections are established")
},
&MinPoolSize,
- 1, 1, 65535, NULL, NULL
- },
+ 1, 1, 65535, NULL, NULL
+ },
{
{"max_pool_size", PGC_POSTMASTER, DATA_NODES,
@@ -2028,8 +2029,8 @@ static struct config_int ConfigureNamesInt[] =
"other connection requests will be refused")
},
&MaxPoolSize,
- 100, 1, 65535, NULL, NULL
- },
+ 100, 1, 65535, NULL, NULL
+ },
{
{"pooler_port", PGC_POSTMASTER, DATA_NODES,
@@ -2037,8 +2038,8 @@ static struct config_int ConfigureNamesInt[] =
NULL
},
&PoolerPort,
- 6667, 1, 65535, NULL, NULL
- },
+ 6667, 1, 65535, NULL, NULL
+ },
{
{"gtm_port", PGC_POSTMASTER, GTM,
@@ -2046,8 +2047,8 @@ static struct config_int ConfigureNamesInt[] =
NULL
},
&GtmPort,
- 6666, 1, 65535, NULL, NULL
- },
+ 6666, 1, 65535, NULL, NULL
+ },
{
{"gtm_coordinator_id", PGC_POSTMASTER, GTM,
@@ -2055,8 +2056,8 @@ static struct config_int ConfigureNamesInt[] =
NULL
},
&GtmCoordinatorId,
- 1, 1, INT_MAX, NULL, NULL
- },
+ 1, 1, INT_MAX, NULL, NULL
+ },
{
{"primary_data_node", PGC_POSTMASTER, DATA_NODES,
@@ -2064,8 +2065,8 @@ static struct config_int ConfigureNamesInt[] =
NULL
},
&primary_data_node,
- 1, 0, INT_MAX, NULL, NULL
- },
+ 1, 0, INT_MAX, NULL, NULL
+ },
#endif
/* End-of-list marker */
{
@@ -2624,8 +2625,8 @@ static struct config_string ConfigureNamesString[] =
gettext_noop("A list of data nodes to read from replicated tables")
},
&PreferredDataNodes,
- "", NULL, NULL
- },
+ "", NULL, NULL
+ },
{
{"data_node_hosts", PGC_POSTMASTER, DATA_NODES,
diff --git a/src/backend/utils/mmgr/mcxt.c b/src/backend/utils/mmgr/mcxt.c
index 9b08df366d..8161e9bc92 100644
--- a/src/backend/utils/mmgr/mcxt.c
+++ b/src/backend/utils/mmgr/mcxt.c
@@ -507,7 +507,7 @@ MemoryContextAlloc(MemoryContext context, Size size)
AssertArg(MemoryContextIsValid(context));
if (!AllocSizeIsValid(size))
- elog(ERROR, "invalid memory alloc request size %lu",
+ elog(PANIC, "invalid memory alloc request size %lu",
(unsigned long) size);
return (*context->methods->alloc) (context, size);
@@ -528,7 +528,7 @@ MemoryContextAllocZero(MemoryContext context, Size size)
AssertArg(MemoryContextIsValid(context));
if (!AllocSizeIsValid(size))
- elog(ERROR, "invalid memory alloc request size %lu",
+ elog(PANIC, "invalid memory alloc request size %lu",
(unsigned long) size);
ret = (*context->methods->alloc) (context, size);
@@ -553,7 +553,7 @@ MemoryContextAllocZeroAligned(MemoryContext context, Size size)
AssertArg(MemoryContextIsValid(context));
if (!AllocSizeIsValid(size))
- elog(ERROR, "invalid memory alloc request size %lu",
+ elog(PANIC, "invalid memory alloc request size %lu",
(unsigned long) size);
ret = (*context->methods->alloc) (context, size);
@@ -617,7 +617,7 @@ repalloc(void *pointer, Size size)
AssertArg(MemoryContextIsValid(header->context));
if (!AllocSizeIsValid(size))
- elog(ERROR, "invalid memory alloc request size %lu",
+ elog(PANIC, "invalid memory alloc request size %lu",
(unsigned long) size);
return (*header->context->methods->realloc) (header->context,
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 6d07296d59..c506ce064f 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -107,6 +107,9 @@
#include "commands/tablespace.h"
#include "miscadmin.h"
#include "pg_trace.h"
+#ifdef PGXC
+#include "pgxc/execRemote.h"
+#endif
#include "utils/datum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
@@ -121,6 +124,9 @@
#define HEAP_SORT 0
#define INDEX_SORT 1
#define DATUM_SORT 2
+#ifdef PGXC
+#define MERGE_SORT 3
+#endif
/* GUC variables */
#ifdef TRACE_SORT
@@ -213,6 +219,9 @@ struct Tuplesortstate
int tapeRange; /* maxTapes-1 (Knuth's P) */
MemoryContext sortcontext; /* memory context holding all sort data */
LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
+#ifdef PGXC
+ RemoteQueryState *combiner; /* tuple source, alternate to tapeset */
+#endif
/*
* These function pointers decouple the routines that must know what kind
@@ -253,6 +262,14 @@ struct Tuplesortstate
void (*readtup) (Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
+#ifdef PGXC
+ /*
+ * Function to read length of next stored tuple.
+ * Used as 'len' parameter for readtup function.
+ */
+ unsigned int (*getlen) (Tuplesortstate *state, int tapenum, bool eofOK);
+#endif
+
/*
* Function to reverse the sort direction from its current state. (We
* could dispense with this if we wanted to enforce that all variants
@@ -378,6 +395,9 @@ struct Tuplesortstate
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
+#ifdef PGXC
+#define GETLEN(state,tape,eofOK) ((*(state)->getlen) (state, tape, eofOK))
+#endif
#define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
#define LACKMEM(state) ((state)->availMem < 0)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
@@ -450,6 +470,12 @@ static void writetup_heap(Tuplesortstate *state, int tapenum,
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
static void reversedirection_heap(Tuplesortstate *state);
+#ifdef PGXC
+static unsigned int getlen_datanode(Tuplesortstate *state, int tapenum,
+ bool eofOK);
+static void readtup_datanode(Tuplesortstate *state, SortTuple *stup,
+ int tapenum, unsigned int len);
+#endif
static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
@@ -587,6 +613,9 @@ tuplesort_begin_heap(TupleDesc tupDesc,
state->copytup = copytup_heap;
state->writetup = writetup_heap;
state->readtup = readtup_heap;
+#ifdef PGXC
+ state->getlen = getlen;
+#endif
state->reversedirection = reversedirection_heap;
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
@@ -657,6 +686,9 @@ tuplesort_begin_index_btree(Relation indexRel,
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
+#ifdef PGXC
+ state->getlen = getlen;
+#endif
state->reversedirection = reversedirection_index_btree;
state->indexRel = indexRel;
@@ -692,6 +724,9 @@ tuplesort_begin_index_hash(Relation indexRel,
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
+#ifdef PGXC
+ state->getlen = getlen;
+#endif
state->reversedirection = reversedirection_index_hash;
state->indexRel = indexRel;
@@ -736,6 +771,9 @@ tuplesort_begin_datum(Oid datumType,
state->copytup = copytup_datum;
state->writetup = writetup_datum;
state->readtup = readtup_datum;
+#ifdef PGXC
+ state->getlen = getlen;
+#endif
state->reversedirection = reversedirection_datum;
state->datumType = datumType;
@@ -762,6 +800,121 @@ tuplesort_begin_datum(Oid datumType,
return state;
}
+#ifdef PGXC
+/*
+ * Tuples are coming from a source where they are already sorted.
+ * It is pretty much like sorting heap tuples, but there is no need to
+ * load the sorter: its initial status is final merge, and the correct
+ * readtup and getlen callbacks must be passed in.
+ * Usage pattern of the merge sorter:
+ * tuplesort_begin_merge
+ * while (tuple = tuplesort_gettuple())
+ * {
+ * // process
+ * }
+ * tuplesort_end_merge
+ */
+Tuplesortstate *
+tuplesort_begin_merge(TupleDesc tupDesc,
+ int nkeys, AttrNumber *attNums,
+ Oid *sortOperators, bool *nullsFirstFlags,
+ RemoteQueryState *combiner,
+ int workMem)
+{
+ Tuplesortstate *state = tuplesort_begin_common(workMem, false);
+ MemoryContext oldcontext;
+ int i;
+
+ oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
+ AssertArg(nkeys > 0);
+ AssertArg(combiner);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG,
+ "begin merge sort: nkeys = %d, workMem = %d", nkeys, workMem);
+#endif
+
+ state->nKeys = nkeys;
+
+ TRACE_POSTGRESQL_SORT_START(MERGE_SORT,
+ false, /* no unique check */
+ nkeys,
+ workMem,
+ false);
+
+ state->combiner = combiner;
+ state->comparetup = comparetup_heap;
+ state->copytup = NULL;
+ state->writetup = NULL;
+ state->readtup = readtup_datanode;
+ state->getlen = getlen_datanode;
+ state->reversedirection = reversedirection_heap;
+
+ state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
+ state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData));
+
+ for (i = 0; i < nkeys; i++)
+ {
+ Oid sortFunction;
+ bool reverse;
+
+ AssertArg(attNums[i] != 0);
+ AssertArg(sortOperators[i] != 0);
+
+ if (!get_compare_function_for_ordering_op(sortOperators[i],
+ &sortFunction, &reverse))
+ elog(ERROR, "operator %u is not a valid ordering operator",
+ sortOperators[i]);
+
+ /*
+ * We needn't fill in sk_strategy or sk_subtype since these scankeys
+ * will never be passed to an index.
+ */
+ ScanKeyInit(&state->scanKeys[i],
+ attNums[i],
+ InvalidStrategy,
+ sortFunction,
+ (Datum) 0);
+
+ /* However, we use btree's conventions for encoding directionality */
+ if (reverse)
+ state->scanKeys[i].sk_flags |= SK_BT_DESC;
+ if (nullsFirstFlags[i])
+ state->scanKeys[i].sk_flags |= SK_BT_NULLS_FIRST;
+ }
+
+ /*
+ * A "logical tape" in this case is a stream of pre-sorted tuples
+ * coming from one data node connection
+ */
+ state->maxTapes = combiner->conn_count;
+ state->tapeRange = combiner->conn_count;
+
+ state->mergeactive = (bool *) palloc0(combiner->conn_count * sizeof(bool));
+ state->mergenext = (int *) palloc0(combiner->conn_count * sizeof(int));
+ state->mergelast = (int *) palloc0(combiner->conn_count * sizeof(int));
+ state->mergeavailslots = (int *) palloc0(combiner->conn_count * sizeof(int));
+ state->mergeavailmem = (long *) palloc0(combiner->conn_count * sizeof(long));
+
+ state->tp_runs = (int *) palloc0(combiner->conn_count * sizeof(int));
+ state->tp_dummy = (int *) palloc0(combiner->conn_count * sizeof(int));
+ state->tp_tapenum = (int *) palloc0(combiner->conn_count * sizeof(int));
+ /* mark that each stream (tape) carries exactly one run */
+ for (i = 0; i < combiner->conn_count; i++)
+ {
+ state->tp_runs[i] = 1;
+ state->tp_tapenum[i] = i;
+ }
+ beginmerge(state);
+ state->status = TSS_FINALMERGE;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
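+
+/*
+ * Editor's sketch, not part of the patch: a minimal caller, assuming
+ * "combiner" is a RemoteQueryState whose connections already deliver
+ * rows in sorted order and "slot" was created for the same tupDesc:
+ *
+ *	Tuplesortstate *sorter = tuplesort_begin_merge(tupDesc, nkeys,
+ *			attNums, sortOperators, nullsFirstFlags,
+ *			combiner, work_mem);
+ *
+ *	while (tuplesort_gettupleslot(sorter, true, slot))
+ *		process_tuple(slot);		// hypothetical consumer
+ *	tuplesort_end(sorter);
+ */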
+#endif
+
/*
* tuplesort_set_bound
*
@@ -1296,7 +1449,6 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
sizeof(unsigned int)))
return false;
tuplen = getlen(state, state->result_tape, false);
-
/*
* Back up to get ending length word of tuple before it.
*/
@@ -2005,7 +2157,11 @@ mergeprereadone(Tuplesortstate *state, int srcTape)
state->mergenext[srcTape] == 0)
{
/* read next tuple, if any */
+#ifdef PGXC
+ if ((tuplen = GETLEN(state, srcTape, true)) == 0)
+#else
if ((tuplen = getlen(state, srcTape, true)) == 0)
+#endif
{
state->mergeactive[srcTape] = false;
break;
@@ -2468,7 +2624,6 @@ markrunend(Tuplesortstate *state, int tapenum)
LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
}
-
/*
* Set up for an external caller of ApplySortFunction. This function
* basically just exists to localize knowledge of the encoding of sk_flags
@@ -2716,6 +2871,56 @@ reversedirection_heap(Tuplesortstate *state)
}
}
+#ifdef PGXC
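+/*
+ * Return the length of the next data row available on the connection
+ * behind the given tape, blocking for more data on EOF. Returns 0 at
+ * CommandComplete if eofOK, otherwise raises an error.
+ */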
+static unsigned int
+getlen_datanode(Tuplesortstate *state, int tapenum, bool eofOK)
+{
+ for (;;)
+ {
+ switch (handle_response(state->combiner->connections[tapenum], state->combiner))
+ {
+ case RESPONSE_EOF:
+ data_node_receive(1, state->combiner->connections + tapenum, NULL);
+ break;
+ case RESPONSE_COMPLETE:
+ if (eofOK)
+ return 0;
+ else
+ elog(ERROR, "unexpected end of data");
+ break;
+ case RESPONSE_DATAROW:
+ return state->combiner->msglen;
+ default:
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Unexpected response from the data nodes")));
+ }
+ }
+}
+
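+/*
+ * Fetch the next data row from the combiner into the scan slot and
+ * copy it into sort-local storage as a MinimalTuple.
+ */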
+static void
+readtup_datanode(Tuplesortstate *state, SortTuple *stup,
+ int tapenum, unsigned int len)
+{
+ TupleTableSlot *slot = state->combiner->ss.ss_ScanTupleSlot;
+ MinimalTuple tuple;
+ HeapTupleData htup;
+
+ FetchTuple(state->combiner, slot);
+
+ /* copy the tuple into sort storage */
+ tuple = ExecCopySlotMinimalTuple(slot);
+ stup->tuple = (void *) tuple;
+ USEMEM(state, GetMemoryChunkSpace(tuple));
+ /* set up first-column key value */
+ htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
+ htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
+ stup->datum1 = heap_getattr(&htup,
+ state->scanKeys[0].sk_attno,
+ state->tupDesc,
+ &stup->isnull1);
+}
+#endif
/*
* Routines specialized for IndexTuple case
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index 879a310c68..c85b1b0c61 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -118,6 +118,15 @@ typedef struct TupleTableSlot
bool tts_shouldFreeMin; /* should pfree tts_mintuple? */
bool tts_slow; /* saved state for slot_deform_tuple */
HeapTuple tts_tuple; /* physical tuple, or NULL if virtual */
+#ifdef PGXC
+ /*
+ * PGXC extension to support tuples sent from remote data node.
+ */
+ char *tts_dataRow; /* Tuple data in DataRow format */
+ int tts_dataLen; /* Actual length of the data row */
+ bool tts_shouldFreeRow; /* should pfree tts_dataRow? */
+ struct AttInMetadata *tts_attinmeta; /* store here info to extract values from the DataRow */
+#endif
TupleDesc tts_tupleDescriptor; /* slot's tuple descriptor */
MemoryContext tts_mcxt; /* slot itself is in this context */
Buffer tts_buffer; /* tuple's buffer, or InvalidBuffer */
@@ -165,6 +174,12 @@ extern TupleTableSlot *ExecStoreTuple(HeapTuple tuple,
extern TupleTableSlot *ExecStoreMinimalTuple(MinimalTuple mtup,
TupleTableSlot *slot,
bool shouldFree);
+#ifdef PGXC
+extern TupleTableSlot *ExecStoreDataRowTuple(char *msg,
+ size_t len,
+ TupleTableSlot *slot,
+ bool shouldFree);
+#endif
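+/*
+ * Editor's sketch: ExecStoreDataRowTuple is the DataRow analog of
+ * ExecStoreTuple; assuming msg and len hold a palloc'd copy of a
+ * DataRow message body that the slot may keep and free later:
+ *
+ *	slot = ExecStoreDataRowTuple(msg, len, slot, true);
+ */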
extern TupleTableSlot *ExecClearTuple(TupleTableSlot *slot);
extern TupleTableSlot *ExecStoreVirtualTuple(TupleTableSlot *slot);
extern TupleTableSlot *ExecStoreAllNullTuple(TupleTableSlot *slot);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 078b6733e7..4f1c59f8bc 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -72,6 +72,9 @@ typedef enum NodeTag
T_Hash,
T_SetOp,
T_Limit,
+#ifdef PGXC
+ T_RemoteQuery,
+#endif
/* this one isn't a subclass of Plan: */
T_PlanInvalItem,
@@ -110,6 +113,9 @@ typedef enum NodeTag
T_HashState,
T_SetOpState,
T_LimitState,
+#ifdef PGXC
+ T_RemoteQueryState,
+#endif
/*
* TAGS FOR PRIMITIVE NODES (primnodes.h)
diff --git a/src/include/pgxc/combiner.h b/src/include/pgxc/combiner.h
deleted file mode 100644
index f977c4ab88..0000000000
--- a/src/include/pgxc/combiner.h
+++ /dev/null
@@ -1,73 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * combiner.h
- *
- * Combine responses from multiple Data Nodes
- *
- *
- * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group ?
- * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation
- *
- * IDENTIFICATION
- * $$
- *
- *-------------------------------------------------------------------------
- */
-
-#ifndef COMBINER_H
-#define COMBINER_H
-
-#include "postgres.h"
-#include "tcop/dest.h"
-
-typedef enum
-{
- COMBINE_TYPE_NONE, /* it is known that no row count, do not parse */
- COMBINE_TYPE_SUM, /* sum row counts (partitioned, round robin) */
- COMBINE_TYPE_SAME /* expect all row counts to be the same (replicated write) */
-} CombineType;
-
-typedef enum
-{
- REQUEST_TYPE_NOT_DEFINED, /* not determined yet */
- REQUEST_TYPE_COMMAND, /* OK or row count response */
- REQUEST_TYPE_QUERY, /* Row description response */
- REQUEST_TYPE_COPY_IN, /* Copy In response */
- REQUEST_TYPE_COPY_OUT /* Copy Out response */
-} RequestType;
-
-
-typedef struct
-{
- int node_count;
- CombineType combine_type;
- CommandDest dest;
- int command_complete_count;
- int row_count;
- RequestType request_type;
- int description_count;
- uint64 copy_in_count;
- uint64 copy_out_count;
- bool inErrorState;
- /*
- * While we are not supporting grouping use this flag to indicate we need
- * to initialize collecting of aggregates from the DNs
- */
- bool initAggregates;
- List *simple_aggregates;
- FILE *copy_file; /* used if copy_dest == COPY_FILE */
-} ResponseCombinerData;
-
-
-typedef ResponseCombinerData *ResponseCombiner;
-
-extern ResponseCombiner CreateResponseCombiner(int node_count,
- CombineType combine_type, CommandDest dest);
-extern int CombineResponse(ResponseCombiner combiner, char msg_type,
- char *msg_body, size_t len);
-extern bool ValidateAndCloseCombiner(ResponseCombiner combiner);
-extern bool ValidateAndResetCombiner(ResponseCombiner combiner);
-extern void AssignCombinerAggregates(ResponseCombiner combiner, List *simple_aggregates);
-extern void CloseCombiner(ResponseCombiner combiner);
-
-#endif /* COMBINER_H */
diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h
index 28c5d8748e..ab95022b5f 100644
--- a/src/include/pgxc/datanode.h
+++ b/src/include/pgxc/datanode.h
@@ -16,9 +16,9 @@
#ifndef DATANODE_H
#define DATANODE_H
-#include "combiner.h"
+#include "postgres.h"
+#include "gtm/gtm_c.h"
#include "nodes/pg_list.h"
-#include "pgxc/locator.h"
#include "utils/snapshot.h"
#include <unistd.h>
@@ -28,23 +28,23 @@ typedef struct PGconn NODE_CONNECTION;
/* Helper structure to access data node from Session */
typedef enum
{
- DN_CONNECTION_STATE_IDLE,
- DN_CONNECTION_STATE_BUSY,
- DN_CONNECTION_STATE_COMPLETED,
+ DN_CONNECTION_STATE_IDLE, /* idle, ready for query */
+ DN_CONNECTION_STATE_QUERY, /* query is sent, response expected */
+ DN_CONNECTION_STATE_HAS_DATA, /* buffer has data to process */
+ DN_CONNECTION_STATE_COMPLETED, /* query completed, no ReadyForQuery yet */
DN_CONNECTION_STATE_ERROR_NOT_READY, /* error, but need ReadyForQuery message */
- DN_CONNECTION_STATE_ERROR_READY, /* error and received ReadyForQuery */
- DN_CONNECTION_STATE_ERROR_FATAL, /* fatal error */
+ DN_CONNECTION_STATE_ERROR_FATAL, /* fatal error */
DN_CONNECTION_STATE_COPY_IN,
DN_CONNECTION_STATE_COPY_OUT
} DNConnectionState;
#define DN_CONNECTION_STATE_ERROR(dnconn) \
(dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \
- || (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY \
- || (dnconn)->state == DN_CONNECTION_STATE_ERROR_READY
+ || (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY
struct data_node_handle
{
+ int nodenum; /* node identifier 1..NumDataNodes */
/* fd of the connection */
int sock;
/* Connection state */
@@ -75,17 +75,25 @@ extern int DataNodeConnected(NODE_CONNECTION * conn);
extern int DataNodeConnClean(NODE_CONNECTION * conn);
extern void DataNodeCleanAndRelease(int code, Datum arg);
-/* Multinode Executor */
-extern void DataNodeBegin(void);
-extern int DataNodeCommit(CommandDest dest);
-extern int DataNodeRollback(CommandDest dest);
+extern DataNodeHandle **get_handles(List *nodelist);
+extern void release_handles(void);
+extern int get_transaction_nodes(DataNodeHandle ** connections);
-extern int DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CombineType combine_type, CommandDest dest, Snapshot snapshot, bool force_autocommit, List *simple_aggregates, bool is_read_only);
+extern int ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle);
+extern int ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle);
-extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from);
-extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections);
-extern int DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, CommandDest dest, FILE* copy_file);
-extern uint64 DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type, CommandDest dest);
+extern int data_node_send_query(DataNodeHandle * handle, const char *query);
+extern int data_node_send_gxid(DataNodeHandle * handle, GlobalTransactionId gxid);
+extern int data_node_send_snapshot(DataNodeHandle * handle, Snapshot snapshot);
+
+extern void data_node_receive(const int conn_count,
+ DataNodeHandle ** connections, struct timeval * timeout);
+extern int data_node_read_data(DataNodeHandle * conn);
+extern int send_some(DataNodeHandle * handle, int len);
+extern int data_node_flush(DataNodeHandle *handle);
+
+extern char get_message(DataNodeHandle *conn, int *len, char **msg);
+
+extern void add_error_message(DataNodeHandle * handle, const char *message);
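+
+/*
+ * Editor's note: a typical round trip is data_node_send_query() followed
+ * by data_node_receive(), with the responses consumed by the combiner's
+ * handle_response() loop (see pgxc/execRemote.h).
+ */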
-extern int primary_data_node;
#endif
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
new file mode 100644
index 0000000000..d99806a01b
--- /dev/null
+++ b/src/include/pgxc/execRemote.h
@@ -0,0 +1,101 @@
+/*-------------------------------------------------------------------------
+ *
+ * execRemote.h
+ *
+ * Functions to execute commands on multiple Data Nodes
+ *
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation
+ *
+ * IDENTIFICATION
+ * $$
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef EXECREMOTE_H
+#define EXECREMOTE_H
+#include "datanode.h"
+#include "locator.h"
+#include "planner.h"
+#include "access/tupdesc.h"
+#include "executor/tuptable.h"
+#include "nodes/execnodes.h"
+#include "nodes/pg_list.h"
+#include "tcop/dest.h"
+#include "utils/snapshot.h"
+
+/* Outputs of handle_response() */
+#define RESPONSE_EOF EOF
+#define RESPONSE_COMPLETE 0
+#define RESPONSE_TUPDESC 1
+#define RESPONSE_DATAROW 2
+#define RESPONSE_COPY 3
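+
+/*
+ * Editor's note: callers normally loop over handle_response(), calling
+ * data_node_receive() whenever RESPONSE_EOF says the buffer ran dry,
+ * until a terminal response arrives; getlen_datanode() in tuplesort.c
+ * shows the pattern.
+ */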
+
+typedef enum
+{
+ REQUEST_TYPE_NOT_DEFINED, /* not determined yet */
+ REQUEST_TYPE_COMMAND, /* OK or row count response */
+ REQUEST_TYPE_QUERY, /* Row description response */
+ REQUEST_TYPE_COPY_IN, /* Copy In response */
+ REQUEST_TYPE_COPY_OUT /* Copy Out response */
+} RequestType;
+
+
+typedef struct RemoteQueryState
+{
+ ScanState ss; /* its first field is NodeTag */
+ int node_count; /* total count of participating nodes */
+ DataNodeHandle **connections; /* data node connections being combined */
+ int conn_count; /* count of active connections */
+ int current_conn; /* used to balance load when reading from connections */
+ CombineType combine_type; /* see CombineType enum */
+ DestReceiver *dest; /* output destination */
+ int command_complete_count; /* count of received CommandComplete messages */
+ uint64 row_count; /* how many rows affected by the query */
+ RequestType request_type; /* see RequestType enum */
+ TupleDesc tuple_desc; /* tuple descriptor to be referenced by emitted tuples */
+ int description_count; /* count of received RowDescription messages */
+ int copy_in_count; /* count of received CopyIn messages */
+ int copy_out_count; /* count of received CopyOut messages */
+ char errorCode[5]; /* error code to send back to client */
+ char *errorMessage; /* error message to send back to client */
+ bool query_Done; /* query has been sent down to data nodes */
+ bool need_tran; /* auto commit on nodes after completion */
+ char *completionTag; /* completion tag to present to caller */
+ char *msg; /* last data row message */
+ int msglen; /* length of the data row message */
+ /*
+ * While grouping is not supported, this flag indicates that collecting
+ * of aggregates from the data nodes needs to be initialized
+ */
+ bool initAggregates;
+ List *simple_aggregates; /* description of aggregate functions */
+ void *tuplesortstate; /* for merge sort */
+ /* Simple DISTINCT support */
+ FmgrInfo *eqfunctions; /* functions to compare tuples */
+ MemoryContext tmp_ctx; /* separate context is needed to compare tuples */
+ FILE *copy_file; /* used if copy_dest == COPY_FILE */
+} RemoteQueryState;
+
+/* Multinode Executor */
+extern void DataNodeBegin(void);
+extern int DataNodeCommit(CommandDest dest);
+extern int DataNodeRollback(CommandDest dest);
+
+extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from);
+extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections);
+extern uint64 DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* copy_file);
+extern uint64 DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type);
+
+extern RemoteQueryState *ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags);
+extern TupleTableSlot* ExecRemoteQuery(RemoteQueryState *step);
+extern void ExecEndRemoteQuery(RemoteQueryState *step);
+
+extern int handle_response(DataNodeHandle * conn, RemoteQueryState *combiner);
+extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot);
+
+extern int primary_data_node;
+
+#endif
\ No newline at end of file
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index 4920aace0a..47665b2341 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -17,27 +17,59 @@
#include "fmgr.h"
#include "lib/stringinfo.h"
+#include "nodes/plannodes.h"
#include "nodes/primnodes.h"
#include "pgxc/locator.h"
-#include "pgxc/combiner.h"
+#include "tcop/dest.h"
/* for Query_Plan.exec_loc_type can have these OR'ed*/
#define EXEC_ON_COORD 0x1
#define EXEC_ON_DATA_NODES 0x2
+typedef enum
+{
+ COMBINE_TYPE_NONE, /* it is known that there is no row count, do not parse */
+ COMBINE_TYPE_SUM, /* sum row counts (partitioned, round robin) */
+ COMBINE_TYPE_SAME /* expect all row counts to be the same (replicated write) */
+} CombineType;
+
+/* For sorting within RemoteQuery handling */
+/*
+ * It is pretty much like Sort, but without Plan. We may use Sort later.
+ */
+typedef struct
+{
+ int numCols; /* number of sort-key columns */
+ AttrNumber *sortColIdx; /* their indexes in the target list */
+ Oid *sortOperators; /* OIDs of operators to sort them by */
+ bool *nullsFirst; /* NULLS FIRST/LAST directions */
+} SimpleSort;
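+
+/*
+ * Editor's sketch with hypothetical values: ordering by the first
+ * target list column ascending, NULLS LAST, on int4 data:
+ *
+ *	sort->numCols = 1;
+ *	sort->sortColIdx[0] = 1;
+ *	sort->sortOperators[0] = 97;	// OID of int4 "<"
+ *	sort->nullsFirst[0] = false;
+ */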
+
+/* For returning distinct results from the RemoteQuery*/
+typedef struct
+{
+ int numCols; /* number of sort-key columns */
+ AttrNumber *uniqColIdx; /* their indexes in the target list */
+ Oid *eqOperators; /* OIDs of operators to equate them by */
+} SimpleDistinct;
+
/* Contains instructions on processing a step of a query.
* In the prototype this will be simple, but it will eventually
* evolve into a GridSQL-style QueryStep.
*/
typedef struct
{
+ Plan plan;
char *sql_statement;
Exec_Nodes *exec_nodes;
CombineType combine_type;
- List *simple_aggregates; /* simple aggregate to combine on this
- * step */
-} Query_Step;
+ List *simple_aggregates; /* simple aggregate to combine on this step */
+ SimpleSort *sort;
+ SimpleDistinct *distinct;
+ bool read_only; /* do not use 2PC when committing read-only steps */
+ bool force_autocommit; /* some commands like VACUUM require autocommit mode */
+} RemoteQuery;
/*
@@ -48,7 +80,6 @@ typedef struct
typedef struct
{
int exec_loc_type;
- bool force_autocommit; /* For CREATE DATABASE */
List *query_step_list; /* List of QuerySteps */
} Query_Plan;
@@ -67,7 +98,6 @@ typedef enum
/* For handling simple aggregates */
-/* For now, only support int/long types */
typedef struct
{
int column_pos; /* Only use 1 for now */
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index e351536d47..e2b3c033e0 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -23,6 +23,9 @@
#include "access/itup.h"
#include "executor/tuptable.h"
#include "fmgr.h"
+#ifdef PGXC
+#include "pgxc/execRemote.h"
+#endif
#include "utils/relcache.h"
@@ -64,6 +67,13 @@ extern Tuplesortstate *tuplesort_begin_index_hash(Relation indexRel,
extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
Oid sortOperator, bool nullsFirstFlag,
int workMem, bool randomAccess);
+#ifdef PGXC
+extern Tuplesortstate *tuplesort_begin_merge(TupleDesc tupDesc,
+ int nkeys, AttrNumber *attNums,
+ Oid *sortOperators, bool *nullsFirstFlags,
+ RemoteQueryState *combiner,
+ int workMem);
+#endif
extern void tuplesort_set_bound(Tuplesortstate *state, int64 bound);