Diffstat (limited to 'src')
-rw-r--r--  src/backend/access/common/heaptuple.c |  100
-rw-r--r--  src/backend/access/common/printtup.c  |   13
-rw-r--r--  src/backend/commands/copy.c           |   19
-rw-r--r--  src/backend/executor/execTuples.c     |  122
-rw-r--r--  src/backend/pgxc/plan/planner.c       |  541
-rw-r--r--  src/backend/pgxc/pool/Makefile        |    2
-rw-r--r--  src/backend/pgxc/pool/combiner.c      |  652
-rw-r--r--  src/backend/pgxc/pool/datanode.c      | 1571
-rw-r--r--  src/backend/pgxc/pool/execRemote.c    | 2453
-rw-r--r--  src/backend/tcop/postgres.c           |  238
-rw-r--r--  src/backend/utils/misc/guc.c          |   37
-rw-r--r--  src/backend/utils/mmgr/mcxt.c         |    8
-rw-r--r--  src/backend/utils/sort/tuplesort.c    |  209
-rw-r--r--  src/include/executor/tuptable.h       |   15
-rw-r--r--  src/include/nodes/nodes.h             |    6
-rw-r--r--  src/include/pgxc/combiner.h           |   73
-rw-r--r--  src/include/pgxc/datanode.h           |   46
-rw-r--r--  src/include/pgxc/execRemote.h         |  101
-rw-r--r--  src/include/pgxc/planner.h            |   42
-rw-r--r--  src/include/utils/tuplesort.h         |   10
20 files changed, 3862 insertions(+), 2396 deletions(-)
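The heaptuple.c hunks below add slot_deform_datarow(), which decodes a data node DataRow message directly into the slot's Datum/isnull arrays instead of first materializing a heap tuple. The wire layout it walks is: a 16-bit column count in network byte order, then for each column a 32-bit length (-1 meaning NULL) followed by that many bytes of value data. The following is a minimal standalone sketch of walking that layout; the function and buffer names are illustrative, not part of the patch:

#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>

static void
walk_datarow(const char *msg)
{
    uint16_t    n16;
    uint32_t    n32;
    const char *cur = msg;
    int         col_count, i;

    /* 16-bit column count, network byte order */
    memcpy(&n16, cur, 2);
    cur += 2;
    col_count = ntohs(n16);

    for (i = 0; i < col_count; i++)
    {
        int len;

        /* 32-bit value length; -1 marks a NULL column */
        memcpy(&n32, cur, 4);
        cur += 4;
        len = (int) ntohl(n32);

        if (len == -1)
            printf("col %d: NULL\n", i);
        else
        {
            printf("col %d: %.*s\n", i, len, cur);
            cur += len;
        }
    }
}

int
main(void)
{
    /* build a two-column row: "42" and NULL */
    char      buf[64];
    char     *p = buf;
    uint16_t  cols = htons(2);
    uint32_t  len1 = htonl(2);
    uint32_t  null_len = htonl((uint32_t) -1);

    memcpy(p, &cols, 2);      p += 2;
    memcpy(p, &len1, 4);      p += 4;
    memcpy(p, "42", 2);       p += 2;
    memcpy(p, &null_len, 4);  p += 4;

    walk_datarow(buf);
    return 0;
}

The patch itself does not print the values: it feeds each extracted string through the attribute's type input function via InputFunctionCall(), using the AttInMetadata cached in the slot, so the result lands in tts_values/tts_isnull ready for the executor.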
diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c index ac5749c713..cae5794103 100644 --- a/src/backend/access/common/heaptuple.c +++ b/src/backend/access/common/heaptuple.c @@ -57,6 +57,9 @@ #include "postgres.h" +#ifdef PGXC +#include "funcapi.h" +#endif #include "access/heapam.h" #include "access/sysattr.h" #include "access/tuptoaster.h" @@ -1157,6 +1160,80 @@ slot_deform_tuple(TupleTableSlot *slot, int natts) slot->tts_slow = slow; } +#ifdef PGXC +/* + * slot_deform_datarow + * Extract data from the DataRow message into Datum/isnull arrays. + * We always extract all atributes, as specified in tts_tupleDescriptor, + * because there is no easy way to find random attribute in the DataRow. + */ +static void +slot_deform_datarow(TupleTableSlot *slot) +{ + int attnum = slot->tts_tupleDescriptor->natts; + int i; + int col_count; + char *cur = slot->tts_dataRow; + StringInfo buffer; + uint16 n16; + uint32 n32; + + /* fastpath: exit if values already extracted */ + if (slot->tts_nvalid == attnum) + return; + + Assert(slot->tts_dataRow); + + memcpy(&n16, cur, 2); + cur += 2; + col_count = ntohs(n16); + + if (col_count != attnum) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Tuple does not match the descriptor"))); + + if (slot->tts_attinmeta == NULL) + slot->tts_attinmeta = TupleDescGetAttInMetadata(slot->tts_tupleDescriptor); + + buffer = makeStringInfo(); + for (i = 0; i < attnum; i++) + { + Form_pg_attribute attr = slot->tts_tupleDescriptor->attrs[i]; + int len; + + /* get size */ + memcpy(&n32, cur, 4); + cur += 4; + len = ntohl(n32); + + /* get data */ + if (len == -1) + { + slot->tts_values[i] = (Datum) 0; + slot->tts_isnull[i] = true; + } + else + { + appendBinaryStringInfo(buffer, cur, len); + cur += len; + + slot->tts_values[i] = InputFunctionCall(slot->tts_attinmeta->attinfuncs + i, + buffer->data, + slot->tts_attinmeta->attioparams[i], + slot->tts_attinmeta->atttypmods[i]); + slot->tts_isnull[i] = false; + + resetStringInfo(buffer); + } + } + pfree(buffer->data); + pfree(buffer); + + slot->tts_nvalid = attnum; +} +#endif + /* * slot_getattr * This function fetches an attribute of the slot's current tuple. @@ -1250,6 +1327,11 @@ slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull) /* * Extract the attribute, along with any preceding attributes. 
*/ +#ifdef PGXC + if (slot->tts_dataRow) + slot_deform_datarow(slot); + else +#endif slot_deform_tuple(slot, attnum); /* @@ -1276,6 +1358,15 @@ slot_getallattrs(TupleTableSlot *slot) if (slot->tts_nvalid == tdesc_natts) return; +#ifdef PGXC + /* Handle the DataRow tuple case */ + if (slot->tts_dataRow) + { + slot_deform_datarow(slot); + return; + } +#endif + /* * otherwise we had better have a physical tuple (tts_nvalid should equal * natts in all virtual-tuple cases) @@ -1319,6 +1410,15 @@ slot_getsomeattrs(TupleTableSlot *slot, int attnum) if (slot->tts_nvalid >= attnum) return; +#ifdef PGXC + /* Handle the DataRow tuple case */ + if (slot->tts_dataRow) + { + slot_deform_datarow(slot); + return; + } +#endif + /* Check for caller error */ if (attnum <= 0 || attnum > slot->tts_tupleDescriptor->natts) elog(ERROR, "invalid attribute number %d", attnum); diff --git a/src/backend/access/common/printtup.c b/src/backend/access/common/printtup.c index 13a09dd9b3..3c77ab4ff4 100644 --- a/src/backend/access/common/printtup.c +++ b/src/backend/access/common/printtup.c @@ -292,6 +292,19 @@ printtup(TupleTableSlot *slot, DestReceiver *self) int natts = typeinfo->natts; int i; +#ifdef PGXC + /* + * If we are having DataRow-based tuple we do not have to encode attribute + * values, just send over the DataRow message as we received it from the + * data node + */ + if (slot->tts_dataRow) + { + pq_putmessage('D', slot->tts_dataRow, slot->tts_dataLen); + return; + } +#endif + /* Set or update my derived attribute info, if needed */ if (myState->attrinfo != typeinfo || myState->nattrs != natts) printtup_prepare_info(myState, typeinfo, natts); diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 92f5db85f2..50650190ee 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -35,7 +35,7 @@ #include "parser/parse_relation.h" #ifdef PGXC #include "pgxc/pgxc.h" -#include "pgxc/datanode.h" +#include "pgxc/execRemote.h" #include "pgxc/locator.h" #include "pgxc/poolmgr.h" #endif @@ -1511,8 +1511,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString) DataNodeCopyFinish( cstate->connections, primary_data_node, - COMBINE_TYPE_NONE, - whereToSendOutput); + COMBINE_TYPE_NONE); pfree(cstate->connections); pfree(cstate->query_buf.data); FreeRelationLocInfo(cstate->rel_loc); @@ -1526,14 +1525,12 @@ DoCopy(const CopyStmt *stmt, const char *queryString) cstate->processed = DataNodeCopyFinish( cstate->connections, primary_data_node, - COMBINE_TYPE_SAME, - whereToSendOutput); + COMBINE_TYPE_SAME); else cstate->processed = DataNodeCopyFinish( cstate->connections, 0, - COMBINE_TYPE_SUM, - whereToSendOutput); + COMBINE_TYPE_SUM); pfree(cstate->connections); pfree(cstate->query_buf.data); FreeRelationLocInfo(cstate->rel_loc); @@ -1775,10 +1772,10 @@ CopyTo(CopyState cstate) #ifdef PGXC if (IS_PGXC_COORDINATOR && !cstate->on_coord) { - DataNodeCopyOut(GetRelationNodes(cstate->rel_loc, NULL, true), - cstate->connections, - whereToSendOutput, - cstate->copy_file); + cstate->processed = DataNodeCopyOut( + GetRelationNodes(cstate->rel_loc, NULL, true), + cstate->connections, + cstate->copy_file); } else { diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c index 7b5ba41f5f..a6c496eee9 100644 --- a/src/backend/executor/execTuples.c +++ b/src/backend/executor/execTuples.c @@ -149,6 +149,12 @@ ExecCreateTupleTable(int tableSize) slot->tts_shouldFreeMin = false; slot->tts_tuple = NULL; slot->tts_tupleDescriptor = NULL; +#ifdef PGXC + slot->tts_shouldFreeRow = 
false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; + slot->tts_attinmeta = NULL; +#endif slot->tts_mcxt = CurrentMemoryContext; slot->tts_buffer = InvalidBuffer; slot->tts_nvalid = 0; @@ -228,6 +234,12 @@ MakeSingleTupleTableSlot(TupleDesc tupdesc) slot->tts_shouldFreeMin = false; slot->tts_tuple = NULL; slot->tts_tupleDescriptor = NULL; +#ifdef PGXC + slot->tts_shouldFreeRow = false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; + slot->tts_attinmeta = NULL; +#endif slot->tts_mcxt = CurrentMemoryContext; slot->tts_buffer = InvalidBuffer; slot->tts_nvalid = 0; @@ -334,6 +346,12 @@ ExecSetSlotDescriptor(TupleTableSlot *slot, /* slot to change */ if (slot->tts_tupleDescriptor) ReleaseTupleDesc(slot->tts_tupleDescriptor); +#ifdef PGXC + /* XXX there in no routine to release AttInMetadata instance */ + if (slot->tts_attinmeta) + slot->tts_attinmeta = NULL; +#endif + if (slot->tts_values) pfree(slot->tts_values); if (slot->tts_isnull) @@ -415,6 +433,14 @@ ExecStoreTuple(HeapTuple tuple, heap_freetuple(slot->tts_tuple); if (slot->tts_shouldFreeMin) heap_free_minimal_tuple(slot->tts_mintuple); +#ifdef PGXC + if (slot->tts_shouldFreeRow) + pfree(slot->tts_dataRow); + + slot->tts_shouldFreeRow = false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; +#endif /* * Store the new tuple into the specified slot. @@ -476,6 +502,14 @@ ExecStoreMinimalTuple(MinimalTuple mtup, heap_freetuple(slot->tts_tuple); if (slot->tts_shouldFreeMin) heap_free_minimal_tuple(slot->tts_mintuple); +#ifdef PGXC + if (slot->tts_shouldFreeRow) + pfree(slot->tts_dataRow); + + slot->tts_shouldFreeRow = false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; +#endif /* * Drop the pin on the referenced buffer, if there is one. @@ -504,6 +538,62 @@ ExecStoreMinimalTuple(MinimalTuple mtup, return slot; } +#ifdef PGXC +/* -------------------------------- + * ExecStoreDataRowTuple + * + * Store a buffer in DataRow message format into the slot. + * + * -------------------------------- + */ +TupleTableSlot * +ExecStoreDataRowTuple(char *msg, size_t len, TupleTableSlot *slot, bool shouldFree) +{ + /* + * sanity checks + */ + Assert(msg != NULL); + Assert(len > 0); + Assert(slot != NULL); + Assert(slot->tts_tupleDescriptor != NULL); + + /* + * Free any old physical tuple belonging to the slot. + */ + if (slot->tts_shouldFree) + heap_freetuple(slot->tts_tuple); + if (slot->tts_shouldFreeMin) + heap_free_minimal_tuple(slot->tts_mintuple); + if (slot->tts_shouldFreeRow) + pfree(slot->tts_dataRow); + + /* + * Drop the pin on the referenced buffer, if there is one. + */ + if (BufferIsValid(slot->tts_buffer)) + ReleaseBuffer(slot->tts_buffer); + + slot->tts_buffer = InvalidBuffer; + + /* + * Store the new tuple into the specified slot. 
+ */ + slot->tts_isempty = false; + slot->tts_shouldFree = false; + slot->tts_shouldFreeMin = false; + slot->tts_shouldFreeRow = shouldFree; + slot->tts_tuple = NULL; + slot->tts_mintuple = NULL; + slot->tts_dataRow = msg; + slot->tts_dataLen = len; + + /* Mark extracted state invalid */ + slot->tts_nvalid = 0; + + return slot; +} +#endif + /* -------------------------------- * ExecClearTuple * @@ -527,6 +617,14 @@ ExecClearTuple(TupleTableSlot *slot) /* slot in which to store tuple */ heap_freetuple(slot->tts_tuple); if (slot->tts_shouldFreeMin) heap_free_minimal_tuple(slot->tts_mintuple); +#ifdef PGXC + if (slot->tts_shouldFreeRow) + pfree(slot->tts_dataRow); + + slot->tts_shouldFreeRow = false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; +#endif slot->tts_tuple = NULL; slot->tts_mintuple = NULL; @@ -634,7 +732,13 @@ ExecCopySlotTuple(TupleTableSlot *slot) return heap_copytuple(slot->tts_tuple); if (slot->tts_mintuple) return heap_tuple_from_minimal_tuple(slot->tts_mintuple); - +#ifdef PGXC + /* + * Ensure values are extracted from data row to the Datum array + */ + if (slot->tts_dataRow) + slot_getallattrs(slot); +#endif /* * Otherwise we need to build a tuple from the Datum array. */ @@ -667,7 +771,13 @@ ExecCopySlotMinimalTuple(TupleTableSlot *slot) return heap_copy_minimal_tuple(slot->tts_mintuple); if (slot->tts_tuple) return minimal_tuple_from_heap_tuple(slot->tts_tuple); - +#ifdef PGXC + /* + * Ensure values are extracted from data row to the Datum array + */ + if (slot->tts_dataRow) + slot_getallattrs(slot); +#endif /* * Otherwise we need to build a tuple from the Datum array. */ @@ -861,6 +971,14 @@ ExecMaterializeSlot(TupleTableSlot *slot) if (!slot->tts_shouldFreeMin) slot->tts_mintuple = NULL; +#ifdef PGXC + if (!slot->tts_shouldFreeRow) + { + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; + } +#endif + return slot->tts_tuple; } diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index ae537e5f1f..1bbbb75848 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -22,8 +22,10 @@ #include "catalog/pg_type.h" #include "lib/stringinfo.h" #include "nodes/nodeFuncs.h" +#include "nodes/nodes.h" #include "nodes/parsenodes.h" #include "optimizer/clauses.h" +#include "optimizer/tlist.h" #include "parser/parse_agg.h" #include "parser/parse_coerce.h" #include "pgxc/locator.h" @@ -123,12 +125,10 @@ typedef struct XCWalkerContext int varno; bool within_or; bool within_not; + List *join_list; /* A list of List*'s, one for each relation. */ } XCWalkerContext; -/* A list of List*'s, one for each relation. */ -List *join_list = NULL; - /* Forbid unsafe SQL statements */ bool StrictStatementChecking = true; @@ -185,12 +185,12 @@ new_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) * Look up the join struct for a particular join */ static PGXC_Join * -find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) +find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2, XCWalkerContext *context) { ListCell *lc; /* return if list is still empty */ - if (join_list == NULL) + if (context->join_list == NULL) return NULL; /* in the PGXC_Join struct, we always sort with relid1 < relid2 */ @@ -209,7 +209,7 @@ find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) * there should be a small number, so we just search linearly, although * long term a hash table would be better. 
*/ - foreach(lc, join_list) + foreach(lc, context->join_list) { PGXC_Join *pgxcjoin = (PGXC_Join *) lfirst(lc); @@ -225,16 +225,16 @@ find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) * Find or create a join between 2 relations */ static PGXC_Join * -find_or_create_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) +find_or_create_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2, XCWalkerContext *context) { PGXC_Join *pgxcjoin; - pgxcjoin = find_pgxc_join(relid1, aliasname1, relid2, aliasname2); + pgxcjoin = find_pgxc_join(relid1, aliasname1, relid2, aliasname2, context); if (pgxcjoin == NULL) { pgxcjoin = new_pgxc_join(relid1, aliasname1, relid2, aliasname2); - join_list = lappend(join_list, pgxcjoin); + context->join_list = lappend(context->join_list, pgxcjoin); } return pgxcjoin; @@ -277,7 +277,7 @@ free_special_relations(Special_Conditions *special_conditions) * frees join_list */ static void -free_join_list(void) +free_join_list(List *join_list) { if (join_list == NULL) return; @@ -368,13 +368,13 @@ get_base_var(Var *var, XCWalkerContext *context) } else if (rte->rtekind == RTE_SUBQUERY) { - /* + /* * Handle views like select * from v1 where col1 = 1 * where col1 is partition column of base relation */ /* the varattno corresponds with the subquery's target list (projections) */ TargetEntry *tle = list_nth(rte->subquery->targetList, var->varattno - 1); /* or varno? */ - + if (!IsA(tle->expr, Var)) return NULL; /* not column based expressoin, return */ else @@ -684,7 +684,7 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) /* get data struct about these two relations joining */ pgxc_join = find_or_create_pgxc_join(column_base->relid, column_base->relalias, - column_base2->relid, column_base2->relalias); + column_base2->relid, column_base2->relalias, context); if (rel_loc_info1->locatorType == LOCATOR_TYPE_REPLICATED) { @@ -914,7 +914,7 @@ contains_only_pg_catalog (List *rtable) { if (get_rel_namespace(rte->relid) != PG_CATALOG_NAMESPACE) return false; - } else if (rte->rtekind == RTE_SUBQUERY && + } else if (rte->rtekind == RTE_SUBQUERY && !contains_only_pg_catalog (rte->subquery->rtable)) return false; } @@ -967,7 +967,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) { /* May be complicated. Before giving up, just check for pg_catalog usage */ if (contains_only_pg_catalog (query->rtable)) - { + { /* just pg_catalog tables */ context->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; @@ -1018,7 +1018,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) /* We compare to make sure that the subquery is safe to execute with previous- * we may have multiple ones in the FROM clause. 
- * We handle the simple case of allowing multiple subqueries in the from clause, + * We handle the simple case of allowing multiple subqueries in the from clause, * but only allow one of them to not contain replicated tables */ if (!from_query_nodes) @@ -1028,20 +1028,20 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) /* ok, safe */ if (!from_query_nodes) from_query_nodes = current_nodes; - } + } else { if (from_query_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED) from_query_nodes = current_nodes; else { - /* Allow if they are both using one node, and the same one */ + /* Allow if they are both using one node, and the same one */ if (!same_single_node (from_query_nodes->nodelist, current_nodes->nodelist)) /* Complicated */ return true; } } - } + } else if (rte->rtekind == RTE_RELATION) { /* Look for pg_catalog tables */ @@ -1049,7 +1049,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) current_usage_type = TABLE_USAGE_TYPE_PGCATALOG; else current_usage_type = TABLE_USAGE_TYPE_USER; - } + } else if (rte->rtekind == RTE_FUNCTION) { /* See if it is a catalog function */ @@ -1095,9 +1095,9 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) return true; /* Examine join conditions, see if each join is single-node safe */ - if (join_list != NULL) + if (context->join_list != NULL) { - foreach(lc, join_list) + foreach(lc, context->join_list) { PGXC_Join *pgxcjoin = (PGXC_Join *) lfirst(lc); @@ -1254,22 +1254,28 @@ static Exec_Nodes * get_plan_nodes(Query *query, bool isRead) { Exec_Nodes *result_nodes; - XCWalkerContext *context = palloc0(sizeof(XCWalkerContext)); - - context->query = query; - context->isRead = isRead; - - context->conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions)); - context->rtables = lappend(context->rtables, query->rtable); - - join_list = NULL; - - if (get_plan_nodes_walker((Node *) query, context)) + XCWalkerContext context; + + + context.query = query; + context.isRead = isRead; + context.exec_nodes = NULL; + context.conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions)); + context.rtables = NIL; + context.rtables = lappend(context.rtables, query->rtable); + context.multilevel_join = false; + context.varno = 0; + context.within_or = false; + context.within_not = false; + context.join_list = NIL; + + if (get_plan_nodes_walker((Node *) query, &context)) result_nodes = NULL; else - result_nodes = context->exec_nodes; + result_nodes = context.exec_nodes; - free_special_relations(context->conditions); + free_special_relations(context.conditions); + free_join_list(context.join_list); return result_nodes; } @@ -1304,7 +1310,6 @@ get_plan_nodes_command(Query *query) return NULL; } - free_join_list(); return exec_nodes; } @@ -1345,17 +1350,17 @@ static List * get_simple_aggregates(Query * query) { List *simple_agg_list = NIL; - + /* Check for simple multi-node aggregate */ if (query->hasAggs) { ListCell *lc; int column_pos = 0; - + foreach (lc, query->targetList) { TargetEntry *tle = (TargetEntry *) lfirst(lc); - + if (IsA(tle->expr, Aggref)) { /*PGXC borrowed this code from nodeAgg.c, see ExecInitAgg()*/ @@ -1422,7 +1427,7 @@ get_simple_aggregates(Query * query) get_func_name(finalfn_oid)); } } - + /* resolve actual type of transition state, if polymorphic */ aggcollecttype = aggform->aggcollecttype; @@ -1468,7 +1473,7 @@ get_simple_aggregates(Query * query) get_typlenbyval(aggcollecttype, &simple_agg->transtypeLen, &simple_agg->transtypeByVal); - + /* * initval is 
potentially null, so don't try to access it as a struct * field. Must do it the hard way with SysCacheGetAttr. @@ -1534,6 +1539,427 @@ get_simple_aggregates(Query * query) /* + * add_sort_column --- utility subroutine for building sort info arrays + * + * We need this routine because the same column might be selected more than + * once as a sort key column; if so, the extra mentions are redundant. + * + * Caller is assumed to have allocated the arrays large enough for the + * max possible number of columns. Return value is the new column count. + * + * PGXC: copied from optimizer/plan/planner.c + */ +static int +add_sort_column(AttrNumber colIdx, Oid sortOp, bool nulls_first, + int numCols, AttrNumber *sortColIdx, + Oid *sortOperators, bool *nullsFirst) +{ + int i; + + Assert(OidIsValid(sortOp)); + + for (i = 0; i < numCols; i++) + { + /* + * Note: we check sortOp because it's conceivable that "ORDER BY foo + * USING <, foo USING <<<" is not redundant, if <<< distinguishes + * values that < considers equal. We need not check nulls_first + * however because a lower-order column with the same sortop but + * opposite nulls direction is redundant. + */ + if (sortColIdx[i] == colIdx && sortOperators[i] == sortOp) + { + /* Already sorting by this col, so extra sort key is useless */ + return numCols; + } + } + + /* Add the column */ + sortColIdx[numCols] = colIdx; + sortOperators[numCols] = sortOp; + nullsFirst[numCols] = nulls_first; + return numCols + 1; +} + +/* + * add_distinct_column - utility subroutine to remove redundant columns, just + * like add_sort_column + */ +static int +add_distinct_column(AttrNumber colIdx, Oid eqOp, int numCols, + AttrNumber *sortColIdx, Oid *eqOperators) +{ + int i; + + Assert(OidIsValid(eqOp)); + + for (i = 0; i < numCols; i++) + { + if (sortColIdx[i] == colIdx && eqOperators[i] == eqOp) + { + /* Already sorting by this col, so extra sort key is useless */ + return numCols; + } + } + + /* Add the column */ + sortColIdx[numCols] = colIdx; + eqOperators[numCols] = eqOp; + return numCols + 1; +} + + +/* + * Reconstruct the step query + */ +static void +reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, + RemoteQuery *step) +{ + List *context; + bool useprefix; + List *sub_tlist = step->plan.targetlist; + ListCell *l; + StringInfo buf = makeStringInfo(); + char *sql; + char *cur; + char *sql_from; + + context = deparse_context_for_plan((Node *) step, NULL, rtable, NIL); + useprefix = list_length(rtable) > 1; + + foreach(l, sub_tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(l); + char *exprstr = deparse_expression((Node *) tle->expr, context, + useprefix, false); + + if (buf->len == 0) + { + appendStringInfo(buf, "SELECT "); + if (step->distinct) + appendStringInfo(buf, "DISTINCT "); + } + else + appendStringInfo(buf, ", "); + + appendStringInfoString(buf, exprstr); + } + + /* + * A kind of dummy + * Do not reconstruct remaining query, just search original statement + * for " FROM " and append remainder to the target list we just generated. + * Do not handle the case if " FROM " we found is not a "FROM" keyword, but, + * for example, a part of string constant. 
+ */ + sql = pstrdup(step->sql_statement); /* mutable copy */ + /* string to upper case, for comparing */ + cur = sql; + while (*cur) + { + /* replace whitespace with a space */ + if (isspace((unsigned char) *cur)) + *cur = ' '; + *cur++ = toupper(*cur); + } + + /* find the keyword */ + sql_from = strstr(sql, " FROM "); + if (sql_from) + { + /* the same offset in the original string */ + int offset = sql_from - sql; + /* remove terminating semicolon */ + char *end = strrchr(step->sql_statement, ';'); + *end = '\0'; + + appendStringInfoString(buf, step->sql_statement + offset); + } + + if (extra_sort) + { + foreach(l, extra_sort) + { + TargetEntry *tle = (TargetEntry *) lfirst(l); + char *exprstr = deparse_expression((Node *) tle->expr, context, + useprefix, false); + + if (has_order_by) + appendStringInfo(buf, ", "); + else + { + appendStringInfo(buf, " ORDER BY "); + has_order_by = true; + } + + appendStringInfoString(buf, exprstr); + } + } + + /* do not need the copy */ + pfree(sql); + + /* free previous query */ + pfree(step->sql_statement); + /* get a copy of new query */ + step->sql_statement = pstrdup(buf->data); + /* free the query buffer */ + pfree(buf->data); + pfree(buf); +} + + +/* + * Plan to sort step tuples + * PGXC: copied and adopted from optimizer/plan/planner.c + */ +static void +make_simple_sort_from_sortclauses(Query *query, RemoteQuery *step) +{ + List *sortcls = query->sortClause; + List *distinctcls = query->distinctClause; + List *sub_tlist = step->plan.targetlist; + SimpleSort *sort; + SimpleDistinct *distinct; + ListCell *l; + int numsortkeys; + int numdistkeys; + AttrNumber *sortColIdx; + AttrNumber *distColIdx; + Oid *sortOperators; + Oid *eqOperators; + bool *nullsFirst; + bool need_reconstruct = false; + /* + * List of target list entries from DISTINCT which are not in the ORDER BY. + * The exressions should be appended to the ORDER BY clause of remote query + */ + List *extra_distincts = NIL; + + Assert(step->sort == NULL); + Assert(step->distinct == NULL); + + /* + * We will need at most list_length(sortcls) sort columns; possibly less + * Also need room for extra distinct expressions if we need to append them + */ + numsortkeys = list_length(sortcls) + list_length(distinctcls); + sortColIdx = (AttrNumber *) palloc(numsortkeys * sizeof(AttrNumber)); + sortOperators = (Oid *) palloc(numsortkeys * sizeof(Oid)); + nullsFirst = (bool *) palloc(numsortkeys * sizeof(bool)); + + numsortkeys = 0; + sort = (SimpleSort *) palloc(sizeof(SimpleSort)); + + if (sortcls) + { + foreach(l, sortcls) + { + SortGroupClause *sortcl = (SortGroupClause *) lfirst(l); + TargetEntry *tle = get_sortgroupclause_tle(sortcl, sub_tlist); + + if (tle->resjunk) + need_reconstruct = true; + + /* + * Check for the possibility of duplicate order-by clauses --- the + * parser should have removed 'em, but no point in sorting + * redundantly. + */ + numsortkeys = add_sort_column(tle->resno, sortcl->sortop, + sortcl->nulls_first, + numsortkeys, + sortColIdx, sortOperators, nullsFirst); + } + } + + if (distinctcls) + { + /* + * Validate distinct clause + * We have to sort tuples to filter duplicates, and if ORDER BY clause + * is already present the sort order specified here may be incompatible + * with order needed for distinct. + * + * To be compatible, all expressions from DISTINCT must appear at the + * beginning of ORDER BY list. If list of DISTINCT expressions is longer + * then ORDER BY we can make ORDER BY compatible we can append remaining + * expressions from DISTINCT to ORDER BY. 
Obviously ORDER BY must not + * contain expressions not from the DISTINCT list in this case. + * + * For validation purposes we use column indexes (AttrNumber) to + * identify expressions. May be this is not enough and we should revisit + * the algorithm. + * + * We validate compatibility as follow: + * 1. Make working copy of DISTINCT + * 1a. Remove possible duplicates when copying: do not add expression + * 2. If order by is empty they are already compatible, skip 3 + * 3. Iterate over ORDER BY items + * 3a. If the item is in the working copy delete it from the working + * list. If working list is empty after deletion DISTINCT and + * ORDER BY are compatible, so break the loop. If working list is + * not empty continue iterating + * 3b. ORDER BY clause may contain duplicates. So if we can not found + * expression in the remainder of DISTINCT, probably it has already + * been removed because of duplicate ORDER BY entry. Check original + * DISTINCT clause, if expression is there continue iterating. + * 3c. DISTINCT and ORDER BY are not compatible, emit error + * 4. DISTINCT and ORDER BY are compatible, if we have remaining items + * in the working copy we should append it to the order by list + */ + /* + * Create the list of unique DISTINCT clause expressions + */ + foreach(l, distinctcls) + { + SortGroupClause *distinctcl = (SortGroupClause *) lfirst(l); + TargetEntry *tle = get_sortgroupclause_tle(distinctcl, sub_tlist); + bool found = false; + + if (extra_distincts) + { + ListCell *xl; + + foreach(xl, extra_distincts) + { + TargetEntry *xtle = (TargetEntry *) lfirst(xl); + if (xtle->resno == tle->resno) + { + found = true; + break; + } + } + } + + if (!found) + extra_distincts = lappend(extra_distincts, tle); + } + + if (sortcls) + { + foreach(l, sortcls) + { + SortGroupClause *sortcl = (SortGroupClause *) lfirst(l); + TargetEntry *tle = get_sortgroupclause_tle(sortcl, sub_tlist); + bool found = false; + ListCell *xl; + ListCell *prev = NULL; + + /* Search for the expression in the DISTINCT clause */ + foreach(xl, extra_distincts) + { + TargetEntry *xtle = (TargetEntry *) lfirst(xl); + if (xtle->resno == tle->resno) + { + extra_distincts = list_delete_cell(extra_distincts, xl, + prev); + found = true; + break; + } + prev = xl; + } + + /* Probably we've done */ + if (found && list_length(extra_distincts) == 0) + break; + + /* Ensure sort expression is not a duplicate */ + if (!found) + { + foreach(xl, distinctcls) + { + SortGroupClause *xcl = (SortGroupClause *) lfirst(xl); + TargetEntry *xtle = get_sortgroupclause_tle(xcl, sub_tlist); + if (xtle->resno == tle->resno) + { + /* it is a duplicate then */ + found = true; + break; + } + } + } + + /* Give up, we do not support it */ + if (!found) + { + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("Such combination of ORDER BY and DISTINCT is not yet supported")))); + } + } + } + /* need to append to the ORDER BY */ + if (list_length(extra_distincts) > 0) + need_reconstruct = true; + + /* + * End of validation, expression to append to ORDER BY are in the + * extra_distincts list + */ + + distinct = (SimpleDistinct *) palloc(sizeof(SimpleDistinct)); + + /* + * We will need at most list_length(distinctcls) sort columns + */ + numdistkeys = list_length(distinctcls); + distColIdx = (AttrNumber *) palloc(numdistkeys * sizeof(AttrNumber)); + eqOperators = (Oid *) palloc(numdistkeys * sizeof(Oid)); + + numdistkeys = 0; + + foreach(l, distinctcls) + { + SortGroupClause *distinctcl = (SortGroupClause *) lfirst(l); + TargetEntry 
*tle = get_sortgroupclause_tle(distinctcl, sub_tlist); + + /* + * Check for the possibility of duplicate order-by clauses --- the + * parser should have removed 'em, but no point in sorting + * redundantly. + */ + numdistkeys = add_distinct_column(tle->resno, + distinctcl->eqop, + numdistkeys, + distColIdx, + eqOperators); + /* append also extra sort operator, if not already there */ + numsortkeys = add_sort_column(tle->resno, + distinctcl->sortop, + distinctcl->nulls_first, + numsortkeys, + sortColIdx, + sortOperators, + nullsFirst); + } + + Assert(numdistkeys > 0); + + distinct->numCols = numdistkeys; + distinct->uniqColIdx = distColIdx; + distinct->eqOperators = eqOperators; + + step->distinct = distinct; + } + + + Assert(numsortkeys > 0); + + sort->numCols = numsortkeys; + sort->sortColIdx = sortColIdx; + sort->sortOperators = sortOperators; + sort->nullsFirst = nullsFirst; + + step->sort = sort; + + if (need_reconstruct) + reconstruct_step_query(query->rtable, sortcls != NULL, extra_distincts, + step); +} + +/* * Build up a QueryPlan to execute on. * * For the prototype, there will only be one step, @@ -1543,17 +1969,16 @@ Query_Plan * GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) { Query_Plan *query_plan = palloc(sizeof(Query_Plan)); - Query_Step *query_step = palloc(sizeof(Query_Step)); + RemoteQuery *query_step = makeNode(RemoteQuery); Query *query; - - query_plan->force_autocommit = false; - query_step->sql_statement = (char *) palloc(strlen(sql_statement) + 1); strcpy(query_step->sql_statement, sql_statement); query_step->exec_nodes = NULL; query_step->combine_type = COMBINE_TYPE_NONE; query_step->simple_aggregates = NULL; + query_step->read_only = false; + query_step->force_autocommit = false; query_plan->query_step_list = lappend(NULL, query_step); @@ -1565,11 +1990,16 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) switch (nodeTag(parsetree)) { case T_SelectStmt: + /* Optimize multi-node handling */ + query_step->read_only = true; + /* fallthru */ case T_InsertStmt: case T_UpdateStmt: case T_DeleteStmt: /* just use first one in querytree_list */ query = (Query *) linitial(querytree_list); + /* should copy instead ? */ + query_step->plan.targetlist = query->targetList; /* Perform some checks to make sure we can support the statement */ if (nodeTag(parsetree) == T_SelectStmt) @@ -1633,6 +2063,12 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) } /* + * Add sortring to the step + */ + if (query->sortClause || query->distinctClause) + make_simple_sort_from_sortclauses(query, query_step); + + /* * PG-XC cannot yet support some variations of SQL statements. 
* We perform some checks to at least catch common cases */ @@ -1658,15 +2094,6 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) ereport(ERROR, (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), (errmsg("Multi-node LIMIT not yet supported")))); - if (query->sortClause && StrictSelectChecking) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Multi-node ORDER BY not yet supported")))); - /* PGXCTODO - check if first column partitioning column */ - if (query->distinctClause) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Multi-node DISTINCT`not yet supported")))); } } break; @@ -1686,7 +2113,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) case T_DropdbStmt: case T_VacuumStmt: query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES; - query_plan->force_autocommit = true; + query_step->force_autocommit = true; break; case T_DropPropertyStmt: @@ -1864,7 +2291,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) * Free Query_Step struct */ static void -free_query_step(Query_Step *query_step) +free_query_step(RemoteQuery *query_step) { if (query_step == NULL) return; @@ -1894,7 +2321,7 @@ FreeQueryPlan(Query_Plan *query_plan) return; foreach(item, query_plan->query_step_list) - free_query_step((Query_Step *) lfirst(item)); + free_query_step((RemoteQuery *) lfirst(item)); pfree(query_plan->query_step_list); pfree(query_plan); diff --git a/src/backend/pgxc/pool/Makefile b/src/backend/pgxc/pool/Makefile index 7143af5d97..e8753031c2 100644 --- a/src/backend/pgxc/pool/Makefile +++ b/src/backend/pgxc/pool/Makefile @@ -14,6 +14,6 @@ subdir = src/backend/pgxc/pool top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = combiner.o datanode.o poolmgr.o poolcomm.o +OBJS = datanode.o execRemote.o poolmgr.o poolcomm.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/pgxc/pool/combiner.c b/src/backend/pgxc/pool/combiner.c deleted file mode 100644 index 53e5dfb11c..0000000000 --- a/src/backend/pgxc/pool/combiner.c +++ /dev/null @@ -1,652 +0,0 @@ -/*------------------------------------------------------------------------- - * - * combiner.c - * - * Combine responses from multiple Data Nodes - * - * - * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group - * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation - * - * IDENTIFICATION - * $$ - * - *------------------------------------------------------------------------- - */ - -#include "postgres.h" -#include "pgxc/combiner.h" -#include "pgxc/planner.h" -#include "catalog/pg_type.h" -#include "libpq/libpq.h" -#include "libpq/pqformat.h" -#include "utils/builtins.h" -#include "utils/datum.h" - - -/* - * Create a structure to store parameters needed to combine responses from - * multiple connections as well as state information - */ -ResponseCombiner -CreateResponseCombiner(int node_count, CombineType combine_type, - CommandDest dest) -{ - ResponseCombiner combiner; - - /* ResponseComber is a typedef for pointer to ResponseCombinerData */ - combiner = (ResponseCombiner) palloc(sizeof(ResponseCombinerData)); - if (combiner == NULL) - { - /* Out of memory */ - return combiner; - } - - combiner->node_count = node_count; - combiner->combine_type = combine_type; - combiner->dest = dest; - combiner->command_complete_count = 0; - combiner->row_count = 0; - combiner->request_type = REQUEST_TYPE_NOT_DEFINED; - combiner->description_count = 0; - 
combiner->copy_in_count = 0; - combiner->copy_out_count = 0; - combiner->inErrorState = false; - combiner->initAggregates = true; - combiner->simple_aggregates = NULL; - combiner->copy_file = NULL; - - return combiner; -} - -/* - * Parse out row count from the command status response and convert it to integer - */ -static int -parse_row_count(const char *message, size_t len, int *rowcount) -{ - int digits = 0; - int pos; - - *rowcount = 0; - /* skip \0 string terminator */ - for (pos = 0; pos < len - 1; pos++) - { - if (message[pos] >= '0' && message[pos] <= '9') - { - *rowcount = *rowcount * 10 + message[pos] - '0'; - digits++; - } - else - { - *rowcount = 0; - digits = 0; - } - } - return digits; -} - -/* - * Extract a transition value from data row. Invoke the Input Function - * associated with the transition data type to represent value as a Datum. - * Output parameters value and val_null, receive extracted value and indicate - * whether it is null. - */ -static void -parse_aggregate_value(SimpleAgg *simple_agg, char *col_data, size_t datalen, Datum *value, bool *val_null) -{ - /* Check NULL */ - if (datalen == -1) - { - *value = (Datum) 0; - *val_null = true; - } - else - { - resetStringInfo(&simple_agg->valuebuf); - appendBinaryStringInfo(&simple_agg->valuebuf, col_data, datalen); - *value = InputFunctionCall(&simple_agg->arginputfn, simple_agg->valuebuf.data, simple_agg->argioparam, -1); - *val_null = false; - } -} - -/* - * Initialize the collection value, when agregation is first set up, or for a - * new group (grouping support is not implemented yet) - */ -static void -initialize_collect_aggregates(SimpleAgg *simple_agg) -{ - if (simple_agg->initValueIsNull) - simple_agg->collectValue = simple_agg->initValue; - else - simple_agg->collectValue = datumCopy(simple_agg->initValue, - simple_agg->transtypeByVal, - simple_agg->transtypeLen); - simple_agg->noCollectValue = simple_agg->initValueIsNull; - simple_agg->collectValueNull = simple_agg->initValueIsNull; -} - -/* - * Finalize the aggregate after current group or entire relation is processed - * (grouping support is not implemented yet) - */ -static void -finalize_collect_aggregates(SimpleAgg *simple_agg, Datum *resultVal, bool *resultIsNull) -{ - /* - * Apply the agg's finalfn if one is provided, else return collectValue. - */ - if (OidIsValid(simple_agg->finalfn_oid)) - { - FunctionCallInfoData fcinfo; - - InitFunctionCallInfoData(fcinfo, &(simple_agg->finalfn), 1, - (void *) simple_agg, NULL); - fcinfo.arg[0] = simple_agg->collectValue; - fcinfo.argnull[0] = simple_agg->collectValueNull; - if (fcinfo.flinfo->fn_strict && simple_agg->collectValueNull) - { - /* don't call a strict function with NULL inputs */ - *resultVal = (Datum) 0; - *resultIsNull = true; - } - else - { - *resultVal = FunctionCallInvoke(&fcinfo); - *resultIsNull = fcinfo.isnull; - } - } - else - { - *resultVal = simple_agg->collectValue; - *resultIsNull = simple_agg->collectValueNull; - } -} - -/* - * Given new input value(s), advance the transition function of an aggregate. - * - * The new values (and null flags) have been preloaded into argument positions - * 1 and up in fcinfo, so that we needn't copy them again to pass to the - * collection function. No other fields of fcinfo are assumed valid. - * - * It doesn't matter which memory context this is called in. 
- */ -static void -advance_collect_function(SimpleAgg *simple_agg, FunctionCallInfoData *fcinfo) -{ - Datum newVal; - - if (simple_agg->transfn.fn_strict) - { - /* - * For a strict transfn, nothing happens when there's a NULL input; we - * just keep the prior transValue. - */ - if (fcinfo->argnull[1]) - return; - if (simple_agg->noCollectValue) - { - /* - * result has not been initialized - * We must copy the datum into result if it is pass-by-ref. We - * do not need to pfree the old result, since it's NULL. - */ - simple_agg->collectValue = datumCopy(fcinfo->arg[1], - simple_agg->transtypeByVal, - simple_agg->transtypeLen); - simple_agg->collectValueNull = false; - simple_agg->noCollectValue = false; - return; - } - if (simple_agg->collectValueNull) - { - /* - * Don't call a strict function with NULL inputs. Note it is - * possible to get here despite the above tests, if the transfn is - * strict *and* returned a NULL on a prior cycle. If that happens - * we will propagate the NULL all the way to the end. - */ - return; - } - } - - /* - * OK to call the transition function - */ - InitFunctionCallInfoData(*fcinfo, &(simple_agg->transfn), 2, (void *) simple_agg, NULL); - fcinfo->arg[0] = simple_agg->collectValue; - fcinfo->argnull[0] = simple_agg->collectValueNull; - newVal = FunctionCallInvoke(fcinfo); - - /* - * If pass-by-ref datatype, must copy the new value into aggcontext and - * pfree the prior transValue. But if transfn returned a pointer to its - * first input, we don't need to do anything. - */ - if (!simple_agg->transtypeByVal && - DatumGetPointer(newVal) != DatumGetPointer(simple_agg->collectValue)) - { - if (!fcinfo->isnull) - { - newVal = datumCopy(newVal, - simple_agg->transtypeByVal, - simple_agg->transtypeLen); - } - if (!simple_agg->collectValueNull) - pfree(DatumGetPointer(simple_agg->collectValue)); - } - - simple_agg->collectValue = newVal; - simple_agg->collectValueNull = fcinfo->isnull; -} - -/* - * Handle response message and update combiner's state. 
- * This function contains main combiner logic - */ -int -CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t len) -{ - int digits = 0; - - /* Ignore anything if we have encountered error */ - if (combiner->inErrorState) - return EOF; - - switch (msg_type) - { - case 'c': /* CopyOutCommandComplete */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COPY_OUT; - if (combiner->request_type != REQUEST_TYPE_COPY_OUT) - /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - /* Just do nothing, close message is managed by the coordinator */ - combiner->copy_out_count++; - break; - case 'C': /* CommandComplete */ - /* - * If we did not receive description we are having rowcount or OK - * response - */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COMMAND; - /* Extract rowcount */ - if (combiner->combine_type != COMBINE_TYPE_NONE) - { - int rowcount; - digits = parse_row_count(msg_body, len, &rowcount); - if (digits > 0) - { - /* Replicated write, make sure they are the same */ - if (combiner->combine_type == COMBINE_TYPE_SAME) - { - if (combiner->command_complete_count) - { - if (rowcount != combiner->row_count) - /* There is a consistency issue in the database with the replicated table */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Write to replicated table returned different results from the data nodes"))); - } - else - /* first result */ - combiner->row_count = rowcount; - } - else - combiner->row_count += rowcount; - } - else - combiner->combine_type = COMBINE_TYPE_NONE; - } - if (++combiner->command_complete_count == combiner->node_count) - { - - if (combiner->simple_aggregates - /* - * Aggregates has not been initialized - that means - * no rows received from data nodes, nothing to send - * It is possible if HAVING clause is present - */ - && !combiner->initAggregates) - { - /* Build up and send a datarow with aggregates */ - StringInfo dataRowBuffer = makeStringInfo(); - ListCell *lc; - - /* Number of fields */ - pq_sendint(dataRowBuffer, list_length(combiner->simple_aggregates), 2); - - foreach (lc, combiner->simple_aggregates) - { - SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc); - Datum resultVal; - bool resultIsNull; - - finalize_collect_aggregates(simple_agg, &resultVal, &resultIsNull); - /* Aggregation result */ - if (resultIsNull) - { - pq_sendint(dataRowBuffer, -1, 4); - } - else - { - char *text = OutputFunctionCall(&simple_agg->resoutputfn, resultVal); - size_t len = strlen(text); - pq_sendint(dataRowBuffer, len, 4); - pq_sendtext(dataRowBuffer, text, len); - } - } - pq_putmessage('D', dataRowBuffer->data, dataRowBuffer->len); - pfree(dataRowBuffer->data); - pfree(dataRowBuffer); - } - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - { - if (combiner->combine_type == COMBINE_TYPE_NONE) - { - pq_putmessage(msg_type, msg_body, len); - } - else - { - char command_complete_buffer[256]; - - /* Truncate msg_body to get base string */ - msg_body[len - digits - 1] = '\0'; - len = sprintf(command_complete_buffer, "%s%d", msg_body, combiner->row_count) + 1; - pq_putmessage(msg_type, command_complete_buffer, len); - } - } - } - break; - case 'T': /* RowDescription */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_QUERY; - if (combiner->request_type != REQUEST_TYPE_QUERY) - { - /* 
Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - /* Proxy first */ - if (combiner->description_count++ == 0) - { - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - } - break; - case 'S': /* ParameterStatus (SET command) */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_QUERY; - if (combiner->request_type != REQUEST_TYPE_QUERY) - { - /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - /* Proxy last */ - if (++combiner->description_count == combiner->node_count) - { - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - } - break; - case 'G': /* CopyInResponse */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COPY_IN; - if (combiner->request_type != REQUEST_TYPE_COPY_IN) - { - /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - /* Proxy first */ - if (combiner->copy_in_count++ == 0) - { - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - } - break; - case 'H': /* CopyOutResponse */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COPY_OUT; - if (combiner->request_type != REQUEST_TYPE_COPY_OUT) - { - /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - /* - * The normal PG code will output an H message when it runs in the - * coordinator, so do not proxy message here, just count it. 
- */ - combiner->copy_out_count++; - break; - case 'd': /* CopyOutDataRow */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COPY_OUT; - - /* Inconsistent responses */ - if (combiner->request_type != REQUEST_TYPE_COPY_OUT) - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - - /* If there is a copy file, data has to be sent to the local file */ - if (combiner->copy_file) - { - /* write data to the copy file */ - char *data_row; - data_row = (char *) palloc0(len); - memcpy(data_row, msg_body, len); - - fwrite(data_row, 1, len, combiner->copy_file); - break; - } - /* - * In this case data is sent back to the client - */ - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - { - StringInfo data_buffer; - - data_buffer = makeStringInfo(); - - pq_sendtext(data_buffer, msg_body, len); - pq_putmessage(msg_type, - data_buffer->data, - data_buffer->len); - - pfree(data_buffer->data); - pfree(data_buffer); - } - break; - case 'D': /* DataRow */ - if (!combiner->simple_aggregates) - { - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - } - else - { - ListCell *lc; - char **col_values; - int *col_value_len; - uint16 col_count; - int i, cur = 0; - - /* Get values from the data row into array to speed up access */ - memcpy(&col_count, msg_body, 2); - col_count = ntohs(col_count); - cur += 2; - - col_values = (char **) palloc0(col_count * sizeof(char *)); - col_value_len = (int *) palloc0(col_count * sizeof(int)); - for (i = 0; i < col_count; i++) - { - int n32; - - memcpy(&n32, msg_body + cur, 4); - col_value_len[i] = ntohl(n32); - cur += 4; - - if (col_value_len[i] != -1) - { - col_values[i] = msg_body + cur; - cur += col_value_len[i]; - } - } - - if (combiner->initAggregates) - { - foreach (lc, combiner->simple_aggregates) - initialize_collect_aggregates((SimpleAgg *) lfirst(lc)); - - combiner->initAggregates = false; - } - - foreach (lc, combiner->simple_aggregates) - { - SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc); - FunctionCallInfoData fcinfo; - - parse_aggregate_value(simple_agg, - col_values[simple_agg->column_pos], - col_value_len[simple_agg->column_pos], - fcinfo.arg + 1, - fcinfo.argnull + 1); - - advance_collect_function(simple_agg, &fcinfo); - } - pfree(col_values); - pfree(col_value_len); - } - break; - case 'E': /* ErrorResponse */ - combiner->inErrorState = true; - /* fallthru */ - case 'A': /* NotificationResponse */ - case 'N': /* NoticeResponse */ - /* Proxy error message back if specified, - * or if doing internal primary copy - */ - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - break; - case 'I': /* EmptyQuery */ - default: - /* Unexpected message */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - return 0; -} - -/* - * Examine the specified combiner state and determine if command was completed - * successfully - */ -static bool -validate_combiner(ResponseCombiner combiner) -{ - /* There was error message while combining */ - if (combiner->inErrorState) - return false; - /* Check if state is defined */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - return false; - /* Check all nodes completed */ - if ((combiner->request_type == REQUEST_TYPE_COMMAND - || combiner->request_type == REQUEST_TYPE_QUERY) - && 
combiner->command_complete_count != combiner->node_count) - return false; - - /* Check count of description responses */ - if (combiner->request_type == REQUEST_TYPE_QUERY - && combiner->description_count != combiner->node_count) - return false; - - /* Check count of copy-in responses */ - if (combiner->request_type == REQUEST_TYPE_COPY_IN - && combiner->copy_in_count != combiner->node_count) - return false; - - /* Check count of copy-out responses */ - if (combiner->request_type == REQUEST_TYPE_COPY_OUT - && combiner->copy_out_count != combiner->node_count) - return false; - - /* Add other checks here as needed */ - - /* All is good if we are here */ - return true; -} - -/* - * Validate combiner and release storage freeing allocated memory - */ -bool -ValidateAndCloseCombiner(ResponseCombiner combiner) -{ - bool valid = validate_combiner(combiner); - - pfree(combiner); - - return valid; -} - -/* - * Validate combiner and reset storage - */ -bool -ValidateAndResetCombiner(ResponseCombiner combiner) -{ - bool valid = validate_combiner(combiner); - - combiner->command_complete_count = 0; - combiner->row_count = 0; - combiner->request_type = REQUEST_TYPE_NOT_DEFINED; - combiner->description_count = 0; - combiner->copy_in_count = 0; - combiner->copy_out_count = 0; - combiner->inErrorState = false; - combiner->simple_aggregates = NULL; - combiner->copy_file = NULL; - - return valid; -} - -/* - * Close combiner and free allocated memory, if it is not needed - */ -void -CloseCombiner(ResponseCombiner combiner) -{ - if (combiner) - pfree(combiner); -} - -/* - * Assign combiner aggregates - */ -void -AssignCombinerAggregates(ResponseCombiner combiner, List *simple_aggregates) -{ - combiner->simple_aggregates = simple_aggregates; -} diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c index 6a1aba8190..517b1e4d78 100644 --- a/src/backend/pgxc/pool/datanode.c +++ b/src/backend/pgxc/pool/datanode.c @@ -15,6 +15,7 @@ *------------------------------------------------------------------------- */ +#include "postgres.h" #include <sys/select.h> #include <sys/time.h> #include <sys/types.h> @@ -22,166 +23,33 @@ #include <string.h> #include <unistd.h> #include <errno.h> -#include "pgxc/poolmgr.h" #include "access/gtm.h" #include "access/transam.h" #include "access/xact.h" -#include "postgres.h" -#include "utils/snapmgr.h" -#include "pgxc/pgxc.h" #include "gtm/gtm_c.h" #include "pgxc/datanode.h" #include "pgxc/locator.h" -#include "../interfaces/libpq/libpq-fe.h" +#include "pgxc/pgxc.h" +#include "pgxc/poolmgr.h" +#include "tcop/dest.h" #include "utils/elog.h" #include "utils/memutils.h" - +#include "utils/snapmgr.h" +#include "../interfaces/libpq/libpq-fe.h" #define NO_SOCKET -1 -/* - * Buffer size does not affect performance significantly, just do not allow - * connection buffer grows infinitely - */ -#define COPY_BUFFER_SIZE 8192 -#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024 - static int node_count = 0; static DataNodeHandle *handles = NULL; -static bool autocommit = true; -static DataNodeHandle **write_node_list = NULL; -static int write_node_count = 0; -static DataNodeHandle **get_handles(List *nodelist); -static int get_transaction_nodes(DataNodeHandle **connections); -static void release_handles(void); - -static void data_node_init(DataNodeHandle *handle, int sock); +static void data_node_init(DataNodeHandle *handle, int sock, int nodenum); static void data_node_free(DataNodeHandle *handle); -static int data_node_begin(int conn_count, DataNodeHandle **connections, CommandDest 
dest, GlobalTransactionId gxid); -static int data_node_commit(int conn_count, DataNodeHandle **connections, CommandDest dest); -static int data_node_rollback(int conn_count, DataNodeHandle **connections, CommandDest dest); - -static int ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle); -static int ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle); - -static int data_node_send_query(DataNodeHandle *handle, const char *query); -static int data_node_send_gxid(DataNodeHandle *handle, GlobalTransactionId gxid); -static int data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot); - -static void add_error_message(DataNodeHandle *handle, const char *message); - -static int data_node_read_data(DataNodeHandle *conn); -static int handle_response(DataNodeHandle *conn, ResponseCombiner combiner); - -static int get_int(DataNodeHandle *conn, size_t len, int *out); -static int get_char(DataNodeHandle *conn, char *out); - -static void clear_write_node_list(); - -#define MAX_STATEMENTS_PER_TRAN 10 - -/* Variables to collect statistics */ -static int total_transactions = 0; -static int total_statements = 0; -static int total_autocommit = 0; -static int nonautocommit_2pc = 0; -static int autocommit_2pc = 0; -static int current_tran_statements = 0; -static int *statements_per_transaction = NULL; -static int *nodes_per_transaction = NULL; - -/* - * statistics collection: count a statement - */ -static void -stat_statement() -{ - total_statements++; - current_tran_statements++; -} - -/* - * To collect statistics: count a transaction - */ -static void -stat_transaction(int node_count) -{ - total_transactions++; - if (autocommit) - total_autocommit++; - - if (!statements_per_transaction) - { - statements_per_transaction = (int *) malloc((MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int)); - memset(statements_per_transaction, 0, (MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int)); - } - - if (current_tran_statements > MAX_STATEMENTS_PER_TRAN) - statements_per_transaction[MAX_STATEMENTS_PER_TRAN]++; - else - statements_per_transaction[current_tran_statements]++; - - current_tran_statements = 0; - if (node_count > 0 && node_count <= NumDataNodes) - { - if (!nodes_per_transaction) - { - nodes_per_transaction = (int *) malloc(NumDataNodes * sizeof(int)); - memset(nodes_per_transaction, 0, NumDataNodes * sizeof(int)); - } - nodes_per_transaction[node_count - 1]++; - } -} - - -/* - * To collect statistics: count a two-phase commit on nodes - */ -static void -stat_2pc(void) -{ - if (autocommit) - autocommit_2pc++; - else - nonautocommit_2pc++; -} +static int get_int(DataNodeHandle * conn, size_t len, int *out); +static int get_char(DataNodeHandle * conn, char *out); /* - * Output collected statistics to the log - */ -static void -stat_log(void) -{ - elog(DEBUG1, "Total Transactions: %d Total Statements: %d", total_transactions, total_statements); - elog(DEBUG1, "Autocommit: %d 2PC for Autocommit: %d 2PC for non-Autocommit: %d", - total_autocommit, autocommit_2pc, nonautocommit_2pc); - if (total_transactions) - { - if (statements_per_transaction) - { - int i; - - for (i = 0; i < MAX_STATEMENTS_PER_TRAN; i++) - elog(DEBUG1, "%d Statements per Transaction: %d (%d%%)", - i, statements_per_transaction[i], statements_per_transaction[i] * 100 / total_transactions); - } - elog(DEBUG1, "%d+ Statements per Transaction: %d (%d%%)", - MAX_STATEMENTS_PER_TRAN, statements_per_transaction[MAX_STATEMENTS_PER_TRAN], statements_per_transaction[MAX_STATEMENTS_PER_TRAN] * 100 / total_transactions); - 
if (nodes_per_transaction)
-		{
-			int			i;
-
-			for (i = 0; i < NumDataNodes; i++)
-				elog(DEBUG1, "%d Nodes per Transaction: %d (%d%%)",
-					 i + 1, nodes_per_transaction[i], nodes_per_transaction[i] * 100 / total_transactions);
-		}
-	}
-}
-
-/*
 * Allocate and initialize memory to store DataNode handles.
 */
 void
@@ -325,8 +193,9 @@ data_node_free(DataNodeHandle *handle)
 * Structure stores state info and I/O buffers
 */
 static void
-data_node_init(DataNodeHandle *handle, int sock)
+data_node_init(DataNodeHandle *handle, int sock, int nodenum)
 {
+	handle->nodenum = nodenum;
 	handle->sock = sock;
 	handle->transaction_status = 'I';
 	handle->state = DN_CONNECTION_STATE_IDLE;
@@ -339,136 +208,99 @@ data_node_init(DataNodeHandle *handle, int sock)
 
 /*
- * Handle responses from the Data node connections
+ * Wait until at least one of the specified connections has data available,
+ * then read the data into the buffer
 */
-static void
-data_node_receive_responses(const int conn_count, DataNodeHandle **connections,
-							struct timeval *timeout, ResponseCombiner combiner)
+void
+data_node_receive(const int conn_count,
+				  DataNodeHandle ** connections, struct timeval * timeout)
 {
-	int			count = conn_count;
-	DataNodeHandle *to_receive[conn_count];
-
-	/* make a copy of the pointers to the connections */
-	memcpy(to_receive, connections, conn_count * sizeof(DataNodeHandle *));
+	int			i,
+				res_select,
+				nfds = 0;
+	fd_set		readfds;

-	/*
-	 * Read results.
-	 * Note we try and read from data node connections even if there is an error on one,
-	 * so as to avoid reading incorrect results on the next statement.
-	 * It might be better to just destroy these connections and tell the pool manager.
-	 */
-	while (count > 0)
+	FD_ZERO(&readfds);
+	for (i = 0; i < conn_count; i++)
 	{
-		int			i,
-					res_select,
-					nfds = 0;
-		fd_set		readfds;
-
-		FD_ZERO(&readfds);
-		for (i = 0; i < count; i++)
-		{
-			/* note if a connection has error */
-			if (!to_receive[i]
-				|| to_receive[i]->state == DN_CONNECTION_STATE_ERROR_FATAL)
-			{
-				/* Handling is done, do not track this connection */
-				count--;
-
-				/* Move last connection in its place */
-				if (i < count)
-				{
-					to_receive[i] = to_receive[count];
-					/* stay on the current position */
-					i--;
-				}
-				continue;
-			}
+		/* If connection has finished sending, do not wait for input from it */
+		if (connections[i]->state == DN_CONNECTION_STATE_IDLE
+			|| connections[i]->state == DN_CONNECTION_STATE_HAS_DATA)
+			continue;

-			/* prepare select params */
-			if (nfds < to_receive[i]->sock)
-				nfds = to_receive[i]->sock;
+		/* prepare select params */
+		if (nfds < connections[i]->sock)
+			nfds = connections[i]->sock;

-			FD_SET (to_receive[i]->sock, &readfds);
-		}
+		FD_SET(connections[i]->sock, &readfds);
+	}

-		/* Make sure we still have valid connections */
-		if (count == 0)
-			break;
+	/*
+	 * Return if we do not have connections to receive input
+	 */
+	if (nfds == 0)
+		return;

retry:
-		res_select = select(nfds + 1, &readfds, NULL, NULL, timeout);
-		if (res_select < 0)
-		{
-			/* error - retry if EINTR or EAGAIN */
-			if (errno == EINTR || errno == EAGAIN)
-				goto retry;
-
-			/*
-			 * PGXCTODO - we may want to close the connections and notify the
-			 * pooler that these are invalid.
-			 */
-			if (errno == EBADF)
-			{
-				ereport(ERROR,
-						(errcode(ERRCODE_CONNECTION_FAILURE),
-						 errmsg("select() bad file descriptor set")));
-			}
-			ereport(ERROR,
-					(errcode(ERRCODE_CONNECTION_FAILURE),
-					 errmsg("select() error: %d", errno)));
-		}
+	res_select = select(nfds + 1, &readfds, NULL, NULL, timeout);
+	if (res_select < 0)
+	{
+		/* error - retry if EINTR or EAGAIN */
+		if (errno == EINTR || errno == EAGAIN)
+			goto retry;

-		if (res_select == 0)
+		/*
+		 * PGXCTODO - we may want to close the connections and notify the
+		 * pooler that these are invalid.
+		 */
+		if (errno == EBADF)
 		{
-			/* Handle timeout */
 			ereport(ERROR,
 					(errcode(ERRCODE_CONNECTION_FAILURE),
-					 errmsg("timeout while waiting for response")));
+					 errmsg("select() bad file descriptor set")));
 		}
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("select() error: %d", errno)));
+	}
+
+	if (res_select == 0)
+	{
+		/* Handle timeout */
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("timeout while waiting for response")));
+	}

-		/* read data */
-		for (i = 0; i < count; i++)
+	/* read data */
+	for (i = 0; i < conn_count; i++)
+	{
+		DataNodeHandle *conn = connections[i];
+
+		if (FD_ISSET(conn->sock, &readfds))
 		{
-			DataNodeHandle *conn = to_receive[i];
+			int			read_status = data_node_read_data(conn);

-			if (FD_ISSET(conn->sock, &readfds))
+			if (read_status == EOF || read_status < 0)
 			{
-				int			read_status = data_node_read_data(conn);
-
-				if (read_status == EOF || read_status < 0)
-				{
-					/* PGXCTODO - we should notify the pooler to destroy the connections */
-					ereport(ERROR,
-							(errcode(ERRCODE_CONNECTION_FAILURE),
-							 errmsg("unexpected EOF on datanode connection")));
-				}
+				/* PGXCTODO - we should notify the pooler to destroy the connections */
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("unexpected EOF on datanode connection")));
 			}
-
-			if (conn->inStart < conn->inEnd)
+			else
 			{
-				if (handle_response(conn, combiner) == 0
-					|| conn->state == DN_CONNECTION_STATE_ERROR_READY
-					|| conn->state == DN_CONNECTION_STATE_ERROR_FATAL)
-				{
-					/* Handling is done, do not track this connection */
-					count--;
-					/* Move last connection in place */
-					if (i < count)
-					{
-						to_receive[i] = to_receive[count];
-						/* stay on the current position */
-						i--;
-					}
-				}
+				conn->state = DN_CONNECTION_STATE_HAS_DATA;
 			}
 		}
 	}
 }

+
/*
 * Read incoming messages from the Data node connection
 */
-static int
+int
data_node_read_data(DataNodeHandle *conn)
{
	int			someread = 0;
@@ -601,11 +433,12 @@ retry:
 	return 0;
 }

+
/*
 * Get one character from the connection buffer and advance cursor
 */
static int
-get_char(DataNodeHandle *conn, char *out)
+get_char(DataNodeHandle * conn, char *out)
{
	if (conn->inCursor < conn->inEnd)
	{
@@ -647,379 +480,62 @@ get_int(DataNodeHandle *conn, size_t len, int *out)
 	return 0;
 }

-/*
- * Read next message from the connection and update the combiner accordingly
- * If we are in an error state we just consume the messages, and do not proxy
- * Long term, we should look into cancelling executing statements
- * and closing the connections.
- */
-static int
-handle_response(DataNodeHandle *conn, ResponseCombiner combiner)
-{
-	char		msg_type;
-	int			msg_len;
-
-	for (;;)
-	{
-		/* try to read the message, return if not enough data */
-		conn->inCursor = conn->inStart;
-
-		/*
-		 * Make sure we receive a complete message, otherwise, return EOF, and
-		 * we will be called again after more data has been read.
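-		 * For example (illustrative numbers): a DataRow carrying 10 bytes of
-		 * payload arrives as the type byte 'D', a four-byte length word that
-		 * counts itself (14), and the 10 payload bytes, 15 bytes in total,
-		 * which is why a complete message header is never shorter than 5
-		 * bytes.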
- */ - if (conn->inEnd - conn->inCursor < 5) - return EOF; - - if (get_char(conn, &msg_type)) - return EOF; - - if (get_int(conn, 4, &msg_len)) - return EOF; - - msg_len -= 4; - - if (conn->inEnd - conn->inCursor < msg_len) - { - ensure_in_buffer_capacity(conn->inCursor + (size_t) msg_len, conn); - return EOF; - } - - /* TODO handle other possible responses */ - switch (msg_type) - { - case 'c': /* CopyToCommandComplete */ - /* no need to parse, just move cursor */ - conn->inCursor += msg_len; - conn->state = DN_CONNECTION_STATE_COMPLETED; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - break; - case 'C': /* CommandComplete */ - /* no need to parse, just move cursor */ - conn->inCursor += msg_len; - conn->state = DN_CONNECTION_STATE_COMPLETED; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - break; - case 'T': /* RowDescription */ - case 'D': /* DataRow */ - case 'S': /* ParameterStatus */ - /* no need to parse, just move cursor */ - conn->inCursor += msg_len; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - break; - case 'G': /* CopyInResponse */ - /* no need to parse, just move cursor */ - conn->inCursor += msg_len; - conn->state = DN_CONNECTION_STATE_COPY_IN; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - /* Done, return to caller to let it know the data can be passed in */ - conn->inStart = conn->inCursor; - conn->state = DN_CONNECTION_STATE_COPY_IN; - return 0; - case 'H': /* CopyOutResponse */ - /* no need to parse, just move cursor */ - conn->inCursor += msg_len; - conn->state = DN_CONNECTION_STATE_COPY_OUT; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - conn->inStart = conn->inCursor; - conn->state = DN_CONNECTION_STATE_COPY_OUT; - return 0; - case 'd': /* CopyOutDataRow */ - /* No need to parse, just send a row back to client */ - conn->inCursor += msg_len; - conn->state = DN_CONNECTION_STATE_COPY_OUT; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - break; - case 'E': /* ErrorResponse */ - /* no need to parse, just move cursor */ - conn->inCursor += msg_len; - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); - conn->inStart = conn->inCursor; - conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY; - /* - * Do not return with an error, we still need to consume Z, - * ready-for-query - */ - break; - case 'A': /* NotificationResponse */ - case 'N': /* NoticeResponse */ - conn->inCursor += msg_len; - - /* - * Ignore these to prevent multiple messages, one from each - * node. Coordinator will send one for DDL anyway - */ - break; - case 'Z': /* ReadyForQuery */ - get_char(conn, &conn->transaction_status); - conn->inStart = conn->inCursor; - if (conn->state == DN_CONNECTION_STATE_ERROR_NOT_READY) - { - conn->state = DN_CONNECTION_STATE_ERROR_READY; - return EOF; - } else - conn->state = DN_CONNECTION_STATE_IDLE; - return 0; - case 'I': /* EmptyQuery */ - default: - /* sync lost? 
 */
-				conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
-				return EOF;
-		}
-		conn->inStart = conn->inCursor;
-	}
-	/* Keep compiler quiet */
-	return EOF;
-}

 /*
- * Send BEGIN command to the Data nodes and receive responses
+ * get_message
+ * If the connection has enough data, read the entire message from the
+ * connection buffer and return the message type. Message data and data
+ * length are returned via the out parameters.
+ * If the buffer does not have enough data, leave the cursor unchanged,
+ * change the connection state to DN_CONNECTION_STATE_QUERY to indicate that
+ * more data needs to be received, and return \0.
+ * conn - connection to read from
+ * len - returned length of the data msg points to
+ * msg - returns a pointer into the incoming buffer. The buffer will likely
+ * be overwritten by the next receive, so the caller must make a copy if it
+ * wants to refer to the data later.
 */
-static int
-data_node_begin(int conn_count, DataNodeHandle **connections, CommandDest dest, GlobalTransactionId gxid)
+char
+get_message(DataNodeHandle *conn, int *len, char **msg)
{
-	int			i;
-	struct timeval *timeout = NULL;
-	ResponseCombiner combiner;
+	char		msgtype;

-	/* Send BEGIN */
-	for (i = 0; i < conn_count; i++)
+	if (get_char(conn, &msgtype) || get_int(conn, 4, len))
 	{
-		if (GlobalTransactionIdIsValid(gxid) && data_node_send_gxid(connections[i], gxid))
-			return EOF;
-
-		if (data_node_send_query(connections[i], "BEGIN"))
-			return EOF;
+		/* Successful get_char would move cursor, restore position */
+		conn->inCursor = conn->inStart;
+		return '\0';
 	}

-	combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE, dest);
-
-	/* Receive responses */
-	data_node_receive_responses(conn_count, connections, timeout, combiner);
-
-	/* Verify status */
-	return ValidateAndCloseCombiner(combiner) ?
0 : EOF;
-}
-
-/* Clears the write node list */
-static void
-clear_write_node_list(void)
-{
-	/* we just malloc once and use counter */
-	if (write_node_list == NULL)
-		write_node_list = (DataNodeHandle **) malloc(NumDataNodes * sizeof(DataNodeHandle *));
-
-	write_node_count = 0;
-}
-
-
-/*
- * Switch autocommmit mode off, so all subsequent statements will be in the same transaction
- */
-void
-DataNodeBegin(void)
-{
-	autocommit = false;
-	clear_write_node_list();
-}
-
-
-/*
- * Commit current transaction, use two-phase commit if necessary
- */
-int
-DataNodeCommit(CommandDest dest)
-{
-	int			res;
-	int			tran_count;
-	DataNodeHandle *connections[node_count];
-	/* Quick check to make sure we have connections */
-	if (node_count == 0)
-		goto finish;
-
-	/* gather connections to commit */
-	tran_count = get_transaction_nodes(connections);
-
-	/*
-	 * If we do not have open transactions we have nothing to commit, just
-	 * report success
-	 */
-	if (tran_count == 0)
-		goto finish;
-
-	res = data_node_commit(tran_count, connections, dest);
-
-finish:
-	/* In autocommit mode statistics is collected in DataNodeExec */
-	if (!autocommit)
-		stat_transaction(node_count);
-	if (!PersistentConnections)
-		release_handles();
-	autocommit = true;
-	clear_write_node_list();
-	return res;
-}
-
+	*len -= 4;

-/*
- * Send COMMIT or PREPARE/COMMIT PREPARED down to the Data nodes and handle responses
- */
-static int
-data_node_commit(int conn_count, DataNodeHandle **connections, CommandDest dest)
-{
-	int			i;
-	struct timeval *timeout = NULL;
-	char		buffer[256];
-	GlobalTransactionId gxid = InvalidGlobalTransactionId;
-	int			result = 0;
-	ResponseCombiner combiner = NULL;
-
-
-	/* can set this to false to disable temporarily */
-	/* bool do2PC = conn_count > 1; */
-
-	/*
-	 * Only use 2PC if more than one node was written to. Otherwise, just send
-	 * COMMIT to all
-	 */
-	bool		do2PC = write_node_count > 1;
-
-	/* Extra XID for Two Phase Commit */
-	GlobalTransactionId two_phase_xid = 0;
-
-	if (do2PC)
+	if (conn->inCursor + *len > conn->inEnd)
 	{
-		stat_2pc();
-		/*
-		 * Formally we should be using GetCurrentGlobalTransactionIdIfAny() here,
-		 * but since we need 2pc, we surely have sent down a command and got
-		 * gxid for it. Hence GetCurrentGlobalTransactionId() just returns
-		 * already allocated gxid
+		/*
+		 * Not enough data in the buffer, so we should read more. The reading
+		 * function will discard data already consumed before conn->inStart.
+		 * We then want the message that is now partly in the buffer to be
+		 * read in completely, to avoid extra read/handle cycles. The space
+		 * needed is 1 byte for the message type, 4 bytes for the message
+		 * length, and the message itself, whose size is currently in *len.
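+		 * For example (size assumed for illustration), a partly received
+		 * DataRow with a 10000-byte body needs room for 5 + 10000 bytes
+		 * before the whole message can be consumed in one pass.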
+		 * The buffer may already be large enough; in that case
+		 * ensure_in_buffer_capacity() will simply return.
 		 */
-		gxid = GetCurrentGlobalTransactionId();
-
-		sprintf(buffer, "PREPARE TRANSACTION 'T%d'", gxid);
-		/* Send PREPARE */
-		for (i = 0; i < conn_count; i++)
-		{
-			if (data_node_send_query(connections[i], buffer))
-				return EOF;
-		}
-
-		combiner = CreateResponseCombiner(conn_count,
-										  COMBINE_TYPE_NONE, dest);
-		/* Receive responses */
-		data_node_receive_responses(conn_count, connections, timeout, combiner);
-
-		/* Reset combiner */
-		if (!ValidateAndResetCombiner(combiner))
-		{
-			result = EOF;
-		}
-	}
-
-	if (!do2PC)
-		strcpy(buffer, "COMMIT");
-	else
-	{
-		if (result)
-			sprintf(buffer, "ROLLBACK PREPARED 'T%d'", gxid);
-		else
-			sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid);
-
-		/* We need to use a new xid, the data nodes have reset */
-		two_phase_xid = BeginTranGTM();
-		for (i = 0; i < conn_count; i++)
-		{
-			if (data_node_send_gxid(connections[i], two_phase_xid))
-			{
-				add_error_message(connections[i], "Can not send request");
-				result = EOF;
-				goto finish;
-			}
-		}
-	}
-
-	/* Send COMMIT */
-	for (i = 0; i < conn_count; i++)
-	{
-		if (data_node_send_query(connections[i], buffer))
-		{
-			result = EOF;
-			goto finish;
-		}
+		ensure_in_buffer_capacity(5 + (size_t) *len, conn);
+		conn->state = DN_CONNECTION_STATE_QUERY;
+		conn->inCursor = conn->inStart;
+		return '\0';
 	}

-	if (!combiner)
-		combiner = CreateResponseCombiner(conn_count,
-										  COMBINE_TYPE_NONE, dest);
-	/* Receive responses */
-	data_node_receive_responses(conn_count, connections, timeout, combiner);
-	result = ValidateAndCloseCombiner(combiner) ? result : EOF;
-
-finish:
-	if (do2PC)
-		CommitTranGTM((GlobalTransactionId) two_phase_xid);
-
-	return result;
-}
-
-
-/*
- * Rollback current transaction
- */
-int
-DataNodeRollback(CommandDest dest)
-{
-	int			res = 0;
-	int			tran_count;
-	DataNodeHandle *connections[node_count];
-
-	/* Quick check to make sure we have connections */
-	if (node_count == 0)
-		goto finish;
-
-	/* gather connections to rollback */
-	tran_count = get_transaction_nodes(connections);
-
-	/*
-	 * If we do not have open transactions we have nothing to rollback just
-	 * report success
-	 */
-	if (tran_count == 0)
-		goto finish;
-
-	res = data_node_rollback(tran_count, connections, dest);
-
-finish:
-	/* In autocommit mode statistics is collected in DataNodeExec */
-	if (!autocommit)
-		stat_transaction(node_count);
-	if (!PersistentConnections)
-		release_handles();
-	autocommit = true;
-	clear_write_node_list();
-	return res;
+	*msg = conn->inBuffer + conn->inCursor;
+	conn->inCursor += *len;
+	conn->inStart = conn->inCursor;
+	return msgtype;
}

/* Release all data node connections back to pool and release occupied memory */
-static void
+void
release_handles(void)
{
	int			i;
@@ -1041,320 +557,11 @@ release_handles(void)

/*
- * Send ROLLBACK command down to the Data nodes and handle responses
- */
-static int
-data_node_rollback(int conn_count, DataNodeHandle **connections, CommandDest dest)
-{
-	int			i;
-	struct timeval *timeout = NULL;
-	int			result = 0;
-	ResponseCombiner combiner;
-
-	/* Send ROLLBACK - */
-	for (i = 0; i < conn_count; i++)
-	{
-		if (data_node_send_query(connections[i], "ROLLBACK"))
-			result = EOF;
-	}
-
-	combiner = CreateResponseCombiner(conn_count,
-									  COMBINE_TYPE_NONE, dest);
-	/* Receive responses */
-	data_node_receive_responses(conn_count, connections, timeout, combiner);
-
-	/* Verify status */
-	return ValidateAndCloseCombiner(combiner) ?
0 : EOF; -} - - -/* - * Execute specified statement on specified Data nodes, combine responses and - * send results back to the client - * - * const char *query - SQL string to execute - * List *primarynode - if a write operation on a replicated table, the primary node - * List *nodelist - the nodes to execute on (excludes primary, if set in primarynode - * CommandDest dest - destination for results - * Snapshot snapshot - snapshot to use - * bool force_autocommit - force autocommit - * List *simple_aggregates - list of simple aggregates to execute - * bool is_read_only - if this is a read-only query - */ -int -DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CombineType combine_type, - CommandDest dest, Snapshot snapshot, bool force_autocommit, - List *simple_aggregates, bool is_read_only) -{ - int i; - int j; - int regular_conn_count; - int total_conn_count; - struct timeval *timeout = NULL; /* wait forever */ - ResponseCombiner combiner = NULL; - int row_count = 0; - int new_count = 0; - bool need_tran; - bool found; - GlobalTransactionId gxid = InvalidGlobalTransactionId; - DataNodeHandle *new_connections[NumDataNodes]; - DataNodeHandle **connections = NULL; - DataNodeHandle **primaryconnection = NULL; - List *nodelist = NIL; - List *primarynode = NIL; - /* add up affected row count by default, override for replicated writes */ - - if (exec_nodes) - { - nodelist = exec_nodes->nodelist; - primarynode = exec_nodes->primarynodelist; - } - - if (list_length(nodelist) == 0) - { - if (primarynode) - regular_conn_count = NumDataNodes - 1; - else - regular_conn_count = NumDataNodes; - } - else - { - regular_conn_count = list_length(nodelist); - } - - total_conn_count = regular_conn_count; - - /* Get connection for primary node, if used */ - if (primarynode) - { - primaryconnection = get_handles(primarynode); - total_conn_count++; - } - - /* Get other connections (non-primary) */ - if (regular_conn_count == 0) - return EOF; - else - { - connections = get_handles(nodelist); - if (!connections) - return EOF; - } - - if (force_autocommit) - need_tran = false; - else - need_tran = !autocommit || total_conn_count > 1; - - elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false"); - - stat_statement(); - if (autocommit) - stat_transaction(total_conn_count); - - /* We normally clear for transactions, but if autocommit, clear here, too */ - if (autocommit == true) - { - clear_write_node_list(); - } - - /* Check status of connections */ - - /* - * We want to track new "write" nodes, and new nodes in the current - * transaction whether or not they are write nodes. 
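-	 * For example (node numbers assumed), an UPDATE hitting nodes 2 and 5 in
-	 * a fresh transaction appends both handles to write_node_list and to
-	 * new_connections; once write_node_count exceeds 1, commit time will use
-	 * two-phase commit (see data_node_commit).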
- */ - if (!is_read_only && write_node_count < NumDataNodes) - { - if (primaryconnection) - { - found = false; - for (j = 0; j < write_node_count && !found; j++) - { - if (write_node_list[j] == primaryconnection[0]) - found = true; - } - if (!found) - { - /* Add to transaction wide-list */ - write_node_list[write_node_count++] = primaryconnection[0]; - /* Add to current statement list */ - new_connections[new_count++] = primaryconnection[0]; - } - } - for (i = 0; i < regular_conn_count; i++) - { - found = false; - for (j = 0; j < write_node_count && !found; j++) - { - if (write_node_list[j] == connections[i]) - found = true; - } - if (!found) - { - /* Add to transaction wide-list */ - write_node_list[write_node_count++] = connections[i]; - /* Add to current statement list */ - new_connections[new_count++] = connections[i]; - } - } - /* Check connection state is DN_CONNECTION_STATE_IDLE */ - } - - gxid = GetCurrentGlobalTransactionId(); - - if (!GlobalTransactionIdIsValid(gxid)) - { - pfree(connections); - return EOF; - } - if (new_count > 0 && need_tran) - { - /* Start transaction on connections where it is not started */ - if (data_node_begin(new_count, new_connections, DestNone, gxid)) - { - pfree(connections); - return EOF; - } - } - - /* See if we have a primary nodes, execute on it first before the others */ - if (primaryconnection) - { - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && data_node_send_gxid(primaryconnection[0], gxid)) - { - add_error_message(primaryconnection[0], "Can not send request"); - if (connections) - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - return EOF; - } - if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot)) - { - add_error_message(primaryconnection[0], "Can not send request"); - if (connections) - pfree(connections); - pfree(primaryconnection); - return EOF; - } - if (data_node_send_query(primaryconnection[0], query) != 0) - { - add_error_message(primaryconnection[0], "Can not send request"); - if (connections) - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - return EOF; - } - - Assert(combine_type == COMBINE_TYPE_SAME); - - /* - * Create combiner. - * Note that we use the same combiner later with the secondary nodes, - * so that we do not prematurely send a response to the client - * until all nodes have completed execution. 
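-		 * For instance, a replicated write with one primary and two other
-		 * nodes creates the combiner with total_conn_count == 3, and with
-		 * COMBINE_TYPE_SAME it can also verify that every node reports the
-		 * same row count.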
- */ - combiner = CreateResponseCombiner(total_conn_count, combine_type, dest); - AssignCombinerAggregates(combiner, simple_aggregates); - - /* Receive responses */ - data_node_receive_responses(1, primaryconnection, timeout, combiner); - /* If we got an error response return immediately */ - if (DN_CONNECTION_STATE_ERROR(primaryconnection[0])) - { - /* We are going to exit, so release combiner */ - CloseCombiner(combiner); - if (autocommit) - { - if (need_tran) - DataNodeRollback(DestNone); - else if (!PersistentConnections) - release_handles(); - } - - if (primaryconnection) - pfree(primaryconnection); - if (connections) - pfree(connections); - return EOF; - } - } - - /* Send query to nodes */ - for (i = 0; i < regular_conn_count; i++) - { - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && data_node_send_gxid(connections[i], gxid)) - { - add_error_message(connections[i], "Can not send request"); - pfree(connections); - return EOF; - } - if (snapshot && data_node_send_snapshot(connections[i], snapshot)) - { - add_error_message(connections[i], "Can not send request"); - pfree(connections); - return EOF; - } - if (data_node_send_query(connections[i], query) != 0) - { - add_error_message(connections[i], "Can not send request"); - pfree(connections); - return EOF; - } - } - - /* We may already have combiner if it is replicated case with primary data node */ - if (!combiner) - { - combiner = CreateResponseCombiner(regular_conn_count, combine_type, dest); - AssignCombinerAggregates(combiner, simple_aggregates); - } - - /* Receive responses */ - data_node_receive_responses(regular_conn_count, connections, timeout, combiner); - row_count = combiner->row_count; - - /* Check for errors and if primary nodeMake sure primary and secondary nodes were updated the same */ - if (!ValidateAndCloseCombiner(combiner)) - { - if (autocommit) - { - if (need_tran) - DataNodeRollback(DestNone); - else if (!PersistentConnections) - release_handles(); - } - - pfree(connections); - - return EOF; - } - - if (autocommit) - { - if (need_tran) - DataNodeCommit(DestNone); /* PGXCTODO - call CommitTransaction() - * instead? */ - else if (!PersistentConnections) - release_handles(); - } - - /* Verify status? 
*/ - pfree(connections); - return 0; -} - - -/* * Ensure specified amount of data can fit to the incoming buffer and * increase it if necessary */ -static int -ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle) +int +ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle) { int newsize = handle->inSize; char *newbuf; @@ -1405,8 +612,8 @@ ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle) * Ensure specified amount of data can fit to the outgoing buffer and * increase it if necessary */ -static int -ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle) +int +ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle) { int newsize = handle->outSize; char *newbuf; @@ -1456,8 +663,8 @@ ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle) /* * Send specified amount of data from the outgoing buffer over the connection */ -static int -send_some(DataNodeHandle * handle, int len) +int +send_some(DataNodeHandle *handle, int len) { char *ptr = handle->outBuffer; int remaining = handle->outEnd; @@ -1556,7 +763,7 @@ send_some(DataNodeHandle * handle, int len) * This method won't return until connection buffer is empty or error occurs * To ensure all data are on the wire before waiting for response */ -static int +int data_node_flush(DataNodeHandle *handle) { while (handle->outEnd) @@ -1573,8 +780,8 @@ data_node_flush(DataNodeHandle *handle) /* * Send specified statement down to the Data node */ -static int -data_node_send_query(DataNodeHandle *handle, const char *query) +int +data_node_send_query(DataNodeHandle * handle, const char *query) { int strLen = strlen(query) + 1; @@ -1595,7 +802,7 @@ data_node_send_query(DataNodeHandle *handle, const char *query) memcpy(handle->outBuffer + handle->outEnd, query, strLen); handle->outEnd += strLen; - handle->state = DN_CONNECTION_STATE_BUSY; + handle->state = DN_CONNECTION_STATE_QUERY; return data_node_flush(handle); } @@ -1604,7 +811,7 @@ data_node_send_query(DataNodeHandle *handle, const char *query) /* * Send the GXID down to the Data node */ -static int +int data_node_send_gxid(DataNodeHandle *handle, GlobalTransactionId gxid) { int msglen = 8; @@ -1632,7 +839,7 @@ data_node_send_gxid(DataNodeHandle *handle, GlobalTransactionId gxid) /* * Send the snapshot down to the Data node */ -static int +int data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot) { int msglen; @@ -1686,11 +893,11 @@ data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot) * Add another message to the list of errors to be returned back to the client * at the convenient time */ -static void +void add_error_message(DataNodeHandle *handle, const char *message) { handle->transaction_status = 'E'; - handle->state = DN_CONNECTION_STATE_ERROR_READY; + handle->state = DN_CONNECTION_STATE_IDLE; if (handle->error) { /* PGXCTODO append */ @@ -1706,16 +913,19 @@ add_error_message(DataNodeHandle *handle, const char *message) * Special case is empty or NIL nodeList, in this case return all the nodes. * The returned list should be pfree'd when no longer needed. 
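 *
 * A minimal usage sketch (illustrative only; node numbers assumed, error
 * handling omitted):
 *
 *		List		   *nodelist = list_make2_int(1, 3);
 *		DataNodeHandle **handles = get_handles(nodelist);
 *
 *		if (handles)
 *		{
 *			... work with handles[0] and handles[1] ...
 *			pfree(handles);
 *		}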
*/ -static DataNodeHandle ** +DataNodeHandle ** get_handles(List *nodelist) { DataNodeHandle **result; ListCell *node_list_item; List *allocate = NIL; + MemoryContext old_context; /* index of the result array */ int i = 0; + /* Handles should be there while transaction lasts */ + old_context = MemoryContextSwitchTo(TopTransactionContext); /* If node list is empty execute request on current nodes */ if (list_length(nodelist) == 0) { @@ -1757,8 +967,8 @@ get_handles(List *nodelist) { int node = lfirst_int(node_list_item); - if (node > NumDataNodes || node <= 0) - elog(ERROR, "Node number: %d passed is not a known node", node); + Assert(node > 0 && node <= NumDataNodes); + result[i++] = &handles[node - 1]; if (handles[node - 1].sock == NO_SOCKET) allocate = lappend_int(allocate, node); @@ -1781,13 +991,16 @@ get_handles(List *nodelist) int node = lfirst_int(node_list_item); int fdsock = fds[j++]; - data_node_init(&handles[node - 1], fdsock); + data_node_init(&handles[node - 1], fdsock, node); node_count++; } pfree(fds); list_free(allocate); } + /* restore context */ + MemoryContextSwitchTo(old_context); + return result; } @@ -1802,8 +1015,8 @@ get_handles(List *nodelist) * The function returns number of pointers written to the connections array. * Remaining items in the array, if any, will be kept unchanged */ -static int -get_transaction_nodes(DataNodeHandle ** connections) +int +get_transaction_nodes(DataNodeHandle **connections) { int tran_count = 0; int i; @@ -1819,503 +1032,3 @@ get_transaction_nodes(DataNodeHandle ** connections) return tran_count; } - - -/* - * Called when the backend is ending. - */ -void -DataNodeCleanAndRelease(int code, Datum arg) -{ - /* Rollback on Data Nodes */ - if (IsTransactionState()) - { - DataNodeRollback(DestNone); - - /* Rollback on GTM if transaction id opened. */ - RollbackTranGTM((GlobalTransactionId) GetCurrentTransactionIdIfAny()); - } - - /* Release data node connections */ - release_handles(); - - /* Close connection with GTM */ - CloseGTM(); - - /* Dump collected statistics to the log */ - stat_log(); -} - -/* - * Begin COPY command - * The copy_connections array must have room for NumDataNodes items - */ -DataNodeHandle** -DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from) -{ - int i, j; - int conn_count = list_length(nodelist) == 0 ? NumDataNodes : list_length(nodelist); - struct timeval *timeout = NULL; - DataNodeHandle **connections; - DataNodeHandle **copy_connections; - DataNodeHandle *newConnections[conn_count]; - int new_count = 0; - ListCell *nodeitem; - bool need_tran; - GlobalTransactionId gxid; - ResponseCombiner combiner; - - if (conn_count == 0) - return NULL; - - /* Get needed datanode connections */ - connections = get_handles(nodelist); - if (!connections) - return NULL; - - need_tran = !autocommit || conn_count > 1; - - elog(DEBUG1, "autocommit = %s, conn_count = %d, need_tran = %s", autocommit ? "true" : "false", conn_count, need_tran ? "true" : "false"); - - /* - * We need to be able quickly find a connection handle for specified node number, - * So store connections in an array where index is node-1. 
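-	 * For example (illustrative), with NumDataNodes == 4 and a node list of
-	 * (2, 4) the array becomes { NULL, conn2, NULL, conn4 }, so the handle
-	 * for node n is found as copy_connections[n - 1] without searching.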
- * Unused items in the array should be NULL - */ - copy_connections = (DataNodeHandle **) palloc0(NumDataNodes * sizeof(DataNodeHandle *)); - i = 0; - foreach(nodeitem, nodelist) - copy_connections[lfirst_int(nodeitem) - 1] = connections[i++]; - - /* Gather statistics */ - stat_statement(); - if (autocommit) - stat_transaction(conn_count); - - /* We normally clear for transactions, but if autocommit, clear here, too */ - if (autocommit) - clear_write_node_list(); - - /* Check status of connections */ - /* We want to track new "write" nodes, and new nodes in the current transaction - * whether or not they are write nodes. */ - if (write_node_count < NumDataNodes) - { - for (i = 0; i < conn_count; i++) - { - bool found = false; - for (j=0; j<write_node_count && !found; j++) - { - if (write_node_list[j] == connections[i]) - found = true; - } - if (!found) - { - /* - * Add to transaction wide-list if COPY FROM - * CopyOut (COPY TO) is not a write operation, no need to update - */ - if (is_from) - write_node_list[write_node_count++] = connections[i]; - /* Add to current statement list */ - newConnections[new_count++] = connections[i]; - } - } - // Check connection state is DN_CONNECTION_STATE_IDLE - } - - gxid = GetCurrentGlobalTransactionId(); - - /* elog(DEBUG1, "Current gxid = %d", gxid); */ - - if (!GlobalTransactionIdIsValid(gxid)) - { - pfree(connections); - pfree(copy_connections); - return NULL; - } - - if (new_count > 0 && need_tran) - { - /* Start transaction on connections where it is not started */ - if (data_node_begin(new_count, newConnections, DestNone, gxid)) - { - pfree(connections); - pfree(copy_connections); - return NULL; - } - } - - /* Send query to nodes */ - for (i = 0; i < conn_count; i++) - { - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && data_node_send_gxid(connections[i], gxid)) - { - add_error_message(connections[i], "Can not send request"); - pfree(connections); - pfree(copy_connections); - return NULL; - } - if (snapshot && data_node_send_snapshot(connections[i], snapshot)) - { - add_error_message(connections[i], "Can not send request"); - pfree(connections); - pfree(copy_connections); - return NULL; - } - if (data_node_send_query(connections[i], query) != 0) - { - add_error_message(connections[i], "Can not send request"); - pfree(connections); - pfree(copy_connections); - return NULL; - } - } - - /* - * We are expecting CopyIn response, but do not want to send it to client, - * caller should take care about this, because here we do not know if - * client runs console or file copy - */ - combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE, DestNone); - - /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); - if (!ValidateAndCloseCombiner(combiner)) - { - if (autocommit) - { - if (need_tran) - DataNodeCopyFinish(connections, 0, COMBINE_TYPE_NONE, DestNone); - else if (!PersistentConnections) - release_handles(); - } - - pfree(connections); - pfree(copy_connections); - return NULL; - } - - pfree(connections); - return copy_connections; -} - -/* - * Send a data row to the specified nodes - */ -int -DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections) -{ - DataNodeHandle *primary_handle = NULL; - ListCell *nodeitem; - /* size + data row + \n */ - int msgLen = 4 + len + 1; - int nLen = htonl(msgLen); - - if (exec_nodes->primarynodelist) - primary_handle = copy_connections[lfirst_int(list_head(exec_nodes->primarynodelist)) - 1]; - - 
if (primary_handle) - { - if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN) - { - /* precalculate to speed up access */ - int bytes_needed = primary_handle->outEnd + 1 + msgLen; - - /* flush buffer if it is almost full */ - if (bytes_needed > COPY_BUFFER_SIZE) - { - /* First look if data node has sent a error message */ - int read_status = data_node_read_data(primary_handle); - if (read_status == EOF || read_status < 0) - { - add_error_message(primary_handle, "failed to read data from data node"); - return EOF; - } - - if (primary_handle->inStart < primary_handle->inEnd) - { - ResponseCombiner combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE, DestNone); - handle_response(primary_handle, combiner); - if (!ValidateAndCloseCombiner(combiner)) - return EOF; - } - - if (DN_CONNECTION_STATE_ERROR(primary_handle)) - return EOF; - - if (send_some(primary_handle, primary_handle->outEnd) < 0) - { - add_error_message(primary_handle, "failed to send data to data node"); - return EOF; - } - } - - if (ensure_out_buffer_capacity(bytes_needed, primary_handle) != 0) - { - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - - primary_handle->outBuffer[primary_handle->outEnd++] = 'd'; - memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4); - primary_handle->outEnd += 4; - memcpy(primary_handle->outBuffer + primary_handle->outEnd, data_row, len); - primary_handle->outEnd += len; - primary_handle->outBuffer[primary_handle->outEnd++] = '\n'; - } - else - { - add_error_message(primary_handle, "Invalid data node connection"); - return EOF; - } - } - - foreach(nodeitem, exec_nodes->nodelist) - { - DataNodeHandle *handle = copy_connections[lfirst_int(nodeitem) - 1]; - if (handle && handle->state == DN_CONNECTION_STATE_COPY_IN) - { - /* precalculate to speed up access */ - int bytes_needed = handle->outEnd + 1 + msgLen; - - /* flush buffer if it is almost full */ - if ((primary_handle && bytes_needed > PRIMARY_NODE_WRITEAHEAD) - || (!primary_handle && bytes_needed > COPY_BUFFER_SIZE)) - { - int to_send = handle->outEnd; - - /* First look if data node has sent a error message */ - int read_status = data_node_read_data(handle); - if (read_status == EOF || read_status < 0) - { - add_error_message(handle, "failed to read data from data node"); - return EOF; - } - - if (handle->inStart < handle->inEnd) - { - ResponseCombiner combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE, DestNone); - handle_response(handle, combiner); - if (!ValidateAndCloseCombiner(combiner)) - return EOF; - } - - if (DN_CONNECTION_STATE_ERROR(handle)) - return EOF; - - /* - * Allow primary node to write out data before others. - * If primary node was blocked it would not accept copy data. - * So buffer at least PRIMARY_NODE_WRITEAHEAD at the other nodes. - * If primary node is blocked and is buffering, other buffers will - * grow accordingly. 
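-				 * For example, with the 1MB PRIMARY_NODE_WRITEAHEAD and
-				 * illustrative sizes: if this handle has buffered 1.5MB
-				 * while the primary has 0.2MB pending, to_send becomes
-				 * 1.5MB - 0.2MB - 1MB = 0.3MB; if the primary lags further
-				 * behind, to_send stays 0 and this handle keeps buffering.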
- */ - if (primary_handle) - { - if (primary_handle->outEnd + PRIMARY_NODE_WRITEAHEAD < handle->outEnd) - to_send = handle->outEnd - primary_handle->outEnd - PRIMARY_NODE_WRITEAHEAD; - else - to_send = 0; - } - - /* - * Try to send down buffered data if we have - */ - if (to_send && send_some(handle, to_send) < 0) - { - add_error_message(handle, "failed to send data to data node"); - return EOF; - } - } - - if (ensure_out_buffer_capacity(bytes_needed, handle) != 0) - { - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - - handle->outBuffer[handle->outEnd++] = 'd'; - memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); - handle->outEnd += 4; - memcpy(handle->outBuffer + handle->outEnd, data_row, len); - handle->outEnd += len; - handle->outBuffer[handle->outEnd++] = '\n'; - } - else - { - add_error_message(handle, "Invalid data node connection"); - return EOF; - } - } - - return 0; -} - -int -DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle **copy_connections, CommandDest dest, FILE *copy_file) -{ - ResponseCombiner combiner; - int conn_count = list_length(exec_nodes->nodelist) == 0 ? NumDataNodes : list_length(exec_nodes->nodelist); - int count = 0; - bool need_tran; - List *nodelist; - ListCell *nodeitem; - - nodelist = exec_nodes->nodelist; - need_tran = !autocommit || conn_count > 1; - - combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_SUM, dest); - /* If there is an existing file where to copy data, pass it to combiner */ - if (copy_file) - combiner->copy_file = copy_file; - - foreach(nodeitem, exec_nodes->nodelist) - { - DataNodeHandle *handle = copy_connections[count]; - count++; - - if (handle && handle->state == DN_CONNECTION_STATE_COPY_OUT) - { - int read_status = 0; - /* H message has been consumed, continue to manage data row messages */ - while (read_status >= 0 && handle->state == DN_CONNECTION_STATE_COPY_OUT) /* continue to read as long as there is data */ - { - if (handle_response(handle,combiner) == EOF) - { - /* read some extra-data */ - read_status = data_node_read_data(handle); - if (read_status < 0) - return EOF; - } - /* There is no more data that can be read from connection */ - } - } - } - - if (!ValidateAndCloseCombiner(combiner)) - { - if (autocommit && !PersistentConnections) - release_handles(); - pfree(copy_connections); - return EOF; - } - - return 0; -} - -/* - * Finish copy process on all connections - */ -uint64 -DataNodeCopyFinish(DataNodeHandle **copy_connections, int primary_data_node, - CombineType combine_type, CommandDest dest) -{ - int i; - int nLen = htonl(4); - ResponseCombiner combiner = NULL; - bool need_tran; - bool res = 0; - struct timeval *timeout = NULL; /* wait forever */ - DataNodeHandle *connections[NumDataNodes]; - DataNodeHandle *primary_handle = NULL; - int conn_count = 0; - - for (i = 0; i < NumDataNodes; i++) - { - DataNodeHandle *handle = copy_connections[i]; - - if (!handle) - continue; - - if (i == primary_data_node - 1) - primary_handle = handle; - else - connections[conn_count++] = handle; - } - - if (primary_handle) - { - if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN || primary_handle->state == DN_CONNECTION_STATE_COPY_OUT) - { - /* msgType + msgLen */ - if (ensure_out_buffer_capacity(primary_handle->outEnd + 1 + 4, primary_handle) != 0) - { - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - - primary_handle->outBuffer[primary_handle->outEnd++] = 'c'; - memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4); - 
primary_handle->outEnd += 4; - - /* We need response right away, so send immediately */ - if (data_node_flush(primary_handle) < 0) - res = EOF; - } - else - res = EOF; - - combiner = CreateResponseCombiner(conn_count + 1, combine_type, dest); - data_node_receive_responses(1, &primary_handle, timeout, combiner); - } - - for (i = 0; i < conn_count; i++) - { - DataNodeHandle *handle = connections[i]; - - if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT) - { - /* msgType + msgLen */ - if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0) - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - - handle->outBuffer[handle->outEnd++] = 'c'; - memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); - handle->outEnd += 4; - - /* We need response right away, so send immediately */ - if (data_node_flush(handle) < 0) - res = EOF; - } - else - res = EOF; - } - - need_tran = !autocommit || primary_handle || conn_count > 1; - - if (!combiner) - combiner = CreateResponseCombiner(conn_count, combine_type, dest); - - data_node_receive_responses(conn_count, connections, timeout, combiner); - - if (!ValidateAndCloseCombiner(combiner) || res) - { - if (autocommit) - { - if (need_tran) - DataNodeRollback(DestNone); - else if (!PersistentConnections) - release_handles(); - } - - return 0; - } - - if (autocommit) - { - if (need_tran) - DataNodeCommit(DestNone); - else if (!PersistentConnections) - release_handles(); - } - - // Verify status? - return combiner->row_count; -} diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c new file mode 100644 index 0000000000..1ee1d59e99 --- /dev/null +++ b/src/backend/pgxc/pool/execRemote.c @@ -0,0 +1,2453 @@ +/*------------------------------------------------------------------------- + * + * execRemote.c + * + * Functions to execute commands on remote data nodes + * + * + * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group + * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation + * + * IDENTIFICATION + * $$ + * + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "access/gtm.h" +#include "access/xact.h" +#include "executor/executor.h" +#include "gtm/gtm_c.h" +#include "libpq/libpq.h" +#include "miscadmin.h" +#include "pgxc/execRemote.h" +#include "pgxc/poolmgr.h" +#include "utils/datum.h" +#include "utils/memutils.h" +#include "utils/tuplesort.h" +#include "utils/snapmgr.h" + +/* + * Buffer size does not affect performance significantly, just do not allow + * connection buffer grows infinitely + */ +#define COPY_BUFFER_SIZE 8192 +#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024 + +static bool autocommit = true; +static DataNodeHandle **write_node_list = NULL; +static int write_node_count = 0; + +static int data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid); +static int data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest); +static int data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest); + +static void clear_write_node_list(); + +#define MAX_STATEMENTS_PER_TRAN 10 + +/* Variables to collect statistics */ +static int total_transactions = 0; +static int total_statements = 0; +static int total_autocommit = 0; +static int nonautocommit_2pc = 0; +static int autocommit_2pc = 0; +static int current_tran_statements = 0; +static int 
*statements_per_transaction = NULL; +static int *nodes_per_transaction = NULL; + +/* + * statistics collection: count a statement + */ +static void +stat_statement() +{ + total_statements++; + current_tran_statements++; +} + +/* + * To collect statistics: count a transaction + */ +static void +stat_transaction(int node_count) +{ + total_transactions++; + if (autocommit) + total_autocommit++; + if (!statements_per_transaction) + { + statements_per_transaction = (int *) malloc((MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int)); + memset(statements_per_transaction, 0, (MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int)); + } + if (current_tran_statements > MAX_STATEMENTS_PER_TRAN) + statements_per_transaction[MAX_STATEMENTS_PER_TRAN]++; + else + statements_per_transaction[current_tran_statements]++; + current_tran_statements = 0; + if (node_count > 0 && node_count <= NumDataNodes) + { + if (!nodes_per_transaction) + { + nodes_per_transaction = (int *) malloc(NumDataNodes * sizeof(int)); + memset(nodes_per_transaction, 0, NumDataNodes * sizeof(int)); + } + nodes_per_transaction[node_count - 1]++; + } +} + + +/* + * To collect statistics: count a two-phase commit on nodes + */ +static void +stat_2pc() +{ + if (autocommit) + autocommit_2pc++; + else + nonautocommit_2pc++; +} + + +/* + * Output collected statistics to the log + */ +static void +stat_log() +{ + elog(DEBUG1, "Total Transactions: %d Total Statements: %d", total_transactions, total_statements); + elog(DEBUG1, "Autocommit: %d 2PC for Autocommit: %d 2PC for non-Autocommit: %d", + total_autocommit, autocommit_2pc, nonautocommit_2pc); + if (total_transactions) + { + if (statements_per_transaction) + { + int i; + + for (i = 0; i < MAX_STATEMENTS_PER_TRAN; i++) + elog(DEBUG1, "%d Statements per Transaction: %d (%d%%)", + i, statements_per_transaction[i], statements_per_transaction[i] * 100 / total_transactions); + } + elog(DEBUG1, "%d+ Statements per Transaction: %d (%d%%)", + MAX_STATEMENTS_PER_TRAN, statements_per_transaction[MAX_STATEMENTS_PER_TRAN], statements_per_transaction[MAX_STATEMENTS_PER_TRAN] * 100 / total_transactions); + if (nodes_per_transaction) + { + int i; + + for (i = 0; i < NumDataNodes; i++) + elog(DEBUG1, "%d Nodes per Transaction: %d (%d%%)", + i + 1, nodes_per_transaction[i], nodes_per_transaction[i] * 100 / total_transactions); + } + } +} + + +/* + * Create a structure to store parameters needed to combine responses from + * multiple connections as well as state information + */ +static RemoteQueryState * +CreateResponseCombiner(int node_count, CombineType combine_type) +{ + RemoteQueryState *combiner; + + /* ResponseComber is a typedef for pointer to ResponseCombinerData */ + combiner = makeNode(RemoteQueryState); + if (combiner == NULL) + { + /* Out of memory */ + return combiner; + } + + combiner->node_count = node_count; + combiner->connections = NULL; + combiner->conn_count = 0; + combiner->combine_type = combine_type; + combiner->dest = NULL; + combiner->command_complete_count = 0; + combiner->row_count = 0; + combiner->request_type = REQUEST_TYPE_NOT_DEFINED; + combiner->tuple_desc = NULL; + combiner->description_count = 0; + combiner->copy_in_count = 0; + combiner->copy_out_count = 0; + combiner->errorMessage = NULL; + combiner->query_Done = false; + combiner->completionTag = NULL; + combiner->msg = NULL; + combiner->msglen = 0; + combiner->initAggregates = true; + combiner->simple_aggregates = NULL; + combiner->copy_file = NULL; + + return combiner; +} + +/* + * Parse out row count from the command status response and 
convert it to integer + */ +static int +parse_row_count(const char *message, size_t len, uint64 *rowcount) +{ + int digits = 0; + int pos; + + *rowcount = 0; + /* skip \0 string terminator */ + for (pos = 0; pos < len - 1; pos++) + { + if (message[pos] >= '0' && message[pos] <= '9') + { + *rowcount = *rowcount * 10 + message[pos] - '0'; + digits++; + } + else + { + *rowcount = 0; + digits = 0; + } + } + return digits; +} + +/* + * Initialize the collection value, when agregation is first set up, or for a + * new group (grouping support is not implemented yet) + */ +static void +initialize_collect_aggregates(SimpleAgg *simple_agg) +{ + if (simple_agg->initValueIsNull) + simple_agg->collectValue = simple_agg->initValue; + else + simple_agg->collectValue = datumCopy(simple_agg->initValue, + simple_agg->transtypeByVal, + simple_agg->transtypeLen); + simple_agg->noCollectValue = simple_agg->initValueIsNull; + simple_agg->collectValueNull = simple_agg->initValueIsNull; +} + +/* + * Finalize the aggregate after current group or entire relation is processed + * (grouping support is not implemented yet) + */ +static void +finalize_collect_aggregates(SimpleAgg *simple_agg, Datum *resultVal, bool *resultIsNull) +{ + /* + * Apply the agg's finalfn if one is provided, else return collectValue. + */ + if (OidIsValid(simple_agg->finalfn_oid)) + { + FunctionCallInfoData fcinfo; + + InitFunctionCallInfoData(fcinfo, &(simple_agg->finalfn), 1, + (void *) simple_agg, NULL); + fcinfo.arg[0] = simple_agg->collectValue; + fcinfo.argnull[0] = simple_agg->collectValueNull; + if (fcinfo.flinfo->fn_strict && simple_agg->collectValueNull) + { + /* don't call a strict function with NULL inputs */ + *resultVal = (Datum) 0; + *resultIsNull = true; + } + else + { + *resultVal = FunctionCallInvoke(&fcinfo); + *resultIsNull = fcinfo.isnull; + } + } + else + { + *resultVal = simple_agg->collectValue; + *resultIsNull = simple_agg->collectValueNull; + } +} + +/* + * Given new input value(s), advance the transition function of an aggregate. + * + * The new values (and null flags) have been preloaded into argument positions + * 1 and up in fcinfo, so that we needn't copy them again to pass to the + * collection function. No other fields of fcinfo are assumed valid. + * + * It doesn't matter which memory context this is called in. + */ +static void +advance_collect_function(SimpleAgg *simple_agg, FunctionCallInfoData *fcinfo) +{ + Datum newVal; + + if (simple_agg->transfn.fn_strict) + { + /* + * For a strict transfn, nothing happens when there's a NULL input; we + * just keep the prior transValue. + */ + if (fcinfo->argnull[1]) + return; + if (simple_agg->noCollectValue) + { + /* + * result has not been initialized + * We must copy the datum into result if it is pass-by-ref. We + * do not need to pfree the old result, since it's NULL. + */ + simple_agg->collectValue = datumCopy(fcinfo->arg[1], + simple_agg->transtypeByVal, + simple_agg->transtypeLen); + simple_agg->collectValueNull = false; + simple_agg->noCollectValue = false; + return; + } + if (simple_agg->collectValueNull) + { + /* + * Don't call a strict function with NULL inputs. Note it is + * possible to get here despite the above tests, if the transfn is + * strict *and* returned a NULL on a prior cycle. If that happens + * we will propagate the NULL all the way to the end. 
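+			 * For example, once a strict collection function has returned
+			 * NULL on an earlier cycle, every later input is skipped here
+			 * and the final result remains NULL, mirroring what nodeAgg.c
+			 * does for ordinary aggregates.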
+ */ + return; + } + } + + /* + * OK to call the transition function + */ + InitFunctionCallInfoData(*fcinfo, &(simple_agg->transfn), 2, (void *) simple_agg, NULL); + fcinfo->arg[0] = simple_agg->collectValue; + fcinfo->argnull[0] = simple_agg->collectValueNull; + newVal = FunctionCallInvoke(fcinfo); + + /* + * If pass-by-ref datatype, must copy the new value into aggcontext and + * pfree the prior transValue. But if transfn returned a pointer to its + * first input, we don't need to do anything. + */ + if (!simple_agg->transtypeByVal && + DatumGetPointer(newVal) != DatumGetPointer(simple_agg->collectValue)) + { + if (!fcinfo->isnull) + { + newVal = datumCopy(newVal, + simple_agg->transtypeByVal, + simple_agg->transtypeLen); + } + if (!simple_agg->collectValueNull) + pfree(DatumGetPointer(simple_agg->collectValue)); + } + + simple_agg->collectValue = newVal; + simple_agg->collectValueNull = fcinfo->isnull; +} + +/* + * Convert RowDescription message to a TupleDesc + */ +static TupleDesc +create_tuple_desc(char *msg_body, size_t len) +{ + TupleDesc result; + int i, nattr; + uint16 n16; + + /* get number of attributes */ + memcpy(&n16, msg_body, 2); + nattr = ntohs(n16); + msg_body += 2; + + result = CreateTemplateTupleDesc(nattr, false); + + /* decode attributes */ + for (i = 1; i <= nattr; i++) + { + AttrNumber attnum; + char *attname; + Oid oidtypeid; + int32 typmod; + + uint32 n32; + + attnum = (AttrNumber) i; + + /* attribute name */ + attname = msg_body; + msg_body += strlen(attname) + 1; + + /* table OID, ignored */ + msg_body += 4; + + /* column no, ignored */ + msg_body += 2; + + /* data type */ + memcpy(&n32, msg_body, 4); + oidtypeid = ntohl(n32); + msg_body += 4; + + /* type len, ignored */ + msg_body += 2; + + /* type mod */ + memcpy(&n32, msg_body, 4); + typmod = ntohl(n32); + msg_body += 4; + + /* PGXCTODO text/binary flag? */ + msg_body += 2; + + TupleDescInitEntry(result, attnum, attname, oidtypeid, typmod, 0); + } + return result; +} + +static void +exec_simple_aggregates(RemoteQueryState *combiner, TupleTableSlot *slot) +{ + ListCell *lc; + + Assert(combiner->simple_aggregates); + Assert(!TupIsNull(slot)); + + if (combiner->initAggregates) + { + foreach (lc, combiner->simple_aggregates) + initialize_collect_aggregates((SimpleAgg *) lfirst(lc)); + + combiner->initAggregates = false; + } + + foreach (lc, combiner->simple_aggregates) + { + SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc); + FunctionCallInfoData fcinfo; + int attr = simple_agg->column_pos; + + slot_getsomeattrs(slot, attr + 1); + fcinfo.arg[1] = slot->tts_values[attr]; + fcinfo.argnull[1] = slot->tts_isnull[attr]; + + advance_collect_function(simple_agg, &fcinfo); + } +} + +static void +finish_simple_aggregates(RemoteQueryState *combiner, TupleTableSlot *slot) +{ + ListCell *lc; + ExecClearTuple(slot); + + /* + * Aggregates may not been initialized if no rows has been received + * from the data nodes because of HAVING clause. 
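+	 * (For example, a HAVING clause evaluated on the data nodes can filter
+	 * out every group, so the coordinator never receives a single DataRow.)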
+ * In this case finish_simple_aggregates() should return empty slot + */ + if (!combiner->initAggregates) + { + foreach (lc, combiner->simple_aggregates) + { + SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc); + int attr = simple_agg->column_pos; + + finalize_collect_aggregates(simple_agg, + slot->tts_values + attr, + slot->tts_isnull + attr); + } + ExecStoreVirtualTuple(slot); + /* To prevent aggregates get finalized again */ + combiner->initAggregates = true; + } +} + +/* + * Handle CopyOutCommandComplete ('c') message from a data node connection + */ +static void +HandleCopyOutComplete(RemoteQueryState *combiner) +{ + if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) + combiner->request_type = REQUEST_TYPE_COPY_OUT; + if (combiner->request_type != REQUEST_TYPE_COPY_OUT) + /* Inconsistent responses */ + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Unexpected response from the data nodes"))); + /* Just do nothing, close message is managed by the coordinator */ + combiner->copy_out_count++; +} + +/* + * Handle CommandComplete ('C') message from a data node connection + */ +static void +HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len) +{ + int digits = 0; + + /* + * If we did not receive description we are having rowcount or OK response + */ + if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) + combiner->request_type = REQUEST_TYPE_COMMAND; + /* Extract rowcount */ + if (combiner->combine_type != COMBINE_TYPE_NONE) + { + uint64 rowcount; + digits = parse_row_count(msg_body, len, &rowcount); + if (digits > 0) + { + /* Replicated write, make sure they are the same */ + if (combiner->combine_type == COMBINE_TYPE_SAME) + { + if (combiner->command_complete_count) + { + if (rowcount != combiner->row_count) + /* There is a consistency issue in the database with the replicated table */ + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Write to replicated table returned different results from the data nodes"))); + } + else + /* first result */ + combiner->row_count = rowcount; + } + else + combiner->row_count += rowcount; + } + else + combiner->combine_type = COMBINE_TYPE_NONE; + } + if (++combiner->command_complete_count == combiner->node_count) + { + if (combiner->completionTag) + { + if (combiner->combine_type == COMBINE_TYPE_NONE) + { + /* ensure we do not go beyond buffer bounds */ + if (len > COMPLETION_TAG_BUFSIZE) + len = COMPLETION_TAG_BUFSIZE; + memcpy(combiner->completionTag, msg_body, len); + } + else + { + /* Truncate msg_body to get base string */ + msg_body[len - digits - 1] = '\0'; + snprintf(combiner->completionTag, + COMPLETION_TAG_BUFSIZE, + "%s" UINT64_FORMAT, + msg_body, + combiner->row_count); + } + } + } +} + +/* + * Handle RowDescription ('T') message from a data node connection + */ +static bool +HandleRowDescription(RemoteQueryState *combiner, char *msg_body, size_t len) +{ + if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) + combiner->request_type = REQUEST_TYPE_QUERY; + if (combiner->request_type != REQUEST_TYPE_QUERY) + { + /* Inconsistent responses */ + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Unexpected response from the data nodes"))); + } + /* Increment counter and check if it was first */ + if (combiner->description_count++ == 0) + { + combiner->tuple_desc = create_tuple_desc(msg_body, len); + return true; + } + return false; +} + +/* + * Handle ParameterStatus ('S') message from a data node connection (SET command) + */ +static void 
+HandleParameterStatus(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+	if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+		combiner->request_type = REQUEST_TYPE_QUERY;
+	if (combiner->request_type != REQUEST_TYPE_QUERY)
+	{
+		/* Inconsistent responses */
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("Unexpected response from the data nodes")));
+	}
+	/* Proxy last */
+	if (++combiner->description_count == combiner->node_count)
+	{
+		pq_putmessage('S', msg_body, len);
+	}
+}
+
+/*
+ * Handle CopyInResponse ('G') message from a data node connection
+ */
+static void
+HandleCopyIn(RemoteQueryState *combiner)
+{
+	if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+		combiner->request_type = REQUEST_TYPE_COPY_IN;
+	if (combiner->request_type != REQUEST_TYPE_COPY_IN)
+	{
+		/* Inconsistent responses */
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("Unexpected response from the data nodes")));
+	}
+	/*
+	 * The normal PG code will output a G message when it runs in the
+	 * coordinator, so do not proxy the message here, just count it.
+	 */
+	combiner->copy_in_count++;
+}
+
+/*
+ * Handle CopyOutResponse ('H') message from a data node connection
+ */
+static void
+HandleCopyOut(RemoteQueryState *combiner)
+{
+	if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+		combiner->request_type = REQUEST_TYPE_COPY_OUT;
+	if (combiner->request_type != REQUEST_TYPE_COPY_OUT)
+	{
+		/* Inconsistent responses */
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("Unexpected response from the data nodes")));
+	}
+	/*
+	 * The normal PG code will output an H message when it runs in the
+	 * coordinator, so do not proxy the message here, just count it.
+	 */
+	combiner->copy_out_count++;
+}
+
+/*
+ * Handle CopyOutDataRow ('d') message from a data node connection
+ */
+static void
+HandleCopyDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+	if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+		combiner->request_type = REQUEST_TYPE_COPY_OUT;
+
+	/* Inconsistent responses */
+	if (combiner->request_type != REQUEST_TYPE_COPY_OUT)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("Unexpected response from the data nodes")));
+
+	/* If there is a copy file, data has to be sent to the local file */
+	if (combiner->copy_file)
+		/* write data to the copy file */
+		fwrite(msg_body, 1, len, combiner->copy_file);
+	else
+		pq_putmessage('d', msg_body, len);
+}
+
+/*
+ * Handle DataRow ('D') message from a data node connection.
+ * The row is buffered in the combiner until the caller consumes it
+ * with FetchTuple().
+ */
+static void
+HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+	/* We expect the previous message to have been consumed */
+	Assert(combiner->msg == NULL);
+
+	if (combiner->request_type != REQUEST_TYPE_QUERY)
+	{
+		/* Inconsistent responses */
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("Unexpected response from the data nodes")));
+	}
+
+	/*
+	 * If we already got an error, ignore incoming data rows from other nodes.
+	 * Still we want to continue reading until we get CommandComplete.
+	 */
+	if (combiner->errorMessage)
+		return;
+
+	/*
+	 * We copy the message because it points into the connection buffer and
+	 * would be overwritten by the next socket read.
+	 */
+	combiner->msg = (char *) palloc(len);
+	memcpy(combiner->msg, msg_body, len);
+	combiner->msglen = len;
+}
+
+/*
+ * Handle ErrorResponse ('E') message from a data node connection
+ */
+static void
+HandleError(RemoteQueryState *combiner, char *msg_body, size_t len)
+{
+	/* parse the error message */
+	char	   *severity = NULL;
+	char	   *code = NULL;
+	char	   *message = NULL;
+	char	   *detail = NULL;
+	char	   *hint = NULL;
+	char	   *position = NULL;
+	char	   *int_position = NULL;
+	char	   *int_query = NULL;
+	char	   *where = NULL;
+	char	   *file = NULL;
+	char	   *line = NULL;
+	char	   *routine = NULL;
+	int			offset = 0;
+
+	/*
+	 * Scan the fields until we reach the terminating \0
+	 */
+	while (offset + 1 < len)
+	{
+		/* pointer to the field value */
+		char	   *str = msg_body + offset + 1;
+
+		switch (msg_body[offset])
+		{
+			case 'S':
+				severity = str;
+				break;
+			case 'C':
+				code = str;
+				break;
+			case 'M':
+				message = str;
+				break;
+			case 'D':
+				detail = str;
+				break;
+			case 'H':
+				hint = str;
+				break;
+			case 'P':
+				position = str;
+				break;
+			case 'p':
+				int_position = str;
+				break;
+			case 'q':
+				int_query = str;
+				break;
+			case 'W':
+				where = str;
+				break;
+			case 'F':
+				file = str;
+				break;
+			case 'L':
+				line = str;
+				break;
+			case 'R':
+				routine = str;
+				break;
+		}
+
+		/* field type byte, field value and its \0 */
+		offset += strlen(str) + 2;
+	}
+
+	/*
+	 * We may have special handling for some errors; the default handling is
+	 * to throw the error with the same message.  We can not ereport
+	 * immediately because we should read from this and the other connections
+	 * until ReadyForQuery is received, so we just store the error message.
+	 * If multiple connections return errors, only the first one is reported.
+	 */
+	if (!combiner->errorMessage)
+	{
+		combiner->errorMessage = pstrdup(message);
+		/* the error code is exactly 5 significant bytes */
+		memcpy(combiner->errorCode, code, 5);
+	}
+
+	/*
+	 * If a data node has sent an ErrorResponse it will never send
+	 * CommandComplete.  Increment the counter to prevent endless waiting for it.
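The saved message and code are re-raised only after every connection has been drained to ReadyForQuery; ExecRemoteQuery below does exactly this with the stored five-byte code:

	if (combiner->errorMessage)
	{
		char	   *code = combiner->errorCode;

		ereport(ERROR,
				(errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
				 errmsg("%s", combiner->errorMessage)));
	}

so a remote error reaches the client with its original SQLSTATE.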
+ */ + combiner->command_complete_count++; +} + +/* + * Examine the specified combiner state and determine if command was completed + * successfully + */ +static bool +validate_combiner(RemoteQueryState *combiner) +{ + /* There was error message while combining */ + if (combiner->errorMessage) + return false; + /* Check if state is defined */ + if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) + return false; + /* Check all nodes completed */ + if ((combiner->request_type == REQUEST_TYPE_COMMAND + || combiner->request_type == REQUEST_TYPE_QUERY) + && combiner->command_complete_count != combiner->node_count) + return false; + + /* Check count of description responses */ + if (combiner->request_type == REQUEST_TYPE_QUERY + && combiner->description_count != combiner->node_count) + return false; + + /* Check count of copy-in responses */ + if (combiner->request_type == REQUEST_TYPE_COPY_IN + && combiner->copy_in_count != combiner->node_count) + return false; + + /* Check count of copy-out responses */ + if (combiner->request_type == REQUEST_TYPE_COPY_OUT + && combiner->copy_out_count != combiner->node_count) + return false; + + /* Add other checks here as needed */ + + /* All is good if we are here */ + return true; +} + +/* + * Close combiner and free allocated memory, if it is not needed + */ +static void +CloseCombiner(RemoteQueryState *combiner) +{ + if (combiner) + { + if (combiner->connections) + pfree(combiner->connections); + if (combiner->tuple_desc) + FreeTupleDesc(combiner->tuple_desc); + if (combiner->errorMessage) + pfree(combiner->errorMessage); + pfree(combiner); + } +} + +/* + * Validate combiner and release storage freeing allocated memory + */ +static bool +ValidateAndCloseCombiner(RemoteQueryState *combiner) +{ + bool valid = validate_combiner(combiner); + + CloseCombiner(combiner); + + return valid; +} + +/* + * Validate combiner and reset storage + */ +static bool +ValidateAndResetCombiner(RemoteQueryState *combiner) +{ + bool valid = validate_combiner(combiner); + + if (combiner->connections) + pfree(combiner->connections); + if (combiner->tuple_desc) + FreeTupleDesc(combiner->tuple_desc); + if (combiner->msg) + pfree(combiner->msg); + if (combiner->errorMessage) + pfree(combiner->errorMessage); + + combiner->command_complete_count = 0; + combiner->connections = NULL; + combiner->conn_count = 0; + combiner->row_count = 0; + combiner->request_type = REQUEST_TYPE_NOT_DEFINED; + combiner->tuple_desc = NULL; + combiner->description_count = 0; + combiner->copy_in_count = 0; + combiner->copy_out_count = 0; + combiner->errorMessage = NULL; + combiner->query_Done = false; + combiner->msg = NULL; + combiner->msglen = 0; + combiner->simple_aggregates = NULL; + combiner->copy_file = NULL; + + return valid; +} + +/* + * Get next data row from the combiner's buffer into provided slot + * Just clear slot and return false if buffer is empty, that means more data + * should be read + */ +bool +FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot) +{ + /* have messages in the buffer, consume them */ + if (combiner->msg) + { + ExecStoreDataRowTuple(combiner->msg, combiner->msglen, slot, true); + combiner->msg = NULL; + combiner->msglen = 0; + return true; + } + /* inform caller that buffer is empty */ + ExecClearTuple(slot); + return false; +} + + +/* + * Handle responses from the Data node connections + */ +static void +data_node_receive_responses(const int conn_count, DataNodeHandle ** connections, + struct timeval * timeout, RemoteQueryState *combiner) +{ + int count = 
conn_count; + DataNodeHandle *to_receive[conn_count]; + + /* make a copy of the pointers to the connections */ + memcpy(to_receive, connections, conn_count * sizeof(DataNodeHandle *)); + + /* + * Read results. + * Note we try and read from data node connections even if there is an error on one, + * so as to avoid reading incorrect results on the next statement. + * It might be better to just destroy these connections and tell the pool manager. + */ + while (count > 0) + { + int i = 0; + + data_node_receive(count, to_receive, timeout); + while (i < count) + { + switch (handle_response(to_receive[i], combiner)) + { + case RESPONSE_EOF: /* have something to read, keep receiving */ + i++; + break; + case RESPONSE_COMPLETE: + case RESPONSE_COPY: + /* Handling is done, do not track this connection */ + count--; + /* Move last connection in place */ + if (i < count) + to_receive[i] = to_receive[count]; + break; + default: + /* Inconsistent responses */ + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Unexpected response from the data nodes"))); + } + } + } +} + +/* + * Read next message from the connection and update the combiner accordingly + * If we are in an error state we just consume the messages, and do not proxy + * Long term, we should look into cancelling executing statements + * and closing the connections. + * Return values: + * EOF - need to receive more data for the connection + * 0 - done with the connection + * 1 - got data row + * 2 - got copy response + */ +int +handle_response(DataNodeHandle * conn, RemoteQueryState *combiner) +{ + char *msg; + int msg_len; + + for (;;) + { + /* No data available, exit */ + if (conn->state == DN_CONNECTION_STATE_QUERY) + return RESPONSE_EOF; + + /* TODO handle other possible responses */ + switch (get_message(conn, &msg_len, &msg)) + { + case '\0': /* Not enough data in the buffer */ + conn->state = DN_CONNECTION_STATE_QUERY; + return RESPONSE_EOF; + case 'c': /* CopyToCommandComplete */ + conn->state = DN_CONNECTION_STATE_COMPLETED; + HandleCopyOutComplete(combiner); + break; + case 'C': /* CommandComplete */ + conn->state = DN_CONNECTION_STATE_COMPLETED; + HandleCommandComplete(combiner, msg, msg_len); + break; + case 'T': /* RowDescription */ + if (HandleRowDescription(combiner, msg, msg_len)) + return RESPONSE_TUPDESC; + break; + case 'D': /* DataRow */ + HandleDataRow(combiner, msg, msg_len); + return RESPONSE_DATAROW; + case 'G': /* CopyInResponse */ + conn->state = DN_CONNECTION_STATE_COPY_IN; + HandleCopyIn(combiner); + /* Done, return to caller to let it know the data can be passed in */ + return RESPONSE_COPY; + case 'H': /* CopyOutResponse */ + conn->state = DN_CONNECTION_STATE_COPY_OUT; + HandleCopyOut(combiner); + return RESPONSE_COPY; + case 'd': /* CopyOutDataRow */ + conn->state = DN_CONNECTION_STATE_COPY_OUT; + HandleCopyDataRow(combiner, msg, msg_len); + break; + case 'E': /* ErrorResponse */ + HandleError(combiner, msg, msg_len); + conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY; + /* + * Do not return with an error, we still need to consume Z, + * ready-for-query + */ + break; + case 'A': /* NotificationResponse */ + case 'N': /* NoticeResponse */ + /* + * Ignore these to prevent multiple messages, one from each + * node. Coordinator will send one for DDL anyway + */ + break; + case 'Z': /* ReadyForQuery */ + conn->transaction_status = msg[0]; + conn->state = DN_CONNECTION_STATE_IDLE; + return RESPONSE_COMPLETE; + case 'I': /* EmptyQuery */ + default: + /* sync lost? 
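handle_response never blocks on the socket: RESPONSE_EOF only means there is nothing left in the connection buffer and the caller must read more data and retry. The consumption pattern used throughout this file (shown for a single connection, as done for the primary node in ExecRemoteQuery below; conn and combiner are the caller's locals):

	data_node_receive(1, &conn, NULL);
	while (handle_response(conn, combiner) == RESPONSE_EOF)
		data_node_receive(1, &conn, NULL);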
*/ + conn->state = DN_CONNECTION_STATE_ERROR_FATAL; + return RESPONSE_EOF; + } + } + /* Keep compiler quiet */ + return RESPONSE_EOF; +} + +/* + * Send BEGIN command to the Data nodes and receive responses + */ +static int +data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid) +{ + int i; + struct timeval *timeout = NULL; + RemoteQueryState *combiner; + + /* Send BEGIN */ + for (i = 0; i < conn_count; i++) + { + if (GlobalTransactionIdIsValid(gxid) && data_node_send_gxid(connections[i], gxid)) + return EOF; + + if (data_node_send_query(connections[i], "BEGIN")) + return EOF; + } + + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); + combiner->dest = None_Receiver; + + /* Receive responses */ + data_node_receive_responses(conn_count, connections, timeout, combiner); + + /* Verify status */ + return ValidateAndCloseCombiner(combiner) ? 0 : EOF; +} + +/* Clears the write node list */ +static void +clear_write_node_list() +{ + /* we just malloc once and use counter */ + if (write_node_list == NULL) + { + write_node_list = (DataNodeHandle **) malloc(NumDataNodes * sizeof(DataNodeHandle *)); + } + write_node_count = 0; +} + + +/* + * Switch autocommmit mode off, so all subsequent statements will be in the same transaction + */ +void +DataNodeBegin(void) +{ + autocommit = false; + clear_write_node_list(); +} + + +/* + * Commit current transaction, use two-phase commit if necessary + */ +int +DataNodeCommit(CommandDest dest) +{ + int res; + int tran_count; + DataNodeHandle *connections[NumDataNodes]; + + /* gather connections to commit */ + tran_count = get_transaction_nodes(connections); + + /* + * If we do not have open transactions we have nothing to commit, just + * report success + */ + if (tran_count == 0) + goto finish; + + res = data_node_commit(tran_count, connections, dest); + +finish: + /* In autocommit mode statistics is collected in DataNodeExec */ + if (!autocommit) + stat_transaction(tran_count); + if (!PersistentConnections) + release_handles(); + autocommit = true; + clear_write_node_list(); + return res; +} + + +/* + * Send COMMIT or PREPARE/COMMIT PREPARED down to the Data nodes and handle responses + */ +static int +data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest) +{ + int i; + struct timeval *timeout = NULL; + char buffer[256]; + GlobalTransactionId gxid = InvalidGlobalTransactionId; + int result = 0; + RemoteQueryState *combiner = NULL; + + + /* can set this to false to disable temporarily */ + /* bool do2PC = conn_count > 1; */ + + /* + * Only use 2PC if more than one node was written to. Otherwise, just send + * COMMIT to all + */ + bool do2PC = write_node_count > 1; + + /* Extra XID for Two Phase Commit */ + GlobalTransactionId two_phase_xid = 0; + + if (do2PC) + { + stat_2pc(); + + /* + * Formally we should be using GetCurrentGlobalTransactionIdIfAny() here, + * but since we need 2pc, we surely have sent down a command and got + * gxid for it. 
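data_node_commit below drives both phases with plain SQL strings. Reconstructing from its sprintf calls, a transaction with gxid 1234 that wrote to more than one node produces, on each of those nodes, in order:

	PREPARE TRANSACTION 'T1234'
	COMMIT PREPARED 'T1234'		/* or ROLLBACK PREPARED 'T1234' if a PREPARE failed */

The second round runs under a fresh gxid obtained with BeginTranGTM(), because PREPARE ended the original transactions on the data nodes.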
Hence GetCurrentGlobalTransactionId() just returns + * already allocated gxid + */ + gxid = GetCurrentGlobalTransactionId(); + + sprintf(buffer, "PREPARE TRANSACTION 'T%d'", gxid); + /* Send PREPARE */ + for (i = 0; i < conn_count; i++) + { + if (data_node_send_query(connections[i], buffer)) + return EOF; + } + + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); + combiner->dest = None_Receiver; + /* Receive responses */ + data_node_receive_responses(conn_count, connections, timeout, combiner); + + /* Reset combiner */ + if (!ValidateAndResetCombiner(combiner)) + { + result = EOF; + } + } + + if (!do2PC) + strcpy(buffer, "COMMIT"); + else + { + if (result) + sprintf(buffer, "ROLLBACK PREPARED 'T%d'", gxid); + else + sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid); + + /* We need to use a new xid, the data nodes have reset */ + two_phase_xid = BeginTranGTM(); + for (i = 0; i < conn_count; i++) + { + if (data_node_send_gxid(connections[i], two_phase_xid)) + { + add_error_message(connections[i], "Can not send request"); + result = EOF; + goto finish; + } + } + } + + /* Send COMMIT */ + for (i = 0; i < conn_count; i++) + { + if (data_node_send_query(connections[i], buffer)) + { + result = EOF; + goto finish; + } + } + + if (!combiner) + { + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); + combiner->dest = None_Receiver; + } + /* Receive responses */ + data_node_receive_responses(conn_count, connections, timeout, combiner); + result = ValidateAndCloseCombiner(combiner) ? result : EOF; + +finish: + if (do2PC) + CommitTranGTM((GlobalTransactionId) two_phase_xid); + + return result; +} + + +/* + * Rollback current transaction + */ +int +DataNodeRollback(CommandDest dest) +{ + int res = 0; + int tran_count; + DataNodeHandle *connections[NumDataNodes]; + + /* gather connections to rollback */ + tran_count = get_transaction_nodes(connections); + + /* + * If we do not have open transactions we have nothing to rollback just + * report success + */ + if (tran_count == 0) + goto finish; + + res = data_node_rollback(tran_count, connections, dest); + +finish: + /* In autocommit mode statistics is collected in DataNodeExec */ + if (!autocommit) + stat_transaction(tran_count); + if (!PersistentConnections) + release_handles(); + autocommit = true; + clear_write_node_list(); + return res; +} + + +/* + * Send ROLLBACK command down to the Data nodes and handle responses + */ +static int +data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest) +{ + int i; + struct timeval *timeout = NULL; + int result = 0; + RemoteQueryState *combiner; + + /* Send ROLLBACK - */ + for (i = 0; i < conn_count; i++) + { + if (data_node_send_query(connections[i], "ROLLBACK")) + result = EOF; + } + + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); + combiner->dest = None_Receiver; + /* Receive responses */ + data_node_receive_responses(conn_count, connections, timeout, combiner); + + /* Verify status */ + return ValidateAndCloseCombiner(combiner) ? 0 : EOF; +} + + +/* + * Begin COPY command + * The copy_connections array must have room for NumDataNodes items + */ +DataNodeHandle** +DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from) +{ + int i, j; + int conn_count = list_length(nodelist) == 0 ? 
NumDataNodes : list_length(nodelist); + struct timeval *timeout = NULL; + DataNodeHandle **connections; + DataNodeHandle **copy_connections; + DataNodeHandle *newConnections[conn_count]; + int new_count = 0; + ListCell *nodeitem; + bool need_tran; + GlobalTransactionId gxid; + RemoteQueryState *combiner; + + if (conn_count == 0) + return NULL; + + /* Get needed datanode connections */ + connections = get_handles(nodelist); + if (!connections) + return NULL; + + need_tran = !autocommit || conn_count > 1; + + elog(DEBUG1, "autocommit = %s, conn_count = %d, need_tran = %s", autocommit ? "true" : "false", conn_count, need_tran ? "true" : "false"); + + /* + * We need to be able quickly find a connection handle for specified node number, + * So store connections in an array where index is node-1. + * Unused items in the array should be NULL + */ + copy_connections = (DataNodeHandle **) palloc0(NumDataNodes * sizeof(DataNodeHandle *)); + i = 0; + foreach(nodeitem, nodelist) + copy_connections[lfirst_int(nodeitem) - 1] = connections[i++]; + + /* Gather statistics */ + stat_statement(); + if (autocommit) + stat_transaction(conn_count); + + /* We normally clear for transactions, but if autocommit, clear here, too */ + if (autocommit) + { + clear_write_node_list(); + } + + /* Check status of connections */ + /* We want to track new "write" nodes, and new nodes in the current transaction + * whether or not they are write nodes. */ + if (write_node_count < NumDataNodes) + { + for (i = 0; i < conn_count; i++) + { + bool found = false; + for (j=0; j<write_node_count && !found; j++) + { + if (write_node_list[j] == connections[i]) + found = true; + } + if (!found) + { + /* + * Add to transaction wide-list if COPY FROM + * CopyOut (COPY TO) is not a write operation, no need to update + */ + if (is_from) + write_node_list[write_node_count++] = connections[i]; + /* Add to current statement list */ + newConnections[new_count++] = connections[i]; + } + } + // Check connection state is DN_CONNECTION_STATE_IDLE + } + + gxid = GetCurrentGlobalTransactionId(); + + /* elog(DEBUG1, "Current gxid = %d", gxid); */ + + if (!GlobalTransactionIdIsValid(gxid)) + { + pfree(connections); + pfree(copy_connections); + return NULL; + } + if (new_count > 0 && need_tran) + { + /* Start transaction on connections where it is not started */ + if (data_node_begin(new_count, newConnections, DestNone, gxid)) + { + pfree(connections); + pfree(copy_connections); + return NULL; + } + } + + /* Send query to nodes */ + for (i = 0; i < conn_count; i++) + { + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && data_node_send_gxid(connections[i], gxid)) + { + add_error_message(connections[i], "Can not send request"); + pfree(connections); + pfree(copy_connections); + return NULL; + } + if (snapshot && data_node_send_snapshot(connections[i], snapshot)) + { + add_error_message(connections[i], "Can not send request"); + pfree(connections); + pfree(copy_connections); + return NULL; + } + if (data_node_send_query(connections[i], query) != 0) + { + add_error_message(connections[i], "Can not send request"); + pfree(connections); + pfree(copy_connections); + return NULL; + } + } + + /* + * We are expecting CopyIn response, but do not want to send it to client, + * caller should take care about this, because here we do not know if + * client runs console or file copy + */ + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); + combiner->dest = None_Receiver; + + /* Receive responses */ + 
data_node_receive_responses(conn_count, connections, timeout, combiner); + if (!ValidateAndCloseCombiner(combiner)) + { + if (autocommit) + { + if (need_tran) + DataNodeCopyFinish(connections, 0, COMBINE_TYPE_NONE); + else + if (!PersistentConnections) release_handles(); + } + + pfree(connections); + pfree(copy_connections); + return NULL; + } + + pfree(connections); + return copy_connections; +} + +/* + * Send a data row to the specified nodes + */ +int +DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections) +{ + DataNodeHandle *primary_handle = NULL; + ListCell *nodeitem; + /* size + data row + \n */ + int msgLen = 4 + len + 1; + int nLen = htonl(msgLen); + + if (exec_nodes->primarynodelist) + { + primary_handle = copy_connections[lfirst_int(list_head(exec_nodes->primarynodelist)) - 1]; + } + + if (primary_handle) + { + if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN) + { + /* precalculate to speed up access */ + int bytes_needed = primary_handle->outEnd + 1 + msgLen; + + /* flush buffer if it is almost full */ + if (bytes_needed > COPY_BUFFER_SIZE) + { + /* First look if data node has sent a error message */ + int read_status = data_node_read_data(primary_handle); + if (read_status == EOF || read_status < 0) + { + add_error_message(primary_handle, "failed to read data from data node"); + return EOF; + } + + if (primary_handle->inStart < primary_handle->inEnd) + { + RemoteQueryState *combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE); + combiner->dest = None_Receiver; + handle_response(primary_handle, combiner); + if (!ValidateAndCloseCombiner(combiner)) + return EOF; + } + + if (DN_CONNECTION_STATE_ERROR(primary_handle)) + return EOF; + + if (send_some(primary_handle, primary_handle->outEnd) < 0) + { + add_error_message(primary_handle, "failed to send data to data node"); + return EOF; + } + } + + if (ensure_out_buffer_capacity(bytes_needed, primary_handle) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + primary_handle->outBuffer[primary_handle->outEnd++] = 'd'; + memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4); + primary_handle->outEnd += 4; + memcpy(primary_handle->outBuffer + primary_handle->outEnd, data_row, len); + primary_handle->outEnd += len; + primary_handle->outBuffer[primary_handle->outEnd++] = '\n'; + } + else + { + add_error_message(primary_handle, "Invalid data node connection"); + return EOF; + } + } + + foreach(nodeitem, exec_nodes->nodelist) + { + DataNodeHandle *handle = copy_connections[lfirst_int(nodeitem) - 1]; + if (handle && handle->state == DN_CONNECTION_STATE_COPY_IN) + { + /* precalculate to speed up access */ + int bytes_needed = handle->outEnd + 1 + msgLen; + + /* flush buffer if it is almost full */ + if ((primary_handle && bytes_needed > PRIMARY_NODE_WRITEAHEAD) + || (!primary_handle && bytes_needed > COPY_BUFFER_SIZE)) + { + int to_send = handle->outEnd; + + /* First look if data node has sent a error message */ + int read_status = data_node_read_data(handle); + if (read_status == EOF || read_status < 0) + { + add_error_message(handle, "failed to read data from data node"); + return EOF; + } + + if (handle->inStart < handle->inEnd) + { + RemoteQueryState *combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE); + combiner->dest = None_Receiver; + handle_response(handle, combiner); + if (!ValidateAndCloseCombiner(combiner)) + return EOF; + } + + if (DN_CONNECTION_STATE_ERROR(handle)) + return EOF; + + /* + * Allow primary node to 
write out data before others. + * If primary node was blocked it would not accept copy data. + * So buffer at least PRIMARY_NODE_WRITEAHEAD at the other nodes. + * If primary node is blocked and is buffering, other buffers will + * grow accordingly. + */ + if (primary_handle) + { + if (primary_handle->outEnd + PRIMARY_NODE_WRITEAHEAD < handle->outEnd) + to_send = handle->outEnd - primary_handle->outEnd - PRIMARY_NODE_WRITEAHEAD; + else + to_send = 0; + } + + /* + * Try to send down buffered data if we have + */ + if (to_send && send_some(handle, to_send) < 0) + { + add_error_message(handle, "failed to send data to data node"); + return EOF; + } + } + + if (ensure_out_buffer_capacity(bytes_needed, handle) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + handle->outBuffer[handle->outEnd++] = 'd'; + memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); + handle->outEnd += 4; + memcpy(handle->outBuffer + handle->outEnd, data_row, len); + handle->outEnd += len; + handle->outBuffer[handle->outEnd++] = '\n'; + } + else + { + add_error_message(handle, "Invalid data node connection"); + return EOF; + } + } + + return 0; +} + +uint64 +DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* copy_file) +{ + RemoteQueryState *combiner; + int conn_count = list_length(exec_nodes->nodelist) == 0 ? NumDataNodes : list_length(exec_nodes->nodelist); + int count = 0; + bool need_tran; + List *nodelist; + ListCell *nodeitem; + uint64 processed = 0; + + nodelist = exec_nodes->nodelist; + need_tran = !autocommit || conn_count > 1; + + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_SUM); + combiner->dest = None_Receiver; + /* If there is an existing file where to copy data, pass it to combiner */ + if (copy_file) + combiner->copy_file = copy_file; + + foreach(nodeitem, exec_nodes->nodelist) + { + DataNodeHandle *handle = copy_connections[count]; + count++; + + if (handle && handle->state == DN_CONNECTION_STATE_COPY_OUT) + { + int read_status = 0; + /* H message has been consumed, continue to manage data row messages */ + while (read_status >= 0 && handle->state == DN_CONNECTION_STATE_COPY_OUT) /* continue to read as long as there is data */ + { + if (handle_response(handle,combiner) == RESPONSE_EOF) + { + /* read some extra-data */ + read_status = data_node_read_data(handle); + if (read_status < 0) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("unexpected EOF on datanode connection"))); + } + /* There is no more data that can be read from connection */ + } + } + } + + processed = combiner->row_count; + + if (!ValidateAndCloseCombiner(combiner)) + { + if (autocommit && !PersistentConnections) + release_handles(); + pfree(copy_connections); + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Unexpected response from the data nodes"))); + } + + return processed; +} + +/* + * Finish copy process on all connections + */ +uint64 +DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, + CombineType combine_type) +{ + int i; + int nLen = htonl(4); + RemoteQueryState *combiner = NULL; + bool need_tran; + bool error = false; + struct timeval *timeout = NULL; /* wait forever */ + DataNodeHandle *connections[NumDataNodes]; + DataNodeHandle *primary_handle = NULL; + int conn_count = 0; + uint64 processed; + + for (i = 0; i < NumDataNodes; i++) + { + DataNodeHandle *handle = copy_connections[i]; + + if (!handle) + continue; + + if (i == primary_data_node - 1) + primary_handle = handle; + 
else + connections[conn_count++] = handle; + } + + if (primary_handle) + { + if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN || primary_handle->state == DN_CONNECTION_STATE_COPY_OUT) + { + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(primary_handle->outEnd + 1 + 4, primary_handle) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + primary_handle->outBuffer[primary_handle->outEnd++] = 'c'; + memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4); + primary_handle->outEnd += 4; + + /* We need response right away, so send immediately */ + if (data_node_flush(primary_handle) < 0) + { + error = true; + } + } + else + { + error = true; + } + + combiner = CreateResponseCombiner(conn_count + 1, combine_type); + combiner->dest = None_Receiver; + data_node_receive_responses(1, &primary_handle, timeout, combiner); + } + + for (i = 0; i < conn_count; i++) + { + DataNodeHandle *handle = connections[i]; + + if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT) + { + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + handle->outBuffer[handle->outEnd++] = 'c'; + memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); + handle->outEnd += 4; + + /* We need response right away, so send immediately */ + if (data_node_flush(handle) < 0) + { + error = true; + } + } + else + { + error = true; + } + } + + need_tran = !autocommit || primary_handle || conn_count > 1; + + if (!combiner) + { + combiner = CreateResponseCombiner(conn_count, combine_type); + combiner->dest = None_Receiver; + } + data_node_receive_responses(conn_count, connections, timeout, combiner); + + processed = combiner->row_count; + + if (!ValidateAndCloseCombiner(combiner) || error) + { + if (autocommit) + { + if (need_tran) + DataNodeRollback(DestNone); + else + if (!PersistentConnections) release_handles(); + } + + return 0; + } + + if (autocommit) + { + if (need_tran) + DataNodeCommit(DestNone); + else + if (!PersistentConnections) release_handles(); + } + + return processed; +} + +RemoteQueryState * +ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) +{ + RemoteQueryState *remotestate; + + remotestate = CreateResponseCombiner(0, node->combine_type); + remotestate->ss.ps.plan = (Plan *) node; + remotestate->ss.ps.state = estate; + remotestate->simple_aggregates = node->simple_aggregates; + + ExecInitResultTupleSlot(estate, &remotestate->ss.ps); + if (node->plan.targetlist) + { + TupleDesc typeInfo = ExecCleanTypeFromTL(node->plan.targetlist, false); + ExecSetSlotDescriptor(remotestate->ss.ps.ps_ResultTupleSlot, typeInfo); + } + + ExecInitScanTupleSlot(estate, &remotestate->ss); + /* + * Tuple description for the scan slot will be set on runtime from + * a RowDescription message + */ + + if (node->distinct) + { + /* prepare equate functions */ + remotestate->eqfunctions = + execTuplesMatchPrepare(node->distinct->numCols, + node->distinct->eqOperators); + /* create memory context for execTuplesMatch */ + remotestate->tmp_ctx = + AllocSetContextCreate(CurrentMemoryContext, + "RemoteUnique", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + } + return remotestate; +} + + +static void +copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst) +{ + if (src->tts_dataRow + && dst->tts_tupleDescriptor->natts == 
src->tts_tupleDescriptor->natts) + { + if (src->tts_mcxt == dst->tts_mcxt) + { + /* now dst slot controls the backing message */ + ExecStoreDataRowTuple(src->tts_dataRow, src->tts_dataLen, dst, src->tts_shouldFreeRow); + src->tts_shouldFreeRow = false; + } + else + { + /* have to make a copy */ + MemoryContext oldcontext = MemoryContextSwitchTo(dst->tts_mcxt); + int len = src->tts_dataLen; + char *msg = (char *) palloc(len); + + memcpy(msg, src->tts_dataRow, len); + ExecStoreDataRowTuple(msg, len, dst, true); + MemoryContextSwitchTo(oldcontext); + } + } + else + { + int i; + ExecClearTuple(dst); + slot_getallattrs(src); + /* PGXCTODO revisit: probably incorrect */ + for (i = 0; i < dst->tts_tupleDescriptor->natts; i++) + { + dst->tts_values[i] = src->tts_values[i]; + dst->tts_isnull[i] = src->tts_isnull[i]; + } + ExecStoreVirtualTuple(dst); + } +} + +/* + * Execute step of PGXC plan. + * The step specifies a command to be executed on specified nodes. + * On first invocation connections to the data nodes are initialized and + * command is executed. Further, as well as within subsequent invocations, + * responses are received until step is completed or there is a tuple to emit. + * If there is a tuple it is returned, otherwise returned NULL. The NULL result + * from the function indicates completed step. + * The function returns at most one tuple per invocation. + */ +TupleTableSlot * +ExecRemoteQuery(RemoteQueryState *node) +{ + RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan; + EState *estate = node->ss.ps.state; + TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot; + TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot; + + if (!node->query_Done) + { + /* First invocation, initialize */ + Exec_Nodes *exec_nodes = step->exec_nodes; + bool force_autocommit = step->force_autocommit; + bool is_read_only = step->read_only; + GlobalTransactionId gxid = InvalidGlobalTransactionId; + Snapshot snapshot = GetActiveSnapshot(); + DataNodeHandle **connections = NULL; + DataNodeHandle **primaryconnection = NULL; + List *nodelist = NIL; + List *primarynode = NIL; + int i; + int j; + int regular_conn_count; + int total_conn_count; + bool need_tran; + + if (exec_nodes) + { + nodelist = exec_nodes->nodelist; + primarynode = exec_nodes->primarynodelist; + } + + if (list_length(nodelist) == 0) + { + if (primarynode) + regular_conn_count = NumDataNodes - 1; + else + regular_conn_count = NumDataNodes; + } + else + { + regular_conn_count = list_length(nodelist); + } + + total_conn_count = regular_conn_count; + node->node_count = total_conn_count; + + /* Get connection for primary node, if used */ + if (primarynode) + { + primaryconnection = get_handles(primarynode); + if (!primaryconnection) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not obtain connection from pool"))); + total_conn_count++; + } + + /* Get other connections (non-primary) */ + connections = get_handles(nodelist); + if (!connections) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not obtain connection from pool"))); + + if (force_autocommit) + need_tran = false; + else + need_tran = !autocommit || total_conn_count > 1; + + elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? 
"true" : "false"); + + stat_statement(); + if (autocommit) + { + stat_transaction(total_conn_count); + /* We normally clear for transactions, but if autocommit, clear here, too */ + clear_write_node_list(); + } + + /* Check status of connections */ + /* + * We would want to run 2PC if current transaction modified more then + * one node. So optimize little bit and do not look further if we + * already have two. + */ + if (!is_read_only && write_node_count < 2) + { + bool found; + + if (primaryconnection) + { + found = false; + for (j = 0; j < write_node_count && !found; j++) + { + if (write_node_list[j] == primaryconnection[0]) + found = true; + } + if (!found) + { + /* Add to transaction wide-list */ + write_node_list[write_node_count++] = primaryconnection[0]; + } + } + for (i = 0; i < regular_conn_count && write_node_count < 2; i++) + { + found = false; + for (j = 0; j < write_node_count && !found; j++) + { + if (write_node_list[j] == connections[i]) + found = true; + } + if (!found) + { + /* Add to transaction wide-list */ + write_node_list[write_node_count++] = connections[i]; + } + } + } + + gxid = GetCurrentGlobalTransactionId(); + + if (!GlobalTransactionIdIsValid(gxid)) + { + if (primaryconnection) + pfree(primaryconnection); + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to get next transaction ID"))); + } + + if (need_tran) + { + /* + * Check if data node connections are in transaction and start + * transactions on nodes where it is not started + */ + DataNodeHandle *new_connections[total_conn_count]; + int new_count = 0; + + if (primaryconnection && primaryconnection[0]->transaction_status != 'T') + new_connections[new_count++] = primaryconnection[0]; + for (i = 0; i < regular_conn_count; i++) + if (connections[i]->transaction_status != 'T') + new_connections[new_count++] = connections[i]; + + if (new_count) + data_node_begin(new_count, new_connections, DestNone, gxid); + } + + /* See if we have a primary nodes, execute on it first before the others */ + if (primaryconnection) + { + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && data_node_send_gxid(primaryconnection[0], gxid)) + { + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot)) + { + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (data_node_send_query(primaryconnection[0], step->sql_statement) != 0) + { + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + + Assert(node->combine_type == COMBINE_TYPE_SAME); + + while (node->command_complete_count < 1) + { + PG_TRY(); + { + data_node_receive(1, primaryconnection, NULL); + while (handle_response(primaryconnection[0], node) == RESPONSE_EOF) + data_node_receive(1, primaryconnection, NULL); + if (node->errorMessage) + { + char *code = node->errorCode; + ereport(ERROR, + (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), + errmsg("%s", node->errorMessage))); + } + } + /* If we got an error response return immediately */ + PG_CATCH(); + { + /* We are going to exit, so release combiner */ + if (autocommit) + { + if (need_tran) + DataNodeRollback(DestNone); + else if (!PersistentConnections) 
+ release_handles(); + } + + pfree(primaryconnection); + pfree(connections); + PG_RE_THROW(); + } + PG_END_TRY(); + } + pfree(primaryconnection); + } + + for (i = 0; i < regular_conn_count; i++) + { + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && data_node_send_gxid(connections[i], gxid)) + { + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (snapshot && data_node_send_snapshot(connections[i], snapshot)) + { + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (data_node_send_query(connections[i], step->sql_statement) != 0) + { + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + } + + PG_TRY(); + { + /* + * Stop if all commands are completed or we got a data row and + * initialized state node for subsequent invocations + */ + while (regular_conn_count > 0 && node->connections == NULL) + { + int i = 0; + + data_node_receive(regular_conn_count, connections, NULL); + /* + * Handle input from the data nodes. + * If we got a RESPONSE_DATAROW we can break handling to wrap + * it into a tuple and return. Handling will be continued upon + * subsequent invocations. + * If we got 0, we exclude connection from the list. We do not + * expect more input from it. In case of non-SELECT query we quit + * the loop when all nodes finish their work and send ReadyForQuery + * with empty connections array. + * If we got EOF, move to the next connection, will receive more + * data on the next iteration. + */ + while (i < regular_conn_count) + { + int res = handle_response(connections[i], node); + if (res == RESPONSE_EOF) + { + i++; + } + else if (res == RESPONSE_COMPLETE) + { + if (i < --regular_conn_count) + connections[i] = connections[regular_conn_count]; + } + else if (res == RESPONSE_TUPDESC) + { + ExecSetSlotDescriptor(scanslot, node->tuple_desc); + /* + * we should send to client not the tuple_desc we just + * received, but tuple_desc from the planner. + * Data node may be sending junk columns for sorting + */ + (*node->dest->rStartup) (node->dest, CMD_SELECT, + resultslot->tts_tupleDescriptor); + if (step->sort) + { + SimpleSort *sort = step->sort; + + node->connections = connections; + node->conn_count = regular_conn_count; + /* + * First message is already in the buffer + * Further fetch will be under tuplesort control + * If query does not produce rows tuplesort will not + * be initialized + */ + node->tuplesortstate = tuplesort_begin_merge( + node->tuple_desc, + sort->numCols, + sort->sortColIdx, + sort->sortOperators, + sort->nullsFirst, + node, + work_mem); + /* + * Break the loop, do not wait for first row. 
+							 * The tuplesort module wants to control which
+							 * node it fetches rows from, while in this loop
+							 * the first row would come from a random node.
+							 */
+							break;
+						}
+					}
+					else if (res == RESPONSE_DATAROW)
+					{
+						/*
+						 * Got the first data row, quit the loop
+						 */
+						node->connections = connections;
+						node->conn_count = regular_conn_count;
+						node->current_conn = i;
+						break;
+					}
+				}
+			}
+		}
+		/* If we got an error response return immediately */
+		PG_CATCH();
+		{
+			/* We are going to exit, so release the combiner */
+			if (autocommit)
+			{
+				if (need_tran)
+					DataNodeRollback(DestNone);
+				else if (!PersistentConnections)
+					release_handles();
+			}
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+		node->query_Done = true;
+		node->need_tran = need_tran;
+	}
+
+	PG_TRY();
+	{
+		bool		have_tuple = false;
+
+		if (node->tuplesortstate)
+		{
+			while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate,
+										  true, scanslot))
+			{
+				have_tuple = true;
+				/*
+				 * If DISTINCT is specified and the current tuple matches the
+				 * previous one, skip it and get the next one.
+				 * Otherwise return the current tuple.
+				 */
+				if (step->distinct)
+				{
+					/*
+					 * Always accept the very first tuple; skip to the next
+					 * if the scan slot matches the previous one (the result slot)
+					 */
+					if (!TupIsNull(resultslot) &&
+						execTuplesMatch(scanslot,
+										resultslot,
+										step->distinct->numCols,
+										step->distinct->uniqColIdx,
+										node->eqfunctions,
+										node->tmp_ctx))
+					{
+						have_tuple = false;
+						continue;
+					}
+				}
+				copy_slot(node, scanslot, resultslot);
+				(*node->dest->receiveSlot) (resultslot, node->dest);
+				break;
+			}
+			if (!have_tuple)
+				ExecClearTuple(resultslot);
+		}
+		else
+		{
+			while (node->conn_count > 0 && !have_tuple)
+			{
+				int			i;
+
+				/*
+				 * If the combiner already has a tuple, go ahead and return it;
+				 * otherwise the tuple will be cleared
+				 */
+				if (FetchTuple(node, scanslot) && !TupIsNull(scanslot))
+				{
+					if (node->simple_aggregates)
+					{
+						/*
+						 * Advance the aggregate functions, then allow the
+						 * next data row message to be read up and a tuple
+						 * delivered in the same slot on the next iteration
+						 */
+						exec_simple_aggregates(node, scanslot);
+					}
+					else
+					{
+						/*
+						 * Deliver the current slot and read up the next data
+						 * row message before exiting the loop.  Next time this
+						 * function is invoked we will have either a data row
+						 * message ready or EOF
+						 */
+						copy_slot(node, scanslot, resultslot);
+						(*node->dest->receiveSlot) (resultslot, node->dest);
+						have_tuple = true;
+					}
+				}
+
+				/*
+				 * Handle input to get the next row or to ensure the command
+				 * is completed, starting from the connection after the
+				 * current one.  If a connection does not have data available
+				 * we move on to the next one, and read from the sockets once
+				 * all of them have been polled.
+				 */
+				if ((i = node->current_conn + 1) == node->conn_count)
+					i = 0;
+
+				for (;;)
+				{
+					int			res = handle_response(node->connections[i], node);
+
+					if (res == RESPONSE_EOF)
+					{
+						/* go to the next connection */
+						if (++i == node->conn_count)
+							i = 0;
+						/* if we cycled over all connections we need to receive more */
+						if (i == node->current_conn)
+							data_node_receive(node->conn_count, node->connections, NULL);
+					}
+					else if (res == RESPONSE_COMPLETE)
+					{
+						if (--node->conn_count == 0)
+							break;
+						if (i == node->conn_count)
+							i = 0;
+						else
+							node->connections[i] = node->connections[node->conn_count];
+						if (node->current_conn == node->conn_count)
+							node->current_conn = i;
+					}
+					else if (res == RESPONSE_DATAROW)
+					{
+						node->current_conn = i;
+						break;
+					}
+				}
+			}
+
+			/*
+			 * We may need to finalize aggregates
+			 */
+			if (!have_tuple && node->simple_aggregates)
+			{
+				finish_simple_aggregates(node, resultslot);
+				if (!TupIsNull(resultslot))
+				{
+					(*node->dest->receiveSlot) (resultslot, node->dest);
+					have_tuple = true;
+				}
+			}
+
+			if (!have_tuple)	/* report end of scan */
+				ExecClearTuple(resultslot);
+
+		}
+
+		if (node->errorMessage)
+		{
+			char	   *code = node->errorCode;
+
+			ereport(ERROR,
+					(errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
+					 errmsg("%s", node->errorMessage)));
+		}
+
+		/*
+		 * If the command is completed we should commit the work.
+		 */
+		if (node->conn_count == 0 && autocommit && node->need_tran)
+			DataNodeCommit(DestNone);
+	}
+	/* If we got an error response return immediately */
+	PG_CATCH();
+	{
+		/* We are going to exit, so release the combiner */
+		if (autocommit)
+		{
+			if (node->need_tran)
+				DataNodeRollback(DestNone);
+			else if (!PersistentConnections)
+				release_handles();
+		}
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	return resultslot;
+}
+
+void
+ExecEndRemoteQuery(RemoteQueryState *node)
+{
+	(*node->dest->rShutdown) (node->dest);
+	if (node->tmp_ctx)
+		MemoryContextDelete(node->tmp_ctx);
+	CloseCombiner(node);
+}
+
+
+/*
+ * Called when the backend is ending.
+ */
+void
+DataNodeCleanAndRelease(int code, Datum arg)
+{
+	/* Rollback on the Data Nodes */
+	if (IsTransactionState())
+	{
+		DataNodeRollback(DestNone);
+
+		/* Rollback on GTM if a transaction id was opened.
*/ + RollbackTranGTM((GlobalTransactionId) GetCurrentTransactionIdIfAny()); + } + + /* Release data node connections */ + release_handles(); + + /* Close connection with GTM */ + CloseGTM(); + + /* Dump collected statistics to the log */ + stat_log(); +} + diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 8c58b62940..077e8f9190 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -77,6 +77,7 @@ #include "pgxc/pgxc.h" #include "access/gtm.h" /* PGXC_COORD */ +#include "pgxc/execRemote.h" #include "pgxc/planner.h" #include "pgxc/datanode.h" #include "commands/copy.h" @@ -198,7 +199,6 @@ static void log_disconnections(int code, Datum arg); #ifdef PGXC /* PGXC_DATANODE */ static void pgxc_transaction_stmt (Node *parsetree); -static List * pgxc_execute_direct (Node *parsetree, List *querytree_list, CommandDest dest, bool snapshot_set, bool *exec_on_coord); /* ---------------------------------------------------------------- * PG-XC routines @@ -900,11 +900,9 @@ exec_simple_query(const char *query_string) DestReceiver *receiver; int16 format; #ifdef PGXC - Query_Plan *query_plan; - Query_Step *query_step; + Query_Plan *query_plan; + RemoteQuery *query_step; bool exec_on_coord; - int data_node_error = 0; - /* * By default we do not want data nodes to contact GTM directly, @@ -977,12 +975,78 @@ exec_simple_query(const char *query_string) pgxc_transaction_stmt(parsetree); else if (IsA(parsetree, ExecDirectStmt)) - querytree_list = pgxc_execute_direct(parsetree, querytree_list, dest, snapshot_set, &exec_on_coord); + { + ExecDirectStmt *execdirect = (ExecDirectStmt *) parsetree; + List *inner_parse_tree_list; + + Assert(IS_PGXC_COORDINATOR); + + exec_on_coord = execdirect->coordinator; + + /* + * Switch to appropriate context for constructing parse and + * query trees (these must outlive the execution context). + */ + oldcontext = MemoryContextSwitchTo(MessageContext); + + inner_parse_tree_list = pg_parse_query(execdirect->query); + /* + * we do not support complex commands (expanded to multiple + * parse trees) within EXEC DIRECT + */ + if (list_length(parsetree_list) != 1) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Can not execute %s with EXECUTE DIRECT", + execdirect->query))); + } + parsetree = linitial(inner_parse_tree_list); + /* + * Set up a snapshot if parse analysis/planning will need + * one. 
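The push here is balanced at the end of the statement loop, where the same function pops the active snapshot once the remote step has run. The pairing this patch relies on:

	if (analyze_requires_snapshot(parsetree))
	{
		PushActiveSnapshot(GetTransactionSnapshot());
		snapshot_set = true;
	}
	/* ... parse analysis, planning, local and remote execution ... */
	if (snapshot_set)
		PopActiveSnapshot();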
+ */ + if (analyze_requires_snapshot(parsetree)) + { + PushActiveSnapshot(GetTransactionSnapshot()); + snapshot_set = true; + } + + querytree_list = pg_analyze_and_rewrite(parsetree, + query_string, + NULL, + 0); + + if (execdirect->nodes) + { + ListCell *lc; + Query *query = (Query *) linitial(querytree_list); + + query_plan = (Query_Plan *) palloc0(sizeof(Query_Plan)); + query_step = makeNode(RemoteQuery); + query_step->plan.targetlist = query->targetList; + query_step->sql_statement = pstrdup(execdirect->query); + query_step->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); + foreach (lc, execdirect->nodes) + { + int node = intVal(lfirst(lc)); + query_step->exec_nodes->nodelist = lappend_int(query_step->exec_nodes->nodelist, node); + } + query_step->combine_type = COMBINE_TYPE_SAME; + + query_plan->query_step_list = lappend(NULL, query_step); + query_plan->exec_loc_type = EXEC_ON_DATA_NODES; + } + + /* Restore context */ + MemoryContextSwitchTo(oldcontext); + + } else if (IsA(parsetree, CopyStmt)) { - CopyStmt *copy = (CopyStmt *) parsetree; - bool done; + CopyStmt *copy = (CopyStmt *) parsetree; + uint64 processed; /* Snapshot is needed for the Copy */ if (!snapshot_set) { @@ -994,13 +1058,15 @@ exec_simple_query(const char *query_string) * Datanode or on Coordinator. * If a table has no locator data, then IsCoordPortalCopy returns false and copy is launched * on Coordinator instead (e.g., using pg_catalog tables). - * If a table has some locator data (user tables), then copy is launched normally + * If a table has some locator data (user tables), then copy was launched normally * in Datanodes */ if (!IsCoordPortalCopy(copy)) { - DoCopy(copy, query_string, false); exec_on_coord = false; + processed = DoCopy(copy, query_string, false); + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, + "COPY " UINT64_FORMAT, processed); } else exec_on_coord = true; @@ -1015,6 +1081,7 @@ exec_simple_query(const char *query_string) /* First execute on the coordinator, if involved (DDL), then data nodes */ } + plantree_list = NIL; if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE) #endif plantree_list = pg_plan_queries(querytree_list, 0, NULL); @@ -1036,10 +1103,11 @@ exec_simple_query(const char *query_string) if (IS_PGXC_DATANODE && IsA(parsetree, VacuumStmt) && IsPostmasterEnvironment) SetForceXidFromGTM(true); - /* PGXC_COORD */ - /* Force getting Xid from GTM if not autovacuum, but a vacuum */ - /* Skip the Portal stuff on coordinator if command only executes on data nodes */ - if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE) + /* + * Create and run Portal only if it is needed. + * In some special cases we have nothing to run at this point + */ + if (plantree_list || query_plan) { #endif @@ -1102,6 +1170,11 @@ exec_simple_query(const char *query_string) */ MemoryContextSwitchTo(oldcontext); +#ifdef PGXC + /* Skip the Portal stuff on coordinator if command only executes on data nodes */ + if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE) + { +#endif /* * Run the portal to completion, and then drop it (and the receiver). 
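The receiver built earlier in this function is also installed as state->dest for the remote step, so one DestReceiver serves both the local portal and ExecRemoteQuery. The callback sequence, gathered from the call sites in this diff:

	(*dest->rStartup) (dest, CMD_SELECT, tupdesc);	/* once, with the planner's descriptor */
	(*dest->receiveSlot) (slot, dest);				/* once per emitted row */
	(*dest->rShutdown) (dest);						/* from ExecEndRemoteQuery */
	(*dest->rDestroy) (dest);						/* final cleanup, below */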
*/ @@ -1112,10 +1185,6 @@ exec_simple_query(const char *query_string) receiver, completionTag); - (*receiver->rDestroy) (receiver); - - PortalDrop(portal, false); - #ifdef PGXC } @@ -1125,36 +1194,45 @@ exec_simple_query(const char *query_string) { if (query_plan && (query_plan->exec_loc_type & EXEC_ON_DATA_NODES)) { + RemoteQueryState *state; + TupleTableSlot *slot; + EState *estate = CreateExecutorState(); + oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); query_step = linitial(query_plan->query_step_list); - - data_node_error = DataNodeExec(query_step->sql_statement, - query_step->exec_nodes, - query_step->combine_type, - dest, - snapshot_set ? GetActiveSnapshot() : GetTransactionSnapshot(), - query_plan->force_autocommit, - query_step->simple_aggregates, - IsA(parsetree, SelectStmt)); - - if (data_node_error) + estate->es_tupleTable = ExecCreateTupleTable(2); + state = ExecInitRemoteQuery(query_step, estate, 0); + state->dest = receiver; + state->completionTag = completionTag; + if (!snapshot_set) { - /* An error occurred, change status on Coordinator, too, - * even if no statements ran on it. - * We want to only allow COMMIT/ROLLBACK - */ - AbortCurrentTransaction(); - xact_started = false; - /* AT() clears active snapshot */ - snapshot_set = false; + PushActiveSnapshot(GetTransactionSnapshot()); + snapshot_set = true; + } + do + { + slot = ExecRemoteQuery(state); } + while (!TupIsNull(slot)); + + ExecEndRemoteQuery(state); + /* Restore context */ + MemoryContextSwitchTo(oldcontext); } FreeQueryPlan(query_plan); } +#endif /* PGXC_COORD */ + + (*receiver->rDestroy) (receiver); + + PortalDrop(portal, false); + +#ifdef PGXC + } if (snapshot_set) PopActiveSnapshot(); -#endif /* PGXC_COORD */ +#endif if (IsA(parsetree, TransactionStmt)) { @@ -1186,11 +1264,6 @@ exec_simple_query(const char *query_string) */ CommandCounterIncrement(); } -#ifdef PGXC /* PGXC_COORD */ - /* In case of PGXC handling client already received a response */ - if ((IS_PGXC_COORDINATOR && exec_on_coord && !data_node_error) || IS_PGXC_DATANODE) - { -#endif /* * Tell client that we're done with this query. Note we emit exactly @@ -1199,9 +1272,6 @@ exec_simple_query(const char *query_string) * aborted by error will not send an EndCommand report at all.) */ EndCommand(completionTag, dest); -#ifdef PGXC /* PGXC_COORD */ - } -#endif } /* end loop over parsetrees */ /* @@ -4328,80 +4398,4 @@ pgxc_transaction_stmt (Node *parsetree) } } } - - -/* - * Handle EXECUTE DIRECT - */ -List * -pgxc_execute_direct (Node *parsetree, List *querytree_list, CommandDest dest, bool snapshot_set, bool *exec_on_coord) -{ - List *parsetree_list; - ListCell *node_cell; - ExecDirectStmt *execdirect = (ExecDirectStmt *) parsetree; - bool on_coord = execdirect->coordinator; - Exec_Nodes *exec_nodes; - - - Assert(IS_PGXC_COORDINATOR); - Assert(IsA(parsetree, ExecDirectStmt)); - - - exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); - - foreach (node_cell, execdirect->nodes) - { - int node_int = intVal(lfirst(node_cell)); - exec_nodes->nodelist = lappend_int(exec_nodes->nodelist, node_int); - } - if (exec_nodes->nodelist) - if (DataNodeExec(execdirect->query, - exec_nodes, - COMBINE_TYPE_SAME, - dest, - snapshot_set ? 
GetActiveSnapshot() : GetTransactionSnapshot(), - FALSE, - FALSE, - FALSE) != 0) - on_coord = false; - - if (on_coord) - { - /* - * Parse inner statement, like at the begiining of the function - * We do not have to release wrapper trees, the message context - * will be deleted later - * Also, no need to switch context - current is already - * the MessageContext - */ - parsetree_list = pg_parse_query(execdirect->query); - - /* We do not want to log or display the inner command */ - - /* - * we do not support complex commands (expanded to multiple - * parse trees) within EXEC DIRECT - */ - if (list_length(parsetree_list) != 1) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Can not execute %s with EXECUTE DIRECT", - execdirect->query))); - } - - /* - * Get parse tree from the list - */ - parsetree = (Node *) lfirst(list_head(parsetree_list)); - - /* - * Build new query tree */ - querytree_list = pg_analyze_and_rewrite(parsetree, - execdirect->query, NULL, 0); - } - *exec_on_coord = on_coord; - - return querytree_list; -} #endif /* PGXC */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 268fe5a8c5..0676f0da0d 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -55,6 +55,7 @@ #include "parser/scansup.h" #include "pgstat.h" #ifdef PGXC +#include "pgxc/execRemote.h" #include "pgxc/locator.h" #include "pgxc/planner.h" #include "pgxc/poolmgr.h" @@ -1261,7 +1262,7 @@ static struct config_bool ConfigureNamesBool[] = {"strict_select_checking", PGC_USERSET, DEVELOPER_OPTIONS, gettext_noop("Forbid if SELECT has ORDER BY"), gettext_noop("and is not safe for the cluster"), - GUC_NOT_IN_SAMPLE + GUC_NOT_IN_SAMPLE }, &StrictSelectChecking, false, NULL, NULL @@ -1301,7 +1302,7 @@ static struct config_int ConfigureNamesInt[] = gettext_noop("This applies to table columns that have not had a " "column-specific target set via ALTER TABLE SET STATISTICS.") }, - &default_statistics_target, + &default_statistics_target, 100, 1, 10000, NULL, NULL }, { @@ -2008,8 +2009,8 @@ static struct config_int ConfigureNamesInt[] = NULL }, &NumDataNodes, - 2, 1, 65535, NULL, NULL - }, + 2, 1, 65535, NULL, NULL + }, { {"min_pool_size", PGC_POSTMASTER, DATA_NODES, @@ -2018,8 +2019,8 @@ static struct config_int ConfigureNamesInt[] = "new connections are established") }, &MinPoolSize, - 1, 1, 65535, NULL, NULL - }, + 1, 1, 65535, NULL, NULL + }, { {"max_pool_size", PGC_POSTMASTER, DATA_NODES, @@ -2028,8 +2029,8 @@ static struct config_int ConfigureNamesInt[] = "other connection requests will be refused") }, &MaxPoolSize, - 100, 1, 65535, NULL, NULL - }, + 100, 1, 65535, NULL, NULL + }, { {"pooler_port", PGC_POSTMASTER, DATA_NODES, @@ -2037,8 +2038,8 @@ static struct config_int ConfigureNamesInt[] = NULL }, &PoolerPort, - 6667, 1, 65535, NULL, NULL - }, + 6667, 1, 65535, NULL, NULL + }, { {"gtm_port", PGC_POSTMASTER, GTM, @@ -2046,8 +2047,8 @@ static struct config_int ConfigureNamesInt[] = NULL }, &GtmPort, - 6666, 1, 65535, NULL, NULL - }, + 6666, 1, 65535, NULL, NULL + }, { {"gtm_coordinator_id", PGC_POSTMASTER, GTM, @@ -2055,8 +2056,8 @@ static struct config_int ConfigureNamesInt[] = NULL }, &GtmCoordinatorId, - 1, 1, INT_MAX, NULL, NULL - }, + 1, 1, INT_MAX, NULL, NULL + }, { {"primary_data_node", PGC_POSTMASTER, DATA_NODES, @@ -2064,8 +2065,8 @@ static struct config_int ConfigureNamesInt[] = NULL }, &primary_data_node, - 1, 0, INT_MAX, NULL, NULL - }, + 1, 0, INT_MAX, NULL, NULL + }, #endif /* End-of-list marker */ { @@ -2624,8 +2625,8 @@ 
			gettext_noop("A list of data nodes to read from replicated tables")
		},
		&PreferredDataNodes,
-		"", NULL, NULL
-	},
+		"", NULL, NULL
+	},

	{
		{"data_node_hosts", PGC_POSTMASTER, DATA_NODES,
diff --git a/src/backend/utils/mmgr/mcxt.c b/src/backend/utils/mmgr/mcxt.c
index 9b08df366d..8161e9bc92 100644
--- a/src/backend/utils/mmgr/mcxt.c
+++ b/src/backend/utils/mmgr/mcxt.c
@@ -507,7 +507,7 @@ MemoryContextAlloc(MemoryContext context, Size size)
	AssertArg(MemoryContextIsValid(context));

	if (!AllocSizeIsValid(size))
-		elog(ERROR, "invalid memory alloc request size %lu",
+		elog(PANIC, "invalid memory alloc request size %lu",
			 (unsigned long) size);

	return (*context->methods->alloc) (context, size);
@@ -528,7 +528,7 @@ MemoryContextAllocZero(MemoryContext context, Size size)
	AssertArg(MemoryContextIsValid(context));

	if (!AllocSizeIsValid(size))
-		elog(ERROR, "invalid memory alloc request size %lu",
+		elog(PANIC, "invalid memory alloc request size %lu",
			 (unsigned long) size);

	ret = (*context->methods->alloc) (context, size);
@@ -553,7 +553,7 @@ MemoryContextAllocZeroAligned(MemoryContext context, Size size)
	AssertArg(MemoryContextIsValid(context));

	if (!AllocSizeIsValid(size))
-		elog(ERROR, "invalid memory alloc request size %lu",
+		elog(PANIC, "invalid memory alloc request size %lu",
			 (unsigned long) size);

	ret = (*context->methods->alloc) (context, size);
@@ -617,7 +617,7 @@ repalloc(void *pointer, Size size)
	AssertArg(MemoryContextIsValid(header->context));

	if (!AllocSizeIsValid(size))
-		elog(ERROR, "invalid memory alloc request size %lu",
+		elog(PANIC, "invalid memory alloc request size %lu",
			 (unsigned long) size);

	return (*header->context->methods->realloc) (header->context,
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 6d07296d59..c506ce064f 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -107,6 +107,9 @@
#include "commands/tablespace.h"
#include "miscadmin.h"
#include "pg_trace.h"
+#ifdef PGXC
+#include "pgxc/execRemote.h"
+#endif
#include "utils/datum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
@@ -121,6 +124,9 @@
#define HEAP_SORT		0
#define INDEX_SORT		1
#define DATUM_SORT		2
+#ifdef PGXC
+#define MERGE_SORT		3
+#endif

/* GUC variables */
#ifdef TRACE_SORT
@@ -213,6 +219,9 @@ struct Tuplesortstate
	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
	MemoryContext sortcontext;	/* memory context holding all sort data */
	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
+#ifdef PGXC
+	RemoteQueryState *combiner; /* tuple source, alternate to tapeset */
+#endif

	/*
	 * These function pointers decouple the routines that must know what kind
@@ -253,6 +262,14 @@ struct Tuplesortstate
	void		(*readtup) (Tuplesortstate *state, SortTuple *stup,
							int tapenum, unsigned int len);

+#ifdef PGXC
+	/*
+	 * Function to read length of next stored tuple.
+	 * Used as 'len' parameter for readtup function.
+	 */
+	unsigned int (*getlen) (Tuplesortstate *state, int tapenum, bool eofOK);
+#endif
+
	/*
	 * Function to reverse the sort direction from its current state.  (We
	 * could dispense with this if we wanted to enforce that all variants
@@ -378,6 +395,9 @@ struct Tuplesortstate
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup)	((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
+#ifdef PGXC
+#define GETLEN(state,tape,eofOK) ((*(state)->getlen) (state, tape, eofOK))
+#endif
#define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
#define LACKMEM(state)		((state)->availMem < 0)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
@@ -450,6 +470,12 @@ static void writetup_heap(Tuplesortstate *state, int tapenum,
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
			  int tapenum, unsigned int len);
static void reversedirection_heap(Tuplesortstate *state);
+#ifdef PGXC
+static unsigned int getlen_datanode(Tuplesortstate *state, int tapenum,
+				 bool eofOK);
+static void readtup_datanode(Tuplesortstate *state, SortTuple *stup,
+				 int tapenum, unsigned int len);
+#endif
static int	comparetup_index_btree(const SortTuple *a, const SortTuple *b,
					   Tuplesortstate *state);
static int	comparetup_index_hash(const SortTuple *a, const SortTuple *b,
@@ -587,6 +613,9 @@ tuplesort_begin_heap(TupleDesc tupDesc,
	state->copytup = copytup_heap;
	state->writetup = writetup_heap;
	state->readtup = readtup_heap;
+#ifdef PGXC
+	state->getlen = getlen;
+#endif
	state->reversedirection = reversedirection_heap;

	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
@@ -657,6 +686,9 @@ tuplesort_begin_index_btree(Relation indexRel,
	state->copytup = copytup_index;
	state->writetup = writetup_index;
	state->readtup = readtup_index;
+#ifdef PGXC
+	state->getlen = getlen;
+#endif
	state->reversedirection = reversedirection_index_btree;

	state->indexRel = indexRel;
@@ -692,6 +724,9 @@ tuplesort_begin_index_hash(Relation indexRel,
	state->copytup = copytup_index;
	state->writetup = writetup_index;
	state->readtup = readtup_index;
+#ifdef PGXC
+	state->getlen = getlen;
+#endif
	state->reversedirection = reversedirection_index_hash;

	state->indexRel = indexRel;
@@ -736,6 +771,9 @@ tuplesort_begin_datum(Oid datumType,
	state->copytup = copytup_datum;
	state->writetup = writetup_datum;
	state->readtup = readtup_datum;
+#ifdef PGXC
+	state->getlen = getlen;
+#endif
	state->reversedirection = reversedirection_datum;

	state->datumType = datumType;
@@ -762,6 +800,121 @@ tuplesort_begin_datum(Oid datumType,
	return state;
}

+#ifdef PGXC
+/*
+ * Tuples are coming from a source where they are already sorted.
+ * It is much like sorting heap tuples, but there is no need to load the
+ * sorter. The sorter's initial status is final merge, and the correct
+ * readtup and getlen callbacks must be passed in.
+ *
+ * Usage pattern of the merge sorter:
+ *		tuplesort_begin_merge
+ *		while (tuple = tuplesort_gettupleslot())
+ *		{
+ *			// process
+ *		}
+ *		tuplesort_end
+ */
+Tuplesortstate *
+tuplesort_begin_merge(TupleDesc tupDesc,
+					  int nkeys, AttrNumber *attNums,
+					  Oid *sortOperators, bool *nullsFirstFlags,
+					  RemoteQueryState *combiner,
+					  int workMem)
+{
+	Tuplesortstate *state = tuplesort_begin_common(workMem, false);
+	MemoryContext oldcontext;
+	int			i;
+
+	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
+	AssertArg(nkeys > 0);
+	AssertArg(combiner);
+
+#ifdef TRACE_SORT
+	if (trace_sort)
+		elog(LOG,
+			 "begin merge sort: nkeys = %d, workMem = %d", nkeys, workMem);
+#endif
+
+	state->nKeys = nkeys;
+
+	TRACE_POSTGRESQL_SORT_START(MERGE_SORT,
+								false,	/* no unique check */
+								nkeys,
+								workMem,
+								false);
+
+	state->combiner = combiner;
+	state->comparetup = comparetup_heap;
+	state->copytup = NULL;
+	state->writetup = NULL;
+	state->readtup = readtup_datanode;
+	state->getlen = getlen_datanode;
+	state->reversedirection = reversedirection_heap;

+	state->tupDesc = tupDesc;	/* assume we need not copy tupDesc */
+	state->scanKeys = (ScanKey) palloc0(nkeys * sizeof(ScanKeyData));
+
+	for (i = 0; i < nkeys; i++)
+	{
+		Oid			sortFunction;
+		bool		reverse;
+
+		AssertArg(attNums[i] != 0);
+		AssertArg(sortOperators[i] != 0);
+
+		if (!get_compare_function_for_ordering_op(sortOperators[i],
+												  &sortFunction, &reverse))
+			elog(ERROR, "operator %u is not a valid ordering operator",
+				 sortOperators[i]);
+
+		/*
+		 * We needn't fill in sk_strategy or sk_subtype since these scankeys
+		 * will never be passed to an index.
+		 */
+		ScanKeyInit(&state->scanKeys[i],
+					attNums[i],
+					InvalidStrategy,
+					sortFunction,
+					(Datum) 0);
+
+		/* However, we use btree's conventions for encoding directionality */
+		if (reverse)
+			state->scanKeys[i].sk_flags |= SK_BT_DESC;
+		if (nullsFirstFlags[i])
+			state->scanKeys[i].sk_flags |= SK_BT_NULLS_FIRST;
+	}
+
+	/*
+	 * logical tape in this case is a sorted stream
+	 */
+	state->maxTapes = combiner->conn_count;
+	state->tapeRange = combiner->conn_count;
+
+	state->mergeactive = (bool *) palloc0(combiner->conn_count * sizeof(bool));
+	state->mergenext = (int *) palloc0(combiner->conn_count * sizeof(int));
+	state->mergelast = (int *) palloc0(combiner->conn_count * sizeof(int));
+	state->mergeavailslots = (int *) palloc0(combiner->conn_count * sizeof(int));
+	state->mergeavailmem = (long *) palloc0(combiner->conn_count * sizeof(long));
+
+	state->tp_runs = (int *) palloc0(combiner->conn_count * sizeof(int));
+	state->tp_dummy = (int *) palloc0(combiner->conn_count * sizeof(int));
+	state->tp_tapenum = (int *) palloc0(combiner->conn_count * sizeof(int));
+	/* mark that each stream (tape) has one run */
+	for (i = 0; i < combiner->conn_count; i++)
+	{
+		state->tp_runs[i] = 1;
+		state->tp_tapenum[i] = i;
+	}
+	beginmerge(state);
+	state->status = TSS_FINALMERGE;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+#endif
+
/*
 * tuplesort_set_bound
 *
@@ -1296,7 +1449,6 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
								  sizeof(unsigned int)))
				return false;
			tuplen = getlen(state, state->result_tape, false);
-
			/*
			 * Back up to get ending length word of tuple before it.
			 */
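A caller-side sketch of the usage pattern named in the comment above, using the standard tuplesort entry points (tuplesort_gettupleslot and tuplesort_end); the RemoteQueryState and the sort-key arrays are assumed to have been prepared elsewhere:

/*
 * Sketch: merging pre-sorted streams from data nodes. Assumes "combiner"
 * and the key arrays were built by the caller; error handling omitted.
 */
static void
merge_sorted_streams(TupleDesc tupDesc, int nkeys, AttrNumber *attNums,
					 Oid *sortOperators, bool *nullsFirst,
					 RemoteQueryState *combiner, TupleTableSlot *slot)
{
	Tuplesortstate *state = tuplesort_begin_merge(tupDesc, nkeys, attNums,
												  sortOperators, nullsFirst,
												  combiner, work_mem);

	/* Pull tuples in merged order; no run building is needed. */
	while (tuplesort_gettupleslot(state, true, slot))
	{
		/* process slot */
	}
	tuplesort_end(state);
}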
@@ -2005,7 +2157,11 @@ mergeprereadone(Tuplesortstate *state, int srcTape)
		   state->mergenext[srcTape] == 0)
	{
		/* read next tuple, if any */
+#ifdef PGXC
+		if ((tuplen = GETLEN(state, srcTape, true)) == 0)
+#else
		if ((tuplen = getlen(state, srcTape, true)) == 0)
+#endif
		{
			state->mergeactive[srcTape] = false;
			break;
@@ -2468,7 +2624,6 @@ markrunend(Tuplesortstate *state, int tapenum)

	LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
}
-
/*
 * Set up for an external caller of ApplySortFunction.  This function
 * basically just exists to localize knowledge of the encoding of sk_flags
@@ -2716,6 +2871,56 @@ reversedirection_heap(Tuplesortstate *state)
	}
}

+#ifdef PGXC
+static unsigned int
+getlen_datanode(Tuplesortstate *state, int tapenum, bool eofOK)
+{
+	for (;;)
+	{
+		switch (handle_response(state->combiner->connections[tapenum], state->combiner))
+		{
+			case RESPONSE_EOF:
+				data_node_receive(1, state->combiner->connections + tapenum, NULL);
+				break;
+			case RESPONSE_COMPLETE:
+				if (eofOK)
+					return 0;
+				else
+					elog(ERROR, "unexpected end of data");
+				break;
+			case RESPONSE_DATAROW:
+				return state->combiner->msglen;
+			default:
+				ereport(ERROR,
+						(errcode(ERRCODE_INTERNAL_ERROR),
+						 errmsg("Unexpected response from the data nodes")));
+		}
+	}
+}
+
+static void
+readtup_datanode(Tuplesortstate *state, SortTuple *stup,
+				 int tapenum, unsigned int len)
+{
+	TupleTableSlot *slot = state->combiner->ss.ss_ScanTupleSlot;
+	MinimalTuple tuple;
+	HeapTupleData htup;
+
+	FetchTuple(state->combiner, slot);
+
+	/* copy the tuple into sort storage */
+	tuple = ExecCopySlotMinimalTuple(slot);
+	stup->tuple = (void *) tuple;
+	USEMEM(state, GetMemoryChunkSpace(tuple));
+	/* set up first-column key value */
+	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
+	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
+	stup->datum1 = heap_getattr(&htup,
+								state->scanKeys[0].sk_attno,
+								state->tupDesc,
+								&stup->isnull1);
+}
+#endif

/*
 * Routines specialized for IndexTuple case
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index 879a310c68..c85b1b0c61 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -118,6 +118,15 @@ typedef struct TupleTableSlot
	bool		tts_shouldFreeMin;	/* should pfree tts_mintuple? */
	bool		tts_slow;		/* saved state for slot_deform_tuple */
	HeapTuple	tts_tuple;		/* physical tuple, or NULL if virtual */
+#ifdef PGXC
+	/*
+	 * PGXC extension to support tuples sent from remote data node.
+	 */
+	char	   *tts_dataRow;	/* Tuple data in DataRow format */
+	int			tts_dataLen;	/* Actual length of the data row */
+	bool		tts_shouldFreeRow;	/* should pfree tts_dataRow? */
+	struct AttInMetadata *tts_attinmeta;	/* information needed to extract values from the DataRow */
+#endif
	TupleDesc	tts_tupleDescriptor;	/* slot's tuple descriptor */
	MemoryContext tts_mcxt;		/* slot itself is in this context */
	Buffer		tts_buffer;		/* tuple's buffer, or InvalidBuffer */
@@ -165,6 +174,12 @@ extern TupleTableSlot *ExecStoreTuple(HeapTuple tuple,
extern TupleTableSlot *ExecStoreMinimalTuple(MinimalTuple mtup,
					   TupleTableSlot *slot,
					   bool shouldFree);
+#ifdef PGXC
+extern TupleTableSlot *ExecStoreDataRowTuple(char *msg,
+					   size_t len,
+					   TupleTableSlot *slot,
+					   bool shouldFree);
+#endif
extern TupleTableSlot *ExecClearTuple(TupleTableSlot *slot);
extern TupleTableSlot *ExecStoreVirtualTuple(TupleTableSlot *slot);
extern TupleTableSlot *ExecStoreAllNullTuple(TupleTableSlot *slot);
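A usage sketch for the new DataRow slot storage: once a DataRow message is stashed in a slot with ExecStoreDataRowTuple, attributes come out through the ordinary slot accessors. The helper name here is illustrative; "msg"/"len" are assumed to hold a received DataRow payload whose column count matches the slot's descriptor:

/*
 * Sketch: store a DataRow message in a slot and extract one attribute.
 * Passing shouldFree = true hands buffer ownership to the slot.
 */
static Datum
first_column_of_datarow(char *msg, size_t len, TupleTableSlot *slot,
						bool *isnull)
{
	ExecStoreDataRowTuple(msg, len, slot, true);

	/* Extraction goes through the usual slot accessors */
	return slot_getattr(slot, 1, isnull);
}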
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 078b6733e7..4f1c59f8bc 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -72,6 +72,9 @@ typedef enum NodeTag
	T_Hash,
	T_SetOp,
	T_Limit,
+#ifdef PGXC
+	T_RemoteQuery,
+#endif

	/* this one isn't a subclass of Plan: */
	T_PlanInvalItem,
@@ -110,6 +113,9 @@ typedef enum NodeTag
	T_HashState,
	T_SetOpState,
	T_LimitState,
+#ifdef PGXC
+	T_RemoteQueryState,
+#endif

	/*
	 * TAGS FOR PRIMITIVE NODES (primnodes.h)
diff --git a/src/include/pgxc/combiner.h b/src/include/pgxc/combiner.h
deleted file mode 100644
index f977c4ab88..0000000000
--- a/src/include/pgxc/combiner.h
+++ /dev/null
@@ -1,73 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * combiner.h
- *
- *	  Combine responses from multiple Data Nodes
- *
- *
- * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group ?
- * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation
- *
- * IDENTIFICATION
- *	  $$
- *
- *-------------------------------------------------------------------------
- */
-
-#ifndef COMBINER_H
-#define COMBINER_H
-
-#include "postgres.h"
-#include "tcop/dest.h"
-
-typedef enum
-{
-	COMBINE_TYPE_NONE,			/* it is known that no row count, do not parse */
-	COMBINE_TYPE_SUM,			/* sum row counts (partitioned, round robin) */
-	COMBINE_TYPE_SAME			/* expect all row counts to be the same (replicated write) */
-} CombineType;
-
-typedef enum
-{
-	REQUEST_TYPE_NOT_DEFINED,	/* not determined yet */
-	REQUEST_TYPE_COMMAND,		/* OK or row count response */
-	REQUEST_TYPE_QUERY,			/* Row description response */
-	REQUEST_TYPE_COPY_IN,		/* Copy In response */
-	REQUEST_TYPE_COPY_OUT		/* Copy Out response */
-} RequestType;
-
-
-typedef struct
-{
-	int			node_count;
-	CombineType combine_type;
-	CommandDest dest;
-	int			command_complete_count;
-	int			row_count;
-	RequestType request_type;
-	int			description_count;
-	uint64		copy_in_count;
-	uint64		copy_out_count;
-	bool		inErrorState;
-	/*
-	 * While we are not supporting grouping use this flag to indicate we need
-	 * to initialize collecting of aggregates from the DNs
-	 */
-	bool		initAggregates;
-	List	   *simple_aggregates;
-	FILE	   *copy_file;		/* used if copy_dest == COPY_FILE */
-} ResponseCombinerData;
-
-
-typedef ResponseCombinerData *ResponseCombiner;
-
-extern ResponseCombiner CreateResponseCombiner(int node_count,
-					   CombineType combine_type, CommandDest dest);
-extern int	CombineResponse(ResponseCombiner combiner, char msg_type,
-					char *msg_body, size_t len);
-extern bool ValidateAndCloseCombiner(ResponseCombiner combiner);
-extern bool ValidateAndResetCombiner(ResponseCombiner combiner);
-extern void AssignCombinerAggregates(ResponseCombiner combiner, List *simple_aggregates);
-extern void CloseCombiner(ResponseCombiner combiner);
-
-#endif   /* COMBINER_H */
diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h
index 28c5d8748e..ab95022b5f 100644
--- a/src/include/pgxc/datanode.h
+++ b/src/include/pgxc/datanode.h
@@ -16,9 +16,9 @@
#ifndef DATANODE_H
#define DATANODE_H

-#include "combiner.h"
+#include "postgres.h"
+#include "gtm/gtm_c.h"
#include "nodes/pg_list.h"
-#include "pgxc/locator.h"
#include "utils/snapshot.h"
#include <unistd.h>

@@ -28,23 +28,23 @@ typedef struct PGconn NODE_CONNECTION;
/* Helper structure to access data node from Session */
typedef enum
{
-	DN_CONNECTION_STATE_IDLE,
-	DN_CONNECTION_STATE_BUSY,
-	DN_CONNECTION_STATE_COMPLETED,
+	DN_CONNECTION_STATE_IDLE,		/* idle, ready for query */
+	DN_CONNECTION_STATE_QUERY,		/* query is sent, response expected */
+	DN_CONNECTION_STATE_HAS_DATA,	/* buffer has data to process */
+	DN_CONNECTION_STATE_COMPLETED,	/* query completed, no ReadyForQuery yet */
	DN_CONNECTION_STATE_ERROR_NOT_READY,	/* error, but need ReadyForQuery message */
-	DN_CONNECTION_STATE_ERROR_READY,	/* error and received ReadyForQuery */
-	DN_CONNECTION_STATE_ERROR_FATAL,	/* fatal error */
+	DN_CONNECTION_STATE_ERROR_FATAL,	/* fatal error */
	DN_CONNECTION_STATE_COPY_IN,
	DN_CONNECTION_STATE_COPY_OUT
} DNConnectionState;

#define DN_CONNECTION_STATE_ERROR(dnconn) \
		(dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \
-		|| (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY \
-		|| (dnconn)->state == DN_CONNECTION_STATE_ERROR_READY
+		|| (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY

struct data_node_handle
{
+	int			nodenum;		/* node identifier 1..NumDataNodes */
	/* fd of the connection */
	int			sock;
	/* Connection state */
@@ -75,17 +75,25 @@ extern int	DataNodeConnected(NODE_CONNECTION * conn);
extern int	DataNodeConnClean(NODE_CONNECTION * conn);
extern void DataNodeCleanAndRelease(int code, Datum arg);

-/* Multinode Executor */
-extern void DataNodeBegin(void);
-extern int	DataNodeCommit(CommandDest dest);
-extern int	DataNodeRollback(CommandDest dest);
+extern DataNodeHandle **get_handles(List *nodelist);
+extern void release_handles(void);
+extern int	get_transaction_nodes(DataNodeHandle ** connections);

-extern int	DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CombineType combine_type, CommandDest dest, Snapshot snapshot, bool force_autocommit, List *simple_aggregates, bool is_read_only);
+extern int	ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle);
+extern int	ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle);

-extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from);
-extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections);
-extern int DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, CommandDest dest, FILE* copy_file);
-extern uint64 DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type, CommandDest dest);
+extern int	data_node_send_query(DataNodeHandle * handle, const char *query);
+extern int	data_node_send_gxid(DataNodeHandle * handle, GlobalTransactionId gxid);
+extern int	data_node_send_snapshot(DataNodeHandle * handle, Snapshot snapshot);
+
+extern void data_node_receive(const int conn_count,
+				  DataNodeHandle ** connections, struct timeval * timeout);
+extern int	data_node_read_data(DataNodeHandle * conn);
+extern int	send_some(DataNodeHandle * handle, int len);
+extern int	data_node_flush(DataNodeHandle *handle);
+
+extern char get_message(DataNodeHandle *conn, int *len, char **msg);
+
+extern void add_error_message(DataNodeHandle * handle, const char *message);

-extern int	primary_data_node;
#endif
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
new file mode 100644
index 0000000000..d99806a01b
--- /dev/null
+++ b/src/include/pgxc/execRemote.h
@@ -0,0 +1,101 @@
+/*-------------------------------------------------------------------------
+ *
+ * execRemote.h
+ *
+ *	  Functions to execute commands on multiple Data Nodes
+ *
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation
+ *
+ * IDENTIFICATION
+ *	  $$
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef EXECREMOTE_H
+#define EXECREMOTE_H
+#include "datanode.h"
+#include "locator.h"
+#include "planner.h"
+#include "access/tupdesc.h"
+#include "executor/tuptable.h"
+#include "nodes/execnodes.h"
+#include "nodes/pg_list.h"
+#include "tcop/dest.h"
+#include "utils/snapshot.h"
+
+/* Outputs of handle_response() */
+#define RESPONSE_EOF EOF
+#define RESPONSE_COMPLETE 0
+#define RESPONSE_TUPDESC 1
+#define RESPONSE_DATAROW 2
+#define RESPONSE_COPY 3
+
+typedef enum
+{
+	REQUEST_TYPE_NOT_DEFINED,	/* not determined yet */
+	REQUEST_TYPE_COMMAND,		/* OK or row count response */
+	REQUEST_TYPE_QUERY,			/* Row description response */
+	REQUEST_TYPE_COPY_IN,		/* Copy In response */
+	REQUEST_TYPE_COPY_OUT		/* Copy Out response */
+} RequestType;
+
+
+typedef struct RemoteQueryState
+{
+	ScanState	ss;						/* its first field is NodeTag */
+	int			node_count;				/* total count of participating nodes */
+	DataNodeHandle **connections;		/* data node connections being combined */
+	int			conn_count;				/* count of active connections */
+	int			current_conn;			/* used to balance load when reading from connections */
+	CombineType combine_type;			/* see CombineType enum */
+	DestReceiver *dest;					/* output destination */
+	int			command_complete_count; /* count of received CommandComplete messages */
+	uint64		row_count;				/* how many rows affected by the query */
+	RequestType request_type;			/* see RequestType enum */
+	TupleDesc	tuple_desc;				/* tuple descriptor to be referenced by emitted tuples */
+	int			description_count;		/* count of received RowDescription messages */
+	int			copy_in_count;			/* count of received CopyIn messages */
+	int			copy_out_count;			/* count of received CopyOut messages */
+	char		errorCode[5];			/* error code to send back to client */
+	char	   *errorMessage;			/* error message to send back to client */
+	bool		query_Done;				/* query has been sent down to data nodes */
+	bool		need_tran;				/* auto commit on nodes after completion */
+	char	   *completionTag;			/* completion tag to present to caller */
+	char	   *msg;					/* last data row message */
+	int			msglen;					/* length of the data row message */
+	/*
+	 * While we are not supporting grouping use this flag to indicate we need
+	 * to initialize collecting of aggregates from the DNs
+	 */
+	bool		initAggregates;
+	List	   *simple_aggregates;		/* description of aggregate functions */
+	void	   *tuplesortstate;			/* for merge sort */
+	/* Simple DISTINCT support */
+	FmgrInfo   *eqfunctions;			/* functions to compare tuples */
+	MemoryContext tmp_ctx;				/* separate context is needed to compare tuples */
+	FILE	   *copy_file;				/* used if copy_dest == COPY_FILE */
+} RemoteQueryState;
+
+/* Multinode Executor */
+extern void DataNodeBegin(void);
+extern int	DataNodeCommit(CommandDest dest);
+extern int	DataNodeRollback(CommandDest dest);
+
+extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from);
+extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections);
+extern uint64 DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* copy_file);
+extern uint64 DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type);
+
+extern RemoteQueryState *ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags);
+extern TupleTableSlot* ExecRemoteQuery(RemoteQueryState *step);
+extern void ExecEndRemoteQuery(RemoteQueryState *step);
+
+extern int	handle_response(DataNodeHandle * conn, RemoteQueryState *combiner);
+extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot);
+
+extern int	primary_data_node;
+
+#endif
\ No newline at end of file
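The handle_response() return codes above drive a simple pump loop: RESPONSE_EOF means no complete message is buffered yet, so more data must be read from the socket. A sketch of one such loop, under the assumption that the row descriptor has already been consumed and the per-row processing is illustrative:

/*
 * Sketch: drain one data node connection. FetchTuple places the buffered
 * DataRow into the slot; TUPDESC handling is elided in this example.
 */
static void
drain_connection(DataNodeHandle *conn, RemoteQueryState *combiner,
				 TupleTableSlot *slot)
{
	for (;;)
	{
		switch (handle_response(conn, combiner))
		{
			case RESPONSE_EOF:		/* incomplete message, wait for more data */
				data_node_receive(1, &conn, NULL);
				break;
			case RESPONSE_TUPDESC:	/* descriptor assumed handled elsewhere */
				break;
			case RESPONSE_DATAROW:	/* a row is buffered in the combiner */
				FetchTuple(combiner, slot);
				/* process slot */
				break;
			case RESPONSE_COMPLETE:	/* CommandComplete received */
				return;
			default:
				elog(ERROR, "unexpected response from data node");
		}
	}
}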
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index 4920aace0a..47665b2341 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -17,27 +17,59 @@

#include "fmgr.h"
#include "lib/stringinfo.h"
+#include "nodes/plannodes.h"
#include "nodes/primnodes.h"
#include "pgxc/locator.h"
-#include "pgxc/combiner.h"
+#include "tcop/dest.h"

/* for Query_Plan.exec_loc_type can have these OR'ed */
#define EXEC_ON_COORD 0x1
#define EXEC_ON_DATA_NODES 0x2

+typedef enum
+{
+	COMBINE_TYPE_NONE,			/* no row count is expected, do not parse */
+	COMBINE_TYPE_SUM,			/* sum row counts (partitioned, round robin) */
+	COMBINE_TYPE_SAME			/* expect all row counts to be the same (replicated write) */
+} CombineType;
+
+/*
+ * For sorting within RemoteQuery handling.
+ * It is pretty much like Sort, but without Plan. We may use Sort later.
+ */
+typedef struct
+{
+	int			numCols;		/* number of sort-key columns */
+	AttrNumber *sortColIdx;		/* their indexes in the target list */
+	Oid		   *sortOperators;	/* OIDs of operators to sort them by */
+	bool	   *nullsFirst;		/* NULLS FIRST/LAST directions */
+} SimpleSort;
+
+/* For returning distinct results from the RemoteQuery */
+typedef struct
+{
+	int			numCols;		/* number of distinct-key columns */
+	AttrNumber *uniqColIdx;		/* their indexes in the target list */
+	Oid		   *eqOperators;	/* OIDs of operators to equate them by */
+} SimpleDistinct;
+
/* Contains instructions on processing a step of a query.
 * In the prototype this will be simple, but it will eventually
 * evolve into a GridSQL-style QueryStep.
 */
typedef struct
{
+	Plan		plan;
	char	   *sql_statement;
	Exec_Nodes *exec_nodes;
	CombineType combine_type;
-	List	   *simple_aggregates;	/* simple aggregate to combine on this
-									 * step */
-} Query_Step;
+	List	   *simple_aggregates;	/* simple aggregates to combine on this step */
+	SimpleSort *sort;
+	SimpleDistinct *distinct;
+	bool		read_only;		/* do not use 2PC when committing read only steps */
+	bool		force_autocommit;	/* some commands like VACUUM require autocommit mode */
+} RemoteQuery;


/*
@@ -48,7 +80,6 @@ typedef struct
typedef struct
{
	int			exec_loc_type;
-	bool		force_autocommit;	/* For CREATE DATABASE */
	List	   *query_step_list;	/* List of QuerySteps */
} Query_Plan;

@@ -67,7 +98,6 @@ typedef enum


/* For handling simple aggregates */
-/* For now, only support int/long types */
typedef struct
{
	int			column_pos;		/* Only use 1 for now */
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index e351536d47..e2b3c033e0 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -23,6 +23,9 @@
#include "access/itup.h"
#include "executor/tuptable.h"
#include "fmgr.h"
+#ifdef PGXC
+#include "pgxc/execRemote.h"
+#endif
#include "utils/relcache.h"


@@ -64,6 +67,13 @@ extern Tuplesortstate *tuplesort_begin_index_hash(Relation indexRel,
extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
						Oid sortOperator, bool nullsFirstFlag,
						int workMem, bool randomAccess);
+#ifdef PGXC
+extern Tuplesortstate *tuplesort_begin_merge(TupleDesc tupDesc,
+						int nkeys, AttrNumber *attNums,
+						Oid *sortOperators, bool *nullsFirstFlags,
+						RemoteQueryState *combiner,
+						int workMem);
+#endif
extern void tuplesort_set_bound(Tuplesortstate *state, int64 bound);
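Tying the pieces together: a RemoteQuery step's SimpleSort spec has exactly the fields tuplesort_begin_merge expects. The following wiring sketch is illustrative only; the helper name is hypothetical, and where the tuple descriptor comes from (here a caller-supplied argument) is an assumption:

/*
 * Sketch: feed a RemoteQuery step's SimpleSort into the merge sorter
 * declared above. "combiner" is the executing RemoteQueryState.
 */
static Tuplesortstate *
begin_step_merge(RemoteQuery *step, RemoteQueryState *combiner,
				 TupleDesc tupDesc)
{
	SimpleSort *sort = step->sort;

	return tuplesort_begin_merge(tupDesc,
								 sort->numCols,
								 sort->sortColIdx,
								 sort->sortOperators,
								 sort->nullsFirst,
								 combiner,
								 work_mem);
}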
