| author | Mason S | 2010-06-29 19:32:26 +0000 |
|---|---|---|
| committer | Pavan Deolasee | 2011-05-19 16:45:11 +0000 |
| commit | ad1681c9912300df66a81522d0091f292b6cd942 (patch) | |
| tree | 0b0d0f08412842b2a7b84ba6d94676a1c40be6c2 /src | |
| parent | 66066d6f1c9a49916f5a5c479b1926e11e5c0098 (diff) | |
Add support for ORDER BY and DISTINCT.
This is handled on the Coordinator, which pushes down the ORDER BY
and merge-sorts the sorted input streams coming back from the data
nodes, converting from DataRow to tuple format as needed.
If one of the SELECT clause expressions is not in the ORDER BY,
it is appended to the ORDER BY that is pushed down to the data nodes
and left off when results are returned to the client.
With DISTINCT, an ORDER BY is generated and pushed down to the data
nodes so that a merge-sort can be done and duplicates removed.
By Andrei Martsinchyk
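As a hedged illustration of the push-down described above (the table and column names are hypothetical, not taken from the patch): the Coordinator rewrites the client's query before sending it to the data nodes, then merge-sorts the per-node streams and drops adjacent duplicates.

```sql
-- Client query: DISTINCT on a hypothetical distributed table t1,
-- with no ORDER BY of its own.
SELECT DISTINCT col1, col2 FROM t1;

-- Query the Coordinator pushes down to each data node: an ORDER BY
-- covering the DISTINCT columns is appended, so the sorted per-node
-- streams can be merge-sorted and adjacent duplicates discarded.
SELECT DISTINCT col1, col2 FROM t1 ORDER BY col1, col2;
```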
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/access/common/heaptuple.c | 100 |
| -rw-r--r-- | src/backend/access/common/printtup.c | 13 |
| -rw-r--r-- | src/backend/commands/copy.c | 19 |
| -rw-r--r-- | src/backend/executor/execTuples.c | 122 |
| -rw-r--r-- | src/backend/pgxc/plan/planner.c | 541 |
| -rw-r--r-- | src/backend/pgxc/pool/Makefile | 2 |
| -rw-r--r-- | src/backend/pgxc/pool/combiner.c | 652 |
| -rw-r--r-- | src/backend/pgxc/pool/datanode.c | 1571 |
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 2453 |
| -rw-r--r-- | src/backend/tcop/postgres.c | 238 |
| -rw-r--r-- | src/backend/utils/misc/guc.c | 37 |
| -rw-r--r-- | src/backend/utils/mmgr/mcxt.c | 8 |
| -rw-r--r-- | src/backend/utils/sort/tuplesort.c | 209 |
| -rw-r--r-- | src/include/executor/tuptable.h | 15 |
| -rw-r--r-- | src/include/nodes/nodes.h | 6 |
| -rw-r--r-- | src/include/pgxc/combiner.h | 73 |
| -rw-r--r-- | src/include/pgxc/datanode.h | 46 |
| -rw-r--r-- | src/include/pgxc/execRemote.h | 101 |
| -rw-r--r-- | src/include/pgxc/planner.h | 42 |
| -rw-r--r-- | src/include/utils/tuplesort.h | 10 |
20 files changed, 3862 insertions, 2396 deletions
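A note on the planner change in the diff below (make_simple_sort_from_sortclauses in planner.c): when both DISTINCT and ORDER BY are present, a single pushed-down sort must serve both, so every ORDER BY expression must come from the DISTINCT list; remaining DISTINCT expressions are then appended to the pushed-down ORDER BY. A hypothetical sketch of what the validation accepts and rejects:

```sql
-- Compatible: the ORDER BY expression appears in the DISTINCT list, so
-- the remaining DISTINCT column (col2) is appended to the pushed-down
-- ORDER BY and one merge-sort serves both clauses.
SELECT DISTINCT col1, col2 FROM t1 ORDER BY col1;

-- Rejected: col2 is not in the DISTINCT list, so no single sort order
-- can drive both de-duplication and the requested ordering; the planner
-- raises "Such combination of ORDER BY and DISTINCT is not yet supported".
SELECT DISTINCT col1 FROM t1 ORDER BY col2;
```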
diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c index ac5749c713..cae5794103 100644 --- a/src/backend/access/common/heaptuple.c +++ b/src/backend/access/common/heaptuple.c @@ -57,6 +57,9 @@ #include "postgres.h" +#ifdef PGXC +#include "funcapi.h" +#endif #include "access/heapam.h" #include "access/sysattr.h" #include "access/tuptoaster.h" @@ -1157,6 +1160,80 @@ slot_deform_tuple(TupleTableSlot *slot, int natts) slot->tts_slow = slow; } +#ifdef PGXC +/* + * slot_deform_datarow + * Extract data from the DataRow message into Datum/isnull arrays. + * We always extract all atributes, as specified in tts_tupleDescriptor, + * because there is no easy way to find random attribute in the DataRow. + */ +static void +slot_deform_datarow(TupleTableSlot *slot) +{ + int attnum = slot->tts_tupleDescriptor->natts; + int i; + int col_count; + char *cur = slot->tts_dataRow; + StringInfo buffer; + uint16 n16; + uint32 n32; + + /* fastpath: exit if values already extracted */ + if (slot->tts_nvalid == attnum) + return; + + Assert(slot->tts_dataRow); + + memcpy(&n16, cur, 2); + cur += 2; + col_count = ntohs(n16); + + if (col_count != attnum) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Tuple does not match the descriptor"))); + + if (slot->tts_attinmeta == NULL) + slot->tts_attinmeta = TupleDescGetAttInMetadata(slot->tts_tupleDescriptor); + + buffer = makeStringInfo(); + for (i = 0; i < attnum; i++) + { + Form_pg_attribute attr = slot->tts_tupleDescriptor->attrs[i]; + int len; + + /* get size */ + memcpy(&n32, cur, 4); + cur += 4; + len = ntohl(n32); + + /* get data */ + if (len == -1) + { + slot->tts_values[i] = (Datum) 0; + slot->tts_isnull[i] = true; + } + else + { + appendBinaryStringInfo(buffer, cur, len); + cur += len; + + slot->tts_values[i] = InputFunctionCall(slot->tts_attinmeta->attinfuncs + i, + buffer->data, + slot->tts_attinmeta->attioparams[i], + slot->tts_attinmeta->atttypmods[i]); + slot->tts_isnull[i] = false; + + resetStringInfo(buffer); + } + } + pfree(buffer->data); + pfree(buffer); + + slot->tts_nvalid = attnum; +} +#endif + /* * slot_getattr * This function fetches an attribute of the slot's current tuple. @@ -1250,6 +1327,11 @@ slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull) /* * Extract the attribute, along with any preceding attributes. 
*/ +#ifdef PGXC + if (slot->tts_dataRow) + slot_deform_datarow(slot); + else +#endif slot_deform_tuple(slot, attnum); /* @@ -1276,6 +1358,15 @@ slot_getallattrs(TupleTableSlot *slot) if (slot->tts_nvalid == tdesc_natts) return; +#ifdef PGXC + /* Handle the DataRow tuple case */ + if (slot->tts_dataRow) + { + slot_deform_datarow(slot); + return; + } +#endif + /* * otherwise we had better have a physical tuple (tts_nvalid should equal * natts in all virtual-tuple cases) @@ -1319,6 +1410,15 @@ slot_getsomeattrs(TupleTableSlot *slot, int attnum) if (slot->tts_nvalid >= attnum) return; +#ifdef PGXC + /* Handle the DataRow tuple case */ + if (slot->tts_dataRow) + { + slot_deform_datarow(slot); + return; + } +#endif + /* Check for caller error */ if (attnum <= 0 || attnum > slot->tts_tupleDescriptor->natts) elog(ERROR, "invalid attribute number %d", attnum); diff --git a/src/backend/access/common/printtup.c b/src/backend/access/common/printtup.c index 13a09dd9b3..3c77ab4ff4 100644 --- a/src/backend/access/common/printtup.c +++ b/src/backend/access/common/printtup.c @@ -292,6 +292,19 @@ printtup(TupleTableSlot *slot, DestReceiver *self) int natts = typeinfo->natts; int i; +#ifdef PGXC + /* + * If we are having DataRow-based tuple we do not have to encode attribute + * values, just send over the DataRow message as we received it from the + * data node + */ + if (slot->tts_dataRow) + { + pq_putmessage('D', slot->tts_dataRow, slot->tts_dataLen); + return; + } +#endif + /* Set or update my derived attribute info, if needed */ if (myState->attrinfo != typeinfo || myState->nattrs != natts) printtup_prepare_info(myState, typeinfo, natts); diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 92f5db85f2..50650190ee 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -35,7 +35,7 @@ #include "parser/parse_relation.h" #ifdef PGXC #include "pgxc/pgxc.h" -#include "pgxc/datanode.h" +#include "pgxc/execRemote.h" #include "pgxc/locator.h" #include "pgxc/poolmgr.h" #endif @@ -1511,8 +1511,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString) DataNodeCopyFinish( cstate->connections, primary_data_node, - COMBINE_TYPE_NONE, - whereToSendOutput); + COMBINE_TYPE_NONE); pfree(cstate->connections); pfree(cstate->query_buf.data); FreeRelationLocInfo(cstate->rel_loc); @@ -1526,14 +1525,12 @@ DoCopy(const CopyStmt *stmt, const char *queryString) cstate->processed = DataNodeCopyFinish( cstate->connections, primary_data_node, - COMBINE_TYPE_SAME, - whereToSendOutput); + COMBINE_TYPE_SAME); else cstate->processed = DataNodeCopyFinish( cstate->connections, 0, - COMBINE_TYPE_SUM, - whereToSendOutput); + COMBINE_TYPE_SUM); pfree(cstate->connections); pfree(cstate->query_buf.data); FreeRelationLocInfo(cstate->rel_loc); @@ -1775,10 +1772,10 @@ CopyTo(CopyState cstate) #ifdef PGXC if (IS_PGXC_COORDINATOR && !cstate->on_coord) { - DataNodeCopyOut(GetRelationNodes(cstate->rel_loc, NULL, true), - cstate->connections, - whereToSendOutput, - cstate->copy_file); + cstate->processed = DataNodeCopyOut( + GetRelationNodes(cstate->rel_loc, NULL, true), + cstate->connections, + cstate->copy_file); } else { diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c index 7b5ba41f5f..a6c496eee9 100644 --- a/src/backend/executor/execTuples.c +++ b/src/backend/executor/execTuples.c @@ -149,6 +149,12 @@ ExecCreateTupleTable(int tableSize) slot->tts_shouldFreeMin = false; slot->tts_tuple = NULL; slot->tts_tupleDescriptor = NULL; +#ifdef PGXC + slot->tts_shouldFreeRow = 
false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; + slot->tts_attinmeta = NULL; +#endif slot->tts_mcxt = CurrentMemoryContext; slot->tts_buffer = InvalidBuffer; slot->tts_nvalid = 0; @@ -228,6 +234,12 @@ MakeSingleTupleTableSlot(TupleDesc tupdesc) slot->tts_shouldFreeMin = false; slot->tts_tuple = NULL; slot->tts_tupleDescriptor = NULL; +#ifdef PGXC + slot->tts_shouldFreeRow = false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; + slot->tts_attinmeta = NULL; +#endif slot->tts_mcxt = CurrentMemoryContext; slot->tts_buffer = InvalidBuffer; slot->tts_nvalid = 0; @@ -334,6 +346,12 @@ ExecSetSlotDescriptor(TupleTableSlot *slot, /* slot to change */ if (slot->tts_tupleDescriptor) ReleaseTupleDesc(slot->tts_tupleDescriptor); +#ifdef PGXC + /* XXX there in no routine to release AttInMetadata instance */ + if (slot->tts_attinmeta) + slot->tts_attinmeta = NULL; +#endif + if (slot->tts_values) pfree(slot->tts_values); if (slot->tts_isnull) @@ -415,6 +433,14 @@ ExecStoreTuple(HeapTuple tuple, heap_freetuple(slot->tts_tuple); if (slot->tts_shouldFreeMin) heap_free_minimal_tuple(slot->tts_mintuple); +#ifdef PGXC + if (slot->tts_shouldFreeRow) + pfree(slot->tts_dataRow); + + slot->tts_shouldFreeRow = false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; +#endif /* * Store the new tuple into the specified slot. @@ -476,6 +502,14 @@ ExecStoreMinimalTuple(MinimalTuple mtup, heap_freetuple(slot->tts_tuple); if (slot->tts_shouldFreeMin) heap_free_minimal_tuple(slot->tts_mintuple); +#ifdef PGXC + if (slot->tts_shouldFreeRow) + pfree(slot->tts_dataRow); + + slot->tts_shouldFreeRow = false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; +#endif /* * Drop the pin on the referenced buffer, if there is one. @@ -504,6 +538,62 @@ ExecStoreMinimalTuple(MinimalTuple mtup, return slot; } +#ifdef PGXC +/* -------------------------------- + * ExecStoreDataRowTuple + * + * Store a buffer in DataRow message format into the slot. + * + * -------------------------------- + */ +TupleTableSlot * +ExecStoreDataRowTuple(char *msg, size_t len, TupleTableSlot *slot, bool shouldFree) +{ + /* + * sanity checks + */ + Assert(msg != NULL); + Assert(len > 0); + Assert(slot != NULL); + Assert(slot->tts_tupleDescriptor != NULL); + + /* + * Free any old physical tuple belonging to the slot. + */ + if (slot->tts_shouldFree) + heap_freetuple(slot->tts_tuple); + if (slot->tts_shouldFreeMin) + heap_free_minimal_tuple(slot->tts_mintuple); + if (slot->tts_shouldFreeRow) + pfree(slot->tts_dataRow); + + /* + * Drop the pin on the referenced buffer, if there is one. + */ + if (BufferIsValid(slot->tts_buffer)) + ReleaseBuffer(slot->tts_buffer); + + slot->tts_buffer = InvalidBuffer; + + /* + * Store the new tuple into the specified slot. 
+ */ + slot->tts_isempty = false; + slot->tts_shouldFree = false; + slot->tts_shouldFreeMin = false; + slot->tts_shouldFreeRow = shouldFree; + slot->tts_tuple = NULL; + slot->tts_mintuple = NULL; + slot->tts_dataRow = msg; + slot->tts_dataLen = len; + + /* Mark extracted state invalid */ + slot->tts_nvalid = 0; + + return slot; +} +#endif + /* -------------------------------- * ExecClearTuple * @@ -527,6 +617,14 @@ ExecClearTuple(TupleTableSlot *slot) /* slot in which to store tuple */ heap_freetuple(slot->tts_tuple); if (slot->tts_shouldFreeMin) heap_free_minimal_tuple(slot->tts_mintuple); +#ifdef PGXC + if (slot->tts_shouldFreeRow) + pfree(slot->tts_dataRow); + + slot->tts_shouldFreeRow = false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; +#endif slot->tts_tuple = NULL; slot->tts_mintuple = NULL; @@ -634,7 +732,13 @@ ExecCopySlotTuple(TupleTableSlot *slot) return heap_copytuple(slot->tts_tuple); if (slot->tts_mintuple) return heap_tuple_from_minimal_tuple(slot->tts_mintuple); - +#ifdef PGXC + /* + * Ensure values are extracted from data row to the Datum array + */ + if (slot->tts_dataRow) + slot_getallattrs(slot); +#endif /* * Otherwise we need to build a tuple from the Datum array. */ @@ -667,7 +771,13 @@ ExecCopySlotMinimalTuple(TupleTableSlot *slot) return heap_copy_minimal_tuple(slot->tts_mintuple); if (slot->tts_tuple) return minimal_tuple_from_heap_tuple(slot->tts_tuple); - +#ifdef PGXC + /* + * Ensure values are extracted from data row to the Datum array + */ + if (slot->tts_dataRow) + slot_getallattrs(slot); +#endif /* * Otherwise we need to build a tuple from the Datum array. */ @@ -861,6 +971,14 @@ ExecMaterializeSlot(TupleTableSlot *slot) if (!slot->tts_shouldFreeMin) slot->tts_mintuple = NULL; +#ifdef PGXC + if (!slot->tts_shouldFreeRow) + { + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; + } +#endif + return slot->tts_tuple; } diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index ae537e5f1f..1bbbb75848 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -22,8 +22,10 @@ #include "catalog/pg_type.h" #include "lib/stringinfo.h" #include "nodes/nodeFuncs.h" +#include "nodes/nodes.h" #include "nodes/parsenodes.h" #include "optimizer/clauses.h" +#include "optimizer/tlist.h" #include "parser/parse_agg.h" #include "parser/parse_coerce.h" #include "pgxc/locator.h" @@ -123,12 +125,10 @@ typedef struct XCWalkerContext int varno; bool within_or; bool within_not; + List *join_list; /* A list of List*'s, one for each relation. */ } XCWalkerContext; -/* A list of List*'s, one for each relation. */ -List *join_list = NULL; - /* Forbid unsafe SQL statements */ bool StrictStatementChecking = true; @@ -185,12 +185,12 @@ new_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) * Look up the join struct for a particular join */ static PGXC_Join * -find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) +find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2, XCWalkerContext *context) { ListCell *lc; /* return if list is still empty */ - if (join_list == NULL) + if (context->join_list == NULL) return NULL; /* in the PGXC_Join struct, we always sort with relid1 < relid2 */ @@ -209,7 +209,7 @@ find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) * there should be a small number, so we just search linearly, although * long term a hash table would be better. 
*/ - foreach(lc, join_list) + foreach(lc, context->join_list) { PGXC_Join *pgxcjoin = (PGXC_Join *) lfirst(lc); @@ -225,16 +225,16 @@ find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) * Find or create a join between 2 relations */ static PGXC_Join * -find_or_create_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) +find_or_create_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2, XCWalkerContext *context) { PGXC_Join *pgxcjoin; - pgxcjoin = find_pgxc_join(relid1, aliasname1, relid2, aliasname2); + pgxcjoin = find_pgxc_join(relid1, aliasname1, relid2, aliasname2, context); if (pgxcjoin == NULL) { pgxcjoin = new_pgxc_join(relid1, aliasname1, relid2, aliasname2); - join_list = lappend(join_list, pgxcjoin); + context->join_list = lappend(context->join_list, pgxcjoin); } return pgxcjoin; @@ -277,7 +277,7 @@ free_special_relations(Special_Conditions *special_conditions) * frees join_list */ static void -free_join_list(void) +free_join_list(List *join_list) { if (join_list == NULL) return; @@ -368,13 +368,13 @@ get_base_var(Var *var, XCWalkerContext *context) } else if (rte->rtekind == RTE_SUBQUERY) { - /* + /* * Handle views like select * from v1 where col1 = 1 * where col1 is partition column of base relation */ /* the varattno corresponds with the subquery's target list (projections) */ TargetEntry *tle = list_nth(rte->subquery->targetList, var->varattno - 1); /* or varno? */ - + if (!IsA(tle->expr, Var)) return NULL; /* not column based expressoin, return */ else @@ -684,7 +684,7 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) /* get data struct about these two relations joining */ pgxc_join = find_or_create_pgxc_join(column_base->relid, column_base->relalias, - column_base2->relid, column_base2->relalias); + column_base2->relid, column_base2->relalias, context); if (rel_loc_info1->locatorType == LOCATOR_TYPE_REPLICATED) { @@ -914,7 +914,7 @@ contains_only_pg_catalog (List *rtable) { if (get_rel_namespace(rte->relid) != PG_CATALOG_NAMESPACE) return false; - } else if (rte->rtekind == RTE_SUBQUERY && + } else if (rte->rtekind == RTE_SUBQUERY && !contains_only_pg_catalog (rte->subquery->rtable)) return false; } @@ -967,7 +967,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) { /* May be complicated. Before giving up, just check for pg_catalog usage */ if (contains_only_pg_catalog (query->rtable)) - { + { /* just pg_catalog tables */ context->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; @@ -1018,7 +1018,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) /* We compare to make sure that the subquery is safe to execute with previous- * we may have multiple ones in the FROM clause. 
- * We handle the simple case of allowing multiple subqueries in the from clause, + * We handle the simple case of allowing multiple subqueries in the from clause, * but only allow one of them to not contain replicated tables */ if (!from_query_nodes) @@ -1028,20 +1028,20 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) /* ok, safe */ if (!from_query_nodes) from_query_nodes = current_nodes; - } + } else { if (from_query_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED) from_query_nodes = current_nodes; else { - /* Allow if they are both using one node, and the same one */ + /* Allow if they are both using one node, and the same one */ if (!same_single_node (from_query_nodes->nodelist, current_nodes->nodelist)) /* Complicated */ return true; } } - } + } else if (rte->rtekind == RTE_RELATION) { /* Look for pg_catalog tables */ @@ -1049,7 +1049,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) current_usage_type = TABLE_USAGE_TYPE_PGCATALOG; else current_usage_type = TABLE_USAGE_TYPE_USER; - } + } else if (rte->rtekind == RTE_FUNCTION) { /* See if it is a catalog function */ @@ -1095,9 +1095,9 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) return true; /* Examine join conditions, see if each join is single-node safe */ - if (join_list != NULL) + if (context->join_list != NULL) { - foreach(lc, join_list) + foreach(lc, context->join_list) { PGXC_Join *pgxcjoin = (PGXC_Join *) lfirst(lc); @@ -1254,22 +1254,28 @@ static Exec_Nodes * get_plan_nodes(Query *query, bool isRead) { Exec_Nodes *result_nodes; - XCWalkerContext *context = palloc0(sizeof(XCWalkerContext)); - - context->query = query; - context->isRead = isRead; - - context->conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions)); - context->rtables = lappend(context->rtables, query->rtable); - - join_list = NULL; - - if (get_plan_nodes_walker((Node *) query, context)) + XCWalkerContext context; + + + context.query = query; + context.isRead = isRead; + context.exec_nodes = NULL; + context.conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions)); + context.rtables = NIL; + context.rtables = lappend(context.rtables, query->rtable); + context.multilevel_join = false; + context.varno = 0; + context.within_or = false; + context.within_not = false; + context.join_list = NIL; + + if (get_plan_nodes_walker((Node *) query, &context)) result_nodes = NULL; else - result_nodes = context->exec_nodes; + result_nodes = context.exec_nodes; - free_special_relations(context->conditions); + free_special_relations(context.conditions); + free_join_list(context.join_list); return result_nodes; } @@ -1304,7 +1310,6 @@ get_plan_nodes_command(Query *query) return NULL; } - free_join_list(); return exec_nodes; } @@ -1345,17 +1350,17 @@ static List * get_simple_aggregates(Query * query) { List *simple_agg_list = NIL; - + /* Check for simple multi-node aggregate */ if (query->hasAggs) { ListCell *lc; int column_pos = 0; - + foreach (lc, query->targetList) { TargetEntry *tle = (TargetEntry *) lfirst(lc); - + if (IsA(tle->expr, Aggref)) { /*PGXC borrowed this code from nodeAgg.c, see ExecInitAgg()*/ @@ -1422,7 +1427,7 @@ get_simple_aggregates(Query * query) get_func_name(finalfn_oid)); } } - + /* resolve actual type of transition state, if polymorphic */ aggcollecttype = aggform->aggcollecttype; @@ -1468,7 +1473,7 @@ get_simple_aggregates(Query * query) get_typlenbyval(aggcollecttype, &simple_agg->transtypeLen, &simple_agg->transtypeByVal); - + /* * initval is 
potentially null, so don't try to access it as a struct * field. Must do it the hard way with SysCacheGetAttr. @@ -1534,6 +1539,427 @@ get_simple_aggregates(Query * query) /* + * add_sort_column --- utility subroutine for building sort info arrays + * + * We need this routine because the same column might be selected more than + * once as a sort key column; if so, the extra mentions are redundant. + * + * Caller is assumed to have allocated the arrays large enough for the + * max possible number of columns. Return value is the new column count. + * + * PGXC: copied from optimizer/plan/planner.c + */ +static int +add_sort_column(AttrNumber colIdx, Oid sortOp, bool nulls_first, + int numCols, AttrNumber *sortColIdx, + Oid *sortOperators, bool *nullsFirst) +{ + int i; + + Assert(OidIsValid(sortOp)); + + for (i = 0; i < numCols; i++) + { + /* + * Note: we check sortOp because it's conceivable that "ORDER BY foo + * USING <, foo USING <<<" is not redundant, if <<< distinguishes + * values that < considers equal. We need not check nulls_first + * however because a lower-order column with the same sortop but + * opposite nulls direction is redundant. + */ + if (sortColIdx[i] == colIdx && sortOperators[i] == sortOp) + { + /* Already sorting by this col, so extra sort key is useless */ + return numCols; + } + } + + /* Add the column */ + sortColIdx[numCols] = colIdx; + sortOperators[numCols] = sortOp; + nullsFirst[numCols] = nulls_first; + return numCols + 1; +} + +/* + * add_distinct_column - utility subroutine to remove redundant columns, just + * like add_sort_column + */ +static int +add_distinct_column(AttrNumber colIdx, Oid eqOp, int numCols, + AttrNumber *sortColIdx, Oid *eqOperators) +{ + int i; + + Assert(OidIsValid(eqOp)); + + for (i = 0; i < numCols; i++) + { + if (sortColIdx[i] == colIdx && eqOperators[i] == eqOp) + { + /* Already sorting by this col, so extra sort key is useless */ + return numCols; + } + } + + /* Add the column */ + sortColIdx[numCols] = colIdx; + eqOperators[numCols] = eqOp; + return numCols + 1; +} + + +/* + * Reconstruct the step query + */ +static void +reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, + RemoteQuery *step) +{ + List *context; + bool useprefix; + List *sub_tlist = step->plan.targetlist; + ListCell *l; + StringInfo buf = makeStringInfo(); + char *sql; + char *cur; + char *sql_from; + + context = deparse_context_for_plan((Node *) step, NULL, rtable, NIL); + useprefix = list_length(rtable) > 1; + + foreach(l, sub_tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(l); + char *exprstr = deparse_expression((Node *) tle->expr, context, + useprefix, false); + + if (buf->len == 0) + { + appendStringInfo(buf, "SELECT "); + if (step->distinct) + appendStringInfo(buf, "DISTINCT "); + } + else + appendStringInfo(buf, ", "); + + appendStringInfoString(buf, exprstr); + } + + /* + * A kind of dummy + * Do not reconstruct remaining query, just search original statement + * for " FROM " and append remainder to the target list we just generated. + * Do not handle the case if " FROM " we found is not a "FROM" keyword, but, + * for example, a part of string constant. 
+ */ + sql = pstrdup(step->sql_statement); /* mutable copy */ + /* string to upper case, for comparing */ + cur = sql; + while (*cur) + { + /* replace whitespace with a space */ + if (isspace((unsigned char) *cur)) + *cur = ' '; + *cur++ = toupper(*cur); + } + + /* find the keyword */ + sql_from = strstr(sql, " FROM "); + if (sql_from) + { + /* the same offset in the original string */ + int offset = sql_from - sql; + /* remove terminating semicolon */ + char *end = strrchr(step->sql_statement, ';'); + *end = '\0'; + + appendStringInfoString(buf, step->sql_statement + offset); + } + + if (extra_sort) + { + foreach(l, extra_sort) + { + TargetEntry *tle = (TargetEntry *) lfirst(l); + char *exprstr = deparse_expression((Node *) tle->expr, context, + useprefix, false); + + if (has_order_by) + appendStringInfo(buf, ", "); + else + { + appendStringInfo(buf, " ORDER BY "); + has_order_by = true; + } + + appendStringInfoString(buf, exprstr); + } + } + + /* do not need the copy */ + pfree(sql); + + /* free previous query */ + pfree(step->sql_statement); + /* get a copy of new query */ + step->sql_statement = pstrdup(buf->data); + /* free the query buffer */ + pfree(buf->data); + pfree(buf); +} + + +/* + * Plan to sort step tuples + * PGXC: copied and adopted from optimizer/plan/planner.c + */ +static void +make_simple_sort_from_sortclauses(Query *query, RemoteQuery *step) +{ + List *sortcls = query->sortClause; + List *distinctcls = query->distinctClause; + List *sub_tlist = step->plan.targetlist; + SimpleSort *sort; + SimpleDistinct *distinct; + ListCell *l; + int numsortkeys; + int numdistkeys; + AttrNumber *sortColIdx; + AttrNumber *distColIdx; + Oid *sortOperators; + Oid *eqOperators; + bool *nullsFirst; + bool need_reconstruct = false; + /* + * List of target list entries from DISTINCT which are not in the ORDER BY. + * The exressions should be appended to the ORDER BY clause of remote query + */ + List *extra_distincts = NIL; + + Assert(step->sort == NULL); + Assert(step->distinct == NULL); + + /* + * We will need at most list_length(sortcls) sort columns; possibly less + * Also need room for extra distinct expressions if we need to append them + */ + numsortkeys = list_length(sortcls) + list_length(distinctcls); + sortColIdx = (AttrNumber *) palloc(numsortkeys * sizeof(AttrNumber)); + sortOperators = (Oid *) palloc(numsortkeys * sizeof(Oid)); + nullsFirst = (bool *) palloc(numsortkeys * sizeof(bool)); + + numsortkeys = 0; + sort = (SimpleSort *) palloc(sizeof(SimpleSort)); + + if (sortcls) + { + foreach(l, sortcls) + { + SortGroupClause *sortcl = (SortGroupClause *) lfirst(l); + TargetEntry *tle = get_sortgroupclause_tle(sortcl, sub_tlist); + + if (tle->resjunk) + need_reconstruct = true; + + /* + * Check for the possibility of duplicate order-by clauses --- the + * parser should have removed 'em, but no point in sorting + * redundantly. + */ + numsortkeys = add_sort_column(tle->resno, sortcl->sortop, + sortcl->nulls_first, + numsortkeys, + sortColIdx, sortOperators, nullsFirst); + } + } + + if (distinctcls) + { + /* + * Validate distinct clause + * We have to sort tuples to filter duplicates, and if ORDER BY clause + * is already present the sort order specified here may be incompatible + * with order needed for distinct. + * + * To be compatible, all expressions from DISTINCT must appear at the + * beginning of ORDER BY list. If list of DISTINCT expressions is longer + * then ORDER BY we can make ORDER BY compatible we can append remaining + * expressions from DISTINCT to ORDER BY. 
Obviously ORDER BY must not + * contain expressions not from the DISTINCT list in this case. + * + * For validation purposes we use column indexes (AttrNumber) to + * identify expressions. May be this is not enough and we should revisit + * the algorithm. + * + * We validate compatibility as follow: + * 1. Make working copy of DISTINCT + * 1a. Remove possible duplicates when copying: do not add expression + * 2. If order by is empty they are already compatible, skip 3 + * 3. Iterate over ORDER BY items + * 3a. If the item is in the working copy delete it from the working + * list. If working list is empty after deletion DISTINCT and + * ORDER BY are compatible, so break the loop. If working list is + * not empty continue iterating + * 3b. ORDER BY clause may contain duplicates. So if we can not found + * expression in the remainder of DISTINCT, probably it has already + * been removed because of duplicate ORDER BY entry. Check original + * DISTINCT clause, if expression is there continue iterating. + * 3c. DISTINCT and ORDER BY are not compatible, emit error + * 4. DISTINCT and ORDER BY are compatible, if we have remaining items + * in the working copy we should append it to the order by list + */ + /* + * Create the list of unique DISTINCT clause expressions + */ + foreach(l, distinctcls) + { + SortGroupClause *distinctcl = (SortGroupClause *) lfirst(l); + TargetEntry *tle = get_sortgroupclause_tle(distinctcl, sub_tlist); + bool found = false; + + if (extra_distincts) + { + ListCell *xl; + + foreach(xl, extra_distincts) + { + TargetEntry *xtle = (TargetEntry *) lfirst(xl); + if (xtle->resno == tle->resno) + { + found = true; + break; + } + } + } + + if (!found) + extra_distincts = lappend(extra_distincts, tle); + } + + if (sortcls) + { + foreach(l, sortcls) + { + SortGroupClause *sortcl = (SortGroupClause *) lfirst(l); + TargetEntry *tle = get_sortgroupclause_tle(sortcl, sub_tlist); + bool found = false; + ListCell *xl; + ListCell *prev = NULL; + + /* Search for the expression in the DISTINCT clause */ + foreach(xl, extra_distincts) + { + TargetEntry *xtle = (TargetEntry *) lfirst(xl); + if (xtle->resno == tle->resno) + { + extra_distincts = list_delete_cell(extra_distincts, xl, + prev); + found = true; + break; + } + prev = xl; + } + + /* Probably we've done */ + if (found && list_length(extra_distincts) == 0) + break; + + /* Ensure sort expression is not a duplicate */ + if (!found) + { + foreach(xl, distinctcls) + { + SortGroupClause *xcl = (SortGroupClause *) lfirst(xl); + TargetEntry *xtle = get_sortgroupclause_tle(xcl, sub_tlist); + if (xtle->resno == tle->resno) + { + /* it is a duplicate then */ + found = true; + break; + } + } + } + + /* Give up, we do not support it */ + if (!found) + { + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("Such combination of ORDER BY and DISTINCT is not yet supported")))); + } + } + } + /* need to append to the ORDER BY */ + if (list_length(extra_distincts) > 0) + need_reconstruct = true; + + /* + * End of validation, expression to append to ORDER BY are in the + * extra_distincts list + */ + + distinct = (SimpleDistinct *) palloc(sizeof(SimpleDistinct)); + + /* + * We will need at most list_length(distinctcls) sort columns + */ + numdistkeys = list_length(distinctcls); + distColIdx = (AttrNumber *) palloc(numdistkeys * sizeof(AttrNumber)); + eqOperators = (Oid *) palloc(numdistkeys * sizeof(Oid)); + + numdistkeys = 0; + + foreach(l, distinctcls) + { + SortGroupClause *distinctcl = (SortGroupClause *) lfirst(l); + TargetEntry 
*tle = get_sortgroupclause_tle(distinctcl, sub_tlist); + + /* + * Check for the possibility of duplicate order-by clauses --- the + * parser should have removed 'em, but no point in sorting + * redundantly. + */ + numdistkeys = add_distinct_column(tle->resno, + distinctcl->eqop, + numdistkeys, + distColIdx, + eqOperators); + /* append also extra sort operator, if not already there */ + numsortkeys = add_sort_column(tle->resno, + distinctcl->sortop, + distinctcl->nulls_first, + numsortkeys, + sortColIdx, + sortOperators, + nullsFirst); + } + + Assert(numdistkeys > 0); + + distinct->numCols = numdistkeys; + distinct->uniqColIdx = distColIdx; + distinct->eqOperators = eqOperators; + + step->distinct = distinct; + } + + + Assert(numsortkeys > 0); + + sort->numCols = numsortkeys; + sort->sortColIdx = sortColIdx; + sort->sortOperators = sortOperators; + sort->nullsFirst = nullsFirst; + + step->sort = sort; + + if (need_reconstruct) + reconstruct_step_query(query->rtable, sortcls != NULL, extra_distincts, + step); +} + +/* * Build up a QueryPlan to execute on. * * For the prototype, there will only be one step, @@ -1543,17 +1969,16 @@ Query_Plan * GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) { Query_Plan *query_plan = palloc(sizeof(Query_Plan)); - Query_Step *query_step = palloc(sizeof(Query_Step)); + RemoteQuery *query_step = makeNode(RemoteQuery); Query *query; - - query_plan->force_autocommit = false; - query_step->sql_statement = (char *) palloc(strlen(sql_statement) + 1); strcpy(query_step->sql_statement, sql_statement); query_step->exec_nodes = NULL; query_step->combine_type = COMBINE_TYPE_NONE; query_step->simple_aggregates = NULL; + query_step->read_only = false; + query_step->force_autocommit = false; query_plan->query_step_list = lappend(NULL, query_step); @@ -1565,11 +1990,16 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) switch (nodeTag(parsetree)) { case T_SelectStmt: + /* Optimize multi-node handling */ + query_step->read_only = true; + /* fallthru */ case T_InsertStmt: case T_UpdateStmt: case T_DeleteStmt: /* just use first one in querytree_list */ query = (Query *) linitial(querytree_list); + /* should copy instead ? */ + query_step->plan.targetlist = query->targetList; /* Perform some checks to make sure we can support the statement */ if (nodeTag(parsetree) == T_SelectStmt) @@ -1633,6 +2063,12 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) } /* + * Add sortring to the step + */ + if (query->sortClause || query->distinctClause) + make_simple_sort_from_sortclauses(query, query_step); + + /* * PG-XC cannot yet support some variations of SQL statements. 
* We perform some checks to at least catch common cases */ @@ -1658,15 +2094,6 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) ereport(ERROR, (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), (errmsg("Multi-node LIMIT not yet supported")))); - if (query->sortClause && StrictSelectChecking) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Multi-node ORDER BY not yet supported")))); - /* PGXCTODO - check if first column partitioning column */ - if (query->distinctClause) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Multi-node DISTINCT`not yet supported")))); } } break; @@ -1686,7 +2113,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) case T_DropdbStmt: case T_VacuumStmt: query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES; - query_plan->force_autocommit = true; + query_step->force_autocommit = true; break; case T_DropPropertyStmt: @@ -1864,7 +2291,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) * Free Query_Step struct */ static void -free_query_step(Query_Step *query_step) +free_query_step(RemoteQuery *query_step) { if (query_step == NULL) return; @@ -1894,7 +2321,7 @@ FreeQueryPlan(Query_Plan *query_plan) return; foreach(item, query_plan->query_step_list) - free_query_step((Query_Step *) lfirst(item)); + free_query_step((RemoteQuery *) lfirst(item)); pfree(query_plan->query_step_list); pfree(query_plan); diff --git a/src/backend/pgxc/pool/Makefile b/src/backend/pgxc/pool/Makefile index 7143af5d97..e8753031c2 100644 --- a/src/backend/pgxc/pool/Makefile +++ b/src/backend/pgxc/pool/Makefile @@ -14,6 +14,6 @@ subdir = src/backend/pgxc/pool top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = combiner.o datanode.o poolmgr.o poolcomm.o +OBJS = datanode.o execRemote.o poolmgr.o poolcomm.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/pgxc/pool/combiner.c b/src/backend/pgxc/pool/combiner.c deleted file mode 100644 index 53e5dfb11c..0000000000 --- a/src/backend/pgxc/pool/combiner.c +++ /dev/null @@ -1,652 +0,0 @@ -/*------------------------------------------------------------------------- - * - * combiner.c - * - * Combine responses from multiple Data Nodes - * - * - * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group - * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation - * - * IDENTIFICATION - * $$ - * - *------------------------------------------------------------------------- - */ - -#include "postgres.h" -#include "pgxc/combiner.h" -#include "pgxc/planner.h" -#include "catalog/pg_type.h" -#include "libpq/libpq.h" -#include "libpq/pqformat.h" -#include "utils/builtins.h" -#include "utils/datum.h" - - -/* - * Create a structure to store parameters needed to combine responses from - * multiple connections as well as state information - */ -ResponseCombiner -CreateResponseCombiner(int node_count, CombineType combine_type, - CommandDest dest) -{ - ResponseCombiner combiner; - - /* ResponseComber is a typedef for pointer to ResponseCombinerData */ - combiner = (ResponseCombiner) palloc(sizeof(ResponseCombinerData)); - if (combiner == NULL) - { - /* Out of memory */ - return combiner; - } - - combiner->node_count = node_count; - combiner->combine_type = combine_type; - combiner->dest = dest; - combiner->command_complete_count = 0; - combiner->row_count = 0; - combiner->request_type = REQUEST_TYPE_NOT_DEFINED; - combiner->description_count = 0; - 
combiner->copy_in_count = 0; - combiner->copy_out_count = 0; - combiner->inErrorState = false; - combiner->initAggregates = true; - combiner->simple_aggregates = NULL; - combiner->copy_file = NULL; - - return combiner; -} - -/* - * Parse out row count from the command status response and convert it to integer - */ -static int -parse_row_count(const char *message, size_t len, int *rowcount) -{ - int digits = 0; - int pos; - - *rowcount = 0; - /* skip \0 string terminator */ - for (pos = 0; pos < len - 1; pos++) - { - if (message[pos] >= '0' && message[pos] <= '9') - { - *rowcount = *rowcount * 10 + message[pos] - '0'; - digits++; - } - else - { - *rowcount = 0; - digits = 0; - } - } - return digits; -} - -/* - * Extract a transition value from data row. Invoke the Input Function - * associated with the transition data type to represent value as a Datum. - * Output parameters value and val_null, receive extracted value and indicate - * whether it is null. - */ -static void -parse_aggregate_value(SimpleAgg *simple_agg, char *col_data, size_t datalen, Datum *value, bool *val_null) -{ - /* Check NULL */ - if (datalen == -1) - { - *value = (Datum) 0; - *val_null = true; - } - else - { - resetStringInfo(&simple_agg->valuebuf); - appendBinaryStringInfo(&simple_agg->valuebuf, col_data, datalen); - *value = InputFunctionCall(&simple_agg->arginputfn, simple_agg->valuebuf.data, simple_agg->argioparam, -1); - *val_null = false; - } -} - -/* - * Initialize the collection value, when agregation is first set up, or for a - * new group (grouping support is not implemented yet) - */ -static void -initialize_collect_aggregates(SimpleAgg *simple_agg) -{ - if (simple_agg->initValueIsNull) - simple_agg->collectValue = simple_agg->initValue; - else - simple_agg->collectValue = datumCopy(simple_agg->initValue, - simple_agg->transtypeByVal, - simple_agg->transtypeLen); - simple_agg->noCollectValue = simple_agg->initValueIsNull; - simple_agg->collectValueNull = simple_agg->initValueIsNull; -} - -/* - * Finalize the aggregate after current group or entire relation is processed - * (grouping support is not implemented yet) - */ -static void -finalize_collect_aggregates(SimpleAgg *simple_agg, Datum *resultVal, bool *resultIsNull) -{ - /* - * Apply the agg's finalfn if one is provided, else return collectValue. - */ - if (OidIsValid(simple_agg->finalfn_oid)) - { - FunctionCallInfoData fcinfo; - - InitFunctionCallInfoData(fcinfo, &(simple_agg->finalfn), 1, - (void *) simple_agg, NULL); - fcinfo.arg[0] = simple_agg->collectValue; - fcinfo.argnull[0] = simple_agg->collectValueNull; - if (fcinfo.flinfo->fn_strict && simple_agg->collectValueNull) - { - /* don't call a strict function with NULL inputs */ - *resultVal = (Datum) 0; - *resultIsNull = true; - } - else - { - *resultVal = FunctionCallInvoke(&fcinfo); - *resultIsNull = fcinfo.isnull; - } - } - else - { - *resultVal = simple_agg->collectValue; - *resultIsNull = simple_agg->collectValueNull; - } -} - -/* - * Given new input value(s), advance the transition function of an aggregate. - * - * The new values (and null flags) have been preloaded into argument positions - * 1 and up in fcinfo, so that we needn't copy them again to pass to the - * collection function. No other fields of fcinfo are assumed valid. - * - * It doesn't matter which memory context this is called in. 
- */ -static void -advance_collect_function(SimpleAgg *simple_agg, FunctionCallInfoData *fcinfo) -{ - Datum newVal; - - if (simple_agg->transfn.fn_strict) - { - /* - * For a strict transfn, nothing happens when there's a NULL input; we - * just keep the prior transValue. - */ - if (fcinfo->argnull[1]) - return; - if (simple_agg->noCollectValue) - { - /* - * result has not been initialized - * We must copy the datum into result if it is pass-by-ref. We - * do not need to pfree the old result, since it's NULL. - */ - simple_agg->collectValue = datumCopy(fcinfo->arg[1], - simple_agg->transtypeByVal, - simple_agg->transtypeLen); - simple_agg->collectValueNull = false; - simple_agg->noCollectValue = false; - return; - } - if (simple_agg->collectValueNull) - { - /* - * Don't call a strict function with NULL inputs. Note it is - * possible to get here despite the above tests, if the transfn is - * strict *and* returned a NULL on a prior cycle. If that happens - * we will propagate the NULL all the way to the end. - */ - return; - } - } - - /* - * OK to call the transition function - */ - InitFunctionCallInfoData(*fcinfo, &(simple_agg->transfn), 2, (void *) simple_agg, NULL); - fcinfo->arg[0] = simple_agg->collectValue; - fcinfo->argnull[0] = simple_agg->collectValueNull; - newVal = FunctionCallInvoke(fcinfo); - - /* - * If pass-by-ref datatype, must copy the new value into aggcontext and - * pfree the prior transValue. But if transfn returned a pointer to its - * first input, we don't need to do anything. - */ - if (!simple_agg->transtypeByVal && - DatumGetPointer(newVal) != DatumGetPointer(simple_agg->collectValue)) - { - if (!fcinfo->isnull) - { - newVal = datumCopy(newVal, - simple_agg->transtypeByVal, - simple_agg->transtypeLen); - } - if (!simple_agg->collectValueNull) - pfree(DatumGetPointer(simple_agg->collectValue)); - } - - simple_agg->collectValue = newVal; - simple_agg->collectValueNull = fcinfo->isnull; -} - -/* - * Handle response message and update combiner's state. 
- * This function contains main combiner logic - */ -int -CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t len) -{ - int digits = 0; - - /* Ignore anything if we have encountered error */ - if (combiner->inErrorState) - return EOF; - - switch (msg_type) - { - case 'c': /* CopyOutCommandComplete */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COPY_OUT; - if (combiner->request_type != REQUEST_TYPE_COPY_OUT) - /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - /* Just do nothing, close message is managed by the coordinator */ - combiner->copy_out_count++; - break; - case 'C': /* CommandComplete */ - /* - * If we did not receive description we are having rowcount or OK - * response - */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COMMAND; - /* Extract rowcount */ - if (combiner->combine_type != COMBINE_TYPE_NONE) - { - int rowcount; - digits = parse_row_count(msg_body, len, &rowcount); - if (digits > 0) - { - /* Replicated write, make sure they are the same */ - if (combiner->combine_type == COMBINE_TYPE_SAME) - { - if (combiner->command_complete_count) - { - if (rowcount != combiner->row_count) - /* There is a consistency issue in the database with the replicated table */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Write to replicated table returned different results from the data nodes"))); - } - else - /* first result */ - combiner->row_count = rowcount; - } - else - combiner->row_count += rowcount; - } - else - combiner->combine_type = COMBINE_TYPE_NONE; - } - if (++combiner->command_complete_count == combiner->node_count) - { - - if (combiner->simple_aggregates - /* - * Aggregates has not been initialized - that means - * no rows received from data nodes, nothing to send - * It is possible if HAVING clause is present - */ - && !combiner->initAggregates) - { - /* Build up and send a datarow with aggregates */ - StringInfo dataRowBuffer = makeStringInfo(); - ListCell *lc; - - /* Number of fields */ - pq_sendint(dataRowBuffer, list_length(combiner->simple_aggregates), 2); - - foreach (lc, combiner->simple_aggregates) - { - SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc); - Datum resultVal; - bool resultIsNull; - - finalize_collect_aggregates(simple_agg, &resultVal, &resultIsNull); - /* Aggregation result */ - if (resultIsNull) - { - pq_sendint(dataRowBuffer, -1, 4); - } - else - { - char *text = OutputFunctionCall(&simple_agg->resoutputfn, resultVal); - size_t len = strlen(text); - pq_sendint(dataRowBuffer, len, 4); - pq_sendtext(dataRowBuffer, text, len); - } - } - pq_putmessage('D', dataRowBuffer->data, dataRowBuffer->len); - pfree(dataRowBuffer->data); - pfree(dataRowBuffer); - } - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - { - if (combiner->combine_type == COMBINE_TYPE_NONE) - { - pq_putmessage(msg_type, msg_body, len); - } - else - { - char command_complete_buffer[256]; - - /* Truncate msg_body to get base string */ - msg_body[len - digits - 1] = '\0'; - len = sprintf(command_complete_buffer, "%s%d", msg_body, combiner->row_count) + 1; - pq_putmessage(msg_type, command_complete_buffer, len); - } - } - } - break; - case 'T': /* RowDescription */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_QUERY; - if (combiner->request_type != REQUEST_TYPE_QUERY) - { - /* 
Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - /* Proxy first */ - if (combiner->description_count++ == 0) - { - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - } - break; - case 'S': /* ParameterStatus (SET command) */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_QUERY; - if (combiner->request_type != REQUEST_TYPE_QUERY) - { - /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - /* Proxy last */ - if (++combiner->description_count == combiner->node_count) - { - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - } - break; - case 'G': /* CopyInResponse */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COPY_IN; - if (combiner->request_type != REQUEST_TYPE_COPY_IN) - { - /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - /* Proxy first */ - if (combiner->copy_in_count++ == 0) - { - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - } - break; - case 'H': /* CopyOutResponse */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COPY_OUT; - if (combiner->request_type != REQUEST_TYPE_COPY_OUT) - { - /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - /* - * The normal PG code will output an H message when it runs in the - * coordinator, so do not proxy message here, just count it. 
- */ - combiner->copy_out_count++; - break; - case 'd': /* CopyOutDataRow */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COPY_OUT; - - /* Inconsistent responses */ - if (combiner->request_type != REQUEST_TYPE_COPY_OUT) - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - - /* If there is a copy file, data has to be sent to the local file */ - if (combiner->copy_file) - { - /* write data to the copy file */ - char *data_row; - data_row = (char *) palloc0(len); - memcpy(data_row, msg_body, len); - - fwrite(data_row, 1, len, combiner->copy_file); - break; - } - /* - * In this case data is sent back to the client - */ - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - { - StringInfo data_buffer; - - data_buffer = makeStringInfo(); - - pq_sendtext(data_buffer, msg_body, len); - pq_putmessage(msg_type, - data_buffer->data, - data_buffer->len); - - pfree(data_buffer->data); - pfree(data_buffer); - } - break; - case 'D': /* DataRow */ - if (!combiner->simple_aggregates) - { - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - } - else - { - ListCell *lc; - char **col_values; - int *col_value_len; - uint16 col_count; - int i, cur = 0; - - /* Get values from the data row into array to speed up access */ - memcpy(&col_count, msg_body, 2); - col_count = ntohs(col_count); - cur += 2; - - col_values = (char **) palloc0(col_count * sizeof(char *)); - col_value_len = (int *) palloc0(col_count * sizeof(int)); - for (i = 0; i < col_count; i++) - { - int n32; - - memcpy(&n32, msg_body + cur, 4); - col_value_len[i] = ntohl(n32); - cur += 4; - - if (col_value_len[i] != -1) - { - col_values[i] = msg_body + cur; - cur += col_value_len[i]; - } - } - - if (combiner->initAggregates) - { - foreach (lc, combiner->simple_aggregates) - initialize_collect_aggregates((SimpleAgg *) lfirst(lc)); - - combiner->initAggregates = false; - } - - foreach (lc, combiner->simple_aggregates) - { - SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc); - FunctionCallInfoData fcinfo; - - parse_aggregate_value(simple_agg, - col_values[simple_agg->column_pos], - col_value_len[simple_agg->column_pos], - fcinfo.arg + 1, - fcinfo.argnull + 1); - - advance_collect_function(simple_agg, &fcinfo); - } - pfree(col_values); - pfree(col_value_len); - } - break; - case 'E': /* ErrorResponse */ - combiner->inErrorState = true; - /* fallthru */ - case 'A': /* NotificationResponse */ - case 'N': /* NoticeResponse */ - /* Proxy error message back if specified, - * or if doing internal primary copy - */ - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - break; - case 'I': /* EmptyQuery */ - default: - /* Unexpected message */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - return 0; -} - -/* - * Examine the specified combiner state and determine if command was completed - * successfully - */ -static bool -validate_combiner(ResponseCombiner combiner) -{ - /* There was error message while combining */ - if (combiner->inErrorState) - return false; - /* Check if state is defined */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - return false; - /* Check all nodes completed */ - if ((combiner->request_type == REQUEST_TYPE_COMMAND - || combiner->request_type == REQUEST_TYPE_QUERY) - && 
combiner->command_complete_count != combiner->node_count) - return false; - - /* Check count of description responses */ - if (combiner->request_type == REQUEST_TYPE_QUERY - && combiner->description_count != combiner->node_count) - return false; - - /* Check count of copy-in responses */ - if (combiner->request_type == REQUEST_TYPE_COPY_IN - && combiner->copy_in_count != combiner->node_count) - return false; - - /* Check count of copy-out responses */ - if (combiner->request_type == REQUEST_TYPE_COPY_OUT - && combiner->copy_out_count != combiner->node_count) - return false; - - /* Add other checks here as needed */ - - /* All is good if we are here */ - return true; -} - -/* - * Validate combiner and release storage freeing allocated memory - */ -bool -ValidateAndCloseCombiner(ResponseCombiner combiner) -{ - bool valid = validate_combiner(combiner); - - pfree(combiner); - - return valid; -} - -/* - * Validate combiner and reset storage - */ -bool -ValidateAndResetCombiner(ResponseCombiner combiner) -{ - bool valid = validate_combiner(combiner); - - combiner->command_complete_count = 0; - combiner->row_count = 0; - combiner->request_type = REQUEST_TYPE_NOT_DEFINED; - combiner->description_count = 0; - combiner->copy_in_count = 0; - combiner->copy_out_count = 0; - combiner->inErrorState = false; - combiner->simple_aggregates = NULL; - combiner->copy_file = NULL; - - return valid; -} - -/* - * Close combiner and free allocated memory, if it is not needed - */ -void -CloseCombiner(ResponseCombiner combiner) -{ - if (combiner) - pfree(combiner); -} - -/* - * Assign combiner aggregates - */ -void -AssignCombinerAggregates(ResponseCombiner combiner, List *simple_aggregates) -{ - combiner->simple_aggregates = simple_aggregates; -} diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c index 6a1aba8190..517b1e4d78 100644 --- a/src/backend/pgxc/pool/datanode.c +++ b/src/backend/pgxc/pool/datanode.c @@ -15,6 +15,7 @@ *------------------------------------------------------------------------- */ +#include "postgres.h" #include <sys/select.h> #include <sys/time.h> #include <sys/types.h> @@ -22,166 +23,33 @@ #include <string.h> #include <unistd.h> #include <errno.h> -#include "pgxc/poolmgr.h" #include "access/gtm.h" #include "access/transam.h" #include "access/xact.h" -#include "postgres.h" -#include "utils/snapmgr.h" -#include "pgxc/pgxc.h" #include "gtm/gtm_c.h" #include "pgxc/datanode.h" #include "pgxc/locator.h" -#include "../interfaces/libpq/libpq-fe.h" +#include "pgxc/pgxc.h" +#include "pgxc/poolmgr.h" +#include "tcop/dest.h" #include "utils/elog.h" #include "utils/memutils.h" - +#include "utils/snapmgr.h" +#include "../interfaces/libpq/libpq-fe.h" #define NO_SOCKET -1 -/* - * Buffer size does not affect performance significantly, just do not allow - * connection buffer grows infinitely - */ -#define COPY_BUFFER_SIZE 8192 -#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024 - static int node_count = 0; static DataNodeHandle *handles = NULL; -static bool autocommit = true; -static DataNodeHandle **write_node_list = NULL; -static int write_node_count = 0; -static DataNodeHandle **get_handles(List *nodelist); -static int get_transaction_nodes(DataNodeHandle **connections); -static void release_handles(void); - -static void data_node_init(DataNodeHandle *handle, int sock); +static void data_node_init(DataNodeHandle *handle, int sock, int nodenum); static void data_node_free(DataNodeHandle *handle); -static int data_node_begin(int conn_count, DataNodeHandle **connections, CommandDest 
dest, GlobalTransactionId gxid); -static int data_node_commit(int conn_count, DataNodeHandle **connections, CommandDest dest); -static int data_node_rollback(int conn_count, DataNodeHandle **connections, CommandDest dest); - -static int ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle); -static int ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle); - -static int data_node_send_query(DataNodeHandle *handle, const char *query); -static int data_node_send_gxid(DataNodeHandle *handle, GlobalTransactionId gxid); -static int data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot); - -static void add_error_message(DataNodeHandle *handle, const char *message); - -static int data_node_read_data(DataNodeHandle *conn); -static int handle_response(DataNodeHandle *conn, ResponseCombiner combiner); - -static int get_int(DataNodeHandle *conn, size_t len, int *out); -static int get_char(DataNodeHandle *conn, char *out); - -static void clear_write_node_list(); - -#define MAX_STATEMENTS_PER_TRAN 10 - -/* Variables to collect statistics */ -static int total_transactions = 0; -static int total_statements = 0; -static int total_autocommit = 0; -static int nonautocommit_2pc = 0; -static int autocommit_2pc = 0; -static int current_tran_statements = 0; -static int *statements_per_transaction = NULL; -static int *nodes_per_transaction = NULL; - -/* - * statistics collection: count a statement - */ -static void -stat_statement() -{ - total_statements++; - current_tran_statements++; -} - -/* - * To collect statistics: count a transaction - */ -static void -stat_transaction(int node_count) -{ - total_transactions++; - if (autocommit) - total_autocommit++; - - if (!statements_per_transaction) - { - statements_per_transaction = (int *) malloc((MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int)); - memset(statements_per_transaction, 0, (MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int)); - } - - if (current_tran_statements > MAX_STATEMENTS_PER_TRAN) - statements_per_transaction[MAX_STATEMENTS_PER_TRAN]++; - else - statements_per_transaction[current_tran_statements]++; - - current_tran_statements = 0; - if (node_count > 0 && node_count <= NumDataNodes) - { - if (!nodes_per_transaction) - { - nodes_per_transaction = (int *) malloc(NumDataNodes * sizeof(int)); - memset(nodes_per_transaction, 0, NumDataNodes * sizeof(int)); - } - nodes_per_transaction[node_count - 1]++; - } -} - - -/* - * To collect statistics: count a two-phase commit on nodes - */ -static void -stat_2pc(void) -{ - if (autocommit) - autocommit_2pc++; - else - nonautocommit_2pc++; -} +static int get_int(DataNodeHandle * conn, size_t len, int *out); +static int get_char(DataNodeHandle * conn, char *out); /* - * Output collected statistics to the log - */ -static void -stat_log(void) -{ - elog(DEBUG1, "Total Transactions: %d Total Statements: %d", total_transactions, total_statements); - elog(DEBUG1, "Autocommit: %d 2PC for Autocommit: %d 2PC for non-Autocommit: %d", - total_autocommit, autocommit_2pc, nonautocommit_2pc); - if (total_transactions) - { - if (statements_per_transaction) - { - int i; - - for (i = 0; i < MAX_STATEMENTS_PER_TRAN; i++) - elog(DEBUG1, "%d Statements per Transaction: %d (%d%%)", - i, statements_per_transaction[i], statements_per_transaction[i] * 100 / total_transactions); - } - elog(DEBUG1, "%d+ Statements per Transaction: %d (%d%%)", - MAX_STATEMENTS_PER_TRAN, statements_per_transaction[MAX_STATEMENTS_PER_TRAN], statements_per_transaction[MAX_STATEMENTS_PER_TRAN] * 100 / total_transactions); - 
if (nodes_per_transaction) - { - int i; - - for (i = 0; i < NumDataNodes; i++) - elog(DEBUG1, "%d Nodes per Transaction: %d (%d%%)", - i + 1, nodes_per_transaction[i], nodes_per_transaction[i] * 100 / total_transactions); - } - } -} - -/* * Allocate and initialize memory to store DataNode handles. */ void @@ -325,8 +193,9 @@ data_node_free(DataNodeHandle *handle) * Structure stores state info and I/O buffers */ static void -data_node_init(DataNodeHandle *handle, int sock) +data_node_init(DataNodeHandle *handle, int sock, int nodenum) { + handle->nodenum = nodenum; handle->sock = sock; handle->transaction_status = 'I'; handle->state = DN_CONNECTION_STATE_IDLE; @@ -339,136 +208,99 @@ data_node_init(DataNodeHandle *handle, int sock) /* - * Handle responses from the Data node connections + * Wait until at least one of the specified connections has data available, and + * read the data into the buffer */ -static void -data_node_receive_responses(const int conn_count, DataNodeHandle **connections, - struct timeval *timeout, ResponseCombiner combiner) +void +data_node_receive(const int conn_count, + DataNodeHandle ** connections, struct timeval * timeout) { - int count = conn_count; - DataNodeHandle *to_receive[conn_count]; - - /* make a copy of the pointers to the connections */ - memcpy(to_receive, connections, conn_count * sizeof(DataNodeHandle *)); + int i, + res_select, + nfds = 0; + fd_set readfds; - /* - * Read results. - * Note we try and read from data node connections even if there is an error on one, - * so as to avoid reading incorrect results on the next statement. - * It might be better to just destroy these connections and tell the pool manager. - */ - while (count > 0) + FD_ZERO(&readfds); + for (i = 0; i < conn_count; i++) { - int i, - res_select, - nfds = 0; - fd_set readfds; - - FD_ZERO(&readfds); - for (i = 0; i < count; i++) - { - /* note if a connection has error */ - if (!to_receive[i] - || to_receive[i]->state == DN_CONNECTION_STATE_ERROR_FATAL) - { - /* Handling is done, do not track this connection */ - count--; - - /* Move last connection in its place */ - if (i < count) - { - to_receive[i] = to_receive[count]; - /* stay on the current position */ - i--; - } - continue; - } + /* If connection has finished sending, do not wait for input from it */ + if (connections[i]->state == DN_CONNECTION_STATE_IDLE + || connections[i]->state == DN_CONNECTION_STATE_HAS_DATA) + continue; - /* prepare select params */ - if (nfds < to_receive[i]->sock) - nfds = to_receive[i]->sock; + /* prepare select params */ + if (nfds < connections[i]->sock) + nfds = connections[i]->sock; - FD_SET (to_receive[i]->sock, &readfds); - } + FD_SET(connections[i]->sock, &readfds); + } - /* Make sure we still have valid connections */ - if (count == 0) - break; + /* + * Return if we do not have connections to receive input + */ + if (nfds == 0) + return; retry: - res_select = select(nfds + 1, &readfds, NULL, NULL, timeout); - if (res_select < 0) - { - /* error - retry if EINTR or EAGAIN */ - if (errno == EINTR || errno == EAGAIN) - goto retry; - - /* - * PGXCTODO - we may want to close the connections and notify the - * pooler that these are invalid. - */ - if (errno == EBADF) - { - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("select() bad file descriptor set"))); - } - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("select() error: %d", errno))); - } + res_select = select(nfds + 1, &readfds, NULL, NULL, timeout); + if (res_select < 0) + { + /* error - retry if EINTR or EAGAIN */ + if (errno == EINTR || errno == EAGAIN) + goto retry; - if (res_select == 0) + /* + * PGXCTODO - we may want to close the connections and notify the + * pooler that these are invalid. + */ + if (errno == EBADF) { - /* Handle timeout */ ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("timeout while waiting for response"))); + errmsg("select() bad file descriptor set"))); } + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("select() error: %d", errno))); + } + + if (res_select == 0) + { + /* Handle timeout */ + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("timeout while waiting for response"))); + } - /* read data */ - for (i = 0; i < count; i++) + /* read data */ + for (i = 0; i < conn_count; i++) + { + DataNodeHandle *conn = connections[i]; + + if (FD_ISSET(conn->sock, &readfds)) { - DataNodeHandle *conn = to_receive[i]; + int read_status = data_node_read_data(conn); - if (FD_ISSET(conn->sock, &readfds)) + if (read_status == EOF || read_status < 0) { - int read_status = data_node_read_data(conn); - - if (read_status == EOF || read_status < 0) - { - /* PGXCTODO - we should notify the pooler to destroy the connections */ - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("unexpected EOF on datanode connection"))); - } + /* PGXCTODO - we should notify the pooler to destroy the connections */ + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("unexpected EOF on datanode connection"))); } - - if (conn->inStart < conn->inEnd) + else { - if (handle_response(conn, combiner) == 0 - || conn->state == DN_CONNECTION_STATE_ERROR_READY - || conn->state == DN_CONNECTION_STATE_ERROR_FATAL) - { - /* Handling is done, do not track this connection */ - count--; - /* Move last connection in place */ - if (i < count) - { - to_receive[i] = to_receive[count]; - /* stay on the current position */ - i--; - } - } + conn->state = DN_CONNECTION_STATE_HAS_DATA; } } } }
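/*
 * For illustration only, a minimal sketch of how a caller might pair
 * data_node_receive() with get_message() (added later in this file); the
 * hypothetical wrapper name, the two-second timeout and the dispatch
 * placeholder are assumptions, not part of the patch.
 */
static void
example_drain_messages(DataNodeHandle **connections, int conn_count)
{
	struct timeval timeout = {2, 0};
	int			i;

	/* wait for readable sockets and buffer whatever arrives */
	data_node_receive(conn_count, connections, &timeout);

	for (i = 0; i < conn_count; i++)
	{
		char	   *msg;
		int			len;
		char		msgtype;

		/* drain complete messages; '\0' means only a partial message remains */
		while ((msgtype = get_message(connections[i], &len, &msg)) != '\0')
		{
			/* dispatch on msgtype here ('T', 'D', 'C', ...) */
		}
	}
}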
+ /* * Read incoming messages from the Data node connection */ -static int +int data_node_read_data(DataNodeHandle *conn) { int someread = 0; @@ -601,11 +433,12 @@ retry: return 0; } + /* * Get one character from the connection buffer and advance cursor */ static int -get_char(DataNodeHandle *conn, char *out) +get_char(DataNodeHandle * conn, char *out) { if (conn->inCursor < conn->inEnd) { @@ -647,379 +480,62 @@ get_int(DataNodeHandle *conn, size_t len, int *out) return 0; } -/* - * Read next message from the connection and update the combiner accordingly - * If we are in an error state we just consume the messages, and do not proxy - * Long term, we should look into cancelling executing statements - * and closing the connections. - */ -static int -handle_response(DataNodeHandle *conn, ResponseCombiner combiner) -{ - char msg_type; - int msg_len; - - for (;;) - { - /* try to read the message, return if not enough data */ - conn->inCursor = conn->inStart; - - /* - * Make sure we receive a complete message, otherwise, return EOF, and - * we will be called again after more data has been read.
- */ - if (conn->inEnd - conn->inCursor < 5) - return EOF; - - if (get_char(conn, &msg_type)) - return EOF; - - if (get_int(conn, 4, &msg_len)) - return EOF; - - msg_len -= 4; - - if (conn->inEnd - conn->inCursor < msg_len) - { - ensure_in_buffer_capacity(conn->inCursor + (size_t) msg_len, conn); - return EOF; - } - - /* TODO handle other possible responses */ - switch (msg_type) - { - case 'c': /* CopyToCommandComplete */ - /* no need to parse, just move cursor */ - conn->inCursor += msg_len; - conn->state = DN_CONNECTION_STATE_COMPLETED; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - break; - case 'C': /* CommandComplete */ - /* no need to parse, just move cursor */ - conn->inCursor += msg_len; - conn->state = DN_CONNECTION_STATE_COMPLETED; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - break; - case 'T': /* RowDescription */ - case 'D': /* DataRow */ - case 'S': /* ParameterStatus */ - /* no need to parse, just move cursor */ - conn->inCursor += msg_len; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - break; - case 'G': /* CopyInResponse */ - /* no need to parse, just move cursor */ - conn->inCursor += msg_len; - conn->state = DN_CONNECTION_STATE_COPY_IN; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - /* Done, return to caller to let it know the data can be passed in */ - conn->inStart = conn->inCursor; - conn->state = DN_CONNECTION_STATE_COPY_IN; - return 0; - case 'H': /* CopyOutResponse */ - /* no need to parse, just move cursor */ - conn->inCursor += msg_len; - conn->state = DN_CONNECTION_STATE_COPY_OUT; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - conn->inStart = conn->inCursor; - conn->state = DN_CONNECTION_STATE_COPY_OUT; - return 0; - case 'd': /* CopyOutDataRow */ - /* No need to parse, just send a row back to client */ - conn->inCursor += msg_len; - conn->state = DN_CONNECTION_STATE_COPY_OUT; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - break; - case 'E': /* ErrorResponse */ - /* no need to parse, just move cursor */ - conn->inCursor += msg_len; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - conn->inStart = conn->inCursor; - conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY; - /* - * Do not return with an error, we still need to consume Z, - * ready-for-query - */ - break; - case 'A': /* NotificationResponse */ - case 'N': /* NoticeResponse */ - conn->inCursor += msg_len; - - /* - * Ignore these to prevent multiple messages, one from each - * node. Coordinator will send one for DDL anyway - */ - break; - case 'Z': /* ReadyForQuery */ - get_char(conn, &conn->transaction_status); - conn->inStart = conn->inCursor; - if (conn->state == DN_CONNECTION_STATE_ERROR_NOT_READY) - { - conn->state = DN_CONNECTION_STATE_ERROR_READY; - return EOF; - } else - conn->state = DN_CONNECTION_STATE_IDLE; - return 0; - case 'I': /* EmptyQuery */ - default: - /* sync lost? 
*/ - conn->state = DN_CONNECTION_STATE_ERROR_FATAL; - return EOF; - } - conn->inStart = conn->inCursor; - } - /* Keep compiler quiet */ - return EOF; -} /* - * Send BEGIN command to the Data nodes and receive responses + * get_message + * If the connection has enough data, reads an entire message from the + * connection buffer and returns the message type. Message data and data + * length are returned through the out parameters. + * If the buffer does not contain a complete message, leaves the cursor + * unchanged, changes the connection state to DN_CONNECTION_STATE_QUERY to + * indicate it needs to receive more, and returns \0 + * conn - connection to read from + * len - returned length of the data msg is pointing to + * msg - returns a pointer into the incoming buffer. The buffer will likely + * be overwritten by the next receive, so if the caller wants to refer to it + * later it should make a copy. */ -static int -data_node_begin(int conn_count, DataNodeHandle **connections, CommandDest dest, GlobalTransactionId gxid) +char +get_message(DataNodeHandle *conn, int *len, char **msg) { - int i; - struct timeval *timeout = NULL; - ResponseCombiner combiner; + char msgtype; - /* Send BEGIN */ - for (i = 0; i < conn_count; i++) + if (get_char(conn, &msgtype) || get_int(conn, 4, len)) { - if (GlobalTransactionIdIsValid(gxid) && data_node_send_gxid(connections[i], gxid)) - return EOF; - - if (data_node_send_query(connections[i], "BEGIN")) - return EOF; + /* Successful get_char would move cursor, restore position */ + conn->inCursor = conn->inStart; + return '\0'; } - combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE, dest); - - /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); - - /* Verify status */ - return ValidateAndCloseCombiner(combiner) ?
0 : EOF; -} - -/* Clears the write node list */ -static void -clear_write_node_list(void) -{ - /* we just malloc once and use counter */ - if (write_node_list == NULL) - write_node_list = (DataNodeHandle **) malloc(NumDataNodes * sizeof(DataNodeHandle *)); - - write_node_count = 0; -} - - -/* - * Switch autocommmit mode off, so all subsequent statements will be in the same transaction - */ -void -DataNodeBegin(void) -{ - autocommit = false; - clear_write_node_list(); -} - - -/* - * Commit current transaction, use two-phase commit if necessary - */ -int -DataNodeCommit(CommandDest dest) -{ - int res; - int tran_count; - DataNodeHandle *connections[node_count]; - /* Quick check to make sure we have connections */ - if (node_count == 0) - goto finish; - - /* gather connections to commit */ - tran_count = get_transaction_nodes(connections); - - /* - * If we do not have open transactions we have nothing to commit, just - * report success - */ - if (tran_count == 0) - goto finish; - - res = data_node_commit(tran_count, connections, dest); - -finish: - /* In autocommit mode statistics is collected in DataNodeExec */ - if (!autocommit) - stat_transaction(node_count); - if (!PersistentConnections) - release_handles(); - autocommit = true; - clear_write_node_list(); - return res; -} - + *len -= 4; -/* - * Send COMMIT or PREPARE/COMMIT PREPARED down to the Data nodes and handle responses - */ -static int -data_node_commit(int conn_count, DataNodeHandle **connections, CommandDest dest) -{ - int i; - struct timeval *timeout = NULL; - char buffer[256]; - GlobalTransactionId gxid = InvalidGlobalTransactionId; - int result = 0; - ResponseCombiner combiner = NULL; - - - /* can set this to false to disable temporarily */ - /* bool do2PC = conn_count > 1; */ - - /* - * Only use 2PC if more than one node was written to. Otherwise, just send - * COMMIT to all - */ - bool do2PC = write_node_count > 1; - - /* Extra XID for Two Phase Commit */ - GlobalTransactionId two_phase_xid = 0; - - if (do2PC) + if (conn->inCursor + *len > conn->inEnd) { - stat_2pc(); - /* - * Formally we should be using GetCurrentGlobalTransactionIdIfAny() here, - * but since we need 2pc, we surely have sent down a command and got - * gxid for it. Hence GetCurrentGlobalTransactionId() just returns - * already allocated gxid + * Not enough data in the buffer, we should read more. + * The reading function will discard data already consumed from the + * buffer, up to conn->inStart. We then want the message that is now + * partly in the buffer to be read in completely, to avoid extra + * read/handle cycles. The space needed is 1 byte for the message type, + * 4 bytes for the message length and the message itself, whose size is + * currently in *len.
+ * The buffer may already be large enough; in this case the function + * ensure_in_buffer_capacity() will return immediately */ - gxid = GetCurrentGlobalTransactionId(); - - sprintf(buffer, "PREPARE TRANSACTION 'T%d'", gxid); - /* Send PREPARE */ - for (i = 0; i < conn_count; i++) - { - if (data_node_send_query(connections[i], buffer)) - return EOF; - } - - combiner = CreateResponseCombiner(conn_count, - COMBINE_TYPE_NONE, dest); - /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); - - /* Reset combiner */ - if (!ValidateAndResetCombiner(combiner)) - { - result = EOF; - } - } - - if (!do2PC) - strcpy(buffer, "COMMIT"); - else - { - if (result) - sprintf(buffer, "ROLLBACK PREPARED 'T%d'", gxid); - else - sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid); - - /* We need to use a new xid, the data nodes have reset */ - two_phase_xid = BeginTranGTM(); - for (i = 0; i < conn_count; i++) - { - if (data_node_send_gxid(connections[i], two_phase_xid)) - { - add_error_message(connections[i], "Can not send request"); - result = EOF; - goto finish; - } - } - } - - /* Send COMMIT */ - for (i = 0; i < conn_count; i++) - { - if (data_node_send_query(connections[i], buffer)) - { - result = EOF; - goto finish; - } + ensure_in_buffer_capacity(5 + (size_t) *len, conn); + conn->state = DN_CONNECTION_STATE_QUERY; + conn->inCursor = conn->inStart; + return '\0'; } - if (!combiner) - combiner = CreateResponseCombiner(conn_count, - COMBINE_TYPE_NONE, dest); - /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); - result = ValidateAndCloseCombiner(combiner) ? result : EOF; - -finish: - if (do2PC) - CommitTranGTM((GlobalTransactionId) two_phase_xid); - - return result; -} - - -/* - * Rollback current transaction - */ -int -DataNodeRollback(CommandDest dest) -{ - int res = 0; - int tran_count; - DataNodeHandle *connections[node_count]; - - /* Quick check to make sure we have connections */ - if (node_count == 0) - goto finish; - - /* gather connections to rollback */ - tran_count = get_transaction_nodes(connections); - - /* - * If we do not have open transactions we have nothing to rollback just - * report success - */ - if (tran_count == 0) - goto finish; - - res = data_node_rollback(tran_count, connections, dest); - -finish: - /* In autocommit mode statistics is collected in DataNodeExec */ - if (!autocommit) - stat_transaction(node_count); - if (!PersistentConnections) - release_handles(); - autocommit = true; - clear_write_node_list(); - return res; + *msg = conn->inBuffer + conn->inCursor; + conn->inCursor += *len; + conn->inStart = conn->inCursor; + return msgtype; } /* Release all data node connections back to pool and release occupied memory */ -static void +void release_handles(void) { int i; @@ -1041,320 +557,11 @@ release_handles(void) /* - * Send ROLLBACK command down to the Data nodes and handle responses - */ -static int -data_node_rollback(int conn_count, DataNodeHandle **connections, CommandDest dest) -{ - int i; - struct timeval *timeout = NULL; - int result = 0; - ResponseCombiner combiner; - - /* Send ROLLBACK - */ - for (i = 0; i < conn_count; i++) - { - if (data_node_send_query(connections[i], "ROLLBACK")) - result = EOF; - } - - combiner = CreateResponseCombiner(conn_count, - COMBINE_TYPE_NONE, dest); - /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); - - /* Verify status */ - return ValidateAndCloseCombiner(combiner) ?
0 : EOF; -} - - -/* - * Execute specified statement on specified Data nodes, combine responses and - * send results back to the client - * - * const char *query - SQL string to execute - * List *primarynode - if a write operation on a replicated table, the primary node - * List *nodelist - the nodes to execute on (excludes primary, if set in primarynode - * CommandDest dest - destination for results - * Snapshot snapshot - snapshot to use - * bool force_autocommit - force autocommit - * List *simple_aggregates - list of simple aggregates to execute - * bool is_read_only - if this is a read-only query - */ -int -DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CombineType combine_type, - CommandDest dest, Snapshot snapshot, bool force_autocommit, - List *simple_aggregates, bool is_read_only) -{ - int i; - int j; - int regular_conn_count; - int total_conn_count; - struct timeval *timeout = NULL; /* wait forever */ - ResponseCombiner combiner = NULL; - int row_count = 0; - int new_count = 0; - bool need_tran; - bool found; - GlobalTransactionId gxid = InvalidGlobalTransactionId; - DataNodeHandle *new_connections[NumDataNodes]; - DataNodeHandle **connections = NULL; - DataNodeHandle **primaryconnection = NULL; - List *nodelist = NIL; - List *primarynode = NIL; - /* add up affected row count by default, override for replicated writes */ - - if (exec_nodes) - { - nodelist = exec_nodes->nodelist; - primarynode = exec_nodes->primarynodelist; - } - - if (list_length(nodelist) == 0) - { - if (primarynode) - regular_conn_count = NumDataNodes - 1; - else - regular_conn_count = NumDataNodes; - } - else - { - regular_conn_count = list_length(nodelist); - } - - total_conn_count = regular_conn_count; - - /* Get connection for primary node, if used */ - if (primarynode) - { - primaryconnection = get_handles(primarynode); - total_conn_count++; - } - - /* Get other connections (non-primary) */ - if (regular_conn_count == 0) - return EOF; - else - { - connections = get_handles(nodelist); - if (!connections) - return EOF; - } - - if (force_autocommit) - need_tran = false; - else - need_tran = !autocommit || total_conn_count > 1; - - elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false"); - - stat_statement(); - if (autocommit) - stat_transaction(total_conn_count); - - /* We normally clear for transactions, but if autocommit, clear here, too */ - if (autocommit == true) - { - clear_write_node_list(); - } - - /* Check status of connections */ - - /* - * We want to track new "write" nodes, and new nodes in the current - * transaction whether or not they are write nodes. 
- */ - if (!is_read_only && write_node_count < NumDataNodes) - { - if (primaryconnection) - { - found = false; - for (j = 0; j < write_node_count && !found; j++) - { - if (write_node_list[j] == primaryconnection[0]) - found = true; - } - if (!found) - { - /* Add to transaction wide-list */ - write_node_list[write_node_count++] = primaryconnection[0]; - /* Add to current statement list */ - new_connections[new_count++] = primaryconnection[0]; - } - } - for (i = 0; i < regular_conn_count; i++) - { - found = false; - for (j = 0; j < write_node_count && !found; j++) - { - if (write_node_list[j] == connections[i]) - found = true; - } - if (!found) - { - /* Add to transaction wide-list */ - write_node_list[write_node_count++] = connections[i]; - /* Add to current statement list */ - new_connections[new_count++] = connections[i]; - } - } - /* Check connection state is DN_CONNECTION_STATE_IDLE */ - } - - gxid = GetCurrentGlobalTransactionId(); - - if (!GlobalTransactionIdIsValid(gxid)) - { - pfree(connections); - return EOF; - } - if (new_count > 0 && need_tran) - { - /* Start transaction on connections where it is not started */ - if (data_node_begin(new_count, new_connections, DestNone, gxid)) - { - pfree(connections); - return EOF; - } - } - - /* See if we have a primary nodes, execute on it first before the others */ - if (primaryconnection) - { - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && data_node_send_gxid(primaryconnection[0], gxid)) - { - add_error_message(primaryconnection[0], "Can not send request"); - if (connections) - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - return EOF; - } - if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot)) - { - add_error_message(primaryconnection[0], "Can not send request"); - if (connections) - pfree(connections); - pfree(primaryconnection); - return EOF; - } - if (data_node_send_query(primaryconnection[0], query) != 0) - { - add_error_message(primaryconnection[0], "Can not send request"); - if (connections) - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - return EOF; - } - - Assert(combine_type == COMBINE_TYPE_SAME); - - /* - * Create combiner. - * Note that we use the same combiner later with the secondary nodes, - * so that we do not prematurely send a response to the client - * until all nodes have completed execution. 
- */ - combiner = CreateResponseCombiner(total_conn_count, combine_type, dest); - AssignCombinerAggregates(combiner, simple_aggregates); - - /* Receive responses */ - data_node_receive_responses(1, primaryconnection, timeout, combiner); - /* If we got an error response return immediately */ - if (DN_CONNECTION_STATE_ERROR(primaryconnection[0])) - { - /* We are going to exit, so release combiner */ - CloseCombiner(combiner); - if (autocommit) - { - if (need_tran) - DataNodeRollback(DestNone); - else if (!PersistentConnections) - release_handles(); - } - - if (primaryconnection) - pfree(primaryconnection); - if (connections) - pfree(connections); - return EOF; - } - } - - /* Send query to nodes */ - for (i = 0; i < regular_conn_count; i++) - { - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && data_node_send_gxid(connections[i], gxid)) - { - add_error_message(connections[i], "Can not send request"); - pfree(connections); - return EOF; - } - if (snapshot && data_node_send_snapshot(connections[i], snapshot)) - { - add_error_message(connections[i], "Can not send request"); - pfree(connections); - return EOF; - } - if (data_node_send_query(connections[i], query) != 0) - { - add_error_message(connections[i], "Can not send request"); - pfree(connections); - return EOF; - } - } - - /* We may already have combiner if it is replicated case with primary data node */ - if (!combiner) - { - combiner = CreateResponseCombiner(regular_conn_count, combine_type, dest); - AssignCombinerAggregates(combiner, simple_aggregates); - } - - /* Receive responses */ - data_node_receive_responses(regular_conn_count, connections, timeout, combiner); - row_count = combiner->row_count; - - /* Check for errors and if primary nodeMake sure primary and secondary nodes were updated the same */ - if (!ValidateAndCloseCombiner(combiner)) - { - if (autocommit) - { - if (need_tran) - DataNodeRollback(DestNone); - else if (!PersistentConnections) - release_handles(); - } - - pfree(connections); - - return EOF; - } - - if (autocommit) - { - if (need_tran) - DataNodeCommit(DestNone); /* PGXCTODO - call CommitTransaction() - * instead? */ - else if (!PersistentConnections) - release_handles(); - } - - /* Verify status? 
*/ - pfree(connections); - return 0; -} - - -/* * Ensure specified amount of data can fit into the incoming buffer and * increase it if necessary */ -static int -ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle) +int +ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle) { int newsize = handle->inSize; char *newbuf; @@ -1405,8 +612,8 @@ ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle) * Ensure specified amount of data can fit into the outgoing buffer and * increase it if necessary */ -static int -ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle) +int +ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle) { int newsize = handle->outSize; char *newbuf; @@ -1456,8 +663,8 @@ ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle) /* * Send specified amount of data from the outgoing buffer over the connection */ -static int -send_some(DataNodeHandle * handle, int len) +int +send_some(DataNodeHandle *handle, int len) { char *ptr = handle->outBuffer; int remaining = handle->outEnd; @@ -1556,7 +763,7 @@ send_some(DataNodeHandle * handle, int len) * This method won't return until the connection buffer is empty or an error occurs, * to ensure all data are on the wire before waiting for a response */ -static int +int data_node_flush(DataNodeHandle *handle) { while (handle->outEnd) @@ -1573,8 +780,8 @@ data_node_flush(DataNodeHandle *handle) /* * Send specified statement down to the Data node */ -static int -data_node_send_query(DataNodeHandle *handle, const char *query) +int +data_node_send_query(DataNodeHandle * handle, const char *query) { int strLen = strlen(query) + 1; @@ -1595,7 +802,7 @@ data_node_send_query(DataNodeHandle *handle, const char *query) memcpy(handle->outBuffer + handle->outEnd, query, strLen); handle->outEnd += strLen; - handle->state = DN_CONNECTION_STATE_BUSY; + handle->state = DN_CONNECTION_STATE_QUERY; return data_node_flush(handle); } @@ -1604,7 +811,7 @@ data_node_send_query(DataNodeHandle *handle, const char *query) /* * Send the GXID down to the Data node */ -static int +int data_node_send_gxid(DataNodeHandle *handle, GlobalTransactionId gxid) { int msglen = 8; @@ -1632,7 +839,7 @@ data_node_send_gxid(DataNodeHandle *handle, GlobalTransactionId gxid) /* * Send the snapshot down to the Data node */ -static int +int data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot) { int msglen; @@ -1686,11 +893,11 @@ data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot) * Add another message to the list of errors to be returned to the client * at a convenient time */ -static void +void add_error_message(DataNodeHandle *handle, const char *message) { handle->transaction_status = 'E'; - handle->state = DN_CONNECTION_STATE_ERROR_READY; + handle->state = DN_CONNECTION_STATE_IDLE; if (handle->error) { /* PGXCTODO append */
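/*
 * For illustration, a minimal sketch of the buffering pattern the senders
 * above share: reserve space with ensure_out_buffer_capacity(), append the
 * message bytes, then flush. The wrapper name and the Close ('C') message
 * payload are assumptions for the example, not part of the patch.
 */
static int
example_send_close_portal(DataNodeHandle *handle, const char *portal)
{
	int			strLen = strlen(portal) + 1;
	int			msgLen = 4 + 1 + strLen;	/* length word + 'P' + name */
	uint32		n32 = htonl(msgLen);

	if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
		return EOF;

	handle->outBuffer[handle->outEnd++] = 'C';
	memcpy(handle->outBuffer + handle->outEnd, &n32, 4);
	handle->outEnd += 4;
	handle->outBuffer[handle->outEnd++] = 'P';
	memcpy(handle->outBuffer + handle->outEnd, portal, strLen);
	handle->outEnd += strLen;

	/* push everything onto the wire before waiting for a response */
	return data_node_flush(handle);
}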
@@ -1706,16 +913,19 @@ add_error_message(DataNodeHandle *handle, const char *message) * Special case is an empty or NIL nodeList: in this case all the nodes are returned. * The returned list should be pfree'd when no longer needed. */ -static DataNodeHandle ** +DataNodeHandle ** get_handles(List *nodelist) { DataNodeHandle **result; ListCell *node_list_item; List *allocate = NIL; + MemoryContext old_context; /* index of the result array */ int i = 0; + /* Handles must stay valid while the transaction lasts */ + old_context = MemoryContextSwitchTo(TopTransactionContext); /* If node list is empty execute request on current nodes */ if (list_length(nodelist) == 0) { @@ -1757,8 +967,8 @@ get_handles(List *nodelist) { int node = lfirst_int(node_list_item); - if (node > NumDataNodes || node <= 0) - elog(ERROR, "Node number: %d passed is not a known node", node); + Assert(node > 0 && node <= NumDataNodes); + result[i++] = &handles[node - 1]; if (handles[node - 1].sock == NO_SOCKET) allocate = lappend_int(allocate, node); @@ -1781,13 +991,16 @@ get_handles(List *nodelist) int node = lfirst_int(node_list_item); int fdsock = fds[j++]; - data_node_init(&handles[node - 1], fdsock); + data_node_init(&handles[node - 1], fdsock, node); node_count++; } pfree(fds); list_free(allocate); } + /* restore context */ + MemoryContextSwitchTo(old_context); + return result; } @@ -1802,8 +1015,8 @@ get_handles(List *nodelist) * The function returns the number of pointers written to the connections array. * Remaining items in the array, if any, will be kept unchanged */ -static int -get_transaction_nodes(DataNodeHandle ** connections) +int +get_transaction_nodes(DataNodeHandle **connections) { int tran_count = 0; int i; @@ -1819,503 +1032,3 @@ get_transaction_nodes(DataNodeHandle **connections) return tran_count; } - - -/* - * Called when the backend is ending. - */ -void -DataNodeCleanAndRelease(int code, Datum arg) -{ - /* Rollback on Data Nodes */ - if (IsTransactionState()) - { - DataNodeRollback(DestNone); - - /* Rollback on GTM if transaction id opened. */ - RollbackTranGTM((GlobalTransactionId) GetCurrentTransactionIdIfAny()); - } - - /* Release data node connections */ - release_handles(); - - /* Close connection with GTM */ - CloseGTM(); - - /* Dump collected statistics to the log */ - stat_log(); -} - -/* - * Begin COPY command - * The copy_connections array must have room for NumDataNodes items - */ -DataNodeHandle** -DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from) -{ - int i, j; - int conn_count = list_length(nodelist) == 0 ? NumDataNodes : list_length(nodelist); - struct timeval *timeout = NULL; - DataNodeHandle **connections; - DataNodeHandle **copy_connections; - DataNodeHandle *newConnections[conn_count]; - int new_count = 0; - ListCell *nodeitem; - bool need_tran; - GlobalTransactionId gxid; - ResponseCombiner combiner; - - if (conn_count == 0) - return NULL; - - /* Get needed datanode connections */ - connections = get_handles(nodelist); - if (!connections) - return NULL; - - need_tran = !autocommit || conn_count > 1; - - elog(DEBUG1, "autocommit = %s, conn_count = %d, need_tran = %s", autocommit ? "true" : "false", conn_count, need_tran ? "true" : "false"); - - /* - * We need to be able quickly find a connection handle for specified node number, - * So store connections in an array where index is node-1.
- * Unused items in the array should be NULL - */ - copy_connections = (DataNodeHandle **) palloc0(NumDataNodes * sizeof(DataNodeHandle *)); - i = 0; - foreach(nodeitem, nodelist) - copy_connections[lfirst_int(nodeitem) - 1] = connections[i++]; - - /* Gather statistics */ - stat_statement(); - if (autocommit) - stat_transaction(conn_count); - - /* We normally clear for transactions, but if autocommit, clear here, too */ - if (autocommit) - clear_write_node_list(); - - /* Check status of connections */ - /* We want to track new "write" nodes, and new nodes in the current transaction - * whether or not they are write nodes. */ - if (write_node_count < NumDataNodes) - { - for (i = 0; i < conn_count; i++) - { - bool found = false; - for (j=0; j<write_node_count && !found; j++) - { - if (write_node_list[j] == connections[i]) - found = true; - } - if (!found) - { - /* - * Add to transaction wide-list if COPY FROM - * CopyOut (COPY TO) is not a write operation, no need to update - */ - if (is_from) - write_node_list[write_node_count++] = connections[i]; - /* Add to current statement list */ - newConnections[new_count++] = connections[i]; - } - } - // Check connection state is DN_CONNECTION_STATE_IDLE - } - - gxid = GetCurrentGlobalTransactionId(); - - /* elog(DEBUG1, "Current gxid = %d", gxid); */ - - if (!GlobalTransactionIdIsValid(gxid)) - { - pfree(connections); - pfree(copy_connections); - return NULL; - } - - if (new_count > 0 && need_tran) - { - /* Start transaction on connections where it is not started */ - if (data_node_begin(new_count, newConnections, DestNone, gxid)) - { - pfree(connections); - pfree(copy_connections); - return NULL; - } - } - - /* Send query to nodes */ - for (i = 0; i < conn_count; i++) - { - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && data_node_send_gxid(connections[i], gxid)) - { - add_error_message(connections[i], "Can not send request"); - pfree(connections); - pfree(copy_connections); - return NULL; - } - if (snapshot && data_node_send_snapshot(connections[i], snapshot)) - { - add_error_message(connections[i], "Can not send request"); - pfree(connections); - pfree(copy_connections); - return NULL; - } - if (data_node_send_query(connections[i], query) != 0) - { - add_error_message(connections[i], "Can not send request"); - pfree(connections); - pfree(copy_connections); - return NULL; - } - } - - /* - * We are expecting CopyIn response, but do not want to send it to client, - * caller should take care about this, because here we do not know if - * client runs console or file copy - */ - combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE, DestNone); - - /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); - if (!ValidateAndCloseCombiner(combiner)) - { - if (autocommit) - { - if (need_tran) - DataNodeCopyFinish(connections, 0, COMBINE_TYPE_NONE, DestNone); - else if (!PersistentConnections) - release_handles(); - } - - pfree(connections); - pfree(copy_connections); - return NULL; - } - - pfree(connections); - return copy_connections; -} - -/* - * Send a data row to the specified nodes - */ -int -DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections) -{ - DataNodeHandle *primary_handle = NULL; - ListCell *nodeitem; - /* size + data row + \n */ - int msgLen = 4 + len + 1; - int nLen = htonl(msgLen); - - if (exec_nodes->primarynodelist) - primary_handle = copy_connections[lfirst_int(list_head(exec_nodes->primarynodelist)) - 1]; - - 
if (primary_handle) - { - if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN) - { - /* precalculate to speed up access */ - int bytes_needed = primary_handle->outEnd + 1 + msgLen; - - /* flush buffer if it is almost full */ - if (bytes_needed > COPY_BUFFER_SIZE) - { - /* First look if data node has sent a error message */ - int read_status = data_node_read_data(primary_handle); - if (read_status == EOF || read_status < 0) - { - add_error_message(primary_handle, "failed to read data from data node"); - return EOF; - } - - if (primary_handle->inStart < primary_handle->inEnd) - { - ResponseCombiner combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE, DestNone); - handle_response(primary_handle, combiner); - if (!ValidateAndCloseCombiner(combiner)) - return EOF; - } - - if (DN_CONNECTION_STATE_ERROR(primary_handle)) - return EOF; - - if (send_some(primary_handle, primary_handle->outEnd) < 0) - { - add_error_message(primary_handle, "failed to send data to data node"); - return EOF; - } - } - - if (ensure_out_buffer_capacity(bytes_needed, primary_handle) != 0) - { - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - - primary_handle->outBuffer[primary_handle->outEnd++] = 'd'; - memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4); - primary_handle->outEnd += 4; - memcpy(primary_handle->outBuffer + primary_handle->outEnd, data_row, len); - primary_handle->outEnd += len; - primary_handle->outBuffer[primary_handle->outEnd++] = '\n'; - } - else - { - add_error_message(primary_handle, "Invalid data node connection"); - return EOF; - } - } - - foreach(nodeitem, exec_nodes->nodelist) - { - DataNodeHandle *handle = copy_connections[lfirst_int(nodeitem) - 1]; - if (handle && handle->state == DN_CONNECTION_STATE_COPY_IN) - { - /* precalculate to speed up access */ - int bytes_needed = handle->outEnd + 1 + msgLen; - - /* flush buffer if it is almost full */ - if ((primary_handle && bytes_needed > PRIMARY_NODE_WRITEAHEAD) - || (!primary_handle && bytes_needed > COPY_BUFFER_SIZE)) - { - int to_send = handle->outEnd; - - /* First look if data node has sent a error message */ - int read_status = data_node_read_data(handle); - if (read_status == EOF || read_status < 0) - { - add_error_message(handle, "failed to read data from data node"); - return EOF; - } - - if (handle->inStart < handle->inEnd) - { - ResponseCombiner combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE, DestNone); - handle_response(handle, combiner); - if (!ValidateAndCloseCombiner(combiner)) - return EOF; - } - - if (DN_CONNECTION_STATE_ERROR(handle)) - return EOF; - - /* - * Allow primary node to write out data before others. - * If primary node was blocked it would not accept copy data. - * So buffer at least PRIMARY_NODE_WRITEAHEAD at the other nodes. - * If primary node is blocked and is buffering, other buffers will - * grow accordingly. 
- */ - if (primary_handle) - { - if (primary_handle->outEnd + PRIMARY_NODE_WRITEAHEAD < handle->outEnd) - to_send = handle->outEnd - primary_handle->outEnd - PRIMARY_NODE_WRITEAHEAD; - else - to_send = 0; - } - - /* - * Try to send down buffered data if we have - */ - if (to_send && send_some(handle, to_send) < 0) - { - add_error_message(handle, "failed to send data to data node"); - return EOF; - } - } - - if (ensure_out_buffer_capacity(bytes_needed, handle) != 0) - { - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - - handle->outBuffer[handle->outEnd++] = 'd'; - memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); - handle->outEnd += 4; - memcpy(handle->outBuffer + handle->outEnd, data_row, len); - handle->outEnd += len; - handle->outBuffer[handle->outEnd++] = '\n'; - } - else - { - add_error_message(handle, "Invalid data node connection"); - return EOF; - } - } - - return 0; -} - -int -DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle **copy_connections, CommandDest dest, FILE *copy_file) -{ - ResponseCombiner combiner; - int conn_count = list_length(exec_nodes->nodelist) == 0 ? NumDataNodes : list_length(exec_nodes->nodelist); - int count = 0; - bool need_tran; - List *nodelist; - ListCell *nodeitem; - - nodelist = exec_nodes->nodelist; - need_tran = !autocommit || conn_count > 1; - - combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_SUM, dest); - /* If there is an existing file where to copy data, pass it to combiner */ - if (copy_file) - combiner->copy_file = copy_file; - - foreach(nodeitem, exec_nodes->nodelist) - { - DataNodeHandle *handle = copy_connections[count]; - count++; - - if (handle && handle->state == DN_CONNECTION_STATE_COPY_OUT) - { - int read_status = 0; - /* H message has been consumed, continue to manage data row messages */ - while (read_status >= 0 && handle->state == DN_CONNECTION_STATE_COPY_OUT) /* continue to read as long as there is data */ - { - if (handle_response(handle,combiner) == EOF) - { - /* read some extra-data */ - read_status = data_node_read_data(handle); - if (read_status < 0) - return EOF; - } - /* There is no more data that can be read from connection */ - } - } - } - - if (!ValidateAndCloseCombiner(combiner)) - { - if (autocommit && !PersistentConnections) - release_handles(); - pfree(copy_connections); - return EOF; - } - - return 0; -} - -/* - * Finish copy process on all connections - */ -uint64 -DataNodeCopyFinish(DataNodeHandle **copy_connections, int primary_data_node, - CombineType combine_type, CommandDest dest) -{ - int i; - int nLen = htonl(4); - ResponseCombiner combiner = NULL; - bool need_tran; - bool res = 0; - struct timeval *timeout = NULL; /* wait forever */ - DataNodeHandle *connections[NumDataNodes]; - DataNodeHandle *primary_handle = NULL; - int conn_count = 0; - - for (i = 0; i < NumDataNodes; i++) - { - DataNodeHandle *handle = copy_connections[i]; - - if (!handle) - continue; - - if (i == primary_data_node - 1) - primary_handle = handle; - else - connections[conn_count++] = handle; - } - - if (primary_handle) - { - if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN || primary_handle->state == DN_CONNECTION_STATE_COPY_OUT) - { - /* msgType + msgLen */ - if (ensure_out_buffer_capacity(primary_handle->outEnd + 1 + 4, primary_handle) != 0) - { - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - - primary_handle->outBuffer[primary_handle->outEnd++] = 'c'; - memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4); - 
primary_handle->outEnd += 4; - - /* We need response right away, so send immediately */ - if (data_node_flush(primary_handle) < 0) - res = EOF; - } - else - res = EOF; - - combiner = CreateResponseCombiner(conn_count + 1, combine_type, dest); - data_node_receive_responses(1, &primary_handle, timeout, combiner); - } - - for (i = 0; i < conn_count; i++) - { - DataNodeHandle *handle = connections[i]; - - if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT) - { - /* msgType + msgLen */ - if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0) - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - - handle->outBuffer[handle->outEnd++] = 'c'; - memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); - handle->outEnd += 4; - - /* We need response right away, so send immediately */ - if (data_node_flush(handle) < 0) - res = EOF; - } - else - res = EOF; - } - - need_tran = !autocommit || primary_handle || conn_count > 1; - - if (!combiner) - combiner = CreateResponseCombiner(conn_count, combine_type, dest); - - data_node_receive_responses(conn_count, connections, timeout, combiner); - - if (!ValidateAndCloseCombiner(combiner) || res) - { - if (autocommit) - { - if (need_tran) - DataNodeRollback(DestNone); - else if (!PersistentConnections) - release_handles(); - } - - return 0; - } - - if (autocommit) - { - if (need_tran) - DataNodeCommit(DestNone); - else if (!PersistentConnections) - release_handles(); - } - - // Verify status? - return combiner->row_count; -} diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c new file mode 100644 index 0000000000..1ee1d59e99 --- /dev/null +++ b/src/backend/pgxc/pool/execRemote.c @@ -0,0 +1,2453 @@ +/*------------------------------------------------------------------------- + * + * execRemote.c + * + * Functions to execute commands on remote data nodes + * + * + * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group + * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation + * + * IDENTIFICATION + * $$ + * + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "access/gtm.h" +#include "access/xact.h" +#include "executor/executor.h" +#include "gtm/gtm_c.h" +#include "libpq/libpq.h" +#include "miscadmin.h" +#include "pgxc/execRemote.h" +#include "pgxc/poolmgr.h" +#include "utils/datum.h" +#include "utils/memutils.h" +#include "utils/tuplesort.h" +#include "utils/snapmgr.h" + +/* + * Buffer size does not affect performance significantly, just do not allow + * connection buffer grows infinitely + */ +#define COPY_BUFFER_SIZE 8192 +#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024 + +static bool autocommit = true; +static DataNodeHandle **write_node_list = NULL; +static int write_node_count = 0; + +static int data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid); +static int data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest); +static int data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest); + +static void clear_write_node_list(); + +#define MAX_STATEMENTS_PER_TRAN 10 + +/* Variables to collect statistics */ +static int total_transactions = 0; +static int total_statements = 0; +static int total_autocommit = 0; +static int nonautocommit_2pc = 0; +static int autocommit_2pc = 0; +static int current_tran_statements = 0; +static int 
*statements_per_transaction = NULL; +static int *nodes_per_transaction = NULL; + +/* + * statistics collection: count a statement + */ +static void +stat_statement() +{ + total_statements++; + current_tran_statements++; +} + +/* + * To collect statistics: count a transaction + */ +static void +stat_transaction(int node_count) +{ + total_transactions++; + if (autocommit) + total_autocommit++; + if (!statements_per_transaction) + { + statements_per_transaction = (int *) malloc((MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int)); + memset(statements_per_transaction, 0, (MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int)); + } + if (current_tran_statements > MAX_STATEMENTS_PER_TRAN) + statements_per_transaction[MAX_STATEMENTS_PER_TRAN]++; + else + statements_per_transaction[current_tran_statements]++; + current_tran_statements = 0; + if (node_count > 0 && node_count <= NumDataNodes) + { + if (!nodes_per_transaction) + { + nodes_per_transaction = (int *) malloc(NumDataNodes * sizeof(int)); + memset(nodes_per_transaction, 0, NumDataNodes * sizeof(int)); + } + nodes_per_transaction[node_count - 1]++; + } +} + + +/* + * To collect statistics: count a two-phase commit on nodes + */ +static void +stat_2pc() +{ + if (autocommit) + autocommit_2pc++; + else + nonautocommit_2pc++; +} + + +/* + * Output collected statistics to the log + */ +static void +stat_log() +{ + elog(DEBUG1, "Total Transactions: %d Total Statements: %d", total_transactions, total_statements); + elog(DEBUG1, "Autocommit: %d 2PC for Autocommit: %d 2PC for non-Autocommit: %d", + total_autocommit, autocommit_2pc, nonautocommit_2pc); + if (total_transactions) + { + if (statements_per_transaction) + { + int i; + + for (i = 0; i < MAX_STATEMENTS_PER_TRAN; i++) + elog(DEBUG1, "%d Statements per Transaction: %d (%d%%)", + i, statements_per_transaction[i], statements_per_transaction[i] * 100 / total_transactions); + } + elog(DEBUG1, "%d+ Statements per Transaction: %d (%d%%)", + MAX_STATEMENTS_PER_TRAN, statements_per_transaction[MAX_STATEMENTS_PER_TRAN], statements_per_transaction[MAX_STATEMENTS_PER_TRAN] * 100 / total_transactions); + if (nodes_per_transaction) + { + int i; + + for (i = 0; i < NumDataNodes; i++) + elog(DEBUG1, "%d Nodes per Transaction: %d (%d%%)", + i + 1, nodes_per_transaction[i], nodes_per_transaction[i] * 100 / total_transactions); + } + } +} + + +/* + * Create a structure to store parameters needed to combine responses from + * multiple connections as well as state information + */ +static RemoteQueryState * +CreateResponseCombiner(int node_count, CombineType combine_type) +{ + RemoteQueryState *combiner; + + /* RemoteQueryState is a Node, allocated by makeNode() */ + combiner = makeNode(RemoteQueryState); + if (combiner == NULL) + { + /* Out of memory */ + return combiner; + } + + combiner->node_count = node_count; + combiner->connections = NULL; + combiner->conn_count = 0; + combiner->combine_type = combine_type; + combiner->dest = NULL; + combiner->command_complete_count = 0; + combiner->row_count = 0; + combiner->request_type = REQUEST_TYPE_NOT_DEFINED; + combiner->tuple_desc = NULL; + combiner->description_count = 0; + combiner->copy_in_count = 0; + combiner->copy_out_count = 0; + combiner->errorMessage = NULL; + combiner->query_Done = false; + combiner->completionTag = NULL; + combiner->msg = NULL; + combiner->msglen = 0; + combiner->initAggregates = true; + combiner->simple_aggregates = NULL; + combiner->copy_file = NULL; + + return combiner; +} + +/* + * Parse out row count from the command status response and convert it to an integer + */ +static int +parse_row_count(const char *message, size_t len, uint64 *rowcount) +{ + int digits = 0; + int pos; + + *rowcount = 0; + /* skip \0 string terminator */ + for (pos = 0; pos < len - 1; pos++) + { + if (message[pos] >= '0' && message[pos] <= '9') + { + *rowcount = *rowcount * 10 + message[pos] - '0'; + digits++; + } + else + { + *rowcount = 0; + digits = 0; + } + } + return digits; +}
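/*
 * For illustration, a hypothetical caller of parse_row_count() above (not
 * part of the patch): for a command tag like "INSERT 0 5" only the trailing
 * digit run survives the scan, so rows becomes 5 and the return value is 1;
 * a tag without a trailing number, such as "BEGIN", returns 0.
 */
static void
example_parse_row_count(void)
{
	uint64		rows;
	const char	tag[] = "INSERT 0 5";

	if (parse_row_count(tag, sizeof(tag), &rows) > 0)
		elog(DEBUG1, "affected rows: " UINT64_FORMAT, rows);
}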
+/* + * Initialize the collection value when aggregation is first set up, or for a + * new group (grouping support is not implemented yet) + */ +static void +initialize_collect_aggregates(SimpleAgg *simple_agg) +{ + if (simple_agg->initValueIsNull) + simple_agg->collectValue = simple_agg->initValue; + else + simple_agg->collectValue = datumCopy(simple_agg->initValue, + simple_agg->transtypeByVal, + simple_agg->transtypeLen); + simple_agg->noCollectValue = simple_agg->initValueIsNull; + simple_agg->collectValueNull = simple_agg->initValueIsNull; +} + +/* + * Finalize the aggregate after the current group or the entire relation is processed + * (grouping support is not implemented yet) + */ +static void +finalize_collect_aggregates(SimpleAgg *simple_agg, Datum *resultVal, bool *resultIsNull) +{ + /* + * Apply the agg's finalfn if one is provided, else return collectValue. + */ + if (OidIsValid(simple_agg->finalfn_oid)) + { + FunctionCallInfoData fcinfo; + + InitFunctionCallInfoData(fcinfo, &(simple_agg->finalfn), 1, + (void *) simple_agg, NULL); + fcinfo.arg[0] = simple_agg->collectValue; + fcinfo.argnull[0] = simple_agg->collectValueNull; + if (fcinfo.flinfo->fn_strict && simple_agg->collectValueNull) + { + /* don't call a strict function with NULL inputs */ + *resultVal = (Datum) 0; + *resultIsNull = true; + } + else + { + *resultVal = FunctionCallInvoke(&fcinfo); + *resultIsNull = fcinfo.isnull; + } + } + else + { + *resultVal = simple_agg->collectValue; + *resultIsNull = simple_agg->collectValueNull; + } +}
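/*
 * For illustration, a minimal sketch (the wrapper and its arguments are
 * assumptions, not part of the patch) of the lifecycle the helpers above
 * define together with advance_collect_function() below: initialize once,
 * advance per input value, finalize once at the end.
 */
static void
example_aggregate_lifecycle(SimpleAgg *simple_agg, Datum value, bool isnull)
{
	FunctionCallInfoData fcinfo;
	Datum		result;
	bool		resultIsNull;

	initialize_collect_aggregates(simple_agg);

	/* normally done once per row received from the data nodes */
	fcinfo.arg[1] = value;
	fcinfo.argnull[1] = isnull;
	advance_collect_function(simple_agg, &fcinfo);

	finalize_collect_aggregates(simple_agg, &result, &resultIsNull);
}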
+/* + * Given new input value(s), advance the transition function of an aggregate. + * + * The new values (and null flags) have been preloaded into argument positions + * 1 and up in fcinfo, so that we needn't copy them again to pass to the + * collection function. No other fields of fcinfo are assumed valid. + * + * It doesn't matter which memory context this is called in. + */ +static void +advance_collect_function(SimpleAgg *simple_agg, FunctionCallInfoData *fcinfo) +{ + Datum newVal; + + if (simple_agg->transfn.fn_strict) + { + /* + * For a strict transfn, nothing happens when there's a NULL input; we + * just keep the prior transValue. + */ + if (fcinfo->argnull[1]) + return; + if (simple_agg->noCollectValue) + { + /* + * result has not been initialized + * We must copy the datum into result if it is pass-by-ref. We + * do not need to pfree the old result, since it's NULL. + */ + simple_agg->collectValue = datumCopy(fcinfo->arg[1], + simple_agg->transtypeByVal, + simple_agg->transtypeLen); + simple_agg->collectValueNull = false; + simple_agg->noCollectValue = false; + return; + } + if (simple_agg->collectValueNull) + { + /* + * Don't call a strict function with NULL inputs. Note it is + * possible to get here despite the above tests, if the transfn is + * strict *and* returned a NULL on a prior cycle. If that happens + * we will propagate the NULL all the way to the end. + */ + return; + } + } + + /* + * OK to call the transition function + */ + InitFunctionCallInfoData(*fcinfo, &(simple_agg->transfn), 2, (void *) simple_agg, NULL); + fcinfo->arg[0] = simple_agg->collectValue; + fcinfo->argnull[0] = simple_agg->collectValueNull; + newVal = FunctionCallInvoke(fcinfo); + + /* + * If pass-by-ref datatype, must copy the new value into aggcontext and + * pfree the prior transValue. But if transfn returned a pointer to its + * first input, we don't need to do anything. + */ + if (!simple_agg->transtypeByVal && + DatumGetPointer(newVal) != DatumGetPointer(simple_agg->collectValue)) + { + if (!fcinfo->isnull) + { + newVal = datumCopy(newVal, + simple_agg->transtypeByVal, + simple_agg->transtypeLen); + } + if (!simple_agg->collectValueNull) + pfree(DatumGetPointer(simple_agg->collectValue)); + } + + simple_agg->collectValue = newVal; + simple_agg->collectValueNull = fcinfo->isnull; +} + +/* + * Convert RowDescription message to a TupleDesc + */ +static TupleDesc +create_tuple_desc(char *msg_body, size_t len) +{ + TupleDesc result; + int i, nattr; + uint16 n16; + + /* get number of attributes */ + memcpy(&n16, msg_body, 2); + nattr = ntohs(n16); + msg_body += 2; + + result = CreateTemplateTupleDesc(nattr, false); + + /* decode attributes */ + for (i = 1; i <= nattr; i++) + { + AttrNumber attnum; + char *attname; + Oid oidtypeid; + int32 typmod; + + uint32 n32; + + attnum = (AttrNumber) i; + + /* attribute name */ + attname = msg_body; + msg_body += strlen(attname) + 1; + + /* table OID, ignored */ + msg_body += 4; + + /* column no, ignored */ + msg_body += 2; + + /* data type */ + memcpy(&n32, msg_body, 4); + oidtypeid = ntohl(n32); + msg_body += 4; + + /* type len, ignored */ + msg_body += 2; + + /* type mod */ + memcpy(&n32, msg_body, 4); + typmod = ntohl(n32); + msg_body += 4; + + /* PGXCTODO text/binary flag? */ + msg_body += 2; + + TupleDescInitEntry(result, attnum, attname, oidtypeid, typmod, 0); + } + return result; +} + +static void +exec_simple_aggregates(RemoteQueryState *combiner, TupleTableSlot *slot) +{ + ListCell *lc; + + Assert(combiner->simple_aggregates); + Assert(!TupIsNull(slot)); + + if (combiner->initAggregates) + { + foreach (lc, combiner->simple_aggregates) + initialize_collect_aggregates((SimpleAgg *) lfirst(lc)); + + combiner->initAggregates = false; + } + + foreach (lc, combiner->simple_aggregates) + { + SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc); + FunctionCallInfoData fcinfo; + int attr = simple_agg->column_pos; + + slot_getsomeattrs(slot, attr + 1); + fcinfo.arg[1] = slot->tts_values[attr]; + fcinfo.argnull[1] = slot->tts_isnull[attr]; + + advance_collect_function(simple_agg, &fcinfo); + } +}
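/*
 * For illustration, a sketch of how a caller might fold a stream of tuples
 * through exec_simple_aggregates() above and finish_simple_aggregates() just
 * below. The wrapper, its nrows argument and the refill step are assumptions,
 * not part of the patch.
 */
static void
example_aggregate_stream(RemoteQueryState *combiner, TupleTableSlot *slot, int nrows)
{
	int			i;

	for (i = 0; i < nrows; i++)
	{
		/* assume the caller has refilled "slot" with the next row here */
		exec_simple_aggregates(combiner, slot);
	}

	/* produces a single result row carrying the finalized values */
	finish_simple_aggregates(combiner, slot);
}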
+static void +finish_simple_aggregates(RemoteQueryState *combiner, TupleTableSlot *slot) +{ + ListCell *lc; + ExecClearTuple(slot); + + /* + * Aggregates may not have been initialized if no rows have been received + * from the data nodes because of a HAVING clause. + * In this case finish_simple_aggregates() should return an empty slot + */ + if (!combiner->initAggregates) + { + foreach (lc, combiner->simple_aggregates) + { + SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc); + int attr = simple_agg->column_pos; + + finalize_collect_aggregates(simple_agg, + slot->tts_values + attr, + slot->tts_isnull + attr); + } + ExecStoreVirtualTuple(slot); + /* To prevent the aggregates from being finalized again */ + combiner->initAggregates = true; + } +} + +/* + * Handle CopyOutCommandComplete ('c') message from a data node connection + */ +static void +HandleCopyOutComplete(RemoteQueryState *combiner) +{ + if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) + combiner->request_type = REQUEST_TYPE_COPY_OUT; + if (combiner->request_type != REQUEST_TYPE_COPY_OUT) + /* Inconsistent responses */ + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Unexpected response from the data nodes"))); + /* Just do nothing, the close message is managed by the coordinator */ + combiner->copy_out_count++; +} + +/* + * Handle CommandComplete ('C') message from a data node connection + */ +static void +HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len) +{ + int digits = 0; + + /* + * If we did not receive a description, we have a rowcount or OK response + */ + if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) + combiner->request_type = REQUEST_TYPE_COMMAND; + /* Extract rowcount */ + if (combiner->combine_type != COMBINE_TYPE_NONE) + { + uint64 rowcount; + digits = parse_row_count(msg_body, len, &rowcount); + if (digits > 0) + { + /* Replicated write, make sure they are the same */ + if (combiner->combine_type == COMBINE_TYPE_SAME) + { + if (combiner->command_complete_count) + { + if (rowcount != combiner->row_count) + /* There is a consistency issue in the database with the replicated table */ + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Write to replicated table returned different results from the data nodes"))); + } + else + /* first result */ + combiner->row_count = rowcount; + } + else + combiner->row_count += rowcount; + } + else + combiner->combine_type = COMBINE_TYPE_NONE; + } + if (++combiner->command_complete_count == combiner->node_count) + { + if (combiner->completionTag) + { + if (combiner->combine_type == COMBINE_TYPE_NONE) + { + /* ensure we do not go beyond buffer bounds */ + if (len > COMPLETION_TAG_BUFSIZE) + len = COMPLETION_TAG_BUFSIZE; + memcpy(combiner->completionTag, msg_body, len); + } + else + { + /* Truncate msg_body to get base string */ + msg_body[len - digits - 1] = '\0'; + snprintf(combiner->completionTag, + COMPLETION_TAG_BUFSIZE, + "%s" UINT64_FORMAT, + msg_body, + combiner->row_count); + } + } + } +} + +/* + * Handle RowDescription ('T') message from a data node connection + */ +static bool +HandleRowDescription(RemoteQueryState *combiner, char *msg_body, size_t len) +{ + if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) + combiner->request_type = REQUEST_TYPE_QUERY; + if (combiner->request_type != REQUEST_TYPE_QUERY) + { + /* Inconsistent responses */ + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Unexpected response from the data nodes"))); + } + /* Increment counter and check if it was first */ + if (combiner->description_count++ == 0) + { + combiner->tuple_desc = create_tuple_desc(msg_body, len); + return true; + } + return false; +}
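/*
 * For illustration, a sketch of the kind of dispatch loop these Handle*
 * routines are written for, in the spirit of the handle_response() removed
 * from datanode.c; the wrapper is hypothetical and only a few message types
 * are shown.
 */
static void
example_dispatch_message(RemoteQueryState *combiner, DataNodeHandle *conn)
{
	char	   *msg;
	int			len;
	char		msgtype;

	/* '\0' means only a partial message is buffered; receive more first */
	while ((msgtype = get_message(conn, &len, &msg)) != '\0')
	{
		switch (msgtype)
		{
			case 'T':			/* RowDescription */
				HandleRowDescription(combiner, msg, len);
				break;
			case 'C':			/* CommandComplete */
				HandleCommandComplete(combiner, msg, len);
				break;
			case 'c':			/* CopyToCommandComplete */
				HandleCopyOutComplete(combiner);
				break;
			default:
				break;
		}
	}
}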
+
+/*
+ * Handle ParameterStatus ('S') message from a data node connection (SET command)
+ */
+static void
+HandleParameterStatus(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+	if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+		combiner->request_type = REQUEST_TYPE_QUERY;
+	if (combiner->request_type != REQUEST_TYPE_QUERY)
+	{
+		/* Inconsistent responses */
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("Unexpected response from the data nodes")));
+	}
+	/* Proxy last */
+	if (++combiner->description_count == combiner->node_count)
+	{
+		pq_putmessage('S', msg_body, len);
+	}
+}
+
+/*
+ * Handle CopyInResponse ('G') message from a data node connection
+ */
+static void
+HandleCopyIn(RemoteQueryState *combiner)
+{
+	if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+		combiner->request_type = REQUEST_TYPE_COPY_IN;
+	if (combiner->request_type != REQUEST_TYPE_COPY_IN)
+	{
+		/* Inconsistent responses */
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("Unexpected response from the data nodes")));
+	}
+	/*
+	 * The normal PG code will output a G message when it runs in the
+	 * coordinator, so do not proxy the message here, just count it.
+	 */
+	combiner->copy_in_count++;
+}
+
+/*
+ * Handle CopyOutResponse ('H') message from a data node connection
+ */
+static void
+HandleCopyOut(RemoteQueryState *combiner)
+{
+	if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+		combiner->request_type = REQUEST_TYPE_COPY_OUT;
+	if (combiner->request_type != REQUEST_TYPE_COPY_OUT)
+	{
+		/* Inconsistent responses */
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("Unexpected response from the data nodes")));
+	}
+	/*
+	 * The normal PG code will output an H message when it runs in the
+	 * coordinator, so do not proxy the message here, just count it.
+	 */
+	combiner->copy_out_count++;
+}
+
+/*
+ * Handle CopyOutDataRow ('d') message from a data node connection
+ */
+static void
+HandleCopyDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+	if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+		combiner->request_type = REQUEST_TYPE_COPY_OUT;
+
+	/* Inconsistent responses */
+	if (combiner->request_type != REQUEST_TYPE_COPY_OUT)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("Unexpected response from the data nodes")));
+
+	/* If there is a copy file, data has to be sent to the local file */
+	if (combiner->copy_file)
+		/* write data to the copy file */
+		fwrite(msg_body, 1, len, combiner->copy_file);
+	else
+		pq_putmessage('d', msg_body, len);
+}
+
+/*
+ * Handle DataRow ('D') message from a data node connection.
+ * The message is copied out of the connection buffer and stored in the
+ * combiner until the next FetchTuple() consumes it.
+ */
+static void
+HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+	/* We expect the previous message to be consumed */
+	Assert(combiner->msg == NULL);
+
+	if (combiner->request_type != REQUEST_TYPE_QUERY)
+	{
+		/* Inconsistent responses */
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("Unexpected response from the data nodes")));
+	}
+
+	/*
+	 * If we already got an error, ignore incoming data rows from other
+	 * nodes. Still we want to continue reading until we get CommandComplete.
+	 */
+	if (combiner->errorMessage)
+		return;
+
+	/*
+	 * We are copying the message because it points into the connection
+	 * buffer, which will be overwritten by the next socket read.
+	 */
+	combiner->msg = (char *) palloc(len);
+	memcpy(combiner->msg, msg_body, len);
+	combiner->msglen = len;
+}
+
+/*
+ * Handle ErrorResponse ('E') message from a data node connection
+ */
+static void
+HandleError(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+	/* parse error message */
+	char	   *severity = NULL;
+	char	   *code = NULL;
+	char	   *message = NULL;
+	char	   *detail = NULL;
+	char	   *hint = NULL;
+	char	   *position = NULL;
+	char	   *int_position = NULL;
+	char	   *int_query = NULL;
+	char	   *where = NULL;
+	char	   *file = NULL;
+	char	   *line = NULL;
+	char	   *routine = NULL;
+	int			offset = 0;
+
+	/*
+	 * Scan until the terminating \0
+	 */
+	while (offset + 1 < len)
+	{
+		/* pointer to the field message */
+		char *str = msg_body + offset + 1;
+
+		switch (msg_body[offset])
+		{
+			case 'S':
+				severity = str;
+				break;
+			case 'C':
+				code = str;
+				break;
+			case 'M':
+				message = str;
+				break;
+			case 'D':
+				detail = str;
+				break;
+			case 'H':
+				hint = str;
+				break;
+			case 'P':
+				position = str;
+				break;
+			case 'p':
+				int_position = str;
+				break;
+			case 'q':
+				int_query = str;
+				break;
+			case 'W':
+				where = str;
+				break;
+			case 'F':
+				file = str;
+				break;
+			case 'L':
+				line = str;
+				break;
+			case 'R':
+				routine = str;
+				break;
+		}
+
+		/* code, message and \0 */
+		offset += strlen(str) + 2;
+	}
+
+	/*
+	 * We may have special handling for some errors; the default handling is
+	 * to throw out an error with the same message. We can not ereport
+	 * immediately because we should read from this and other connections
+	 * until ReadyForQuery is received, so we just store the error message.
+	 * If multiple connections return errors only the first one is reported.
+	 */
+	if (!combiner->errorMessage)
+	{
+		combiner->errorMessage = pstrdup(message);
+		/* Error Code is exactly 5 significant bytes */
+		memcpy(combiner->errorCode, code, 5);
+	}
+
+	/*
+	 * If a data node has sent ErrorResponse it will never send
+	 * CommandComplete. Increment the counter to prevent endless waiting
+	 * for it.
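+	 * (validate_combiner() compares command_complete_count with node_count,
+	 * so this keeps an erroring node from stalling completion.)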
+ */ + combiner->command_complete_count++; +} + +/* + * Examine the specified combiner state and determine if command was completed + * successfully + */ +static bool +validate_combiner(RemoteQueryState *combiner) +{ + /* There was error message while combining */ + if (combiner->errorMessage) + return false; + /* Check if state is defined */ + if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) + return false; + /* Check all nodes completed */ + if ((combiner->request_type == REQUEST_TYPE_COMMAND + || combiner->request_type == REQUEST_TYPE_QUERY) + && combiner->command_complete_count != combiner->node_count) + return false; + + /* Check count of description responses */ + if (combiner->request_type == REQUEST_TYPE_QUERY + && combiner->description_count != combiner->node_count) + return false; + + /* Check count of copy-in responses */ + if (combiner->request_type == REQUEST_TYPE_COPY_IN + && combiner->copy_in_count != combiner->node_count) + return false; + + /* Check count of copy-out responses */ + if (combiner->request_type == REQUEST_TYPE_COPY_OUT + && combiner->copy_out_count != combiner->node_count) + return false; + + /* Add other checks here as needed */ + + /* All is good if we are here */ + return true; +} + +/* + * Close combiner and free allocated memory, if it is not needed + */ +static void +CloseCombiner(RemoteQueryState *combiner) +{ + if (combiner) + { + if (combiner->connections) + pfree(combiner->connections); + if (combiner->tuple_desc) + FreeTupleDesc(combiner->tuple_desc); + if (combiner->errorMessage) + pfree(combiner->errorMessage); + pfree(combiner); + } +} + +/* + * Validate combiner and release storage freeing allocated memory + */ +static bool +ValidateAndCloseCombiner(RemoteQueryState *combiner) +{ + bool valid = validate_combiner(combiner); + + CloseCombiner(combiner); + + return valid; +} + +/* + * Validate combiner and reset storage + */ +static bool +ValidateAndResetCombiner(RemoteQueryState *combiner) +{ + bool valid = validate_combiner(combiner); + + if (combiner->connections) + pfree(combiner->connections); + if (combiner->tuple_desc) + FreeTupleDesc(combiner->tuple_desc); + if (combiner->msg) + pfree(combiner->msg); + if (combiner->errorMessage) + pfree(combiner->errorMessage); + + combiner->command_complete_count = 0; + combiner->connections = NULL; + combiner->conn_count = 0; + combiner->row_count = 0; + combiner->request_type = REQUEST_TYPE_NOT_DEFINED; + combiner->tuple_desc = NULL; + combiner->description_count = 0; + combiner->copy_in_count = 0; + combiner->copy_out_count = 0; + combiner->errorMessage = NULL; + combiner->query_Done = false; + combiner->msg = NULL; + combiner->msglen = 0; + combiner->simple_aggregates = NULL; + combiner->copy_file = NULL; + + return valid; +} + +/* + * Get next data row from the combiner's buffer into provided slot + * Just clear slot and return false if buffer is empty, that means more data + * should be read + */ +bool +FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot) +{ + /* have messages in the buffer, consume them */ + if (combiner->msg) + { + ExecStoreDataRowTuple(combiner->msg, combiner->msglen, slot, true); + combiner->msg = NULL; + combiner->msglen = 0; + return true; + } + /* inform caller that buffer is empty */ + ExecClearTuple(slot); + return false; +} + + +/* + * Handle responses from the Data node connections + */ +static void +data_node_receive_responses(const int conn_count, DataNodeHandle ** connections, + struct timeval * timeout, RemoteQueryState *combiner) +{ + int count = 
conn_count;
+	DataNodeHandle *to_receive[conn_count];
+
+	/* make a copy of the pointers to the connections */
+	memcpy(to_receive, connections, conn_count * sizeof(DataNodeHandle *));
+
+	/*
+	 * Read results.
+	 * Note we try to read from data node connections even if there is an
+	 * error on one, so as to avoid reading incorrect results on the next
+	 * statement. It might be better to just destroy these connections and
+	 * tell the pool manager.
+	 */
+	while (count > 0)
+	{
+		int i = 0;
+
+		data_node_receive(count, to_receive, timeout);
+		while (i < count)
+		{
+			switch (handle_response(to_receive[i], combiner))
+			{
+				case RESPONSE_EOF: /* have something to read, keep receiving */
+					i++;
+					break;
+				case RESPONSE_COMPLETE:
+				case RESPONSE_COPY:
+					/* Handling is done, do not track this connection */
+					count--;
+					/* Move last connection in place */
+					if (i < count)
+						to_receive[i] = to_receive[count];
+					break;
+				default:
+					/* Inconsistent responses */
+					ereport(ERROR,
+							(errcode(ERRCODE_INTERNAL_ERROR),
+							 errmsg("Unexpected response from the data nodes")));
+			}
+		}
+	}
+}
+
+/*
+ * Read next message from the connection and update the combiner accordingly.
+ * If we are in an error state we just consume the messages and do not proxy.
+ * Long term, we should look into cancelling executing statements
+ * and closing the connections.
+ * Return values:
+ * RESPONSE_EOF      - need to receive more data for the connection
+ * RESPONSE_COMPLETE - done with the connection
+ * RESPONSE_TUPDESC  - got a row description
+ * RESPONSE_DATAROW  - got a data row
+ * RESPONSE_COPY     - got a copy response
+ */
+int
+handle_response(DataNodeHandle * conn, RemoteQueryState *combiner)
+{
+	char	   *msg;
+	int			msg_len;
+
+	for (;;)
+	{
+		/* No data available, exit */
+		if (conn->state == DN_CONNECTION_STATE_QUERY)
+			return RESPONSE_EOF;
+
+		/* TODO handle other possible responses */
+		switch (get_message(conn, &msg_len, &msg))
+		{
+			case '\0':			/* Not enough data in the buffer */
+				conn->state = DN_CONNECTION_STATE_QUERY;
+				return RESPONSE_EOF;
+			case 'c':			/* CopyToCommandComplete */
+				conn->state = DN_CONNECTION_STATE_COMPLETED;
+				HandleCopyOutComplete(combiner);
+				break;
+			case 'C':			/* CommandComplete */
+				conn->state = DN_CONNECTION_STATE_COMPLETED;
+				HandleCommandComplete(combiner, msg, msg_len);
+				break;
+			case 'T':			/* RowDescription */
+				if (HandleRowDescription(combiner, msg, msg_len))
+					return RESPONSE_TUPDESC;
+				break;
+			case 'D':			/* DataRow */
+				HandleDataRow(combiner, msg, msg_len);
+				return RESPONSE_DATAROW;
+			case 'G':			/* CopyInResponse */
+				conn->state = DN_CONNECTION_STATE_COPY_IN;
+				HandleCopyIn(combiner);
+				/* Done, return to caller to let it know the data can be passed in */
+				return RESPONSE_COPY;
+			case 'H':			/* CopyOutResponse */
+				conn->state = DN_CONNECTION_STATE_COPY_OUT;
+				HandleCopyOut(combiner);
+				return RESPONSE_COPY;
+			case 'd':			/* CopyOutDataRow */
+				conn->state = DN_CONNECTION_STATE_COPY_OUT;
+				HandleCopyDataRow(combiner, msg, msg_len);
+				break;
+			case 'E':			/* ErrorResponse */
+				HandleError(combiner, msg, msg_len);
+				conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
+				/*
+				 * Do not return with an error, we still need to consume Z,
+				 * ready-for-query
+				 */
+				break;
+			case 'A':			/* NotificationResponse */
+			case 'N':			/* NoticeResponse */
+				/*
+				 * Ignore these to prevent multiple messages, one from each
+				 * node. Coordinator will send one for DDL anyway
+				 */
+				break;
+			case 'Z':			/* ReadyForQuery */
+				conn->transaction_status = msg[0];
+				conn->state = DN_CONNECTION_STATE_IDLE;
+				return RESPONSE_COMPLETE;
+			case 'I':			/* EmptyQuery */
+			default:
+				/* sync lost?
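+				 * We received a message type we do not recognize, so the
+				 * byte stream can no longer be trusted; mark the connection
+				 * fatally broken.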
+				 */
+				conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
+				return RESPONSE_EOF;
+		}
+	}
+	/* Keep compiler quiet */
+	return RESPONSE_EOF;
+}
+
+/*
+ * Send BEGIN command to the Data nodes and receive responses
+ */
+static int
+data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid)
+{
+	int			i;
+	struct timeval *timeout = NULL;
+	RemoteQueryState *combiner;
+
+	/* Send BEGIN */
+	for (i = 0; i < conn_count; i++)
+	{
+		if (GlobalTransactionIdIsValid(gxid) && data_node_send_gxid(connections[i], gxid))
+			return EOF;
+
+		if (data_node_send_query(connections[i], "BEGIN"))
+			return EOF;
+	}
+
+	combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+	combiner->dest = None_Receiver;
+
+	/* Receive responses */
+	data_node_receive_responses(conn_count, connections, timeout, combiner);
+
+	/* Verify status */
+	return ValidateAndCloseCombiner(combiner) ? 0 : EOF;
+}
+
+/* Clears the write node list */
+static void
+clear_write_node_list()
+{
+	/* we just malloc once and use a counter */
+	if (write_node_list == NULL)
+	{
+		write_node_list = (DataNodeHandle **) malloc(NumDataNodes * sizeof(DataNodeHandle *));
+	}
+	write_node_count = 0;
+}
+
+
+/*
+ * Switch autocommit mode off, so all subsequent statements will be in the same transaction
+ */
+void
+DataNodeBegin(void)
+{
+	autocommit = false;
+	clear_write_node_list();
+}
+
+
+/*
+ * Commit current transaction, use two-phase commit if necessary
+ */
+int
+DataNodeCommit(CommandDest dest)
+{
+	int			res = 0;
+	int			tran_count;
+	DataNodeHandle *connections[NumDataNodes];
+
+	/* gather connections to commit */
+	tran_count = get_transaction_nodes(connections);
+
+	/*
+	 * If we do not have open transactions we have nothing to commit, just
+	 * report success
+	 */
+	if (tran_count == 0)
+		goto finish;
+
+	res = data_node_commit(tran_count, connections, dest);
+
+finish:
+	/* In autocommit mode statistics are collected in DataNodeExec */
+	if (!autocommit)
+		stat_transaction(tran_count);
+	if (!PersistentConnections)
+		release_handles();
+	autocommit = true;
+	clear_write_node_list();
+	return res;
+}
+
+
+/*
+ * Send COMMIT or PREPARE/COMMIT PREPARED down to the Data nodes and handle responses
+ */
+static int
+data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest)
+{
+	int			i;
+	struct timeval *timeout = NULL;
+	char		buffer[256];
+	GlobalTransactionId gxid = InvalidGlobalTransactionId;
+	int			result = 0;
+	RemoteQueryState *combiner = NULL;
+
+
+	/* can set this to false to disable temporarily */
+	/* bool do2PC = conn_count > 1; */
+
+	/*
+	 * Only use 2PC if more than one node was written to. Otherwise, just
+	 * send COMMIT to all
+	 */
+	bool		do2PC = write_node_count > 1;
+
+	/* Extra XID for Two Phase Commit */
+	GlobalTransactionId two_phase_xid = 0;
+
+	if (do2PC)
+	{
+		stat_2pc();
+
+		/*
+		 * Formally we should be using GetCurrentGlobalTransactionIdIfAny()
+		 * here, but since we need 2PC, we surely have sent down a command
+		 * and got a gxid for it.
Hence GetCurrentGlobalTransactionId() just returns + * already allocated gxid + */ + gxid = GetCurrentGlobalTransactionId(); + + sprintf(buffer, "PREPARE TRANSACTION 'T%d'", gxid); + /* Send PREPARE */ + for (i = 0; i < conn_count; i++) + { + if (data_node_send_query(connections[i], buffer)) + return EOF; + } + + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); + combiner->dest = None_Receiver; + /* Receive responses */ + data_node_receive_responses(conn_count, connections, timeout, combiner); + + /* Reset combiner */ + if (!ValidateAndResetCombiner(combiner)) + { + result = EOF; + } + } + + if (!do2PC) + strcpy(buffer, "COMMIT"); + else + { + if (result) + sprintf(buffer, "ROLLBACK PREPARED 'T%d'", gxid); + else + sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid); + + /* We need to use a new xid, the data nodes have reset */ + two_phase_xid = BeginTranGTM(); + for (i = 0; i < conn_count; i++) + { + if (data_node_send_gxid(connections[i], two_phase_xid)) + { + add_error_message(connections[i], "Can not send request"); + result = EOF; + goto finish; + } + } + } + + /* Send COMMIT */ + for (i = 0; i < conn_count; i++) + { + if (data_node_send_query(connections[i], buffer)) + { + result = EOF; + goto finish; + } + } + + if (!combiner) + { + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); + combiner->dest = None_Receiver; + } + /* Receive responses */ + data_node_receive_responses(conn_count, connections, timeout, combiner); + result = ValidateAndCloseCombiner(combiner) ? result : EOF; + +finish: + if (do2PC) + CommitTranGTM((GlobalTransactionId) two_phase_xid); + + return result; +} + + +/* + * Rollback current transaction + */ +int +DataNodeRollback(CommandDest dest) +{ + int res = 0; + int tran_count; + DataNodeHandle *connections[NumDataNodes]; + + /* gather connections to rollback */ + tran_count = get_transaction_nodes(connections); + + /* + * If we do not have open transactions we have nothing to rollback just + * report success + */ + if (tran_count == 0) + goto finish; + + res = data_node_rollback(tran_count, connections, dest); + +finish: + /* In autocommit mode statistics is collected in DataNodeExec */ + if (!autocommit) + stat_transaction(tran_count); + if (!PersistentConnections) + release_handles(); + autocommit = true; + clear_write_node_list(); + return res; +} + + +/* + * Send ROLLBACK command down to the Data nodes and handle responses + */ +static int +data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest) +{ + int i; + struct timeval *timeout = NULL; + int result = 0; + RemoteQueryState *combiner; + + /* Send ROLLBACK - */ + for (i = 0; i < conn_count; i++) + { + if (data_node_send_query(connections[i], "ROLLBACK")) + result = EOF; + } + + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); + combiner->dest = None_Receiver; + /* Receive responses */ + data_node_receive_responses(conn_count, connections, timeout, combiner); + + /* Verify status */ + return ValidateAndCloseCombiner(combiner) ? 0 : EOF; +} + + +/* + * Begin COPY command + * The copy_connections array must have room for NumDataNodes items + */ +DataNodeHandle** +DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from) +{ + int i, j; + int conn_count = list_length(nodelist) == 0 ? 
NumDataNodes : list_length(nodelist);
+	struct timeval *timeout = NULL;
+	DataNodeHandle **connections;
+	DataNodeHandle **copy_connections;
+	DataNodeHandle *newConnections[conn_count];
+	int			new_count = 0;
+	ListCell   *nodeitem;
+	bool		need_tran;
+	GlobalTransactionId gxid;
+	RemoteQueryState *combiner;
+
+	if (conn_count == 0)
+		return NULL;
+
+	/* Get needed datanode connections */
+	connections = get_handles(nodelist);
+	if (!connections)
+		return NULL;
+
+	need_tran = !autocommit || conn_count > 1;
+
+	elog(DEBUG1, "autocommit = %s, conn_count = %d, need_tran = %s", autocommit ? "true" : "false", conn_count, need_tran ? "true" : "false");
+
+	/*
+	 * We need to be able to quickly find a connection handle for a given
+	 * node number, so store connections in an array where the index is
+	 * node-1. Unused items in the array should be NULL.
+	 */
+	copy_connections = (DataNodeHandle **) palloc0(NumDataNodes * sizeof(DataNodeHandle *));
+	i = 0;
+	foreach(nodeitem, nodelist)
+		copy_connections[lfirst_int(nodeitem) - 1] = connections[i++];
+
+	/* Gather statistics */
+	stat_statement();
+	if (autocommit)
+		stat_transaction(conn_count);
+
+	/* We normally clear for transactions, but if autocommit, clear here, too */
+	if (autocommit)
+	{
+		clear_write_node_list();
+	}
+
+	/* Check status of connections */
+	/*
+	 * We want to track new "write" nodes, and new nodes in the current
+	 * transaction whether or not they are write nodes.
+	 */
+	if (write_node_count < NumDataNodes)
+	{
+		for (i = 0; i < conn_count; i++)
+		{
+			bool found = false;
+			for (j = 0; j < write_node_count && !found; j++)
+			{
+				if (write_node_list[j] == connections[i])
+					found = true;
+			}
+			if (!found)
+			{
+				/*
+				 * Add to the transaction-wide list if COPY FROM.
+				 * CopyOut (COPY TO) is not a write operation, no need to update
+				 */
+				if (is_from)
+					write_node_list[write_node_count++] = connections[i];
+				/* Add to current statement list */
+				newConnections[new_count++] = connections[i];
+			}
+		}
+		/* Check connection state is DN_CONNECTION_STATE_IDLE */
+	}
+
+	gxid = GetCurrentGlobalTransactionId();
+
+	/* elog(DEBUG1, "Current gxid = %d", gxid); */
+
+	if (!GlobalTransactionIdIsValid(gxid))
+	{
+		pfree(connections);
+		pfree(copy_connections);
+		return NULL;
+	}
+	if (new_count > 0 && need_tran)
+	{
+		/* Start transaction on connections where it is not started */
+		if (data_node_begin(new_count, newConnections, DestNone, gxid))
+		{
+			pfree(connections);
+			pfree(copy_connections);
+			return NULL;
+		}
+	}
+
+	/* Send query to nodes */
+	for (i = 0; i < conn_count; i++)
+	{
+		/* If an explicit transaction is needed the gxid is already sent */
+		if (!need_tran && data_node_send_gxid(connections[i], gxid))
+		{
+			add_error_message(connections[i], "Can not send request");
+			pfree(connections);
+			pfree(copy_connections);
+			return NULL;
+		}
+		if (snapshot && data_node_send_snapshot(connections[i], snapshot))
+		{
+			add_error_message(connections[i], "Can not send request");
+			pfree(connections);
+			pfree(copy_connections);
+			return NULL;
+		}
+		if (data_node_send_query(connections[i], query) != 0)
+		{
+			add_error_message(connections[i], "Can not send request");
+			pfree(connections);
+			pfree(copy_connections);
+			return NULL;
+		}
+	}
+
+	/*
+	 * We expect a CopyIn response, but do not want to send it to the client;
+	 * the caller should take care of this, because here we do not know
+	 * whether the client is running a console or file copy
+	 */
+	combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+	combiner->dest = None_Receiver;
+
+	/* Receive responses */
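+	/*
+	 * Each node is expected to answer with CopyInResponse ('G');
+	 * HandleCopyIn() counts the responses and ValidateAndCloseCombiner()
+	 * below verifies that every node replied.
+	 */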
+	data_node_receive_responses(conn_count, connections, timeout, combiner);
+	if (!ValidateAndCloseCombiner(combiner))
+	{
+		if (autocommit)
+		{
+			if (need_tran)
+				DataNodeCopyFinish(connections, 0, COMBINE_TYPE_NONE);
+			else if (!PersistentConnections)
+				release_handles();
+		}
+
+		pfree(connections);
+		pfree(copy_connections);
+		return NULL;
+	}
+
+	pfree(connections);
+	return copy_connections;
+}
+
+/*
+ * Send a data row to the specified nodes
+ */
+int
+DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections)
+{
+	DataNodeHandle *primary_handle = NULL;
+	ListCell   *nodeitem;
+	/* size + data row + \n */
+	int			msgLen = 4 + len + 1;
+	int			nLen = htonl(msgLen);
+
+	if (exec_nodes->primarynodelist)
+	{
+		primary_handle = copy_connections[lfirst_int(list_head(exec_nodes->primarynodelist)) - 1];
+	}
+
+	if (primary_handle)
+	{
+		if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN)
+		{
+			/* precalculate to speed up access */
+			int bytes_needed = primary_handle->outEnd + 1 + msgLen;
+
+			/* flush buffer if it is almost full */
+			if (bytes_needed > COPY_BUFFER_SIZE)
+			{
+				/* First look if the data node has sent an error message */
+				int read_status = data_node_read_data(primary_handle);
+				if (read_status == EOF || read_status < 0)
+				{
+					add_error_message(primary_handle, "failed to read data from data node");
+					return EOF;
+				}
+
+				if (primary_handle->inStart < primary_handle->inEnd)
+				{
+					RemoteQueryState *combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE);
+					combiner->dest = None_Receiver;
+					handle_response(primary_handle, combiner);
+					if (!ValidateAndCloseCombiner(combiner))
+						return EOF;
+				}
+
+				if (DN_CONNECTION_STATE_ERROR(primary_handle))
+					return EOF;
+
+				if (send_some(primary_handle, primary_handle->outEnd) < 0)
+				{
+					add_error_message(primary_handle, "failed to send data to data node");
+					return EOF;
+				}
+			}
+
+			if (ensure_out_buffer_capacity(bytes_needed, primary_handle) != 0)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_OUT_OF_MEMORY),
+						 errmsg("out of memory")));
+			}
+
+			primary_handle->outBuffer[primary_handle->outEnd++] = 'd';
+			memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4);
+			primary_handle->outEnd += 4;
+			memcpy(primary_handle->outBuffer + primary_handle->outEnd, data_row, len);
+			primary_handle->outEnd += len;
+			primary_handle->outBuffer[primary_handle->outEnd++] = '\n';
+		}
+		else
+		{
+			add_error_message(primary_handle, "Invalid data node connection");
+			return EOF;
+		}
+	}
+
+	foreach(nodeitem, exec_nodes->nodelist)
+	{
+		DataNodeHandle *handle = copy_connections[lfirst_int(nodeitem) - 1];
+		if (handle && handle->state == DN_CONNECTION_STATE_COPY_IN)
+		{
+			/* precalculate to speed up access */
+			int bytes_needed = handle->outEnd + 1 + msgLen;
+
+			/* flush buffer if it is almost full */
+			if ((primary_handle && bytes_needed > PRIMARY_NODE_WRITEAHEAD)
+					|| (!primary_handle && bytes_needed > COPY_BUFFER_SIZE))
+			{
+				int to_send = handle->outEnd;
+
+				/* First look if the data node has sent an error message */
+				int read_status = data_node_read_data(handle);
+				if (read_status == EOF || read_status < 0)
+				{
+					add_error_message(handle, "failed to read data from data node");
+					return EOF;
+				}
+
+				if (handle->inStart < handle->inEnd)
+				{
+					RemoteQueryState *combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE);
+					combiner->dest = None_Receiver;
+					handle_response(handle, combiner);
+					if (!ValidateAndCloseCombiner(combiner))
+						return EOF;
+				}
+
+				if (DN_CONNECTION_STATE_ERROR(handle))
+					return EOF;
+
+				/*
+				 * Allow the primary node to
write out data before others. + * If primary node was blocked it would not accept copy data. + * So buffer at least PRIMARY_NODE_WRITEAHEAD at the other nodes. + * If primary node is blocked and is buffering, other buffers will + * grow accordingly. + */ + if (primary_handle) + { + if (primary_handle->outEnd + PRIMARY_NODE_WRITEAHEAD < handle->outEnd) + to_send = handle->outEnd - primary_handle->outEnd - PRIMARY_NODE_WRITEAHEAD; + else + to_send = 0; + } + + /* + * Try to send down buffered data if we have + */ + if (to_send && send_some(handle, to_send) < 0) + { + add_error_message(handle, "failed to send data to data node"); + return EOF; + } + } + + if (ensure_out_buffer_capacity(bytes_needed, handle) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + handle->outBuffer[handle->outEnd++] = 'd'; + memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); + handle->outEnd += 4; + memcpy(handle->outBuffer + handle->outEnd, data_row, len); + handle->outEnd += len; + handle->outBuffer[handle->outEnd++] = '\n'; + } + else + { + add_error_message(handle, "Invalid data node connection"); + return EOF; + } + } + + return 0; +} + +uint64 +DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* copy_file) +{ + RemoteQueryState *combiner; + int conn_count = list_length(exec_nodes->nodelist) == 0 ? NumDataNodes : list_length(exec_nodes->nodelist); + int count = 0; + bool need_tran; + List *nodelist; + ListCell *nodeitem; + uint64 processed = 0; + + nodelist = exec_nodes->nodelist; + need_tran = !autocommit || conn_count > 1; + + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_SUM); + combiner->dest = None_Receiver; + /* If there is an existing file where to copy data, pass it to combiner */ + if (copy_file) + combiner->copy_file = copy_file; + + foreach(nodeitem, exec_nodes->nodelist) + { + DataNodeHandle *handle = copy_connections[count]; + count++; + + if (handle && handle->state == DN_CONNECTION_STATE_COPY_OUT) + { + int read_status = 0; + /* H message has been consumed, continue to manage data row messages */ + while (read_status >= 0 && handle->state == DN_CONNECTION_STATE_COPY_OUT) /* continue to read as long as there is data */ + { + if (handle_response(handle,combiner) == RESPONSE_EOF) + { + /* read some extra-data */ + read_status = data_node_read_data(handle); + if (read_status < 0) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("unexpected EOF on datanode connection"))); + } + /* There is no more data that can be read from connection */ + } + } + } + + processed = combiner->row_count; + + if (!ValidateAndCloseCombiner(combiner)) + { + if (autocommit && !PersistentConnections) + release_handles(); + pfree(copy_connections); + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Unexpected response from the data nodes"))); + } + + return processed; +} + +/* + * Finish copy process on all connections + */ +uint64 +DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, + CombineType combine_type) +{ + int i; + int nLen = htonl(4); + RemoteQueryState *combiner = NULL; + bool need_tran; + bool error = false; + struct timeval *timeout = NULL; /* wait forever */ + DataNodeHandle *connections[NumDataNodes]; + DataNodeHandle *primary_handle = NULL; + int conn_count = 0; + uint64 processed; + + for (i = 0; i < NumDataNodes; i++) + { + DataNodeHandle *handle = copy_connections[i]; + + if (!handle) + continue; + + if (i == primary_data_node - 1) + primary_handle = handle; + 
else + connections[conn_count++] = handle; + } + + if (primary_handle) + { + if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN || primary_handle->state == DN_CONNECTION_STATE_COPY_OUT) + { + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(primary_handle->outEnd + 1 + 4, primary_handle) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + primary_handle->outBuffer[primary_handle->outEnd++] = 'c'; + memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4); + primary_handle->outEnd += 4; + + /* We need response right away, so send immediately */ + if (data_node_flush(primary_handle) < 0) + { + error = true; + } + } + else + { + error = true; + } + + combiner = CreateResponseCombiner(conn_count + 1, combine_type); + combiner->dest = None_Receiver; + data_node_receive_responses(1, &primary_handle, timeout, combiner); + } + + for (i = 0; i < conn_count; i++) + { + DataNodeHandle *handle = connections[i]; + + if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT) + { + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + handle->outBuffer[handle->outEnd++] = 'c'; + memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); + handle->outEnd += 4; + + /* We need response right away, so send immediately */ + if (data_node_flush(handle) < 0) + { + error = true; + } + } + else + { + error = true; + } + } + + need_tran = !autocommit || primary_handle || conn_count > 1; + + if (!combiner) + { + combiner = CreateResponseCombiner(conn_count, combine_type); + combiner->dest = None_Receiver; + } + data_node_receive_responses(conn_count, connections, timeout, combiner); + + processed = combiner->row_count; + + if (!ValidateAndCloseCombiner(combiner) || error) + { + if (autocommit) + { + if (need_tran) + DataNodeRollback(DestNone); + else + if (!PersistentConnections) release_handles(); + } + + return 0; + } + + if (autocommit) + { + if (need_tran) + DataNodeCommit(DestNone); + else + if (!PersistentConnections) release_handles(); + } + + return processed; +} + +RemoteQueryState * +ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) +{ + RemoteQueryState *remotestate; + + remotestate = CreateResponseCombiner(0, node->combine_type); + remotestate->ss.ps.plan = (Plan *) node; + remotestate->ss.ps.state = estate; + remotestate->simple_aggregates = node->simple_aggregates; + + ExecInitResultTupleSlot(estate, &remotestate->ss.ps); + if (node->plan.targetlist) + { + TupleDesc typeInfo = ExecCleanTypeFromTL(node->plan.targetlist, false); + ExecSetSlotDescriptor(remotestate->ss.ps.ps_ResultTupleSlot, typeInfo); + } + + ExecInitScanTupleSlot(estate, &remotestate->ss); + /* + * Tuple description for the scan slot will be set on runtime from + * a RowDescription message + */ + + if (node->distinct) + { + /* prepare equate functions */ + remotestate->eqfunctions = + execTuplesMatchPrepare(node->distinct->numCols, + node->distinct->eqOperators); + /* create memory context for execTuplesMatch */ + remotestate->tmp_ctx = + AllocSetContextCreate(CurrentMemoryContext, + "RemoteUnique", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + } + return remotestate; +} + + +static void +copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst) +{ + if (src->tts_dataRow + && dst->tts_tupleDescriptor->natts == 
src->tts_tupleDescriptor->natts) + { + if (src->tts_mcxt == dst->tts_mcxt) + { + /* now dst slot controls the backing message */ + ExecStoreDataRowTuple(src->tts_dataRow, src->tts_dataLen, dst, src->tts_shouldFreeRow); + src->tts_shouldFreeRow = false; + } + else + { + /* have to make a copy */ + MemoryContext oldcontext = MemoryContextSwitchTo(dst->tts_mcxt); + int len = src->tts_dataLen; + char *msg = (char *) palloc(len); + + memcpy(msg, src->tts_dataRow, len); + ExecStoreDataRowTuple(msg, len, dst, true); + MemoryContextSwitchTo(oldcontext); + } + } + else + { + int i; + ExecClearTuple(dst); + slot_getallattrs(src); + /* PGXCTODO revisit: probably incorrect */ + for (i = 0; i < dst->tts_tupleDescriptor->natts; i++) + { + dst->tts_values[i] = src->tts_values[i]; + dst->tts_isnull[i] = src->tts_isnull[i]; + } + ExecStoreVirtualTuple(dst); + } +} + +/* + * Execute step of PGXC plan. + * The step specifies a command to be executed on specified nodes. + * On first invocation connections to the data nodes are initialized and + * command is executed. Further, as well as within subsequent invocations, + * responses are received until step is completed or there is a tuple to emit. + * If there is a tuple it is returned, otherwise returned NULL. The NULL result + * from the function indicates completed step. + * The function returns at most one tuple per invocation. + */ +TupleTableSlot * +ExecRemoteQuery(RemoteQueryState *node) +{ + RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan; + EState *estate = node->ss.ps.state; + TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot; + TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot; + + if (!node->query_Done) + { + /* First invocation, initialize */ + Exec_Nodes *exec_nodes = step->exec_nodes; + bool force_autocommit = step->force_autocommit; + bool is_read_only = step->read_only; + GlobalTransactionId gxid = InvalidGlobalTransactionId; + Snapshot snapshot = GetActiveSnapshot(); + DataNodeHandle **connections = NULL; + DataNodeHandle **primaryconnection = NULL; + List *nodelist = NIL; + List *primarynode = NIL; + int i; + int j; + int regular_conn_count; + int total_conn_count; + bool need_tran; + + if (exec_nodes) + { + nodelist = exec_nodes->nodelist; + primarynode = exec_nodes->primarynodelist; + } + + if (list_length(nodelist) == 0) + { + if (primarynode) + regular_conn_count = NumDataNodes - 1; + else + regular_conn_count = NumDataNodes; + } + else + { + regular_conn_count = list_length(nodelist); + } + + total_conn_count = regular_conn_count; + node->node_count = total_conn_count; + + /* Get connection for primary node, if used */ + if (primarynode) + { + primaryconnection = get_handles(primarynode); + if (!primaryconnection) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not obtain connection from pool"))); + total_conn_count++; + } + + /* Get other connections (non-primary) */ + connections = get_handles(nodelist); + if (!connections) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not obtain connection from pool"))); + + if (force_autocommit) + need_tran = false; + else + need_tran = !autocommit || total_conn_count > 1; + + elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? 
"true" : "false"); + + stat_statement(); + if (autocommit) + { + stat_transaction(total_conn_count); + /* We normally clear for transactions, but if autocommit, clear here, too */ + clear_write_node_list(); + } + + /* Check status of connections */ + /* + * We would want to run 2PC if current transaction modified more then + * one node. So optimize little bit and do not look further if we + * already have two. + */ + if (!is_read_only && write_node_count < 2) + { + bool found; + + if (primaryconnection) + { + found = false; + for (j = 0; j < write_node_count && !found; j++) + { + if (write_node_list[j] == primaryconnection[0]) + found = true; + } + if (!found) + { + /* Add to transaction wide-list */ + write_node_list[write_node_count++] = primaryconnection[0]; + } + } + for (i = 0; i < regular_conn_count && write_node_count < 2; i++) + { + found = false; + for (j = 0; j < write_node_count && !found; j++) + { + if (write_node_list[j] == connections[i]) + found = true; + } + if (!found) + { + /* Add to transaction wide-list */ + write_node_list[write_node_count++] = connections[i]; + } + } + } + + gxid = GetCurrentGlobalTransactionId(); + + if (!GlobalTransactionIdIsValid(gxid)) + { + if (primaryconnection) + pfree(primaryconnection); + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to get next transaction ID"))); + } + + if (need_tran) + { + /* + * Check if data node connections are in transaction and start + * transactions on nodes where it is not started + */ + DataNodeHandle *new_connections[total_conn_count]; + int new_count = 0; + + if (primaryconnection && primaryconnection[0]->transaction_status != 'T') + new_connections[new_count++] = primaryconnection[0]; + for (i = 0; i < regular_conn_count; i++) + if (connections[i]->transaction_status != 'T') + new_connections[new_count++] = connections[i]; + + if (new_count) + data_node_begin(new_count, new_connections, DestNone, gxid); + } + + /* See if we have a primary nodes, execute on it first before the others */ + if (primaryconnection) + { + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && data_node_send_gxid(primaryconnection[0], gxid)) + { + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot)) + { + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (data_node_send_query(primaryconnection[0], step->sql_statement) != 0) + { + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + + Assert(node->combine_type == COMBINE_TYPE_SAME); + + while (node->command_complete_count < 1) + { + PG_TRY(); + { + data_node_receive(1, primaryconnection, NULL); + while (handle_response(primaryconnection[0], node) == RESPONSE_EOF) + data_node_receive(1, primaryconnection, NULL); + if (node->errorMessage) + { + char *code = node->errorCode; + ereport(ERROR, + (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), + errmsg("%s", node->errorMessage))); + } + } + /* If we got an error response return immediately */ + PG_CATCH(); + { + /* We are going to exit, so release combiner */ + if (autocommit) + { + if (need_tran) + DataNodeRollback(DestNone); + else if (!PersistentConnections) 
+ release_handles(); + } + + pfree(primaryconnection); + pfree(connections); + PG_RE_THROW(); + } + PG_END_TRY(); + } + pfree(primaryconnection); + } + + for (i = 0; i < regular_conn_count; i++) + { + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && data_node_send_gxid(connections[i], gxid)) + { + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (snapshot && data_node_send_snapshot(connections[i], snapshot)) + { + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (data_node_send_query(connections[i], step->sql_statement) != 0) + { + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + } + + PG_TRY(); + { + /* + * Stop if all commands are completed or we got a data row and + * initialized state node for subsequent invocations + */ + while (regular_conn_count > 0 && node->connections == NULL) + { + int i = 0; + + data_node_receive(regular_conn_count, connections, NULL); + /* + * Handle input from the data nodes. + * If we got a RESPONSE_DATAROW we can break handling to wrap + * it into a tuple and return. Handling will be continued upon + * subsequent invocations. + * If we got 0, we exclude connection from the list. We do not + * expect more input from it. In case of non-SELECT query we quit + * the loop when all nodes finish their work and send ReadyForQuery + * with empty connections array. + * If we got EOF, move to the next connection, will receive more + * data on the next iteration. + */ + while (i < regular_conn_count) + { + int res = handle_response(connections[i], node); + if (res == RESPONSE_EOF) + { + i++; + } + else if (res == RESPONSE_COMPLETE) + { + if (i < --regular_conn_count) + connections[i] = connections[regular_conn_count]; + } + else if (res == RESPONSE_TUPDESC) + { + ExecSetSlotDescriptor(scanslot, node->tuple_desc); + /* + * we should send to client not the tuple_desc we just + * received, but tuple_desc from the planner. + * Data node may be sending junk columns for sorting + */ + (*node->dest->rStartup) (node->dest, CMD_SELECT, + resultslot->tts_tupleDescriptor); + if (step->sort) + { + SimpleSort *sort = step->sort; + + node->connections = connections; + node->conn_count = regular_conn_count; + /* + * First message is already in the buffer + * Further fetch will be under tuplesort control + * If query does not produce rows tuplesort will not + * be initialized + */ + node->tuplesortstate = tuplesort_begin_merge( + node->tuple_desc, + sort->numCols, + sort->sortColIdx, + sort->sortOperators, + sort->nullsFirst, + node, + work_mem); + /* + * Break the loop, do not wait for first row. 
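+							 * The sorted streams are merged lazily:
+							 * tuplesort pulls further rows through its data
+							 * node read callbacks, with this
+							 * RemoteQueryState as the tuple source.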
+							 * The tuplesort module wants to control which
+							 * node it fetches rows from; in this loop the
+							 * first row would come from a random node.
+							 */
+							break;
+						}
+					}
+					else if (res == RESPONSE_DATAROW)
+					{
+						/*
+						 * Got the first data row, quit the loop
+						 */
+						node->connections = connections;
+						node->conn_count = regular_conn_count;
+						node->current_conn = i;
+						break;
+					}
+				}
+			}
+		}
+		/* If we got an error response return immediately */
+		PG_CATCH();
+		{
+			/* We are going to exit, so release the combiner */
+			if (autocommit)
+			{
+				if (need_tran)
+					DataNodeRollback(DestNone);
+				else if (!PersistentConnections)
+					release_handles();
+			}
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+		node->query_Done = true;
+		node->need_tran = need_tran;
+	}
+
+	PG_TRY();
+	{
+		bool have_tuple = false;
+
+		if (node->tuplesortstate)
+		{
+			while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate,
+										  true, scanslot))
+			{
+				have_tuple = true;
+				/*
+				 * If DISTINCT is specified and the current tuple matches the
+				 * previous one, skip it and get the next.
+				 * Otherwise return the current tuple.
+				 */
+				if (step->distinct)
+				{
+					/*
+					 * Always accept the very first tuple; skip to the next
+					 * if the scan slot matches the previous (result) slot
+					 */
+					if (!TupIsNull(resultslot) &&
+						execTuplesMatch(scanslot,
+										resultslot,
+										step->distinct->numCols,
+										step->distinct->uniqColIdx,
+										node->eqfunctions,
+										node->tmp_ctx))
+					{
+						have_tuple = false;
+						continue;
+					}
+				}
+				copy_slot(node, scanslot, resultslot);
+				(*node->dest->receiveSlot) (resultslot, node->dest);
+				break;
+			}
+			if (!have_tuple)
+				ExecClearTuple(resultslot);
+		}
+		else
+		{
+			while (node->conn_count > 0 && !have_tuple)
+			{
+				int i;
+
+				/*
+				 * If the combiner already has a tuple go ahead and return
+				 * it, otherwise the tuple will be cleared
+				 */
+				if (FetchTuple(node, scanslot) && !TupIsNull(scanslot))
+				{
+					if (node->simple_aggregates)
+					{
+						/*
+						 * Advance the aggregate functions, then read up the
+						 * next data row message and get the tuple in the
+						 * same slot on the next iteration
+						 */
+						exec_simple_aggregates(node, scanslot);
+					}
+					else
+					{
+						/*
+						 * Deliver the current slot and read up the next
+						 * data row message before exiting the loop. Next
+						 * time this function is invoked we will have either
+						 * a data row message ready or EOF
+						 */
+						copy_slot(node, scanslot, resultslot);
+						(*node->dest->receiveSlot) (resultslot, node->dest);
+						have_tuple = true;
+					}
+				}
+
+				/*
+				 * Handle input to get the next row or to ensure the command
+				 * is completed, starting from the connection after the
+				 * current one. If a connection returns EOF, move on to the
+				 * next one.
+				 */
+				if ((i = node->current_conn + 1) == node->conn_count)
+					i = 0;
+
+				for (;;)
+				{
+					int res = handle_response(node->connections[i], node);
+					if (res == RESPONSE_EOF)
+					{
+						/* go to the next connection */
+						if (++i == node->conn_count)
+							i = 0;
+						/* if we cycled over all connections we need to receive more */
+						if (i == node->current_conn)
+							data_node_receive(node->conn_count, node->connections, NULL);
+					}
+					else if (res == RESPONSE_COMPLETE)
+					{
+						if (--node->conn_count == 0)
+							break;
+						if (i == node->conn_count)
+							i = 0;
+						else
+							node->connections[i] = node->connections[node->conn_count];
+						if (node->current_conn == node->conn_count)
+							node->current_conn = i;
+					}
+					else if (res == RESPONSE_DATAROW)
+					{
+						node->current_conn = i;
+						break;
+					}
+				}
+			}
+
+			/*
+			 * We may need to finalize aggregates
+			 */
+			if (!have_tuple && node->simple_aggregates)
+			{
+				finish_simple_aggregates(node, resultslot);
+				if (!TupIsNull(resultslot))
+				{
+					(*node->dest->receiveSlot) (resultslot, node->dest);
+					have_tuple = true;
+				}
+			}
+
+			if (!have_tuple)	/* report end of scan */
+				ExecClearTuple(resultslot);
+
+		}
+
+		if (node->errorMessage)
+		{
+			char *code = node->errorCode;
+			ereport(ERROR,
+					(errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
+					 errmsg("%s", node->errorMessage)));
+		}
+
+		/*
+		 * If the command is completed we should commit the work.
+		 */
+		if (node->conn_count == 0 && autocommit && node->need_tran)
+			DataNodeCommit(DestNone);
+	}
+	/* If we got an error response return immediately */
+	PG_CATCH();
+	{
+		/* We are going to exit, so release the combiner */
+		if (autocommit)
+		{
+			if (node->need_tran)
+				DataNodeRollback(DestNone);
+			else if (!PersistentConnections)
+				release_handles();
+		}
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	return resultslot;
+}
+
+void
+ExecEndRemoteQuery(RemoteQueryState *node)
+{
+	(*node->dest->rShutdown) (node->dest);
+	if (node->tmp_ctx)
+		MemoryContextDelete(node->tmp_ctx);
+	CloseCombiner(node);
+}
+
+
+/*
+ * Called when the backend is ending.
+ */
+void
+DataNodeCleanAndRelease(int code, Datum arg)
+{
+	/* Rollback on Data Nodes */
+	if (IsTransactionState())
+	{
+		DataNodeRollback(DestNone);
+
+		/* Rollback on GTM if a transaction id was opened. */
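+		/*
+		 * GetCurrentTransactionIdIfAny() returns InvalidTransactionId when
+		 * no xid was assigned; presumably GTM treats that as a no-op.
+		 */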
+		RollbackTranGTM((GlobalTransactionId) GetCurrentTransactionIdIfAny());
+	}
+
+	/* Release data node connections */
+	release_handles();
+
+	/* Close connection with GTM */
+	CloseGTM();
+
+	/* Dump collected statistics to the log */
+	stat_log();
+}
+
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 8c58b62940..077e8f9190 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -77,6 +77,7 @@
 #include "pgxc/pgxc.h"
 #include "access/gtm.h"
 /* PGXC_COORD */
+#include "pgxc/execRemote.h"
 #include "pgxc/planner.h"
 #include "pgxc/datanode.h"
 #include "commands/copy.h"
@@ -198,7 +199,6 @@ static void log_disconnections(int code, Datum arg);
 #ifdef PGXC /* PGXC_DATANODE */
 static void pgxc_transaction_stmt (Node *parsetree);
-static List * pgxc_execute_direct (Node *parsetree, List *querytree_list, CommandDest dest, bool snapshot_set, bool *exec_on_coord);
 /* ----------------------------------------------------------------
  * PG-XC routines
@@ -900,11 +900,9 @@ exec_simple_query(const char *query_string)
 	DestReceiver *receiver;
 	int16		format;
 #ifdef PGXC
-	Query_Plan *query_plan;
-	Query_Step *query_step;
+	Query_Plan *query_plan;
+	RemoteQuery *query_step;
 	bool		exec_on_coord;
-	int		data_node_error = 0;
-
 	/*
 	 * By default we do not want data nodes to contact GTM directly,
@@ -977,12 +975,78 @@ exec_simple_query(const char *query_string)
 			pgxc_transaction_stmt(parsetree);
 		else if (IsA(parsetree, ExecDirectStmt))
-			querytree_list = pgxc_execute_direct(parsetree, querytree_list, dest, snapshot_set, &exec_on_coord);
+		{
+			ExecDirectStmt *execdirect = (ExecDirectStmt *) parsetree;
+			List	   *inner_parse_tree_list;
+
+			Assert(IS_PGXC_COORDINATOR);
+
+			exec_on_coord = execdirect->coordinator;
+
+			/*
+			 * Switch to appropriate context for constructing parse and
+			 * query trees (these must outlive the execution context).
+			 */
+			oldcontext = MemoryContextSwitchTo(MessageContext);
+
+			inner_parse_tree_list = pg_parse_query(execdirect->query);
+			/*
+			 * we do not support complex commands (expanded to multiple
+			 * parse trees) within EXEC DIRECT
+			 */
+			if (list_length(inner_parse_tree_list) != 1)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("Can not execute %s with EXECUTE DIRECT",
+								execdirect->query)));
+			}
+			parsetree = linitial(inner_parse_tree_list);
+			/*
+			 * Set up a snapshot if parse analysis/planning will need
+			 * one.
+ */ + if (analyze_requires_snapshot(parsetree)) + { + PushActiveSnapshot(GetTransactionSnapshot()); + snapshot_set = true; + } + + querytree_list = pg_analyze_and_rewrite(parsetree, + query_string, + NULL, + 0); + + if (execdirect->nodes) + { + ListCell *lc; + Query *query = (Query *) linitial(querytree_list); + + query_plan = (Query_Plan *) palloc0(sizeof(Query_Plan)); + query_step = makeNode(RemoteQuery); + query_step->plan.targetlist = query->targetList; + query_step->sql_statement = pstrdup(execdirect->query); + query_step->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); + foreach (lc, execdirect->nodes) + { + int node = intVal(lfirst(lc)); + query_step->exec_nodes->nodelist = lappend_int(query_step->exec_nodes->nodelist, node); + } + query_step->combine_type = COMBINE_TYPE_SAME; + + query_plan->query_step_list = lappend(NULL, query_step); + query_plan->exec_loc_type = EXEC_ON_DATA_NODES; + } + + /* Restore context */ + MemoryContextSwitchTo(oldcontext); + + } else if (IsA(parsetree, CopyStmt)) { - CopyStmt *copy = (CopyStmt *) parsetree; - bool done; + CopyStmt *copy = (CopyStmt *) parsetree; + uint64 processed; /* Snapshot is needed for the Copy */ if (!snapshot_set) { @@ -994,13 +1058,15 @@ exec_simple_query(const char *query_string) * Datanode or on Coordinator. * If a table has no locator data, then IsCoordPortalCopy returns false and copy is launched * on Coordinator instead (e.g., using pg_catalog tables). - * If a table has some locator data (user tables), then copy is launched normally + * If a table has some locator data (user tables), then copy was launched normally * in Datanodes */ if (!IsCoordPortalCopy(copy)) { - DoCopy(copy, query_string, false); exec_on_coord = false; + processed = DoCopy(copy, query_string, false); + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, + "COPY " UINT64_FORMAT, processed); } else exec_on_coord = true; @@ -1015,6 +1081,7 @@ exec_simple_query(const char *query_string) /* First execute on the coordinator, if involved (DDL), then data nodes */ } + plantree_list = NIL; if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE) #endif plantree_list = pg_plan_queries(querytree_list, 0, NULL); @@ -1036,10 +1103,11 @@ exec_simple_query(const char *query_string) if (IS_PGXC_DATANODE && IsA(parsetree, VacuumStmt) && IsPostmasterEnvironment) SetForceXidFromGTM(true); - /* PGXC_COORD */ - /* Force getting Xid from GTM if not autovacuum, but a vacuum */ - /* Skip the Portal stuff on coordinator if command only executes on data nodes */ - if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE) + /* + * Create and run Portal only if it is needed. + * In some special cases we have nothing to run at this point + */ + if (plantree_list || query_plan) { #endif @@ -1102,6 +1170,11 @@ exec_simple_query(const char *query_string) */ MemoryContextSwitchTo(oldcontext); +#ifdef PGXC + /* Skip the Portal stuff on coordinator if command only executes on data nodes */ + if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE) + { +#endif /* * Run the portal to completion, and then drop it (and the receiver). 
*/ @@ -1112,10 +1185,6 @@ exec_simple_query(const char *query_string) receiver, completionTag); - (*receiver->rDestroy) (receiver); - - PortalDrop(portal, false); - #ifdef PGXC } @@ -1125,36 +1194,45 @@ exec_simple_query(const char *query_string) { if (query_plan && (query_plan->exec_loc_type & EXEC_ON_DATA_NODES)) { + RemoteQueryState *state; + TupleTableSlot *slot; + EState *estate = CreateExecutorState(); + oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); query_step = linitial(query_plan->query_step_list); - - data_node_error = DataNodeExec(query_step->sql_statement, - query_step->exec_nodes, - query_step->combine_type, - dest, - snapshot_set ? GetActiveSnapshot() : GetTransactionSnapshot(), - query_plan->force_autocommit, - query_step->simple_aggregates, - IsA(parsetree, SelectStmt)); - - if (data_node_error) + estate->es_tupleTable = ExecCreateTupleTable(2); + state = ExecInitRemoteQuery(query_step, estate, 0); + state->dest = receiver; + state->completionTag = completionTag; + if (!snapshot_set) { - /* An error occurred, change status on Coordinator, too, - * even if no statements ran on it. - * We want to only allow COMMIT/ROLLBACK - */ - AbortCurrentTransaction(); - xact_started = false; - /* AT() clears active snapshot */ - snapshot_set = false; + PushActiveSnapshot(GetTransactionSnapshot()); + snapshot_set = true; + } + do + { + slot = ExecRemoteQuery(state); } + while (!TupIsNull(slot)); + + ExecEndRemoteQuery(state); + /* Restore context */ + MemoryContextSwitchTo(oldcontext); } FreeQueryPlan(query_plan); } +#endif /* PGXC_COORD */ + + (*receiver->rDestroy) (receiver); + + PortalDrop(portal, false); + +#ifdef PGXC + } if (snapshot_set) PopActiveSnapshot(); -#endif /* PGXC_COORD */ +#endif if (IsA(parsetree, TransactionStmt)) { @@ -1186,11 +1264,6 @@ exec_simple_query(const char *query_string) */ CommandCounterIncrement(); } -#ifdef PGXC /* PGXC_COORD */ - /* In case of PGXC handling client already received a response */ - if ((IS_PGXC_COORDINATOR && exec_on_coord && !data_node_error) || IS_PGXC_DATANODE) - { -#endif /* * Tell client that we're done with this query. Note we emit exactly @@ -1199,9 +1272,6 @@ exec_simple_query(const char *query_string) * aborted by error will not send an EndCommand report at all.) */ EndCommand(completionTag, dest); -#ifdef PGXC /* PGXC_COORD */ - } -#endif } /* end loop over parsetrees */ /* @@ -4328,80 +4398,4 @@ pgxc_transaction_stmt (Node *parsetree) } } } - - -/* - * Handle EXECUTE DIRECT - */ -List * -pgxc_execute_direct (Node *parsetree, List *querytree_list, CommandDest dest, bool snapshot_set, bool *exec_on_coord) -{ - List *parsetree_list; - ListCell *node_cell; - ExecDirectStmt *execdirect = (ExecDirectStmt *) parsetree; - bool on_coord = execdirect->coordinator; - Exec_Nodes *exec_nodes; - - - Assert(IS_PGXC_COORDINATOR); - Assert(IsA(parsetree, ExecDirectStmt)); - - - exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); - - foreach (node_cell, execdirect->nodes) - { - int node_int = intVal(lfirst(node_cell)); - exec_nodes->nodelist = lappend_int(exec_nodes->nodelist, node_int); - } - if (exec_nodes->nodelist) - if (DataNodeExec(execdirect->query, - exec_nodes, - COMBINE_TYPE_SAME, - dest, - snapshot_set ? 
GetActiveSnapshot() : GetTransactionSnapshot(), - FALSE, - FALSE, - FALSE) != 0) - on_coord = false; - - if (on_coord) - { - /* - * Parse inner statement, like at the begiining of the function - * We do not have to release wrapper trees, the message context - * will be deleted later - * Also, no need to switch context - current is already - * the MessageContext - */ - parsetree_list = pg_parse_query(execdirect->query); - - /* We do not want to log or display the inner command */ - - /* - * we do not support complex commands (expanded to multiple - * parse trees) within EXEC DIRECT - */ - if (list_length(parsetree_list) != 1) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Can not execute %s with EXECUTE DIRECT", - execdirect->query))); - } - - /* - * Get parse tree from the list - */ - parsetree = (Node *) lfirst(list_head(parsetree_list)); - - /* - * Build new query tree */ - querytree_list = pg_analyze_and_rewrite(parsetree, - execdirect->query, NULL, 0); - } - *exec_on_coord = on_coord; - - return querytree_list; -} #endif /* PGXC */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 268fe5a8c5..0676f0da0d 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -55,6 +55,7 @@ #include "parser/scansup.h" #include "pgstat.h" #ifdef PGXC +#include "pgxc/execRemote.h" #include "pgxc/locator.h" #include "pgxc/planner.h" #include "pgxc/poolmgr.h" @@ -1261,7 +1262,7 @@ static struct config_bool ConfigureNamesBool[] = {"strict_select_checking", PGC_USERSET, DEVELOPER_OPTIONS, gettext_noop("Forbid if SELECT has ORDER BY"), gettext_noop("and is not safe for the cluster"), - GUC_NOT_IN_SAMPLE + GUC_NOT_IN_SAMPLE }, &StrictSelectChecking, false, NULL, NULL @@ -1301,7 +1302,7 @@ static struct config_int ConfigureNamesInt[] = gettext_noop("This applies to table columns that have not had a " "column-specific target set via ALTER TABLE SET STATISTICS.") }, - &default_statistics_target, + &default_statistics_target, 100, 1, 10000, NULL, NULL }, { @@ -2008,8 +2009,8 @@ static struct config_int ConfigureNamesInt[] = NULL }, &NumDataNodes, - 2, 1, 65535, NULL, NULL - }, + 2, 1, 65535, NULL, NULL + }, { {"min_pool_size", PGC_POSTMASTER, DATA_NODES, @@ -2018,8 +2019,8 @@ static struct config_int ConfigureNamesInt[] = "new connections are established") }, &MinPoolSize, - 1, 1, 65535, NULL, NULL - }, + 1, 1, 65535, NULL, NULL + }, { {"max_pool_size", PGC_POSTMASTER, DATA_NODES, @@ -2028,8 +2029,8 @@ static struct config_int ConfigureNamesInt[] = "other connection requests will be refused") }, &MaxPoolSize, - 100, 1, 65535, NULL, NULL - }, + 100, 1, 65535, NULL, NULL + }, { {"pooler_port", PGC_POSTMASTER, DATA_NODES, @@ -2037,8 +2038,8 @@ static struct config_int ConfigureNamesInt[] = NULL }, &PoolerPort, - 6667, 1, 65535, NULL, NULL - }, + 6667, 1, 65535, NULL, NULL + }, { {"gtm_port", PGC_POSTMASTER, GTM, @@ -2046,8 +2047,8 @@ static struct config_int ConfigureNamesInt[] = NULL }, &GtmPort, - 6666, 1, 65535, NULL, NULL - }, + 6666, 1, 65535, NULL, NULL + }, { {"gtm_coordinator_id", PGC_POSTMASTER, GTM, @@ -2055,8 +2056,8 @@ static struct config_int ConfigureNamesInt[] = NULL }, &GtmCoordinatorId, - 1, 1, INT_MAX, NULL, NULL - }, + 1, 1, INT_MAX, NULL, NULL + }, { {"primary_data_node", PGC_POSTMASTER, DATA_NODES, @@ -2064,8 +2065,8 @@ static struct config_int ConfigureNamesInt[] = NULL }, &primary_data_node, - 1, 0, INT_MAX, NULL, NULL - }, + 1, 0, INT_MAX, NULL, NULL + }, #endif /* End-of-list marker */ { @@ -2624,8 +2625,8 @@ 
static struct config_string ConfigureNamesString[] = gettext_noop("A list of data nodes to read from replicated tables") }, &PreferredDataNodes, - "", NULL, NULL - }, + "", NULL, NULL + }, { {"data_node_hosts", PGC_POSTMASTER, DATA_NODES, diff --git a/src/backend/utils/mmgr/mcxt.c b/src/backend/utils/mmgr/mcxt.c index 9b08df366d..8161e9bc92 100644 --- a/src/backend/utils/mmgr/mcxt.c +++ b/src/backend/utils/mmgr/mcxt.c @@ -507,7 +507,7 @@ MemoryContextAlloc(MemoryContext context, Size size) AssertArg(MemoryContextIsValid(context)); if (!AllocSizeIsValid(size)) - elog(ERROR, "invalid memory alloc request size %lu", + elog(PANIC, "invalid memory alloc request size %lu", (unsigned long) size); return (*context->methods->alloc) (context, size); @@ -528,7 +528,7 @@ MemoryContextAllocZero(MemoryContext context, Size size) AssertArg(MemoryContextIsValid(context)); if (!AllocSizeIsValid(size)) - elog(ERROR, "invalid memory alloc request size %lu", + elog(PANIC, "invalid memory alloc request size %lu", (unsigned long) size); ret = (*context->methods->alloc) (context, size); @@ -553,7 +553,7 @@ MemoryContextAllocZeroAligned(MemoryContext context, Size size) AssertArg(MemoryContextIsValid(context)); if (!AllocSizeIsValid(size)) - elog(ERROR, "invalid memory alloc request size %lu", + elog(PANIC, "invalid memory alloc request size %lu", (unsigned long) size); ret = (*context->methods->alloc) (context, size); @@ -617,7 +617,7 @@ repalloc(void *pointer, Size size) AssertArg(MemoryContextIsValid(header->context)); if (!AllocSizeIsValid(size)) - elog(ERROR, "invalid memory alloc request size %lu", + elog(PANIC, "invalid memory alloc request size %lu", (unsigned long) size); return (*header->context->methods->realloc) (header->context, diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index 6d07296d59..c506ce064f 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -107,6 +107,9 @@ #include "commands/tablespace.h" #include "miscadmin.h" #include "pg_trace.h" +#ifdef PGXC +#include "pgxc/execRemote.h" +#endif #include "utils/datum.h" #include "utils/logtape.h" #include "utils/lsyscache.h" @@ -121,6 +124,9 @@ #define HEAP_SORT 0 #define INDEX_SORT 1 #define DATUM_SORT 2 +#ifdef PGXC +#define MERGE_SORT 3 +#endif /* GUC variables */ #ifdef TRACE_SORT @@ -213,6 +219,9 @@ struct Tuplesortstate int tapeRange; /* maxTapes-1 (Knuth's P) */ MemoryContext sortcontext; /* memory context holding all sort data */ LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */ +#ifdef PGXC + RemoteQueryState *combiner; /* tuple source, alternate to tapeset */ +#endif /* * These function pointers decouple the routines that must know what kind @@ -253,6 +262,14 @@ struct Tuplesortstate void (*readtup) (Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); +#ifdef PGXC + /* + * Function to read length of next stored tuple. + * Used as 'len' parameter for readtup function. + */ + unsigned int (*getlen) (Tuplesortstate *state, int tapenum, bool eofOK); +#endif + /* * Function to reverse the sort direction from its current state. 
(We * could dispense with this if we wanted to enforce that all variants @@ -378,6 +395,9 @@ struct Tuplesortstate #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup)) #define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup)) #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len)) +#ifdef PGXC +#define GETLEN(state,tape,eofOK) ((*(state)->getlen) (state, tape, eofOK)) +#endif #define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state)) #define LACKMEM(state) ((state)->availMem < 0) #define USEMEM(state,amt) ((state)->availMem -= (amt)) @@ -450,6 +470,12 @@ static void writetup_heap(Tuplesortstate *state, int tapenum, static void readtup_heap(Tuplesortstate *state, SortTuple *stup, int tapenum, unsigned int len); static void reversedirection_heap(Tuplesortstate *state); +#ifdef PGXC +static unsigned int getlen_datanode(Tuplesortstate *state, int tapenum, + bool eofOK); +static void readtup_datanode(Tuplesortstate *state, SortTuple *stup, + int tapenum, unsigned int len); +#endif static int comparetup_index_btree(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static int comparetup_index_hash(const SortTuple *a, const SortTuple *b, @@ -587,6 +613,9 @@ tuplesort_begin_heap(TupleDesc tupDesc, state->copytup = copytup_heap; state->writetup = writetup_heap; state->readtup = readtup_heap; +#ifdef PGXC + state->getlen = getlen; +#endif state->reversedirection = reversedirection_heap; state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ @@ -657,6 +686,9 @@ tuplesort_begin_index_btree(Relation indexRel, state->copytup = copytup_index; state->writetup = writetup_index; state->readtup = readtup_index; +#ifdef PGXC + state->getlen = getlen; +#endif state->reversedirection = reversedirection_index_btree; state->indexRel = indexRel; @@ -692,6 +724,9 @@ tuplesort_begin_index_hash(Relation indexRel, state->copytup = copytup_index; state->writetup = writetup_index; state->readtup = readtup_index; +#ifdef PGXC + state->getlen = getlen; +#endif state->reversedirection = reversedirection_index_hash; state->indexRel = indexRel; @@ -736,6 +771,9 @@ tuplesort_begin_datum(Oid datumType, state->copytup = copytup_datum; state->writetup = writetup_datum; state->readtup = readtup_datum; +#ifdef PGXC + state->getlen = getlen; +#endif state->reversedirection = reversedirection_datum; state->datumType = datumType; @@ -762,6 +800,121 @@ tuplesort_begin_datum(Oid datumType, return state; } +#ifdef PGXC +/* + * Tuples come from a source where they are already sorted. + * This is pretty much like sorting heap tuples, but there is no need to + * load the sorter: its initial status is the final merge, and the correct + * readtup and getlen callbacks must be passed in.
+ * Usage pattern of the merge sorter + * tuplesort_begin_merge + * while (tuple = tuplesort_gettuple()) + * { + * // process + * } + * tuplesort_end + */ +Tuplesortstate * +tuplesort_begin_merge(TupleDesc tupDesc, + int nkeys, AttrNumber *attNums, + Oid *sortOperators, bool *nullsFirstFlags, + RemoteQueryState *combiner, + int workMem) +{ + Tuplesortstate *state = tuplesort_begin_common(workMem, false); + MemoryContext oldcontext; + int i; + + oldcontext = MemoryContextSwitchTo(state->sortcontext); + + AssertArg(nkeys > 0); + AssertArg(combiner); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, + "begin merge sort: nkeys = %d, workMem = %d", nkeys, workMem); +#endif + + state->nKeys = nkeys; + + TRACE_POSTGRESQL_SORT_START(MERGE_SORT, + false, /* no unique check */ + nkeys, + workMem, + false); + + state->combiner = combiner; + state->comparetup = comparetup_heap; + state->copytup = NULL; + state->writetup = NULL; + state->readtup = readtup_datanode; + state->getlen = getlen_datanode; + state->reversedirection = reversedirection_heap; + + state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ + state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData)); + + for (i = 0; i < nkeys; i++) + { + Oid sortFunction; + bool reverse; + + AssertArg(attNums[i] != 0); + AssertArg(sortOperators[i] != 0); + + if (!get_compare_function_for_ordering_op(sortOperators[i], + &sortFunction, &reverse)) + elog(ERROR, "operator %u is not a valid ordering operator", + sortOperators[i]); + + /* + * We needn't fill in sk_strategy or sk_subtype since these scankeys + * will never be passed to an index. + */ + ScanKeyInit(&state->scanKeys[i], + attNums[i], + InvalidStrategy, + sortFunction, + (Datum) 0); + + /* However, we use btree's conventions for encoding directionality */ + if (reverse) + state->scanKeys[i].sk_flags |= SK_BT_DESC; + if (nullsFirstFlags[i]) + state->scanKeys[i].sk_flags |= SK_BT_NULLS_FIRST; + } + + /* + * A logical tape in this case is a sorted stream + */ + state->maxTapes = combiner->conn_count; + state->tapeRange = combiner->conn_count; + + state->mergeactive = (bool *) palloc0(combiner->conn_count * sizeof(bool)); + state->mergenext = (int *) palloc0(combiner->conn_count * sizeof(int)); + state->mergelast = (int *) palloc0(combiner->conn_count * sizeof(int)); + state->mergeavailslots = (int *) palloc0(combiner->conn_count * sizeof(int)); + state->mergeavailmem = (long *) palloc0(combiner->conn_count * sizeof(long)); + + state->tp_runs = (int *) palloc0(combiner->conn_count * sizeof(int)); + state->tp_dummy = (int *) palloc0(combiner->conn_count * sizeof(int)); + state->tp_tapenum = (int *) palloc0(combiner->conn_count * sizeof(int)); + /* mark that each stream (tape) has one run */ + for (i = 0; i < combiner->conn_count; i++) + { + state->tp_runs[i] = 1; + state->tp_tapenum[i] = i; + } + beginmerge(state); + state->status = TSS_FINALMERGE; + + MemoryContextSwitchTo(oldcontext); + + return state; +} +#endif + /* * tuplesort_set_bound * @@ -1296,7 +1449,6 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, sizeof(unsigned int))) return false; tuplen = getlen(state, state->result_tape, false); - /* * Back up to get ending length word of tuple before it.
*/ @@ -2005,7 +2157,11 @@ mergeprereadone(Tuplesortstate *state, int srcTape) state->mergenext[srcTape] == 0) { /* read next tuple, if any */ +#ifdef PGXC + if ((tuplen = GETLEN(state, srcTape, true)) == 0) +#else if ((tuplen = getlen(state, srcTape, true)) == 0) +#endif { state->mergeactive[srcTape] = false; break; @@ -2468,7 +2624,6 @@ markrunend(Tuplesortstate *state, int tapenum) LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len)); } - /* * Set up for an external caller of ApplySortFunction. This function * basically just exists to localize knowledge of the encoding of sk_flags @@ -2716,6 +2871,56 @@ reversedirection_heap(Tuplesortstate *state) } } +#ifdef PGXC +static unsigned int +getlen_datanode(Tuplesortstate *state, int tapenum, bool eofOK) +{ + for (;;) + { + switch (handle_response(state->combiner->connections[tapenum], state->combiner)) + { + case RESPONSE_EOF: + data_node_receive(1, state->combiner->connections + tapenum, NULL); + break; + case RESPONSE_COMPLETE: + if (eofOK) + return 0; + else + elog(ERROR, "unexpected end of data"); + break; + case RESPONSE_DATAROW: + return state->combiner->msglen; + default: + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Unexpected response from the data nodes"))); + } + } +} + +static void +readtup_datanode(Tuplesortstate *state, SortTuple *stup, + int tapenum, unsigned int len) +{ + TupleTableSlot *slot = state->combiner->ss.ss_ScanTupleSlot; + MinimalTuple tuple; + HeapTupleData htup; + + FetchTuple(state->combiner, slot); + + /* copy the tuple into sort storage */ + tuple = ExecCopySlotMinimalTuple(slot); + stup->tuple = (void *) tuple; + USEMEM(state, GetMemoryChunkSpace(tuple)); + /* set up first-column key value */ + htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET; + htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET); + stup->datum1 = heap_getattr(&htup, + state->scanKeys[0].sk_attno, + state->tupDesc, + &stup->isnull1); +} +#endif /* * Routines specialized for IndexTuple case diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h index 879a310c68..c85b1b0c61 100644 --- a/src/include/executor/tuptable.h +++ b/src/include/executor/tuptable.h @@ -118,6 +118,15 @@ typedef struct TupleTableSlot bool tts_shouldFreeMin; /* should pfree tts_mintuple? */ bool tts_slow; /* saved state for slot_deform_tuple */ HeapTuple tts_tuple; /* physical tuple, or NULL if virtual */ +#ifdef PGXC + /* + * PGXC extension to support tuples sent from remote data node. + */ + char *tts_dataRow; /* Tuple data in DataRow format */ + int tts_dataLen; /* Actual length of the data row */ + bool tts_shouldFreeRow; /* should pfree tts_dataRow? 
*/ + struct AttInMetadata *tts_attinmeta; /* store here info to extract values from the DataRow */ +#endif TupleDesc tts_tupleDescriptor; /* slot's tuple descriptor */ MemoryContext tts_mcxt; /* slot itself is in this context */ Buffer tts_buffer; /* tuple's buffer, or InvalidBuffer */ @@ -165,6 +174,12 @@ extern TupleTableSlot *ExecStoreTuple(HeapTuple tuple, extern TupleTableSlot *ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree); +#ifdef PGXC +extern TupleTableSlot *ExecStoreDataRowTuple(char *msg, + size_t len, + TupleTableSlot *slot, + bool shouldFree); +#endif extern TupleTableSlot *ExecClearTuple(TupleTableSlot *slot); extern TupleTableSlot *ExecStoreVirtualTuple(TupleTableSlot *slot); extern TupleTableSlot *ExecStoreAllNullTuple(TupleTableSlot *slot); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 078b6733e7..4f1c59f8bc 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -72,6 +72,9 @@ typedef enum NodeTag T_Hash, T_SetOp, T_Limit, +#ifdef PGXC + T_RemoteQuery, +#endif /* this one isn't a subclass of Plan: */ T_PlanInvalItem, @@ -110,6 +113,9 @@ typedef enum NodeTag T_HashState, T_SetOpState, T_LimitState, +#ifdef PGXC + T_RemoteQueryState, +#endif /* * TAGS FOR PRIMITIVE NODES (primnodes.h) diff --git a/src/include/pgxc/combiner.h b/src/include/pgxc/combiner.h deleted file mode 100644 index f977c4ab88..0000000000 --- a/src/include/pgxc/combiner.h +++ /dev/null @@ -1,73 +0,0 @@ -/*------------------------------------------------------------------------- - * - * combiner.h - * - * Combine responses from multiple Data Nodes - * - * - * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group ? - * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation - * - * IDENTIFICATION - * $$ - * - *------------------------------------------------------------------------- - */ - -#ifndef COMBINER_H -#define COMBINER_H - -#include "postgres.h" -#include "tcop/dest.h" - -typedef enum -{ - COMBINE_TYPE_NONE, /* it is known that no row count, do not parse */ - COMBINE_TYPE_SUM, /* sum row counts (partitioned, round robin) */ - COMBINE_TYPE_SAME /* expect all row counts to be the same (replicated write) */ -} CombineType; - -typedef enum -{ - REQUEST_TYPE_NOT_DEFINED, /* not determined yet */ - REQUEST_TYPE_COMMAND, /* OK or row count response */ - REQUEST_TYPE_QUERY, /* Row description response */ - REQUEST_TYPE_COPY_IN, /* Copy In response */ - REQUEST_TYPE_COPY_OUT /* Copy Out response */ -} RequestType; - - -typedef struct -{ - int node_count; - CombineType combine_type; - CommandDest dest; - int command_complete_count; - int row_count; - RequestType request_type; - int description_count; - uint64 copy_in_count; - uint64 copy_out_count; - bool inErrorState; - /* - * While we are not supporting grouping use this flag to indicate we need - * to initialize collecting of aggregates from the DNs - */ - bool initAggregates; - List *simple_aggregates; - FILE *copy_file; /* used if copy_dest == COPY_FILE */ -} ResponseCombinerData; - - -typedef ResponseCombinerData *ResponseCombiner; - -extern ResponseCombiner CreateResponseCombiner(int node_count, - CombineType combine_type, CommandDest dest); -extern int CombineResponse(ResponseCombiner combiner, char msg_type, - char *msg_body, size_t len); -extern bool ValidateAndCloseCombiner(ResponseCombiner combiner); -extern bool ValidateAndResetCombiner(ResponseCombiner combiner); -extern void AssignCombinerAggregates(ResponseCombiner combiner, List 
*simple_aggregates); -extern void CloseCombiner(ResponseCombiner combiner); - -#endif /* COMBINER_H */ diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h index 28c5d8748e..ab95022b5f 100644 --- a/src/include/pgxc/datanode.h +++ b/src/include/pgxc/datanode.h @@ -16,9 +16,9 @@ #ifndef DATANODE_H #define DATANODE_H -#include "combiner.h" +#include "postgres.h" +#include "gtm/gtm_c.h" #include "nodes/pg_list.h" -#include "pgxc/locator.h" #include "utils/snapshot.h" #include <unistd.h> @@ -28,23 +28,23 @@ typedef struct PGconn NODE_CONNECTION; /* Helper structure to access data node from Session */ typedef enum { - DN_CONNECTION_STATE_IDLE, - DN_CONNECTION_STATE_BUSY, - DN_CONNECTION_STATE_COMPLETED, + DN_CONNECTION_STATE_IDLE, /* idle, ready for query */ + DN_CONNECTION_STATE_QUERY, /* query is sent, response expected */ + DN_CONNECTION_STATE_HAS_DATA, /* buffer has data to process */ + DN_CONNECTION_STATE_COMPLETED, /* query completed, no ReadyForQuery yet */ DN_CONNECTION_STATE_ERROR_NOT_READY, /* error, but need ReadyForQuery message */ - DN_CONNECTION_STATE_ERROR_READY, /* error and received ReadyForQuery */ - DN_CONNECTION_STATE_ERROR_FATAL, /* fatal error */ + DN_CONNECTION_STATE_ERROR_FATAL, /* fatal error */ DN_CONNECTION_STATE_COPY_IN, DN_CONNECTION_STATE_COPY_OUT } DNConnectionState; #define DN_CONNECTION_STATE_ERROR(dnconn) \ (dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \ - || (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY \ - || (dnconn)->state == DN_CONNECTION_STATE_ERROR_READY + || (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY struct data_node_handle { + int nodenum; /* node identifier 1..NumDataNodes */ /* fd of the connection */ int sock; /* Connection state */ @@ -75,17 +75,25 @@ extern int DataNodeConnected(NODE_CONNECTION * conn); extern int DataNodeConnClean(NODE_CONNECTION * conn); extern void DataNodeCleanAndRelease(int code, Datum arg); -/* Multinode Executor */ -extern void DataNodeBegin(void); -extern int DataNodeCommit(CommandDest dest); -extern int DataNodeRollback(CommandDest dest); +extern DataNodeHandle **get_handles(List *nodelist); +extern void release_handles(void); +extern int get_transaction_nodes(DataNodeHandle ** connections); -extern int DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CombineType combine_type, CommandDest dest, Snapshot snapshot, bool force_autocommit, List *simple_aggregates, bool is_read_only); +extern int ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle); +extern int ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle); -extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from); -extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections); -extern int DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, CommandDest dest, FILE* copy_file); -extern uint64 DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type, CommandDest dest); +extern int data_node_send_query(DataNodeHandle * handle, const char *query); +extern int data_node_send_gxid(DataNodeHandle * handle, GlobalTransactionId gxid); +extern int data_node_send_snapshot(DataNodeHandle * handle, Snapshot snapshot); + +extern void data_node_receive(const int conn_count, + DataNodeHandle ** connections, struct timeval * timeout); +extern int data_node_read_data(DataNodeHandle * conn); +extern int send_some(DataNodeHandle * handle,
int len); +extern int data_node_flush(DataNodeHandle *handle); + +extern char get_message(DataNodeHandle *conn, int *len, char **msg); + +extern void add_error_message(DataNodeHandle * handle, const char *message); -extern int primary_data_node; #endif diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h new file mode 100644 index 0000000000..d99806a01b --- /dev/null +++ b/src/include/pgxc/execRemote.h @@ -0,0 +1,101 @@ +/*------------------------------------------------------------------------- + * + * execRemote.h + * + * Functions to execute commands on multiple Data Nodes + * + * + * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group ? + * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation + * + * IDENTIFICATION + * $$ + * + *------------------------------------------------------------------------- + */ + +#ifndef EXECREMOTE_H +#define EXECREMOTE_H +#include "datanode.h" +#include "locator.h" +#include "planner.h" +#include "access/tupdesc.h" +#include "executor/tuptable.h" +#include "nodes/execnodes.h" +#include "nodes/pg_list.h" +#include "tcop/dest.h" +#include "utils/snapshot.h" + +/* Outputs of handle_response() */ +#define RESPONSE_EOF EOF +#define RESPONSE_COMPLETE 0 +#define RESPONSE_TUPDESC 1 +#define RESPONSE_DATAROW 2 +#define RESPONSE_COPY 3 + +typedef enum +{ + REQUEST_TYPE_NOT_DEFINED, /* not determined yet */ + REQUEST_TYPE_COMMAND, /* OK or row count response */ + REQUEST_TYPE_QUERY, /* Row description response */ + REQUEST_TYPE_COPY_IN, /* Copy In response */ + REQUEST_TYPE_COPY_OUT /* Copy Out response */ +} RequestType; + + +typedef struct RemoteQueryState +{ + ScanState ss; /* its first field is NodeTag */ + int node_count; /* total count of participating nodes */ + DataNodeHandle **connections; /* data node connections being combined */ + int conn_count; /* count of active connections */ + int current_conn; /* used to balance load when reading from connections */ + CombineType combine_type; /* see CombineType enum */ + DestReceiver *dest; /* output destination */ + int command_complete_count; /* count of received CommandComplete messages */ + uint64 row_count; /* how many rows affected by the query */ + RequestType request_type; /* see RequestType enum */ + TupleDesc tuple_desc; /* tuple descriptor to be referenced by emitted tuples */ + int description_count; /* count of received RowDescription messages */ + int copy_in_count; /* count of received CopyIn messages */ + int copy_out_count; /* count of received CopyOut messages */ + char errorCode[5]; /* error code to send back to client */ + char *errorMessage; /* error message to send back to client */ + bool query_Done; /* query has been sent down to data nodes */ + bool need_tran; /* auto commit on nodes after completion */ + char *completionTag; /* completion tag to present to caller */ + char *msg; /* last data row message */ + int msglen; /* length of the data row message */ + /* + * While we are not supporting grouping use this flag to indicate we need + * to initialize collecting of aggregates from the DNs + */ + bool initAggregates; + List *simple_aggregates; /* description of aggregate functions */ + void *tuplesortstate; /* for merge sort */ + /* Simple DISTINCT support */ + FmgrInfo *eqfunctions; /* functions to compare tuples */ + MemoryContext tmp_ctx; /* separate context is needed to compare tuples */ + FILE *copy_file; /* used if copy_dest == COPY_FILE */ +} RemoteQueryState; + +/* Multinode Executor */ +extern void DataNodeBegin(void); 
+extern int DataNodeCommit(CommandDest dest); +extern int DataNodeRollback(CommandDest dest); + +extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from); +extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections); +extern uint64 DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* copy_file); +extern uint64 DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type); + +extern RemoteQueryState *ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags); +extern TupleTableSlot* ExecRemoteQuery(RemoteQueryState *step); +extern void ExecEndRemoteQuery(RemoteQueryState *step); + +extern int handle_response(DataNodeHandle * conn, RemoteQueryState *combiner); +extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot); + +extern int primary_data_node; + +#endif
\ No newline at end of file diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h index 4920aace0a..47665b2341 100644 --- a/src/include/pgxc/planner.h +++ b/src/include/pgxc/planner.h @@ -17,27 +17,59 @@ #include "fmgr.h" #include "lib/stringinfo.h" +#include "nodes/plannodes.h" #include "nodes/primnodes.h" #include "pgxc/locator.h" -#include "pgxc/combiner.h" +#include "tcop/dest.h" /* for Query_Plan.exec_loc_type can have these OR'ed */ #define EXEC_ON_COORD 0x1 #define EXEC_ON_DATA_NODES 0x2 +typedef enum +{ + COMBINE_TYPE_NONE, /* it is known that no row count, do not parse */ + COMBINE_TYPE_SUM, /* sum row counts (partitioned, round robin) */ + COMBINE_TYPE_SAME /* expect all row counts to be the same (replicated write) */ +} CombineType; + +/* For sorting within RemoteQuery handling */ +/* + * It is pretty much like Sort, but without Plan. We may use Sort later. + */ +typedef struct +{ + int numCols; /* number of sort-key columns */ + AttrNumber *sortColIdx; /* their indexes in the target list */ + Oid *sortOperators; /* OIDs of operators to sort them by */ + bool *nullsFirst; /* NULLS FIRST/LAST directions */ +} SimpleSort; + +/* For returning distinct results from the RemoteQuery */ +typedef struct +{ + int numCols; /* number of distinct-key columns */ + AttrNumber *uniqColIdx; /* their indexes in the target list */ + Oid *eqOperators; /* OIDs of operators to equate them by */ +} SimpleDistinct; + /* Contains instructions on processing a step of a query. * In the prototype this will be simple, but it will eventually * evolve into a GridSQL-style QueryStep. */ typedef struct { + Plan plan; char *sql_statement; Exec_Nodes *exec_nodes; CombineType combine_type; - List *simple_aggregates; /* simple aggregate to combine on this - * step */ -} Query_Step; + List *simple_aggregates; /* simple aggregate to combine on this step */ + SimpleSort *sort; + SimpleDistinct *distinct; + bool read_only; /* do not use 2PC when committing read only steps */ + bool force_autocommit; /* some commands like VACUUM require autocommit mode */ +} RemoteQuery; /* @@ -48,7 +80,6 @@ typedef struct typedef struct { int exec_loc_type; - bool force_autocommit; /* For CREATE DATABASE */ List *query_step_list; /* List of QuerySteps */ } Query_Plan; @@ -67,7 +98,6 @@ typedef enum /* For handling simple aggregates */ -/* For now, only support int/long types */ typedef struct { int column_pos; /* Only use 1 for now */ diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h index e351536d47..e2b3c033e0 100644 --- a/src/include/utils/tuplesort.h +++ b/src/include/utils/tuplesort.h @@ -23,6 +23,9 @@ #include "access/itup.h" #include "executor/tuptable.h" #include "fmgr.h" +#ifdef PGXC +#include "pgxc/execRemote.h" +#endif #include "utils/relcache.h" @@ -64,6 +67,13 @@ extern Tuplesortstate *tuplesort_begin_index_hash(Relation indexRel, extern Tuplesortstate *tuplesort_begin_datum(Oid datumType, Oid sortOperator, bool nullsFirstFlag, int workMem, bool randomAccess); +#ifdef PGXC +extern Tuplesortstate *tuplesort_begin_merge(TupleDesc tupDesc, + int nkeys, AttrNumber *attNums, + Oid *sortOperators, bool *nullsFirstFlags, + RemoteQueryState *combiner, + int workMem); +#endif extern void tuplesort_set_bound(Tuplesortstate *state, int64 bound);
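
The comment on tuplesort_begin_merge() above describes the intended usage pattern. A minimal sketch of that pattern, assuming only the declarations added by this patch (pgxc/execRemote.h, utils/tuplesort.h); merge_sorted_streams() is a hypothetical caller, not part of the patch:

#include "postgres.h"
#include "pgxc/execRemote.h"
#include "utils/tuplesort.h"

static void
merge_sorted_streams(RemoteQueryState *combiner, TupleDesc tupDesc,
                     SimpleSort *sort, int workMem)
{
    TupleTableSlot *slot = combiner->ss.ss_ScanTupleSlot;
    Tuplesortstate *sortstate;
    int         nrows = 0;

    /* Sorter starts directly in final-merge state; no load phase needed */
    sortstate = tuplesort_begin_merge(tupDesc,
                                      sort->numCols,
                                      sort->sortColIdx,
                                      sort->sortOperators,
                                      sort->nullsFirst,
                                      combiner,
                                      workMem);

    /* Pull tuples in global sort order until every node stream is drained */
    while (tuplesort_gettupleslot(sortstate, true, slot))
        nrows++;                /* a real caller would emit the tuple here */

    elog(DEBUG1, "merge sort returned %d rows", nrows);
    tuplesort_end(sortstate);
}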

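During the final merge, each data node connection stands in for a logical tape. A condensed sketch of the read step from the mergeprereadone() hunk above, written as if it lived inside tuplesort.c where the GETLEN/READTUP macros and Tuplesortstate fields are visible; preread_from_node() is a hypothetical name:

static bool
preread_from_node(Tuplesortstate *state, int srcTape, SortTuple *stup)
{
    unsigned int tuplen;

    /* getlen_datanode() waits for a DataRow or CommandComplete message */
    if ((tuplen = GETLEN(state, srcTape, true)) == 0)
    {
        /* zero length: the node's single run is exhausted */
        state->mergeactive[srcTape] = false;
        return false;
    }

    /* readtup_datanode() copies the row out of the combiner's scan slot */
    READTUP(state, stup, srcTape, tuplen);
    return true;
}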