diff options
| -rw-r--r-- | src/backend/commands/copy.c | 76 | ||||
| -rw-r--r-- | src/backend/executor/execMain.c | 10 | ||||
| -rw-r--r-- | src/backend/executor/execProcnode.c | 28 | ||||
| -rw-r--r-- | src/backend/optimizer/plan/planner.c | 11 | ||||
| -rw-r--r-- | src/backend/parser/analyze.c | 46 | ||||
| -rw-r--r-- | src/backend/parser/parse_utilcmd.c | 34 | ||||
| -rw-r--r-- | src/backend/pgxc/plan/planner.c | 346 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 525 | ||||
| -rw-r--r-- | src/backend/tcop/postgres.c | 254 | ||||
| -rw-r--r-- | src/backend/tcop/pquery.c | 11 | ||||
| -rw-r--r-- | src/backend/tcop/utility.c | 233 | ||||
| -rw-r--r-- | src/include/commands/copy.h | 5 | ||||
| -rw-r--r-- | src/include/nodes/parsenodes.h | 5 | ||||
| -rw-r--r-- | src/include/pgxc/execRemote.h | 8 | ||||
| -rw-r--r-- | src/include/pgxc/locator.h | 6 | ||||
| -rw-r--r-- | src/include/pgxc/planner.h | 23 |
16 files changed, 869 insertions, 752 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 50650190ee..aee1b42fdd 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -179,7 +179,6 @@ typedef struct CopyStateData /* Locator information */ RelationLocInfo *rel_loc; /* the locator key */ int hash_idx; /* index of the hash column */ - bool on_coord; DataNodeHandle **connections; /* Involved data node connections */ #endif @@ -800,31 +799,6 @@ CopyQuoteIdentifier(StringInfo query_buf, char *value) } #endif -#ifdef PGXC -/* - * In case there is no locator info available, copy to/from is launched in portal on coordinator. - * This happens for pg_catalog tables (not user defined ones) - * such as pg_catalog, pg_attribute, etc. - * This part is launched before the portal is activated, so check a first time if there - * some locator data for this relid and if no, return and launch the portal. - */ -bool -IsCoordPortalCopy(const CopyStmt *stmt) -{ - RelationLocInfo *rel_loc; /* the locator key */ - - /* In the case of a COPY SELECT, this is launched on datanodes */ - if(!stmt->relation) - return false; - - rel_loc = GetRelationLocInfo(RangeVarGetRelid(stmt->relation, true)); - - if (!rel_loc) - return true; - - return false; -} -#endif /* * DoCopy executes the SQL COPY statement @@ -857,11 +831,7 @@ IsCoordPortalCopy(const CopyStmt *stmt) * the table or the specifically requested columns. 
*/ uint64 -#ifdef PGXC -DoCopy(const CopyStmt *stmt, const char *queryString, bool exec_on_coord_portal) -#else DoCopy(const CopyStmt *stmt, const char *queryString) -#endif { CopyState cstate; bool is_from = stmt->is_from; @@ -883,16 +853,6 @@ DoCopy(const CopyStmt *stmt, const char *queryString) /* Allocate workspace and zero all fields */ cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); -#ifdef PGXC - /* - * Copy to/from is initialized as being launched on datanodes - * This functionnality is particularly interesting to have a result for - * tables who have no locator informations such as pg_catalog, pg_class, - * and pg_attribute. - */ - cstate->on_coord = false; -#endif - /* Extract options from the statement node tree */ foreach(option, stmt->options) { @@ -1180,13 +1140,15 @@ DoCopy(const CopyStmt *stmt, const char *queryString) exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); + /* + * If target table does not exists on nodes (e.g. system table) + * the location info returned is NULL. This is the criteria, when + * we need to run Copy on coordinator + */ cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel)); - if (exec_on_coord_portal) - cstate->on_coord = true; - hash_att = GetRelationHashColumn(cstate->rel_loc); - if (!cstate->on_coord) + if (cstate->rel_loc) { if (is_from || hash_att) exec_nodes->nodelist = list_copy(cstate->rel_loc->nodeList); @@ -1481,7 +1443,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString) * In the case of CopyOut, it is just necessary to pick up one node randomly. * This is done when rel_loc is found. 
*/ - if (!cstate->on_coord) + if (cstate->rel_loc) { cstate->connections = DataNodeCopyBegin(cstate->query_buf.data, exec_nodes->nodelist, @@ -1506,7 +1468,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString) } PG_CATCH(); { - if (IS_PGXC_COORDINATOR && is_from && !cstate->on_coord) + if (IS_PGXC_COORDINATOR && is_from && cstate->rel_loc) { DataNodeCopyFinish( cstate->connections, @@ -1519,18 +1481,13 @@ DoCopy(const CopyStmt *stmt, const char *queryString) PG_RE_THROW(); } PG_END_TRY(); - if (IS_PGXC_COORDINATOR && is_from && !cstate->on_coord) + if (IS_PGXC_COORDINATOR && is_from && cstate->rel_loc) { - if (cstate->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED) - cstate->processed = DataNodeCopyFinish( - cstate->connections, - primary_data_node, - COMBINE_TYPE_SAME); - else - cstate->processed = DataNodeCopyFinish( - cstate->connections, - 0, - COMBINE_TYPE_SUM); + bool replicated = cstate->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED; + DataNodeCopyFinish( + cstate->connections, + replicated ? primary_data_node : 0, + replicated ? 
COMBINE_TYPE_SAME : COMBINE_TYPE_SUM); pfree(cstate->connections); pfree(cstate->query_buf.data); FreeRelationLocInfo(cstate->rel_loc); @@ -1770,7 +1727,7 @@ CopyTo(CopyState cstate) } #ifdef PGXC - if (IS_PGXC_COORDINATOR && !cstate->on_coord) + if (IS_PGXC_COORDINATOR && cstate->rel_loc) { cstate->processed = DataNodeCopyOut( GetRelationNodes(cstate->rel_loc, NULL, true), @@ -2480,7 +2437,7 @@ CopyFrom(CopyState cstate) } #ifdef PGXC - if (IS_PGXC_COORDINATOR && !cstate->on_coord) + if (IS_PGXC_COORDINATOR && cstate->rel_loc) { Datum *hash_value = NULL; @@ -2494,6 +2451,7 @@ CopyFrom(CopyState cstate) ereport(ERROR, (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("Copy failed on a data node"))); + cstate->processed++; } else { diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 1fddf10bc9..ca0c7b1f56 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -858,6 +858,14 @@ InitPlan(QueryDesc *queryDesc, int eflags) { case CMD_SELECT: case CMD_INSERT: +#ifdef PGXC + /* + * PGXC RemoteQuery do not require ctid junk field, so follow + * standard procedure for UPDATE and DELETE + */ + case CMD_UPDATE: + case CMD_DELETE: +#endif foreach(tlist, plan->targetlist) { TargetEntry *tle = (TargetEntry *) lfirst(tlist); @@ -869,10 +877,12 @@ InitPlan(QueryDesc *queryDesc, int eflags) } } break; +#ifndef PGXC case CMD_UPDATE: case CMD_DELETE: junk_filter_needed = true; break; +#endif default: break; } diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 1b1dd91f2a..f82ce4c97b 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -108,7 +108,9 @@ #include "executor/nodeWindowAgg.h" #include "executor/nodeWorktablescan.h" #include "miscadmin.h" - +#ifdef PGXC +#include "pgxc/execRemote.h" +#endif /* ------------------------------------------------------------------------ * ExecInitNode @@ -286,6 +288,13 @@ ExecInitNode(Plan *node, EState 
*estate, int eflags) estate, eflags); break; +#ifdef PGXC + case T_RemoteQuery: + result = (PlanState *) ExecInitRemoteQuery((RemoteQuery *) node, + estate, eflags); + break; +#endif + default: elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node)); result = NULL; /* keep compiler quiet */ @@ -451,6 +460,12 @@ ExecProcNode(PlanState *node) result = ExecLimit((LimitState *) node); break; +#ifdef PGXC + case T_RemoteQueryState: + result = ExecRemoteQuery((RemoteQueryState *) node); + break; +#endif + default: elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node)); result = NULL; @@ -627,6 +642,11 @@ ExecCountSlotsNode(Plan *node) case T_Limit: return ExecCountSlotsLimit((Limit *) node); +#ifdef PGXC + case T_RemoteQuery: + return ExecCountSlotsRemoteQuery((RemoteQuery *) node); +#endif + default: elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node)); break; @@ -783,6 +803,12 @@ ExecEndNode(PlanState *node) ExecEndLimit((LimitState *) node); break; +#ifdef PGXC + case T_RemoteQueryState: + ExecEndRemoteQuery((RemoteQueryState *) node); + break; +#endif + default: elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node)); break; diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 3f344b3a14..7d41461096 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -38,6 +38,10 @@ #include "parser/parse_expr.h" #include "parser/parse_oper.h" #include "parser/parsetree.h" +#ifdef PGXC +#include "pgxc/pgxc.h" +#include "pgxc/planner.h" +#endif #include "utils/lsyscache.h" #include "utils/syscache.h" @@ -119,7 +123,12 @@ planner(Query *parse, int cursorOptions, ParamListInfo boundParams) if (planner_hook) result = (*planner_hook) (parse, cursorOptions, boundParams); else - result = standard_planner(parse, cursorOptions, boundParams); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + result = pgxc_planner(parse, cursorOptions, boundParams); + else +#endif + result = 
standard_planner(parse, cursorOptions, boundParams); return result; } diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 97c560b309..0fe392e99e 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -39,6 +39,11 @@ #include "parser/parse_target.h" #include "parser/parsetree.h" #include "rewrite/rewriteManip.h" +#ifdef PGXC +#include "pgxc/pgxc.h" +#include "pgxc/planner.h" +#include "tcop/tcopprot.h" +#endif #include "utils/rel.h" @@ -58,6 +63,10 @@ static Query *transformDeclareCursorStmt(ParseState *pstate, DeclareCursorStmt *stmt); static Query *transformExplainStmt(ParseState *pstate, ExplainStmt *stmt); +#ifdef PGXC +static Query *transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt); +#endif + static void transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc); static bool check_parameter_resolution_walker(Node *node, ParseState *pstate); @@ -199,6 +208,13 @@ transformStmt(ParseState *pstate, Node *parseTree) (ExplainStmt *) parseTree); break; +#ifdef PGXC + case T_ExecDirectStmt: + result = transformExecDirectStmt(pstate, + (ExecDirectStmt *) parseTree); + break; +#endif + default: /* @@ -263,6 +279,17 @@ analyze_requires_snapshot(Node *parseTree) result = true; break; +#ifdef PGXC + case T_ExecDirectStmt: + + /* + * We will parse/analyze/plan inner query, which probably will + * need a snapshot. Ensure it is set. + */ + result = true; + break; +#endif + default: /* utility statements don't have any active parse analysis */ result = false; @@ -1925,6 +1952,25 @@ transformExplainStmt(ParseState *pstate, ExplainStmt *stmt) return result; } +#ifdef PGXC +/* + * transformExecDirectStmt - + * transform an EXECUTE DIRECT Statement + * + * Handling is depends if we should execute on nodes or on coordinator. + * To execute on nodes we return CMD_UTILITY query having one T_RemoteQuery node + * with the inner statement as a sql_command. 
+ * If statement is to run on coordinator we should parse inner statement and + * analyze resulting query tree. + */ +static Query * +transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt) +{ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Support for EXECUTE DIRECT is temporary broken"))); +} +#endif /* exported so planner can check again after rewriting, query pullup, etc */ void diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index b0488d88c5..d7a932e781 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -52,6 +52,7 @@ #ifdef PGXC #include "pgxc/locator.h" #include "pgxc/pgxc.h" +#include "pgxc/planner.h" #endif #include "rewrite/rewriteManip.h" @@ -261,9 +262,9 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) result = list_concat(result, save_alist); #ifdef PGXC - /* - * If the user did not specify any distribution clause and there is no - * inherits clause, try and use PK or unique index + /* + * If the user did not specify any distribution clause and there is no + * inherits clause, try and use PK or unique index */ if (!stmt->distributeby && !stmt->inhRelations && cxt.fallback_dist_col) { @@ -271,6 +272,13 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) stmt->distributeby->disttype = DISTTYPE_HASH; stmt->distributeby->colname = cxt.fallback_dist_col; } + if (IS_PGXC_COORDINATOR) + { + RemoteQuery *step = makeNode(RemoteQuery); + step->combine_type = COMBINE_TYPE_SAME; + step->sql_statement = queryString; + result = lappend(result, step); + } #endif return result; } @@ -1171,7 +1179,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) { if (cxt->distributeby) isLocalSafe = CheckLocalIndexColumn ( - ConvertToLocatorType(cxt->distributeby->disttype), + ConvertToLocatorType(cxt->distributeby->disttype), cxt->distributeby->colname, key); } #endif @@ -1273,7 +1281,7 @@ 
transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) { /* * Set fallback distribution column. - * If not set, set it to first column in index. + * If not set, set it to first column in index. * If primary key, we prefer that over a unique constraint. */ if (index->indexParams == NIL @@ -1281,7 +1289,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) { cxt->fallback_dist_col = pstrdup(key); } - + /* Existing table, check if it is safe */ if (!cxt->distributeby && !isLocalSafe) isLocalSafe = CheckLocalIndexColumn ( @@ -1299,7 +1307,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) index->indexParams = lappend(index->indexParams, iparam); } #ifdef PGXC - if (IS_PGXC_COORDINATOR && cxt->distributeby + if (IS_PGXC_COORDINATOR && cxt->distributeby && cxt->distributeby->disttype == DISTTYPE_HASH && !isLocalSafe) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), @@ -1618,7 +1626,7 @@ transformRuleStmt(RuleStmt *stmt, const char *queryString, ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), errmsg("Rule may not use NOTIFY, it is not yet supported"))); - + #endif /* * Since outer ParseState isn't parent of inner, have to pass down @@ -1956,7 +1964,15 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) result = lappend(cxt.blist, stmt); result = list_concat(result, cxt.alist); result = list_concat(result, save_alist); - +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + { + RemoteQuery *step = makeNode(RemoteQuery); + step->combine_type = COMBINE_TYPE_SAME; + step->sql_statement = queryString; + result = lappend(result, step); + } +#endif return result; } diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 002e710bd6..1dcfc2943a 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -25,6 +25,7 @@ #include "nodes/nodes.h" #include "nodes/parsenodes.h" #include "optimizer/clauses.h" +#include 
"optimizer/planner.h" #include "optimizer/tlist.h" #include "parser/parse_agg.h" #include "parser/parse_coerce.h" @@ -116,7 +117,7 @@ typedef struct ColumnBase */ typedef struct XCWalkerContext { - Query *query; + Query *query; bool isRead; Exec_Nodes *exec_nodes; /* resulting execution nodes */ Special_Conditions *conditions; @@ -125,6 +126,7 @@ typedef struct XCWalkerContext int varno; bool within_or; bool within_not; + bool exec_on_coord; /* fallback to standard planner to have plan executed on coordinator only */ List *join_list; /* A list of List*'s, one for each relation. */ } XCWalkerContext; @@ -971,6 +973,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) /* just pg_catalog tables */ context->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; + context->exec_on_coord = true; return false; } @@ -1087,6 +1090,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) { context->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; + context->exec_on_coord = true; return false; } @@ -1253,7 +1257,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) static Exec_Nodes * get_plan_nodes(Query *query, bool isRead) { - Exec_Nodes *result_nodes; + Exec_Nodes *result_nodes = NULL; XCWalkerContext context; @@ -1267,13 +1271,16 @@ get_plan_nodes(Query *query, bool isRead) context.varno = 0; context.within_or = false; context.within_not = false; + context.exec_on_coord = false; context.join_list = NIL; - if (get_plan_nodes_walker((Node *) query, &context)) - result_nodes = NULL; - else + if (!get_plan_nodes_walker((Node *) query, &context)) result_nodes = context.exec_nodes; - + if (context.exec_on_coord && result_nodes) + { + pfree(result_nodes); + result_nodes = NULL; + } free_special_relations(context.conditions); free_join_list(context.join_list); return result_nodes; @@ 
-1976,68 +1983,89 @@ make_simple_sort_from_sortclauses(Query *query, RemoteQuery *step) * For the prototype, there will only be one step, * and the nodelist will be NULL if it is not a PGXC-safe statement. */ -Query_Plan * -GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) +PlannedStmt * +pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) { - Query_Plan *query_plan = palloc(sizeof(Query_Plan)); + /* + * We waste some time invoking standard planner, but getting good enough + * PlannedStmt, we just need to replace standard plan. + * In future we may want to skip the standard_planner invocation and + * initialize the PlannedStmt here. At the moment not all queries works: + * ex. there was a problem with INSERT into a subset of table columns + */ + PlannedStmt *result = standard_planner(query, cursorOptions, boundParams); + Plan *standardPlan = result->planTree; RemoteQuery *query_step = makeNode(RemoteQuery); - Query *query; - query_step->sql_statement = (char *) palloc(strlen(sql_statement) + 1); - strcpy(query_step->sql_statement, sql_statement); + query_step->sql_statement = pstrdup(query->sql_statement); query_step->exec_nodes = NULL; query_step->combine_type = COMBINE_TYPE_NONE; query_step->simple_aggregates = NULL; - query_step->read_only = false; + /* Optimize multi-node handling */ + query_step->read_only = query->nodeTag == T_SelectStmt; query_step->force_autocommit = false; - query_plan->query_step_list = lappend(NULL, query_step); + result->planTree = (Plan *) query_step; /* * Determine where to execute the command, either at the Coordinator * level, Data Nodes, or both. By default we choose both. We should be * able to quickly expand this for more commands. 
*/ - switch (nodeTag(parsetree)) + switch (query->nodeTag) { case T_SelectStmt: - /* Optimize multi-node handling */ - query_step->read_only = true; + /* Perform some checks to make sure we can support the statement */ + if (query->intoClause) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("INTO clause not yet supported")))); + + if (query->setOperations) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("UNION, INTERSECT and EXCEPT are not yet supported")))); + + if (query->hasRecursive) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("WITH RECURSIVE not yet supported")))); + + if (query->hasWindowFuncs) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("Window functions not yet supported")))); /* fallthru */ case T_InsertStmt: case T_UpdateStmt: case T_DeleteStmt: - /* just use first one in querytree_list */ - query = (Query *) linitial(querytree_list); - /* should copy instead ? */ - query_step->plan.targetlist = query->targetList; + query_step->exec_nodes = get_plan_nodes_command(query); - /* Perform some checks to make sure we can support the statement */ - if (nodeTag(parsetree) == T_SelectStmt) + if (query_step->exec_nodes == NULL) { - if (query->intoClause) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("INTO clause not yet supported")))); - - if (query->setOperations) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("UNION, INTERSECT and EXCEPT are not yet supported")))); - - if (query->hasRecursive) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("WITH RECURSIVE not yet supported")))); - - if (query->hasWindowFuncs) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Window functions not yet supported")))); + /* + * Processing guery against catalog tables, restore + * standard plan + */ + result->planTree = standardPlan; + return result; } - query_step->exec_nodes = - 
get_plan_nodes_command(query); + /* + * PGXCTODO + * When Postgres runs insert into t (a) values (1); against table + * defined as create table t (a int, b int); the plan is looking + * like insert into t (a,b) values (1,null); + * Later executor is verifying plan, to make sure table has not + * been altered since plan has been created and comparing table + * definition with plan target list and output error if they do + * not match. + * I could not find better way to generate targetList for pgxc plan + * then call standard planner and take targetList from the plan + * generated by Postgres. + */ + query_step->plan.targetlist = standardPlan->targetlist; + if (query_step->exec_nodes) query_step->combine_type = get_plan_combine_type( query, query_step->exec_nodes->baselocatortype); @@ -2047,37 +2075,9 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) query_step->simple_aggregates = get_simple_aggregates(query); /* - * See if it is a SELECT with no relations, like SELECT 1+1 or - * SELECT nextval('fred'), and just use coord. 
- */ - if (query_step->exec_nodes == NULL - && (query->jointree->fromlist == NULL - || query->jointree->fromlist->length == 0)) - /* Just execute it on Coordinator */ - query_plan->exec_loc_type = EXEC_ON_COORD; - else - { - if (query_step->exec_nodes != NULL - && query_step->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_PGCATALOG) - { - /* pg_catalog query, run on coordinator */ - query_plan->exec_loc_type = EXEC_ON_COORD; - } - else - { - query_plan->exec_loc_type = EXEC_ON_DATA_NODES; - - /* If node list is NULL, execute on coordinator */ - if (!query_step->exec_nodes) - query_plan->exec_loc_type = EXEC_ON_COORD; - } - } - - /* * Add sortring to the step */ - if (query_plan->exec_loc_type == EXEC_ON_DATA_NODES && - list_length(query_step->exec_nodes->nodelist) > 1 && + if (list_length(query_step->exec_nodes->nodelist) > 1 && (query->sortClause || query->distinctClause)) make_simple_sort_from_sortclauses(query, query_step); @@ -2090,7 +2090,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) * Check if we have multiple nodes and an unsupported clause. This * is temporary until we expand supported SQL */ - if (nodeTag(parsetree) == T_SelectStmt) + if (query->nodeTag == T_SelectStmt) { if (StrictStatementChecking && query_step->exec_nodes && list_length(query_step->exec_nodes->nodelist) > 1) @@ -2110,180 +2110,6 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) } } break; - - /* Statements that we only want to execute on the Coordinator */ - case T_VariableShowStmt: - query_plan->exec_loc_type = EXEC_ON_COORD; - break; - - /* - * Statements that need to run in autocommit mode, on Coordinator - * and Data Nodes with suppressed implicit two phase commit. 
- */ - case T_CheckPointStmt: - case T_ClusterStmt: - case T_CreatedbStmt: - case T_DropdbStmt: - case T_VacuumStmt: - query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES; - query_step->force_autocommit = true; - break; - - case T_DropPropertyStmt: - /* - * Triggers are not yet supported by PGXC - * all other queries are executed on both Coordinator and Datanode - * On the same point, assert also is not supported - */ - if (((DropPropertyStmt *)parsetree)->removeType == OBJECT_TRIGGER) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("This command is not yet supported.")))); - else - query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES; - break; - - case T_CreateStmt: - if (((CreateStmt *)parsetree)->relation->istemp) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Temp tables are not yet supported.")))); - - query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES; - break; - - /* - * Statements that we execute on both the Coordinator and Data Nodes - */ - case T_AlterDatabaseStmt: - case T_AlterDatabaseSetStmt: - case T_AlterDomainStmt: - case T_AlterFdwStmt: - case T_AlterForeignServerStmt: - case T_AlterFunctionStmt: - case T_AlterObjectSchemaStmt: - case T_AlterOpFamilyStmt: - case T_AlterSeqStmt: - case T_AlterTableStmt: /* Can also be used to rename a sequence */ - case T_AlterTSConfigurationStmt: - case T_AlterTSDictionaryStmt: - case T_ClosePortalStmt: /* In case CLOSE ALL is issued */ - case T_CommentStmt: - case T_CompositeTypeStmt: - case T_ConstraintsSetStmt: - case T_CreateCastStmt: - case T_CreateConversionStmt: - case T_CreateDomainStmt: - case T_CreateEnumStmt: - case T_CreateFdwStmt: - case T_CreateForeignServerStmt: - case T_CreateFunctionStmt: /* Only global functions are supported */ - case T_CreateOpClassStmt: - case T_CreateOpFamilyStmt: - case T_CreatePLangStmt: - case T_CreateSeqStmt: - case T_CreateSchemaStmt: - case T_DeallocateStmt: /* Allow for DEALLOCATE ALL */ - 
case T_DiscardStmt: - case T_DropCastStmt: - case T_DropFdwStmt: - case T_DropForeignServerStmt: - case T_DropPLangStmt: - case T_DropStmt: - case T_IndexStmt: - case T_LockStmt: - case T_ReindexStmt: - case T_RemoveFuncStmt: - case T_RemoveOpClassStmt: - case T_RemoveOpFamilyStmt: - case T_RenameStmt: - case T_RuleStmt: - case T_TruncateStmt: - case T_VariableSetStmt: - case T_ViewStmt: - - /* - * Also support these, should help later with pg_restore, although - * not very useful because of the pooler using the same user - */ - case T_GrantStmt: - case T_GrantRoleStmt: - case T_CreateRoleStmt: - case T_AlterRoleStmt: - case T_AlterRoleSetStmt: - case T_AlterUserMappingStmt: - case T_CreateUserMappingStmt: - case T_DropRoleStmt: - case T_AlterOwnerStmt: - case T_DropOwnedStmt: - case T_DropUserMappingStmt: - case T_ReassignOwnedStmt: - case T_DefineStmt: /* used for aggregates, some types */ - query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES; - break; - - case T_TransactionStmt: - switch (((TransactionStmt *) parsetree)->kind) - { - case TRANS_STMT_SAVEPOINT: - case TRANS_STMT_RELEASE: - case TRANS_STMT_ROLLBACK_TO: - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("This type of transaction statement not yet supported")))); - break; - - default: - break; /* keep compiler quiet */ - } - query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES; - break; - - /* - * For now, pick one of the data nodes until we modify real - * planner It will give an approximate idea of what an isolated - * data node will do - */ - case T_ExplainStmt: - if (((ExplainStmt *) parsetree)->analyze) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("ANALYZE with EXPLAIN is currently not supported.")))); - - query_step->exec_nodes = palloc0(sizeof(Exec_Nodes)); - query_step->exec_nodes->nodelist = GetAnyDataNode(); - query_step->exec_nodes->baselocatortype = LOCATOR_TYPE_RROBIN; - query_plan->exec_loc_type = EXEC_ON_DATA_NODES; - 
break; - - /* - * Trigger queries are not yet supported by PGXC. - * Tablespace queries are also not yet supported. - * Two nodes on the same servers cannot use the same tablespace. - */ - case T_CreateTableSpaceStmt: - case T_CreateTrigStmt: - case T_DropTableSpaceStmt: - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("This command is not yet supported.")))); - break; - - /* - * Other statements we do not yet want to handle. - * By default they would be fobidden, but we list these for reference. - * Note that there is not a 1-1 correspndence between - * SQL command and the T_*Stmt structures. - */ - case T_DeclareCursorStmt: - case T_ExecuteStmt: - case T_FetchStmt: - case T_ListenStmt: - case T_LoadStmt: - case T_NotifyStmt: - case T_PrepareStmt: - case T_UnlistenStmt: - /* fall through */ default: /* Allow for override */ if (StrictStatementChecking) @@ -2291,12 +2117,10 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), (errmsg("This command is not yet supported.")))); else - query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES; - break; + result->planTree = standardPlan; } - - return query_plan; + return result; } @@ -2321,21 +2145,3 @@ free_query_step(RemoteQuery *query_step) list_free_deep(query_step->simple_aggregates); pfree(query_step); } - -/* - * Free Query_Plan struct - */ -void -FreeQueryPlan(Query_Plan *query_plan) -{ - ListCell *item; - - if (query_plan == NULL) - return; - - foreach(item, query_plan->query_step_list) - free_query_step((RemoteQuery *) lfirst(item)); - - pfree(query_plan->query_step_list); - pfree(query_plan); -} diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index bbedef0f2f..0f16c5143f 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -168,9 +168,7 @@ CreateResponseCombiner(int node_count, CombineType combine_type) combiner->connections = NULL; 
combiner->conn_count = 0; combiner->combine_type = combine_type; - combiner->dest = NULL; combiner->command_complete_count = 0; - combiner->row_count = 0; combiner->request_type = REQUEST_TYPE_NOT_DEFINED; combiner->tuple_desc = NULL; combiner->description_count = 0; @@ -178,7 +176,6 @@ CreateResponseCombiner(int node_count, CombineType combine_type) combiner->copy_out_count = 0; combiner->errorMessage = NULL; combiner->query_Done = false; - combiner->completionTag = NULL; combiner->msg = NULL; combiner->msglen = 0; combiner->initAggregates = true; @@ -488,7 +485,8 @@ HandleCopyOutComplete(RemoteQueryState *combiner) static void HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len) { - int digits = 0; + int digits = 0; + EState *estate = combiner->ss.ps.state; /* * If we did not receive description we are having rowcount or OK response @@ -496,7 +494,7 @@ HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len) if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) combiner->request_type = REQUEST_TYPE_COMMAND; /* Extract rowcount */ - if (combiner->combine_type != COMBINE_TYPE_NONE) + if (combiner->combine_type != COMBINE_TYPE_NONE && estate) { uint64 rowcount; digits = parse_row_count(msg_body, len, &rowcount); @@ -507,7 +505,7 @@ HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len) { if (combiner->command_complete_count) { - if (rowcount != combiner->row_count) + if (rowcount != estate->es_processed) /* There is a consistency issue in the database with the replicated table */ ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), @@ -515,37 +513,15 @@ HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len) } else /* first result */ - combiner->row_count = rowcount; + estate->es_processed = rowcount; } else - combiner->row_count += rowcount; + estate->es_processed += rowcount; } else combiner->combine_type = COMBINE_TYPE_NONE; } - if (++combiner->command_complete_count == 
combiner->node_count) - { - if (combiner->completionTag) - { - if (combiner->combine_type == COMBINE_TYPE_NONE) - { - /* ensure we do not go beyond buffer bounds */ - if (len > COMPLETION_TAG_BUFSIZE) - len = COMPLETION_TAG_BUFSIZE; - memcpy(combiner->completionTag, msg_body, len); - } - else - { - /* Truncate msg_body to get base string */ - msg_body[len - digits - 1] = '\0'; - snprintf(combiner->completionTag, - COMPLETION_TAG_BUFSIZE, - "%s" UINT64_FORMAT, - msg_body, - combiner->row_count); - } - } - } + combiner->command_complete_count++; } /* @@ -653,6 +629,9 @@ HandleCopyDataRow(RemoteQueryState *combiner, char *msg_body, size_t len) (errcode(ERRCODE_DATA_CORRUPTED), errmsg("Unexpected response from the data nodes for 'd' message, current request type %d", combiner->request_type))); + /* count the row */ + combiner->processed++; + /* If there is a copy file, data has to be sent to the local file */ if (combiner->copy_file) /* write data to the copy file */ @@ -881,7 +860,6 @@ ValidateAndResetCombiner(RemoteQueryState *combiner) combiner->command_complete_count = 0; combiner->connections = NULL; combiner->conn_count = 0; - combiner->row_count = 0; combiner->request_type = REQUEST_TYPE_NOT_DEFINED; combiner->tuple_desc = NULL; combiner->description_count = 0; @@ -1106,7 +1084,6 @@ data_node_begin(int conn_count, DataNodeHandle ** connections, } combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); - combiner->dest = None_Receiver; /* Receive responses */ if (data_node_receive_responses(conn_count, connections, timeout, combiner)) @@ -1225,7 +1202,6 @@ data_node_commit(int conn_count, DataNodeHandle ** connections) } combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); - combiner->dest = None_Receiver; /* Receive responses */ if (data_node_receive_responses(conn_count, connections, timeout, combiner)) result = EOF; @@ -1268,10 +1244,7 @@ data_node_commit(int conn_count, DataNodeHandle ** connections) } if (!combiner) - { combiner = 
CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); - combiner->dest = None_Receiver; - } /* Receive responses */ if (data_node_receive_responses(conn_count, connections, timeout, combiner)) result = EOF; @@ -1336,7 +1309,6 @@ data_node_rollback(int conn_count, DataNodeHandle ** connections) } combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); - combiner->dest = None_Receiver; /* Receive responses */ if (data_node_receive_responses(conn_count, connections, timeout, combiner)) return EOF; @@ -1480,7 +1452,6 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_ * client runs console or file copy */ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); - combiner->dest = None_Receiver; /* Receive responses */ if (data_node_receive_responses(conn_count, connections, timeout, combiner) @@ -1541,7 +1512,6 @@ DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** if (primary_handle->inStart < primary_handle->inEnd) { RemoteQueryState *combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE); - combiner->dest = None_Receiver; handle_response(primary_handle, combiner); if (!ValidateAndCloseCombiner(combiner)) return EOF; @@ -1603,7 +1573,6 @@ DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** if (handle->inStart < handle->inEnd) { RemoteQueryState *combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE); - combiner->dest = None_Receiver; handle_response(handle, combiner); if (!ValidateAndCloseCombiner(combiner)) return EOF; @@ -1670,13 +1639,13 @@ DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* bool need_tran; List *nodelist; ListCell *nodeitem; - uint64 processed = 0; + uint64 processed; nodelist = exec_nodes->nodelist; need_tran = !autocommit || conn_count > 1; combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_SUM); - combiner->dest = None_Receiver; + combiner->processed = 0; /* If there is an existing file where to 
copy data, pass it to combiner */ if (copy_file) combiner->copy_file = copy_file; @@ -1712,7 +1681,7 @@ DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* } } - processed = combiner->row_count; + processed = combiner->processed; if (!ValidateAndCloseCombiner(combiner)) { @@ -1730,7 +1699,7 @@ DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* /* * Finish copy process on all connections */ -uint64 +void DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type) { @@ -1743,7 +1712,6 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, DataNodeHandle *connections[NumDataNodes]; DataNodeHandle *primary_handle = NULL; int conn_count = 0; - uint64 processed; for (i = 0; i < NumDataNodes; i++) { @@ -1786,8 +1754,7 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, } combiner = CreateResponseCombiner(conn_count + 1, combine_type); - combiner->dest = None_Receiver; - error = data_node_receive_responses(1, &primary_handle, timeout, combiner) || error; + error = (data_node_receive_responses(1, &primary_handle, timeout, combiner) != 0) || error; } for (i = 0; i < conn_count; i++) @@ -1823,22 +1790,25 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, need_tran = !autocommit || primary_handle || conn_count > 1; if (!combiner) - { combiner = CreateResponseCombiner(conn_count, combine_type); - combiner->dest = None_Receiver; - } error = (data_node_receive_responses(conn_count, connections, timeout, combiner) != 0) || error; - processed = combiner->row_count; - if (!ValidateAndCloseCombiner(combiner) || error) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Error while running COPY"))); +} - return processed; +#define REMOTE_QUERY_NSLOTS 2 +int +ExecCountSlotsRemoteQuery(RemoteQuery *node) +{ + return ExecCountSlotsNode(outerPlan((Plan *) node)) + + ExecCountSlotsNode(innerPlan((Plan *) 
node)) + + REMOTE_QUERY_NSLOTS; } + RemoteQueryState * ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) { @@ -1876,6 +1846,9 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); } + if (outerPlan(node)) + outerPlanState(remotestate) = ExecInitNode(outerPlan(node), estate, eflags); + return remotestate; } @@ -1927,6 +1900,83 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst) } } +static void +get_exec_connections(Exec_Nodes *exec_nodes, + int *regular_conn_count, + int *total_conn_count, + DataNodeHandle ***connections, + DataNodeHandle ***primaryconnection) +{ + List *nodelist = NIL; + List *primarynode = NIL; + + if (exec_nodes) + { + nodelist = exec_nodes->nodelist; + primarynode = exec_nodes->primarynodelist; + } + + if (list_length(nodelist) == 0) + { + if (primarynode) + *regular_conn_count = NumDataNodes - 1; + else + *regular_conn_count = NumDataNodes; + } + else + { + *regular_conn_count = list_length(nodelist); + } + + *total_conn_count = *regular_conn_count; + + /* Get connection for primary node, if used */ + if (primarynode) + { + *primaryconnection = get_handles(primarynode); + if (!*primaryconnection) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not obtain connection from pool"))); + (*total_conn_count)++; + } + + /* Get other connections (non-primary) */ + *connections = get_handles(nodelist); + if (!*connections) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not obtain connection from pool"))); + +} + +/* + * We would want to run 2PC if current transaction modified more then + * one node. So optimize little bit and do not look further if we + * already have more then one write nodes. 
+ */ +static void +register_write_nodes(int conn_count, DataNodeHandle **connections) +{ + int i, j; + + for (i = 0; i < conn_count && write_node_count < 2; i++) + { + bool found = false; + + for (j = 0; j < write_node_count && !found; j++) + { + if (write_node_list[j] == connections[i]) + found = true; + } + if (!found) + { + /* Add to transaction wide-list */ + write_node_list[write_node_count++] = connections[i]; + } + } +} + /* * Execute step of PGXC plan. * The step specifies a command to be executed on specified nodes. @@ -1950,66 +2000,51 @@ ExecRemoteQuery(RemoteQueryState *node) if (!node->query_Done) { /* First invocation, initialize */ - Exec_Nodes *exec_nodes = step->exec_nodes; bool force_autocommit = step->force_autocommit; bool is_read_only = step->read_only; GlobalTransactionId gxid = InvalidGlobalTransactionId; Snapshot snapshot = GetActiveSnapshot(); DataNodeHandle **connections = NULL; DataNodeHandle **primaryconnection = NULL; - List *nodelist = NIL; - List *primarynode = NIL; int i; - int j; int regular_conn_count; int total_conn_count; bool need_tran; - if (exec_nodes) - { - nodelist = exec_nodes->nodelist; - primarynode = exec_nodes->primarynodelist; - } - - if (list_length(nodelist) == 0) - { - if (primarynode) - regular_conn_count = NumDataNodes - 1; - else - regular_conn_count = NumDataNodes; - } - else + /* + * If coordinator plan is specified execute it first. + * If the plan is returning we are returning these tuples immediately. + * If it is not returning or returned them all by current invocation + * we will go ahead and execute remote query. Then we will never execute + * the outer plan again because node->query_Done flag will be set and + * execution won't get to that place. 
+ */ + if (outerPlanState(node)) { - regular_conn_count = list_length(nodelist); + TupleTableSlot *slot = ExecProcNode(outerPlanState(node)); + if (!TupIsNull(slot)) + return slot; } - total_conn_count = regular_conn_count; - node->node_count = total_conn_count; + get_exec_connections(step->exec_nodes, + ®ular_conn_count, + &total_conn_count, + &connections, + &primaryconnection); - /* Get connection for primary node, if used */ - if (primarynode) - { - primaryconnection = get_handles(primarynode); - if (!primaryconnection) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Could not obtain connection from pool"))); - total_conn_count++; - } - - /* Get other connections (non-primary) */ - connections = get_handles(nodelist); - if (!connections) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Could not obtain connection from pool"))); + /* + * We save only regular connections, at the time we exit the function + * we finish with the primary connection and deal only with regular + * connections on subsequent invocations + */ + node->node_count = regular_conn_count; if (force_autocommit) need_tran = false; else need_tran = !autocommit || total_conn_count > 1; - elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, statement_need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false"); + elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primaryconnection ? "true" : "false", regular_conn_count, need_tran ? "true" : "false"); stat_statement(); if (autocommit) @@ -2019,44 +2054,11 @@ ExecRemoteQuery(RemoteQueryState *node) clear_write_node_list(); } - /* Check status of connections */ - /* - * We would want to run 2PC if current transaction modified more then - * one node. So optimize little bit and do not look further if we - * already have two. 
- */ - if (!is_read_only && write_node_count < 2) + if (!is_read_only) { - bool found; - if (primaryconnection) - { - found = false; - for (j = 0; j < write_node_count && !found; j++) - { - if (write_node_list[j] == primaryconnection[0]) - found = true; - } - if (!found) - { - /* Add to transaction wide-list */ - write_node_list[write_node_count++] = primaryconnection[0]; - } - } - for (i = 0; i < regular_conn_count && write_node_count < 2; i++) - { - found = false; - for (j = 0; j < write_node_count && !found; j++) - { - if (write_node_list[j] == connections[i]) - found = true; - } - if (!found) - { - /* Add to transaction wide-list */ - write_node_list[write_node_count++] = connections[i]; - } - } + if (primaryconnection) register_write_nodes(1, primaryconnection); + register_write_nodes(regular_conn_count, connections); } gxid = GetCurrentGlobalTransactionId(); @@ -2209,12 +2211,10 @@ { ExecSetSlotDescriptor(scanslot, node->tuple_desc); /* - * we should send to client not the tuple_desc we just - * received, but tuple_desc from the planner. 
- * Data node may be sending junk columns for sorting + * Now tuple table slot is responsible for freeing the + * descriptor */ - (*node->dest->rStartup) (node->dest, CMD_SELECT, - resultslot->tts_tupleDescriptor); + node->tuple_desc = NULL; if (step->sort) { SimpleSort *sort = step->sort; @@ -2228,7 +2228,7 @@ * be initialized */ node->tuplesortstate = tuplesort_begin_merge( - node->tuple_desc, + scanslot->tts_tupleDescriptor, sort->numCols, sort->sortColIdx, sort->sortOperators, @@ -2290,7 +2290,6 @@ } } copy_slot(node, scanslot, resultslot); - (*node->dest->receiveSlot) (resultslot, node->dest); break; } if (!have_tuple) @@ -2310,12 +2309,26 @@ { if (node->simple_aggregates) { - /* - * Advance aggregate functions and allow to read up next - * data row message and get tuple in the same slot on - * next iteration - */ - exec_simple_aggregates(node, scanslot); + if (node->simple_aggregates) + { + /* + * Advance aggregate functions and allow to read up next + * data row message and get tuple in the same slot on + * next iteration + */ + exec_simple_aggregates(node, scanslot); + } + else + { + /* + * Receive current slot and read up next data row + * message before exiting the loop. 
Next time when this + * function is invoked we will have either data row + * message ready or EOF + */ + copy_slot(node, scanslot, resultslot); + have_tuple = true; + } } else { @@ -2326,7 +2339,6 @@ ExecRemoteQuery(RemoteQueryState *node) * message ready or EOF */ copy_slot(node, scanslot, resultslot); - (*node->dest->receiveSlot) (resultslot, node->dest); have_tuple = true; } } @@ -2380,10 +2392,7 @@ ExecRemoteQuery(RemoteQueryState *node) { finish_simple_aggregates(node, resultslot); if (!TupIsNull(resultslot)) - { - (*node->dest->receiveSlot) (resultslot, node->dest); have_tuple = true; - } } if (!have_tuple) /* report end of scan */ @@ -2405,12 +2414,234 @@ ExecRemoteQuery(RemoteQueryState *node) void ExecEndRemoteQuery(RemoteQueryState *node) { - (*node->dest->rShutdown) (node->dest); + /* + * Release tuplesort resources + */ + if (node->tuplesortstate != NULL) + tuplesort_end((Tuplesortstate *) node->tuplesortstate); + node->tuplesortstate = NULL; + + /* + * shut down the subplan + */ + if (outerPlanState(node)) + ExecEndNode(outerPlanState(node)); + if (node->tmp_ctx) MemoryContextDelete(node->tmp_ctx); + CloseCombiner(node); } +/* + * Execute utility statement on multiple data nodes + * It does approximately the same as + * + * RemoteQueryState *state = ExecInitRemoteQuery(plan, estate, flags); + * Assert(TupIsNull(ExecRemoteQuery(state)); + * ExecEndRemoteQuery(state) + * + * But does not need an Estate instance and does not do some unnecessary work, + * like allocating tuple slots. 
+ */ +void +ExecRemoteUtility(RemoteQuery *node) +{ + RemoteQueryState *remotestate; + bool force_autocommit = node->force_autocommit; + bool is_read_only = node->read_only; + GlobalTransactionId gxid = InvalidGlobalTransactionId; + Snapshot snapshot = GetActiveSnapshot(); + DataNodeHandle **connections = NULL; + DataNodeHandle **primaryconnection = NULL; + int regular_conn_count; + int total_conn_count; + bool need_tran; + int i; + + remotestate = CreateResponseCombiner(0, node->combine_type); + + get_exec_connections(node->exec_nodes, + ®ular_conn_count, + &total_conn_count, + &connections, + &primaryconnection); + + if (force_autocommit) + need_tran = false; + else + need_tran = !autocommit || total_conn_count > 1; + + if (!is_read_only) + { + if (primaryconnection) + register_write_nodes(1, primaryconnection); + register_write_nodes(regular_conn_count, connections); + } + + gxid = GetCurrentGlobalTransactionId(); + if (!GlobalTransactionIdIsValid(gxid)) + { + if (primaryconnection) + pfree(primaryconnection); + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to get next transaction ID"))); + } + + if (need_tran) + { + /* + * Check if data node connections are in transaction and start + * transactions on nodes where it is not started + */ + DataNodeHandle *new_connections[total_conn_count]; + int new_count = 0; + + if (primaryconnection && primaryconnection[0]->transaction_status != 'T') + new_connections[new_count++] = primaryconnection[0]; + for (i = 0; i < regular_conn_count; i++) + if (connections[i]->transaction_status != 'T') + new_connections[new_count++] = connections[i]; + + if (new_count) + data_node_begin(new_count, new_connections, gxid); + } + + /* See if we have a primary nodes, execute on it first before the others */ + if (primaryconnection) + { + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && data_node_send_gxid(primaryconnection[0], gxid)) + { + pfree(connections); + 
pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot)) + { + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (data_node_send_query(primaryconnection[0], node->sql_statement) != 0) + { + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + + Assert(remotestate->combine_type == COMBINE_TYPE_SAME); + + while (remotestate->command_complete_count < 1) + { + PG_TRY(); + { + data_node_receive(1, primaryconnection, NULL); + while (handle_response(primaryconnection[0], remotestate) == RESPONSE_EOF) + data_node_receive(1, primaryconnection, NULL); + if (remotestate->errorMessage) + { + char *code = remotestate->errorCode; + ereport(ERROR, + (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), + errmsg("%s", remotestate->errorMessage))); + } + } + /* If we got an error response return immediately */ + PG_CATCH(); + { + pfree(primaryconnection); + pfree(connections); + PG_RE_THROW(); + } + PG_END_TRY(); + } + pfree(primaryconnection); + } + + for (i = 0; i < regular_conn_count; i++) + { + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && data_node_send_gxid(connections[i], gxid)) + { + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (snapshot && data_node_send_snapshot(connections[i], snapshot)) + { + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } + if (data_node_send_query(connections[i], node->sql_statement) != 0) + { + pfree(connections); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + 
errmsg("Failed to send command to data nodes"))); + } + } + + /* + * Stop if all commands are completed or we got a data row and + * initialized state node for subsequent invocations + */ + while (regular_conn_count > 0) + { + int i = 0; + + data_node_receive(regular_conn_count, connections, NULL); + /* + * Handle input from the data nodes. + * If we got a RESPONSE_DATAROW we can break handling to wrap + * it into a tuple and return. Handling will be continued upon + * subsequent invocations. + * If we got 0, we exclude connection from the list. We do not + * expect more input from it. In case of non-SELECT query we quit + * the loop when all nodes finish their work and send ReadyForQuery + * with empty connections array. + * If we got EOF, move to the next connection, will receive more + * data on the next iteration. + */ + while (i < regular_conn_count) + { + int res = handle_response(connections[i], remotestate); + if (res == RESPONSE_EOF) + { + i++; + } + else if (res == RESPONSE_COMPLETE) + { + if (i < --regular_conn_count) + connections[i] = connections[regular_conn_count]; + } + else if (res == RESPONSE_TUPDESC) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Unexpected response from data node"))); + } + else if (res == RESPONSE_DATAROW) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Unexpected response from data node"))); + } + } + } +} + /* * Called when the backend is ending. 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index e59c86920e..5aa9e5d97a 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -650,6 +650,20 @@ pg_analyze_and_rewrite(Node *parsetree, const char *query_string, */ querytree_list = pg_rewrite_query(query); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + { + ListCell *lc; + + foreach(lc, querytree_list) + { + Query *query = (Query *) lfirst(lc); + query->sql_statement = pstrdup(query_string); + query->nodeTag = nodeTag(parsetree); + } + } +#endif + TRACE_POSTGRESQL_QUERY_REWRITE_DONE(query_string); return querytree_list; @@ -900,9 +914,6 @@ exec_simple_query(const char *query_string) DestReceiver *receiver; int16 format; #ifdef PGXC - Query_Plan *query_plan; - RemoteQuery *query_step; - bool exec_on_coord; /* * By default we do not want data nodes to contact GTM directly, @@ -910,9 +921,6 @@ exec_simple_query(const char *query_string) */ if (IS_PGXC_DATANODE) SetForceXidFromGTM(false); - - exec_on_coord = true; - query_plan = NULL; #endif /* @@ -968,131 +976,11 @@ exec_simple_query(const char *query_string) querytree_list = pg_analyze_and_rewrite(parsetree, query_string, NULL, 0); -#ifdef PGXC /* PGXC_COORD */ - if (IS_PGXC_COORDINATOR) - { - if (IsA(parsetree, TransactionStmt)) - pgxc_transaction_stmt(parsetree); - - else if (IsA(parsetree, ExecDirectStmt)) - { - ExecDirectStmt *execdirect = (ExecDirectStmt *) parsetree; - List *inner_parse_tree_list; - - Assert(IS_PGXC_COORDINATOR); - - exec_on_coord = execdirect->coordinator; - - /* - * Switch to appropriate context for constructing parse and - * query trees (these must outlive the execution context). 
- */ - oldcontext = MemoryContextSwitchTo(MessageContext); - - inner_parse_tree_list = pg_parse_query(execdirect->query); - /* - * we do not support complex commands (expanded to multiple - * parse trees) within EXEC DIRECT - */ - if (list_length(parsetree_list) != 1) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Can not execute %s with EXECUTE DIRECT", - execdirect->query))); - } - parsetree = linitial(inner_parse_tree_list); - - /* - * Set up a snapshot if parse analysis/planning will need - * one. - */ - if (analyze_requires_snapshot(parsetree)) - { - PushActiveSnapshot(GetTransactionSnapshot()); - snapshot_set = true; - } - - querytree_list = pg_analyze_and_rewrite(parsetree, - query_string, - NULL, - 0); - - if (execdirect->nodes) - { - ListCell *lc; - Query *query = (Query *) linitial(querytree_list); - - query_plan = (Query_Plan *) palloc0(sizeof(Query_Plan)); - query_step = makeNode(RemoteQuery); - query_step->plan.targetlist = query->targetList; - query_step->sql_statement = pstrdup(execdirect->query); - query_step->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); - foreach (lc, execdirect->nodes) - { - int node = intVal(lfirst(lc)); - query_step->exec_nodes->nodelist = lappend_int(query_step->exec_nodes->nodelist, node); - } - query_step->combine_type = COMBINE_TYPE_SAME; - - query_plan->query_step_list = lappend(NULL, query_step); - query_plan->exec_loc_type = EXEC_ON_DATA_NODES; - } - - /* Restore context */ - MemoryContextSwitchTo(oldcontext); - - } - else if (IsA(parsetree, CopyStmt)) - { - CopyStmt *copy = (CopyStmt *) parsetree; - uint64 processed; - /* Snapshot is needed for the Copy */ - if (!snapshot_set) - { - PushActiveSnapshot(GetTransactionSnapshot()); - snapshot_set = true; - } - /* - * A check on locator is made in DoCopy to determine if the copy can be launched on - * Datanode or on Coordinator. 
- * If a table has no locator data, then IsCoordPortalCopy returns false and copy is launched - * on Coordinator instead (e.g., using pg_catalog tables). - * If a table has some locator data (user tables), then copy was launched normally - * in Datanodes - */ - if (!IsCoordPortalCopy(copy)) - { - exec_on_coord = false; - processed = DoCopy(copy, query_string, false); - snprintf(completionTag, COMPLETION_TAG_BUFSIZE, - "COPY " UINT64_FORMAT, processed); - } - else - exec_on_coord = true; - } - else - { - query_plan = GetQueryPlan(parsetree, query_string, querytree_list); - - exec_on_coord = query_plan->exec_loc_type & EXEC_ON_COORD; - } - - /* First execute on the coordinator, if involved (DDL), then data nodes */ - } - - plantree_list = NIL; - if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE) -#endif - plantree_list = pg_plan_queries(querytree_list, 0, NULL); + plantree_list = pg_plan_queries(querytree_list, 0, NULL); /* Done with the snapshot used for parsing/planning */ -#ifdef PGXC - /* In PG-XC, hold on to it a bit longer */ -#else if (snapshot_set) PopActiveSnapshot(); -#endif /* If we got a cancel signal in analysis or planning, quit */ CHECK_FOR_INTERRUPTS(); @@ -1102,13 +990,6 @@ exec_simple_query(const char *query_string) /* Force getting Xid from GTM if not autovacuum, but a vacuum */ if (IS_PGXC_DATANODE && IsA(parsetree, VacuumStmt) && IsPostmasterEnvironment) SetForceXidFromGTM(true); - - /* - * Create and run Portal only if it is needed. - * In some special cases we have nothing to run at this point - */ - if (plantree_list || query_plan) - { #endif /* @@ -1170,11 +1051,6 @@ exec_simple_query(const char *query_string) */ MemoryContextSwitchTo(oldcontext); -#ifdef PGXC - /* Skip the Portal stuff on coordinator if command only executes on data nodes */ - if ((IS_PGXC_COORDINATOR && exec_on_coord) || IS_PGXC_DATANODE) - { -#endif /* * Run the portal to completion, and then drop it (and the receiver). 
*/ @@ -1185,55 +1061,10 @@ exec_simple_query(const char *query_string) receiver, completionTag); -#ifdef PGXC - } - - /* PGXC_COORD */ - /* If the coordinator ran ok, now run on the data nodes if planned */ - if (IS_PGXC_COORDINATOR) - { - if (query_plan && (query_plan->exec_loc_type & EXEC_ON_DATA_NODES)) - { - RemoteQueryState *state; - TupleTableSlot *slot; - EState *estate = CreateExecutorState(); - oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); - query_step = linitial(query_plan->query_step_list); - estate->es_tupleTable = ExecCreateTupleTable(2); - state = ExecInitRemoteQuery(query_step, estate, 0); - state->dest = receiver; - state->completionTag = completionTag; - if (!snapshot_set) - { - PushActiveSnapshot(GetTransactionSnapshot()); - snapshot_set = true; - } - do - { - slot = ExecRemoteQuery(state); - } - while (!TupIsNull(slot)); - - ExecEndRemoteQuery(state); - /* Restore context */ - MemoryContextSwitchTo(oldcontext); - } - - FreeQueryPlan(query_plan); - } -#endif /* PGXC_COORD */ - (*receiver->rDestroy) (receiver); PortalDrop(portal, false); -#ifdef PGXC - } - - if (snapshot_set) - PopActiveSnapshot(); -#endif - if (IsA(parsetree, TransactionStmt)) { /* @@ -1479,6 +1310,19 @@ exec_parse_message(const char *query_string, /* string to execute */ ShowUsage("PARSE ANALYSIS STATISTICS"); querytree_list = pg_rewrite_query(query); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + { + ListCell *lc; + + foreach(lc, querytree_list) + { + Query *query = (Query *) lfirst(lc); + query->sql_statement = pstrdup(query_string); + query->nodeTag = nodeTag(raw_parse_tree); + } + } +#endif /* * If this is the unnamed statement and it has parameters, defer query @@ -4365,45 +4209,3 @@ log_disconnections(int code, Datum arg) port->user_name, port->database_name, port->remote_host, port->remote_port[0] ? 
" port=" : "", port->remote_port))); } - - -#ifdef PGXC -/* - * Handle transaction statements in PG-XC - */ -void -pgxc_transaction_stmt (Node *parsetree) -{ - Assert(IS_PGXC_COORDINATOR); - - - /* Handle transaction statements specially */ - if (IsA(parsetree, TransactionStmt)) - { - TransactionStmt *stmt = (TransactionStmt *) parsetree; - - switch (stmt->kind) - { - case TRANS_STMT_BEGIN: - /* - * This does not yet send down a BEGIN, - * we do that "on demand" as data nodes are added - */ - DataNodeBegin(); - break; - - case TRANS_STMT_COMMIT: - DataNodeCommit(); - break; - - case TRANS_STMT_ROLLBACK: - DataNodeRollback(); - break; - - default: - /* Ignore others for prototype */ - break; - } - } -} -#endif /* PGXC */ diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 98716830cd..4e3096acaf 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -286,6 +286,17 @@ ChoosePortalStrategy(List *stmts) } } } +#ifdef PGXC + else if (IsA(stmt, RemoteQuery)) + { + /* + * Let's choose PORTAL_ONE_SELECT for now + * After adding more PGXC functionality we may have more + * sophisticated algorithm of determining portal strategy + */ + return PORTAL_ONE_SELECT; + } +#endif else if (IsA(stmt, PlannedStmt)) { PlannedStmt *pstmt = (PlannedStmt *) stmt; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 0bb4b4645f..3f857282f6 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -58,7 +58,12 @@ #include "utils/syscache.h" #ifdef PGXC +#include "pgxc/locator.h" #include "pgxc/pgxc.h" +#include "pgxc/planner.h" + +static void ExecUtilityStmtOnNodes(const char *queryString, Exec_Nodes *nodes, + bool force_autocommit); #endif @@ -283,6 +288,10 @@ ProcessUtility(Node *parsetree, { ListCell *lc; +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + DataNodeBegin(); +#endif BeginTransactionBlock(); foreach(lc, stmt->options) { @@ -301,6 +310,10 @@ ProcessUtility(Node *parsetree, break; case TRANS_STMT_COMMIT: 
+#ifdef PGXC + if (IS_PGXC_COORDINATOR) + DataNodeCommit(); +#endif if (!EndTransactionBlock()) { /* report unsuccessful commit in completionTag */ @@ -329,6 +342,10 @@ ProcessUtility(Node *parsetree, break; case TRANS_STMT_ROLLBACK: +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + DataNodeRollback(); +#endif UserAbortTransactionBlock(); break; @@ -406,6 +423,10 @@ ProcessUtility(Node *parsetree, * relation and attribute manipulation */ case T_CreateSchemaStmt: +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif CreateSchemaCommand((CreateSchemaStmt *) parsetree, queryString); break; @@ -416,6 +437,7 @@ ProcessUtility(Node *parsetree, ListCell *l; Oid relOid; + /* PGXC transformCreateStmt also adds T_RemoteQuery node */ /* Run parse analysis ... */ stmts = transformCreateStmt((CreateStmt *) parsetree, queryString); @@ -522,7 +544,6 @@ ProcessUtility(Node *parsetree, case T_DropStmt: { DropStmt *stmt = (DropStmt *) parsetree; - switch (stmt->removeType) { case OBJECT_TABLE: @@ -566,11 +587,31 @@ ProcessUtility(Node *parsetree, (int) stmt->removeType); break; } +#ifdef PGXC + /* + * PGXCTODO + * We may need to check details of the object being dropped and + * run command on correct nodes + */ + if (IS_PGXC_COORDINATOR) + /* sequence exists only on coordinator */ + if (stmt->removeType != OBJECT_SEQUENCE) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif } break; case T_TruncateStmt: ExecuteTruncate((TruncateStmt *) parsetree); +#ifdef PGXC + /* + * PGXCTODO + * We may need to check details of the object being truncated and + * run command on correct nodes + */ + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_CommentStmt: @@ -580,11 +621,7 @@ ProcessUtility(Node *parsetree, case T_CopyStmt: { uint64 processed; -#ifdef PGXC - processed = DoCopy((CopyStmt *) parsetree, queryString, true); -#else - processed = DoCopy((CopyStmt *) parsetree, queryString): -#endif + 
processed = DoCopy((CopyStmt *) parsetree, queryString); if (completionTag) snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "COPY " UINT64_FORMAT, processed); @@ -608,10 +645,18 @@ ProcessUtility(Node *parsetree, * schema */ case T_RenameStmt: +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif ExecRenameStmt((RenameStmt *) parsetree); break; case T_AlterObjectSchemaStmt: +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif ExecAlterObjectSchemaStmt((AlterObjectSchemaStmt *) parsetree); break; @@ -624,6 +669,7 @@ ProcessUtility(Node *parsetree, List *stmts; ListCell *l; + /* PGXC transformCreateStmt also adds T_RemoteQuery node */ /* Run parse analysis ... */ stmts = transformAlterTableStmt((AlterTableStmt *) parsetree, queryString); @@ -698,6 +744,10 @@ ProcessUtility(Node *parsetree, break; } } +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_GrantStmt: @@ -751,6 +801,10 @@ ProcessUtility(Node *parsetree, break; } } +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_CompositeTypeStmt: /* CREATE TYPE (composite) */ @@ -759,10 +813,18 @@ ProcessUtility(Node *parsetree, DefineCompositeType(stmt->typevar, stmt->coldeflist); } +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_CreateEnumStmt: /* CREATE TYPE (enum) */ DefineEnum((CreateEnumStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_ViewStmt: /* CREATE VIEW */ @@ -771,10 +833,18 @@ ProcessUtility(Node *parsetree, case T_CreateFunctionStmt: /* CREATE FUNCTION */ CreateFunction((CreateFunctionStmt *) parsetree, queryString); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case 
T_AlterFunctionStmt: /* ALTER FUNCTION */ AlterFunction((AlterFunctionStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_IndexStmt: /* CREATE INDEX */ @@ -807,11 +877,20 @@ ProcessUtility(Node *parsetree, false, /* skip_build */ false, /* quiet */ stmt->concurrent); /* concurrent */ +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, + stmt->concurrent); +#endif } break; case T_RuleStmt: /* CREATE RULE */ DefineRule((RuleStmt *) parsetree, queryString); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_CreateSeqStmt: @@ -843,19 +922,35 @@ ProcessUtility(Node *parsetree, break; } } +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_CreatedbStmt: PreventTransactionChain(isTopLevel, "CREATE DATABASE"); createdb((CreatedbStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, true); +#endif break; case T_AlterDatabaseStmt: AlterDatabase((AlterDatabaseStmt *) parsetree, isTopLevel); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_AlterDatabaseSetStmt: AlterDatabaseSet((AlterDatabaseSetStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_DropdbStmt: @@ -865,6 +960,10 @@ ProcessUtility(Node *parsetree, PreventTransactionChain(isTopLevel, "DROP DATABASE"); dropdb(stmt->dbname, stmt->missing_ok); } +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, true); +#endif break; /* Query-level asynchronous notification */ @@ -903,23 +1002,51 @@ ProcessUtility(Node *parsetree, /* Allowed names are restricted if you're not superuser */ load_file(stmt->filename, !superuser()); } +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + 
ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_ClusterStmt: cluster((ClusterStmt *) parsetree, isTopLevel); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, true); +#endif break; case T_VacuumStmt: +#ifdef PGXC + /* + * We have to run the command on nodes before coordinator because + * vacuum() pops active snapshot and we can not send it to nodes + */ + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, true); +#endif vacuum((VacuumStmt *) parsetree, InvalidOid, true, NULL, false, isTopLevel); break; case T_ExplainStmt: ExplainQuery((ExplainStmt *) parsetree, queryString, params, dest); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + { + Exec_Nodes *nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); + nodes->nodelist = GetAnyDataNode(); + ExecUtilityStmtOnNodes(queryString, nodes, false); + } +#endif break; case T_VariableSetStmt: ExecSetVariableStmt((VariableSetStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_VariableShowStmt: @@ -936,6 +1063,10 @@ ProcessUtility(Node *parsetree, case T_CreateTrigStmt: CreateTrigger((CreateTrigStmt *) parsetree, InvalidOid, true); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_DropPropertyStmt: @@ -963,14 +1094,26 @@ ProcessUtility(Node *parsetree, break; } } +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_CreatePLangStmt: CreateProceduralLanguage((CreatePLangStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_DropPLangStmt: DropProceduralLanguage((DropPLangStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; /* @@ -978,6 +1121,10 @@ ProcessUtility(Node *parsetree, */ case T_CreateDomainStmt: 
DefineDomain((CreateDomainStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; /* @@ -1027,6 +1174,10 @@ ProcessUtility(Node *parsetree, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to do CHECKPOINT"))); RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, true); +#endif break; case T_ReindexStmt: @@ -1059,50 +1210,100 @@ ProcessUtility(Node *parsetree, (int) stmt->kind); break; } +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, + stmt->kind == OBJECT_DATABASE); +#endif break; } break; case T_CreateConversionStmt: CreateConversionCommand((CreateConversionStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_CreateCastStmt: CreateCast((CreateCastStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_DropCastStmt: DropCast((DropCastStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_CreateOpClassStmt: DefineOpClass((CreateOpClassStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_CreateOpFamilyStmt: DefineOpFamily((CreateOpFamilyStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_AlterOpFamilyStmt: AlterOpFamily((AlterOpFamilyStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_RemoveOpClassStmt: RemoveOpClass((RemoveOpClassStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case 
T_RemoveOpFamilyStmt: RemoveOpFamily((RemoveOpFamilyStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_AlterTSDictionaryStmt: AlterTSDictionary((AlterTSDictionaryStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; case T_AlterTSConfigurationStmt: AlterTSConfiguration((AlterTSConfigurationStmt *) parsetree); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ExecUtilityStmtOnNodes(queryString, NULL, false); +#endif break; - +#ifdef PGXC + case T_RemoteQuery: + Assert(IS_PGXC_COORDINATOR); + ExecRemoteUtility((RemoteQuery *) parsetree); + break; +#endif default: elog(ERROR, "unrecognized node type: %d", (int) nodeTag(parsetree)); @@ -1110,6 +1311,22 @@ ProcessUtility(Node *parsetree, } } +#ifdef PGXC +static void +ExecUtilityStmtOnNodes(const char *queryString, Exec_Nodes *nodes, + bool force_autocommit) +{ + RemoteQuery *step = makeNode(RemoteQuery); + step->combine_type = COMBINE_TYPE_SAME; + step->exec_nodes = nodes; + step->sql_statement = pstrdup(queryString); + step->force_autocommit = force_autocommit; + ExecRemoteUtility(step); + pfree(step->sql_statement); + pfree(step); +} +#endif + /* * UtilityReturnsTuples * Return "true" if this utility statement will send output to the @@ -2076,7 +2293,7 @@ CreateCommandTag(Node *parsetree) } } break; - + case T_ExecDirectStmt: tag = "EXECUTE DIRECT"; break; diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 7a3054d90e..7e6edac9be 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -17,12 +17,7 @@ #include "nodes/parsenodes.h" #include "tcop/dest.h" -#ifdef PGXC -extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString, bool exec_on_coord_portal); -extern bool IsCoordPortalCopy(const CopyStmt *stmt); -#else extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString); -#endif extern DestReceiver 
*CreateCopyDestReceiver(void); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index e0515ba95d..88ae12a2c6 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -146,6 +146,11 @@ typedef struct Query Node *setOperations; /* set-operation tree if this is top level of * a UNION/INTERSECT/EXCEPT query */ +#ifdef PGXC + /* need this info for PGXC Planner, may be temporary */ + char *sql_statement; /* original query */ + NodeTag nodeTag; /* node tag of top node of parse tree */ +#endif } Query; diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index e9b59ccbd9..b7faa7dd28 100644 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -51,9 +51,7 @@ typedef struct RemoteQueryState int conn_count; /* count of active connections */ int current_conn; /* used to balance load when reading from connections */ CombineType combine_type; /* see CombineType enum */ - DestReceiver *dest; /* output destination */ int command_complete_count; /* count of received CommandComplete messages */ - uint64 row_count; /* how many rows affected by the query */ RequestType request_type; /* see RequestType enum */ TupleDesc tuple_desc; /* tuple descriptor to be referenced by emitted tuples */ int description_count; /* count of received RowDescription messages */ @@ -62,7 +60,6 @@ typedef struct RemoteQueryState char errorCode[5]; /* error code to send back to client */ char *errorMessage; /* error message to send back to client */ bool query_Done; /* query has been sent down to data nodes */ - char *completionTag; /* completion tag to present to caller */ char *msg; /* last data row message */ int msglen; /* length of the data row message */ /* @@ -76,6 +73,7 @@ typedef struct RemoteQueryState FmgrInfo *eqfunctions; /* functions to compare tuples */ MemoryContext tmp_ctx; /* separate context is needed to compare tuples */ FILE *copy_file; /* used if copy_dest == COPY_FILE */ + uint64 
processed; /* count of data rows when running CopyOut */ } RemoteQueryState; /* Multinode Executor */ @@ -86,11 +84,13 @@ extern int DataNodeRollback(void); extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from); extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections); extern uint64 DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* copy_file); -extern uint64 DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type); +extern void DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type); +extern int ExecCountSlotsRemoteQuery(RemoteQuery *node); extern RemoteQueryState *ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags); extern TupleTableSlot* ExecRemoteQuery(RemoteQueryState *step); extern void ExecEndRemoteQuery(RemoteQueryState *step); +extern void ExecRemoteUtility(RemoteQuery *node); extern int handle_response(DataNodeHandle * conn, RemoteQueryState *combiner); extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot); diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h index 5ead756e1c..7ae04746e8 100644 --- a/src/include/pgxc/locator.h +++ b/src/include/pgxc/locator.h @@ -44,7 +44,7 @@ typedef struct /* track if tables use pg_catalog */ -typedef enum +typedef enum { TABLE_USAGE_TYPE_NO_TABLE, TABLE_USAGE_TYPE_PGCATALOG, @@ -58,10 +58,10 @@ typedef enum * primarynodelist is for replicated table writes, where to execute first. * If it succeeds, only then should it be executed on nodelist. 
* primarynodelist should be set to NULL if not doing replicated write operations - */ + */ typedef struct { - List *primarynodelist; + List *primarynodelist; List *nodelist; char baselocatortype; TableUsageType tableusagetype; /* track pg_catalog usage */ diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h index 47665b2341..bf8f2242fa 100644 --- a/src/include/pgxc/planner.h +++ b/src/include/pgxc/planner.h @@ -17,16 +17,14 @@ #include "fmgr.h" #include "lib/stringinfo.h" +#include "nodes/params.h" +#include "nodes/parsenodes.h" #include "nodes/plannodes.h" #include "nodes/primnodes.h" #include "pgxc/locator.h" #include "tcop/dest.h" -/* for Query_Plan.exec_loc_type can have these OR'ed*/ -#define EXEC_ON_COORD 0x1 -#define EXEC_ON_DATA_NODES 0x2 - typedef enum { COMBINE_TYPE_NONE, /* it is known that no row count, do not parse */ @@ -72,18 +70,6 @@ typedef struct } RemoteQuery; -/* - * The PGXC plan to execute. - * In the prototype this will be simple, and queryStepList will - * contain just one step. - */ -typedef struct -{ - int exec_loc_type; - List *query_step_list; /* List of QuerySteps */ -} Query_Plan; - - /* For handling simple aggregates (no group by present) * For now, only MAX will be supported. */ @@ -154,9 +140,8 @@ extern bool StrictStatementChecking; /* forbid SELECT even multi-node ORDER BY */ extern bool StrictSelectChecking; -extern Query_Plan *GetQueryPlan(Node *parsetree, const char *sql_statement, - List *querytree_list); -extern void FreeQueryPlan(Query_Plan *query_plan); +extern PlannedStmt *pgxc_planner(Query *query, int cursorOptions, + ParamListInfo boundParams); extern bool IsHashDistributable(Oid col_type); #endif /* PGXCPLANNER_H */ |
