 src/backend/commands/prepare.c      | 229
 src/backend/executor/execTuples.c   |  84
 src/backend/nodes/copyfuncs.c       |  15
 src/backend/pgxc/locator/locator.c  |  11
 src/backend/pgxc/plan/planner.c     | 506
 src/backend/pgxc/pool/execRemote.c  | 915
 src/backend/pgxc/pool/pgxcnode.c    |  11
 src/backend/pgxc/pool/poolmgr.c     |  29
 src/backend/utils/adt/ruleutils.c   |  15
 src/backend/utils/cache/plancache.c |  45
 src/include/commands/prepare.h      |  16
 src/include/executor/tuptable.h     |   3
 src/include/pgxc/execRemote.h       |   4
 src/include/pgxc/locator.h          |   6
 src/include/pgxc/planner.h          |   5
 src/include/utils/builtins.h        |   2
 16 files changed, 1353 insertions(+), 543 deletions(-)
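The heart of this commit is coordinator-side support for prepared statements: each RemoteQuery node in a prepared plan gets its own datanode statement name, parameters are shipped to the nodes in DataRow format, and queries that compare the partitioning column against a non-constant expression can resolve their target nodes at execution time. As a reading aid for set_remote_stmtname() in prepare.c below, here is a standalone sketch of its name-mangling rule; it is illustrative only, with NAMEDATALEN hardcoded to 64 and the collision probe against the datanode_queries hash table left out:

#include <stdio.h>
#include <string.h>

#define NAMEDATALEN 64			/* assumption: matches the server's value */

/* Build the n-th candidate name for a given base statement name */
static void
candidate_name(char *name, const char *stmt_name, int n)
{
	strncpy(name, stmt_name, NAMEDATALEN - 1);
	name[NAMEDATALEN - 1] = '\0';
	if (n)
	{
		char		modifier[NAMEDATALEN];

		snprintf(modifier, sizeof(modifier), "__%d", n);
		/* truncate the base so that base plus modifier fits in NAMEDATALEN */
		if (strlen(name) + strlen(modifier) >= NAMEDATALEN)
			name[NAMEDATALEN - strlen(modifier) - 1] = '\0';
		strcat(name, modifier);
	}
}

int
main(void)
{
	char		name[NAMEDATALEN];

	candidate_name(name, "mystmt", 0);	/* prints mystmt */
	puts(name);
	candidate_name(name, "mystmt", 3);	/* prints mystmt__3 */
	puts(name);
	return 0;
}

In the real code the counter keeps incrementing until hash_search() reports the candidate name as unused, which is why the base string, not the modifier, must absorb any truncation.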
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index 0e948e4b72..89d1f021ba 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -33,7 +33,11 @@ #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/snapmgr.h" - +#ifdef PGXC +#include "pgxc/pgxc.h" +#include "pgxc/poolmgr.h" +#include "pgxc/execRemote.h" +#endif /* * The hash table in which prepared queries are stored. This is @@ -42,6 +46,14 @@ * (statement names); the entries are PreparedStatement structs. */ static HTAB *prepared_queries = NULL; +#ifdef PGXC +/* + * The hash table where datanode prepared statements are stored. + * The keys are statement names referenced from cached RemoteQuery nodes; the + * entries are DatanodeStatement structs + */ +static HTAB *datanode_queries = NULL; +#endif static void InitQueryHashTable(void); static ParamListInfo EvaluateParams(PreparedStatement *pstmt, List *params, @@ -147,6 +159,22 @@ PrepareQuery(PrepareStmt *stmt, const char *queryString) /* Generate plans for queries. */ plan_list = pg_plan_queries(query_list, 0, NULL); +#ifdef PGXC + /* + * Check if we are dealing with more than one step. + * Multi-step prepared statements are not yet supported. + * PGXCTODO - temporary - Once we add support, this code should be removed. + */ + if (IS_PGXC_COORDINATOR && plan_list && plan_list->head) + { + PlannedStmt *stmt = (PlannedStmt *) lfirst(plan_list->head); + + if (stmt->planTree->lefttree || stmt->planTree->righttree) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PSTATEMENT_DEFINITION), + errmsg("Multi-step Prepared Statements not yet supported"))); + } +#endif /* * Save the results. */ @@ -419,7 +447,76 @@ InitQueryHashTable(void) 32, &hash_ctl, HASH_ELEM); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + { + MemSet(&hash_ctl, 0, sizeof(hash_ctl)); + + hash_ctl.keysize = NAMEDATALEN; + hash_ctl.entrysize = sizeof(DatanodeStatement) + NumDataNodes * sizeof(int); + + datanode_queries = hash_create("Datanode Queries", + 64, + &hash_ctl, + HASH_ELEM); + } +#endif +} + +#ifdef PGXC +/* + * Assign the statement name for all the RemoteQueries in the plan tree, so + * they use datanode statements + */ +static int +set_remote_stmtname(Plan *plan, const char *stmt_name, int n) +{ + if (IsA(plan, RemoteQuery)) + { + DatanodeStatement *entry; + bool exists; + + char name[NAMEDATALEN]; + do + { + strcpy(name, stmt_name); + /* + * Append modifier. If the resulting string is going to be truncated, + * truncate the base string instead, otherwise we may enter an endless + * loop + */ + if (n) + { + char modifier[NAMEDATALEN]; + sprintf(modifier, "__%d", n); + /* + * if position NAMEDATALEN - strlen(modifier) - 1 is beyond the end of + * the base string this is effectively a no-op, otherwise it truncates + * the base string + */ + name[NAMEDATALEN - strlen(modifier) - 1] = '\0'; + strcat(name, modifier); + } + n++; + hash_search(datanode_queries, name, HASH_FIND, &exists); + } while (exists); + ((RemoteQuery *) plan)->statement = pstrdup(name); + entry = (DatanodeStatement *) hash_search(datanode_queries, + name, + HASH_ENTER, + NULL); + entry->nodenum = 0; + } + + if (innerPlan(plan)) + n = set_remote_stmtname(innerPlan(plan), stmt_name, n); + + if (outerPlan(plan)) + n = set_remote_stmtname(outerPlan(plan), stmt_name, n); + + return n; } +#endif /* * Store all the data pertaining to a query in the hash table using @@ -459,6 +556,25 @@ StorePreparedStatement(const char *stmt_name, errmsg("prepared statement \"%s\" already exists", stmt_name))); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + { + ListCell *lc; + int n; + + /* + * Scan the plans and set the statement field for all found RemoteQuery + * nodes so they use data node statements + */ + n = 0; + foreach(lc, stmt_list) + { + PlannedStmt *ps = (PlannedStmt *) lfirst(lc); + n = set_remote_stmtname(ps->planTree, stmt_name, n); + } + } +#endif + /* Create a plancache entry */ plansource = CreateCachedPlan(raw_parse_tree, query_string, @@ -844,3 +960,114 @@ build_regtype_array(Oid *param_types, int num_params) result = construct_array(tmp_ary, num_params, REGTYPEOID, 4, true, 'i'); return PointerGetDatum(result); } + + +#ifdef PGXC +DatanodeStatement * +FetchDatanodeStatement(const char *stmt_name, bool throwError) +{ + DatanodeStatement *entry; + + /* + * If the hash table hasn't been initialized, it can't be storing + * anything, therefore it couldn't possibly store our plan.
+ */ + if (datanode_queries) + entry = (DatanodeStatement *) hash_search(datanode_queries, + stmt_name, + HASH_FIND, + NULL); + else + entry = NULL; + + /* Report error if entry is not found */ + if (!entry && throwError) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_PSTATEMENT), + errmsg("datanode statement \"%s\" does not exist", + stmt_name))); + + return entry; +} + +/* + * Drop datanode statement and close it on nodes if active + */ +void +DropDatanodeStatement(const char *stmt_name) +{ + DatanodeStatement *entry; + + entry = FetchDatanodeStatement(stmt_name, false); + if (entry) + { + int i; + List *nodelist = NIL; + + /* make a List of integers from node numbers */ + for (i = 0; i < entry->nodenum; i++) + nodelist = lappend_int(nodelist, entry->nodes[i]); + entry->nodenum = 0; + + ExecCloseRemoteStatement(stmt_name, nodelist); + + hash_search(datanode_queries, entry->stmt_name, HASH_REMOVE, NULL); + } +} + + +/* + * Return true if there is at least one active datanode statement, so acquired + * datanode connections should not be released + */ +bool +HaveActiveDatanodeStatements(void) +{ + HASH_SEQ_STATUS seq; + DatanodeStatement *entry; + + /* nothing cached */ + if (!datanode_queries) + return false; + + /* walk over cache */ + hash_seq_init(&seq, datanode_queries); + while ((entry = hash_seq_search(&seq)) != NULL) + { + /* Stop walking and return true */ + if (entry->nodenum > 0) + { + hash_seq_term(&seq); + return true; + } + } + /* nothing found */ + return false; +} + + +/* + * Mark datanode statement as active on specified node + * Returns true if statement has already been active on the node and can be used + * Returns false if statement has not been active on the node and should be + * prepared on the node + */ +bool +ActivateDatanodeStatementOnNode(const char *stmt_name, int node) +{ + DatanodeStatement *entry; + int i; + + /* find the statement in cache */ + entry = FetchDatanodeStatement(stmt_name, true); + + /* see if statement already active on the node */ + for (i = 0; i < entry->nodenum; i++) + if (entry->nodes[i] == node) + return true; + + /* statement is not active on the specified node, append item to the list */ + entry->nodes[entry->nodenum++] = node; + return false; +} +#endif diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c index 0c43e9479c..189c8b3fa2 100644 --- a/src/backend/executor/execTuples.c +++ b/src/backend/executor/execTuples.c @@ -793,6 +793,90 @@ ExecCopySlotMinimalTuple(TupleTableSlot *slot) slot->tts_isnull); } +#ifdef PGXC +/* -------------------------------- + * ExecCopySlotDatarow + * Obtain a copy of a slot's data row. The copy is + * palloc'd in the current memory context.
+ * Pointer to the datarow is returned as a var parameter; the function + * returns the length of the data row + * The slot itself is undisturbed + * -------------------------------- + */ +int +ExecCopySlotDatarow(TupleTableSlot *slot, char **datarow) +{ + Assert(datarow); + + if (slot->tts_dataRow) + { + /* if we already have datarow make a copy */ + *datarow = (char *)palloc(slot->tts_dataLen); + memcpy(*datarow, slot->tts_dataRow, slot->tts_dataLen); + return slot->tts_dataLen; + } + else + { + TupleDesc tdesc = slot->tts_tupleDescriptor; + StringInfoData buf; + uint16 n16; + int i; + + initStringInfo(&buf); + /* Number of attribute values in the row */ + n16 = htons(tdesc->natts); + appendBinaryStringInfo(&buf, (char *) &n16, 2); + + /* ensure we have all values */ + slot_getallattrs(slot); + for (i = 0; i < tdesc->natts; i++) + { + uint32 n32; + + if (slot->tts_isnull[i]) + { + n32 = htonl(-1); + appendBinaryStringInfo(&buf, (char *) &n32, 4); + } + else + { + Form_pg_attribute attr = tdesc->attrs[i]; + Oid typOutput; + bool typIsVarlena; + Datum pval; + char *pstring; + int len; + + /* Get info needed to output the value */ + getTypeOutputInfo(attr->atttypid, &typOutput, &typIsVarlena); + /* + * If we have a toasted datum, forcibly detoast it here to avoid + * memory leakage inside the type's output routine. + */ + if (typIsVarlena) + pval = PointerGetDatum(PG_DETOAST_DATUM(slot->tts_values[i])); + else + pval = slot->tts_values[i]; + + /* Convert Datum to string */ + pstring = OidOutputFunctionCall(typOutput, pval); + + /* copy data to the buffer */ + len = strlen(pstring); + n32 = htonl(len); + appendBinaryStringInfo(&buf, (char *) &n32, 4); + appendBinaryStringInfo(&buf, pstring, len); + } + } + /* copy the constructed message to a palloc'd result buffer */ + *datarow = palloc(buf.len); + memcpy(*datarow, buf.data, buf.len); + pfree(buf.data); + return buf.len; + } +} +#endif + /* -------------------------------- * ExecFetchSlotTuple * Fetch the slot's regular physical tuple.
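The buffer built by ExecCopySlotDatarow() above follows the DataRow layout this patch also uses for parameter shipping: a uint16 field count in network byte order, then for each field an int32 length (-1 denoting NULL) followed by that many bytes of the type's text output. A minimal standalone decoder, shown here only to document the layout (not part of the patch):

#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>

static void
print_datarow(const char *datarow)
{
	const char *cur = datarow;
	uint16_t	nfields;
	int			i;

	/* uint16 field count, network byte order */
	memcpy(&nfields, cur, 2);
	nfields = ntohs(nfields);
	cur += 2;

	for (i = 0; i < nfields; i++)
	{
		uint32_t	n32;
		int32_t		flen;

		/* int32 value length; -1 denotes NULL */
		memcpy(&n32, cur, 4);
		cur += 4;
		flen = (int32_t) ntohl(n32);
		if (flen == -1)
			printf("field %d: NULL\n", i);
		else
		{
			printf("field %d: %.*s\n", i, flen, cur);
			cur += flen;
		}
	}
}

int
main(void)
{
	/* hand-built row with two fields: the text "42" and a NULL */
	char		buf[16];
	char	   *p = buf;
	uint16_t	n16 = htons(2);
	uint32_t	len = htonl(2);
	uint32_t	null_marker = htonl((uint32_t) -1);

	memcpy(p, &n16, 2);			p += 2;
	memcpy(p, &len, 4);			p += 4;
	memcpy(p, "42", 2);			p += 2;
	memcpy(p, &null_marker, 4);	p += 4;
	print_datarow(buf);
	return 0;
}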
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index aa0587f9c7..72e4d583a2 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -839,12 +839,16 @@ _copyRemoteQuery(RemoteQuery *from) COPY_NODE_FIELD(distinct); COPY_SCALAR_FIELD(read_only); COPY_SCALAR_FIELD(force_autocommit); + COPY_STRING_FIELD(statement); COPY_STRING_FIELD(cursor); + COPY_SCALAR_FIELD(exec_type); + COPY_SCALAR_FIELD(paramval_data); + COPY_SCALAR_FIELD(paramval_len); COPY_STRING_FIELD(relname); COPY_SCALAR_FIELD(remotejoin); - COPY_SCALAR_FIELD(reduce_level); - COPY_NODE_FIELD(base_tlist); + COPY_SCALAR_FIELD(reduce_level); + COPY_NODE_FIELD(base_tlist); COPY_STRING_FIELD(outer_alias); COPY_STRING_FIELD(inner_alias); COPY_SCALAR_FIELD(outer_reduce_level); @@ -867,6 +871,9 @@ _copyExecNodes(ExecNodes *from) COPY_NODE_FIELD(nodelist); COPY_SCALAR_FIELD(baselocatortype); COPY_SCALAR_FIELD(tableusagetype); + COPY_NODE_FIELD(expr); + COPY_SCALAR_FIELD(relid); + COPY_SCALAR_FIELD(accesstype); return newnode; } @@ -2305,7 +2312,9 @@ _copyQuery(Query *from) COPY_NODE_FIELD(limitCount); COPY_NODE_FIELD(rowMarks); COPY_NODE_FIELD(setOperations); - +#ifdef PGXC + COPY_STRING_FIELD(sql_statement); +#endif return newnode; } diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c index 790b81d317..4442310965 100644 --- a/src/backend/pgxc/locator/locator.c +++ b/src/backend/pgxc/locator/locator.c @@ -354,22 +354,15 @@ GetRelationNodes(RelationLocInfo *rel_loc_info, long *partValue, case LOCATOR_TYPE_HASH: if (partValue != NULL) - { /* in prototype, all partitioned tables use same map */ exec_nodes->nodelist = lappend_int(NULL, get_node_from_hash(hash_range_int(*partValue))); - } else - { - /* If no info, go to node 1 */ if (accessType == RELATION_ACCESS_INSERT) + /* Insert NULL to node 1 */ exec_nodes->nodelist = lappend_int(NULL, 1); else - /* - * No partitioning value passed in - * (no where qualification on part column - use all) - */ + /* Use all nodes for other types of access */ exec_nodes->nodelist = list_copy(rel_loc_info->nodeList); - } break; case LOCATOR_TYPE_SINGLE: diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 8d900f162a..1a56b44e40 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -58,6 +58,19 @@ typedef struct long constant; /* assume long PGXCTODO - should be Datum */ } Literal_Comparison; +/* + * Comparison of partitioned column and expression + * Expression can be evaluated at execution time to determine target nodes + */ +typedef struct +{ + Oid relid; + RelationLocInfo *rel_loc_info; + Oid attrnum; + char *col_name; + Expr *expr; /* assume long PGXCTODO - should be Datum */ +} Expr_Comparison; + /* Parent-Child joins for relations being joined on * their respective hash distribuion columns */ @@ -75,6 +88,7 @@ typedef struct typedef struct { List *partitioned_literal_comps; /* List of Literal_Comparison */ + List *partitioned_expressions; /* List of Expr_Comparison */ List *partitioned_parent_child; /* List of Parent_Child_Join */ List *replicated_joins; @@ -127,6 +141,7 @@ typedef struct XCWalkerContext Query *query; RelationAccessType accessType; RemoteQuery *query_step; /* remote query step being analized */ + PlannerInfo *root; /* planner data for the subquery */ Special_Conditions *conditions; bool multilevel_join; List *rtables; /* a pointer to a list of rtables */ @@ -144,11 +159,12 @@ bool StrictStatementChecking = true; /* Forbid multi-node SELECT 
statements with an ORDER BY clause */ bool StrictSelectChecking = false; -static void get_plan_nodes(Query *query, RemoteQuery *step, RelationAccessType accessType); +static void get_plan_nodes(PlannerInfo *root, RemoteQuery *step, RelationAccessType accessType); static bool get_plan_nodes_walker(Node *query_node, XCWalkerContext *context); static bool examine_conditions_walker(Node *expr_node, XCWalkerContext *context); static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt); static void InitXCWalkerContext(XCWalkerContext *context); +static RemoteQuery *makeRemoteQuery(void); static void validate_part_col_updatable(const Query *query); static bool is_pgxc_safe_func(Oid funcid); @@ -307,6 +323,7 @@ free_special_relations(Special_Conditions *special_conditions) /* free all items in list, including Literal_Comparison struct */ list_free_deep(special_conditions->partitioned_literal_comps); + list_free_deep(special_conditions->partitioned_expressions); /* free list, but not items pointed to */ list_free(special_conditions->partitioned_parent_child); @@ -451,8 +468,9 @@ get_base_var(Var *var, XCWalkerContext *context) * then the caller should use the regular PG planner */ static void -get_plan_nodes_insert(Query *query, RemoteQuery *step) +get_plan_nodes_insert(PlannerInfo *root, RemoteQuery *step) { + Query *query = root->parse; RangeTblEntry *rte; RelationLocInfo *rel_loc_info; Const *constant; @@ -502,15 +520,15 @@ get_plan_nodes_insert(Query *query, RemoteQuery *step) if (sub_rte->rtekind == RTE_SUBQUERY && !sub_rte->subquery->limitCount && !sub_rte->subquery->limitOffset) - get_plan_nodes(sub_rte->subquery, step, RELATION_ACCESS_READ); + get_plan_nodes(root, step, RELATION_ACCESS_READ); } /* Send to general planner if the query is multiple step */ if (!step->exec_nodes) return; - /* If the source is not hash-based (eg, replicated) also send - * through general planner + /* If the source is not hash-based (eg, replicated) also send + * through general planner */ if (step->exec_nodes->baselocatortype != LOCATOR_TYPE_HASH) { @@ -612,7 +630,18 @@ get_plan_nodes_insert(Query *query, RemoteQuery *step) } if (checkexpr == NULL) - return; /* no constant */ + { + /* try and determine nodes on execution time */ + step->exec_nodes = makeNode(ExecNodes); + step->exec_nodes->baselocatortype = rel_loc_info->locatorType; + step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER; + step->exec_nodes->primarynodelist = NULL; + step->exec_nodes->nodelist = NULL; + step->exec_nodes->expr = eval_expr; + step->exec_nodes->relid = rel_loc_info->relid; + step->exec_nodes->accesstype = RELATION_ACCESS_INSERT; + return; + } constant = (Const *) checkexpr; @@ -788,7 +817,7 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) initStringInfo(&buf); /* Step 1: select tuple values by ctid */ - step1 = makeNode(RemoteQuery); + step1 = makeRemoteQuery(); appendStringInfoString(&buf, "SELECT "); for (att = 1; att <= natts; att++) { @@ -822,13 +851,11 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) appendStringInfo(&buf, " FROM %s WHERE ctid = '%s'", tableName, ctid_str); step1->sql_statement = pstrdup(buf.data); - step1->is_single_step = true; step1->exec_nodes = makeNode(ExecNodes); - step1->read_only = true; step1->exec_nodes->nodelist = list_make1_int(nodenum); /* Step 2: declare cursor for update target table */ - step2 = makeNode(RemoteQuery); + step2 = makeRemoteQuery(); resetStringInfo(&buf); appendStringInfoString(&buf, 
step->cursor); @@ -852,18 +879,14 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) } appendStringInfoString(&buf, "FOR UPDATE"); step2->sql_statement = pstrdup(buf.data); - step2->is_single_step = true; - step2->read_only = true; step2->exec_nodes = makeNode(ExecNodes); step2->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList); innerPlan(step2) = (Plan *) step1; /* Step 3: move cursor to first position */ - step3 = makeNode(RemoteQuery); + step3 = makeRemoteQuery(); resetStringInfo(&buf); appendStringInfo(&buf, "MOVE %s", node_cursor); step3->sql_statement = pstrdup(buf.data); - step3->is_single_step = true; - step3->read_only = true; step3->exec_nodes = makeNode(ExecNodes); step3->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList); innerPlan(step3) = (Plan *) step2; @@ -1024,7 +1047,7 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) if (!IsA(arg2, Const)) { /* this gets freed when the memory context gets freed */ - Expr *eval_expr = (Expr *) eval_const_expressions(NULL, (Node *) arg2); + Expr *eval_expr = (Expr *) eval_const_expressions(context->root, (Node *) arg2); checkexpr = get_numeric_constant(eval_expr); } @@ -1176,6 +1199,32 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) */ return false; } + /* + * Check if it is an expression like pcol = expr, where pcol is + * a partitioning column of the rel1 and planner could not + * evaluate expr. We probably can evaluate it at execution time. + * Save the expression, and if we do not have other hint, + * try and evaluate it at execution time + */ + rel_loc_info1 = GetRelationLocInfo(column_base->relid); + + if (!rel_loc_info1) + return true; + + if (IsHashColumn(rel_loc_info1, column_base->colname)) + { + Expr_Comparison *expr_comp = + palloc(sizeof(Expr_Comparison)); + + expr_comp->relid = column_base->relid; + expr_comp->rel_loc_info = rel_loc_info1; + expr_comp->col_name = column_base->colname; + expr_comp->expr = arg2; + context->conditions->partitioned_expressions = + lappend(context->conditions->partitioned_expressions, + expr_comp); + return false; + } } } } @@ -1599,24 +1648,19 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) } if (rtesave) - { /* a single table, just grab it */ rel_loc_info = GetRelationLocInfo(rtesave->relid); + } - if (!rel_loc_info) - return true; + /* have complex case */ + if (!rel_loc_info) + return true; - context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, - NULL, - context->accessType); - } - } - else - { + if (rel_loc_info->locatorType != LOCATOR_TYPE_HASH) + /* do not need to determine partitioning expression */ context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->accessType); - } /* Note replicated table usage for determining safe queries */ if (context->query_step->exec_nodes) @@ -1625,6 +1669,38 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) table_usage_type = TABLE_USAGE_TYPE_USER_REPLICATED; context->query_step->exec_nodes->tableusagetype = table_usage_type; + } else if (context->conditions->partitioned_expressions) { + /* probably we can determine nodes on execution time */ + foreach(lc, context->conditions->partitioned_expressions) { + Expr_Comparison *expr_comp = (Expr_Comparison *) lfirst(lc); + if (rel_loc_info->relid == expr_comp->relid) + { + context->query_step->exec_nodes = makeNode(ExecNodes); + context->query_step->exec_nodes->baselocatortype = + rel_loc_info->locatorType; + context->query_step->exec_nodes->tableusagetype = + 
TABLE_USAGE_TYPE_USER; + context->query_step->exec_nodes->primarynodelist = NULL; + context->query_step->exec_nodes->nodelist = NULL; + context->query_step->exec_nodes->expr = expr_comp->expr; + context->query_step->exec_nodes->relid = expr_comp->relid; + context->query_step->exec_nodes->accesstype = context->accessType; + break; + } + } + } else { + /* run query on all nodes */ + context->query_step->exec_nodes = makeNode(ExecNodes); + context->query_step->exec_nodes->baselocatortype = + rel_loc_info->locatorType; + context->query_step->exec_nodes->tableusagetype = + TABLE_USAGE_TYPE_USER; + context->query_step->exec_nodes->primarynodelist = NULL; + context->query_step->exec_nodes->nodelist = + list_copy(rel_loc_info->nodeList); + context->query_step->exec_nodes->expr = NULL; + context->query_step->exec_nodes->relid = NULL; + context->query_step->exec_nodes->accesstype = context->accessType; } } /* check for partitioned col comparison against a literal */ @@ -1712,6 +1788,7 @@ InitXCWalkerContext(XCWalkerContext *context) context->query = NULL; context->accessType = RELATION_ACCESS_READ; context->query_step = NULL; + context->root = NULL; context->conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions)); context->rtables = NIL; context->multilevel_join = false; @@ -1722,20 +1799,57 @@ InitXCWalkerContext(XCWalkerContext *context) context->join_list = NIL; } + +/* + * Create an instance of RemoteQuery and initialize fields + */ +static RemoteQuery * +makeRemoteQuery(void) +{ + RemoteQuery *result = makeNode(RemoteQuery); + result->is_single_step = true; + result->sql_statement = NULL; + result->exec_nodes = NULL; + result->combine_type = COMBINE_TYPE_NONE; + result->simple_aggregates = NIL; + result->sort = NULL; + result->distinct = NULL; + result->read_only = true; + result->force_autocommit = false; + result->cursor = NULL; + result->exec_type = EXEC_ON_DATANODES; + result->paramval_data = NULL; + result->paramval_len = 0; + + result->relname = NULL; + result->remotejoin = false; + result->partitioned_replicated = false; + result->reduce_level = 0; + result->base_tlist = NIL; + result->outer_alias = NULL; + result->inner_alias = NULL; + result->outer_reduce_level = 0; + result->inner_reduce_level = 0; + result->outer_relids = NULL; + result->inner_relids = NULL; + return result; +} + /* * Top level entry point before walking query to determine plan nodes * */ static void -get_plan_nodes(Query *query, RemoteQuery *step, RelationAccessType accessType) +get_plan_nodes(PlannerInfo *root, RemoteQuery *step, RelationAccessType accessType) { + Query *query = root->parse; XCWalkerContext context; - InitXCWalkerContext(&context); context.query = query; context.accessType = accessType; context.query_step = step; + context.root = root; context.rtables = lappend(context.rtables, query->rtable); if ((get_plan_nodes_walker((Node *) query, &context) @@ -1754,24 +1868,24 @@ get_plan_nodes(Query *query, RemoteQuery *step, RelationAccessType accessType) * */ static void -get_plan_nodes_command(Query *query, RemoteQuery *step) +get_plan_nodes_command(RemoteQuery *step, PlannerInfo *root) { - switch (query->commandType) + switch (root->parse->commandType) { case CMD_SELECT: - get_plan_nodes(query, step, query->rowMarks ? + get_plan_nodes(root, step, root->parse->rowMarks ? 
RELATION_ACCESS_READ_FOR_UPDATE : RELATION_ACCESS_READ); break; case CMD_INSERT: - get_plan_nodes_insert(query, step); + get_plan_nodes_insert(root, step); break; case CMD_UPDATE: case CMD_DELETE: /* treat as a select */ - get_plan_nodes(query, step, RELATION_ACCESS_UPDATE); + get_plan_nodes(root, step, RELATION_ACCESS_UPDATE); break; default: @@ -2589,9 +2703,41 @@ PlannedStmt * pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) { PlannedStmt *result; - Plan *standardPlan; + PlannerGlobal *glob; + PlannerInfo *root; RemoteQuery *query_step; + StringInfoData buf; + /* + * Set up global state for this planner invocation. This data is needed + * across all levels of sub-Query that might exist in the given command, + * so we keep it in a separate struct that's linked to by each per-Query + * PlannerInfo. + */ + glob = makeNode(PlannerGlobal); + + glob->boundParams = boundParams; + glob->paramlist = NIL; + glob->subplans = NIL; + glob->subrtables = NIL; + glob->rewindPlanIDs = NULL; + glob->finalrtable = NIL; + glob->relationOids = NIL; + glob->invalItems = NIL; + glob->lastPHId = 0; + glob->transientPlan = false; + + /* Create a PlannerInfo data structure, usually it is done for a subquery */ + root = makeNode(PlannerInfo); + root->parse = query; + root->glob = glob; + root->query_level = 1; + root->parent_root = NULL; + root->planner_cxt = CurrentMemoryContext; + root->init_plans = NIL; + root->cte_plan_ids = NIL; + root->eq_classes = NIL; + root->append_rel_list = NIL; /* build the PlannedStmt result */ result = makeNode(PlannedStmt); @@ -2603,184 +2749,151 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) result->intoClause = query->intoClause; result->rtable = query->rtable; - query_step = makeNode(RemoteQuery); - query_step->is_single_step = false; + query_step = makeRemoteQuery(); + + /* Optimize multi-node handling */ + query_step->read_only = query->commandType == CMD_SELECT; if (query->utilityStmt && IsA(query->utilityStmt, DeclareCursorStmt)) cursorOptions |= ((DeclareCursorStmt *) query->utilityStmt)->options; - query_step->exec_nodes = NULL; - query_step->combine_type = COMBINE_TYPE_NONE; - query_step->simple_aggregates = NULL; - /* Optimize multi-node handling */ - query_step->read_only = query->commandType == CMD_SELECT; - query_step->force_autocommit = false; - result->planTree = (Plan *) query_step; - /* - * Determine where to execute the command, either at the Coordinator - * level, Data Nodes, or both. By default we choose both. We should be - * able to quickly expand this for more commands. 
- */ - switch (query->commandType) - { - case CMD_SELECT: - /* Perform some checks to make sure we can support the statement */ - if (query->intoClause) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("INTO clause not yet supported")))); - /* fallthru */ - case CMD_INSERT: - case CMD_UPDATE: - case CMD_DELETE: - /* PGXCTODO: This validation will not be removed - * until we support moving tuples from one node to another - * when the partition column of a table is updated - */ - if (query->commandType == CMD_UPDATE) - validate_part_col_updatable(query); - - if (query->returningList) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("RETURNING clause not yet supported")))); - - /* Set result relations */ - if (query->commandType != CMD_SELECT) - result->resultRelations = list_make1_int(query->resultRelation); + /* Perform some checks to make sure we can support the statement */ + if (query->commandType == CMD_SELECT && query->intoClause) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("INTO clause not yet supported")))); - get_plan_nodes_command(query, query_step); + /* PGXCTODO: This validation will not be removed + * until we support moving tuples from one node to another + * when the partition column of a table is updated + */ + if (query->commandType == CMD_UPDATE) + validate_part_col_updatable(query); - if (query_step->exec_nodes == NULL) - { - /* Do not yet allow multi-node correlated UPDATE or DELETE */ - if (query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE) - { - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("UPDATE and DELETE that are correlated or use non-immutable functions not yet supported")))); - } + if (query->returningList) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("RETURNING clause not yet supported")))); - /* - * Processing guery against catalog tables, or multi-step command. 
* Run through standard planner - */ - result = standard_planner(query, cursorOptions, boundParams); - return result; - } + /* Set result relations */ + if (query->commandType != CMD_SELECT) + result->resultRelations = list_make1_int(query->resultRelation); - /* Do not yet allow multi-node correlated UPDATE or DELETE */ - if ((query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE) - && !query_step->exec_nodes - && list_length(query->rtable) > 1) - { - result = standard_planner(query, cursorOptions, boundParams); - return result; - } + get_plan_nodes_command(query_step, root); - /* - * get_plan_nodes_command may alter original statement, so do not - * process it before the call - * - * Declare Cursor case: - * We should leave as a step query only SELECT statement - * Further if we need refer source statement for planning we should take - * the truncated string - */ - if (query->utilityStmt && - IsA(query->utilityStmt, DeclareCursorStmt)) - { + if (query_step->exec_nodes == NULL) + { + /* Do not yet allow multi-node correlated UPDATE or DELETE */ + if (query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE) + { + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("UPDATE and DELETE that are correlated or use non-immutable functions not yet supported")))); + } - /* search for SELECT keyword in the normalized string */ - char *select = strpos(query->sql_statement, " SELECT "); - /* Take substring of the original string using found offset */ - query_step->sql_statement = pstrdup(select + 1); - } - else - query_step->sql_statement = pstrdup(query->sql_statement); + /* + * Processing query against catalog tables, or multi-step command. + * Run through standard planner + */ + result = standard_planner(query, cursorOptions, boundParams); + return result; + } - /* - * If there already is an active portal, we may be doing planning - * within a function. Just use the standard plan, but check if - * it is part of an EXPLAIN statement so that we do not show that - * we plan multiple steps when it is a single-step operation. - */ - if (ActivePortal && strcmp(ActivePortal->commandTag, "EXPLAIN")) - return standard_planner(query, cursorOptions, boundParams); + /* Do not yet allow multi-node correlated UPDATE or DELETE */ + if ((query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE) + && !query_step->exec_nodes + && list_length(query->rtable) > 1) + { + result = standard_planner(query, cursorOptions, boundParams); + return result; + } - query_step->is_single_step = true; - /* - * PGXCTODO - * When Postgres runs insert into t (a) values (1); against table - * defined as create table t (a int, b int); the plan is looking - * like insert into t (a,b) values (1,null); - * Later executor is verifying plan, to make sure table has not - * been altered since plan has been created and comparing table - * definition with plan target list and output error if they do - * not match. - * I could not find better way to generate targetList for pgxc plan - * then call standard planner and take targetList from the plan - * generated by Postgres. - */ - query_step->scan.plan.targetlist = query->targetList; + /* + * Deparse query tree to get step query.
It may be modified later on + */ + initStringInfo(&buf); + deparse_query(query, &buf, NIL); + query_step->sql_statement = pstrdup(buf.data); + pfree(buf.data); - if (query_step->exec_nodes) - query_step->combine_type = get_plan_combine_type( - query, query_step->exec_nodes->baselocatortype); + query_step->is_single_step = true; + /* + * PGXCTODO + * When Postgres runs insert into t (a) values (1); against table + * defined as create table t (a int, b int); the plan is looking + * like insert into t (a,b) values (1,null); + * Later executor is verifying plan, to make sure table has not + * been altered since plan has been created and comparing table + * definition with plan target list and output error if they do + * not match. + * I could not find better way to generate targetList for pgxc plan + * then call standard planner and take targetList from the plan + * generated by Postgres. + */ + query_step->scan.plan.targetlist = query->targetList; - /* Set up simple aggregates */ - /* PGXCTODO - we should detect what types of aggregates are used. - * in some cases we can avoid the final step and merely proxy results - * (when there is only one data node involved) instead of using - * coordinator consolidation. At the moment this is needed for AVG() - */ - query_step->simple_aggregates = get_simple_aggregates(query); + if (query_step->exec_nodes) + query_step->combine_type = get_plan_combine_type( + query, query_step->exec_nodes->baselocatortype); + + /* Set up simple aggregates */ + /* PGXCTODO - we should detect what types of aggregates are used. + * in some cases we can avoid the final step and merely proxy results + * (when there is only one data node involved) instead of using + * coordinator consolidation. At the moment this is needed for AVG() + */ + query_step->simple_aggregates = get_simple_aggregates(query); - /* - * Add sorting to the step - */ - if (list_length(query_step->exec_nodes->nodelist) > 1 && - (query->sortClause || query->distinctClause)) - make_simple_sort_from_sortclauses(query, query_step); + /* + * Add sorting to the step + */ + if (list_length(query_step->exec_nodes->nodelist) > 1 && + (query->sortClause || query->distinctClause)) + make_simple_sort_from_sortclauses(query, query_step); - /* Handle LIMIT and OFFSET for single-step queries on multiple nodes */ - if (handle_limit_offset(query_step, query, result)) - { - /* complicated expressions, just fallback to standard plan */ - result = standard_planner(query, cursorOptions, boundParams); - return result; - } + /* Handle LIMIT and OFFSET for single-step queries on multiple nodes */ + if (handle_limit_offset(query_step, query, result)) + { + /* complicated expressions, just fallback to standard plan */ + result = standard_planner(query, cursorOptions, boundParams); + return result; + } - /* - * Use standard plan if we have more than one data node with either - * group by, hasWindowFuncs, or hasRecursive - */ - /* - * PGXCTODO - this could be improved to check if the first - * group by expression is the partitioning column, in which - * case it is ok to treat as a single step. 
- */ - if (query->commandType == CMD_SELECT - && query_step->exec_nodes - && list_length(query_step->exec_nodes->nodelist) > 1 - && (query->groupClause || query->hasWindowFuncs || query->hasRecursive)) - { - result->planTree = standardPlan; - return result; - } - break; + /* + * Use standard plan if we have more than one data node with either + * group by, hasWindowFuncs, or hasRecursive + */ + /* + * PGXCTODO - this could be improved to check if the first + * group by expression is the partitioning column, in which + * case it is ok to treat as a single step. + */ + if (query->commandType == CMD_SELECT + && query_step->exec_nodes + && list_length(query_step->exec_nodes->nodelist) > 1 + && (query->groupClause || query->hasWindowFuncs || query->hasRecursive)) + { + result = standard_planner(query, cursorOptions, boundParams); + return result; + } - default: - /* Allow for override */ - if (StrictStatementChecking) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("This command is not yet supported.")))); - else - result->planTree = standardPlan; + /* Allow for override */ + /* AM: Is this ever possible? */ + if (query->commandType != CMD_SELECT && + query->commandType != CMD_INSERT && + query->commandType != CMD_UPDATE && + query->commandType != CMD_DELETE) + { + if (StrictStatementChecking) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("This command is not yet supported.")))); + else + result = standard_planner(query, cursorOptions, boundParams); + return result; } /* @@ -2808,6 +2921,13 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) } /* + * Assume single step. If there are multiple steps we should make up + * parameters for each step where they referenced + */ + if (boundParams) + query_step->paramval_len = ParamListToDataRow(boundParams, + &query_step->paramval_data); + /* * If query is FOR UPDATE fetch CTIDs from the remote node * Use CTID as a key to update tuples on remote nodes when handling * WHERE CURRENT OF @@ -3068,7 +3188,7 @@ validate_part_col_updatable(const Query *query) * * Based on is_immutable_func from postgresql_fdw.c * We add an exeption for base postgresql functions, to - * allow now() and others to still execute as part of single step + * allow now() and others to still execute as part of single step * queries. 
* * PGXCTODO - we currently make the false assumption that immutable diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index b954003db5..a387354e7b 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -19,6 +19,7 @@ #include "postgres.h" #include "access/gtm.h" #include "access/xact.h" +#include "commands/prepare.h" #include "executor/executor.h" #include "gtm/gtm_c.h" #include "libpq/libpq.h" @@ -27,6 +28,7 @@ #include "pgxc/poolmgr.h" #include "storage/ipc.h" #include "utils/datum.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/tuplesort.h" #include "utils/snapmgr.h" @@ -38,9 +40,6 @@ #define DATA_NODE_FETCH_SIZE 1 - -extern char *deparseSql(RemoteQueryState *scanstate); - /* * Buffer size does not affect performance significantly, just do not allow * connection buffer grows infinitely @@ -62,6 +61,9 @@ static int pgxc_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransacti PGXCNodeAllHandles * pgxc_handles, char *gid); static int pgxc_node_commit_prepared(GlobalTransactionId gxid, GlobalTransactionId prepared_gxid, PGXCNodeAllHandles * pgxc_handles, char *gid); +static PGXCNodeAllHandles * get_exec_connections(RemoteQueryState *planstate, + ExecNodes *exec_nodes, + RemoteQueryExecType exec_type); static int pgxc_node_implicit_commit_prepared(GlobalTransactionId prepare_xid, GlobalTransactionId commit_xid, PGXCNodeAllHandles * pgxc_handles, @@ -70,8 +72,6 @@ static int pgxc_node_implicit_commit_prepared(GlobalTransactionId prepare_xid, static int pgxc_node_implicit_prepare(GlobalTransactionId prepare_xid, PGXCNodeAllHandles * pgxc_handles, char *gid); -static PGXCNodeAllHandles * get_exec_connections(ExecNodes *exec_nodes, - RemoteQueryExecType exec_type); static int pgxc_node_receive_and_validate(const int conn_count, PGXCNodeHandle ** connections, bool reset_combiner); @@ -1265,10 +1265,10 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) /* * If we are in the process of shutting down, we - * may be rolling back, and the buffer may contain other messages. - * We want to avoid a procarray exception - * as well as an error stack overflow. - */ + * may be rolling back, and the buffer may contain other messages. + * We want to avoid a procarray exception + * as well as an error stack overflow. + */ if (proc_exit_inprogress) conn->state = DN_CONNECTION_STATE_ERROR_FATAL; @@ -1364,10 +1364,11 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) /* sync lost? 
*/ elog(WARNING, "Received unsupported message type: %c", msg_type); conn->state = DN_CONNECTION_STATE_ERROR_FATAL; - return RESPONSE_EOF; + /* stop reading */ + return RESPONSE_COMPLETE; } } - + /* never happen, but keep compiler quiet */ return RESPONSE_EOF; } @@ -2746,7 +2747,6 @@ RemoteQueryState * ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) { RemoteQueryState *remotestate; - Relation currentRelation; remotestate = CreateResponseCombiner(0, node->combine_type); remotestate->ss.ps.plan = (Plan *) node; @@ -2788,6 +2788,19 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) ALLOCSET_DEFAULT_MAXSIZE); } + /* + * If we have parameter values here and planner has not had them we + * should prepare them now + */ + if (estate->es_param_list_info && !node->paramval_data) + node->paramval_len = ParamListToDataRow(estate->es_param_list_info, + &node->paramval_data); + + /* We need expression context to evaluate */ + if (node->exec_nodes && node->exec_nodes->expr) + ExecAssignExprContext(estate, &remotestate->ss.ps); + + if (innerPlan(node)) innerPlanState(remotestate) = ExecInitNode(innerPlan(node), estate, eflags); @@ -2853,7 +2866,8 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst) * Datanodes Only, Coordinators only or both types */ static PGXCNodeAllHandles * -get_exec_connections(ExecNodes *exec_nodes, +get_exec_connections(RemoteQueryState *planstate, + ExecNodes *exec_nodes, RemoteQueryExecType exec_type) { List *nodelist = NIL; @@ -2873,8 +2887,34 @@ get_exec_connections(ExecNodes *exec_nodes, if (exec_nodes) { - nodelist = exec_nodes->nodelist; - primarynode = exec_nodes->primarynodelist; + if (exec_nodes->expr) + { + /* execution time determining of target data nodes */ + bool isnull; + ExprState *estate = ExecInitExpr(exec_nodes->expr, + (PlanState *) planstate); + Datum partvalue = ExecEvalExpr(estate, + planstate->ss.ps.ps_ExprContext, + &isnull, + NULL); + if (!isnull) + { + RelationLocInfo *rel_loc_info = GetRelationLocInfo(exec_nodes->relid); + ExecNodes *nodes = GetRelationNodes(rel_loc_info, + (long *) &partvalue, + exec_nodes->accesstype); + if (nodes) + { + nodelist = nodes->nodelist; + primarynode = nodes->primarynodelist; + pfree(nodes); + } + FreeRelationLocInfo(rel_loc_info); + } + } else { + nodelist = exec_nodes->nodelist; + primarynode = exec_nodes->primarynodelist; + } } if (list_length(nodelist) == 0 && @@ -2961,212 +3001,273 @@ register_write_nodes(int conn_count, PGXCNodeHandle **connections) } } -/* - * Execute step of PGXC plan. - * The step specifies a command to be executed on specified nodes. - * On first invocation connections to the data nodes are initialized and - * command is executed. Further, as well as within subsequent invocations, - * responses are received until step is completed or there is a tuple to emit. - * If there is a tuple it is returned, otherwise returned NULL. The NULL result - * from the function indicates completed step. - * The function returns at most one tuple per invocation. 
- */ -TupleTableSlot * -ExecRemoteQuery(RemoteQueryState *node) + +static void +do_query(RemoteQueryState *node) { RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan; - TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot; TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot; - bool have_tuple = false; + bool force_autocommit = step->force_autocommit; + bool is_read_only = step->read_only; + GlobalTransactionId gxid = InvalidGlobalTransactionId; + Snapshot snapshot = GetActiveSnapshot(); + TimestampTz timestamp = GetCurrentGTMStartTimestamp(); + PGXCNodeHandle **connections = NULL; + PGXCNodeHandle *primaryconnection = NULL; + int i; + int regular_conn_count; + int total_conn_count; + bool need_tran; + PGXCNodeAllHandles *pgxc_connections; + /* + * Get connections for Datanodes only, utilities and DDLs + * are launched in ExecRemoteUtility + */ + pgxc_connections = get_exec_connections(node, step->exec_nodes, + EXEC_ON_DATANODES); - if (!node->query_Done) + connections = pgxc_connections->datanode_handles; + primaryconnection = pgxc_connections->primary_handle; + total_conn_count = regular_conn_count = pgxc_connections->dn_conn_count; + + /* + * Primary connection is counted separately but is included in total_conn_count if used. + */ + if (primaryconnection) { - /* First invocation, initialize */ - bool force_autocommit = step->force_autocommit; - bool is_read_only = step->read_only; - GlobalTransactionId gxid = InvalidGlobalTransactionId; - Snapshot snapshot = GetActiveSnapshot(); - TimestampTz timestamp = GetCurrentGTMStartTimestamp(); - PGXCNodeHandle **connections = NULL; - PGXCNodeHandle *primaryconnection = NULL; - int i; - int regular_conn_count; - int total_conn_count; - bool need_tran; - PGXCNodeAllHandles *pgxc_connections; - TupleTableSlot *innerSlot = NULL; - - implicit_force_autocommit = force_autocommit; + regular_conn_count--; + } - /* - * Inner plan for RemoteQuery supplies parameters. - * We execute inner plan to get a tuple and use values of the tuple as - * parameter values when executing this remote query. - * If returned slot contains NULL tuple break execution. - * TODO there is a problem how to handle the case if both inner and - * outer plans exist. We can decide later, since it is never used now. - */ - if (innerPlanState(node)) - { - innerSlot = ExecProcNode(innerPlanState(node)); -// if (TupIsNull(innerSlot)) -// return innerSlot; - } + pfree(pgxc_connections); - /* - * Get connections for Datanodes only, utilities and DDLs - * are launched in ExecRemoteUtility - */ - pgxc_connections = get_exec_connections(step->exec_nodes, - EXEC_ON_DATANODES); + /* + * We save only regular connections, at the time we exit the function + * we finish with the primary connection and deal only with regular + * connections on subsequent invocations + */ + node->node_count = regular_conn_count; - connections = pgxc_connections->datanode_handles; - primaryconnection = pgxc_connections->primary_handle; - total_conn_count = regular_conn_count = pgxc_connections->dn_conn_count; + if (force_autocommit) + need_tran = false; + else + need_tran = !autocommit || (!is_read_only && total_conn_count > 1); - /* - * Primary connection is counted separately but is included in total_conn_count if used. - */ + elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primaryconnection ? "true" : "false", regular_conn_count, need_tran ? 
"true" : "false"); + + stat_statement(); + if (autocommit) + { + stat_transaction(total_conn_count); + /* We normally clear for transactions, but if autocommit, clear here, too */ + clear_write_node_list(); + } + + if (!is_read_only) + { if (primaryconnection) - { - regular_conn_count--; - } + register_write_nodes(1, &primaryconnection); + register_write_nodes(regular_conn_count, connections); + } + + gxid = GetCurrentGlobalTransactionId(); - pfree(pgxc_connections); + if (!GlobalTransactionIdIsValid(gxid)) + { + if (primaryconnection) + pfree(primaryconnection); + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to get next transaction ID"))); + } + if (need_tran) + { /* - * We save only regular connections, at the time we exit the function - * we finish with the primary connection and deal only with regular - * connections on subsequent invocations + * Check if data node connections are in transaction and start + * transactions on nodes where it is not started */ - node->node_count = regular_conn_count; + PGXCNodeHandle *new_connections[total_conn_count]; + int new_count = 0; - if (force_autocommit) - need_tran = false; - else - need_tran = !autocommit || (!is_read_only && total_conn_count > 1); + if (primaryconnection && primaryconnection->transaction_status != 'T') + new_connections[new_count++] = primaryconnection; + for (i = 0; i < regular_conn_count; i++) + if (connections[i]->transaction_status != 'T') + new_connections[new_count++] = connections[i]; + + if (new_count && pgxc_node_begin(new_count, new_connections, gxid)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not begin transaction on data nodes."))); + } - elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primaryconnection ? "true" : "false", regular_conn_count, need_tran ? "true" : "false"); + /* See if we have a primary node, execute on it first before the others */ + if (primaryconnection) + { + if (primaryconnection->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(primaryconnection); - stat_statement(); - if (autocommit) + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid)) { - stat_transaction(total_conn_count); - /* We normally clear for transactions, but if autocommit, clear here, too */ - clear_write_node_list(); + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); } - - if (!is_read_only) + if (total_conn_count == 1 && pgxc_node_send_timestamp(primaryconnection, timestamp)) { - if (primaryconnection) - register_write_nodes(1, &primaryconnection); - register_write_nodes(regular_conn_count, connections); + /* + * If a transaction involves multiple connections timestamp is + * always sent down to Datanodes with pgxc_node_begin. + * An autocommit transaction needs the global timestamp also, + * so handle this case here. 
+ */ + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (snapshot && pgxc_node_send_snapshot(primaryconnection, snapshot)) + { + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (step->statement || step->cursor || step->paramval_data) + { + /* need to use Extended Query Protocol */ + int fetch = 0; + bool prepared = false; + + /* if a prepared statement is referenced, see if it already exists */ + if (step->statement) + prepared = ActivateDatanodeStatementOnNode(step->statement, + primaryconnection->nodenum); /* - * Check if data node connections are in transaction and start - * transactions on nodes where it is not started + * execute and fetch rows only if they will be consumed + * immediately by the sorter */ - PGXCNodeHandle *new_connections[total_conn_count]; - int new_count = 0; - - if (primaryconnection && primaryconnection->transaction_status != 'T') - new_connections[new_count++] = primaryconnection; - for (i = 0; i < regular_conn_count; i++) - if (connections[i]->transaction_status != 'T') - new_connections[new_count++] = connections[i]; - - if (new_count && pgxc_node_begin(new_count, new_connections, gxid)) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Could not begin transaction on data nodes."))); - } - - /* See if we have a primary node, execute on it first before the others */ - if (primaryconnection) - { - if (primaryconnection->state == DN_CONNECTION_STATE_QUERY) - BufferConnection(primaryconnection); - - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid)) + if (step->cursor) + fetch = 1; + + if (pgxc_node_send_query_extended(primaryconnection, + prepared ? NULL : step->sql_statement, + step->statement, + step->cursor, + step->paramval_len, + step->paramval_data, + step->read_only, + fetch) != 0) { pfree(connections); - pfree(primaryconnection); + if (primaryconnection) + pfree(primaryconnection); ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (total_conn_count == 1 && pgxc_node_send_timestamp(primaryconnection, timestamp)) + } + else + { + if (pgxc_node_send_query(primaryconnection, step->sql_statement) != 0) { - /* - * If a transaction involves multiple connections timestamp is - * always sent down to Datanodes with pgxc_node_begin. - * An autocommit transaction needs the global timestamp also, - * so handle this case here.
- */ pfree(connections); pfree(primaryconnection); ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (snapshot && pgxc_node_send_snapshot(primaryconnection, snapshot)) - { - pfree(connections); - pfree(primaryconnection); + } + primaryconnection->combiner = node; + Assert(node->combine_type == COMBINE_TYPE_SAME); + + while (node->command_complete_count < 1) + { + if (pgxc_node_receive(1, &primaryconnection, NULL)) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - if (pgxc_node_send_query(primaryconnection, step->sql_statement) != 0) + errmsg("Failed to read response from data nodes"))); + handle_response(primaryconnection, node); + if (node->errorMessage) { - pfree(connections); - pfree(primaryconnection); + char *code = node->errorCode; ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - Assert(node->combine_type == COMBINE_TYPE_SAME); - - while (node->command_complete_count < 1) - { - if (pgxc_node_receive(1, &primaryconnection, NULL)) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to read response from data nodes"))); - handle_response(primaryconnection, node); - if (node->errorMessage) - { - char *code = node->errorCode; - ereport(ERROR, - (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), - errmsg("%s", node->errorMessage))); - } + (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), + errmsg("%s", node->errorMessage))); } } + } - for (i = 0; i < regular_conn_count; i++) + for (i = 0; i < regular_conn_count; i++) + { + if (connections[i]->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(connections[i]); + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && pgxc_node_send_gxid(connections[i], gxid)) { - if (connections[i]->state == DN_CONNECTION_STATE_QUERY) - BufferConnection(connections[i]); - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && pgxc_node_send_gxid(connections[i], gxid)) + pfree(connections); + if (primaryconnection) + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (total_conn_count == 1 && pgxc_node_send_timestamp(connections[i], timestamp)) + { + /* + * If a transaction involves multiple connections timestamp is + * always sent down to Datanodes with pgxc_node_begin. + * An autocommit transaction needs the global timestamp also, + * so handle this case here. 
+ */ + pfree(connections); + if (primaryconnection) + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (snapshot && pgxc_node_send_snapshot(connections[i], snapshot)) + { + pfree(connections); + if (primaryconnection) + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (step->statement || step->cursor || step->paramval_data) + { + /* need to use Extended Query Protocol */ + int fetch = 0; + bool prepared = false; + + /* if a prepared statement is referenced, see if it already exists */ + if (step->statement) + prepared = ActivateDatanodeStatementOnNode(step->statement, + connections[i]->nodenum); + /* + * execute and fetch rows only if they will be consumed + * immediately by the sorter + */ + if (step->cursor) + fetch = 1; + + if (pgxc_node_send_query_extended(connections[i], + prepared ? NULL : step->sql_statement, + step->statement, + step->cursor, + step->paramval_len, + step->paramval_data, + step->read_only, + fetch) != 0) { pfree(connections); if (primaryconnection) @@ -3175,14 +3276,11 @@ ExecRemoteQuery(RemoteQueryState *node) (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (total_conn_count == 1 && pgxc_node_send_timestamp(connections[i], timestamp)) + } + else + { + if (pgxc_node_send_query(connections[i], step->sql_statement) != 0) { - /* - * If a transaction involves multiple connections timestamp is - * always sent down to Datanodes with pgxc_node_begin. - * An autocommit transaction needs the global timestamp also, - * so handle this case here. - */ pfree(connections); if (primaryconnection) pfree(primaryconnection); @@ -3190,176 +3288,163 @@ ExecRemoteQuery(RemoteQueryState *node) (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (snapshot && pgxc_node_send_snapshot(connections[i], snapshot)) + } + connections[i]->combiner = node; + } + + if (step->cursor) + { + node->cursor = step->cursor; + node->cursor_count = regular_conn_count; + node->cursor_connections = (PGXCNodeHandle **) palloc(regular_conn_count * sizeof(PGXCNodeHandle *)); + memcpy(node->cursor_connections, connections, regular_conn_count * sizeof(PGXCNodeHandle *)); + } + + /* + * Stop if all commands are completed or we got a data row and + * initialized state node for subsequent invocations + */ + while (regular_conn_count > 0 && node->connections == NULL) + { + int i = 0; + + if (pgxc_node_receive(regular_conn_count, connections, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to read response from data nodes"))); + /* + * Handle input from the data nodes. + * If we got a RESPONSE_DATAROW we can break handling to wrap + * it into a tuple and return. Handling will be continued upon + * subsequent invocations. + * If we got 0, we exclude connection from the list. We do not + * expect more input from it. In case of non-SELECT query we quit + * the loop when all nodes finish their work and send ReadyForQuery + * with empty connections array. + * If we got EOF, move to the next connection, will receive more + * data on the next iteration.
+		 */
+		while (i < regular_conn_count)
+		{
+			int res = handle_response(connections[i], node);
+			if (res == RESPONSE_EOF)
+			{
+				i++;
+			}
+			else if (res == RESPONSE_COMPLETE)
 			{
-			pfree(connections);
-			if (primaryconnection)
-				pfree(primaryconnection);
-			ereport(ERROR,
-					(errcode(ERRCODE_INTERNAL_ERROR),
-					 errmsg("Failed to send command to data nodes")));
+				if (i < --regular_conn_count)
+					connections[i] = connections[regular_conn_count];
 			}
-		if (step->cursor || innerSlot)
+			else if (res == RESPONSE_TUPDESC)
 			{
-			int			fetch = 0;
+				ExecSetSlotDescriptor(scanslot, node->tuple_desc);
 				/*
-			 * execute and fetch rows only if they will be consumed
-			 * immediately by the sorter
+				 * Now tuple table slot is responsible for freeing the
+				 * descriptor
 				 */
-			if (step->cursor)
-				fetch = 1;
-			/*
-			 * Here is ad hoc handling of WHERE CURRENT OF clause.
-			 * Previous step have returned the key values to update
-			 * replicated tuple into innerSlot and the query is a
-			 * DECLARE CURSOR. We do not want to execute this query now
-			 * because of unusual fetch procedure: we just need to
-			 * position cursor on the first row on each node.
-			 * So here we just parse and bind query. As the result cursor
-			 * specified by the statement will be created, and next step
-			 * will issue MOVE command positioning the cursor properly.
-			 * If we want to use parametrized requests for any other
-			 * purpose we should return and rework this code
-			 */
-			if (pgxc_node_send_query_extended(connections[i],
-											  step->sql_statement,
-											  NULL,
-											  step->cursor,
-											  innerSlot ? innerSlot->tts_dataLen : 0,
-											  innerSlot ? innerSlot->tts_dataRow : NULL,
-											  step->cursor != NULL,
-											  fetch) != 0)
+				node->tuple_desc = NULL;
+				if (step->sort)
 				{
-				pfree(connections);
-				if (primaryconnection)
-					pfree(primaryconnection);
-				ereport(ERROR,
-						(errcode(ERRCODE_INTERNAL_ERROR),
-						 errmsg("Failed to send command to data nodes")));
+					SimpleSort *sort = step->sort;
+
+					node->connections = connections;
+					node->conn_count = regular_conn_count;
+					/*
+					 * First message is already in the buffer.
+					 * Further fetches will be under tuplesort control.
+					 * If the query does not produce rows, tuplesort will
+					 * not be initialized.
+					 */
+					node->tuplesortstate = tuplesort_begin_merge(
+							scanslot->tts_tupleDescriptor,
+							sort->numCols,
+							sort->sortColIdx,
+							sort->sortOperators,
+							sort->nullsFirst,
+							node,
+							work_mem);
+					/*
+					 * Break the loop, do not wait for the first row.
+					 * The tuplesort module wants to control which node it
+					 * fetches rows from, while in this loop the first row
+					 * would come from a random node.
+					 */
+					break;
 				}
 			}
-		else
+			else if (res == RESPONSE_DATAROW)
 			{
-			if (pgxc_node_send_query(connections[i], step->sql_statement) != 0)
-			{
-				pfree(connections);
-				if (primaryconnection)
-					pfree(primaryconnection);
-				ereport(ERROR,
-						(errcode(ERRCODE_INTERNAL_ERROR),
-						 errmsg("Failed to send command to data nodes")));
-			}
+				/*
+				 * Got the first data row, quit the loop
+				 */
+				node->connections = connections;
+				node->conn_count = regular_conn_count;
+				node->current_conn = i;
+				break;
 			}
-		connections[i]->combiner = node;
 		}
+	}
+
+	if (node->cursor_count)
+	{
+		node->conn_count = node->cursor_count;
+		memcpy(connections, node->cursor_connections, node->cursor_count * sizeof(PGXCNodeHandle *));
+		node->connections = connections;
+	}
+}
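Both receive loops in this patch (here in do_query, and again in ExecCloseRemoteStatement further down) drop a finished connection by overwriting its slot with the last live element, shrinking the array without preserving order. A minimal standalone sketch of that idiom, with illustrative names only, not code from the patch:

#include <stdio.h>

/* Drop element i from an unordered array by swapping in the last element. */
static int
compact_unordered(int *items, int count, int i)
{
	if (i < --count)
		items[i] = items[count];
	return count;			/* new count; slot i now holds a fresh element */
}

int
main(void)
{
	int		conns[] = {101, 102, 103, 104};
	int		n = 4;
	int		i = 0;

	while (i < n)
	{
		if (conns[i] % 2 == 0)	/* pretend even ids just completed */
			n = compact_unordered(conns, n, i);
		else
			i++;				/* still busy, move to the next one */
	}

	for (i = 0; i < n; i++)
		printf("%d ", conns[i]);	/* prints: 101 103 */
	return 0;
}

Because the swapped-in element lands in slot i, the loop deliberately does not advance i after a completion, exactly as the diff's RESPONSE_COMPLETE branch leaves i untouched.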
+
+
+/*
+ * Execute a step of the PGXC plan.
+ * The step specifies a command to be executed on the specified nodes.
+ * On the first invocation, connections to the data nodes are initialized
+ * and the command is executed. Within that and subsequent invocations,
+ * responses are received until the step is completed or there is a tuple
+ * to emit. If there is a tuple it is returned; otherwise NULL is returned.
+ * A NULL result from the function indicates a completed step.
+ * The function returns at most one tuple per invocation.
+ */
+TupleTableSlot *
+ExecRemoteQuery(RemoteQueryState *node)
+{
+	RemoteQuery	   *step = (RemoteQuery *) node->ss.ps.plan;
+	TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot;
+	TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot;
+	bool			have_tuple = false;

-	if (step->cursor)
-	{
-		node->cursor = step->cursor;
-		node->cursor_count = regular_conn_count;
-		node->cursor_connections = (PGXCNodeHandle **) palloc(regular_conn_count * sizeof(PGXCNodeHandle *));
-		memcpy(node->cursor_connections, connections, regular_conn_count * sizeof(PGXCNodeHandle *));
-	}
+	if (!node->query_Done)
+	{
 		/*
-	 * Stop if all commands are completed or we got a data row and
-	 * initialized state node for subsequent invocations
+		 * The inner plan for RemoteQuery supplies parameters.
+		 * We execute the inner plan to get a tuple and use the values of
+		 * the tuple as parameter values when executing this remote query.
+		 * If the returned slot contains a NULL tuple, break execution.
+		 * TODO there is a problem how to handle the case if both inner and
+		 * outer plans exist. We can decide later, since it is never used now.
 		 */
-	while (regular_conn_count > 0 && node->connections == NULL)
+		if (innerPlanState(node))
 		{
-		int i = 0;
-
-		if (pgxc_node_receive(regular_conn_count, connections, NULL))
-			ereport(ERROR,
-					(errcode(ERRCODE_INTERNAL_ERROR),
-					 errmsg("Failed to read response from data nodes")));
+			TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node));
 			/*
-		 * Handle input from the data nodes.
-		 * If we got a RESPONSE_DATAROW we can break handling to wrap
-		 * it into a tuple and return. Handling will be continued upon
-		 * subsequent invocations.
-		 * If we got 0, we exclude connection from the list. We do not
-		 * expect more input from it. In case of non-SELECT query we quit
-		 * the loop when all nodes finish their work and send ReadyForQuery
-		 * with empty connections array.
-		 * If we got EOF, move to the next connection, will receive more
-		 * data on the next iteration.
+			 * Use the data row returned by the previous step as parameters
+			 * for the main query.
+			 * Exit if no more slots.
 			 */
-		while (i < regular_conn_count)
-		{
-			int res = handle_response(connections[i], node);
-			if (res == RESPONSE_EOF)
-			{
-				i++;
-			}
-			else if (res == RESPONSE_COMPLETE)
-			{
-				if (i < --regular_conn_count)
-					connections[i] = connections[regular_conn_count];
-			}
-			else if (res == RESPONSE_TUPDESC)
-			{
-				ExecSetSlotDescriptor(scanslot, node->tuple_desc);
-				/*
-				 * Now tuple table slot is responsible for freeing the
-				 * descriptor
-				 */
-				node->tuple_desc = NULL;
-				if (step->sort)
-				{
-					SimpleSort *sort = step->sort;
-
-					node->connections = connections;
-					node->conn_count = regular_conn_count;
-					/*
-					 * First message is already in the buffer
-					 * Further fetch will be under tuplesort control
-					 * If query does not produce rows tuplesort will not
-					 * be initialized
-					 */
-					node->tuplesortstate = tuplesort_begin_merge(
-							scanslot->tts_tupleDescriptor,
-							sort->numCols,
-							sort->sortColIdx,
-							sort->sortOperators,
-							sort->nullsFirst,
-							node,
-							work_mem);
-					/*
-					 * Break the loop, do not wait for first row.
-					 * Tuplesort module want to control node it is
-					 * fetching rows from, while in this loop first
-					 * row would be got from random node
-					 */
-					break;
-				}
-			}
-			else if (res == RESPONSE_DATAROW)
-			{
-				/*
-				 * Got first data row, quit the loop
-				 */
-				node->connections = connections;
-				node->conn_count = regular_conn_count;
-				node->current_conn = i;
-				break;
-			}
-		}
+			if (!TupIsNull(innerSlot))
+				step->paramval_len = ExecCopySlotDatarow(innerSlot,
+														 &step->paramval_data);
 		}

-	if (node->cursor_count)
-	{
-		node->conn_count = node->cursor_count;
-		memcpy(connections, node->cursor_connections, node->cursor_count * sizeof(PGXCNodeHandle *));
-		node->connections = connections;
-	}
+		do_query(node);
 		node->query_Done = true;
 	}

 	if (node->update_cursor)
 	{
-		PGXCNodeAllHandles *all_dn_handles = get_exec_connections(NULL, EXEC_ON_DATANODES);
+		PGXCNodeAllHandles *all_dn_handles = get_exec_connections(node, NULL, EXEC_ON_DATANODES);

 		close_node_cursors(all_dn_handles->datanode_handles,
 						   all_dn_handles->dn_conn_count,
 						   node->update_cursor);
@@ -3367,6 +3452,7 @@ ExecRemoteQuery(RemoteQueryState *node)
 		node->update_cursor = NULL;
 	}

+handle_results:
 	if (node->tuplesortstate)
 	{
 		while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate,
@@ -3458,12 +3544,22 @@ ExecRemoteQuery(RemoteQueryState *node)
 		return resultslot;

 	/*
-	 * If coordinator plan is specified execute it first.
-	 * If the plan is returning we are returning these tuples immediately.
-	 * If it is not returning or returned them all by current invocation
-	 * we will go ahead and execute remote query. Then we will never execute
-	 * the outer plan again because node->query_Done flag will be set and
-	 * execution won't get to that place.
+	 * We cannot use recursion here. We could run out of stack memory if
+	 * the inner node returns a long result set and this node does not
+	 * return rows (like INSERT ... SELECT)
+	 */
+	if (innerPlanState(node))
+	{
+		TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node));
+		if (!TupIsNull(innerSlot))
+		{
+			do_query(node);
+			goto handle_results;
+		}
+	}
+
+	/*
+	 * Execute the outer plan if specified
 	 */
 	if (outerPlanState(node))
 	{
@@ -3557,7 +3653,7 @@ ExecEndRemoteQuery(RemoteQueryState *node)

 	if (node->update_cursor)
 	{
-		PGXCNodeAllHandles *all_dn_handles = get_exec_connections(NULL, EXEC_ON_DATANODES);
+		PGXCNodeAllHandles *all_dn_handles = get_exec_connections(node, NULL, EXEC_ON_DATANODES);

 		close_node_cursors(all_dn_handles->datanode_handles,
 						   all_dn_handles->dn_conn_count,
 						   node->update_cursor);
@@ -3566,6 +3662,16 @@ ExecEndRemoteQuery(RemoteQueryState *node)
 	}

 	/*
+	 * Clean up parameters if they were set, since the plan may be reused
+	 */
+	if (((RemoteQuery *) node->ss.ps.plan)->paramval_data)
+	{
+		pfree(((RemoteQuery *) node->ss.ps.plan)->paramval_data);
+		((RemoteQuery *) node->ss.ps.plan)->paramval_data = NULL;
+		((RemoteQuery *) node->ss.ps.plan)->paramval_len = 0;
+	}
+
+	/*
 	 * shut down the subplan
 	 */
 	if (outerPlanState(node))
@@ -3631,6 +3737,74 @@ close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor)

 	ValidateAndCloseCombiner(combiner);
 }

+
+/*
+ * Encode parameter values in the format of a DataRow message (the same
+ * format is used in Bind) to prepare for sending down to the data nodes.
+ * The buffer to store the encoded value is palloc'ed and returned as the
+ * result parameter. The function returns the size of the result.
+ */
+int
+ParamListToDataRow(ParamListInfo params, char** result)
+{
+	StringInfoData buf;
+	uint16		n16;
+	int			i;
+
+	initStringInfo(&buf);
+	/* Number of parameter values */
+	n16 = htons(params->numParams);
+	appendBinaryStringInfo(&buf, (char *) &n16, 2);
+
+	/* Parameter values */
+	for (i = 0; i < params->numParams; i++)
+	{
+		ParamExternData *param = params->params + i;
+		uint32		n32;
+
+		if (param->isnull)
+		{
+			n32 = htonl(-1);
+			appendBinaryStringInfo(&buf, (char *) &n32, 4);
+		}
+		else
+		{
+			Oid			typOutput;
+			bool		typIsVarlena;
+			Datum		pval;
+			char	   *pstring;
+			int			len;
+
+			/* Get info needed to output the value */
+			getTypeOutputInfo(param->ptype, &typOutput, &typIsVarlena);
+
+			/*
+			 * If we have a toasted datum, forcibly detoast it here to avoid
+			 * memory leakage inside the type's output routine.
+			 */
+			if (typIsVarlena)
+				pval = PointerGetDatum(PG_DETOAST_DATUM(param->value));
+			else
+				pval = param->value;
+
+			/* Convert Datum to string */
+			pstring = OidOutputFunctionCall(typOutput, pval);
+
+			/* copy data to the buffer */
+			len = strlen(pstring);
+			n32 = htonl(len);
+			appendBinaryStringInfo(&buf, (char *) &n32, 4);
+			appendBinaryStringInfo(&buf, pstring, len);
+		}
+	}
+
+	/* Take data from the buffer */
+	*result = palloc(buf.len);
+	memcpy(*result, buf.data, buf.len);
+	pfree(buf.data);
+	return buf.len;
+}
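The buffer produced above is therefore: a 16-bit parameter count, then for each parameter a 32-bit length (-1 for NULL) followed by the value rendered as text, all integers in network byte order. A minimal sketch of the same layout without the PostgreSQL type-output machinery (buffer size and values are illustrative):

#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Append one text-format parameter (or NULL) in Bind/DataRow layout. */
static int
append_param(char *buf, int off, const char *value)
{
	uint32_t	n32;

	if (value == NULL)
	{
		n32 = htonl((uint32_t) -1);		/* length -1 marks SQL NULL */
		memcpy(buf + off, &n32, 4);
		return off + 4;
	}
	n32 = htonl((uint32_t) strlen(value));
	memcpy(buf + off, &n32, 4);
	memcpy(buf + off + 4, value, strlen(value));
	return off + 4 + (int) strlen(value);
}

int
main(void)
{
	char		buf[64];
	uint16_t	n16 = htons(2);		/* two parameters follow */
	int			off = 0;

	memcpy(buf + off, &n16, 2);
	off += 2;
	off = append_param(buf, off, "42");		/* $1 = '42' */
	off = append_param(buf, off, NULL);		/* $2 = NULL */

	printf("encoded %d bytes\n", off);		/* 2 + (4 + 2) + 4 = 12 */
	return 0;
}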
+
+
 /*
  * Consume any remaining messages on the connections.
  * This is useful for calling after ereport()
  */
@@ -3744,8 +3918,7 @@ ExecRemoteUtility(RemoteQuery *node)

 	remotestate = CreateResponseCombiner(0, node->combine_type);

-	pgxc_connections = get_exec_connections(node->exec_nodes,
-											exec_type);
+	pgxc_connections = get_exec_connections(NULL, node->exec_nodes, exec_type);

 	primaryconnection = pgxc_connections->primary_handle;

@@ -4157,6 +4330,88 @@ pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles)
 	pfree(pgxc_handles);
 }

+
+void
+ExecCloseRemoteStatement(const char *stmt_name, List *nodelist)
+{
+	PGXCNodeAllHandles *all_handles;
+	PGXCNodeHandle	  **connections;
+	RemoteQueryState   *combiner;
+	int					conn_count;
+	int					i;
+
+	/* Exit if nodelist is empty */
+	if (list_length(nodelist) == 0)
+		return;
+
+	/* get needed data node connections */
+	all_handles = get_handles(nodelist, NIL, false);
+	conn_count = all_handles->dn_conn_count;
+	connections = all_handles->datanode_handles;
+
+	for (i = 0; i < conn_count; i++)
+	{
+		if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
+			BufferConnection(connections[i]);
+		if (pgxc_node_send_close(connections[i], true, stmt_name) != 0)
+		{
+			/*
+			 * statements are not affected by statement end, so treat an
+			 * unclosed statement on the data node as a fatal issue and
+			 * force the connection to be discarded
+			 */
+			connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
+			ereport(WARNING,
+					(errcode(ERRCODE_INTERNAL_ERROR),
+					 errmsg("Failed to close data node statement")));
+		}
+		if (pgxc_node_send_sync(connections[i]) != 0)
+		{
+			connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
+			ereport(WARNING,
+					(errcode(ERRCODE_INTERNAL_ERROR),
+					 errmsg("Failed to close data node statement")));
+		}
+	}
+
+	combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+
+	while (conn_count > 0)
+	{
+		if (pgxc_node_receive(conn_count, connections, NULL))
+		{
+			for (i = 0; i < conn_count; i++)
+				connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;

+			ereport(ERROR,
+					(errcode(ERRCODE_INTERNAL_ERROR),
+					 errmsg("Failed to close data node statement")));
+		}
+		i = 0;
+		while (i < conn_count)
+		{
+			int res = handle_response(connections[i], combiner);
+			if (res == RESPONSE_EOF)
+			{
+				i++;
+			}
+			else if (res == RESPONSE_COMPLETE)
+			{
+				if (--conn_count > i)
+					connections[i] = connections[conn_count];
+			}
+			else
+			{
+				connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
+			}
+		}
+	}
+
+	ValidateAndCloseCombiner(combiner);
+	pfree_pgxc_all_handles(all_handles);
+}
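ExecCloseRemoteStatement pairs a Close message naming the statement with a Sync on every node, then drains the responses through the combiner. A sketch of the bytes those two messages put on the wire, assuming the standard PostgreSQL frontend/backend protocol framing (the real code queues them through the handle output buffer in pgxcnode.c, which is not shown here):

#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Encode Close (type 'C', kind 'S' = named statement) followed by Sync
 * (type 'S'). Each length field counts itself plus the payload, but not
 * the one-byte message type.
 */
static int
encode_close_sync(char *buf, const char *stmt_name)
{
	int			off = 0;
	uint32_t	len;

	/* Close: 'C' | int32 length | 'S' | name\0 */
	buf[off++] = 'C';
	len = htonl(4 + 1 + strlen(stmt_name) + 1);
	memcpy(buf + off, &len, 4);
	off += 4;
	buf[off++] = 'S';
	strcpy(buf + off, stmt_name);
	off += strlen(stmt_name) + 1;

	/* Sync: 'S' | int32 length (just itself) */
	buf[off++] = 'S';
	len = htonl(4);
	memcpy(buf + off, &len, 4);
	off += 4;

	return off;
}

int
main(void)
{
	char	buf[128];
	int		n = encode_close_sync(buf, "p_1");

	printf("queued %d bytes for Close+Sync\n", n);	/* 10 + 5 = 15 */
	return 0;
}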
+
+
 /*
  * Check if an Implicit 2PC is necessary for this transaction.
  * Check also if it is necessary to prepare transaction locally.
diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c
index 4790f9573a..fc6345710c 100644
--- a/src/backend/pgxc/pool/pgxcnode.c
+++ b/src/backend/pgxc/pool/pgxcnode.c
@@ -27,6 +27,7 @@
 #include "access/gtm.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "commands/prepare.h"
 #include "gtm/gtm_c.h"
 #include "pgxc/pgxcnode.h"
 #include "pgxc/locator.h"
@@ -592,6 +593,10 @@ release_handles(bool force_drop)
 	if (datanode_count == 0 && coord_count == 0)
 		return;

+	/* Do not release connections if we have prepared statements on nodes */
+	if (HaveActiveDatanodeStatements())
+		return;
+
 	/* Collect Data Nodes handles */
 	for (i = 0; i < NumDataNodes; i++)
 	{
@@ -1144,8 +1149,10 @@ pgxc_node_send_query_extended(PGXCNodeHandle *handle, const char *query,
 							  int paramlen, char *params,
 							  bool send_describe, int fetch_size)
 {
-	if (pgxc_node_send_parse(handle, statement, query))
-		return EOF;
+	/* NULL query indicates an already prepared statement */
+	if (query)
+		if (pgxc_node_send_parse(handle, statement, query))
+			return EOF;
 	if (pgxc_node_send_bind(handle, portal, statement, paramlen, params))
 		return EOF;
 	if (send_describe)
diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c
index 264271bd68..50e38dceea 100644
--- a/src/backend/pgxc/pool/poolmgr.c
+++ b/src/backend/pgxc/pool/poolmgr.c
@@ -350,7 +350,7 @@ PoolManagerInit()
 		else
 			ereport(FATAL,
 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					errmsg("invalid list syntax for \"coordinator_ports\"")));
+					 errmsg("invalid list syntax for \"coordinator_ports\"")));
 	}

 	if (count == 0)
@@ -369,7 +369,7 @@ PoolManagerInit()
 		else
 			ereport(FATAL,
 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					errmsg("invalid list syntax for \"coordinator_users\"")));
+					 errmsg("invalid list syntax for \"coordinator_users\"")));
 	}

 	i = 0;
@@ -715,14 +715,33 @@ agent_destroy(PoolAgent *agent)

 	/* Discard connections if any remaining */
 	if (agent->pool)
-		agent_release_connections(agent, NULL, NULL);
+	{
+		List   *dn_conn = NIL;
+		List   *co_conn = NIL;
+		int		i;
+
+		/* gather abandoned datanode connections */
+		if (agent->dn_connections)
+			for (i = 0; i < NumDataNodes; i++)
+				if (agent->dn_connections[i])
+					dn_conn = lappend_int(dn_conn, i + 1);
+
+		/* gather abandoned coordinator connections */
+		if (agent->coord_connections)
+			for (i = 0; i < NumCoords; i++)
+				if (agent->coord_connections[i])
+					co_conn = lappend_int(co_conn, i + 1);
+
+		/* release them all */
+		agent_release_connections(agent, dn_conn, co_conn);
+	}

 	/* find agent in the list */
 	for (i = 0; i < agentCount; i++)
 	{
 		if (poolAgents[i] == agent)
 		{
-			/* free memory */
+			/* Free memory. All connection slots are NULL at this point */
 			if (agent->dn_connections)
 			{
 				pfree(agent->dn_connections);
@@ -1108,7 +1127,7 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist)

 		for (i = 0; i < NumCoords; i++)
 			agent->coord_connections[i] = NULL;
-		}
+	}

 	/* Initialize result */

diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 43737cae48..66c250fb1f 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -2287,6 +2287,21 @@ make_viewdef(StringInfo buf, HeapTuple ruletup, TupleDesc rulettc,
 }

+#ifdef PGXC
+/* ----------
+ * deparse_query			- Parse back one query parsetree
+ *
+ * The purpose of this function is to build up a statement for a RemoteQuery.
+ * It just calls get_query_def without pretty-print flags.
+ * ----------
+ */
+void
+deparse_query(Query *query, StringInfo buf, List *parentnamespace)
+{
+	get_query_def(query, buf, parentnamespace, NULL, 0, 0);
+}
+#endif
+
 /* ----------
  * get_query_def			- Parse back one query parsetree
  *
diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c
index 437b7d1114..bef1236b9c 100644
--- a/src/backend/utils/cache/plancache.c
+++ b/src/backend/utils/cache/plancache.c
@@ -56,6 +56,13 @@
 #include "utils/resowner.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
+#ifdef PGXC
+#include "commands/prepare.h"
+#include "pgxc/execRemote.h"
+
+
+static void release_datanode_statements(Plan *plannode);
+#endif

 static List *cached_plans_list = NIL;

@@ -563,6 +570,29 @@ RevalidateCachedPlan(CachedPlanSource *plansource, bool useResOwner)
 }

 /*
+ * Find and release all datanode statements referenced by the plan node
+ * and subnodes
+ */
+#ifdef PGXC
+static void
+release_datanode_statements(Plan *plannode)
+{
+	if (IsA(plannode, RemoteQuery))
+	{
+		RemoteQuery *step = (RemoteQuery *) plannode;
+
+		if (step->statement)
+			DropDatanodeStatement(step->statement);
+	}
+
+	if (innerPlan(plannode))
+		release_datanode_statements(innerPlan(plannode));
+
+	if (outerPlan(plannode))
+		release_datanode_statements(outerPlan(plannode));
+}
+#endif
+
+/*
 * ReleaseCachedPlan: release active use of a cached plan.
 *
 * This decrements the reference count, and frees the plan if the count
@@ -581,7 +611,22 @@ ReleaseCachedPlan(CachedPlan *plan, bool useResOwner)
 		Assert(plan->refcount > 0);
 		plan->refcount--;
 		if (plan->refcount == 0)
+		{
+#ifdef PGXC
+			if (plan->fully_planned)
+			{
+				ListCell   *lc;
+
+				/* close any active datanode statements */
+				foreach (lc, plan->stmt_list)
+				{
+					PlannedStmt *ps = (PlannedStmt *) lfirst(lc);
+
+					release_datanode_statements(ps->planTree);
+				}
+			}
+#endif
 			MemoryContextDelete(plan->context);
+		}
 }

 /*
diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h
index f52f001289..b6f5afdb0e 100644
--- a/src/include/commands/prepare.h
+++ b/src/include/commands/prepare.h
@@ -33,6 +33,15 @@ typedef struct
 	TimestampTz prepare_time;	/* the time when the stmt was prepared */
 } PreparedStatement;

+#ifdef PGXC
+typedef struct
+{
+	/* dynahash.c requires key to be first field */
+	char		stmt_name[NAMEDATALEN];
+	int			nodenum;	/* number of nodes where statement is active */
+	int			nodes[0];	/* node ids where statement is active */
+} DatanodeStatement;
+#endif

 /* Utility statements PREPARE, EXECUTE, DEALLOCATE, EXPLAIN EXECUTE */
 extern void PrepareQuery(PrepareStmt *stmt, const char *queryString);
@@ -62,4 +71,11 @@ extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt);

 void		DropAllPreparedStatements(void);

+#ifdef PGXC
+extern DatanodeStatement *FetchDatanodeStatement(const char *stmt_name, bool throwError);
+extern bool ActivateDatanodeStatementOnNode(const char *stmt_name, int node);
+extern bool HaveActiveDatanodeStatements(void);
+extern void DropDatanodeStatement(const char *stmt_name);
+#endif
+
 #endif   /* PREPARE_H */
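DatanodeStatement above uses the pre-C99 zero-length-array idiom (nodes[0]; C99 would spell it nodes[]) so that one allocation, sized when the hash table is created, carries both the fixed fields and the per-node id list. A standalone sketch of that layout, with a plain malloc standing in for dynahash and illustrative sizes and names:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define NAMEDATALEN 64

typedef struct
{
	char	stmt_name[NAMEDATALEN];	/* key must be first, as with dynahash */
	int		nodenum;				/* number of node ids actually stored */
	int		nodes[0];				/* trailing array, sized by the allocator */
} DatanodeStatementExample;

int
main(void)
{
	int num_data_nodes = 8;			/* stand-in for NumDataNodes */
	/* one allocation covers the struct plus room for every node id */
	DatanodeStatementExample *entry =
		malloc(sizeof(DatanodeStatementExample) + num_data_nodes * sizeof(int));

	memset(entry, 0, sizeof(DatanodeStatementExample));
	strncpy(entry->stmt_name, "p_1", NAMEDATALEN - 1);
	entry->nodes[entry->nodenum++] = 3;	/* statement became active on node 3 */

	printf("%s is active on %d node(s), first is %d\n",
		   entry->stmt_name, entry->nodenum, entry->nodes[0]);
	free(entry);
	return 0;
}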
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index 0c2c4fe0a2..3e218bfc20 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -187,6 +187,9 @@ extern TupleTableSlot *ExecStoreVirtualTuple(TupleTableSlot *slot);
 extern TupleTableSlot *ExecStoreAllNullTuple(TupleTableSlot *slot);
 extern HeapTuple ExecCopySlotTuple(TupleTableSlot *slot);
 extern MinimalTuple ExecCopySlotMinimalTuple(TupleTableSlot *slot);
+#ifdef PGXC
+extern int ExecCopySlotDatarow(TupleTableSlot *slot, char **datarow);
+#endif
 extern HeapTuple ExecFetchSlotTuple(TupleTableSlot *slot);
 extern MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot);
 extern Datum ExecFetchSlotTupleDatum(TupleTableSlot *slot);
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index a3c1868422..c5c45c0162 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -144,5 +144,9 @@ extern void BufferConnection(PGXCNodeHandle *conn);

 extern void ExecRemoteQueryReScan(RemoteQueryState *node, ExprContext *exprCtxt);

+extern int ParamListToDataRow(ParamListInfo params, char** result);
+
+extern void ExecCloseRemoteStatement(const char *stmt_name, List *nodelist);
+
 extern int primary_data_node;
 #endif
diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h
index ee28c5a402..a6d1d531a3 100644
--- a/src/include/pgxc/locator.h
+++ b/src/include/pgxc/locator.h
@@ -26,6 +26,8 @@
 #define HASH_MASK 0x00000FFF;

 #define IsReplicated(x) (x->locatorType == LOCATOR_TYPE_REPLICATED)
+
+#include "nodes/primnodes.h"
 #include "utils/relcache.h"

@@ -76,6 +78,10 @@ typedef struct
 	List	   *nodelist;
 	char		baselocatortype;
 	TableUsageType tableusagetype;		/* track pg_catalog usage */
+	Expr	   *expr;			/* expression to evaluate at execution time
+								 * if planner cannot determine execution nodes */
+	Oid			relid;			/* Relation to determine execution nodes */
+	RelationAccessType accesstype;		/* Access type to determine execution nodes */
 } ExecNodes;

diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index bb1f934be9..61cb6d3291 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -85,9 +85,14 @@ typedef struct
 	SimpleDistinct *distinct;
 	bool		read_only;		/* do not use 2PC when committing read only steps */
 	bool		force_autocommit;	/* some commands like VACUUM require autocommit mode */
+	char	   *statement;		/* if specified use it as a PreparedStatement name on data nodes */
 	char	   *cursor;			/* if specified use it as a Portal name on data nodes */
 	RemoteQueryExecType exec_type;

+	/* Support for parameters */
+	char	   *paramval_data;	/* parameter data, format is like in BIND */
+	int			paramval_len;	/* length of parameter values data */
+
 	char	   *relname;
 	bool		remotejoin;		/* True if this is a reduced remote join */
 	bool		partitioned_replicated;	/* True if reduced and contains replicated-partitioned join */
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 6e26da755d..4a56f9fb11 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -15,6 +15,7 @@
 #define BUILTINS_H

 #include "fmgr.h"
+#include "lib/stringinfo.h"
 #include "nodes/parsenodes.h"

 /*
@@ -598,6 +599,7 @@ extern char *deparse_expression(Node *expr, List *dpcontext,
 #ifdef PGXC
 extern List *deparse_context_for_remotequery(const char *aliasname, Oid relid);
 extern List *deparse_context_for(const char *aliasname, Oid relid);
+extern void deparse_query(Query *query, StringInfo buf, List *parentnamespace);
 #endif
 extern List *deparse_context_for_plan(Node *plan, Node *outer_plan, List *rtable,
 									  List *subplans);
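Tying the pieces together: once a statement has been prepared on a data node, pgxc_node_send_query_extended is called with a NULL query, so Parse is skipped and Bind/Execute reuse the named statement. A toy trace of that decision; the message names come from the standard extended query protocol, and the trailing Sync and the fetch handling are simplifying assumptions rather than a transcript of the patched function:

#include <stdbool.h>
#include <stdio.h>

/*
 * Print the extended-protocol message sequence that would be queued:
 * Parse only when a query text is supplied; a NULL query means the
 * named statement already exists on the data node.
 */
static void
trace_query_extended(const char *query, bool send_describe, int fetch_size)
{
	if (query)
		printf("Parse ");		/* create the named statement */
	printf("Bind ");			/* attach parameter values, e.g. paramval_data */
	if (send_describe)
		printf("Describe ");	/* request the row description */
	if (fetch_size)
		printf("Execute(fetch %d) ", fetch_size);
	else
		printf("Execute ");
	printf("Sync\n");
}

int
main(void)
{
	trace_query_extended("SELECT 1", true, 1);	/* first use on a node */
	trace_query_extended(NULL, true, 1);		/* already prepared: no Parse */
	return 0;
}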
