summaryrefslogtreecommitdiff
path: root/src/backend/optimizer
diff options
context:
space:
mode:
authorPavan Deolasee2011-12-14 04:05:53 +0000
committerPavan Deolasee2011-12-14 04:05:53 +0000
commit8a05756a702051d55a35ec3f4953f381f977b53a (patch)
treea3cae66eac03e14f184142e14e25c14b4f6e075c /src/backend/optimizer
parenta53dd68ce6ab8a66ae7b4f7487b3b1aa78423894 (diff)
Implement support for CREATE TABLE AS, SELECT INTO and INSERT INTO
statements. We start by fixing the INSERT INTO support. For every result relation, we now build a corresponding RemoteQuery node so that the inserts can be carried out at the remote datanodes. Subsequently, at the coordinator at execution time, instead of inserting the resulting tuples in a local heap, we invoke remote execution and insert the rows in the remote datanodes. This works nicely even for prepared queries, a multi-row VALUES clause in INSERT, as well as any other mechanism of generating tuples. We then use this infrastructure to support CREATE TABLE AS SELECT (CTAS). The query is transformed into a CREATE TABLE statement followed by an INSERT INTO statement and then run through normal planning/execution. There are many regression cases that need fixing because these statements now work correctly. This patch fixes many of them. A few might still be failing, but they seem unrelated to the work itself and might be a side effect. We will fix them once this patch gets in.
Diffstat (limited to 'src/backend/optimizer')
-rw-r--r--src/backend/optimizer/plan/createplan.c175
1 files changed, 174 insertions, 1 deletions
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index b1d8b44839..4f152eb318 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -37,6 +37,7 @@
#include "parser/parse_clause.h"
#include "parser/parsetree.h"
#ifdef PGXC
+#include "access/gtm.h"
#include "pgxc/pgxc.h"
#include "pgxc/planner.h"
#include "pgxc/postgresql_fdw.h"
@@ -5363,11 +5364,183 @@ findReferencedVars(List *parent_vars, Plan *plan, List **out_tlist, Relids *out_
/*
* create_remoteinsert_plan()
*
- * Dummy
+ * For every target relation, add a remote query node to carry out remote
+ * operations.
+ *
+ * root supplies the range table (root->parse->rtable) against which each
+ * result relation index is resolved. topplan must be a ModifyTable node; any
+ * other node type raises an ERROR.
+ *
+ * For each result relation that is a plain relation with location info, a
+ * RemoteQuery node carrying a parameterized
+ * "INSERT INTO rel (cols...) VALUES ($1, ..., $n)" statement is appended to
+ * the ModifyTable's remote_plans list. Result relations that are not
+ * RTE_RELATION entries, or for which no location info is found, are skipped
+ * silently.
+ *
+ * Returns topplan itself (the same pointer that was passed in).
+ *
+ * !!PGXCTODO We should also fix the create_remotedelete_plan to work on similar
+ * lines. Right now, it seems to assume only one result relation, which
+ * certainly does not look true for inherited tables. But then, we can work on
+ * this (and fix this comment) when we test and add support for inherited
+ * tables
*/
Plan *
create_remoteinsert_plan(PlannerInfo *root, Plan *topplan)
{
+	ModifyTable *mt = (ModifyTable *)topplan;
+	ListCell   *l;
+
+	/* We expect to work only on ModifyTable node */
+	if (!IsA(topplan, ModifyTable))
+		elog(ERROR, "Unexpected node type: %d", topplan->type);
+
+	/*
+	 * For every result relation, build a remote plan to execute remote insert.
+	 */
+	foreach(l, mt->resultRelations)
+	{
+		Index		resultRelationIndex = lfirst_int(l);
+		RangeTblEntry *ttab;
+		RelationLocInfo *rel_loc_info;
+		StringInfo	buf;
+		RemoteQuery *fstep;
+		bool		is_temp;
+		int			natts, att;
+		Oid		   *att_types;
+
+		ttab = rt_fetch(resultRelationIndex, root->parse->rtable);
+
+		/* Bad relation ? */
+		if (ttab == NULL || ttab->rtekind != RTE_RELATION)
+			continue;
+
+		/* Get location info of the target table */
+		rel_loc_info = GetRelationLocInfo(ttab->relid);
+		if (rel_loc_info == NULL)
+			continue;
+
+		buf = makeStringInfo();
+		is_temp = IsTempTable(ttab->relid);
+
+		/*
+		 * Compose INSERT INTO target_table
+		 *
+		 * Do not qualify with namespace for TEMP tables. The schema name may
+		 * vary on each node
+		 */
+		if (is_temp)
+			appendStringInfo(buf, "INSERT INTO %s (",
+							 quote_identifier(ttab->relname));
+		else
+		{
+			Oid			nspid = get_rel_namespace(ttab->relid);
+			char	   *nspname = get_namespace_name(nspid);
+
+			/* Never hand a NULL schema name to quote_identifier() */
+			if (nspname == NULL)
+				elog(ERROR, "cache lookup failed for namespace %u", nspid);
+
+			appendStringInfo(buf, "INSERT INTO %s.%s (", quote_identifier(nspname),
+							 quote_identifier(ttab->relname));
+		}
+
+		fstep = make_remotequery(NIL, ttab, NIL, ttab->relid);
+		fstep->is_temp = is_temp;
+
+		natts = get_relnatts(ttab->relid);
+		att_types = (Oid *) palloc0 (sizeof (Oid) * natts);
+
+		/*
+		 * Populate the column information: one quoted column name per
+		 * attribute, and the attribute's type OID in att_types[] at the
+		 * matching (zero-based) position.
+		 *
+		 * NOTE(review): every attribute number from 1 to natts is emitted
+		 * without looking at attisdropped; presumably a table with dropped
+		 * columns needs special handling here -- TODO confirm.
+		 */
+		for (att = 1; att <= natts; att++)
+		{
+			HeapTuple	tp;
+			Form_pg_attribute att_tup;
+
+			tp = SearchSysCache(ATTNUM,
+								ObjectIdGetDatum(ttab->relid),
+								Int16GetDatum(att),
+								0, 0);
+			if (!HeapTupleIsValid(tp))
+				elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+					 att, ttab->relid);
+
+			att_tup = (Form_pg_attribute) GETSTRUCT(tp);
+
+			/* Add comma before all except first attributes */
+			if (att > 1)
+				appendStringInfoString(buf, ", ");
+
+			att_types[att - 1] = att_tup->atttypid;
+			appendStringInfoString(buf, quote_identifier(NameStr(att_tup->attname)));
+
+			ReleaseSysCache(tp);
+		}
+
+		appendStringInfoString(buf, ") VALUES (");
+
+		/*
+		 * Create parameterized statement. The values will be filled at the run
+		 * time
+		 */
+		for (att = 1; att <= natts; att++)
+		{
+			if (att > 1)
+				appendStringInfoString(buf, ", ");
+
+			appendStringInfo(buf, "$%d", att);
+		}
+
+		appendStringInfoString(buf, ")");
+
+		/* Keep a private copy; the StringInfo itself is released below */
+		fstep->sql_statement = pstrdup(buf->data);
+
+		/* Processed rows are counted by the main planner */
+		fstep->combine_type = COMBINE_TYPE_NONE;
+
+		fstep->read_only = false;
+		fstep->exec_nodes = makeNode(ExecNodes);
+		fstep->exec_nodes->baselocatortype = rel_loc_info->locatorType;
+		fstep->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER;
+		fstep->exec_nodes->primarynodelist = NULL;
+		fstep->exec_nodes->nodeList = NULL;
+		fstep->exec_nodes->en_relid = ttab->relid;
+		fstep->exec_nodes->accesstype = RELATION_ACCESS_INSERT;
+
+		/*
+		 * For hash/modulo distributed tables, the target node must be selected
+		 * at the execution time based on the partition column value.
+		 *
+		 * For round robin distributed tables, tuples must be divided equally
+		 * between the nodes.
+		 *
+		 * For replicated tables, tuple must be inserted in all the data nodes
+		 *
+		 * XXX Need further testing for replicated and round-robin tables
+		 */
+		if (rel_loc_info->locatorType == LOCATOR_TYPE_HASH ||
+			rel_loc_info->locatorType == LOCATOR_TYPE_MODULO)
+		{
+			HeapTuple	tp;
+			Form_pg_attribute partAttrTup;
+			Var		   *var;
+
+			tp = SearchSysCache(ATTNUM,
+								ObjectIdGetDatum(ttab->relid),
+								Int16GetDatum(rel_loc_info->partAttrNum),
+								0, 0);
+
+			/* A failed lookup must not reach GETSTRUCT() */
+			if (!HeapTupleIsValid(tp))
+				elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+					 rel_loc_info->partAttrNum, ttab->relid);
+
+			partAttrTup = (Form_pg_attribute) GETSTRUCT(tp);
+
+			/*
+			 * Create a Var for the distribution column and set it for
+			 * execution time evaluation of target node. ExecEvalVar() picks
+			 * up values from ecxt_scantuple if Var does not refer either OUTER
+			 * or INNER varno. We utilize that mechanism to pick up values from
+			 * the tuple returned by the current plan node
+			 */
+			var = makeVar(resultRelationIndex, rel_loc_info->partAttrNum,
+						  partAttrTup->atttypid,
+						  partAttrTup->atttypmod,
+						  partAttrTup->attcollation,
+						  0);
+			ReleaseSysCache(tp);
+
+			fstep->exec_nodes->en_expr = (Expr *) var;
+		}
+
+		SetRemoteStatementName(fstep, NULL, natts, att_types, 0);
+
+		pfree(buf->data);
+		pfree(buf);
+
+		mt->remote_plans = lappend(mt->remote_plans, fstep);
+	}
+
return topplan;
}