diff options
-rw-r--r-- | contrib/stormstats/stormstats.c | 1 | ||||
-rw-r--r-- | src/backend/access/transam/xact.c | 69 | ||||
-rw-r--r-- | src/backend/commands/analyze.c | 1 | ||||
-rw-r--r-- | src/backend/commands/dbcommands.c | 2 | ||||
-rw-r--r-- | src/backend/commands/explain.c | 1 | ||||
-rw-r--r-- | src/backend/commands/vacuum.c | 1 | ||||
-rw-r--r-- | src/backend/nodes/copyfuncs.c | 1 | ||||
-rw-r--r-- | src/backend/nodes/outfuncs.c | 1 | ||||
-rw-r--r-- | src/backend/parser/analyze.c | 2 | ||||
-rw-r--r-- | src/backend/pgxc/locator/redistrib.c | 1 | ||||
-rw-r--r-- | src/backend/pgxc/plan/planner.c | 11 | ||||
-rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 18 | ||||
-rw-r--r-- | src/backend/pgxc/pool/poolutils.c | 5 | ||||
-rw-r--r-- | src/backend/tcop/postgres.c | 23 | ||||
-rw-r--r-- | src/backend/tcop/utility.c | 47 | ||||
-rw-r--r-- | src/backend/utils/adt/dbsize.c | 1 | ||||
-rw-r--r-- | src/backend/utils/misc/guc.c | 1 | ||||
-rw-r--r-- | src/include/access/xact.h | 5 | ||||
-rw-r--r-- | src/include/pgxc/planner.h | 1 | ||||
-rw-r--r-- | src/test/regress/sql/plpgsql.sql | 24 |
20 files changed, 179 insertions, 37 deletions
diff --git a/contrib/stormstats/stormstats.c b/contrib/stormstats/stormstats.c index 72c7150fb0..004f5c23de 100644 --- a/contrib/stormstats/stormstats.c +++ b/contrib/stormstats/stormstats.c @@ -630,7 +630,6 @@ storm_gather_remote_coord_info(Oid funcid) step->combine_type = COMBINE_TYPE_NONE; step->exec_nodes = NULL; step->sql_statement = query; - step->force_autocommit = false; step->read_only = true; step->exec_type = EXEC_ON_COORDS; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 4a83f77e3f..9bb805d127 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -90,6 +90,8 @@ #define implicit2PC_head "_$XC$" #endif +#define XACT_REMOTE_TRANSACTION_AUTOCOMMIT 0x01 +#define XACT_REMOTE_TRANSACTION_BLOCK 0x02 /* * User-tweakable parameters @@ -221,6 +223,11 @@ typedef struct TransactionStateData int parallelModeLevel; /* Enter/ExitParallelMode counter */ struct TransactionStateData *parent; /* back link to parent */ #ifdef XCP + /* + * flags to track whether to run the remote transaction in a transaction + * block or in autocommit mode. + */ + int remoteTransactionBlockFlags; int waitedForXidsCount; /* count of xids we waited to finish */ TransactionId *waitedForXids; /* xids we waited to finish */ #endif @@ -2758,6 +2765,7 @@ AtEOXact_GlobalTxn(bool commit) } s->waitedForXids = NULL; s->waitedForXidsCount = 0; + s->remoteTransactionBlockFlags = 0; SetNextTransactionId(InvalidTransactionId); } @@ -3116,6 +3124,7 @@ PrepareTransaction(void) } s->waitedForXids = NULL; s->waitedForXidsCount = 0; + s->remoteTransactionBlockFlags = 0; #endif SetNextTransactionId(InvalidTransactionId); @@ -3429,6 +3438,7 @@ CleanupTransaction(void) } s->waitedForXids = NULL; s->waitedForXidsCount = 0; + s->remoteTransactionBlockFlags = 0; #endif /* @@ -3930,8 +3940,8 @@ AbortCurrentTransaction(void) * making callers do it.) * stmtType: statement type name, for error messages. */ -void -PreventTransactionChain(bool isTopLevel, const char *stmtType) +static void +PreventTransactionChainInternal(bool isTopLevel, const char *stmtType, bool remote) { /* * xact block already started? @@ -3968,6 +3978,21 @@ PreventTransactionChain(bool isTopLevel, const char *stmtType) CurrentTransactionState->blockState != TBLOCK_STARTED) elog(FATAL, "cannot prevent transaction chain"); /* all okay */ + + if (remote) + SetRequireRemoteTransactionAutoCommit(); +} + +void +PreventTransactionChain(bool isTopLevel, const char *stmtType) +{ + PreventTransactionChainInternal(isTopLevel, stmtType, true); +} + +void +PreventTransactionChainLocal(bool isTopLevel, const char *stmtType) +{ + PreventTransactionChainInternal(isTopLevel, stmtType, false); } /* @@ -4300,6 +4325,8 @@ BeginTransactionBlock(void) */ if (IS_PGXC_LOCAL_COORDINATOR) SetSendCommandId(true); + + SetRequireRemoteTransactionBlock(); #endif } @@ -6912,5 +6939,43 @@ SetTopTransactionId(GlobalTransactionId xid) pq_putmessage('x', (const char *) &xid, sizeof (GlobalTransactionId)); } } + +void +SetRequireRemoteTransactionBlock(void) +{ + TransactionState s = CurrentTransactionState; + + if (s->remoteTransactionBlockFlags & XACT_REMOTE_TRANSACTION_AUTOCOMMIT) + elog(ERROR, "Can't run a query marked for autocommit in a transaction block"); + s->remoteTransactionBlockFlags |= XACT_REMOTE_TRANSACTION_BLOCK; +} + +bool +IsRemoteTransactionBlockRequired(void) +{ + TransactionState s = CurrentTransactionState; + + return s->remoteTransactionBlockFlags & XACT_REMOTE_TRANSACTION_BLOCK; + +} + +void +SetRequireRemoteTransactionAutoCommit(void) +{ + TransactionState s = CurrentTransactionState; + + if (s->remoteTransactionBlockFlags & XACT_REMOTE_TRANSACTION_BLOCK) + elog(ERROR, "Can't run a query marked for a transaction block in autocommit mode"); + s->remoteTransactionBlockFlags |= XACT_REMOTE_TRANSACTION_AUTOCOMMIT; +} + +bool +IsRemoteTransactionAutoCommit(void) +{ + TransactionState s = CurrentTransactionState; + + return s->remoteTransactionBlockFlags & XACT_REMOTE_TRANSACTION_AUTOCOMMIT; + +} #endif #endif diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c index 832d99a065..8abc3432e3 100644 --- a/src/backend/commands/analyze.c +++ b/src/backend/commands/analyze.c @@ -3032,7 +3032,6 @@ coord_collect_simple_stats(Relation onerel, bool inh, int attr_cnt, step->combine_type = COMBINE_TYPE_NONE; step->exec_nodes = NULL; step->sql_statement = query.data; - step->force_autocommit = true; step->exec_type = EXEC_ON_DATANODES; /* Add targetlist entries */ diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index ce49f91166..2fd6497518 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -1613,7 +1613,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel) /* ... but we allow it on remote nodes */ if (IS_PGXC_LOCAL_COORDINATOR) #endif - PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE"); + PreventTransactionChainLocal(isTopLevel, "ALTER DATABASE SET TABLESPACE"); movedb(stmt->dbname, defGetString(dtablespace)); return InvalidOid; diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 69dc8d0508..4a2eb5b56e 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -3760,7 +3760,6 @@ ExplainRemoteQuery(RemoteQuery *plan, PlanState *planstate, List *ancestors, Exp step->exec_nodes->nodeList = list_make1_int(linitial_int(plan->exec_nodes->nodeList)); - step->force_autocommit = true; step->exec_type = EXEC_ON_DATANODES; dummy = makeVar(1, 1, TEXTOID, -1, InvalidOid, 0); diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index fffae63bcc..edadfc4844 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -1706,7 +1706,6 @@ get_remote_relstat(char *nspname, char *relname, bool replicated, step->combine_type = COMBINE_TYPE_NONE; step->exec_nodes = NULL; step->sql_statement = query.data; - step->force_autocommit = true; step->exec_type = EXEC_ON_DATANODES; /* Add targetlist entries */ diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index e64a074cd1..067fa8296f 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -1232,7 +1232,6 @@ _copyRemoteQuery(const RemoteQuery *from) COPY_SCALAR_FIELD(combine_type); COPY_NODE_FIELD(sort); COPY_SCALAR_FIELD(read_only); - COPY_SCALAR_FIELD(force_autocommit); COPY_STRING_FIELD(statement); COPY_STRING_FIELD(cursor); COPY_SCALAR_FIELD(rq_num_params); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 99b70b717f..06e12e0b64 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -873,7 +873,6 @@ _outRemoteQuery(StringInfo str, const RemoteQuery *node) WRITE_NODE_FIELD(exec_nodes); WRITE_ENUM_FIELD(combine_type, CombineType); WRITE_BOOL_FIELD(read_only); - WRITE_BOOL_FIELD(force_autocommit); WRITE_STRING_FIELD(statement); WRITE_STRING_FIELD(cursor); WRITE_INT_FIELD(rq_num_params); diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 7ccd3f3dd8..fc865ac47a 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -2712,7 +2712,6 @@ transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt) step->combine_type = COMBINE_TYPE_NONE; step->sort = NULL; step->read_only = true; - step->force_autocommit = false; step->cursor = NULL; /* This is needed by executor */ @@ -2735,7 +2734,6 @@ transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt) step->join_condition = NULL; /* Change the list of nodes that will be executed for the query and others */ - step->force_autocommit = false; step->combine_type = COMBINE_TYPE_SAME; step->read_only = true; step->exec_direct_type = EXEC_DIRECT_NONE; diff --git a/src/backend/pgxc/locator/redistrib.c b/src/backend/pgxc/locator/redistrib.c index 6999dc7e2a..1a081b4730 100644 --- a/src/backend/pgxc/locator/redistrib.c +++ b/src/backend/pgxc/locator/redistrib.c @@ -888,7 +888,6 @@ distrib_execute_query(char *sql, bool is_temp, ExecNodes *exec_nodes) step->combine_type = COMBINE_TYPE_SAME; step->exec_nodes = exec_nodes; step->sql_statement = pstrdup(sql); - step->force_autocommit = false; /* Redistribution operations only concern Datanodes */ step->exec_type = EXEC_ON_DATANODES; diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index e86a945c25..9afb602359 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -17,6 +17,7 @@ #include "postgres.h" #include "miscadmin.h" #include "access/transam.h" +#include "access/xact.h" #include "catalog/pg_aggregate.h" #include "catalog/pg_class.h" #include "catalog/pg_inherits_fn.h" @@ -232,6 +233,16 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams) /* we need Coordinator for evaluation, invoke standard planner */ result = standard_planner(query, cursorOptions, boundParams); + + /* + * For coordinator side execution, we must always force a transaction block + * on the remote side. This ensures that all queries resulting from the + * coordinator side execution are run within a block. For example, this + * could be a user-defined function, which internally runs several queries, + * where each query is separately checked for fast-query-shipping. We must + * run all these queries inside a block. + */ + SetRequireRemoteTransactionBlock(); return result; } diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 2fd23687fc..b069b68be5 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -1895,6 +1895,9 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle **connections, if (connections[i]->state == DN_CONNECTION_STATE_QUERY) BufferConnection(connections[i]); + elog(DEBUG2, "Sending gxid %u to remote node %s, need_tran_block %d", + gxid, connections[i]->nodename, need_tran_block); + /* Send GXID and check for errors */ if (GlobalTransactionIdIsValid(gxid) && pgxc_node_send_gxid(connections[i], gxid)) return EOF; @@ -1908,7 +1911,7 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle **connections, else if (IS_PGXC_REMOTE_COORDINATOR) need_tran_block = false; - elog(DEBUG5, "need_tran_block %d, connections[%d]->transaction_status %c", + elog(DEBUG2, "need_tran_block %d, connections[%d]->transaction_status %c", need_tran_block, i, connections[i]->transaction_status); /* Send BEGIN if not already in transaction */ if (need_tran_block && connections[i]->transaction_status == 'I') @@ -2805,7 +2808,7 @@ DataNodeCopyBegin(RemoteCopyData *rcstate) * If more than one nodes are involved or if we are already in a * transaction block, we must the remote statements in a transaction block */ - need_tran_block = (conn_count > 1) || (TransactionBlockStatusCode() == 'T'); + need_tran_block = (conn_count > 1) || IsRemoteTransactionBlockRequired(); elog(DEBUG1, "conn_count = %d, need_tran_block = %s", conn_count, need_tran_block ? "true" : "false"); @@ -3400,7 +3403,7 @@ ExecRemoteUtility(RemoteQuery *node) { RemoteQueryState *remotestate; ResponseCombiner *combiner; - bool force_autocommit = node->force_autocommit; + bool force_autocommit = IsRemoteTransactionAutoCommit(); RemoteQueryExecType exec_type = node->exec_type; GlobalTransactionId gxid = InvalidGlobalTransactionId; Snapshot snapshot = NULL; @@ -4624,12 +4627,17 @@ ExecRemoteQuery(PlanState *pstate) * Start transaction on data nodes if we are in explicit transaction * or going to use extended query protocol or write to multiple nodes */ - if (step->force_autocommit) + elog(DEBUG2, "cursor %s, read_only %d," + " total_conn_count %d, transaction block status %c", + step->cursor, step->read_only, + total_conn_count, TransactionBlockStatusCode()); + + if (IsRemoteTransactionAutoCommit()) need_tran_block = false; else need_tran_block = step->cursor || (!step->read_only && total_conn_count > 1) || - (TransactionBlockStatusCode() == 'T'); + IsRemoteTransactionBlockRequired(); stat_statement(); stat_transaction(total_conn_count); diff --git a/src/backend/pgxc/pool/poolutils.c b/src/backend/pgxc/pool/poolutils.c index ad361a0812..ac8d1daff4 100644 --- a/src/backend/pgxc/pool/poolutils.c +++ b/src/backend/pgxc/pool/poolutils.c @@ -27,6 +27,7 @@ #include "pgxc/poolutils.h" #include "pgxc/pgxcnode.h" #include "access/gtm.h" +#include "access/transam.h" #include "access/xact.h" #include "catalog/pgxc_node.h" #include "commands/dbcommands.h" @@ -97,10 +98,10 @@ pgxc_pool_check(PG_FUNCTION_ARGS) Datum pgxc_pool_reload(PG_FUNCTION_ARGS) { - if (IsTransactionBlock()) + if (TransactionIdIsValid(GetTopTransactionIdIfAny())) ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("pgxc_pool_reload cannot run inside a transaction block"))); + errmsg("pgxc_pool_reload cannot run with a transaction ID assigned"))); /* * Always check if we can get away with a LESS destructive refresh diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 8aa7dc25dc..3c73f9bc16 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -805,6 +805,13 @@ pg_analyze_and_rewrite(RawStmt *parsetree, const char *query_string, */ querytree_list = pg_rewrite_query(query); + /* + * If we rewrote the query into more than one queries, then we must + * enforce a transaction block while running remote queries. + */ + if (list_length(querytree_list) > 1) + SetRequireRemoteTransactionBlock(); + TRACE_POSTGRESQL_QUERY_REWRITE_DONE(query_string); return querytree_list; @@ -1245,9 +1252,19 @@ exec_simple_query(const char *query_string) */ if (IS_PGXC_DATANODE && IsPostmasterEnvironment) { - if (IsA(parsetree->stmt, VacuumStmt) || IsA(parsetree->stmt, ClusterStmt)) - SetForceXidFromGTM(true); - else if (IsA(parsetree->stmt, ReindexStmt)) + if (IsA(parsetree, VacuumStmt)) + { + VacuumStmt *stmt = (VacuumStmt *) parsetree; + if (stmt->options & VACOPT_VACUUM) + SetForceXidFromGTM(true); + } + else if (IsA(parsetree, ClusterStmt)) + { + ClusterStmt *stmt = (ClusterStmt *) parsetree; + if (stmt->relation == NULL) + SetForceXidFromGTM(true); + } + else if (IsA(parsetree, ReindexStmt)) { ReindexStmt *stmt = (ReindexStmt *) parsetree->stmt; if (stmt->kind == REINDEX_OBJECT_SCHEMA || diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 6dd3845153..6254f1c5a8 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -92,14 +92,12 @@ static void ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes, bool sentToRemote, - bool force_autocommit, RemoteQueryExecType exec_type, bool is_temp, bool add_context); static void ExecUtilityStmtOnNodesInternal(const char *queryString, ExecNodes *nodes, bool sentToRemote, - bool force_autocommit, RemoteQueryExecType exec_type, bool is_temp); static RemoteQueryExecType ExecUtilityFindNodes(ObjectType objectType, @@ -1535,7 +1533,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, case T_CreateTableSpaceStmt: /* no event triggers for global objects */ if (IS_PGXC_LOCAL_COORDINATOR) - PreventTransactionChain(isTopLevel, "CREATE TABLESPACE"); + PreventTransactionChainLocal(isTopLevel, "CREATE TABLESPACE"); CreateTableSpace((CreateTableSpaceStmt *) parsetree); break; @@ -1543,7 +1541,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, /* no event triggers for global objects */ /* Allow this to be run inside transaction block on remote nodes */ if (IS_PGXC_LOCAL_COORDINATOR) - PreventTransactionChain(isTopLevel, "DROP TABLESPACE"); + PreventTransactionChainLocal(isTopLevel, "DROP TABLESPACE"); DropTableSpace((DropTableSpaceStmt *) parsetree); break; @@ -1595,7 +1593,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, case T_CreatedbStmt: /* no event triggers for global objects */ if (IS_PGXC_LOCAL_COORDINATOR) - PreventTransactionChain(isTopLevel, "CREATE DATABASE"); + PreventTransactionChainLocal(isTopLevel, "CREATE DATABASE"); createdb(pstate, (CreatedbStmt *) parsetree); break; @@ -1615,7 +1613,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, /* no event triggers for global objects */ if (IS_PGXC_LOCAL_COORDINATOR) - PreventTransactionChain(isTopLevel, "DROP DATABASE"); + PreventTransactionChainLocal(isTopLevel, "DROP DATABASE"); dropdb(stmt->dbname, stmt->missing_ok); } @@ -1667,6 +1665,8 @@ standard_ProcessUtility(PlannedStmt *pstmt, case T_ClusterStmt: /* we choose to allow this during "read only" transactions */ PreventCommandDuringRecovery("CLUSTER"); + if (((ClusterStmt *) parsetree)->relation == NULL) + PreventTransactionChain(isTopLevel, "CLUSTER"); /* forbidden in parallel mode due to CommandIsReadOnly */ cluster((ClusterStmt *) parsetree, isTopLevel); break; @@ -1678,6 +1678,16 @@ standard_ProcessUtility(PlannedStmt *pstmt, /* we choose to allow this during "read only" transactions */ PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE"); + /* + * We have to run the command on nodes before Coordinator because + * vacuum() pops active snapshot and we can not send it to nodes + */ + if (IS_PGXC_LOCAL_COORDINATOR && + !(stmt->options & VACOPT_COORDINATOR)) + { + if (stmt->options & VACOPT_VACUUM) + SetRequireRemoteTransactionAutoCommit(); + } /* forbidden in parallel mode due to CommandIsReadOnly */ ExecVacuum(stmt, isTopLevel); } @@ -1785,6 +1795,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, case T_ReindexStmt: { ReindexStmt *stmt = (ReindexStmt *) parsetree; + bool prevent_xact_chain = false; /* we choose to allow this during "read only" transactions */ PreventCommandDuringRecovery("REINDEX"); @@ -1812,12 +1823,18 @@ standard_ProcessUtility(PlannedStmt *pstmt, (stmt->kind == REINDEX_OBJECT_SYSTEM) ? "REINDEX SYSTEM" : "REINDEX DATABASE"); ReindexMultipleTables(stmt->name, stmt->kind, stmt->options); + prevent_xact_chain = true; break; default: elog(ERROR, "unrecognized object type: %d", (int) stmt->kind); break; } + if (IS_PGXC_LOCAL_COORDINATOR) + { + if (prevent_xact_chain) + SetRequireRemoteTransactionAutoCommit(); + } } break; @@ -1868,7 +1885,6 @@ standard_ProcessUtility(PlannedStmt *pstmt, completionTag); else ExecRenameStmt(stmt); - } break; @@ -1929,7 +1945,6 @@ standard_ProcessUtility(PlannedStmt *pstmt, completionTag); else CommentObject(stmt); - break; } break; @@ -2824,12 +2839,17 @@ ExecDropStmt(DropStmt *stmt, ExecDropStmt(DropStmt *stmt, bool isTopLevel) #endif { + bool prevent_xact_chain = false; + switch (stmt->removeType) { case OBJECT_INDEX: if (stmt->concurrent) + { PreventTransactionChain(isTopLevel, "DROP INDEX CONCURRENTLY"); + prevent_xact_chain = true; + } /* fall through */ case OBJECT_TABLE: @@ -2850,8 +2870,12 @@ ExecDropStmt(DropStmt *stmt, bool isTopLevel) #ifdef PGXC /* DROP is done depending on the object type and its temporary type */ if (IS_PGXC_LOCAL_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, + { + if (prevent_xact_chain) + SetRequireRemoteTransactionAutoCommit(); + ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, exec_type, is_temp, false); + } } #endif break; @@ -2868,7 +2892,7 @@ ExecDropStmt(DropStmt *stmt, bool isTopLevel) RemoveObjects(stmt); #ifdef PGXC if (IS_PGXC_LOCAL_COORDINATOR) - ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, + ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, exec_type, is_temp, false); } #endif @@ -4596,7 +4620,7 @@ GetCommandLogLevel(Node *parsetree) #ifdef PGXC static void ExecUtilityStmtOnNodesInternal(const char *queryString, ExecNodes *nodes, bool sentToRemote, - bool force_autocommit, RemoteQueryExecType exec_type, bool is_temp) + RemoteQueryExecType exec_type, bool is_temp) { /* Return if query is launched on no nodes */ if (exec_type == EXEC_ON_NONE) @@ -4620,7 +4644,6 @@ ExecUtilityStmtOnNodesInternal(const char *queryString, ExecNodes *nodes, bool s step->combine_type = COMBINE_TYPE_SAME; step->exec_nodes = nodes; step->sql_statement = pstrdup(queryString); - step->force_autocommit = force_autocommit; step->exec_type = exec_type; ExecRemoteUtility(step); pfree(step->sql_statement); diff --git a/src/backend/utils/adt/dbsize.c b/src/backend/utils/adt/dbsize.c index 4e8aacf6c4..5bf80a26a9 100644 --- a/src/backend/utils/adt/dbsize.c +++ b/src/backend/utils/adt/dbsize.c @@ -1235,7 +1235,6 @@ pgxc_execute_on_nodes(int numnodes, Oid *nodelist, char *query) } plan->sql_statement = query; - plan->force_autocommit = false; /* * We only need the target entry to determine result data type. * So create dummy even if real expression is a function. diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index a51de7fafd..cc032e4966 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -7445,7 +7445,6 @@ set_config_option(const char *name, const char *value, step->exec_nodes = NULL; step->sql_statement = poolcmd.data; /* force_autocommit is actually does not start transaction on nodes */ - step->force_autocommit = true; step->exec_type = EXEC_ON_CURRENT; ExecRemoteUtility(step); pfree(step); diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 16b3c5a26f..34c54198e2 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -403,6 +403,7 @@ extern bool IsTransactionOrTransactionBlock(void); extern char TransactionBlockStatusCode(void); extern void AbortOutOfAnyTransaction(void); extern void PreventTransactionChain(bool isTopLevel, const char *stmtType); +extern void PreventTransactionChainLocal(bool isTopLevel, const char *stmtType); extern void RequireTransactionChain(bool isTopLevel, const char *stmtType); extern void WarnNoTransactionChain(bool isTopLevel, const char *stmtType); extern bool IsInTransactionChain(bool isTopLevel); @@ -429,6 +430,10 @@ extern void SetSendCommandId(bool status); extern bool IsPGXCNodeXactReadOnly(void); extern bool IsPGXCNodeXactDatanodeDirect(void); extern void TransactionRecordXidWait(TransactionId xid); +extern void SetRequireRemoteTransactionBlock(void); +extern bool IsRemoteTransactionBlockRequired(void); +extern void SetRequireRemoteTransactionAutoCommit(void); +extern bool IsRemoteTransactionAutoCommit(void); #endif extern int xactGetCommittedChildren(TransactionId **ptr); diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h index 095eca4e00..d4223d2ab6 100644 --- a/src/include/pgxc/planner.h +++ b/src/include/pgxc/planner.h @@ -88,7 +88,6 @@ typedef struct CombineType combine_type; SimpleSort *sort; bool read_only; /* do not use 2PC when committing read only steps */ - bool force_autocommit; /* some commands like VACUUM require autocommit mode */ char *statement; /* if specified use it as a PreparedStatement name on Datanodes */ char *cursor; /* if specified use it as a Portal name on Datanodes */ int rq_num_params; /* number of parameters present in diff --git a/src/test/regress/sql/plpgsql.sql b/src/test/regress/sql/plpgsql.sql index fdcffe7d65..31517d384e 100644 --- a/src/test/regress/sql/plpgsql.sql +++ b/src/test/regress/sql/plpgsql.sql @@ -4921,3 +4921,27 @@ BEGIN END; $$ LANGUAGE plpgsql; SELECT * FROM list_partitioned_table() AS t; + +-- ensure that all statements in a function are correctly executed in a +-- transaction block. +create table plp_mt_tab(a int, b int); +create function plpgsql_multistmt() returns void as $$ +begin + insert into plp_mt_tab(a) values (1); + insert into plp_mt_tab(a) values (2); + insert into plp_mt_tab(a) values (3/0); +end +$$ language plpgsql; + +select plpgsql_multistmt(); +select * from plp_mt_tab; + +create or replace function plpgsql_multistmt() returns void as $$ +begin + insert into plp_mt_tab(a) values (3); + update plp_mt_tab set b = 1 where (a / 0) = 0; +end +$$ language plpgsql; + +select plpgsql_multistmt(); +select * from plp_mt_tab; |