summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/commands/copy.c76
-rw-r--r--src/backend/executor/execMain.c10
-rw-r--r--src/backend/executor/execProcnode.c28
-rw-r--r--src/backend/optimizer/plan/planner.c11
-rw-r--r--src/backend/parser/analyze.c46
-rw-r--r--src/backend/parser/parse_utilcmd.c34
-rw-r--r--src/backend/pgxc/plan/planner.c346
-rw-r--r--src/backend/pgxc/pool/execRemote.c525
-rw-r--r--src/backend/tcop/postgres.c254
-rw-r--r--src/backend/tcop/pquery.c11
-rw-r--r--src/backend/tcop/utility.c233
-rw-r--r--src/include/commands/copy.h5
-rw-r--r--src/include/nodes/parsenodes.h5
-rw-r--r--src/include/pgxc/execRemote.h8
-rw-r--r--src/include/pgxc/locator.h6
-rw-r--r--src/include/pgxc/planner.h23
16 files changed, 869 insertions, 752 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 50650190ee..aee1b42fdd 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -179,7 +179,6 @@ typedef struct CopyStateData
/* Locator information */
RelationLocInfo *rel_loc; /* the locator key */
int hash_idx; /* index of the hash column */
- bool on_coord;
DataNodeHandle **connections; /* Involved data node connections */
#endif
@@ -800,31 +799,6 @@ CopyQuoteIdentifier(StringInfo query_buf, char *value)
}
#endif
-#ifdef PGXC
-/*
- * In case there is no locator info available, copy to/from is launched in portal on coordinator.
- * This happens for pg_catalog tables (not user defined ones)
- * such as pg_catalog, pg_attribute, etc.
- * This part is launched before the portal is activated, so check a first time if there
- * some locator data for this relid and if no, return and launch the portal.
- */
-bool
-IsCoordPortalCopy(const CopyStmt *stmt)
-{
- RelationLocInfo *rel_loc; /* the locator key */
-
- /* In the case of a COPY SELECT, this is launched on datanodes */
- if(!stmt->relation)
- return false;
-
- rel_loc = GetRelationLocInfo(RangeVarGetRelid(stmt->relation, true));
-
- if (!rel_loc)
- return true;
-
- return false;
-}
-#endif
/*
* DoCopy executes the SQL COPY statement
@@ -857,11 +831,7 @@ IsCoordPortalCopy(const CopyStmt *stmt)
* the table or the specifically requested columns.
*/
uint64
-#ifdef PGXC
-DoCopy(const CopyStmt *stmt, const char *queryString, bool exec_on_coord_portal)
-#else
DoCopy(const CopyStmt *stmt, const char *queryString)
-#endif
{
CopyState cstate;
bool is_from = stmt->is_from;
@@ -883,16 +853,6 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
/* Allocate workspace and zero all fields */
cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
-#ifdef PGXC
- /*
- * Copy to/from is initialized as being launched on datanodes
- * This functionnality is particularly interesting to have a result for
- * tables who have no locator informations such as pg_catalog, pg_class,
- * and pg_attribute.
- */
- cstate->on_coord = false;
-#endif
-
/* Extract options from the statement node tree */
foreach(option, stmt->options)
{
@@ -1180,13 +1140,15 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
+ /*
+ * If target table does not exists on nodes (e.g. system table)
+ * the location info returned is NULL. This is the criteria, when
+ * we need to run Copy on coordinator
+ */
cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel));
- if (exec_on_coord_portal)
- cstate->on_coord = true;
-
hash_att = GetRelationHashColumn(cstate->rel_loc);
- if (!cstate->on_coord)
+ if (cstate->rel_loc)
{
if (is_from || hash_att)
exec_nodes->nodelist = list_copy(cstate->rel_loc->nodeList);
@@ -1481,7 +1443,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
* In the case of CopyOut, it is just necessary to pick up one node randomly.
* This is done when rel_loc is found.
*/
- if (!cstate->on_coord)
+ if (cstate->rel_loc)
{
cstate->connections = DataNodeCopyBegin(cstate->query_buf.data,
exec_nodes->nodelist,
@@ -1506,7 +1468,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
}
PG_CATCH();
{
- if (IS_PGXC_COORDINATOR && is_from && !cstate->on_coord)
+ if (IS_PGXC_COORDINATOR && is_from && cstate->rel_loc)
{
DataNodeCopyFinish(
cstate->connections,
@@ -1519,18 +1481,13 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
PG_RE_THROW();
}
PG_END_TRY();
- if (IS_PGXC_COORDINATOR && is_from && !cstate->on_coord)
+ if (IS_PGXC_COORDINATOR && is_from && cstate->rel_loc)
{
- if (cstate->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED)
- cstate->processed = DataNodeCopyFinish(
- cstate->connections,
- primary_data_node,
- COMBINE_TYPE_SAME);
- else
- cstate->processed = DataNodeCopyFinish(
- cstate->connections,
- 0,
- COMBINE_TYPE_SUM);
+ bool replicated = cstate->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED;
+ DataNodeCopyFinish(
+ cstate->connections,
+ replicated ? primary_data_node : 0,
+ replicated ? COMBINE_TYPE_SAME : COMBINE_TYPE_SUM);
pfree(cstate->connections);
pfree(cstate->query_buf.data);
FreeRelationLocInfo(cstate->rel_loc);
@@ -1770,7 +1727,7 @@ CopyTo(CopyState cstate)
}
#ifdef PGXC
- if (IS_PGXC_COORDINATOR && !cstate->on_coord)
+ if (IS_PGXC_COORDINATOR && cstate->rel_loc)
{
cstate->processed = DataNodeCopyOut(
GetRelationNodes(cstate->rel_loc, NULL, true),
@@ -2480,7 +2437,7 @@ CopyFrom(CopyState cstate)
}
#ifdef PGXC
- if (IS_PGXC_COORDINATOR && !cstate->on_coord)
+ if (IS_PGXC_COORDINATOR && cstate->rel_loc)
{
Datum *hash_value = NULL;
@@ -2494,6 +2451,7 @@ CopyFrom(CopyState cstate)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("Copy failed on a data node")));
+ cstate->processed++;
}
else
{
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 1fddf10bc9..ca0c7b1f56 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -858,6 +858,14 @@ InitPlan(QueryDesc *queryDesc, int eflags)
{
case CMD_SELECT:
case CMD_INSERT:
+#ifdef PGXC
+ /*
+ * PGXC RemoteQuery do not require ctid junk field, so follow
+ * standard procedure for UPDATE and DELETE
+ */
+ case CMD_UPDATE:
+ case CMD_DELETE:
+#endif
foreach(tlist, plan->targetlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(tlist);
@@ -869,10 +877,12 @@ InitPlan(QueryDesc *queryDesc, int eflags)
}
}
break;
+#ifndef PGXC
case CMD_UPDATE:
case CMD_DELETE:
junk_filter_needed = true;
break;
+#endif
default:
break;
}
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 1b1dd91f2a..f82ce4c97b 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -108,7 +108,9 @@
#include "executor/nodeWindowAgg.h"
#include "executor/nodeWorktablescan.h"
#include "miscadmin.h"
-
+#ifdef PGXC
+#include "pgxc/execRemote.h"
+#endif
/* ------------------------------------------------------------------------
* ExecInitNode
@@ -286,6 +288,13 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
estate, eflags);
break;
+#ifdef PGXC
+ case T_RemoteQuery:
+ result = (PlanState *) ExecInitRemoteQuery((RemoteQuery *) node,
+ estate, eflags);
+ break;
+#endif
+
default:
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
result = NULL; /* keep compiler quiet */
@@ -451,6 +460,12 @@ ExecProcNode(PlanState *node)
result = ExecLimit((LimitState *) node);
break;
+#ifdef PGXC
+ case T_RemoteQueryState:
+ result = ExecRemoteQuery((RemoteQueryState *) node);
+ break;
+#endif
+
default:
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
result = NULL;
@@ -627,6 +642,11 @@ ExecCountSlotsNode(Plan *node)
case T_Limit:
return ExecCountSlotsLimit((Limit *) node);
+#ifdef PGXC
+ case T_RemoteQuery:
+ return ExecCountSlotsRemoteQuery((RemoteQuery *) node);
+#endif
+
default:
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
break;
@@ -783,6 +803,12 @@ ExecEndNode(PlanState *node)
ExecEndLimit((LimitState *) node);
break;
+#ifdef PGXC
+ case T_RemoteQueryState:
+ ExecEndRemoteQuery((RemoteQueryState *) node);
+ break;
+#endif
+
default:
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
break;
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 3f344b3a14..7d41461096 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -38,6 +38,10 @@
#include "parser/parse_expr.h"
#include "parser/parse_oper.h"
#include "parser/parsetree.h"
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#include "pgxc/planner.h"
+#endif
#include "utils/lsyscache.h"
#include "utils/syscache.h"
@@ -119,7 +123,12 @@ planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
if (planner_hook)
result = (*planner_hook) (parse, cursorOptions, boundParams);
else
- result = standard_planner(parse, cursorOptions, boundParams);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ result = pgxc_planner(parse, cursorOptions, boundParams);
+ else
+#endif
+ result = standard_planner(parse, cursorOptions, boundParams);
return result;
}
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 97c560b309..0fe392e99e 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -39,6 +39,11 @@
#include "parser/parse_target.h"
#include "parser/parsetree.h"
#include "rewrite/rewriteManip.h"
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#include "pgxc/planner.h"
+#include "tcop/tcopprot.h"
+#endif
#include "utils/rel.h"
@@ -58,6 +63,10 @@ static Query *transformDeclareCursorStmt(ParseState *pstate,
DeclareCursorStmt *stmt);
static Query *transformExplainStmt(ParseState *pstate,
ExplainStmt *stmt);
+#ifdef PGXC
+static Query *transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt);
+#endif
+
static void transformLockingClause(ParseState *pstate,
Query *qry, LockingClause *lc);
static bool check_parameter_resolution_walker(Node *node, ParseState *pstate);
@@ -199,6 +208,13 @@ transformStmt(ParseState *pstate, Node *parseTree)
(ExplainStmt *) parseTree);
break;
+#ifdef PGXC
+ case T_ExecDirectStmt:
+ result = transformExecDirectStmt(pstate,
+ (ExecDirectStmt *) parseTree);
+ break;
+#endif
+
default:
/*
@@ -263,6 +279,17 @@ analyze_requires_snapshot(Node *parseTree)
result = true;
break;
+#ifdef PGXC
+ case T_ExecDirectStmt:
+
+ /*
+ * We will parse/analyze/plan inner query, which probably will
+ * need a snapshot. Ensure it is set.
+ */
+ result = true;
+ break;
+#endif
+
default:
/* utility statements don't have any active parse analysis */
result = false;
@@ -1925,6 +1952,25 @@ transformExplainStmt(ParseState *pstate, ExplainStmt *stmt)
return result;
}
+#ifdef PGXC
+/*
+ * transformExecDirectStmt -
+ * transform an EXECUTE DIRECT Statement
+ *
+ * Handling is depends if we should execute on nodes or on coordinator.
+ * To execute on nodes we return CMD_UTILITY query having one T_RemoteQuery node
+ * with the inner statement as a sql_command.
+ * If statement is to run on coordinator we should parse inner statement and
+ * analyze resulting query tree.
+ */
+static Query *
+transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt)
+{
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("Support for EXECUTE DIRECT is temporary broken")));
+}
+#endif
/* exported so planner can check again after rewriting, query pullup, etc */
void
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index b0488d88c5..d7a932e781 100644
--- a/src/backend/parser/parse_utilcmd.c
+++ b/src/backend/parser/parse_utilcmd.c
@@ -52,6 +52,7 @@
#ifdef PGXC
#include "pgxc/locator.h"
#include "pgxc/pgxc.h"
+#include "pgxc/planner.h"
#endif
#include "rewrite/rewriteManip.h"
@@ -261,9 +262,9 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
result = list_concat(result, save_alist);
#ifdef PGXC
- /*
- * If the user did not specify any distribution clause and there is no
- * inherits clause, try and use PK or unique index
+ /*
+ * If the user did not specify any distribution clause and there is no
+ * inherits clause, try and use PK or unique index
*/
if (!stmt->distributeby && !stmt->inhRelations && cxt.fallback_dist_col)
{
@@ -271,6 +272,13 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
stmt->distributeby->disttype = DISTTYPE_HASH;
stmt->distributeby->colname = cxt.fallback_dist_col;
}
+ if (IS_PGXC_COORDINATOR)
+ {
+ RemoteQuery *step = makeNode(RemoteQuery);
+ step->combine_type = COMBINE_TYPE_SAME;
+ step->sql_statement = queryString;
+ result = lappend(result, step);
+ }
#endif
return result;
}
@@ -1171,7 +1179,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
{
if (cxt->distributeby)
isLocalSafe = CheckLocalIndexColumn (
- ConvertToLocatorType(cxt->distributeby->disttype),
+ ConvertToLocatorType(cxt->distributeby->disttype),
cxt->distributeby->colname, key);
}
#endif
@@ -1273,7 +1281,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
{
/*
* Set fallback distribution column.
- * If not set, set it to first column in index.
+ * If not set, set it to first column in index.
* If primary key, we prefer that over a unique constraint.
*/
if (index->indexParams == NIL
@@ -1281,7 +1289,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
{
cxt->fallback_dist_col = pstrdup(key);
}
-
+
/* Existing table, check if it is safe */
if (!cxt->distributeby && !isLocalSafe)
isLocalSafe = CheckLocalIndexColumn (
@@ -1299,7 +1307,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
index->indexParams = lappend(index->indexParams, iparam);
}
#ifdef PGXC
- if (IS_PGXC_COORDINATOR && cxt->distributeby
+ if (IS_PGXC_COORDINATOR && cxt->distributeby
&& cxt->distributeby->disttype == DISTTYPE_HASH && !isLocalSafe)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
@@ -1618,7 +1626,7 @@ transformRuleStmt(RuleStmt *stmt, const char *queryString,
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("Rule may not use NOTIFY, it is not yet supported")));
-
+
#endif
/*
* Since outer ParseState isn't parent of inner, have to pass down
@@ -1956,7 +1964,15 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
result = lappend(cxt.blist, stmt);
result = list_concat(result, cxt.alist);
result = list_concat(result, save_alist);
-
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ RemoteQuery *step = makeNode(RemoteQuery);
+ step->combine_type = COMBINE_TYPE_SAME;
+ step->sql_statement = queryString;
+ result = lappend(result, step);
+ }
+#endif
return result;
}
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 002e710bd6..1dcfc2943a 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -25,6 +25,7 @@
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "optimizer/clauses.h"
+#include "optimizer/planner.h"
#include "optimizer/tlist.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
@@ -116,7 +117,7 @@ typedef struct ColumnBase
*/
typedef struct XCWalkerContext
{
- Query *query;
+ Query *query;
bool isRead;
Exec_Nodes *exec_nodes; /* resulting execution nodes */
Special_Conditions *conditions;
@@ -125,6 +126,7 @@ typedef struct XCWalkerContext
int varno;
bool within_or;
bool within_not;
+ bool exec_on_coord; /* fallback to standard planner to have plan executed on coordinator only */
List *join_list; /* A list of List*'s, one for each relation. */
} XCWalkerContext;
@@ -971,6 +973,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
/* just pg_catalog tables */
context->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
+ context->exec_on_coord = true;
return false;
}
@@ -1087,6 +1090,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
{
context->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
+ context->exec_on_coord = true;
return false;
}
@@ -1253,7 +1257,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
static Exec_Nodes *
get_plan_nodes(Query *query, bool isRead)
{
- Exec_Nodes *result_nodes;
+ Exec_Nodes *result_nodes = NULL;
XCWalkerContext context;
@@ -1267,13 +1271,16 @@ get_plan_nodes(Query *query, bool isRead)
context.varno = 0;
context.within_or = false;
context.within_not = false;
+ context.exec_on_coord = false;
context.join_list = NIL;
- if (get_plan_nodes_walker((Node *) query, &context))
- result_nodes = NULL;
- else
+ if (!get_plan_nodes_walker((Node *) query, &context))
result_nodes = context.exec_nodes;
-
+ if (context.exec_on_coord && result_nodes)
+ {
+ pfree(result_nodes);
+ result_nodes = NULL;
+ }
free_special_relations(context.conditions);
free_join_list(context.join_list);
return result_nodes;
@@ -1976,68 +1983,89 @@ make_simple_sort_from_sortclauses(Query *query, RemoteQuery *step)
* For the prototype, there will only be one step,
* and the nodelist will be NULL if it is not a PGXC-safe statement.
*/
-Query_Plan *
-GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
+PlannedStmt *
+pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
{
- Query_Plan *query_plan = palloc(sizeof(Query_Plan));
+ /*
+ * We waste some time invoking standard planner, but getting good enough
+ * PlannedStmt, we just need to replace standard plan.
+ * In future we may want to skip the standard_planner invocation and
+ * initialize the PlannedStmt here. At the moment not all queries works:
+ * ex. there was a problem with INSERT into a subset of table columns
+ */
+ PlannedStmt *result = standard_planner(query, cursorOptions, boundParams);
+ Plan *standardPlan = result->planTree;
RemoteQuery *query_step = makeNode(RemoteQuery);
- Query *query;
- query_step->sql_statement = (char *) palloc(strlen(sql_statement) + 1);
- strcpy(query_step->sql_statement, sql_statement);
+ query_step->sql_statement = pstrdup(query->sql_statement);
query_step->exec_nodes = NULL;
query_step->combine_type = COMBINE_TYPE_NONE;
query_step->simple_aggregates = NULL;
- query_step->read_only = false;
+ /* Optimize multi-node handling */
+ query_step->read_only = query->nodeTag == T_SelectStmt;
query_step->force_autocommit = false;
- query_plan->query_step_list = lappend(NULL, query_step);
+ result->planTree = (Plan *) query_step;
/*
* Determine where to execute the command, either at the Coordinator
* level, Data Nodes, or both. By default we choose both. We should be
* able to quickly expand this for more commands.
*/
- switch (nodeTag(parsetree))
+ switch (query->nodeTag)
{
case T_SelectStmt:
- /* Optimize multi-node handling */
- query_step->read_only = true;
+ /* Perform some checks to make sure we can support the statement */
+ if (query->intoClause)
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("INTO clause not yet supported"))));
+
+ if (query->setOperations)
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("UNION, INTERSECT and EXCEPT are not yet supported"))));
+
+ if (query->hasRecursive)
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("WITH RECURSIVE not yet supported"))));
+
+ if (query->hasWindowFuncs)
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("Window functions not yet supported"))));
/* fallthru */
case T_InsertStmt:
case T_UpdateStmt:
case T_DeleteStmt:
- /* just use first one in querytree_list */
- query = (Query *) linitial(querytree_list);
- /* should copy instead ? */
- query_step->plan.targetlist = query->targetList;
+ query_step->exec_nodes = get_plan_nodes_command(query);
- /* Perform some checks to make sure we can support the statement */
- if (nodeTag(parsetree) == T_SelectStmt)
+ if (query_step->exec_nodes == NULL)
{
- if (query->intoClause)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("INTO clause not yet supported"))));
-
- if (query->setOperations)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("UNION, INTERSECT and EXCEPT are not yet supported"))));
-
- if (query->hasRecursive)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("WITH RECURSIVE not yet supported"))));
-
- if (query->hasWindowFuncs)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Window functions not yet supported"))));
+ /*
+ * Processing guery against catalog tables, restore
+ * standard plan
+ */
+ result->planTree = standardPlan;
+ return result;
}
- query_step->exec_nodes =
- get_plan_nodes_command(query);
+ /*
+ * PGXCTODO
+ * When Postgres runs insert into t (a) values (1); against table
+ * defined as create table t (a int, b int); the plan is looking
+ * like insert into t (a,b) values (1,null);
+ * Later executor is verifying plan, to make sure table has not
+ * been altered since plan has been created and comparing table
+ * definition with plan target list and output error if they do
+ * not match.
+ * I could not find better way to generate targetList for pgxc plan
+ * then call standard planner and take targetList from the plan
+ * generated by Postgres.
+ */
+ query_step->plan.targetlist = standardPlan->targetlist;
+
if (query_step->exec_nodes)
query_step->combine_type = get_plan_combine_type(
query, query_step->exec_nodes->baselocatortype);
@@ -2047,37 +2075,9 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
query_step->simple_aggregates = get_simple_aggregates(query);
/*
- * See if it is a SELECT with no relations, like SELECT 1+1 or
- * SELECT nextval('fred'), and just use coord.
- */
- if (query_step->exec_nodes == NULL
- && (query->jointree->fromlist == NULL
- || query->jointree->fromlist->length == 0))
- /* Just execute it on Coordinator */
- query_plan->exec_loc_type = EXEC_ON_COORD;
- else
- {
- if (query_step->exec_nodes != NULL
- && query_step->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_PGCATALOG)
- {
- /* pg_catalog query, run on coordinator */
- query_plan->exec_loc_type = EXEC_ON_COORD;
- }
- else
- {
- query_plan->exec_loc_type = EXEC_ON_DATA_NODES;
-
- /* If node list is NULL, execute on coordinator */
- if (!query_step->exec_nodes)
- query_plan->exec_loc_type = EXEC_ON_COORD;
- }
- }
-
- /*
* Add sortring to the step
*/
- if (query_plan->exec_loc_type == EXEC_ON_DATA_NODES &&
- list_length(query_step->exec_nodes->nodelist) > 1 &&
+ if (list_length(query_step->exec_nodes->nodelist) > 1 &&
(query->sortClause || query->distinctClause))
make_simple_sort_from_sortclauses(query, query_step);
@@ -2090,7 +2090,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
* Check if we have multiple nodes and an unsupported clause. This
* is temporary until we expand supported SQL
*/
- if (nodeTag(parsetree) == T_SelectStmt)
+ if (query->nodeTag == T_SelectStmt)
{
if (StrictStatementChecking && query_step->exec_nodes
&& list_length(query_step->exec_nodes->nodelist) > 1)
@@ -2110,180 +2110,6 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
}
}
break;
-
- /* Statements that we only want to execute on the Coordinator */
- case T_VariableShowStmt:
- query_plan->exec_loc_type = EXEC_ON_COORD;
- break;
-
- /*
- * Statements that need to run in autocommit mode, on Coordinator
- * and Data Nodes with suppressed implicit two phase commit.
- */
- case T_CheckPointStmt:
- case T_ClusterStmt:
- case T_CreatedbStmt:
- case T_DropdbStmt:
- case T_VacuumStmt:
- query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES;
- query_step->force_autocommit = true;
- break;
-
- case T_DropPropertyStmt:
- /*
- * Triggers are not yet supported by PGXC
- * all other queries are executed on both Coordinator and Datanode
- * On the same point, assert also is not supported
- */
- if (((DropPropertyStmt *)parsetree)->removeType == OBJECT_TRIGGER)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("This command is not yet supported."))));
- else
- query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES;
- break;
-
- case T_CreateStmt:
- if (((CreateStmt *)parsetree)->relation->istemp)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Temp tables are not yet supported."))));
-
- query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES;
- break;
-
- /*
- * Statements that we execute on both the Coordinator and Data Nodes
- */
- case T_AlterDatabaseStmt:
- case T_AlterDatabaseSetStmt:
- case T_AlterDomainStmt:
- case T_AlterFdwStmt:
- case T_AlterForeignServerStmt:
- case T_AlterFunctionStmt:
- case T_AlterObjectSchemaStmt:
- case T_AlterOpFamilyStmt:
- case T_AlterSeqStmt:
- case T_AlterTableStmt: /* Can also be used to rename a sequence */
- case T_AlterTSConfigurationStmt:
- case T_AlterTSDictionaryStmt:
- case T_ClosePortalStmt: /* In case CLOSE ALL is issued */
- case T_CommentStmt:
- case T_CompositeTypeStmt:
- case T_ConstraintsSetStmt:
- case T_CreateCastStmt:
- case T_CreateConversionStmt:
- case T_CreateDomainStmt:
- case T_CreateEnumStmt:
- case T_CreateFdwStmt:
- case T_CreateForeignServerStmt:
- case T_CreateFunctionStmt: /* Only global functions are supported */
- case T_CreateOpClassStmt:
- case T_CreateOpFamilyStmt:
- case T_CreatePLangStmt:
- case T_CreateSeqStmt:
- case T_CreateSchemaStmt:
- case T_DeallocateStmt: /* Allow for DEALLOCATE ALL */
- case T_DiscardStmt:
- case T_DropCastStmt:
- case T_DropFdwStmt:
- case T_DropForeignServerStmt:
- case T_DropPLangStmt:
- case T_DropStmt:
- case T_IndexStmt:
- case T_LockStmt:
- case T_ReindexStmt:
- case T_RemoveFuncStmt:
- case T_RemoveOpClassStmt:
- case T_RemoveOpFamilyStmt:
- case T_RenameStmt:
- case T_RuleStmt:
- case T_TruncateStmt:
- case T_VariableSetStmt:
- case T_ViewStmt:
-
- /*
- * Also support these, should help later with pg_restore, although
- * not very useful because of the pooler using the same user
- */
- case T_GrantStmt:
- case T_GrantRoleStmt:
- case T_CreateRoleStmt:
- case T_AlterRoleStmt:
- case T_AlterRoleSetStmt:
- case T_AlterUserMappingStmt:
- case T_CreateUserMappingStmt:
- case T_DropRoleStmt:
- case T_AlterOwnerStmt:
- case T_DropOwnedStmt:
- case T_DropUserMappingStmt:
- case T_ReassignOwnedStmt:
- case T_DefineStmt: /* used for aggregates, some types */
- query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES;
- break;
-
- case T_TransactionStmt:
- switch (((TransactionStmt *) parsetree)->kind)
- {
- case TRANS_STMT_SAVEPOINT:
- case TRANS_STMT_RELEASE:
- case TRANS_STMT_ROLLBACK_TO:
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("This type of transaction statement not yet supported"))));
- break;
-
- default:
- break; /* keep compiler quiet */
- }
- query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES;
- break;
-
- /*
- * For now, pick one of the data nodes until we modify real
- * planner It will give an approximate idea of what an isolated
- * data node will do
- */
- case T_ExplainStmt:
- if (((ExplainStmt *) parsetree)->analyze)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("ANALYZE with EXPLAIN is currently not supported."))));
-
- query_step->exec_nodes = palloc0(sizeof(Exec_Nodes));
- query_step->exec_nodes->nodelist = GetAnyDataNode();
- query_step->exec_nodes->baselocatortype = LOCATOR_TYPE_RROBIN;
- query_plan->exec_loc_type = EXEC_ON_DATA_NODES;
- break;
-
- /*
- * Trigger queries are not yet supported by PGXC.
- * Tablespace queries are also not yet supported.
- * Two nodes on the same servers cannot use the same tablespace.
- */
- case T_CreateTableSpaceStmt:
- case T_CreateTrigStmt:
- case T_DropTableSpaceStmt:
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("This command is not yet supported."))));
- break;
-
- /*
- * Other statements we do not yet want to handle.
- * By default they would be fobidden, but we list these for reference.
- * Note that there is not a 1-1 correspndence between
- * SQL command and the T_*Stmt structures.
- */
- case T_DeclareCursorStmt:
- case T_ExecuteStmt:
- case T_FetchStmt:
- case T_ListenStmt:
- case T_LoadStmt:
- case T_NotifyStmt:
- case T_PrepareStmt:
- case T_UnlistenStmt:
- /* fall through */
default:
/* Allow for override */
if (StrictStatementChecking)
@@ -2291,12 +2117,10 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
(errmsg("This command is not yet supported."))));
else
- query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES;
- break;
+ result->planTree = standardPlan;
}
-
- return query_plan;
+ return result;
}
@@ -2321,21 +2145,3 @@ free_query_step(RemoteQuery *query_step)
list_free_deep(query_step->simple_aggregates);
pfree(query_step);
}
-
-/*
- * Free Query_Plan struct
- */
-void
-FreeQueryPlan(Query_Plan *query_plan)
-{
- ListCell *item;
-
- if (query_plan == NULL)
- return;
-
- foreach(item, query_plan->query_step_list)
- free_query_step((RemoteQuery *) lfirst(item));
-
- pfree(query_plan->query_step_list);
- pfree(query_plan);
-}
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index bbedef0f2f..0f16c5143f 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -168,9 +168,7 @@ CreateResponseCombiner(int node_count, CombineType combine_type)
combiner->connections = NULL;
combiner->conn_count = 0;
combiner->combine_type = combine_type;
- combiner->dest = NULL;
combiner->command_complete_count = 0;
- combiner->row_count = 0;
combiner->request_type = REQUEST_TYPE_NOT_DEFINED;
combiner->tuple_desc = NULL;
combiner->description_count = 0;
@@ -178,7 +176,6 @@ CreateResponseCombiner(int node_count, CombineType combine_type)
combiner->copy_out_count = 0;
combiner->errorMessage = NULL;
combiner->query_Done = false;
- combiner->completionTag = NULL;
combiner->msg = NULL;
combiner->msglen = 0;
combiner->initAggregates = true;
@@ -488,7 +485,8 @@ HandleCopyOutComplete(RemoteQueryState *combiner)
static void
HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len)
{
- int digits = 0;
+ int digits = 0;
+ EState *estate = combiner->ss.ps.state;
/*
* If we did not receive description we are having rowcount or OK response
@@ -496,7 +494,7 @@ HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len)
if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
combiner->request_type = REQUEST_TYPE_COMMAND;
/* Extract rowcount */
- if (combiner->combine_type != COMBINE_TYPE_NONE)
+ if (combiner->combine_type != COMBINE_TYPE_NONE && estate)
{
uint64 rowcount;
digits = parse_row_count(msg_body, len, &rowcount);
@@ -507,7 +505,7 @@ HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len)
{
if (combiner->command_complete_count)
{
- if (rowcount != combiner->row_count)
+ if (rowcount != estate->es_processed)
/* There is a consistency issue in the database with the replicated table */
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
@@ -515,37 +513,15 @@ HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len)
}
else
/* first result */
- combiner->row_count = rowcount;
+ estate->es_processed = rowcount;
}
else
- combiner->row_count += rowcount;
+ estate->es_processed += rowcount;
}
else
combiner->combine_type = COMBINE_TYPE_NONE;
}
- if (++combiner->command_complete_count == combiner->node_count)
- {
- if (combiner->completionTag)
- {
- if (combiner->combine_type == COMBINE_TYPE_NONE)
- {
- /* ensure we do not go beyond buffer bounds */
- if (len > COMPLETION_TAG_BUFSIZE)
- len = COMPLETION_TAG_BUFSIZE;
- memcpy(combiner->completionTag, msg_body, len);
- }
- else
- {
- /* Truncate msg_body to get base string */
- msg_body[len - digits - 1] = '\0';
- snprintf(combiner->completionTag,
- COMPLETION_TAG_BUFSIZE,
- "%s" UINT64_FORMAT,
- msg_body,
- combiner->row_count);
- }
- }
- }
+ combiner->command_complete_count++;
}
/*
@@ -653,6 +629,9 @@ HandleCopyDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("Unexpected response from the data nodes for 'd' message, current request type %d", combiner->request_type)));
+ /* count the row */
+ combiner->processed++;
+
/* If there is a copy file, data has to be sent to the local file */
if (combiner->copy_file)
/* write data to the copy file */
@@ -881,7 +860,6 @@ ValidateAndResetCombiner(RemoteQueryState *combiner)
combiner->command_complete_count = 0;
combiner->connections = NULL;
combiner->conn_count = 0;
- combiner->row_count = 0;
combiner->request_type = REQUEST_TYPE_NOT_DEFINED;
combiner->tuple_desc = NULL;
combiner->description_count = 0;
@@ -1106,7 +1084,6 @@ data_node_begin(int conn_count, DataNodeHandle ** connections,
}
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
/* Receive responses */
if (data_node_receive_responses(conn_count, connections, timeout, combiner))
@@ -1225,7 +1202,6 @@ data_node_commit(int conn_count, DataNodeHandle ** connections)
}
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
/* Receive responses */
if (data_node_receive_responses(conn_count, connections, timeout, combiner))
result = EOF;
@@ -1268,10 +1244,7 @@ data_node_commit(int conn_count, DataNodeHandle ** connections)
}
if (!combiner)
- {
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
- }
/* Receive responses */
if (data_node_receive_responses(conn_count, connections, timeout, combiner))
result = EOF;
@@ -1336,7 +1309,6 @@ data_node_rollback(int conn_count, DataNodeHandle ** connections)
}
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
/* Receive responses */
if (data_node_receive_responses(conn_count, connections, timeout, combiner))
return EOF;
@@ -1480,7 +1452,6 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
* client runs console or file copy
*/
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
/* Receive responses */
if (data_node_receive_responses(conn_count, connections, timeout, combiner)
@@ -1541,7 +1512,6 @@ DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle**
if (primary_handle->inStart < primary_handle->inEnd)
{
RemoteQueryState *combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
handle_response(primary_handle, combiner);
if (!ValidateAndCloseCombiner(combiner))
return EOF;
@@ -1603,7 +1573,6 @@ DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle**
if (handle->inStart < handle->inEnd)
{
RemoteQueryState *combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
handle_response(handle, combiner);
if (!ValidateAndCloseCombiner(combiner))
return EOF;
@@ -1670,13 +1639,13 @@ DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE*
bool need_tran;
List *nodelist;
ListCell *nodeitem;
- uint64 processed = 0;
+ uint64 processed;
nodelist = exec_nodes->nodelist;
need_tran = !autocommit || conn_count > 1;
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_SUM);
- combiner->dest = None_Receiver;
+ combiner->processed = 0;
/* If there is an existing file where to copy data, pass it to combiner */
if (copy_file)
combiner->copy_file = copy_file;
@@ -1712,7 +1681,7 @@ DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE*
}
}
- processed = combiner->row_count;
+ processed = combiner->processed;
if (!ValidateAndCloseCombiner(combiner))
{
@@ -1730,7 +1699,7 @@ DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE*
/*
* Finish copy process on all connections
*/
-uint64
+void
DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
CombineType combine_type)
{
@@ -1743,7 +1712,6 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
DataNodeHandle *connections[NumDataNodes];
DataNodeHandle *primary_handle = NULL;
int conn_count = 0;
- uint64 processed;
for (i = 0; i < NumDataNodes; i++)
{
@@ -1786,8 +1754,7 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
}
combiner = CreateResponseCombiner(conn_count + 1, combine_type);
- combiner->dest = None_Receiver;
- error = data_node_receive_responses(1, &primary_handle, timeout, combiner) || error;
+ error = (data_node_receive_responses(1, &primary_handle, timeout, combiner) != 0) || error;
}
for (i = 0; i < conn_count; i++)
@@ -1823,22 +1790,25 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
need_tran = !autocommit || primary_handle || conn_count > 1;
if (!combiner)
- {
combiner = CreateResponseCombiner(conn_count, combine_type);
- combiner->dest = None_Receiver;
- }
error = (data_node_receive_responses(conn_count, connections, timeout, combiner) != 0) || error;
- processed = combiner->row_count;
-
if (!ValidateAndCloseCombiner(combiner) || error)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Error while running COPY")));
+}
- return processed;
+#define REMOTE_QUERY_NSLOTS 2
+int
+ExecCountSlotsRemoteQuery(RemoteQuery *node)
+{
+ return ExecCountSlotsNode(outerPlan((Plan *) node)) +
+ ExecCountSlotsNode(innerPlan((Plan *) node)) +
+ REMOTE_QUERY_NSLOTS;
}
+
RemoteQueryState *
ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
{
@@ -1876,6 +1846,9 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
}
+ if (outerPlan(node))
+ outerPlanState(remotestate) = ExecInitNode(outerPlan(node), estate, eflags);
+
return remotestate;
}
@@ -1927,6 +1900,83 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst)
}
}
+static void
+get_exec_connections(Exec_Nodes *exec_nodes,
+ int *regular_conn_count,
+ int *total_conn_count,
+ DataNodeHandle ***connections,
+ DataNodeHandle ***primaryconnection)
+{
+ List *nodelist = NIL;
+ List *primarynode = NIL;
+
+ if (exec_nodes)
+ {
+ nodelist = exec_nodes->nodelist;
+ primarynode = exec_nodes->primarynodelist;
+ }
+
+ if (list_length(nodelist) == 0)
+ {
+ if (primarynode)
+ *regular_conn_count = NumDataNodes - 1;
+ else
+ *regular_conn_count = NumDataNodes;
+ }
+ else
+ {
+ *regular_conn_count = list_length(nodelist);
+ }
+
+ *total_conn_count = *regular_conn_count;
+
+ /* Get connection for primary node, if used */
+ if (primarynode)
+ {
+ *primaryconnection = get_handles(primarynode);
+ if (!*primaryconnection)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not obtain connection from pool")));
+ (*total_conn_count)++;
+ }
+
+ /* Get other connections (non-primary) */
+ *connections = get_handles(nodelist);
+ if (!*connections)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not obtain connection from pool")));
+
+}
+
+/*
+ * We would want to run 2PC if the current transaction modified more than
+ * one node. So optimize a little bit and do not look further if we
+ * already have more than one write node.
+ */
+static void
+register_write_nodes(int conn_count, DataNodeHandle **connections)
+{
+ int i, j;
+
+ for (i = 0; i < conn_count && write_node_count < 2; i++)
+ {
+ bool found = false;
+
+ for (j = 0; j < write_node_count && !found; j++)
+ {
+ if (write_node_list[j] == connections[i])
+ found = true;
+ }
+ if (!found)
+ {
+ /* Add to transaction wide-list */
+ write_node_list[write_node_count++] = connections[i];
+ }
+ }
+}
+
/*
* Execute step of PGXC plan.
* The step specifies a command to be executed on specified nodes.
@@ -1950,66 +2000,51 @@ ExecRemoteQuery(RemoteQueryState *node)
if (!node->query_Done)
{
/* First invocation, initialize */
- Exec_Nodes *exec_nodes = step->exec_nodes;
bool force_autocommit = step->force_autocommit;
bool is_read_only = step->read_only;
GlobalTransactionId gxid = InvalidGlobalTransactionId;
Snapshot snapshot = GetActiveSnapshot();
DataNodeHandle **connections = NULL;
DataNodeHandle **primaryconnection = NULL;
- List *nodelist = NIL;
- List *primarynode = NIL;
int i;
- int j;
int regular_conn_count;
int total_conn_count;
bool need_tran;
- if (exec_nodes)
- {
- nodelist = exec_nodes->nodelist;
- primarynode = exec_nodes->primarynodelist;
- }
-
- if (list_length(nodelist) == 0)
- {
- if (primarynode)
- regular_conn_count = NumDataNodes - 1;
- else
- regular_conn_count = NumDataNodes;
- }
- else
+	/*
+	 * If a coordinator plan is specified, execute it first.
+	 * If the plan is returning, we return its tuples immediately.
+	 * If it is not returning, or has returned them all by the current
+	 * invocation, we go ahead and execute the remote query. The outer
+	 * plan is then never executed again, because node->query_Done will
+	 * be set and execution won't reach this place.
+	 */
+ if (outerPlanState(node))
{
- regular_conn_count = list_length(nodelist);
+ TupleTableSlot *slot = ExecProcNode(outerPlanState(node));
+ if (!TupIsNull(slot))
+ return slot;
}
- total_conn_count = regular_conn_count;
- node->node_count = total_conn_count;
+ get_exec_connections(step->exec_nodes,
+ &regular_conn_count,
+ &total_conn_count,
+ &connections,
+ &primaryconnection);
- /* Get connection for primary node, if used */
- if (primarynode)
- {
- primaryconnection = get_handles(primarynode);
- if (!primaryconnection)
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Could not obtain connection from pool")));
- total_conn_count++;
- }
-
- /* Get other connections (non-primary) */
- connections = get_handles(nodelist);
- if (!connections)
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Could not obtain connection from pool")));
+		/*
+		 * We save only regular connections; by the time we exit the
+		 * function we are finished with the primary connection and deal
+		 * only with regular connections on subsequent invocations.
+		 */
+ node->node_count = regular_conn_count;
if (force_autocommit)
need_tran = false;
else
need_tran = !autocommit || total_conn_count > 1;
- elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, statement_need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false");
+ elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primaryconnection ? "true" : "false", regular_conn_count, need_tran ? "true" : "false");
stat_statement();
if (autocommit)
@@ -2019,44 +2054,11 @@ ExecRemoteQuery(RemoteQueryState *node)
clear_write_node_list();
}
- /* Check status of connections */
- /*
- * We would want to run 2PC if current transaction modified more then
- * one node. So optimize little bit and do not look further if we
- * already have two.
- */
- if (!is_read_only && write_node_count < 2)
+ if (!is_read_only)
{
- bool found;
-
if (primaryconnection)
- {
- found = false;
- for (j = 0; j < write_node_count && !found; j++)
- {
- if (write_node_list[j] == primaryconnection[0])
- found = true;
- }
- if (!found)
- {
- /* Add to transaction wide-list */
- write_node_list[write_node_count++] = primaryconnection[0];
- }
- }
- for (i = 0; i < regular_conn_count && write_node_count < 2; i++)
- {
- found = false;
- for (j = 0; j < write_node_count && !found; j++)
- {
- if (write_node_list[j] == connections[i])
- found = true;
- }
- if (!found)
- {
- /* Add to transaction wide-list */
- write_node_list[write_node_count++] = connections[i];
- }
- }
+ register_write_nodes(1, primaryconnection);
+ register_write_nodes(regular_conn_count, connections);
}
gxid = GetCurrentGlobalTransactionId();
@@ -2209,12 +2211,10 @@ ExecRemoteQuery(RemoteQueryState *node)
{
ExecSetSlotDescriptor(scanslot, node->tuple_desc);
/*
- * we should send to client not the tuple_desc we just
- * received, but tuple_desc from the planner.
- * Data node may be sending junk columns for sorting
+							 * Now the tuple table slot is responsible for
+							 * freeing the descriptor
*/
- (*node->dest->rStartup) (node->dest, CMD_SELECT,
- resultslot->tts_tupleDescriptor);
+ node->tuple_desc = NULL;
if (step->sort)
{
SimpleSort *sort = step->sort;
@@ -2228,7 +2228,7 @@ ExecRemoteQuery(RemoteQueryState *node)
* be initialized
*/
node->tuplesortstate = tuplesort_begin_merge(
- node->tuple_desc,
+ scanslot->tts_tupleDescriptor,
sort->numCols,
sort->sortColIdx,
sort->sortOperators,
@@ -2290,7 +2290,6 @@ ExecRemoteQuery(RemoteQueryState *node)
}
}
copy_slot(node, scanslot, resultslot);
- (*node->dest->receiveSlot) (resultslot, node->dest);
break;
}
if (!have_tuple)
@@ -2310,12 +2309,26 @@ ExecRemoteQuery(RemoteQueryState *node)
{
if (node->simple_aggregates)
{
- /*
- * Advance aggregate functions and allow to read up next
- * data row message and get tuple in the same slot on
- * next iteration
- */
- exec_simple_aggregates(node, scanslot);
+ if (node->simple_aggregates)
+ {
+ /*
+ * Advance aggregate functions and allow to read up next
+ * data row message and get tuple in the same slot on
+ * next iteration
+ */
+ exec_simple_aggregates(node, scanslot);
+ }
+ else
+ {
+ /*
+ * Receive current slot and read up next data row
+ * message before exiting the loop. Next time when this
+ * function is invoked we will have either data row
+ * message ready or EOF
+ */
+ copy_slot(node, scanslot, resultslot);
+ have_tuple = true;
+ }
}
else
{
@@ -2326,7 +2339,6 @@ ExecRemoteQuery(RemoteQueryState *node)
* message ready or EOF
*/
copy_slot(node, scanslot, resultslot);
- (*node->dest->receiveSlot) (resultslot, node->dest);
have_tuple = true;
}
}
@@ -2380,10 +2392,7 @@ ExecRemoteQuery(RemoteQueryState *node)
{
finish_simple_aggregates(node, resultslot);
if (!TupIsNull(resultslot))
- {
- (*node->dest->receiveSlot) (resultslot, node->dest);
have_tuple = true;
- }
}
if (!have_tuple) /* report end of scan */
@@ -2405,12 +2414,234 @@ ExecRemoteQuery(RemoteQueryState *node)
void
ExecEndRemoteQuery(RemoteQueryState *node)
{
- (*node->dest->rShutdown) (node->dest);
+ /*
+ * Release tuplesort resources
+ */
+ if (node->tuplesortstate != NULL)
+ tuplesort_end((Tuplesortstate *) node->tuplesortstate);
+ node->tuplesortstate = NULL;
+
+ /*
+ * shut down the subplan
+ */
+ if (outerPlanState(node))
+ ExecEndNode(outerPlanState(node));
+
if (node->tmp_ctx)
MemoryContextDelete(node->tmp_ctx);
+
CloseCombiner(node);
}
+/*
+ * Execute utility statement on multiple data nodes
+ * It does approximately the same as
+ *
+ * RemoteQueryState *state = ExecInitRemoteQuery(plan, estate, flags);
+ * Assert(TupIsNull(ExecRemoteQuery(state)));
+ * ExecEndRemoteQuery(state)
+ *
+ * But it does not need an EState instance and avoids some unnecessary work,
+ * like allocating tuple slots.
+ */
+void
+ExecRemoteUtility(RemoteQuery *node)
+{
+ RemoteQueryState *remotestate;
+ bool force_autocommit = node->force_autocommit;
+ bool is_read_only = node->read_only;
+ GlobalTransactionId gxid = InvalidGlobalTransactionId;
+ Snapshot snapshot = GetActiveSnapshot();
+ DataNodeHandle **connections = NULL;
+ DataNodeHandle **primaryconnection = NULL;
+ int regular_conn_count;
+ int total_conn_count;
+ bool need_tran;
+ int i;
+
+ remotestate = CreateResponseCombiner(0, node->combine_type);
+
+ get_exec_connections(node->exec_nodes,
+ &regular_conn_count,
+ &total_conn_count,
+ &connections,
+ &primaryconnection);
+
+ if (force_autocommit)
+ need_tran = false;
+ else
+ need_tran = !autocommit || total_conn_count > 1;
+
+ if (!is_read_only)
+ {
+ if (primaryconnection)
+ register_write_nodes(1, primaryconnection);
+ register_write_nodes(regular_conn_count, connections);
+ }
+
+ gxid = GetCurrentGlobalTransactionId();
+ if (!GlobalTransactionIdIsValid(gxid))
+ {
+ if (primaryconnection)
+ pfree(primaryconnection);
+ pfree(connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to get next transaction ID")));
+ }
+
+ if (need_tran)
+ {
+ /*
+ * Check if data node connections are in transaction and start
+ * transactions on nodes where it is not started
+ */
+ DataNodeHandle *new_connections[total_conn_count];
+ int new_count = 0;
+
+ if (primaryconnection && primaryconnection[0]->transaction_status != 'T')
+ new_connections[new_count++] = primaryconnection[0];
+ for (i = 0; i < regular_conn_count; i++)
+ if (connections[i]->transaction_status != 'T')
+ new_connections[new_count++] = connections[i];
+
+ if (new_count)
+ data_node_begin(new_count, new_connections, gxid);
+ }
+
+	/* If we have a primary node, execute on it first, before the others */
+ if (primaryconnection)
+ {
+ /* If explicit transaction is needed gxid is already sent */
+ if (!need_tran && data_node_send_gxid(primaryconnection[0], gxid))
+ {
+ pfree(connections);
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot))
+ {
+ pfree(connections);
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (data_node_send_query(primaryconnection[0], node->sql_statement) != 0)
+ {
+ pfree(connections);
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+
+ Assert(remotestate->combine_type == COMBINE_TYPE_SAME);
+
+ while (remotestate->command_complete_count < 1)
+ {
+ PG_TRY();
+ {
+ data_node_receive(1, primaryconnection, NULL);
+ while (handle_response(primaryconnection[0], remotestate) == RESPONSE_EOF)
+ data_node_receive(1, primaryconnection, NULL);
+ if (remotestate->errorMessage)
+ {
+ char *code = remotestate->errorCode;
+ ereport(ERROR,
+ (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
+ errmsg("%s", remotestate->errorMessage)));
+ }
+ }
+ /* If we got an error response return immediately */
+ PG_CATCH();
+ {
+ pfree(primaryconnection);
+ pfree(connections);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+ pfree(primaryconnection);
+ }
+
+ for (i = 0; i < regular_conn_count; i++)
+ {
+ /* If explicit transaction is needed gxid is already sent */
+ if (!need_tran && data_node_send_gxid(connections[i], gxid))
+ {
+ pfree(connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (snapshot && data_node_send_snapshot(connections[i], snapshot))
+ {
+ pfree(connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (data_node_send_query(connections[i], node->sql_statement) != 0)
+ {
+ pfree(connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ }
+
+	/*
+	 * Receive responses until all the involved data nodes have
+	 * reported command completion.
+	 */
+ while (regular_conn_count > 0)
+ {
+ int i = 0;
+
+ data_node_receive(regular_conn_count, connections, NULL);
+		/*
+		 * Handle input from the data nodes.
+		 * A utility command should not return rows, so receiving
+		 * RESPONSE_TUPDESC or RESPONSE_DATAROW here is reported
+		 * as an error.
+		 * If we got RESPONSE_COMPLETE, we exclude the connection from
+		 * the list; we do not expect more input from it. We quit the
+		 * loop when all nodes have finished their work and sent
+		 * ReadyForQuery, leaving the connections array empty.
+		 * If we got RESPONSE_EOF, move to the next connection; more
+		 * data will be received on the next iteration.
+		 */
+ while (i < regular_conn_count)
+ {
+ int res = handle_response(connections[i], remotestate);
+ if (res == RESPONSE_EOF)
+ {
+ i++;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ if (i < --regular_conn_count)
+ connections[i] = connections[regular_conn_count];
+ }
+ else if (res == RESPONSE_TUPDESC)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Unexpected response from data node")));
+ }
+ else if (res == RESPONSE_DATAROW)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Unexpected response from data node")));
+ }
+ }
+ }
+}
+
/*
* Called when the backend is ending.
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index e59c86920e..5aa9e5d97a 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -650,6 +650,20 @@ pg_analyze_and_rewrite(Node *parsetree, const char *query_string,
*/
querytree_list = pg_rewrite_query(query);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ ListCell *lc;
+
+ foreach(lc, querytree_list)
+ {
+ Query *query = (Query *) lfirst(lc);
+ query->sql_statement = pstrdup(query_string);
+ query->nodeTag = nodeTag(parsetree);
+ }
+ }
+#endif
+
TRACE_POSTGRESQL_QUERY_REWRITE_DONE(query_string);
return querytree_list;
@@ -900,9 +914,6 @@ exec_simple_query(const char *query_string)
DestReceiver *receiver;
int16 format;
#ifdef PGXC
- Query_Plan *query_plan;
- RemoteQuery *query_step;
- bool exec_on_coord;
/*
* By default we do not want data nodes to contact GTM directly,
@@ -910,9 +921,6 @@ exec_simple_query(const char *query_string)
*/
if (IS_PGXC_DATANODE)
SetForceXidFromGTM(false);
-
- exec_on_coord = true;
- query_plan = NULL;
#endif
/*
@@ -968,131 +976,11 @@ exec_simple_query(const char *query_string)
querytree_list = pg_analyze_and_rewrite(parsetree, query_string,
NULL, 0);
-#ifdef PGXC /* PGXC_COORD */
- if (IS_PGXC_COORDINATOR)
- {
- if (IsA(parsetree, TransactionStmt))
- pgxc_transaction_stmt(parsetree);
-
- else if (IsA(parsetree, ExecDirectStmt))
- {
- ExecDirectStmt *execdirect = (ExecDirectStmt *) parsetree;
- List *inner_parse_tree_list;
-
- Assert(IS_PGXC_COORDINATOR);
-
- exec_on_coord = execdirect->coordinator;
-
- /*
- * Switch to appropriate context for constructing parse and
- * query trees (these must outlive the execution context).
- */
- oldcontext = MemoryContextSwitchTo(MessageContext);
-
- inner_parse_tree_list = pg_parse_query(execdirect->query);
- /*
- * we do not support complex commands (expanded to multiple
- * parse trees) within EXEC DIRECT
- */
- if (list_length(parsetree_list) != 1)
- {
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("Can not execute %s with EXECUTE DIRECT",
- execdirect->query)));
- }
- parsetree = linitial(inner_parse_tree_list);
-
- /*
- * Set up a snapshot if parse analysis/planning will need
- * one.
- */
- if (analyze_requires_snapshot(parsetree))
- {
- PushActiveSnapshot(GetTransactionSnapshot());
- snapshot_set = true;
- }
-
- querytree_list = pg_analyze_and_rewrite(parsetree,
- query_string,
- NULL,
- 0);
-
- if (execdirect->nodes)
- {
- ListCell *lc;
- Query *query = (Query *) linitial(querytree_list);
-
- query_plan = (Query_Plan *) palloc0(sizeof(Query_Plan));
- query_step = makeNode(RemoteQuery);
- query_step->plan.targetlist = query->targetList;
- query_step->sql_statement = pstrdup(execdirect->query);
- query_step->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
- foreach (lc, execdirect->nodes)
- {
- int node = intVal(lfirst(lc));
- query_step->exec_nodes->nodelist = lappend_int(query_step->exec_nodes->nodelist, node);
- }
- query_step->combine_type = COMBINE_TYPE_SAME;
-
- query_plan->query_step_list = lappend(NULL, query_step);
- query_plan->exec_loc_type = EXEC_ON_DATA_NODES;
- }
-
- /* Restore context */
- MemoryContextSwitchTo(oldcontext);
-
- }
- else if (IsA(parsetree, CopyStmt))
- {
- CopyStmt *copy = (CopyStmt *) parsetree;
- uint64 processed;
- /* Snapshot is needed for the Copy */
- if (!snapshot_set)
- {
- PushActiveSnapshot(GetTransactionSnapshot());
- snapshot_set = true;
- }
- /*
- * A check on locator is made in DoCopy to determine if the copy can be launched on
- * Datanode or on Coordinator.
- * If a table has no locator data, then IsCoordPortalCopy returns false and copy is launched
- * on Coordinator instead (e.g., using pg_catalog tables).
- * If a table has some locator data (user tables), then copy was launched normally
- * in Datanodes
- */
- if (!IsCoordPortalCopy(copy))
- {
- exec_on_coord = false;
- processed = DoCopy(copy, query_string, false);
- snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
- "COPY " UINT64_FORMAT, processed);
- }
- else
- exec_on_coord = true;
- }
- else
- {
- query_plan = GetQueryPlan(parsetree, query_string, querytree_list);
-
- exec_on_coord = query_plan->exec_loc_type & EXEC_ON_COORD;
- }
-
- /* First execute on the coordinator, if involved (DDL), then data nodes */
- }
-
- plantree_list = NIL;
- if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE)
-#endif
- plantree_list = pg_plan_queries(querytree_list, 0, NULL);
+ plantree_list = pg_plan_queries(querytree_list, 0, NULL);
/* Done with the snapshot used for parsing/planning */
-#ifdef PGXC
- /* In PG-XC, hold on to it a bit longer */
-#else
if (snapshot_set)
PopActiveSnapshot();
-#endif
/* If we got a cancel signal in analysis or planning, quit */
CHECK_FOR_INTERRUPTS();
@@ -1102,13 +990,6 @@ exec_simple_query(const char *query_string)
/* Force getting Xid from GTM if not autovacuum, but a vacuum */
if (IS_PGXC_DATANODE && IsA(parsetree, VacuumStmt) && IsPostmasterEnvironment)
SetForceXidFromGTM(true);
-
- /*
- * Create and run Portal only if it is needed.
- * In some special cases we have nothing to run at this point
- */
- if (plantree_list || query_plan)
- {
#endif
/*
@@ -1170,11 +1051,6 @@ exec_simple_query(const char *query_string)
*/
MemoryContextSwitchTo(oldcontext);
-#ifdef PGXC
- /* Skip the Portal stuff on coordinator if command only executes on data nodes */
- if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE)
- {
-#endif
/*
* Run the portal to completion, and then drop it (and the receiver).
*/
@@ -1185,55 +1061,10 @@ exec_simple_query(const char *query_string)
receiver,
completionTag);
-#ifdef PGXC
- }
-
- /* PGXC_COORD */
- /* If the coordinator ran ok, now run on the data nodes if planned */
- if (IS_PGXC_COORDINATOR)
- {
- if (query_plan && (query_plan->exec_loc_type & EXEC_ON_DATA_NODES))
- {
- RemoteQueryState *state;
- TupleTableSlot *slot;
- EState *estate = CreateExecutorState();
- oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
- query_step = linitial(query_plan->query_step_list);
- estate->es_tupleTable = ExecCreateTupleTable(2);
- state = ExecInitRemoteQuery(query_step, estate, 0);
- state->dest = receiver;
- state->completionTag = completionTag;
- if (!snapshot_set)
- {
- PushActiveSnapshot(GetTransactionSnapshot());
- snapshot_set = true;
- }
- do
- {
- slot = ExecRemoteQuery(state);
- }
- while (!TupIsNull(slot));
-
- ExecEndRemoteQuery(state);
- /* Restore context */
- MemoryContextSwitchTo(oldcontext);
- }
-
- FreeQueryPlan(query_plan);
- }
-#endif /* PGXC_COORD */
-
(*receiver->rDestroy) (receiver);
PortalDrop(portal, false);
-#ifdef PGXC
- }
-
- if (snapshot_set)
- PopActiveSnapshot();
-#endif
-
if (IsA(parsetree, TransactionStmt))
{
/*
@@ -1479,6 +1310,19 @@ exec_parse_message(const char *query_string, /* string to execute */
ShowUsage("PARSE ANALYSIS STATISTICS");
querytree_list = pg_rewrite_query(query);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ ListCell *lc;
+
+ foreach(lc, querytree_list)
+ {
+ Query *query = (Query *) lfirst(lc);
+ query->sql_statement = pstrdup(query_string);
+ query->nodeTag = nodeTag(raw_parse_tree);
+ }
+ }
+#endif
/*
* If this is the unnamed statement and it has parameters, defer query
@@ -4365,45 +4209,3 @@ log_disconnections(int code, Datum arg)
port->user_name, port->database_name, port->remote_host,
port->remote_port[0] ? " port=" : "", port->remote_port)));
}
-
-
-#ifdef PGXC
-/*
- * Handle transaction statements in PG-XC
- */
-void
-pgxc_transaction_stmt (Node *parsetree)
-{
- Assert(IS_PGXC_COORDINATOR);
-
-
- /* Handle transaction statements specially */
- if (IsA(parsetree, TransactionStmt))
- {
- TransactionStmt *stmt = (TransactionStmt *) parsetree;
-
- switch (stmt->kind)
- {
- case TRANS_STMT_BEGIN:
- /*
- * This does not yet send down a BEGIN,
- * we do that "on demand" as data nodes are added
- */
- DataNodeBegin();
- break;
-
- case TRANS_STMT_COMMIT:
- DataNodeCommit();
- break;
-
- case TRANS_STMT_ROLLBACK:
- DataNodeRollback();
- break;
-
- default:
- /* Ignore others for prototype */
- break;
- }
- }
-}
-#endif /* PGXC */
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 98716830cd..4e3096acaf 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -286,6 +286,17 @@ ChoosePortalStrategy(List *stmts)
}
}
}
+#ifdef PGXC
+ else if (IsA(stmt, RemoteQuery))
+ {
+ /*
+ * Let's choose PORTAL_ONE_SELECT for now
+ * After adding more PGXC functionality we may have more
+ * sophisticated algorithm of determining portal strategy
+ */
+ return PORTAL_ONE_SELECT;
+ }
+#endif
else if (IsA(stmt, PlannedStmt))
{
PlannedStmt *pstmt = (PlannedStmt *) stmt;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 0bb4b4645f..3f857282f6 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -58,7 +58,12 @@
#include "utils/syscache.h"
#ifdef PGXC
+#include "pgxc/locator.h"
#include "pgxc/pgxc.h"
+#include "pgxc/planner.h"
+
+static void ExecUtilityStmtOnNodes(const char *queryString, Exec_Nodes *nodes,
+ bool force_autocommit);
#endif
@@ -283,6 +288,10 @@ ProcessUtility(Node *parsetree,
{
ListCell *lc;
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ DataNodeBegin();
+#endif
BeginTransactionBlock();
foreach(lc, stmt->options)
{
@@ -301,6 +310,10 @@ ProcessUtility(Node *parsetree,
break;
case TRANS_STMT_COMMIT:
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ DataNodeCommit();
+#endif
if (!EndTransactionBlock())
{
/* report unsuccessful commit in completionTag */
@@ -329,6 +342,10 @@ ProcessUtility(Node *parsetree,
break;
case TRANS_STMT_ROLLBACK:
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ DataNodeBegin();
+#endif
UserAbortTransactionBlock();
break;
@@ -406,6 +423,10 @@ ProcessUtility(Node *parsetree,
* relation and attribute manipulation
*/
case T_CreateSchemaStmt:
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
CreateSchemaCommand((CreateSchemaStmt *) parsetree,
queryString);
break;
@@ -416,6 +437,7 @@ ProcessUtility(Node *parsetree,
ListCell *l;
Oid relOid;
+ /* PGXC transformCreateStmt also adds T_RemoteQuery node */
/* Run parse analysis ... */
stmts = transformCreateStmt((CreateStmt *) parsetree,
queryString);
@@ -522,7 +544,6 @@ ProcessUtility(Node *parsetree,
case T_DropStmt:
{
DropStmt *stmt = (DropStmt *) parsetree;
-
switch (stmt->removeType)
{
case OBJECT_TABLE:
@@ -566,11 +587,31 @@ ProcessUtility(Node *parsetree,
(int) stmt->removeType);
break;
}
+#ifdef PGXC
+ /*
+ * PGXCTODO
+ * We may need to check details of the object being dropped and
+ * run command on correct nodes
+ */
+ if (IS_PGXC_COORDINATOR)
+ /* sequence exists only on coordinator */
+ if (stmt->removeType != OBJECT_SEQUENCE)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
}
break;
case T_TruncateStmt:
ExecuteTruncate((TruncateStmt *) parsetree);
+#ifdef PGXC
+ /*
+ * PGXCTODO
+ * We may need to check details of the object being truncated and
+ * run command on correct nodes
+ */
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CommentStmt:
@@ -580,11 +621,7 @@ ProcessUtility(Node *parsetree,
case T_CopyStmt:
{
uint64 processed;
-#ifdef PGXC
- processed = DoCopy((CopyStmt *) parsetree, queryString, true);
-#else
- processed = DoCopy((CopyStmt *) parsetree, queryString):
-#endif
+ processed = DoCopy((CopyStmt *) parsetree, queryString);
if (completionTag)
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processed);
@@ -608,10 +645,18 @@ ProcessUtility(Node *parsetree,
* schema
*/
case T_RenameStmt:
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
ExecRenameStmt((RenameStmt *) parsetree);
break;
case T_AlterObjectSchemaStmt:
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
ExecAlterObjectSchemaStmt((AlterObjectSchemaStmt *) parsetree);
break;
@@ -624,6 +669,7 @@ ProcessUtility(Node *parsetree,
List *stmts;
ListCell *l;
+ /* PGXC transformCreateStmt also adds T_RemoteQuery node */
/* Run parse analysis ... */
stmts = transformAlterTableStmt((AlterTableStmt *) parsetree,
queryString);
@@ -698,6 +744,10 @@ ProcessUtility(Node *parsetree,
break;
}
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_GrantStmt:
@@ -751,6 +801,10 @@ ProcessUtility(Node *parsetree,
break;
}
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CompositeTypeStmt: /* CREATE TYPE (composite) */
@@ -759,10 +813,18 @@ ProcessUtility(Node *parsetree,
DefineCompositeType(stmt->typevar, stmt->coldeflist);
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreateEnumStmt: /* CREATE TYPE (enum) */
DefineEnum((CreateEnumStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_ViewStmt: /* CREATE VIEW */
@@ -771,10 +833,18 @@ ProcessUtility(Node *parsetree,
case T_CreateFunctionStmt: /* CREATE FUNCTION */
CreateFunction((CreateFunctionStmt *) parsetree, queryString);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_AlterFunctionStmt: /* ALTER FUNCTION */
AlterFunction((AlterFunctionStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_IndexStmt: /* CREATE INDEX */
@@ -807,11 +877,20 @@ ProcessUtility(Node *parsetree,
false, /* skip_build */
false, /* quiet */
stmt->concurrent); /* concurrent */
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL,
+ stmt->concurrent);
+#endif
}
break;
case T_RuleStmt: /* CREATE RULE */
DefineRule((RuleStmt *) parsetree, queryString);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreateSeqStmt:
@@ -843,19 +922,35 @@ ProcessUtility(Node *parsetree,
break;
}
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreatedbStmt:
PreventTransactionChain(isTopLevel, "CREATE DATABASE");
createdb((CreatedbStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, true);
+#endif
break;
case T_AlterDatabaseStmt:
AlterDatabase((AlterDatabaseStmt *) parsetree, isTopLevel);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_AlterDatabaseSetStmt:
AlterDatabaseSet((AlterDatabaseSetStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_DropdbStmt:
@@ -865,6 +960,10 @@ ProcessUtility(Node *parsetree,
PreventTransactionChain(isTopLevel, "DROP DATABASE");
dropdb(stmt->dbname, stmt->missing_ok);
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, true);
+#endif
break;
/* Query-level asynchronous notification */
@@ -903,23 +1002,51 @@ ProcessUtility(Node *parsetree,
/* Allowed names are restricted if you're not superuser */
load_file(stmt->filename, !superuser());
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_ClusterStmt:
cluster((ClusterStmt *) parsetree, isTopLevel);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, true);
+#endif
break;
case T_VacuumStmt:
+#ifdef PGXC
+ /*
+	 * We must run the command on the data nodes before the coordinator,
+	 * because vacuum() pops the active snapshot and we cannot send it afterwards
+ */
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, true);
+#endif
vacuum((VacuumStmt *) parsetree, InvalidOid, true, NULL, false,
isTopLevel);
break;
case T_ExplainStmt:
ExplainQuery((ExplainStmt *) parsetree, queryString, params, dest);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ Exec_Nodes *nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
+ nodes->nodelist = GetAnyDataNode();
+ ExecUtilityStmtOnNodes(queryString, nodes, false);
+ }
+#endif
break;
case T_VariableSetStmt:
ExecSetVariableStmt((VariableSetStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_VariableShowStmt:
@@ -936,6 +1063,10 @@ ProcessUtility(Node *parsetree,
case T_CreateTrigStmt:
CreateTrigger((CreateTrigStmt *) parsetree, InvalidOid, true);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_DropPropertyStmt:
@@ -963,14 +1094,26 @@ ProcessUtility(Node *parsetree,
break;
}
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreatePLangStmt:
CreateProceduralLanguage((CreatePLangStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_DropPLangStmt:
DropProceduralLanguage((DropPLangStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
/*
@@ -978,6 +1121,10 @@ ProcessUtility(Node *parsetree,
*/
case T_CreateDomainStmt:
DefineDomain((CreateDomainStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
/*
@@ -1027,6 +1174,10 @@ ProcessUtility(Node *parsetree,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to do CHECKPOINT")));
RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, true);
+#endif
break;
case T_ReindexStmt:
@@ -1059,50 +1210,100 @@ ProcessUtility(Node *parsetree,
(int) stmt->kind);
break;
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL,
+ stmt->kind == OBJECT_DATABASE);
+#endif
break;
}
break;
case T_CreateConversionStmt:
CreateConversionCommand((CreateConversionStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreateCastStmt:
CreateCast((CreateCastStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_DropCastStmt:
DropCast((DropCastStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreateOpClassStmt:
DefineOpClass((CreateOpClassStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreateOpFamilyStmt:
DefineOpFamily((CreateOpFamilyStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_AlterOpFamilyStmt:
AlterOpFamily((AlterOpFamilyStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_RemoveOpClassStmt:
RemoveOpClass((RemoveOpClassStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_RemoveOpFamilyStmt:
RemoveOpFamily((RemoveOpFamilyStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_AlterTSDictionaryStmt:
AlterTSDictionary((AlterTSDictionaryStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_AlterTSConfigurationStmt:
AlterTSConfiguration((AlterTSConfigurationStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
-
+#ifdef PGXC
+ case T_RemoteQuery:
+ Assert(IS_PGXC_COORDINATOR);
+ ExecRemoteUtility((RemoteQuery *) parsetree);
+ break;
+#endif
default:
elog(ERROR, "unrecognized node type: %d",
(int) nodeTag(parsetree));
@@ -1110,6 +1311,22 @@ ProcessUtility(Node *parsetree,
}
}
+#ifdef PGXC
+static void
+ExecUtilityStmtOnNodes(const char *queryString, Exec_Nodes *nodes,
+ bool force_autocommit)
+{
+ RemoteQuery *step = makeNode(RemoteQuery);
+ step->combine_type = COMBINE_TYPE_SAME;
+ step->exec_nodes = nodes;
+ step->sql_statement = pstrdup(queryString);
+ step->force_autocommit = force_autocommit;
+ ExecRemoteUtility(step);
+ pfree(step->sql_statement);
+ pfree(step);
+}
+#endif
+
/*
* UtilityReturnsTuples
* Return "true" if this utility statement will send output to the
@@ -2076,7 +2293,7 @@ CreateCommandTag(Node *parsetree)
}
}
break;
-
+
case T_ExecDirectStmt:
tag = "EXECUTE DIRECT";
break;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 7a3054d90e..7e6edac9be 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -17,12 +17,7 @@
#include "nodes/parsenodes.h"
#include "tcop/dest.h"
-#ifdef PGXC
-extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString, bool exec_on_coord_portal);
-extern bool IsCoordPortalCopy(const CopyStmt *stmt);
-#else
extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString);
-#endif
extern DestReceiver *CreateCopyDestReceiver(void);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index e0515ba95d..88ae12a2c6 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -146,6 +146,11 @@ typedef struct Query
Node *setOperations; /* set-operation tree if this is top level of
* a UNION/INTERSECT/EXCEPT query */
+#ifdef PGXC
+ /* need this info for PGXC Planner, may be temporary */
+ char *sql_statement; /* original query */
+ NodeTag nodeTag; /* node tag of top node of parse tree */
+#endif
} Query;
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index e9b59ccbd9..b7faa7dd28 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -51,9 +51,7 @@ typedef struct RemoteQueryState
int conn_count; /* count of active connections */
int current_conn; /* used to balance load when reading from connections */
CombineType combine_type; /* see CombineType enum */
- DestReceiver *dest; /* output destination */
int command_complete_count; /* count of received CommandComplete messages */
- uint64 row_count; /* how many rows affected by the query */
RequestType request_type; /* see RequestType enum */
TupleDesc tuple_desc; /* tuple descriptor to be referenced by emitted tuples */
int description_count; /* count of received RowDescription messages */
@@ -62,7 +60,6 @@ typedef struct RemoteQueryState
char errorCode[5]; /* error code to send back to client */
char *errorMessage; /* error message to send back to client */
bool query_Done; /* query has been sent down to data nodes */
- char *completionTag; /* completion tag to present to caller */
char *msg; /* last data row message */
int msglen; /* length of the data row message */
/*
@@ -76,6 +73,7 @@ typedef struct RemoteQueryState
FmgrInfo *eqfunctions; /* functions to compare tuples */
MemoryContext tmp_ctx; /* separate context is needed to compare tuples */
FILE *copy_file; /* used if copy_dest == COPY_FILE */
+ uint64 processed; /* count of data rows when running CopyOut */
} RemoteQueryState;
/* Multinode Executor */
@@ -86,11 +84,13 @@ extern int DataNodeRollback(void);
extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from);
extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections);
extern uint64 DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* copy_file);
-extern uint64 DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type);
+extern void DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type);
+extern int ExecCountSlotsRemoteQuery(RemoteQuery *node);
extern RemoteQueryState *ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags);
extern TupleTableSlot* ExecRemoteQuery(RemoteQueryState *step);
extern void ExecEndRemoteQuery(RemoteQueryState *step);
+extern void ExecRemoteUtility(RemoteQuery *node);
extern int handle_response(DataNodeHandle * conn, RemoteQueryState *combiner);
extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot);
diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h
index 5ead756e1c..7ae04746e8 100644
--- a/src/include/pgxc/locator.h
+++ b/src/include/pgxc/locator.h
@@ -44,7 +44,7 @@ typedef struct
/* track if tables use pg_catalog */
-typedef enum
+typedef enum
{
TABLE_USAGE_TYPE_NO_TABLE,
TABLE_USAGE_TYPE_PGCATALOG,
@@ -58,10 +58,10 @@ typedef enum
* primarynodelist is for replicated table writes, where to execute first.
* If it succeeds, only then should it be executed on nodelist.
* primarynodelist should be set to NULL if not doing replicated write operations
- */
+ */
typedef struct
{
- List *primarynodelist;
+ List *primarynodelist;
List *nodelist;
char baselocatortype;
TableUsageType tableusagetype; /* track pg_catalog usage */
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index 47665b2341..bf8f2242fa 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -17,16 +17,14 @@
#include "fmgr.h"
#include "lib/stringinfo.h"
+#include "nodes/params.h"
+#include "nodes/parsenodes.h"
#include "nodes/plannodes.h"
#include "nodes/primnodes.h"
#include "pgxc/locator.h"
#include "tcop/dest.h"
-/* for Query_Plan.exec_loc_type can have these OR'ed*/
-#define EXEC_ON_COORD 0x1
-#define EXEC_ON_DATA_NODES 0x2
-
typedef enum
{
COMBINE_TYPE_NONE, /* it is known that no row count, do not parse */
@@ -72,18 +70,6 @@ typedef struct
} RemoteQuery;
-/*
- * The PGXC plan to execute.
- * In the prototype this will be simple, and queryStepList will
- * contain just one step.
- */
-typedef struct
-{
- int exec_loc_type;
- List *query_step_list; /* List of QuerySteps */
-} Query_Plan;
-
-
/* For handling simple aggregates (no group by present)
* For now, only MAX will be supported.
*/
@@ -154,9 +140,8 @@ extern bool StrictStatementChecking;
/* forbid SELECT even multi-node ORDER BY */
extern bool StrictSelectChecking;
-extern Query_Plan *GetQueryPlan(Node *parsetree, const char *sql_statement,
- List *querytree_list);
-extern void FreeQueryPlan(Query_Plan *query_plan);
+extern PlannedStmt *pgxc_planner(Query *query, int cursorOptions,
+ ParamListInfo boundParams);
extern bool IsHashDistributable(Oid col_type);
#endif /* PGXCPLANNER_H */