-rw-r--r--  src/backend/commands/prepare.c      | 229
-rw-r--r--  src/backend/executor/execTuples.c   |  84
-rw-r--r--  src/backend/nodes/copyfuncs.c       |  15
-rw-r--r--  src/backend/pgxc/locator/locator.c  |  11
-rw-r--r--  src/backend/pgxc/plan/planner.c     | 506
-rw-r--r--  src/backend/pgxc/pool/execRemote.c  | 915
-rw-r--r--  src/backend/pgxc/pool/pgxcnode.c    |  11
-rw-r--r--  src/backend/pgxc/pool/poolmgr.c     |  29
-rw-r--r--  src/backend/utils/adt/ruleutils.c   |  15
-rw-r--r--  src/backend/utils/cache/plancache.c |  45
-rw-r--r--  src/include/commands/prepare.h      |  16
-rw-r--r--  src/include/executor/tuptable.h     |   3
-rw-r--r--  src/include/pgxc/execRemote.h       |   4
-rw-r--r--  src/include/pgxc/locator.h          |   6
-rw-r--r--  src/include/pgxc/planner.h          |   5
-rw-r--r--  src/include/utils/builtins.h        |   2
16 files changed, 1353 insertions(+), 543 deletions(-)
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index 0e948e4b72..89d1f021ba 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -33,7 +33,11 @@
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
-
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#include "pgxc/poolmgr.h"
+#include "pgxc/execRemote.h"
+#endif
/*
* The hash table in which prepared queries are stored. This is
@@ -42,6 +46,14 @@
* (statement names); the entries are PreparedStatement structs.
*/
static HTAB *prepared_queries = NULL;
+#ifdef PGXC
+/*
+ * The hash table where datanode prepared statements are stored.
+ * The keys are statement names referenced from cached RemoteQuery nodes; the
+ * entries are DatanodeStatement structs.
+ */
+static HTAB *datanode_queries = NULL;
+#endif
static void InitQueryHashTable(void);
static ParamListInfo EvaluateParams(PreparedStatement *pstmt, List *params,
@@ -147,6 +159,22 @@ PrepareQuery(PrepareStmt *stmt, const char *queryString)
/* Generate plans for queries. */
plan_list = pg_plan_queries(query_list, 0, NULL);
+#ifdef PGXC
+ /*
+ * Check if we are dealing with more than one step.
+ * Multi-step prepared statements are not yet supported.
+ * PGXCTODO - temporary - Once we add support, this code should be removed.
+ */
+ if (IS_PGXC_COORDINATOR && plan_list && plan_list->head)
+ {
+ PlannedStmt *stmt = (PlannedStmt *) lfirst(plan_list->head);
+
+ if (stmt->planTree->lefttree || stmt->planTree->righttree)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PSTATEMENT_DEFINITION),
+ errmsg("Multi-step Prepared Statements not yet supported")));
+ }
+#endif
/*
* Save the results.
*/
@@ -419,7 +447,76 @@ InitQueryHashTable(void)
32,
&hash_ctl,
HASH_ELEM);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+
+ hash_ctl.keysize = NAMEDATALEN;
+ hash_ctl.entrysize = sizeof(DatanodeStatement) + NumDataNodes * sizeof(int);
+
+ datanode_queries = hash_create("Datanode Queries",
+ 64,
+ &hash_ctl,
+ HASH_ELEM);
+ }
+#endif
+}
+
+#ifdef PGXC
+/*
+ * Assign the statement name for all the RemoteQueries in the plan tree, so
+ * they use datanode statements
+ */
+static int
+set_remote_stmtname(Plan *plan, const char *stmt_name, int n)
+{
+ if (IsA(plan, RemoteQuery))
+ {
+ DatanodeStatement *entry;
+ bool exists;
+
+ char name[NAMEDATALEN];
+ do
+ {
+ strcpy(name, stmt_name);
+ /*
+ * Append the modifier. If the resulting string is going to be
+ * truncated, truncate the base string instead, otherwise we may
+ * enter an endless loop
+ */
+ if (n)
+ {
+ char modifier[NAMEDATALEN];
+ sprintf(modifier, "__%d", n);
+ /*
+ * If position NAMEDATALEN - strlen(modifier) - 1 is beyond the end of
+ * the base string this is effectively a no-op, otherwise it truncates
+ * the base string
+ */
+ name[NAMEDATALEN - strlen(modifier) - 1] = '\0';
+ strcat(name, modifier);
+ }
+ n++;
+ hash_search(datanode_queries, name, HASH_FIND, &exists);
+ } while (exists);
+ ((RemoteQuery *) plan)->statement = pstrdup(name);
+ entry = (DatanodeStatement *) hash_search(datanode_queries,
+ name,
+ HASH_ENTER,
+ NULL);
+ entry->nodenum = 0;
+ }
+
+ if (innerPlan(plan))
+ n = set_remote_stmtname(innerPlan(plan), stmt_name, n);
+
+ if (outerPlan(plan))
+ n = set_remote_stmtname(outerPlan(plan), stmt_name, n);
+
+ return n;
}
+#endif
/*
* Store all the data pertaining to a query in the hash table using
@@ -459,6 +556,25 @@ StorePreparedStatement(const char *stmt_name,
errmsg("prepared statement \"%s\" already exists",
stmt_name)));
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ ListCell *lc;
+ int n;
+
+ /*
+ * Scan the plans and set the statement field for all found RemoteQuery
+ * nodes so they use data node statements
+ */
+ n = 0;
+ foreach(lc, stmt_list)
+ {
+ PlannedStmt *ps = (PlannedStmt *) lfirst(lc);
+ n = set_remote_stmtname(ps->planTree, stmt_name, n);
+ }
+ }
+#endif
+
/* Create a plancache entry */
plansource = CreateCachedPlan(raw_parse_tree,
query_string,
@@ -844,3 +960,114 @@ build_regtype_array(Oid *param_types, int num_params)
result = construct_array(tmp_ary, num_params, REGTYPEOID, 4, true, 'i');
return PointerGetDatum(result);
}
+
+
+#ifdef PGXC
+DatanodeStatement *
+FetchDatanodeStatement(const char *stmt_name, bool throwError)
+{
+ DatanodeStatement *entry;
+
+ /*
+ * If the hash table hasn't been initialized, it can't be storing
+ * anything, therefore it couldn't possibly store our plan.
+ */
+ if (datanode_queries)
+ entry = (DatanodeStatement *) hash_search(datanode_queries,
+ stmt_name,
+ HASH_FIND,
+ NULL);
+ else
+ entry = NULL;
+
+ /* Report error if entry is not found */
+ if (!entry && throwError)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_PSTATEMENT),
+ errmsg("datanode statement \"%s\" does not exist",
+ stmt_name)));
+
+ return entry;
+}
+
+/*
+ * Drop datanode statement and close it on nodes if active
+ */
+void
+DropDatanodeStatement(const char *stmt_name)
+{
+ DatanodeStatement *entry;
+
+ entry = FetchDatanodeStatement(stmt_name, false);
+ if (entry)
+ {
+ int i;
+ List *nodelist = NIL;
+
+ /* make a List of integers from node numbers */
+ for (i = 0; i < entry->nodenum; i++)
+ nodelist = lappend_int(nodelist, entry->nodes[i]);
+ entry->nodenum = 0;
+
+ ExecCloseRemoteStatement(stmt_name, nodelist);
+
+ hash_search(datanode_queries, entry->stmt_name, HASH_REMOVE, NULL);
+ }
+}
+
+
+/*
+ * Return true if there is at least one active datanode statement, so acquired
+ * datanode connections should not be released
+ */
+bool
+HaveActiveDatanodeStatements(void)
+{
+ HASH_SEQ_STATUS seq;
+ DatanodeStatement *entry;
+
+ /* nothing cached */
+ if (!datanode_queries)
+ return false;
+
+ /* walk over cache */
+ hash_seq_init(&seq, datanode_queries);
+ while ((entry = hash_seq_search(&seq)) != NULL)
+ {
+ /* Stop walking and return true */
+ if (entry->nodenum > 0)
+ {
+ hash_seq_term(&seq);
+ return true;
+ }
+ }
+ /* nothing found */
+ return false;
+}
+
+
+/*
+ * Mark datanode statement as active on specified node
+ * Returns true if the statement has already been active on the node and
+ * can be used; returns false if the statement has not been active on the
+ * node and should be
+ */
+bool
+ActivateDatanodeStatementOnNode(const char *stmt_name, int node)
+{
+ DatanodeStatement *entry;
+ int i;
+
+ /* find the statement in cache */
+ entry = FetchDatanodeStatement(stmt_name, true);
+
+ /* see if statement already active on the node */
+ for (i = 0; i < entry->nodenum; i++)
+ if (entry->nodes[i] == node)
+ return true;
+
+ /* statement is not active on the specified node; append it to the list */
+ entry->nodes[entry->nodenum++] = node;
+ return false;
+}
+#endif
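
Note: the name-generation loop in set_remote_stmtname() above relies on one
invariant: the "__<n>" modifier must never be truncated away, or two plans
could collide on the same datanode statement name and the loop would never
terminate. Below is a minimal standalone sketch of that truncation rule; the
helper name is hypothetical, and it assumes the stock PostgreSQL NAMEDATALEN
of 64.

#include <stdio.h>
#include <string.h>

#define NAMEDATALEN 64	/* stock PostgreSQL value; an assumption in this sketch */

/*
 * Hypothetical standalone version of the truncation rule used by
 * set_remote_stmtname(): append "__<n>" to the base name, truncating
 * the base (never the modifier) so the result always fits within
 * NAMEDATALEN bytes including the terminating '\0'.
 */
static void
make_modified_name(char *name, const char *stmt_name, int n)
{
	strncpy(name, stmt_name, NAMEDATALEN - 1);
	name[NAMEDATALEN - 1] = '\0';
	if (n)
	{
		char	modifier[NAMEDATALEN];

		snprintf(modifier, sizeof(modifier), "__%d", n);
		/* a no-op when the base is already short enough */
		if (strlen(name) + strlen(modifier) >= NAMEDATALEN)
			name[NAMEDATALEN - strlen(modifier) - 1] = '\0';
		strcat(name, modifier);
	}
}

The real function then probes the datanode_queries hash table with each
candidate name until it finds one that is not in use.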
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 0c43e9479c..189c8b3fa2 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -793,6 +793,90 @@ ExecCopySlotMinimalTuple(TupleTableSlot *slot)
slot->tts_isnull);
}
+#ifdef PGXC
+/* --------------------------------
+ * ExecCopySlotDatarow
+ * Obtain a copy of a slot's data row. The copy is
+ * palloc'd in the current memory context.
+ * A pointer to the data row is returned via an out parameter; the
+ * function returns the length of the data row.
+ * The slot itself is undisturbed.
+ * --------------------------------
+ */
+int
+ExecCopySlotDatarow(TupleTableSlot *slot, char **datarow)
+{
+ Assert(datarow);
+
+ if (slot->tts_dataRow)
+ {
+ /* if we already have datarow make a copy */
+ *datarow = (char *)palloc(slot->tts_dataLen);
+ memcpy(*datarow, slot->tts_dataRow, slot->tts_dataLen);
+ return slot->tts_dataLen;
+ }
+ else
+ {
+ TupleDesc tdesc = slot->tts_tupleDescriptor;
+ StringInfoData buf;
+ uint16 n16;
+ int i;
+
+ initStringInfo(&buf);
+ /* Number of attributes */
+ n16 = htons(tdesc->natts);
+ appendBinaryStringInfo(&buf, (char *) &n16, 2);
+
+ /* ensure we have all values */
+ slot_getallattrs(slot);
+ for (i = 0; i < tdesc->natts; i++)
+ {
+ uint32 n32;
+
+ if (slot->tts_isnull[i])
+ {
+ n32 = htonl(-1);
+ appendBinaryStringInfo(&buf, (char *) &n32, 4);
+ }
+ else
+ {
+ Form_pg_attribute attr = tdesc->attrs[i];
+ Oid typOutput;
+ bool typIsVarlena;
+ Datum pval;
+ char *pstring;
+ int len;
+
+ /* Get info needed to output the value */
+ getTypeOutputInfo(attr->atttypid, &typOutput, &typIsVarlena);
+ /*
+ * If we have a toasted datum, forcibly detoast it here to avoid
+ * memory leakage inside the type's output routine.
+ */
+ if (typIsVarlena)
+ pval = PointerGetDatum(PG_DETOAST_DATUM(slot->tts_values[i]));
+ else
+ pval = slot->tts_values[i];
+
+ /* Convert Datum to string */
+ pstring = OidOutputFunctionCall(typOutput, pval);
+
+ /* copy data to the buffer */
+ len = strlen(pstring);
+ n32 = htonl(len);
+ appendBinaryStringInfo(&buf, (char *) &n32, 4);
+ appendBinaryStringInfo(&buf, pstring, len);
+ }
+ }
+ /* copy the buffer contents to the result */
+ *datarow = palloc(buf.len);
+ memcpy(*datarow, buf.data, buf.len);
+ pfree(buf.data);
+ return buf.len;
+ }
+}
+#endif
+
/* --------------------------------
* ExecFetchSlotTuple
* Fetch the slot's regular physical tuple.
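
Note: the buffer built by ExecCopySlotDatarow() above follows the layout of a
PostgreSQL DataRow ('D') message body: a 2-byte big-endian field count, then
for each field a 4-byte big-endian length (-1 denotes NULL) followed by the
value's text bytes. A reader-side sketch of that layout, for illustration
only (walk_datarow is hypothetical; bounds checking is omitted):

#include <arpa/inet.h>	/* ntohs, ntohl */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

static void
walk_datarow(const char *row, int len)
{
	const char *p = row;
	uint16_t	nfields;
	uint32_t	n32;
	int			i;

	(void) len;					/* bounds checking omitted in this sketch */

	memcpy(&nfields, p, 2);		/* 2-byte big-endian field count */
	nfields = ntohs(nfields);
	p += 2;

	for (i = 0; i < nfields; i++)
	{
		int32_t		flen;

		memcpy(&n32, p, 4);		/* 4-byte big-endian length, -1 for NULL */
		flen = (int32_t) ntohl(n32);
		p += 4;

		if (flen == -1)
			printf("field %d: NULL\n", i);
		else
		{
			printf("field %d: %.*s\n", i, flen, p);
			p += flen;
		}
	}
}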
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index aa0587f9c7..72e4d583a2 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -839,12 +839,16 @@ _copyRemoteQuery(RemoteQuery *from)
COPY_NODE_FIELD(distinct);
COPY_SCALAR_FIELD(read_only);
COPY_SCALAR_FIELD(force_autocommit);
+ COPY_STRING_FIELD(statement);
COPY_STRING_FIELD(cursor);
+ COPY_SCALAR_FIELD(exec_type);
+ COPY_SCALAR_FIELD(paramval_data);
+ COPY_SCALAR_FIELD(paramval_len);
COPY_STRING_FIELD(relname);
COPY_SCALAR_FIELD(remotejoin);
- COPY_SCALAR_FIELD(reduce_level);
- COPY_NODE_FIELD(base_tlist);
+ COPY_SCALAR_FIELD(reduce_level);
+ COPY_NODE_FIELD(base_tlist);
COPY_STRING_FIELD(outer_alias);
COPY_STRING_FIELD(inner_alias);
COPY_SCALAR_FIELD(outer_reduce_level);
@@ -867,6 +871,9 @@ _copyExecNodes(ExecNodes *from)
COPY_NODE_FIELD(nodelist);
COPY_SCALAR_FIELD(baselocatortype);
COPY_SCALAR_FIELD(tableusagetype);
+ COPY_NODE_FIELD(expr);
+ COPY_SCALAR_FIELD(relid);
+ COPY_SCALAR_FIELD(accesstype);
return newnode;
}
@@ -2305,7 +2312,9 @@ _copyQuery(Query *from)
COPY_NODE_FIELD(limitCount);
COPY_NODE_FIELD(rowMarks);
COPY_NODE_FIELD(setOperations);
-
+#ifdef PGXC
+ COPY_STRING_FIELD(sql_statement);
+#endif
return newnode;
}
diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c
index 790b81d317..4442310965 100644
--- a/src/backend/pgxc/locator/locator.c
+++ b/src/backend/pgxc/locator/locator.c
@@ -354,22 +354,15 @@ GetRelationNodes(RelationLocInfo *rel_loc_info, long *partValue,
case LOCATOR_TYPE_HASH:
if (partValue != NULL)
- {
/* in prototype, all partitioned tables use same map */
exec_nodes->nodelist = lappend_int(NULL, get_node_from_hash(hash_range_int(*partValue)));
- }
else
- {
- /* If no info, go to node 1 */
if (accessType == RELATION_ACCESS_INSERT)
+ /* Route INSERTs with a NULL partition value to node 1 */
exec_nodes->nodelist = lappend_int(NULL, 1);
else
- /*
- * No partitioning value passed in
- * (no where qualification on part column - use all)
- */
+ /* Use all nodes for other types of access */
exec_nodes->nodelist = list_copy(rel_loc_info->nodeList);
- }
break;
case LOCATOR_TYPE_SINGLE:
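
Note: in the LOCATOR_TYPE_HASH branch above, a known partition value maps to
exactly one node via hash_range_int() and get_node_from_hash(), whose bodies
are not shown in this diff. A sketch of that flow under the assumption of a
simple modulo bucket (the real mapping may differ):

/*
 * Hypothetical stand-in for get_node_from_hash(hash_range_int(value)).
 * Assumes a plain modulo bucket; PGXC node numbers are 1-based.
 */
static int
pick_hash_node(long part_value, int num_data_nodes)
{
	return (int) ((unsigned long) part_value % num_data_nodes) + 1;
}

With four data nodes, pick_hash_node(42, 4) routes to node 3 under this
assumed mapping; when the value is unknown at plan time, the code above falls
back to all nodes for reads, or to node 1 for an INSERT of a NULL partition
value.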
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 8d900f162a..1a56b44e40 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -58,6 +58,19 @@ typedef struct
long constant; /* assume long PGXCTODO - should be Datum */
} Literal_Comparison;
+/*
+ * Comparison of partitioned column and expression
+ * Expression can be evaluated at execution time to determine target nodes
+ */
+typedef struct
+{
+ Oid relid;
+ RelationLocInfo *rel_loc_info;
+ Oid attrnum;
+ char *col_name;
+ Expr *expr; /* expression to evaluate at execution time */
+} Expr_Comparison;
+
/* Parent-Child joins for relations being joined on
* their respective hash distribution columns
*/
@@ -75,6 +88,7 @@ typedef struct
typedef struct
{
List *partitioned_literal_comps; /* List of Literal_Comparison */
+ List *partitioned_expressions; /* List of Expr_Comparison */
List *partitioned_parent_child; /* List of Parent_Child_Join */
List *replicated_joins;
@@ -127,6 +141,7 @@ typedef struct XCWalkerContext
Query *query;
RelationAccessType accessType;
RemoteQuery *query_step; /* remote query step being analyzed */
+ PlannerInfo *root; /* planner data for the subquery */
Special_Conditions *conditions;
bool multilevel_join;
List *rtables; /* a pointer to a list of rtables */
@@ -144,11 +159,12 @@ bool StrictStatementChecking = true;
/* Forbid multi-node SELECT statements with an ORDER BY clause */
bool StrictSelectChecking = false;
-static void get_plan_nodes(Query *query, RemoteQuery *step, RelationAccessType accessType);
+static void get_plan_nodes(PlannerInfo *root, RemoteQuery *step, RelationAccessType accessType);
static bool get_plan_nodes_walker(Node *query_node, XCWalkerContext *context);
static bool examine_conditions_walker(Node *expr_node, XCWalkerContext *context);
static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt);
static void InitXCWalkerContext(XCWalkerContext *context);
+static RemoteQuery *makeRemoteQuery(void);
static void validate_part_col_updatable(const Query *query);
static bool is_pgxc_safe_func(Oid funcid);
@@ -307,6 +323,7 @@ free_special_relations(Special_Conditions *special_conditions)
/* free all items in list, including Literal_Comparison struct */
list_free_deep(special_conditions->partitioned_literal_comps);
+ list_free_deep(special_conditions->partitioned_expressions);
/* free list, but not items pointed to */
list_free(special_conditions->partitioned_parent_child);
@@ -451,8 +468,9 @@ get_base_var(Var *var, XCWalkerContext *context)
* then the caller should use the regular PG planner
*/
static void
-get_plan_nodes_insert(Query *query, RemoteQuery *step)
+get_plan_nodes_insert(PlannerInfo *root, RemoteQuery *step)
{
+ Query *query = root->parse;
RangeTblEntry *rte;
RelationLocInfo *rel_loc_info;
Const *constant;
@@ -502,15 +520,15 @@ get_plan_nodes_insert(Query *query, RemoteQuery *step)
if (sub_rte->rtekind == RTE_SUBQUERY &&
!sub_rte->subquery->limitCount &&
!sub_rte->subquery->limitOffset)
- get_plan_nodes(sub_rte->subquery, step, RELATION_ACCESS_READ);
+ get_plan_nodes(root, step, RELATION_ACCESS_READ);
}
/* Send to general planner if the query is multiple step */
if (!step->exec_nodes)
return;
- /* If the source is not hash-based (eg, replicated) also send
- * through general planner
+ /* If the source is not hash-based (eg, replicated) also send
+ * through general planner
*/
if (step->exec_nodes->baselocatortype != LOCATOR_TYPE_HASH)
{
@@ -612,7 +630,18 @@ get_plan_nodes_insert(Query *query, RemoteQuery *step)
}
if (checkexpr == NULL)
- return; /* no constant */
+ {
+ /* try to determine nodes at execution time */
+ step->exec_nodes = makeNode(ExecNodes);
+ step->exec_nodes->baselocatortype = rel_loc_info->locatorType;
+ step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER;
+ step->exec_nodes->primarynodelist = NULL;
+ step->exec_nodes->nodelist = NULL;
+ step->exec_nodes->expr = eval_expr;
+ step->exec_nodes->relid = rel_loc_info->relid;
+ step->exec_nodes->accesstype = RELATION_ACCESS_INSERT;
+ return;
+ }
constant = (Const *) checkexpr;
@@ -788,7 +817,7 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
initStringInfo(&buf);
/* Step 1: select tuple values by ctid */
- step1 = makeNode(RemoteQuery);
+ step1 = makeRemoteQuery();
appendStringInfoString(&buf, "SELECT ");
for (att = 1; att <= natts; att++)
{
@@ -822,13 +851,11 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
appendStringInfo(&buf, " FROM %s WHERE ctid = '%s'",
tableName, ctid_str);
step1->sql_statement = pstrdup(buf.data);
- step1->is_single_step = true;
step1->exec_nodes = makeNode(ExecNodes);
- step1->read_only = true;
step1->exec_nodes->nodelist = list_make1_int(nodenum);
/* Step 2: declare cursor for update target table */
- step2 = makeNode(RemoteQuery);
+ step2 = makeRemoteQuery();
resetStringInfo(&buf);
appendStringInfoString(&buf, step->cursor);
@@ -852,18 +879,14 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
}
appendStringInfoString(&buf, "FOR UPDATE");
step2->sql_statement = pstrdup(buf.data);
- step2->is_single_step = true;
- step2->read_only = true;
step2->exec_nodes = makeNode(ExecNodes);
step2->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList);
innerPlan(step2) = (Plan *) step1;
/* Step 3: move cursor to first position */
- step3 = makeNode(RemoteQuery);
+ step3 = makeRemoteQuery();
resetStringInfo(&buf);
appendStringInfo(&buf, "MOVE %s", node_cursor);
step3->sql_statement = pstrdup(buf.data);
- step3->is_single_step = true;
- step3->read_only = true;
step3->exec_nodes = makeNode(ExecNodes);
step3->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList);
innerPlan(step3) = (Plan *) step2;
@@ -1024,7 +1047,7 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
if (!IsA(arg2, Const))
{
/* this gets freed when the memory context gets freed */
- Expr *eval_expr = (Expr *) eval_const_expressions(NULL, (Node *) arg2);
+ Expr *eval_expr = (Expr *) eval_const_expressions(context->root, (Node *) arg2);
checkexpr = get_numeric_constant(eval_expr);
}
@@ -1176,6 +1199,32 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
*/
return false;
}
+ /*
+ * Check if it is an expression like pcol = expr, where pcol is
+ * a partitioning column of rel1 and the planner could not
+ * evaluate expr. We may be able to evaluate it at execution time.
+ * Save the expression, and if we have no other hint, try to
+ * evaluate it at execution time
+ */
+ rel_loc_info1 = GetRelationLocInfo(column_base->relid);
+
+ if (!rel_loc_info1)
+ return true;
+
+ if (IsHashColumn(rel_loc_info1, column_base->colname))
+ {
+ Expr_Comparison *expr_comp =
+ palloc(sizeof(Expr_Comparison));
+
+ expr_comp->relid = column_base->relid;
+ expr_comp->rel_loc_info = rel_loc_info1;
+ expr_comp->col_name = column_base->colname;
+ expr_comp->expr = arg2;
+ context->conditions->partitioned_expressions =
+ lappend(context->conditions->partitioned_expressions,
+ expr_comp);
+ return false;
+ }
}
}
}
@@ -1599,24 +1648,19 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
}
if (rtesave)
- {
/* a single table, just grab it */
rel_loc_info = GetRelationLocInfo(rtesave->relid);
+ }
- if (!rel_loc_info)
- return true;
+ /* have complex case */
+ if (!rel_loc_info)
+ return true;
- context->query_step->exec_nodes = GetRelationNodes(rel_loc_info,
- NULL,
- context->accessType);
- }
- }
- else
- {
+ if (rel_loc_info->locatorType != LOCATOR_TYPE_HASH)
+ /* no need to evaluate a partitioning expression */
context->query_step->exec_nodes = GetRelationNodes(rel_loc_info,
NULL,
context->accessType);
- }
/* Note replicated table usage for determining safe queries */
if (context->query_step->exec_nodes)
@@ -1625,6 +1669,38 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
table_usage_type = TABLE_USAGE_TYPE_USER_REPLICATED;
context->query_step->exec_nodes->tableusagetype = table_usage_type;
+ } else if (context->conditions->partitioned_expressions) {
+ /* we may be able to determine nodes at execution time */
+ foreach(lc, context->conditions->partitioned_expressions) {
+ Expr_Comparison *expr_comp = (Expr_Comparison *) lfirst(lc);
+ if (rel_loc_info->relid == expr_comp->relid)
+ {
+ context->query_step->exec_nodes = makeNode(ExecNodes);
+ context->query_step->exec_nodes->baselocatortype =
+ rel_loc_info->locatorType;
+ context->query_step->exec_nodes->tableusagetype =
+ TABLE_USAGE_TYPE_USER;
+ context->query_step->exec_nodes->primarynodelist = NULL;
+ context->query_step->exec_nodes->nodelist = NULL;
+ context->query_step->exec_nodes->expr = expr_comp->expr;
+ context->query_step->exec_nodes->relid = expr_comp->relid;
+ context->query_step->exec_nodes->accesstype = context->accessType;
+ break;
+ }
+ }
+ } else {
+ /* run query on all nodes */
+ context->query_step->exec_nodes = makeNode(ExecNodes);
+ context->query_step->exec_nodes->baselocatortype =
+ rel_loc_info->locatorType;
+ context->query_step->exec_nodes->tableusagetype =
+ TABLE_USAGE_TYPE_USER;
+ context->query_step->exec_nodes->primarynodelist = NULL;
+ context->query_step->exec_nodes->nodelist =
+ list_copy(rel_loc_info->nodeList);
+ context->query_step->exec_nodes->expr = NULL;
+ context->query_step->exec_nodes->relid = InvalidOid;
+ context->query_step->exec_nodes->accesstype = context->accessType;
}
}
/* check for partitioned col comparison against a literal */
@@ -1712,6 +1788,7 @@ InitXCWalkerContext(XCWalkerContext *context)
context->query = NULL;
context->accessType = RELATION_ACCESS_READ;
context->query_step = NULL;
+ context->root = NULL;
context->conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions));
context->rtables = NIL;
context->multilevel_join = false;
@@ -1722,20 +1799,57 @@ InitXCWalkerContext(XCWalkerContext *context)
context->join_list = NIL;
}
+
+/*
+ * Create an instance of RemoteQuery and initialize fields
+ */
+static RemoteQuery *
+makeRemoteQuery(void)
+{
+ RemoteQuery *result = makeNode(RemoteQuery);
+ result->is_single_step = true;
+ result->sql_statement = NULL;
+ result->exec_nodes = NULL;
+ result->combine_type = COMBINE_TYPE_NONE;
+ result->simple_aggregates = NIL;
+ result->sort = NULL;
+ result->distinct = NULL;
+ result->read_only = true;
+ result->force_autocommit = false;
+ result->cursor = NULL;
+ result->exec_type = EXEC_ON_DATANODES;
+ result->paramval_data = NULL;
+ result->paramval_len = 0;
+
+ result->relname = NULL;
+ result->remotejoin = false;
+ result->partitioned_replicated = false;
+ result->reduce_level = 0;
+ result->base_tlist = NIL;
+ result->outer_alias = NULL;
+ result->inner_alias = NULL;
+ result->outer_reduce_level = 0;
+ result->inner_reduce_level = 0;
+ result->outer_relids = NULL;
+ result->inner_relids = NULL;
+ return result;
+}
+
/*
* Top level entry point before walking query to determine plan nodes
*
*/
static void
-get_plan_nodes(Query *query, RemoteQuery *step, RelationAccessType accessType)
+get_plan_nodes(PlannerInfo *root, RemoteQuery *step, RelationAccessType accessType)
{
+ Query *query = root->parse;
XCWalkerContext context;
-
InitXCWalkerContext(&context);
context.query = query;
context.accessType = accessType;
context.query_step = step;
+ context.root = root;
context.rtables = lappend(context.rtables, query->rtable);
if ((get_plan_nodes_walker((Node *) query, &context)
@@ -1754,24 +1868,24 @@ get_plan_nodes(Query *query, RemoteQuery *step, RelationAccessType accessType)
*
*/
static void
-get_plan_nodes_command(Query *query, RemoteQuery *step)
+get_plan_nodes_command(RemoteQuery *step, PlannerInfo *root)
{
- switch (query->commandType)
+ switch (root->parse->commandType)
{
case CMD_SELECT:
- get_plan_nodes(query, step, query->rowMarks ?
+ get_plan_nodes(root, step, root->parse->rowMarks ?
RELATION_ACCESS_READ_FOR_UPDATE :
RELATION_ACCESS_READ);
break;
case CMD_INSERT:
- get_plan_nodes_insert(query, step);
+ get_plan_nodes_insert(root, step);
break;
case CMD_UPDATE:
case CMD_DELETE:
/* treat as a select */
- get_plan_nodes(query, step, RELATION_ACCESS_UPDATE);
+ get_plan_nodes(root, step, RELATION_ACCESS_UPDATE);
break;
default:
@@ -2589,9 +2703,41 @@ PlannedStmt *
pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
{
PlannedStmt *result;
- Plan *standardPlan;
+ PlannerGlobal *glob;
+ PlannerInfo *root;
RemoteQuery *query_step;
+ StringInfoData buf;
+ /*
+ * Set up global state for this planner invocation. This data is needed
+ * across all levels of sub-Query that might exist in the given command,
+ * so we keep it in a separate struct that's linked to by each per-Query
+ * PlannerInfo.
+ */
+ glob = makeNode(PlannerGlobal);
+
+ glob->boundParams = boundParams;
+ glob->paramlist = NIL;
+ glob->subplans = NIL;
+ glob->subrtables = NIL;
+ glob->rewindPlanIDs = NULL;
+ glob->finalrtable = NIL;
+ glob->relationOids = NIL;
+ glob->invalItems = NIL;
+ glob->lastPHId = 0;
+ glob->transientPlan = false;
+
+ /* Create a PlannerInfo data structure, as is usually done for a subquery */
+ root = makeNode(PlannerInfo);
+ root->parse = query;
+ root->glob = glob;
+ root->query_level = 1;
+ root->parent_root = NULL;
+ root->planner_cxt = CurrentMemoryContext;
+ root->init_plans = NIL;
+ root->cte_plan_ids = NIL;
+ root->eq_classes = NIL;
+ root->append_rel_list = NIL;
/* build the PlannedStmt result */
result = makeNode(PlannedStmt);
@@ -2603,184 +2749,151 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
result->intoClause = query->intoClause;
result->rtable = query->rtable;
- query_step = makeNode(RemoteQuery);
- query_step->is_single_step = false;
+ query_step = makeRemoteQuery();
+
+ /* Optimize multi-node handling */
+ query_step->read_only = query->commandType == CMD_SELECT;
if (query->utilityStmt &&
IsA(query->utilityStmt, DeclareCursorStmt))
cursorOptions |= ((DeclareCursorStmt *) query->utilityStmt)->options;
- query_step->exec_nodes = NULL;
- query_step->combine_type = COMBINE_TYPE_NONE;
- query_step->simple_aggregates = NULL;
- /* Optimize multi-node handling */
- query_step->read_only = query->commandType == CMD_SELECT;
- query_step->force_autocommit = false;
-
result->planTree = (Plan *) query_step;
- /*
- * Determine where to execute the command, either at the Coordinator
- * level, Data Nodes, or both. By default we choose both. We should be
- * able to quickly expand this for more commands.
- */
- switch (query->commandType)
- {
- case CMD_SELECT:
- /* Perform some checks to make sure we can support the statement */
- if (query->intoClause)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("INTO clause not yet supported"))));
- /* fallthru */
- case CMD_INSERT:
- case CMD_UPDATE:
- case CMD_DELETE:
- /* PGXCTODO: This validation will not be removed
- * until we support moving tuples from one node to another
- * when the partition column of a table is updated
- */
- if (query->commandType == CMD_UPDATE)
- validate_part_col_updatable(query);
-
- if (query->returningList)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("RETURNING clause not yet supported"))));
-
- /* Set result relations */
- if (query->commandType != CMD_SELECT)
- result->resultRelations = list_make1_int(query->resultRelation);
+ /* Perform some checks to make sure we can support the statement */
+ if (query->commandType == CMD_SELECT && query->intoClause)
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("INTO clause not yet supported"))));
- get_plan_nodes_command(query, query_step);
+ /* PGXCTODO: This validation will not be removed
+ * until we support moving tuples from one node to another
+ * when the partition column of a table is updated
+ */
+ if (query->commandType == CMD_UPDATE)
+ validate_part_col_updatable(query);
- if (query_step->exec_nodes == NULL)
- {
- /* Do not yet allow multi-node correlated UPDATE or DELETE */
- if (query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE)
- {
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("UPDATE and DELETE that are correlated or use non-immutable functions not yet supported"))));
- }
+ if (query->returningList)
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("RETURNING clause not yet supported"))));
- /*
- * Processing guery against catalog tables, or multi-step command.
- * Run through standard planner
- */
- result = standard_planner(query, cursorOptions, boundParams);
- return result;
- }
+ /* Set result relations */
+ if (query->commandType != CMD_SELECT)
+ result->resultRelations = list_make1_int(query->resultRelation);
- /* Do not yet allow multi-node correlated UPDATE or DELETE */
- if ((query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE)
- && !query_step->exec_nodes
- && list_length(query->rtable) > 1)
- {
- result = standard_planner(query, cursorOptions, boundParams);
- return result;
- }
+ get_plan_nodes_command(query_step, root);
- /*
- * get_plan_nodes_command may alter original statement, so do not
- * process it before the call
- *
- * Declare Cursor case:
- * We should leave as a step query only SELECT statement
- * Further if we need refer source statement for planning we should take
- * the truncated string
- */
- if (query->utilityStmt &&
- IsA(query->utilityStmt, DeclareCursorStmt))
- {
+ if (query_step->exec_nodes == NULL)
+ {
+ /* Do not yet allow multi-node correlated UPDATE or DELETE */
+ if (query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("UPDATE and DELETE that are correlated or use non-immutable functions not yet supported"))));
+ }
- /* search for SELECT keyword in the normalized string */
- char *select = strpos(query->sql_statement, " SELECT ");
- /* Take substring of the original string using found offset */
- query_step->sql_statement = pstrdup(select + 1);
- }
- else
- query_step->sql_statement = pstrdup(query->sql_statement);
+ /*
+ * Processing query against catalog tables, or multi-step command.
+ * Run through standard planner
+ */
+ result = standard_planner(query, cursorOptions, boundParams);
+ return result;
+ }
- /*
- * If there already is an active portal, we may be doing planning
- * within a function. Just use the standard plan, but check if
- * it is part of an EXPLAIN statement so that we do not show that
- * we plan multiple steps when it is a single-step operation.
- */
- if (ActivePortal && strcmp(ActivePortal->commandTag, "EXPLAIN"))
- return standard_planner(query, cursorOptions, boundParams);
+ /* Do not yet allow multi-node correlated UPDATE or DELETE */
+ if ((query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE)
+ && !query_step->exec_nodes
+ && list_length(query->rtable) > 1)
+ {
+ result = standard_planner(query, cursorOptions, boundParams);
+ return result;
+ }
- query_step->is_single_step = true;
- /*
- * PGXCTODO
- * When Postgres runs insert into t (a) values (1); against table
- * defined as create table t (a int, b int); the plan is looking
- * like insert into t (a,b) values (1,null);
- * Later executor is verifying plan, to make sure table has not
- * been altered since plan has been created and comparing table
- * definition with plan target list and output error if they do
- * not match.
- * I could not find better way to generate targetList for pgxc plan
- * then call standard planner and take targetList from the plan
- * generated by Postgres.
- */
- query_step->scan.plan.targetlist = query->targetList;
+ /*
+ * Deparse query tree to get step query. It may be modified later on
+ */
+ initStringInfo(&buf);
+ deparse_query(query, &buf, NIL);
+ query_step->sql_statement = pstrdup(buf.data);
+ pfree(buf.data);
- if (query_step->exec_nodes)
- query_step->combine_type = get_plan_combine_type(
- query, query_step->exec_nodes->baselocatortype);
+ query_step->is_single_step = true;
+ /*
+ * PGXCTODO
+ * When Postgres runs insert into t (a) values (1); against table
+ * defined as create table t (a int, b int); the plan is looking
+ * like insert into t (a,b) values (1,null);
+ * Later executor is verifying plan, to make sure table has not
+ * been altered since plan has been created and comparing table
+ * definition with plan target list and output error if they do
+ * not match.
+ * I could not find a better way to generate targetList for the pgxc plan
+ * than to call standard planner and take targetList from the plan
+ * generated by Postgres.
+ */
+ query_step->scan.plan.targetlist = query->targetList;
- /* Set up simple aggregates */
- /* PGXCTODO - we should detect what types of aggregates are used.
- * in some cases we can avoid the final step and merely proxy results
- * (when there is only one data node involved) instead of using
- * coordinator consolidation. At the moment this is needed for AVG()
- */
- query_step->simple_aggregates = get_simple_aggregates(query);
+ if (query_step->exec_nodes)
+ query_step->combine_type = get_plan_combine_type(
+ query, query_step->exec_nodes->baselocatortype);
+
+ /* Set up simple aggregates */
+ /* PGXCTODO - we should detect what types of aggregates are used.
+ * in some cases we can avoid the final step and merely proxy results
+ * (when there is only one data node involved) instead of using
+ * coordinator consolidation. At the moment this is needed for AVG()
+ */
+ query_step->simple_aggregates = get_simple_aggregates(query);
- /*
- * Add sorting to the step
- */
- if (list_length(query_step->exec_nodes->nodelist) > 1 &&
- (query->sortClause || query->distinctClause))
- make_simple_sort_from_sortclauses(query, query_step);
+ /*
+ * Add sorting to the step
+ */
+ if (list_length(query_step->exec_nodes->nodelist) > 1 &&
+ (query->sortClause || query->distinctClause))
+ make_simple_sort_from_sortclauses(query, query_step);
- /* Handle LIMIT and OFFSET for single-step queries on multiple nodes */
- if (handle_limit_offset(query_step, query, result))
- {
- /* complicated expressions, just fallback to standard plan */
- result = standard_planner(query, cursorOptions, boundParams);
- return result;
- }
+ /* Handle LIMIT and OFFSET for single-step queries on multiple nodes */
+ if (handle_limit_offset(query_step, query, result))
+ {
+ /* complicated expressions, just fallback to standard plan */
+ result = standard_planner(query, cursorOptions, boundParams);
+ return result;
+ }
- /*
- * Use standard plan if we have more than one data node with either
- * group by, hasWindowFuncs, or hasRecursive
- */
- /*
- * PGXCTODO - this could be improved to check if the first
- * group by expression is the partitioning column, in which
- * case it is ok to treat as a single step.
- */
- if (query->commandType == CMD_SELECT
- && query_step->exec_nodes
- && list_length(query_step->exec_nodes->nodelist) > 1
- && (query->groupClause || query->hasWindowFuncs || query->hasRecursive))
- {
- result->planTree = standardPlan;
- return result;
- }
- break;
+ /*
+ * Use standard plan if we have more than one data node with either
+ * group by, hasWindowFuncs, or hasRecursive
+ */
+ /*
+ * PGXCTODO - this could be improved to check if the first
+ * group by expression is the partitioning column, in which
+ * case it is ok to treat as a single step.
+ */
+ if (query->commandType == CMD_SELECT
+ && query_step->exec_nodes
+ && list_length(query_step->exec_nodes->nodelist) > 1
+ && (query->groupClause || query->hasWindowFuncs || query->hasRecursive))
+ {
+ result = standard_planner(query, cursorOptions, boundParams);
+ return result;
+ }
- default:
- /* Allow for override */
- if (StrictStatementChecking)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("This command is not yet supported."))));
- else
- result->planTree = standardPlan;
+ /* Allow for override */
+ /* AM: Is this ever possible? */
+ if (query->commandType != CMD_SELECT &&
+ query->commandType != CMD_INSERT &&
+ query->commandType != CMD_UPDATE &&
+ query->commandType != CMD_DELETE)
+ {
+ if (StrictStatementChecking)
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("This command is not yet supported."))));
+ else
+ result = standard_planner(query, cursorOptions, boundParams);
+ return result;
}
/*
@@ -2808,6 +2921,13 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
}
/*
+ * Assume single step. If there are multiple steps we should make up
+ * parameters for each step where they are referenced
+ */
+ if (boundParams)
+ query_step->paramval_len = ParamListToDataRow(boundParams,
+ &query_step->paramval_data);
+ /*
* If query is FOR UPDATE fetch CTIDs from the remote node
* Use CTID as a key to update tuples on remote nodes when handling
* WHERE CURRENT OF
@@ -3068,7 +3188,7 @@ validate_part_col_updatable(const Query *query)
*
* Based on is_immutable_func from postgresql_fdw.c
* We add an exception for base postgresql functions, to
- * allow now() and others to still execute as part of single step
+ * allow now() and others to still execute as part of single step
* queries.
*
* PGXCTODO - we currently make the false assumption that immutable
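
Note: the planner-side half of the execution-time pruning added in this file
appears twice, in get_plan_nodes_insert() and get_plan_nodes_walker(): build
an ExecNodes with an empty nodelist and attach the unevaluated
partition-column expression. A hypothetical helper that factors out that
repeated pattern (not present in the patch, shown only to summarize it;
assumes the Postgres-XC headers):

static ExecNodes *
make_deferred_exec_nodes(RelationLocInfo *rel_loc_info, Expr *expr,
						 RelationAccessType accesstype)
{
	ExecNodes  *nodes = makeNode(ExecNodes);

	nodes->baselocatortype = rel_loc_info->locatorType;
	nodes->tableusagetype = TABLE_USAGE_TYPE_USER;
	nodes->primarynodelist = NIL;
	nodes->nodelist = NIL;		/* left empty: filled in at execution time */
	nodes->expr = expr;			/* evaluated later by get_exec_connections() */
	nodes->relid = rel_loc_info->relid;
	nodes->accesstype = accesstype;
	return nodes;
}

The executor half, in execRemote.c below, evaluates expr once parameter
values are bound and feeds the result to GetRelationNodes().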
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index b954003db5..a387354e7b 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -19,6 +19,7 @@
#include "postgres.h"
#include "access/gtm.h"
#include "access/xact.h"
+#include "commands/prepare.h"
#include "executor/executor.h"
#include "gtm/gtm_c.h"
#include "libpq/libpq.h"
@@ -27,6 +28,7 @@
#include "pgxc/poolmgr.h"
#include "storage/ipc.h"
#include "utils/datum.h"
+#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/tuplesort.h"
#include "utils/snapmgr.h"
@@ -38,9 +40,6 @@
#define DATA_NODE_FETCH_SIZE 1
-
-extern char *deparseSql(RemoteQueryState *scanstate);
-
/*
* Buffer size does not affect performance significantly, just do not let
* the connection buffer grow infinitely
@@ -62,6 +61,9 @@ static int pgxc_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransacti
PGXCNodeAllHandles * pgxc_handles, char *gid);
static int pgxc_node_commit_prepared(GlobalTransactionId gxid, GlobalTransactionId prepared_gxid,
PGXCNodeAllHandles * pgxc_handles, char *gid);
+static PGXCNodeAllHandles * get_exec_connections(RemoteQueryState *planstate,
+ ExecNodes *exec_nodes,
+ RemoteQueryExecType exec_type);
static int pgxc_node_implicit_commit_prepared(GlobalTransactionId prepare_xid,
GlobalTransactionId commit_xid,
PGXCNodeAllHandles * pgxc_handles,
@@ -70,8 +72,6 @@ static int pgxc_node_implicit_commit_prepared(GlobalTransactionId prepare_xid,
static int pgxc_node_implicit_prepare(GlobalTransactionId prepare_xid,
PGXCNodeAllHandles * pgxc_handles, char *gid);
-static PGXCNodeAllHandles * get_exec_connections(ExecNodes *exec_nodes,
- RemoteQueryExecType exec_type);
static int pgxc_node_receive_and_validate(const int conn_count,
PGXCNodeHandle ** connections,
bool reset_combiner);
@@ -1265,10 +1265,10 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
/*
* If we are in the process of shutting down, we
- * may be rolling back, and the buffer may contain other messages.
- * We want to avoid a procarray exception
- * as well as an error stack overflow.
- */
+ * may be rolling back, and the buffer may contain other messages.
+ * We want to avoid a procarray exception
+ * as well as an error stack overflow.
+ */
if (proc_exit_inprogress)
conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
@@ -1364,10 +1364,11 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
/* sync lost? */
elog(WARNING, "Received unsupported message type: %c", msg_type);
conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
- return RESPONSE_EOF;
+ /* stop reading */
+ return RESPONSE_COMPLETE;
}
}
-
+ /* never happens, but keep compiler quiet */
return RESPONSE_EOF;
}
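
Note: the switch from RESPONSE_EOF to RESPONSE_COMPLETE on an unsupported
message type matters because of the caller's contract, visible in the
do_query() receive loop further below: EOF means "no complete message yet,
poll this connection again", while COMPLETE drops the connection from the
working set. A condensed sketch of that contract (names taken from this
file, simplified from the loop shown later):

/*
 * Condensed sketch of the receive loop in do_query() below. Returning
 * RESPONSE_COMPLETE for a desynchronized connection guarantees this
 * loop terminates instead of polling the broken connection forever.
 */
while (i < regular_conn_count)
{
	int		res = handle_response(connections[i], combiner);

	if (res == RESPONSE_EOF)
		i++;				/* wait for more data on this connection */
	else if (res == RESPONSE_COMPLETE)
	{
		/* done with this connection, drop it from the working set */
		if (i < --regular_conn_count)
			connections[i] = connections[regular_conn_count];
	}
}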
@@ -2746,7 +2747,6 @@ RemoteQueryState *
ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
{
RemoteQueryState *remotestate;
- Relation currentRelation;
remotestate = CreateResponseCombiner(0, node->combine_type);
remotestate->ss.ps.plan = (Plan *) node;
@@ -2788,6 +2788,19 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
ALLOCSET_DEFAULT_MAXSIZE);
}
+ /*
+ * If we have parameter values here that the planner did not have,
+ * serialize them now
+ */
+ if (estate->es_param_list_info && !node->paramval_data)
+ node->paramval_len = ParamListToDataRow(estate->es_param_list_info,
+ &node->paramval_data);
+
+ /* We need an expression context to evaluate exec_nodes->expr */
+ if (node->exec_nodes && node->exec_nodes->expr)
+ ExecAssignExprContext(estate, &remotestate->ss.ps);
+
+
if (innerPlan(node))
innerPlanState(remotestate) = ExecInitNode(innerPlan(node), estate, eflags);
@@ -2853,7 +2866,8 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst)
* Datanodes Only, Coordinators only or both types
*/
static PGXCNodeAllHandles *
-get_exec_connections(ExecNodes *exec_nodes,
+get_exec_connections(RemoteQueryState *planstate,
+ ExecNodes *exec_nodes,
RemoteQueryExecType exec_type)
{
List *nodelist = NIL;
@@ -2873,8 +2887,34 @@ get_exec_connections(ExecNodes *exec_nodes,
if (exec_nodes)
{
- nodelist = exec_nodes->nodelist;
- primarynode = exec_nodes->primarynodelist;
+ if (exec_nodes->expr)
+ {
+ /* determine target data nodes at execution time */
+ bool isnull;
+ ExprState *estate = ExecInitExpr(exec_nodes->expr,
+ (PlanState *) planstate);
+ Datum partvalue = ExecEvalExpr(estate,
+ planstate->ss.ps.ps_ExprContext,
+ &isnull,
+ NULL);
+ if (!isnull)
+ {
+ RelationLocInfo *rel_loc_info = GetRelationLocInfo(exec_nodes->relid);
+ ExecNodes *nodes = GetRelationNodes(rel_loc_info,
+ (long *) &partvalue,
+ exec_nodes->accesstype);
+ if (nodes)
+ {
+ nodelist = nodes->nodelist;
+ primarynode = nodes->primarynodelist;
+ pfree(nodes);
+ }
+ FreeRelationLocInfo(rel_loc_info);
+ }
+ } else {
+ nodelist = exec_nodes->nodelist;
+ primarynode = exec_nodes->primarynodelist;
+ }
}
if (list_length(nodelist) == 0 &&
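
Note: the hunk below rewrites the body of ExecRemoteQuery() as do_query().
Stripped of error handling, the per-connection send decision it introduces
reduces to the hypothetical helper sketched here: named statements, cursors,
or bound parameters force the extended query protocol, and an
already-prepared statement is sent with a NULL query text so the datanode
reuses its stored plan. The helper itself is not in the patch; all functions
it calls are.

static int
send_step(PGXCNodeHandle *conn, RemoteQuery *step)
{
	if (step->statement || step->cursor || step->paramval_data)
	{
		/* extended query protocol */
		bool	prepared = false;

		if (step->statement)
			prepared = ActivateDatanodeStatementOnNode(step->statement,
													   conn->nodenum);
		/* fetch rows immediately only when a sorter will consume them */
		return pgxc_node_send_query_extended(conn,
											 prepared ? NULL : step->sql_statement,
											 step->statement,
											 step->cursor,
											 step->paramval_len,
											 step->paramval_data,
											 step->read_only,
											 step->cursor ? 1 : 0);
	}
	/* simple query protocol */
	return pgxc_node_send_query(conn, step->sql_statement);
}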
@@ -2961,212 +3001,273 @@ register_write_nodes(int conn_count, PGXCNodeHandle **connections)
}
}
-/*
- * Execute step of PGXC plan.
- * The step specifies a command to be executed on specified nodes.
- * On first invocation connections to the data nodes are initialized and
- * command is executed. Further, as well as within subsequent invocations,
- * responses are received until step is completed or there is a tuple to emit.
- * If there is a tuple it is returned, otherwise returned NULL. The NULL result
- * from the function indicates completed step.
- * The function returns at most one tuple per invocation.
- */
-TupleTableSlot *
-ExecRemoteQuery(RemoteQueryState *node)
+
+static void
+do_query(RemoteQueryState *node)
{
RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan;
- TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot;
TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot;
- bool have_tuple = false;
+ bool force_autocommit = step->force_autocommit;
+ bool is_read_only = step->read_only;
+ GlobalTransactionId gxid = InvalidGlobalTransactionId;
+ Snapshot snapshot = GetActiveSnapshot();
+ TimestampTz timestamp = GetCurrentGTMStartTimestamp();
+ PGXCNodeHandle **connections = NULL;
+ PGXCNodeHandle *primaryconnection = NULL;
+ int i;
+ int regular_conn_count;
+ int total_conn_count;
+ bool need_tran;
+ PGXCNodeAllHandles *pgxc_connections;
+ /*
+ * Get connections for Datanodes only, utilities and DDLs
+ * are launched in ExecRemoteUtility
+ */
+ pgxc_connections = get_exec_connections(node, step->exec_nodes,
+ EXEC_ON_DATANODES);
- if (!node->query_Done)
+ connections = pgxc_connections->datanode_handles;
+ primaryconnection = pgxc_connections->primary_handle;
+ total_conn_count = regular_conn_count = pgxc_connections->dn_conn_count;
+
+ /*
+ * Primary connection is counted separately but is included in total_conn_count if used.
+ */
+ if (primaryconnection)
{
- /* First invocation, initialize */
- bool force_autocommit = step->force_autocommit;
- bool is_read_only = step->read_only;
- GlobalTransactionId gxid = InvalidGlobalTransactionId;
- Snapshot snapshot = GetActiveSnapshot();
- TimestampTz timestamp = GetCurrentGTMStartTimestamp();
- PGXCNodeHandle **connections = NULL;
- PGXCNodeHandle *primaryconnection = NULL;
- int i;
- int regular_conn_count;
- int total_conn_count;
- bool need_tran;
- PGXCNodeAllHandles *pgxc_connections;
- TupleTableSlot *innerSlot = NULL;
-
- implicit_force_autocommit = force_autocommit;
+ regular_conn_count--;
+ }
- /*
- * Inner plan for RemoteQuery supplies parameters.
- * We execute inner plan to get a tuple and use values of the tuple as
- * parameter values when executing this remote query.
- * If returned slot contains NULL tuple break execution.
- * TODO there is a problem how to handle the case if both inner and
- * outer plans exist. We can decide later, since it is never used now.
- */
- if (innerPlanState(node))
- {
- innerSlot = ExecProcNode(innerPlanState(node));
-// if (TupIsNull(innerSlot))
-// return innerSlot;
- }
+ pfree(pgxc_connections);
- /*
- * Get connections for Datanodes only, utilities and DDLs
- * are launched in ExecRemoteUtility
- */
- pgxc_connections = get_exec_connections(step->exec_nodes,
- EXEC_ON_DATANODES);
+ /*
+ * We save only regular connections; by the time we exit this function
+ * we are finished with the primary connection and deal only with
+ * regular connections on subsequent invocations
+ */
+ node->node_count = regular_conn_count;
- connections = pgxc_connections->datanode_handles;
- primaryconnection = pgxc_connections->primary_handle;
- total_conn_count = regular_conn_count = pgxc_connections->dn_conn_count;
+ if (force_autocommit)
+ need_tran = false;
+ else
+ need_tran = !autocommit || (!is_read_only && total_conn_count > 1);
- /*
- * Primary connection is counted separately but is included in total_conn_count if used.
- */
+ elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primaryconnection ? "true" : "false", regular_conn_count, need_tran ? "true" : "false");
+
+ stat_statement();
+ if (autocommit)
+ {
+ stat_transaction(total_conn_count);
+ /* We normally clear for transactions, but if autocommit, clear here, too */
+ clear_write_node_list();
+ }
+
+ if (!is_read_only)
+ {
if (primaryconnection)
- {
- regular_conn_count--;
- }
+ register_write_nodes(1, &primaryconnection);
+ register_write_nodes(regular_conn_count, connections);
+ }
+
+ gxid = GetCurrentGlobalTransactionId();
- pfree(pgxc_connections);
+ if (!GlobalTransactionIdIsValid(gxid))
+ {
+ if (primaryconnection)
+ pfree(primaryconnection);
+ pfree(connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to get next transaction ID")));
+ }
+ if (need_tran)
+ {
/*
- * We save only regular connections, at the time we exit the function
- * we finish with the primary connection and deal only with regular
- * connections on subsequent invocations
+ * Check if data node connections are in transaction and start
+ * transactions on nodes where it is not started
*/
- node->node_count = regular_conn_count;
+ PGXCNodeHandle *new_connections[total_conn_count];
+ int new_count = 0;
- if (force_autocommit)
- need_tran = false;
- else
- need_tran = !autocommit || (!is_read_only && total_conn_count > 1);
+ if (primaryconnection && primaryconnection->transaction_status != 'T')
+ new_connections[new_count++] = primaryconnection;
+ for (i = 0; i < regular_conn_count; i++)
+ if (connections[i]->transaction_status != 'T')
+ new_connections[new_count++] = connections[i];
+
+ if (new_count && pgxc_node_begin(new_count, new_connections, gxid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not begin transaction on data nodes.")));
+ }
- elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primaryconnection ? "true" : "false", regular_conn_count, need_tran ? "true" : "false");
+ /* See if we have a primary node, execute on it first before the others */
+ if (primaryconnection)
+ {
+ if (primaryconnection->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(primaryconnection);
- stat_statement();
- if (autocommit)
+ /* If explicit transaction is needed gxid is already sent */
+ if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid))
{
- stat_transaction(total_conn_count);
- /* We normally clear for transactions, but if autocommit, clear here, too */
- clear_write_node_list();
+ pfree(connections);
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
}
-
- if (!is_read_only)
+ if (total_conn_count == 1 && pgxc_node_send_timestamp(primaryconnection, timestamp))
{
- if (primaryconnection)
- register_write_nodes(1, &primaryconnection);
- register_write_nodes(regular_conn_count, connections);
+ /*
+ * If a transaction involves multiple connections timestamp is
+ * always sent down to Datanodes with pgxc_node_begin.
+ * An autocommit transaction needs the global timestamp also,
+ * so handle this case here.
+ */
+ pfree(connections);
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
}
-
- gxid = GetCurrentGlobalTransactionId();
-
- if (!GlobalTransactionIdIsValid(gxid))
+ if (snapshot && pgxc_node_send_snapshot(primaryconnection, snapshot))
{
- if (primaryconnection)
- pfree(primaryconnection);
pfree(connections);
+ pfree(primaryconnection);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to get next transaction ID")));
+ errmsg("Failed to send command to data nodes")));
}
-
- if (need_tran)
+ if (step->statement || step->cursor || step->paramval_data)
{
+ /* need to use Extended Query Protocol */
+ int fetch = 0;
+ bool prepared = false;
+
+ /* if a prepared statement is referenced, see if it already exists */
+ if (step->statement)
+ prepared = ActivateDatanodeStatementOnNode(step->statement,
+ primaryconnection->nodenum);
/*
- * Check if data node connections are in transaction and start
- * transactions on nodes where it is not started
+ * execute and fetch rows only if they will be consumed
+ * immediately by the sorter
*/
- PGXCNodeHandle *new_connections[total_conn_count];
- int new_count = 0;
-
- if (primaryconnection && primaryconnection->transaction_status != 'T')
- new_connections[new_count++] = primaryconnection;
- for (i = 0; i < regular_conn_count; i++)
- if (connections[i]->transaction_status != 'T')
- new_connections[new_count++] = connections[i];
-
- if (new_count && pgxc_node_begin(new_count, new_connections, gxid))
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Could not begin transaction on data nodes.")));
- }
-
- /* See if we have a primary node, execute on it first before the others */
- if (primaryconnection)
- {
- if (primaryconnection->state == DN_CONNECTION_STATE_QUERY)
- BufferConnection(primaryconnection);
-
- /* If explicit transaction is needed gxid is already sent */
- if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid))
+ if (step->cursor)
+ fetch = 1;
+
+ if (pgxc_node_send_query_extended(primaryconnection,
+ prepared ? NULL : step->sql_statement,
+ step->statement,
+ step->cursor,
+ step->paramval_len,
+ step->paramval_data,
+ step->read_only,
+ fetch) != 0)
{
pfree(connections);
- pfree(primaryconnection);
+ if (primaryconnection)
+ pfree(primaryconnection);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (total_conn_count == 1 && pgxc_node_send_timestamp(primaryconnection, timestamp))
+ }
+ else
+ {
+ if (pgxc_node_send_query(primaryconnection, step->sql_statement) != 0)
{
- /*
- * If a transaction involves multiple connections timestamp is
- * always sent down to Datanodes with pgxc_node_begin.
- * An autocommit transaction needs the global timestamp also,
- * so handle this case here.
- */
pfree(connections);
pfree(primaryconnection);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (snapshot && pgxc_node_send_snapshot(primaryconnection, snapshot))
- {
- pfree(connections);
- pfree(primaryconnection);
+ }
+ primaryconnection->combiner = node;
+ Assert(node->combine_type == COMBINE_TYPE_SAME);
+
+ while (node->command_complete_count < 1)
+ {
+ if (pgxc_node_receive(1, &primaryconnection, NULL))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
- }
- if (pgxc_node_send_query(primaryconnection, step->sql_statement) != 0)
+ errmsg("Failed to read response from data nodes")));
+ handle_response(primaryconnection, node);
+ if (node->errorMessage)
{
- pfree(connections);
- pfree(primaryconnection);
+ char *code = node->errorCode;
ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
- }
- Assert(node->combine_type == COMBINE_TYPE_SAME);
-
- while (node->command_complete_count < 1)
- {
- if (pgxc_node_receive(1, &primaryconnection, NULL))
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to read response from data nodes")));
- handle_response(primaryconnection, node);
- if (node->errorMessage)
- {
- char *code = node->errorCode;
- ereport(ERROR,
- (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
- errmsg("%s", node->errorMessage)));
- }
+ (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
+ errmsg("%s", node->errorMessage)));
}
}
+ }
- for (i = 0; i < regular_conn_count; i++)
+ for (i = 0; i < regular_conn_count; i++)
+ {
+ if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(connections[i]);
+ /* If explicit transaction is needed gxid is already sent */
+ if (!need_tran && pgxc_node_send_gxid(connections[i], gxid))
{
- if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
- BufferConnection(connections[i]);
- /* If explicit transaction is needed gxid is already sent */
- if (!need_tran && pgxc_node_send_gxid(connections[i], gxid))
+ pfree(connections);
+ if (primaryconnection)
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (total_conn_count == 1 && pgxc_node_send_timestamp(connections[i], timestamp))
+ {
+ /*
+ * If a transaction involves multiple connections timestamp is
+ * always sent down to Datanodes with pgxc_node_begin.
+ * An autocommit transaction needs the global timestamp also,
+ * so handle this case here.
+ */
+ pfree(connections);
+ if (primaryconnection)
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (snapshot && pgxc_node_send_snapshot(connections[i], snapshot))
+ {
+ pfree(connections);
+ if (primaryconnection)
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (step->statement || step->cursor || step->paramval_data)
+ {
+ /* need to use Extended Query Protocol */
+ int fetch = 0;
+ bool prepared = false;
+
+ /* if a prepared statement is referenced, see if it already exists */
+ if (step->statement)
+ prepared = ActivateDatanodeStatementOnNode(step->statement,
+ connections[i]->nodenum);
+ /*
+ * execute and fetch rows only if they will be consumed
+ * immediately by the sorter
+ */
+ if (step->cursor)
+ fetch = 1;
+
+ if (pgxc_node_send_query_extended(connections[i],
+ prepared ? NULL : step->sql_statement,
+ step->statement,
+ step->cursor,
+ step->paramval_len,
+ step->paramval_data,
+ step->read_only,
+ fetch) != 0)
{
pfree(connections);
if (primaryconnection)
@@ -3175,14 +3276,11 @@ ExecRemoteQuery(RemoteQueryState *node)
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (total_conn_count == 1 && pgxc_node_send_timestamp(connections[i], timestamp))
+ }
+ else
+ {
+ if (pgxc_node_send_query(connections[i], step->sql_statement) != 0)
{
- /*
- * If a transaction involves multiple connections timestamp is
- * always sent down to Datanodes with pgxc_node_begin.
- * An autocommit transaction needs the global timestamp also,
- * so handle this case here.
- */
pfree(connections);
if (primaryconnection)
pfree(primaryconnection);
@@ -3190,176 +3288,163 @@ ExecRemoteQuery(RemoteQueryState *node)
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (snapshot && pgxc_node_send_snapshot(connections[i], snapshot))
+ }
+ connections[i]->combiner = node;
+ }
+
+ if (step->cursor)
+ {
+ node->cursor = step->cursor;
+ node->cursor_count = regular_conn_count;
+ node->cursor_connections = (PGXCNodeHandle **) palloc(regular_conn_count * sizeof(PGXCNodeHandle *));
+ memcpy(node->cursor_connections, connections, regular_conn_count * sizeof(PGXCNodeHandle *));
+ }
+
+ /*
+ * Stop if all commands are completed or we got a data row and
+ * initialized the state node for subsequent invocations
+ */
+ while (regular_conn_count > 0 && node->connections == NULL)
+ {
+ int i = 0;
+
+ if (pgxc_node_receive(regular_conn_count, connections, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to read response from data nodes")));
+ /*
+ * Handle input from the data nodes.
+ * If we got a RESPONSE_DATAROW we can break out of handling to wrap
+ * it into a tuple and return. Handling will continue upon
+ * subsequent invocations.
+ * If we got RESPONSE_COMPLETE, we exclude the connection from the
+ * list; we do not expect more input from it. For a non-SELECT query
+ * we quit the loop when all nodes have finished their work and sent
+ * ReadyForQuery, leaving the connections array empty.
+ * If we got RESPONSE_EOF, move on to the next connection; more data
+ * will arrive on the next iteration.
+ */
+ while (i < regular_conn_count)
+ {
+ int res = handle_response(connections[i], node);
+ if (res == RESPONSE_EOF)
+ {
+ i++;
+ }
+ else if (res == RESPONSE_COMPLETE)
{
- pfree(connections);
- if (primaryconnection)
- pfree(primaryconnection);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
+ if (i < --regular_conn_count)
+ connections[i] = connections[regular_conn_count];
}
- if (step->cursor || innerSlot)
+ else if (res == RESPONSE_TUPDESC)
{
- int fetch = 0;
+ ExecSetSlotDescriptor(scanslot, node->tuple_desc);
/*
- * execute and fetch rows only if they will be consumed
- * immediately by the sorter
+ * Now tuple table slot is responsible for freeing the
+ * descriptor
*/
- if (step->cursor)
- fetch = 1;
- /*
- * Here is ad hoc handling of WHERE CURRENT OF clause.
- * Previous step have returned the key values to update
- * replicated tuple into innerSlot and the query is a
- * DECLARE CURSOR. We do not want to execute this query now
- * because of unusual fetch procedure: we just need to
- * position cursor on the first row on each node.
- * So here we just parse and bind query. As the result cursor
- * specified by the statement will be created, and next step
- * will issue MOVE command positioning the cursor properly.
- * If we want to use parametrized requests for any other
- * purpose we should return and rework this code
- */
- if (pgxc_node_send_query_extended(connections[i],
- step->sql_statement,
- NULL,
- step->cursor,
- innerSlot ? innerSlot->tts_dataLen : 0,
- innerSlot ? innerSlot->tts_dataRow : NULL,
- step->cursor != NULL,
- fetch) != 0)
+ node->tuple_desc = NULL;
+ if (step->sort)
{
- pfree(connections);
- if (primaryconnection)
- pfree(primaryconnection);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
+ SimpleSort *sort = step->sort;
+
+ node->connections = connections;
+ node->conn_count = regular_conn_count;
+ /*
+ * The first message is already in the buffer.
+ * Further fetching will be under tuplesort control.
+ * If the query does not produce rows, tuplesort will not
+ * be initialized.
+ */
+ node->tuplesortstate = tuplesort_begin_merge(
+ scanslot->tts_tupleDescriptor,
+ sort->numCols,
+ sort->sortColIdx,
+ sort->sortOperators,
+ sort->nullsFirst,
+ node,
+ work_mem);
+ /*
+ * Break the loop, do not wait for the first row.
+ * The tuplesort module wants to control which node it
+ * fetches rows from, while in this loop the first
+ * row would come from a random node.
+ */
+ break;
}
}
- else
+ else if (res == RESPONSE_DATAROW)
{
- if (pgxc_node_send_query(connections[i], step->sql_statement) != 0)
- {
- pfree(connections);
- if (primaryconnection)
- pfree(primaryconnection);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
- }
+ /*
+ * Got first data row, quit the loop
+ */
+ node->connections = connections;
+ node->conn_count = regular_conn_count;
+ node->current_conn = i;
+ break;
}
- connections[i]->combiner = node;
}
+ }
+
+ if (node->cursor_count)
+ {
+ node->conn_count = node->cursor_count;
+ memcpy(connections, node->cursor_connections, node->cursor_count * sizeof(PGXCNodeHandle *));
+ node->connections = connections;
+ }
+}
+
+
+/*
+ * Execute a step of a PGXC plan.
+ * The step specifies a command to be executed on the specified nodes.
+ * On the first invocation, connections to the data nodes are initialized
+ * and the command is executed. Responses are then received, on this and
+ * subsequent invocations, until the step is completed or there is a tuple
+ * to emit. If there is a tuple it is returned; otherwise NULL is returned,
+ * indicating that the step is complete.
+ * The function returns at most one tuple per invocation.
+ */
+TupleTableSlot *
+ExecRemoteQuery(RemoteQueryState *node)
+{
+ RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan;
+ TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot;
+ TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot;
+ bool have_tuple = false;
- if (step->cursor)
- {
- node->cursor = step->cursor;
- node->cursor_count = regular_conn_count;
- node->cursor_connections = (PGXCNodeHandle **) palloc(regular_conn_count * sizeof(PGXCNodeHandle *));
- memcpy(node->cursor_connections, connections, regular_conn_count * sizeof(PGXCNodeHandle *));
- }
+ if (!node->query_Done)
+ {
/*
- * Stop if all commands are completed or we got a data row and
- * initialized state node for subsequent invocations
+ * The inner plan of a RemoteQuery supplies parameters.
+ * We execute the inner plan to get a tuple and use the values of that
+ * tuple as parameter values when executing this remote query.
+ * If the returned slot contains a NULL tuple, execution stops.
+ * TODO: it is unclear how to handle the case where both inner and
+ * outer plans exist; we can decide later, since that never happens now.
*/
- while (regular_conn_count > 0 && node->connections == NULL)
+ if (innerPlanState(node))
{
- int i = 0;
-
- if (pgxc_node_receive(regular_conn_count, connections, NULL))
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to read response from data nodes")));
+ TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node));
/*
- * Handle input from the data nodes.
- * If we got a RESPONSE_DATAROW we can break handling to wrap
- * it into a tuple and return. Handling will be continued upon
- * subsequent invocations.
- * If we got 0, we exclude connection from the list. We do not
- * expect more input from it. In case of non-SELECT query we quit
- * the loop when all nodes finish their work and send ReadyForQuery
- * with empty connections array.
- * If we got EOF, move to the next connection, will receive more
- * data on the next iteration.
+ * Use the data row returned by the previous step as parameters for
+ * the main query.
+ * Exit if there are no more slots.
*/
- while (i < regular_conn_count)
- {
- int res = handle_response(connections[i], node);
- if (res == RESPONSE_EOF)
- {
- i++;
- }
- else if (res == RESPONSE_COMPLETE)
- {
- if (i < --regular_conn_count)
- connections[i] = connections[regular_conn_count];
- }
- else if (res == RESPONSE_TUPDESC)
- {
- ExecSetSlotDescriptor(scanslot, node->tuple_desc);
- /*
- * Now tuple table slot is responsible for freeing the
- * descriptor
- */
- node->tuple_desc = NULL;
- if (step->sort)
- {
- SimpleSort *sort = step->sort;
-
- node->connections = connections;
- node->conn_count = regular_conn_count;
- /*
- * First message is already in the buffer
- * Further fetch will be under tuplesort control
- * If query does not produce rows tuplesort will not
- * be initialized
- */
- node->tuplesortstate = tuplesort_begin_merge(
- scanslot->tts_tupleDescriptor,
- sort->numCols,
- sort->sortColIdx,
- sort->sortOperators,
- sort->nullsFirst,
- node,
- work_mem);
- /*
- * Break the loop, do not wait for first row.
- * Tuplesort module want to control node it is
- * fetching rows from, while in this loop first
- * row would be got from random node
- */
- break;
- }
- }
- else if (res == RESPONSE_DATAROW)
- {
- /*
- * Got first data row, quit the loop
- */
- node->connections = connections;
- node->conn_count = regular_conn_count;
- node->current_conn = i;
- break;
- }
- }
+ if (!TupIsNull(innerSlot))
+ step->paramval_len = ExecCopySlotDatarow(innerSlot,
+ &step->paramval_data);
}
- if (node->cursor_count)
- {
- node->conn_count = node->cursor_count;
- memcpy(connections, node->cursor_connections, node->cursor_count * sizeof(PGXCNodeHandle *));
- node->connections = connections;
- }
+ do_query(node);
node->query_Done = true;
}
if (node->update_cursor)
{
- PGXCNodeAllHandles *all_dn_handles = get_exec_connections(NULL, EXEC_ON_DATANODES);
+ PGXCNodeAllHandles *all_dn_handles = get_exec_connections(node, NULL, EXEC_ON_DATANODES);
close_node_cursors(all_dn_handles->datanode_handles,
all_dn_handles->dn_conn_count,
node->update_cursor);
@@ -3367,6 +3452,7 @@ ExecRemoteQuery(RemoteQueryState *node)
node->update_cursor = NULL;
}
+handle_results:
if (node->tuplesortstate)
{
while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate,
@@ -3458,12 +3544,22 @@ ExecRemoteQuery(RemoteQueryState *node)
return resultslot;
/*
- * If coordinator plan is specified execute it first.
- * If the plan is returning we are returning these tuples immediately.
- * If it is not returning or returned them all by current invocation
- * we will go ahead and execute remote query. Then we will never execute
- * the outer plan again because node->query_Done flag will be set and
- * execution won't get to that place.
+ * We cannot use recursion here: we could run out of stack memory if the
+ * inner node returns a long result set and this node does not return rows
+ * (as with INSERT ... SELECT).
+ */
+ if (innerPlanState(node))
+ {
+ TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node));
+ if (!TupIsNull(innerSlot))
+ {
+ do_query(node);
+ goto handle_results;
+ }
+ }
+
+ /*
+ * Execute outer plan if specified
*/
if (outerPlanState(node))
{
@@ -3557,7 +3653,7 @@ ExecEndRemoteQuery(RemoteQueryState *node)
if (node->update_cursor)
{
- PGXCNodeAllHandles *all_dn_handles = get_exec_connections(NULL, EXEC_ON_DATANODES);
+ PGXCNodeAllHandles *all_dn_handles = get_exec_connections(node, NULL, EXEC_ON_DATANODES);
close_node_cursors(all_dn_handles->datanode_handles,
all_dn_handles->dn_conn_count,
node->update_cursor);
@@ -3566,6 +3662,16 @@ ExecEndRemoteQuery(RemoteQueryState *node)
}
/*
+ * Clean up parameters if they were set, since the plan may be reused
+ */
+ if (((RemoteQuery *) node->ss.ps.plan)->paramval_data)
+ {
+ pfree(((RemoteQuery *) node->ss.ps.plan)->paramval_data);
+ ((RemoteQuery *) node->ss.ps.plan)->paramval_data = NULL;
+ ((RemoteQuery *) node->ss.ps.plan)->paramval_len = 0;
+ }
+
+ /*
* shut down the subplan
*/
if (outerPlanState(node))
@@ -3631,6 +3737,74 @@ close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor)
ValidateAndCloseCombiner(combiner);
}
+
+/*
+ * Encode parameter values in the format of a DataRow message (the same
+ * format is used in Bind) to prepare for sending down to the data nodes.
+ * The buffer storing the encoded values is palloc'ed and returned via the
+ * result parameter. The function returns the size of the result.
+ */
+int
+ParamListToDataRow(ParamListInfo params, char** result)
+{
+ StringInfoData buf;
+ uint16 n16;
+ int i;
+
+ initStringInfo(&buf);
+ /* Number of parameter values */
+ n16 = htons(params->numParams);
+ appendBinaryStringInfo(&buf, (char *) &n16, 2);
+
+ /* Parameter values */
+ for (i = 0; i < params->numParams; i++)
+ {
+ ParamExternData *param = params->params + i;
+ uint32 n32;
+ if (param->isnull)
+ {
+ n32 = htonl(-1);
+ appendBinaryStringInfo(&buf, (char *) &n32, 4);
+ }
+ else
+ {
+ Oid typOutput;
+ bool typIsVarlena;
+ Datum pval;
+ char *pstring;
+ int len;
+
+ /* Get info needed to output the value */
+ getTypeOutputInfo(param->ptype, &typOutput, &typIsVarlena);
+
+ /*
+ * If we have a toasted datum, forcibly detoast it here to avoid
+ * memory leakage inside the type's output routine.
+ */
+ if (typIsVarlena)
+ pval = PointerGetDatum(PG_DETOAST_DATUM(param->value));
+ else
+ pval = param->value;
+
+ /* Convert Datum to string */
+ pstring = OidOutputFunctionCall(typOutput, pval);
+
+ /* copy data to the buffer */
+ len = strlen(pstring);
+ n32 = htonl(len);
+ appendBinaryStringInfo(&buf, (char *) &n32, 4);
+ appendBinaryStringInfo(&buf, pstring, len);
+ }
+ }
+
+ /* Take data from the buffer */
+ *result = palloc(buf.len);
+ memcpy(*result, buf.data, buf.len);
+ pfree(buf.data);
+ return buf.len;
+}
+
+
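(For illustration only; not part of the patch.) The buffer produced by
ParamListToDataRow carries a 16-bit parameter count followed, for each
parameter, by a 32-bit length (-1 for NULL) and the text-format bytes. A
minimal sketch of a decoder for that layout; the function name is
hypothetical and assumes the usual PostgreSQL headers:

/* Hypothetical decoder for the buffer built by ParamListToDataRow above. */
static void
datarow_decode_sketch(const char *data, int len)
{
	const char *cur = data;
	uint16		nparams;
	int			i;

	/* Number of parameter values */
	memcpy(&nparams, cur, 2);
	nparams = ntohs(nparams);
	cur += 2;

	for (i = 0; i < nparams; i++)
	{
		uint32		n32;
		int			plen;

		memcpy(&n32, cur, 4);
		cur += 4;
		plen = (int) ntohl(n32);
		if (plen == -1)
			continue;			/* NULL parameter */
		/* cur now points at plen bytes of text output for this parameter */
		cur += plen;
	}
	Assert(cur - data == len);
}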
/*
* Consume any remaining messages on the connections.
* This is useful for calling after ereport()
@@ -3744,8 +3918,7 @@ ExecRemoteUtility(RemoteQuery *node)
remotestate = CreateResponseCombiner(0, node->combine_type);
- pgxc_connections = get_exec_connections(node->exec_nodes,
- exec_type);
+ pgxc_connections = get_exec_connections(NULL, node->exec_nodes, exec_type);
primaryconnection = pgxc_connections->primary_handle;
@@ -4157,6 +4330,88 @@ pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles)
pfree(pgxc_handles);
}
+
+void
+ExecCloseRemoteStatement(const char *stmt_name, List *nodelist)
+{
+ PGXCNodeAllHandles *all_handles;
+ PGXCNodeHandle **connections;
+ RemoteQueryState *combiner;
+ int conn_count;
+ int i;
+
+ /* Exit if nodelist is empty */
+ if (list_length(nodelist) == 0)
+ return;
+
+ /* get needed data node connections */
+ all_handles = get_handles(nodelist, NIL, false);
+ conn_count = all_handles->dn_conn_count;
+ connections = all_handles->datanode_handles;
+
+ for (i = 0; i < conn_count; i++)
+ {
+ if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(connections[i]);
+ if (pgxc_node_send_close(connections[i], true, stmt_name) != 0)
+ {
+ /*
+ * prepared statements are not affected by statement end, so treat
+ * an unclosed statement on the datanode as a fatal issue and
+ * force the connection to be discarded
+ */
+ connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
+ ereport(WARNING,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to close data node statemrnt")));
+ }
+ if (pgxc_node_send_sync(connections[i]) != 0)
+ {
+ connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
+ ereport(WARNING,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to close data node statement")));
+ }
+ }
+
+ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+
+ while (conn_count > 0)
+ {
+ if (pgxc_node_receive(conn_count, connections, NULL))
+ {
+ for (i = 0; i <= conn_count; i++)
+ connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to close data node statement")));
+ }
+ i = 0;
+ while (i < conn_count)
+ {
+ int res = handle_response(connections[i], combiner);
+ if (res == RESPONSE_EOF)
+ {
+ i++;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ if (--conn_count > i)
+ connections[i] = connections[conn_count];
+ }
+ else
+ {
+ connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
+ }
+ }
+ }
+
+ ValidateAndCloseCombiner(combiner);
+ pfree_pgxc_all_handles(all_handles);
+}
+
+
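As a usage sketch: a caller such as DropDatanodeStatement (declared in
prepare.h below but not shown in this diff) could build the node list from
a DatanodeStatement entry and hand it to ExecCloseRemoteStatement. The
helper name and list handling here are illustrative assumptions, not the
patch's actual code:

/* Hypothetical caller of ExecCloseRemoteStatement. */
static void
drop_datanode_statement_sketch(DatanodeStatement *entry)
{
	List	   *nodelist = NIL;
	int			i;

	/* nodes[] holds the ids of the nodes where the statement is active */
	for (i = 0; i < entry->nodenum; i++)
		nodelist = lappend_int(nodelist, entry->nodes[i]);

	ExecCloseRemoteStatement(entry->stmt_name, nodelist);
	list_free(nodelist);
}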
/*
* Check if an Implicit 2PC is necessary for this transaction.
* Check also if it is necessary to prepare transaction locally.
diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c
index 4790f9573a..fc6345710c 100644
--- a/src/backend/pgxc/pool/pgxcnode.c
+++ b/src/backend/pgxc/pool/pgxcnode.c
@@ -27,6 +27,7 @@
#include "access/gtm.h"
#include "access/transam.h"
#include "access/xact.h"
+#include "commands/prepare.h"
#include "gtm/gtm_c.h"
#include "pgxc/pgxcnode.h"
#include "pgxc/locator.h"
@@ -592,6 +593,10 @@ release_handles(bool force_drop)
if (datanode_count == 0 && coord_count == 0)
return;
+ /* Do not release connections if we have prepared statements on nodes */
+ if (HaveActiveDatanodeStatements())
+ return;
+
/* Collect Data Nodes handles */
for (i = 0; i < NumDataNodes; i++)
{
@@ -1144,8 +1149,10 @@ pgxc_node_send_query_extended(PGXCNodeHandle *handle, const char *query,
int paramlen, char *params,
bool send_describe, int fetch_size)
{
- if (pgxc_node_send_parse(handle, statement, query))
- return EOF;
+ /* a NULL query indicates an already-prepared statement */
+ if (query)
+ if (pgxc_node_send_parse(handle, statement, query))
+ return EOF;
if (pgxc_node_send_bind(handle, portal, statement, paramlen, params))
return EOF;
if (send_describe)
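To make the control flow concrete: a caller re-executing a statement that is
already prepared on the node passes NULL for the query text, so Parse is
skipped and the exchange starts at Bind. A sketch mirroring the call shape
used in ExecRemoteQuery above (handle and step are illustrative variables):

/* Sketch: re-execute a statement already prepared on this node. */
if (pgxc_node_send_query_extended(handle,
								  NULL,				/* already parsed */
								  step->statement,	/* prepared stmt name */
								  step->cursor,		/* portal name */
								  step->paramval_len,
								  step->paramval_data,
								  step->read_only,
								  0) != 0)
	ereport(ERROR,
			(errcode(ERRCODE_INTERNAL_ERROR),
			 errmsg("Failed to send command to data nodes")));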
diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c
index 264271bd68..50e38dceea 100644
--- a/src/backend/pgxc/pool/poolmgr.c
+++ b/src/backend/pgxc/pool/poolmgr.c
@@ -350,7 +350,7 @@ PoolManagerInit()
else
ereport(FATAL,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"coordinator_ports\"")));
+ errmsg("invalid list syntax for \"coordinator_ports\"")));
}
if (count == 0)
@@ -369,7 +369,7 @@ PoolManagerInit()
else
ereport(FATAL,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"coordinator_users\"")));
+ errmsg("invalid list syntax for \"coordinator_users\"")));
}
i = 0;
@@ -715,14 +715,33 @@ agent_destroy(PoolAgent *agent)
/* Discard connections if any remaining */
if (agent->pool)
- agent_release_connections(agent, NULL, NULL);
+ {
+ List *dn_conn = NIL;
+ List *co_conn = NIL;
+ int i;
+
+ /* gather abandoned datanode connections */
+ if (agent->dn_connections)
+ for (i = 0; i < NumDataNodes; i++)
+ if (agent->dn_connections[i])
+ dn_conn = lappend_int(dn_conn, i+1);
+
+ /* gather abandoned coordinator connections */
+ if (agent->coord_connections)
+ for (i = 0; i < NumCoords; i++)
+ if (agent->coord_connections[i])
+ co_conn = lappend_int(co_conn, i+1);
+
+ /* release them all */
+ agent_release_connections(agent, dn_conn, co_conn);
+ }
/* find agent in the list */
for (i = 0; i < agentCount; i++)
{
if (poolAgents[i] == agent)
{
- /* free memory */
+ /* Free memory. All connection slots are NULL at this point */
if (agent->dn_connections)
{
pfree(agent->dn_connections);
@@ -1108,7 +1127,7 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
for (i = 0; i < NumCoords; i++)
agent->coord_connections[i] = NULL;
- }
+ }
/* Initialize result */
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 43737cae48..66c250fb1f 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -2287,6 +2287,21 @@ make_viewdef(StringInfo buf, HeapTuple ruletup, TupleDesc rulettc,
}
+#ifdef PGXC
+/* ----------
+ * deparse_query - Parse back one query parsetree
+ *
+ * The purpose of this function is to build up the statement text for a
+ * RemoteQuery. It just calls get_query_def without pretty-print flags.
+ * ----------
+ */
+void
+deparse_query(Query *query, StringInfo buf, List *parentnamespace)
+{
+ get_query_def(query, buf, parentnamespace, NULL, 0, 0);
+}
+#endif
+
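A hypothetical usage sketch: the planner can build the SQL text of a
RemoteQuery step by deparsing the query tree into a StringInfo. The step
and query variables here are assumptions for illustration:

/* Sketch of deparsing a Query into a RemoteQuery's SQL text. */
StringInfoData buf;

initStringInfo(&buf);
deparse_query(query, &buf, NIL);
step->sql_statement = pstrdup(buf.data);
pfree(buf.data);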
/* ----------
* get_query_def - Parse back one query parsetree
*
diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c
index 437b7d1114..bef1236b9c 100644
--- a/src/backend/utils/cache/plancache.c
+++ b/src/backend/utils/cache/plancache.c
@@ -56,6 +56,13 @@
#include "utils/resowner.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
+#ifdef PGXC
+#include "commands/prepare.h"
+#include "pgxc/execRemote.h"
+
+
+static void release_datanode_statements(Plan *plannode);
+#endif
static List *cached_plans_list = NIL;
@@ -563,6 +570,29 @@ RevalidateCachedPlan(CachedPlanSource *plansource, bool useResOwner)
}
/*
+ * Find and release all datanode statements referenced by the plan node and its subnodes
+ */
+#ifdef PGXC
+static void
+release_datanode_statements(Plan *plannode)
+{
+ if (IsA(plannode, RemoteQuery))
+ {
+ RemoteQuery *step = (RemoteQuery *) plannode;
+
+ if (step->statement)
+ DropDatanodeStatement(step->statement);
+ }
+
+ if (innerPlan(plannode))
+ release_datanode_statements(innerPlan(plannode));
+
+ if (outerPlan(plannode))
+ release_datanode_statements(outerPlan(plannode));
+}
+#endif
+
+/*
* ReleaseCachedPlan: release active use of a cached plan.
*
* This decrements the reference count, and frees the plan if the count
@@ -581,7 +611,22 @@ ReleaseCachedPlan(CachedPlan *plan, bool useResOwner)
Assert(plan->refcount > 0);
plan->refcount--;
if (plan->refcount == 0)
+ {
+#ifdef PGXC
+ if (plan->fully_planned)
+ {
+ ListCell *lc;
+ /* close any active datanode statements */
+ foreach (lc, plan->stmt_list)
+ {
+ PlannedStmt *ps = (PlannedStmt *)lfirst(lc);
+
+ release_datanode_statements(ps->planTree);
+ }
+ }
+#endif
MemoryContextDelete(plan->context);
+ }
}
/*
diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h
index f52f001289..b6f5afdb0e 100644
--- a/src/include/commands/prepare.h
+++ b/src/include/commands/prepare.h
@@ -33,6 +33,15 @@ typedef struct
TimestampTz prepare_time; /* the time when the stmt was prepared */
} PreparedStatement;
+#ifdef PGXC
+typedef struct
+{
+ /* dynahash.c requires key to be first field */
+ char stmt_name[NAMEDATALEN];
+ int nodenum; /* number of nodes where statement is active */
+ int nodes[0]; /* node ids where statement is active */
+} DatanodeStatement;
+#endif
/* Utility statements PREPARE, EXECUTE, DEALLOCATE, EXPLAIN EXECUTE */
extern void PrepareQuery(PrepareStmt *stmt, const char *queryString);
@@ -62,4 +71,11 @@ extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt);
void DropAllPreparedStatements(void);
+#ifdef PGXC
+extern DatanodeStatement *FetchDatanodeStatement(const char *stmt_name, bool throwError);
+extern bool ActivateDatanodeStatementOnNode(const char *stmt_name, int node);
+extern bool HaveActiveDatanodeStatements(void);
+extern void DropDatanodeStatement(const char *stmt_name);
+#endif
+
#endif /* PREPARE_H */
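The nodes[] flexible array member is sized when the hash entry is
allocated, so each entry can track per-node activation. A rough sketch of
what an activation helper like ActivateDatanodeStatementOnNode could look
like; the actual implementation in prepare.c is not shown in this diff, so
this is an illustrative assumption:

/* Sketch only: record that the statement is active on the given node.
 * Returns true if it was already active there. */
bool
activate_statement_on_node_sketch(DatanodeStatement *entry, int node)
{
	int			i;

	for (i = 0; i < entry->nodenum; i++)
		if (entry->nodes[i] == node)
			return true;		/* already prepared on this node */

	/* assume the entry was allocated with room for all data nodes */
	entry->nodes[entry->nodenum++] = node;
	return false;
}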
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index 0c2c4fe0a2..3e218bfc20 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -187,6 +187,9 @@ extern TupleTableSlot *ExecStoreVirtualTuple(TupleTableSlot *slot);
extern TupleTableSlot *ExecStoreAllNullTuple(TupleTableSlot *slot);
extern HeapTuple ExecCopySlotTuple(TupleTableSlot *slot);
extern MinimalTuple ExecCopySlotMinimalTuple(TupleTableSlot *slot);
+#ifdef PGXC
+extern int ExecCopySlotDatarow(TupleTableSlot *slot, char **datarow);
+#endif
extern HeapTuple ExecFetchSlotTuple(TupleTableSlot *slot);
extern MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot);
extern Datum ExecFetchSlotTupleDatum(TupleTableSlot *slot);
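A rough sketch of what ExecCopySlotDatarow might do; the patch's actual
implementation lives in execTuples.c and is not shown here, so this merely
mirrors the message layout used by ParamListToDataRow above, with a
hypothetical function name:

/* Sketch only: serialize a slot into DataRow format. */
int
copy_slot_datarow_sketch(TupleTableSlot *slot, char **datarow)
{
	TupleDesc	tdesc = slot->tts_tupleDescriptor;
	StringInfoData buf;
	uint16		n16;
	int			i;

	initStringInfo(&buf);
	slot_getallattrs(slot);		/* fetch all attributes into tts_values */

	/* Number of attributes */
	n16 = htons(tdesc->natts);
	appendBinaryStringInfo(&buf, (char *) &n16, 2);

	for (i = 0; i < tdesc->natts; i++)
	{
		uint32		n32;

		if (slot->tts_isnull[i])
		{
			n32 = htonl(-1);	/* NULL marker */
			appendBinaryStringInfo(&buf, (char *) &n32, 4);
		}
		else
		{
			Oid			typOutput;
			bool		typIsVarlena;
			Datum		pval;
			char	   *pstring;
			int			len;

			getTypeOutputInfo(tdesc->attrs[i]->atttypid,
							  &typOutput, &typIsVarlena);
			/* detoast first, as ParamListToDataRow does */
			if (typIsVarlena)
				pval = PointerGetDatum(PG_DETOAST_DATUM(slot->tts_values[i]));
			else
				pval = slot->tts_values[i];
			pstring = OidOutputFunctionCall(typOutput, pval);
			len = strlen(pstring);
			n32 = htonl(len);
			appendBinaryStringInfo(&buf, (char *) &n32, 4);
			appendBinaryStringInfo(&buf, pstring, len);
		}
	}

	*datarow = palloc(buf.len);
	memcpy(*datarow, buf.data, buf.len);
	pfree(buf.data);
	return buf.len;
}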
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index a3c1868422..c5c45c0162 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -144,5 +144,9 @@ extern void BufferConnection(PGXCNodeHandle *conn);
extern void ExecRemoteQueryReScan(RemoteQueryState *node, ExprContext *exprCtxt);
+extern int ParamListToDataRow(ParamListInfo params, char** result);
+
+extern void ExecCloseRemoteStatement(const char *stmt_name, List *nodelist);
+
extern int primary_data_node;
#endif
diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h
index ee28c5a402..a6d1d531a3 100644
--- a/src/include/pgxc/locator.h
+++ b/src/include/pgxc/locator.h
@@ -26,6 +26,8 @@
#define HASH_MASK 0x00000FFF;
#define IsReplicated(x) (x->locatorType == LOCATOR_TYPE_REPLICATED)
+
+#include "nodes/primnodes.h"
#include "utils/relcache.h"
@@ -76,6 +78,10 @@ typedef struct
List *nodelist;
char baselocatortype;
TableUsageType tableusagetype; /* track pg_catalog usage */
+ Expr *expr; /* expression to evaluate at execution time if the planner
+ * cannot determine the execution nodes */
+ Oid relid; /* Relation to determine execution nodes */
+ RelationAccessType accesstype; /* Access type to determine execution nodes */
} ExecNodes;
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index bb1f934be9..61cb6d3291 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -85,9 +85,14 @@ typedef struct
SimpleDistinct *distinct;
bool read_only; /* do not use 2PC when committing read only steps */
bool force_autocommit; /* some commands like VACUUM require autocommit mode */
+ char *statement; /* if specified use it as a PreparedStatement name on data nodes */
char *cursor; /* if specified use it as a Portal name on data nodes */
RemoteQueryExecType exec_type;
+ /* Support for parameters */
+ char *paramval_data; /* parameter data, format is like in BIND */
+ int paramval_len; /* length of parameter values data */
+
char *relname;
bool remotejoin; /* True if this is a reduced remote join */
bool partitioned_replicated; /* True if reduced and contains replicated-partitioned join */
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 6e26da755d..4a56f9fb11 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -15,6 +15,7 @@
#define BUILTINS_H
#include "fmgr.h"
+#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
/*
@@ -598,6 +599,7 @@ extern char *deparse_expression(Node *expr, List *dpcontext,
#ifdef PGXC
extern List *deparse_context_for_remotequery(const char *aliasname, Oid relid);
extern List *deparse_context_for(const char *aliasname, Oid relid);
+extern void deparse_query(Query *query, StringInfo buf, List *parentnamespace);
#endif
extern List *deparse_context_for_plan(Node *plan, Node *outer_plan,
List *rtable, List *subplans);