summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/copy.c495
-rw-r--r--src/backend/executor/execMain.c27
-rw-r--r--src/backend/optimizer/plan/createplan.c29
-rw-r--r--src/backend/pgxc/plan/planner.c203
-rw-r--r--src/include/pgxc/planner.h8
5 files changed, 562 insertions, 200 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index f139fb696b..578e29d6c1 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -61,6 +61,9 @@ typedef enum CopyDest
COPY_FILE, /* to/from file */
COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
COPY_NEW_FE /* to/from frontend (3.0 protocol) */
+#ifdef PGXC
+ ,COPY_BUFFER /* Do not send, just prepare */
+#endif
} CopyDest;
/*
@@ -181,6 +184,7 @@ typedef struct CopyStateData
int hash_idx; /* index of the hash column */
PGXCNodeHandle **connections; /* Involved data node connections */
+ TupleDesc tupDesc; /* for INSERT SELECT */
#endif
} CopyStateData;
@@ -301,6 +305,17 @@ static void CopySendInt16(CopyState cstate, int16 val);
static bool CopyGetInt16(CopyState cstate, int16 *val);
+#ifdef PGXC
+static ExecNodes *build_copy_statement(CopyState cstate, List *attnamelist,
+ TupleDesc tupDesc, bool is_from, List *force_quote, List *force_notnull);
+/*
+ * A kluge here making this static to avoid having to move the
+ * CopyState definition to a header file making it harder to merge
+ * with the vanilla PostgreSQL code
+ */
+static CopyState insertstate;
+#endif
+
/*
* Send copy start/stop messages for frontend copies. These have changed
* in past protocol redesigns.
@@ -487,6 +502,11 @@ CopySendEndOfRow(CopyState cstate)
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
+#ifdef PGXC
+ case COPY_BUFFER:
+ /* Do not send yet anywhere, just return */
+ return;
+#endif
}
resetStringInfo(fe_msgbuf);
@@ -598,6 +618,11 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
bytesread += avail;
}
break;
+#ifdef PGXC
+ case COPY_BUFFER:
+ elog(ERROR, "COPY_BUFFER not allowed in this context");
+ break;
+#endif
}
return bytesread;
@@ -1133,155 +1158,10 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
errmsg("table \"%s\" does not have OIDs",
RelationGetRelationName(cstate->rel))));
#ifdef PGXC
- /* Get locator information */
+ /* Get copy statement and execution node information */
if (IS_PGXC_COORDINATOR)
{
- char *hash_att;
-
- exec_nodes = makeNode(ExecNodes);
-
- /*
- * If target table does not exists on nodes (e.g. system table)
- * the location info returned is NULL. This is the criteria, when
- * we need to run Copy on coordinator
- */
- cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel));
-
- hash_att = GetRelationHashColumn(cstate->rel_loc);
- if (cstate->rel_loc)
- {
- if (is_from || hash_att)
- exec_nodes->nodelist = list_copy(cstate->rel_loc->nodeList);
- else
- {
- /*
- * Pick up one node only
- * This case corresponds to a replicated table with COPY TO
- */
- exec_nodes->nodelist = GetAnyDataNode();
- }
- }
-
- cstate->hash_idx = -1;
- if (hash_att)
- {
- List *attnums;
- ListCell *cur;
-
- attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
- foreach(cur, attnums)
- {
- int attnum = lfirst_int(cur);
- if (namestrcmp(&(tupDesc->attrs[attnum - 1]->attname), hash_att) == 0)
- {
- cstate->hash_idx = attnum - 1;
- break;
- }
- }
- }
-
- /*
- * Build up query string for the data nodes, it should match
- * to original string, but should have STDIN/STDOUT instead
- * of filename.
- */
- initStringInfo(&cstate->query_buf);
-
- appendStringInfoString(&cstate->query_buf, "COPY ");
- appendStringInfo(&cstate->query_buf, "%s", RelationGetRelationName(cstate->rel));
-
- if (attnamelist)
- {
- ListCell *cell;
- ListCell *prev = NULL;
- appendStringInfoString(&cstate->query_buf, " (");
- foreach (cell, attnamelist)
- {
- if (prev)
- appendStringInfoString(&cstate->query_buf, ", ");
- CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell)));
- prev = cell;
- }
- appendStringInfoChar(&cstate->query_buf, ')');
- }
-
- if (stmt->is_from)
- appendStringInfoString(&cstate->query_buf, " FROM STDIN");
- else
- appendStringInfoString(&cstate->query_buf, " TO STDOUT");
-
-
- if (cstate->binary)
- appendStringInfoString(&cstate->query_buf, " BINARY");
-
- if (cstate->oids)
- appendStringInfoString(&cstate->query_buf, " OIDS");
-
- if (cstate->delim)
- if ((!cstate->csv_mode && cstate->delim[0] != '\t')
- || (cstate->csv_mode && cstate->delim[0] != ','))
- {
- appendStringInfoString(&cstate->query_buf, " DELIMITER AS ");
- CopyQuoteStr(&cstate->query_buf, cstate->delim);
- }
-
- if (cstate->null_print)
- if ((!cstate->csv_mode && strcmp(cstate->null_print, "\\N"))
- || (cstate->csv_mode && strcmp(cstate->null_print, "")))
- {
- appendStringInfoString(&cstate->query_buf, " NULL AS ");
- CopyQuoteStr(&cstate->query_buf, cstate->null_print);
- }
-
- if (cstate->csv_mode)
- appendStringInfoString(&cstate->query_buf, " CSV");
-
- /*
- * Only rewrite the header part for COPY FROM,
- * doing that for COPY TO results in multiple headers in output
- */
- if (cstate->header_line && stmt->is_from)
- appendStringInfoString(&cstate->query_buf, " HEADER");
-
- if (cstate->quote && cstate->quote[0] == '"')
- {
- appendStringInfoString(&cstate->query_buf, " QUOTE AS ");
- CopyQuoteStr(&cstate->query_buf, cstate->quote);
- }
-
- if (cstate->escape && cstate->quote && cstate->escape[0] == cstate->quote[0])
- {
- appendStringInfoString(&cstate->query_buf, " ESCAPE AS ");
- CopyQuoteStr(&cstate->query_buf, cstate->escape);
- }
-
- if (force_quote)
- {
- ListCell *cell;
- ListCell *prev = NULL;
- appendStringInfoString(&cstate->query_buf, " FORCE QUOTE ");
- foreach (cell, force_quote)
- {
- if (prev)
- appendStringInfoString(&cstate->query_buf, ", ");
- CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell)));
- prev = cell;
- }
- }
-
- if (force_notnull)
- {
- ListCell *cell;
- ListCell *prev = NULL;
- appendStringInfoString(&cstate->query_buf, " FORCE NOT NULL ");
- foreach (cell, force_notnull)
- {
- if (prev)
- appendStringInfoString(&cstate->query_buf, ", ");
- CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell)));
- prev = cell;
- }
- }
+ exec_nodes = build_copy_statement(cstate, attnamelist, tupDesc, is_from, force_quote, force_notnull);
}
#endif
}
@@ -3848,3 +3728,324 @@ CreateCopyDestReceiver(void)
return (DestReceiver *) self;
}
+
+#ifdef PGXC
+/*
+ * Rebuild a COPY statement in cstate and set ExecNodes
+ */
+static ExecNodes*
+build_copy_statement(CopyState cstate, List *attnamelist,
+ TupleDesc tupDesc, bool is_from, List *force_quote, List *force_notnull)
+{
+ char *hash_att;
+
+
+ ExecNodes *exec_nodes = makeNode(ExecNodes);
+
+ /*
+ * If target table does not exists on nodes (e.g. system table)
+ * the location info returned is NULL. This is the criteria, when
+ * we need to run Copy on coordinator
+ */
+ cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel));
+
+ hash_att = GetRelationHashColumn(cstate->rel_loc);
+ if (cstate->rel_loc)
+ {
+ if (is_from || hash_att)
+ exec_nodes->nodelist = list_copy(cstate->rel_loc->nodeList);
+ else
+ {
+ /*
+ * Pick up one node only
+ * This case corresponds to a replicated table with COPY TO
+ */
+ exec_nodes->nodelist = GetAnyDataNode();
+ }
+ }
+
+ cstate->hash_idx = -1;
+ if (hash_att)
+ {
+ List *attnums;
+ ListCell *cur;
+
+ attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
+ foreach(cur, attnums)
+ {
+ int attnum = lfirst_int(cur);
+ if (namestrcmp(&(tupDesc->attrs[attnum - 1]->attname), hash_att) == 0)
+ {
+ cstate->hash_idx = attnum - 1;
+ break;
+ }
+ }
+ }
+
+ /*
+ * Build up query string for the data nodes, it should match
+ * to original string, but should have STDIN/STDOUT instead
+ * of filename.
+ */
+ initStringInfo(&cstate->query_buf);
+
+ appendStringInfoString(&cstate->query_buf, "COPY ");
+ appendStringInfo(&cstate->query_buf, "%s", RelationGetRelationName(cstate->rel));
+
+ if (attnamelist)
+ {
+ ListCell *cell;
+ ListCell *prev = NULL;
+ appendStringInfoString(&cstate->query_buf, " (");
+ foreach (cell, attnamelist)
+ {
+ if (prev)
+ appendStringInfoString(&cstate->query_buf, ", ");
+ CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell)));
+ prev = cell;
+ }
+ appendStringInfoChar(&cstate->query_buf, ')');
+ }
+
+ if (is_from)
+ appendStringInfoString(&cstate->query_buf, " FROM STDIN");
+ else
+ appendStringInfoString(&cstate->query_buf, " TO STDOUT");
+
+
+ if (cstate->binary)
+ appendStringInfoString(&cstate->query_buf, " BINARY");
+
+ if (cstate->oids)
+ appendStringInfoString(&cstate->query_buf, " OIDS");
+
+ if (cstate->delim)
+ if ((!cstate->csv_mode && cstate->delim[0] != '\t')
+ || (cstate->csv_mode && cstate->delim[0] != ','))
+ {
+ appendStringInfoString(&cstate->query_buf, " DELIMITER AS ");
+ CopyQuoteStr(&cstate->query_buf, cstate->delim);
+ }
+
+ if (cstate->null_print)
+ if ((!cstate->csv_mode && strcmp(cstate->null_print, "\\N"))
+ || (cstate->csv_mode && strcmp(cstate->null_print, "")))
+ {
+ appendStringInfoString(&cstate->query_buf, " NULL AS ");
+ CopyQuoteStr(&cstate->query_buf, cstate->null_print);
+ }
+
+ if (cstate->csv_mode)
+ appendStringInfoString(&cstate->query_buf, " CSV");
+
+ /*
+ * Only rewrite the header part for COPY FROM,
+ * doing that for COPY TO results in multiple headers in output
+ */
+ if (cstate->header_line && is_from)
+ appendStringInfoString(&cstate->query_buf, " HEADER");
+
+ if (cstate->quote && cstate->quote[0] == '"')
+ {
+ appendStringInfoString(&cstate->query_buf, " QUOTE AS ");
+ CopyQuoteStr(&cstate->query_buf, cstate->quote);
+ }
+
+ if (cstate->escape && cstate->quote && cstate->escape[0] == cstate->quote[0])
+ {
+ appendStringInfoString(&cstate->query_buf, " ESCAPE AS ");
+ CopyQuoteStr(&cstate->query_buf, cstate->escape);
+ }
+
+ if (force_quote)
+ {
+ ListCell *cell;
+ ListCell *prev = NULL;
+ appendStringInfoString(&cstate->query_buf, " FORCE QUOTE ");
+ foreach (cell, force_quote)
+ {
+ if (prev)
+ appendStringInfoString(&cstate->query_buf, ", ");
+ CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell)));
+ prev = cell;
+ }
+ }
+
+ if (force_notnull)
+ {
+ ListCell *cell;
+ ListCell *prev = NULL;
+ appendStringInfoString(&cstate->query_buf, " FORCE NOT NULL ");
+ foreach (cell, force_notnull)
+ {
+ if (prev)
+ appendStringInfoString(&cstate->query_buf, ", ");
+ CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell)));
+ prev = cell;
+ }
+ }
+ return exec_nodes;
+}
+
+/*
+ * Use COPY for handling INSERT SELECT
+ * It may be a bit better to use binary mode here, but
+ * we have not implemented binary support for COPY yet.
+ *
+ * We borrow some code from CopyTo and DoCopy here.
+ * We do not refactor them so that it is later easier to remerge
+ * with vanilla PostgreSQL
+ */
+void
+DoInsertSelectCopy(EState *estate, TupleTableSlot *slot)
+{
+ ExecNodes *exec_nodes;
+ HeapTuple tuple;
+ Datum *values;
+ bool *nulls;
+ Datum *hash_value = NULL;
+ MemoryContext oldcontext;
+ CopyState cstate;
+
+
+ Assert(IS_PGXC_COORDINATOR);
+
+ /* See if we need to initialize COPY (first tuple) */
+ if (estate->es_processed == 0)
+ {
+ ListCell *lc;
+ List *attnamelist = NIL;
+ ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
+ Form_pg_attribute *attr;
+
+ oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
+ exec_nodes = makeNode(ExecNodes);
+
+ /*
+ * We use the cstate struct here, though we do not need everything
+ * We will just use the properties we are interested in here.
+ */
+ insertstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
+ cstate = insertstate;
+
+ cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
+ "COPY TO",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ cstate->rel = resultRelInfo->ri_RelationDesc;
+ cstate->tupDesc = RelationGetDescr(cstate->rel);
+
+ foreach(lc, estate->es_plannedstmt->planTree->targetlist)
+ {
+ TargetEntry *target = (TargetEntry *) lfirst(lc);
+ attnamelist = lappend(attnamelist, makeString(target->resname));
+ }
+ cstate->attnumlist = CopyGetAttnums(cstate->tupDesc, cstate->rel, attnamelist);
+ cstate->null_print_client = cstate->null_print; /* default */
+
+ /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
+ cstate->fe_msgbuf = makeStringInfo();
+ attr = cstate->tupDesc->attrs;
+
+ /* Get info about the columns we need to process. */
+ cstate->out_functions = (FmgrInfo *) palloc(cstate->tupDesc->natts * sizeof(FmgrInfo));
+ foreach(lc, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(lc);
+ Oid out_func_oid;
+ bool isvarlena;
+
+ if (cstate->binary)
+ getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
+ &out_func_oid,
+ &isvarlena);
+ else
+ getTypeOutputInfo(attr[attnum - 1]->atttypid,
+ &out_func_oid,
+ &isvarlena);
+ fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+ }
+
+ /* Set defaults for omitted options */
+ if (!cstate->delim)
+ cstate->delim = cstate->csv_mode ? "," : "\t";
+
+ if (!cstate->null_print)
+ cstate->null_print = cstate->csv_mode ? "" : "\\N";
+ cstate->null_print_len = strlen(cstate->null_print);
+
+ if (cstate->csv_mode)
+ {
+ if (!cstate->quote)
+ cstate->quote = "\"";
+ if (!cstate->escape)
+ cstate->escape = cstate->quote;
+ }
+
+ exec_nodes = build_copy_statement(cstate, attnamelist,
+ cstate->tupDesc, true, NULL, NULL);
+
+ cstate->connections = DataNodeCopyBegin(cstate->query_buf.data,
+ exec_nodes->nodelist,
+ GetActiveSnapshot(),
+ true);
+
+ if (!cstate->connections)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_EXCEPTION),
+ errmsg("Failed to initialize data nodes for COPY")));
+
+ cstate->copy_dest = COPY_BUFFER;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+ cstate = insertstate;
+
+ values = (Datum *) palloc(cstate->tupDesc->natts * sizeof(Datum));
+ nulls = (bool *) palloc(cstate->tupDesc->natts * sizeof(bool));
+
+ /* Process Tuple */
+ /* We need to format the line for sending to data nodes */
+ tuple = ExecMaterializeSlot(slot);
+
+ /* Deconstruct the tuple ... faster than repeated heap_getattr */
+ heap_deform_tuple(tuple, cstate->tupDesc, values, nulls);
+
+ /* Format the input tuple for sending */
+ CopyOneRowTo(cstate, 0, values, nulls);
+
+ /* Get hash partition column, if any */
+ if (cstate->hash_idx >= 0 && !nulls[cstate->hash_idx])
+ hash_value = &values[cstate->hash_idx];
+
+ /* Send item to the appropriate data node(s) (buffer) */
+ if (DataNodeCopyIn(cstate->fe_msgbuf->data,
+ cstate->fe_msgbuf->len,
+ GetRelationNodes(cstate->rel_loc, (long *)hash_value, RELATION_ACCESS_WRITE),
+ cstate->connections))
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_EXCEPTION),
+ errmsg("Copy failed on a data node")));
+
+ resetStringInfo(cstate->fe_msgbuf);
+ estate->es_processed++;
+}
+
+/*
+ *
+ */
+void
+EndInsertSelectCopy(void)
+{
+ Assert(IS_PGXC_COORDINATOR);
+
+ DataNodeCopyFinish(
+ insertstate->connections,
+ primary_data_node,
+ COMBINE_TYPE_NONE);
+ pfree(insertstate->connections);
+ MemoryContextDelete(insertstate->rowcontext);
+}
+#endif
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index e07760356c..1ed43564c1 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -61,6 +61,7 @@
#include "utils/tqual.h"
#ifdef PGXC
#include "pgxc/pgxc.h"
+#include "commands/copy.h"
#endif
/* Hooks for plugins to get control in ExecutorStart/Run/End() */
@@ -1685,6 +1686,26 @@ lnext: ;
break;
case CMD_INSERT:
+#ifdef PGXC
+ /*
+ * If we get here on the Coordinator, we may have INSERT SELECT
+ * To handle INSERT SELECT, we use COPY to send down the nodes
+ */
+ if (IS_PGXC_COORDINATOR && IsA(planstate, ResultState))
+ {
+ PG_TRY();
+ {
+ DoInsertSelectCopy(estate, slot);
+ }
+ PG_CATCH();
+ {
+ EndInsertSelectCopy();
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+ else
+#endif
ExecInsert(slot, tupleid, planSlot, dest, estate);
break;
@@ -1712,6 +1733,12 @@ lnext: ;
break;
}
+#ifdef PGXC
+ /* See if we need to close a COPY started for INSERT SELECT */
+ if (IS_PGXC_COORDINATOR && operation == CMD_INSERT && IsA(planstate, ResultState))
+ EndInsertSelectCopy();
+#endif
+
/*
* Process AFTER EACH STATEMENT triggers
*/
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 30084633e4..bebb81a276 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -630,6 +630,7 @@ static Plan *
create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Plan *outer_plan, Plan *inner_plan)
{
NestLoop *nest_parent;
+ JoinReduceInfo join_info;
if (!enable_remotejoin)
return parent;
@@ -638,10 +639,6 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla
if (root->hasPseudoConstantQuals)
return parent;
- /* Works only for SELECT commands right now */
- if (root->parse->commandType != CMD_SELECT)
- return parent;
-
/* do not optimize CURSOR based select statements */
if (root->parse->rowMarks != NIL)
return parent;
@@ -664,7 +661,6 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla
{
int i;
List *rtable_list = NIL;
- bool partitioned_replicated_join = false;
Material *outer_mat = (Material *)outer_plan;
Material *inner_mat = (Material *)inner_plan;
@@ -692,7 +688,7 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla
}
/* XXX Check if the join optimization is possible */
- if (IsJoinReducible(inner, outer, rtable_list, best_path, &partitioned_replicated_join))
+ if (IsJoinReducible(inner, outer, rtable_list, best_path, &join_info))
{
RemoteQuery *result;
Plan *result_plan;
@@ -829,6 +825,7 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla
result->outer_reduce_level = outer->reduce_level;
result->inner_relids = in_relids;
result->outer_relids = out_relids;
+ result->exec_nodes = copyObject(join_info.exec_nodes);
appendStringInfo(&fromlist, " %s (%s) %s",
pname, inner->sql_statement, quote_identifier(in_alias));
@@ -917,8 +914,7 @@ create_remotejoin_plan(PlannerInfo *root, JoinPath *best_path, Plan *parent, Pla
/* set_plan_refs needs this later */
result->base_tlist = base_tlist;
result->relname = "__FOREIGN_QUERY__";
-
- result->partitioned_replicated = partitioned_replicated_join;
+ result->partitioned_replicated = join_info.partitioned_replicated;
/*
* if there were any local scan clauses stick them up here. They
@@ -2233,6 +2229,8 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path,
TupleDesc tupdesc;
bool first;
StringInfoData sql;
+ RelationLocInfo *rel_loc_info;
+
Assert(scan_relid > 0);
rte = planner_rt_fetch(scan_relid, root);
@@ -2393,6 +2391,21 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path,
scan_plan->sql_statement = sql.data;
+ /*
+ * Populate what nodes we execute on.
+ * This is still basic, and was done to make sure we do not select
+ * a replicated table from all nodes.
+ * It does not take into account conditions on partitioned relations
+ * that could reduce to one node. To do that, we need to move general
+ * planning earlier.
+ */
+ rel_loc_info = GetRelationLocInfo(rte->relid);
+ scan_plan->exec_nodes = makeNode(ExecNodes);
+ scan_plan->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER;
+ scan_plan->exec_nodes->baselocatortype = rel_loc_info->locatorType;
+ scan_plan->exec_nodes = GetRelationNodes(rel_loc_info,
+ NULL,
+ RELATION_ACCESS_READ);
copy_path_costsize(&scan_plan->scan.plan, best_path);
/* PGXCTODO - get better estimates */
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 02b863a4c1..4c677aa83c 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -441,35 +441,37 @@ get_base_var(Var *var, XCWalkerContext *context)
/*
* get_plan_nodes_insert - determine nodes on which to execute insert.
+ *
+ * We handle INSERT ... VALUES.
+ * If we have INSERT SELECT, we try and see if it is patitioned-based
+ * inserting into a partitioned-based.
+ *
+ * We set step->exec_nodes if we determine the single-step execution
+ * nodes. If it is still NULL after returning from this function,
+ * then the caller should use the regular PG planner
*/
-static ExecNodes *
-get_plan_nodes_insert(Query *query)
+static void
+get_plan_nodes_insert(Query *query, RemoteQuery *step)
{
RangeTblEntry *rte;
RelationLocInfo *rel_loc_info;
Const *constant;
- ExecNodes *exec_nodes;
ListCell *lc;
long part_value;
long *part_value_ptr = NULL;
Expr *eval_expr = NULL;
- /* Looks complex (correlated?) - best to skip */
- if (query->jointree != NULL && query->jointree->fromlist != NULL)
- return NULL;
- /* Make sure there is just one table */
- if (query->rtable == NULL)
- return NULL;
+ step->exec_nodes = NULL;
rte = (RangeTblEntry *) list_nth(query->rtable, query->resultRelation - 1);
-
if (rte != NULL && rte->rtekind != RTE_RELATION)
/* Bad relation type */
- return NULL;
+ return;
- /* See if we have the partitioned case. */
+
+ /* Get result relation info */
rel_loc_info = GetRelationLocInfo(rte->relid);
if (!rel_loc_info)
@@ -477,13 +479,62 @@ get_plan_nodes_insert(Query *query)
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
(errmsg("Could not find relation for oid = %d", rte->relid))));
+ if (query->jointree != NULL && query->jointree->fromlist != NULL)
+ {
+ /* INSERT SELECT suspected */
+
+ /* We only optimize for when the destination is partitioned */
+ if (rel_loc_info->locatorType != LOCATOR_TYPE_HASH)
+ return;
+
+ /*
+ * See if it is "single-step"
+ * Optimize for just known common case with 2 RTE entries
+ */
+ if (query->resultRelation == 1 && query->rtable->length == 2)
+ {
+ RangeTblEntry *sub_rte = list_nth(query->rtable, 1);
+
+ /*
+ * Get step->exec_nodes for the SELECT part of INSERT-SELECT
+ * to see if it is single-step
+ */
+ if (sub_rte->rtekind == RTE_SUBQUERY &&
+ !sub_rte->subquery->limitCount &&
+ !sub_rte->subquery->limitOffset)
+ get_plan_nodes(sub_rte->subquery, step, RELATION_ACCESS_READ);
+ }
+
+ /* Send to general planner if the query is multiple step */
+ if (!step->exec_nodes)
+ return;
+
+ /* If the source is not hash-based (eg, replicated) also send
+ * through general planner
+ */
+ if (step->exec_nodes->baselocatortype != LOCATOR_TYPE_HASH)
+ {
+ step->exec_nodes = NULL;
+ return;
+ }
+
+ /*
+ * If step->exec_nodes is not null, it is single step.
+ * Continue and check for destination table type cases below
+ */
+ }
+
+
if (rel_loc_info->locatorType == LOCATOR_TYPE_HASH &&
rel_loc_info->partAttrName != NULL)
{
+ Expr *checkexpr;
+ TargetEntry *tle = NULL;
+
/* It is a partitioned table, get value by looking in targetList */
foreach(lc, query->targetList)
{
- TargetEntry *tle = (TargetEntry *) lfirst(lc);
+ tle = (TargetEntry *) lfirst(lc);
if (tle->resjunk)
continue;
@@ -493,46 +544,95 @@ get_plan_nodes_insert(Query *query)
* designated partitioned column
*/
if (strcmp(tle->resname, rel_loc_info->partAttrName) == 0)
+ break;
+ }
+
+ if (!lc)
+ {
+ /* give up */
+ step->exec_nodes = NULL;
+ return;
+ }
+
+ /* We found the TargetEntry for the partition column */
+ checkexpr = tle->expr;
+
+ /* Handle INSERT SELECT case */
+ if (query->jointree != NULL && query->jointree->fromlist != NULL)
+ {
+ if (IsA(checkexpr,Var))
{
- /* We may have a cast, try and handle it */
- Expr *checkexpr = tle->expr;
+ XCWalkerContext context;
+ ColumnBase *col_base;
+ RelationLocInfo *source_rel_loc_info;
- if (!IsA(tle->expr, Const))
+ /* Look for expression populating partition column */
+ InitXCWalkerContext(&context);
+ context.query = query;
+ context.rtables = lappend(context.rtables, query->rtable);
+ col_base = get_base_var((Var*) checkexpr, &context);
+
+ if (!col_base)
{
- eval_expr = (Expr *) eval_const_expressions(NULL, (Node *) tle->expr);
- checkexpr = get_numeric_constant(eval_expr);
+ step->exec_nodes = NULL;
+ return;
}
- if (checkexpr == NULL)
- break; /* no constant */
+ /* See if it is also a partitioned table */
+ source_rel_loc_info = GetRelationLocInfo(col_base->relid);
- constant = (Const *) checkexpr;
+ if (!source_rel_loc_info)
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("Could not find relation for oid = %d", rte->relid))));
- if (constant->consttype == INT4OID ||
- constant->consttype == INT2OID ||
- constant->consttype == INT8OID)
+ if (source_rel_loc_info->locatorType == LOCATOR_TYPE_HASH &&
+ strcmp(col_base->colname, source_rel_loc_info->partAttrName) == 0)
{
- part_value = (long) constant->constvalue;
- part_value_ptr = &part_value;
-
+ /*
+ * Partition columns match, we have a "single-step INSERT SELECT".
+ * It is OK to use step->exec_nodes
+ */
+ return;
}
- /* PGXCTODO - handle other data types */
- /*
- else
- if (constant->consttype == VARCHAR ...
- */
}
+ /* Multi-step INSERT SELECT or some other case. Use general planner */
+ step->exec_nodes = NULL;
+ return;
+ }
+ else
+ {
+ /* Check for constant */
+
+ /* We may have a cast, try and handle it */
+ if (!IsA(tle->expr, Const))
+ {
+ eval_expr = (Expr *) eval_const_expressions(NULL, (Node *) tle->expr);
+ checkexpr = get_numeric_constant(eval_expr);
+ }
+
+ if (checkexpr == NULL)
+ return; /* no constant */
+
+ constant = (Const *) checkexpr;
+
+ if (constant->consttype == INT4OID ||
+ constant->consttype == INT2OID ||
+ constant->consttype == INT8OID)
+ {
+ part_value = (long) constant->constvalue;
+ part_value_ptr = &part_value;
+ }
+ /* PGXCTODO - handle other data types */
}
}
/* single call handles both replicated and partitioned types */
- exec_nodes = GetRelationNodes(rel_loc_info, part_value_ptr,
+ step->exec_nodes = GetRelationNodes(rel_loc_info, part_value_ptr,
RELATION_ACCESS_WRITE);
if (eval_expr)
pfree(eval_expr);
-
- return exec_nodes;
}
@@ -1665,7 +1765,7 @@ get_plan_nodes_command(Query *query, RemoteQuery *step)
break;
case CMD_INSERT:
- step->exec_nodes = get_plan_nodes_insert(query);
+ get_plan_nodes_insert(query, step);
break;
case CMD_UPDATE:
@@ -2794,24 +2894,28 @@ free_query_step(RemoteQuery *query_step)
* If the join between the two RemoteQuery nodes is partitioned - partitioned
* it is always reducibile safely,
*
- * RemoteQuery *innernode - the inner node
- * RemoteQuery *outernode - the outer node
- * bool *partitioned_replicated - set to true if we have a partitioned-replicated
+ * RemoteQuery *innernode - the inner node
+ * RemoteQuery *outernode - the outer node
+ * List *rtable_list - rtables
+ * JoinPath *join_path - used to examine join restrictions
+ * PGXCJoinInfo *join_info - contains info about the join reduction
+ * join_info->partitioned_replicated is set to true if we have a partitioned-replicated
* join. We want to use replicated tables with non-replicated
* tables ony once. Only use this value if this function
* returns true.
*/
bool
IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode,
- List *rtable_list, JoinPath *join_path, bool *partitioned_replicated)
+ List *rtable_list, JoinPath *join_path, JoinReduceInfo *join_info)
{
XCWalkerContext context;
ListCell *cell;
bool maybe_reducible = false;
bool result = false;
-
- *partitioned_replicated = false;
+ Assert(join_info);
+ join_info->partitioned_replicated = false;
+ join_info->exec_nodes = NULL;
InitXCWalkerContext(&context);
context.accessType = RELATION_ACCESS_READ; /* PGXCTODO - determine */
@@ -2819,7 +2923,6 @@ IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode,
context.rtables = lappend(context.rtables, rtable_list); /* add to list of lists */
-
foreach(cell, join_path->joinrestrictinfo)
{
RestrictInfo *node = (RestrictInfo *) lfirst(cell);
@@ -2874,7 +2977,7 @@ IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode,
if (pgxc_join->join_type == JOIN_REPLICATED_PARTITIONED)
{
- *partitioned_replicated = true;
+ join_info->partitioned_replicated = true;
/*
* If either of these already have such a join, we do not
@@ -2891,6 +2994,18 @@ IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode,
}
}
+ if (result)
+ {
+ /*
+ * Set exec_nodes from walker if it was set.
+ * If not, it is replicated and we can use existing
+ */
+ if (context.query_step)
+ join_info->exec_nodes = copyObject(context.query_step->exec_nodes);
+ else
+ join_info->exec_nodes = copyObject(outernode->exec_nodes);
+ }
+
return result;
}
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index 7284ef7432..bb1f934be9 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -168,6 +168,12 @@ typedef struct
StringInfoData valuebuf;
} SimpleAgg;
+typedef struct
+{
+ bool partitioned_replicated;
+ ExecNodes *exec_nodes;
+} JoinReduceInfo;
+
/* forbid SQL if unsafe, useful to turn off for development */
extern bool StrictStatementChecking;
@@ -181,6 +187,6 @@ extern bool IsHashDistributable(Oid col_type);
extern bool is_immutable_func(Oid funcid);
extern bool IsJoinReducible(RemoteQuery *innernode, RemoteQuery *outernode,
- List *rtable_list, JoinPath *join_path, bool *partitioned_replicated);
+ List *rtable_list, JoinPath *join_path, JoinReduceInfo *join_info);
#endif /* PGXCPLANNER_H */