diff options
| -rw-r--r-- | src/backend/access/transam/xact.c | 20 | ||||
| -rw-r--r-- | src/backend/executor/execTuples.c | 10 | ||||
| -rw-r--r-- | src/backend/nodes/copyfuncs.c | 3 | ||||
| -rw-r--r-- | src/backend/pgxc/plan/planner.c | 596 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 963 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/pgxcnode.c | 361 | ||||
| -rw-r--r-- | src/backend/utils/cache/lsyscache.c | 2 | ||||
| -rw-r--r-- | src/backend/utils/sort/tuplesort.c | 100 | ||||
| -rw-r--r-- | src/include/executor/tuptable.h | 2 | ||||
| -rw-r--r-- | src/include/pgxc/execRemote.h | 42 | ||||
| -rw-r--r-- | src/include/pgxc/pgxcnode.h | 23 | ||||
| -rw-r--r-- | src/include/pgxc/planner.h | 1 | ||||
| -rw-r--r-- | src/include/utils/lsyscache.h | 3 |
13 files changed, 1642 insertions, 484 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 262fd3b3d4..2030a0079a 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2280,6 +2280,21 @@ AbortTransaction(void) */ SetUserIdAndContext(s->prevUser, s->prevSecDefCxt); +#ifdef PGXC + /* + * We should rollback on the data nodes before cleaning up portals + * to be sure data structures used by connections are not freed yet + */ + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { + /* + * Make sure this is rolled back on the DataNodes + * if so it will just return + */ + PGXCNodeRollback(); + } +#endif + /* * do abort processing */ @@ -2300,11 +2315,6 @@ AbortTransaction(void) /* This is done by remote Coordinator */ if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) { - /* - * Make sure this is rolled back on the DataNodes - * if so it will just return - */ - PGXCNodeRollback(); RollbackTranGTM(s->globalTransactionId); latestXid = s->globalTransactionId; } diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c index a6c496eee9..0c43e9479c 100644 --- a/src/backend/executor/execTuples.c +++ b/src/backend/executor/execTuples.c @@ -153,6 +153,7 @@ ExecCreateTupleTable(int tableSize) slot->tts_shouldFreeRow = false; slot->tts_dataRow = NULL; slot->tts_dataLen = -1; + slot->tts_dataNode = 0; slot->tts_attinmeta = NULL; #endif slot->tts_mcxt = CurrentMemoryContext; @@ -238,6 +239,7 @@ MakeSingleTupleTableSlot(TupleDesc tupdesc) slot->tts_shouldFreeRow = false; slot->tts_dataRow = NULL; slot->tts_dataLen = -1; + slot->tts_dataNode = 0; slot->tts_attinmeta = NULL; #endif slot->tts_mcxt = CurrentMemoryContext; @@ -440,6 +442,7 @@ ExecStoreTuple(HeapTuple tuple, slot->tts_shouldFreeRow = false; slot->tts_dataRow = NULL; slot->tts_dataLen = -1; + slot->tts_dataNode = 0; #endif /* @@ -509,6 +512,7 @@ ExecStoreMinimalTuple(MinimalTuple mtup, slot->tts_shouldFreeRow = false; slot->tts_dataRow = NULL; slot->tts_dataLen = -1; + 
slot->tts_dataNode = 0; #endif /* @@ -547,7 +551,8 @@ ExecStoreMinimalTuple(MinimalTuple mtup, * -------------------------------- */ TupleTableSlot * -ExecStoreDataRowTuple(char *msg, size_t len, TupleTableSlot *slot, bool shouldFree) +ExecStoreDataRowTuple(char *msg, size_t len, int node, TupleTableSlot *slot, + bool shouldFree) { /* * sanity checks @@ -586,6 +591,7 @@ ExecStoreDataRowTuple(char *msg, size_t len, TupleTableSlot *slot, bool shouldFr slot->tts_mintuple = NULL; slot->tts_dataRow = msg; slot->tts_dataLen = len; + slot->tts_dataNode = node; /* Mark extracted state invalid */ slot->tts_nvalid = 0; @@ -624,6 +630,7 @@ ExecClearTuple(TupleTableSlot *slot) /* slot in which to store tuple */ slot->tts_shouldFreeRow = false; slot->tts_dataRow = NULL; slot->tts_dataLen = -1; + slot->tts_dataNode = 0; #endif slot->tts_tuple = NULL; @@ -976,6 +983,7 @@ ExecMaterializeSlot(TupleTableSlot *slot) { slot->tts_dataRow = NULL; slot->tts_dataLen = -1; + slot->tts_dataNode = 0; } #endif diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 78c5543dab..aa0587f9c7 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -839,6 +839,7 @@ _copyRemoteQuery(RemoteQuery *from) COPY_NODE_FIELD(distinct); COPY_SCALAR_FIELD(read_only); COPY_SCALAR_FIELD(force_autocommit); + COPY_STRING_FIELD(cursor); COPY_STRING_FIELD(relname); COPY_SCALAR_FIELD(remotejoin); @@ -888,7 +889,7 @@ _copySimpleAgg(SimpleAgg *from) COPY_SCALAR_FIELD(transfn); COPY_SCALAR_FIELD(finalfn); if (!from->initValueIsNull) - newnode->initValue = datumCopy(from->initValue, from->transtypeByVal, + newnode->initValue = datumCopy(from->initValue, from->transtypeByVal, from->transtypeLen); COPY_SCALAR_FIELD(initValueIsNull); COPY_SCALAR_FIELD(inputtypeLen); diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 5ea6d8a0a5..af368e424a 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -31,6 +31,7 @@ 
#include "optimizer/tlist.h" #include "parser/parse_agg.h" #include "parser/parse_coerce.h" +#include "pgxc/execRemote.h" #include "pgxc/locator.h" #include "pgxc/planner.h" #include "tcop/pquery.h" @@ -38,6 +39,7 @@ #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/portal.h" #include "utils/syscache.h" @@ -123,7 +125,7 @@ typedef struct XCWalkerContext { Query *query; bool isRead; - ExecNodes *exec_nodes; /* resulting execution nodes */ + RemoteQuery *query_step; /* remote query step being analyzed */ Special_Conditions *conditions; bool multilevel_join; List *rtables; /* a pointer to a list of rtables */ @@ -141,13 +143,44 @@ bool StrictStatementChecking = true; /* Forbid multi-node SELECT statements with an ORDER BY clause */ bool StrictSelectChecking = false; -static ExecNodes *get_plan_nodes(Query *query, bool isRead); +static void get_plan_nodes(Query *query, RemoteQuery *step, bool isRead); static bool get_plan_nodes_walker(Node *query_node, XCWalkerContext *context); static bool examine_conditions_walker(Node *expr_node, XCWalkerContext *context); static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt); static void InitXCWalkerContext(XCWalkerContext *context); /* + * Find position of the specified substring in the string. + * Whitespace characters of str are treated as spaces, letters as uppercase; + * returns pointer into str at the start of the first match, or NULL if none + */ +static char * +strpos(char *str, char *substr) +{ + char copy[strlen(str) + 1]; + char *src = str; + char *dst = copy; + + /* + * Initialize mutable copy, converting letters to uppercase and + * various whitespace characters to spaces (cast: ctype needs unsigned char) + */ + while (*src) + { + if (isspace((unsigned char) *src)) + { + src++; + *dst++ = ' '; + } + else + *dst++ = toupper((unsigned char) *src++); + } + *dst = '\0'; + dst = strstr(copy, substr); + return dst ? 
str + (dst - copy) : NULL; +} + +/* * True if both lists contain only one node and are the same */ static bool @@ -509,7 +542,7 @@ get_plan_nodes_insert(Query *query) * Get list of parent-child joins (partitioned together) * Get list of joins with replicated tables * - * If we encounter an expression such as a cross-node join that cannot + * If we encounter an expression such as a cross-node join that cannot * be easily handled in a single step, we stop processing and return true, * otherwise false. * @@ -536,6 +569,242 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) if (!context->conditions) context->conditions = new_special_conditions(); + /* Handle UPDATE/DELETE ... WHERE CURRENT OF ... */ + if (IsA(expr_node, CurrentOfExpr)) + { + /* Find referenced portal and figure out what was the last fetch node */ + Portal portal; + QueryDesc *queryDesc; + PlanState *state; + CurrentOfExpr *cexpr = (CurrentOfExpr *) expr_node; + char *cursor_name = cexpr->cursor_name; + char *node_cursor; + + /* Find the cursor's portal */ + portal = GetPortalByName(cursor_name); + if (!PortalIsValid(portal)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_CURSOR), + errmsg("cursor \"%s\" does not exist", cursor_name))); + + queryDesc = PortalGetQueryDesc(portal); + if (queryDesc == NULL || queryDesc->estate == NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_CURSOR_STATE), + errmsg("cursor \"%s\" is held from a previous transaction", + cursor_name))); + + /* + * The cursor must have a current result row: per the SQL spec, it's + * an error if not. + */ + if (portal->atStart || portal->atEnd) + ereport(ERROR, + (errcode(ERRCODE_INVALID_CURSOR_STATE), + errmsg("cursor \"%s\" is not positioned on a row", + cursor_name))); + + state = ExecGetActivePlanTree(queryDesc); + if (IsA(state, RemoteQueryState)) + { + RemoteQueryState *node = (RemoteQueryState *) state; + RemoteQuery *step = (RemoteQuery *) state->plan; + + /* + * 1. 
step query: SELECT * FROM <table> WHERE ctid = <cur_ctid>, + * <cur_ctid> is taken from the scantuple ot the target step + * step node list: current node of the target step. + * 2. step query: DECLARE <xxx> CURSOR FOR SELECT * FROM <table> + * WHERE <col1> = <val1> AND <col2> = <val2> ... FOR UPDATE + * <xxx> is generated from cursor name of the target step, + * <col> and <val> pairs are taken from the step 1. + * step node list: all nodes of <table> + * 3. step query: MOVE <xxx> + * step node list: all nodes of <table> + */ + RangeTblEntry *table = (RangeTblEntry *) linitial(context->query->rtable); + node_cursor = step->cursor; + rel_loc_info1 = GetRelationLocInfo(table->relid); + context->query_step->exec_nodes = makeNode(ExecNodes); + context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER; + context->query_step->exec_nodes->baselocatortype = rel_loc_info1->locatorType; + if (rel_loc_info1->locatorType == LOCATOR_TYPE_REPLICATED) + { + RemoteQuery *step1, *step2, *step3; + /* + * We do not need first three steps if cursor already exists and + * positioned. 
+ */ + if (node->update_cursor) + { + step3 = NULL; + node_cursor = node->update_cursor; + } + else + { + char *tableName = get_rel_name(table->relid); + int natts = get_relnatts(table->relid); + char *attnames[natts]; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + /* + * ctid is the last attribute, but more correct to iterate over + * attributes and find by name, or store index for table + */ + Datum ctid = slot->tts_values[slot->tts_tupleDescriptor->natts - 1]; + char *ctid_str = (char *) DirectFunctionCall1(tidout, ctid); + int nodenum = slot->tts_dataNode; + AttrNumber att; + StringInfoData buf; + HeapTuple tp; + int i; + MemoryContext context_save; + + initStringInfo(&buf); + + /* Step 1: select tuple values by ctid */ + step1 = makeNode(RemoteQuery); + appendStringInfoString(&buf, "SELECT "); + for (att = 1; att <= natts; att++) + { + TargetEntry *tle; + Var *expr; + + tp = SearchSysCache(ATTNUM, + ObjectIdGetDatum(table->relid), + Int16GetDatum(att), + 0, 0); + if (HeapTupleIsValid(tp)) + { + Form_pg_attribute att_tup = (Form_pg_attribute) GETSTRUCT(tp); + + /* add comma before all except first attributes */ + if (att > 1) + appendStringInfoString(&buf, ", "); + attnames[att-1] = pstrdup(NameStr(att_tup->attname)); + appendStringInfoString(&buf, attnames[att - 1]); + expr = makeVar(att, att, att_tup->atttypid, + att_tup->atttypmod, 0); + tle = makeTargetEntry((Expr *) expr, att, + attnames[att - 1], false); + step1->scan.plan.targetlist = lappend(step1->scan.plan.targetlist, tle); + ReleaseSysCache(tp); + } + else + elog(ERROR, "cache lookup failed for attribute %d of relation %u", + att, table->relid); + } + appendStringInfo(&buf, " FROM %s WHERE ctid = '%s'", + tableName, ctid_str); + step1->sql_statement = pstrdup(buf.data); + step1->is_single_step = true; + step1->exec_nodes = makeNode(ExecNodes); + step1->read_only = true; + step1->exec_nodes->nodelist = list_make1_int(nodenum); + + /* Step 2: declare cursor for update target table */ + step2 = 
makeNode(RemoteQuery); + resetStringInfo(&buf); + + appendStringInfoString(&buf, step->cursor); + appendStringInfoString(&buf, "upd"); + /* This need to survive while the target Portal is alive */ + context_save = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); + node_cursor = pstrdup(buf.data); + node->update_cursor = node_cursor; + MemoryContextSwitchTo(context_save); + resetStringInfo(&buf); + + appendStringInfo(&buf, + "DECLARE %s CURSOR FOR SELECT * FROM %s WHERE ", + node_cursor, tableName); + for (i = 0; i < natts; i++) + { + /* add comma before all except first attributes */ + if (i) + appendStringInfoString(&buf, "AND "); + appendStringInfo(&buf, "%s = $%d ", attnames[i], i+1); + } + appendStringInfoString(&buf, "FOR UPDATE"); + step2->sql_statement = pstrdup(buf.data); + step2->is_single_step = true; + step2->read_only = true; + step2->exec_nodes = makeNode(ExecNodes); + step2->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList); + innerPlan(step2) = (Plan *) step1; + /* Step 3: move cursor to first position */ + step3 = makeNode(RemoteQuery); + resetStringInfo(&buf); + appendStringInfo(&buf, "MOVE %s", node_cursor); + step3->sql_statement = pstrdup(buf.data); + step3->is_single_step = true; + step3->read_only = true; + step3->exec_nodes = makeNode(ExecNodes); + step3->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList); + innerPlan(step3) = (Plan *) step2; + + innerPlan(context->query_step) = (Plan *) step3; + + pfree(buf.data); + } + context->query_step->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList); + } + else + { + /* Take target node from last scan tuple of referenced step */ + int curr_node = node->ss.ss_ScanTupleSlot->tts_dataNode; + context->query_step->exec_nodes->nodelist = lappend_int(context->query_step->exec_nodes->nodelist, curr_node); + } + FreeRelationLocInfo(rel_loc_info1); + + context->query_step->is_single_step = true; + /* + * replace cursor name in the query if differs + */ + if (strcmp(cursor_name, 
node_cursor)) + { + StringInfoData buf; + char *str = context->query->sql_statement; + /* + * Find last occurence of cursor_name + */ + for (;;) + { + char *next = strstr(str + 1, cursor_name); + if (next) + str = next; + else + break; + } + + /* + * now str points to cursor name truncate string here + * do not care the string is modified - we will pfree it + * soon anyway + */ + *str = '\0'; + + /* and move str at the beginning of the reminder */ + str += strlen(cursor_name); + + /* build up new statement */ + initStringInfo(&buf); + appendStringInfoString(&buf, context->query->sql_statement); + appendStringInfoString(&buf, node_cursor); + appendStringInfoString(&buf, str); + + /* take the result */ + pfree(context->query->sql_statement); + context->query->sql_statement = buf.data; + } + return false; + } + // ??? current plan node is not a remote query + context->query_step->exec_nodes = makeNode(ExecNodes); + context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; + context->exec_on_coord = true; + return false; + } + if (IsA(expr_node, Var)) { /* If we get here, that meant the previous call before recursing down did not @@ -800,13 +1069,13 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) bool is_multilevel; int save_parent_child_count = 0; SubLink *sublink = (SubLink *) expr_node; - ExecNodes *save_exec_nodes = context->exec_nodes; /* Save old exec_nodes */ + ExecNodes *save_exec_nodes = context->query_step->exec_nodes; /* Save old exec_nodes */ /* save parent-child count */ - if (context->exec_nodes) + if (context->query_step->exec_nodes) save_parent_child_count = list_length(context->conditions->partitioned_parent_child); - context->exec_nodes = NULL; + context->query_step->exec_nodes = NULL; context->multilevel_join = false; current_rtable = ((Query *) sublink->subselect)->rtable; @@ -823,31 +1092,31 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) context->multilevel_join = false; /* Allow 
for replicated tables */ - if (!context->exec_nodes) - context->exec_nodes = save_exec_nodes; + if (!context->query_step->exec_nodes) + context->query_step->exec_nodes = save_exec_nodes; else { if (save_exec_nodes) { - if (context->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED) + if (context->query_step->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED) { - context->exec_nodes = save_exec_nodes; + context->query_step->exec_nodes = save_exec_nodes; } else { if (save_exec_nodes->tableusagetype != TABLE_USAGE_TYPE_USER_REPLICATED) { /* See if they run on the same node */ - if (same_single_node (context->exec_nodes->nodelist, save_exec_nodes->nodelist)) + if (same_single_node (context->query_step->exec_nodes->nodelist, save_exec_nodes->nodelist)) return false; } else /* use old value */ - context->exec_nodes = save_exec_nodes; + context->query_step->exec_nodes = save_exec_nodes; } } else { - if (context->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED) + if (context->query_step->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED) return false; /* See if subquery safely joins with parent */ if (!is_multilevel) @@ -986,8 +1255,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) if (contains_only_pg_catalog (query->rtable)) { /* just pg_catalog tables */ - context->exec_nodes = makeNode(ExecNodes); - context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; + context->query_step->exec_nodes = makeNode(ExecNodes); + context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; context->exec_on_coord = true; return false; } @@ -1005,7 +1274,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) if (rte->rtekind == RTE_SUBQUERY) { - ExecNodes *save_exec_nodes = context->exec_nodes; + ExecNodes *save_exec_nodes = context->query_step->exec_nodes; Special_Conditions *save_conditions = context->conditions; /* Save old conditions */ List *current_rtable = 
rte->subquery->rtable; @@ -1025,8 +1294,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) context->rtables = list_delete_ptr(context->rtables, current_rtable); context->conditions = save_conditions; - current_nodes = context->exec_nodes; - context->exec_nodes = save_exec_nodes; + current_nodes = context->query_step->exec_nodes; + context->query_step->exec_nodes = save_exec_nodes; if (current_nodes) current_usage_type = current_nodes->tableusagetype; @@ -1103,8 +1372,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) /* If we are just dealing with pg_catalog, just return */ if (table_usage_type == TABLE_USAGE_TYPE_PGCATALOG) { - context->exec_nodes = makeNode(ExecNodes); - context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; + context->query_step->exec_nodes = makeNode(ExecNodes); + context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; context->exec_on_coord = true; return false; } @@ -1113,6 +1382,9 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) if (examine_conditions_walker(query->jointree->quals, context)) return true; + if (context->query_step->exec_nodes) + return false; + /* Examine join conditions, see if each join is single-node safe */ if (context->join_list != NULL) { @@ -1174,27 +1446,27 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) if (!rel_loc_info) return true; - context->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead); + context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead); } } else { - context->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead); + context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead); } /* Note replicated table usage for determining safe queries */ - if (context->exec_nodes) + if (context->query_step->exec_nodes) { if (table_usage_type == TABLE_USAGE_TYPE_USER && IsReplicated(rel_loc_info)) table_usage_type = 
TABLE_USAGE_TYPE_USER_REPLICATED; - context->exec_nodes->tableusagetype = table_usage_type; + context->query_step->exec_nodes->tableusagetype = table_usage_type; } } /* check for partitioned col comparison against a literal */ else if (list_length(context->conditions->partitioned_literal_comps) > 0) { - context->exec_nodes = NULL; + context->query_step->exec_nodes = NULL; /* * Make sure that if there are multiple such comparisons, that they @@ -1208,11 +1480,11 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) lit_comp->rel_loc_info, &(lit_comp->constant), true); test_exec_nodes->tableusagetype = table_usage_type; - if (context->exec_nodes == NULL) - context->exec_nodes = test_exec_nodes; + if (context->query_step->exec_nodes == NULL) + context->query_step->exec_nodes = test_exec_nodes; else { - if (!same_single_node(context->exec_nodes->nodelist, test_exec_nodes->nodelist)) + if (!same_single_node(context->query_step->exec_nodes->nodelist, test_exec_nodes->nodelist)) { return true; } @@ -1231,22 +1503,22 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) parent_child = (Parent_Child_Join *) linitial(context->conditions->partitioned_parent_child); - context->exec_nodes = GetRelationNodes(parent_child->rel_loc_info1, NULL, context->isRead); - context->exec_nodes->tableusagetype = table_usage_type; + context->query_step->exec_nodes = GetRelationNodes(parent_child->rel_loc_info1, NULL, context->isRead); + context->query_step->exec_nodes->tableusagetype = table_usage_type; } if (from_query_nodes) { - if (!context->exec_nodes) + if (!context->query_step->exec_nodes) { - context->exec_nodes = from_query_nodes; + context->query_step->exec_nodes = from_query_nodes; return false; } /* Just use exec_nodes if the from subqueries are all replicated or using the exact * same node */ else if (from_query_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED - || (same_single_node(from_query_nodes->nodelist, context->exec_nodes->nodelist))) 
+ || (same_single_node(from_query_nodes->nodelist, context->query_step->exec_nodes->nodelist))) return false; else { @@ -1254,7 +1526,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) * but the parent query applies a condition on the from subquery. */ if (list_length(query->jointree->fromlist) == from_subquery_count - && list_length(context->exec_nodes->nodelist) == 1) + && list_length(context->query_step->exec_nodes->nodelist) == 1) return false; } /* Too complicated, give up */ @@ -1270,8 +1542,9 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) static void InitXCWalkerContext(XCWalkerContext *context) { + context->query = NULL; context->isRead = true; - context->exec_nodes = NULL; + context->query_step = NULL; context->conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions)); context->rtables = NIL; context->multilevel_join = false; @@ -1286,27 +1559,26 @@ InitXCWalkerContext(XCWalkerContext *context) * Top level entry point before walking query to determine plan nodes * */ -static ExecNodes * -get_plan_nodes(Query *query, bool isRead) +static void +get_plan_nodes(Query *query, RemoteQuery *step, bool isRead) { - ExecNodes *result_nodes = NULL; XCWalkerContext context; InitXCWalkerContext(&context); + context.query = query; context.isRead = isRead; + context.query_step = step; context.rtables = lappend(context.rtables, query->rtable); - if (!get_plan_nodes_walker((Node *) query, &context)) - result_nodes = context.exec_nodes; - if (context.exec_on_coord && result_nodes) + if ((get_plan_nodes_walker((Node *) query, &context) + || context.exec_on_coord) && context.query_step->exec_nodes) { - pfree(result_nodes); - result_nodes = NULL; + pfree(context.query_step->exec_nodes); + context.query_step->exec_nodes = NULL; } free_special_relations(context.conditions); free_join_list(context.join_list); - return result_nodes; } @@ -1315,32 +1587,25 @@ get_plan_nodes(Query *query, bool isRead) * * return NULL if it is 
not safe to be done in a single step. */ -static ExecNodes * -get_plan_nodes_command(Query *query) +static void +get_plan_nodes_command(Query *query, RemoteQuery *step) { - ExecNodes *exec_nodes = NULL; - switch (query->commandType) { case CMD_SELECT: - exec_nodes = get_plan_nodes(query, true); + get_plan_nodes(query, step, true); break; case CMD_INSERT: - exec_nodes = get_plan_nodes_insert(query); + step->exec_nodes = get_plan_nodes_insert(query); break; case CMD_UPDATE: case CMD_DELETE: /* treat as a select */ - exec_nodes = get_plan_nodes(query, false); + get_plan_nodes(query, step, false); break; - - default: - return NULL; } - - return exec_nodes; } @@ -1645,8 +1910,6 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, List *sub_tlist = step->scan.plan.targetlist; ListCell *l; StringInfo buf = makeStringInfo(); - char *sql; - char *cur; char *sql_from; context = deparse_context_for_plan((Node *) step, NULL, rtable, NIL); @@ -1677,23 +1940,9 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, * Do not handle the case if " FROM " we found is not a "FROM" keyword, but, * for example, a part of string constant. */ - sql = pstrdup(step->sql_statement); /* mutable copy */ - /* string to upper case, for comparing */ - cur = sql; - while (*cur) - { - /* replace whitespace with a space */ - if (isspace((unsigned char) *cur)) - *cur = ' '; - *cur++ = toupper(*cur); - } - - /* find the keyword */ - sql_from = strstr(sql, " FROM "); + sql_from = strpos(step->sql_statement, " FROM "); if (sql_from) { - /* the same offset in the original string */ - int offset = sql_from - sql; /* * Truncate query at the position of terminating semicolon to be able * to append extra order by entries. 
If query is submitted from client @@ -1705,7 +1954,7 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, if (*end == ';') *end = '\0'; - appendStringInfoString(buf, step->sql_statement + offset); + appendStringInfoString(buf, sql_from); } if (extra_sort) @@ -1728,9 +1977,6 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, } } - /* do not need the copy */ - pfree(sql); - /* free previous query */ pfree(step->sql_statement); /* get a copy of new query */ @@ -1742,6 +1988,81 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, /* + * Traverse the plan subtree (inner, then outer, then the node itself) and set + * cursor name for RemoteQuery nodes. Cursor names must be unique, so step_no + * is appended to the initial name. Returns next step_no to be assigned + */ +static int +set_cursor_name(Plan *subtree, char *cursor, int step_no) +{ + if (innerPlan(subtree)) + step_no = set_cursor_name(innerPlan(subtree), cursor, step_no); + if (outerPlan(subtree)) + step_no = set_cursor_name(outerPlan(subtree), cursor, step_no); + if (IsA(subtree, RemoteQuery)) + { + RemoteQuery *step = (RemoteQuery *) subtree; + /* + * Keep the name for the very first step, hoping it is the only step and + * we do not have to modify WHERE CURRENT OF + */ + if (step_no) + { + StringInfoData buf; + initStringInfo(&buf); + appendStringInfo(&buf, "%s%d", cursor, step_no++); + /* take over buf.data as is; no separate copy is made */ + step->cursor = buf.data; + } + else + { + step_no++; + step->cursor = pstrdup(cursor); + } + } + return step_no; +} + +/* + * Append ctid to the field list of step queries to support update + * WHERE CURRENT OF. 
The ctid is not sent down to client but used as a key + * to find target tuple + */ +static void +fetch_ctid_of(Plan *subtree, RowMarkClause *rmc) +{ + /* recursively process subnodes */ + if (innerPlan(subtree)) + fetch_ctid_of(innerPlan(subtree), rmc); + if (outerPlan(subtree)) + fetch_ctid_of(outerPlan(subtree), rmc); + + /* we are only interested in RemoteQueries */ + if (IsA(subtree, RemoteQuery)) + { + RemoteQuery *step = (RemoteQuery *) subtree; + /* + * TODO Find if the table is referenced by the step query (rmc is unused so far) + */ + + char *from_sql = strpos(step->sql_statement, " FROM "); + if (from_sql) + { + StringInfoData buf; + + initStringInfo(&buf); + appendBinaryStringInfo(&buf, step->sql_statement, + (int) (from_sql - step->sql_statement)); + /* TODO qualify with the table name; inserted right before the first " FROM " */ + appendStringInfoString(&buf, ", ctid"); + appendStringInfoString(&buf, from_sql); + pfree(step->sql_statement); + step->sql_statement = buf.data; + } + } +} + +/* * Plan to sort step tuples * PGXC: copied and adopted from optimizer/plan/planner.c */ @@ -2114,44 +2435,9 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) query_step = makeNode(RemoteQuery); query_step->is_single_step = false; - /* - * Declare Cursor case: - * We should leave as a step query only SELECT statement - * Further if we need refer source statement for planning we should take - * the truncated string - */ - if (query->utilityStmt && + if (query->utilityStmt && IsA(query->utilityStmt, DeclareCursorStmt)) - { - - char *src = query->sql_statement; - char str[strlen(src) + 1]; /* mutable copy */ - char *dst = str; - - cursorOptions |= ((DeclareCursorStmt *) query->utilityStmt)->options; - - /* - * Initialize mutable copy, converting letters to uppercase and - * various witespace characters to spaces - */ - while (*src) - { - if (isspace(*src)) - { - src++; - *dst++ = ' '; - } - else - *dst++ = toupper(*src++); - } - *dst = '\0'; - /* search for 
SELECT keyword in the normalized string */ - dst = 
strstr(str, " SELECT "); - /* Take substring of the original string using found offset */ - query_step->sql_statement = pstrdup(query->sql_statement + (dst - str + 1)); - } - else - query_step->sql_statement = pstrdup(query->sql_statement); + cursorOptions |= ((DeclareCursorStmt *) query->utilityStmt)->options; query_step->exec_nodes = NULL; query_step->combine_type = COMBINE_TYPE_NONE; @@ -2187,7 +2473,7 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) if (query->commandType != CMD_SELECT) result->resultRelations = list_make1_int(query->resultRelation); - query_step->exec_nodes = get_plan_nodes_command(query); + get_plan_nodes_command(query, query_step); if (query_step->exec_nodes == NULL) { @@ -2217,26 +2503,29 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) } /* - * Use standard plan if we have more than one data node with either - * group by, hasWindowFuncs, or hasRecursive + * get_plan_nodes_command may alter original statement, so do not + * process it before the call + * + * Declare Cursor case: + * We should leave as a step query only SELECT statement + * Further if we need refer source statement for planning we should take + * the truncated string */ - /* - * PGXCTODO - this could be improved to check if the first - * group by expression is the partitioning column, in which - * case it is ok to treat as a single step. 
- */ - if (query->commandType == CMD_SELECT - && query_step->exec_nodes - && list_length(query_step->exec_nodes->nodelist) > 1 - && (query->groupClause || query->hasWindowFuncs || query->hasRecursive)) + if (query->utilityStmt && + IsA(query->utilityStmt, DeclareCursorStmt)) { - result = standard_planner(query, cursorOptions, boundParams); - return result; + + /* search for SELECT keyword in the normalized string */ + char *select = strpos(query->sql_statement, " SELECT "); + /* Take substring of the original string using found offset */ + query_step->sql_statement = pstrdup(select + 1); } + else + query_step->sql_statement = pstrdup(query->sql_statement); /* - * If there already is an active portal, we may be doing planning - * within a function. Just use the standard plan, but check if + * If there already is an active portal, we may be doing planning + * within a function. Just use the standard plan, but check if * it is part of an EXPLAIN statement so that we do not show that * we plan multiple steps when it is a single-step operation. */ @@ -2285,6 +2574,24 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) result = standard_planner(query, cursorOptions, boundParams); return result; } + + /* + * Use standard plan if we have more than one data node with either + * group by, hasWindowFuncs, or hasRecursive + */ + /* + * PGXCTODO - this could be improved to check if the first + * group by expression is the partitioning column, in which + * case it is ok to treat as a single step. 
+ */ + if (query->commandType == CMD_SELECT + && query_step->exec_nodes + && list_length(query_step->exec_nodes->nodelist) > 1 + && (query->groupClause || query->hasWindowFuncs || query->hasRecursive)) + { + result->planTree = standardPlan; + return result; + } break; default: @@ -2307,6 +2614,33 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) result->planTree = materialize_finished_plan(result->planTree); } + /* + * Support for multi-step cursor. + * Ensure uniqueness of remote cursor name + */ + if (query->utilityStmt && + IsA(query->utilityStmt, DeclareCursorStmt)) + { + DeclareCursorStmt *stmt = (DeclareCursorStmt *) query->utilityStmt; + set_cursor_name(result->planTree, stmt->portalname, 0); + } + + /* + * If query is FOR UPDATE fetch CTIDs from the remote node + * Use CTID as a key to update tuples on remote nodes when handling + * WHERE CURRENT OF + */ + if (query->rowMarks) + { + ListCell *lc; + foreach(lc, query->rowMarks) + { + RowMarkClause *rmc = (RowMarkClause *) lfirst(lc); + + fetch_ctid_of(result->planTree, rmc); + } + } + return result; } @@ -2321,6 +2655,8 @@ free_query_step(RemoteQuery *query_step) return; pfree(query_step->sql_statement); + if (query_step->cursor) + pfree(query_step->cursor); if (query_step->exec_nodes) { if (query_step->exec_nodes->nodelist) diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 7fe08beacb..a524c13f6d 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -35,6 +35,8 @@ #define END_QUERY_TIMEOUT 20 #define CLEAR_TIMEOUT 5 +#define DATA_NODE_FETCH_SIZE 1 + extern char *deparseSql(RemoteQueryState *scanstate); @@ -70,6 +72,8 @@ static void pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles); static int handle_response_clear(PGXCNodeHandle * conn); +static void close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor); + static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(void); 
#define MAX_STATEMENTS_PER_TRAN 10 @@ -200,8 +204,11 @@ CreateResponseCombiner(int node_count, CombineType combine_type) combiner->copy_out_count = 0; combiner->errorMessage = NULL; combiner->query_Done = false; - combiner->msg = NULL; - combiner->msglen = 0; + combiner->currentRow.msg = NULL; + combiner->currentRow.msglen = 0; + combiner->currentRow.msgnode = 0; + combiner->rowBuffer = NIL; + combiner->tapenodes = NULL; combiner->initAggregates = true; combiner->simple_aggregates = NULL; combiner->copy_file = NULL; @@ -671,10 +678,10 @@ HandleCopyDataRow(RemoteQueryState *combiner, char *msg_body, size_t len) * Caller must stop reading if function returns false */ static void -HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len) +HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len, int node) { /* We expect previous message is consumed */ - Assert(combiner->msg == NULL); + Assert(combiner->currentRow.msg == NULL); if (combiner->request_type != REQUEST_TYPE_QUERY) { @@ -695,9 +702,10 @@ HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len) * We are copying message because it points into connection buffer, and * will be overwritten on next socket read */ - combiner->msg = (char *) palloc(len); - memcpy(combiner->msg, msg_body, len); - combiner->msglen = len; + combiner->currentRow.msg = (char *) palloc(len); + memcpy(combiner->currentRow.msg, msg_body, len); + combiner->currentRow.msglen = len; + combiner->currentRow.msgnode = node; } /* @@ -850,6 +858,10 @@ CloseCombiner(RemoteQueryState *combiner) FreeTupleDesc(combiner->tuple_desc); if (combiner->errorMessage) pfree(combiner->errorMessage); + if (combiner->cursor_connections) + pfree(combiner->cursor_connections); + if (combiner->tapenodes) + pfree(combiner->tapenodes); pfree(combiner); } } @@ -874,15 +886,24 @@ static bool ValidateAndResetCombiner(RemoteQueryState *combiner) { bool valid = validate_combiner(combiner); + ListCell *lc; if (combiner->connections) 
pfree(combiner->connections); if (combiner->tuple_desc) FreeTupleDesc(combiner->tuple_desc); - if (combiner->msg) - pfree(combiner->msg); + if (combiner->currentRow.msg) + pfree(combiner->currentRow.msg); + foreach(lc, combiner->rowBuffer) + { + RemoteDataRow dataRow = (RemoteDataRow) lfirst(lc); + pfree(dataRow->msg); + } + list_free_deep(combiner->rowBuffer); if (combiner->errorMessage) pfree(combiner->errorMessage); + if (combiner->tapenodes) + pfree(combiner->tapenodes); combiner->command_complete_count = 0; combiner->connections = NULL; @@ -894,8 +915,11 @@ ValidateAndResetCombiner(RemoteQueryState *combiner) combiner->copy_out_count = 0; combiner->errorMessage = NULL; combiner->query_Done = false; - combiner->msg = NULL; - combiner->msglen = 0; + combiner->currentRow.msg = NULL; + combiner->currentRow.msglen = 0; + combiner->currentRow.msgnode = 0; + combiner->rowBuffer = NIL; + combiner->tapenodes = NULL; combiner->simple_aggregates = NULL; combiner->copy_file = NULL; @@ -903,22 +927,247 @@ ValidateAndResetCombiner(RemoteQueryState *combiner) } /* + * It is possible if multiple steps share the same data node connection, when + * executor is running multi-step query or client is running multiple queries + * using Extended Query Protocol. After returning next tuple ExecRemoteQuery + * function passes execution control to the executor and then it can be given + * to the same RemoteQuery or to different one. It is possible that before + * returning a tuple the function do not read all data node responses. In this + * case pending responses should be read in context of original RemoteQueryState + * till ReadyForQuery message and data rows should be stored (buffered) to be + * available when fetch from that RemoteQueryState is requested again. + * BufferConnection function does the job. + * If a RemoteQuery is going to use connection it should check connection state. 
+ * DN_CONNECTION_STATE_QUERY indicates query has data to read and combiner + * points to the original RemoteQueryState. If combiner differs from "this" the + * connection should be buffered. + */ +void +BufferConnection(PGXCNodeHandle *conn) +{ + RemoteQueryState *combiner = conn->combiner; + /* + * When BufferConnection is invoked CurrentContext is related to other + * portal, which is trying to control the connection. + * TODO See if we can find better context to switch to + */ + MemoryContext oldcontext = MemoryContextSwitchTo(combiner->ss.ss_ScanTupleSlot->tts_mcxt); + + Assert(conn->state == DN_CONNECTION_STATE_QUERY && combiner); + + /* Verify the connection is in use by the combiner */ + combiner->current_conn = 0; + while (combiner->current_conn < combiner->conn_count) + { + if (combiner->connections[combiner->current_conn] == conn) + break; + combiner->current_conn++; + } + Assert(combiner->current_conn < combiner->conn_count); + + /* + * Buffer data rows until data node return number of rows specified by the + * fetch_size parameter of last Execute message (PortalSuspended message) + * or end of result set is reached (CommandComplete message) + */ + while (conn->state == DN_CONNECTION_STATE_QUERY) + { + int res; + + /* Move to buffer currentRow (received from the data node) */ + if (combiner->currentRow.msg) + { + RemoteDataRow dataRow = (RemoteDataRow) palloc(sizeof(RemoteDataRowData)); + *dataRow = combiner->currentRow; + combiner->currentRow.msg = NULL; + combiner->currentRow.msglen = 0; + combiner->currentRow.msgnode = 0; + combiner->rowBuffer = lappend(combiner->rowBuffer, dataRow); + } + + res = handle_response(conn, combiner); + /* + * If response message is a DataRow it will be handled on the next + * iteration. + * PortalSuspended will cause connection state change and break the loop + * The same is for CommandComplete, but we need additional handling - + * remove connection from the list of active connections. 
+ * We may need to add handling error response + */ + if (res == RESPONSE_EOF) + { + /* incomplete message, read more */ + if (pgxc_node_receive(1, &conn, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node"))); + continue; + } + else if (res == RESPONSE_COMPLETE) + { + /* + * End of result set is reached, so either set the pointer to the + * connection to NULL (step with sort) or remove it from the list + * (step without sort) + */ + if (combiner->tuplesortstate) + { + combiner->connections[combiner->current_conn] = NULL; + if (combiner->tapenodes == NULL) + combiner->tapenodes = (int*) palloc0(NumDataNodes * sizeof(int)); + combiner->tapenodes[combiner->current_conn] = conn->nodenum; + } + else + /* Remove current connection, move last in-place, adjust current_conn */ + if (combiner->current_conn < --combiner->conn_count) + combiner->connections[combiner->current_conn] = combiner->connections[combiner->conn_count]; + else + combiner->current_conn = 0; + } + /* + * Before output RESPONSE_COMPLETE or PORTAL_SUSPENDED handle_response() + * changes connection state to DN_CONNECTION_STATE_IDLE, breaking the + * loop. 
We do not need to do anything specific in case of + * PORTAL_SUSPENDED so skiping "else if" block for that case + */ + } + MemoryContextSwitchTo(oldcontext); + conn->combiner = NULL; +} + +/* * Get next data row from the combiner's buffer into provided slot - * Just clear slot and return false if buffer is empty, that means more data - * should be read + * Just clear slot and return false if buffer is empty, that means end of result + * set is reached */ bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot) { - /* have messages in the buffer, consume them */ - if (combiner->msg) + bool have_tuple = false; + + while (combiner->conn_count > 0) { - ExecStoreDataRowTuple(combiner->msg, combiner->msglen, slot, true); - combiner->msg = NULL; - combiner->msglen = 0; + PGXCNodeHandle *conn; + int res; + + /* If we have message in the buffer, consume it */ + if (combiner->currentRow.msg) + { + ExecStoreDataRowTuple(combiner->currentRow.msg, + combiner->currentRow.msglen, + combiner->currentRow.msgnode, slot, true); + combiner->currentRow.msg = NULL; + combiner->currentRow.msglen = 0; + combiner->currentRow.msgnode = 0; + have_tuple = true; + } + /* + * If this is ordered fetch we can not know what is the node + * to handle next, so sorter will choose next itself and set it as + * currentRow to have it consumed on the next call to FetchTuple + */ + if (((RemoteQuery *)combiner->ss.ps.plan)->sort) + return have_tuple; + + /* + * Note: If we are fetching not sorted results we can not have both + * currentRow and buffered rows. When connection is buffered currentRow + * is moved to buffer, and then it is cleaned after buffering is + * completed. Afterwards rows will be taken from the buffer bypassing + * currentRow until buffer is empty, and only after that data are read + * from a connection. 
+ */ + if (list_length(combiner->rowBuffer) > 0) + { + RemoteDataRow dataRow = (RemoteDataRow) linitial(combiner->rowBuffer); + combiner->rowBuffer = list_delete_first(combiner->rowBuffer); + ExecStoreDataRowTuple(dataRow->msg, dataRow->msglen, + dataRow->msgnode, slot, true); + pfree(dataRow); + return true; + } + + conn = combiner->connections[combiner->current_conn]; + + /* Going to use a connection, buffer it if needed */ + if (conn->state == DN_CONNECTION_STATE_QUERY && conn->combiner != NULL + && conn->combiner != combiner) + BufferConnection(conn); + + /* + * If current connection is idle it means portal on the data node is + * suspended. If we have a tuple do not hurry to request more rows, + * leave connection clean for other RemoteQueries. + * If we do not have, request more and try to get it + */ + if (conn->state == DN_CONNECTION_STATE_IDLE) + { + /* + * If we have tuple to return do not hurry to request more, keep + * connection clean + */ + if (have_tuple) + return have_tuple; + else + { + if (pgxc_node_send_execute(conn, combiner->cursor, 1) != 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node"))); + if (pgxc_node_send_sync(conn) != 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node"))); + if (pgxc_node_receive(1, &conn, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node"))); + conn->combiner = combiner; + } + } + + /* read messages */ + res = handle_response(conn, combiner); + if (res == RESPONSE_EOF) + { + /* incomplete message, read more */ + if (pgxc_node_receive(1, &conn, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node"))); + continue; + } + else if (res == RESPONSE_SUSPENDED) + { + /* Make next connection current */ + if (++combiner->current_conn >= combiner->conn_count) + combiner->current_conn = 0; + } + else if (res == RESPONSE_COMPLETE) + { + /* 
Remove current connection, move last in-place, adjust current_conn */ + if (combiner->current_conn < --combiner->conn_count) + combiner->connections[combiner->current_conn] = combiner->connections[combiner->conn_count]; + else + combiner->current_conn = 0; + } + + /* If we have a tuple we can leave now. */ + if (have_tuple) + return true; + } + /* Wrap up last message if exists */ + if (combiner->currentRow.msg) + { + ExecStoreDataRowTuple(combiner->currentRow.msg, + combiner->currentRow.msglen, + combiner->currentRow.msgnode, slot, true); + combiner->currentRow.msg = NULL; + combiner->currentRow.msglen = 0; + combiner->currentRow.msgnode = 0; return true; } - /* inform caller that buffer is empty */ + /* otherwise report end of data to the caller */ ExecClearTuple(slot); return false; } @@ -986,10 +1235,11 @@ pgxc_node_receive_responses(const int conn_count, PGXCNodeHandle ** connections, * Long term, we should look into cancelling executing statements * and closing the connections. * Return values: - * EOF - need to receive more data for the connection - * 0 - done with the connection - * 1 - got data row - * 2 - got copy response + * RESPONSE_EOF - need to receive more data for the connection + * RESPONSE_COMPLETE - done with the connection + * RESPONSE_TUPLEDESC - got tuple description + * RESPONSE_DATAROW - got data row + * RESPONSE_COPY - got copy response */ int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) @@ -997,38 +1247,40 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) char *msg; int msg_len; char msg_type; + bool suspended = false; for (;;) { - /* No data available, exit */ - if (conn->state == DN_CONNECTION_STATE_QUERY) - return RESPONSE_EOF; + Assert(conn->state != DN_CONNECTION_STATE_IDLE); + Assert(conn->combiner == combiner || conn->combiner == NULL); /* * If we are in the process of shutting down, we - * may be rolling back, and the buffer may contain other messages. 
- * We want to avoid a procarray exception - * as well as an error stack overflow. - */ + * may be rolling back, and the buffer may contain other messages. + * We want to avoid a procarray exception + * as well as an error stack overflow. + */ if (proc_exit_inprogress) - { conn->state = DN_CONNECTION_STATE_ERROR_FATAL; + + /* don't read from from the connection if there is a fatal error */ + if (conn->state == DN_CONNECTION_STATE_ERROR_FATAL) + return RESPONSE_COMPLETE; + + /* No data available, exit */ + if (!HAS_MESSAGE_BUFFERED(conn)) return RESPONSE_EOF; - } /* TODO handle other possible responses */ msg_type = get_message(conn, &msg_len, &msg); switch (msg_type) { case '\0': /* Not enough data in the buffer */ - conn->state = DN_CONNECTION_STATE_QUERY; return RESPONSE_EOF; case 'c': /* CopyToCommandComplete */ - conn->state = DN_CONNECTION_STATE_COMPLETED; HandleCopyOutComplete(combiner); break; case 'C': /* CommandComplete */ - conn->state = DN_CONNECTION_STATE_COMPLETED; HandleCommandComplete(combiner, msg, msg_len); break; case 'T': /* RowDescription */ @@ -1043,8 +1295,17 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) #ifdef DN_CONNECTION_DEBUG Assert(conn->have_row_desc); #endif - HandleDataRow(combiner, msg, msg_len); + HandleDataRow(combiner, msg, msg_len, conn->nodenum); return RESPONSE_DATAROW; + case 's': /* PortalSuspended */ + suspended = true; + break; + case '1': /* ParseComplete */ + case '2': /* BindComplete */ + case '3': /* CloseComplete */ + case 'n': /* NoData */ + /* simple notifications, continue reading */ + break; case 'G': /* CopyInResponse */ conn->state = DN_CONNECTION_STATE_COPY_IN; HandleCopyIn(combiner); @@ -1060,7 +1321,6 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) break; case 'E': /* ErrorResponse */ HandleError(combiner, msg, msg_len); - conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY; /* * Do not return with an error, we still need to consume Z, * ready-for-query @@ -1074,12 
+1334,22 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) */ break; case 'Z': /* ReadyForQuery */ + { + /* + * Return result depends on previous connection state. + * If it was PORTAL_SUSPENDED coordinator want to send down + * another EXECUTE to fetch more rows, otherwise it is done + * with the connection + */ + int result = suspended ? RESPONSE_SUSPENDED : RESPONSE_COMPLETE; conn->transaction_status = msg[0]; conn->state = DN_CONNECTION_STATE_IDLE; + conn->combiner = NULL; #ifdef DN_CONNECTION_DEBUG conn->have_row_desc = false; #endif - return RESPONSE_COMPLETE; + return result; + } case 'I': /* EmptyQuery */ default: /* sync lost? */ @@ -1092,6 +1362,7 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) return RESPONSE_EOF; } + /* * Like handle_response, but for consuming the messages, * in case we of an error to clean the data node connection. @@ -1138,8 +1409,8 @@ handle_response_clear(PGXCNodeHandle * conn) case 'N': /* NoticeResponse */ break; case 'E': /* ErrorResponse */ - conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY; /* + * conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY; * Do not return with an error, we still need to consume Z, * ready-for-query */ @@ -1176,6 +1447,8 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle ** connections, /* Send BEGIN */ for (i = 0; i < conn_count; i++) { + if (connections[i]->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(connections[i]); if (GlobalTransactionIdIsValid(gxid) && pgxc_node_send_gxid(connections[i], gxid)) return EOF; @@ -1372,7 +1645,7 @@ finish: rollback_xid = BeginTranGTM(NULL); - /* + /* * Send xid and rollback prepared down to Datanodes and Coordinators * Even if we get an error on one, we try and send to the others */ @@ -1398,7 +1671,7 @@ finish: /* - * Commit prepared transaction on Datanodes and Coordinators (as necessary) + * Commit prepared transaction on Datanodes and Coordinators (as necessary) * where it has been prepared. 
* Connection to backends has been cut when transaction has been prepared, * So it is necessary to send the COMMIT PREPARE message to all the nodes. @@ -1831,18 +2104,6 @@ pgxc_node_rollback(PGXCNodeAllHandles *pgxc_handles) int co_conn_count = pgxc_handles->co_conn_count; int dn_conn_count = pgxc_handles->dn_conn_count; - /* - * Rollback is a special case, being issued because of an error. - * We try to read and throw away any extra data on the connection before - * issuing our rollbacks so that we did not read the results of the - * previous command. - */ - for (i = 0; i < dn_conn_count; i++) - clear_socket_data(pgxc_handles->datanode_handles[i]); - - for (i = 0; i < co_conn_count; i++) - clear_socket_data(pgxc_handles->coord_handles[i]); - /* Send ROLLBACK to all handles */ if (pgxc_all_handles_send_query(pgxc_handles, "ROLLBACK", false)) result = EOF; @@ -1963,6 +2224,8 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_ /* Send query to nodes */ for (i = 0; i < conn_count; i++) { + if (connections[i]->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(connections[i]); /* If explicit transaction is needed gxid is already sent */ if (!need_tran && pgxc_node_send_gxid(connections[i], gxid)) { @@ -2369,7 +2632,6 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) RemoteQueryState *remotestate; Relation currentRelation; - remotestate = CreateResponseCombiner(0, node->combine_type); remotestate->ss.ps.plan = (Plan *) node; remotestate->ss.ps.state = estate; @@ -2409,6 +2671,10 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); } + + if (innerPlan(node)) + innerPlanState(remotestate) = ExecInitNode(innerPlan(node), estate, eflags); + if (outerPlan(node)) outerPlanState(remotestate) = ExecInitNode(outerPlan(node), estate, eflags); @@ -2425,7 +2691,9 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst) if (src->tts_mcxt 
== dst->tts_mcxt) { /* now dst slot controls the backing message */ - ExecStoreDataRowTuple(src->tts_dataRow, src->tts_dataLen, dst, src->tts_shouldFreeRow); + ExecStoreDataRowTuple(src->tts_dataRow, src->tts_dataLen, + src->tts_dataNode, dst, + src->tts_shouldFreeRow); src->tts_shouldFreeRow = false; } else @@ -2433,10 +2701,11 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst) /* have to make a copy */ MemoryContext oldcontext = MemoryContextSwitchTo(dst->tts_mcxt); int len = src->tts_dataLen; + int node = src->tts_dataNode; char *msg = (char *) palloc(len); memcpy(msg, src->tts_dataRow, len); - ExecStoreDataRowTuple(msg, len, dst, true); + ExecStoreDataRowTuple(msg, len, node, dst, true); MemoryContextSwitchTo(oldcontext); } } @@ -2610,20 +2879,21 @@ ExecRemoteQuery(RemoteQueryState *node) int total_conn_count; bool need_tran; PGXCNodeAllHandles *pgxc_connections; + TupleTableSlot *innerSlot = NULL; /* - * If coordinator plan is specified execute it first. - * If the plan is returning we are returning these tuples immediately. - * If it is not returning or returned them all by current invocation - * we will go ahead and execute remote query. Then we will never execute - * the outer plan again because node->query_Done flag will be set and - * execution won't get to that place. + * Inner plan for RemoteQuery supplies parameters. + * We execute inner plan to get a tuple and use values of the tuple as + * parameter values when executing this remote query. + * If returned slot contains NULL tuple break execution. + * TODO there is a problem how to handle the case if both inner and + * outer plans exist. We can decide later, since it is never used now. 
*/ - if (outerPlanState(node)) + if (innerPlanState(node)) { - TupleTableSlot *slot = ExecProcNode(outerPlanState(node)); - if (!TupIsNull(slot)) - return slot; + innerSlot = ExecProcNode(innerPlanState(node)); +// if (TupIsNull(innerSlot)) +// return innerSlot; } /* @@ -2712,6 +2982,9 @@ ExecRemoteQuery(RemoteQueryState *node) /* See if we have a primary node, execute on it first before the others */ if (primaryconnection) { + if (primaryconnection->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(primaryconnection); + /* If explicit transaction is needed gxid is already sent */ if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid)) { @@ -2751,7 +3024,6 @@ ExecRemoteQuery(RemoteQueryState *node) (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - Assert(node->combine_type == COMBINE_TYPE_SAME); while (node->command_complete_count < 1) @@ -2760,11 +3032,7 @@ ExecRemoteQuery(RemoteQueryState *node) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to read response from data nodes"))); - while (handle_response(primaryconnection, node) == RESPONSE_EOF) - if (pgxc_node_receive(1, &primaryconnection, NULL)) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to read response from data nodes"))); + handle_response(primaryconnection, node); if (node->errorMessage) { char *code = node->errorCode; @@ -2777,6 +3045,8 @@ ExecRemoteQuery(RemoteQueryState *node) for (i = 0; i < regular_conn_count; i++) { + if (connections[i]->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(connections[i]); /* If explicit transaction is needed gxid is already sent */ if (!need_tran && pgxc_node_send_gxid(connections[i], gxid)) { @@ -2811,106 +3081,174 @@ ExecRemoteQuery(RemoteQueryState *node) (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (pgxc_node_send_query(connections[i], step->sql_statement) != 0) + if (step->cursor || innerSlot) { - pfree(connections); - if 
(primaryconnection) - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); + int fetch = 0; + /* + * execute and fetch rows only if they will be consumed + * immediately by the sorter + */ + if (step->cursor) + fetch = 1; + /* + * Here is ad hoc handling of WHERE CURRENT OF clause. + * Previous step have returned the key values to update + * replicated tuple into innerSlot and the query is a + * DECLARE CURSOR. We do not want to execute this query now + * because of unusual fetch procedure: we just need to + * position cursor on the first row on each node. + * So here we just parse and bind query. As the result cursor + * specified by the statement will be created, and next step + * will issue MOVE command positioning the cursor properly. + * If we want to use parametrized requests for any other + * purpose we should return and rework this code + */ + if (pgxc_node_send_query_extended(connections[i], + step->sql_statement, + NULL, + step->cursor, + innerSlot ? innerSlot->tts_dataLen : 0, + innerSlot ? 
innerSlot->tts_dataRow : NULL, + step->cursor != NULL, + fetch) != 0) + { + pfree(connections); + if (primaryconnection) + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } } + else + { + if (pgxc_node_send_query(connections[i], step->sql_statement) != 0) + { + pfree(connections); + if (primaryconnection) + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + } + connections[i]->combiner = node; } + if (step->cursor) + { + node->cursor = step->cursor; + node->cursor_count = regular_conn_count; + node->cursor_connections = (PGXCNodeHandle **) palloc(regular_conn_count * sizeof(PGXCNodeHandle *)); + memcpy(node->cursor_connections, connections, regular_conn_count * sizeof(PGXCNodeHandle *)); + } + + /* + * Stop if all commands are completed or we got a data row and + * initialized state node for subsequent invocations + */ + while (regular_conn_count > 0 && node->connections == NULL) + { + int i = 0; + + if (pgxc_node_receive(regular_conn_count, connections, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to read response from data nodes"))); /* - * Stop if all commands are completed or we got a data row and - * initialized state node for subsequent invocations + * Handle input from the data nodes. + * If we got a RESPONSE_DATAROW we can break handling to wrap + * it into a tuple and return. Handling will be continued upon + * subsequent invocations. + * If we got 0, we exclude connection from the list. We do not + * expect more input from it. In case of non-SELECT query we quit + * the loop when all nodes finish their work and send ReadyForQuery + * with empty connections array. + * If we got EOF, move to the next connection, will receive more + * data on the next iteration. 
*/ - while (regular_conn_count > 0 && node->connections == NULL) + while (i < regular_conn_count) { - int i = 0; - - if (pgxc_node_receive(regular_conn_count, connections, NULL)) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to read response from data nodes"))); - /* - * Handle input from the data nodes. - * If we got a RESPONSE_DATAROW we can break handling to wrap - * it into a tuple and return. Handling will be continued upon - * subsequent invocations. - * If we got 0, we exclude connection from the list. We do not - * expect more input from it. In case of non-SELECT query we quit - * the loop when all nodes finish their work and send ReadyForQuery - * with empty connections array. - * If we got EOF, move to the next connection, will receive more - * data on the next iteration. - */ - while (i < regular_conn_count) + int res = handle_response(connections[i], node); + if (res == RESPONSE_EOF) { - int res = handle_response(connections[i], node); - if (res == RESPONSE_EOF) - { - i++; - } - else if (res == RESPONSE_COMPLETE) - { - if (i < --regular_conn_count) - connections[i] = connections[regular_conn_count]; - } - else if (res == RESPONSE_TUPDESC) + i++; + } + else if (res == RESPONSE_COMPLETE) + { + if (i < --regular_conn_count) + connections[i] = connections[regular_conn_count]; + } + else if (res == RESPONSE_TUPDESC) + { + ExecSetSlotDescriptor(scanslot, node->tuple_desc); + /* + * Now tuple table slot is responsible for freeing the + * descriptor + */ + node->tuple_desc = NULL; + if (step->sort) { - ExecSetSlotDescriptor(scanslot, node->tuple_desc); + SimpleSort *sort = step->sort; + + node->connections = connections; + node->conn_count = regular_conn_count; /* - * Now tuple table slot is responsible for freeing the - * descriptor + * First message is already in the buffer + * Further fetch will be under tuplesort control + * If query does not produce rows tuplesort will not + * be initialized */ - node->tuple_desc = NULL; - if 
(step->sort) - { - SimpleSort *sort = step->sort; - - node->connections = connections; - node->conn_count = regular_conn_count; - /* - * First message is already in the buffer - * Further fetch will be under tuplesort control - * If query does not produce rows tuplesort will not - * be initialized - */ - node->tuplesortstate = tuplesort_begin_merge( - scanslot->tts_tupleDescriptor, - sort->numCols, - sort->sortColIdx, - sort->sortOperators, - sort->nullsFirst, - node, - work_mem); - /* - * Break the loop, do not wait for first row. - * Tuplesort module want to control node it is - * fetching rows from, while in this loop first - * row would be got from random node - */ - break; - } - } - else if (res == RESPONSE_DATAROW) - { + node->tuplesortstate = tuplesort_begin_merge( + scanslot->tts_tupleDescriptor, + sort->numCols, + sort->sortColIdx, + sort->sortOperators, + sort->nullsFirst, + node, + work_mem); /* - * Got first data row, quit the loop + * Break the loop, do not wait for first row. 
+ * Tuplesort module want to control node it is + * fetching rows from, while in this loop first + * row would be got from random node */ - node->connections = connections; - node->conn_count = regular_conn_count; - node->current_conn = i; break; } } + else if (res == RESPONSE_DATAROW) + { + /* + * Got first data row, quit the loop + */ + node->connections = connections; + node->conn_count = regular_conn_count; + node->current_conn = i; + break; + } } + } + + if (node->cursor_count) + { + node->conn_count = node->cursor_count; + memcpy(connections, node->cursor_connections, node->cursor_count * sizeof(PGXCNodeHandle *)); + node->connections = connections; + } node->query_Done = true; } + if (node->update_cursor) + { + PGXCNodeAllHandles *all_dn_handles = get_exec_connections(NULL, EXEC_ON_DATANODES); + close_node_cursors(all_dn_handles->datanode_handles, + all_dn_handles->dn_conn_count, + node->update_cursor); + pfree(node->update_cursor); + node->update_cursor = NULL; + } + if (node->tuplesortstate) { while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate, @@ -2948,98 +3286,35 @@ ExecRemoteQuery(RemoteQueryState *node) } else { - while (node->conn_count > 0 && !have_tuple) + while (FetchTuple(node, scanslot) && !TupIsNull(scanslot)) { - int i; - - /* - * If combiner already has tuple go ahead and return it - * otherwise tuple will be cleared - */ - if (FetchTuple(node, scanslot) && !TupIsNull(scanslot)) + if (node->simple_aggregates) { - if (node->simple_aggregates) - { - if (node->simple_aggregates) - { - /* - * Advance aggregate functions and allow to read up next - * data row message and get tuple in the same slot on - * next iteration - */ - exec_simple_aggregates(node, scanslot); - } - else - { - /* - * Receive current slot and read up next data row - * message before exiting the loop. 
Next time when this - * function is invoked we will have either data row - * message ready or EOF - */ - copy_slot(node, scanslot, resultslot); - have_tuple = true; - } - } - else - { - /* - * Receive current slot and read up next data row - * message before exiting the loop. Next time when this - * function is invoked we will have either data row - * message ready or EOF - */ - copy_slot(node, scanslot, resultslot); - have_tuple = true; - } + /* + * Advance aggregate functions and allow to read up next + * data row message and get tuple in the same slot on + * next iteration + */ + exec_simple_aggregates(node, scanslot); } - - /* - * Handle input to get next row or ensure command is completed, - * starting from connection next after current. If connection - * does not - */ - if ((i = node->current_conn + 1) == node->conn_count) - i = 0; - - for (;;) + else { - int res = handle_response(node->connections[i], node); - if (res == RESPONSE_EOF) - { - /* go to next connection */ - if (++i == node->conn_count) - i = 0; - /* if we cycled over all connections we need to receive more */ - if (i == node->current_conn) - if (pgxc_node_receive(node->conn_count, node->connections, NULL)) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to read response from data nodes"))); - } - else if (res == RESPONSE_COMPLETE) - { - if (--node->conn_count == 0) - break; - if (i == node->conn_count) - i = 0; - else - node->connections[i] = node->connections[node->conn_count]; - if (node->current_conn == node->conn_count) - node->current_conn = i; - } - else if (res == RESPONSE_DATAROW) - { - node->current_conn = i; - break; - } + /* + * Receive current slot and read up next data row + * message before exiting the loop. 
Next time when this + * function is invoked we will have either data row + * message ready or EOF + */ + copy_slot(node, scanslot, resultslot); + have_tuple = true; + break; } } /* * We may need to finalize aggregates */ - if (!have_tuple && node->simple_aggregates) + if (node->simple_aggregates) { finish_simple_aggregates(node, resultslot); if (!TupIsNull(resultslot)) @@ -3058,7 +3333,31 @@ ExecRemoteQuery(RemoteQueryState *node) errmsg("%s", node->errorMessage))); } - return resultslot; + /* + * While we are emitting rows we ignore outer plan + */ + if (!TupIsNull(resultslot)) + return resultslot; + + /* + * If coordinator plan is specified execute it first. + * If the plan is returning we are returning these tuples immediately. + * If it is not returning or returned them all by current invocation + * we will go ahead and execute remote query. Then we will never execute + * the outer plan again because node->query_Done flag will be set and + * execution won't get to that place. + */ + if (outerPlanState(node)) + { + TupleTableSlot *slot = ExecProcNode(outerPlanState(node)); + if (!TupIsNull(slot)) + return slot; + } + + /* + * OK, we have nothing to return, so return NULL + */ + return NULL; } /* @@ -3067,90 +3366,88 @@ ExecRemoteQuery(RemoteQueryState *node) void ExecEndRemoteQuery(RemoteQueryState *node) { + ListCell *lc; /* - * If processing was interrupted, (ex: client did not consume all the data, - * or a subquery with LIMIT) we may still have data on the nodes. Try and consume. - * We do not simply call PGXCNodeConsumeMessages, because the same - * connection could be used for multiple RemoteQuery steps. - * - * It seems most stable checking command_complete_count - * and only then working with conn_count - * - * PGXCTODO: Change in the future when we remove materialization nodes. 
+ * shut down the subplan */ - if (node->command_complete_count < node->node_count) - { - elog(WARNING, "Extra data node messages when ending remote query step"); - - while (node->conn_count > 0) - { - int i = 0; - int res; - - /* - * Just consume the rest of the messages - */ - if ((i = node->current_conn + 1) == node->conn_count) - i = 0; + if (innerPlanState(node)) + ExecEndNode(innerPlanState(node)); - for (;;) - { - /* throw away message */ - if (node->msg) - { - pfree(node->msg); - node->msg = NULL; - } + /* clean up the buffer */ + foreach(lc, node->rowBuffer) + { + RemoteDataRow dataRow = (RemoteDataRow) lfirst(lc); + pfree(dataRow->msg); + } + list_free_deep(node->rowBuffer); - res = handle_response(node->connections[i], node); + node->current_conn = 0; + while (node->conn_count > 0) + { + int res; + PGXCNodeHandle *conn = node->connections[node->current_conn]; - if (res == RESPONSE_COMPLETE || - node->connections[i]->state == DN_CONNECTION_STATE_ERROR_FATAL || - node->connections[i]->state == DN_CONNECTION_STATE_ERROR_NOT_READY) - { - if (--node->conn_count == 0) - break; - if (i == node->conn_count) - i = 0; - else - node->connections[i] = node->connections[node->conn_count]; - if (node->current_conn == node->conn_count) - node->current_conn = i; - } - else if (res == RESPONSE_EOF) - { - /* go to next connection */ - if (++i == node->conn_count) - i = 0; + /* throw away message */ + if (node->currentRow.msg) + { + pfree(node->currentRow.msg); + node->currentRow.msg = NULL; + } + /* no data is expected */ + if (conn->state == DN_CONNECTION_STATE_IDLE || + conn->state == DN_CONNECTION_STATE_ERROR_FATAL) + { + if (node->current_conn < --node->conn_count) + node->connections[node->current_conn] = node->connections[node->conn_count]; + continue; + } + res = handle_response(conn, node); + if (res == RESPONSE_EOF) + { + struct timeval timeout; + timeout.tv_sec = END_QUERY_TIMEOUT; + timeout.tv_usec = 0; - /* if we cycled over all connections we need to receive 
more */ - if (i == node->current_conn) - { - struct timeval timeout; - timeout.tv_sec = END_QUERY_TIMEOUT; - timeout.tv_usec = 0; - - if (pgxc_node_receive(node->conn_count, node->connections, &timeout)) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to read response from data nodes when ending query"))); - } - } - } + if (pgxc_node_receive(1, &conn, &timeout)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to read response from data nodes when ending query"))); } - elog(WARNING, "Data node connection buffers cleaned"); } - /* * Release tuplesort resources */ if (node->tuplesortstate != NULL) + { + /* + * tuplesort_end invalidates minimal tuple if it is in the slot because + * deletes the TupleSort memory context, causing seg fault later when + * releasing tuple table + */ + ExecClearTuple(node->ss.ss_ScanTupleSlot); tuplesort_end((Tuplesortstate *) node->tuplesortstate); + } node->tuplesortstate = NULL; /* + * If there are active cursors close them + */ + if (node->cursor) + close_node_cursors(node->cursor_connections, node->cursor_count, node->cursor); + + if (node->update_cursor) + { + PGXCNodeAllHandles *all_dn_handles = get_exec_connections(NULL, EXEC_ON_DATANODES); + close_node_cursors(all_dn_handles->datanode_handles, + all_dn_handles->dn_conn_count, + node->update_cursor); + pfree(node->update_cursor); + node->update_cursor = NULL; + } + + /* * shut down the subplan */ if (outerPlanState(node)) @@ -3165,6 +3462,57 @@ ExecEndRemoteQuery(RemoteQueryState *node) CloseCombiner(node); } +static void +close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor) +{ + int i; + RemoteQueryState *combiner; + + for (i = 0; i < conn_count; i++) + { + if (connections[i]->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(connections[i]); + if (pgxc_node_send_close(connections[i], false, cursor) != 0) + ereport(WARNING, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to close data node cursor"))); + 
if (pgxc_node_send_sync(connections[i]) != 0) + ereport(WARNING, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to close data node cursor"))); + } + + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); + + while (conn_count > 0) + { + if (pgxc_node_receive(conn_count, connections, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to close data node cursor"))); + i = 0; + while (i < conn_count) + { + int res = handle_response(connections[i], combiner); + if (res == RESPONSE_EOF) + { + i++; + } + else if (res == RESPONSE_COMPLETE) + { + if (--conn_count > i) + connections[i] = connections[conn_count]; + } + else + { + // Unexpected response, ignore? + } + } + } + + ValidateAndCloseCombiner(combiner); +} + /* * Consume any remaining messages on the connections. * This is useful for calling after ereport() @@ -3367,6 +3715,8 @@ ExecRemoteUtility(RemoteQuery *node) /* See if we have a primary nodes, execute on it first before the others */ if (primaryconnection) { + if (primaryconnection->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(primaryconnection); /* If explicit transaction is needed gxid is already sent */ if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid)) { @@ -3421,20 +3771,24 @@ ExecRemoteUtility(RemoteQuery *node) { for (i = 0; i < regular_conn_count; i++) { + PGXCNodeHandle *conn = pgxc_connections->datanode_handles[i]; + + if (conn->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(conn); /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && pgxc_node_send_gxid(pgxc_connections->datanode_handles[i], gxid)) + if (!need_tran && pgxc_node_send_gxid(conn, gxid)) { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (snapshot && pgxc_node_send_snapshot(pgxc_connections->datanode_handles[i], snapshot)) + if (snapshot && pgxc_node_send_snapshot(conn, snapshot)) { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), 
errmsg("Failed to send command to data nodes"))); } - if (pgxc_node_send_query(pgxc_connections->datanode_handles[i], node->sql_statement) != 0) + if (pgxc_node_send_query(conn, node->sql_statement) != 0) { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), @@ -3498,7 +3852,8 @@ ExecRemoteUtility(RemoteQuery *node) */ while (i < regular_conn_count) { - int res = handle_response(pgxc_connections->datanode_handles[i], remotestate); + PGXCNodeHandle *conn = pgxc_connections->datanode_handles[i]; + int res = handle_response(conn, remotestate); if (res == RESPONSE_EOF) { i++; diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c index 0d902738f0..3bc3a83aa4 100644 --- a/src/backend/pgxc/pool/pgxcnode.c +++ b/src/backend/pgxc/pool/pgxcnode.c @@ -78,6 +78,7 @@ init_pgxc_handle(PGXCNodeHandle *pgxc_handle) pgxc_handle->outBuffer = (char *) palloc(pgxc_handle->outSize); pgxc_handle->inSize = 16 * 1024; pgxc_handle->inBuffer = (char *) palloc(pgxc_handle->inSize); + pgxc_handle->combiner = NULL; if (pgxc_handle->outBuffer == NULL || pgxc_handle->inBuffer == NULL) { @@ -239,6 +240,7 @@ pgxc_node_init(PGXCNodeHandle *handle, int sock, int nodenum) handle->sock = sock; handle->transaction_status = 'I'; handle->state = DN_CONNECTION_STATE_IDLE; + handle->combiner = NULL; #ifdef DN_CONNECTION_DEBUG handle->have_row_desc = false; #endif @@ -268,7 +270,7 @@ pgxc_node_receive(const int conn_count, { /* If connection finised sending do not wait input from it */ if (connections[i]->state == DN_CONNECTION_STATE_IDLE - || connections[i]->state == DN_CONNECTION_STATE_HAS_DATA) + || HAS_MESSAGE_BUFFERED(connections[i])) continue; /* prepare select params */ @@ -280,7 +282,7 @@ pgxc_node_receive(const int conn_count, else { /* flag as bad, it will be removed from the list */ - connections[i]->state == DN_CONNECTION_STATE_ERROR_NOT_READY; + connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL; } } @@ -327,11 +329,7 @@ retry: add_error_message(conn, "unexpected 
EOF on datanode connection"); elog(WARNING, "unexpected EOF on datanode connection"); /* Should we read from the other connections before returning? */ - return EOF; - } - else - { - conn->state = DN_CONNECTION_STATE_HAS_DATA; + return EOF; } } } @@ -476,18 +474,6 @@ retry: } -/* - * Clear out socket data and buffer. - * Throw away any data. - */ -void -clear_socket_data (PGXCNodeHandle *conn) -{ - do { - conn->inStart = conn->inCursor = conn->inEnd = 0; - } while (pgxc_node_read_data(conn) > 0); -} - /* * Get one character from the connection buffer and advance cursor */ @@ -576,7 +562,6 @@ get_message(PGXCNodeHandle *conn, int *len, char **msg) * ensure_in_buffer_capacity() will immediately return */ ensure_in_buffer_capacity(5 + (size_t) *len, conn); - conn->state = DN_CONNECTION_STATE_QUERY; conn->inCursor = conn->inStart; return '\0'; } @@ -589,7 +574,7 @@ get_message(PGXCNodeHandle *conn, int *len, char **msg) /* - * Release all data node connections and coordinator connections + * Release all data node connections and coordinator connections * back to pool and release occupied memory * * If force_drop is true, we force dropping all of the connections, such as after @@ -853,6 +838,327 @@ send_some(PGXCNodeHandle *handle, int len) return result; } +/* + * Send PARSE message with specified statement down to the Data node + */ +int +pgxc_node_send_parse(PGXCNodeHandle * handle, const char* statement, + const char *query) +{ + /* statement name size (allow NULL) */ + int stmtLen = statement ? 
strlen(statement) + 1 : 1; + /* size of query string */ + int strLen = strlen(query) + 1; + /* size of parameter array (always empty for now) */ + int paramLen = 2; + + /* size + stmtLen + strlen + paramLen */ + int msgLen = 4 + stmtLen + strLen + paramLen; + + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0) + { + add_error_message(handle, "out of memory"); + return EOF; + } + + handle->outBuffer[handle->outEnd++] = 'P'; + /* size */ + msgLen = htonl(msgLen); + memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4); + handle->outEnd += 4; + /* statement name */ + if (statement) + { + memcpy(handle->outBuffer + handle->outEnd, statement, stmtLen); + handle->outEnd += stmtLen; + } + else + handle->outBuffer[handle->outEnd++] = '\0'; + /* query */ + memcpy(handle->outBuffer + handle->outEnd, query, strLen); + handle->outEnd += strLen; + /* parameter types (none) */ + handle->outBuffer[handle->outEnd++] = 0; + handle->outBuffer[handle->outEnd++] = 0; + + return 0; +} + + +/* + * Send BIND message down to the Data node + */ +int +pgxc_node_send_bind(PGXCNodeHandle * handle, const char *portal, + const char *statement, int paramlen, char *params) +{ + uint16 n16; + /* portal name size (allow NULL) */ + int pnameLen = portal ? strlen(portal) + 1 : 1; + /* statement name size (allow NULL) */ + int stmtLen = statement ? strlen(statement) + 1 : 1; + /* size of parameter codes array (always empty for now) */ + int paramCodeLen = 2; + /* size of parameter values array, 2 if no params */ + int paramValueLen = paramlen ? 
paramlen : 2; + /* size of output parameter codes array (always empty for now) */ + int paramOutLen = 2; + + /* size + pnameLen + stmtLen + parameters */ + int msgLen = 4 + pnameLen + stmtLen + paramCodeLen + paramValueLen + paramOutLen; + + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0) + { + add_error_message(handle, "out of memory"); + return EOF; + } + + handle->outBuffer[handle->outEnd++] = 'B'; + /* size */ + msgLen = htonl(msgLen); + memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4); + handle->outEnd += 4; + /* portal name */ + if (portal) + { + memcpy(handle->outBuffer + handle->outEnd, portal, pnameLen); + handle->outEnd += pnameLen; + } + else + handle->outBuffer[handle->outEnd++] = '\0'; + /* statement name */ + if (statement) + { + memcpy(handle->outBuffer + handle->outEnd, statement, stmtLen); + handle->outEnd += stmtLen; + } + else + handle->outBuffer[handle->outEnd++] = '\0'; + /* parameter codes (none) */ + handle->outBuffer[handle->outEnd++] = 0; + handle->outBuffer[handle->outEnd++] = 0; + /* parameter values */ + if (paramlen) + { + memcpy(handle->outBuffer + handle->outEnd, params, paramlen); + handle->outEnd += paramlen; + } + else + { + handle->outBuffer[handle->outEnd++] = 0; + handle->outBuffer[handle->outEnd++] = 0; + } + /* output parameter codes (none) */ + handle->outBuffer[handle->outEnd++] = 0; + handle->outBuffer[handle->outEnd++] = 0; + + return 0; +} + + +/* + * Send DESCRIBE message (portal or statement) down to the Data node + */ +int +pgxc_node_send_describe(PGXCNodeHandle * handle, bool is_statement, + const char *name) +{ + /* statement or portal name size (allow NULL) */ + int nameLen = name ? 
strlen(name) + 1 : 1; + + /* size + statement/portal + name */ + int msgLen = 4 + 1 + nameLen; + + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0) + { + add_error_message(handle, "out of memory"); + return EOF; + } + + handle->outBuffer[handle->outEnd++] = 'D'; + /* size */ + msgLen = htonl(msgLen); + memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4); + handle->outEnd += 4; + /* statement/portal flag */ + handle->outBuffer[handle->outEnd++] = is_statement ? 'S' : 'P'; + /* object name */ + if (name) + { + memcpy(handle->outBuffer + handle->outEnd, name, nameLen); + handle->outEnd += nameLen; + } + else + handle->outBuffer[handle->outEnd++] = '\0'; + + return 0; +} + + +/* + * Send CLOSE message (portal or statement) down to the Data node + */ +int +pgxc_node_send_close(PGXCNodeHandle * handle, bool is_statement, + const char *name) +{ + /* statement or portal name size (allow NULL) */ + int nameLen = name ? strlen(name) + 1 : 1; + + /* size + statement/portal + name */ + int msgLen = 4 + 1 + nameLen; + + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0) + { + add_error_message(handle, "out of memory"); + return EOF; + } + + handle->outBuffer[handle->outEnd++] = 'C'; + /* size */ + msgLen = htonl(msgLen); + memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4); + handle->outEnd += 4; + /* statement/portal flag */ + handle->outBuffer[handle->outEnd++] = is_statement ? 'S' : 'P'; + /* object name */ + if (name) + { + memcpy(handle->outBuffer + handle->outEnd, name, nameLen); + handle->outEnd += nameLen; + } + else + handle->outBuffer[handle->outEnd++] = '\0'; + + handle->state = DN_CONNECTION_STATE_QUERY; + + return 0; +} + +/* + * Send EXECUTE message down to the Data node + */ +int +pgxc_node_send_execute(PGXCNodeHandle * handle, const char *portal, int fetch) +{ + /* portal name size (allow NULL) */ + int pnameLen = portal ? 
strlen(portal) + 1 : 1; + + /* size + pnameLen + fetchLen */ + int msgLen = 4 + pnameLen + 4; + + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0) + { + add_error_message(handle, "out of memory"); + return EOF; + } + + handle->outBuffer[handle->outEnd++] = 'E'; + /* size */ + msgLen = htonl(msgLen); + memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4); + handle->outEnd += 4; + /* portal name */ + if (portal) + { + memcpy(handle->outBuffer + handle->outEnd, portal, pnameLen); + handle->outEnd += pnameLen; + } + else + handle->outBuffer[handle->outEnd++] = '\0'; + + /* fetch */ + fetch = htonl(fetch); + memcpy(handle->outBuffer + handle->outEnd, &fetch, 4); + handle->outEnd += 4; + + handle->state = DN_CONNECTION_STATE_QUERY; + + return 0; +} + + +/* + * Send FLUSH message down to the Data node + */ +int +pgxc_node_send_flush(PGXCNodeHandle * handle) +{ + /* size */ + int msgLen = 4; + + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0) + { + add_error_message(handle, "out of memory"); + return EOF; + } + + handle->outBuffer[handle->outEnd++] = 'H'; + /* size */ + msgLen = htonl(msgLen); + memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4); + handle->outEnd += 4; + + return pgxc_node_flush(handle); +} + + +/* + * Send SYNC message down to the Data node + */ +int +pgxc_node_send_sync(PGXCNodeHandle * handle) +{ + /* size */ + int msgLen = 4; + + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0) + { + add_error_message(handle, "out of memory"); + return EOF; + } + + handle->outBuffer[handle->outEnd++] = 'S'; + /* size */ + msgLen = htonl(msgLen); + memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4); + handle->outEnd += 4; + + return pgxc_node_flush(handle); +} + + +/* + * Send the extended query sequence (PARSE, BIND, optional DESCRIBE, + * optional EXECUTE, then SYNC) down to the Data node + */ +int +pgxc_node_send_query_extended(PGXCNodeHandle *handle, const char *query, + const char 
*statement, const char *portal, + int paramlen, char *params, + bool send_describe, int fetch_size) +{ + if (pgxc_node_send_parse(handle, statement, query)) + return EOF; + if (pgxc_node_send_bind(handle, portal, statement, paramlen, params)) + return EOF; + if (send_describe) + if (pgxc_node_send_describe(handle, false, portal)) + return EOF; + if (fetch_size >= 0) + if (pgxc_node_send_execute(handle, portal, fetch_size)) + return EOF; + if (pgxc_node_send_sync(handle)) + return EOF; + + return 0; +} /* * This method won't return until connection buffer is empty or error occurs @@ -1034,7 +1340,6 @@ void add_error_message(PGXCNodeHandle *handle, const char *message) { handle->transaction_status = 'E'; - handle->state = DN_CONNECTION_STATE_ERROR_NOT_READY; if (handle->error) { /* PGXCTODO append */ @@ -1072,7 +1377,7 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query) { ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); + errmsg("out of memory"))); } result->primary_handle = NULL; @@ -1348,8 +1653,7 @@ is_active_connection(PGXCNodeHandle *handle) { return handle->sock != NO_SOCKET && handle->state != DN_CONNECTION_STATE_IDLE && - handle->state != DN_CONNECTION_STATE_ERROR_NOT_READY && - handle->state != DN_CONNECTION_STATE_ERROR_FATAL; + !DN_CONNECTION_STATE_ERROR(handle); } /* @@ -1378,7 +1682,7 @@ get_active_nodes(PGXCNodeHandle **connections) if (is_active_connection(&co_handles[i])) connections[active_count++] = &co_handles[i]; } - } + } return active_count; } @@ -1442,6 +1746,11 @@ pgxc_all_handles_send_query(PGXCNodeAllHandles *pgxc_handles, const char *buffer /* Send to Datanodes */ for (i = 0; i < dn_conn_count; i++) { + /* + * Clean connection if fetch in progress + */ + if (pgxc_handles->datanode_handles[i]->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(pgxc_handles->datanode_handles[i]); if (pgxc_node_send_query(pgxc_handles->datanode_handles[i], buffer)) { 
add_error_message(pgxc_handles->datanode_handles[i], "Can not send request"); diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index dd72a4bfee..91451ce60a 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -1523,7 +1523,7 @@ get_relname_relid(const char *relname, Oid relnamespace) 0, 0); } -#ifdef NOT_USED +#ifdef PGXC /* * get_relnatts * diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index cb078be067..50ad300429 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -2875,25 +2875,119 @@ reversedirection_heap(Tuplesortstate *state) static unsigned int getlen_datanode(Tuplesortstate *state, int tapenum, bool eofOK) { - PGXCNodeHandle *conn = state->combiner->connections[tapenum]; + RemoteQueryState *combiner = state->combiner; + PGXCNodeHandle *conn = combiner->connections[tapenum]; + /* + * If connection is active (potentially has data to read) we can get node + * number from the connection. If connection is not active (we have read all + * available data rows) and if we have buffered data from that connection + * the node number is stored in combiner->tapenodes[tapenum]. + * If connection is inactive and no buffered data we have EOF condition + */ + int nodenum = conn ? 
conn->nodenum : combiner->tapenodes[tapenum]; + unsigned int len = 0; + ListCell *lc; + ListCell *prev = NULL; + + /* + * If there are buffered rows iterate over them and get first from + * the requested tape + */ + foreach (lc, combiner->rowBuffer) + { + RemoteDataRow dataRow = (RemoteDataRow) lfirst(lc); + if (dataRow->msgnode == nodenum) + { + combiner->currentRow = *dataRow; + combiner->rowBuffer = list_delete_cell(combiner->rowBuffer, lc, prev); + return dataRow->msglen; + } + prev = lc; + } + + /* Nothing is found in the buffer, check for EOF */ + if (conn == NULL) + { + if (eofOK) + return 0; + else + elog(ERROR, "unexpected end of data"); + } + + /* Going to get data from connection, buffer if needed */ + if (conn->state == DN_CONNECTION_STATE_QUERY && conn->combiner != combiner) + BufferConnection(conn); + + /* Request more rows if needed */ + if (conn->state == DN_CONNECTION_STATE_IDLE) + { + Assert(combiner->cursor); + if (pgxc_node_send_execute(conn, combiner->cursor, 1) != 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node cursor"))); + if (pgxc_node_send_sync(conn) != 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node cursor"))); + conn->state = DN_CONNECTION_STATE_QUERY; + conn->combiner = combiner; + } + /* Read data from the connection until get a row or EOF */ for (;;) { - switch (handle_response(conn, state->combiner)) + switch (handle_response(conn, combiner)) { + case RESPONSE_SUSPENDED: + /* Send Execute to request next row */ + Assert(combiner->cursor); + if (len) + return len; + if (pgxc_node_send_execute(conn, combiner->cursor, 1) != 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node cursor"))); + if (pgxc_node_send_sync(conn) != 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to fetch from data node cursor"))); + conn->state = DN_CONNECTION_STATE_QUERY; + conn->combiner = 
combiner; + /* fallthru */ case RESPONSE_EOF: + /* receive more data */ if (pgxc_node_receive(1, &conn, NULL)) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg(conn->error))); break; case RESPONSE_COMPLETE: + /* EOF encountered, close the tape and report EOF */ + if (combiner->cursor) + { + combiner->connections[tapenum] = NULL; + if (len) + return len; + } if (eofOK) return 0; else elog(ERROR, "unexpected end of data"); break; case RESPONSE_DATAROW: - return state->combiner->msglen; + Assert(len == 0); + if (state->combiner->cursor) + { + /* + * We fetching one row at a time when running EQP + * so read following PortalSuspended or ResponseComplete + * to leave connection clean between the calls + */ + len = state->combiner->currentRow.msglen; + break; + } + else + return state->combiner->currentRow.msglen; default: ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h index c85b1b0c61..0c2c4fe0a2 100644 --- a/src/include/executor/tuptable.h +++ b/src/include/executor/tuptable.h @@ -124,6 +124,7 @@ typedef struct TupleTableSlot */ char *tts_dataRow; /* Tuple data in DataRow format */ int tts_dataLen; /* Actual length of the data row */ + int tts_dataNode; /* Originating node of the data row */ bool tts_shouldFreeRow; /* should pfree tts_dataRow? 
*/ struct AttInMetadata *tts_attinmeta; /* store here info to extract values from the DataRow */ #endif @@ -177,6 +178,7 @@ extern TupleTableSlot *ExecStoreMinimalTuple(MinimalTuple mtup, #ifdef PGXC extern TupleTableSlot *ExecStoreDataRowTuple(char *msg, size_t len, + int node, TupleTableSlot *slot, bool shouldFree); #endif diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index 4f4a0c9c53..4a33842bdf 100644 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -16,8 +16,8 @@ #ifndef EXECREMOTE_H #define EXECREMOTE_H -#include "pgxcnode.h" #include "locator.h" +#include "pgxcnode.h" #include "planner.h" #include "access/tupdesc.h" #include "executor/tuptable.h" @@ -29,9 +29,10 @@ /* Outputs of handle_response() */ #define RESPONSE_EOF EOF #define RESPONSE_COMPLETE 0 -#define RESPONSE_TUPDESC 1 -#define RESPONSE_DATAROW 2 -#define RESPONSE_COPY 3 +#define RESPONSE_SUSPENDED 1 +#define RESPONSE_TUPDESC 2 +#define RESPONSE_DATAROW 3 +#define RESPONSE_COPY 4 typedef enum { @@ -42,6 +43,18 @@ typedef enum REQUEST_TYPE_COPY_OUT /* Copy Out response */ } RequestType; +/* + * Represents a DataRow message received from a remote node. + * Contains originating node number and message body in DataRow format without + * message code and length. 
Length is a separate field + */ +typedef struct RemoteDataRowData +{ + char *msg; /* last data row message */ + int msglen; /* length of the data row message */ + int msgnode; /* node number of the data row message */ +} RemoteDataRowData; +typedef RemoteDataRowData *RemoteDataRow; typedef struct RemoteQueryState { @@ -60,8 +73,18 @@ typedef struct RemoteQueryState char errorCode[5]; /* error code to send back to client */ char *errorMessage; /* error message to send back to client */ bool query_Done; /* query has been sent down to data nodes */ - char *msg; /* last data row message */ - int msglen; /* length of the data row message */ + RemoteDataRowData currentRow; /* next data row to be wrapped into a tuple */ + /* TODO use a tuplestore as a rowbuffer */ + List *rowBuffer; /* buffer where rows are stored when connection + * should be cleaned for reuse by other RemoteQuery */ + /* + * To handle special case - if there is a simple sort and sort connection + * is buffered. If EOF is reached on a connection it should be removed from + * the array, but we need to know node number of the connection to find + * messages in the buffer. 
So we store the nodenum in that array if we reach EOF + * when buffering + */ + int *tapenodes; /* * While we are not supporting grouping use this flag to indicate we need * to initialize collecting of aggregates from the DNs @@ -74,6 +97,11 @@ typedef struct RemoteQueryState MemoryContext tmp_ctx; /* separate context is needed to compare tuples */ FILE *copy_file; /* used if copy_dest == COPY_FILE */ uint64 processed; /* count of data rows when running CopyOut */ + /* cursor support */ + char *cursor; /* cursor name */ + char *update_cursor; /* through this cursor the current tuple can be updated */ + int cursor_count; /* total count of participating nodes */ + PGXCNodeHandle **cursor_connections;/* data node connections being combined */ } RemoteQueryState; /* Multinode Executor */ @@ -98,9 +126,9 @@ extern void ExecRemoteUtility(RemoteQuery *node); extern int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner); extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot); +extern void BufferConnection(PGXCNodeHandle *conn); extern void ExecRemoteQueryReScan(RemoteQueryState *node, ExprContext *exprCtxt); -extern void PGXCNodeConsumeMessages(void); extern int primary_data_node; #endif diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h index d6c5af950c..a57e4f1673 100644 --- a/src/include/pgxc/pgxcnode.h +++ b/src/include/pgxc/pgxcnode.h @@ -34,17 +34,18 @@ typedef enum { DN_CONNECTION_STATE_IDLE, /* idle, ready for query */ DN_CONNECTION_STATE_QUERY, /* query is sent, response expected */ - DN_CONNECTION_STATE_HAS_DATA, /* buffer has data to process */ - DN_CONNECTION_STATE_COMPLETED, /* query completed, no ReadyForQury yet */ - DN_CONNECTION_STATE_ERROR_NOT_READY, /* error, but need ReadyForQuery message */ DN_CONNECTION_STATE_ERROR_FATAL, /* fatal error */ DN_CONNECTION_STATE_COPY_IN, DN_CONNECTION_STATE_COPY_OUT } DNConnectionState; #define DN_CONNECTION_STATE_ERROR(dnconn) \ - (dnconn)->state == 
DN_CONNECTION_STATE_ERROR_FATAL \ - || (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY + ((dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \ + || (dnconn)->transaction_status == 'E') + +#define HAS_MESSAGE_BUFFERED(conn) \ + ((conn)->inCursor + 4 < (conn)->inEnd \ + && (conn)->inCursor + ntohl(*((uint32_t *) ((conn)->inBuffer + (conn)->inCursor + 1))) < (conn)->inEnd) struct pgxc_node_handle { @@ -54,6 +55,7 @@ struct pgxc_node_handle /* Connection state */ char transaction_status; DNConnectionState state; + struct RemoteQueryState *combiner; #ifdef DN_CONNECTION_DEBUG bool have_row_desc; #endif @@ -103,6 +105,16 @@ extern int ensure_in_buffer_capacity(size_t bytes_needed, PGXCNodeHandle * handl extern int ensure_out_buffer_capacity(size_t bytes_needed, PGXCNodeHandle * handle); extern int pgxc_node_send_query(PGXCNodeHandle * handle, const char *query); +extern int pgxc_node_send_describe(PGXCNodeHandle * handle, bool is_statement, + const char *name); +extern int pgxc_node_send_execute(PGXCNodeHandle * handle, const char *portal, int fetch); +extern int pgxc_node_send_close(PGXCNodeHandle * handle, bool is_statement, + const char *name); +extern int pgxc_node_send_sync(PGXCNodeHandle * handle); +extern int pgxc_node_send_query_extended(PGXCNodeHandle *handle, const char *query, + const char *statement, const char *portal, + int paramlen, char *params, + bool send_describe, int fetch_size); extern int pgxc_node_send_gxid(PGXCNodeHandle * handle, GlobalTransactionId gxid); extern int pgxc_node_send_snapshot(PGXCNodeHandle * handle, Snapshot snapshot); extern int pgxc_node_send_timestamp(PGXCNodeHandle * handle, TimestampTz timestamp); @@ -119,6 +131,5 @@ extern int pgxc_all_handles_send_query(PGXCNodeAllHandles *pgxc_handles, const c extern char get_message(PGXCNodeHandle *conn, int *len, char **msg); extern void add_error_message(PGXCNodeHandle * handle, const char *message); -extern void clear_socket_data (PGXCNodeHandle *conn); #endif diff --git 
a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h index 8aae356fd2..7284ef7432 100644 --- a/src/include/pgxc/planner.h +++ b/src/include/pgxc/planner.h @@ -85,6 +85,7 @@ typedef struct SimpleDistinct *distinct; bool read_only; /* do not use 2PC when committing read only steps */ bool force_autocommit; /* some commands like VACUUM require autocommit mode */ + char *cursor; /* if specified use it as a Portal name on data nodes */ RemoteQueryExecType exec_type; char *relname; diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index 854b3eac9a..aef70eb7d1 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -88,6 +88,9 @@ extern char func_volatile(Oid funcid); extern float4 get_func_cost(Oid funcid); extern float4 get_func_rows(Oid funcid); extern Oid get_relname_relid(const char *relname, Oid relnamespace); +#ifdef PGXC +extern int get_relnatts(Oid relid); +#endif extern char *get_rel_name(Oid relid); extern Oid get_rel_namespace(Oid relid); extern Oid get_rel_type_id(Oid relid); |
