summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/stormstats/stormstats.c1
-rw-r--r--src/backend/access/transam/xact.c69
-rw-r--r--src/backend/commands/analyze.c1
-rw-r--r--src/backend/commands/dbcommands.c2
-rw-r--r--src/backend/commands/explain.c1
-rw-r--r--src/backend/commands/vacuum.c1
-rw-r--r--src/backend/nodes/copyfuncs.c1
-rw-r--r--src/backend/nodes/outfuncs.c1
-rw-r--r--src/backend/parser/analyze.c2
-rw-r--r--src/backend/pgxc/locator/redistrib.c1
-rw-r--r--src/backend/pgxc/plan/planner.c11
-rw-r--r--src/backend/pgxc/pool/execRemote.c18
-rw-r--r--src/backend/pgxc/pool/poolutils.c5
-rw-r--r--src/backend/tcop/postgres.c23
-rw-r--r--src/backend/tcop/utility.c47
-rw-r--r--src/backend/utils/adt/dbsize.c1
-rw-r--r--src/backend/utils/misc/guc.c1
-rw-r--r--src/include/access/xact.h5
-rw-r--r--src/include/pgxc/planner.h1
-rw-r--r--src/test/regress/sql/plpgsql.sql24
20 files changed, 179 insertions, 37 deletions
diff --git a/contrib/stormstats/stormstats.c b/contrib/stormstats/stormstats.c
index 72c7150fb0..004f5c23de 100644
--- a/contrib/stormstats/stormstats.c
+++ b/contrib/stormstats/stormstats.c
@@ -630,7 +630,6 @@ storm_gather_remote_coord_info(Oid funcid)
step->combine_type = COMBINE_TYPE_NONE;
step->exec_nodes = NULL;
step->sql_statement = query;
- step->force_autocommit = false;
step->read_only = true;
step->exec_type = EXEC_ON_COORDS;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 4a83f77e3f..9bb805d127 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -90,6 +90,8 @@
#define implicit2PC_head "_$XC$"
#endif
+#define XACT_REMOTE_TRANSACTION_AUTOCOMMIT 0x01
+#define XACT_REMOTE_TRANSACTION_BLOCK 0x02
/*
* User-tweakable parameters
@@ -221,6 +223,11 @@ typedef struct TransactionStateData
int parallelModeLevel; /* Enter/ExitParallelMode counter */
struct TransactionStateData *parent; /* back link to parent */
#ifdef XCP
+ /*
+ * flags to track whether to run the remote transaction in a transaction
+ * block or in autocommit mode.
+ */
+ int remoteTransactionBlockFlags;
int waitedForXidsCount; /* count of xids we waited to finish */
TransactionId *waitedForXids; /* xids we waited to finish */
#endif
@@ -2758,6 +2765,7 @@ AtEOXact_GlobalTxn(bool commit)
}
s->waitedForXids = NULL;
s->waitedForXidsCount = 0;
+ s->remoteTransactionBlockFlags = 0;
SetNextTransactionId(InvalidTransactionId);
}
@@ -3116,6 +3124,7 @@ PrepareTransaction(void)
}
s->waitedForXids = NULL;
s->waitedForXidsCount = 0;
+ s->remoteTransactionBlockFlags = 0;
#endif
SetNextTransactionId(InvalidTransactionId);
@@ -3429,6 +3438,7 @@ CleanupTransaction(void)
}
s->waitedForXids = NULL;
s->waitedForXidsCount = 0;
+ s->remoteTransactionBlockFlags = 0;
#endif
/*
@@ -3930,8 +3940,8 @@ AbortCurrentTransaction(void)
* making callers do it.)
* stmtType: statement type name, for error messages.
*/
-void
-PreventTransactionChain(bool isTopLevel, const char *stmtType)
+static void
+PreventTransactionChainInternal(bool isTopLevel, const char *stmtType, bool remote)
{
/*
* xact block already started?
@@ -3968,6 +3978,21 @@ PreventTransactionChain(bool isTopLevel, const char *stmtType)
CurrentTransactionState->blockState != TBLOCK_STARTED)
elog(FATAL, "cannot prevent transaction chain");
/* all okay */
+
+ if (remote)
+ SetRequireRemoteTransactionAutoCommit();
+}
+
+void
+PreventTransactionChain(bool isTopLevel, const char *stmtType)
+{
+ PreventTransactionChainInternal(isTopLevel, stmtType, true);
+}
+
+void
+PreventTransactionChainLocal(bool isTopLevel, const char *stmtType)
+{
+ PreventTransactionChainInternal(isTopLevel, stmtType, false);
}
/*
@@ -4300,6 +4325,8 @@ BeginTransactionBlock(void)
*/
if (IS_PGXC_LOCAL_COORDINATOR)
SetSendCommandId(true);
+
+ SetRequireRemoteTransactionBlock();
#endif
}
@@ -6912,5 +6939,43 @@ SetTopTransactionId(GlobalTransactionId xid)
pq_putmessage('x', (const char *) &xid, sizeof (GlobalTransactionId));
}
}
+
+void
+SetRequireRemoteTransactionBlock(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ if (s->remoteTransactionBlockFlags & XACT_REMOTE_TRANSACTION_AUTOCOMMIT)
+ elog(ERROR, "Can't run a query marked for autocommit in a transaction block");
+ s->remoteTransactionBlockFlags |= XACT_REMOTE_TRANSACTION_BLOCK;
+}
+
+bool
+IsRemoteTransactionBlockRequired(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ return s->remoteTransactionBlockFlags & XACT_REMOTE_TRANSACTION_BLOCK;
+
+}
+
+void
+SetRequireRemoteTransactionAutoCommit(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ if (s->remoteTransactionBlockFlags & XACT_REMOTE_TRANSACTION_BLOCK)
+ elog(ERROR, "Can't run a query marked for a transaction block in autocommit mode");
+ s->remoteTransactionBlockFlags |= XACT_REMOTE_TRANSACTION_AUTOCOMMIT;
+}
+
+bool
+IsRemoteTransactionAutoCommit(void)
+{
+ TransactionState s = CurrentTransactionState;
+
+ return s->remoteTransactionBlockFlags & XACT_REMOTE_TRANSACTION_AUTOCOMMIT;
+
+}
#endif
#endif
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 832d99a065..8abc3432e3 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -3032,7 +3032,6 @@ coord_collect_simple_stats(Relation onerel, bool inh, int attr_cnt,
step->combine_type = COMBINE_TYPE_NONE;
step->exec_nodes = NULL;
step->sql_statement = query.data;
- step->force_autocommit = true;
step->exec_type = EXEC_ON_DATANODES;
/* Add targetlist entries */
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index ce49f91166..2fd6497518 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1613,7 +1613,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
/* ... but we allow it on remote nodes */
if (IS_PGXC_LOCAL_COORDINATOR)
#endif
- PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
+ PreventTransactionChainLocal(isTopLevel, "ALTER DATABASE SET TABLESPACE");
movedb(stmt->dbname, defGetString(dtablespace));
return InvalidOid;
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 69dc8d0508..4a2eb5b56e 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -3760,7 +3760,6 @@ ExplainRemoteQuery(RemoteQuery *plan, PlanState *planstate, List *ancestors, Exp
step->exec_nodes->nodeList =
list_make1_int(linitial_int(plan->exec_nodes->nodeList));
- step->force_autocommit = true;
step->exec_type = EXEC_ON_DATANODES;
dummy = makeVar(1, 1, TEXTOID, -1, InvalidOid, 0);
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index fffae63bcc..edadfc4844 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -1706,7 +1706,6 @@ get_remote_relstat(char *nspname, char *relname, bool replicated,
step->combine_type = COMBINE_TYPE_NONE;
step->exec_nodes = NULL;
step->sql_statement = query.data;
- step->force_autocommit = true;
step->exec_type = EXEC_ON_DATANODES;
/* Add targetlist entries */
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index e64a074cd1..067fa8296f 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -1232,7 +1232,6 @@ _copyRemoteQuery(const RemoteQuery *from)
COPY_SCALAR_FIELD(combine_type);
COPY_NODE_FIELD(sort);
COPY_SCALAR_FIELD(read_only);
- COPY_SCALAR_FIELD(force_autocommit);
COPY_STRING_FIELD(statement);
COPY_STRING_FIELD(cursor);
COPY_SCALAR_FIELD(rq_num_params);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 99b70b717f..06e12e0b64 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -873,7 +873,6 @@ _outRemoteQuery(StringInfo str, const RemoteQuery *node)
WRITE_NODE_FIELD(exec_nodes);
WRITE_ENUM_FIELD(combine_type, CombineType);
WRITE_BOOL_FIELD(read_only);
- WRITE_BOOL_FIELD(force_autocommit);
WRITE_STRING_FIELD(statement);
WRITE_STRING_FIELD(cursor);
WRITE_INT_FIELD(rq_num_params);
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 7ccd3f3dd8..fc865ac47a 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -2712,7 +2712,6 @@ transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt)
step->combine_type = COMBINE_TYPE_NONE;
step->sort = NULL;
step->read_only = true;
- step->force_autocommit = false;
step->cursor = NULL;
/* This is needed by executor */
@@ -2735,7 +2734,6 @@ transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt)
step->join_condition = NULL;
/* Change the list of nodes that will be executed for the query and others */
- step->force_autocommit = false;
step->combine_type = COMBINE_TYPE_SAME;
step->read_only = true;
step->exec_direct_type = EXEC_DIRECT_NONE;
diff --git a/src/backend/pgxc/locator/redistrib.c b/src/backend/pgxc/locator/redistrib.c
index 6999dc7e2a..1a081b4730 100644
--- a/src/backend/pgxc/locator/redistrib.c
+++ b/src/backend/pgxc/locator/redistrib.c
@@ -888,7 +888,6 @@ distrib_execute_query(char *sql, bool is_temp, ExecNodes *exec_nodes)
step->combine_type = COMBINE_TYPE_SAME;
step->exec_nodes = exec_nodes;
step->sql_statement = pstrdup(sql);
- step->force_autocommit = false;
/* Redistribution operations only concern Datanodes */
step->exec_type = EXEC_ON_DATANODES;
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index e86a945c25..9afb602359 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -17,6 +17,7 @@
#include "postgres.h"
#include "miscadmin.h"
#include "access/transam.h"
+#include "access/xact.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_class.h"
#include "catalog/pg_inherits_fn.h"
@@ -232,6 +233,16 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
/* we need Coordinator for evaluation, invoke standard planner */
result = standard_planner(query, cursorOptions, boundParams);
+
+ /*
+ * For coordinator side execution, we must always force a transaction block
+ * on the remote side. This ensures that all queries resulting from the
+ * coordinator side execution are run within a block. For example, this
+ * could be a user-defined function, which internally runs several queries,
+ * where each query is separately checked for fast-query-shipping. We must
+ * run all these queries inside a block.
+ */
+ SetRequireRemoteTransactionBlock();
return result;
}
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 2fd23687fc..b069b68be5 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -1895,6 +1895,9 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle **connections,
if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
BufferConnection(connections[i]);
+ elog(DEBUG2, "Sending gxid %u to remote node %s, need_tran_block %d",
+ gxid, connections[i]->nodename, need_tran_block);
+
/* Send GXID and check for errors */
if (GlobalTransactionIdIsValid(gxid) && pgxc_node_send_gxid(connections[i], gxid))
return EOF;
@@ -1908,7 +1911,7 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle **connections,
else if (IS_PGXC_REMOTE_COORDINATOR)
need_tran_block = false;
- elog(DEBUG5, "need_tran_block %d, connections[%d]->transaction_status %c",
+ elog(DEBUG2, "need_tran_block %d, connections[%d]->transaction_status %c",
need_tran_block, i, connections[i]->transaction_status);
/* Send BEGIN if not already in transaction */
if (need_tran_block && connections[i]->transaction_status == 'I')
@@ -2805,7 +2808,7 @@ DataNodeCopyBegin(RemoteCopyData *rcstate)
* If more than one nodes are involved or if we are already in a
* transaction block, we must the remote statements in a transaction block
*/
- need_tran_block = (conn_count > 1) || (TransactionBlockStatusCode() == 'T');
+ need_tran_block = (conn_count > 1) || IsRemoteTransactionBlockRequired();
elog(DEBUG1, "conn_count = %d, need_tran_block = %s", conn_count,
need_tran_block ? "true" : "false");
@@ -3400,7 +3403,7 @@ ExecRemoteUtility(RemoteQuery *node)
{
RemoteQueryState *remotestate;
ResponseCombiner *combiner;
- bool force_autocommit = node->force_autocommit;
+ bool force_autocommit = IsRemoteTransactionAutoCommit();
RemoteQueryExecType exec_type = node->exec_type;
GlobalTransactionId gxid = InvalidGlobalTransactionId;
Snapshot snapshot = NULL;
@@ -4624,12 +4627,17 @@ ExecRemoteQuery(PlanState *pstate)
* Start transaction on data nodes if we are in explicit transaction
* or going to use extended query protocol or write to multiple nodes
*/
- if (step->force_autocommit)
+ elog(DEBUG2, "cursor %s, read_only %d,"
+ " total_conn_count %d, transaction block status %c",
+ step->cursor, step->read_only,
+ total_conn_count, TransactionBlockStatusCode());
+
+ if (IsRemoteTransactionAutoCommit())
need_tran_block = false;
else
need_tran_block = step->cursor ||
(!step->read_only && total_conn_count > 1) ||
- (TransactionBlockStatusCode() == 'T');
+ IsRemoteTransactionBlockRequired();
stat_statement();
stat_transaction(total_conn_count);
diff --git a/src/backend/pgxc/pool/poolutils.c b/src/backend/pgxc/pool/poolutils.c
index ad361a0812..ac8d1daff4 100644
--- a/src/backend/pgxc/pool/poolutils.c
+++ b/src/backend/pgxc/pool/poolutils.c
@@ -27,6 +27,7 @@
#include "pgxc/poolutils.h"
#include "pgxc/pgxcnode.h"
#include "access/gtm.h"
+#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pgxc_node.h"
#include "commands/dbcommands.h"
@@ -97,10 +98,10 @@ pgxc_pool_check(PG_FUNCTION_ARGS)
Datum
pgxc_pool_reload(PG_FUNCTION_ARGS)
{
- if (IsTransactionBlock())
+ if (TransactionIdIsValid(GetTopTransactionIdIfAny()))
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
- errmsg("pgxc_pool_reload cannot run inside a transaction block")));
+ errmsg("pgxc_pool_reload cannot run with a transaction ID assigned")));
/*
* Always check if we can get away with a LESS destructive refresh
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 8aa7dc25dc..3c73f9bc16 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -805,6 +805,13 @@ pg_analyze_and_rewrite(RawStmt *parsetree, const char *query_string,
*/
querytree_list = pg_rewrite_query(query);
+ /*
+ * If we rewrote the query into more than one queries, then we must
+ * enforce a transaction block while running remote queries.
+ */
+ if (list_length(querytree_list) > 1)
+ SetRequireRemoteTransactionBlock();
+
TRACE_POSTGRESQL_QUERY_REWRITE_DONE(query_string);
return querytree_list;
@@ -1245,9 +1252,19 @@ exec_simple_query(const char *query_string)
*/
if (IS_PGXC_DATANODE && IsPostmasterEnvironment)
{
- if (IsA(parsetree->stmt, VacuumStmt) || IsA(parsetree->stmt, ClusterStmt))
- SetForceXidFromGTM(true);
- else if (IsA(parsetree->stmt, ReindexStmt))
+ if (IsA(parsetree, VacuumStmt))
+ {
+ VacuumStmt *stmt = (VacuumStmt *) parsetree;
+ if (stmt->options & VACOPT_VACUUM)
+ SetForceXidFromGTM(true);
+ }
+ else if (IsA(parsetree, ClusterStmt))
+ {
+ ClusterStmt *stmt = (ClusterStmt *) parsetree;
+ if (stmt->relation == NULL)
+ SetForceXidFromGTM(true);
+ }
+ else if (IsA(parsetree, ReindexStmt))
{
ReindexStmt *stmt = (ReindexStmt *) parsetree->stmt;
if (stmt->kind == REINDEX_OBJECT_SCHEMA ||
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 6dd3845153..6254f1c5a8 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -92,14 +92,12 @@
static void ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
bool sentToRemote,
- bool force_autocommit,
RemoteQueryExecType exec_type,
bool is_temp,
bool add_context);
static void ExecUtilityStmtOnNodesInternal(const char *queryString,
ExecNodes *nodes,
bool sentToRemote,
- bool force_autocommit,
RemoteQueryExecType exec_type,
bool is_temp);
static RemoteQueryExecType ExecUtilityFindNodes(ObjectType objectType,
@@ -1535,7 +1533,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
case T_CreateTableSpaceStmt:
/* no event triggers for global objects */
if (IS_PGXC_LOCAL_COORDINATOR)
- PreventTransactionChain(isTopLevel, "CREATE TABLESPACE");
+ PreventTransactionChainLocal(isTopLevel, "CREATE TABLESPACE");
CreateTableSpace((CreateTableSpaceStmt *) parsetree);
break;
@@ -1543,7 +1541,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
/* no event triggers for global objects */
/* Allow this to be run inside transaction block on remote nodes */
if (IS_PGXC_LOCAL_COORDINATOR)
- PreventTransactionChain(isTopLevel, "DROP TABLESPACE");
+ PreventTransactionChainLocal(isTopLevel, "DROP TABLESPACE");
DropTableSpace((DropTableSpaceStmt *) parsetree);
break;
@@ -1595,7 +1593,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
case T_CreatedbStmt:
/* no event triggers for global objects */
if (IS_PGXC_LOCAL_COORDINATOR)
- PreventTransactionChain(isTopLevel, "CREATE DATABASE");
+ PreventTransactionChainLocal(isTopLevel, "CREATE DATABASE");
createdb(pstate, (CreatedbStmt *) parsetree);
break;
@@ -1615,7 +1613,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
/* no event triggers for global objects */
if (IS_PGXC_LOCAL_COORDINATOR)
- PreventTransactionChain(isTopLevel, "DROP DATABASE");
+ PreventTransactionChainLocal(isTopLevel, "DROP DATABASE");
dropdb(stmt->dbname, stmt->missing_ok);
}
@@ -1667,6 +1665,8 @@ standard_ProcessUtility(PlannedStmt *pstmt,
case T_ClusterStmt:
/* we choose to allow this during "read only" transactions */
PreventCommandDuringRecovery("CLUSTER");
+ if (((ClusterStmt *) parsetree)->relation == NULL)
+ PreventTransactionChain(isTopLevel, "CLUSTER");
/* forbidden in parallel mode due to CommandIsReadOnly */
cluster((ClusterStmt *) parsetree, isTopLevel);
break;
@@ -1678,6 +1678,16 @@ standard_ProcessUtility(PlannedStmt *pstmt,
/* we choose to allow this during "read only" transactions */
PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ?
"VACUUM" : "ANALYZE");
+ /*
+ * We have to run the command on nodes before Coordinator because
+ * vacuum() pops active snapshot and we can not send it to nodes
+ */
+ if (IS_PGXC_LOCAL_COORDINATOR &&
+ !(stmt->options & VACOPT_COORDINATOR))
+ {
+ if (stmt->options & VACOPT_VACUUM)
+ SetRequireRemoteTransactionAutoCommit();
+ }
/* forbidden in parallel mode due to CommandIsReadOnly */
ExecVacuum(stmt, isTopLevel);
}
@@ -1785,6 +1795,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
case T_ReindexStmt:
{
ReindexStmt *stmt = (ReindexStmt *) parsetree;
+ bool prevent_xact_chain = false;
/* we choose to allow this during "read only" transactions */
PreventCommandDuringRecovery("REINDEX");
@@ -1812,12 +1823,18 @@ standard_ProcessUtility(PlannedStmt *pstmt,
(stmt->kind == REINDEX_OBJECT_SYSTEM) ? "REINDEX SYSTEM" :
"REINDEX DATABASE");
ReindexMultipleTables(stmt->name, stmt->kind, stmt->options);
+ prevent_xact_chain = true;
break;
default:
elog(ERROR, "unrecognized object type: %d",
(int) stmt->kind);
break;
}
+ if (IS_PGXC_LOCAL_COORDINATOR)
+ {
+ if (prevent_xact_chain)
+ SetRequireRemoteTransactionAutoCommit();
+ }
}
break;
@@ -1868,7 +1885,6 @@ standard_ProcessUtility(PlannedStmt *pstmt,
completionTag);
else
ExecRenameStmt(stmt);
-
}
break;
@@ -1929,7 +1945,6 @@ standard_ProcessUtility(PlannedStmt *pstmt,
completionTag);
else
CommentObject(stmt);
- break;
}
break;
@@ -2824,12 +2839,17 @@ ExecDropStmt(DropStmt *stmt,
ExecDropStmt(DropStmt *stmt, bool isTopLevel)
#endif
{
+ bool prevent_xact_chain = false;
+
switch (stmt->removeType)
{
case OBJECT_INDEX:
if (stmt->concurrent)
+ {
PreventTransactionChain(isTopLevel,
"DROP INDEX CONCURRENTLY");
+ prevent_xact_chain = true;
+ }
/* fall through */
case OBJECT_TABLE:
@@ -2850,8 +2870,12 @@ ExecDropStmt(DropStmt *stmt, bool isTopLevel)
#ifdef PGXC
/* DROP is done depending on the object type and its temporary type */
if (IS_PGXC_LOCAL_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false,
+ {
+ if (prevent_xact_chain)
+ SetRequireRemoteTransactionAutoCommit();
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote,
exec_type, is_temp, false);
+ }
}
#endif
break;
@@ -2868,7 +2892,7 @@ ExecDropStmt(DropStmt *stmt, bool isTopLevel)
RemoveObjects(stmt);
#ifdef PGXC
if (IS_PGXC_LOCAL_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false,
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote,
exec_type, is_temp, false);
}
#endif
@@ -4596,7 +4620,7 @@ GetCommandLogLevel(Node *parsetree)
#ifdef PGXC
static void
ExecUtilityStmtOnNodesInternal(const char *queryString, ExecNodes *nodes, bool sentToRemote,
- bool force_autocommit, RemoteQueryExecType exec_type, bool is_temp)
+ RemoteQueryExecType exec_type, bool is_temp)
{
/* Return if query is launched on no nodes */
if (exec_type == EXEC_ON_NONE)
@@ -4620,7 +4644,6 @@ ExecUtilityStmtOnNodesInternal(const char *queryString, ExecNodes *nodes, bool s
step->combine_type = COMBINE_TYPE_SAME;
step->exec_nodes = nodes;
step->sql_statement = pstrdup(queryString);
- step->force_autocommit = force_autocommit;
step->exec_type = exec_type;
ExecRemoteUtility(step);
pfree(step->sql_statement);
diff --git a/src/backend/utils/adt/dbsize.c b/src/backend/utils/adt/dbsize.c
index 4e8aacf6c4..5bf80a26a9 100644
--- a/src/backend/utils/adt/dbsize.c
+++ b/src/backend/utils/adt/dbsize.c
@@ -1235,7 +1235,6 @@ pgxc_execute_on_nodes(int numnodes, Oid *nodelist, char *query)
}
plan->sql_statement = query;
- plan->force_autocommit = false;
/*
* We only need the target entry to determine result data type.
* So create dummy even if real expression is a function.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a51de7fafd..cc032e4966 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -7445,7 +7445,6 @@ set_config_option(const char *name, const char *value,
step->exec_nodes = NULL;
step->sql_statement = poolcmd.data;
/* force_autocommit is actually does not start transaction on nodes */
- step->force_autocommit = true;
step->exec_type = EXEC_ON_CURRENT;
ExecRemoteUtility(step);
pfree(step);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 16b3c5a26f..34c54198e2 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -403,6 +403,7 @@ extern bool IsTransactionOrTransactionBlock(void);
extern char TransactionBlockStatusCode(void);
extern void AbortOutOfAnyTransaction(void);
extern void PreventTransactionChain(bool isTopLevel, const char *stmtType);
+extern void PreventTransactionChainLocal(bool isTopLevel, const char *stmtType);
extern void RequireTransactionChain(bool isTopLevel, const char *stmtType);
extern void WarnNoTransactionChain(bool isTopLevel, const char *stmtType);
extern bool IsInTransactionChain(bool isTopLevel);
@@ -429,6 +430,10 @@ extern void SetSendCommandId(bool status);
extern bool IsPGXCNodeXactReadOnly(void);
extern bool IsPGXCNodeXactDatanodeDirect(void);
extern void TransactionRecordXidWait(TransactionId xid);
+extern void SetRequireRemoteTransactionBlock(void);
+extern bool IsRemoteTransactionBlockRequired(void);
+extern void SetRequireRemoteTransactionAutoCommit(void);
+extern bool IsRemoteTransactionAutoCommit(void);
#endif
extern int xactGetCommittedChildren(TransactionId **ptr);
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index 095eca4e00..d4223d2ab6 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -88,7 +88,6 @@ typedef struct
CombineType combine_type;
SimpleSort *sort;
bool read_only; /* do not use 2PC when committing read only steps */
- bool force_autocommit; /* some commands like VACUUM require autocommit mode */
char *statement; /* if specified use it as a PreparedStatement name on Datanodes */
char *cursor; /* if specified use it as a Portal name on Datanodes */
int rq_num_params; /* number of parameters present in
diff --git a/src/test/regress/sql/plpgsql.sql b/src/test/regress/sql/plpgsql.sql
index fdcffe7d65..31517d384e 100644
--- a/src/test/regress/sql/plpgsql.sql
+++ b/src/test/regress/sql/plpgsql.sql
@@ -4921,3 +4921,27 @@ BEGIN
END; $$ LANGUAGE plpgsql;
SELECT * FROM list_partitioned_table() AS t;
+
+-- ensure that all statements in a function are correctly executed in a
+-- transaction block.
+create table plp_mt_tab(a int, b int);
+create function plpgsql_multistmt() returns void as $$
+begin
+ insert into plp_mt_tab(a) values (1);
+ insert into plp_mt_tab(a) values (2);
+ insert into plp_mt_tab(a) values (3/0);
+end
+$$ language plpgsql;
+
+select plpgsql_multistmt();
+select * from plp_mt_tab;
+
+create or replace function plpgsql_multistmt() returns void as $$
+begin
+ insert into plp_mt_tab(a) values (3);
+ update plp_mt_tab set b = 1 where (a / 0) = 0;
+end
+$$ language plpgsql;
+
+select plpgsql_multistmt();
+select * from plp_mt_tab;