diff options
| author | Mason Sharp | 2010-12-09 22:15:44 +0000 |
|---|---|---|
| committer | Pavan Deolasee | 2011-05-19 16:45:24 +0000 |
| commit | bd81c63c90ba42a5554cf5343bf71c5deb7c7cbf (patch) | |
| tree | c2278d06ec9ec8501c70ea08e8b057bf6ea02b24 /src | |
| parent | 44f83040767b619bd9e4bf4088c6eb00d994b620 (diff) | |
Add support for INSERT SELECT.
Also, initial changes for setting execution nodes in general planner.
We execute a query normally, directed by the Coordinator,
and then use COPY running down on the data nodes to insert
the data there.
We also check for a special case of INSERT SELECT where the source
and destination columns are both the distribution columns of
tables, and where the query is a single step query. In such a
case, we just execute the query locally on the data nodes without
the use of COPY directed by the Coordinator.
In testing we uncovered an issue for INSERT SELECT when the
query is from a replicated table; it was selecting from all
tables because exec_nodes was not set for the RemoteQuery.
As a result, this commit also sets the exec_nodes for base
RemoteQuery structs as well as join reduced ones.
It does not yet, however, take into account WHERE clause
equality conditions against the distribution column, as
is done in a regular SELECT. This is left as a future
optimization, best done as a step for further merging
the Postgres-XC planner and the standard planner.
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/commands/copy.c | 495 | ||||
| -rw-r--r-- | src/backend/executor/execMain.c | 27 | ||||
| -rw-r--r-- | src/backend/optimizer/plan/createplan.c | 29 | ||||
| -rw-r--r-- | src/backend/pgxc/plan/planner.c | 203 | ||||
| -rw-r--r-- | src/include/pgxc/planner.h | 8 |
5 files changed, 562 insertions, 200 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index f139fb696b..578e29d6c1 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -61,6 +61,9 @@ typedef enum CopyDest COPY_FILE, /* to/from file */ COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ COPY_NEW_FE /* to/from frontend (3.0 protocol) */ +#ifdef PGXC + ,COPY_BUFFER /* Do not send, just prepare */ +#endif } CopyDest; /* @@ -181,6 +184,7 @@ typedef struct CopyStateData int hash_idx; /* index of the hash column */ PGXCNodeHandle **connections; /* Involved data node connections */ + TupleDesc tupDesc; /* for INSERT SELECT */ #endif } CopyStateData; @@ -301,6 +305,17 @@ static void CopySendInt16(CopyState cstate, int16 val); static bool CopyGetInt16(CopyState cstate, int16 *val); +#ifdef PGXC +static ExecNodes *build_copy_statement(CopyState cstate, List *attnamelist, + TupleDesc tupDesc, bool is_from, List *force_quote, List *force_notnull); +/* + * A kluge here making this static to avoid having to move the + * CopyState definition to a header file making it harder to merge + * with the vanilla PostgreSQL code + */ +static CopyState insertstate; +#endif + /* * Send copy start/stop messages for frontend copies. These have changed * in past protocol redesigns. @@ -487,6 +502,11 @@ CopySendEndOfRow(CopyState cstate) /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; +#ifdef PGXC + case COPY_BUFFER: + /* Do not send yet anywhere, just return */ + return; +#endif } resetStringInfo(fe_msgbuf); @@ -598,6 +618,11 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; +#ifdef PGXC + case COPY_BUFFER: + elog(ERROR, "COPY_BUFFER not allowed in this context"); + break; +#endif } return bytesread; @@ -1133,155 +1158,10 @@ DoCopy(const CopyStmt *stmt, const char *queryString) errmsg("table \"%s\" does not have OIDs", RelationGetRelationName(cstate->rel)))); #ifdef PGXC - /* Get locator information */ + /* Get copy statement and execution node information */ if (IS_PGXC_COORDINATOR) { - char *hash_att; - - exec_nodes = makeNode(ExecNodes); - - /* - * If target table does not exists on nodes (e.g. system table) - * the location info returned is NULL. This is the criteria, when - * we need to run Copy on coordinator - */ - cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel)); - - hash_att = GetRelationHashColumn(cstate->rel_loc); - if (cstate->rel_loc) - { - if (is_from || hash_att) - exec_nodes->nodelist = list_copy(cstate->rel_loc->nodeList); - else - { - /* - * Pick up one node only - * This case corresponds to a replicated table with COPY TO - */ - exec_nodes->nodelist = GetAnyDataNode(); - } - } - - cstate->hash_idx = -1; - if (hash_att) - { - List *attnums; - ListCell *cur; - - attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); - foreach(cur, attnums) - { - int attnum = lfirst_int(cur); - if (namestrcmp(&(tupDesc->attrs[attnum - 1]->attname), hash_att) == 0) - { - cstate->hash_idx = attnum - 1; - break; - } - } - } - - /* - * Build up query string for the data nodes, it should match - * to original string, but should have STDIN/STDOUT instead - * of filename. - */ - initStringInfo(&cstate->query_buf); - - appendStringInfoString(&cstate->query_buf, "COPY "); - appendStringInfo(&cstate->query_buf, "%s", RelationGetRelationName(cstate->rel)); - - if (attnamelist) - { - ListCell *cell; - ListCell *prev = NULL; - appendStringInfoString(&cstate->query_buf, " ("); - foreach (cell, attnamelist) - { - if (prev) - appendStringInfoString(&cstate->query_buf, ", "); - CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); - prev = cell; - } - appendStringInfoChar(&cstate->query_buf, ')'); - } - - if (stmt->is_from) - appendStringInfoString(&cstate->query_buf, " FROM STDIN"); - else - appendStringInfoString(&cstate->query_buf, " TO STDOUT"); - - - if (cstate->binary) - appendStringInfoString(&cstate->query_buf, " BINARY"); - - if (cstate->oids) - appendStringInfoString(&cstate->query_buf, " OIDS"); - - if (cstate->delim) - if ((!cstate->csv_mode && cstate->delim[0] != '\t') - || (cstate->csv_mode && cstate->delim[0] != ',')) - { - appendStringInfoString(&cstate->query_buf, " DELIMITER AS "); - CopyQuoteStr(&cstate->query_buf, cstate->delim); - } - - if (cstate->null_print) - if ((!cstate->csv_mode && strcmp(cstate->null_print, "\\N")) - || (cstate->csv_mode && strcmp(cstate->null_print, ""))) - { - appendStringInfoString(&cstate->query_buf, " NULL AS "); - CopyQuoteStr(&cstate->query_buf, cstate->null_print); - } - - if (cstate->csv_mode) - appendStringInfoString(&cstate->query_buf, " CSV"); - - /* - * Only rewrite the header part for COPY FROM, - * doing that for COPY TO results in multiple headers in output - */ - if (cstate->header_line && stmt->is_from) - appendStringInfoString(&cstate->query_buf, " HEADER"); - - if (cstate->quote && cstate->quote[0] == '"') - { - appendStringInfoString(&cstate->query_buf, " QUOTE AS "); - CopyQuoteStr(&cstate->query_buf, cstate->quote); - } - - if (cstate->escape && cstate->quote && cstate->escape[0] == cstate->quote[0]) - { - appendStringInfoString(&cstate->query_buf, " ESCAPE AS "); - CopyQuoteStr(&cstate->query_buf, cstate->escape); - } - - if (force_quote) - { - ListCell *cell; - ListCell *prev = NULL; - appendStringInfoString(&cstate->query_buf, " FORCE QUOTE "); - foreach (cell, force_quote) - { - if (prev) - appendStringInfoString(&cstate->query_buf, ", "); - CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); - prev = cell; - } - } - - if (force_notnull) - { - ListCell *cell; - ListCell *prev = NULL; - appendStringInfoString(&cstate->query_buf, " FORCE NOT NULL "); - foreach (cell, force_notnull) - { - if (prev) - appendStringInfoString(&cstate->query_buf, ", "); - CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); - prev = cell; - } - } + exec_nodes = build_copy_statement(cstate, attnamelist, tupDesc, is_from, force_quote, force_notnull); } #endif } @@ -3848,3 +3728,324 @@ CreateCopyDestReceiver(void) return (DestReceiver *) self; } + +#ifdef PGXC +/* + * Rebuild a COPY statement in cstate and set ExecNodes + */ +static ExecNodes* +build_copy_statement(CopyState cstate, List *attnamelist, + TupleDesc tupDesc, bool is_from, List *force_quote, List *force_notnull) +{ + char *hash_att; + + + ExecNodes *exec_nodes = makeNode(ExecNodes); + + /* + * If target table does not exists on nodes (e.g. system table) + * the location info returned is NULL. This is the criteria, when + * we need to run Copy on coordinator + */ + cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel)); + + hash_att = GetRelationHashColumn(cstate->rel_loc); + if (cstate->rel_loc) + { + if (is_from || hash_att) + exec_nodes->nodelist = list_copy(cstate->rel_loc->nodeList); + else + { + /* + * Pick up one node only + * This case corresponds to a replicated table with COPY TO + */ + exec_nodes->nodelist = GetAnyDataNode(); + } + } + + cstate->hash_idx = -1; + if (hash_att) + { + List *attnums; + ListCell *cur; + + attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + if (namestrcmp(&(tupDesc->attrs[attnum - 1]->attname), hash_att) == 0) + { + cstate->hash_idx = attnum - 1; + break; + } + } + } + + /* + * Build up query string for the data nodes, it should match + * to original string, but should have STDIN/STDOUT instead + * of filename. + */ + initStringInfo(&cstate->query_buf); + + appendStringInfoString(&cstate->query_buf, "COPY "); + appendStringInfo(&cstate->query_buf, "%s", RelationGetRelationName(cstate->rel)); + + if (attnamelist) + { + ListCell *cell; + ListCell *prev = NULL; + appendStringInfoString(&cstate->query_buf, " ("); + foreach (cell, attnamelist) + { + if (prev) + appendStringInfoString(&cstate->query_buf, ", "); + CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); + prev = cell; + } + appendStringInfoChar(&cstate->query_buf, ')'); + } + + if (is_from) + appendStringInfoString(&cstate->query_buf, " FROM STDIN"); + else + appendStringInfoString(&cstate->query_buf, " TO STDOUT"); + + + if (cstate->binary) + appendStringInfoString(&cstate->query_buf, " BINARY"); + + if (cstate->oids) + appendStringInfoString(&cstate->query_buf, " OIDS"); + + if (cstate->delim) + if ((!cstate->csv_mode && cstate->delim[0] != '\t') + || (cstate->csv_mode && cstate->delim[0] != ',')) + { + appendStringInfoString(&cstate->query_buf, " DELIMITER AS "); + CopyQuoteStr(&cstate->query_buf, cstate->delim); + } + + if (cstate->null_print) + if ((!cstate->csv_mode && strcmp(cstate->null_print, "\\N")) + || (cstate->csv_mode && strcmp(cstate->null_print, ""))) + { + appendStringInfoString(&cstate->query_buf, " NULL AS "); + CopyQuoteStr(&cstate->query_buf, cstate->null_print); + } + + if (cstate->csv_mode) + appendStringInfoString(&cstate->query_buf, " CSV"); + + /* + * Only rewrite the header part for COPY FROM, + * doing that for COPY TO results in multiple headers in output + */ + if (cstate->header_line && is_from) + appendStringInfoString(&cstate->query_buf, " HEADER"); + + if (cstate->quote && cstate->quote[0] == '"') + { + appendStringInfoString(&cstate->query_buf, " QUOTE AS "); + CopyQuoteStr(&cstate->query_buf, cstate->quote); + } + + if (cstate->escape && cstate->quote && cstate->escape[0] == cstate->quote[0]) + { + appendStringInfoString(&cstate->query_buf, " ESCAPE AS "); + CopyQuoteStr(&cstate->query_buf, cstate->escape); + } + + if (force_quote) + { + ListCell *cell; + ListCell *prev = NULL; + appendStringInfoString(&cstate->query_buf, " FORCE QUOTE "); + foreach (cell, force_quote) + { + if (prev) + appendStringInfoString(&cstate->query_buf, ", "); + CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); + prev = cell; + } + } + + if (force_notnull) + { + ListCell *cell; + ListCell *prev = NULL; + appendStringInfoString(&cstate->query_buf, " FORCE NOT NULL "); + foreach (cell, force_notnull) + { + if (prev) + appendStringInfoString(&cstate->query_buf, ", "); + CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); + prev = cell; + } + } + return exec_nodes; +} + +/* + * Use COPY for handling INSERT SELECT + * It may be a bit better to use binary mode here, but + * we have not implemented binary support for COPY yet. + * + * We borrow some code from CopyTo and DoCopy here. + * We do not refactor them so that it is later easier to remerge + * with vanilla PostgreSQL + */ +void +DoInsertSelectCopy(EState *estate, TupleTableSlot *slot) +{ + ExecNodes *exec_nodes; + HeapTuple tuple; + Datum *values; + bool *nulls; + Datum *hash_value = NULL; + MemoryContext oldcontext; + CopyState cstate; + + + Assert(IS_PGXC_COORDINATOR); + + /* See if we need to initialize COPY (first tuple) */ + if (estate->es_processed == 0) + { + ListCell *lc; + List *attnamelist = NIL; + ResultRelInfo *resultRelInfo = estate->es_result_relation_info; + Form_pg_attribute *attr; + + oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); + exec_nodes = makeNode(ExecNodes); + + /* + * We use the cstate struct here, though we do not need everything + * We will just use the properties we are interested in here. + */ + insertstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); + cstate = insertstate; + + cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext, + "COPY TO", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + cstate->rel = resultRelInfo->ri_RelationDesc; + cstate->tupDesc = RelationGetDescr(cstate->rel); + + foreach(lc, estate->es_plannedstmt->planTree->targetlist) + { + TargetEntry *target = (TargetEntry *) lfirst(lc); + attnamelist = lappend(attnamelist, makeString(target->resname)); + } + cstate->attnumlist = CopyGetAttnums(cstate->tupDesc, cstate->rel, attnamelist); + cstate->null_print_client = cstate->null_print; /* default */ + + /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ + cstate->fe_msgbuf = makeStringInfo(); + attr = cstate->tupDesc->attrs; + + /* Get info about the columns we need to process. */ + cstate->out_functions = (FmgrInfo *) palloc(cstate->tupDesc->natts * sizeof(FmgrInfo)); + foreach(lc, cstate->attnumlist) + { + int attnum = lfirst_int(lc); + Oid out_func_oid; + bool isvarlena; + + if (cstate->binary) + getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid, + &out_func_oid, + &isvarlena); + else + getTypeOutputInfo(attr[attnum - 1]->atttypid, + &out_func_oid, + &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + + /* Set defaults for omitted options */ + if (!cstate->delim) + cstate->delim = cstate->csv_mode ? "," : "\t"; + + if (!cstate->null_print) + cstate->null_print = cstate->csv_mode ? "" : "\\N"; + cstate->null_print_len = strlen(cstate->null_print); + + if (cstate->csv_mode) + { + if (!cstate->quote) + cstate->quote = "\""; + if (!cstate->escape) + cstate->escape = cstate->quote; + } + + exec_nodes = build_copy_statement(cstate, attnamelist, + cstate->tupDesc, true, NULL, NULL); + + cstate->connections = DataNodeCopyBegin(cstate->query_buf.data, + exec_nodes->nodelist, + GetActiveSnapshot(), + true); + + if (!cstate->connections) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("Failed to initialize data nodes for COPY"))); + + cstate->copy_dest = COPY_BUFFER; + + MemoryContextSwitchTo(oldcontext); + } + cstate = insertstate; + + values = (Datum *) palloc(cstate->tupDesc->natts * sizeof(Datum)); + nulls = (bool *) palloc(cstate->tupDesc->natts * sizeof(bool)); + + /* Process Tuple */ + /* We need to format the line for sending to data nodes */ + tuple = ExecMaterializeSlot(slot); + + /* Deconstruct the tuple ... faster than repeated heap_getattr */ + heap_deform_tuple(tuple, cstate->tupDesc, values, nulls); + + /* Format the input tuple for sending */ + CopyOneRowTo(cstate, 0, values, nulls); + + /* Get hash partition column, if any */ + if (cstate->hash_idx >= 0 && !nulls[cstate->hash_idx]) + hash_value = &values[cstate->hash_idx]; + + /* Send item to the appropriate data node(s) (buffer) */ + if (DataNodeCopyIn(cstate->fe_msgbuf->data, + cstate->fe_msgbuf->len, + GetRelationNodes(cstate->rel_loc, (long *)hash_value, RELATION_ACCESS_WRITE), + cstate->connections)) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("Copy failed on a data node"))); + + resetStringInfo(cstate->fe_msgbuf); + estate->es_processed++; +} + +/* + * + */ +void +EndInsertSelectCopy(void) +{ + Assert(IS_PGXC_COORDINATOR); + + DataNodeCopyFinish( + insertstate->connections, + primary_data_node, + COMBINE_TYPE_NONE); + pfree(insertstate->connections); + MemoryContextDelete(insertstate->rowcontext); +} +#endif diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index e07760356c..1ed43564c1 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -61,6 +61,7 @@ #include "utils/tqual.h" #ifdef PGXC #include "pgxc/pgxc.h" +#include "commands/copy.h" #endif /* Hooks for plugins to get control in ExecutorStart/Run/End() */ @@ -1685,6 +1686,26 @@ lnext: ; break; case CMD_INSERT: +#ifdef PGXC + /* + * If we get here on the Coordinator, we may have INSERT SELECT + * To handle INSERT SELECT, we use COPY to send down the nodes + */ + if (IS_PGXC_COORDINATOR && IsA(planstate, ResultState)) + { + PG_TRY(); + { + DoInsertSelectCopy(estate, slot); + } + PG_CATCH(); + { + EndInsertSelectCopy(); + PG_RE_THROW(); + } + PG_END_TRY(); + } + else +#endif ExecInsert(slot, tupleid, planSlot, dest, estate); break; @@ -1712,6 +1733,12 @@ lnext: ; break; } +#ifdef PGXC + /* See if we need to close a COPY started for INSERT SELECT */ + if (IS_PGXC_COORDINATOR && operation == CMD_INSERT && IsA(planstate, ResultState)) + EndInsertSelectCopy(); +#endif + /* * Process AFTER EACH STATEMENT triggers */ diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 30084633e4..bebb81a276 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -630,6 +630,7 @@ static Plan * create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Plan *outer_plan, Plan *inner_plan) { NestLoop *nest_parent; + JoinReduceInfo join_info; if (!enable_remotejoin) return parent; @@ -638,10 +639,6 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla if (root->hasPseudoConstantQuals) return parent; - /* Works only for SELECT commands right now */ - if (root->parse->commandType != CMD_SELECT) - return parent; - /* do not optimize CURSOR based select statements */ if (root->parse->rowMarks != NIL) return parent; @@ -664,7 +661,6 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla { int i; List *rtable_list = NIL; - bool partitioned_replicated_join = false; Material *outer_mat = (Material *)outer_plan; Material *inner_mat = (Material *)inner_plan; @@ -692,7 +688,7 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla } /* XXX Check if the join optimization is possible */ - if (IsJoinReducible(inner, outer, rtable_list, best_path, &partitioned_replicated_join)) + if (IsJoinReducible(inner, outer, rtable_list, best_path, &join_info)) { RemoteQuery *result; Plan *result_plan; @@ -829,6 +825,7 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla result->outer_reduce_level = outer->reduce_level; result->inner_relids = in_relids; result->outer_relids = out_relids; + result->exec_nodes = copyObject(join_info.exec_nodes); appendStringInfo(&fromlist, " %s (%s) %s", pname, inner->sql_statement, quote_identifier(in_alias)); @@ -917,8 +914,7 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla /* set_plan_refs needs this later */ result->base_tlist = base_tlist; result->relname = "__FOREIGN_QUERY__"; - - result->partitioned_replicated = partitioned_replicated_join; + result->partitioned_replicated = join_info.partitioned_replicated; /* * if there were any local scan clauses stick them up here. They @@ -2233,6 +2229,8 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path, TupleDesc tupdesc; bool first; StringInfoData sql; + RelationLocInfo *rel_loc_info; + Assert(scan_relid > 0); rte = planner_rt_fetch(scan_relid, root); @@ -2393,6 +2391,21 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path, scan_plan->sql_statement = sql.data; + /* + * Populate what nodes we execute on. + * This is still basic, and was done to make sure we do not select + * a replicated table from all nodes. + * It does not take into account conditions on partitioned relations + * that could reduce to one node. To do that, we need to move general + * planning earlier. + */ + rel_loc_info = GetRelationLocInfo(rte->relid); + scan_plan->exec_nodes = makeNode(ExecNodes); + scan_plan->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER; + scan_plan->exec_nodes->baselocatortype = rel_loc_info->locatorType; + scan_plan->exec_nodes = GetRelationNodes(rel_loc_info, + NULL, + RELATION_ACCESS_READ); copy_path_costsize(&scan_plan->scan.plan, best_path); /* PGXCTODO - get better estimates */ diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 02b863a4c1..4c677aa83c 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -441,35 +441,37 @@ get_base_var(Var *var, XCWalkerContext *context) /* * get_plan_nodes_insert - determine nodes on which to execute insert. + * + * We handle INSERT ... VALUES. + * If we have INSERT SELECT, we try and see if it is patitioned-based + * inserting into a partitioned-based. + * + * We set step->exec_nodes if we determine the single-step execution + * nodes. If it is still NULL after returning from this function, + * then the caller should use the regular PG planner */ -static ExecNodes * -get_plan_nodes_insert(Query *query) +static void +get_plan_nodes_insert(Query *query, RemoteQuery *step) { RangeTblEntry *rte; RelationLocInfo *rel_loc_info; Const *constant; - ExecNodes *exec_nodes; ListCell *lc; long part_value; long *part_value_ptr = NULL; Expr *eval_expr = NULL; - /* Looks complex (correlated?) - best to skip */ - if (query->jointree != NULL && query->jointree->fromlist != NULL) - return NULL; - /* Make sure there is just one table */ - if (query->rtable == NULL) - return NULL; + step->exec_nodes = NULL; rte = (RangeTblEntry *) list_nth(query->rtable, query->resultRelation - 1); - if (rte != NULL && rte->rtekind != RTE_RELATION) /* Bad relation type */ - return NULL; + return; - /* See if we have the partitioned case. */ + + /* Get result relation info */ rel_loc_info = GetRelationLocInfo(rte->relid); if (!rel_loc_info) @@ -477,13 +479,62 @@ get_plan_nodes_insert(Query *query) (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), (errmsg("Could not find relation for oid = %d", rte->relid)))); + if (query->jointree != NULL && query->jointree->fromlist != NULL) + { + /* INSERT SELECT suspected */ + + /* We only optimize for when the destination is partitioned */ + if (rel_loc_info->locatorType != LOCATOR_TYPE_HASH) + return; + + /* + * See if it is "single-step" + * Optimize for just known common case with 2 RTE entries + */ + if (query->resultRelation == 1 && query->rtable->length == 2) + { + RangeTblEntry *sub_rte = list_nth(query->rtable, 1); + + /* + * Get step->exec_nodes for the SELECT part of INSERT-SELECT + * to see if it is single-step + */ + if (sub_rte->rtekind == RTE_SUBQUERY && + !sub_rte->subquery->limitCount && + !sub_rte->subquery->limitOffset) + get_plan_nodes(sub_rte->subquery, step, RELATION_ACCESS_READ); + } + + /* Send to general planner if the query is multiple step */ + if (!step->exec_nodes) + return; + + /* If the source is not hash-based (eg, replicated) also send + * through general planner + */ + if (step->exec_nodes->baselocatortype != LOCATOR_TYPE_HASH) + { + step->exec_nodes = NULL; + return; + } + + /* + * If step->exec_nodes is not null, it is single step. + * Continue and check for destination table type cases below + */ + } + + if (rel_loc_info->locatorType == LOCATOR_TYPE_HASH && rel_loc_info->partAttrName != NULL) { + Expr *checkexpr; + TargetEntry *tle = NULL; + /* It is a partitioned table, get value by looking in targetList */ foreach(lc, query->targetList) { - TargetEntry *tle = (TargetEntry *) lfirst(lc); + tle = (TargetEntry *) lfirst(lc); if (tle->resjunk) continue; @@ -493,46 +544,95 @@ get_plan_nodes_insert(Query *query) * designated partitioned column */ if (strcmp(tle->resname, rel_loc_info->partAttrName) == 0) + break; + } + + if (!lc) + { + /* give up */ + step->exec_nodes = NULL; + return; + } + + /* We found the TargetEntry for the partition column */ + checkexpr = tle->expr; + + /* Handle INSERT SELECT case */ + if (query->jointree != NULL && query->jointree->fromlist != NULL) + { + if (IsA(checkexpr,Var)) { - /* We may have a cast, try and handle it */ - Expr *checkexpr = tle->expr; + XCWalkerContext context; + ColumnBase *col_base; + RelationLocInfo *source_rel_loc_info; - if (!IsA(tle->expr, Const)) + /* Look for expression populating partition column */ + InitXCWalkerContext(&context); + context.query = query; + context.rtables = lappend(context.rtables, query->rtable); + col_base = get_base_var((Var*) checkexpr, &context); + + if (!col_base) { - eval_expr = (Expr *) eval_const_expressions(NULL, (Node *) tle->expr); - checkexpr = get_numeric_constant(eval_expr); + step->exec_nodes = NULL; + return; } - if (checkexpr == NULL) - break; /* no constant */ + /* See if it is also a partitioned table */ + source_rel_loc_info = GetRelationLocInfo(col_base->relid); - constant = (Const *) checkexpr; + if (!source_rel_loc_info) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("Could not find relation for oid = %d", rte->relid)))); - if (constant->consttype == INT4OID || - constant->consttype == INT2OID || - constant->consttype == INT8OID) + if (source_rel_loc_info->locatorType == LOCATOR_TYPE_HASH && + strcmp(col_base->colname, source_rel_loc_info->partAttrName) == 0) { - part_value = (long) constant->constvalue; - part_value_ptr = &part_value; - + /* + * Partition columns match, we have a "single-step INSERT SELECT". + * It is OK to use step->exec_nodes + */ + return; } - /* PGXCTODO - handle other data types */ - /* - else - if (constant->consttype == VARCHAR ... - */ } + /* Multi-step INSERT SELECT or some other case. Use general planner */ + step->exec_nodes = NULL; + return; + } + else + { + /* Check for constant */ + + /* We may have a cast, try and handle it */ + if (!IsA(tle->expr, Const)) + { + eval_expr = (Expr *) eval_const_expressions(NULL, (Node *) tle->expr); + checkexpr = get_numeric_constant(eval_expr); + } + + if (checkexpr == NULL) + return; /* no constant */ + + constant = (Const *) checkexpr; + + if (constant->consttype == INT4OID || + constant->consttype == INT2OID || + constant->consttype == INT8OID) + { + part_value = (long) constant->constvalue; + part_value_ptr = &part_value; + } + /* PGXCTODO - handle other data types */ } } /* single call handles both replicated and partitioned types */ - exec_nodes = GetRelationNodes(rel_loc_info, part_value_ptr, + step->exec_nodes = GetRelationNodes(rel_loc_info, part_value_ptr, RELATION_ACCESS_WRITE); if (eval_expr) pfree(eval_expr); - - return exec_nodes; } @@ -1665,7 +1765,7 @@ get_plan_nodes_command(Query *query, RemoteQuery *step) break; case CMD_INSERT: - step->exec_nodes = get_plan_nodes_insert(query); + get_plan_nodes_insert(query, step); break; case CMD_UPDATE: @@ -2794,24 +2894,28 @@ free_query_step(RemoteQuery *query_step) * If the join between the two RemoteQuery nodes is partitioned - partitioned * it is always reducibile safely, * - * RemoteQuery *innernode - the inner node - * RemoteQuery *outernode - the outer node - * bool *partitioned_replicated - set to true if we have a partitioned-replicated + * RemoteQuery *innernode - the inner node + * RemoteQuery *outernode - the outer node + * List *rtable_list - rtables + * JoinPath *join_path - used to examine join restrictions + * PGXCJoinInfo *join_info - contains info about the join reduction + * join_info->partitioned_replicated is set to true if we have a partitioned-replicated * join. We want to use replicated tables with non-replicated * tables ony once. Only use this value if this function * returns true. */ bool IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode, - List *rtable_list, JoinPath *join_path, bool *partitioned_replicated) + List *rtable_list, JoinPath *join_path, JoinReduceInfo *join_info) { XCWalkerContext context; ListCell *cell; bool maybe_reducible = false; bool result = false; - - *partitioned_replicated = false; + Assert(join_info); + join_info->partitioned_replicated = false; + join_info->exec_nodes = NULL; InitXCWalkerContext(&context); context.accessType = RELATION_ACCESS_READ; /* PGXCTODO - determine */ @@ -2819,7 +2923,6 @@ IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode, context.rtables = lappend(context.rtables, rtable_list); /* add to list of lists */ - foreach(cell, join_path->joinrestrictinfo) { RestrictInfo *node = (RestrictInfo *) lfirst(cell); @@ -2874,7 +2977,7 @@ IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode, if (pgxc_join->join_type == JOIN_REPLICATED_PARTITIONED) { - *partitioned_replicated = true; + join_info->partitioned_replicated = true; /* * If either of these already have such a join, we do not @@ -2891,6 +2994,18 @@ IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode, } } + if (result) + { + /* + * Set exec_nodes from walker if it was set. + * If not, it is replicated and we can use existing + */ + if (context.query_step) + join_info->exec_nodes = copyObject(context.query_step->exec_nodes); + else + join_info->exec_nodes = copyObject(outernode->exec_nodes); + } + return result; } diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h index 7284ef7432..bb1f934be9 100644 --- a/src/include/pgxc/planner.h +++ b/src/include/pgxc/planner.h @@ -168,6 +168,12 @@ typedef struct StringInfoData valuebuf; } SimpleAgg; +typedef struct +{ + bool partitioned_replicated; + ExecNodes *exec_nodes; +} JoinReduceInfo; + /* forbid SQL if unsafe, useful to turn off for development */ extern bool StrictStatementChecking; @@ -181,6 +187,6 @@ extern bool IsHashDistributable(Oid col_type); extern bool is_immutable_func(Oid funcid); extern bool IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode, - List *rtable_list, JoinPath *join_path, bool *partitioned_replicated); + List *rtable_list, JoinPath *join_path, JoinReduceInfo *join_info); #endif /* PGXCPLANNER_H */ |
