diff options
| author | Ashutosh Bapat | 2012-06-29 06:39:37 +0000 |
|---|---|---|
| committer | Ashutosh Bapat | 2012-06-29 06:39:37 +0000 |
| commit | 0b7f51d36adb7c3daf2a52883a6d89caae32e838 (patch) | |
| tree | 02055f252519504cd7baac0a0f008219757f8576 /src | |
| parent | b804d7b35a4c9f2cdce99b49f5cba1effe2755b9 (diff) | |
The function is_foreign_expr() and pgxc_is_query_shippable() were using
different walkers to checking if an expression or a query respectively, is
shippable. The logic to decide whether an expression or query is shippable or
not needs to be same except for few differences between shippability of
expression and query, arising from the information in both of them. If we want
to code a rule for shippability, we need to make it to both of these functions,
becoming a maintenance burden. The commit unifies the walkers into one and makes
appropriate changes in naming.
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/commands/copy.c | 4 | ||||
| -rw-r--r-- | src/backend/optimizer/plan/createplan.c | 103 | ||||
| -rw-r--r-- | src/backend/pgxc/plan/planner.c | 218 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/postgresql_fdw.c | 158 | ||||
| -rw-r--r-- | src/include/pgxc/planner.h | 59 | ||||
| -rw-r--r-- | src/include/pgxc/postgresql_fdw.h | 10 |
6 files changed, 244 insertions, 308 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 1cb4578410..26d0f553f3 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -2461,7 +2461,7 @@ BeginCopyFrom(Relation rel, * the Datanode insert the default values. */ Expr *planned_defexpr = expression_planner((Expr *) defexpr); - if (!is_foreign_expr((Node*)planned_defexpr, NULL)) + if (!pgxc_is_expr_shippable(planned_defexpr, NULL)) { Oid out_func_oid; bool isvarlena; @@ -4472,7 +4472,7 @@ build_copy_statement(CopyState cstate, List *attnamelist, /* Append only if the default expression is not shippable. */ Expr *defexpr = (Expr*) build_column_default(cstate->rel, attnum); if (defexpr && - !is_foreign_expr((Node*)expression_planner(defexpr), NULL)) + !pgxc_is_expr_shippable(expression_planner(defexpr), NULL)) { appendStringInfoString(&cstate->query_buf, ", "); CopyQuoteIdentifier(&cstate->query_buf, diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 39d5bc4e2b..bce367c5b4 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -833,7 +833,7 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla * If the JOIN ON clause has a local dependency then we cannot ship * the join to the remote side at all, bail out immediately. */ - if (!is_foreign_expr((Node *)nest_parent->join.joinqual, NULL)) + if (!pgxc_is_expr_shippable((Expr *)nest_parent->join.joinqual, NULL)) { elog(DEBUG1, "cannot reduce: local dependencies in the joinqual"); return parent; @@ -845,7 +845,7 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla * entire list. These local quals will become part of the quals * list of the reduced remote scan node down later. */ - if (!is_foreign_expr((Node *)nest_parent->join.plan.qual, NULL)) + if (!pgxc_is_expr_shippable((Expr *)nest_parent->join.plan.qual, NULL)) { elog(DEBUG1, "local dependencies in the join plan qual"); @@ -863,7 +863,7 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla * is currentof clause, so keep that information intact and * pass a dummy argument here. */ - if (!is_foreign_expr((Node *)clause, NULL)) + if (!pgxc_is_expr_shippable((Expr *)clause, NULL)) local_scan_clauses = lappend(local_scan_clauses, clause); else remote_scan_clauses = lappend(remote_scan_clauses, clause); @@ -2599,7 +2599,7 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path, { Node *clause = lfirst(l); - if (is_foreign_expr(clause, NULL)) + if (pgxc_is_expr_shippable((Expr *)clause, NULL)) remote_scan_clauses = lappend(remote_scan_clauses, clause); else local_scan_clauses = lappend(local_scan_clauses, clause); @@ -6447,7 +6447,6 @@ pgxc_add_node_to_grouping_tlist(List *remote_tlist, Node *expr, Index ressortgro } else { - if (remote_tle->ressortgroupref == 0) remote_tle->ressortgroupref = ressortgroupref; else if (ressortgroupref == 0) @@ -6522,9 +6521,8 @@ pgxc_process_grouping_targetlist(PlannerInfo *root, List **local_tlist) { TargetEntry *local_tle = lfirst(temp); Node *expr = (Node *)local_tle->expr; - foreign_qual_context context; + bool has_aggs; - pgxc_foreign_qual_context_init(&context); /* * If the expression is not Aggref but involves aggregates (has Aggref * nodes in the expression tree, we can not push the entire expression @@ -6542,9 +6540,7 @@ pgxc_process_grouping_targetlist(PlannerInfo *root, List **local_tlist) * SELECT sum(val), val2 FROM tab1 GROUP BY val2; * Notice that, if we include val in the query, it will become invalid. */ - context.collect_vars = true; - - if (!is_foreign_expr(expr, &context)) + if (!pgxc_is_expr_shippable((Expr *)expr, &has_aggs)) { shippable_remote_tlist = false; break; @@ -6554,46 +6550,37 @@ pgxc_process_grouping_targetlist(PlannerInfo *root, List **local_tlist) * We are about to change the local_tlist, check if we have already * copied original local_tlist, if not take a copy */ - if (!orig_local_tlist && (IsA(expr, Aggref) || context.aggs)) + if (!orig_local_tlist && has_aggs) orig_local_tlist = copyObject(*local_tlist); /* - * if there are aggregates involved in the expression, whole expression + * If there are aggregates involved in the expression, whole expression * can not be pushed to the Datanode. Pick up the aggregates and the * VAR nodes not covered by aggregates. */ - if (context.aggs) + if (has_aggs) { - ListCell *lcell; - /* - * if the target list expression is an Aggref, then the context should - * have only one Aggref in the list and no VARs. - */ - Assert(!IsA(expr, Aggref) || - (list_length(context.aggs) == 1 && - linitial(context.aggs) == expr && - !context.vars)); + ListCell *lcell; + List *aggs_n_vars; /* - * this expression is not going to be pushed as whole, thus other + * This expression is not going to be pushed as whole, thus other * clauses won't be able to find out this TLE in the results * obtained from Datanode. Hence can't optimize this query. + * PGXCTODO: with projection support in RemoteQuery node, this + * condition can be worked around, please check. */ if (local_tle->ressortgroupref > 0) { shippable_remote_tlist = false; break; } + + aggs_n_vars = pull_var_clause(expr, PVC_INCLUDE_AGGREGATES, + PVC_RECURSE_PLACEHOLDERS); /* copy the aggregates into the remote target list */ - foreach (lcell, context.aggs) - { - Assert(IsA(lfirst(lcell), Aggref)); - remote_tlist = pgxc_add_node_to_grouping_tlist(remote_tlist, lfirst(lcell), - 0); - } - /* copy the vars into the remote target list */ - foreach (lcell, context.vars) + foreach (lcell, aggs_n_vars) { - Assert(IsA(lfirst(lcell), Var)); + Assert(IsA(lfirst(lcell), Aggref) || IsA(lfirst(lcell), Var)); remote_tlist = pgxc_add_node_to_grouping_tlist(remote_tlist, lfirst(lcell), 0); } @@ -6602,8 +6589,6 @@ pgxc_process_grouping_targetlist(PlannerInfo *root, List **local_tlist) else remote_tlist = pgxc_add_node_to_grouping_tlist(remote_tlist, expr, local_tle->ressortgroupref); - - pgxc_foreign_qual_context_free(&context); } if (!shippable_remote_tlist) @@ -6648,7 +6633,6 @@ pgxc_process_having_clause(PlannerInfo *root, List *remote_tlist, Node *havingQu List **local_qual, List **remote_qual, bool *reduce_plan) { - foreign_qual_context context; List *qual; ListCell *temp; @@ -6674,45 +6658,34 @@ pgxc_process_having_clause(PlannerInfo *root, List *remote_tlist, Node *havingQu qual = copyObject(havingQual); foreach(temp, qual) { - Node *expr = lfirst(temp); - pgxc_foreign_qual_context_init(&context); - if (!is_foreign_expr(expr, &context)) + Node *expr = lfirst(temp); + bool has_aggs; + List *vars_n_aggs; + + if (!pgxc_is_expr_shippable((Expr *)expr, &has_aggs)) { *reduce_plan = false; break; } - if (context.aggs) + if (has_aggs) { - ListCell *lcell; - /* - * if the target list havingQual is an Aggref, then the context should - * have only one Aggref in the list and no VARs. - */ - Assert(!IsA(expr, Aggref) || - (list_length(context.aggs) == 1 && - linitial(context.aggs) == expr && - !context.vars)); - /* copy the aggregates into the remote target list */ - foreach (lcell, context.aggs) - { - Assert(IsA(lfirst(lcell), Aggref)); - remote_tlist = pgxc_add_node_to_grouping_tlist(remote_tlist, lfirst(lcell), - 0); - } - /* copy the vars into the remote target list */ - foreach (lcell, context.vars) - { - Assert(IsA(lfirst(lcell), Var)); - remote_tlist = pgxc_add_node_to_grouping_tlist(remote_tlist, lfirst(lcell), - 0); - } - *local_qual = lappend(*local_qual, expr); + ListCell *lcell; + + /* Pull the aggregates and var nodes from the quals */ + vars_n_aggs = pull_var_clause(expr, PVC_INCLUDE_AGGREGATES, + PVC_RECURSE_PLACEHOLDERS); + /* copy the aggregates into the remote target list */ + foreach (lcell, vars_n_aggs) + { + Assert(IsA(lfirst(lcell), Aggref) || IsA(lfirst(lcell), Var)); + remote_tlist = pgxc_add_node_to_grouping_tlist(remote_tlist, lfirst(lcell), + 0); + } + *local_qual = lappend(*local_qual, expr); } else *remote_qual = lappend(*remote_qual, expr); - - pgxc_foreign_qual_context_free(&context); } if (!(*reduce_plan)) diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 2cb5f64783..cdef850204 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -72,8 +72,7 @@ static PlannedStmt *pgxc_FQS_planner(Query *query, int cursorOptions, ParamListInfo boundParams); static bool pgxc_query_needs_coord(Query *query); static ExecNodes *pgxc_is_query_shippable(Query *query, int query_level); -static bool pgxc_FQS_walker(Node *node, FQS_context *fqs_context); -static void pgxc_FQS_find_datanodes(FQS_context *fqs_context); +static void pgxc_FQS_find_datanodes(Shippability_context *sc_context); static ExecNodes *pgxc_merge_exec_nodes(ExecNodes *exec_nodes1, ExecNodes *exec_nodes2, bool merge_dist_equijoin, @@ -89,8 +88,7 @@ static bool pgxc_qual_hash_dist_equijoin(Relids varnos_1, Relids varnos_2, Oid distcol_type, Node *quals, List *rtable); static bool VarAttrIsPartAttr(Var *var, List *rtable); -static void pgxc_FQS_set_reason(FQS_context *context, FQS_shippability reason); -static bool pgxc_FQS_test_reason(FQS_context *context, FQS_shippability reason); +static void pgxc_set_shippability_reason(Shippability_context *context, ShippabilityStat reason); /* * make_ctid_col_ref @@ -762,23 +760,23 @@ pgxc_query_needs_coord(Query *query) } /* - * Set the given reason in FQS_context indicating why the query can not be + * Set the given reason in Shippability_context indicating why the query can not be * shipped directly to the Datanodes. */ static void -pgxc_FQS_set_reason(FQS_context *context, FQS_shippability reason) +pgxc_set_shippability_reason(Shippability_context *context, ShippabilityStat reason) { - context->fqsc_shippability = bms_add_member(context->fqsc_shippability, reason); + context->sc_shippability = bms_add_member(context->sc_shippability, reason); } /* * See if a given reason is why the query can not be shipped directly * to the Datanodes. */ -static bool -pgxc_FQS_test_reason(FQS_context *context, FQS_shippability reason) +bool +pgxc_test_shippability_reason(Shippability_context *context, ShippabilityStat reason) { - return bms_is_member(reason, context->fqsc_shippability); + return bms_is_member(reason, context->sc_shippability); } /* @@ -794,22 +792,23 @@ pgxc_FQS_test_reason(FQS_context *context, FQS_shippability reason) static ExecNodes * pgxc_is_query_shippable(Query *query, int query_level) { - FQS_context fqs_context; + Shippability_context sc_context; ExecNodes *exec_nodes; bool canShip = true; Bitmapset *shippability; - memset(&fqs_context, 0, sizeof(fqs_context)); + memset(&sc_context, 0, sizeof(sc_context)); /* let's assume that by default query is shippable */ - fqs_context.fqsc_query = query; - fqs_context.fqsc_query_level = query_level; + sc_context.sc_query = query; + sc_context.sc_query_level = query_level; + sc_context.sc_for_expr = false; /* * We might have already decided not to ship the query to the Datanodes, but * still walk it anyway to find out if there are any subqueries which can be * shipped. */ - pgxc_FQS_walker((Node *)query, &fqs_context); + pgxc_shippability_walker((Node *)query, &sc_context); /* * We have merged the nodelists and distributions of all subqueries seen in * the query tree, merge it with the same obtained for the relations @@ -819,14 +818,14 @@ pgxc_is_query_shippable(Query *query, int query_level) * The logic to merge node lists with other distribution * strategy is not clear yet. */ - exec_nodes = fqs_context.fqsc_exec_nodes; + exec_nodes = sc_context.sc_exec_nodes; if (exec_nodes) exec_nodes = pgxc_merge_exec_nodes(exec_nodes, - fqs_context.fqsc_subquery_en, false, + sc_context.sc_subquery_en, false, true); /* - * Look at the information gathered by the walker in FQS_context and that + * Look at the information gathered by the walker in Shippability_context and that * in the Query structure to decide whether we should ship this query * directly to the Datanode or not */ @@ -840,17 +839,17 @@ pgxc_is_query_shippable(Query *query, int query_level) /* Copy the shippability reasons. We modify the copy for easier handling. * The original can be saved away */ - shippability = bms_copy(fqs_context.fqsc_shippability); + shippability = bms_copy(sc_context.sc_shippability); /* * If the query has an expression which renders the shippability to single * node, and query needs to be shipped to more than one node, it can not be * shipped */ - if (bms_is_member(FQS_SINGLENODE_EXPR, shippability)) + if (bms_is_member(SS_NEED_SINGLENODE, shippability)) { /* We handled the reason here, reset it */ - shippability = bms_del_member(shippability, FQS_SINGLENODE_EXPR); + shippability = bms_del_member(shippability, SS_NEED_SINGLENODE); /* if nodeList has no nodes, it ExecNodes will have other means to know * the nodes where to execute like distribution column expression. We * can't tell how many nodes the query will be executed on, hence treat @@ -859,6 +858,8 @@ pgxc_is_query_shippable(Query *query, int query_level) if (list_length(exec_nodes->nodeList) != 1) canShip = false; } + /* We have delt with aggregates as well, delete the Has aggregates status */ + shippability = bms_del_member(shippability, SS_HAS_AGG_EXPR); /* Can not ship the query for some reason */ if (!bms_is_empty(shippability)) @@ -1030,14 +1031,21 @@ pgxc_merge_exec_nodes(ExecNodes *en1, ExecNodes *en2, bool merge_dist_equijoin, } static void -pgxc_FQS_find_datanodes(FQS_context *fqs_context) +pgxc_FQS_find_datanodes(Shippability_context *sc_context) { - Query *query = fqs_context->fqsc_query; + Query *query = sc_context->sc_query; ListCell *rt; ExecNodes *exec_nodes = NULL; bool canShip = true; Index varno = 0; + /* No query, no nodes to execute! */ + if (!query) + { + sc_context->sc_exec_nodes = NULL; + return; + } + /* * For every range table entry, * 1. Find out the Datanodes needed for that range table @@ -1205,7 +1213,7 @@ pgxc_FQS_find_datanodes(FQS_context *fqs_context) exec_nodes->nodeList = list_make1_int(nodeid); list_free(tmp_list); } - fqs_context->fqsc_exec_nodes = exec_nodes; + sc_context->sc_exec_nodes = exec_nodes; } else if (exec_nodes) { @@ -1419,7 +1427,7 @@ pgxc_FQS_get_relation_nodes(RangeTblEntry *rte, Index varno, Query *query) return rel_exec_nodes; } /* - * pgxc_FQS_walker + * pgxc_shippability_walker * walks the query/expression tree routed at the node passed in, gathering * information which will help decide whether the query to which this node * belongs is shippable to the Datanodes. @@ -1432,8 +1440,8 @@ pgxc_FQS_get_relation_nodes(RangeTblEntry *rte, Index varno, Query *query) * Return value of this function is governed by the same rules as * expression_tree_walker(), see prologue of that function for details. */ -static bool -pgxc_FQS_walker(Node *node, FQS_context *fqs_context) +bool +pgxc_shippability_walker(Node *node, Shippability_context *sc_context) { if (node == NULL) return false; @@ -1455,10 +1463,14 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) * that expression is encountered. */ case T_CaseTestExpr: - case T_SortGroupClause: case T_TargetEntry: break; + case T_SortGroupClause: + if (sc_context->sc_for_expr) + pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR); + break; + /* * Nodes, which are shippable if the tree rooted under these nodes is * shippable @@ -1499,7 +1511,7 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) * FQS planner cannot yet handle SQL representation correctly. * So disable FQS in this case and let standard planner manage it. */ - pgxc_FQS_set_reason(fqs_context, FQS_UNSUPPORTED_EXPR); + pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR); break; case T_FieldStore: @@ -1509,7 +1521,7 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) * handle it through standard planner, where whole row will be * constructed. */ - pgxc_FQS_set_reason(fqs_context, FQS_UNSUPPORTED_EXPR); + pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR); break; case T_SetToDefault: @@ -1519,7 +1531,7 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) * nextval() of a sequence can not be shipped to the Datanode, hence * for now default values can not be shipped to the Datanodes */ - pgxc_FQS_set_reason(fqs_context, FQS_UNSUPPORTED_EXPR); + pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR); break; case T_Var: @@ -1529,22 +1541,17 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) * if a subquery references an upper level variable, that query is * not shippable, if shipped alone. */ - if (var->varlevelsup > fqs_context->fqsc_max_varlevelsup) - fqs_context->fqsc_max_varlevelsup = var->varlevelsup; + if (var->varlevelsup > sc_context->sc_max_varlevelsup) + sc_context->sc_max_varlevelsup = var->varlevelsup; } break; case T_Param: { Param *param = (Param *)node; - /* - * PARAM_EXEC params should not appear in the tree, since we are - * analysing before planning. In case, they appear, we do not know - * what to do! Other kinds of parameters can be shipped to the - * remote side. - */ - if (param->paramkind == PARAM_EXEC) - pgxc_FQS_set_reason(fqs_context, FQS_UNSUPPORTED_EXPR); + /* PGXCTODO: Can we handle internally generated parameters? */ + if (param->paramkind != PARAM_EXTERN) + pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR); } break; @@ -1560,16 +1567,32 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) case T_Aggref: { + Aggref *aggref = (Aggref *)node; /* * An aggregate is completely shippable to the Datanode, if the * whole group resides on that Datanode. This will be clear when * we see the GROUP BY clause. - * Query::hasAggs will tell that the query has aggregates. * agglevelsup is minimum of variable's varlevelsup, so we will - * set the fqsc_max_varlevelsup when we reach the appropriate + * set the sc_max_varlevelsup when we reach the appropriate * VARs in the tree. - * Hence nothing to set here. */ + pgxc_set_shippability_reason(sc_context, SS_HAS_AGG_EXPR); + /* + * If a stand-alone expression to be shipped, is an + * 1. aggregate with ORDER BY, DISTINCT directives, it needs all + * the qualifying rows + * 2. aggregate without collection function + * 3. (PGXCTODO:)aggregate with polymorphic transition type, the + * the transition type needs to be resolved to correctly interpret + * the transition results from Datanodes. + * Hence, such an expression can not be shipped to the datanodes. + */ + if (aggref->aggorder || + aggref->aggdistinct || + aggref->agglevelsup || + !aggref->agghas_collectfn || + IsPolymorphicType(aggref->aggtrantype)) + pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE); } break; @@ -1582,7 +1605,7 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) * can be shipped to the Datanode and what can not be. */ if (!is_immutable_func(funcexpr->funcid)) - pgxc_FQS_set_reason(fqs_context, FQS_UNSHIPPABLE_EXPR); + pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR); } break; @@ -1599,7 +1622,7 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) Oid opfuncid = OidIsValid(op_expr->opfuncid) ? op_expr->opfuncid : get_opcode(op_expr->opno); if (!OidIsValid(opfuncid) || !is_immutable_func(opfuncid)) - pgxc_FQS_set_reason(fqs_context, FQS_UNSHIPPABLE_EXPR); + pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR); } break; @@ -1613,7 +1636,7 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) Oid opfuncid = OidIsValid(sao_expr->opfuncid) ? sao_expr->opfuncid : get_opcode(sao_expr->opno); if (!OidIsValid(opfuncid) || !is_immutable_func(opfuncid)) - pgxc_FQS_set_reason(fqs_context, FQS_UNSHIPPABLE_EXPR); + pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR); } break; @@ -1634,14 +1657,22 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) { Query *query = (Query *)node; + /* A stand-alone expression containing Query is not shippable */ + if (sc_context->sc_for_expr) + { + pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR); + break; + } + /* We are checking shippability of whole query, go ahead */ + if (query->hasRecursive || query->intoClause) - pgxc_FQS_set_reason(fqs_context, FQS_UNSUPPORTED_EXPR); + pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR); /* * If the query needs Coordinator for evaluation or the query can be * completed on Coordinator itself, we don't ship it to the Datanode */ if (pgxc_query_needs_coord(query)) - pgxc_FQS_set_reason(fqs_context, FQS_NEEDS_COORD); + pgxc_set_shippability_reason(sc_context, SS_NEEDS_COORD); /* PGXC_FQS_TODO: It should be possible to look at the Query and find out * whether it can be completely evaluated on the Datanode just like SELECT @@ -1652,7 +1683,7 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) * For now DMLs with single rtable entry are candidates for FQS */ if (query->commandType != CMD_SELECT && list_length(query->rtable) > 1) - pgxc_FQS_set_reason(fqs_context, FQS_UNSUPPORTED_EXPR); + pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR); /* * In following conditions query is shippable when there is only one @@ -1674,10 +1705,10 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) if (query->hasAggs || query->hasWindowFuncs || query->sortClause || query->distinctClause || query->groupClause || query->havingQual || query->limitOffset || query->limitCount) - pgxc_FQS_set_reason(fqs_context, FQS_SINGLENODE_EXPR); + pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE); /* walk the entire query tree to analyse the query */ - if (query_tree_walker(query, pgxc_FQS_walker, fqs_context, 0)) + if (query_tree_walker(query, pgxc_shippability_walker, sc_context, 0)) return true; /* @@ -1686,19 +1717,23 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) * query. For now stop shipping such queries. We should get rid of this * condition. */ - if (fqs_context->fqsc_max_varlevelsup != 0) - pgxc_FQS_set_reason(fqs_context, FQS_VARLEVEL); + if (sc_context->sc_max_varlevelsup != 0) + pgxc_set_shippability_reason(sc_context, SS_VARLEVEL); /* * Walk the RangeTableEntries of the query and find the * Datanodes needed for evaluating this query */ - pgxc_FQS_find_datanodes(fqs_context); + pgxc_FQS_find_datanodes(sc_context); } break; case T_FromExpr: { + /* We don't expect FromExpr in a stand-alone expression */ + if (sc_context->sc_for_expr) + pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR); + /* * We will be examining the range table entries separately and * Join expressions are not candidate for FQS. @@ -1706,9 +1741,9 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) * conditional rule, we can not handle those in FQS, since there is * not SQL representation for such quals. */ - if (fqs_context->fqsc_query->commandType == CMD_INSERT && + if (sc_context->sc_query->commandType == CMD_INSERT && ((FromExpr *)node)->quals) - pgxc_FQS_set_reason(fqs_context, FQS_UNSUPPORTED_EXPR); + pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR); } break; @@ -1718,27 +1753,48 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) WindowFunc *winf = (WindowFunc *)node; /* * A window function can be evaluated on a Datanode if there is - * only one Datanode involved. This can be checked outside the - * walker by looking at Query::hasWindowFuncs. + * only one Datanode involved. */ - if (!is_immutable_func(winf->winfnoid)) - pgxc_FQS_set_reason(fqs_context, FQS_UNSHIPPABLE_EXPR); + pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE); + + /* + * A window function is not shippable as part of a stand-alone + * expression. If the window function is non-immutable, it can not + * be shipped to the datanodes. + */ + if (sc_context->sc_for_expr || + !is_immutable_func(winf->winfnoid)) + pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR); } break; - /* Nodes which do not need to be examined but worth some explanation */ case T_WindowClause: - /* - * A window function can be evaluated on a Datanode if there is - * only one Datanode involved. This can be checked outside the - * walker by looking at Query::hasWindowFuncs. - */ - /* FALL THROUGH */ + { + /* + * A window function can be evaluated on a Datanode if there is + * only one Datanode involved. + */ + pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE); + + /* + * A window function is not shippable as part of a stand-alone + * expression + */ + if (sc_context->sc_for_expr) + pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR); + } + break; + case T_JoinExpr: - /* - * The compatibility of joining ranges will be deduced while - * examining the range table of the query. Nothing to do here - */ + /* We don't expect JoinExpr in a stand-alone expression */ + if (sc_context->sc_for_expr) + pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR); + + /* + * For JoinExpr in a Query + * The compatibility of joining ranges will be deduced while + * examining the range table of the query. Nothing to do here + */ break; case T_SubLink: @@ -1753,22 +1809,22 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) */ if (IsA(sublink->subselect, Query)) sublink_en = pgxc_is_query_shippable((Query *)(sublink->subselect), - fqs_context->fqsc_query_level); + sc_context->sc_query_level); else sublink_en = NULL; - /* PGXCTODO free the old fqsc_subquery_en. */ + /* PGXCTODO free the old sc_subquery_en. */ /* If we already know that this query does not have a set of nodes * to evaluate on, don't bother to merge again. */ - if (!pgxc_FQS_test_reason(fqs_context, FQS_NO_NODES)) + if (!pgxc_test_shippability_reason(sc_context, SS_NO_NODES)) { - fqs_context->fqsc_subquery_en = pgxc_merge_exec_nodes(sublink_en, - fqs_context->fqsc_subquery_en, + sc_context->sc_subquery_en = pgxc_merge_exec_nodes(sublink_en, + sc_context->sc_subquery_en, false, true); - if (!fqs_context->fqsc_subquery_en) - pgxc_FQS_set_reason(fqs_context, FQS_NO_NODES); + if (!sc_context->sc_subquery_en) + pgxc_set_shippability_reason(sc_context, SS_NO_NODES); } } break; @@ -1782,7 +1838,7 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) case T_PlaceHolderInfo: { /* PGXCTODO: till we exhaust this list */ - pgxc_FQS_set_reason(fqs_context, FQS_UNSUPPORTED_EXPR); + pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR); } break; @@ -1791,7 +1847,7 @@ pgxc_FQS_walker(Node *node, FQS_context *fqs_context) (int) nodeTag(node)); break; } - return expression_tree_walker(node, pgxc_FQS_walker, (void *)fqs_context); + return expression_tree_walker(node, pgxc_shippability_walker, (void *)sc_context); } /* diff --git a/src/backend/pgxc/pool/postgresql_fdw.c b/src/backend/pgxc/pool/postgresql_fdw.c index 3a37cdf1cc..85aa786c43 100644 --- a/src/backend/pgxc/pool/postgresql_fdw.c +++ b/src/backend/pgxc/pool/postgresql_fdw.c @@ -31,18 +31,6 @@ #define DEBUG_FDW /* - * WHERE clause optimization level - */ -#define EVAL_QUAL_LOCAL 0 /* evaluate none in foreign, all in local */ -#define EVAL_QUAL_BOTH 1 /* evaluate some in foreign, all in local */ -#define EVAL_QUAL_FOREIGN 2 /* evaluate some in foreign, rest in local */ - -#define OPTIMIZE_WHERE_CLAUSE EVAL_QUAL_FOREIGN - -/* deparse SQL from the request */ -static bool foreign_qual_walker(Node *node, foreign_qual_context *context); - -/* * Check whether the function is IMMUTABLE. */ bool @@ -96,130 +84,42 @@ is_immutable_func(Oid funcid) * - scalar array operator (ANY/ALL) */ bool -is_foreign_expr(Node *node, foreign_qual_context *context) +pgxc_is_expr_shippable(Expr *node, bool *has_aggs) { - return !foreign_qual_walker(node, context); -} + Shippability_context sc_context; -void -pgxc_foreign_qual_context_init(foreign_qual_context *context) -{ - context->collect_vars = false; - context->vars = NIL; - context->aggs = NIL; -} + /* Create the FQS context */ + memset(&sc_context, 0, sizeof(sc_context)); + sc_context.sc_query = NULL; + sc_context.sc_query_level = 0; + sc_context.sc_for_expr = true; -void -pgxc_foreign_qual_context_free(foreign_qual_context *context) -{ - list_free(context->vars); - context->vars = NIL; - list_free(context->aggs); - context->aggs = NIL; -} -/* - * return true if node cannot be evaluatated in foreign server. - */ -static bool -foreign_qual_walker(Node *node, foreign_qual_context *context) -{ - bool ret_val; - bool saved_collect_vars; - if (node == NULL) - return false; + /* Walk the expression to check its shippability */ + pgxc_shippability_walker((Node *)node, &sc_context); - switch (nodeTag(node)) - { - case T_ExprState: - return foreign_qual_walker((Node *) ((ExprState *) node)->expr, NULL); - - case T_Param: - /* TODO: pass internal parameters to the foreign server */ - if (((Param *) node)->paramkind != PARAM_EXTERN) - return true; - break; - case T_DistinctExpr: - case T_OpExpr: - /* - * An operator which uses IMMUTABLE function can be evaluated in - * foreign server . It is not necessary to worry about oprrest - * and oprjoin here because they are invoked by planner but not - * executor. DistinctExpr is a typedef of OpExpr. - * We need also to be sure that function id is correctly set - * before evaluation. - */ - set_opfuncid((OpExpr *) node); - - if (!is_immutable_func(((OpExpr*) node)->opfuncid)) - return true; - break; - case T_ScalarArrayOpExpr: - if (!is_immutable_func(((ScalarArrayOpExpr*) node)->opfuncid)) - return true; - break; - case T_FuncExpr: - /* IMMUTABLE function can be evaluated in foreign server */ - if (!is_immutable_func(((FuncExpr*) node)->funcid)) - return true; - break; - case T_Aggref: - { - Aggref *aggref = (Aggref *)node; - /* - * An aggregate with ORDER BY, DISTINCT directives need to be - * computed at Coordinator using all the rows. An aggregate - * without collection function needs to be computed at - * Coordinator. - * PGXCTODO: polymorphic transition types need to be resolved to - * correctly interpret the transition results from Datanodes. - * For now compute such aggregates at Coordinator. - */ - if (aggref->aggorder || - aggref->aggdistinct || - aggref->agglevelsup || - !aggref->agghas_collectfn || - IsPolymorphicType(aggref->aggtrantype)) - return true; - /* - * Datanode can compute transition results, so, add the - * aggregate to the context if context is present - */ - if (context) - { - /* - * Don't collect VARs under the Aggref node. See - * pgxc_process_grouping_targetlist() for details. - */ - saved_collect_vars = context->collect_vars; - context->collect_vars = false; - context->aggs = lappend(context->aggs, aggref); - } - } - break; - case T_Var: - if (context && context->collect_vars) - context->vars = lappend(context->vars, node); - break; - case T_PlaceHolderVar: - case T_AppendRelInfo: - case T_PlaceHolderInfo: - case T_SubPlan: - /* TODO: research whether those complex nodes are evaluatable. */ - return true; - default: - break; - } - - ret_val = expression_tree_walker(node, foreign_qual_walker, context); + /* + * If caller is interested in knowing, whether the expression has aggregets + * let the caller know about it. The caller is capable of handling such + * expressions. Otherwise assume such an expression as unshippable. + */ + if (has_aggs) + *has_aggs = pgxc_test_shippability_reason(&sc_context, SS_HAS_AGG_EXPR); + else if (pgxc_test_shippability_reason(&sc_context, SS_HAS_AGG_EXPR)) + return false; /* - * restore value of collect_vars in the context, since we have finished - * traversing tree rooted under and Aggref node + * If the expression unshippable or unsupported by expression shipping + * algorithm, return false. We don't have information about the number of + * nodes involved in expression evaluation, hence even if the expression can + * be evaluated only on single node, return false. */ - if (context && IsA(node, Aggref)) - context->collect_vars = saved_collect_vars; + if (pgxc_test_shippability_reason(&sc_context, SS_UNSUPPORTED_EXPR) || + pgxc_test_shippability_reason(&sc_context, SS_UNSHIPPABLE_EXPR) || + pgxc_test_shippability_reason(&sc_context, SS_NEED_SINGLENODE)) + return false; - return ret_val; + /* If nothing wrong found, the expression is shippable */ + return true; } /* @@ -350,7 +250,7 @@ deparseSql(RemoteQueryState *scanstate) { ExprState *state = lfirst(lc); - if (is_foreign_expr((Node *) state, NULL)) + if (pgxc_is_expr_shippable(state->expr, NULL)) { elog(DEBUG1, "foreign qual: %s", nodeToString(state->expr)); foreign_qual = lappend(foreign_qual, state); diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h index 6ebe223227..f844050e56 100644 --- a/src/include/pgxc/planner.h +++ b/src/include/pgxc/planner.h @@ -120,39 +120,51 @@ typedef struct */ typedef struct { - Bitmapset *fqsc_shippability; /* The conditions for (un)shippability of the + bool sc_for_expr; /* if false, the we are checking shippability + * of the Query, otherwise, we are checking + * shippability of a stand-alone expression. + */ + Bitmapset *sc_shippability; /* The conditions for (un)shippability of the * query. */ - Query *fqsc_query; /* the query being analysed for FQS */ - int fqsc_query_level; /* level of the query */ - int fqsc_max_varlevelsup; /* maximum upper level referred to by any + Query *sc_query; /* the query being analysed for FQS */ + int sc_query_level; /* level of the query */ + int sc_max_varlevelsup; /* maximum upper level referred to by any * variable reference in the query. If this * value is greater than 0, the query is not * shippable, if shipped alone. */ - ExecNodes *fqsc_exec_nodes; /* nodes where the query should be executed */ - ExecNodes *fqsc_subquery_en; /* ExecNodes produced by merging the ExecNodes + ExecNodes *sc_exec_nodes; /* nodes where the query should be executed */ + ExecNodes *sc_subquery_en; /* ExecNodes produced by merging the ExecNodes * for individual subqueries. This gets - * ultimately merged with fqsc_exec_nodes. + * ultimately merged with sc_exec_nodes. */ -} FQS_context; +} Shippability_context; -/* enum for reasons as to why a query is not FQSable */ +/* enum for reasons as to why a query/expression is not FQSable */ typedef enum { - FQS_UNSHIPPABLE_EXPR = 0, /* it has unshippable expression */ - FQS_SINGLENODE_EXPR, /* it has single node expression, like - * aggregates, ORDER BY etc. */ - FQS_NEEDS_COORD, /* the query needs Coordinator */ - FQS_VARLEVEL, /* one of its subqueries has a VAR - * referencing an upper level query - * relation */ - FQS_NO_NODES, /* no suitable nodes can be found to ship - * the query */ - FQS_UNSUPPORTED_EXPR /* it has expressions currently unsupported - * by FQS, but such expressions might be - * supported by FQS in future */ -} FQS_shippability; + SS_UNSHIPPABLE_EXPR = 0, /* it has unshippable expression */ + SS_NEED_SINGLENODE, /* Has expressions which can be evaluated when + * there is only a single node involved. + * Athought aggregates too fit in this class, we + * have a separate status to report aggregates, + * see below. + */ + SS_NEEDS_COORD, /* the query needs Coordinator */ + SS_VARLEVEL, /* one of its subqueries has a VAR + * referencing an upper level query + * relation + */ + SS_NO_NODES, /* no suitable nodes can be found to ship + * the query + */ + SS_UNSUPPORTED_EXPR, /* it has expressions currently unsupported + * by FQS, but such expressions might be + * supported by FQS in future + */ + SS_HAS_AGG_EXPR /* it has aggregate expressions */ +} ShippabilityStat; /* global variable corresponding to the GUC with same name */ extern bool enable_fast_query_shipping; @@ -174,5 +186,8 @@ extern List *AddRemoteQueryNode(List *stmts, const char *queryString, RemoteQueryExecType remoteExecType, bool is_temp); extern bool pgxc_query_contains_temp_tables(List *queries); extern bool pgxc_query_contains_utility(List *queries); +extern bool pgxc_shippability_walker(Node *node, Shippability_context *sc_context); +extern bool pgxc_test_shippability_reason(Shippability_context *context, + ShippabilityStat reason); #endif /* PGXCPLANNER_H */ diff --git a/src/include/pgxc/postgresql_fdw.h b/src/include/pgxc/postgresql_fdw.h index e092951226..365f4769dc 100644 --- a/src/include/pgxc/postgresql_fdw.h +++ b/src/include/pgxc/postgresql_fdw.h @@ -17,16 +17,8 @@ #include "postgres.h" #include "pgxc/execRemote.h" -typedef struct -{ - bool collect_vars; - List *aggs; - List *vars; -} foreign_qual_context; -void pgxc_foreign_qual_context_init(foreign_qual_context *context); -void pgxc_foreign_qual_context_free(foreign_qual_context *context); bool is_immutable_func(Oid funcid); char *deparseSql(RemoteQueryState *scanstate); -bool is_foreign_expr(Node *node, foreign_qual_context *context); +bool pgxc_is_expr_shippable(Expr *node, bool *has_aggs); #endif |
