diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/commands/explain.c | 31 | ||||
| -rw-r--r-- | src/backend/commands/tablecmds.c | 14 | ||||
| -rw-r--r-- | src/backend/executor/execAmi.c | 8 | ||||
| -rw-r--r-- | src/backend/executor/nodeMaterial.c | 38 | ||||
| -rw-r--r-- | src/backend/optimizer/path/allpaths.c | 20 | ||||
| -rw-r--r-- | src/backend/optimizer/plan/createplan.c | 99 | ||||
| -rw-r--r-- | src/backend/optimizer/plan/setrefs.c | 16 | ||||
| -rw-r--r-- | src/backend/optimizer/plan/subselect.c | 6 | ||||
| -rw-r--r-- | src/backend/optimizer/util/pathnode.c | 22 | ||||
| -rw-r--r-- | src/backend/pgxc/plan/planner.c | 196 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/Makefile | 2 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 64 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/postgresql_fdw.c | 335 | ||||
| -rw-r--r-- | src/backend/tcop/postgres.c | 2 | ||||
| -rw-r--r-- | src/backend/tcop/utility.c | 20 | ||||
| -rw-r--r-- | src/include/optimizer/cost.h | 3 | ||||
| -rw-r--r-- | src/include/optimizer/pathnode.h | 3 | ||||
| -rw-r--r-- | src/include/pgxc/execRemote.h | 5 | ||||
| -rw-r--r-- | src/include/pgxc/planner.h | 3 |
19 files changed, 803 insertions, 84 deletions
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 45e8edf595..b5d0a904d5 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -565,6 +565,11 @@ explain_outNode(StringInfo str, case T_WorkTableScan: pname = "WorkTable Scan"; break; +#ifdef PGXC + case T_RemoteQuery: + pname = "Data Node Scan"; + break; +#endif case T_Material: pname = "Materialize"; break; @@ -668,6 +673,9 @@ explain_outNode(StringInfo str, case T_SeqScan: case T_BitmapHeapScan: case T_TidScan: +#ifdef PGXC + case T_RemoteQuery: +#endif if (((Scan *) plan)->scanrelid > 0) { RangeTblEntry *rte = rt_fetch(((Scan *) plan)->scanrelid, @@ -686,6 +694,26 @@ explain_outNode(StringInfo str, appendStringInfo(str, " %s", quote_identifier(rte->eref->aliasname)); } +#ifdef PGXC + if (IsA(plan, RemoteQuery)) + { + RemoteQuery *remote_query = (RemoteQuery *) plan; + + /* if it is a single-step plan, print out the sql being used */ + if (remote_query->sql_statement) + { + char *realsql = NULL; + realsql = strcasestr(remote_query->sql_statement, "explain"); + if (!realsql) + realsql = remote_query->sql_statement; + else + realsql += 8; /* skip "EXPLAIN" */ + + appendStringInfo(str, " %s", + quote_identifier(realsql)); + } + } +#endif break; case T_BitmapIndexScan: appendStringInfo(str, " on %s", @@ -854,6 +882,9 @@ explain_outNode(StringInfo str, case T_ValuesScan: case T_CteScan: case T_WorkTableScan: +#ifdef PGXC + case T_RemoteQuery: +#endif show_scan_qual(plan->qual, "Filter", ((Scan *) plan)->scanrelid, diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index ff3a1748da..b7df834f28 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -2915,6 +2915,20 @@ ATRewriteTables(List **wqueue) } } +#ifdef PGXC + /* + * In PGXC, do not check the FK constraints on the Coordinator, and just return + * That is because a SELECT is generated whose plan will try and use + * the data nodes. 
We (currently) do not want to do that on the Coordinator, + * when the command is passed down to the data nodes it will + * peform the check locally. + * This issue was introduced when we added multi-step handling, + * it caused foreign key constraints to fail. + * PGXCTODO - issue for pg_catalog or any other cases? + */ + if (IS_PGXC_COORDINATOR) + return; +#endif /* * Foreign key constraints are checked in a final pass, since (a) it's * generally best to examine each one separately, and (b) it's at least diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index cd23d8d8b6..0f6f81ff6b 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -44,6 +44,9 @@ #include "executor/nodeWindowAgg.h" #include "executor/nodeWorktablescan.h" #include "nodes/nodeFuncs.h" +#ifdef PGXC +#include "pgxc/execRemote.h" +#endif #include "utils/syscache.h" @@ -183,6 +186,11 @@ ExecReScan(PlanState *node, ExprContext *exprCtxt) ExecWorkTableScanReScan((WorkTableScanState *) node, exprCtxt); break; +#ifdef PGXC + case T_RemoteQueryState: + ExecRemoteQueryReScan((RemoteQueryState *) node, exprCtxt); + break; +#endif case T_NestLoopState: ExecReScanNestLoop((NestLoopState *) node, exprCtxt); break; diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c index 817a7e7824..597f6d7452 100644 --- a/src/backend/executor/nodeMaterial.c +++ b/src/backend/executor/nodeMaterial.c @@ -24,6 +24,9 @@ #include "executor/executor.h" #include "executor/nodeMaterial.h" #include "miscadmin.h" +#ifdef PGXC +#include "pgxc/pgxc.h" +#endif /* ---------------------------------------------------------------- * ExecMaterial @@ -56,9 +59,24 @@ ExecMaterial(MaterialState *node) /* * If first time through, and we need a tuplestore, initialize it. */ +#ifdef PGXC + /* + * For PGXC, temporarily always create the storage. + * This allows us to easily use the same connection to + * in multiple steps of the plan. 
+ */ + if ((IS_PGXC_COORDINATOR && tuplestorestate == NULL) + || (IS_PGXC_DATANODE && tuplestorestate == NULL && node->eflags != 0)) +#else if (tuplestorestate == NULL && node->eflags != 0) +#endif { tuplestorestate = tuplestore_begin_heap(true, false, work_mem); +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + /* Note that we will rescan these results */ + node->eflags |= EXEC_FLAG_REWIND; +#endif tuplestore_set_eflags(tuplestorestate, node->eflags); if (node->eflags & EXEC_FLAG_MARK) { @@ -73,6 +91,26 @@ ExecMaterial(MaterialState *node) Assert(ptrno == 1); } node->tuplestorestate = tuplestorestate; + +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + { + TupleTableSlot *outerslot; + PlanState *outerNode = outerPlanState(node); + + /* We want to always materialize first temporarily in PG-XC */ + while (!node->eof_underlying) + { + outerslot = ExecProcNode(outerNode); + if (TupIsNull(outerslot)) + node->eof_underlying = true; + else + /* Append a copy of the returned tuple to tuplestore. */ + tuplestore_puttupleslot(tuplestorestate, outerslot); + } + tuplestore_rescan(node->tuplestorestate); + } +#endif } /* diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 4a0a1012c0..21581b07fb 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -17,6 +17,7 @@ #include <math.h> +#include "catalog/pg_namespace.h" #include "nodes/nodeFuncs.h" #ifdef OPTIMIZER_DEBUG #include "nodes/print.h" @@ -32,7 +33,11 @@ #include "optimizer/var.h" #include "parser/parse_clause.h" #include "parser/parsetree.h" +#ifdef PGXC +#include "pgxc/pgxc.h" +#endif #include "rewrite/rewriteManip.h" +#include "utils/lsyscache.h" /* These parameters are set by GUC */ @@ -253,6 +258,18 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) * least one dimension of cost or sortedness. 
*/ +#ifdef PGXC + /* + * If we are on the coordinator, we always want to use + * the remote query path unless it is a pg_catalog table. + */ + if (IS_PGXC_COORDINATOR + && get_rel_namespace(rte->relid) != PG_CATALOG_NAMESPACE) + add_path(rel, create_remotequery_path(root, rel)); + else + { +#endif + /* Consider sequential scan */ add_path(rel, create_seqscan_path(root, rel)); @@ -261,6 +278,9 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) /* Consider TID scans */ create_tidscan_paths(root, rel); +#ifdef PGXC + } +#endif /* Now find the cheapest of the paths for this rel */ set_cheapest(rel); diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index ab07a0dbea..ca9cfbc371 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -32,6 +32,9 @@ #include "optimizer/var.h" #include "parser/parse_clause.h" #include "parser/parsetree.h" +#ifdef PGXC +#include "pgxc/planner.h" +#endif #include "utils/lsyscache.h" @@ -66,6 +69,10 @@ static CteScan *create_ctescan_plan(PlannerInfo *root, Path *best_path, List *tlist, List *scan_clauses); static WorkTableScan *create_worktablescan_plan(PlannerInfo *root, Path *best_path, List *tlist, List *scan_clauses); +#ifdef PGXC +static RemoteQuery *create_remotequery_plan(PlannerInfo *root, Path *best_path, + List *tlist, List *scan_clauses); +#endif static NestLoop *create_nestloop_plan(PlannerInfo *root, NestPath *best_path, Plan *outer_plan, Plan *inner_plan); static MergeJoin *create_mergejoin_plan(PlannerInfo *root, MergePath *best_path, @@ -101,6 +108,10 @@ static CteScan *make_ctescan(List *qptlist, List *qpqual, Index scanrelid, int ctePlanId, int cteParam); static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual, Index scanrelid, int wtParam); +#ifdef PGXC +static RemoteQuery *make_remotequery(List *qptlist, RangeTblEntry *rte, + List *qpqual, Index scanrelid); +#endif static BitmapAnd 
*make_bitmap_and(List *bitmapplans); static BitmapOr *make_bitmap_or(List *bitmapplans); static NestLoop *make_nestloop(List *tlist, @@ -162,6 +173,9 @@ create_plan(PlannerInfo *root, Path *best_path) case T_ValuesScan: case T_CteScan: case T_WorkTableScan: +#ifdef PGXC + case T_RemoteQuery: +#endif plan = create_scan_plan(root, best_path); break; case T_HashJoin: @@ -207,6 +221,9 @@ create_scan_plan(PlannerInfo *root, Path *best_path) List *tlist; List *scan_clauses; Plan *plan; +#ifdef PGXC + Plan *matplan; +#endif /* * For table scans, rather than using the relation targetlist (which is @@ -298,6 +315,23 @@ create_scan_plan(PlannerInfo *root, Path *best_path) scan_clauses); break; +#ifdef PGXC + case T_RemoteQuery: + plan = (Plan *) create_remotequery_plan(root, + best_path, + tlist, + scan_clauses); + + /* + * Insert a materialization plan above this temporarily + * until we better handle multiple steps using the same connection. + */ + matplan = (Plan *) make_material(plan); + copy_plan_costsize(matplan, plan); + matplan->total_cost += cpu_tuple_cost * matplan->plan_rows; + plan = matplan; + break; +#endif default: elog(ERROR, "unrecognized node type: %d", (int) best_path->pathtype); @@ -420,6 +454,9 @@ disuse_physical_tlist(Plan *plan, Path *path) case T_ValuesScan: case T_CteScan: case T_WorkTableScan: +#ifdef PGXC + case T_RemoteQuery: +#endif plan->targetlist = build_relation_tlist(path->parent); break; default: @@ -1544,6 +1581,46 @@ create_worktablescan_plan(PlannerInfo *root, Path *best_path, return scan_plan; } +#ifdef PGXC +/* + * create_remotequery_plan + * Returns a remotequery plan for the base relation scanned by 'best_path' + * with restriction clauses 'scan_clauses' and targetlist 'tlist'. 
+ */ +static RemoteQuery * +create_remotequery_plan(PlannerInfo *root, Path *best_path, + List *tlist, List *scan_clauses) +{ + RemoteQuery *scan_plan; + Index scan_relid = best_path->parent->relid; + RangeTblEntry *rte; + + + Assert(scan_relid > 0); + rte = planner_rt_fetch(scan_relid, root); + Assert(best_path->parent->rtekind == RTE_RELATION); + Assert(rte->rtekind == RTE_RELATION); + + /* Sort clauses into best execution order */ + scan_clauses = order_qual_clauses(root, scan_clauses); + + /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */ + scan_clauses = extract_actual_clauses(scan_clauses, false); + + scan_plan = make_remotequery(tlist, + rte, + scan_clauses, + scan_relid); + + copy_path_costsize(&scan_plan->scan.plan, best_path); + + /* PGXCTODO - get better estimates */ + scan_plan->scan.plan.plan_rows = 1000; + + return scan_plan; +} +#endif + /***************************************************************************** * @@ -2541,6 +2618,28 @@ make_worktablescan(List *qptlist, return node; } +#ifdef PGXC +static RemoteQuery * +make_remotequery(List *qptlist, + RangeTblEntry *rte, + List *qpqual, + Index scanrelid) +{ + RemoteQuery *node = makeNode(RemoteQuery); + Plan *plan = &node->scan.plan; + + /* cost should be inserted by caller */ + plan->targetlist = qptlist; + plan->qual = qpqual; + plan->lefttree = NULL; + plan->righttree = NULL; + node->scan.scanrelid = scanrelid; + node->read_only = true; + + return node; +} +#endif + Append * make_append(List *appendplans, bool isTarget, List *tlist) { diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 11e14f96c5..cbee7e9e9b 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -22,6 +22,9 @@ #include "optimizer/clauses.h" #include "optimizer/planmain.h" #include "optimizer/tlist.h" +#ifdef PGXC +#include "pgxc/planner.h" +#endif #include "parser/parsetree.h" #include "utils/lsyscache.h" #include 
"utils/syscache.h" @@ -369,6 +372,19 @@ set_plan_refs(PlannerGlobal *glob, Plan *plan, int rtoffset) fix_scan_list(glob, splan->scan.plan.qual, rtoffset); } break; +#ifdef PGXC + case T_RemoteQuery: + { + RemoteQuery *splan = (RemoteQuery *) plan; + + splan->scan.scanrelid += rtoffset; + splan->scan.plan.targetlist = + fix_scan_list(glob, splan->scan.plan.targetlist, rtoffset); + splan->scan.plan.qual = + fix_scan_list(glob, splan->scan.plan.qual, rtoffset); + } + break; +#endif case T_NestLoop: case T_MergeJoin: case T_HashJoin: diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index cdff123828..3e813c4f71 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -1926,6 +1926,12 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params) bms_add_member(context.paramids, ((WorkTableScan *) plan)->wtParam); break; +#ifdef PGXC + case T_RemoteQuery: + //PGXCTODO + context.paramids = bms_add_members(context.paramids, valid_params); + break; +#endif case T_Append: { diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index b0358cb112..5f1462f4e1 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1310,6 +1310,28 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel) return pathnode; } +#ifdef PGXC +/* + * create_remotequery_path + * Creates a path corresponding to a scan of a remote query, + * returning the pathnode. 
+ */ +Path * +create_remotequery_path(PlannerInfo *root, RelOptInfo *rel) +{ + Path *pathnode = makeNode(Path); + + pathnode->pathtype = T_RemoteQuery; + pathnode->parent = rel; + pathnode->pathkeys = NIL; /* result is always unordered */ + + // PGXCTODO - set cost properly + cost_seqscan(pathnode, root, rel); + + return pathnode; +} +#endif + /* * create_nestloop_path * Creates a pathnode corresponding to a nestloop join between two diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 1dcfc2943a..c8911b7878 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -25,6 +25,7 @@ #include "nodes/nodes.h" #include "nodes/parsenodes.h" #include "optimizer/clauses.h" +#include "optimizer/planmain.h" #include "optimizer/planner.h" #include "optimizer/tlist.h" #include "parser/parse_agg.h" @@ -141,7 +142,7 @@ bool StrictSelectChecking = false; static Exec_Nodes *get_plan_nodes(Query *query, bool isRead); static bool get_plan_nodes_walker(Node *query_node, XCWalkerContext *context); static bool examine_conditions_walker(Node *expr_node, XCWalkerContext *context); - +static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt); /* * True if both lists contain only one node and are the same @@ -1528,16 +1529,6 @@ get_simple_aggregates(Query * query) simple_agg_list = lappend(simple_agg_list, simple_agg); } - else - { - /* - * PGXCTODO relax this limit after adding GROUP BY support - * then support expressions of aggregates - */ - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Query is not yet supported")))); - } column_pos++; } } @@ -1629,7 +1620,7 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, { List *context; bool useprefix; - List *sub_tlist = step->plan.targetlist; + List *sub_tlist = step->scan.plan.targetlist; ListCell *l; StringInfo buf = makeStringInfo(); char *sql; @@ -1737,7 +1728,7 @@ 
make_simple_sort_from_sortclauses(Query *query, RemoteQuery *step) { List *sortcls = query->sortClause; List *distinctcls = query->distinctClause; - List *sub_tlist = step->plan.targetlist; + List *sub_tlist = step->scan.plan.targetlist; SimpleSort *sort; SimpleDistinct *distinct; ListCell *l; @@ -1978,6 +1969,100 @@ make_simple_sort_from_sortclauses(Query *query, RemoteQuery *step) } /* + * Special case optimization. + * Handle LIMIT and OFFSET for single-step queries on multiple nodes. + * + * Return non-zero if we need to fall back to the standard plan. + */ +static int +handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt) +{ + + /* check if no special handling needed */ + if (query_step && query_step->exec_nodes && + list_length(query_step->exec_nodes->nodelist) <= 1) + return 0; + + /* if order by and limit are present, do not optimize yet */ + if ((query->limitCount || query->limitOffset) && query->sortClause) + return 1; + + /* + * Note that query_step->is_single_step is set to true, but + * it is ok even if we add limit here. + * If OFFSET is set, we strip the final offset value and add + * it to the LIMIT passed down. If there is an OFFSET and no + * LIMIT, we just strip off OFFSET. 
+ */ + if (query->limitOffset) + { + int64 newLimit = 0; + char *newpos; + char *pos; + char *limitpos; + char *newQuery; + char *newchar; + char *c; + + pos = NULL; + newpos = NULL; + + if (query->limitCount) + { + for (pos = query_step->sql_statement, newpos = pos; newpos != NULL; ) + { + pos = newpos; + newpos = strcasestr(pos+1, "LIMIT"); + } + limitpos = pos; + + if (IsA(query->limitCount, Const)) + newLimit = DatumGetInt64(((Const *) query->limitCount)->constvalue); + else + return 1; + } + + for (pos = query_step->sql_statement, newpos = pos; newpos != NULL; ) + { + pos = newpos; + newpos = strcasestr(pos+1, "OFFSET"); + } + + if (limitpos && limitpos < pos) + pos = limitpos; + + if (IsA(query->limitOffset, Const)) + newLimit += DatumGetInt64(((Const *) query->limitOffset)->constvalue); + else + return 1; + + if (!pos || pos == query_step->sql_statement) + elog(ERROR, "Could not handle LIMIT/OFFSET"); + + newQuery = (char *) palloc(strlen(query_step->sql_statement)+1); + newchar = newQuery; + + /* copy up until position where we found clause */ + for (c = &query_step->sql_statement[0]; c != pos && *c != '\0'; *newchar++ = *c++); + + if (query->limitCount) + sprintf(newchar, "LIMIT %I64d", newLimit); + else + *newchar = '\0'; + + pfree(query_step->sql_statement); + query_step->sql_statement = newQuery; + } + + /* Now add a limit execution node at the top of the plan */ + plan_stmt->planTree = (Plan *) make_limit(plan_stmt->planTree, + query->limitOffset, query->limitCount, 0, 0); + + return 0; +} + + +/* * Build up a QueryPlan to execute on. 
* * For the prototype, there will only be one step, @@ -1997,6 +2082,7 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) Plan *standardPlan = result->planTree; RemoteQuery *query_step = makeNode(RemoteQuery); + query_step->is_single_step = false; query_step->sql_statement = pstrdup(query->sql_statement); query_step->exec_nodes = NULL; query_step->combine_type = COMBINE_TYPE_NONE; @@ -2020,21 +2106,6 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) ereport(ERROR, (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), (errmsg("INTO clause not yet supported")))); - - if (query->setOperations) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("UNION, INTERSECT and EXCEPT are not yet supported")))); - - if (query->hasRecursive) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("WITH RECURSIVE not yet supported")))); - - if (query->hasWindowFuncs) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Window functions not yet supported")))); /* fallthru */ case T_InsertStmt: case T_UpdateStmt: @@ -2043,14 +2114,32 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) if (query_step->exec_nodes == NULL) { + /* Do not yet allow multi-node correlated UPDATE or DELETE */ + if ((query->nodeTag == T_UpdateStmt || query->nodeTag == T_DeleteStmt)) + { + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("Complex and correlated UPDATE and DELETE not yet supported")))); + } + /* - * Processing guery against catalog tables, restore - * standard plan + * Processing guery against catalog tables, or multi-step command. 
+ * Restore standard plan */ result->planTree = standardPlan; return result; } + /* Do not yet allow multi-node correlated UPDATE or DELETE */ + if ((query->nodeTag == T_UpdateStmt || query->nodeTag == T_DeleteStmt) + && !query_step->exec_nodes + && list_length(query->rtable) > 1) + { + result->planTree = standardPlan; + return result; + } + + query_step->is_single_step = true; /* * PGXCTODO * When Postgres runs insert into t (a) values (1); against table @@ -2064,7 +2153,7 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) * then call standard planner and take targetList from the plan * generated by Postgres. */ - query_step->plan.targetlist = standardPlan->targetlist; + query_step->scan.plan.targetlist = standardPlan->targetlist; if (query_step->exec_nodes) query_step->combine_type = get_plan_combine_type( @@ -2075,39 +2164,36 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) query_step->simple_aggregates = get_simple_aggregates(query); /* - * Add sortring to the step + * Add sorting to the step */ if (list_length(query_step->exec_nodes->nodelist) > 1 && (query->sortClause || query->distinctClause)) make_simple_sort_from_sortclauses(query, query_step); - /* - * PG-XC cannot yet support some variations of SQL statements. - * We perform some checks to at least catch common cases - */ + /* Handle LIMIT and OFFSET for single-step queries on multiple nodes*/ + if (handle_limit_offset(query_step, query, result)) + { + /* complicated expressions, just fallback to standard plan */ + result->planTree = standardPlan; + return result; + } + /* + * Use standard plan if we have more than one data node with either + * group by, hasWindowFuncs, or hasRecursive + */ /* - * Check if we have multiple nodes and an unsupported clause. 
This - * is temporary until we expand supported SQL + * PGXCTODO - this could be improved to check if the first + * group by expression is the partitioning column, in which + * case it is ok to treat as a single step. */ - if (query->nodeTag == T_SelectStmt) + if (query->nodeTag == T_SelectStmt + && query_step->exec_nodes + && list_length(query_step->exec_nodes->nodelist) > 1 + && (query->groupClause || query->hasWindowFuncs || query->hasRecursive)) { - if (StrictStatementChecking && query_step->exec_nodes - && list_length(query_step->exec_nodes->nodelist) > 1) - { - /* - * PGXCTODO - this could be improved to check if the first - * group by expression is the partitioning column - */ - if (query->groupClause) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Multi-node GROUP BY not yet supported")))); - if (query->limitCount && StrictSelectChecking) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Multi-node LIMIT not yet supported")))); - } + result->planTree = standardPlan; + return result; } break; default: diff --git a/src/backend/pgxc/pool/Makefile b/src/backend/pgxc/pool/Makefile index e8753031c2..c7e950aaa4 100644 --- a/src/backend/pgxc/pool/Makefile +++ b/src/backend/pgxc/pool/Makefile @@ -14,6 +14,6 @@ subdir = src/backend/pgxc/pool top_builddir = ../../../.. 
include $(top_builddir)/src/Makefile.global -OBJS = datanode.o execRemote.o poolmgr.o poolcomm.o +OBJS = datanode.o execRemote.o poolmgr.o poolcomm.o postgresql_fdw.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 0f16c5143f..43569e0e75 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -30,6 +30,8 @@ #include "utils/tuplesort.h" #include "utils/snapmgr.h" +extern char *deparseSql(RemoteQueryState *scanstate); + /* * Buffer size does not affect performance significantly, just do not allow * connection buffer grows infinitely @@ -1461,8 +1463,8 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_ { if (need_tran) DataNodeCopyFinish(connections, 0, COMBINE_TYPE_NONE); - else - if (!PersistentConnections) release_handles(); + else if (!PersistentConnections) + release_handles(); } pfree(connections); @@ -1812,21 +1814,44 @@ ExecCountSlotsRemoteQuery(RemoteQuery *node) RemoteQueryState * ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) { - RemoteQueryState *remotestate; + RemoteQueryState *remotestate; + Relation currentRelation; + remotestate = CreateResponseCombiner(0, node->combine_type); remotestate->ss.ps.plan = (Plan *) node; remotestate->ss.ps.state = estate; remotestate->simple_aggregates = node->simple_aggregates; + remotestate->ss.ps.qual = (List *) + ExecInitExpr((Expr *) node->scan.plan.qual, + (PlanState *) remotestate); + ExecInitResultTupleSlot(estate, &remotestate->ss.ps); - if (node->plan.targetlist) + if (node->scan.plan.targetlist) { - TupleDesc typeInfo = ExecCleanTypeFromTL(node->plan.targetlist, false); + TupleDesc typeInfo = ExecCleanTypeFromTL(node->scan.plan.targetlist, false); ExecSetSlotDescriptor(remotestate->ss.ps.ps_ResultTupleSlot, typeInfo); } ExecInitScanTupleSlot(estate, &remotestate->ss); + + /* + * Initialize scan relation. 
get the relation object id from the + * relid'th entry in the range table, open that relation and acquire + * appropriate lock on it. + * This is needed for deparseSQL + * We should remove these lines once we plan and deparse earlier. + */ + if (!node->is_single_step) + { + currentRelation = ExecOpenScanRelation(estate, node->scan.scanrelid); + remotestate->ss.ss_currentRelation = currentRelation; + ExecAssignScanType(&remotestate->ss, RelationGetDescr(currentRelation)); + } + + remotestate->ss.ps.ps_TupFromTlist = false; + /* * Tuple description for the scan slot will be set on runtime from * a RowDescription message @@ -1991,7 +2016,6 @@ TupleTableSlot * ExecRemoteQuery(RemoteQueryState *node) { RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan; - EState *estate = node->ss.ps.state; TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot; TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot; bool have_tuple = false; @@ -2092,6 +2116,11 @@ ExecRemoteQuery(RemoteQueryState *node) data_node_begin(new_count, new_connections, gxid); } + /* Get the SQL string */ + /* only do if not single step */ + if (!step->is_single_step) + step->sql_statement = deparseSql(node); + /* See if we have a primary nodes, execute on it first before the others */ if (primaryconnection) { @@ -2427,12 +2456,35 @@ ExecEndRemoteQuery(RemoteQueryState *node) if (outerPlanState(node)) ExecEndNode(outerPlanState(node)); + if (node->ss.ss_currentRelation) + ExecCloseScanRelation(node->ss.ss_currentRelation); + if (node->tmp_ctx) MemoryContextDelete(node->tmp_ctx); CloseCombiner(node); } + +/* ---------------------------------------------------------------- + * ExecRemoteQueryReScan + * + * Rescans the relation. + * ---------------------------------------------------------------- + */ +void +ExecRemoteQueryReScan(RemoteQueryState *node, ExprContext *exprCtxt) +{ + /* At the moment we materialize results for multi-step queries, + * so no need to support rescan. + // PGXCTODO - rerun Init? 
+ //node->routine->ReOpen(node); + + //ExecScanReScan((ScanState *) node); + */ +} + + /* * Execute utility statement on multiple data nodes * It does approximately the same as diff --git a/src/backend/pgxc/pool/postgresql_fdw.c b/src/backend/pgxc/pool/postgresql_fdw.c new file mode 100644 index 0000000000..9e418bead2 --- /dev/null +++ b/src/backend/pgxc/pool/postgresql_fdw.c @@ -0,0 +1,335 @@ +/*------------------------------------------------------------------------- + * + * postgresql_fdw.c + * foreign-data wrapper for PostgreSQL + * + * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group + * + * IDENTIFICATION + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "catalog/pg_operator.h" +#include "catalog/pg_proc.h" +#include "funcapi.h" +//#include "libpq-fe.h" +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "nodes/nodeFuncs.h" +#include "nodes/makefuncs.h" +#include "optimizer/clauses.h" +#include "parser/scansup.h" +#include "pgxc/execRemote.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/syscache.h" + +//#include "dblink.h" + +#define DEBUG_FDW + +/* + * WHERE caluse optimization level + */ +#define EVAL_QUAL_LOCAL 0 /* evaluate none in foreign, all in local */ +#define EVAL_QUAL_BOTH 1 /* evaluate some in foreign, all in local */ +#define EVAL_QUAL_FOREIGN 2 /* evaluate some in foreign, rest in local */ + +#define OPTIMIZE_WHERE_CLAUSE EVAL_QUAL_FOREIGN + + + +/* deparse SQL from the request */ +static bool is_immutable_func(Oid funcid); +static bool is_foreign_qual(ExprState *state); +static bool foreign_qual_walker(Node *node, void *context); +char *deparseSql(RemoteQueryState *scanstate); + + +/* + * Check whether the function is IMMUTABLE. 
+ */ +static bool +is_immutable_func(Oid funcid) +{ + HeapTuple tp; + bool isnull; + Datum datum; + + tp = SearchSysCache(PROCOID, ObjectIdGetDatum(funcid), 0, 0, 0); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for function %u", funcid); + +#ifdef DEBUG_FDW + /* print function name and its immutability */ + { + char *proname; + datum = SysCacheGetAttr(PROCOID, tp, Anum_pg_proc_proname, &isnull); + proname = pstrdup(DatumGetName(datum)->data); + elog(DEBUG1, "func %s(%u) is%s immutable", proname, funcid, + (DatumGetChar(datum) == PROVOLATILE_IMMUTABLE) ? "" : " not"); + pfree(proname); + } +#endif + + datum = SysCacheGetAttr(PROCOID, tp, Anum_pg_proc_provolatile, &isnull); + ReleaseSysCache(tp); + + return (DatumGetChar(datum) == PROVOLATILE_IMMUTABLE); +} + +/* + * Check whether the ExprState node should be evaluated in foreign server. + * + * An expression which consists of expressions below will be evaluated in + * the foreign server. + * - constant value + * - variable (foreign table column) + * - external parameter (parameter of prepared statement) + * - array + * - bool expression (AND/OR/NOT) + * - NULL test (IS [NOT] NULL) + * - operator + * - IMMUTABLE only + * - It is required that the meaning of the operator be the same as the + * local server in the foreign server. + * - function + * - IMMUTABLE only + * - It is required that the meaning of the operator be the same as the + * local server in the foreign server. + * - scalar array operator (ANY/ALL) + */ +static bool +is_foreign_qual(ExprState *state) +{ + return !foreign_qual_walker((Node *) state->expr, NULL); +} + +/* + * return true if node cannot be evaluatated in foreign server. 
+ */ +static bool +foreign_qual_walker(Node *node, void *context) +{ + if (node == NULL) + return false; + + switch (nodeTag(node)) + { + case T_Param: + /* TODO: pass internal parameters to the foreign server */ + if (((Param *) node)->paramkind != PARAM_EXTERN) + return true; + break; + case T_DistinctExpr: + case T_OpExpr: + /* + * An operator which uses IMMUTABLE function can be evaluated in + * foreign server . It is not necessary to worry about oprrest + * and oprjoin here because they are invoked by planner but not + * executor. DistinctExpr is a typedef of OpExpr. + */ + if (!is_immutable_func(((OpExpr*) node)->opfuncid)) + return true; + break; + case T_ScalarArrayOpExpr: + if (!is_immutable_func(((ScalarArrayOpExpr*) node)->opfuncid)) + return true; + break; + case T_FuncExpr: + /* IMMUTABLE function can be evaluated in foreign server */ + if (!is_immutable_func(((FuncExpr*) node)->funcid)) + return true; + break; + case T_TargetEntry: + case T_PlaceHolderVar: + case T_AppendRelInfo: + case T_PlaceHolderInfo: + /* TODO: research whether those complex nodes are evaluatable. */ + return true; + default: + break; + } + + return expression_tree_walker(node, foreign_qual_walker, context); +} + +/* + * Deparse SQL string from query request. + * + * The expressions in Plan.qual are deparsed when it satisfies is_foreign_qual() + * and removed. 
+ */
+char *
+deparseSql(RemoteQueryState *scanstate)
+{
+	EState		   *estate = scanstate->ss.ps.state;
+	bool			prefix;
+	List		   *context;
+	StringInfoData	sql;
+	RemoteQuery	   *scan;
+	RangeTblEntry  *rte;
+	Oid				nspid;
+	char		   *nspname;
+	char		   *relname;
+	const char	   *nspname_q;
+	const char	   *relname_q;
+	const char	   *aliasname_q;
+	int				i;
+	TupleDesc		tupdesc;
+	bool			first;
+
+	elog(DEBUG2, "%s(%u) called", __FUNCTION__, __LINE__);
+
+	/* extract RemoteQuery and RangeTblEntry */
+	scan = (RemoteQuery *)scanstate->ss.ps.plan;
+	rte = list_nth(estate->es_range_table, scan->scan.scanrelid - 1);
+
+	/* prepare to deparse plan */
+	initStringInfo(&sql);
+	context = deparse_context_for_plan((Node *)scan, NULL,
+									   estate->es_range_table, NULL);
+
+	/*
+	 * Scanning multiple relations in a RemoteQuery node is not supported,
+	 * so column references are never prefixed with the alias.
+	 */
+	prefix = false;
+#if 0
+	prefix = list_length(estate->es_range_table) > 1;
+#endif
+
+	/* Get quoted names of schema, table and alias */
+	nspid = get_rel_namespace(rte->relid);
+	nspname = get_namespace_name(nspid);
+	relname = get_rel_name(rte->relid);
+	nspname_q = quote_identifier(nspname);
+	relname_q = quote_identifier(relname);
+	aliasname_q = quote_identifier(rte->eref->aliasname);
+
+	/* deparse SELECT clause */
+	appendStringInfo(&sql, "SELECT ");
+
+	/*
+	 * TODO: omit (deparse to "NULL") columns which are not used in the
+	 * original SQL.
+	 *
+	 * We must parse nodes parents of this RemoteQuery node to determine unused
+	 * columns because some columns may be used only in parent Sort/Agg/Limit
+	 * nodes.
+	 */
+	tupdesc = scanstate->ss.ss_currentRelation->rd_att;
+	first = true;
+	for (i = 0; i < tupdesc->natts; i++)
+	{
+		const char *attname;
+		const char *attname_q;
+
+		/* skip dropped attributes */
+		if (tupdesc->attrs[i]->attisdropped)
+			continue;
+
+		if (!first)
+			appendStringInfoString(&sql, ", ");
+
+		/*
+		 * Column names must be quoted like the schema/table/alias names,
+		 * otherwise mixed-case or keyword-like names produce invalid SQL.
+		 */
+		attname = tupdesc->attrs[i]->attname.data;
+		attname_q = quote_identifier(attname);
+
+		if (prefix)
+			appendStringInfo(&sql, "%s.%s", aliasname_q, attname_q);
+		else
+			appendStringInfoString(&sql, attname_q);
+
+		/* a distinct pointer means quote_identifier() made a quoted copy */
+		if (attname_q != attname)
+			pfree((char *) attname_q);
+		first = false;
+	}
+
+	/*
+	 * If no column was emitted above (every attribute was skipped), add a
+	 * dummy column so that the SELECT list is not empty.
+	 */
+	if (first)
+		appendStringInfo(&sql, "NULL");
+
+	/* deparse FROM clause */
+	appendStringInfo(&sql, " FROM ");
+	/*
+	 * XXX: should use GENERIC OPTIONS like 'foreign_relname' or something for
+	 * the foreign table name instead of the local name ?
+	 */
+	appendStringInfo(&sql, "%s.%s %s", nspname_q, relname_q, aliasname_q);
+
+	/*
+	 * Release the quoted copies, but only when they are distinct from the
+	 * unquoted strings (compare against the originals, and do it before the
+	 * originals themselves are freed).
+	 */
+	if (nspname_q != nspname)
+		pfree((char *) nspname_q);
+	if (relname_q != relname)
+		pfree((char *) relname_q);
+	if (aliasname_q != rte->eref->aliasname)
+		pfree((char *) aliasname_q);
+	pfree(nspname);
+	pfree(relname);
+
+	/*
+	 * deparse WHERE clause
+	 *
+	 * The expressions which satisfy is_foreign_qual() are deparsed into WHERE
+	 * clause of result SQL string, and they could be removed from qual of
+	 * PlanState to avoid duplicate evaluation at ExecScan().
+	 *
+	 * The Plan.qual is never changed, so multiple use of the Plan with
+	 * PREPARE/EXECUTE work properly.
+	 */
+#if OPTIMIZE_WHERE_CLAUSE > EVAL_QUAL_LOCAL
+	if (scanstate->ss.ps.plan->qual)
+	{
+		List	   *local_qual = NIL;
+		List	   *foreign_qual = NIL;
+		List	   *foreign_expr = NIL;
+		ListCell   *lc;
+
+		/*
+		 * Divide qual of PlanState into two lists, one for local evaluation
+		 * and one for foreign evaluation.
+		 */
+		foreach (lc, scanstate->ss.ps.qual)
+		{
+			ExprState	   *state = lfirst(lc);
+
+			if (is_foreign_qual(state))
+			{
+				elog(DEBUG1, "foreign qual: %s", nodeToString(state->expr));
+				foreign_qual = lappend(foreign_qual, state);
+				foreign_expr = lappend(foreign_expr, state->expr);
+			}
+			else
+			{
+				elog(DEBUG1, "local qual: %s", nodeToString(state->expr));
+				local_qual = lappend(local_qual, state);
+			}
+		}
+#if OPTIMIZE_WHERE_CLAUSE == EVAL_QUAL_FOREIGN
+		/*
+		 * If the optimization level is EVAL_QUAL_FOREIGN, replace the original
+		 * qual with the list of ExprStates which should be evaluated in the
+		 * local server.
+		 */
+		scanstate->ss.ps.qual = local_qual;
+#endif
+
+		/*
+		 * Deparse quals to be evaluated in the foreign server if any.
+		 * TODO: modify deparse_expression() to deparse conditions which use
+		 * internal parameters.
+		 */
+		if (foreign_expr != NIL)
+		{
+			Node   *node;
+			node = (Node *) make_ands_explicit(foreign_expr);
+			appendStringInfo(&sql, " WHERE ");
+			/*
+			 * Append the deparsed text as data, never as a format string:
+			 * it may legitimately contain '%' characters.
+			 */
+			appendStringInfoString(&sql,
+				deparse_expression(node, context, prefix, false));
+			/*
+			 * The contents of the list MUST NOT be free-ed because they are
+			 * referenced from Plan.qual list.
+			 */
+			list_free(foreign_expr);
+		}
+	}
+#endif
+
+	elog(DEBUG1, "deparsed SQL is \"%s\"", sql.data);
+
+	/* hand the buffer built by initStringInfo()/appendStringInfo*() to the caller */
+	return sql.data;
+}
+
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 5aa9e5d97a..bcaa8f0ada 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -198,8 +198,6 @@ static void log_disconnections(int code, Datum arg); #ifdef PGXC /* PGXC_DATANODE */ -static void pgxc_transaction_stmt (Node *parsetree); - /* ---------------------------------------------------------------- * PG-XC routines * ---------------------------------------------------------------- diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 3f857282f6..6bc9443ee9 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -287,7 +287,6 @@ ProcessUtility(Node *parsetree, case TRANS_STMT_START: { ListCell *lc; - #ifdef PGXC if (IS_PGXC_COORDINATOR) DataNodeBegin(); @@ -310,10 +309,6 @@ ProcessUtility(Node *parsetree, break; case TRANS_STMT_COMMIT: -#ifdef PGXC - if (IS_PGXC_COORDINATOR) - DataNodeCommit(); -#endif if (!EndTransactionBlock()) { /* report unsuccessful commit in completionTag */ @@ -342,10 +337,6 @@ ProcessUtility(Node *parsetree, break; case TRANS_STMT_ROLLBACK: -#ifdef PGXC - if (IS_PGXC_COORDINATOR) - DataNodeBegin(); -#endif UserAbortTransactionBlock(); break; @@ -1031,21 +1022,16 @@ ProcessUtility(Node *parsetree, case T_ExplainStmt: ExplainQuery((ExplainStmt *) parsetree, queryString, params, dest); -#ifdef PGXC - if (IS_PGXC_COORDINATOR) - { - Exec_Nodes *nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); - nodes->nodelist = GetAnyDataNode(); - ExecUtilityStmtOnNodes(queryString, nodes, false); - } -#endif break; case T_VariableSetStmt: ExecSetVariableStmt((VariableSetStmt *) parsetree); #ifdef PGXC +/* PGXCTODO - this currently causes an assertion failure. 
+ We should change when we add SET handling properly if (IS_PGXC_COORDINATOR) ExecUtilityStmtOnNodes(queryString, NULL, false); +*/ #endif break; diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 343662f6b8..839270f355 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -79,6 +79,9 @@ extern void cost_functionscan(Path *path, PlannerInfo *root, RelOptInfo *baserel); extern void cost_valuesscan(Path *path, PlannerInfo *root, RelOptInfo *baserel); +#ifdef PGXC +extern void cost_remotequery(Path *path, PlannerInfo *root, RelOptInfo *baserel); +#endif extern void cost_ctescan(Path *path, PlannerInfo *root, RelOptInfo *baserel); extern void cost_recursive_union(Plan *runion, Plan *nrterm, Plan *rterm); extern void cost_sort(Path *path, PlannerInfo *root, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index b275b1dd49..eb4cbc07dd 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -56,6 +56,9 @@ extern Path *create_functionscan_path(PlannerInfo *root, RelOptInfo *rel); extern Path *create_valuesscan_path(PlannerInfo *root, RelOptInfo *rel); extern Path *create_ctescan_path(PlannerInfo *root, RelOptInfo *rel); extern Path *create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel); +#ifdef PGXC +extern Path *create_remotequery_path(PlannerInfo *root, RelOptInfo *rel); +#endif extern NestPath *create_nestloop_path(PlannerInfo *root, RelOptInfo *joinrel, diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index b7faa7dd28..143c8faae9 100644 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -95,6 +95,7 @@ extern void ExecRemoteUtility(RemoteQuery *node); extern int handle_response(DataNodeHandle * conn, RemoteQueryState *combiner); extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot); -extern int primary_data_node; +extern void ExecRemoteQueryReScan(RemoteQueryState *node, 
ExprContext *exprCtxt); -#endif
\ No newline at end of file +extern int primary_data_node; +#endif diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h index bf8f2242fa..346dd65d71 100644 --- a/src/include/pgxc/planner.h +++ b/src/include/pgxc/planner.h @@ -58,7 +58,8 @@ typedef struct */ typedef struct { - Plan plan; + Scan scan; + bool is_single_step; /* special case, skip extra work */ char *sql_statement; Exec_Nodes *exec_nodes; CombineType combine_type; |
