summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAbbas2011-12-19 17:24:58 +0000
committerAbbas2011-12-19 17:24:58 +0000
commit6e6c62c81747e0c446d391152b137dd67496e9d3 (patch)
tree062705849e5ccd0cb34e5fdb51836b17988afb92 /src
parentaeab6bbfc39ccccf4be2bc29c1485c46a2fb7613 (diff)
Add support for UPDATE/DELETE WHERE CURRENT OF
This patch adds support for UPDATE/DELETE using WHERE CURRENT OF In order to identify rows with in a data node ctid is used and in order to find the data node itself data node name is used. Both ctid and data node name are added to the target list of the cursor query and when the user issues an UPDATE/DELETE using WHERE CURRENT OF, we identify the data node using node name and row using ctid. One limitation of current implementation is that it does not support successive UPDATEs or an UPDATE followed by a DELETE because the first UPDATE changes the ctid.
Diffstat (limited to 'src')
-rw-r--r--src/backend/executor/execCurrent.c23
-rw-r--r--src/backend/parser/parse_expr.c6
-rw-r--r--src/backend/parser/parse_relation.c8
-rw-r--r--src/backend/pgxc/plan/planner.c788
-rw-r--r--src/backend/pgxc/pool/pgxcnode.c35
-rw-r--r--src/include/catalog/pg_proc.h2
-rw-r--r--src/include/executor/executor.h4
-rw-r--r--src/include/parser/parse_relation.h4
-rw-r--r--src/include/pgxc/pgxcnode.h1
-rw-r--r--src/include/utils/builtins.h3
-rw-r--r--src/test/regress/sql/portals.sql20
11 files changed, 574 insertions, 320 deletions
diff --git a/src/backend/executor/execCurrent.c b/src/backend/executor/execCurrent.c
index 5312d9aeec..0f8bd18f2f 100644
--- a/src/backend/executor/execCurrent.c
+++ b/src/backend/executor/execCurrent.c
@@ -19,10 +19,15 @@
#include "utils/lsyscache.h"
#include "utils/portal.h"
+#ifdef PGXC
+#include "pgxc/execRemote.h"
+#endif
static char *fetch_cursor_param_value(ExprContext *econtext, int paramId);
-static ScanState *search_plan_tree(PlanState *node, Oid table_oid);
+#ifndef PGXC
+static ScanState *search_plan_tree(PlanState *node, Oid table_oid);
+#endif
/*
* execCurrentOf
@@ -249,13 +254,26 @@ fetch_cursor_param_value(ExprContext *econtext, int paramId)
* Search through a PlanState tree for a scan node on the specified table.
* Return NULL if not found or multiple candidates.
*/
+#ifdef PGXC
+ScanState *
+search_plan_tree(PlanState *node, Oid table_oid)
+#else
static ScanState *
search_plan_tree(PlanState *node, Oid table_oid)
+#endif
{
if (node == NULL)
return NULL;
switch (nodeTag(node))
{
+#ifdef PGXC
+ case T_RemoteQueryState:
+ {
+ RemoteQueryState *rqs = (RemoteQueryState *) node;
+ ScanState *sstate = &(rqs->ss);
+ return sstate;
+ }
+#endif
/*
* scan nodes can all be treated alike
*/
@@ -322,6 +340,9 @@ search_plan_tree(PlanState *node, Oid table_oid)
* Result and Limit can be descended through (these are safe
* because they always return their input's current row)
*/
+#ifdef PGXC
+ case T_MaterialState:
+#endif
case T_ResultState:
case T_LimitState:
return search_plan_tree(node->lefttree, table_oid);
diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
index bc73bc2c2b..125bfd5eff 100644
--- a/src/backend/parser/parse_expr.c
+++ b/src/backend/parser/parse_expr.c
@@ -2018,12 +2018,6 @@ transformCurrentOfExpr(ParseState *pstate, CurrentOfExpr *cexpr)
{
int sublevels_up;
-#ifdef PGXC
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("WHERE CURRENT OF clause not yet supported"))));
-#endif
-
/* CURRENT OF can only appear at top level of UPDATE/DELETE */
Assert(pstate->p_target_rangetblentry != NULL);
cexpr->cvarno = RTERangeTablePosn(pstate,
diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c
index 3bc4bd2f2c..bb7c3e2ded 100644
--- a/src/backend/parser/parse_relation.c
+++ b/src/backend/parser/parse_relation.c
@@ -46,8 +46,9 @@ static void expandTupleDesc(TupleDesc tupdesc, Alias *eref,
int rtindex, int sublevels_up,
int location, bool include_dropped,
List **colnames, List **colvars);
+#ifndef PGXC
static int specialAttNum(const char *attname);
-
+#endif
/*
* refnameRangeTblEntry
@@ -2363,8 +2364,13 @@ attnameAttNum(Relation rd, const char *attname, bool sysColOK)
* Caller needs to verify that it really is an attribute of the rel,
* at least in the case of "oid", which is now optional.
*/
+#ifdef PGXC
+int
+specialAttNum(const char *attname)
+#else
static int
specialAttNum(const char *attname)
+#endif
{
Form_pg_attribute sysatt;
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 277462a253..a5bfa3c3fe 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -20,6 +20,7 @@
#include "catalog/pg_namespace.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
+#include "catalog/pgxc_node.h"
#include "executor/executor.h"
#include "lib/stringinfo.h"
#include "nodes/makefuncs.h"
@@ -32,6 +33,10 @@
#include "optimizer/tlist.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
+#include "parser/parse_func.h"
+#include "parser/parse_relation.h"
+#include "parser/parsetree.h"
+#include "parser/parse_oper.h"
#include "pgxc/execRemote.h"
#include "pgxc/pgxc.h"
#include "pgxc/locator.h"
@@ -46,6 +51,7 @@
#include "utils/portal.h"
#include "utils/syscache.h"
#include "utils/numeric.h"
+#include "utils/memutils.h"
#include "access/hash.h"
#include "commands/tablecmds.h"
#include "utils/timestamp.h"
@@ -144,18 +150,18 @@ typedef struct ColumnBase
*/
typedef struct XCWalkerContext
{
- Query *query;
- RelationAccessType accessType;
- RemoteQuery *query_step; /* remote query step being analized */
- PlannerInfo *root; /* planner data for the subquery */
- Special_Conditions *conditions;
- bool multilevel_join;
- List *rtables; /* a pointer to a list of rtables */
- int varno;
- bool within_or;
- bool within_not;
- bool exec_on_coord; /* fallback to standard planner to have plan executed on coordinator only */
- List *join_list; /* A list of List*'s, one for each relation. */
+ Query *query;
+ RelationAccessType accessType;
+ RemoteQuery *query_step; /* remote query step being analized */
+ PlannerInfo *root; /* planner data for the subquery */
+ Special_Conditions *conditions;
+ bool multilevel_join;
+ List *rtables; /* a pointer to a list of rtables */
+ int varno;
+ bool within_or;
+ bool within_not;
+ bool exec_on_coord; /* fallback to standard planner to have plan executed on coordinator only */
+ List *join_list; /* A list of List*'s, one for each relation. */
} XCWalkerContext;
@@ -718,310 +724,398 @@ get_plan_nodes_insert(PlannerInfo *root, RemoteQuery *step)
pfree(eval_expr);
}
-
/*
- * examine_conditions_walker
- *
- * Examine conditions and find special ones to later help us determine
- * what tables can be joined together. Put findings in Special_Conditions
- * struct.
+ * make_ctid_col_ref
*
- * Get list of constant comparisons conditions on partitioned column
- * Get list of parent-child joins (partitioned together)
- * Get list of joins with replicated tables
+ * creates a Var for a column referring to ctid
+ */
+
+static Var *
+make_ctid_col_ref(Query *qry)
+{
+ ListCell *lc1, *lc2;
+ RangeTblEntry *rte1, *rte2;
+ int tableRTEs, firstTableRTENumber;
+ RangeTblEntry *rte_in_query;
+ AttrNumber attnum;
+ Oid vartypeid;
+ int32 type_mod;
+ Oid varcollid;
+
+ /* If the query has more than 1 table RTEs where both are different, we can not add ctid to the query target list
+ * We should in this case skip adding it to the target list and a WHERE CURRENT OF should then
+ * fail saying the query is not a simply update able scan of table
+ */
+
+ tableRTEs = 0;
+ foreach(lc1, qry->rtable)
+ {
+ rte1 = (RangeTblEntry *) lfirst(lc1);
+
+ if (rte1->rtekind == RTE_RELATION)
+ {
+ tableRTEs++;
+ if (tableRTEs > 1)
+ {
+ /* See if we get two RTEs in case we have two references
+ * to the same table with different aliases
+ */
+ foreach(lc2, qry->rtable)
+ {
+ rte2 = (RangeTblEntry *) lfirst(lc2);
+
+ if (rte2->rtekind == RTE_RELATION)
+ {
+ if (rte2->relid != rte1->relid)
+ {
+ return NULL;
+ }
+ }
+ }
+ continue;
+ }
+ rte_in_query = rte1;
+ }
+ }
+
+ if (tableRTEs > 1)
+ {
+ firstTableRTENumber = 0;
+ foreach(lc1, qry->rtable)
+ {
+ rte1 = (RangeTblEntry *) lfirst(lc1);
+ firstTableRTENumber++;
+ if (rte1->rtekind == RTE_RELATION)
+ {
+ break;
+ }
+ }
+ }
+ else
+ {
+ firstTableRTENumber = 1;
+ }
+
+ attnum = specialAttNum("ctid");
+ get_rte_attribute_type(rte_in_query, attnum, &vartypeid, &type_mod, &varcollid);
+ return makeVar(firstTableRTENumber, attnum, vartypeid, type_mod, varcollid, 0);
+}
+
+/*
+ * make_ctid_const
*
- * If we encounter an expression such as a cross-node join that cannot
- * be easily handled in a single step, we stop processing and return true,
- * otherwise false.
+ * creates a Const expression representing a ctid value (?,?)
+ */
+
+static Const *
+make_ctid_const(char *ctid_string)
+{
+ Datum val;
+ Const *ctid_const;
+
+ val = PointerGetDatum(ctid_string);
+
+ ctid_const = makeConst(UNKNOWNOID,
+ -1,
+ InvalidOid,
+ -2,
+ val,
+ false,
+ false);
+ ctid_const->location = -1;
+ return ctid_const;
+}
+
+/*
+ * IsRelSame
*
+ * Does the two query trees have a common relation
*/
+
static bool
-examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
+IsRelSame(List *upd_qry_rte, List *sel_qry_rte)
{
- RelationLocInfo *rel_loc_info1,
- *rel_loc_info2;
- Const *constant;
- Expr *checkexpr;
- bool result = false;
- bool is_and = false;
+ ListCell *lc1, *lc2;
+ RangeTblEntry *rte1, *rte2;
- Assert(context);
+ foreach(lc1, upd_qry_rte)
+ {
+ rte1 = (RangeTblEntry *) lfirst(lc1);
- if (expr_node == NULL)
- return false;
+ if (rte1->rtekind == RTE_RELATION)
+ {
+ foreach(lc2, sel_qry_rte)
+ {
+ rte2 = (RangeTblEntry *) lfirst(lc2);
+
+ if (rte2->rtekind == RTE_RELATION)
+ {
+ if (rte2->relid == rte1->relid)
+ {
+ return true;
+ }
+ }
+ }
+ }
+ }
+ return false;
+}
- if (!context->rtables)
- return true;
+/*
+ * pgxc_handle_current_of
+ *
+ * Handles UPDATE/DELETE WHERE CURRENT OF
+ */
- if (!context->conditions)
- context->conditions = new_special_conditions();
+static bool
+pgxc_handle_current_of(Node *expr_node, XCWalkerContext *context)
+{
+ /* Find referenced portal and figure out what was the last fetch node */
+ Portal portal;
+ QueryDesc *queryDesc;
+ CurrentOfExpr *cexpr = (CurrentOfExpr *) expr_node;
+ char *cursor_name = cexpr->cursor_name;
+ PlanState *ps;
+ TupleTableSlot *slot;
+ RangeTblEntry *table = (RangeTblEntry *) linitial(context->query->rtable);
+ ScanState *ss;
+
+ /* Find the cursor's portal */
+ portal = GetPortalByName(cursor_name);
+ if (!PortalIsValid(portal))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_CURSOR),
+ errmsg("cursor \"%s\" does not exist", cursor_name)));
- /* Handle UPDATE/DELETE ... WHERE CURRENT OF ... */
- if (IsA(expr_node, CurrentOfExpr))
- {
- /* Find referenced portal and figure out what was the last fetch node */
- Portal portal;
- QueryDesc *queryDesc;
- CurrentOfExpr *cexpr = (CurrentOfExpr *) expr_node;
- char *cursor_name = cexpr->cursor_name;
- char *node_cursor;
+ queryDesc = PortalGetQueryDesc(portal);
+ if (queryDesc == NULL || queryDesc->estate == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_STATE),
+ errmsg("cursor \"%s\" is held from a previous transaction",
+ cursor_name)));
- /* Find the cursor's portal */
- portal = GetPortalByName(cursor_name);
- if (!PortalIsValid(portal))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_CURSOR),
- errmsg("cursor \"%s\" does not exist", cursor_name)));
+ /*
+ * The cursor must have a current result row: per the SQL spec, it's
+ * an error if not.
+ */
+ if (portal->atStart || portal->atEnd)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_STATE),
+ errmsg("cursor \"%s\" is not positioned on a row",
+ cursor_name)));
- queryDesc = PortalGetQueryDesc(portal);
- if (queryDesc == NULL || queryDesc->estate == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_CURSOR_STATE),
- errmsg("cursor \"%s\" is held from a previous transaction",
- cursor_name)));
+ ps = queryDesc->planstate;
+ slot = NULL;
- /*
- * The cursor must have a current result row: per the SQL spec, it's
- * an error if not.
- */
- if (portal->atStart || portal->atEnd)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_CURSOR_STATE),
- errmsg("cursor \"%s\" is not positioned on a row",
- cursor_name)));
+ ss = search_plan_tree(ps, table->relid);
+ if (ss != NULL)
+ {
+ slot = ss->ss_ScanTupleSlot;
+ }
- if (IsA(queryDesc->planstate, RemoteQueryState))
+ if (slot != NULL)
+ {
+ MemoryContext oldcontext;
+ MemoryContext tmpcontext;
+ RelationLocInfo *loc_info;
+
+ tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
+ "Temp Context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcontext = MemoryContextSwitchTo(tmpcontext);
+
+ loc_info = GetRelationLocInfo(table->relid);
+ if (!loc_info)
{
- RemoteQueryState *node = (RemoteQueryState *) queryDesc->planstate;
- RemoteQuery *step = (RemoteQuery *) queryDesc->planstate->plan;
-
- /*
- * 1. step query: SELECT * FROM <table> WHERE ctid = <cur_ctid>,
- * <cur_ctid> is taken from the scantuple ot the target step
- * step node list: current node of the target step.
- * 2. step query: DECLARE <xxx> CURSOR FOR SELECT * FROM <table>
- * WHERE <col1> = <val1> AND <col2> = <val2> ... FOR UPDATE
- * <xxx> is generated from cursor name of the target step,
- * <col> and <val> pairs are taken from the step 1.
- * step node list: all nodes of <table>
- * 3. step query: MOVE <xxx>
- * step node list: all nodes of <table>
- */
- RangeTblEntry *table = (RangeTblEntry *) linitial(context->query->rtable);
- node_cursor = step->cursor;
- rel_loc_info1 = GetRelationLocInfo(table->relid);
- if (!rel_loc_info1)
- return true;
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextDelete(tmpcontext);
+ return true;
+ }
- context->query_step->exec_nodes = makeNode(ExecNodes);
- context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER;
- context->query_step->exec_nodes->baselocatortype = rel_loc_info1->locatorType;
- if (rel_loc_info1->locatorType == LOCATOR_TYPE_REPLICATED)
+ switch (loc_info->locatorType)
+ {
+ case LOCATOR_TYPE_HASH:
+ case LOCATOR_TYPE_RROBIN:
+ case LOCATOR_TYPE_MODULO:
{
- RemoteQuery *step1, *step2, *step3;
- /*
- * We do not need first three steps if cursor already exists and
- * positioned.
- */
- if (node->update_cursor)
+ Query *temp_qry;
+ Var *ctid_expr;
+ bool ctid_found, node_str_found;
+ StringInfoData qry;
+ TupleDesc slot_meta = slot->tts_tupleDescriptor;
+ char *ctid_str = NULL;
+ int node_index = -1;
+ int i;
+ Const *cons_ctid;
+
+ /* make a copy of the query so as not to touch the original query tree */
+ temp_qry = copyObject(context->query);
+
+ /* Make sure the relation referenced in cursor query and UPDATE/DELETE query is the same */
+ if ( ! IsRelSame(temp_qry->rtable, queryDesc->plannedstmt->rtable ) )
{
- step3 = NULL;
- node_cursor = node->update_cursor;
+ char *tableName = get_rel_name(table->relid);
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_STATE),
+ errmsg("cursor \"%s\" does not have a FOR UPDATE/SHARE reference to table \"%s\"",
+ cursor_name, tableName)));
}
- else
+
+ /* Delete existing WHERE CURRENT OF qual from the query tree*/
+ pfree(((CurrentOfExpr *)(temp_qry->jointree->quals))->cursor_name);
+ pfree((CurrentOfExpr *)temp_qry->jointree->quals);
+
+ /* Make a ctid column ref expr for LHS of the operator */
+ ctid_expr = make_ctid_col_ref(temp_qry);
+ if (ctid_expr == NULL)
{
- char *tableName = get_rel_name(table->relid);
- int natts = get_relnatts(table->relid);
- char *attnames[natts];
- TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
- TupleDesc slot_meta = slot->tts_tupleDescriptor;
- Datum ctid = 0;
- char *ctid_str = NULL;
- int nindex = slot->tts_dataNodeIndex;
- AttrNumber att;
- StringInfoData buf;
- HeapTuple tp;
- int i;
- MemoryContext context_save;
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextDelete(tmpcontext);
+ return true; /* Bail out */
+ }
- /*
- * Iterate over attributes and find CTID. This attribute is
- * most likely at the end of the list, so iterate in
- * reverse order to find it quickly.
- * If not found, target table is not updatable through
- * the cursor, report problem to client
- */
- for (i = slot_meta->natts - 1; i >= 0; i--)
+ /*
+ * Iterate over attributes and find ctid and node index.
+ * These attributes are most likely at the end of the list,
+ * so iterate in reverse order to find them quickly.
+ * If not found target table is not updatable through
+ * the cursor, report problem to client
+ */
+ ctid_found = false;
+ node_str_found = false;
+ for (i = slot_meta->natts - 1; i >= 0; i--)
+ {
+ Form_pg_attribute attr = slot_meta->attrs[i];
+
+ if (ctid_found == false)
{
- Form_pg_attribute attr = slot_meta->attrs[i];
if (strcmp(attr->attname.data, "ctid") == 0)
{
+ Datum ctid = 0;
+
ctid = slot->tts_values[i];
ctid_str = (char *) DirectFunctionCall1(tidout, ctid);
- break;
+ ctid_found = true;
}
}
-
- if (ctid_str == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_CURSOR_STATE),
- errmsg("cursor \"%s\" is not a simply updatable scan of table \"%s\"",
- cexpr->cursor_name, tableName)));
-
- initStringInfo(&buf);
-
- /* Step 1: select tuple values by ctid */
- step1 = makeRemoteQuery();
- appendStringInfoString(&buf, "SELECT ");
- for (att = 1; att <= natts; att++)
+ if (node_str_found == false)
{
- TargetEntry *tle;
- Var *expr;
-
- tp = SearchSysCache(ATTNUM,
- ObjectIdGetDatum(table->relid),
- Int16GetDatum(att),
- 0, 0);
- if (HeapTupleIsValid(tp))
+ if (strcmp(attr->attname.data, "pgxc_node_str") == 0)
{
- Form_pg_attribute att_tup = (Form_pg_attribute) GETSTRUCT(tp);
-
- /* add comma before all except first attributes */
- if (att > 1)
- appendStringInfoString(&buf, ", ");
- attnames[att-1] = pstrdup(NameStr(att_tup->attname));
- appendStringInfoString(&buf, attnames[att - 1]);
- expr = makeVar(att, att, att_tup->atttypid,
- att_tup->atttypmod, InvalidOid, 0);
- tle = makeTargetEntry((Expr *) expr, att,
- attnames[att - 1], false);
- step1->scan.plan.targetlist = lappend(step1->scan.plan.targetlist, tle);
- ReleaseSysCache(tp);
+ Datum data_node = 0;
+ char *data_node_str = NULL;
+
+ data_node = slot->tts_values[i];
+ data_node_str = (char *) DirectFunctionCall1(nameout, data_node);
+ node_index = PGXCNodeGetNodeIdFromName(data_node_str, PGXC_NODE_DATANODE_MASTER);
+ node_str_found = true;
}
- else
- elog(ERROR, "cache lookup failed for attribute %d of relation %u",
- att, table->relid);
- }
- appendStringInfo(&buf, " FROM %s WHERE ctid = '%s'",
- tableName, ctid_str);
- step1->sql_statement = pstrdup(buf.data);
- step1->exec_nodes = makeNode(ExecNodes);
- step1->exec_nodes->nodeList = list_make1_int(nindex);
-
- /* Step 2: declare cursor for update target table */
- step2 = makeRemoteQuery();
- resetStringInfo(&buf);
-
- appendStringInfoString(&buf, step->cursor);
- appendStringInfoString(&buf, "upd");
- /* This need to survive while the target Portal is alive */
- context_save = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
- node_cursor = pstrdup(buf.data);
- node->update_cursor = node_cursor;
- MemoryContextSwitchTo(context_save);
- resetStringInfo(&buf);
-
- appendStringInfo(&buf,
- "DECLARE %s CURSOR FOR SELECT * FROM %s WHERE ",
- node_cursor, tableName);
- for (i = 0; i < natts; i++)
- {
- /* add comma before all except first attributes */
- if (i)
- appendStringInfoString(&buf, "AND ");
- appendStringInfo(&buf, "%s = $%d ", attnames[i], i+1);
}
- appendStringInfoString(&buf, "FOR UPDATE");
- step2->sql_statement = pstrdup(buf.data);
- step2->exec_nodes = makeNode(ExecNodes);
-
- step2->exec_nodes->nodeList = list_copy(rel_loc_info1->nodeList);
+ if (ctid_found && node_str_found)
+ break;
+ }
- innerPlan(step2) = (Plan *) step1;
- /* Step 3: move cursor to first position */
- step3 = makeRemoteQuery();
- resetStringInfo(&buf);
- appendStringInfo(&buf, "MOVE %s", node_cursor);
- step3->sql_statement = pstrdup(buf.data);
- step3->exec_nodes = makeNode(ExecNodes);
+ if (ctid_str == NULL || node_index < 0)
+ {
+ char *tableName = get_rel_name(table->relid);
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_STATE),
+ errmsg("cursor \"%s\" is not a simply updatable scan of table \"%s\"",
+ cursor_name, tableName)));
+ }
- step3->exec_nodes->nodeList = list_copy(rel_loc_info1->nodeList);
+ /* Make the ctid value constant expr for RHS of the operator */
+ cons_ctid = make_ctid_const(ctid_str);
- innerPlan(step3) = (Plan *) step2;
+ /* Make the new qual ctid = (?,?) */
+ temp_qry->jointree->quals = (Node *)make_op(NULL, list_make1(makeString("=")), (Node *)ctid_expr, (Node *)cons_ctid, -1);
- innerPlan(context->query_step) = (Plan *) step3;
+ /* Now deparse the query tree */
+ initStringInfo(&qry);
+ deparse_query(temp_qry, &qry, NIL);
- pfree(buf.data);
- }
+ MemoryContextSwitchTo(oldcontext);
- context->query_step->exec_nodes->nodeList = list_copy(rel_loc_info1->nodeList);
- }
- else
- {
- /* Take target node from last scan tuple of referenced step */
- context->query_step->exec_nodes->nodeList = lappend_int(context->query_step->exec_nodes->nodeList,
- node->ss.ss_ScanTupleSlot->tts_dataNodeIndex);
- }
- FreeRelationLocInfo(rel_loc_info1);
+ if ( context->query_step->sql_statement != NULL )
+ pfree(context->query_step->sql_statement);
+ context->query_step->sql_statement = pstrdup(qry.data);
- /*
- * replace cursor name in the query if differs
- */
- if (strcmp(cursor_name, node_cursor))
- {
- StringInfoData buf;
- char *str = context->query->sql_statement;
- /*
- * Find last occurence of cursor_name
- */
- for (;;)
- {
- char *next = strstr(str + 1, cursor_name);
- if (next)
- str = next;
- else
- break;
- }
+ MemoryContextDelete(tmpcontext);
- /*
- * now str points to cursor name truncate string here
- * do not care the string is modified - we will pfree it
- * soon anyway
- */
- *str = '\0';
+ context->query_step->exec_nodes = makeNode(ExecNodes);
+ context->query_step->exec_nodes->nodeList = list_make1_int(node_index);
+ context->query_step->read_only = false;
+ context->query_step->force_autocommit = false;
- /* and move str at the beginning of the reminder */
- str += strlen(cursor_name);
+ return false;
+ }
+ case LOCATOR_TYPE_REPLICATED:
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextDelete(tmpcontext);
- /* build up new statement */
- initStringInfo(&buf);
- appendStringInfoString(&buf, context->query->sql_statement);
- appendStringInfoString(&buf, node_cursor);
- appendStringInfoString(&buf, str);
+ return false;
- /* take the result */
- pfree(context->query->sql_statement);
- context->query->sql_statement = buf.data;
- }
- return false;
+ default:
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("the distribution type is not supported")));
+ return false; // or true
}
+ }
+ else
+ {
+ char *tableName = get_rel_name(table->relid);
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_STATE),
+ errmsg("cursor \"%s\" is not a simply updatable scan of table \"%s\"",
+ cursor_name, tableName)));
+ }
+ return false; // or true
+}
- /* Even with a catalog table EXECUTE direct in launched on dedicated nodes */
- if (context->query_step->exec_direct_type == EXEC_DIRECT_LOCAL
- || context->query_step->exec_direct_type == EXEC_DIRECT_NONE
- || context->query_step->exec_direct_type == EXEC_DIRECT_LOCAL_UTILITY)
- {
- context->query_step->exec_nodes = makeNode(ExecNodes);
- context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
- context->exec_on_coord = true;
- }
- else
- {
- context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER;
- context->exec_on_coord = false;
- }
+/*
+ * examine_conditions_walker
+ *
+ * Examine conditions and find special ones to later help us determine
+ * what tables can be joined together. Put findings in Special_Conditions
+ * struct.
+ *
+ * Get list of constant comparisons conditions on partitioned column
+ * Get list of parent-child joins (partitioned together)
+ * Get list of joins with replicated tables
+ *
+ * If we encounter an expression such as a cross-node join that cannot
+ * be easily handled in a single step, we stop processing and return true,
+ * otherwise false.
+ *
+ */
+static bool
+examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
+{
+ RelationLocInfo *rel_loc_info1,
+ *rel_loc_info2;
+ Const *constant;
+ Expr *checkexpr;
+ bool result = false;
+ bool is_and = false;
+ Assert(context);
+
+ if (expr_node == NULL)
return false;
+
+ if (!context->rtables)
+ return true;
+
+ if (!context->conditions)
+ context->conditions = new_special_conditions();
+
+ /* Handle UPDATE/DELETE ... WHERE CURRENT OF ... */
+ if (IsA(expr_node, CurrentOfExpr))
+ {
+ return pgxc_handle_current_of(expr_node, context);
}
if (IsA(expr_node, Var))
@@ -2208,6 +2302,7 @@ makeRemoteQuery(void)
result->inner_statement = NULL;
result->outer_statement = NULL;
result->join_condition = NULL;
+ result->sql_statement = NULL;
return result;
}
@@ -2519,41 +2614,134 @@ set_cursor_name(Plan *subtree, char *cursor, int step_no)
return step_no;
}
+
+/*
+ * get oid of the function whose name is passed as argument
+ */
+
+static Oid
+get_fn_oid(char *fn_name, Oid *p_rettype)
+{
+ Value *fn_nm;
+ List *fn_name_list;
+ FuncDetailCode fdc;
+ bool retset;
+ int nvargs;
+ Oid *true_typeids;
+ Oid func_oid;
+
+ fn_nm = makeString(fn_name);
+ fn_name_list = list_make1(fn_nm);
+
+ fdc = func_get_detail(fn_name_list,
+ NULL, /* argument expressions */
+ NULL, /* argument names */
+ 0, /* argument numbers */
+ NULL, /* argument types */
+ false, /* expand variable number or args */
+ false, /* expand defaults */
+ &func_oid, /* oid of the function - returned detail*/
+ p_rettype, /* function return type - returned detail */
+ &retset, /* - returned detail*/
+ &nvargs, /* - returned detail*/
+ &true_typeids, /* - returned detail */
+ NULL /* arguemnt defaults returned*/
+ );
+
+ pfree(fn_name_list);
+ if (fdc == FUNCDETAIL_NORMAL)
+ {
+ return func_oid;
+ }
+ return InvalidOid;
+}
+
/*
* Append ctid to the field list of step queries to support update
* WHERE CURRENT OF. The ctid is not sent down to client but used as a key
* to find target tuple
*/
static void
-fetch_ctid_of(Plan *subtree, RowMarkClause *rmc)
+fetch_ctid_of(Plan *subtree, Query *query)
{
/* recursively process subnodes */
if (innerPlan(subtree))
- fetch_ctid_of(innerPlan(subtree), rmc);
+ fetch_ctid_of(innerPlan(subtree), query);
if (outerPlan(subtree))
- fetch_ctid_of(outerPlan(subtree), rmc);
+ fetch_ctid_of(outerPlan(subtree), query);
/* we are only interested in RemoteQueries */
if (IsA(subtree, RemoteQuery))
{
- RemoteQuery *step = (RemoteQuery *) subtree;
- /*
- * TODO Find if the table is referenced by the step query
- */
+ RemoteQuery *step = (RemoteQuery *) subtree;
+ TargetEntry *te1;
+ Query *temp_qry;
+ FuncExpr *func_expr;
+ AttrNumber resno;
+ Oid funcid;
+ Oid rettype;
+ Var *ctid_expr;
+ MemoryContext oldcontext;
+ MemoryContext tmpcontext;
+
+ tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
+ "Temp Context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcontext = MemoryContextSwitchTo(tmpcontext);
+
+ /* Copy the query tree to make changes to the target list */
+ temp_qry = copyObject(query);
+ /* Get the number of entries in the target list */
+ resno = list_length(temp_qry->targetList);
+
+ /* Make a ctid column ref expr to add in target list */
+ ctid_expr = make_ctid_col_ref(temp_qry);
+ if (ctid_expr == NULL)
+ {
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextDelete(tmpcontext);
+ return;
+ }
- char *from_sql = strpos(step->sql_statement, " FROM ", 1);
- if (from_sql)
+ te1 = makeTargetEntry((Expr *)ctid_expr, resno+1, NULL, false);
+
+ /* add the target entry to the query target list */
+ temp_qry->targetList = lappend(temp_qry->targetList, te1);
+
+ /* PGXCTODO We can take this call in initialization rather than getting it always */
+
+ /* Get the Oid of the function */
+ funcid = get_fn_oid("pgxc_node_str", &rettype);
+ if (OidIsValid(funcid))
{
- StringInfoData buf;
+ StringInfoData deparsed_qry;
+ TargetEntry *te2;
- initStringInfo(&buf);
- appendBinaryStringInfo(&buf, step->sql_statement,
- (int) (from_sql - step->sql_statement));
- /* TODO qualify with the table name */
- appendStringInfoString(&buf, ", ctid");
- appendStringInfoString(&buf, from_sql);
- pfree(step->sql_statement);
- step->sql_statement = buf.data;
+ /* create a function expression */
+ func_expr = makeFuncExpr(funcid, rettype, NULL, InvalidOid, InvalidOid, COERCE_DONTCARE);
+ /* make a target entry for function call */
+ te2 = makeTargetEntry((Expr *)func_expr, resno+2, NULL, false);
+ /* add the target entry to the query target list */
+ temp_qry->targetList = lappend(temp_qry->targetList, te2);
+
+ initStringInfo(&deparsed_qry);
+ deparse_query(temp_qry, &deparsed_qry, NIL);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ if (step->sql_statement != NULL)
+ pfree(step->sql_statement);
+
+ step->sql_statement = pstrdup(deparsed_qry.data);
+
+ MemoryContextDelete(tmpcontext);
+ }
+ else
+ {
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextDelete(tmpcontext);
}
}
}
@@ -3067,7 +3255,6 @@ pgxc_fqs_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
result->planTree = (Plan *) query_step;
-
/* Set result relations */
if (query->commandType != CMD_SELECT)
result->resultRelations = list_make1_int(query->resultRelation);
@@ -3079,7 +3266,6 @@ pgxc_fqs_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
if (query_step->exec_nodes == NULL)
get_plan_nodes_command(query_step, root);
-
if (query_step->exec_nodes == NULL)
{
/*
@@ -3088,17 +3274,19 @@ pgxc_fqs_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
*/
return NULL;
}
-
/* Datanodes should finalise the results of this query */
query->qry_finalise_aggs = true;
/*
* Deparse query tree to get step query. It may be modified later on
*/
- initStringInfo(&buf);
- deparse_query(query, &buf, NIL);
- query_step->sql_statement = pstrdup(buf.data);
- pfree(buf.data);
+ if ( query_step->sql_statement == NULL )
+ {
+ initStringInfo(&buf);
+ deparse_query(query, &buf, NIL);
+ query_step->sql_statement = pstrdup(buf.data);
+ pfree(buf.data);
+ }
/*
* PGXCTODO: we may route this same Query structure through
* standard_planner, where we don't want datanodes to finalise the results.
@@ -3106,7 +3294,6 @@ pgxc_fqs_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
* structure through the standard_planner
*/
query->qry_finalise_aggs = false;
-
/*
* PGXCTODO
* When Postgres runs insert into t (a) values (1); against table
@@ -3125,7 +3312,6 @@ pgxc_fqs_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
if (query_step->exec_nodes)
query_step->combine_type = get_plan_combine_type(
query, query_step->exec_nodes->baselocatortype);
-
/*
* Add sorting to the step
*/
@@ -3160,19 +3346,13 @@ pgxc_fqs_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
}
/*
- * If query is FOR UPDATE fetch CTIDs from the remote node
- * Use CTID as a key to update tuples on remote nodes when handling
+ * If query is DECLARE CURSOR fetch CTIDs and node names from the remote node
+ * Use CTID as a key to update/delete tuples on remote nodes when handling
* WHERE CURRENT OF
*/
- if (query->rowMarks)
+ if ( query->utilityStmt && IsA(query->utilityStmt, DeclareCursorStmt) )
{
- ListCell *lc;
- foreach(lc, query->rowMarks)
- {
- RowMarkClause *rmc = (RowMarkClause *) lfirst(lc);
-
- fetch_ctid_of(result->planTree, rmc);
- }
+ fetch_ctid_of(result->planTree, query);
}
return result;
diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c
index 6cf8d91b69..83163e5375 100644
--- a/src/backend/pgxc/pool/pgxcnode.c
+++ b/src/backend/pgxc/pool/pgxcnode.c
@@ -2276,3 +2276,38 @@ PGXCNodeGetNodeOid(int nodeid, char node_type)
return handles[nodeid - 1].nodeoid;
}
+
+/*
+ * pgxc_node_str
+ *
+ * get the name of the node
+ */
+Datum
+pgxc_node_str(PG_FUNCTION_ARGS)
+{
+ PG_RETURN_NAME(PGXCNodeName);
+}
+
+/*
+ * PGXCNodeGetNodeIdFromName
+ * Return node position in handles array
+ */
+int
+PGXCNodeGetNodeIdFromName(char *node_name, char node_type)
+{
+ char *nm;
+ Oid nodeoid;
+
+ if (node_name == NULL)
+ return -1;
+
+ nm = str_tolower(node_name, strlen(node_name), DEFAULT_COLLATION_OID);
+
+ nodeoid = get_pgxc_nodeoid(nm);
+ pfree(nm);
+ if (!OidIsValid(nodeoid))
+ return -1;
+
+ return PGXCNodeGetNodeId(nodeoid, node_type);
+}
+
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 2d75af3d94..b8666c2a93 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -4346,6 +4346,8 @@ DATA(insert OID = 3200 ( pgxc_pool_check PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16
DESCR("check connection information consistency in pooler");
DATA(insert OID = 3201 ( pgxc_pool_reload PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pgxc_pool_reload _null_ _null_ _null_ ));
DESCR("reload connection information in pooler and reload server sessions");
+DATA(insert OID = 3122 ( pgxc_node_str PGNSP PGUID 12 1 0 0 f f f t f s 0 0 19 "" _null_ _null_ _null_ _null_ pgxc_node_str _null_ _null_ _null_ ));
+DESCR("get the name of the node");
#endif
/*
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index bdd499bea6..19f3c7cf90 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -107,6 +107,10 @@ extern bool execCurrentOf(CurrentOfExpr *cexpr,
Oid table_oid,
ItemPointer current_tid);
+#ifdef PGXC
+ScanState *search_plan_tree(PlanState *node, Oid table_oid);
+#endif
+
/*
* prototypes from functions in execGrouping.c
*/
diff --git a/src/include/parser/parse_relation.h b/src/include/parser/parse_relation.h
index 0158465c91..c53e34cc1f 100644
--- a/src/include/parser/parse_relation.h
+++ b/src/include/parser/parse_relation.h
@@ -92,4 +92,8 @@ extern Name attnumAttName(Relation rd, int attid);
extern Oid attnumTypeId(Relation rd, int attid);
extern Oid attnumCollationId(Relation rd, int attid);
+#ifdef PGXC
+extern int specialAttNum(const char *attname);
+#endif
+
#endif /* PARSE_RELATION_H */
diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h
index d701e55b82..c7f1ab1e55 100644
--- a/src/include/pgxc/pgxcnode.h
+++ b/src/include/pgxc/pgxcnode.h
@@ -107,6 +107,7 @@ extern void PGXCNodeCleanAndRelease(int code, Datum arg);
/* Look at information cached in node handles */
extern int PGXCNodeGetNodeId(Oid nodeoid, char node_type);
extern Oid PGXCNodeGetNodeOid(int nodeid, char node_type);
+extern int PGXCNodeGetNodeIdFromName(char *node_name, char node_type);
extern PGXCNodeAllHandles *get_handles(List *datanodelist, List *coordlist, bool is_query_coord_only);
extern void pfree_pgxc_all_handles(PGXCNodeAllHandles *handles);
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 1e07ef18cb..53e6a56fa5 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -515,6 +515,9 @@ extern Datum void_in(PG_FUNCTION_ARGS);
extern Datum void_out(PG_FUNCTION_ARGS);
extern Datum void_recv(PG_FUNCTION_ARGS);
extern Datum void_send(PG_FUNCTION_ARGS);
+#ifdef PGXC
+extern Datum pgxc_node_str (PG_FUNCTION_ARGS);
+#endif
extern Datum trigger_in(PG_FUNCTION_ARGS);
extern Datum trigger_out(PG_FUNCTION_ARGS);
extern Datum language_handler_in(PG_FUNCTION_ARGS);
diff --git a/src/test/regress/sql/portals.sql b/src/test/regress/sql/portals.sql
index bccc81f487..541ad77a85 100644
--- a/src/test/regress/sql/portals.sql
+++ b/src/test/regress/sql/portals.sql
@@ -327,7 +327,7 @@ SELECT f1,f2 FROM uctest ORDER BY f1;
-- Check DELETE WHERE CURRENT
BEGIN;
-DECLARE c1 CURSOR FOR SELECT f1,f2 FROM uctest ORDER BY f1;
+DECLARE c1 SCROLL CURSOR FOR SELECT f1,f2 FROM uctest ORDER BY f1;
FETCH 2 FROM c1;
DELETE FROM uctest WHERE CURRENT OF c1;
-- should show deletion
@@ -352,7 +352,7 @@ SELECT f1,f2 FROM uctest ORDER BY f1;
-- Check repeated-update and update-then-delete cases
BEGIN;
-DECLARE c1 CURSOR FOR SELECT f1,f2 FROM uctest;
+DECLARE c1 SCROLL CURSOR FOR SELECT f1,f2 FROM uctest;
FETCH c1;
UPDATE uctest SET f1 = f1 + 10 WHERE CURRENT OF c1;
SELECT f1,f2 FROM uctest ORDER BY 1;
@@ -408,17 +408,17 @@ SELECT f1,f2 FROM uctest ORDER BY f1;
-- Can update from a self-join, but only if FOR UPDATE says which to use
BEGIN;
-DECLARE c1 CURSOR FOR SELECT a.f1,a.f2 FROM uctest a, uctest b WHERE a.f1 = b.f1 + 5 ORDER BY 1;
+DECLARE c1 CURSOR FOR SELECT a.f1,a.f2,b.f1,b.f2 FROM uctest a, uctest b WHERE a.f1 = b.f1 + 5 ORDER BY 1;
FETCH 1 FROM c1;
UPDATE uctest SET f1 = f1 + 10 WHERE CURRENT OF c1; -- fail
ROLLBACK;
BEGIN;
-DECLARE c1 CURSOR FOR SELECT a.f1,a.f2 FROM uctest a, uctest b WHERE a.f1 = b.f1 + 5 ORDER BY 1 FOR UPDATE;
+DECLARE c1 CURSOR FOR SELECT a.f1,a.f2, b.f1, b.f2 FROM uctest a, uctest b WHERE a.f1 = b.f1 + 5 ORDER BY 1 FOR UPDATE;
FETCH 1 FROM c1;
UPDATE uctest SET f1 = f1 + 10 WHERE CURRENT OF c1; -- fail
ROLLBACK;
BEGIN;
-DECLARE c1 CURSOR FOR SELECT a.f1,a.f2 FROM uctest a, uctest b WHERE a.f1 = b.f1 + 5 ORDER BY 1 FOR SHARE OF a;
+DECLARE c1 CURSOR FOR SELECT a.f1,a.f2,b.f1,b.f2 FROM uctest a, uctest b WHERE a.f1 = b.f1 + 5 ORDER BY 1 FOR SHARE OF a;
FETCH 1 FROM c1;
UPDATE uctest SET f1 = f1 + 10 WHERE CURRENT OF c1;
SELECT f1,f2 FROM uctest ORDER BY f1;
@@ -427,22 +427,26 @@ ROLLBACK;
-- Check various error cases
DELETE FROM uctest WHERE CURRENT OF c1; -- fail, no such cursor
-DECLARE cx CURSOR WITH HOLD FOR SELECT f1,f2 FROM uctest ORDER BY 1;
+DECLARE cx CURSOR WITH HOLD FOR SELECT f1,f2 FROM uctest;
DELETE FROM uctest WHERE CURRENT OF cx; -- fail, can't use held cursor
BEGIN;
DECLARE c CURSOR FOR SELECT * FROM tenk2 ORDER BY unique2;
+FETCH 1 FROM c;
DELETE FROM uctest WHERE CURRENT OF c; -- fail, cursor on wrong table
ROLLBACK;
BEGIN;
-DECLARE c CURSOR FOR SELECT * FROM tenk2 ORDER BY unique2 FOR SHARE;
+DECLARE c CURSOR FOR SELECT * FROM tenk2 FOR SHARE;
+FETCH 1 FROM c;
DELETE FROM uctest WHERE CURRENT OF c; -- fail, cursor on wrong table
ROLLBACK;
BEGIN;
DECLARE c CURSOR FOR SELECT * FROM tenk1 JOIN tenk2 USING (unique1);
+FETCH 1 FROM c;
DELETE FROM tenk1 WHERE CURRENT OF c; -- fail, cursor is on a join
ROLLBACK;
BEGIN;
DECLARE c CURSOR FOR SELECT f1,count(*) FROM uctest GROUP BY f1;
+FETCH 1 FROM c;
DELETE FROM uctest WHERE CURRENT OF c; -- fail, cursor is on aggregation
ROLLBACK;
BEGIN;
@@ -465,7 +469,7 @@ ROLLBACK;
-- 235395b90909301035v7228ce63q392931f15aa74b31@mail.gmail.com
BEGIN;
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-CREATE TABLE cursor (a int);
+CREATE TABLE cursor (a int, b int) distribute by hash(b);
INSERT INTO cursor VALUES (1);
DECLARE c1 NO SCROLL CURSOR FOR SELECT * FROM cursor FOR UPDATE;
UPDATE cursor SET a = 2;