summaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorKoichi Suzuki2011-08-05 07:38:03 +0000
committerKoichi Suzuki2011-08-05 07:38:03 +0000
commitf1a32c6227136d722aead50b8d4a85fceba7523f (patch)
tree26feb93534c888eac3396c3e6b568df1b401ddeb /src/backend
parent694cab5487b60da9b07cf0418bac6b1e41875ea9 (diff)
parent8a15033d7d7ae7cbe9a100ea88b8e18f033f0cb4 (diff)
Merge branch 'master' of ssh://postgres-xc.git.sourceforge.net/gitroot/postgres-xc/postgres-xc
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/transam/xact.c17
-rw-r--r--src/backend/commands/comment.c42
-rw-r--r--src/backend/commands/copy.c6
-rw-r--r--src/backend/commands/schemacmds.c3
-rw-r--r--src/backend/commands/tablecmds.c117
-rw-r--r--src/backend/optimizer/plan/createplan.c10
-rw-r--r--src/backend/pgxc/plan/planner.c46
-rw-r--r--src/backend/pgxc/pool/execRemote.c63
-rw-r--r--src/backend/pgxc/pool/poolmgr.c186
-rw-r--r--src/backend/tcop/utility.c557
10 files changed, 797 insertions, 250 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 1a199789b1..e3e0e0f9aa 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1962,6 +1962,14 @@ CommitTransaction(bool contact_gtm)
char implicitgid[256];
TransactionId xid = InvalidTransactionId;
+ /*
+ * Check if there are any On Commit actions and force temporary object flag.
+ * This is possible in the case of a session using ON COMMIT DELETE ROWS.
+ * It is essential to do this check *before* calling PGXCNodeIsImplicit2PC.
+ */
+ if (IsOnCommitActions())
+ ExecSetTempObjectIncluded();
+
if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && contact_gtm)
PreparePGXCNodes = PGXCNodeIsImplicit2PC(&PrepareLocalCoord);
@@ -2330,6 +2338,15 @@ PrepareTransaction(void)
ShowTransactionState("PrepareTransaction");
+#ifdef PGXC
+ /*
+ * Check if there are any On Commit actions and force temporary object flag.
+ * This is possible in the case of a session using ON COMMIT DELETE ROWS.
+ */
+ if (IsOnCommitActions())
+ ExecSetTempObjectIncluded();
+#endif
+
/*
* check the current transaction state
*/
diff --git a/src/backend/commands/comment.c b/src/backend/commands/comment.c
index d09bef0682..f18a132219 100644
--- a/src/backend/commands/comment.c
+++ b/src/backend/commands/comment.c
@@ -4,7 +4,8 @@
*
* PostgreSQL object comments utility code.
*
- * Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010-2011 Nippon Telegraph and Telephone Corporation
*
* IDENTIFICATION
* src/backend/commands/comment.c
@@ -460,3 +461,42 @@ GetComment(Oid oid, Oid classoid, int32 subid)
return comment;
}
+
+
+#ifdef PGXC
+/*
+ * GetCommentObjectId
+ *
+ * Return Object ID of object commented
+ * Note: This function uses portions of the code of CommentObject,
+ * even if this code is duplicated this is done like this to facilitate
+ * merges with PostgreSQL head.
+ */
+Oid
+GetCommentObjectId(CommentStmt *stmt)
+{
+ ObjectAddress address;
+ Relation relation;
+
+ if (stmt->objtype == OBJECT_DATABASE && list_length(stmt->objname) == 1)
+ {
+ char *database = strVal(linitial(stmt->objname));
+
+ if (!OidIsValid(get_database_oid(database, true)))
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_UNDEFINED_DATABASE),
+ errmsg("database \"%s\" does not exist", database)));
+ return InvalidOid;
+ }
+ }
+
+ address = get_object_address(stmt->objtype, stmt->objname, stmt->objargs,
+ &relation, ShareUpdateExclusiveLock);
+
+ if (relation != NULL)
+ relation_close(relation, NoLock);
+
+ return address.objectId;
+}
+#endif
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 952b88b533..bfaf6e2860 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -918,6 +918,12 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
rte->relkind = rel->rd_rel->relkind;
rte->requiredPerms = required_access;
+#ifdef PGXC
+ /* In case COPY is used on a temporary table, never use 2PC for implicit commits */
+ if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
+ ExecSetTempObjectIncluded();
+#endif
+
tupDesc = RelationGetDescr(rel);
attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
foreach(cur, attnums)
diff --git a/src/backend/commands/schemacmds.c b/src/backend/commands/schemacmds.c
index 831886f46d..851b0dca3b 100644
--- a/src/backend/commands/schemacmds.c
+++ b/src/backend/commands/schemacmds.c
@@ -132,7 +132,8 @@ CreateSchemaCommand(CreateSchemaStmt *stmt, const char *queryString)
* Add a RemoteQuery node for a query at top level on a remote Coordinator
*/
if (is_top_level)
- parsetree_list = AddRemoteQueryNode(parsetree_list, queryString, EXEC_ON_ALL_NODES);
+ parsetree_list = AddRemoteQueryNode(parsetree_list, queryString,
+ EXEC_ON_ALL_NODES, false);
#endif
/*
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 644e572934..7fe0015868 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -89,6 +89,7 @@
#include "pgxc/pgxc.h"
#include "access/gtm.h"
#include "commands/sequence.h"
+#include "pgxc/execRemote.h"
#endif
/*
@@ -439,20 +440,11 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
* code. This is needed because calling code might not expect untrusted
* tables to appear in pg_temp at the front of its search path.
*/
-#ifdef PGXC
- if (stmt->relation->relpersistence == RELPERSISTENCE_TEMP &&
- IsUnderPostmaster &&
- relkind != RELKIND_SEQUENCE)
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("PG-XC does not yet support temporary tables")));
-#else
if (stmt->relation->relpersistence == RELPERSISTENCE_TEMP
&& InSecurityRestrictedOperation())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("cannot create temporary table within security-restricted operation")));
-#endif
/*
* Look up the namespace in which we are supposed to create the relation,
@@ -9462,3 +9454,110 @@ AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid,
}
}
}
+
+#ifdef PGXC
+/*
+ * IsTempTable
+ *
+ * Check if given table Oid is temporary.
+ */
+bool
+IsTempTable(Oid relid)
+{
+ Relation rel;
+ bool res;
+ /*
+ * PGXCTODO: Is it correct to open without locks?
+ * we just check if this table is temporary though...
+ */
+ rel = relation_open(relid, NoLock);
+ res = rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP;
+ relation_close(rel, NoLock);
+ return res;
+}
+
+/*
+ * IsIndexUsingTemp
+ *
+ * Check if given index relation uses temporary tables.
+ */
+bool
+IsIndexUsingTempTable(Oid relid)
+{
+ bool res = false;
+ HeapTuple tuple;
+ Oid parent_id = InvalidOid;
+
+ tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(relid));
+ if (HeapTupleIsValid(tuple))
+ {
+ Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple);
+ parent_id = index->indrelid;
+
+ /* Release system cache BEFORE looking at the parent table */
+ ReleaseSysCache(tuple);
+
+ res = IsTempTable(parent_id);
+ }
+ else
+ res = false; /* Default case */
+
+ return res;
+}
+
+/*
+ * IsOnCommitDeleteRows
+ *
+ * Check if there are any on-commit actions activated
+ * This is possible in the case of ON COMMIT DELETE ROWS for example.
+ * In this case 2PC cannot be used.
+ */
+bool
+IsOnCommitActions(void)
+{
+ return list_length(on_commits) > 0;
+}
+
+/*
+ * DropTableThrowErrorExternal
+ *
+ * Error interface for DROP when looking for execution node type.
+ */
+void
+DropTableThrowErrorExternal(RangeVar *relation, ObjectType removeType, bool missing_ok)
+{
+ char relkind;
+
+ /* Determine required relkind */
+ switch (removeType)
+ {
+ case OBJECT_TABLE:
+ relkind = RELKIND_RELATION;
+ break;
+
+ case OBJECT_INDEX:
+ relkind = RELKIND_INDEX;
+ break;
+
+ case OBJECT_SEQUENCE:
+ relkind = RELKIND_SEQUENCE;
+ break;
+
+ case OBJECT_VIEW:
+ relkind = RELKIND_VIEW;
+ break;
+
+ case OBJECT_FOREIGN_TABLE:
+ relkind = RELKIND_FOREIGN_TABLE;
+ break;
+
+ default:
+ elog(ERROR, "unrecognized drop object type: %d",
+ (int) removeType);
+ relkind = 0; /* keep compiler quiet */
+ break;
+ }
+
+ DropErrorMsgNonExistent(relation->relname, relkind, missing_ok);
+}
+#endif
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index d3b5c7793b..bc32049b87 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -47,6 +47,7 @@
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "rewrite/rewriteManip.h"
+#include "commands/tablecmds.h"
#endif
#include "utils/lsyscache.h"
@@ -2601,8 +2602,15 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path,
/*
* XXX: should use GENERIC OPTIONS like 'foreign_relname' or something for
* the foreign table name instead of the local name ?
+ *
+ * A temporary table does not use namespace as it may not be
+ * consistent among nodes cluster. Relation name is sufficient.
*/
- appendStringInfo(&sql, "%s.%s %s", nspname_q, relname_q, aliasname_q);
+ if (IsTempTable(rte->relid))
+ appendStringInfo(&sql, "%s %s", relname_q, aliasname_q);
+ else
+ appendStringInfo(&sql, "%s.%s %s", nspname_q, relname_q, aliasname_q);
+
pfree(nspname);
pfree(relname);
if (nspname_q != nspname_q)
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 7da7f848af..e3499134db 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -46,6 +46,7 @@
#include "utils/syscache.h"
#include "utils/numeric.h"
#include "access/hash.h"
+#include "commands/tablecmds.h"
#include "utils/timestamp.h"
#include "utils/date.h"
@@ -171,7 +172,8 @@ static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStm
static void InitXCWalkerContext(XCWalkerContext *context);
static RemoteQuery *makeRemoteQuery(void);
static void validate_part_col_updatable(const Query *query);
-
+static bool contains_temp_tables(List *rtable);
+static bool contains_only_pg_catalog(List *rtable);
/*
* Find position of specified substring in the string
@@ -1403,7 +1405,7 @@ examine_conditions_fromlist(Node *treenode, XCWalkerContext *context)
* only contain pg_catalog entries.
*/
static bool
-contains_only_pg_catalog (List *rtable)
+contains_only_pg_catalog(List *rtable)
{
ListCell *item;
@@ -1416,13 +1418,39 @@ contains_only_pg_catalog (List *rtable)
{
if (get_rel_namespace(rte->relid) != PG_CATALOG_NAMESPACE)
return false;
- } else if (rte->rtekind == RTE_SUBQUERY &&
- !contains_only_pg_catalog (rte->subquery->rtable))
+ }
+ else if (rte->rtekind == RTE_SUBQUERY &&
+ !contains_only_pg_catalog(rte->subquery->rtable))
return false;
}
return true;
}
+/*
+ * Returns true if at least one temporary table is in use
+ * in query (and its subqueries)
+ */
+static bool
+contains_temp_tables(List *rtable)
+{
+ ListCell *item;
+
+ foreach(item, rtable)
+ {
+ RangeTblEntry *rte = (RangeTblEntry *) lfirst(item);
+
+ if (rte->rtekind == RTE_RELATION)
+ {
+ if (IsTempTable(rte->relid))
+ return true;
+ }
+ else if (rte->rtekind == RTE_SUBQUERY &&
+ contains_temp_tables(rte->subquery->rtable))
+ return true;
+ }
+
+ return false;
+}
/*
* get_plan_nodes - determine the nodes to execute the command on.
@@ -1934,6 +1962,7 @@ makeRemoteQuery(void)
result->paramval_data = NULL;
result->paramval_len = 0;
result->exec_direct_type = EXEC_DIRECT_NONE;
+ result->is_temp = false;
result->relname = NULL;
result->remotejoin = false;
@@ -2737,12 +2766,16 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
if (query->commandType != CMD_SELECT)
result->resultRelations = list_make1_int(query->resultRelation);
- if (contains_only_pg_catalog (query->rtable))
+ if (contains_only_pg_catalog(query->rtable))
{
result = standard_planner(query, cursorOptions, boundParams);
return result;
}
+ /* Check if temporary tables are in use in target list */
+ if (contains_temp_tables(query->rtable))
+ query_step->is_temp = true;
+
if (query_step->exec_nodes == NULL)
get_plan_nodes_command(query_step, root);
@@ -3184,7 +3217,7 @@ GetHashExecNodes(RelationLocInfo *rel_loc_info, ExecNodes **exec_nodes, const Ex
* duplicated queries on Datanodes.
*/
List *
-AddRemoteQueryNode(List *stmts, const char *queryString, RemoteQueryExecType remoteExecType)
+AddRemoteQueryNode(List *stmts, const char *queryString, RemoteQueryExecType remoteExecType, bool is_temp)
{
List *result = stmts;
@@ -3199,6 +3232,7 @@ AddRemoteQueryNode(List *stmts, const char *queryString, RemoteQueryExecType rem
step->combine_type = COMBINE_TYPE_SAME;
step->sql_statement = queryString;
step->exec_type = remoteExecType;
+ step->is_temp = is_temp;
result = lappend(result, step);
}
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 1efb364333..121245d696 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -52,6 +52,7 @@
static bool autocommit = true;
static bool is_ddl = false;
static bool implicit_force_autocommit = false;
+static bool temp_object_included = false;
static PGXCNodeHandle **write_node_list = NULL;
static int write_node_count = 0;
static char *begin_string = NULL;
@@ -1536,11 +1537,27 @@ finish:
if (res != 0)
{
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Could not prepare transaction on data nodes")));
+
+ /* In case transaction has operated on temporary objects */
+ if (temp_object_included)
+ {
+ /* Reset temporary object flag */
+ temp_object_included = false;
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("cannot PREPARE a transaction that has operated on temporary tables")));
+ }
+ else
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not prepare transaction on data nodes")));
+ }
}
+ /* Reset temporary object flag */
+ temp_object_included = false;
+
return local_operation;
}
@@ -1824,6 +1841,9 @@ finish:
is_ddl = false;
clear_write_node_list();
+ /* Reset temporary object flag */
+ temp_object_included = false;
+
/* Clean up connections */
pfree_pgxc_all_handles(pgxc_connections);
@@ -1973,6 +1993,9 @@ finish:
is_ddl = false;
clear_write_node_list();
+ /* Reset temporary object flag */
+ temp_object_included = false;
+
/* Free node list taken from GTM */
if (datanodes && datanodecnt != 0)
free(datanodes);
@@ -2113,6 +2136,9 @@ finish:
is_ddl = false;
clear_write_node_list();
+ /* Reset temporary object flag */
+ temp_object_included = false;
+
/* Free node list taken from GTM */
if (datanodes)
free(datanodes);
@@ -2210,6 +2236,9 @@ finish:
is_ddl = false;
clear_write_node_list();
+ /* Reset temporary object flag */
+ temp_object_included = false;
+
/* Clean up connections */
pfree_pgxc_all_handles(pgxc_connections);
if (res != 0)
@@ -2284,6 +2313,9 @@ finish:
is_ddl = false;
clear_write_node_list();
+ /* Reset temporary object flag */
+ temp_object_included = false;
+
/* Clean up connections */
pfree_pgxc_all_handles(pgxc_connections);
return res;
@@ -3217,6 +3249,10 @@ do_query(RemoteQueryState *node)
bool need_tran;
PGXCNodeAllHandles *pgxc_connections;
+ /* Be sure to set temporary object flag if necessary */
+ if (step->is_temp)
+ temp_object_included = true;
+
/*
* Get connections for Datanodes only, utilities and DDLs
* are launched in ExecRemoteUtility
@@ -4033,6 +4069,9 @@ ExecRemoteUtility(RemoteQuery *node)
implicit_force_autocommit = force_autocommit;
+ /* A transaction using temporary objects cannot use 2PC */
+ temp_object_included = node->is_temp;
+
remotestate = CreateResponseCombiner(0, node->combine_type);
pgxc_connections = get_exec_connections(NULL, node->exec_nodes, exec_type);
@@ -4516,12 +4555,14 @@ PGXCNodeIsImplicit2PC(bool *prepare_local_coord)
/*
* In case of an autocommit or forced autocommit transaction, 2PC is not involved
- * This case happens for Utilities using force autocommit (CREATE DATABASE, VACUUM...)
+ * This case happens for Utilities using force autocommit (CREATE DATABASE, VACUUM...).
+ * For a transaction using temporary objects, 2PC is not authorized.
*/
- if (implicit_force_autocommit)
+ if (implicit_force_autocommit || temp_object_included)
{
*prepare_local_coord = false;
implicit_force_autocommit = false;
+ temp_object_included = false;
return false;
}
@@ -4622,3 +4663,15 @@ int DataNodeCopyInBinaryForAll(char *msg_buf, int len, PGXCNodeHandle** copy_con
return 0;
}
+
+/*
+ * ExecSetTempObjectIncluded
+ *
+ * Set Temp object flag on the fly for transactions
+ * This flag will be reinitialized at commit.
+ */
+void
+ExecSetTempObjectIncluded(void)
+{
+ temp_object_included = true;
+}
diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c
index 463bd5a170..c0d8d56aab 100644
--- a/src/backend/pgxc/pool/poolmgr.c
+++ b/src/backend/pgxc/pool/poolmgr.c
@@ -93,7 +93,13 @@ static void agent_init(PoolAgent *agent, const char *database, const char *user_
static void agent_destroy(PoolAgent *agent);
static void agent_create(void);
static void agent_handle_input(PoolAgent *agent, StringInfo s);
-static int agent_set_command(PoolAgent *agent, const char *set_command, bool is_local);
+static int agent_session_command(PoolAgent *agent,
+ const char *set_command,
+ PoolCommandType command_type);
+static int agent_set_command(PoolAgent *agent,
+ const char *set_command,
+ PoolCommandType command_type);
+static int agent_temp_command(PoolAgent *agent);
static DatabasePool *create_database_pool(const char *database, const char *user_name);
static void insert_database_pool(DatabasePool *pool);
static int destroy_database_pool(const char *database, const char *user_name);
@@ -107,7 +113,7 @@ static int *agent_acquire_connections(PoolAgent *agent, List *datanodelist, List
static int cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist);
static PGXCNodePoolSlot *acquire_connection(DatabasePool *dbPool, int node, char client_conn_type);
static void agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard);
-static void agent_reset_params(PoolAgent *agent, List *dn_list, List *co_list);
+static void agent_reset_session(PoolAgent *agent, List *dn_list, List *co_list);
static void release_connection(DatabasePool *dbPool, PGXCNodePoolSlot *slot, int index, bool clean,
char client_conn_type);
static void destroy_slot(PGXCNodePoolSlot *slot);
@@ -490,6 +496,7 @@ agent_create(void)
agent->coord_connections = NULL;
agent->session_params = NULL;
agent->local_params = NULL;
+ agent->is_temp = false;
agent->pid = 0;
/* Append new agent to the list */
@@ -543,30 +550,44 @@ PoolManagerConnect(PoolHandle *handle, const char *database, const char *user_na
}
int
-PoolManagerSetCommand(bool is_local, const char *set_command)
+PoolManagerSetCommand(PoolCommandType command_type, const char *set_command)
{
int n32;
char msgtype = 's';
- Assert(set_command);
Assert(Handle);
/* Message type */
pool_putbytes(&Handle->port, &msgtype, 1);
/* Message length */
- n32 = htonl(strlen(set_command) + 10);
+ if (set_command)
+ n32 = htonl(strlen(set_command) + 13);
+ else
+ n32 = htonl(12);
+
pool_putbytes(&Handle->port, (char *) &n32, 4);
/* LOCAL or SESSION parameter ? */
- pool_putbytes(&Handle->port, (char *) &is_local, 1);
-
- /* Length of SET command string */
- n32 = htonl(strlen(set_command) + 1);
+ n32 = htonl(command_type);
pool_putbytes(&Handle->port, (char *) &n32, 4);
- /* Send command string followed by \0 terminator */
- pool_putbytes(&Handle->port, set_command, strlen(set_command) + 1);
+ if (set_command)
+ {
+ /* Length of SET command string */
+ n32 = htonl(strlen(set_command) + 1);
+ pool_putbytes(&Handle->port, (char *) &n32, 4);
+
+ /* Send command string followed by \0 terminator */
+ pool_putbytes(&Handle->port, set_command, strlen(set_command) + 1);
+ }
+ else
+ {
+ /* Send empty command */
+ n32 = htonl(0);
+ pool_putbytes(&Handle->port, (char *) &n32, 4);
+ }
+
pool_flush(&Handle->port);
/* Get result */
@@ -628,10 +649,10 @@ agent_destroy(PoolAgent *agent)
co_conn = lappend_int(co_conn, i+1);
/*
- * agent is being destroyed, so reset session parameters
- * before putting back connections to pool
+ * Agent is being destroyed, so reset session parameters
+ * and temporary objects before putting back connections to pool.
*/
- agent_reset_params(agent, dn_conn, co_conn);
+ agent_reset_session(agent, dn_conn, co_conn);
/* release them all */
agent_release_connections(agent, dn_conn, co_conn);
@@ -881,8 +902,8 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
{
const char *database = NULL;
const char *user_name = NULL;
- const char *set_command;
- bool is_local;
+ const char *set_command = NULL;
+ PoolCommandType command_type;
int datanodecount;
int coordcount;
List *datanodelist = NIL;
@@ -1043,16 +1064,19 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
list_free(datanodelist);
list_free(coordlist);
break;
- case 's': /* SET COMMAND */
+ case 's': /* Session-related COMMAND */
pool_getmessage(&agent->port, s, 0);
/* Determine if command is local or session */
- is_local = (bool) pq_getmsgbyte(s);
- /* Get the SET command */
+ command_type = (PoolCommandType) pq_getmsgint(s, 4);
+ /* Get the SET command if necessary */
len = pq_getmsgint(s, 4);
- set_command = pq_getmsgbytes(s, len);
+ if (len != 0)
+ set_command = pq_getmsgbytes(s, len);
+
pq_getmsgend(s);
- res = agent_set_command(agent, set_command, is_local);
+ /* Manage command depending on its type */
+ res = agent_session_command(agent, set_command, command_type);
/* Send success result */
pool_sendres(&agent->port, res);
@@ -1068,11 +1092,46 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
}
/*
+ * Manage a session command for pooler
+ */
+static int
+agent_session_command(PoolAgent *agent, const char *set_command, PoolCommandType command_type)
+{
+ int res;
+
+ switch (command_type)
+ {
+ case POOL_CMD_LOCAL_SET:
+ case POOL_CMD_GLOBAL_SET:
+ res = agent_set_command(agent, set_command, command_type);
+ break;
+ case POOL_CMD_TEMP:
+ res = agent_temp_command(agent);
+ break;
+ default:
+ res = -1;
+ break;
+ }
+
+ return res;
+}
+
+/*
+ * Set agent flag that a temporary object is in use.
+ */
+static int
+agent_temp_command(PoolAgent *agent)
+{
+ agent->is_temp = true;
+ return 0;
+}
+
+/*
* Save a SET command and distribute it to the agent connections
* already in use.
*/
static int
-agent_set_command(PoolAgent *agent, const char *set_command, bool is_local)
+agent_set_command(PoolAgent *agent, const char *set_command, PoolCommandType command_type)
{
char *params_string;
int i;
@@ -1080,11 +1139,16 @@ agent_set_command(PoolAgent *agent, const char *set_command, bool is_local)
Assert(agent);
Assert(set_command);
+ Assert(command_type == POOL_CMD_LOCAL_SET || command_type == POOL_CMD_GLOBAL_SET);
- if (is_local)
+ if (command_type == POOL_CMD_LOCAL_SET)
params_string = agent->local_params;
- else
+ else if (command_type == POOL_CMD_GLOBAL_SET)
params_string = agent->session_params;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Set command process failed")));
/* First command recorded */
if (!params_string)
@@ -1131,9 +1195,9 @@ agent_set_command(PoolAgent *agent, const char *set_command, bool is_local)
}
/* Save the latest string */
- if (is_local)
+ if (command_type == POOL_CMD_LOCAL_SET)
agent->local_params = params_string;
- else
+ else if (command_type == POOL_CMD_GLOBAL_SET)
agent->session_params = params_string;
return res;
@@ -1278,7 +1342,6 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
static int
cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
{
- int i;
ListCell *nodelist_item;
char errbuf[256];
int nCount;
@@ -1435,8 +1498,9 @@ agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard)
return;
/*
- * If there are some session parameters, do not put back connections to pool
- * disconnection will be made when session is cut for this user.
+ * If there are some session parameters or temporary objects,
+ * do not put back connections to pool.
+ * Disconnection will be made when session is cut for this user.
* Local parameters are reset when transaction block is finished,
* so don't do anything for them, but just reset their list.
*/
@@ -1445,7 +1509,8 @@ agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard)
pfree(agent->local_params);
agent->local_params = NULL;
}
- if (agent->session_params)
+ if (agent->session_params ||
+ agent->is_temp)
return;
/* Discard first for Datanodes */
@@ -1515,50 +1580,61 @@ agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard)
* modified by session parameters.
*/
static void
-agent_reset_params(PoolAgent *agent, List *dn_list, List *co_list)
+agent_reset_session(PoolAgent *agent, List *dn_list, List *co_list)
{
- PGXCNodePoolSlot *slot;
if (!agent->dn_connections && !agent->coord_connections)
return;
- /* Parameters are reset, so free commands */
- if (agent->session_params)
- {
- pfree(agent->session_params);
- agent->session_params = NULL;
- }
- if (agent->local_params)
- {
- pfree(agent->local_params);
- agent->local_params = NULL;
- }
+ if (!agent->session_params && !agent->local_params && !agent->is_temp)
+ return;
- /* Reset Datanode connection params */
- if (dn_list)
+ /*
+ * Reset Datanode connection params.
+ * Discard is only done for Datanodes as Temporary objects are never created
+ * to other Coordinators in a session.
+ */
+ if (dn_list &&
+ (agent->session_params || agent->local_params || agent->is_temp))
{
ListCell *lc;
foreach(lc, dn_list)
{
+ PGXCNodePoolSlot *slot;
int node = lfirst_int(lc);
+
Assert(node > 0 && node <= NumDataNodes);
slot = agent->dn_connections[node - 1];
/* Reset connection params */
if (slot)
- PGXCNodeSendSetQuery(slot->conn, "RESET ALL;");
+ {
+ if (agent->session_params || agent->local_params)
+ PGXCNodeSendSetQuery(slot->conn, "RESET ALL;");
+
+ /*
+ * Discard queries cannot be sent as multiple-queries,
+ * so do it separately. It is OK to use this slow process
+ * as session is ending.
+ */
+ if (agent->is_temp)
+ PGXCNodeSendSetQuery(slot->conn, "DISCARD ALL;");
+ }
}
}
/* Reset Coordinator connection params */
- if (co_list)
+ if (co_list &&
+ (agent->session_params || agent->local_params))
{
ListCell *lc;
foreach(lc, co_list)
{
+ PGXCNodePoolSlot *slot;
int node = lfirst_int(lc);
+
Assert(node > 0 && node <= NumCoords);
slot = agent->coord_connections[node - 1];
@@ -1567,6 +1643,19 @@ agent_reset_params(PoolAgent *agent, List *dn_list, List *co_list)
PGXCNodeSendSetQuery(slot->conn, "RESET ALL;");
}
}
+
+ /* Parameters are reset, so free commands */
+ if (agent->session_params)
+ {
+ pfree(agent->session_params);
+ agent->session_params = NULL;
+ }
+ if (agent->local_params)
+ {
+ pfree(agent->local_params);
+ agent->local_params = NULL;
+ }
+ agent->is_temp = false;
}
/*
@@ -2070,6 +2159,9 @@ grow_pool(DatabasePool * dbPool, int index, char client_conn_type)
errmsg("out of memory")));
}
+ /* If connection fails, be sure that slot is destroyed cleanly */
+ slot->xc_cancelConn = NULL;
+
/* Establish connection */
slot->conn = PGXCNodeConnect(nodePool->connstr);
if (!PGXCNodeConnected(slot->conn))
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index e4e6ed55b4..9e88b268b2 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -7,6 +7,7 @@
*
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
+ * Portions Copyright (c) 2010-2011 Nippon Telegraph and Telephone Corporation
*
*
* IDENTIFICATION
@@ -68,12 +69,15 @@
#include "pgxc/planner.h"
#include "pgxc/poolutils.h"
#include "pgxc/poolmgr.h"
+#include "utils/lsyscache.h"
static void ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
- bool force_autocommit, RemoteQueryExecType exec_type);
-static RemoteQueryExecType ExecUtilityFindNodes(const char *queryString,
- ObjectType objectType,
- Oid relid);
+ bool force_autocommit, RemoteQueryExecType exec_type,
+ bool is_temp);
+static RemoteQueryExecType ExecUtilityFindNodes(ObjectType objectType,
+ Oid relid,
+ bool *is_temp);
+static RemoteQueryExecType ExecUtilityFindNodesRelkind(Oid relid, bool *is_temp);
#endif
@@ -653,17 +657,77 @@ standard_ProcessUtility(Node *parsetree,
List *stmts;
ListCell *l;
Oid relOid;
+#ifdef PGXC
+ bool is_temp = false;
+#endif
/* Run parse analysis ... */
stmts = transformCreateStmt((CreateStmt *) parsetree,
queryString);
-
#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
+ /*
+ * Scan the list of objects.
+ * Temporary tables are created on Datanodes only.
+ * Non-temporary objects are created on all nodes.
+ * In case temporary and non-temporary objects are mized return an error.
+ */
+ bool is_first = true;
+
+ foreach(l, stmts)
+ {
+ Node *stmt = (Node *) lfirst(l);
+
+ if (IsA(stmt, CreateStmt))
+ {
+ CreateStmt *stmt_loc = (CreateStmt *) stmt;
+ bool is_object_temp = stmt_loc->relation->relpersistence == RELPERSISTENCE_TEMP;
+
+ if (is_first)
+ {
+ is_first = false;
+ if (is_object_temp)
+ is_temp = true;
+ }
+ else
+ {
+ if (is_object_temp != is_temp)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("CREATE not supported for TEMP and non-TEMP objects"),
+ errdetail("You should separate TEMP and non-TEMP objects")));
+ }
+ }
+ else if (IsA(stmt, CreateForeignTableStmt))
+ {
+ /* There are no temporary foreign tables */
+ if (is_first)
+ {
+ is_first = false;
+ }
+ else
+ {
+ if (!is_temp)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("CREATE not supported for TEMP and non-TEMP objects"),
+ errdetail("You should separate TEMP and non-TEMP objects")));
+ }
+ }
+ }
+ }
+
/*
* Add a RemoteQuery node for a query at top level on a remote Coordinator
*/
if (isTopLevel)
- stmts = AddRemoteQueryNode(stmts, queryString, EXEC_ON_ALL_NODES);
+ {
+ if (is_temp)
+ stmts = AddRemoteQueryNode(stmts, queryString, EXEC_ON_DATANODES, is_temp);
+ else
+ stmts = AddRemoteQueryNode(stmts, queryString, EXEC_ON_ALL_NODES, is_temp);
+ }
#endif
/* ... and do it */
@@ -676,6 +740,12 @@ standard_ProcessUtility(Node *parsetree,
Datum toast_options;
static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
+#ifdef PGXC
+ /* Set temporary object object flag in pooler */
+ if (is_temp)
+ PoolManagerSetCommand(POOL_CMD_TEMP, NULL);
+#endif
+
/* Create the table itself */
relOid = DefineRelation((CreateStmt *) stmt,
RELKIND_RELATION,
@@ -798,6 +868,7 @@ standard_ProcessUtility(Node *parsetree,
DropStmt *stmt = (DropStmt *) parsetree;
#ifdef PGXC
bool is_temp = false;
+ RemoteQueryExecType exec_type = EXEC_ON_ALL_NODES;
/*
* We need to check details of the objects being dropped and
@@ -806,41 +877,75 @@ standard_ProcessUtility(Node *parsetree,
*/
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
- /*
- * Check the list of sequences/tables going to be dropped.
- * XC does not allow yet to mix drop of temporary and
- * non-temporary objects because this involves to rewrite
- * query to process for tables and stop remote query process
- * for sequences.
- * PGXCTODO: For the time being this is just checked for
- * sequences but should also be done for tables
- */
-
- if (stmt->removeType == OBJECT_SEQUENCE)
+ switch (stmt->removeType)
{
- ListCell *cell;
- bool is_first = true;
-
- foreach(cell, stmt->objects)
- {
- RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell));
- Oid relid;
-
- relid = RangeVarGetRelid(rel, false);
- if (is_first)
+ case OBJECT_TABLE:
+ case OBJECT_SEQUENCE:
+ case OBJECT_VIEW:
+ case OBJECT_INDEX:
{
- is_temp = IsTempSequence(relid);
- is_first = false;
- }
- else
- {
- if (IsTempSequence(relid) != is_temp)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("DROP SEQUENCE not supported for TEMP and non-TEMP sequences"),
- errdetail("DROP can be done if TEMP and non-TEMP objects are separated")));
+ /*
+ * Check the list of objects going to be dropped.
+ * XC does not allow yet to mix drop of temporary and
+ * non-temporary objects because this involves to rewrite
+ * query to process for tables.
+ */
+ ListCell *cell;
+ bool is_first = true;
+
+ foreach(cell, stmt->objects)
+ {
+ RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell));
+ Oid relid;
+
+ /*
+ * Do not print result at all, error is thrown
+ * after if necessary
+ */
+ relid = RangeVarGetRelid(rel, true);
+
+ /*
+ * In case this relation ID is incorrect throw
+ * a correct DROP error.
+ */
+ if (!OidIsValid(relid) && !stmt->missing_ok)
+ DropTableThrowErrorExternal(rel,
+ stmt->removeType,
+ stmt->missing_ok);
+
+ /* In case of DROP ... IF EXISTS bypass */
+ if (!OidIsValid(relid) && stmt->missing_ok)
+ continue;
+
+ if (is_first)
+ {
+ exec_type = ExecUtilityFindNodes(stmt->removeType,
+ relid,
+ &is_temp);
+ is_first = false;
+ }
+ else
+ {
+ RemoteQueryExecType exec_type_loc;
+ bool is_temp_loc;
+ exec_type_loc = ExecUtilityFindNodes(stmt->removeType,
+ relid,
+ &is_temp_loc);
+ if (exec_type_loc != exec_type ||
+ is_temp_loc != is_temp)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("DROP not supported for TEMP and non-TEMP objects"),
+ errdetail("You should separate TEMP and non-TEMP objects")));
+ }
+ }
}
- }
+ break;
+
+ default:
+ is_temp = false;
+ exec_type = EXEC_ON_ALL_NODES;
+ break;
}
}
#endif
@@ -898,20 +1003,9 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
- /*
- * Drop can be done on non-temporary objects and temporary objects
- * if they are not mixed up.
- * PGXCTODO: This should be extended to tables
- */
- if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && !is_temp)
- {
- /* Sequence and views exists only on Coordinators */
- if (stmt->removeType == OBJECT_SEQUENCE ||
- stmt->removeType == OBJECT_VIEW)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS);
- else
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
- }
+ /* DROP is done depending on the object type and its temporary type */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ ExecUtilityStmtOnNodes(queryString, NULL, false, exec_type, is_temp);
#endif
}
break;
@@ -920,12 +1014,31 @@ standard_ProcessUtility(Node *parsetree,
ExecuteTruncate((TruncateStmt *) parsetree);
#ifdef PGXC
/*
- * PGXCTODO
- * We may need to check details of the object being truncated and
- * run command on correct nodes
+ * Check details of the object being truncated.
+ * If at least one temporary table is truncated truncate cannot use 2PC
+ * at commit.
*/
- if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
+ bool is_temp = false;
+ ListCell *cell;
+ TruncateStmt *stmt = (TruncateStmt *) parsetree;
+
+ foreach(cell, stmt->relations)
+ {
+ Oid relid;
+ RangeVar *rel = (RangeVar *) lfirst(cell);
+
+ relid = RangeVarGetRelid(rel, true);
+ if (IsTempTable(relid))
+ {
+ is_temp = true;
+ break;
+ }
+ }
+
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_DATANODES, is_temp);
+ }
#endif
break;
@@ -933,13 +1046,23 @@ standard_ProcessUtility(Node *parsetree,
CommentObject((CommentStmt *) parsetree);
#ifdef PGXC
- /*
- * PGXCTODO
- * Comments on temporary objects need special handling
- * depending on their types.
- */
+ /* Comment objects depending on their object and temporary types */
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS);
+ {
+ bool is_temp = false;
+ RemoteQueryExecType exec_type = EXEC_ON_ALL_NODES;
+ CommentStmt *stmt = (CommentStmt *) parsetree;
+ Oid relid = GetCommentObjectId(stmt);
+
+ /* Commented object may not have a valid object ID, so move to default */
+ if (OidIsValid(relid))
+ {
+ exec_type = ExecUtilityFindNodes(stmt->objtype,
+ relid,
+ &is_temp);
+ }
+ ExecUtilityStmtOnNodes(queryString, NULL, false, exec_type, is_temp);
+ }
#endif
break;
@@ -981,19 +1104,21 @@ standard_ProcessUtility(Node *parsetree,
{
RenameStmt *stmt = (RenameStmt *) parsetree;
RemoteQueryExecType exec_type;
+ bool is_temp = false;
/* Relation is not set for a schema */
if (stmt->relation)
- exec_type = ExecUtilityFindNodes(queryString,
- stmt->renameType,
- RangeVarGetRelid(stmt->relation, false));
+ exec_type = ExecUtilityFindNodes(stmt->renameType,
+ RangeVarGetRelid(stmt->relation, false),
+ &is_temp);
else
exec_type = EXEC_ON_ALL_NODES;
ExecUtilityStmtOnNodes(queryString,
NULL,
false,
- exec_type);
+ exec_type,
+ is_temp);
}
#endif
ExecRenameStmt((RenameStmt *) parsetree);
@@ -1004,19 +1129,21 @@ standard_ProcessUtility(Node *parsetree,
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
AlterObjectSchemaStmt *stmt = (AlterObjectSchemaStmt *) parsetree;
-
RemoteQueryExecType exec_type;
+ bool is_temp = false;
+
if (stmt->relation)
- exec_type = ExecUtilityFindNodes(queryString,
- stmt->objectType,
- RangeVarGetRelid(stmt->relation, false));
+ exec_type = ExecUtilityFindNodes(stmt->objectType,
+ RangeVarGetRelid(stmt->relation, false),
+ &is_temp);
else
exec_type = EXEC_ON_ALL_NODES;
ExecUtilityStmtOnNodes(queryString,
NULL,
false,
- exec_type);
+ exec_type,
+ is_temp);
}
#endif
ExecAlterObjectSchemaStmt((AlterObjectSchemaStmt *) parsetree);
@@ -1027,7 +1154,7 @@ standard_ProcessUtility(Node *parsetree,
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1045,12 +1172,14 @@ standard_ProcessUtility(Node *parsetree,
*/
if (isTopLevel)
{
+ bool is_temp = false;
AlterTableStmt *stmt = (AlterTableStmt *) parsetree;
- RemoteQueryExecType exec_type = ExecUtilityFindNodes(queryString,
- stmt->relkind,
- RangeVarGetRelid(stmt->relation, false));
+ RemoteQueryExecType exec_type;
+ exec_type = ExecUtilityFindNodes(stmt->relkind,
+ RangeVarGetRelid(stmt->relation, false),
+ &is_temp);
- stmts = AddRemoteQueryNode(stmts, queryString, exec_type);
+ stmts = AddRemoteQueryNode(stmts, queryString, exec_type, is_temp);
}
#endif
@@ -1126,7 +1255,7 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1136,6 +1265,7 @@ standard_ProcessUtility(Node *parsetree,
{
RemoteQueryExecType remoteExecType = EXEC_ON_ALL_NODES;
GrantStmt *stmt = (GrantStmt *) parsetree;
+ bool is_temp = false;
/* Launch GRANT on Coordinator if object is a sequence */
if ((stmt->objtype == ACL_OBJECT_RELATION &&
@@ -1156,22 +1286,9 @@ standard_ProcessUtility(Node *parsetree,
RangeVar *relvar = (RangeVar *) lfirst(cell);
Oid relid = RangeVarGetRelid(relvar, false);
- if (get_rel_relkind(relid) == RELKIND_SEQUENCE ||
- get_rel_relkind(relid) == RELKIND_VIEW)
- {
- /* PGXCTODO: extend that for temporary views and tables */
- if (get_rel_relkind(relid) == RELKIND_SEQUENCE &&
- IsTempSequence(relid))
- remoteExecType = EXEC_ON_NONE;
- else
- remoteExecType = EXEC_ON_COORDS;
- }
- else
- {
- remoteExecType = EXEC_ON_ALL_NODES;
- }
+ remoteExecType = ExecUtilityFindNodesRelkind(relid, &is_temp);
- /* Check if objects can be launched at the same place as 1st one */
+ /* Check if object node type corresponds to the first one */
if (first)
{
type_local = remoteExecType;
@@ -1183,11 +1300,11 @@ standard_ProcessUtility(Node *parsetree,
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("PGXC does not support GRANT on multiple object types"),
- errdetail("Grant VIEW/SEQUENCE and relations on separate queries")));
+ errdetail("Grant VIEW/SEQUENCE/TABLE with separate queries")));
}
}
}
- ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType, is_temp);
}
#endif
ExecuteGrantStmt((GrantStmt *) parsetree);
@@ -1198,7 +1315,7 @@ standard_ProcessUtility(Node *parsetree,
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1207,7 +1324,7 @@ standard_ProcessUtility(Node *parsetree,
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1260,7 +1377,7 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1272,7 +1389,7 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1280,7 +1397,7 @@ standard_ProcessUtility(Node *parsetree,
DefineEnum((CreateEnumStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1299,7 +1416,16 @@ standard_ProcessUtility(Node *parsetree,
DefineView((ViewStmt *) parsetree, queryString);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS);
+ {
+ /*
+ * If view is temporary, no need to send this query to other
+ * remote Coordinators
+ */
+ ViewStmt *stmt = (ViewStmt *) parsetree;
+
+ if (stmt->view->relpersistence != RELPERSISTENCE_TEMP)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS, false);
+ }
#endif
break;
@@ -1307,7 +1433,7 @@ standard_ProcessUtility(Node *parsetree,
CreateFunction((CreateFunctionStmt *) parsetree, queryString);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1315,7 +1441,7 @@ standard_ProcessUtility(Node *parsetree,
AlterFunction((AlterFunctionStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1324,6 +1450,10 @@ standard_ProcessUtility(Node *parsetree,
IndexStmt *stmt = (IndexStmt *) parsetree;
#ifdef PGXC
+ Oid relid;
+ bool is_temp = false;
+ RemoteQueryExecType exec_type = EXEC_ON_ALL_NODES;
+
if (stmt->concurrent)
{
ereport(ERROR,
@@ -1331,6 +1461,11 @@ standard_ProcessUtility(Node *parsetree,
errmsg("PGXC does not support concurrent INDEX yet"),
errdetail("The feature is not currently supported")));
}
+
+ /* INDEX on a temporary table cannot use 2PC at commit */
+ relid = RangeVarGetRelid(stmt->relation, true);
+ if (OidIsValid(relid))
+ exec_type = ExecUtilityFindNodes(OBJECT_TABLE, relid, &is_temp);
#endif
if (stmt->concurrent)
@@ -1365,7 +1500,7 @@ standard_ProcessUtility(Node *parsetree,
#ifdef PGXC
if (IS_PGXC_COORDINATOR && !stmt->isconstraint && !IsConnFromCoord())
ExecUtilityStmtOnNodes(queryString, NULL,
- stmt->concurrent, EXEC_ON_ALL_NODES);
+ stmt->concurrent, exec_type, is_temp);
#endif
}
break;
@@ -1377,14 +1512,11 @@ standard_ProcessUtility(Node *parsetree,
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
RemoteQueryExecType remoteExecType;
+ bool is_temp;
Oid relid = RangeVarGetRelid(((RuleStmt *) parsetree)->relation, false);
- if (get_rel_relkind(relid) == RELKIND_VIEW)
- remoteExecType = EXEC_ON_COORDS;
- else
- remoteExecType = EXEC_ON_ALL_NODES;
-
- ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType);
+ remoteExecType = ExecUtilityFindNodesRelkind(relid, &is_temp);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType, is_temp);
}
#endif
break;
@@ -1400,9 +1532,8 @@ standard_ProcessUtility(Node *parsetree,
*/
CreateSeqStmt *stmt = (CreateSeqStmt *) parsetree;
- if (stmt->sequence->relpersistence == RELPERSISTENCE_PERMANENT ||
- stmt->sequence->relpersistence == RELPERSISTENCE_UNLOGGED)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS);
+ if (stmt->sequence->relpersistence != RELPERSISTENCE_TEMP)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS, false);
}
#endif
break;
@@ -1420,7 +1551,7 @@ standard_ProcessUtility(Node *parsetree,
Oid relid = RangeVarGetRelid(stmt->sequence, false);
if (!IsTempSequence(relid))
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS, false);
}
#endif
break;
@@ -1448,7 +1579,7 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1461,7 +1592,7 @@ standard_ProcessUtility(Node *parsetree,
createdb((CreatedbStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1469,7 +1600,7 @@ standard_ProcessUtility(Node *parsetree,
AlterDatabase((AlterDatabaseStmt *) parsetree, isTopLevel);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1477,7 +1608,7 @@ standard_ProcessUtility(Node *parsetree,
AlterDatabaseSet((AlterDatabaseSetStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1495,7 +1626,7 @@ standard_ProcessUtility(Node *parsetree,
/* Clean also remote Coordinators */
sprintf(query, "CLEAN CONNECTION TO ALL FOR DATABASE %s;", stmt->dbname);
- ExecUtilityStmtOnNodes(query, NULL, true, EXEC_ON_COORDS);
+ ExecUtilityStmtOnNodes(query, NULL, true, EXEC_ON_COORDS, false);
}
#endif
@@ -1504,7 +1635,7 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1551,7 +1682,7 @@ standard_ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_DATANODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_DATANODES, false);
#endif
break;
@@ -1561,7 +1692,7 @@ standard_ProcessUtility(Node *parsetree,
cluster((ClusterStmt *) parsetree, isTopLevel);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES, false);
#endif
break;
@@ -1574,7 +1705,7 @@ standard_ProcessUtility(Node *parsetree,
* vacuum() pops active snapshot and we can not send it to nodes
*/
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES, false);
#endif
vacuum((VacuumStmt *) parsetree, InvalidOid, true, NULL, false,
isTopLevel);
@@ -1596,8 +1727,16 @@ standard_ProcessUtility(Node *parsetree,
* send this query to backend nodes
*/
if (!stmt->is_local || !IsTransactionBlock())
- if (PoolManagerSetCommand(stmt->is_local, queryString) < 0)
+ {
+ PoolCommandType command_type;
+ if (stmt->is_local)
+ command_type = POOL_CMD_LOCAL_SET;
+ else
+ command_type = POOL_CMD_GLOBAL_SET;
+
+ if (PoolManagerSetCommand(command_type, queryString) < 0)
elog(ERROR, "Postgres-XC: ERROR SET query");
+ }
}
#endif
break;
@@ -1623,7 +1762,7 @@ standard_ProcessUtility(Node *parsetree,
* send this query to backend nodes
*/
if (!IsTransactionBlock())
- if (PoolManagerSetCommand(false, queryString) < 0)
+ if (PoolManagerSetCommand(POOL_CMD_GLOBAL_SET, queryString) < 0)
elog(ERROR, "Postgres-XC: ERROR DISCARD query");
}
#endif
@@ -1640,7 +1779,7 @@ standard_ProcessUtility(Node *parsetree,
errdetail("The feature is not currently supported")));
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1661,15 +1800,12 @@ standard_ProcessUtility(Node *parsetree,
/* If rule is defined on a view, drop it only on Coordinators */
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
- RemoteQueryExecType remoteExecType;
+ RemoteQueryExecType remoteExecType = EXEC_ON_ALL_NODES;
+ bool is_temp = false;
Oid relid = RangeVarGetRelid(stmt->relation, false);
- if (get_rel_relkind(relid) == RELKIND_VIEW)
- remoteExecType = EXEC_ON_COORDS;
- else
- remoteExecType = EXEC_ON_ALL_NODES;
-
- ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType);
+ remoteExecType = ExecUtilityFindNodesRelkind(relid, &is_temp);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType, is_temp);
}
#endif
break;
@@ -1679,7 +1815,7 @@ standard_ProcessUtility(Node *parsetree,
stmt->behavior, stmt->missing_ok);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
default:
@@ -1694,7 +1830,7 @@ standard_ProcessUtility(Node *parsetree,
CreateProceduralLanguage((CreatePLangStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1702,7 +1838,7 @@ standard_ProcessUtility(Node *parsetree,
DropProceduralLanguage((DropPLangStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1713,7 +1849,7 @@ standard_ProcessUtility(Node *parsetree,
DefineDomain((CreateDomainStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1724,7 +1860,7 @@ standard_ProcessUtility(Node *parsetree,
CreateRole((CreateRoleStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1732,7 +1868,7 @@ standard_ProcessUtility(Node *parsetree,
AlterRole((AlterRoleStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1740,7 +1876,7 @@ standard_ProcessUtility(Node *parsetree,
AlterRoleSet((AlterRoleSetStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1748,7 +1884,7 @@ standard_ProcessUtility(Node *parsetree,
DropRole((DropRoleStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1756,7 +1892,7 @@ standard_ProcessUtility(Node *parsetree,
DropOwnedObjects((DropOwnedStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1764,7 +1900,7 @@ standard_ProcessUtility(Node *parsetree,
ReassignOwnedObjects((ReassignOwnedStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1778,7 +1914,7 @@ standard_ProcessUtility(Node *parsetree,
LockTableCommand((LockStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1810,7 +1946,7 @@ standard_ProcessUtility(Node *parsetree,
(RecoveryInProgress() ? 0 : CHECKPOINT_FORCE));
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES, false);
#endif
break;
@@ -1855,7 +1991,7 @@ standard_ProcessUtility(Node *parsetree,
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
ExecUtilityStmtOnNodes(queryString, NULL,
- stmt->kind == OBJECT_DATABASE, EXEC_ON_ALL_NODES);
+ stmt->kind == OBJECT_DATABASE, EXEC_ON_ALL_NODES, false);
#endif
break;
}
@@ -1865,7 +2001,7 @@ standard_ProcessUtility(Node *parsetree,
CreateConversionCommand((CreateConversionStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1873,7 +2009,7 @@ standard_ProcessUtility(Node *parsetree,
CreateCast((CreateCastStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1881,7 +2017,7 @@ standard_ProcessUtility(Node *parsetree,
DropCast((DropCastStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1889,7 +2025,7 @@ standard_ProcessUtility(Node *parsetree,
DefineOpClass((CreateOpClassStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1897,7 +2033,7 @@ standard_ProcessUtility(Node *parsetree,
DefineOpFamily((CreateOpFamilyStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1905,7 +2041,7 @@ standard_ProcessUtility(Node *parsetree,
AlterOpFamily((AlterOpFamilyStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1913,7 +2049,7 @@ standard_ProcessUtility(Node *parsetree,
RemoveOpClass((RemoveOpClassStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1921,7 +2057,7 @@ standard_ProcessUtility(Node *parsetree,
RemoveOpFamily((RemoveOpFamilyStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1929,7 +2065,7 @@ standard_ProcessUtility(Node *parsetree,
AlterTSDictionary((AlterTSDictionaryStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
@@ -1937,7 +2073,7 @@ standard_ProcessUtility(Node *parsetree,
AlterTSConfiguration((AlterTSConfigurationStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false);
#endif
break;
#ifdef PGXC
@@ -1956,7 +2092,7 @@ standard_ProcessUtility(Node *parsetree,
CleanConnection((CleanConnStmt *) parsetree);
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_COORDS);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_COORDS, false);
break;
#endif
default:
@@ -1975,7 +2111,7 @@ standard_ProcessUtility(Node *parsetree,
*/
static void
ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
- bool force_autocommit, RemoteQueryExecType exec_type)
+ bool force_autocommit, RemoteQueryExecType exec_type, bool is_temp)
{
/* Return if query is launched on no nodes */
if (exec_type == EXEC_ON_NONE)
@@ -1989,6 +2125,7 @@ ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
step->sql_statement = pstrdup(queryString);
step->force_autocommit = force_autocommit;
step->exec_type = exec_type;
+ step->is_temp = is_temp;
ExecRemoteUtility(step);
pfree(step->sql_statement);
pfree(step);
@@ -2000,37 +2137,97 @@ ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
*
* Determine the list of nodes to launch query on.
* This depends on temporary nature of object and object type.
- * PGXCTODO: Extend temporary object check for tables.
+ * Return also a flag indicating if relation is temporary.
*/
static RemoteQueryExecType
-ExecUtilityFindNodes(const char *queryString,
- ObjectType object_type,
- Oid relid)
+ExecUtilityFindNodes(ObjectType object_type,
+ Oid relid,
+ bool *is_temp)
{
- bool is_temp = false;
- RemoteQueryExecType remoteExecType = EXEC_ON_NONE;
+ RemoteQueryExecType exec_type;
- /* Check if object is a temporary sequence */
- if (object_type == OBJECT_SEQUENCE ||
- (object_type == OBJECT_TABLE &&
- get_rel_relkind(relid) == RELKIND_SEQUENCE))
- is_temp = IsTempSequence(relid);
+ switch (object_type)
+ {
+ case OBJECT_SEQUENCE:
+ /* Check if object is a temporary sequence */
+ if ((*is_temp = IsTempSequence(relid)))
+ exec_type = EXEC_ON_NONE;
+ else
+ exec_type = EXEC_ON_COORDS;
+ break;
+
+ case OBJECT_TABLE:
+ /* Do the check on relation kind */
+ exec_type = ExecUtilityFindNodesRelkind(relid, is_temp);
+ break;
+
+ case OBJECT_VIEW:
+ /* Check if object is a temporary view */
+ if ((*is_temp = IsTempTable(relid)))
+ exec_type = EXEC_ON_NONE;
+ else
+ exec_type = EXEC_ON_COORDS;
+ break;
+
+ case OBJECT_INDEX:
+ /* Check if given index uses temporary tables */
+ if ((*is_temp = IsIndexUsingTempTable(relid)))
+ exec_type = EXEC_ON_DATANODES;
+ else
+ exec_type = EXEC_ON_ALL_NODES;
+ break;
+
+ default:
+ *is_temp = false;
+ exec_type = EXEC_ON_ALL_NODES;
+ break;
+ }
+
+ return exec_type;
+}
- if (!is_temp)
+/*
+ * ExecUtilityFindNodesRelkind
+ *
+ * Get node execution and temporary type
+ * for given relation depending on its relkind
+ */
+static RemoteQueryExecType
+ExecUtilityFindNodesRelkind(Oid relid, bool *is_temp)
+{
+ char relkind_str = get_rel_relkind(relid);
+ RemoteQueryExecType exec_type;
+
+ switch (relkind_str)
{
- remoteExecType = EXEC_ON_ALL_NODES;
+ case RELKIND_SEQUENCE:
+ if ((*is_temp = IsTempSequence(relid)))
+ exec_type = EXEC_ON_NONE;
+ else
+ exec_type = EXEC_ON_COORDS;
+ break;
- if (object_type == OBJECT_SEQUENCE ||
- object_type == OBJECT_VIEW)
- remoteExecType = EXEC_ON_COORDS;
- else if (object_type == OBJECT_TABLE)
- {
- if (get_rel_relkind(relid) == RELKIND_SEQUENCE)
- remoteExecType = EXEC_ON_COORDS;
- }
+ case RELKIND_RELATION:
+ if ((*is_temp = IsTempTable(relid)))
+ exec_type = EXEC_ON_DATANODES;
+ else
+ exec_type = EXEC_ON_ALL_NODES;
+ break;
+
+ case RELKIND_VIEW:
+ if ((*is_temp = IsTempTable(relid)))
+ exec_type = EXEC_ON_NONE;
+ else
+ exec_type = EXEC_ON_COORDS;
+ break;
+
+ default:
+ *is_temp = false;
+ exec_type = EXEC_ON_ALL_NODES;
+ break;
}
- return remoteExecType;
+ return exec_type;
}
#endif