summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorM S2010-08-23 03:53:55 +0000
committerPavan Deolasee2011-05-19 16:45:14 +0000
commitbd68c7342a793aa6b7c8a835196e85bb127b2f5b (patch)
tree77c623021e0f806e68e415c68cffc85ccebe1c93 /src
parentbd35da43a6a655c2acc067a241bbf63b6ef6a840 (diff)
Portal integration changes.
This integrates Postgres-XC code deeper into PostgreSQL. The Extended Query Protocol can now be used, which means that JDBC will now work. It also lays more groundwork for supporting multi-step queries (cross-node joins). Note that statements with parameters cannot yet be prepared and executed, only those without parameters will work. Note also that this patch introduces additional performance degradation because more processing occurs with each request. We will be working to address these issues in the coming weeks. Written by Andrei Martsinchyk
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/copy.c76
-rw-r--r--src/backend/executor/execMain.c10
-rw-r--r--src/backend/executor/execProcnode.c28
-rw-r--r--src/backend/optimizer/plan/planner.c11
-rw-r--r--src/backend/parser/analyze.c46
-rw-r--r--src/backend/parser/parse_utilcmd.c34
-rw-r--r--src/backend/pgxc/plan/planner.c346
-rw-r--r--src/backend/pgxc/pool/execRemote.c525
-rw-r--r--src/backend/tcop/postgres.c254
-rw-r--r--src/backend/tcop/pquery.c11
-rw-r--r--src/backend/tcop/utility.c233
-rw-r--r--src/include/commands/copy.h5
-rw-r--r--src/include/nodes/parsenodes.h5
-rw-r--r--src/include/pgxc/execRemote.h8
-rw-r--r--src/include/pgxc/locator.h6
-rw-r--r--src/include/pgxc/planner.h23
16 files changed, 869 insertions, 752 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 50650190ee..aee1b42fdd 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -179,7 +179,6 @@ typedef struct CopyStateData
/* Locator information */
RelationLocInfo *rel_loc; /* the locator key */
int hash_idx; /* index of the hash column */
- bool on_coord;
DataNodeHandle **connections; /* Involved data node connections */
#endif
@@ -800,31 +799,6 @@ CopyQuoteIdentifier(StringInfo query_buf, char *value)
}
#endif
-#ifdef PGXC
-/*
- * In case there is no locator info available, copy to/from is launched in portal on coordinator.
- * This happens for pg_catalog tables (not user defined ones)
- * such as pg_catalog, pg_attribute, etc.
- * This part is launched before the portal is activated, so check a first time if there
- * some locator data for this relid and if no, return and launch the portal.
- */
-bool
-IsCoordPortalCopy(const CopyStmt *stmt)
-{
- RelationLocInfo *rel_loc; /* the locator key */
-
- /* In the case of a COPY SELECT, this is launched on datanodes */
- if(!stmt->relation)
- return false;
-
- rel_loc = GetRelationLocInfo(RangeVarGetRelid(stmt->relation, true));
-
- if (!rel_loc)
- return true;
-
- return false;
-}
-#endif
/*
* DoCopy executes the SQL COPY statement
@@ -857,11 +831,7 @@ IsCoordPortalCopy(const CopyStmt *stmt)
* the table or the specifically requested columns.
*/
uint64
-#ifdef PGXC
-DoCopy(const CopyStmt *stmt, const char *queryString, bool exec_on_coord_portal)
-#else
DoCopy(const CopyStmt *stmt, const char *queryString)
-#endif
{
CopyState cstate;
bool is_from = stmt->is_from;
@@ -883,16 +853,6 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
/* Allocate workspace and zero all fields */
cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
-#ifdef PGXC
- /*
- * Copy to/from is initialized as being launched on datanodes
- * This functionnality is particularly interesting to have a result for
- * tables who have no locator informations such as pg_catalog, pg_class,
- * and pg_attribute.
- */
- cstate->on_coord = false;
-#endif
-
/* Extract options from the statement node tree */
foreach(option, stmt->options)
{
@@ -1180,13 +1140,15 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
+ /*
+ * If the target table does not exist on the data nodes (e.g. a system
+ * table), the locator info returned is NULL. This is the criterion for
+ * when we need to run Copy on the coordinator.
+ */
cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel));
- if (exec_on_coord_portal)
- cstate->on_coord = true;
-
hash_att = GetRelationHashColumn(cstate->rel_loc);
- if (!cstate->on_coord)
+ if (cstate->rel_loc)
{
if (is_from || hash_att)
exec_nodes->nodelist = list_copy(cstate->rel_loc->nodeList);
@@ -1481,7 +1443,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
* In the case of CopyOut, it is just necessary to pick up one node randomly.
* This is done when rel_loc is found.
*/
- if (!cstate->on_coord)
+ if (cstate->rel_loc)
{
cstate->connections = DataNodeCopyBegin(cstate->query_buf.data,
exec_nodes->nodelist,
@@ -1506,7 +1468,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
}
PG_CATCH();
{
- if (IS_PGXC_COORDINATOR && is_from && !cstate->on_coord)
+ if (IS_PGXC_COORDINATOR && is_from && cstate->rel_loc)
{
DataNodeCopyFinish(
cstate->connections,
@@ -1519,18 +1481,13 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
PG_RE_THROW();
}
PG_END_TRY();
- if (IS_PGXC_COORDINATOR && is_from && !cstate->on_coord)
+ if (IS_PGXC_COORDINATOR && is_from && cstate->rel_loc)
{
- if (cstate->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED)
- cstate->processed = DataNodeCopyFinish(
- cstate->connections,
- primary_data_node,
- COMBINE_TYPE_SAME);
- else
- cstate->processed = DataNodeCopyFinish(
- cstate->connections,
- 0,
- COMBINE_TYPE_SUM);
+ bool replicated = cstate->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED;
+ DataNodeCopyFinish(
+ cstate->connections,
+ replicated ? primary_data_node : 0,
+ replicated ? COMBINE_TYPE_SAME : COMBINE_TYPE_SUM);
pfree(cstate->connections);
pfree(cstate->query_buf.data);
FreeRelationLocInfo(cstate->rel_loc);
@@ -1770,7 +1727,7 @@ CopyTo(CopyState cstate)
}
#ifdef PGXC
- if (IS_PGXC_COORDINATOR && !cstate->on_coord)
+ if (IS_PGXC_COORDINATOR && cstate->rel_loc)
{
cstate->processed = DataNodeCopyOut(
GetRelationNodes(cstate->rel_loc, NULL, true),
@@ -2480,7 +2437,7 @@ CopyFrom(CopyState cstate)
}
#ifdef PGXC
- if (IS_PGXC_COORDINATOR && !cstate->on_coord)
+ if (IS_PGXC_COORDINATOR && cstate->rel_loc)
{
Datum *hash_value = NULL;
@@ -2494,6 +2451,7 @@ CopyFrom(CopyState cstate)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("Copy failed on a data node")));
+ cstate->processed++;
}
else
{
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 1fddf10bc9..ca0c7b1f56 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -858,6 +858,14 @@ InitPlan(QueryDesc *queryDesc, int eflags)
{
case CMD_SELECT:
case CMD_INSERT:
+#ifdef PGXC
+ /*
+ * A PGXC RemoteQuery does not require the ctid junk field, so follow
+ * the standard procedure for UPDATE and DELETE
+ */
+ case CMD_UPDATE:
+ case CMD_DELETE:
+#endif
foreach(tlist, plan->targetlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(tlist);
@@ -869,10 +877,12 @@ InitPlan(QueryDesc *queryDesc, int eflags)
}
}
break;
+#ifndef PGXC
case CMD_UPDATE:
case CMD_DELETE:
junk_filter_needed = true;
break;
+#endif
default:
break;
}
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 1b1dd91f2a..f82ce4c97b 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -108,7 +108,9 @@
#include "executor/nodeWindowAgg.h"
#include "executor/nodeWorktablescan.h"
#include "miscadmin.h"
-
+#ifdef PGXC
+#include "pgxc/execRemote.h"
+#endif
/* ------------------------------------------------------------------------
* ExecInitNode
@@ -286,6 +288,13 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
estate, eflags);
break;
+#ifdef PGXC
+ case T_RemoteQuery:
+ result = (PlanState *) ExecInitRemoteQuery((RemoteQuery *) node,
+ estate, eflags);
+ break;
+#endif
+
default:
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
result = NULL; /* keep compiler quiet */
@@ -451,6 +460,12 @@ ExecProcNode(PlanState *node)
result = ExecLimit((LimitState *) node);
break;
+#ifdef PGXC
+ case T_RemoteQueryState:
+ result = ExecRemoteQuery((RemoteQueryState *) node);
+ break;
+#endif
+
default:
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
result = NULL;
@@ -627,6 +642,11 @@ ExecCountSlotsNode(Plan *node)
case T_Limit:
return ExecCountSlotsLimit((Limit *) node);
+#ifdef PGXC
+ case T_RemoteQuery:
+ return ExecCountSlotsRemoteQuery((RemoteQuery *) node);
+#endif
+
default:
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
break;
@@ -783,6 +803,12 @@ ExecEndNode(PlanState *node)
ExecEndLimit((LimitState *) node);
break;
+#ifdef PGXC
+ case T_RemoteQueryState:
+ ExecEndRemoteQuery((RemoteQueryState *) node);
+ break;
+#endif
+
default:
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
break;
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 3f344b3a14..7d41461096 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -38,6 +38,10 @@
#include "parser/parse_expr.h"
#include "parser/parse_oper.h"
#include "parser/parsetree.h"
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#include "pgxc/planner.h"
+#endif
#include "utils/lsyscache.h"
#include "utils/syscache.h"
@@ -119,7 +123,12 @@ planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
if (planner_hook)
result = (*planner_hook) (parse, cursorOptions, boundParams);
else
- result = standard_planner(parse, cursorOptions, boundParams);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ result = pgxc_planner(parse, cursorOptions, boundParams);
+ else
+#endif
+ result = standard_planner(parse, cursorOptions, boundParams);
return result;
}
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 97c560b309..0fe392e99e 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -39,6 +39,11 @@
#include "parser/parse_target.h"
#include "parser/parsetree.h"
#include "rewrite/rewriteManip.h"
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#include "pgxc/planner.h"
+#include "tcop/tcopprot.h"
+#endif
#include "utils/rel.h"
@@ -58,6 +63,10 @@ static Query *transformDeclareCursorStmt(ParseState *pstate,
DeclareCursorStmt *stmt);
static Query *transformExplainStmt(ParseState *pstate,
ExplainStmt *stmt);
+#ifdef PGXC
+static Query *transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt);
+#endif
+
static void transformLockingClause(ParseState *pstate,
Query *qry, LockingClause *lc);
static bool check_parameter_resolution_walker(Node *node, ParseState *pstate);
@@ -199,6 +208,13 @@ transformStmt(ParseState *pstate, Node *parseTree)
(ExplainStmt *) parseTree);
break;
+#ifdef PGXC
+ case T_ExecDirectStmt:
+ result = transformExecDirectStmt(pstate,
+ (ExecDirectStmt *) parseTree);
+ break;
+#endif
+
default:
/*
@@ -263,6 +279,17 @@ analyze_requires_snapshot(Node *parseTree)
result = true;
break;
+#ifdef PGXC
+ case T_ExecDirectStmt:
+
+ /*
+ * We will parse/analyze/plan inner query, which probably will
+ * need a snapshot. Ensure it is set.
+ */
+ result = true;
+ break;
+#endif
+
default:
/* utility statements don't have any active parse analysis */
result = false;
@@ -1925,6 +1952,25 @@ transformExplainStmt(ParseState *pstate, ExplainStmt *stmt)
return result;
}
+#ifdef PGXC
+/*
+ * transformExecDirectStmt -
+ * transform an EXECUTE DIRECT Statement
+ *
+ * Handling depends on whether we should execute on the nodes or on the coordinator.
+ * To execute on nodes we return CMD_UTILITY query having one T_RemoteQuery node
+ * with the inner statement as a sql_command.
+ * If statement is to run on coordinator we should parse inner statement and
+ * analyze resulting query tree.
+ */
+static Query *
+transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt)
+{
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("Support for EXECUTE DIRECT is temporary broken")));
+}
+#endif
/* exported so planner can check again after rewriting, query pullup, etc */
void
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index b0488d88c5..d7a932e781 100644
--- a/src/backend/parser/parse_utilcmd.c
+++ b/src/backend/parser/parse_utilcmd.c
@@ -52,6 +52,7 @@
#ifdef PGXC
#include "pgxc/locator.h"
#include "pgxc/pgxc.h"
+#include "pgxc/planner.h"
#endif
#include "rewrite/rewriteManip.h"
@@ -261,9 +262,9 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
result = list_concat(result, save_alist);
#ifdef PGXC
- /*
- * If the user did not specify any distribution clause and there is no
- * inherits clause, try and use PK or unique index
+ /*
+ * If the user did not specify any distribution clause and there is no
+ * inherits clause, try and use PK or unique index
*/
if (!stmt->distributeby && !stmt->inhRelations && cxt.fallback_dist_col)
{
@@ -271,6 +272,13 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
stmt->distributeby->disttype = DISTTYPE_HASH;
stmt->distributeby->colname = cxt.fallback_dist_col;
}
+ if (IS_PGXC_COORDINATOR)
+ {
+ RemoteQuery *step = makeNode(RemoteQuery);
+ step->combine_type = COMBINE_TYPE_SAME;
+ step->sql_statement = queryString;
+ result = lappend(result, step);
+ }
#endif
return result;
}
@@ -1171,7 +1179,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
{
if (cxt->distributeby)
isLocalSafe = CheckLocalIndexColumn (
- ConvertToLocatorType(cxt->distributeby->disttype),
+ ConvertToLocatorType(cxt->distributeby->disttype),
cxt->distributeby->colname, key);
}
#endif
@@ -1273,7 +1281,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
{
/*
* Set fallback distribution column.
- * If not set, set it to first column in index.
+ * If not set, set it to first column in index.
* If primary key, we prefer that over a unique constraint.
*/
if (index->indexParams == NIL
@@ -1281,7 +1289,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
{
cxt->fallback_dist_col = pstrdup(key);
}
-
+
/* Existing table, check if it is safe */
if (!cxt->distributeby && !isLocalSafe)
isLocalSafe = CheckLocalIndexColumn (
@@ -1299,7 +1307,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
index->indexParams = lappend(index->indexParams, iparam);
}
#ifdef PGXC
- if (IS_PGXC_COORDINATOR && cxt->distributeby
+ if (IS_PGXC_COORDINATOR && cxt->distributeby
&& cxt->distributeby->disttype == DISTTYPE_HASH && !isLocalSafe)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
@@ -1618,7 +1626,7 @@ transformRuleStmt(RuleStmt *stmt, const char *queryString,
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("Rule may not use NOTIFY, it is not yet supported")));
-
+
#endif
/*
* Since outer ParseState isn't parent of inner, have to pass down
@@ -1956,7 +1964,15 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
result = lappend(cxt.blist, stmt);
result = list_concat(result, cxt.alist);
result = list_concat(result, save_alist);
-
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ RemoteQuery *step = makeNode(RemoteQuery);
+ step->combine_type = COMBINE_TYPE_SAME;
+ step->sql_statement = queryString;
+ result = lappend(result, step);
+ }
+#endif
return result;
}
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 002e710bd6..1dcfc2943a 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -25,6 +25,7 @@
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "optimizer/clauses.h"
+#include "optimizer/planner.h"
#include "optimizer/tlist.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
@@ -116,7 +117,7 @@ typedef struct ColumnBase
*/
typedef struct XCWalkerContext
{
- Query *query;
+ Query *query;
bool isRead;
Exec_Nodes *exec_nodes; /* resulting execution nodes */
Special_Conditions *conditions;
@@ -125,6 +126,7 @@ typedef struct XCWalkerContext
int varno;
bool within_or;
bool within_not;
+ bool exec_on_coord; /* fallback to standard planner to have plan executed on coordinator only */
List *join_list; /* A list of List*'s, one for each relation. */
} XCWalkerContext;
@@ -971,6 +973,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
/* just pg_catalog tables */
context->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
+ context->exec_on_coord = true;
return false;
}
@@ -1087,6 +1090,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
{
context->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
+ context->exec_on_coord = true;
return false;
}
@@ -1253,7 +1257,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
static Exec_Nodes *
get_plan_nodes(Query *query, bool isRead)
{
- Exec_Nodes *result_nodes;
+ Exec_Nodes *result_nodes = NULL;
XCWalkerContext context;
@@ -1267,13 +1271,16 @@ get_plan_nodes(Query *query, bool isRead)
context.varno = 0;
context.within_or = false;
context.within_not = false;
+ context.exec_on_coord = false;
context.join_list = NIL;
- if (get_plan_nodes_walker((Node *) query, &context))
- result_nodes = NULL;
- else
+ if (!get_plan_nodes_walker((Node *) query, &context))
result_nodes = context.exec_nodes;
-
+ if (context.exec_on_coord && result_nodes)
+ {
+ pfree(result_nodes);
+ result_nodes = NULL;
+ }
free_special_relations(context.conditions);
free_join_list(context.join_list);
return result_nodes;
@@ -1976,68 +1983,89 @@ make_simple_sort_from_sortclauses(Query *query, RemoteQuery *step)
* For the prototype, there will only be one step,
* and the nodelist will be NULL if it is not a PGXC-safe statement.
*/
-Query_Plan *
-GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
+PlannedStmt *
+pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
{
- Query_Plan *query_plan = palloc(sizeof(Query_Plan));
+ /*
+ * We waste some time invoking the standard planner, but it produces a
+ * good enough PlannedStmt; we just need to replace the standard plan.
+ * In the future we may want to skip the standard_planner invocation and
+ * initialize the PlannedStmt here. At the moment not all queries work:
+ * e.g. there was a problem with INSERT into a subset of table columns
+ */
+ PlannedStmt *result = standard_planner(query, cursorOptions, boundParams);
+ Plan *standardPlan = result->planTree;
RemoteQuery *query_step = makeNode(RemoteQuery);
- Query *query;
- query_step->sql_statement = (char *) palloc(strlen(sql_statement) + 1);
- strcpy(query_step->sql_statement, sql_statement);
+ query_step->sql_statement = pstrdup(query->sql_statement);
query_step->exec_nodes = NULL;
query_step->combine_type = COMBINE_TYPE_NONE;
query_step->simple_aggregates = NULL;
- query_step->read_only = false;
+ /* Optimize multi-node handling */
+ query_step->read_only = query->nodeTag == T_SelectStmt;
query_step->force_autocommit = false;
- query_plan->query_step_list = lappend(NULL, query_step);
+ result->planTree = (Plan *) query_step;
/*
* Determine where to execute the command, either at the Coordinator
* level, Data Nodes, or both. By default we choose both. We should be
* able to quickly expand this for more commands.
*/
- switch (nodeTag(parsetree))
+ switch (query->nodeTag)
{
case T_SelectStmt:
- /* Optimize multi-node handling */
- query_step->read_only = true;
+ /* Perform some checks to make sure we can support the statement */
+ if (query->intoClause)
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("INTO clause not yet supported"))));
+
+ if (query->setOperations)
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("UNION, INTERSECT and EXCEPT are not yet supported"))));
+
+ if (query->hasRecursive)
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("WITH RECURSIVE not yet supported"))));
+
+ if (query->hasWindowFuncs)
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("Window functions not yet supported"))));
/* fallthru */
case T_InsertStmt:
case T_UpdateStmt:
case T_DeleteStmt:
- /* just use first one in querytree_list */
- query = (Query *) linitial(querytree_list);
- /* should copy instead ? */
- query_step->plan.targetlist = query->targetList;
+ query_step->exec_nodes = get_plan_nodes_command(query);
- /* Perform some checks to make sure we can support the statement */
- if (nodeTag(parsetree) == T_SelectStmt)
+ if (query_step->exec_nodes == NULL)
{
- if (query->intoClause)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("INTO clause not yet supported"))));
-
- if (query->setOperations)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("UNION, INTERSECT and EXCEPT are not yet supported"))));
-
- if (query->hasRecursive)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("WITH RECURSIVE not yet supported"))));
-
- if (query->hasWindowFuncs)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Window functions not yet supported"))));
+ /*
+ * Processing a query against catalog tables; restore the
+ * standard plan
+ */
+ result->planTree = standardPlan;
+ return result;
}
- query_step->exec_nodes =
- get_plan_nodes_command(query);
+ /*
+ * PGXCTODO
+ * When Postgres runs insert into t (a) values (1); against a table
+ * defined as create table t (a int, b int); the plan looks
+ * like insert into t (a,b) values (1,null);
+ * Later the executor verifies the plan, to make sure the table has not
+ * been altered since the plan was created, by comparing the table
+ * definition with the plan target list, and raises an error if they do
+ * not match.
+ * I could not find a better way to generate the targetList for the pgxc
+ * plan than to call the standard planner and take the targetList from
+ * the plan generated by Postgres.
+ */
+ query_step->plan.targetlist = standardPlan->targetlist;
+
if (query_step->exec_nodes)
query_step->combine_type = get_plan_combine_type(
query, query_step->exec_nodes->baselocatortype);
@@ -2047,37 +2075,9 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
query_step->simple_aggregates = get_simple_aggregates(query);
/*
- * See if it is a SELECT with no relations, like SELECT 1+1 or
- * SELECT nextval('fred'), and just use coord.
- */
- if (query_step->exec_nodes == NULL
- && (query->jointree->fromlist == NULL
- || query->jointree->fromlist->length == 0))
- /* Just execute it on Coordinator */
- query_plan->exec_loc_type = EXEC_ON_COORD;
- else
- {
- if (query_step->exec_nodes != NULL
- && query_step->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_PGCATALOG)
- {
- /* pg_catalog query, run on coordinator */
- query_plan->exec_loc_type = EXEC_ON_COORD;
- }
- else
- {
- query_plan->exec_loc_type = EXEC_ON_DATA_NODES;
-
- /* If node list is NULL, execute on coordinator */
- if (!query_step->exec_nodes)
- query_plan->exec_loc_type = EXEC_ON_COORD;
- }
- }
-
- /*
* Add sortring to the step
*/
- if (query_plan->exec_loc_type == EXEC_ON_DATA_NODES &&
- list_length(query_step->exec_nodes->nodelist) > 1 &&
+ if (list_length(query_step->exec_nodes->nodelist) > 1 &&
(query->sortClause || query->distinctClause))
make_simple_sort_from_sortclauses(query, query_step);
@@ -2090,7 +2090,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
* Check if we have multiple nodes and an unsupported clause. This
* is temporary until we expand supported SQL
*/
- if (nodeTag(parsetree) == T_SelectStmt)
+ if (query->nodeTag == T_SelectStmt)
{
if (StrictStatementChecking && query_step->exec_nodes
&& list_length(query_step->exec_nodes->nodelist) > 1)
@@ -2110,180 +2110,6 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
}
}
break;
-
- /* Statements that we only want to execute on the Coordinator */
- case T_VariableShowStmt:
- query_plan->exec_loc_type = EXEC_ON_COORD;
- break;
-
- /*
- * Statements that need to run in autocommit mode, on Coordinator
- * and Data Nodes with suppressed implicit two phase commit.
- */
- case T_CheckPointStmt:
- case T_ClusterStmt:
- case T_CreatedbStmt:
- case T_DropdbStmt:
- case T_VacuumStmt:
- query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES;
- query_step->force_autocommit = true;
- break;
-
- case T_DropPropertyStmt:
- /*
- * Triggers are not yet supported by PGXC
- * all other queries are executed on both Coordinator and Datanode
- * On the same point, assert also is not supported
- */
- if (((DropPropertyStmt *)parsetree)->removeType == OBJECT_TRIGGER)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("This command is not yet supported."))));
- else
- query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES;
- break;
-
- case T_CreateStmt:
- if (((CreateStmt *)parsetree)->relation->istemp)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Temp tables are not yet supported."))));
-
- query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES;
- break;
-
- /*
- * Statements that we execute on both the Coordinator and Data Nodes
- */
- case T_AlterDatabaseStmt:
- case T_AlterDatabaseSetStmt:
- case T_AlterDomainStmt:
- case T_AlterFdwStmt:
- case T_AlterForeignServerStmt:
- case T_AlterFunctionStmt:
- case T_AlterObjectSchemaStmt:
- case T_AlterOpFamilyStmt:
- case T_AlterSeqStmt:
- case T_AlterTableStmt: /* Can also be used to rename a sequence */
- case T_AlterTSConfigurationStmt:
- case T_AlterTSDictionaryStmt:
- case T_ClosePortalStmt: /* In case CLOSE ALL is issued */
- case T_CommentStmt:
- case T_CompositeTypeStmt:
- case T_ConstraintsSetStmt:
- case T_CreateCastStmt:
- case T_CreateConversionStmt:
- case T_CreateDomainStmt:
- case T_CreateEnumStmt:
- case T_CreateFdwStmt:
- case T_CreateForeignServerStmt:
- case T_CreateFunctionStmt: /* Only global functions are supported */
- case T_CreateOpClassStmt:
- case T_CreateOpFamilyStmt:
- case T_CreatePLangStmt:
- case T_CreateSeqStmt:
- case T_CreateSchemaStmt:
- case T_DeallocateStmt: /* Allow for DEALLOCATE ALL */
- case T_DiscardStmt:
- case T_DropCastStmt:
- case T_DropFdwStmt:
- case T_DropForeignServerStmt:
- case T_DropPLangStmt:
- case T_DropStmt:
- case T_IndexStmt:
- case T_LockStmt:
- case T_ReindexStmt:
- case T_RemoveFuncStmt:
- case T_RemoveOpClassStmt:
- case T_RemoveOpFamilyStmt:
- case T_RenameStmt:
- case T_RuleStmt:
- case T_TruncateStmt:
- case T_VariableSetStmt:
- case T_ViewStmt:
-
- /*
- * Also support these, should help later with pg_restore, although
- * not very useful because of the pooler using the same user
- */
- case T_GrantStmt:
- case T_GrantRoleStmt:
- case T_CreateRoleStmt:
- case T_AlterRoleStmt:
- case T_AlterRoleSetStmt:
- case T_AlterUserMappingStmt:
- case T_CreateUserMappingStmt:
- case T_DropRoleStmt:
- case T_AlterOwnerStmt:
- case T_DropOwnedStmt:
- case T_DropUserMappingStmt:
- case T_ReassignOwnedStmt:
- case T_DefineStmt: /* used for aggregates, some types */
- query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES;
- break;
-
- case T_TransactionStmt:
- switch (((TransactionStmt *) parsetree)->kind)
- {
- case TRANS_STMT_SAVEPOINT:
- case TRANS_STMT_RELEASE:
- case TRANS_STMT_ROLLBACK_TO:
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("This type of transaction statement not yet supported"))));
- break;
-
- default:
- break; /* keep compiler quiet */
- }
- query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES;
- break;
-
- /*
- * For now, pick one of the data nodes until we modify real
- * planner It will give an approximate idea of what an isolated
- * data node will do
- */
- case T_ExplainStmt:
- if (((ExplainStmt *) parsetree)->analyze)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("ANALYZE with EXPLAIN is currently not supported."))));
-
- query_step->exec_nodes = palloc0(sizeof(Exec_Nodes));
- query_step->exec_nodes->nodelist = GetAnyDataNode();
- query_step->exec_nodes->baselocatortype = LOCATOR_TYPE_RROBIN;
- query_plan->exec_loc_type = EXEC_ON_DATA_NODES;
- break;
-
- /*
- * Trigger queries are not yet supported by PGXC.
- * Tablespace queries are also not yet supported.
- * Two nodes on the same servers cannot use the same tablespace.
- */
- case T_CreateTableSpaceStmt:
- case T_CreateTrigStmt:
- case T_DropTableSpaceStmt:
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("This command is not yet supported."))));
- break;
-
- /*
- * Other statements we do not yet want to handle.
- * By default they would be fobidden, but we list these for reference.
- * Note that there is not a 1-1 correspndence between
- * SQL command and the T_*Stmt structures.
- */
- case T_DeclareCursorStmt:
- case T_ExecuteStmt:
- case T_FetchStmt:
- case T_ListenStmt:
- case T_LoadStmt:
- case T_NotifyStmt:
- case T_PrepareStmt:
- case T_UnlistenStmt:
- /* fall through */
default:
/* Allow for override */
if (StrictStatementChecking)
@@ -2291,12 +2117,10 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
(errmsg("This command is not yet supported."))));
else
- query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES;
- break;
+ result->planTree = standardPlan;
}
-
- return query_plan;
+ return result;
}
@@ -2321,21 +2145,3 @@ free_query_step(RemoteQuery *query_step)
list_free_deep(query_step->simple_aggregates);
pfree(query_step);
}
-
-/*
- * Free Query_Plan struct
- */
-void
-FreeQueryPlan(Query_Plan *query_plan)
-{
- ListCell *item;
-
- if (query_plan == NULL)
- return;
-
- foreach(item, query_plan->query_step_list)
- free_query_step((RemoteQuery *) lfirst(item));
-
- pfree(query_plan->query_step_list);
- pfree(query_plan);
-}
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index bbedef0f2f..0f16c5143f 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -168,9 +168,7 @@ CreateResponseCombiner(int node_count, CombineType combine_type)
combiner->connections = NULL;
combiner->conn_count = 0;
combiner->combine_type = combine_type;
- combiner->dest = NULL;
combiner->command_complete_count = 0;
- combiner->row_count = 0;
combiner->request_type = REQUEST_TYPE_NOT_DEFINED;
combiner->tuple_desc = NULL;
combiner->description_count = 0;
@@ -178,7 +176,6 @@ CreateResponseCombiner(int node_count, CombineType combine_type)
combiner->copy_out_count = 0;
combiner->errorMessage = NULL;
combiner->query_Done = false;
- combiner->completionTag = NULL;
combiner->msg = NULL;
combiner->msglen = 0;
combiner->initAggregates = true;
@@ -488,7 +485,8 @@ HandleCopyOutComplete(RemoteQueryState *combiner)
static void
HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len)
{
- int digits = 0;
+ int digits = 0;
+ EState *estate = combiner->ss.ps.state;
/*
* If we did not receive description we are having rowcount or OK response
@@ -496,7 +494,7 @@ HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len)
if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
combiner->request_type = REQUEST_TYPE_COMMAND;
/* Extract rowcount */
- if (combiner->combine_type != COMBINE_TYPE_NONE)
+ if (combiner->combine_type != COMBINE_TYPE_NONE && estate)
{
uint64 rowcount;
digits = parse_row_count(msg_body, len, &rowcount);
@@ -507,7 +505,7 @@ HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len)
{
if (combiner->command_complete_count)
{
- if (rowcount != combiner->row_count)
+ if (rowcount != estate->es_processed)
/* There is a consistency issue in the database with the replicated table */
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
@@ -515,37 +513,15 @@ HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len)
}
else
/* first result */
- combiner->row_count = rowcount;
+ estate->es_processed = rowcount;
}
else
- combiner->row_count += rowcount;
+ estate->es_processed += rowcount;
}
else
combiner->combine_type = COMBINE_TYPE_NONE;
}
- if (++combiner->command_complete_count == combiner->node_count)
- {
- if (combiner->completionTag)
- {
- if (combiner->combine_type == COMBINE_TYPE_NONE)
- {
- /* ensure we do not go beyond buffer bounds */
- if (len > COMPLETION_TAG_BUFSIZE)
- len = COMPLETION_TAG_BUFSIZE;
- memcpy(combiner->completionTag, msg_body, len);
- }
- else
- {
- /* Truncate msg_body to get base string */
- msg_body[len - digits - 1] = '\0';
- snprintf(combiner->completionTag,
- COMPLETION_TAG_BUFSIZE,
- "%s" UINT64_FORMAT,
- msg_body,
- combiner->row_count);
- }
- }
- }
+ combiner->command_complete_count++;
}
/*
@@ -653,6 +629,9 @@ HandleCopyDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("Unexpected response from the data nodes for 'd' message, current request type %d", combiner->request_type)));
+ /* count the row */
+ combiner->processed++;
+
/* If there is a copy file, data has to be sent to the local file */
if (combiner->copy_file)
/* write data to the copy file */
@@ -881,7 +860,6 @@ ValidateAndResetCombiner(RemoteQueryState *combiner)
combiner->command_complete_count = 0;
combiner->connections = NULL;
combiner->conn_count = 0;
- combiner->row_count = 0;
combiner->request_type = REQUEST_TYPE_NOT_DEFINED;
combiner->tuple_desc = NULL;
combiner->description_count = 0;
@@ -1106,7 +1084,6 @@ data_node_begin(int conn_count, DataNodeHandle ** connections,
}
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
/* Receive responses */
if (data_node_receive_responses(conn_count, connections, timeout, combiner))
@@ -1225,7 +1202,6 @@ data_node_commit(int conn_count, DataNodeHandle ** connections)
}
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
/* Receive responses */
if (data_node_receive_responses(conn_count, connections, timeout, combiner))
result = EOF;
@@ -1268,10 +1244,7 @@ data_node_commit(int conn_count, DataNodeHandle ** connections)
}
if (!combiner)
- {
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
- }
/* Receive responses */
if (data_node_receive_responses(conn_count, connections, timeout, combiner))
result = EOF;
@@ -1336,7 +1309,6 @@ data_node_rollback(int conn_count, DataNodeHandle ** connections)
}
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
/* Receive responses */
if (data_node_receive_responses(conn_count, connections, timeout, combiner))
return EOF;
@@ -1480,7 +1452,6 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
* client runs console or file copy
*/
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
/* Receive responses */
if (data_node_receive_responses(conn_count, connections, timeout, combiner)
@@ -1541,7 +1512,6 @@ DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle**
if (primary_handle->inStart < primary_handle->inEnd)
{
RemoteQueryState *combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
handle_response(primary_handle, combiner);
if (!ValidateAndCloseCombiner(combiner))
return EOF;
@@ -1603,7 +1573,6 @@ DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle**
if (handle->inStart < handle->inEnd)
{
RemoteQueryState *combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE);
- combiner->dest = None_Receiver;
handle_response(handle, combiner);
if (!ValidateAndCloseCombiner(combiner))
return EOF;
@@ -1670,13 +1639,13 @@ DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE*
bool need_tran;
List *nodelist;
ListCell *nodeitem;
- uint64 processed = 0;
+ uint64 processed;
nodelist = exec_nodes->nodelist;
need_tran = !autocommit || conn_count > 1;
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_SUM);
- combiner->dest = None_Receiver;
+ combiner->processed = 0;
/* If there is an existing file where to copy data, pass it to combiner */
if (copy_file)
combiner->copy_file = copy_file;
@@ -1712,7 +1681,7 @@ DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE*
}
}
- processed = combiner->row_count;
+ processed = combiner->processed;
if (!ValidateAndCloseCombiner(combiner))
{
@@ -1730,7 +1699,7 @@ DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE*
/*
* Finish copy process on all connections
*/
-uint64
+void
DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
CombineType combine_type)
{
@@ -1743,7 +1712,6 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
DataNodeHandle *connections[NumDataNodes];
DataNodeHandle *primary_handle = NULL;
int conn_count = 0;
- uint64 processed;
for (i = 0; i < NumDataNodes; i++)
{
@@ -1786,8 +1754,7 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
}
combiner = CreateResponseCombiner(conn_count + 1, combine_type);
- combiner->dest = None_Receiver;
- error = data_node_receive_responses(1, &primary_handle, timeout, combiner) || error;
+ error = (data_node_receive_responses(1, &primary_handle, timeout, combiner) != 0) || error;
}
for (i = 0; i < conn_count; i++)
@@ -1823,22 +1790,25 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
need_tran = !autocommit || primary_handle || conn_count > 1;
if (!combiner)
- {
combiner = CreateResponseCombiner(conn_count, combine_type);
- combiner->dest = None_Receiver;
- }
error = (data_node_receive_responses(conn_count, connections, timeout, combiner) != 0) || error;
- processed = combiner->row_count;
-
if (!ValidateAndCloseCombiner(combiner) || error)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Error while running COPY")));
+}
- return processed;
+#define REMOTE_QUERY_NSLOTS 2
+int
+ExecCountSlotsRemoteQuery(RemoteQuery *node)
+{
+ return ExecCountSlotsNode(outerPlan((Plan *) node)) +
+ ExecCountSlotsNode(innerPlan((Plan *) node)) +
+ REMOTE_QUERY_NSLOTS;
}
+
RemoteQueryState *
ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
{
@@ -1876,6 +1846,9 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
}
+ if (outerPlan(node))
+ outerPlanState(remotestate) = ExecInitNode(outerPlan(node), estate, eflags);
+
return remotestate;
}
@@ -1927,6 +1900,83 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst)
}
}
+static void
+get_exec_connections(Exec_Nodes *exec_nodes,
+ int *regular_conn_count,
+ int *total_conn_count,
+ DataNodeHandle ***connections,
+ DataNodeHandle ***primaryconnection)
+{
+ List *nodelist = NIL;
+ List *primarynode = NIL;
+
+ if (exec_nodes)
+ {
+ nodelist = exec_nodes->nodelist;
+ primarynode = exec_nodes->primarynodelist;
+ }
+
+ if (list_length(nodelist) == 0)
+ {
+ if (primarynode)
+ *regular_conn_count = NumDataNodes - 1;
+ else
+ *regular_conn_count = NumDataNodes;
+ }
+ else
+ {
+ *regular_conn_count = list_length(nodelist);
+ }
+
+ *total_conn_count = *regular_conn_count;
+
+ /* Get connection for primary node, if used */
+ if (primarynode)
+ {
+ *primaryconnection = get_handles(primarynode);
+ if (!*primaryconnection)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not obtain connection from pool")));
+ (*total_conn_count)++;
+ }
+
+ /* Get other connections (non-primary) */
+ *connections = get_handles(nodelist);
+ if (!*connections)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not obtain connection from pool")));
+
+}
+
+/*
+ * We would want to run 2PC if the current transaction modified more than
+ * one node. So optimize a little bit and do not look further if we
+ * already have more than one write node.
+ */
+static void
+register_write_nodes(int conn_count, DataNodeHandle **connections)
+{
+ int i, j;
+
+ for (i = 0; i < conn_count && write_node_count < 2; i++)
+ {
+ bool found = false;
+
+ for (j = 0; j < write_node_count && !found; j++)
+ {
+ if (write_node_list[j] == connections[i])
+ found = true;
+ }
+ if (!found)
+ {
+ /* Add to transaction wide-list */
+ write_node_list[write_node_count++] = connections[i];
+ }
+ }
+}
+
/*
* Execute step of PGXC plan.
* The step specifies a command to be executed on specified nodes.
@@ -1950,66 +2000,51 @@ ExecRemoteQuery(RemoteQueryState *node)
if (!node->query_Done)
{
/* First invocation, initialize */
- Exec_Nodes *exec_nodes = step->exec_nodes;
bool force_autocommit = step->force_autocommit;
bool is_read_only = step->read_only;
GlobalTransactionId gxid = InvalidGlobalTransactionId;
Snapshot snapshot = GetActiveSnapshot();
DataNodeHandle **connections = NULL;
DataNodeHandle **primaryconnection = NULL;
- List *nodelist = NIL;
- List *primarynode = NIL;
int i;
- int j;
int regular_conn_count;
int total_conn_count;
bool need_tran;
- if (exec_nodes)
- {
- nodelist = exec_nodes->nodelist;
- primarynode = exec_nodes->primarynodelist;
- }
-
- if (list_length(nodelist) == 0)
- {
- if (primarynode)
- regular_conn_count = NumDataNodes - 1;
- else
- regular_conn_count = NumDataNodes;
- }
- else
+ /*
+ * If coordinator plan is specified execute it first.
+ * If the plan is returning we are returning these tuples immediately.
+ * If it is not returning or returned them all by current invocation
+ * we will go ahead and execute remote query. Then we will never execute
+ * the outer plan again because node->query_Done flag will be set and
+ * execution won't get to that place.
+ */
+ if (outerPlanState(node))
{
- regular_conn_count = list_length(nodelist);
+ TupleTableSlot *slot = ExecProcNode(outerPlanState(node));
+ if (!TupIsNull(slot))
+ return slot;
}
- total_conn_count = regular_conn_count;
- node->node_count = total_conn_count;
+ get_exec_connections(step->exec_nodes,
+ &regular_conn_count,
+ &total_conn_count,
+ &connections,
+ &primaryconnection);
- /* Get connection for primary node, if used */
- if (primarynode)
- {
- primaryconnection = get_handles(primarynode);
- if (!primaryconnection)
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Could not obtain connection from pool")));
- total_conn_count++;
- }
-
- /* Get other connections (non-primary) */
- connections = get_handles(nodelist);
- if (!connections)
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Could not obtain connection from pool")));
+ /*
+ * We save only regular connections, at the time we exit the function
+ * we finish with the primary connection and deal only with regular
+ * connections on subsequent invocations
+ */
+ node->node_count = regular_conn_count;
if (force_autocommit)
need_tran = false;
else
need_tran = !autocommit || total_conn_count > 1;
- elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, statement_need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false");
+ elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primaryconnection ? "true" : "false", regular_conn_count, need_tran ? "true" : "false");
stat_statement();
if (autocommit)
@@ -2019,44 +2054,11 @@ ExecRemoteQuery(RemoteQueryState *node)
clear_write_node_list();
}
- /* Check status of connections */
- /*
- * We would want to run 2PC if current transaction modified more then
- * one node. So optimize little bit and do not look further if we
- * already have two.
- */
- if (!is_read_only && write_node_count < 2)
+ if (!is_read_only)
{
- bool found;
-
if (primaryconnection)
- {
- found = false;
- for (j = 0; j < write_node_count && !found; j++)
- {
- if (write_node_list[j] == primaryconnection[0])
- found = true;
- }
- if (!found)
- {
- /* Add to transaction wide-list */
- write_node_list[write_node_count++] = primaryconnection[0];
- }
- }
- for (i = 0; i < regular_conn_count && write_node_count < 2; i++)
- {
- found = false;
- for (j = 0; j < write_node_count && !found; j++)
- {
- if (write_node_list[j] == connections[i])
- found = true;
- }
- if (!found)
- {
- /* Add to transaction wide-list */
- write_node_list[write_node_count++] = connections[i];
- }
- }
+ register_write_nodes(1, primaryconnection);
+ register_write_nodes(regular_conn_count, connections);
}
gxid = GetCurrentGlobalTransactionId();
@@ -2209,12 +2211,10 @@ ExecRemoteQuery(RemoteQueryState *node)
{
ExecSetSlotDescriptor(scanslot, node->tuple_desc);
/*
- * we should send to client not the tuple_desc we just
- * received, but tuple_desc from the planner.
- * Data node may be sending junk columns for sorting
+	 * Now the tuple table slot is responsible for freeing the
+	 * descriptor
*/
- (*node->dest->rStartup) (node->dest, CMD_SELECT,
- resultslot->tts_tupleDescriptor);
+ node->tuple_desc = NULL;
if (step->sort)
{
SimpleSort *sort = step->sort;
@@ -2228,7 +2228,7 @@ ExecRemoteQuery(RemoteQueryState *node)
* be initialized
*/
node->tuplesortstate = tuplesort_begin_merge(
- node->tuple_desc,
+ scanslot->tts_tupleDescriptor,
sort->numCols,
sort->sortColIdx,
sort->sortOperators,
@@ -2290,7 +2290,6 @@ ExecRemoteQuery(RemoteQueryState *node)
}
}
copy_slot(node, scanslot, resultslot);
- (*node->dest->receiveSlot) (resultslot, node->dest);
break;
}
if (!have_tuple)
@@ -2310,12 +2309,26 @@ ExecRemoteQuery(RemoteQueryState *node)
{
if (node->simple_aggregates)
{
- /*
- * Advance aggregate functions and allow to read up next
- * data row message and get tuple in the same slot on
- * next iteration
- */
- exec_simple_aggregates(node, scanslot);
+ if (node->simple_aggregates)
+ {
+ /*
+ * Advance aggregate functions and allow to read up next
+ * data row message and get tuple in the same slot on
+ * next iteration
+ */
+ exec_simple_aggregates(node, scanslot);
+ }
+ else
+ {
+ /*
+ * Receive current slot and read up next data row
+ * message before exiting the loop. Next time when this
+ * function is invoked we will have either data row
+ * message ready or EOF
+ */
+ copy_slot(node, scanslot, resultslot);
+ have_tuple = true;
+ }
}
else
{
@@ -2326,7 +2339,6 @@ ExecRemoteQuery(RemoteQueryState *node)
* message ready or EOF
*/
copy_slot(node, scanslot, resultslot);
- (*node->dest->receiveSlot) (resultslot, node->dest);
have_tuple = true;
}
}
@@ -2380,10 +2392,7 @@ ExecRemoteQuery(RemoteQueryState *node)
{
finish_simple_aggregates(node, resultslot);
if (!TupIsNull(resultslot))
- {
- (*node->dest->receiveSlot) (resultslot, node->dest);
have_tuple = true;
- }
}
if (!have_tuple) /* report end of scan */
@@ -2405,12 +2414,234 @@ ExecRemoteQuery(RemoteQueryState *node)
void
ExecEndRemoteQuery(RemoteQueryState *node)
{
- (*node->dest->rShutdown) (node->dest);
+ /*
+ * Release tuplesort resources
+ */
+ if (node->tuplesortstate != NULL)
+ tuplesort_end((Tuplesortstate *) node->tuplesortstate);
+ node->tuplesortstate = NULL;
+
+ /*
+ * shut down the subplan
+ */
+ if (outerPlanState(node))
+ ExecEndNode(outerPlanState(node));
+
if (node->tmp_ctx)
MemoryContextDelete(node->tmp_ctx);
+
CloseCombiner(node);
}
+/*
+ * Execute utility statement on multiple data nodes
+ * It does approximately the same as
+ *
+ * RemoteQueryState *state = ExecInitRemoteQuery(plan, estate, flags);
+ * Assert(TupIsNull(ExecRemoteQuery(state)));
+ * ExecEndRemoteQuery(state)
+ *
+ * But does not need an EState instance and skips some unnecessary work,
+ * like allocating tuple slots.
+ */
+void
+ExecRemoteUtility(RemoteQuery *node)
+{
+ RemoteQueryState *remotestate;
+ bool force_autocommit = node->force_autocommit;
+ bool is_read_only = node->read_only;
+ GlobalTransactionId gxid = InvalidGlobalTransactionId;
+ Snapshot snapshot = GetActiveSnapshot();
+ DataNodeHandle **connections = NULL;
+ DataNodeHandle **primaryconnection = NULL;
+ int regular_conn_count;
+ int total_conn_count;
+ bool need_tran;
+ int i;
+
+ remotestate = CreateResponseCombiner(0, node->combine_type);
+
+ get_exec_connections(node->exec_nodes,
+ &regular_conn_count,
+ &total_conn_count,
+ &connections,
+ &primaryconnection);
+
+ if (force_autocommit)
+ need_tran = false;
+ else
+ need_tran = !autocommit || total_conn_count > 1;
+
+ if (!is_read_only)
+ {
+ if (primaryconnection)
+ register_write_nodes(1, primaryconnection);
+ register_write_nodes(regular_conn_count, connections);
+ }
+
+ gxid = GetCurrentGlobalTransactionId();
+ if (!GlobalTransactionIdIsValid(gxid))
+ {
+ if (primaryconnection)
+ pfree(primaryconnection);
+ pfree(connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to get next transaction ID")));
+ }
+
+ if (need_tran)
+ {
+ /*
+ * Check if data node connections are in transaction and start
+ * transactions on nodes where it is not started
+ */
+ DataNodeHandle *new_connections[total_conn_count];
+ int new_count = 0;
+
+ if (primaryconnection && primaryconnection[0]->transaction_status != 'T')
+ new_connections[new_count++] = primaryconnection[0];
+ for (i = 0; i < regular_conn_count; i++)
+ if (connections[i]->transaction_status != 'T')
+ new_connections[new_count++] = connections[i];
+
+ if (new_count)
+ data_node_begin(new_count, new_connections, gxid);
+ }
+
+ /* See if we have a primary nodes, execute on it first before the others */
+ if (primaryconnection)
+ {
+ /* If explicit transaction is needed gxid is already sent */
+ if (!need_tran && data_node_send_gxid(primaryconnection[0], gxid))
+ {
+ pfree(connections);
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot))
+ {
+ pfree(connections);
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (data_node_send_query(primaryconnection[0], node->sql_statement) != 0)
+ {
+ pfree(connections);
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+
+ Assert(remotestate->combine_type == COMBINE_TYPE_SAME);
+
+ while (remotestate->command_complete_count < 1)
+ {
+ PG_TRY();
+ {
+ data_node_receive(1, primaryconnection, NULL);
+ while (handle_response(primaryconnection[0], remotestate) == RESPONSE_EOF)
+ data_node_receive(1, primaryconnection, NULL);
+ if (remotestate->errorMessage)
+ {
+ char *code = remotestate->errorCode;
+ ereport(ERROR,
+ (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
+ errmsg("%s", remotestate->errorMessage)));
+ }
+ }
+ /* If we got an error response return immediately */
+ PG_CATCH();
+ {
+ pfree(primaryconnection);
+ pfree(connections);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+ pfree(primaryconnection);
+ }
+
+ for (i = 0; i < regular_conn_count; i++)
+ {
+ /* If explicit transaction is needed gxid is already sent */
+ if (!need_tran && data_node_send_gxid(connections[i], gxid))
+ {
+ pfree(connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (snapshot && data_node_send_snapshot(connections[i], snapshot))
+ {
+ pfree(connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (data_node_send_query(connections[i], node->sql_statement) != 0)
+ {
+ pfree(connections);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ }
+
+ /*
+ * Loop until all commands are completed.  A utility statement is not
+ * expected to produce data rows; receiving one is reported as an error below.
+ */
+ while (regular_conn_count > 0)
+ {
+ int i = 0;
+
+ data_node_receive(regular_conn_count, connections, NULL);
+ /*
+ * Handle input from the data nodes.
+	 * A RESPONSE_DATAROW or RESPONSE_TUPDESC message is not expected
+	 * from a utility statement, so either one is reported as an
+	 * internal error below.
+ * If we got 0, we exclude connection from the list. We do not
+ * expect more input from it. In case of non-SELECT query we quit
+ * the loop when all nodes finish their work and send ReadyForQuery
+ * with empty connections array.
+ * If we got EOF, move to the next connection, will receive more
+ * data on the next iteration.
+ */
+ while (i < regular_conn_count)
+ {
+ int res = handle_response(connections[i], remotestate);
+ if (res == RESPONSE_EOF)
+ {
+ i++;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ if (i < --regular_conn_count)
+ connections[i] = connections[regular_conn_count];
+ }
+ else if (res == RESPONSE_TUPDESC)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Unexpected response from data node")));
+ }
+ else if (res == RESPONSE_DATAROW)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Unexpected response from data node")));
+ }
+ }
+ }
+}
+
/*
* Called when the backend is ending.
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index e59c86920e..5aa9e5d97a 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -650,6 +650,20 @@ pg_analyze_and_rewrite(Node *parsetree, const char *query_string,
*/
querytree_list = pg_rewrite_query(query);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ ListCell *lc;
+
+ foreach(lc, querytree_list)
+ {
+ Query *query = (Query *) lfirst(lc);
+ query->sql_statement = pstrdup(query_string);
+ query->nodeTag = nodeTag(parsetree);
+ }
+ }
+#endif
+
TRACE_POSTGRESQL_QUERY_REWRITE_DONE(query_string);
return querytree_list;
@@ -900,9 +914,6 @@ exec_simple_query(const char *query_string)
DestReceiver *receiver;
int16 format;
#ifdef PGXC
- Query_Plan *query_plan;
- RemoteQuery *query_step;
- bool exec_on_coord;
/*
* By default we do not want data nodes to contact GTM directly,
@@ -910,9 +921,6 @@ exec_simple_query(const char *query_string)
*/
if (IS_PGXC_DATANODE)
SetForceXidFromGTM(false);
-
- exec_on_coord = true;
- query_plan = NULL;
#endif
/*
@@ -968,131 +976,11 @@ exec_simple_query(const char *query_string)
querytree_list = pg_analyze_and_rewrite(parsetree, query_string,
NULL, 0);
-#ifdef PGXC /* PGXC_COORD */
- if (IS_PGXC_COORDINATOR)
- {
- if (IsA(parsetree, TransactionStmt))
- pgxc_transaction_stmt(parsetree);
-
- else if (IsA(parsetree, ExecDirectStmt))
- {
- ExecDirectStmt *execdirect = (ExecDirectStmt *) parsetree;
- List *inner_parse_tree_list;
-
- Assert(IS_PGXC_COORDINATOR);
-
- exec_on_coord = execdirect->coordinator;
-
- /*
- * Switch to appropriate context for constructing parse and
- * query trees (these must outlive the execution context).
- */
- oldcontext = MemoryContextSwitchTo(MessageContext);
-
- inner_parse_tree_list = pg_parse_query(execdirect->query);
- /*
- * we do not support complex commands (expanded to multiple
- * parse trees) within EXEC DIRECT
- */
- if (list_length(parsetree_list) != 1)
- {
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("Can not execute %s with EXECUTE DIRECT",
- execdirect->query)));
- }
- parsetree = linitial(inner_parse_tree_list);
-
- /*
- * Set up a snapshot if parse analysis/planning will need
- * one.
- */
- if (analyze_requires_snapshot(parsetree))
- {
- PushActiveSnapshot(GetTransactionSnapshot());
- snapshot_set = true;
- }
-
- querytree_list = pg_analyze_and_rewrite(parsetree,
- query_string,
- NULL,
- 0);
-
- if (execdirect->nodes)
- {
- ListCell *lc;
- Query *query = (Query *) linitial(querytree_list);
-
- query_plan = (Query_Plan *) palloc0(sizeof(Query_Plan));
- query_step = makeNode(RemoteQuery);
- query_step->plan.targetlist = query->targetList;
- query_step->sql_statement = pstrdup(execdirect->query);
- query_step->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
- foreach (lc, execdirect->nodes)
- {
- int node = intVal(lfirst(lc));
- query_step->exec_nodes->nodelist = lappend_int(query_step->exec_nodes->nodelist, node);
- }
- query_step->combine_type = COMBINE_TYPE_SAME;
-
- query_plan->query_step_list = lappend(NULL, query_step);
- query_plan->exec_loc_type = EXEC_ON_DATA_NODES;
- }
-
- /* Restore context */
- MemoryContextSwitchTo(oldcontext);
-
- }
- else if (IsA(parsetree, CopyStmt))
- {
- CopyStmt *copy = (CopyStmt *) parsetree;
- uint64 processed;
- /* Snapshot is needed for the Copy */
- if (!snapshot_set)
- {
- PushActiveSnapshot(GetTransactionSnapshot());
- snapshot_set = true;
- }
- /*
- * A check on locator is made in DoCopy to determine if the copy can be launched on
- * Datanode or on Coordinator.
- * If a table has no locator data, then IsCoordPortalCopy returns false and copy is launched
- * on Coordinator instead (e.g., using pg_catalog tables).
- * If a table has some locator data (user tables), then copy was launched normally
- * in Datanodes
- */
- if (!IsCoordPortalCopy(copy))
- {
- exec_on_coord = false;
- processed = DoCopy(copy, query_string, false);
- snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
- "COPY " UINT64_FORMAT, processed);
- }
- else
- exec_on_coord = true;
- }
- else
- {
- query_plan = GetQueryPlan(parsetree, query_string, querytree_list);
-
- exec_on_coord = query_plan->exec_loc_type & EXEC_ON_COORD;
- }
-
- /* First execute on the coordinator, if involved (DDL), then data nodes */
- }
-
- plantree_list = NIL;
- if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE)
-#endif
- plantree_list = pg_plan_queries(querytree_list, 0, NULL);
+ plantree_list = pg_plan_queries(querytree_list, 0, NULL);
/* Done with the snapshot used for parsing/planning */
-#ifdef PGXC
- /* In PG-XC, hold on to it a bit longer */
-#else
if (snapshot_set)
PopActiveSnapshot();
-#endif
/* If we got a cancel signal in analysis or planning, quit */
CHECK_FOR_INTERRUPTS();
@@ -1102,13 +990,6 @@ exec_simple_query(const char *query_string)
/* Force getting Xid from GTM if not autovacuum, but a vacuum */
if (IS_PGXC_DATANODE && IsA(parsetree, VacuumStmt) && IsPostmasterEnvironment)
SetForceXidFromGTM(true);
-
- /*
- * Create and run Portal only if it is needed.
- * In some special cases we have nothing to run at this point
- */
- if (plantree_list || query_plan)
- {
#endif
/*
@@ -1170,11 +1051,6 @@ exec_simple_query(const char *query_string)
*/
MemoryContextSwitchTo(oldcontext);
-#ifdef PGXC
- /* Skip the Portal stuff on coordinator if command only executes on data nodes */
- if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE)
- {
-#endif
/*
* Run the portal to completion, and then drop it (and the receiver).
*/
@@ -1185,55 +1061,10 @@ exec_simple_query(const char *query_string)
receiver,
completionTag);
-#ifdef PGXC
- }
-
- /* PGXC_COORD */
- /* If the coordinator ran ok, now run on the data nodes if planned */
- if (IS_PGXC_COORDINATOR)
- {
- if (query_plan && (query_plan->exec_loc_type & EXEC_ON_DATA_NODES))
- {
- RemoteQueryState *state;
- TupleTableSlot *slot;
- EState *estate = CreateExecutorState();
- oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
- query_step = linitial(query_plan->query_step_list);
- estate->es_tupleTable = ExecCreateTupleTable(2);
- state = ExecInitRemoteQuery(query_step, estate, 0);
- state->dest = receiver;
- state->completionTag = completionTag;
- if (!snapshot_set)
- {
- PushActiveSnapshot(GetTransactionSnapshot());
- snapshot_set = true;
- }
- do
- {
- slot = ExecRemoteQuery(state);
- }
- while (!TupIsNull(slot));
-
- ExecEndRemoteQuery(state);
- /* Restore context */
- MemoryContextSwitchTo(oldcontext);
- }
-
- FreeQueryPlan(query_plan);
- }
-#endif /* PGXC_COORD */
-
(*receiver->rDestroy) (receiver);
PortalDrop(portal, false);
-#ifdef PGXC
- }
-
- if (snapshot_set)
- PopActiveSnapshot();
-#endif
-
if (IsA(parsetree, TransactionStmt))
{
/*
@@ -1479,6 +1310,19 @@ exec_parse_message(const char *query_string, /* string to execute */
ShowUsage("PARSE ANALYSIS STATISTICS");
querytree_list = pg_rewrite_query(query);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ ListCell *lc;
+
+ foreach(lc, querytree_list)
+ {
+ Query *query = (Query *) lfirst(lc);
+ query->sql_statement = pstrdup(query_string);
+ query->nodeTag = nodeTag(raw_parse_tree);
+ }
+ }
+#endif
/*
* If this is the unnamed statement and it has parameters, defer query
@@ -4365,45 +4209,3 @@ log_disconnections(int code, Datum arg)
port->user_name, port->database_name, port->remote_host,
port->remote_port[0] ? " port=" : "", port->remote_port)));
}
-
-
-#ifdef PGXC
-/*
- * Handle transaction statements in PG-XC
- */
-void
-pgxc_transaction_stmt (Node *parsetree)
-{
- Assert(IS_PGXC_COORDINATOR);
-
-
- /* Handle transaction statements specially */
- if (IsA(parsetree, TransactionStmt))
- {
- TransactionStmt *stmt = (TransactionStmt *) parsetree;
-
- switch (stmt->kind)
- {
- case TRANS_STMT_BEGIN:
- /*
- * This does not yet send down a BEGIN,
- * we do that "on demand" as data nodes are added
- */
- DataNodeBegin();
- break;
-
- case TRANS_STMT_COMMIT:
- DataNodeCommit();
- break;
-
- case TRANS_STMT_ROLLBACK:
- DataNodeRollback();
- break;
-
- default:
- /* Ignore others for prototype */
- break;
- }
- }
-}
-#endif /* PGXC */
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 98716830cd..4e3096acaf 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -286,6 +286,17 @@ ChoosePortalStrategy(List *stmts)
}
}
}
+#ifdef PGXC
+ else if (IsA(stmt, RemoteQuery))
+ {
+ /*
+ * Let's choose PORTAL_ONE_SELECT for now
+ * After adding more PGXC functionality we may have more
+ * sophisticated algorithm of determining portal strategy
+ */
+ return PORTAL_ONE_SELECT;
+ }
+#endif
else if (IsA(stmt, PlannedStmt))
{
PlannedStmt *pstmt = (PlannedStmt *) stmt;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 0bb4b4645f..3f857282f6 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -58,7 +58,12 @@
#include "utils/syscache.h"
#ifdef PGXC
+#include "pgxc/locator.h"
#include "pgxc/pgxc.h"
+#include "pgxc/planner.h"
+
+static void ExecUtilityStmtOnNodes(const char *queryString, Exec_Nodes *nodes,
+ bool force_autocommit);
#endif
@@ -283,6 +288,10 @@ ProcessUtility(Node *parsetree,
{
ListCell *lc;
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ DataNodeBegin();
+#endif
BeginTransactionBlock();
foreach(lc, stmt->options)
{
@@ -301,6 +310,10 @@ ProcessUtility(Node *parsetree,
break;
case TRANS_STMT_COMMIT:
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ DataNodeCommit();
+#endif
if (!EndTransactionBlock())
{
/* report unsuccessful commit in completionTag */
@@ -329,6 +342,10 @@ ProcessUtility(Node *parsetree,
break;
case TRANS_STMT_ROLLBACK:
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+		DataNodeRollback();
+#endif
UserAbortTransactionBlock();
break;
@@ -406,6 +423,10 @@ ProcessUtility(Node *parsetree,
* relation and attribute manipulation
*/
case T_CreateSchemaStmt:
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
CreateSchemaCommand((CreateSchemaStmt *) parsetree,
queryString);
break;
@@ -416,6 +437,7 @@ ProcessUtility(Node *parsetree,
ListCell *l;
Oid relOid;
+ /* PGXC transformCreateStmt also adds T_RemoteQuery node */
/* Run parse analysis ... */
stmts = transformCreateStmt((CreateStmt *) parsetree,
queryString);
@@ -522,7 +544,6 @@ ProcessUtility(Node *parsetree,
case T_DropStmt:
{
DropStmt *stmt = (DropStmt *) parsetree;
-
switch (stmt->removeType)
{
case OBJECT_TABLE:
@@ -566,11 +587,31 @@ ProcessUtility(Node *parsetree,
(int) stmt->removeType);
break;
}
+#ifdef PGXC
+ /*
+ * PGXCTODO
+ * We may need to check details of the object being dropped and
+ * run command on correct nodes
+ */
+ if (IS_PGXC_COORDINATOR)
+ /* sequence exists only on coordinator */
+ if (stmt->removeType != OBJECT_SEQUENCE)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
}
break;
case T_TruncateStmt:
ExecuteTruncate((TruncateStmt *) parsetree);
+#ifdef PGXC
+ /*
+ * PGXCTODO
+ * We may need to check details of the object being truncated and
+ * run command on correct nodes
+ */
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CommentStmt:
@@ -580,11 +621,7 @@ ProcessUtility(Node *parsetree,
case T_CopyStmt:
{
uint64 processed;
-#ifdef PGXC
- processed = DoCopy((CopyStmt *) parsetree, queryString, true);
-#else
- processed = DoCopy((CopyStmt *) parsetree, queryString):
-#endif
+ processed = DoCopy((CopyStmt *) parsetree, queryString);
if (completionTag)
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processed);
@@ -608,10 +645,18 @@ ProcessUtility(Node *parsetree,
* schema
*/
case T_RenameStmt:
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
ExecRenameStmt((RenameStmt *) parsetree);
break;
case T_AlterObjectSchemaStmt:
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
ExecAlterObjectSchemaStmt((AlterObjectSchemaStmt *) parsetree);
break;
@@ -624,6 +669,7 @@ ProcessUtility(Node *parsetree,
List *stmts;
ListCell *l;
+ /* PGXC transformCreateStmt also adds T_RemoteQuery node */
/* Run parse analysis ... */
stmts = transformAlterTableStmt((AlterTableStmt *) parsetree,
queryString);
@@ -698,6 +744,10 @@ ProcessUtility(Node *parsetree,
break;
}
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_GrantStmt:
@@ -751,6 +801,10 @@ ProcessUtility(Node *parsetree,
break;
}
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CompositeTypeStmt: /* CREATE TYPE (composite) */
@@ -759,10 +813,18 @@ ProcessUtility(Node *parsetree,
DefineCompositeType(stmt->typevar, stmt->coldeflist);
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreateEnumStmt: /* CREATE TYPE (enum) */
DefineEnum((CreateEnumStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_ViewStmt: /* CREATE VIEW */
@@ -771,10 +833,18 @@ ProcessUtility(Node *parsetree,
case T_CreateFunctionStmt: /* CREATE FUNCTION */
CreateFunction((CreateFunctionStmt *) parsetree, queryString);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_AlterFunctionStmt: /* ALTER FUNCTION */
AlterFunction((AlterFunctionStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_IndexStmt: /* CREATE INDEX */
@@ -807,11 +877,20 @@ ProcessUtility(Node *parsetree,
false, /* skip_build */
false, /* quiet */
stmt->concurrent); /* concurrent */
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL,
+ stmt->concurrent);
+#endif
}
break;
case T_RuleStmt: /* CREATE RULE */
DefineRule((RuleStmt *) parsetree, queryString);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreateSeqStmt:
@@ -843,19 +922,35 @@ ProcessUtility(Node *parsetree,
break;
}
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreatedbStmt:
PreventTransactionChain(isTopLevel, "CREATE DATABASE");
createdb((CreatedbStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, true);
+#endif
break;
case T_AlterDatabaseStmt:
AlterDatabase((AlterDatabaseStmt *) parsetree, isTopLevel);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_AlterDatabaseSetStmt:
AlterDatabaseSet((AlterDatabaseSetStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_DropdbStmt:
@@ -865,6 +960,10 @@ ProcessUtility(Node *parsetree,
PreventTransactionChain(isTopLevel, "DROP DATABASE");
dropdb(stmt->dbname, stmt->missing_ok);
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, true);
+#endif
break;
/* Query-level asynchronous notification */
@@ -903,23 +1002,51 @@ ProcessUtility(Node *parsetree,
/* Allowed names are restricted if you're not superuser */
load_file(stmt->filename, !superuser());
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_ClusterStmt:
cluster((ClusterStmt *) parsetree, isTopLevel);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, true);
+#endif
break;
case T_VacuumStmt:
+#ifdef PGXC
+ /*
+ * We have to run the command on the data nodes before the coordinator
+ * because vacuum() pops the active snapshot, so it cannot be sent to nodes
+ */
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, true);
+#endif
vacuum((VacuumStmt *) parsetree, InvalidOid, true, NULL, false,
isTopLevel);
break;
case T_ExplainStmt:
ExplainQuery((ExplainStmt *) parsetree, queryString, params, dest);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ Exec_Nodes *nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
+ nodes->nodelist = GetAnyDataNode();
+ ExecUtilityStmtOnNodes(queryString, nodes, false);
+ }
+#endif
break;
case T_VariableSetStmt:
ExecSetVariableStmt((VariableSetStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_VariableShowStmt:
@@ -936,6 +1063,10 @@ ProcessUtility(Node *parsetree,
case T_CreateTrigStmt:
CreateTrigger((CreateTrigStmt *) parsetree, InvalidOid, true);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_DropPropertyStmt:
@@ -963,14 +1094,26 @@ ProcessUtility(Node *parsetree,
break;
}
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreatePLangStmt:
CreateProceduralLanguage((CreatePLangStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_DropPLangStmt:
DropProceduralLanguage((DropPLangStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
/*
@@ -978,6 +1121,10 @@ ProcessUtility(Node *parsetree,
*/
case T_CreateDomainStmt:
DefineDomain((CreateDomainStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
/*
@@ -1027,6 +1174,10 @@ ProcessUtility(Node *parsetree,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to do CHECKPOINT")));
RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, true);
+#endif
break;
case T_ReindexStmt:
@@ -1059,50 +1210,100 @@ ProcessUtility(Node *parsetree,
(int) stmt->kind);
break;
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL,
+ stmt->kind == OBJECT_DATABASE);
+#endif
break;
}
break;
case T_CreateConversionStmt:
CreateConversionCommand((CreateConversionStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreateCastStmt:
CreateCast((CreateCastStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_DropCastStmt:
DropCast((DropCastStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreateOpClassStmt:
DefineOpClass((CreateOpClassStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_CreateOpFamilyStmt:
DefineOpFamily((CreateOpFamilyStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_AlterOpFamilyStmt:
AlterOpFamily((AlterOpFamilyStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_RemoveOpClassStmt:
RemoveOpClass((RemoveOpClassStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_RemoveOpFamilyStmt:
RemoveOpFamily((RemoveOpFamilyStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_AlterTSDictionaryStmt:
AlterTSDictionary((AlterTSDictionaryStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
case T_AlterTSConfigurationStmt:
AlterTSConfiguration((AlterTSConfigurationStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false);
+#endif
break;
-
+#ifdef PGXC
+ case T_RemoteQuery:
+ Assert(IS_PGXC_COORDINATOR);
+ ExecRemoteUtility((RemoteQuery *) parsetree);
+ break;
+#endif
default:
elog(ERROR, "unrecognized node type: %d",
(int) nodeTag(parsetree));
@@ -1110,6 +1311,22 @@ ProcessUtility(Node *parsetree,
}
}
+#ifdef PGXC
+/*
+ * ExecUtilityStmtOnNodes
+ *
+ * Run a utility statement on remote nodes by wrapping the original query
+ * string in a transient RemoteQuery step and handing it to
+ * ExecRemoteUtility.
+ *
+ * queryString      - verbatim SQL text of the utility statement; copied
+ *                    with pstrdup so the step owns its own string
+ * nodes            - target node list; NOTE(review): callers mostly pass
+ *                    NULL — presumably meaning "all data nodes", confirm
+ *                    against ExecRemoteUtility
+ * force_autocommit - passed through on the step; used by callers for
+ *                    statements that cannot run inside a transaction
+ *                    block (CREATE/DROP DATABASE, CLUSTER, VACUUM, ...)
+ *
+ * COMBINE_TYPE_SAME: all nodes are expected to report the same result.
+ * The step and its copied SQL text are freed before returning.
+ */
+static void
+ExecUtilityStmtOnNodes(const char *queryString, Exec_Nodes *nodes,
+ bool force_autocommit)
+{
+ RemoteQuery *step = makeNode(RemoteQuery);
+ step->combine_type = COMBINE_TYPE_SAME;
+ step->exec_nodes = nodes;
+ step->sql_statement = pstrdup(queryString);
+ step->force_autocommit = force_autocommit;
+ ExecRemoteUtility(step);
+ pfree(step->sql_statement);
+ pfree(step);
+}
+#endif
+
/*
* UtilityReturnsTuples
* Return "true" if this utility statement will send output to the
@@ -2076,7 +2293,7 @@ CreateCommandTag(Node *parsetree)
}
}
break;
-
+
case T_ExecDirectStmt:
tag = "EXECUTE DIRECT";
break;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 7a3054d90e..7e6edac9be 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -17,12 +17,7 @@
#include "nodes/parsenodes.h"
#include "tcop/dest.h"
-#ifdef PGXC
-extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString, bool exec_on_coord_portal);
-extern bool IsCoordPortalCopy(const CopyStmt *stmt);
-#else
extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString);
-#endif
extern DestReceiver *CreateCopyDestReceiver(void);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index e0515ba95d..88ae12a2c6 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -146,6 +146,11 @@ typedef struct Query
Node *setOperations; /* set-operation tree if this is top level of
* a UNION/INTERSECT/EXCEPT query */
+#ifdef PGXC
+ /* need this info for PGXC Planner, may be temporary */
+ char *sql_statement; /* original query */
+ NodeTag nodeTag; /* node tag of top node of parse tree */
+#endif
} Query;
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index e9b59ccbd9..b7faa7dd28 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -51,9 +51,7 @@ typedef struct RemoteQueryState
int conn_count; /* count of active connections */
int current_conn; /* used to balance load when reading from connections */
CombineType combine_type; /* see CombineType enum */
- DestReceiver *dest; /* output destination */
int command_complete_count; /* count of received CommandComplete messages */
- uint64 row_count; /* how many rows affected by the query */
RequestType request_type; /* see RequestType enum */
TupleDesc tuple_desc; /* tuple descriptor to be referenced by emitted tuples */
int description_count; /* count of received RowDescription messages */
@@ -62,7 +60,6 @@ typedef struct RemoteQueryState
char errorCode[5]; /* error code to send back to client */
char *errorMessage; /* error message to send back to client */
bool query_Done; /* query has been sent down to data nodes */
- char *completionTag; /* completion tag to present to caller */
char *msg; /* last data row message */
int msglen; /* length of the data row message */
/*
@@ -76,6 +73,7 @@ typedef struct RemoteQueryState
FmgrInfo *eqfunctions; /* functions to compare tuples */
MemoryContext tmp_ctx; /* separate context is needed to compare tuples */
FILE *copy_file; /* used if copy_dest == COPY_FILE */
+ uint64 processed; /* count of data rows when running CopyOut */
} RemoteQueryState;
/* Multinode Executor */
@@ -86,11 +84,13 @@ extern int DataNodeRollback(void);
extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from);
extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections);
extern uint64 DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* copy_file);
-extern uint64 DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type);
+extern void DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type);
+extern int ExecCountSlotsRemoteQuery(RemoteQuery *node);
extern RemoteQueryState *ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags);
extern TupleTableSlot* ExecRemoteQuery(RemoteQueryState *step);
extern void ExecEndRemoteQuery(RemoteQueryState *step);
+extern void ExecRemoteUtility(RemoteQuery *node);
extern int handle_response(DataNodeHandle * conn, RemoteQueryState *combiner);
extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot);
diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h
index 5ead756e1c..7ae04746e8 100644
--- a/src/include/pgxc/locator.h
+++ b/src/include/pgxc/locator.h
@@ -44,7 +44,7 @@ typedef struct
/* track if tables use pg_catalog */
-typedef enum
+typedef enum
{
TABLE_USAGE_TYPE_NO_TABLE,
TABLE_USAGE_TYPE_PGCATALOG,
@@ -58,10 +58,10 @@ typedef enum
* primarynodelist is for replicated table writes, where to execute first.
* If it succeeds, only then should it be executed on nodelist.
* primarynodelist should be set to NULL if not doing replicated write operations
- */
+ */
typedef struct
{
- List *primarynodelist;
+ List *primarynodelist;
List *nodelist;
char baselocatortype;
TableUsageType tableusagetype; /* track pg_catalog usage */
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index 47665b2341..bf8f2242fa 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -17,16 +17,14 @@
#include "fmgr.h"
#include "lib/stringinfo.h"
+#include "nodes/params.h"
+#include "nodes/parsenodes.h"
#include "nodes/plannodes.h"
#include "nodes/primnodes.h"
#include "pgxc/locator.h"
#include "tcop/dest.h"
-/* for Query_Plan.exec_loc_type can have these OR'ed*/
-#define EXEC_ON_COORD 0x1
-#define EXEC_ON_DATA_NODES 0x2
-
typedef enum
{
COMBINE_TYPE_NONE, /* it is known that no row count, do not parse */
@@ -72,18 +70,6 @@ typedef struct
} RemoteQuery;
-/*
- * The PGXC plan to execute.
- * In the prototype this will be simple, and queryStepList will
- * contain just one step.
- */
-typedef struct
-{
- int exec_loc_type;
- List *query_step_list; /* List of QuerySteps */
-} Query_Plan;
-
-
/* For handling simple aggregates (no group by present)
* For now, only MAX will be supported.
*/
@@ -154,9 +140,8 @@ extern bool StrictStatementChecking;
/* forbid SELECT even multi-node ORDER BY */
extern bool StrictSelectChecking;
-extern Query_Plan *GetQueryPlan(Node *parsetree, const char *sql_statement,
- List *querytree_list);
-extern void FreeQueryPlan(Query_Plan *query_plan);
+extern PlannedStmt *pgxc_planner(Query *query, int cursorOptions,
+ ParamListInfo boundParams);
extern bool IsHashDistributable(Oid col_type);
#endif /* PGXCPLANNER_H */