summaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorMichael P2011-08-05 02:07:38 +0000
committerMichael P2011-08-05 02:07:38 +0000
commit8a15033d7d7ae7cbe9a100ea88b8e18f033f0cb4 (patch)
tree0afa598503e7af6615149db94f0de8509bb3e7dc /src/backend
parent87502bc809b852337e09f391317cbbc0cc35faf8 (diff)
Support for temporary TABLE/VIEW
Temporary tables and views are supported so as it uses the same interface as PostgreSQL. For Postgres-XC, it is possible for users to create un the cluster temporary tables that are replicated or distributed the same way as permanent tables. As those objects are session-limited, creation is made automatically on the Datanodes at the moment of query launch like normal DDL. Views are created only on local Coordinators. Support for ON COMMIT DROP/PRESERVE ROWS/DELETE ROWS is guarranted the same way as PostgreSQL. Support for INDEX, DROP, RENAME is also guarranted like normal PostgreSQL. One of the main changes is the addition of a DISCARD ALL clean up command run in pooler for pooler connections used during session to Datanodes when session finishes. This command is run the same way as RESET ALL for SET commands at the end of a session. A new regression test case called xc_temp has been added to check JOINs between temporary and persistent tables for the types types of distribution supported by Postgres-XC: REPLICATION, HASH (equivalent to MODULO), ROUND ROBIN.
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/transam/xact.c17
-rw-r--r--src/backend/commands/comment.c42
-rw-r--r--src/backend/commands/copy.c6
-rw-r--r--src/backend/commands/schemacmds.c3
-rw-r--r--src/backend/commands/tablecmds.c117
-rw-r--r--src/backend/optimizer/plan/createplan.c10
-rw-r--r--src/backend/pgxc/plan/planner.c46
-rw-r--r--src/backend/pgxc/pool/execRemote.c63
-rw-r--r--src/backend/pgxc/pool/poolmgr.c186
-rw-r--r--src/backend/tcop/utility.c557
10 files changed, 797 insertions, 250 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 1a199789b1..e3e0e0f9aa 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1962,6 +1962,14 @@ CommitTransaction(bool contact_gtm)
char implicitgid[256];
TransactionId xid = InvalidTransactionId;
+ /*
+ * Check if there are any On Commit actions and force temporary object flag.
+ * This is possible in the case of a session using ON COMMIT DELETE ROWS.
+ * It is essential to do this check *before* calling PGXCNodeIsImplicit2PC.
+ */
+ if (IsOnCommitActions())
+ ExecSetTempObjectIncluded();
+
if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && contact_gtm)
PreparePGXCNodes = PGXCNodeIsImplicit2PC(&PrepareLocalCoord);
@@ -2330,6 +2338,15 @@ PrepareTransaction(void)
ShowTransactionState("PrepareTransaction");
+#ifdef PGXC
+ /*
+ * Check if there are any On Commit actions and force temporary object flag.
+ * This is possible in the case of a session using ON COMMIT DELETE ROWS.
+ */
+ if (IsOnCommitActions())
+ ExecSetTempObjectIncluded();
+#endif
+
/*
* check the current transaction state
*/
diff --git a/src/backend/commands/comment.c b/src/backend/commands/comment.c
index d09bef0682..f18a132219 100644
--- a/src/backend/commands/comment.c
+++ b/src/backend/commands/comment.c
@@ -4,7 +4,8 @@
*
* PostgreSQL object comments utility code.
*
- * Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010-2011 Nippon Telegraph and Telephone Corporation
*
* IDENTIFICATION
* src/backend/commands/comment.c
@@ -460,3 +461,42 @@ GetComment(Oid oid, Oid classoid, int32 subid)
return comment;
}
+
+
+#ifdef PGXC
+/*
+ * GetCommentObjectId
+ *
+ * Return Object ID of object commented
+ * Note: This function uses portions of the code of CommentObject,
+ * even if this code is duplicated this is done like this to facilitate
+ * merges with PostgreSQL head.
+ */
+Oid
+GetCommentObjectId(CommentStmt *stmt)
+{
+ ObjectAddress address;
+ Relation relation;
+
+ if (stmt->objtype == OBJECT_DATABASE && list_length(stmt->objname) == 1)
+ {
+ char *database = strVal(linitial(stmt->objname));
+
+ if (!OidIsValid(get_database_oid(database, true)))
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_UNDEFINED_DATABASE),
+ errmsg("database \"%s\" does not exist", database)));
+ return InvalidOid;
+ }
+ }
+
+ address = get_object_address(stmt->objtype, stmt->objname, stmt->objargs,
+ &relation, ShareUpdateExclusiveLock);
+
+ if (relation != NULL)
+ relation_close(relation, NoLock);
+
+ return address.objectId;
+}
+#endif
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 952b88b533..bfaf6e2860 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -918,6 +918,12 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
rte->relkind = rel->rd_rel->relkind;
rte->requiredPerms = required_access;
+#ifdef PGXC
+ /* In case COPY is used on a temporary table, never use 2PC for implicit commits */
+ if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
+ ExecSetTempObjectIncluded();
+#endif
+
tupDesc = RelationGetDescr(rel);
attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
foreach(cur, attnums)
diff --git a/src/backend/commands/schemacmds.c b/src/backend/commands/schemacmds.c
index 831886f46d..851b0dca3b 100644
--- a/src/backend/commands/schemacmds.c
+++ b/src/backend/commands/schemacmds.c
@@ -132,7 +132,8 @@ CreateSchemaCommand(CreateSchemaStmt *stmt, const char *queryString)
* Add a RemoteQuery node for a query at top level on a remote Coordinator
*/
if (is_top_level)
- parsetree_list = AddRemoteQueryNode(parsetree_list, queryString, EXEC_ON_ALL_NODES);
+ parsetree_list = AddRemoteQueryNode(parsetree_list, queryString,
+ EXEC_ON_ALL_NODES, false);
#endif
/*
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 644e572934..7fe0015868 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -89,6 +89,7 @@
#include "pgxc/pgxc.h"
#include "access/gtm.h"
#include "commands/sequence.h"
+#include "pgxc/execRemote.h"
#endif
/*
@@ -439,20 +440,11 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
* code. This is needed because calling code might not expect untrusted
* tables to appear in pg_temp at the front of its search path.
*/
-#ifdef PGXC
- if (stmt->relation->relpersistence == RELPERSISTENCE_TEMP &&
- IsUnderPostmaster &&
- relkind != RELKIND_SEQUENCE)
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("PG-XC does not yet support temporary tables")));
-#else
if (stmt->relation->relpersistence == RELPERSISTENCE_TEMP
&& InSecurityRestrictedOperation())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("cannot create temporary table within security-restricted operation")));
-#endif
/*
* Look up the namespace in which we are supposed to create the relation,
@@ -9462,3 +9454,110 @@ AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid,
}
}
}
+
+#ifdef PGXC
+/*
+ * IsTempTable
+ *
+ * Check if given table Oid is temporary.
+ */
+bool
+IsTempTable(Oid relid)
+{
+ Relation rel;
+ bool res;
+ /*
+ * PGXCTODO: Is it correct to open without locks?
+ * we just check if this table is temporary though...
+ */
+ rel = relation_open(relid, NoLock);
+ res = rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP;
+ relation_close(rel, NoLock);
+ return res;
+}
+
+/*
+ * IsIndexUsingTemp
+ *
+ * Check if given index relation uses temporary tables.
+ */
+bool
+IsIndexUsingTempTable(Oid relid)
+{
+ bool res = false;
+ HeapTuple tuple;
+ Oid parent_id = InvalidOid;
+
+ tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(relid));
+ if (HeapTupleIsValid(tuple))
+ {
+ Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple);
+ parent_id = index->indrelid;
+
+ /* Release system cache BEFORE looking at the parent table */
+ ReleaseSysCache(tuple);
+
+ res = IsTempTable(parent_id);
+ }
+ else
+ res = false; /* Default case */
+
+ return res;
+}
+
+/*
+ * IsOnCommitDeleteRows
+ *
+ * Check if there are any on-commit actions activated
+ * This is possible in the case of ON COMMIT DELETE ROWS for example.
+ * In this case 2PC cannot be used.
+ */
+bool
+IsOnCommitActions(void)
+{
+ return list_length(on_commits) > 0;
+}
+
+/*
+ * DropTableThrowErrorExternal
+ *
+ * Error interface for DROP when looking for execution node type.
+ */
+void
+DropTableThrowErrorExternal(RangeVar *relation, ObjectType removeType, bool missing_ok)
+{
+ char relkind;
+
+ /* Determine required relkind */
+ switch (removeType)
+ {
+ case OBJECT_TABLE:
+ relkind = RELKIND_RELATION;
+ break;
+
+ case OBJECT_INDEX:
+ relkind = RELKIND_INDEX;
+ break;
+
+ case OBJECT_SEQUENCE:
+ relkind = RELKIND_SEQUENCE;
+ break;
+
+ case OBJECT_VIEW:
+ relkind = RELKIND_VIEW;
+ break;
+
+ case OBJECT_FOREIGN_TABLE:
+ relkind = RELKIND_FOREIGN_TABLE;
+ break;
+
+ default:
+ elog(ERROR, "unrecognized drop object type: %d",
+ (int) removeType);
+ relkind = 0; /* keep compiler quiet */
+ break;
+ }
+
+ DropErrorMsgNonExistent(relation->relname, relkind, missing_ok);
+}
+#endif
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index d3b5c7793b..bc32049b87 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -47,6 +47,7 @@
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "rewrite/rewriteManip.h"
+#include "commands/tablecmds.h"
#endif
#include "utils/lsyscache.h"
@@ -2601,8 +2602,15 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path,
/*
* XXX: should use GENERIC OPTIONS like 'foreign_relname' or something for
* the foreign table name instead of the local name ?
+ *
+ * A temporary table does not use namespace as it may not be
+ * consistent among nodes cluster. Relation name is sufficient.
*/
- appendStringInfo(&sql, "%s.%s %s", nspname_q, relname_q, aliasname_q);
+ if (IsTempTable(rte->relid))
+ appendStringInfo(&sql, "%s %s", relname_q, aliasname_q);
+ else
+ appendStringInfo(&sql, "%s.%s %s", nspname_q, relname_q, aliasname_q);
+
pfree(nspname);
pfree(relname);
if (nspname_q != nspname_q)
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 7da7f848af..e3499134db 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -46,6 +46,7 @@
#include "utils/syscache.h"
#include "utils/numeric.h"
#include "access/hash.h"
+#include "commands/tablecmds.h"
#include "utils/timestamp.h"
#include "utils/date.h"
@@ -171,7 +172,8 @@ static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStm
static void InitXCWalkerContext(XCWalkerContext *context);
static RemoteQuery *makeRemoteQuery(void);
static void validate_part_col_updatable(const Query *query);
-
+static bool contains_temp_tables(List *rtable);
+static bool contains_only_pg_catalog(List *rtable);
/*
* Find position of specified substring in the string
@@ -1403,7 +1405,7 @@ examine_conditions_fromlist(Node *treenode, XCWalkerContext *context)
* only contain pg_catalog entries.
*/
static bool
-contains_only_pg_catalog (List *rtable)
+contains_only_pg_catalog(List *rtable)
{
ListCell *item;
@@ -1416,13 +1418,39 @@ contains_only_pg_catalog (List *rtable)
{
if (get_rel_namespace(rte->relid) != PG_CATALOG_NAMESPACE)
return false;
- } else if (rte->rtekind == RTE_SUBQUERY &&
- !contains_only_pg_catalog (rte->subquery->rtable))
+ }
+ else if (rte->rtekind == RTE_SUBQUERY &&
+ !contains_only_pg_catalog(rte->subquery->rtable))
return false;
}
return true;
}
+/*
+ * Returns true if at least one temporary table is in use
+ * in query (and its subqueries)
+ */
+static bool
+contains_temp_tables(List *rtable)
+{
+ ListCell *item;
+
+ foreach(item, rtable)
+ {
+ RangeTblEntry *rte = (RangeTblEntry *) lfirst(item);
+
+ if (rte->rtekind == RTE_RELATION)
+ {
+ if (IsTempTable(rte->relid))
+ return true;
+ }
+ else if (rte->rtekind == RTE_SUBQUERY &&
+ contains_temp_tables(rte->subquery->rtable))
+ return true;
+ }
+
+ return false;
+}
/*
* get_plan_nodes - determine the nodes to execute the command on.
@@ -1934,6 +1962,7 @@ makeRemoteQuery(void)
result->paramval_data = NULL;
result->paramval_len = 0;
result->exec_direct_type = EXEC_DIRECT_NONE;
+ result->is_temp = false;
result->relname = NULL;
result->remotejoin = false;
@@ -2737,12 +2766,16 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
if (query->commandType != CMD_SELECT)
result->resultRelations = list_make1_int(query->resultRelation);
- if (contains_only_pg_catalog (query->rtable))
+ if (contains_only_pg_catalog(query->rtable))
{
result = standard_planner(query, cursorOptions, boundParams);
return result;
}
+ /* Check if temporary tables are in use in target list */
+ if (contains_temp_tables(query->rtable))
+ query_step->is_temp = true;
+
if (query_step->exec_nodes == NULL)
get_plan_nodes_command(query_step, root);
@@ -3184,7 +3217,7 @@ GetHashExecNodes(RelationLocInfo *rel_loc_info, ExecNodes **exec_nodes, const Ex
* duplicated queries on Datanodes.
*/
List *
-AddRemoteQueryNode(List *stmts, const char *queryString, RemoteQueryExecType remoteExecType)
+AddRemoteQueryNode(List *stmts, const char *queryString, RemoteQueryExecType remoteExecType, bool is_temp)
{
List *result = stmts;
@@ -3199,6 +3232,7 @@ AddRemoteQueryNode(List *stmts, const char *queryString, RemoteQueryExecType rem
step->combine_type = COMBINE_TYPE_SAME;
step->sql_statement = queryString;
step->exec_type = remoteExecType;
+ step->is_temp = is_temp;
result = lappend(result, step);
}
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 1efb364333..121245d696 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -52,6 +52,7 @@
static bool autocommit = true;
static bool is_ddl = false;
static bool implicit_force_autocommit = false;
+static bool temp_object_included = false;
static PGXCNodeHandle **write_node_list = NULL;
static int write_node_count = 0;
static char *begin_string = NULL;
@@ -1536,11 +1537,27 @@ finish:
if (res != 0)
{
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Could not prepare transaction on data nodes")));
+
+ /* In case transaction has operated on temporary objects */
+ if (temp_object_included)
+ {
+ /* Reset temporary object flag */
+ temp_object_included = false;
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("cannot PREPARE a transaction that has operated on temporary tables")));
+ }
+ else
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not prepare transaction on data nodes")));
+ }
}
+ /* Reset temporary object flag */
+ temp_object_included = false;
+
return local_operation;
}
@@ -1824,6 +1841,9 @@ finish:
is_ddl = false;
clear_write_node_list();
+ /* Reset temporary object flag */
+ temp_object_included = false;
+
/* Clean up connections */
pfree_pgxc_all_handles(pgxc_connections);
@@ -1973,6 +1993,9 @@ finish:
is_ddl = false;
clear_write_node_list();
+ /* Reset temporary object flag */
+ temp_object_included = false;
+
/* Free node list taken from GTM */
if (datanodes && datanodecnt != 0)
free(datanodes);
@@ -2113,6 +2136,9 @@ finish:
is_ddl = false;
clear_write_node_list();
+ /* Reset temporary object flag */
+ temp_object_included = false;
+
/* Free node list taken from GTM */
if (datanodes)
free(datanodes);
@@ -2210,6 +2236,9 @@ finish:
is_ddl = false;
clear_write_node_list();
+ /* Reset temporary object flag */
+ temp_object_included = false;
+
/* Clean up connections */
pfree_pgxc_all_handles(pgxc_connections);
if (res != 0)
@@ -2284,6 +2313,9 @@ finish:
is_ddl = false;
clear_write_node_list();
+ /* Reset temporary object flag */
+ temp_object_included = false;
+
/* Clean up connections */
pfree_pgxc_all_handles(pgxc_connections);
return res;
@@ -3217,6 +3249,10 @@ do_query(RemoteQueryState *node)
bool need_tran;
PGXCNodeAllHandles *pgxc_connections;
+ /* Be sure to set temporary object flag if necessary */
+ if (step->is_temp)
+ temp_object_included = true;
+
/*
* Get connections for Datanodes only, utilities and DDLs
* are launched in ExecRemoteUtility
@@ -4033,6 +4069,9 @@ ExecRemoteUtility(RemoteQuery *node)
implicit_force_autocommit = force_autocommit;
+ /* A transaction using temporary objects cannot use 2PC */
+ temp_object_included = node->is_temp;
+
remotestate = CreateResponseCombiner(0, node->combine_type);
pgxc_connections = get_exec_connections(NULL, node->exec_nodes, exec_type);
@@ -4516,12 +4555,14 @@ PGXCNodeIsImplicit2PC(bool *prepare_local_coord)
/*
* In case of an autocommit or forced autocommit transaction, 2PC is not involved
- * This case happens for Utilities using force autocommit (CREATE DATABASE, VACUUM...)
+ * This case happens for Utilities using force autocommit (CREATE DATABASE, VACUUM...).
+ * For a transaction using temporary objects, 2PC is not authorized.
*/
- if (implicit_force_autocommit)
+ if (implicit_force_autocommit || temp_object_included)
{
*prepare_local_coord = false;
implicit_force_autocommit = false;
+ temp_object_included = false;
return false;
}
@@ -4622,3 +4663,15 @@ int DataNodeCopyInBinaryForAll(char *msg_buf, int len, PGXCNodeHandle** copy_con
return 0;
}
+
+/*
+ * ExecSetTempObjectIncluded
+ *
+ * Set Temp object flag on the fly for transactions
+ * This flag will be reinitialized at commit.
+ */
+void
+ExecSetTempObjectIncluded(void)
+{
+ temp_object_included = true;
+}
diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c
index 463bd5a170..c0d8d56aab 100644
--- a/src/backend/pgxc/pool/poolmgr.c
+++ b/src/backend/pgxc/pool/poolmgr.c
@@ -93,7 +93,13 @@ static void agent_init(PoolAgent *agent, const char *database, const char *user_
static void agent_destroy(PoolAgent *agent);
static void agent_create(void);
static void agent_handle_input(PoolAgent *agent, StringInfo s);
-static int agent_set_command(PoolAgent *agent, const char *set_command, bool is_local);
+static int agent_session_command(PoolAgent *agent,
+ const char *set_command,
+ PoolCommandType command_type);
+static int agent_set_command(PoolAgent *agent,
+ const char *set_command,
+ PoolCommandType command_type);
+static int agent_temp_command(PoolAgent *agent);
static DatabasePool *create_database_pool(const char *database, const char *user_name);
static void insert_database_pool(DatabasePool *pool);
static int destroy_database_pool(const char *database, const char *user_name);
@@ -107,7 +113,7 @@ static int *agent_acquire_connections(PoolAgent *agent, List *datanodelist, List
static int cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist);
static PGXCNodePoolSlot *acquire_connection(DatabasePool *dbPool, int node, char client_conn_type);
static void agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard);
-static void agent_reset_params(PoolAgent *agent, List *dn_list, List *co_list);
+static void agent_reset_session(PoolAgent *agent, List *dn_list, List *co_list);
static void release_connection(DatabasePool *dbPool, PGXCNodePoolSlot *slot, int index, bool clean,
char client_conn_type);
static void destroy_slot(PGXCNodePoolSlot *slot);
@@ -490,6 +496,7 @@ agent_create(void)
agent->coord_connections = NULL;
agent->session_params = NULL;
agent->local_params = NULL;
+ agent->is_temp = false;
agent->pid = 0;
/* Append new agent to the list */
@@ -543,30 +550,44 @@ PoolManagerConnect(PoolHandle *handle, const char *database, const char *user_na
}
int
-PoolManagerSetCommand(bool is_local, const char *set_command)
+PoolManagerSetCommand(PoolCommandType command_type, const char *set_command)
{
int n32;
char msgtype = 's';
- Assert(set_command);
Assert(Handle);
/* Message type */
pool_putbytes(&Handle->port, &msgtype, 1);
/* Message length */
- n32 = htonl(strlen(set_command) + 10);
+ if (set_command)
+ n32 = htonl(strlen(set_command) + 13);
+ else
+ n32 = htonl(12);
+
pool_putbytes(&Handle->port, (char *) &n32, 4);
/* LOCAL or SESSION parameter ? */
- pool_putbytes(&Handle->port, (char *) &is_local, 1);
-
- /* Length of SET command string */
- n32 = htonl(strlen(set_command) + 1);
+ n32 = htonl(command_type);
pool_putbytes(&Handle->port, (char *) &n32, 4);
- /* Send command string followed by \0 terminator */
- pool_putbytes(&Handle->port, set_command, strlen(set_command) + 1);
+ if (set_command)
+ {
+ /* Length of SET command string */
+ n32 = htonl(strlen(set_command) + 1);
+ pool_putbytes(&Handle->port, (char *) &n32, 4);
+
+ /* Send command string followed by \0 terminator */
+ pool_putbytes(&Handle->port, set_command, strlen(set_command) + 1);
+ }
+ else
+ {
+ /* Send empty command */
+ n32 = htonl(0);
+ pool_putbytes(&Handle->port, (char *) &n32, 4);
+ }
+
pool_flush(&Handle->port);
/* Get result */
@@ -628,10 +649,10 @@ agent_destroy(PoolAgent *agent)
co_conn = lappend_int(co_conn, i+1);
/*
- * agent is being destroyed, so reset session parameters
- * before putting back connections to pool
+ * Agent is being destroyed, so reset session parameters
+ * and temporary objects before putting back connections to pool.
*/
- agent_reset_params(agent, dn_conn, co_conn);
+ agent_reset_session(agent, dn_conn, co_conn);
/* release them all */
agent_release_connections(agent, dn_conn, co_conn);
@@ -881,8 +902,8 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
{
const char *database = NULL;
const char *user_name = NULL;
- const char *set_command;
- bool is_local;
+ const char *set_command = NULL;
+ PoolCommandType command_type;
int datanodecount;
int coordcount;
List *datanodelist = NIL;
@@ -1043,16 +1064,19 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
list_free(datanodelist);
list_free(coordlist);
break;
- case 's': /* SET COMMAND */
+ case 's': /* Session-related COMMAND */
pool_getmessage(&agent->port, s, 0);
/* Determine if command is local or session */
- is_local = (bool) pq_getmsgbyte(s);
- /* Get the SET command */
+ command_type = (PoolCommandType) pq_getmsgint(s, 4);
+ /* Get the SET command if necessary */
len = pq_getmsgint(s, 4);
- set_command = pq_getmsgbytes(s, len);
+ if (len != 0)
+ set_command = pq_getmsgbytes(s, len);
+
pq_getmsgend(s);
- res = agent_set_command(agent, set_command, is_local);
+ /* Manage command depending on its type */
+ res = agent_session_command(agent, set_command, command_type);
/* Send success result */
pool_sendres(&agent->port, res);
@@ -1068,11 +1092,46 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
}
/*
+ * Manage a session command for pooler
+ */
+static int
+agent_session_command(PoolAgent *agent, const char *set_command, PoolCommandType command_type)
+{
+ int res;
+
+ switch (command_type)
+ {
+ case POOL_CMD_LOCAL_SET:
+ case POOL_CMD_GLOBAL_SET:
+ res = agent_set_command(agent, set_command, command_type);
+ break;
+ case POOL_CMD_TEMP:
+ res = agent_temp_command(agent);
+ break;
+ default:
+ res = -1;
+ break;
+ }
+
+ return res;
+}
+
+/*
+ * Set agent flag that a temporary object is in use.
+ */
+static int
+agent_temp_command(PoolAgent *agent)
+{
+ agent->is_temp = true;
+ return 0;
+}
+
+/*
* Save a SET command and distribute it to the agent connections
* already in use.
*/
static int
-agent_set_command(PoolAgent *agent, const char *set_command, bool is_local)
+agent_set_command(PoolAgent *agent, const char *set_command, PoolCommandType command_type)
{
char *params_string;
int i;
@@ -1080,11 +1139,16 @@ agent_set_command(PoolAgent *agent, const char *set_command, bool is_local)
Assert(agent);
Assert(set_command);
+ Assert(command_type == POOL_CMD_LOCAL_SET || command_type == POOL_CMD_GLOBAL_SET);
- if (is_local)
+ if (command_type == POOL_CMD_LOCAL_SET)
params_string = agent->local_params;
- else
+ else if (command_type == POOL_CMD_GLOBAL_SET)
params_string = agent->session_params;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Set command process failed")));
/* First command recorded */
if (!params_string)
@@ -1131,9 +1195,9 @@ agent_set_command(PoolAgent *agent, const char *set_command, bool is_local)
}
/* Save the latest string */
- if (is_local)
+ if (command_type == POOL_CMD_LOCAL_SET)
agent->local_params = params_string;
- else
+ else if (command_type == POOL_CMD_GLOBAL_SET)
agent->session_params = params_string;
return res;
@@ -1278,7 +1342,6 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
static int
cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
{
- int i;
ListCell *nodelist_item;
char errbuf[256];
int nCount;
@@ -1435,8 +1498,9 @@ agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard)
return;
/*
- * If there are some session parameters, do not put back connections to pool
- * disconnection will be made when session is cut for this user.
+ * If there are some session parameters or temporary objects,
+ * do not put back connections to pool.
+ * Disconnection will be made when session is cut for this user.
* Local parameters are reset when transaction block is finished,
* so don't do anything for them, but just reset their list.
*/
@@ -1445,7 +1509,8 @@ agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard)
pfree(agent->local_params);
agent->local_params = NULL;
}
- if (agent->session_params)
+ if (agent->session_params ||
+ agent->is_temp)
return;
/* Discard first for Datanodes */
@@ -1515,50 +1580,61 @@ agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard)
* modified by session parameters.
*/
static void
-agent_reset_params(PoolAgent *agent, List *dn_list, List *co_list)
+agent_reset_session(PoolAgent *agent, List *dn_list, List *co_list)
{
- PGXCNodePoolSlot *slot;
if (!agent->dn_connections && !agent->coord_connections)
return;
- /* Parameters are reset, so free commands */
- if (agent->session_params)
- {
- pfree(agent->session_params);
- agent->session_params = NULL;
- }
- if (agent->local_params)
- {
- pfree(agent->local_params);
- agent->local_params = NULL;
- }
+ if (!agent->session_params && !agent->local_params && !agent->is_temp)
+ return;
- /* Reset Datanode connection params */
- if (dn_list)
+ /*
+ * Reset Datanode connection params.
+ * Discard is only done for Datanodes as Temporary objects are never created
+ * to other Coordinators in a session.
+ */
+ if (dn_list &&
+ (agent->session_params || agent->local_params || agent->is_temp))
{
ListCell *lc;
foreach(lc, dn_list)
{
+ PGXCNodePoolSlot *slot;
int node = lfirst_int(lc);
+
Assert(node > 0 && node <= NumDataNodes);
slot = agent->dn_connections[node - 1];
/* Reset connection params */
if (slot)
- PGXCNodeSendSetQuery(slot->conn, "RESET ALL;");
+ {
+ if (agent->session_params || agent->local_params)
+ PGXCNodeSendSetQuery(slot->conn, "RESET ALL;");
+
+ /*
+ * Discard queries cannot be sent as multiple-queries,
+ * so do it separately. It is OK to use this slow process
+ * as session is ending.
+ */
+ if (agent->is_temp)
+ PGXCNodeSendSetQuery(slot->conn, "DISCARD ALL;");
+ }
}
}
/* Reset Coordinator connection params */
- if (co_list)
+ if (co_list &&
+ (agent->session_params || agent->local_params))
{
ListCell *lc;
foreach(lc, co_list)
{
+ PGXCNodePoolSlot *slot;
int node = lfirst_int(lc);
+
Assert(node > 0 && node <= NumCoords);
slot = agent->coord_connections[node - 1];
@@ -1567,6 +1643,19 @@ agent_reset_params(PoolAgent *agent, List *dn_list, List *co_list)
PGXCNodeSendSetQuery(slot->conn, "RESET ALL;");
}
}
+
+ /* Parameters are reset, so free commands */
+ if (agent->session_params)
+ {
+ pfree(agent->session_params);
+ agent->session_params = NULL;
+ }
+ if (agent->local_params)
+ {
+ pfree(agent->local_params);
+ agent->local_params = NULL;
+ }
+ agent->is_temp = false;
}
/*
@@ -2070,6 +2159,9 @@ grow_pool(DatabasePool * dbPool, int index, char client_conn_type)
errmsg("out of memory")));
}
+ /* If connection fails, be sure that slot is destroyed cleanly */
+ slot->xc_cancelConn = NULL;
+
/* Establish connection */
slot->conn = PGXCNodeConnect(nodePool->connstr);
if (!PGXCNodeConnected(slot->conn))
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index e4e6ed55b4..9e88b268b2 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -7,6 +7,7 @@
*
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
+ * Portions Copyright (c) 2010-2011 Nippon Telegraph and Telephone Corporation
*
*
* IDENTIFICATION
@@ -68,12 +69,15 @@
#include "pgxc/planner.h"
#include "pgxc/poolutils.h"
#include "pgxc/poolmgr.h"
+#include "utils/lsyscache.h"
static void ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
- bool force_autocommit, RemoteQueryExecType exec_type);
-static RemoteQueryExecType ExecUtilityFindNodes(const char *queryString,
- ObjectType objectType,
- Oid relid);
+ bool force_autocommit, RemoteQueryExecType exec_type,
+ bool is_temp);
+static RemoteQueryExecType ExecUtilityFindNodes(ObjectType objectType,
+ Oid relid,
+ bool *is_temp);
+static RemoteQueryExecType ExecUtilityFindNodesRelkind(Oid relid, bool *is_temp);
#endif
@@ -653,17 +657,77 @@ standard_ProcessUtility(Node *parsetree,
List *stmts;
ListCell *l;
Oid relOid;
+#ifdef PGXC
+ bool is_temp = false;
+#endif
/* Run parse analysis ... */
stmts = transformCreateStmt((CreateStmt *) parsetree,
queryString);
-
#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
+ /*
+ * Scan the list of objects.
+ * Temporary tables are created on Datanodes only.
+ * Non-temporary objects are created on all nodes.
+ * In case temporary and non-temporary objects are mized return an error.
+ */
+ bool is_first = true;
+
+ foreach(l, stmts)
+ {
+ Node *stmt = (Node *) lfirst(l);
+
+ if (IsA(stmt, CreateStmt))
+ {
+ CreateStmt *stmt_loc = (CreateStmt *) stmt;
+ bool is_object_temp = stmt_loc->relation->relpersistence == RELPERSISTENCE_TEMP;
+
+ if (is_first)
+ {
+ is_first = false;
+ if (is_object_temp)
+ is_temp = true;
+ }
+ else
+ {
+ if (is_object_temp != is_temp)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("CREATE not supported for TEMP and non-TEMP objects"),
+ errdetail("You should separate TEMP and non-TEMP objects")));
+ }
+ }
+ else if (IsA(stmt, CreateForeignTableStmt))
+ {
+ /* There are no temporary foreign tables */
+ if (is_first)
+ {
+ is_first = false;
+ }
+ else
+ {
+ if (!is_temp)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("CREATE not supported for TEMP and non-TEMP objects"),
+ errdetail("You should separate TEMP and non-TEMP objects")));
+ }
+ }
+ }
+ }
+
/*
* Add a RemoteQuery node for a query at top level on a remote Coordinator
*/
if (isTopLevel)
- stmts = AddRemoteQueryNode(stmts, queryString, EXEC_ON_ALL_NODES);
+ {
+ if (is_temp)
+ stmts = AddRemoteQueryNode(stmts, queryString, EXEC_ON_DATANODES, is_temp);
+ else
+ stmts = AddRemoteQueryNode(stmts, queryString, EXEC_ON_ALL_NODES, is_temp);
+ }
#endif
/* ... and do it */
@@ -676,6 +740,12 @@ standard_ProcessUtility(Node *parsetree,
Datum toast_options;
static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
+#ifdef PGXC
+ /* Set temporary object object flag in pooler */
+ if (is_temp)
+ PoolManagerSetCommand(POOL_CMD_TEMP, NULL);
+#endif
+
/* Create the table itself */
relOid = DefineRelation((CreateStmt *) stmt,
RELKIND_RELATION,
@@ -798,6 +868,7 @@ standard_ProcessUtility(Node *parsetree,
DropStmt *stmt = (DropStmt *) parsetree;
#ifdef PGXC
bool is_temp = false;
+ RemoteQueryExecType exec_type = EXEC_ON_ALL_NODES;
/*
* We need to check details of the objects being dropped and
@@ -806,41 +877,75 @@ standard_ProcessUtility(Node *parsetree,
*/
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
- /*
- * Check the list of sequences/tables going to be dropped.
- * XC does not allow yet to mix drop of temporary and
- * non-temporary objects because this involves to rewrite
- * query to process for tables and stop remote query process
- * for sequences.
- * PGXCTODO: For the time being this is just checked for
- * sequences but should also be done for tables
- */
-
- if (stmt->removeType == OBJECT_SEQUENCE)
+ switch (stmt->removeType)
{
- ListCell *cell;
- bool is_first = true;
-
- foreach(cell, stmt->objects)
- {
- RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell));
- Oid relid;
-
- relid = RangeVarGetRelid(rel, false);
- if (is_first)
+ case OBJECT_TABLE:
+ case OBJECT_SEQUENCE:
+ case OBJECT_VIEW:
+ case OBJECT_INDEX:
{
- is_temp = IsTempSequence(relid);
- is_first = false;
- }
- else
- {
- if (IsTempSequence(relid) != is_temp)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("DROP SEQUENCE not supported for TEMP and non-TEMP sequences"),
- errdetail("DROP can be done if TEMP and non-TEMP objects are separated")));
+ /*
+ * Check the list of objects going to be dropped.
+ * XC does not allow yet to mix drop of temporary and
+ * non-temporary objects because this involves to rewrite
+ * query to process for tables.
+ */
+ ListCell *cell;
+ bool is_first = true;
+
+ foreach(cell, stmt->objects)
+ {
+ RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell));
+ Oid relid;
+
+ /*
+ * Do not print result at all, error is thrown
+ * after if necessary
+ */
+ relid = RangeVarGetRelid(rel, true);
+
+ /*
+ * In case this relation ID is incorrect throw
+ * a correct DROP error.
+ */
+ if (!OidIsValid(relid) && !stmt->missing_ok)
+ DropTableThrowErrorExternal(rel,
+ stmt->removeType,
+ stmt->missing_ok);
+
+ /* In case of DROP ... IF EXISTS bypass */
+ if (!OidIsValid(relid) && stmt->missing_ok)
+ continue;
+
+ if (is_first)
+ {
+ exec_type = ExecUtilityFindNodes(stmt->removeType,
+ relid,
+ &is_temp);
+ is_first = false;
+ }
+ else
+ {
+ RemoteQueryExecType exec_type_loc;
+ bool is_temp_loc;
+ exec_type_loc = ExecUtilityFindNodes(stmt->removeType,
+ relid,
+ &is_temp_loc);
+ if (exec_type_loc != exec_type ||
+ is_temp_loc != is_temp)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("DROP not supported for TEMP and non-TEMP objects"),
+ errdetail("You should separate TEMP and non-TEMP objects")));
+ }
+ }
}
- }
+ break;
+
+ default:
+ is_temp = false;
+ exec_type = EXEC_ON_ALL_NODES;
+ break;
}
}
#endif
@@ -898,20 +1003,9 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
- /*
- * Drop can be done on non-temporary objects and temporary objects
- * if they are not mixed up.
- * PGXCTODO: This should be extended to tables
- */
- if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && !is_temp)
- {
- /* Sequence and views exists only on Coordinators */
- if (stmt->removeType == OBJECT_SEQUENCE ||
- stmt->removeType == OBJECT_VIEW)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS);
- else
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
- }
+ /* DROP is done depending on the object type and its temporary type */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ ExecUtilityStmtOnNodes(queryString, NULL, false, exec_type, is_temp);
#endif
}
break;
@@ -920,12 +1014,31 @@ standard_ProcessUtility(Node *parsetree,
ExecuteTruncate((TruncateStmt *) parsetree);
#ifdef PGXC
/*
- * PGXCTODO
- * We may need to check details of the object being truncated and
- * run command on correct nodes
+ * Check details of the object being truncated.
+ * If at least one temporary table is truncated truncate cannot use 2PC
+ * at commit.
*/
- if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
+ bool is_temp = false;
+ ListCell *cell;
+ TruncateStmt *stmt = (TruncateStmt *) parsetree;
+
+ foreach(cell, stmt->relations)
+ {
+ Oid relid;
+ RangeVar *rel = (RangeVar *) lfirst(cell);
+
+ relid = RangeVarGetRelid(rel, true);
+ if (IsTempTable(relid))
+ {
+ is_temp = true;
+ break;
+ }
+ }
+
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_DATANODES, is_temp);
+ }
#endif
break;
@@ -933,13 +1046,23 @@ standard_ProcessUtility(Node *parsetree,
CommentObject((CommentStmt *) parsetree);
#ifdef PGXC
- /*
- * PGXCTODO
- * Comments on temporary objects need special handling
- * depending on their types.
- */
+ /* Comment objects depending on their object and temporary types */
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS);
+ {
+ bool is_temp = false;
+ RemoteQueryExecType exec_type = EXEC_ON_ALL_NODES;
+ CommentStmt *stmt = (CommentStmt *) parsetree;
+ Oid relid = GetCommentObjectId(stmt);
+
+ /* Commented object may not have a valid object ID, so move to default */
+ if (OidIsValid(relid))
+ {
+ exec_type = ExecUtilityFindNodes(stmt->objtype,
+ relid,
+ &is_temp);
+ }
+ ExecUtilityStmtOnNodes(queryString, NULL, false, exec_type, is_temp);
+ }
#endif
break;
@@ -981,19 +1104,21 @@ standard_ProcessUtility(Node *parsetree,
{
RenameStmt *stmt = (RenameStmt *) parsetree;
RemoteQueryExecType exec_type;
+ bool is_temp = false;
/* Relation is not set for a schema */
if (stmt->relation)
- exec_type = ExecUtilityFindNodes(queryString,
- stmt->renameType,
- RangeVarGetRelid(stmt->relation, false));
+ exec_type = ExecUtilityFindNodes(stmt->renameType,
+ RangeVarGetRelid(stmt->relation, false),
+ &is_temp);
else
exec_type = EXEC_ON_ALL_NODES;
ExecUtilityStmtOnNodes(queryString,
NULL,
false,
- exec_type);
+ exec_type,
+ is_temp);
}
#endif
ExecRenameStmt((RenameStmt *) parsetree);
@@ -1004,19 +1129,21 @@ standard_ProcessUtility(Node *parsetree,
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
AlterObjectSchemaStmt *stmt = (AlterObjectSchemaStmt *) parsetree;
-
RemoteQueryExecType exec_type;
+ bool is_temp = false;
+
if (stmt->relation)
- exec_type = ExecUtilityFindNodes(queryString,
- stmt->objectType,
- RangeVarGetRelid(stmt->relation, false));
+ exec_type = ExecUtilityFindNodes(stmt->objectType,
+ RangeVarGetRelid(stmt->relation, false),
+ &is_temp);
else
exec_type = EXEC_ON_ALL_NODES;
ExecUtilityStmtOnNodes(queryString,
NULL,
false,
- exec_type);
+ exec_type,
+ is_temp);
}
#endif
ExecAlterObjectSchemaStmt((AlterObjectSchemaStmt *) parsetree);
@@ -1027,7 +1154,7 @@ standard_ProcessUtility(Node *parsetree,
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1045,12 +1172,14 @@ standard_ProcessUtility(Node *parsetree,
*/
if (isTopLevel)
{
+ bool is_temp = false;
AlterTableStmt *stmt = (AlterTableStmt *) parsetree;
- RemoteQueryExecType exec_type = ExecUtilityFindNodes(queryString,
- stmt->relkind,
- RangeVarGetRelid(stmt->relation, false));
+ RemoteQueryExecType exec_type;
+ exec_type = ExecUtilityFindNodes(stmt->relkind,
+ RangeVarGetRelid(stmt->relation, false),
+ &is_temp);
- stmts = AddRemoteQueryNode(stmts, queryString, exec_type);
+ stmts = AddRemoteQueryNode(stmts, queryString, exec_type, is_temp);
}
#endif
@@ -1126,7 +1255,7 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1136,6 +1265,7 @@ standard_ProcessUtility(Node *parsetree,
{
RemoteQueryExecType remoteExecType = EXEC_ON_ALL_NODES;
GrantStmt *stmt = (GrantStmt *) parsetree;
+ bool is_temp = false;
/* Launch GRANT on Coordinator if object is a sequence */
if ((stmt->objtype == ACL_OBJECT_RELATION &&
@@ -1156,22 +1286,9 @@ standard_ProcessUtility(Node *parsetree,
RangeVar *relvar = (RangeVar *) lfirst(cell);
Oid relid = RangeVarGetRelid(relvar, false);
- if (get_rel_relkind(relid) == RELKIND_SEQUENCE ||
- get_rel_relkind(relid) == RELKIND_VIEW)
- {
- /* PGXCTODO: extend that for temporary views and tables */
- if (get_rel_relkind(relid) == RELKIND_SEQUENCE &&
- IsTempSequence(relid))
- remoteExecType = EXEC_ON_NONE;
- else
- remoteExecType = EXEC_ON_COORDS;
- }
- else
- {
- remoteExecType = EXEC_ON_ALL_NODES;
- }
+ remoteExecType = ExecUtilityFindNodesRelkind(relid, &is_temp);
- /* Check if objects can be launched at the same place as 1st one */
+ /* Check if object node type corresponds to the first one */
if (first)
{
type_local = remoteExecType;
@@ -1183,11 +1300,11 @@ standard_ProcessUtility(Node *parsetree,
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("PGXC does not support GRANT on multiple object types"),
- errdetail("Grant VIEW/SEQUENCE and relations on separate queries")));
+ errdetail("Grant VIEW/SEQUENCE/TABLE with separate queries")));
}
}
}
- ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType, is_temp);
}
#endif
ExecuteGrantStmt((GrantStmt *) parsetree);
@@ -1198,7 +1315,7 @@ standard_ProcessUtility(Node *parsetree,
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1207,7 +1324,7 @@ standard_ProcessUtility(Node *parsetree,
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1260,7 +1377,7 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1272,7 +1389,7 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1280,7 +1397,7 @@ standard_ProcessUtility(Node *parsetree,
DefineEnum((CreateEnumStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1299,7 +1416,16 @@ standard_ProcessUtility(Node *parsetree,
DefineView((ViewStmt *) parsetree, queryString);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS);
+ {
+ /*
+ * If view is temporary, no need to send this query to other
+ * remote Coordinators
+ */
+ ViewStmt *stmt = (ViewStmt *) parsetree;
+
+ if (stmt->view->relpersistence != RELPERSISTENCE_TEMP)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS, false);
+ }
#endif
break;
@@ -1307,7 +1433,7 @@ standard_ProcessUtility(Node *parsetree,
CreateFunction((CreateFunctionStmt *) parsetree, queryString);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1315,7 +1441,7 @@ standard_ProcessUtility(Node *parsetree,
AlterFunction((AlterFunctionStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1324,6 +1450,10 @@ standard_ProcessUtility(Node *parsetree,
IndexStmt *stmt = (IndexStmt *) parsetree;
#ifdef PGXC
+ Oid relid;
+ bool is_temp = false;
+ RemoteQueryExecType exec_type = EXEC_ON_ALL_NODES;
+
if (stmt->concurrent)
{
ereport(ERROR,
@@ -1331,6 +1461,11 @@ standard_ProcessUtility(Node *parsetree,
errmsg("PGXC does not support concurrent INDEX yet"),
errdetail("The feature is not currently supported")));
}
+
+ /* INDEX on a temporary table cannot use 2PC at commit */
+ relid = RangeVarGetRelid(stmt->relation, true);
+ if (OidIsValid(relid))
+ exec_type = ExecUtilityFindNodes(OBJECT_TABLE, relid, &is_temp);
#endif
if (stmt->concurrent)
@@ -1365,7 +1500,7 @@ standard_ProcessUtility(Node *parsetree,
#ifdef PGXC
if (IS_PGXC_COORDINATOR && !stmt->isconstraint && !IsConnFromCoord())
ExecUtilityStmtOnNodes(queryString, NULL,
- stmt->concurrent, EXEC_ON_ALL_NODES);
+ stmt->concurrent, exec_type, is_temp);
#endif
}
break;
@@ -1377,14 +1512,11 @@ standard_ProcessUtility(Node *parsetree,
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
RemoteQueryExecType remoteExecType;
+ bool is_temp;
Oid relid = RangeVarGetRelid(((RuleStmt *) parsetree)->relation, false);
- if (get_rel_relkind(relid) == RELKIND_VIEW)
- remoteExecType = EXEC_ON_COORDS;
- else
- remoteExecType = EXEC_ON_ALL_NODES;
-
- ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType);
+ remoteExecType = ExecUtilityFindNodesRelkind(relid, &is_temp);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType, is_temp);
}
#endif
break;
@@ -1400,9 +1532,8 @@ standard_ProcessUtility(Node *parsetree,
*/
CreateSeqStmt *stmt = (CreateSeqStmt *) parsetree;
- if (stmt->sequence->relpersistence == RELPERSISTENCE_PERMANENT ||
- stmt->sequence->relpersistence == RELPERSISTENCE_UNLOGGED)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS);
+ if (stmt->sequence->relpersistence != RELPERSISTENCE_TEMP)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS, false);
}
#endif
break;
@@ -1420,7 +1551,7 @@ standard_ProcessUtility(Node *parsetree,
Oid relid = RangeVarGetRelid(stmt->sequence, false);
if (!IsTempSequence(relid))
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS, false);
}
#endif
break;
@@ -1448,7 +1579,7 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1461,7 +1592,7 @@ standard_ProcessUtility(Node *parsetree,
createdb((CreatedbStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1469,7 +1600,7 @@ standard_ProcessUtility(Node *parsetree,
AlterDatabase((AlterDatabaseStmt *) parsetree, isTopLevel);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1477,7 +1608,7 @@ standard_ProcessUtility(Node *parsetree,
AlterDatabaseSet((AlterDatabaseSetStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1495,7 +1626,7 @@ standard_ProcessUtility(Node *parsetree,
/* Clean also remote Coordinators */
sprintf(query, "CLEAN CONNECTION TO ALL FOR DATABASE %s;", stmt->dbname);
- ExecUtilityStmtOnNodes(query, NULL, true, EXEC_ON_COORDS);
+ ExecUtilityStmtOnNodes(query, NULL, true, EXEC_ON_COORDS, false);
}
#endif
@@ -1504,7 +1635,7 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1551,7 +1682,7 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_DATANODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_DATANODES, false);
#endif
break;
@@ -1561,7 +1692,7 @@ standard_ProcessUtility(Node *parsetree,
cluster((ClusterStmt *) parsetree, isTopLevel);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES, false);
#endif
break;
@@ -1574,7 +1705,7 @@ standard_ProcessUtility(Node *parsetree,
* vacuum() pops active snapshot and we can not send it to nodes
*/
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES, false);
#endif
vacuum((VacuumStmt *) parsetree, InvalidOid, true, NULL, false,
isTopLevel);
@@ -1596,8 +1727,16 @@ standard_ProcessUtility(Node *parsetree,
* send this query to backend nodes
*/
if (!stmt->is_local || !IsTransactionBlock())
- if (PoolManagerSetCommand(stmt->is_local, queryString) < 0)
+ {
+ PoolCommandType command_type;
+ if (stmt->is_local)
+ command_type = POOL_CMD_LOCAL_SET;
+ else
+ command_type = POOL_CMD_GLOBAL_SET;
+
+ if (PoolManagerSetCommand(command_type, queryString) < 0)
elog(ERROR, "Postgres-XC: ERROR SET query");
+ }
}
#endif
break;
@@ -1623,7 +1762,7 @@ standard_ProcessUtility(Node *parsetree,
* send this query to backend nodes
*/
if (!IsTransactionBlock())
- if (PoolManagerSetCommand(false, queryString) < 0)
+ if (PoolManagerSetCommand(POOL_CMD_GLOBAL_SET, queryString) < 0)
elog(ERROR, "Postgres-XC: ERROR DISCARD query");
}
#endif
@@ -1640,7 +1779,7 @@ standard_ProcessUtility(Node *parsetree,
errdetail("The feature is not currently supported")));
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1661,15 +1800,12 @@ standard_ProcessUtility(Node *parsetree,
/* If rule is defined on a view, drop it only on Coordinators */
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
- RemoteQueryExecType remoteExecType;
+ RemoteQueryExecType remoteExecType = EXEC_ON_ALL_NODES;
+ bool is_temp = false;
Oid relid = RangeVarGetRelid(stmt->relation, false);
- if (get_rel_relkind(relid) == RELKIND_VIEW)
- remoteExecType = EXEC_ON_COORDS;
- else
- remoteExecType = EXEC_ON_ALL_NODES;
-
- ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType);
+ remoteExecType = ExecUtilityFindNodesRelkind(relid, &is_temp);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType, is_temp);
}
#endif
break;
@@ -1679,7 +1815,7 @@ standard_ProcessUtility(Node *parsetree,
stmt->behavior, stmt->missing_ok);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
default:
@@ -1694,7 +1830,7 @@ standard_ProcessUtility(Node *parsetree,
CreateProceduralLanguage((CreatePLangStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1702,7 +1838,7 @@ standard_ProcessUtility(Node *parsetree,
DropProceduralLanguage((DropPLangStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1713,7 +1849,7 @@ standard_ProcessUtility(Node *parsetree,
DefineDomain((CreateDomainStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1724,7 +1860,7 @@ standard_ProcessUtility(Node *parsetree,
CreateRole((CreateRoleStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1732,7 +1868,7 @@ standard_ProcessUtility(Node *parsetree,
AlterRole((AlterRoleStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1740,7 +1876,7 @@ standard_ProcessUtility(Node *parsetree,
AlterRoleSet((AlterRoleSetStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1748,7 +1884,7 @@ standard_ProcessUtility(Node *parsetree,
DropRole((DropRoleStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1756,7 +1892,7 @@ standard_ProcessUtility(Node *parsetree,
DropOwnedObjects((DropOwnedStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1764,7 +1900,7 @@ standard_ProcessUtility(Node *parsetree,
ReassignOwnedObjects((ReassignOwnedStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1778,7 +1914,7 @@ standard_ProcessUtility(Node *parsetree,
LockTableCommand((LockStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1810,7 +1946,7 @@ standard_ProcessUtility(Node *parsetree,
(RecoveryInProgress() ? 0 : CHECKPOINT_FORCE));
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES, false);
#endif
break;
@@ -1855,7 +1991,7 @@ standard_ProcessUtility(Node *parsetree,
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
ExecUtilityStmtOnNodes(queryString, NULL,
- stmt->kind == OBJECT_DATABASE, EXEC_ON_ALL_NODES);
+ stmt->kind == OBJECT_DATABASE, EXEC_ON_ALL_NODES, false);
#endif
break;
}
@@ -1865,7 +2001,7 @@ standard_ProcessUtility(Node *parsetree,
CreateConversionCommand((CreateConversionStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1873,7 +2009,7 @@ standard_ProcessUtility(Node *parsetree,
CreateCast((CreateCastStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1881,7 +2017,7 @@ standard_ProcessUtility(Node *parsetree,
DropCast((DropCastStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1889,7 +2025,7 @@ standard_ProcessUtility(Node *parsetree,
DefineOpClass((CreateOpClassStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1897,7 +2033,7 @@ standard_ProcessUtility(Node *parsetree,
DefineOpFamily((CreateOpFamilyStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1905,7 +2041,7 @@ standard_ProcessUtility(Node *parsetree,
AlterOpFamily((AlterOpFamilyStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1913,7 +2049,7 @@ standard_ProcessUtility(Node *parsetree,
RemoveOpClass((RemoveOpClassStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1921,7 +2057,7 @@ standard_ProcessUtility(Node *parsetree,
RemoveOpFamily((RemoveOpFamilyStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1929,7 +2065,7 @@ standard_ProcessUtility(Node *parsetree,
AlterTSDictionary((AlterTSDictionaryStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1937,7 +2073,7 @@ standard_ProcessUtility(Node *parsetree,
AlterTSConfiguration((AlterTSConfigurationStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
#ifdef PGXC
@@ -1956,7 +2092,7 @@ standard_ProcessUtility(Node *parsetree,
CleanConnection((CleanConnStmt *) parsetree);
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_COORDS);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_COORDS, false);
break;
#endif
default:
@@ -1975,7 +2111,7 @@ standard_ProcessUtility(Node *parsetree,
*/
static void
ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
- bool force_autocommit, RemoteQueryExecType exec_type)
+ bool force_autocommit, RemoteQueryExecType exec_type, bool is_temp)
{
/* Return if query is launched on no nodes */
if (exec_type == EXEC_ON_NONE)
@@ -1989,6 +2125,7 @@ ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
step->sql_statement = pstrdup(queryString);
step->force_autocommit = force_autocommit;
step->exec_type = exec_type;
+ step->is_temp = is_temp;
ExecRemoteUtility(step);
pfree(step->sql_statement);
pfree(step);
@@ -2000,37 +2137,97 @@ ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
*
* Determine the list of nodes to launch query on.
* This depends on temporary nature of object and object type.
- * PGXCTODO: Extend temporary object check for tables.
+ * Return also a flag indicating if relation is temporary.
*/
static RemoteQueryExecType
-ExecUtilityFindNodes(const char *queryString,
- ObjectType object_type,
- Oid relid)
+ExecUtilityFindNodes(ObjectType object_type,
+ Oid relid,
+ bool *is_temp)
{
- bool is_temp = false;
- RemoteQueryExecType remoteExecType = EXEC_ON_NONE;
+ RemoteQueryExecType exec_type;
- /* Check if object is a temporary sequence */
- if (object_type == OBJECT_SEQUENCE ||
- (object_type == OBJECT_TABLE &&
- get_rel_relkind(relid) == RELKIND_SEQUENCE))
- is_temp = IsTempSequence(relid);
+ switch (object_type)
+ {
+ case OBJECT_SEQUENCE:
+ /* Check if object is a temporary sequence */
+ if ((*is_temp = IsTempSequence(relid)))
+ exec_type = EXEC_ON_NONE;
+ else
+ exec_type = EXEC_ON_COORDS;
+ break;
+
+ case OBJECT_TABLE:
+ /* Do the check on relation kind */
+ exec_type = ExecUtilityFindNodesRelkind(relid, is_temp);
+ break;
+
+ case OBJECT_VIEW:
+ /* Check if object is a temporary view */
+ if ((*is_temp = IsTempTable(relid)))
+ exec_type = EXEC_ON_NONE;
+ else
+ exec_type = EXEC_ON_COORDS;
+ break;
+
+ case OBJECT_INDEX:
+ /* Check if given index uses temporary tables */
+ if ((*is_temp = IsIndexUsingTempTable(relid)))
+ exec_type = EXEC_ON_DATANODES;
+ else
+ exec_type = EXEC_ON_ALL_NODES;
+ break;
+
+ default:
+ *is_temp = false;
+ exec_type = EXEC_ON_ALL_NODES;
+ break;
+ }
+
+ return exec_type;
+}
- if (!is_temp)
+/*
+ * ExecUtilityFindNodesRelkind
+ *
+ * Get node execution and temporary type
+ * for given relation depending on its relkind
+ */
+static RemoteQueryExecType
+ExecUtilityFindNodesRelkind(Oid relid, bool *is_temp)
+{
+ char relkind_str = get_rel_relkind(relid);
+ RemoteQueryExecType exec_type;
+
+ switch (relkind_str)
{
- remoteExecType = EXEC_ON_ALL_NODES;
+ case RELKIND_SEQUENCE:
+ if ((*is_temp = IsTempSequence(relid)))
+ exec_type = EXEC_ON_NONE;
+ else
+ exec_type = EXEC_ON_COORDS;
+ break;
- if (object_type == OBJECT_SEQUENCE ||
- object_type == OBJECT_VIEW)
- remoteExecType = EXEC_ON_COORDS;
- else if (object_type == OBJECT_TABLE)
- {
- if (get_rel_relkind(relid) == RELKIND_SEQUENCE)
- remoteExecType = EXEC_ON_COORDS;
- }
+ case RELKIND_RELATION:
+ if ((*is_temp = IsTempTable(relid)))
+ exec_type = EXEC_ON_DATANODES;
+ else
+ exec_type = EXEC_ON_ALL_NODES;
+ break;
+
+ case RELKIND_VIEW:
+ if ((*is_temp = IsTempTable(relid)))
+ exec_type = EXEC_ON_NONE;
+ else
+ exec_type = EXEC_ON_COORDS;
+ break;
+
+ default:
+ *is_temp = false;
+ exec_type = EXEC_ON_ALL_NODES;
+ break;
}
- return remoteExecType;
+ return exec_type;
}
#endif