| author | Mason Sharp | 2010-12-23 20:10:38 +0000 |
|---|---|---|
| committer | Pavan Deolasee | 2011-05-19 16:45:26 +0000 |
| commit | d6d2d3d925f571b0b58ff6b4f6504d88e96bb342 | |
| tree | 46934a7eccee6a1c45edcbd68c5e7d52d0387b3e | |
| parent | 5ea02d34bd8584fadd738437bbed1155162a362f | |
Add support for single-step prepared statements.
Works for both named and unnamed prepared statements,
and for both the PREPARE and EXECUTE commands.
The Coordinator tracks the session's prepared statements
and prepares them on the Data Nodes only on demand,
when they are first executed on the target node(s).
At the end of a transaction, if prepared statements
still exist for the session, the connections are not
released back to the pool. (We should do something
similar for temporary tables.)
This commit also replaces an existing kluge that reused
the SQL string in some cases; the query is now deparsed
from the Query tree instead.
Written by Andrei Martsinchyk; the multi-step check was added by me.
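
For reference, a minimal libpq client sketch of the flow this commit enables: the statement is prepared once on the Coordinator, and the prepare on the Data Nodes is deferred until first execution. The connection string and the table/column names here are hypothetical.

```c
/*
 * Sketch only: prepare a named statement on the Coordinator, then
 * execute it. With this commit the Coordinator prepares the statement
 * on the target Data Node(s) on demand, at first execution.
 */
#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	PGconn	   *conn = PQconnectdb("host=coordinator dbname=postgres");
	const char *values[1] = {"42"};		/* text-format parameter */
	PGresult   *res;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	/* Named prepared statement; parameter types inferred from the query */
	res = PQprepare(conn, "sel_by_key", "SELECT * FROM t WHERE k = $1", 1, NULL);
	PQclear(res);

	/* First execution triggers the on-demand prepare on the Data Node(s) */
	res = PQexecPrepared(conn, "sel_by_key", 1, values, NULL, NULL, 0);
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
		printf("rows: %d\n", PQntuples(res));
	PQclear(res);

	PQfinish(conn);
	return 0;
}
```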
| -rw-r--r-- | src/backend/commands/prepare.c | 229 |
| -rw-r--r-- | src/backend/executor/execTuples.c | 84 |
| -rw-r--r-- | src/backend/nodes/copyfuncs.c | 15 |
| -rw-r--r-- | src/backend/pgxc/locator/locator.c | 11 |
| -rw-r--r-- | src/backend/pgxc/plan/planner.c | 506 |
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 915 |
| -rw-r--r-- | src/backend/pgxc/pool/pgxcnode.c | 11 |
| -rw-r--r-- | src/backend/pgxc/pool/poolmgr.c | 29 |
| -rw-r--r-- | src/backend/utils/adt/ruleutils.c | 15 |
| -rw-r--r-- | src/backend/utils/cache/plancache.c | 45 |
| -rw-r--r-- | src/include/commands/prepare.h | 16 |
| -rw-r--r-- | src/include/executor/tuptable.h | 3 |
| -rw-r--r-- | src/include/pgxc/execRemote.h | 4 |
| -rw-r--r-- | src/include/pgxc/locator.h | 6 |
| -rw-r--r-- | src/include/pgxc/planner.h | 5 |
| -rw-r--r-- | src/include/utils/builtins.h | 2 |
16 files changed, 1353 insertions, 543 deletions
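
The per-Data-Node statement naming added in prepare.c below hinges on a suffixing loop that keeps names unique within NAMEDATALEN. A standalone sketch of that loop, with the datanode_queries hash lookup stubbed out and a short base name assumed:

```c
#include <stdio.h>
#include <string.h>
#include <stdbool.h>

#define NAMEDATALEN 64			/* as in PostgreSQL */

/* Stub standing in for the datanode_queries hash lookup */
static bool
name_exists(const char *name)
{
	return strcmp(name, "stmt") == 0;	/* pretend "stmt" is already taken */
}

/*
 * Derive a unique statement name from stmt_name by appending "__n".
 * If the result would exceed NAMEDATALEN, the base string is truncated
 * first, so the loop cannot cycle through the same truncated name forever.
 * Assumes stmt_name fits in NAMEDATALEN.
 */
static void
make_unique_name(const char *stmt_name, char *name)
{
	int			n = 0;

	do
	{
		strcpy(name, stmt_name);
		if (n)
		{
			char		modifier[NAMEDATALEN];

			sprintf(modifier, "__%d", n);
			/* truncate the base so base + modifier fits in NAMEDATALEN */
			name[NAMEDATALEN - strlen(modifier) - 1] = '\0';
			strcat(name, modifier);
		}
		n++;
	} while (name_exists(name));
}

int
main(void)
{
	char		name[NAMEDATALEN];

	make_unique_name("stmt", name);
	printf("%s\n", name);		/* prints "stmt__1" */
	return 0;
}
```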
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index 0e948e4b72..89d1f021ba 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -33,7 +33,11 @@ #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/snapmgr.h" - +#ifdef PGXC +#include "pgxc/pgxc.h" +#include "pgxc/poolmgr.h" +#include "pgxc/execRemote.h" +#endif /* * The hash table in which prepared queries are stored. This is @@ -42,6 +46,14 @@ * (statement names); the entries are PreparedStatement structs. */ static HTAB *prepared_queries = NULL; +#ifdef PGXC +/* + * The hash table where datanode prepared statements are stored. + * The keys are statement names referenced from cached RemoteQuery nodes; the + * entries are DatanodeStatement structs + */ +static HTAB *datanode_queries = NULL; +#endif static void InitQueryHashTable(void); static ParamListInfo EvaluateParams(PreparedStatement *pstmt, List *params, @@ -147,6 +159,22 @@ PrepareQuery(PrepareStmt *stmt, const char *queryString) /* Generate plans for queries. */ plan_list = pg_plan_queries(query_list, 0, NULL); +#ifdef PGXC + /* + * Check if we are dealing with more than one step. + * Multi-step preapred statements are not yet supported. + * PGXCTODO - temporary - Once we add support, this code should be removed. + */ + if (IS_PGXC_COORDINATOR && plan_list && plan_list->head) + { + PlannedStmt *stmt = (PlannedStmt *) lfirst(plan_list->head); + + if (stmt->planTree->lefttree || stmt->planTree->righttree) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PSTATEMENT_DEFINITION), + errmsg("Multi-step Prepared Statements not yet supported"))); + } +#endif /* * Save the results. */ @@ -419,7 +447,76 @@ InitQueryHashTable(void) 32, &hash_ctl, HASH_ELEM); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + { + MemSet(&hash_ctl, 0, sizeof(hash_ctl)); + + hash_ctl.keysize = NAMEDATALEN; + hash_ctl.entrysize = sizeof(DatanodeStatement) + NumDataNodes * sizeof(int); + + datanode_queries = hash_create("Datanode Queries", + 64, + &hash_ctl, + HASH_ELEM); + } +#endif +} + +#ifdef PGXC +/* + * Assign the statement name for all the RemoteQueries in the plan tree, so + * they use datanode statements + */ +static int +set_remote_stmtname(Plan *plan, const char *stmt_name, int n) +{ + if (IsA(plan, RemoteQuery)) + { + DatanodeStatement *entry; + bool exists; + + char name[NAMEDATALEN]; + do + { + strcpy(name, stmt_name); + /* + * Append modifier. 
If resulting string is going to be truncated, + * truncate better the base string, otherwise we may enter endless + * loop + */ + if (n) + { + char modifier[NAMEDATALEN]; + sprintf(modifier, "__%d", n); + /* + * if position NAMEDATALEN - strlen(modifier) - 1 is beyond the + * base string this is effectively noop, otherwise it truncates + * the base string + */ + name[NAMEDATALEN - strlen(modifier) - 1] = '\0'; + strcat(name, modifier); + } + n++; + hash_search(datanode_queries, name, HASH_FIND, &exists); + } while (exists); + ((RemoteQuery *) plan)->statement = pstrdup(name); + entry = (DatanodeStatement *) hash_search(datanode_queries, + name, + HASH_ENTER, + NULL); + entry->nodenum = 0; + } + + if (innerPlan(plan)) + n = set_remote_stmtname(innerPlan(plan), stmt_name, n); + + if (outerPlan(plan)) + n = set_remote_stmtname(outerPlan(plan), stmt_name, n); + + return n; } +#endif /* * Store all the data pertaining to a query in the hash table using @@ -459,6 +556,25 @@ StorePreparedStatement(const char *stmt_name, errmsg("prepared statement \"%s\" already exists", stmt_name))); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + { + ListCell *lc; + int n; + + /* + * Scan the plans and set the statement field for all found RemoteQuery + * nodes so they use data node statements + */ + n = 0; + foreach(lc, stmt_list) + { + PlannedStmt *ps = (PlannedStmt *) lfirst(lc); + n = set_remote_stmtname(ps->planTree, stmt_name, n); + } + } +#endif + /* Create a plancache entry */ plansource = CreateCachedPlan(raw_parse_tree, query_string, @@ -844,3 +960,114 @@ build_regtype_array(Oid *param_types, int num_params) result = construct_array(tmp_ary, num_params, REGTYPEOID, 4, true, 'i'); return PointerGetDatum(result); } + + +#ifdef PGXC +DatanodeStatement * +FetchDatanodeStatement(const char *stmt_name, bool throwError) +{ + DatanodeStatement *entry; + + /* + * If the hash table hasn't been initialized, it can't be storing + * anything, therefore it couldn't possibly store our plan. 
+ */ + if (datanode_queries) + entry = (DatanodeStatement *) hash_search(datanode_queries, + stmt_name, + HASH_FIND, + NULL); + else + entry = NULL; + + /* Report error if entry is not found */ + if (!entry && throwError) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_PSTATEMENT), + errmsg("datanode statement \"%s\" does not exist", + stmt_name))); + + return entry; +} + +/* + * Drop datanode statement and close it on nodes if active + */ +void +DropDatanodeStatement(const char *stmt_name) +{ + DatanodeStatement *entry; + + entry = FetchDatanodeStatement(stmt_name, false); + if (entry) + { + int i; + List *nodelist = NIL; + + /* make a List of integers from node numbers */ + for (i = 0; i < entry->nodenum; i++) + nodelist = lappend_int(nodelist, entry->nodes[i]); + entry->nodenum = 0; + + ExecCloseRemoteStatement(stmt_name, nodelist); + + hash_search(datanode_queries, entry->stmt_name, HASH_REMOVE, NULL); + } +} + + +/* + * Return true if there is at least one active datanode statement, so acquired + * datanode connections should not be released + */ +bool +HaveActiveDatanodeStatements(void) +{ + HASH_SEQ_STATUS seq; + DatanodeStatement *entry; + + /* nothing cached */ + if (!datanode_queries) + return false; + + /* walk over cache */ + hash_seq_init(&seq, datanode_queries); + while ((entry = hash_seq_search(&seq)) != NULL) + { + /* Stop walking and return true */ + if (entry->nodenum > 0) + { + hash_seq_term(&seq); + return true; + } + } + /* nothing found */ + return false; +} + + +/* + * Mark datanode statement as active on specified node + * Return true if statement has already been active on the node and can be used + * Returns falsee if statement has not been active on the node and should be + * prepared on the node + */ +bool +ActivateDatanodeStatementOnNode(const char *stmt_name, int node) +{ + DatanodeStatement *entry; + int i; + + /* find the statement in cache */ + entry = FetchDatanodeStatement(stmt_name, true); + + /* see if statement already active on the node */ + for (i = 0; i < entry->nodenum; i++) + if (entry->nodes[i] == node) + return true; + + /* statement is not active on the specified node append item to the list */ + entry->nodes[entry->nodenum++] = node; + return false; +} +#endif diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c index 0c43e9479c..189c8b3fa2 100644 --- a/src/backend/executor/execTuples.c +++ b/src/backend/executor/execTuples.c @@ -793,6 +793,90 @@ ExecCopySlotMinimalTuple(TupleTableSlot *slot) slot->tts_isnull); } +#ifdef PGXC +/* -------------------------------- + * ExecCopySlotDatarow + * Obtain a copy of a slot's data row. The copy is + * palloc'd in the current memory context. 
+ * Pointer to the datarow is returned as a var parameter, function + * returns the length of the data row + * The slot itself is undisturbed + * -------------------------------- + */ +int +ExecCopySlotDatarow(TupleTableSlot *slot, char **datarow) +{ + Assert(datarow); + + if (slot->tts_dataRow) + { + /* if we already have datarow make a copy */ + *datarow = (char *)palloc(slot->tts_dataLen); + memcpy(*datarow, slot->tts_dataRow, slot->tts_dataLen); + return slot->tts_dataLen; + } + else + { + TupleDesc tdesc = slot->tts_tupleDescriptor; + StringInfoData buf; + uint16 n16; + int i; + + initStringInfo(&buf); + /* Number of parameter values */ + n16 = htons(tdesc->natts); + appendBinaryStringInfo(&buf, (char *) &n16, 2); + + /* ensure we have all values */ + slot_getallattrs(slot); + for (i = 0; i < tdesc->natts; i++) + { + uint32 n32; + + if (slot->tts_isnull[i]) + { + n32 = htonl(-1); + appendBinaryStringInfo(&buf, (char *) &n32, 4); + } + else + { + Form_pg_attribute attr = tdesc->attrs[i]; + Oid typOutput; + bool typIsVarlena; + Datum pval; + char *pstring; + int len; + + /* Get info needed to output the value */ + getTypeOutputInfo(attr->atttypid, &typOutput, &typIsVarlena); + /* + * If we have a toasted datum, forcibly detoast it here to avoid + * memory leakage inside the type's output routine. + */ + if (typIsVarlena) + pval = PointerGetDatum(PG_DETOAST_DATUM(slot->tts_values[i])); + else + pval = slot->tts_values[i]; + + /* Convert Datum to string */ + pstring = OidOutputFunctionCall(typOutput, pval); + + /* copy data to the buffer */ + len = strlen(pstring); + n32 = htonl(len); + appendBinaryStringInfo(&buf, (char *) &n32, 4); + appendBinaryStringInfo(&buf, pstring, len); + } + } + /* copy data to the buffer */ + *datarow = palloc(buf.len); + memcpy(*datarow, buf.data, buf.len); + pfree(buf.data); + return buf.len; + } +} +#endif + /* -------------------------------- * ExecFetchSlotTuple * Fetch the slot's regular physical tuple. 
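
The buffer built by ExecCopySlotDatarow above mirrors the body of a PostgreSQL wire-protocol DataRow message: a 16-bit column count, then per column a 32-bit length (-1 marks NULL) followed by the value's text bytes. A self-contained sketch of that layout (names here are illustrative):

```c
#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Serialize nvals text values (a NULL pointer stands for SQL NULL) into
 * buf using the DataRow body layout: uint16 count, then per value an
 * int32 length (or -1 for NULL) followed by the bytes, all in network
 * byte order. Returns the total number of bytes written.
 */
static int
pack_datarow(char *buf, const char **vals, int nvals)
{
	char	   *p = buf;
	uint16_t	n16 = htons((uint16_t) nvals);
	int			i;

	memcpy(p, &n16, 2);
	p += 2;

	for (i = 0; i < nvals; i++)
	{
		uint32_t	n32;

		if (vals[i] == NULL)
		{
			n32 = htonl((uint32_t) -1);	/* -1 length marks NULL */
			memcpy(p, &n32, 4);
			p += 4;
		}
		else
		{
			uint32_t	len = (uint32_t) strlen(vals[i]);

			n32 = htonl(len);
			memcpy(p, &n32, 4);
			p += 4;
			memcpy(p, vals[i], len);
			p += len;
		}
	}
	return (int) (p - buf);
}

int
main(void)
{
	const char *vals[3] = {"42", NULL, "hello"};
	char		buf[64];

	/* 2 + (4 + 2) + 4 + (4 + 5) = 21 bytes */
	printf("packed %d bytes\n", pack_datarow(buf, vals, 3));
	return 0;
}
```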
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index aa0587f9c7..72e4d583a2 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -839,12 +839,16 @@ _copyRemoteQuery(RemoteQuery *from) COPY_NODE_FIELD(distinct); COPY_SCALAR_FIELD(read_only); COPY_SCALAR_FIELD(force_autocommit); + COPY_STRING_FIELD(statement); COPY_STRING_FIELD(cursor); + COPY_SCALAR_FIELD(exec_type); + COPY_SCALAR_FIELD(paramval_data); + COPY_SCALAR_FIELD(paramval_len); COPY_STRING_FIELD(relname); COPY_SCALAR_FIELD(remotejoin); - COPY_SCALAR_FIELD(reduce_level); - COPY_NODE_FIELD(base_tlist); + COPY_SCALAR_FIELD(reduce_level); + COPY_NODE_FIELD(base_tlist); COPY_STRING_FIELD(outer_alias); COPY_STRING_FIELD(inner_alias); COPY_SCALAR_FIELD(outer_reduce_level); @@ -867,6 +871,9 @@ _copyExecNodes(ExecNodes *from) COPY_NODE_FIELD(nodelist); COPY_SCALAR_FIELD(baselocatortype); COPY_SCALAR_FIELD(tableusagetype); + COPY_NODE_FIELD(expr); + COPY_SCALAR_FIELD(relid); + COPY_SCALAR_FIELD(accesstype); return newnode; } @@ -2305,7 +2312,9 @@ _copyQuery(Query *from) COPY_NODE_FIELD(limitCount); COPY_NODE_FIELD(rowMarks); COPY_NODE_FIELD(setOperations); - +#ifdef PGXC + COPY_STRING_FIELD(sql_statement); +#endif return newnode; } diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c index 790b81d317..4442310965 100644 --- a/src/backend/pgxc/locator/locator.c +++ b/src/backend/pgxc/locator/locator.c @@ -354,22 +354,15 @@ GetRelationNodes(RelationLocInfo *rel_loc_info, long *partValue, case LOCATOR_TYPE_HASH: if (partValue != NULL) - { /* in prototype, all partitioned tables use same map */ exec_nodes->nodelist = lappend_int(NULL, get_node_from_hash(hash_range_int(*partValue))); - } else - { - /* If no info, go to node 1 */ if (accessType == RELATION_ACCESS_INSERT) + /* Insert NULL to node 1 */ exec_nodes->nodelist = lappend_int(NULL, 1); else - /* - * No partitioning value passed in - * (no where qualification on part column - use all) - */ + /* Use all nodes for other types of access */ exec_nodes->nodelist = list_copy(rel_loc_info->nodeList); - } break; case LOCATOR_TYPE_SINGLE: diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 8d900f162a..1a56b44e40 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -58,6 +58,19 @@ typedef struct long constant; /* assume long PGXCTODO - should be Datum */ } Literal_Comparison; +/* + * Comparison of partitioned column and expression + * Expression can be evaluated at execution time to determine target nodes + */ +typedef struct +{ + Oid relid; + RelationLocInfo *rel_loc_info; + Oid attrnum; + char *col_name; + Expr *expr; /* assume long PGXCTODO - should be Datum */ +} Expr_Comparison; + /* Parent-Child joins for relations being joined on * their respective hash distribuion columns */ @@ -75,6 +88,7 @@ typedef struct typedef struct { List *partitioned_literal_comps; /* List of Literal_Comparison */ + List *partitioned_expressions; /* List of Expr_Comparison */ List *partitioned_parent_child; /* List of Parent_Child_Join */ List *replicated_joins; @@ -127,6 +141,7 @@ typedef struct XCWalkerContext Query *query; RelationAccessType accessType; RemoteQuery *query_step; /* remote query step being analized */ + PlannerInfo *root; /* planner data for the subquery */ Special_Conditions *conditions; bool multilevel_join; List *rtables; /* a pointer to a list of rtables */ @@ -144,11 +159,12 @@ bool StrictStatementChecking = true; /* Forbid multi-node SELECT 
statements with an ORDER BY clause */ bool StrictSelectChecking = false; -static void get_plan_nodes(Query *query, RemoteQuery *step, RelationAccessType accessType); +static void get_plan_nodes(PlannerInfo *root, RemoteQuery *step, RelationAccessType accessType); static bool get_plan_nodes_walker(Node *query_node, XCWalkerContext *context); static bool examine_conditions_walker(Node *expr_node, XCWalkerContext *context); static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt); static void InitXCWalkerContext(XCWalkerContext *context); +static RemoteQuery *makeRemoteQuery(void); static void validate_part_col_updatable(const Query *query); static bool is_pgxc_safe_func(Oid funcid); @@ -307,6 +323,7 @@ free_special_relations(Special_Conditions *special_conditions) /* free all items in list, including Literal_Comparison struct */ list_free_deep(special_conditions->partitioned_literal_comps); + list_free_deep(special_conditions->partitioned_expressions); /* free list, but not items pointed to */ list_free(special_conditions->partitioned_parent_child); @@ -451,8 +468,9 @@ get_base_var(Var *var, XCWalkerContext *context) * then the caller should use the regular PG planner */ static void -get_plan_nodes_insert(Query *query, RemoteQuery *step) +get_plan_nodes_insert(PlannerInfo *root, RemoteQuery *step) { + Query *query = root->parse; RangeTblEntry *rte; RelationLocInfo *rel_loc_info; Const *constant; @@ -502,15 +520,15 @@ get_plan_nodes_insert(Query *query, RemoteQuery *step) if (sub_rte->rtekind == RTE_SUBQUERY && !sub_rte->subquery->limitCount && !sub_rte->subquery->limitOffset) - get_plan_nodes(sub_rte->subquery, step, RELATION_ACCESS_READ); + get_plan_nodes(root, step, RELATION_ACCESS_READ); } /* Send to general planner if the query is multiple step */ if (!step->exec_nodes) return; - /* If the source is not hash-based (eg, replicated) also send - * through general planner + /* If the source is not hash-based (eg, replicated) also send + * through general planner */ if (step->exec_nodes->baselocatortype != LOCATOR_TYPE_HASH) { @@ -612,7 +630,18 @@ get_plan_nodes_insert(Query *query, RemoteQuery *step) } if (checkexpr == NULL) - return; /* no constant */ + { + /* try and determine nodes on execution time */ + step->exec_nodes = makeNode(ExecNodes); + step->exec_nodes->baselocatortype = rel_loc_info->locatorType; + step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER; + step->exec_nodes->primarynodelist = NULL; + step->exec_nodes->nodelist = NULL; + step->exec_nodes->expr = eval_expr; + step->exec_nodes->relid = rel_loc_info->relid; + step->exec_nodes->accesstype = RELATION_ACCESS_INSERT; + return; + } constant = (Const *) checkexpr; @@ -788,7 +817,7 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) initStringInfo(&buf); /* Step 1: select tuple values by ctid */ - step1 = makeNode(RemoteQuery); + step1 = makeRemoteQuery(); appendStringInfoString(&buf, "SELECT "); for (att = 1; att <= natts; att++) { @@ -822,13 +851,11 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) appendStringInfo(&buf, " FROM %s WHERE ctid = '%s'", tableName, ctid_str); step1->sql_statement = pstrdup(buf.data); - step1->is_single_step = true; step1->exec_nodes = makeNode(ExecNodes); - step1->read_only = true; step1->exec_nodes->nodelist = list_make1_int(nodenum); /* Step 2: declare cursor for update target table */ - step2 = makeNode(RemoteQuery); + step2 = makeRemoteQuery(); resetStringInfo(&buf); appendStringInfoString(&buf, 
step->cursor); @@ -852,18 +879,14 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) } appendStringInfoString(&buf, "FOR UPDATE"); step2->sql_statement = pstrdup(buf.data); - step2->is_single_step = true; - step2->read_only = true; step2->exec_nodes = makeNode(ExecNodes); step2->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList); innerPlan(step2) = (Plan *) step1; /* Step 3: move cursor to first position */ - step3 = makeNode(RemoteQuery); + step3 = makeRemoteQuery(); resetStringInfo(&buf); appendStringInfo(&buf, "MOVE %s", node_cursor); step3->sql_statement = pstrdup(buf.data); - step3->is_single_step = true; - step3->read_only = true; step3->exec_nodes = makeNode(ExecNodes); step3->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList); innerPlan(step3) = (Plan *) step2; @@ -1024,7 +1047,7 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) if (!IsA(arg2, Const)) { /* this gets freed when the memory context gets freed */ - Expr *eval_expr = (Expr *) eval_const_expressions(NULL, (Node *) arg2); + Expr *eval_expr = (Expr *) eval_const_expressions(context->root, (Node *) arg2); checkexpr = get_numeric_constant(eval_expr); } @@ -1176,6 +1199,32 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) */ return false; } + /* + * Check if it is an expression like pcol = expr, where pcol is + * a partitioning column of the rel1 and planner could not + * evaluate expr. We probably can evaluate it at execution time. + * Save the expression, and if we do not have other hint, + * try and evaluate it at execution time + */ + rel_loc_info1 = GetRelationLocInfo(column_base->relid); + + if (!rel_loc_info1) + return true; + + if (IsHashColumn(rel_loc_info1, column_base->colname)) + { + Expr_Comparison *expr_comp = + palloc(sizeof(Expr_Comparison)); + + expr_comp->relid = column_base->relid; + expr_comp->rel_loc_info = rel_loc_info1; + expr_comp->col_name = column_base->colname; + expr_comp->expr = arg2; + context->conditions->partitioned_expressions = + lappend(context->conditions->partitioned_expressions, + expr_comp); + return false; + } } } } @@ -1599,24 +1648,19 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) } if (rtesave) - { /* a single table, just grab it */ rel_loc_info = GetRelationLocInfo(rtesave->relid); + } - if (!rel_loc_info) - return true; + /* have complex case */ + if (!rel_loc_info) + return true; - context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, - NULL, - context->accessType); - } - } - else - { + if (rel_loc_info->locatorType != LOCATOR_TYPE_HASH) + /* do not need to determine partitioning expression */ context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->accessType); - } /* Note replicated table usage for determining safe queries */ if (context->query_step->exec_nodes) @@ -1625,6 +1669,38 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) table_usage_type = TABLE_USAGE_TYPE_USER_REPLICATED; context->query_step->exec_nodes->tableusagetype = table_usage_type; + } else if (context->conditions->partitioned_expressions) { + /* probably we can determine nodes on execution time */ + foreach(lc, context->conditions->partitioned_expressions) { + Expr_Comparison *expr_comp = (Expr_Comparison *) lfirst(lc); + if (rel_loc_info->relid == expr_comp->relid) + { + context->query_step->exec_nodes = makeNode(ExecNodes); + context->query_step->exec_nodes->baselocatortype = + rel_loc_info->locatorType; + context->query_step->exec_nodes->tableusagetype = + 
TABLE_USAGE_TYPE_USER; + context->query_step->exec_nodes->primarynodelist = NULL; + context->query_step->exec_nodes->nodelist = NULL; + context->query_step->exec_nodes->expr = expr_comp->expr; + context->query_step->exec_nodes->relid = expr_comp->relid; + context->query_step->exec_nodes->accesstype = context->accessType; + break; + } + } + } else { + /* run query on all nodes */ + context->query_step->exec_nodes = makeNode(ExecNodes); + context->query_step->exec_nodes->baselocatortype = + rel_loc_info->locatorType; + context->query_step->exec_nodes->tableusagetype = + TABLE_USAGE_TYPE_USER; + context->query_step->exec_nodes->primarynodelist = NULL; + context->query_step->exec_nodes->nodelist = + list_copy(rel_loc_info->nodeList); + context->query_step->exec_nodes->expr = NULL; + context->query_step->exec_nodes->relid = NULL; + context->query_step->exec_nodes->accesstype = context->accessType; } } /* check for partitioned col comparison against a literal */ @@ -1712,6 +1788,7 @@ InitXCWalkerContext(XCWalkerContext *context) context->query = NULL; context->accessType = RELATION_ACCESS_READ; context->query_step = NULL; + context->root = NULL; context->conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions)); context->rtables = NIL; context->multilevel_join = false; @@ -1722,20 +1799,57 @@ InitXCWalkerContext(XCWalkerContext *context) context->join_list = NIL; } + +/* + * Create an instance of RemoteQuery and initialize fields + */ +static RemoteQuery * +makeRemoteQuery(void) +{ + RemoteQuery *result = makeNode(RemoteQuery); + result->is_single_step = true; + result->sql_statement = NULL; + result->exec_nodes = NULL; + result->combine_type = COMBINE_TYPE_NONE; + result->simple_aggregates = NIL; + result->sort = NULL; + result->distinct = NULL; + result->read_only = true; + result->force_autocommit = false; + result->cursor = NULL; + result->exec_type = EXEC_ON_DATANODES; + result->paramval_data = NULL; + result->paramval_len = 0; + + result->relname = NULL; + result->remotejoin = false; + result->partitioned_replicated = false; + result->reduce_level = 0; + result->base_tlist = NIL; + result->outer_alias = NULL; + result->inner_alias = NULL; + result->outer_reduce_level = 0; + result->inner_reduce_level = 0; + result->outer_relids = NULL; + result->inner_relids = NULL; + return result; +} + /* * Top level entry point before walking query to determine plan nodes * */ static void -get_plan_nodes(Query *query, RemoteQuery *step, RelationAccessType accessType) +get_plan_nodes(PlannerInfo *root, RemoteQuery *step, RelationAccessType accessType) { + Query *query = root->parse; XCWalkerContext context; - InitXCWalkerContext(&context); context.query = query; context.accessType = accessType; context.query_step = step; + context.root = root; context.rtables = lappend(context.rtables, query->rtable); if ((get_plan_nodes_walker((Node *) query, &context) @@ -1754,24 +1868,24 @@ get_plan_nodes(Query *query, RemoteQuery *step, RelationAccessType accessType) * */ static void -get_plan_nodes_command(Query *query, RemoteQuery *step) +get_plan_nodes_command(RemoteQuery *step, PlannerInfo *root) { - switch (query->commandType) + switch (root->parse->commandType) { case CMD_SELECT: - get_plan_nodes(query, step, query->rowMarks ? + get_plan_nodes(root, step, root->parse->rowMarks ? 
RELATION_ACCESS_READ_FOR_UPDATE : RELATION_ACCESS_READ); break; case CMD_INSERT: - get_plan_nodes_insert(query, step); + get_plan_nodes_insert(root, step); break; case CMD_UPDATE: case CMD_DELETE: /* treat as a select */ - get_plan_nodes(query, step, RELATION_ACCESS_UPDATE); + get_plan_nodes(root, step, RELATION_ACCESS_UPDATE); break; default: @@ -2589,9 +2703,41 @@ PlannedStmt * pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) { PlannedStmt *result; - Plan *standardPlan; + PlannerGlobal *glob; + PlannerInfo *root; RemoteQuery *query_step; + StringInfoData buf; + /* + * Set up global state for this planner invocation. This data is needed + * across all levels of sub-Query that might exist in the given command, + * so we keep it in a separate struct that's linked to by each per-Query + * PlannerInfo. + */ + glob = makeNode(PlannerGlobal); + + glob->boundParams = boundParams; + glob->paramlist = NIL; + glob->subplans = NIL; + glob->subrtables = NIL; + glob->rewindPlanIDs = NULL; + glob->finalrtable = NIL; + glob->relationOids = NIL; + glob->invalItems = NIL; + glob->lastPHId = 0; + glob->transientPlan = false; + + /* Create a PlannerInfo data structure, usually it is done for a subquery */ + root = makeNode(PlannerInfo); + root->parse = query; + root->glob = glob; + root->query_level = 1; + root->parent_root = NULL; + root->planner_cxt = CurrentMemoryContext; + root->init_plans = NIL; + root->cte_plan_ids = NIL; + root->eq_classes = NIL; + root->append_rel_list = NIL; /* build the PlannedStmt result */ result = makeNode(PlannedStmt); @@ -2603,184 +2749,151 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) result->intoClause = query->intoClause; result->rtable = query->rtable; - query_step = makeNode(RemoteQuery); - query_step->is_single_step = false; + query_step = makeRemoteQuery(); + + /* Optimize multi-node handling */ + query_step->read_only = query->commandType == CMD_SELECT; if (query->utilityStmt && IsA(query->utilityStmt, DeclareCursorStmt)) cursorOptions |= ((DeclareCursorStmt *) query->utilityStmt)->options; - query_step->exec_nodes = NULL; - query_step->combine_type = COMBINE_TYPE_NONE; - query_step->simple_aggregates = NULL; - /* Optimize multi-node handling */ - query_step->read_only = query->commandType == CMD_SELECT; - query_step->force_autocommit = false; - result->planTree = (Plan *) query_step; - /* - * Determine where to execute the command, either at the Coordinator - * level, Data Nodes, or both. By default we choose both. We should be - * able to quickly expand this for more commands. 
- */ - switch (query->commandType) - { - case CMD_SELECT: - /* Perform some checks to make sure we can support the statement */ - if (query->intoClause) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("INTO clause not yet supported")))); - /* fallthru */ - case CMD_INSERT: - case CMD_UPDATE: - case CMD_DELETE: - /* PGXCTODO: This validation will not be removed - * until we support moving tuples from one node to another - * when the partition column of a table is updated - */ - if (query->commandType == CMD_UPDATE) - validate_part_col_updatable(query); - - if (query->returningList) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("RETURNING clause not yet supported")))); - - /* Set result relations */ - if (query->commandType != CMD_SELECT) - result->resultRelations = list_make1_int(query->resultRelation); + /* Perform some checks to make sure we can support the statement */ + if (query->commandType == CMD_SELECT && query->intoClause) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("INTO clause not yet supported")))); - get_plan_nodes_command(query, query_step); + /* PGXCTODO: This validation will not be removed + * until we support moving tuples from one node to another + * when the partition column of a table is updated + */ + if (query->commandType == CMD_UPDATE) + validate_part_col_updatable(query); - if (query_step->exec_nodes == NULL) - { - /* Do not yet allow multi-node correlated UPDATE or DELETE */ - if (query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE) - { - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("UPDATE and DELETE that are correlated or use non-immutable functions not yet supported")))); - } + if (query->returningList) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("RETURNING clause not yet supported")))); - /* - * Processing guery against catalog tables, or multi-step command. 
- * Run through standard planner - */ - result = standard_planner(query, cursorOptions, boundParams); - return result; - } + /* Set result relations */ + if (query->commandType != CMD_SELECT) + result->resultRelations = list_make1_int(query->resultRelation); - /* Do not yet allow multi-node correlated UPDATE or DELETE */ - if ((query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE) - && !query_step->exec_nodes - && list_length(query->rtable) > 1) - { - result = standard_planner(query, cursorOptions, boundParams); - return result; - } + get_plan_nodes_command(query_step, root); - /* - * get_plan_nodes_command may alter original statement, so do not - * process it before the call - * - * Declare Cursor case: - * We should leave as a step query only SELECT statement - * Further if we need refer source statement for planning we should take - * the truncated string - */ - if (query->utilityStmt && - IsA(query->utilityStmt, DeclareCursorStmt)) - { + if (query_step->exec_nodes == NULL) + { + /* Do not yet allow multi-node correlated UPDATE or DELETE */ + if (query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE) + { + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("UPDATE and DELETE that are correlated or use non-immutable functions not yet supported")))); + } - /* search for SELECT keyword in the normalized string */ - char *select = strpos(query->sql_statement, " SELECT "); - /* Take substring of the original string using found offset */ - query_step->sql_statement = pstrdup(select + 1); - } - else - query_step->sql_statement = pstrdup(query->sql_statement); + /* + * Processing guery against catalog tables, or multi-step command. + * Run through standard planner + */ + result = standard_planner(query, cursorOptions, boundParams); + return result; + } - /* - * If there already is an active portal, we may be doing planning - * within a function. Just use the standard plan, but check if - * it is part of an EXPLAIN statement so that we do not show that - * we plan multiple steps when it is a single-step operation. - */ - if (ActivePortal && strcmp(ActivePortal->commandTag, "EXPLAIN")) - return standard_planner(query, cursorOptions, boundParams); + /* Do not yet allow multi-node correlated UPDATE or DELETE */ + if ((query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE) + && !query_step->exec_nodes + && list_length(query->rtable) > 1) + { + result = standard_planner(query, cursorOptions, boundParams); + return result; + } - query_step->is_single_step = true; - /* - * PGXCTODO - * When Postgres runs insert into t (a) values (1); against table - * defined as create table t (a int, b int); the plan is looking - * like insert into t (a,b) values (1,null); - * Later executor is verifying plan, to make sure table has not - * been altered since plan has been created and comparing table - * definition with plan target list and output error if they do - * not match. - * I could not find better way to generate targetList for pgxc plan - * then call standard planner and take targetList from the plan - * generated by Postgres. - */ - query_step->scan.plan.targetlist = query->targetList; + /* + * Deparse query tree to get step query. 
It may be modified later on + */ + initStringInfo(&buf); + deparse_query(query, &buf, NIL); + query_step->sql_statement = pstrdup(buf.data); + pfree(buf.data); - if (query_step->exec_nodes) - query_step->combine_type = get_plan_combine_type( - query, query_step->exec_nodes->baselocatortype); + query_step->is_single_step = true; + /* + * PGXCTODO + * When Postgres runs insert into t (a) values (1); against table + * defined as create table t (a int, b int); the plan is looking + * like insert into t (a,b) values (1,null); + * Later executor is verifying plan, to make sure table has not + * been altered since plan has been created and comparing table + * definition with plan target list and output error if they do + * not match. + * I could not find better way to generate targetList for pgxc plan + * then call standard planner and take targetList from the plan + * generated by Postgres. + */ + query_step->scan.plan.targetlist = query->targetList; - /* Set up simple aggregates */ - /* PGXCTODO - we should detect what types of aggregates are used. - * in some cases we can avoid the final step and merely proxy results - * (when there is only one data node involved) instead of using - * coordinator consolidation. At the moment this is needed for AVG() - */ - query_step->simple_aggregates = get_simple_aggregates(query); + if (query_step->exec_nodes) + query_step->combine_type = get_plan_combine_type( + query, query_step->exec_nodes->baselocatortype); + + /* Set up simple aggregates */ + /* PGXCTODO - we should detect what types of aggregates are used. + * in some cases we can avoid the final step and merely proxy results + * (when there is only one data node involved) instead of using + * coordinator consolidation. At the moment this is needed for AVG() + */ + query_step->simple_aggregates = get_simple_aggregates(query); - /* - * Add sorting to the step - */ - if (list_length(query_step->exec_nodes->nodelist) > 1 && - (query->sortClause || query->distinctClause)) - make_simple_sort_from_sortclauses(query, query_step); + /* + * Add sorting to the step + */ + if (list_length(query_step->exec_nodes->nodelist) > 1 && + (query->sortClause || query->distinctClause)) + make_simple_sort_from_sortclauses(query, query_step); - /* Handle LIMIT and OFFSET for single-step queries on multiple nodes */ - if (handle_limit_offset(query_step, query, result)) - { - /* complicated expressions, just fallback to standard plan */ - result = standard_planner(query, cursorOptions, boundParams); - return result; - } + /* Handle LIMIT and OFFSET for single-step queries on multiple nodes */ + if (handle_limit_offset(query_step, query, result)) + { + /* complicated expressions, just fallback to standard plan */ + result = standard_planner(query, cursorOptions, boundParams); + return result; + } - /* - * Use standard plan if we have more than one data node with either - * group by, hasWindowFuncs, or hasRecursive - */ - /* - * PGXCTODO - this could be improved to check if the first - * group by expression is the partitioning column, in which - * case it is ok to treat as a single step. 
- */ - if (query->commandType == CMD_SELECT - && query_step->exec_nodes - && list_length(query_step->exec_nodes->nodelist) > 1 - && (query->groupClause || query->hasWindowFuncs || query->hasRecursive)) - { - result->planTree = standardPlan; - return result; - } - break; + /* + * Use standard plan if we have more than one data node with either + * group by, hasWindowFuncs, or hasRecursive + */ + /* + * PGXCTODO - this could be improved to check if the first + * group by expression is the partitioning column, in which + * case it is ok to treat as a single step. + */ + if (query->commandType == CMD_SELECT + && query_step->exec_nodes + && list_length(query_step->exec_nodes->nodelist) > 1 + && (query->groupClause || query->hasWindowFuncs || query->hasRecursive)) + { + result = standard_planner(query, cursorOptions, boundParams); + return result; + } - default: - /* Allow for override */ - if (StrictStatementChecking) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("This command is not yet supported.")))); - else - result->planTree = standardPlan; + /* Allow for override */ + /* AM: Is this ever possible? */ + if (query->commandType != CMD_SELECT && + query->commandType != CMD_INSERT && + query->commandType != CMD_UPDATE && + query->commandType != CMD_DELETE) + { + if (StrictStatementChecking) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("This command is not yet supported.")))); + else + result = standard_planner(query, cursorOptions, boundParams); + return result; } /* @@ -2808,6 +2921,13 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) } /* + * Assume single step. If there are multiple steps we should make up + * parameters for each step where they referenced + */ + if (boundParams) + query_step->paramval_len = ParamListToDataRow(boundParams, + &query_step->paramval_data); + /* * If query is FOR UPDATE fetch CTIDs from the remote node * Use CTID as a key to update tuples on remote nodes when handling * WHERE CURRENT OF @@ -3068,7 +3188,7 @@ validate_part_col_updatable(const Query *query) * * Based on is_immutable_func from postgresql_fdw.c * We add an exeption for base postgresql functions, to - * allow now() and others to still execute as part of single step + * allow now() and others to still execute as part of single step * queries. 
* * PGXCTODO - we currently make the false assumption that immutable diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index b954003db5..a387354e7b 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -19,6 +19,7 @@ #include "postgres.h" #include "access/gtm.h" #include "access/xact.h" +#include "commands/prepare.h" #include "executor/executor.h" #include "gtm/gtm_c.h" #include "libpq/libpq.h" @@ -27,6 +28,7 @@ #include "pgxc/poolmgr.h" #include "storage/ipc.h" #include "utils/datum.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/tuplesort.h" #include "utils/snapmgr.h" @@ -38,9 +40,6 @@ #define DATA_NODE_FETCH_SIZE 1 - -extern char *deparseSql(RemoteQueryState *scanstate); - /* * Buffer size does not affect performance significantly, just do not allow * connection buffer grows infinitely @@ -62,6 +61,9 @@ static int pgxc_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransacti PGXCNodeAllHandles * pgxc_handles, char *gid); static int pgxc_node_commit_prepared(GlobalTransactionId gxid, GlobalTransactionId prepared_gxid, PGXCNodeAllHandles * pgxc_handles, char *gid); +static PGXCNodeAllHandles * get_exec_connections(RemoteQueryState *planstate, + ExecNodes *exec_nodes, + RemoteQueryExecType exec_type); static int pgxc_node_implicit_commit_prepared(GlobalTransactionId prepare_xid, GlobalTransactionId commit_xid, PGXCNodeAllHandles * pgxc_handles, @@ -70,8 +72,6 @@ static int pgxc_node_implicit_commit_prepared(GlobalTransactionId prepare_xid, static int pgxc_node_implicit_prepare(GlobalTransactionId prepare_xid, PGXCNodeAllHandles * pgxc_handles, char *gid); -static PGXCNodeAllHandles * get_exec_connections(ExecNodes *exec_nodes, - RemoteQueryExecType exec_type); static int pgxc_node_receive_and_validate(const int conn_count, PGXCNodeHandle ** connections, bool reset_combiner); @@ -1265,10 +1265,10 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) /* * If we are in the process of shutting down, we - * may be rolling back, and the buffer may contain other messages. - * We want to avoid a procarray exception - * as well as an error stack overflow. - */ + * may be rolling back, and the buffer may contain other messages. + * We want to avoid a procarray exception + * as well as an error stack overflow. + */ if (proc_exit_inprogress) conn->state = DN_CONNECTION_STATE_ERROR_FATAL; @@ -1364,10 +1364,11 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) /* sync lost? 
*/ elog(WARNING, "Received unsupported message type: %c", msg_type); conn->state = DN_CONNECTION_STATE_ERROR_FATAL; - return RESPONSE_EOF; + /* stop reading */ + return RESPONSE_COMPLETE; } } - + /* never happen, but keep compiler quiet */ return RESPONSE_EOF; } @@ -2746,7 +2747,6 @@ RemoteQueryState * ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) { RemoteQueryState *remotestate; - Relation currentRelation; remotestate = CreateResponseCombiner(0, node->combine_type); remotestate->ss.ps.plan = (Plan *) node; @@ -2788,6 +2788,19 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) ALLOCSET_DEFAULT_MAXSIZE); } + /* + * If we have parameter values here and planner has not had them we + * should prepare them now + */ + if (estate->es_param_list_info && !node->paramval_data) + node->paramval_len = ParamListToDataRow(estate->es_param_list_info, + &node->paramval_data); + + /* We need expression context to evaluate */ + if (node->exec_nodes && node->exec_nodes->expr) + ExecAssignExprContext(estate, &remotestate->ss.ps); + + if (innerPlan(node)) innerPlanState(remotestate) = ExecInitNode(innerPlan(node), estate, eflags); @@ -2853,7 +2866,8 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst) * Datanodes Only, Coordinators only or both types */ static PGXCNodeAllHandles * -get_exec_connections(ExecNodes *exec_nodes, +get_exec_connections(RemoteQueryState *planstate, + ExecNodes *exec_nodes, RemoteQueryExecType exec_type) { List *nodelist = NIL; @@ -2873,8 +2887,34 @@ get_exec_connections(ExecNodes *exec_nodes, if (exec_nodes) { - nodelist = exec_nodes->nodelist; - primarynode = exec_nodes->primarynodelist; + if (exec_nodes->expr) + { + /* execution time determining of target data nodes */ + bool isnull; + ExprState *estate = ExecInitExpr(exec_nodes->expr, + (PlanState *) planstate); + Datum partvalue = ExecEvalExpr(estate, + planstate->ss.ps.ps_ExprContext, + &isnull, + NULL); + if (!isnull) + { + RelationLocInfo *rel_loc_info = GetRelationLocInfo(exec_nodes->relid); + ExecNodes *nodes = GetRelationNodes(rel_loc_info, + (long *) &partvalue, + exec_nodes->accesstype); + if (nodes) + { + nodelist = nodes->nodelist; + primarynode = nodes->primarynodelist; + pfree(nodes); + } + FreeRelationLocInfo(rel_loc_info); + } + } else { + nodelist = exec_nodes->nodelist; + primarynode = exec_nodes->primarynodelist; + } } if (list_length(nodelist) == 0 && @@ -2961,212 +3001,273 @@ register_write_nodes(int conn_count, PGXCNodeHandle **connections) } } -/* - * Execute step of PGXC plan. - * The step specifies a command to be executed on specified nodes. - * On first invocation connections to the data nodes are initialized and - * command is executed. Further, as well as within subsequent invocations, - * responses are received until step is completed or there is a tuple to emit. - * If there is a tuple it is returned, otherwise returned NULL. The NULL result - * from the function indicates completed step. - * The function returns at most one tuple per invocation. 
- */ -TupleTableSlot * -ExecRemoteQuery(RemoteQueryState *node) + +static void +do_query(RemoteQueryState *node) { RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan; - TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot; TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot; - bool have_tuple = false; + bool force_autocommit = step->force_autocommit; + bool is_read_only = step->read_only; + GlobalTransactionId gxid = InvalidGlobalTransactionId; + Snapshot snapshot = GetActiveSnapshot(); + TimestampTz timestamp = GetCurrentGTMStartTimestamp(); + PGXCNodeHandle **connections = NULL; + PGXCNodeHandle *primaryconnection = NULL; + int i; + int regular_conn_count; + int total_conn_count; + bool need_tran; + PGXCNodeAllHandles *pgxc_connections; + /* + * Get connections for Datanodes only, utilities and DDLs + * are launched in ExecRemoteUtility + */ + pgxc_connections = get_exec_connections(node, step->exec_nodes, + EXEC_ON_DATANODES); - if (!node->query_Done) + connections = pgxc_connections->datanode_handles; + primaryconnection = pgxc_connections->primary_handle; + total_conn_count = regular_conn_count = pgxc_connections->dn_conn_count; + + /* + * Primary connection is counted separately but is included in total_conn_count if used. + */ + if (primaryconnection) { - /* First invocation, initialize */ - bool force_autocommit = step->force_autocommit; - bool is_read_only = step->read_only; - GlobalTransactionId gxid = InvalidGlobalTransactionId; - Snapshot snapshot = GetActiveSnapshot(); - TimestampTz timestamp = GetCurrentGTMStartTimestamp(); - PGXCNodeHandle **connections = NULL; - PGXCNodeHandle *primaryconnection = NULL; - int i; - int regular_conn_count; - int total_conn_count; - bool need_tran; - PGXCNodeAllHandles *pgxc_connections; - TupleTableSlot *innerSlot = NULL; - - implicit_force_autocommit = force_autocommit; + regular_conn_count--; + } - /* - * Inner plan for RemoteQuery supplies parameters. - * We execute inner plan to get a tuple and use values of the tuple as - * parameter values when executing this remote query. - * If returned slot contains NULL tuple break execution. - * TODO there is a problem how to handle the case if both inner and - * outer plans exist. We can decide later, since it is never used now. - */ - if (innerPlanState(node)) - { - innerSlot = ExecProcNode(innerPlanState(node)); -// if (TupIsNull(innerSlot)) -// return innerSlot; - } + pfree(pgxc_connections); - /* - * Get connections for Datanodes only, utilities and DDLs - * are launched in ExecRemoteUtility - */ - pgxc_connections = get_exec_connections(step->exec_nodes, - EXEC_ON_DATANODES); + /* + * We save only regular connections, at the time we exit the function + * we finish with the primary connection and deal only with regular + * connections on subsequent invocations + */ + node->node_count = regular_conn_count; - connections = pgxc_connections->datanode_handles; - primaryconnection = pgxc_connections->primary_handle; - total_conn_count = regular_conn_count = pgxc_connections->dn_conn_count; + if (force_autocommit) + need_tran = false; + else + need_tran = !autocommit || (!is_read_only && total_conn_count > 1); - /* - * Primary connection is counted separately but is included in total_conn_count if used. - */ + elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primaryconnection ? "true" : "false", regular_conn_count, need_tran ? 
"true" : "false"); + + stat_statement(); + if (autocommit) + { + stat_transaction(total_conn_count); + /* We normally clear for transactions, but if autocommit, clear here, too */ + clear_write_node_list(); + } + + if (!is_read_only) + { if (primaryconnection) - { - regular_conn_count--; - } + register_write_nodes(1, &primaryconnection); + register_write_nodes(regular_conn_count, connections); + } + + gxid = GetCurrentGlobalTransactionId(); - pfree(pgxc_connections); + if (!GlobalTransactionIdIsValid(gxid)) + { + if (primaryconnection) + pfree(primaryconnection); + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to get next transaction ID"))); + } + if (need_tran) + { /* - * We save only regular connections, at the time we exit the function - * we finish with the primary connection and deal only with regular - * connections on subsequent invocations + * Check if data node connections are in transaction and start + * transactions on nodes where it is not started */ - node->node_count = regular_conn_count; + PGXCNodeHandle *new_connections[total_conn_count]; + int new_count = 0; - if (force_autocommit) - need_tran = false; - else - need_tran = !autocommit || (!is_read_only && total_conn_count > 1); + if (primaryconnection && primaryconnection->transaction_status != 'T') + new_connections[new_count++] = primaryconnection; + for (i = 0; i < regular_conn_count; i++) + if (connections[i]->transaction_status != 'T') + new_connections[new_count++] = connections[i]; + + if (new_count && pgxc_node_begin(new_count, new_connections, gxid)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not begin transaction on data nodes."))); + } - elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primaryconnection ? "true" : "false", regular_conn_count, need_tran ? "true" : "false"); + /* See if we have a primary node, execute on it first before the others */ + if (primaryconnection) + { + if (primaryconnection->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(primaryconnection); - stat_statement(); - if (autocommit) + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid)) { - stat_transaction(total_conn_count); - /* We normally clear for transactions, but if autocommit, clear here, too */ - clear_write_node_list(); + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); } - - if (!is_read_only) + if (total_conn_count == 1 && pgxc_node_send_timestamp(primaryconnection, timestamp)) { - if (primaryconnection) - register_write_nodes(1, &primaryconnection); - register_write_nodes(regular_conn_count, connections); + /* + * If a transaction involves multiple connections timestamp is + * always sent down to Datanodes with pgxc_node_begin. + * An autocommit transaction needs the global timestamp also, + * so handle this case here. 
+ */ + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); } - - gxid = GetCurrentGlobalTransactionId(); - - if (!GlobalTransactionIdIsValid(gxid)) + if (snapshot && pgxc_node_send_snapshot(primaryconnection, snapshot)) { - if (primaryconnection) - pfree(primaryconnection); pfree(connections); + pfree(primaryconnection); ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to get next transaction ID"))); + errmsg("Failed to send command to data nodes"))); } - - if (need_tran) + if (step->statement || step->cursor || step->paramval_data) { + /* need to use Extended Query Protocol */ + int fetch = 0; + bool prepared = false; + + /* if prepared statement is referenced see if it is already exist */ + if (step->statement) + prepared = ActivateDatanodeStatementOnNode(step->statement, + primaryconnection->nodenum); /* - * Check if data node connections are in transaction and start - * transactions on nodes where it is not started + * execute and fetch rows only if they will be consumed + * immediately by the sorter */ - PGXCNodeHandle *new_connections[total_conn_count]; - int new_count = 0; - - if (primaryconnection && primaryconnection->transaction_status != 'T') - new_connections[new_count++] = primaryconnection; - for (i = 0; i < regular_conn_count; i++) - if (connections[i]->transaction_status != 'T') - new_connections[new_count++] = connections[i]; - - if (new_count && pgxc_node_begin(new_count, new_connections, gxid)) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Could not begin transaction on data nodes."))); - } - - /* See if we have a primary node, execute on it first before the others */ - if (primaryconnection) - { - if (primaryconnection->state == DN_CONNECTION_STATE_QUERY) - BufferConnection(primaryconnection); - - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid)) + if (step->cursor) + fetch = 1; + + if (pgxc_node_send_query_extended(primaryconnection, + prepared ? NULL : step->sql_statement, + step->statement, + step->cursor, + step->paramval_len, + step->paramval_data, + step->read_only, + fetch) != 0) { pfree(connections); - pfree(primaryconnection); + if (primaryconnection) + pfree(primaryconnection); ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (total_conn_count == 1 && pgxc_node_send_timestamp(primaryconnection, timestamp)) + } + else + { + if (pgxc_node_send_query(primaryconnection, step->sql_statement) != 0) { - /* - * If a transaction involves multiple connections timestamp is - * always sent down to Datanodes with pgxc_node_begin. - * An autocommit transaction needs the global timestamp also, - * so handle this case here. 
- */ pfree(connections); pfree(primaryconnection); ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (snapshot && pgxc_node_send_snapshot(primaryconnection, snapshot)) - { - pfree(connections); - pfree(primaryconnection); + } + primaryconnection->combiner = node; + Assert(node->combine_type == COMBINE_TYPE_SAME); + + while (node->command_complete_count < 1) + { + if (pgxc_node_receive(1, &primaryconnection, NULL)) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - if (pgxc_node_send_query(primaryconnection, step->sql_statement) != 0) + errmsg("Failed to read response from data nodes"))); + handle_response(primaryconnection, node); + if (node->errorMessage) { - pfree(connections); - pfree(primaryconnection); + char *code = node->errorCode; ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - Assert(node->combine_type == COMBINE_TYPE_SAME); - - while (node->command_complete_count < 1) - { - if (pgxc_node_receive(1, &primaryconnection, NULL)) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to read response from data nodes"))); - handle_response(primaryconnection, node); - if (node->errorMessage) - { - char *code = node->errorCode; - ereport(ERROR, - (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), - errmsg("%s", node->errorMessage))); - } + (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), + errmsg("%s", node->errorMessage))); } } + } - for (i = 0; i < regular_conn_count; i++) + for (i = 0; i < regular_conn_count; i++) + { + if (connections[i]->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(connections[i]); + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && pgxc_node_send_gxid(connections[i], gxid)) { - if (connections[i]->state == DN_CONNECTION_STATE_QUERY) - BufferConnection(connections[i]); - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && pgxc_node_send_gxid(connections[i], gxid)) + pfree(connections); + if (primaryconnection) + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (total_conn_count == 1 && pgxc_node_send_timestamp(connections[i], timestamp)) + { + /* + * If a transaction involves multiple connections timestamp is + * always sent down to Datanodes with pgxc_node_begin. + * An autocommit transaction needs the global timestamp also, + * so handle this case here. 
+ */ + pfree(connections); + if (primaryconnection) + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (snapshot && pgxc_node_send_snapshot(connections[i], snapshot)) + { + pfree(connections); + if (primaryconnection) + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (step->statement || step->cursor || step->paramval_data) + { + /* need to use Extended Query Protocol */ + int fetch = 0; + bool prepared = false; + + /* if prepared statement is referenced see if it is already exist */ + if (step->statement) + prepared = ActivateDatanodeStatementOnNode(step->statement, + connections[i]->nodenum); + /* + * execute and fetch rows only if they will be consumed + * immediately by the sorter + */ + if (step->cursor) + fetch = 1; + + if (pgxc_node_send_query_extended(connections[i], + prepared ? NULL : step->sql_statement, + step->statement, + step->cursor, + step->paramval_len, + step->paramval_data, + step->read_only, + fetch) != 0) { pfree(connections); if (primaryconnection) @@ -3175,14 +3276,11 @@ ExecRemoteQuery(RemoteQueryState *node) (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (total_conn_count == 1 && pgxc_node_send_timestamp(connections[i], timestamp)) + } + else + { + if (pgxc_node_send_query(connections[i], step->sql_statement) != 0) { - /* - * If a transaction involves multiple connections timestamp is - * always sent down to Datanodes with pgxc_node_begin. - * An autocommit transaction needs the global timestamp also, - * so handle this case here. - */ pfree(connections); if (primaryconnection) pfree(primaryconnection); @@ -3190,176 +3288,163 @@ ExecRemoteQuery(RemoteQueryState *node) (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (snapshot && pgxc_node_send_snapshot(connections[i], snapshot)) + } + connections[i]->combiner = node; + } + + if (step->cursor) + { + node->cursor = step->cursor; + node->cursor_count = regular_conn_count; + node->cursor_connections = (PGXCNodeHandle **) palloc(regular_conn_count * sizeof(PGXCNodeHandle *)); + memcpy(node->cursor_connections, connections, regular_conn_count * sizeof(PGXCNodeHandle *)); + } + + /* + * Stop if all commands are completed or we got a data row and + * initialized state node for subsequent invocations + */ + while (regular_conn_count > 0 && node->connections == NULL) + { + int i = 0; + + if (pgxc_node_receive(regular_conn_count, connections, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to read response from data nodes"))); + /* + * Handle input from the data nodes. + * If we got a RESPONSE_DATAROW we can break handling to wrap + * it into a tuple and return. Handling will be continued upon + * subsequent invocations. + * If we got 0, we exclude connection from the list. We do not + * expect more input from it. In case of non-SELECT query we quit + * the loop when all nodes finish their work and send ReadyForQuery + * with empty connections array. + * If we got EOF, move to the next connection, will receive more + * data on the next iteration. 
+
+	if (step->cursor)
+	{
+		node->cursor = step->cursor;
+		node->cursor_count = regular_conn_count;
+		node->cursor_connections = (PGXCNodeHandle **) palloc(regular_conn_count * sizeof(PGXCNodeHandle *));
+		memcpy(node->cursor_connections, connections, regular_conn_count * sizeof(PGXCNodeHandle *));
+	}
+
+	/*
+	 * Stop if all commands are completed or we got a data row and
+	 * initialized the state node for subsequent invocations
+	 */
+	while (regular_conn_count > 0 && node->connections == NULL)
+	{
+		int i = 0;
+
+		if (pgxc_node_receive(regular_conn_count, connections, NULL))
+			ereport(ERROR,
+					(errcode(ERRCODE_INTERNAL_ERROR),
+					 errmsg("Failed to read response from data nodes")));
+		/*
+		 * Handle input from the data nodes.
+		 * If we got a RESPONSE_DATAROW we can break handling to wrap
+		 * it into a tuple and return. Handling will be continued upon
+		 * subsequent invocations.
+		 * If we got 0, we exclude the connection from the list. We do not
+		 * expect more input from it. In case of a non-SELECT query we quit
+		 * the loop when all nodes finish their work and send ReadyForQuery,
+		 * with an empty connections array.
+		 * If we got EOF, move to the next connection; more data will be
+		 * received on the next iteration.
+		 */
+		while (i < regular_conn_count)
+		{
+			int res = handle_response(connections[i], node);
+
+			if (res == RESPONSE_EOF)
+			{
+				i++;
+			}
+			else if (res == RESPONSE_COMPLETE)
 			{
-			pfree(connections);
-			if (primaryconnection)
-				pfree(primaryconnection);
-			ereport(ERROR,
-					(errcode(ERRCODE_INTERNAL_ERROR),
-					 errmsg("Failed to send command to data nodes")));
+				if (i < --regular_conn_count)
+					connections[i] = connections[regular_conn_count];
 			}
-		if (step->cursor || innerSlot)
+			else if (res == RESPONSE_TUPDESC)
 			{
-			int			fetch = 0;
+				ExecSetSlotDescriptor(scanslot, node->tuple_desc);
 				/*
-			 * execute and fetch rows only if they will be consumed
-			 * immediately by the sorter
+				 * Now the tuple table slot is responsible for freeing
+				 * the descriptor
 				 */
-			if (step->cursor)
-				fetch = 1;
-			/*
-			 * Here is ad hoc handling of WHERE CURRENT OF clause.
-			 * Previous step have returned the key values to update
-			 * replicated tuple into innerSlot and the query is a
-			 * DECLARE CURSOR. We do not want to execute this query now
-			 * because of unusual fetch procedure: we just need to
-			 * position cursor on the first row on each node.
-			 * So here we just parse and bind query. As the result cursor
-			 * specified by the statement will be created, and next step
-			 * will issue MOVE command positioning the cursor properly.
-			 * If we want to use parametrized requests for any other
-			 * purpose we should return and rework this code
-			 */
-			if (pgxc_node_send_query_extended(connections[i],
-											  step->sql_statement,
-											  NULL,
-											  step->cursor,
-											  innerSlot ? innerSlot->tts_dataLen : 0,
-											  innerSlot ? innerSlot->tts_dataRow : NULL,
-											  step->cursor != NULL,
-											  fetch) != 0)
+				node->tuple_desc = NULL;
+				if (step->sort)
 				{
-				pfree(connections);
-				if (primaryconnection)
-					pfree(primaryconnection);
-				ereport(ERROR,
-						(errcode(ERRCODE_INTERNAL_ERROR),
-						 errmsg("Failed to send command to data nodes")));
+					SimpleSort *sort = step->sort;
+
+					node->connections = connections;
+					node->conn_count = regular_conn_count;
+					/*
+					 * The first message is already in the buffer; further
+					 * fetches will be under tuplesort control. If the
+					 * query does not produce rows, tuplesort will not be
+					 * initialized.
+					 */
+					node->tuplesortstate = tuplesort_begin_merge(
+											scanslot->tts_tupleDescriptor,
+											sort->numCols,
+											sort->sortColIdx,
+											sort->sortOperators,
+											sort->nullsFirst,
+											node,
+											work_mem);
+					/*
+					 * Break the loop, do not wait for the first row.
+					 * The tuplesort module wants to control which node it
+					 * fetches rows from, while in this loop the first row
+					 * would come from a random node.
+					 */
+					break;
 				}
 			}
-			else
+			else if (res == RESPONSE_DATAROW)
 			{
-				if (pgxc_node_send_query(connections[i], step->sql_statement) != 0)
-				{
-					pfree(connections);
-					if (primaryconnection)
-						pfree(primaryconnection);
-					ereport(ERROR,
-							(errcode(ERRCODE_INTERNAL_ERROR),
-							 errmsg("Failed to send command to data nodes")));
-				}
+				/*
+				 * Got the first data row, quit the loop
+				 */
+				node->connections = connections;
+				node->conn_count = regular_conn_count;
+				node->current_conn = i;
+				break;
 			}
-			connections[i]->combiner = node;
 		}
+	}
+
+	if (node->cursor_count)
+	{
+		node->conn_count = node->cursor_count;
+		memcpy(connections, node->cursor_connections, node->cursor_count * sizeof(PGXCNodeHandle *));
+		node->connections = connections;
+	}
+}
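The response loop above removes a finished connection by overwriting its slot with the last live one, shrinking the array in O(1) without preserving order. A self-contained illustration of the idiom, with plain ints standing in for PGXCNodeHandle pointers:

#include <stdio.h>

int
main(void)
{
    int conns[] = {101, 102, 103, 104};
    int count = 4;
    int i = 0;

    while (i < count)
    {
        int finished = (conns[i] % 2 == 0);  /* pretend even ones completed */

        if (finished)
        {
            /* overwrite current slot with the last live connection */
            if (i < --count)
                conns[i] = conns[count];
            /* do not advance i: the swapped-in entry must be examined too */
        }
        else
            i++;
    }

    for (i = 0; i < count; i++)
        printf("still active: %d\n", conns[i]);
    return 0;
}

Note that the index only advances on EOF; after a swap the same slot holds a different connection that still needs handling, which is why the loop above does not advance on RESPONSE_COMPLETE either.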
+
+
+/*
+ * Execute a step of a PGXC plan.
+ * The step specifies a command to be executed on the specified nodes.
+ * On the first invocation, connections to the data nodes are initialized
+ * and the command is executed; on that and subsequent invocations,
+ * responses are received until the step completes or there is a tuple to
+ * emit. If there is a tuple it is returned; otherwise NULL is returned,
+ * and a NULL result indicates that the step is complete.
+ * The function returns at most one tuple per invocation.
+ */
+TupleTableSlot *
+ExecRemoteQuery(RemoteQueryState *node)
+{
+	RemoteQuery    *step = (RemoteQuery *) node->ss.ps.plan;
+	TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot;
+	TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot;
+	bool			have_tuple = false;
 
-	if (step->cursor)
-	{
-		node->cursor = step->cursor;
-		node->cursor_count = regular_conn_count;
-		node->cursor_connections = (PGXCNodeHandle **) palloc(regular_conn_count * sizeof(PGXCNodeHandle *));
-		memcpy(node->cursor_connections, connections, regular_conn_count * sizeof(PGXCNodeHandle *));
-	}
+	if (!node->query_Done)
+	{
 		/*
-	 * Stop if all commands are completed or we got a data row and
-	 * initialized state node for subsequent invocations
+		 * The inner plan for a RemoteQuery supplies parameters.
+		 * We execute the inner plan to get a tuple and use values of the
+		 * tuple as parameter values when executing this remote query.
+		 * If the returned slot contains a NULL tuple, execution stops.
+		 * TODO there is a problem how to handle the case if both inner and
+		 * outer plans exist. We can decide later, since it is never used now.
 		 */
-	while (regular_conn_count > 0 && node->connections == NULL)
+		if (innerPlanState(node))
 		{
-		int i = 0;
-
-		if (pgxc_node_receive(regular_conn_count, connections, NULL))
-			ereport(ERROR,
-					(errcode(ERRCODE_INTERNAL_ERROR),
-					 errmsg("Failed to read response from data nodes")));
+			TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node));
 			/*
-		 * Handle input from the data nodes.
-		 * If we got a RESPONSE_DATAROW we can break handling to wrap
-		 * it into a tuple and return. Handling will be continued upon
-		 * subsequent invocations.
-		 * If we got 0, we exclude connection from the list. We do not
-		 * expect more input from it. In case of non-SELECT query we quit
-		 * the loop when all nodes finish their work and send ReadyForQuery
-		 * with empty connections array.
-		 * If we got EOF, move to the next connection, will receive more
-		 * data on the next iteration.
+			 * Use the data row returned by the previous step as parameters
+			 * for the main query.
+			 * Exit if there are no more slots.
 			 */
-		while (i < regular_conn_count)
-		{
-			int res = handle_response(connections[i], node);
-			if (res == RESPONSE_EOF)
-			{
-				i++;
-			}
-			else if (res == RESPONSE_COMPLETE)
-			{
-				if (i < --regular_conn_count)
-					connections[i] = connections[regular_conn_count];
-			}
-			else if (res == RESPONSE_TUPDESC)
-			{
-				ExecSetSlotDescriptor(scanslot, node->tuple_desc);
-				/*
-				 * Now tuple table slot is responsible for freeing the
-				 * descriptor
-				 */
-				node->tuple_desc = NULL;
-				if (step->sort)
-				{
-					SimpleSort *sort = step->sort;
-
-					node->connections = connections;
-					node->conn_count = regular_conn_count;
-					/*
-					 * First message is already in the buffer
-					 * Further fetch will be under tuplesort control
-					 * If query does not produce rows tuplesort will not
-					 * be initialized
-					 */
-					node->tuplesortstate = tuplesort_begin_merge(
-										   scanslot->tts_tupleDescriptor,
-										   sort->numCols,
-										   sort->sortColIdx,
-										   sort->sortOperators,
-										   sort->nullsFirst,
-										   node,
-										   work_mem);
-					/*
-					 * Break the loop, do not wait for first row.
-					 * Tuplesort module want to control node it is
-					 * fetching rows from, while in this loop first
-					 * row would be got from random node
-					 */
-					break;
-				}
-			}
-			else if (res == RESPONSE_DATAROW)
-			{
-				/*
-				 * Got first data row, quit the loop
-				 */
-				node->connections = connections;
-				node->conn_count = regular_conn_count;
-				node->current_conn = i;
-				break;
-			}
-		}
+			if (!TupIsNull(innerSlot))
+				step->paramval_len = ExecCopySlotDatarow(innerSlot,
+														 &step->paramval_data);
 		}
 
-	if (node->cursor_count)
-	{
-		node->conn_count = node->cursor_count;
-		memcpy(connections, node->cursor_connections, node->cursor_count * sizeof(PGXCNodeHandle *));
-		node->connections = connections;
-	}
+		do_query(node);
 
 		node->query_Done = true;
 	}
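The query_Done block above wires the inner plan to the remote query: each inner tuple is encoded with ExecCopySlotDatarow into paramval_data/paramval_len, and do_query then ships it with the Bind message. A runnable sketch of that produce-encode-execute cycle, using hypothetical stand-ins (next_inner_row, encode_params, run_remote) for the executor calls:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* stand-in for rows produced by the inner plan */
static const char *inner_rows[] = { "1", "2", "3" };
static int next_row = 0;

static const char *
next_inner_row(void)                         /* stand-in for ExecProcNode */
{
    if (next_row >= 3)
        return NULL;                         /* inner plan exhausted */
    return inner_rows[next_row++];
}

static int
encode_params(const char *val, char **out)   /* stand-in for ExecCopySlotDatarow */
{
    *out = strdup(val);
    return (int) strlen(val);
}

static void
run_remote(const char *data, int len)        /* stand-in for do_query */
{
    printf("execute remote step with %d-byte parameter row: %s\n", len, data);
}

int
main(void)
{
    const char *row;

    /* one remote execution per inner-plan row, driven by a loop rather
     * than recursion, mirroring the flow in ExecRemoteQuery */
    while ((row = next_inner_row()) != NULL)
    {
        char *data;
        int   len = encode_params(row, &data);

        run_remote(data, len);
        free(data);
    }
    return 0;
}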
 
 	if (node->update_cursor)
 	{
-		PGXCNodeAllHandles *all_dn_handles = get_exec_connections(NULL, EXEC_ON_DATANODES);
+		PGXCNodeAllHandles *all_dn_handles = get_exec_connections(node, NULL, EXEC_ON_DATANODES);
 
 		close_node_cursors(all_dn_handles->datanode_handles,
 						   all_dn_handles->dn_conn_count,
 						   node->update_cursor);
@@ -3367,6 +3452,7 @@ ExecRemoteQuery(RemoteQueryState *node)
 		node->update_cursor = NULL;
 	}
 
+handle_results:
 	if (node->tuplesortstate)
 	{
 		while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate,
@@ -3458,12 +3544,22 @@ ExecRemoteQuery(RemoteQueryState *node)
 		return resultslot;
 
 	/*
-	 * If coordinator plan is specified execute it first.
-	 * If the plan is returning we are returning these tuples immediately.
-	 * If it is not returning or returned them all by current invocation
-	 * we will go ahead and execute remote query. Then we will never execute
-	 * the outer plan again because node->query_Done flag will be set and
-	 * execution won't get to that place.
+	 * We cannot use recursion here: we could run out of stack memory if the
+	 * inner node returns a long result set and this node does not return
+	 * rows (as in INSERT ... SELECT).
+	 */
+	if (innerPlanState(node))
+	{
+		TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node));
+
+		if (!TupIsNull(innerSlot))
+		{
+			do_query(node);
+			goto handle_results;
+		}
+	}
+
+	/*
+	 * Execute the outer plan if specified
 	 */
 	if (outerPlanState(node))
 	{
@@ -3557,7 +3653,7 @@ ExecEndRemoteQuery(RemoteQueryState *node)
 
 	if (node->update_cursor)
 	{
-		PGXCNodeAllHandles *all_dn_handles = get_exec_connections(NULL, EXEC_ON_DATANODES);
+		PGXCNodeAllHandles *all_dn_handles = get_exec_connections(node, NULL, EXEC_ON_DATANODES);
 
 		close_node_cursors(all_dn_handles->datanode_handles,
 						   all_dn_handles->dn_conn_count,
 						   node->update_cursor);
@@ -3566,6 +3662,16 @@ ExecEndRemoteQuery(RemoteQueryState *node)
 	}
 
 	/*
+	 * Clean up parameters if they were set, since the plan may be reused
+	 */
+	if (((RemoteQuery *) node->ss.ps.plan)->paramval_data)
+	{
+		pfree(((RemoteQuery *) node->ss.ps.plan)->paramval_data);
+		((RemoteQuery *) node->ss.ps.plan)->paramval_data = NULL;
+		((RemoteQuery *) node->ss.ps.plan)->paramval_len = 0;
+	}
+
+	/*
 	 * shut down the subplan
 	 */
 	if (outerPlanState(node))
@@ -3631,6 +3737,74 @@ close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor)
 
 	ValidateAndCloseCombiner(combiner);
 }
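Because paramval_data hangs off the cached plan node rather than the per-execution state, the ExecEndRemoteQuery cleanup above must free it and zero both fields, or a reused plan would replay stale parameters. The reset pattern in isolation, with malloc/free standing in for palloc/pfree:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct
{
    char *paramval_data;      /* parameter row, DataRow format */
    int   paramval_len;
} CachedStep;

static void
reset_step_params(CachedStep *step)
{
    if (step->paramval_data)
    {
        free(step->paramval_data);
        step->paramval_data = NULL;  /* a reused plan starts clean */
        step->paramval_len = 0;
    }
}

int
main(void)
{
    CachedStep step = { strdup("stale row"), 9 };

    reset_step_params(&step);
    printf("len after reset: %d\n", step.paramval_len);
    return 0;
}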
 
+
+/*
+ * Encode parameter values in the format of the DataRow message (the same
+ * format is used in Bind) to prepare for sending them down to the data
+ * nodes. The buffer holding the encoded values is palloc'ed and returned
+ * in the result parameter; the function returns the size of the result.
+ */
+int
+ParamListToDataRow(ParamListInfo params, char** result)
+{
+	StringInfoData buf;
+	uint16		n16;
+	int			i;
+
+	initStringInfo(&buf);
+	/* Number of parameter values */
+	n16 = htons(params->numParams);
+	appendBinaryStringInfo(&buf, (char *) &n16, 2);
+
+	/* Parameter values */
+	for (i = 0; i < params->numParams; i++)
+	{
+		ParamExternData *param = params->params + i;
+		uint32		n32;
+
+		if (param->isnull)
+		{
+			n32 = htonl(-1);
+			appendBinaryStringInfo(&buf, (char *) &n32, 4);
+		}
+		else
+		{
+			Oid			typOutput;
+			bool		typIsVarlena;
+			Datum		pval;
+			char	   *pstring;
+			int			len;
+
+			/* Get info needed to output the value */
+			getTypeOutputInfo(param->ptype, &typOutput, &typIsVarlena);
+
+			/*
+			 * If we have a toasted datum, forcibly detoast it here to avoid
+			 * memory leakage inside the type's output routine.
+			 */
+			if (typIsVarlena)
+				pval = PointerGetDatum(PG_DETOAST_DATUM(param->value));
+			else
+				pval = param->value;
+
+			/* Convert Datum to string */
+			pstring = OidOutputFunctionCall(typOutput, pval);
+
+			/* copy data to the buffer */
+			len = strlen(pstring);
+			n32 = htonl(len);
+			appendBinaryStringInfo(&buf, (char *) &n32, 4);
+			appendBinaryStringInfo(&buf, pstring, len);
+		}
+	}
+
+	/* Take data from the buffer */
+	*result = palloc(buf.len);
+	memcpy(*result, buf.data, buf.len);
+	pfree(buf.data);
+	return buf.len;
+}
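For reference, the byte layout ParamListToDataRow produces is the value format shared by the protocol's DataRow and Bind messages: a 16-bit parameter count, then for each value a 32-bit length (-1 for NULL) followed by the text bytes. A standalone program encoding the pair ('42', NULL) and dumping the result:

#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

int
main(void)
{
    unsigned char buf[64];
    int pos = 0;

    uint16_t n16 = htons(2);                 /* two parameters */
    memcpy(buf + pos, &n16, 2); pos += 2;

    const char *val = "42";                  /* first parameter: text "42" */
    uint32_t n32 = htonl((uint32_t) strlen(val));
    memcpy(buf + pos, &n32, 4); pos += 4;
    memcpy(buf + pos, val, strlen(val)); pos += (int) strlen(val);

    n32 = htonl((uint32_t) -1);              /* second parameter: NULL */
    memcpy(buf + pos, &n32, 4); pos += 4;

    for (int i = 0; i < pos; i++)
        printf("%02x ", buf[i]);
    printf("\n");  /* prints: 00 02 00 00 00 02 34 32 ff ff ff ff */
    return 0;
}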
 
 /*
  * Consume any remaining messages on the connections.
  * This is useful for calling after ereport()
@@ -3744,8 +3918,7 @@ ExecRemoteUtility(RemoteQuery *node)
 
 	remotestate = CreateResponseCombiner(0, node->combine_type);
 
-	pgxc_connections = get_exec_connections(node->exec_nodes,
-											exec_type);
+	pgxc_connections = get_exec_connections(NULL, node->exec_nodes, exec_type);
 
 	primaryconnection = pgxc_connections->primary_handle;
 
@@ -4157,6 +4330,88 @@ pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles)
 	pfree(pgxc_handles);
 }
 
+
+void
+ExecCloseRemoteStatement(const char *stmt_name, List *nodelist)
+{
+	PGXCNodeAllHandles *all_handles;
+	PGXCNodeHandle	  **connections;
+	RemoteQueryState   *combiner;
+	int					conn_count;
+	int					i;
+
+	/* Exit if nodelist is empty */
+	if (list_length(nodelist) == 0)
+		return;
+
+	/* get needed data node connections */
+	all_handles = get_handles(nodelist, NIL, false);
+	conn_count = all_handles->dn_conn_count;
+	connections = all_handles->datanode_handles;
+
+	for (i = 0; i < conn_count; i++)
+	{
+		if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
+			BufferConnection(connections[i]);
+		if (pgxc_node_send_close(connections[i], true, stmt_name) != 0)
+		{
+			/*
+			 * Prepared statements are not affected by transaction end, so
+			 * treat an unclosed statement on the data node as a fatal issue
+			 * and force the connection to be discarded.
+			 */
+			connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
+			ereport(WARNING,
+					(errcode(ERRCODE_INTERNAL_ERROR),
+					 errmsg("Failed to close data node statement")));
+		}
+		if (pgxc_node_send_sync(connections[i]) != 0)
+		{
+			connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
+			ereport(WARNING,
+					(errcode(ERRCODE_INTERNAL_ERROR),
+					 errmsg("Failed to close data node statement")));
+		}
+	}
+
+	combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+
+	while (conn_count > 0)
+	{
+		if (pgxc_node_receive(conn_count, connections, NULL))
+		{
+			for (i = 0; i < conn_count; i++)
+				connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
+
+			ereport(ERROR,
+					(errcode(ERRCODE_INTERNAL_ERROR),
+					 errmsg("Failed to close data node statement")));
+		}
+		i = 0;
+		while (i < conn_count)
+		{
+			int res = handle_response(connections[i], combiner);
+
+			if (res == RESPONSE_EOF)
+			{
+				i++;
+			}
+			else if (res == RESPONSE_COMPLETE)
+			{
+				if (--conn_count > i)
+					connections[i] = connections[conn_count];
+			}
+			else
+			{
+				connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
+			}
+		}
+	}
+
+	ValidateAndCloseCombiner(combiner);
+	pfree_pgxc_all_handles(all_handles);
+}
+
+
 /*
  * Check if an Implicit 2PC is necessary for this transaction.
  * Check also if it is necessary to prepare transaction locally.
diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c
index 4790f9573a..fc6345710c 100644
--- a/src/backend/pgxc/pool/pgxcnode.c
+++ b/src/backend/pgxc/pool/pgxcnode.c
@@ -27,6 +27,7 @@
 #include "access/gtm.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "commands/prepare.h"
 #include "gtm/gtm_c.h"
 #include "pgxc/pgxcnode.h"
 #include "pgxc/locator.h"
@@ -592,6 +593,10 @@ release_handles(bool force_drop)
 	if (datanode_count == 0 && coord_count == 0)
 		return;
 
+	/* Do not release connections if we have prepared statements on the nodes */
+	if (HaveActiveDatanodeStatements())
+		return;
+
 	/* Collect Data Nodes handles */
 	for (i = 0; i < NumDataNodes; i++)
 	{
@@ -1144,8 +1149,10 @@ pgxc_node_send_query_extended(PGXCNodeHandle *handle, const char *query,
 							  int paramlen, char *params,
 							  bool send_describe, int fetch_size)
 {
-	if (pgxc_node_send_parse(handle, statement, query))
-		return EOF;
+	/* a NULL query indicates an already prepared statement */
+	if (query)
+		if (pgxc_node_send_parse(handle, statement, query))
+			return EOF;
 	if (pgxc_node_send_bind(handle, portal, statement, paramlen, params))
 		return EOF;
 	if (send_describe)
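The NULL-query convention in pgxc_node_send_query_extended above is the server-side analogue of a familiar client pattern: once a named statement exists, only Bind/Execute messages are sent. The same flow through stock libpq, where PQprepare issues the one-time Parse and each PQexecPrepared call reuses it (the connection string is a placeholder):

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
    PGconn *conn = PQconnectdb("dbname=postgres");  /* placeholder */
    if (PQstatus(conn) != CONNECTION_OK)
        return 1;

    /* Parse happens once */
    PGresult *res = PQprepare(conn, "s1", "SELECT $1::int + 1", 1, NULL);
    PQclear(res);

    /* Bind/Execute happen per call; no re-Parse */
    for (int i = 0; i < 3; i++)
    {
        char arg[16];
        const char *values[1] = { arg };

        snprintf(arg, sizeof(arg), "%d", i);
        res = PQexecPrepared(conn, "s1", 1, values, NULL, NULL, 0);
        if (PQresultStatus(res) == PGRES_TUPLES_OK)
            printf("%s\n", PQgetvalue(res, 0, 0));
        PQclear(res);
    }

    PQfinish(conn);
    return 0;
}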
diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c
index 264271bd68..50e38dceea 100644
--- a/src/backend/pgxc/pool/poolmgr.c
+++ b/src/backend/pgxc/pool/poolmgr.c
@@ -350,7 +350,7 @@ PoolManagerInit()
 		else
 			ereport(FATAL,
 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					errmsg("invalid list syntax for \"coordinator_ports\"")));
+					 errmsg("invalid list syntax for \"coordinator_ports\"")));
 	}
 
 	if (count == 0)
@@ -369,7 +369,7 @@ PoolManagerInit()
 		else
 			ereport(FATAL,
 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					errmsg("invalid list syntax for \"coordinator_users\"")));
+					 errmsg("invalid list syntax for \"coordinator_users\"")));
 	}
 
 	i = 0;
@@ -715,14 +715,33 @@ agent_destroy(PoolAgent *agent)
 
 	/* Discard connections if any remaining */
 	if (agent->pool)
-		agent_release_connections(agent, NULL, NULL);
+	{
+		List	   *dn_conn = NIL;
+		List	   *co_conn = NIL;
+		int			i;
+
+		/* gather abandoned datanode connections */
+		if (agent->dn_connections)
+			for (i = 0; i < NumDataNodes; i++)
+				if (agent->dn_connections[i])
+					dn_conn = lappend_int(dn_conn, i + 1);
+
+		/* gather abandoned coordinator connections */
+		if (agent->coord_connections)
+			for (i = 0; i < NumCoords; i++)
+				if (agent->coord_connections[i])
+					co_conn = lappend_int(co_conn, i + 1);
+
+		/* release them all */
+		agent_release_connections(agent, dn_conn, co_conn);
+	}
 
 	/* find agent in the list */
 	for (i = 0; i < agentCount; i++)
 	{
 		if (poolAgents[i] == agent)
 		{
-			/* free memory */
+			/* Free memory. All connection slots are NULL at this point */
 			if (agent->dn_connections)
 			{
 				pfree(agent->dn_connections);
@@ -1108,7 +1127,7 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
 
 		for (i = 0; i < NumCoords; i++)
 			agent->coord_connections[i] = NULL;
-	}
+	}
 
 	/* Initialize result */
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 43737cae48..66c250fb1f 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -2287,6 +2287,21 @@ make_viewdef(StringInfo buf, HeapTuple ruletup, TupleDesc rulettc,
 }
 
 
+#ifdef PGXC
+/* ----------
+ * deparse_query			- Parse back one query parsetree
+ *
+ * The purpose of this function is to build up the statement for a
+ * RemoteQuery; it just calls get_query_def without pretty-print flags.
+ * ----------
+ */
+void
+deparse_query(Query *query, StringInfo buf, List *parentnamespace)
+{
+	get_query_def(query, buf, parentnamespace, NULL, 0, 0);
+}
+#endif
+
 /* ----------
  * get_query_def			- Parse back one query parsetree
  *
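Per the commit message, the coordinator now deparses the statement for a RemoteQuery from the Query tree instead of reusing the original SQL string. A sketch of how a caller might use the deparse_query helper added above; it assumes a valid rewritten Query inside a coordinator backend, so it is illustrative rather than runnable on its own (build_remote_sql is a hypothetical name):

/* Sketch only: assumes a coordinator backend and a valid rewritten Query. */
#include "postgres.h"
#include "lib/stringinfo.h"
#include "utils/builtins.h"

static char *
build_remote_sql(Query *query)
{
    StringInfoData buf;

    initStringInfo(&buf);
    /* no pretty-print flags: the text is consumed by data nodes, not humans */
    deparse_query(query, &buf, NIL);
    return buf.data;            /* palloc'ed in the current memory context */
}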
diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c
index 437b7d1114..bef1236b9c 100644
--- a/src/backend/utils/cache/plancache.c
+++ b/src/backend/utils/cache/plancache.c
@@ -56,6 +56,13 @@
 #include "utils/resowner.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
+#ifdef PGXC
+#include "commands/prepare.h"
+#include "pgxc/execRemote.h"
+
+
+static void release_datanode_statements(Plan *plannode);
+#endif
 
 static List *cached_plans_list = NIL;
 
@@ -563,6 +570,29 @@ RevalidateCachedPlan(CachedPlanSource *plansource, bool useResOwner)
 }
 
 /*
+ * Find and release all datanode statements referenced by the plan node and subnodes
+ */
+#ifdef PGXC
+static void
+release_datanode_statements(Plan *plannode)
+{
+	if (IsA(plannode, RemoteQuery))
+	{
+		RemoteQuery *step = (RemoteQuery *) plannode;
+
+		if (step->statement)
+			DropDatanodeStatement(step->statement);
+	}
+
+	if (innerPlan(plannode))
+		release_datanode_statements(innerPlan(plannode));
+
+	if (outerPlan(plannode))
+		release_datanode_statements(outerPlan(plannode));
+}
+#endif
+
+/*
  * ReleaseCachedPlan: release active use of a cached plan.
 *
 * This decrements the reference count, and frees the plan if the count
@@ -581,7 +611,22 @@ ReleaseCachedPlan(CachedPlan *plan, bool useResOwner)
 	Assert(plan->refcount > 0);
 	plan->refcount--;
 	if (plan->refcount == 0)
+	{
+#ifdef PGXC
+		if (plan->fully_planned)
+		{
+			ListCell   *lc;
+
+			/* close any active datanode statements */
+			foreach (lc, plan->stmt_list)
+			{
+				PlannedStmt *ps = (PlannedStmt *) lfirst(lc);
+
+				release_datanode_statements(ps->planTree);
+			}
+		}
+#endif
 		MemoryContextDelete(plan->context);
+	}
 }
 
 /*
diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h
index f52f001289..b6f5afdb0e 100644
--- a/src/include/commands/prepare.h
+++ b/src/include/commands/prepare.h
@@ -33,6 +33,15 @@ typedef struct
 	TimestampTz prepare_time;	/* the time when the stmt was prepared */
 } PreparedStatement;
 
+#ifdef PGXC
+typedef struct
+{
+	/* dynahash.c requires key to be first field */
+	char		stmt_name[NAMEDATALEN];
+	int			nodenum;	/* number of nodes where statement is active */
+	int			nodes[0];	/* node ids where statement is active */
+} DatanodeStatement;
+#endif
 
 /* Utility statements PREPARE, EXECUTE, DEALLOCATE, EXPLAIN EXECUTE */
 extern void PrepareQuery(PrepareStmt *stmt, const char *queryString);
@@ -62,4 +71,11 @@ extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt);
 
 void	DropAllPreparedStatements(void);
 
+#ifdef PGXC
+extern DatanodeStatement *FetchDatanodeStatement(const char *stmt_name, bool throwError);
+extern bool ActivateDatanodeStatementOnNode(const char *stmt_name, int node);
+extern bool HaveActiveDatanodeStatements(void);
+extern void DropDatanodeStatement(const char *stmt_name);
+#endif
+
 #endif   /* PREPARE_H */
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index 0c2c4fe0a2..3e218bfc20 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -187,6 +187,9 @@ extern TupleTableSlot *ExecStoreVirtualTuple(TupleTableSlot *slot);
 extern TupleTableSlot *ExecStoreAllNullTuple(TupleTableSlot *slot);
 extern HeapTuple ExecCopySlotTuple(TupleTableSlot *slot);
 extern MinimalTuple ExecCopySlotMinimalTuple(TupleTableSlot *slot);
+#ifdef PGXC
+extern int ExecCopySlotDatarow(TupleTableSlot *slot, char **datarow);
+#endif
 extern HeapTuple ExecFetchSlotTuple(TupleTableSlot *slot);
 extern MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot);
 extern Datum ExecFetchSlotTupleDatum(TupleTableSlot *slot);
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index a3c1868422..c5c45c0162 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -144,5 +144,9 @@ extern void BufferConnection(PGXCNodeHandle *conn);
 
 extern void ExecRemoteQueryReScan(RemoteQueryState *node, ExprContext *exprCtxt);
 
+extern int ParamListToDataRow(ParamListInfo params, char** result);
+
+extern void ExecCloseRemoteStatement(const char *stmt_name, List *nodelist);
+
 extern int	primary_data_node;
 #endif
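DatanodeStatement (declared in prepare.h above) ends in a zero-length array, so each hash entry must be allocated with room for NumDataNodes node ids past the fixed fields, which is exactly why the entry size is computed as sizeof(DatanodeStatement) + NumDataNodes * sizeof(int). The allocation trick in a standalone program, with malloc standing in for the hash table's allocator (nodes[0] is the pre-C99 GNU idiom the header uses):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define NAMEDATALEN 64

typedef struct
{
    char stmt_name[NAMEDATALEN]; /* key must be first for dynahash */
    int  nodenum;                /* number of node ids in use */
    int  nodes[0];               /* grows with the entry allocation */
} DatanodeStatement;

int
main(void)
{
    int num_data_nodes = 8;
    DatanodeStatement *entry =
        malloc(sizeof(DatanodeStatement) + num_data_nodes * sizeof(int));

    memset(entry, 0, sizeof(DatanodeStatement));
    strcpy(entry->stmt_name, "p_1");
    entry->nodes[entry->nodenum++] = 3;  /* statement now active on node 3 */
    printf("%s active on %d node(s)\n", entry->stmt_name, entry->nodenum);
    free(entry);
    return 0;
}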
diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h
index ee28c5a402..a6d1d531a3 100644
--- a/src/include/pgxc/locator.h
+++ b/src/include/pgxc/locator.h
@@ -26,6 +26,8 @@
 #define HASH_MASK 0x00000FFF;
 
 #define IsReplicated(x) (x->locatorType == LOCATOR_TYPE_REPLICATED)
+
+#include "nodes/primnodes.h"
 #include "utils/relcache.h"
 
 
@@ -76,6 +78,10 @@ typedef struct
 	List	   *nodelist;
 	char		baselocatortype;
 	TableUsageType tableusagetype;	/* track pg_catalog usage */
+	Expr	   *expr;			/* expression to evaluate at execution time
+								 * if the planner cannot determine execution
+								 * nodes */
+	Oid			relid;			/* Relation to determine execution nodes */
+	RelationAccessType accesstype;	/* Access type to determine execution nodes */
 } ExecNodes;
 
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index bb1f934be9..61cb6d3291 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -85,9 +85,14 @@ typedef struct
 	SimpleDistinct *distinct;
 	bool		read_only;		/* do not use 2PC when committing read only steps */
 	bool		force_autocommit;	/* some commands like VACUUM require autocommit mode */
+	char	   *statement;		/* if specified, use it as a PreparedStatement name on data nodes */
 	char	   *cursor;			/* if specified use it as a Portal name on data nodes */
 	RemoteQueryExecType exec_type;
 
+	/* Support for parameters */
+	char	   *paramval_data;	/* parameter data, format is as in Bind */
+	int			paramval_len;	/* length of parameter values data */
+
 	char	   *relname;
 	bool		remotejoin;		/* True if this is a reduced remote join */
 	bool		partitioned_replicated;	/* True if reduced and contains replicated-partitioned join */
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 6e26da755d..4a56f9fb11 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -15,6 +15,7 @@
 #define BUILTINS_H
 
 #include "fmgr.h"
+#include "lib/stringinfo.h"
 #include "nodes/parsenodes.h"
 
 /*
@@ -598,6 +599,7 @@ extern char *deparse_expression(Node *expr, List *dpcontext,
 #ifdef PGXC
 extern List *deparse_context_for_remotequery(const char *aliasname, Oid relid);
 extern List *deparse_context_for(const char *aliasname, Oid relid);
+extern void deparse_query(Query *query, StringInfo buf, List *parentnamespace);
 #endif
 extern List *deparse_context_for_plan(Node *plan, Node *outer_plan,
 									  List *rtable, List *subplans);
