diff options
| author | Koichi Suzuki | 2011-08-05 07:38:03 +0000 |
|---|---|---|
| committer | Koichi Suzuki | 2011-08-05 07:38:03 +0000 |
| commit | f1a32c6227136d722aead50b8d4a85fceba7523f (patch) | |
| tree | 26feb93534c888eac3396c3e6b568df1b401ddeb /src/backend | |
| parent | 694cab5487b60da9b07cf0418bac6b1e41875ea9 (diff) | |
| parent | 8a15033d7d7ae7cbe9a100ea88b8e18f033f0cb4 (diff) | |
Merge branch 'master' of ssh://postgres-xc.git.sourceforge.net/gitroot/postgres-xc/postgres-xc
Diffstat (limited to 'src/backend')
| -rw-r--r-- | src/backend/access/transam/xact.c | 17 | ||||
| -rw-r--r-- | src/backend/commands/comment.c | 42 | ||||
| -rw-r--r-- | src/backend/commands/copy.c | 6 | ||||
| -rw-r--r-- | src/backend/commands/schemacmds.c | 3 | ||||
| -rw-r--r-- | src/backend/commands/tablecmds.c | 117 | ||||
| -rw-r--r-- | src/backend/optimizer/plan/createplan.c | 10 | ||||
| -rw-r--r-- | src/backend/pgxc/plan/planner.c | 46 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 63 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/poolmgr.c | 186 | ||||
| -rw-r--r-- | src/backend/tcop/utility.c | 557 |
10 files changed, 797 insertions, 250 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 1a199789b1..e3e0e0f9aa 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1962,6 +1962,14 @@ CommitTransaction(bool contact_gtm) char implicitgid[256]; TransactionId xid = InvalidTransactionId; + /* + * Check if there are any On Commit actions and force temporary object flag. + * This is possible in the case of a session using ON COMMIT DELETE ROWS. + * It is essential to do this check *before* calling PGXCNodeIsImplicit2PC. + */ + if (IsOnCommitActions()) + ExecSetTempObjectIncluded(); + if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && contact_gtm) PreparePGXCNodes = PGXCNodeIsImplicit2PC(&PrepareLocalCoord); @@ -2330,6 +2338,15 @@ PrepareTransaction(void) ShowTransactionState("PrepareTransaction"); +#ifdef PGXC + /* + * Check if there are any On Commit actions and force temporary object flag. + * This is possible in the case of a session using ON COMMIT DELETE ROWS. + */ + if (IsOnCommitActions()) + ExecSetTempObjectIncluded(); +#endif + /* * check the current transaction state */ diff --git a/src/backend/commands/comment.c b/src/backend/commands/comment.c index d09bef0682..f18a132219 100644 --- a/src/backend/commands/comment.c +++ b/src/backend/commands/comment.c @@ -4,7 +4,8 @@ * * PostgreSQL object comments utility code. * - * Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 2010-2011 Nippon Telegraph and Telephone Corporation * * IDENTIFICATION * src/backend/commands/comment.c @@ -460,3 +461,42 @@ GetComment(Oid oid, Oid classoid, int32 subid) return comment; } + + +#ifdef PGXC +/* + * GetCommentObjectId + * + * Return Object ID of object commented + * Note: This function uses portions of the code of CommentObject, + * even if this code is duplicated this is done like this to facilitate + * merges with PostgreSQL head. + */ +Oid +GetCommentObjectId(CommentStmt *stmt) +{ + ObjectAddress address; + Relation relation; + + if (stmt->objtype == OBJECT_DATABASE && list_length(stmt->objname) == 1) + { + char *database = strVal(linitial(stmt->objname)); + + if (!OidIsValid(get_database_oid(database, true))) + { + ereport(WARNING, + (errcode(ERRCODE_UNDEFINED_DATABASE), + errmsg("database \"%s\" does not exist", database))); + return InvalidOid; + } + } + + address = get_object_address(stmt->objtype, stmt->objname, stmt->objargs, + &relation, ShareUpdateExclusiveLock); + + if (relation != NULL) + relation_close(relation, NoLock); + + return address.objectId; +} +#endif diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 952b88b533..bfaf6e2860 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -918,6 +918,12 @@ DoCopy(const CopyStmt *stmt, const char *queryString) rte->relkind = rel->rd_rel->relkind; rte->requiredPerms = required_access; +#ifdef PGXC + /* In case COPY is used on a temporary table, never use 2PC for implicit commits */ + if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP) + ExecSetTempObjectIncluded(); +#endif + tupDesc = RelationGetDescr(rel); attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist); foreach(cur, attnums) diff --git a/src/backend/commands/schemacmds.c b/src/backend/commands/schemacmds.c index 831886f46d..851b0dca3b 100644 --- a/src/backend/commands/schemacmds.c +++ b/src/backend/commands/schemacmds.c @@ -132,7 +132,8 @@ CreateSchemaCommand(CreateSchemaStmt *stmt, const char *queryString) * Add a RemoteQuery node for a query at top level on a remote Coordinator */ if (is_top_level) - parsetree_list = AddRemoteQueryNode(parsetree_list, queryString, EXEC_ON_ALL_NODES); + parsetree_list = AddRemoteQueryNode(parsetree_list, queryString, + EXEC_ON_ALL_NODES, false); #endif /* diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 644e572934..7fe0015868 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -89,6 +89,7 @@ #include "pgxc/pgxc.h" #include "access/gtm.h" #include "commands/sequence.h" +#include "pgxc/execRemote.h" #endif /* @@ -439,20 +440,11 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId) * code. This is needed because calling code might not expect untrusted * tables to appear in pg_temp at the front of its search path. */ -#ifdef PGXC - if (stmt->relation->relpersistence == RELPERSISTENCE_TEMP && - IsUnderPostmaster && - relkind != RELKIND_SEQUENCE) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("PG-XC does not yet support temporary tables"))); -#else if (stmt->relation->relpersistence == RELPERSISTENCE_TEMP && InSecurityRestrictedOperation()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("cannot create temporary table within security-restricted operation"))); -#endif /* * Look up the namespace in which we are supposed to create the relation, @@ -9462,3 +9454,110 @@ AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid, } } } + +#ifdef PGXC +/* + * IsTempTable + * + * Check if given table Oid is temporary. + */ +bool +IsTempTable(Oid relid) +{ + Relation rel; + bool res; + /* + * PGXCTODO: Is it correct to open without locks? + * we just check if this table is temporary though... + */ + rel = relation_open(relid, NoLock); + res = rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP; + relation_close(rel, NoLock); + return res; +} + +/* + * IsIndexUsingTemp + * + * Check if given index relation uses temporary tables. + */ +bool +IsIndexUsingTempTable(Oid relid) +{ + bool res = false; + HeapTuple tuple; + Oid parent_id = InvalidOid; + + tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(relid)); + if (HeapTupleIsValid(tuple)) + { + Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple); + parent_id = index->indrelid; + + /* Release system cache BEFORE looking at the parent table */ + ReleaseSysCache(tuple); + + res = IsTempTable(parent_id); + } + else + res = false; /* Default case */ + + return res; +} + +/* + * IsOnCommitDeleteRows + * + * Check if there are any on-commit actions activated + * This is possible in the case of ON COMMIT DELETE ROWS for example. + * In this case 2PC cannot be used. + */ +bool +IsOnCommitActions(void) +{ + return list_length(on_commits) > 0; +} + +/* + * DropTableThrowErrorExternal + * + * Error interface for DROP when looking for execution node type. + */ +void +DropTableThrowErrorExternal(RangeVar *relation, ObjectType removeType, bool missing_ok) +{ + char relkind; + + /* Determine required relkind */ + switch (removeType) + { + case OBJECT_TABLE: + relkind = RELKIND_RELATION; + break; + + case OBJECT_INDEX: + relkind = RELKIND_INDEX; + break; + + case OBJECT_SEQUENCE: + relkind = RELKIND_SEQUENCE; + break; + + case OBJECT_VIEW: + relkind = RELKIND_VIEW; + break; + + case OBJECT_FOREIGN_TABLE: + relkind = RELKIND_FOREIGN_TABLE; + break; + + default: + elog(ERROR, "unrecognized drop object type: %d", + (int) removeType); + relkind = 0; /* keep compiler quiet */ + break; + } + + DropErrorMsgNonExistent(relation->relname, relkind, missing_ok); +} +#endif diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index d3b5c7793b..bc32049b87 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -47,6 +47,7 @@ #include "catalog/pg_type.h" #include "executor/executor.h" #include "rewrite/rewriteManip.h" +#include "commands/tablecmds.h" #endif #include "utils/lsyscache.h" @@ -2601,8 +2602,15 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path, /* * XXX: should use GENERIC OPTIONS like 'foreign_relname' or something for * the foreign table name instead of the local name ? + * + * A temporary table does not use namespace as it may not be + * consistent among nodes cluster. Relation name is sufficient. */ - appendStringInfo(&sql, "%s.%s %s", nspname_q, relname_q, aliasname_q); + if (IsTempTable(rte->relid)) + appendStringInfo(&sql, "%s %s", relname_q, aliasname_q); + else + appendStringInfo(&sql, "%s.%s %s", nspname_q, relname_q, aliasname_q); + pfree(nspname); pfree(relname); if (nspname_q != nspname_q) diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 7da7f848af..e3499134db 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -46,6 +46,7 @@ #include "utils/syscache.h" #include "utils/numeric.h" #include "access/hash.h" +#include "commands/tablecmds.h" #include "utils/timestamp.h" #include "utils/date.h" @@ -171,7 +172,8 @@ static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStm static void InitXCWalkerContext(XCWalkerContext *context); static RemoteQuery *makeRemoteQuery(void); static void validate_part_col_updatable(const Query *query); - +static bool contains_temp_tables(List *rtable); +static bool contains_only_pg_catalog(List *rtable); /* * Find position of specified substring in the string @@ -1403,7 +1405,7 @@ examine_conditions_fromlist(Node *treenode, XCWalkerContext *context) * only contain pg_catalog entries. */ static bool -contains_only_pg_catalog (List *rtable) +contains_only_pg_catalog(List *rtable) { ListCell *item; @@ -1416,13 +1418,39 @@ contains_only_pg_catalog (List *rtable) { if (get_rel_namespace(rte->relid) != PG_CATALOG_NAMESPACE) return false; - } else if (rte->rtekind == RTE_SUBQUERY && - !contains_only_pg_catalog (rte->subquery->rtable)) + } + else if (rte->rtekind == RTE_SUBQUERY && + !contains_only_pg_catalog(rte->subquery->rtable)) return false; } return true; } +/* + * Returns true if at least one temporary table is in use + * in query (and its subqueries) + */ +static bool +contains_temp_tables(List *rtable) +{ + ListCell *item; + + foreach(item, rtable) + { + RangeTblEntry *rte = (RangeTblEntry *) lfirst(item); + + if (rte->rtekind == RTE_RELATION) + { + if (IsTempTable(rte->relid)) + return true; + } + else if (rte->rtekind == RTE_SUBQUERY && + contains_temp_tables(rte->subquery->rtable)) + return true; + } + + return false; +} /* * get_plan_nodes - determine the nodes to execute the command on. @@ -1934,6 +1962,7 @@ makeRemoteQuery(void) result->paramval_data = NULL; result->paramval_len = 0; result->exec_direct_type = EXEC_DIRECT_NONE; + result->is_temp = false; result->relname = NULL; result->remotejoin = false; @@ -2737,12 +2766,16 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) if (query->commandType != CMD_SELECT) result->resultRelations = list_make1_int(query->resultRelation); - if (contains_only_pg_catalog (query->rtable)) + if (contains_only_pg_catalog(query->rtable)) { result = standard_planner(query, cursorOptions, boundParams); return result; } + /* Check if temporary tables are in use in target list */ + if (contains_temp_tables(query->rtable)) + query_step->is_temp = true; + if (query_step->exec_nodes == NULL) get_plan_nodes_command(query_step, root); @@ -3184,7 +3217,7 @@ GetHashExecNodes(RelationLocInfo *rel_loc_info, ExecNodes **exec_nodes, const Ex * duplicated queries on Datanodes. */ List * -AddRemoteQueryNode(List *stmts, const char *queryString, RemoteQueryExecType remoteExecType) +AddRemoteQueryNode(List *stmts, const char *queryString, RemoteQueryExecType remoteExecType, bool is_temp) { List *result = stmts; @@ -3199,6 +3232,7 @@ AddRemoteQueryNode(List *stmts, const char *queryString, RemoteQueryExecType rem step->combine_type = COMBINE_TYPE_SAME; step->sql_statement = queryString; step->exec_type = remoteExecType; + step->is_temp = is_temp; result = lappend(result, step); } diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 1efb364333..121245d696 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -52,6 +52,7 @@ static bool autocommit = true; static bool is_ddl = false; static bool implicit_force_autocommit = false; +static bool temp_object_included = false; static PGXCNodeHandle **write_node_list = NULL; static int write_node_count = 0; static char *begin_string = NULL; @@ -1536,11 +1537,27 @@ finish: if (res != 0) { - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Could not prepare transaction on data nodes"))); + + /* In case transaction has operated on temporary objects */ + if (temp_object_included) + { + /* Reset temporary object flag */ + temp_object_included = false; + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("cannot PREPARE a transaction that has operated on temporary tables"))); + } + else + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not prepare transaction on data nodes"))); + } } + /* Reset temporary object flag */ + temp_object_included = false; + return local_operation; } @@ -1824,6 +1841,9 @@ finish: is_ddl = false; clear_write_node_list(); + /* Reset temporary object flag */ + temp_object_included = false; + /* Clean up connections */ pfree_pgxc_all_handles(pgxc_connections); @@ -1973,6 +1993,9 @@ finish: is_ddl = false; clear_write_node_list(); + /* Reset temporary object flag */ + temp_object_included = false; + /* Free node list taken from GTM */ if (datanodes && datanodecnt != 0) free(datanodes); @@ -2113,6 +2136,9 @@ finish: is_ddl = false; clear_write_node_list(); + /* Reset temporary object flag */ + temp_object_included = false; + /* Free node list taken from GTM */ if (datanodes) free(datanodes); @@ -2210,6 +2236,9 @@ finish: is_ddl = false; clear_write_node_list(); + /* Reset temporary object flag */ + temp_object_included = false; + /* Clean up connections */ pfree_pgxc_all_handles(pgxc_connections); if (res != 0) @@ -2284,6 +2313,9 @@ finish: is_ddl = false; clear_write_node_list(); + /* Reset temporary object flag */ + temp_object_included = false; + /* Clean up connections */ pfree_pgxc_all_handles(pgxc_connections); return res; @@ -3217,6 +3249,10 @@ do_query(RemoteQueryState *node) bool need_tran; PGXCNodeAllHandles *pgxc_connections; + /* Be sure to set temporary object flag if necessary */ + if (step->is_temp) + temp_object_included = true; + /* * Get connections for Datanodes only, utilities and DDLs * are launched in ExecRemoteUtility @@ -4033,6 +4069,9 @@ ExecRemoteUtility(RemoteQuery *node) implicit_force_autocommit = force_autocommit; + /* A transaction using temporary objects cannot use 2PC */ + temp_object_included = node->is_temp; + remotestate = CreateResponseCombiner(0, node->combine_type); pgxc_connections = get_exec_connections(NULL, node->exec_nodes, exec_type); @@ -4516,12 +4555,14 @@ PGXCNodeIsImplicit2PC(bool *prepare_local_coord) /* * In case of an autocommit or forced autocommit transaction, 2PC is not involved - * This case happens for Utilities using force autocommit (CREATE DATABASE, VACUUM...) + * This case happens for Utilities using force autocommit (CREATE DATABASE, VACUUM...). + * For a transaction using temporary objects, 2PC is not authorized. */ - if (implicit_force_autocommit) + if (implicit_force_autocommit || temp_object_included) { *prepare_local_coord = false; implicit_force_autocommit = false; + temp_object_included = false; return false; } @@ -4622,3 +4663,15 @@ int DataNodeCopyInBinaryForAll(char *msg_buf, int len, PGXCNodeHandle** copy_con return 0; } + +/* + * ExecSetTempObjectIncluded + * + * Set Temp object flag on the fly for transactions + * This flag will be reinitialized at commit. + */ +void +ExecSetTempObjectIncluded(void) +{ + temp_object_included = true; +} diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c index 463bd5a170..c0d8d56aab 100644 --- a/src/backend/pgxc/pool/poolmgr.c +++ b/src/backend/pgxc/pool/poolmgr.c @@ -93,7 +93,13 @@ static void agent_init(PoolAgent *agent, const char *database, const char *user_ static void agent_destroy(PoolAgent *agent); static void agent_create(void); static void agent_handle_input(PoolAgent *agent, StringInfo s); -static int agent_set_command(PoolAgent *agent, const char *set_command, bool is_local); +static int agent_session_command(PoolAgent *agent, + const char *set_command, + PoolCommandType command_type); +static int agent_set_command(PoolAgent *agent, + const char *set_command, + PoolCommandType command_type); +static int agent_temp_command(PoolAgent *agent); static DatabasePool *create_database_pool(const char *database, const char *user_name); static void insert_database_pool(DatabasePool *pool); static int destroy_database_pool(const char *database, const char *user_name); @@ -107,7 +113,7 @@ static int *agent_acquire_connections(PoolAgent *agent, List *datanodelist, List static int cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist); static PGXCNodePoolSlot *acquire_connection(DatabasePool *dbPool, int node, char client_conn_type); static void agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard); -static void agent_reset_params(PoolAgent *agent, List *dn_list, List *co_list); +static void agent_reset_session(PoolAgent *agent, List *dn_list, List *co_list); static void release_connection(DatabasePool *dbPool, PGXCNodePoolSlot *slot, int index, bool clean, char client_conn_type); static void destroy_slot(PGXCNodePoolSlot *slot); @@ -490,6 +496,7 @@ agent_create(void) agent->coord_connections = NULL; agent->session_params = NULL; agent->local_params = NULL; + agent->is_temp = false; agent->pid = 0; /* Append new agent to the list */ @@ -543,30 +550,44 @@ PoolManagerConnect(PoolHandle *handle, const char *database, const char *user_na } int -PoolManagerSetCommand(bool is_local, const char *set_command) +PoolManagerSetCommand(PoolCommandType command_type, const char *set_command) { int n32; char msgtype = 's'; - Assert(set_command); Assert(Handle); /* Message type */ pool_putbytes(&Handle->port, &msgtype, 1); /* Message length */ - n32 = htonl(strlen(set_command) + 10); + if (set_command) + n32 = htonl(strlen(set_command) + 13); + else + n32 = htonl(12); + pool_putbytes(&Handle->port, (char *) &n32, 4); /* LOCAL or SESSION parameter ? */ - pool_putbytes(&Handle->port, (char *) &is_local, 1); - - /* Length of SET command string */ - n32 = htonl(strlen(set_command) + 1); + n32 = htonl(command_type); pool_putbytes(&Handle->port, (char *) &n32, 4); - /* Send command string followed by \0 terminator */ - pool_putbytes(&Handle->port, set_command, strlen(set_command) + 1); + if (set_command) + { + /* Length of SET command string */ + n32 = htonl(strlen(set_command) + 1); + pool_putbytes(&Handle->port, (char *) &n32, 4); + + /* Send command string followed by \0 terminator */ + pool_putbytes(&Handle->port, set_command, strlen(set_command) + 1); + } + else + { + /* Send empty command */ + n32 = htonl(0); + pool_putbytes(&Handle->port, (char *) &n32, 4); + } + pool_flush(&Handle->port); /* Get result */ @@ -628,10 +649,10 @@ agent_destroy(PoolAgent *agent) co_conn = lappend_int(co_conn, i+1); /* - * agent is being destroyed, so reset session parameters - * before putting back connections to pool + * Agent is being destroyed, so reset session parameters + * and temporary objects before putting back connections to pool. */ - agent_reset_params(agent, dn_conn, co_conn); + agent_reset_session(agent, dn_conn, co_conn); /* release them all */ agent_release_connections(agent, dn_conn, co_conn); @@ -881,8 +902,8 @@ agent_handle_input(PoolAgent * agent, StringInfo s) { const char *database = NULL; const char *user_name = NULL; - const char *set_command; - bool is_local; + const char *set_command = NULL; + PoolCommandType command_type; int datanodecount; int coordcount; List *datanodelist = NIL; @@ -1043,16 +1064,19 @@ agent_handle_input(PoolAgent * agent, StringInfo s) list_free(datanodelist); list_free(coordlist); break; - case 's': /* SET COMMAND */ + case 's': /* Session-related COMMAND */ pool_getmessage(&agent->port, s, 0); /* Determine if command is local or session */ - is_local = (bool) pq_getmsgbyte(s); - /* Get the SET command */ + command_type = (PoolCommandType) pq_getmsgint(s, 4); + /* Get the SET command if necessary */ len = pq_getmsgint(s, 4); - set_command = pq_getmsgbytes(s, len); + if (len != 0) + set_command = pq_getmsgbytes(s, len); + pq_getmsgend(s); - res = agent_set_command(agent, set_command, is_local); + /* Manage command depending on its type */ + res = agent_session_command(agent, set_command, command_type); /* Send success result */ pool_sendres(&agent->port, res); @@ -1068,11 +1092,46 @@ agent_handle_input(PoolAgent * agent, StringInfo s) } /* + * Manage a session command for pooler + */ +static int +agent_session_command(PoolAgent *agent, const char *set_command, PoolCommandType command_type) +{ + int res; + + switch (command_type) + { + case POOL_CMD_LOCAL_SET: + case POOL_CMD_GLOBAL_SET: + res = agent_set_command(agent, set_command, command_type); + break; + case POOL_CMD_TEMP: + res = agent_temp_command(agent); + break; + default: + res = -1; + break; + } + + return res; +} + +/* + * Set agent flag that a temporary object is in use. + */ +static int +agent_temp_command(PoolAgent *agent) +{ + agent->is_temp = true; + return 0; +} + +/* * Save a SET command and distribute it to the agent connections * already in use. */ static int -agent_set_command(PoolAgent *agent, const char *set_command, bool is_local) +agent_set_command(PoolAgent *agent, const char *set_command, PoolCommandType command_type) { char *params_string; int i; @@ -1080,11 +1139,16 @@ agent_set_command(PoolAgent *agent, const char *set_command, bool is_local) Assert(agent); Assert(set_command); + Assert(command_type == POOL_CMD_LOCAL_SET || command_type == POOL_CMD_GLOBAL_SET); - if (is_local) + if (command_type == POOL_CMD_LOCAL_SET) params_string = agent->local_params; - else + else if (command_type == POOL_CMD_GLOBAL_SET) params_string = agent->session_params; + else + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Set command process failed"))); /* First command recorded */ if (!params_string) @@ -1131,9 +1195,9 @@ agent_set_command(PoolAgent *agent, const char *set_command, bool is_local) } /* Save the latest string */ - if (is_local) + if (command_type == POOL_CMD_LOCAL_SET) agent->local_params = params_string; - else + else if (command_type == POOL_CMD_GLOBAL_SET) agent->session_params = params_string; return res; @@ -1278,7 +1342,6 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist) static int cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist) { - int i; ListCell *nodelist_item; char errbuf[256]; int nCount; @@ -1435,8 +1498,9 @@ agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard) return; /* - * If there are some session parameters, do not put back connections to pool - * disconnection will be made when session is cut for this user. + * If there are some session parameters or temporary objects, + * do not put back connections to pool. + * Disconnection will be made when session is cut for this user. * Local parameters are reset when transaction block is finished, * so don't do anything for them, but just reset their list. */ @@ -1445,7 +1509,8 @@ agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard) pfree(agent->local_params); agent->local_params = NULL; } - if (agent->session_params) + if (agent->session_params || + agent->is_temp) return; /* Discard first for Datanodes */ @@ -1515,50 +1580,61 @@ agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard) * modified by session parameters. */ static void -agent_reset_params(PoolAgent *agent, List *dn_list, List *co_list) +agent_reset_session(PoolAgent *agent, List *dn_list, List *co_list) { - PGXCNodePoolSlot *slot; if (!agent->dn_connections && !agent->coord_connections) return; - /* Parameters are reset, so free commands */ - if (agent->session_params) - { - pfree(agent->session_params); - agent->session_params = NULL; - } - if (agent->local_params) - { - pfree(agent->local_params); - agent->local_params = NULL; - } + if (!agent->session_params && !agent->local_params && !agent->is_temp) + return; - /* Reset Datanode connection params */ - if (dn_list) + /* + * Reset Datanode connection params. + * Discard is only done for Datanodes as Temporary objects are never created + * to other Coordinators in a session. + */ + if (dn_list && + (agent->session_params || agent->local_params || agent->is_temp)) { ListCell *lc; foreach(lc, dn_list) { + PGXCNodePoolSlot *slot; int node = lfirst_int(lc); + Assert(node > 0 && node <= NumDataNodes); slot = agent->dn_connections[node - 1]; /* Reset connection params */ if (slot) - PGXCNodeSendSetQuery(slot->conn, "RESET ALL;"); + { + if (agent->session_params || agent->local_params) + PGXCNodeSendSetQuery(slot->conn, "RESET ALL;"); + + /* + * Discard queries cannot be sent as multiple-queries, + * so do it separately. It is OK to use this slow process + * as session is ending. + */ + if (agent->is_temp) + PGXCNodeSendSetQuery(slot->conn, "DISCARD ALL;"); + } } } /* Reset Coordinator connection params */ - if (co_list) + if (co_list && + (agent->session_params || agent->local_params)) { ListCell *lc; foreach(lc, co_list) { + PGXCNodePoolSlot *slot; int node = lfirst_int(lc); + Assert(node > 0 && node <= NumCoords); slot = agent->coord_connections[node - 1]; @@ -1567,6 +1643,19 @@ agent_reset_params(PoolAgent *agent, List *dn_list, List *co_list) PGXCNodeSendSetQuery(slot->conn, "RESET ALL;"); } } + + /* Parameters are reset, so free commands */ + if (agent->session_params) + { + pfree(agent->session_params); + agent->session_params = NULL; + } + if (agent->local_params) + { + pfree(agent->local_params); + agent->local_params = NULL; + } + agent->is_temp = false; } /* @@ -2070,6 +2159,9 @@ grow_pool(DatabasePool * dbPool, int index, char client_conn_type) errmsg("out of memory"))); } + /* If connection fails, be sure that slot is destroyed cleanly */ + slot->xc_cancelConn = NULL; + /* Establish connection */ slot->conn = PGXCNodeConnect(nodePool->connstr); if (!PGXCNodeConnected(slot->conn)) diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index e4e6ed55b4..9e88b268b2 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -7,6 +7,7 @@ * * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California + * Portions Copyright (c) 2010-2011 Nippon Telegraph and Telephone Corporation * * * IDENTIFICATION @@ -68,12 +69,15 @@ #include "pgxc/planner.h" #include "pgxc/poolutils.h" #include "pgxc/poolmgr.h" +#include "utils/lsyscache.h" static void ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes, - bool force_autocommit, RemoteQueryExecType exec_type); -static RemoteQueryExecType ExecUtilityFindNodes(const char *queryString, - ObjectType objectType, - Oid relid); + bool force_autocommit, RemoteQueryExecType exec_type, + bool is_temp); +static RemoteQueryExecType ExecUtilityFindNodes(ObjectType objectType, + Oid relid, + bool *is_temp); +static RemoteQueryExecType ExecUtilityFindNodesRelkind(Oid relid, bool *is_temp); #endif @@ -653,17 +657,77 @@ standard_ProcessUtility(Node *parsetree, List *stmts; ListCell *l; Oid relOid; +#ifdef PGXC + bool is_temp = false; +#endif /* Run parse analysis ... */ stmts = transformCreateStmt((CreateStmt *) parsetree, queryString); - #ifdef PGXC + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { + /* + * Scan the list of objects. + * Temporary tables are created on Datanodes only. + * Non-temporary objects are created on all nodes. + * In case temporary and non-temporary objects are mized return an error. + */ + bool is_first = true; + + foreach(l, stmts) + { + Node *stmt = (Node *) lfirst(l); + + if (IsA(stmt, CreateStmt)) + { + CreateStmt *stmt_loc = (CreateStmt *) stmt; + bool is_object_temp = stmt_loc->relation->relpersistence == RELPERSISTENCE_TEMP; + + if (is_first) + { + is_first = false; + if (is_object_temp) + is_temp = true; + } + else + { + if (is_object_temp != is_temp) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("CREATE not supported for TEMP and non-TEMP objects"), + errdetail("You should separate TEMP and non-TEMP objects"))); + } + } + else if (IsA(stmt, CreateForeignTableStmt)) + { + /* There are no temporary foreign tables */ + if (is_first) + { + is_first = false; + } + else + { + if (!is_temp) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("CREATE not supported for TEMP and non-TEMP objects"), + errdetail("You should separate TEMP and non-TEMP objects"))); + } + } + } + } + /* * Add a RemoteQuery node for a query at top level on a remote Coordinator */ if (isTopLevel) - stmts = AddRemoteQueryNode(stmts, queryString, EXEC_ON_ALL_NODES); + { + if (is_temp) + stmts = AddRemoteQueryNode(stmts, queryString, EXEC_ON_DATANODES, is_temp); + else + stmts = AddRemoteQueryNode(stmts, queryString, EXEC_ON_ALL_NODES, is_temp); + } #endif /* ... and do it */ @@ -676,6 +740,12 @@ standard_ProcessUtility(Node *parsetree, Datum toast_options; static char *validnsps[] = HEAP_RELOPT_NAMESPACES; +#ifdef PGXC + /* Set temporary object object flag in pooler */ + if (is_temp) + PoolManagerSetCommand(POOL_CMD_TEMP, NULL); +#endif + /* Create the table itself */ relOid = DefineRelation((CreateStmt *) stmt, RELKIND_RELATION, @@ -798,6 +868,7 @@ standard_ProcessUtility(Node *parsetree, DropStmt *stmt = (DropStmt *) parsetree; #ifdef PGXC bool is_temp = false; + RemoteQueryExecType exec_type = EXEC_ON_ALL_NODES; /* * We need to check details of the objects being dropped and @@ -806,41 +877,75 @@ standard_ProcessUtility(Node *parsetree, */ if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) { - /* - * Check the list of sequences/tables going to be dropped. - * XC does not allow yet to mix drop of temporary and - * non-temporary objects because this involves to rewrite - * query to process for tables and stop remote query process - * for sequences. - * PGXCTODO: For the time being this is just checked for - * sequences but should also be done for tables - */ - - if (stmt->removeType == OBJECT_SEQUENCE) + switch (stmt->removeType) { - ListCell *cell; - bool is_first = true; - - foreach(cell, stmt->objects) - { - RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell)); - Oid relid; - - relid = RangeVarGetRelid(rel, false); - if (is_first) + case OBJECT_TABLE: + case OBJECT_SEQUENCE: + case OBJECT_VIEW: + case OBJECT_INDEX: { - is_temp = IsTempSequence(relid); - is_first = false; - } - else - { - if (IsTempSequence(relid) != is_temp) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("DROP SEQUENCE not supported for TEMP and non-TEMP sequences"), - errdetail("DROP can be done if TEMP and non-TEMP objects are separated"))); + /* + * Check the list of objects going to be dropped. + * XC does not allow yet to mix drop of temporary and + * non-temporary objects because this involves to rewrite + * query to process for tables. + */ + ListCell *cell; + bool is_first = true; + + foreach(cell, stmt->objects) + { + RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell)); + Oid relid; + + /* + * Do not print result at all, error is thrown + * after if necessary + */ + relid = RangeVarGetRelid(rel, true); + + /* + * In case this relation ID is incorrect throw + * a correct DROP error. + */ + if (!OidIsValid(relid) && !stmt->missing_ok) + DropTableThrowErrorExternal(rel, + stmt->removeType, + stmt->missing_ok); + + /* In case of DROP ... IF EXISTS bypass */ + if (!OidIsValid(relid) && stmt->missing_ok) + continue; + + if (is_first) + { + exec_type = ExecUtilityFindNodes(stmt->removeType, + relid, + &is_temp); + is_first = false; + } + else + { + RemoteQueryExecType exec_type_loc; + bool is_temp_loc; + exec_type_loc = ExecUtilityFindNodes(stmt->removeType, + relid, + &is_temp_loc); + if (exec_type_loc != exec_type || + is_temp_loc != is_temp) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("DROP not supported for TEMP and non-TEMP objects"), + errdetail("You should separate TEMP and non-TEMP objects"))); + } + } } - } + break; + + default: + is_temp = false; + exec_type = EXEC_ON_ALL_NODES; + break; } } #endif @@ -898,20 +1003,9 @@ standard_ProcessUtility(Node *parsetree, } #ifdef PGXC - /* - * Drop can be done on non-temporary objects and temporary objects - * if they are not mixed up. - * PGXCTODO: This should be extended to tables - */ - if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && !is_temp) - { - /* Sequence and views exists only on Coordinators */ - if (stmt->removeType == OBJECT_SEQUENCE || - stmt->removeType == OBJECT_VIEW) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS); - else - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); - } + /* DROP is done depending on the object type and its temporary type */ + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + ExecUtilityStmtOnNodes(queryString, NULL, false, exec_type, is_temp); #endif } break; @@ -920,12 +1014,31 @@ standard_ProcessUtility(Node *parsetree, ExecuteTruncate((TruncateStmt *) parsetree); #ifdef PGXC /* - * PGXCTODO - * We may need to check details of the object being truncated and - * run command on correct nodes + * Check details of the object being truncated. + * If at least one temporary table is truncated truncate cannot use 2PC + * at commit. */ - if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { + bool is_temp = false; + ListCell *cell; + TruncateStmt *stmt = (TruncateStmt *) parsetree; + + foreach(cell, stmt->relations) + { + Oid relid; + RangeVar *rel = (RangeVar *) lfirst(cell); + + relid = RangeVarGetRelid(rel, true); + if (IsTempTable(relid)) + { + is_temp = true; + break; + } + } + + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_DATANODES, is_temp); + } #endif break; @@ -933,13 +1046,23 @@ standard_ProcessUtility(Node *parsetree, CommentObject((CommentStmt *) parsetree); #ifdef PGXC - /* - * PGXCTODO - * Comments on temporary objects need special handling - * depending on their types. - */ + /* Comment objects depending on their object and temporary types */ if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS); + { + bool is_temp = false; + RemoteQueryExecType exec_type = EXEC_ON_ALL_NODES; + CommentStmt *stmt = (CommentStmt *) parsetree; + Oid relid = GetCommentObjectId(stmt); + + /* Commented object may not have a valid object ID, so move to default */ + if (OidIsValid(relid)) + { + exec_type = ExecUtilityFindNodes(stmt->objtype, + relid, + &is_temp); + } + ExecUtilityStmtOnNodes(queryString, NULL, false, exec_type, is_temp); + } #endif break; @@ -981,19 +1104,21 @@ standard_ProcessUtility(Node *parsetree, { RenameStmt *stmt = (RenameStmt *) parsetree; RemoteQueryExecType exec_type; + bool is_temp = false; /* Relation is not set for a schema */ if (stmt->relation) - exec_type = ExecUtilityFindNodes(queryString, - stmt->renameType, - RangeVarGetRelid(stmt->relation, false)); + exec_type = ExecUtilityFindNodes(stmt->renameType, + RangeVarGetRelid(stmt->relation, false), + &is_temp); else exec_type = EXEC_ON_ALL_NODES; ExecUtilityStmtOnNodes(queryString, NULL, false, - exec_type); + exec_type, + is_temp); } #endif ExecRenameStmt((RenameStmt *) parsetree); @@ -1004,19 +1129,21 @@ standard_ProcessUtility(Node *parsetree, if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) { AlterObjectSchemaStmt *stmt = (AlterObjectSchemaStmt *) parsetree; - RemoteQueryExecType exec_type; + bool is_temp = false; + if (stmt->relation) - exec_type = ExecUtilityFindNodes(queryString, - stmt->objectType, - RangeVarGetRelid(stmt->relation, false)); + exec_type = ExecUtilityFindNodes(stmt->objectType, + RangeVarGetRelid(stmt->relation, false), + &is_temp); else exec_type = EXEC_ON_ALL_NODES; ExecUtilityStmtOnNodes(queryString, NULL, false, - exec_type); + exec_type, + is_temp); } #endif ExecAlterObjectSchemaStmt((AlterObjectSchemaStmt *) parsetree); @@ -1027,7 +1154,7 @@ standard_ProcessUtility(Node *parsetree, #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1045,12 +1172,14 @@ standard_ProcessUtility(Node *parsetree, */ if (isTopLevel) { + bool is_temp = false; AlterTableStmt *stmt = (AlterTableStmt *) parsetree; - RemoteQueryExecType exec_type = ExecUtilityFindNodes(queryString, - stmt->relkind, - RangeVarGetRelid(stmt->relation, false)); + RemoteQueryExecType exec_type; + exec_type = ExecUtilityFindNodes(stmt->relkind, + RangeVarGetRelid(stmt->relation, false), + &is_temp); - stmts = AddRemoteQueryNode(stmts, queryString, exec_type); + stmts = AddRemoteQueryNode(stmts, queryString, exec_type, is_temp); } #endif @@ -1126,7 +1255,7 @@ standard_ProcessUtility(Node *parsetree, } #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1136,6 +1265,7 @@ standard_ProcessUtility(Node *parsetree, { RemoteQueryExecType remoteExecType = EXEC_ON_ALL_NODES; GrantStmt *stmt = (GrantStmt *) parsetree; + bool is_temp = false; /* Launch GRANT on Coordinator if object is a sequence */ if ((stmt->objtype == ACL_OBJECT_RELATION && @@ -1156,22 +1286,9 @@ standard_ProcessUtility(Node *parsetree, RangeVar *relvar = (RangeVar *) lfirst(cell); Oid relid = RangeVarGetRelid(relvar, false); - if (get_rel_relkind(relid) == RELKIND_SEQUENCE || - get_rel_relkind(relid) == RELKIND_VIEW) - { - /* PGXCTODO: extend that for temporary views and tables */ - if (get_rel_relkind(relid) == RELKIND_SEQUENCE && - IsTempSequence(relid)) - remoteExecType = EXEC_ON_NONE; - else - remoteExecType = EXEC_ON_COORDS; - } - else - { - remoteExecType = EXEC_ON_ALL_NODES; - } + remoteExecType = ExecUtilityFindNodesRelkind(relid, &is_temp); - /* Check if objects can be launched at the same place as 1st one */ + /* Check if object node type corresponds to the first one */ if (first) { type_local = remoteExecType; @@ -1183,11 +1300,11 @@ standard_ProcessUtility(Node *parsetree, ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("PGXC does not support GRANT on multiple object types"), - errdetail("Grant VIEW/SEQUENCE and relations on separate queries"))); + errdetail("Grant VIEW/SEQUENCE/TABLE with separate queries"))); } } } - ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType); + ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType, is_temp); } #endif ExecuteGrantStmt((GrantStmt *) parsetree); @@ -1198,7 +1315,7 @@ standard_ProcessUtility(Node *parsetree, #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1207,7 +1324,7 @@ standard_ProcessUtility(Node *parsetree, #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1260,7 +1377,7 @@ standard_ProcessUtility(Node *parsetree, } #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1272,7 +1389,7 @@ standard_ProcessUtility(Node *parsetree, } #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1280,7 +1397,7 @@ standard_ProcessUtility(Node *parsetree, DefineEnum((CreateEnumStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1299,7 +1416,16 @@ standard_ProcessUtility(Node *parsetree, DefineView((ViewStmt *) parsetree, queryString); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS); + { + /* + * If view is temporary, no need to send this query to other + * remote Coordinators + */ + ViewStmt *stmt = (ViewStmt *) parsetree; + + if (stmt->view->relpersistence != RELPERSISTENCE_TEMP) + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS, false); + } #endif break; @@ -1307,7 +1433,7 @@ standard_ProcessUtility(Node *parsetree, CreateFunction((CreateFunctionStmt *) parsetree, queryString); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1315,7 +1441,7 @@ standard_ProcessUtility(Node *parsetree, AlterFunction((AlterFunctionStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1324,6 +1450,10 @@ standard_ProcessUtility(Node *parsetree, IndexStmt *stmt = (IndexStmt *) parsetree; #ifdef PGXC + Oid relid; + bool is_temp = false; + RemoteQueryExecType exec_type = EXEC_ON_ALL_NODES; + if (stmt->concurrent) { ereport(ERROR, @@ -1331,6 +1461,11 @@ standard_ProcessUtility(Node *parsetree, errmsg("PGXC does not support concurrent INDEX yet"), errdetail("The feature is not currently supported"))); } + + /* INDEX on a temporary table cannot use 2PC at commit */ + relid = RangeVarGetRelid(stmt->relation, true); + if (OidIsValid(relid)) + exec_type = ExecUtilityFindNodes(OBJECT_TABLE, relid, &is_temp); #endif if (stmt->concurrent) @@ -1365,7 +1500,7 @@ standard_ProcessUtility(Node *parsetree, #ifdef PGXC if (IS_PGXC_COORDINATOR && !stmt->isconstraint && !IsConnFromCoord()) ExecUtilityStmtOnNodes(queryString, NULL, - stmt->concurrent, EXEC_ON_ALL_NODES); + stmt->concurrent, exec_type, is_temp); #endif } break; @@ -1377,14 +1512,11 @@ standard_ProcessUtility(Node *parsetree, if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) { RemoteQueryExecType remoteExecType; + bool is_temp; Oid relid = RangeVarGetRelid(((RuleStmt *) parsetree)->relation, false); - if (get_rel_relkind(relid) == RELKIND_VIEW) - remoteExecType = EXEC_ON_COORDS; - else - remoteExecType = EXEC_ON_ALL_NODES; - - ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType); + remoteExecType = ExecUtilityFindNodesRelkind(relid, &is_temp); + ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType, is_temp); } #endif break; @@ -1400,9 +1532,8 @@ standard_ProcessUtility(Node *parsetree, */ CreateSeqStmt *stmt = (CreateSeqStmt *) parsetree; - if (stmt->sequence->relpersistence == RELPERSISTENCE_PERMANENT || - stmt->sequence->relpersistence == RELPERSISTENCE_UNLOGGED) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS); + if (stmt->sequence->relpersistence != RELPERSISTENCE_TEMP) + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS, false); } #endif break; @@ -1420,7 +1551,7 @@ standard_ProcessUtility(Node *parsetree, Oid relid = RangeVarGetRelid(stmt->sequence, false); if (!IsTempSequence(relid)) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS, false); } #endif break; @@ -1448,7 +1579,7 @@ standard_ProcessUtility(Node *parsetree, } #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1461,7 +1592,7 @@ standard_ProcessUtility(Node *parsetree, createdb((CreatedbStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES, false); #endif break; @@ -1469,7 +1600,7 @@ standard_ProcessUtility(Node *parsetree, AlterDatabase((AlterDatabaseStmt *) parsetree, isTopLevel); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1477,7 +1608,7 @@ standard_ProcessUtility(Node *parsetree, AlterDatabaseSet((AlterDatabaseSetStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1495,7 +1626,7 @@ standard_ProcessUtility(Node *parsetree, /* Clean also remote Coordinators */ sprintf(query, "CLEAN CONNECTION TO ALL FOR DATABASE %s;", stmt->dbname); - ExecUtilityStmtOnNodes(query, NULL, true, EXEC_ON_COORDS); + ExecUtilityStmtOnNodes(query, NULL, true, EXEC_ON_COORDS, false); } #endif @@ -1504,7 +1635,7 @@ standard_ProcessUtility(Node *parsetree, } #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES, false); #endif break; @@ -1551,7 +1682,7 @@ standard_ProcessUtility(Node *parsetree, } #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_DATANODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_DATANODES, false); #endif break; @@ -1561,7 +1692,7 @@ standard_ProcessUtility(Node *parsetree, cluster((ClusterStmt *) parsetree, isTopLevel); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES); + ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES, false); #endif break; @@ -1574,7 +1705,7 @@ standard_ProcessUtility(Node *parsetree, * vacuum() pops active snapshot and we can not send it to nodes */ if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES); + ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES, false); #endif vacuum((VacuumStmt *) parsetree, InvalidOid, true, NULL, false, isTopLevel); @@ -1596,8 +1727,16 @@ standard_ProcessUtility(Node *parsetree, * send this query to backend nodes */ if (!stmt->is_local || !IsTransactionBlock()) - if (PoolManagerSetCommand(stmt->is_local, queryString) < 0) + { + PoolCommandType command_type; + if (stmt->is_local) + command_type = POOL_CMD_LOCAL_SET; + else + command_type = POOL_CMD_GLOBAL_SET; + + if (PoolManagerSetCommand(command_type, queryString) < 0) elog(ERROR, "Postgres-XC: ERROR SET query"); + } } #endif break; @@ -1623,7 +1762,7 @@ standard_ProcessUtility(Node *parsetree, * send this query to backend nodes */ if (!IsTransactionBlock()) - if (PoolManagerSetCommand(false, queryString) < 0) + if (PoolManagerSetCommand(POOL_CMD_GLOBAL_SET, queryString) < 0) elog(ERROR, "Postgres-XC: ERROR DISCARD query"); } #endif @@ -1640,7 +1779,7 @@ standard_ProcessUtility(Node *parsetree, errdetail("The feature is not currently supported"))); if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1661,15 +1800,12 @@ standard_ProcessUtility(Node *parsetree, /* If rule is defined on a view, drop it only on Coordinators */ if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) { - RemoteQueryExecType remoteExecType; + RemoteQueryExecType remoteExecType = EXEC_ON_ALL_NODES; + bool is_temp = false; Oid relid = RangeVarGetRelid(stmt->relation, false); - if (get_rel_relkind(relid) == RELKIND_VIEW) - remoteExecType = EXEC_ON_COORDS; - else - remoteExecType = EXEC_ON_ALL_NODES; - - ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType); + remoteExecType = ExecUtilityFindNodesRelkind(relid, &is_temp); + ExecUtilityStmtOnNodes(queryString, NULL, false, remoteExecType, is_temp); } #endif break; @@ -1679,7 +1815,7 @@ standard_ProcessUtility(Node *parsetree, stmt->behavior, stmt->missing_ok); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; default: @@ -1694,7 +1830,7 @@ standard_ProcessUtility(Node *parsetree, CreateProceduralLanguage((CreatePLangStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1702,7 +1838,7 @@ standard_ProcessUtility(Node *parsetree, DropProceduralLanguage((DropPLangStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1713,7 +1849,7 @@ standard_ProcessUtility(Node *parsetree, DefineDomain((CreateDomainStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1724,7 +1860,7 @@ standard_ProcessUtility(Node *parsetree, CreateRole((CreateRoleStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1732,7 +1868,7 @@ standard_ProcessUtility(Node *parsetree, AlterRole((AlterRoleStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1740,7 +1876,7 @@ standard_ProcessUtility(Node *parsetree, AlterRoleSet((AlterRoleSetStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1748,7 +1884,7 @@ standard_ProcessUtility(Node *parsetree, DropRole((DropRoleStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1756,7 +1892,7 @@ standard_ProcessUtility(Node *parsetree, DropOwnedObjects((DropOwnedStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1764,7 +1900,7 @@ standard_ProcessUtility(Node *parsetree, ReassignOwnedObjects((ReassignOwnedStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1778,7 +1914,7 @@ standard_ProcessUtility(Node *parsetree, LockTableCommand((LockStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1810,7 +1946,7 @@ standard_ProcessUtility(Node *parsetree, (RecoveryInProgress() ? 0 : CHECKPOINT_FORCE)); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES); + ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES, false); #endif break; @@ -1855,7 +1991,7 @@ standard_ProcessUtility(Node *parsetree, #ifdef PGXC if (IS_PGXC_COORDINATOR) ExecUtilityStmtOnNodes(queryString, NULL, - stmt->kind == OBJECT_DATABASE, EXEC_ON_ALL_NODES); + stmt->kind == OBJECT_DATABASE, EXEC_ON_ALL_NODES, false); #endif break; } @@ -1865,7 +2001,7 @@ standard_ProcessUtility(Node *parsetree, CreateConversionCommand((CreateConversionStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1873,7 +2009,7 @@ standard_ProcessUtility(Node *parsetree, CreateCast((CreateCastStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1881,7 +2017,7 @@ standard_ProcessUtility(Node *parsetree, DropCast((DropCastStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1889,7 +2025,7 @@ standard_ProcessUtility(Node *parsetree, DefineOpClass((CreateOpClassStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1897,7 +2033,7 @@ standard_ProcessUtility(Node *parsetree, DefineOpFamily((CreateOpFamilyStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1905,7 +2041,7 @@ standard_ProcessUtility(Node *parsetree, AlterOpFamily((AlterOpFamilyStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1913,7 +2049,7 @@ standard_ProcessUtility(Node *parsetree, RemoveOpClass((RemoveOpClassStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1921,7 +2057,7 @@ standard_ProcessUtility(Node *parsetree, RemoveOpFamily((RemoveOpFamilyStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1929,7 +2065,7 @@ standard_ProcessUtility(Node *parsetree, AlterTSDictionary((AlterTSDictionaryStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; @@ -1937,7 +2073,7 @@ standard_ProcessUtility(Node *parsetree, AlterTSConfiguration((AlterTSConfigurationStmt *) parsetree); #ifdef PGXC if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES); + ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES, false); #endif break; #ifdef PGXC @@ -1956,7 +2092,7 @@ standard_ProcessUtility(Node *parsetree, CleanConnection((CleanConnStmt *) parsetree); if (IS_PGXC_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_COORDS); + ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_COORDS, false); break; #endif default: @@ -1975,7 +2111,7 @@ standard_ProcessUtility(Node *parsetree, */ static void ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes, - bool force_autocommit, RemoteQueryExecType exec_type) + bool force_autocommit, RemoteQueryExecType exec_type, bool is_temp) { /* Return if query is launched on no nodes */ if (exec_type == EXEC_ON_NONE) @@ -1989,6 +2125,7 @@ ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes, step->sql_statement = pstrdup(queryString); step->force_autocommit = force_autocommit; step->exec_type = exec_type; + step->is_temp = is_temp; ExecRemoteUtility(step); pfree(step->sql_statement); pfree(step); @@ -2000,37 +2137,97 @@ ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes, * * Determine the list of nodes to launch query on. * This depends on temporary nature of object and object type. - * PGXCTODO: Extend temporary object check for tables. + * Return also a flag indicating if relation is temporary. */ static RemoteQueryExecType -ExecUtilityFindNodes(const char *queryString, - ObjectType object_type, - Oid relid) +ExecUtilityFindNodes(ObjectType object_type, + Oid relid, + bool *is_temp) { - bool is_temp = false; - RemoteQueryExecType remoteExecType = EXEC_ON_NONE; + RemoteQueryExecType exec_type; - /* Check if object is a temporary sequence */ - if (object_type == OBJECT_SEQUENCE || - (object_type == OBJECT_TABLE && - get_rel_relkind(relid) == RELKIND_SEQUENCE)) - is_temp = IsTempSequence(relid); + switch (object_type) + { + case OBJECT_SEQUENCE: + /* Check if object is a temporary sequence */ + if ((*is_temp = IsTempSequence(relid))) + exec_type = EXEC_ON_NONE; + else + exec_type = EXEC_ON_COORDS; + break; + + case OBJECT_TABLE: + /* Do the check on relation kind */ + exec_type = ExecUtilityFindNodesRelkind(relid, is_temp); + break; + + case OBJECT_VIEW: + /* Check if object is a temporary view */ + if ((*is_temp = IsTempTable(relid))) + exec_type = EXEC_ON_NONE; + else + exec_type = EXEC_ON_COORDS; + break; + + case OBJECT_INDEX: + /* Check if given index uses temporary tables */ + if ((*is_temp = IsIndexUsingTempTable(relid))) + exec_type = EXEC_ON_DATANODES; + else + exec_type = EXEC_ON_ALL_NODES; + break; + + default: + *is_temp = false; + exec_type = EXEC_ON_ALL_NODES; + break; + } + + return exec_type; +} - if (!is_temp) +/* + * ExecUtilityFindNodesRelkind + * + * Get node execution and temporary type + * for given relation depending on its relkind + */ +static RemoteQueryExecType +ExecUtilityFindNodesRelkind(Oid relid, bool *is_temp) +{ + char relkind_str = get_rel_relkind(relid); + RemoteQueryExecType exec_type; + + switch (relkind_str) { - remoteExecType = EXEC_ON_ALL_NODES; + case RELKIND_SEQUENCE: + if ((*is_temp = IsTempSequence(relid))) + exec_type = EXEC_ON_NONE; + else + exec_type = EXEC_ON_COORDS; + break; - if (object_type == OBJECT_SEQUENCE || - object_type == OBJECT_VIEW) - remoteExecType = EXEC_ON_COORDS; - else if (object_type == OBJECT_TABLE) - { - if (get_rel_relkind(relid) == RELKIND_SEQUENCE) - remoteExecType = EXEC_ON_COORDS; - } + case RELKIND_RELATION: + if ((*is_temp = IsTempTable(relid))) + exec_type = EXEC_ON_DATANODES; + else + exec_type = EXEC_ON_ALL_NODES; + break; + + case RELKIND_VIEW: + if ((*is_temp = IsTempTable(relid))) + exec_type = EXEC_ON_NONE; + else + exec_type = EXEC_ON_COORDS; + break; + + default: + *is_temp = false; + exec_type = EXEC_ON_ALL_NODES; + break; } - return remoteExecType; + return exec_type; } #endif |
