diff options
Diffstat (limited to 'src/backend')
| -rw-r--r-- | src/backend/commands/copy.c | 495 | ||||
| -rw-r--r-- | src/backend/executor/execMain.c | 27 | ||||
| -rw-r--r-- | src/backend/optimizer/plan/createplan.c | 29 | ||||
| -rw-r--r-- | src/backend/pgxc/plan/planner.c | 203 |
4 files changed, 555 insertions, 199 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index f139fb696b..578e29d6c1 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -61,6 +61,9 @@ typedef enum CopyDest COPY_FILE, /* to/from file */ COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ COPY_NEW_FE /* to/from frontend (3.0 protocol) */ +#ifdef PGXC + ,COPY_BUFFER /* Do not send, just prepare */ +#endif } CopyDest; /* @@ -181,6 +184,7 @@ typedef struct CopyStateData int hash_idx; /* index of the hash column */ PGXCNodeHandle **connections; /* Involved data node connections */ + TupleDesc tupDesc; /* for INSERT SELECT */ #endif } CopyStateData; @@ -301,6 +305,17 @@ static void CopySendInt16(CopyState cstate, int16 val); static bool CopyGetInt16(CopyState cstate, int16 *val); +#ifdef PGXC +static ExecNodes *build_copy_statement(CopyState cstate, List *attnamelist, + TupleDesc tupDesc, bool is_from, List *force_quote, List *force_notnull); +/* + * A kluge here making this static to avoid having to move the + * CopyState definition to a header file making it harder to merge + * with the vanilla PostgreSQL code + */ +static CopyState insertstate; +#endif + /* * Send copy start/stop messages for frontend copies. These have changed * in past protocol redesigns. @@ -487,6 +502,11 @@ CopySendEndOfRow(CopyState cstate) /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; +#ifdef PGXC + case COPY_BUFFER: + /* Do not send yet anywhere, just return */ + return; +#endif } resetStringInfo(fe_msgbuf); @@ -598,6 +618,11 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; +#ifdef PGXC + case COPY_BUFFER: + elog(ERROR, "COPY_BUFFER not allowed in this context"); + break; +#endif } return bytesread; @@ -1133,155 +1158,10 @@ DoCopy(const CopyStmt *stmt, const char *queryString) errmsg("table \"%s\" does not have OIDs", RelationGetRelationName(cstate->rel)))); #ifdef PGXC - /* Get locator information */ + /* Get copy statement and execution node information */ if (IS_PGXC_COORDINATOR) { - char *hash_att; - - exec_nodes = makeNode(ExecNodes); - - /* - * If target table does not exists on nodes (e.g. system table) - * the location info returned is NULL. This is the criteria, when - * we need to run Copy on coordinator - */ - cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel)); - - hash_att = GetRelationHashColumn(cstate->rel_loc); - if (cstate->rel_loc) - { - if (is_from || hash_att) - exec_nodes->nodelist = list_copy(cstate->rel_loc->nodeList); - else - { - /* - * Pick up one node only - * This case corresponds to a replicated table with COPY TO - */ - exec_nodes->nodelist = GetAnyDataNode(); - } - } - - cstate->hash_idx = -1; - if (hash_att) - { - List *attnums; - ListCell *cur; - - attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); - foreach(cur, attnums) - { - int attnum = lfirst_int(cur); - if (namestrcmp(&(tupDesc->attrs[attnum - 1]->attname), hash_att) == 0) - { - cstate->hash_idx = attnum - 1; - break; - } - } - } - - /* - * Build up query string for the data nodes, it should match - * to original string, but should have STDIN/STDOUT instead - * of filename. - */ - initStringInfo(&cstate->query_buf); - - appendStringInfoString(&cstate->query_buf, "COPY "); - appendStringInfo(&cstate->query_buf, "%s", RelationGetRelationName(cstate->rel)); - - if (attnamelist) - { - ListCell *cell; - ListCell *prev = NULL; - appendStringInfoString(&cstate->query_buf, " ("); - foreach (cell, attnamelist) - { - if (prev) - appendStringInfoString(&cstate->query_buf, ", "); - CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); - prev = cell; - } - appendStringInfoChar(&cstate->query_buf, ')'); - } - - if (stmt->is_from) - appendStringInfoString(&cstate->query_buf, " FROM STDIN"); - else - appendStringInfoString(&cstate->query_buf, " TO STDOUT"); - - - if (cstate->binary) - appendStringInfoString(&cstate->query_buf, " BINARY"); - - if (cstate->oids) - appendStringInfoString(&cstate->query_buf, " OIDS"); - - if (cstate->delim) - if ((!cstate->csv_mode && cstate->delim[0] != '\t') - || (cstate->csv_mode && cstate->delim[0] != ',')) - { - appendStringInfoString(&cstate->query_buf, " DELIMITER AS "); - CopyQuoteStr(&cstate->query_buf, cstate->delim); - } - - if (cstate->null_print) - if ((!cstate->csv_mode && strcmp(cstate->null_print, "\\N")) - || (cstate->csv_mode && strcmp(cstate->null_print, ""))) - { - appendStringInfoString(&cstate->query_buf, " NULL AS "); - CopyQuoteStr(&cstate->query_buf, cstate->null_print); - } - - if (cstate->csv_mode) - appendStringInfoString(&cstate->query_buf, " CSV"); - - /* - * Only rewrite the header part for COPY FROM, - * doing that for COPY TO results in multiple headers in output - */ - if (cstate->header_line && stmt->is_from) - appendStringInfoString(&cstate->query_buf, " HEADER"); - - if (cstate->quote && cstate->quote[0] == '"') - { - appendStringInfoString(&cstate->query_buf, " QUOTE AS "); - CopyQuoteStr(&cstate->query_buf, cstate->quote); - } - - if (cstate->escape && cstate->quote && cstate->escape[0] == cstate->quote[0]) - { - appendStringInfoString(&cstate->query_buf, " ESCAPE AS "); - CopyQuoteStr(&cstate->query_buf, cstate->escape); - } - - if (force_quote) - { - ListCell *cell; - ListCell *prev = NULL; - appendStringInfoString(&cstate->query_buf, " FORCE QUOTE "); - foreach (cell, force_quote) - { - if (prev) - appendStringInfoString(&cstate->query_buf, ", "); - CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); - prev = cell; - } - } - - if (force_notnull) - { - ListCell *cell; - ListCell *prev = NULL; - appendStringInfoString(&cstate->query_buf, " FORCE NOT NULL "); - foreach (cell, force_notnull) - { - if (prev) - appendStringInfoString(&cstate->query_buf, ", "); - CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); - prev = cell; - } - } + exec_nodes = build_copy_statement(cstate, attnamelist, tupDesc, is_from, force_quote, force_notnull); } #endif } @@ -3848,3 +3728,324 @@ CreateCopyDestReceiver(void) return (DestReceiver *) self; } + +#ifdef PGXC +/* + * Rebuild a COPY statement in cstate and set ExecNodes + */ +static ExecNodes* +build_copy_statement(CopyState cstate, List *attnamelist, + TupleDesc tupDesc, bool is_from, List *force_quote, List *force_notnull) +{ + char *hash_att; + + + ExecNodes *exec_nodes = makeNode(ExecNodes); + + /* + * If target table does not exists on nodes (e.g. system table) + * the location info returned is NULL. This is the criteria, when + * we need to run Copy on coordinator + */ + cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel)); + + hash_att = GetRelationHashColumn(cstate->rel_loc); + if (cstate->rel_loc) + { + if (is_from || hash_att) + exec_nodes->nodelist = list_copy(cstate->rel_loc->nodeList); + else + { + /* + * Pick up one node only + * This case corresponds to a replicated table with COPY TO + */ + exec_nodes->nodelist = GetAnyDataNode(); + } + } + + cstate->hash_idx = -1; + if (hash_att) + { + List *attnums; + ListCell *cur; + + attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + if (namestrcmp(&(tupDesc->attrs[attnum - 1]->attname), hash_att) == 0) + { + cstate->hash_idx = attnum - 1; + break; + } + } + } + + /* + * Build up query string for the data nodes, it should match + * to original string, but should have STDIN/STDOUT instead + * of filename. + */ + initStringInfo(&cstate->query_buf); + + appendStringInfoString(&cstate->query_buf, "COPY "); + appendStringInfo(&cstate->query_buf, "%s", RelationGetRelationName(cstate->rel)); + + if (attnamelist) + { + ListCell *cell; + ListCell *prev = NULL; + appendStringInfoString(&cstate->query_buf, " ("); + foreach (cell, attnamelist) + { + if (prev) + appendStringInfoString(&cstate->query_buf, ", "); + CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); + prev = cell; + } + appendStringInfoChar(&cstate->query_buf, ')'); + } + + if (is_from) + appendStringInfoString(&cstate->query_buf, " FROM STDIN"); + else + appendStringInfoString(&cstate->query_buf, " TO STDOUT"); + + + if (cstate->binary) + appendStringInfoString(&cstate->query_buf, " BINARY"); + + if (cstate->oids) + appendStringInfoString(&cstate->query_buf, " OIDS"); + + if (cstate->delim) + if ((!cstate->csv_mode && cstate->delim[0] != '\t') + || (cstate->csv_mode && cstate->delim[0] != ',')) + { + appendStringInfoString(&cstate->query_buf, " DELIMITER AS "); + CopyQuoteStr(&cstate->query_buf, cstate->delim); + } + + if (cstate->null_print) + if ((!cstate->csv_mode && strcmp(cstate->null_print, "\\N")) + || (cstate->csv_mode && strcmp(cstate->null_print, ""))) + { + appendStringInfoString(&cstate->query_buf, " NULL AS "); + CopyQuoteStr(&cstate->query_buf, cstate->null_print); + } + + if (cstate->csv_mode) + appendStringInfoString(&cstate->query_buf, " CSV"); + + /* + * Only rewrite the header part for COPY FROM, + * doing that for COPY TO results in multiple headers in output + */ + if (cstate->header_line && is_from) + appendStringInfoString(&cstate->query_buf, " HEADER"); + + if (cstate->quote && cstate->quote[0] == '"') + { + appendStringInfoString(&cstate->query_buf, " QUOTE AS "); + CopyQuoteStr(&cstate->query_buf, cstate->quote); + } + + if (cstate->escape && cstate->quote && cstate->escape[0] == cstate->quote[0]) + { + appendStringInfoString(&cstate->query_buf, " ESCAPE AS "); + CopyQuoteStr(&cstate->query_buf, cstate->escape); + } + + if (force_quote) + { + ListCell *cell; + ListCell *prev = NULL; + appendStringInfoString(&cstate->query_buf, " FORCE QUOTE "); + foreach (cell, force_quote) + { + if (prev) + appendStringInfoString(&cstate->query_buf, ", "); + CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); + prev = cell; + } + } + + if (force_notnull) + { + ListCell *cell; + ListCell *prev = NULL; + appendStringInfoString(&cstate->query_buf, " FORCE NOT NULL "); + foreach (cell, force_notnull) + { + if (prev) + appendStringInfoString(&cstate->query_buf, ", "); + CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); + prev = cell; + } + } + return exec_nodes; +} + +/* + * Use COPY for handling INSERT SELECT + * It may be a bit better to use binary mode here, but + * we have not implemented binary support for COPY yet. + * + * We borrow some code from CopyTo and DoCopy here. + * We do not refactor them so that it is later easier to remerge + * with vanilla PostgreSQL + */ +void +DoInsertSelectCopy(EState *estate, TupleTableSlot *slot) +{ + ExecNodes *exec_nodes; + HeapTuple tuple; + Datum *values; + bool *nulls; + Datum *hash_value = NULL; + MemoryContext oldcontext; + CopyState cstate; + + + Assert(IS_PGXC_COORDINATOR); + + /* See if we need to initialize COPY (first tuple) */ + if (estate->es_processed == 0) + { + ListCell *lc; + List *attnamelist = NIL; + ResultRelInfo *resultRelInfo = estate->es_result_relation_info; + Form_pg_attribute *attr; + + oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); + exec_nodes = makeNode(ExecNodes); + + /* + * We use the cstate struct here, though we do not need everything + * We will just use the properties we are interested in here. + */ + insertstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); + cstate = insertstate; + + cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext, + "COPY TO", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + cstate->rel = resultRelInfo->ri_RelationDesc; + cstate->tupDesc = RelationGetDescr(cstate->rel); + + foreach(lc, estate->es_plannedstmt->planTree->targetlist) + { + TargetEntry *target = (TargetEntry *) lfirst(lc); + attnamelist = lappend(attnamelist, makeString(target->resname)); + } + cstate->attnumlist = CopyGetAttnums(cstate->tupDesc, cstate->rel, attnamelist); + cstate->null_print_client = cstate->null_print; /* default */ + + /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ + cstate->fe_msgbuf = makeStringInfo(); + attr = cstate->tupDesc->attrs; + + /* Get info about the columns we need to process. */ + cstate->out_functions = (FmgrInfo *) palloc(cstate->tupDesc->natts * sizeof(FmgrInfo)); + foreach(lc, cstate->attnumlist) + { + int attnum = lfirst_int(lc); + Oid out_func_oid; + bool isvarlena; + + if (cstate->binary) + getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid, + &out_func_oid, + &isvarlena); + else + getTypeOutputInfo(attr[attnum - 1]->atttypid, + &out_func_oid, + &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + + /* Set defaults for omitted options */ + if (!cstate->delim) + cstate->delim = cstate->csv_mode ? "," : "\t"; + + if (!cstate->null_print) + cstate->null_print = cstate->csv_mode ? "" : "\\N"; + cstate->null_print_len = strlen(cstate->null_print); + + if (cstate->csv_mode) + { + if (!cstate->quote) + cstate->quote = "\""; + if (!cstate->escape) + cstate->escape = cstate->quote; + } + + exec_nodes = build_copy_statement(cstate, attnamelist, + cstate->tupDesc, true, NULL, NULL); + + cstate->connections = DataNodeCopyBegin(cstate->query_buf.data, + exec_nodes->nodelist, + GetActiveSnapshot(), + true); + + if (!cstate->connections) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("Failed to initialize data nodes for COPY"))); + + cstate->copy_dest = COPY_BUFFER; + + MemoryContextSwitchTo(oldcontext); + } + cstate = insertstate; + + values = (Datum *) palloc(cstate->tupDesc->natts * sizeof(Datum)); + nulls = (bool *) palloc(cstate->tupDesc->natts * sizeof(bool)); + + /* Process Tuple */ + /* We need to format the line for sending to data nodes */ + tuple = ExecMaterializeSlot(slot); + + /* Deconstruct the tuple ... faster than repeated heap_getattr */ + heap_deform_tuple(tuple, cstate->tupDesc, values, nulls); + + /* Format the input tuple for sending */ + CopyOneRowTo(cstate, 0, values, nulls); + + /* Get hash partition column, if any */ + if (cstate->hash_idx >= 0 && !nulls[cstate->hash_idx]) + hash_value = &values[cstate->hash_idx]; + + /* Send item to the appropriate data node(s) (buffer) */ + if (DataNodeCopyIn(cstate->fe_msgbuf->data, + cstate->fe_msgbuf->len, + GetRelationNodes(cstate->rel_loc, (long *)hash_value, RELATION_ACCESS_WRITE), + cstate->connections)) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("Copy failed on a data node"))); + + resetStringInfo(cstate->fe_msgbuf); + estate->es_processed++; +} + +/* + * + */ +void +EndInsertSelectCopy(void) +{ + Assert(IS_PGXC_COORDINATOR); + + DataNodeCopyFinish( + insertstate->connections, + primary_data_node, + COMBINE_TYPE_NONE); + pfree(insertstate->connections); + MemoryContextDelete(insertstate->rowcontext); +} +#endif diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index e07760356c..1ed43564c1 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -61,6 +61,7 @@ #include "utils/tqual.h" #ifdef PGXC #include "pgxc/pgxc.h" +#include "commands/copy.h" #endif /* Hooks for plugins to get control in ExecutorStart/Run/End() */ @@ -1685,6 +1686,26 @@ lnext: ; break; case CMD_INSERT: +#ifdef PGXC + /* + * If we get here on the Coordinator, we may have INSERT SELECT + * To handle INSERT SELECT, we use COPY to send down the nodes + */ + if (IS_PGXC_COORDINATOR && IsA(planstate, ResultState)) + { + PG_TRY(); + { + DoInsertSelectCopy(estate, slot); + } + PG_CATCH(); + { + EndInsertSelectCopy(); + PG_RE_THROW(); + } + PG_END_TRY(); + } + else +#endif ExecInsert(slot, tupleid, planSlot, dest, estate); break; @@ -1712,6 +1733,12 @@ lnext: ; break; } +#ifdef PGXC + /* See if we need to close a COPY started for INSERT SELECT */ + if (IS_PGXC_COORDINATOR && operation == CMD_INSERT && IsA(planstate, ResultState)) + EndInsertSelectCopy(); +#endif + /* * Process AFTER EACH STATEMENT triggers */ diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 30084633e4..bebb81a276 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -630,6 +630,7 @@ static Plan * create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Plan *outer_plan, Plan *inner_plan) { NestLoop *nest_parent; + JoinReduceInfo join_info; if (!enable_remotejoin) return parent; @@ -638,10 +639,6 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla if (root->hasPseudoConstantQuals) return parent; - /* Works only for SELECT commands right now */ - if (root->parse->commandType != CMD_SELECT) - return parent; - /* do not optimize CURSOR based select statements */ if (root->parse->rowMarks != NIL) return parent; @@ -664,7 +661,6 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla { int i; List *rtable_list = NIL; - bool partitioned_replicated_join = false; Material *outer_mat = (Material *)outer_plan; Material *inner_mat = (Material *)inner_plan; @@ -692,7 +688,7 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla } /* XXX Check if the join optimization is possible */ - if (IsJoinReducible(inner, outer, rtable_list, best_path, &partitioned_replicated_join)) + if (IsJoinReducible(inner, outer, rtable_list, best_path, &join_info)) { RemoteQuery *result; Plan *result_plan; @@ -829,6 +825,7 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla result->outer_reduce_level = outer->reduce_level; result->inner_relids = in_relids; result->outer_relids = out_relids; + result->exec_nodes = copyObject(join_info.exec_nodes); appendStringInfo(&fromlist, " %s (%s) %s", pname, inner->sql_statement, quote_identifier(in_alias)); @@ -917,8 +914,7 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla /* set_plan_refs needs this later */ result->base_tlist = base_tlist; result->relname = "__FOREIGN_QUERY__"; - - result->partitioned_replicated = partitioned_replicated_join; + result->partitioned_replicated = join_info.partitioned_replicated; /* * if there were any local scan clauses stick them up here. They @@ -2233,6 +2229,8 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path, TupleDesc tupdesc; bool first; StringInfoData sql; + RelationLocInfo *rel_loc_info; + Assert(scan_relid > 0); rte = planner_rt_fetch(scan_relid, root); @@ -2393,6 +2391,21 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path, scan_plan->sql_statement = sql.data; + /* + * Populate what nodes we execute on. + * This is still basic, and was done to make sure we do not select + * a replicated table from all nodes. + * It does not take into account conditions on partitioned relations + * that could reduce to one node. To do that, we need to move general + * planning earlier. + */ + rel_loc_info = GetRelationLocInfo(rte->relid); + scan_plan->exec_nodes = makeNode(ExecNodes); + scan_plan->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER; + scan_plan->exec_nodes->baselocatortype = rel_loc_info->locatorType; + scan_plan->exec_nodes = GetRelationNodes(rel_loc_info, + NULL, + RELATION_ACCESS_READ); copy_path_costsize(&scan_plan->scan.plan, best_path); /* PGXCTODO - get better estimates */ diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 02b863a4c1..4c677aa83c 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -441,35 +441,37 @@ get_base_var(Var *var, XCWalkerContext *context) /* * get_plan_nodes_insert - determine nodes on which to execute insert. + * + * We handle INSERT ... VALUES. + * If we have INSERT SELECT, we try and see if it is patitioned-based + * inserting into a partitioned-based. + * + * We set step->exec_nodes if we determine the single-step execution + * nodes. If it is still NULL after returning from this function, + * then the caller should use the regular PG planner */ -static ExecNodes * -get_plan_nodes_insert(Query *query) +static void +get_plan_nodes_insert(Query *query, RemoteQuery *step) { RangeTblEntry *rte; RelationLocInfo *rel_loc_info; Const *constant; - ExecNodes *exec_nodes; ListCell *lc; long part_value; long *part_value_ptr = NULL; Expr *eval_expr = NULL; - /* Looks complex (correlated?) - best to skip */ - if (query->jointree != NULL && query->jointree->fromlist != NULL) - return NULL; - /* Make sure there is just one table */ - if (query->rtable == NULL) - return NULL; + step->exec_nodes = NULL; rte = (RangeTblEntry *) list_nth(query->rtable, query->resultRelation - 1); - if (rte != NULL && rte->rtekind != RTE_RELATION) /* Bad relation type */ - return NULL; + return; - /* See if we have the partitioned case. */ + + /* Get result relation info */ rel_loc_info = GetRelationLocInfo(rte->relid); if (!rel_loc_info) @@ -477,13 +479,62 @@ get_plan_nodes_insert(Query *query) (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), (errmsg("Could not find relation for oid = %d", rte->relid)))); + if (query->jointree != NULL && query->jointree->fromlist != NULL) + { + /* INSERT SELECT suspected */ + + /* We only optimize for when the destination is partitioned */ + if (rel_loc_info->locatorType != LOCATOR_TYPE_HASH) + return; + + /* + * See if it is "single-step" + * Optimize for just known common case with 2 RTE entries + */ + if (query->resultRelation == 1 && query->rtable->length == 2) + { + RangeTblEntry *sub_rte = list_nth(query->rtable, 1); + + /* + * Get step->exec_nodes for the SELECT part of INSERT-SELECT + * to see if it is single-step + */ + if (sub_rte->rtekind == RTE_SUBQUERY && + !sub_rte->subquery->limitCount && + !sub_rte->subquery->limitOffset) + get_plan_nodes(sub_rte->subquery, step, RELATION_ACCESS_READ); + } + + /* Send to general planner if the query is multiple step */ + if (!step->exec_nodes) + return; + + /* If the source is not hash-based (eg, replicated) also send + * through general planner + */ + if (step->exec_nodes->baselocatortype != LOCATOR_TYPE_HASH) + { + step->exec_nodes = NULL; + return; + } + + /* + * If step->exec_nodes is not null, it is single step. + * Continue and check for destination table type cases below + */ + } + + if (rel_loc_info->locatorType == LOCATOR_TYPE_HASH && rel_loc_info->partAttrName != NULL) { + Expr *checkexpr; + TargetEntry *tle = NULL; + /* It is a partitioned table, get value by looking in targetList */ foreach(lc, query->targetList) { - TargetEntry *tle = (TargetEntry *) lfirst(lc); + tle = (TargetEntry *) lfirst(lc); if (tle->resjunk) continue; @@ -493,46 +544,95 @@ get_plan_nodes_insert(Query *query) * designated partitioned column */ if (strcmp(tle->resname, rel_loc_info->partAttrName) == 0) + break; + } + + if (!lc) + { + /* give up */ + step->exec_nodes = NULL; + return; + } + + /* We found the TargetEntry for the partition column */ + checkexpr = tle->expr; + + /* Handle INSERT SELECT case */ + if (query->jointree != NULL && query->jointree->fromlist != NULL) + { + if (IsA(checkexpr,Var)) { - /* We may have a cast, try and handle it */ - Expr *checkexpr = tle->expr; + XCWalkerContext context; + ColumnBase *col_base; + RelationLocInfo *source_rel_loc_info; - if (!IsA(tle->expr, Const)) + /* Look for expression populating partition column */ + InitXCWalkerContext(&context); + context.query = query; + context.rtables = lappend(context.rtables, query->rtable); + col_base = get_base_var((Var*) checkexpr, &context); + + if (!col_base) { - eval_expr = (Expr *) eval_const_expressions(NULL, (Node *) tle->expr); - checkexpr = get_numeric_constant(eval_expr); + step->exec_nodes = NULL; + return; } - if (checkexpr == NULL) - break; /* no constant */ + /* See if it is also a partitioned table */ + source_rel_loc_info = GetRelationLocInfo(col_base->relid); - constant = (Const *) checkexpr; + if (!source_rel_loc_info) + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("Could not find relation for oid = %d", rte->relid)))); - if (constant->consttype == INT4OID || - constant->consttype == INT2OID || - constant->consttype == INT8OID) + if (source_rel_loc_info->locatorType == LOCATOR_TYPE_HASH && + strcmp(col_base->colname, source_rel_loc_info->partAttrName) == 0) { - part_value = (long) constant->constvalue; - part_value_ptr = &part_value; - + /* + * Partition columns match, we have a "single-step INSERT SELECT". + * It is OK to use step->exec_nodes + */ + return; } - /* PGXCTODO - handle other data types */ - /* - else - if (constant->consttype == VARCHAR ... - */ } + /* Multi-step INSERT SELECT or some other case. Use general planner */ + step->exec_nodes = NULL; + return; + } + else + { + /* Check for constant */ + + /* We may have a cast, try and handle it */ + if (!IsA(tle->expr, Const)) + { + eval_expr = (Expr *) eval_const_expressions(NULL, (Node *) tle->expr); + checkexpr = get_numeric_constant(eval_expr); + } + + if (checkexpr == NULL) + return; /* no constant */ + + constant = (Const *) checkexpr; + + if (constant->consttype == INT4OID || + constant->consttype == INT2OID || + constant->consttype == INT8OID) + { + part_value = (long) constant->constvalue; + part_value_ptr = &part_value; + } + /* PGXCTODO - handle other data types */ } } /* single call handles both replicated and partitioned types */ - exec_nodes = GetRelationNodes(rel_loc_info, part_value_ptr, + step->exec_nodes = GetRelationNodes(rel_loc_info, part_value_ptr, RELATION_ACCESS_WRITE); if (eval_expr) pfree(eval_expr); - - return exec_nodes; } @@ -1665,7 +1765,7 @@ get_plan_nodes_command(Query *query, RemoteQuery *step) break; case CMD_INSERT: - step->exec_nodes = get_plan_nodes_insert(query); + get_plan_nodes_insert(query, step); break; case CMD_UPDATE: @@ -2794,24 +2894,28 @@ free_query_step(RemoteQuery *query_step) * If the join between the two RemoteQuery nodes is partitioned - partitioned * it is always reducibile safely, * - * RemoteQuery *innernode - the inner node - * RemoteQuery *outernode - the outer node - * bool *partitioned_replicated - set to true if we have a partitioned-replicated + * RemoteQuery *innernode - the inner node + * RemoteQuery *outernode - the outer node + * List *rtable_list - rtables + * JoinPath *join_path - used to examine join restrictions + * PGXCJoinInfo *join_info - contains info about the join reduction + * join_info->partitioned_replicated is set to true if we have a partitioned-replicated * join. We want to use replicated tables with non-replicated * tables ony once. Only use this value if this function * returns true. */ bool IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode, - List *rtable_list, JoinPath *join_path, bool *partitioned_replicated) + List *rtable_list, JoinPath *join_path, JoinReduceInfo *join_info) { XCWalkerContext context; ListCell *cell; bool maybe_reducible = false; bool result = false; - - *partitioned_replicated = false; + Assert(join_info); + join_info->partitioned_replicated = false; + join_info->exec_nodes = NULL; InitXCWalkerContext(&context); context.accessType = RELATION_ACCESS_READ; /* PGXCTODO - determine */ @@ -2819,7 +2923,6 @@ IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode, context.rtables = lappend(context.rtables, rtable_list); /* add to list of lists */ - foreach(cell, join_path->joinrestrictinfo) { RestrictInfo *node = (RestrictInfo *) lfirst(cell); @@ -2874,7 +2977,7 @@ IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode, if (pgxc_join->join_type == JOIN_REPLICATED_PARTITIONED) { - *partitioned_replicated = true; + join_info->partitioned_replicated = true; /* * If either of these already have such a join, we do not @@ -2891,6 +2994,18 @@ IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode, } } + if (result) + { + /* + * Set exec_nodes from walker if it was set. + * If not, it is replicated and we can use existing + */ + if (context.query_step) + join_info->exec_nodes = copyObject(context.query_step->exec_nodes); + else + join_info->exec_nodes = copyObject(outernode->exec_nodes); + } + return result; } |
