summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/xact.c213
-rw-r--r--src/backend/pgxc/pool/execRemote.c341
-rw-r--r--src/backend/pgxc/pool/pgxcnode.c46
-rw-r--r--src/backend/storage/ipc/procarray.c8
-rw-r--r--src/include/pgxc/execRemote.h14
-rw-r--r--src/include/pgxc/pgxcnode.h11
6 files changed, 536 insertions, 97 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 5faa4eaf5a..a069d44183 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -26,7 +26,7 @@
#include "access/gtm.h"
/* PGXC_COORD */
#include "gtm/gtm_c.h"
-#include "pgxc/pgxcnode.h"
+#include "pgxc/execRemote.h"
/* PGXC_DATANODE */
#include "postmaster/autovacuum.h"
#endif
@@ -139,6 +139,9 @@ typedef struct TransactionStateData
TransactionId transactionId; /* my XID, or Invalid if none */
#ifdef PGXC /* PGXC_COORD */
GlobalTransactionId globalTransactionId; /* my GXID, or Invalid if none */
+ GlobalTransactionId globalCommitTransactionId; /* Commit GXID used by implicit 2PC */
+ bool ArePGXCNodesPrepared; /* Checks if PGXC Nodes are prepared and
+ * rolls them back in case of an Abort */
#endif
SubTransactionId subTransactionId; /* my subxact ID */
char *name; /* savepoint name, if any */
@@ -169,6 +172,8 @@ static TransactionStateData TopTransactionStateData = {
0, /* transaction id */
#ifdef PGXC
0, /* global transaction id */
+ 0, /* global commit transaction id */
+ 0, /* flag if nodes are prepared or not */
#endif
0, /* subtransaction id */
NULL, /* savepoint name */
@@ -307,6 +312,7 @@ static const char *TransStateAsString(TransState state);
#ifdef PGXC /* PGXC_COORD */
static GlobalTransactionId GetGlobalTransactionId(TransactionState s);
+static void PrepareTransaction(bool write_2pc_file, bool is_implicit);
/* ----------------------------------------------------------------
* PG-XC Functions
@@ -1631,10 +1637,15 @@ StartTransaction(void)
* start processing
*/
s->state = TRANS_START;
-#ifdef PGXC /* PGXC_COORD */
+#ifdef PGXC
/* GXID is assigned already by a remote Coordinator */
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
s->globalTransactionId = InvalidGlobalTransactionId; /* until assigned */
+ /* Until assigned by implicit 2PC */
+ s->globalCommitTransactionId = InvalidGlobalTransactionId;
+ s->ArePGXCNodesPrepared = false;
+ }
#endif
s->transactionId = InvalidTransactionId; /* until assigned */
/*
@@ -1737,7 +1748,31 @@ CommitTransaction(void)
{
TransactionState s = CurrentTransactionState;
TransactionId latestXid;
+#ifdef PGXC
+ bool PrepareLocalCoord = false;
+ bool PreparePGXCNodes = false;
+ char implicitgid[256];
+ TransactionId xid = GetCurrentTransactionId();
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ PreparePGXCNodes = PGXCNodeIsImplicit2PC(&PrepareLocalCoord);
+
+ if (PrepareLocalCoord || PreparePGXCNodes)
+ sprintf(implicitgid, "T%d", xid);
+
+ /* Save GID where PrepareTransaction can find it again */
+ if (PrepareLocalCoord)
+ {
+ prepareGID = MemoryContextStrdup(TopTransactionContext, implicitgid);
+ /*
+ * If current transaction has a DDL, and involves more than 1 Coordinator,
+ * PREPARE first on local Coordinator.
+ */
+ PrepareTransaction(true, true);
+ }
+ else
+ {
+#endif
ShowTransactionState("CommitTransaction");
/*
@@ -1747,6 +1782,28 @@ CommitTransaction(void)
elog(WARNING, "CommitTransaction while in %s state",
TransStateAsString(s->state));
Assert(s->parent == NULL);
+#ifdef PGXC
+ }
+
+ /*
+ * If Transaction has involved several nodes, prepare them before committing on Coordinator.
+ */
+ if (PreparePGXCNodes)
+ {
+ /*
+ * Prepare all the nodes involved in this Implicit 2PC
+ * If Coordinator COMMIT fails, nodes are also rolled back during AbortTransaction().
+ *
+ * Track if PGXC Nodes are already prepared
+ */
+ if (PGXCNodeImplicitPrepare(xid, implicitgid) < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+ errmsg("cannot COMMIT a transaction whose PREPARE has failed on Nodes")));
+ else
+ s->ArePGXCNodesPrepared = true;
+ }
+#endif
/*
* Do pre-commit processing (most of this stuff requires database access,
@@ -1756,6 +1813,10 @@ CommitTransaction(void)
* deferred triggers, and it's also possible that triggers create holdable
* cursors. So we have to loop until there's nothing left to do.
*/
+#ifdef PGXC
+ if (!PrepareLocalCoord)
+ {
+#endif
for (;;)
{
/*
@@ -1800,8 +1861,11 @@ CommitTransaction(void)
/*
* There can be error on the data nodes. So go to data nodes before
* changing transaction state and local clean up
+ * Here simply commit on nodes, we know that 2PC is not involved implicitly.
+ *
+ * This is called only if it is not necessary to prepare the nodes.
*/
- if (IS_PGXC_COORDINATOR)
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && !PreparePGXCNodes)
PGXCNodeCommit();
#endif
@@ -1825,8 +1889,10 @@ CommitTransaction(void)
/*
* Now we can let GTM know about transaction commit.
* Only a Remote Coordinator is allowed to do that.
+ *
+ * Also do not commit a transaction that has already been prepared on Datanodes
*/
- if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && !PreparePGXCNodes)
{
CommitTranGTM(s->globalTransactionId);
latestXid = s->globalTransactionId;
@@ -1908,6 +1974,46 @@ CommitTransaction(void)
AtEOXact_MultiXact();
+#ifdef PGXC
+ }/* End of !PrepareLocalCoord */
+
+ /*
+ * At this point, if no 2pc has been used, we have a transaction that committed on GTM,
+ * local coord and nodes, so the remaining stuff is only resource cleanup.
+ * If 2pc has been used, Coordinator has been prepared (if 2 Coordinators at least are involved
+ * in current transaction).
+ * Datanodes have also been prepared if more than 1 Datanode has been written.
+ *
+ * Here we complete Implicit 2PC in the following order
+ * - Commit the prepared transaction on local coordinator (if necessary)
+ * - Commit on the remaining nodes
+ */
+
+ if (PreparePGXCNodes)
+ {
+ /*
+ * Preparing for Commit, transaction has to take a new TransactionID for Commit
+ * It is considered as in Progress state.
+ */
+ s->state = TRANS_INPROGRESS;
+ s->globalCommitTransactionId = BeginTranGTM(NULL);
+
+ /* COMMIT local Coordinator */
+ if (PrepareLocalCoord)
+ {
+ FinishPreparedTransaction(implicitgid, true);
+ }
+
+ /*
+ * Commit all the nodes involved in this implicit 2PC.
+ * COMMIT on GTM is made here and is made at the same time
+ * for prepared GXID and commit GXID to limit interactions between GTM and Coord.
+ * This explains why prepared GXID is also in argument.
+ */
+ PGXCNodeImplicitCommitPrepared(xid, s->globalCommitTransactionId, implicitgid, true);
+ }
+#endif
+
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_LOCKS,
true, true);
@@ -1948,7 +2054,11 @@ CommitTransaction(void)
#ifdef PGXC
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
s->globalTransactionId = InvalidGlobalTransactionId;
+ s->globalCommitTransactionId = InvalidGlobalTransactionId;
+ s->ArePGXCNodesPrepared = false;
+ }
else if (IS_PGXC_DATANODE || IsConnFromCoord())
SetNextTransactionId(InvalidTransactionId);
#endif
@@ -1972,9 +2082,11 @@ CommitTransaction(void)
/*
* Only a Postgres-XC Coordinator that received a PREPARE Command from
* an application can use this special prepare.
+ * If PrepareTransaction is called during an implicit 2PC, do not release resources,
+ * this is done by CommitTransaction when transaction has been committed on Nodes.
*/
static void
-PrepareTransaction(bool write_2pc_file)
+PrepareTransaction(bool write_2pc_file, bool is_implicit)
#else
static void
PrepareTransaction(void)
@@ -2169,6 +2281,14 @@ PrepareTransaction(void)
}
#endif
+#ifdef PGXC
+ /*
+ * In case of an implicit 2PC, resources are released by CommitTransaction()
+ */
+ if (!is_implicit)
+ {
+#endif
+
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_LOCKS,
true, true);
@@ -2218,6 +2338,9 @@ PrepareTransaction(void)
s->state = TRANS_DEFAULT;
RESUME_INTERRUPTS();
+#ifdef PGXC
+ } /* is_implicit END */
+#endif
}
@@ -2285,8 +2408,13 @@ AbortTransaction(void)
/*
* We should rollback on the data nodes before cleaning up portals
* to be sure data structures used by connections are not freed yet
+ *
+ * It is also necessary to check that nodes are not partially committed
+ * in an implicit 2PC, correct handling is made below.
*/
- if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ if (IS_PGXC_COORDINATOR &&
+ !IsConnFromCoord() &&
+ !TransactionIdIsValid(s->globalCommitTransactionId))
{
/*
* Make sure this is rolled back on the DataNodes
@@ -2309,6 +2437,10 @@ AbortTransaction(void)
* Advertise the fact that we aborted in pg_clog (assuming that we got as
* far as assigning an XID to advertise).
*/
+#ifdef PGXC
+ /* Do not abort a transaction that has already been committed in an implicit 2PC */
+ if (!TransactionIdIsValid(s->globalCommitTransactionId))
+#endif
latestXid = RecordTransactionAbort(false);
TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid);
@@ -2316,8 +2448,56 @@ AbortTransaction(void)
/* This is done by remote Coordinator */
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
- RollbackTranGTM(s->globalTransactionId);
+ /*
+ * Rollback the transaction ID only if it is not being used by an implicit 2PC.
+ */
+ if (!s->ArePGXCNodesPrepared)
+ RollbackTranGTM(s->globalTransactionId);
+
latestXid = s->globalTransactionId;
+
+ /* Rollback Prepared Nodes if they are totally prepared but not committed at all */
+ if (s->ArePGXCNodesPrepared && !TransactionIdIsValid(s->globalCommitTransactionId))
+ {
+ char implicitgid[256];
+
+ sprintf(implicitgid, "T%d", s->globalTransactionId);
+ PGXCNodeImplicitCommitPrepared(s->globalTransactionId,
+ s->globalCommitTransactionId,
+ implicitgid, false);
+ }
+ else if (s->ArePGXCNodesPrepared && TransactionIdIsValid(s->globalCommitTransactionId))
+ {
+ /*
+ * In this case transaction is partially committed, pick up the list of nodes
+ * prepared and not committed and register them on GTM as if it is an explicit 2PC.
+ * This permits to keep the transaction alive in snapshot and other transaction
+ * don't have any side effects with partially committed transactions
+ */
+ char implicitgid[256];
+ int co_conn_count, dn_conn_count;
+ PGXC_NodeId *datanodes = NULL;
+ PGXC_NodeId *coordinators = NULL;
+
+ sprintf(implicitgid, "T%d", s->globalTransactionId);
+
+ /* Get the list of nodes in error state */
+ PGXCNodeGetNodeList(&datanodes, &dn_conn_count, &coordinators, &co_conn_count);
+
+ /* Save the node list and gid on GTM. */
+ StartPreparedTranGTM(s->globalTransactionId, implicitgid,
+ dn_conn_count, datanodes, co_conn_count, coordinators);
+
+ /* Finish to prepare the transaction. */
+ PrepareTranGTM(s->globalTransactionId);
+
+ /*
+ * Rollback commit GXID as it has been used by an implicit 2PC.
+ * It is important at this point not to Commit the GXID used for PREPARE
+ * to keep it visible in snapshot for other transactions.
+ */
+ RollbackTranGTM(s->globalCommitTransactionId);
+ }
}
else if (IS_PGXC_DATANODE || IsConnFromCoord())
{
@@ -2598,7 +2778,7 @@ CommitTransactionCommand(void)
* return to the idle state.
*/
case TBLOCK_PREPARE:
- PrepareTransaction(true);
+ PrepareTransaction(true, false);
s->blockState = TBLOCK_DEFAULT;
break;
@@ -2608,7 +2788,7 @@ CommitTransactionCommand(void)
* that involved DDLs on a Coordinator.
*/
case TBLOCK_PREPARE_NO_2PC_FILE:
- PrepareTransaction(false);
+ PrepareTransaction(false, false);
s->blockState = TBLOCK_DEFAULT;
break;
#endif
@@ -2643,17 +2823,20 @@ CommitTransactionCommand(void)
CommitTransaction();
s->blockState = TBLOCK_DEFAULT;
}
-#ifdef PGXC
- else if (s->blockState == TBLOCK_PREPARE ||
- s->blockState == TBLOCK_PREPARE_NO_2PC_FILE)
-#else
else if (s->blockState == TBLOCK_PREPARE)
-#endif
{
Assert(s->parent == NULL);
- PrepareTransaction(true);
+ PrepareTransaction(true, false);
s->blockState = TBLOCK_DEFAULT;
}
+#ifdef PGXC
+ else if (s->blockState == TBLOCK_PREPARE_NO_2PC_FILE)
+ {
+ Assert(s->parent == NULL);
+ PrepareTransaction(false, false);
+ s->blockState = TBLOCK_DEFAULT;
+ }
+#endif
else
{
Assert(s->blockState == TBLOCK_INPROGRESS ||
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index a524c13f6d..b954003db5 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -49,6 +49,7 @@ extern char *deparseSql(RemoteQueryState *scanstate);
#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024
static bool autocommit = true;
+static bool implicit_force_autocommit = false;
static PGXCNodeHandle **write_node_list = NULL;
static int write_node_count = 0;
@@ -61,6 +62,14 @@ static int pgxc_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransacti
PGXCNodeAllHandles * pgxc_handles, char *gid);
static int pgxc_node_commit_prepared(GlobalTransactionId gxid, GlobalTransactionId prepared_gxid,
PGXCNodeAllHandles * pgxc_handles, char *gid);
+static int pgxc_node_implicit_commit_prepared(GlobalTransactionId prepare_xid,
+ GlobalTransactionId commit_xid,
+ PGXCNodeAllHandles * pgxc_handles,
+ char *gid,
+ bool is_commit);
+static int pgxc_node_implicit_prepare(GlobalTransactionId prepare_xid,
+ PGXCNodeAllHandles * pgxc_handles, char *gid);
+
static PGXCNodeAllHandles * get_exec_connections(ExecNodes *exec_nodes,
RemoteQueryExecType exec_type);
static int pgxc_node_receive_and_validate(const int conn_count,
@@ -74,7 +83,7 @@ static int handle_response_clear(PGXCNodeHandle * conn);
static void close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor);
-static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(void);
+static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(PGXCNode_HandleRequested status_requested);
#define MAX_STATEMENTS_PER_TRAN 10
@@ -1505,7 +1514,7 @@ PGXCNodePrepare(char *gid)
PGXCNodeAllHandles *pgxc_connections;
bool local_operation = false;
- pgxc_connections = pgxc_get_all_transaction_nodes();
+ pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT);
/* DDL involved in transaction, so make a local prepare too */
if (pgxc_connections->co_conn_count != 0)
@@ -1669,6 +1678,176 @@ finish:
return result;
}
+/*
+ * Prepare all the nodes involved in this implicit Prepare
+ * Abort transaction if this is not done correctly
+ */
+int
+PGXCNodeImplicitPrepare(GlobalTransactionId prepare_xid, char *gid)
+{
+ int res = 0;
+ int tran_count;
+ PGXCNodeAllHandles *pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT);
+
+ if (!pgxc_connections)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not prepare connection implicitely")));
+
+ tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count;
+
+ /*
+ * This should not happen because an implicit 2PC is always using other nodes,
+ * but it is better to check.
+ */
+ if (tran_count == 0)
+ {
+ goto finish;
+ }
+
+ res = pgxc_node_implicit_prepare(prepare_xid, pgxc_connections, gid);
+
+finish:
+ if (!autocommit)
+ stat_transaction(pgxc_connections->dn_conn_count);
+
+ return res;
+}
+
+/*
+ * Prepare transaction on dedicated nodes for Implicit 2PC
+ * This is done inside a Transaction commit if multiple nodes are involved in write operations
+ * Implicit prepare is done internally on Coordinator, so this does not interact with GTM.
+ */
+static int
+pgxc_node_implicit_prepare(GlobalTransactionId prepare_xid,
+ PGXCNodeAllHandles *pgxc_handles,
+ char *gid)
+{
+ int result = 0;
+ int co_conn_count = pgxc_handles->co_conn_count;
+ int dn_conn_count = pgxc_handles->dn_conn_count;
+ char buffer[256];
+
+ sprintf(buffer, "PREPARE TRANSACTION '%s'", gid);
+
+ /* Continue even after an error here, to consume the messages */
+ result = pgxc_all_handles_send_query(pgxc_handles, buffer, true);
+
+ /* Receive and Combine results from Datanodes and Coordinators */
+ result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false);
+ result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false);
+
+ return result;
+}
+
+/*
+ * Commit all the nodes involved in this Implicit Commit.
+ * Prepared XID is committed at the same time as Commit XID on GTM.
+ */
+void
+PGXCNodeImplicitCommitPrepared(GlobalTransactionId prepare_xid,
+ GlobalTransactionId commit_xid,
+ char *gid,
+ bool is_commit)
+{
+ int res = 0;
+ int tran_count;
+ PGXCNodeAllHandles *pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_IDLE);
+
+ if (!pgxc_connections)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not commit prepared transaction implicitely")));
+
+ tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count;
+
+ /*
+ * This should not happen because an implicit 2PC is always using other nodes,
+ * but it is better to check.
+ */
+ if (tran_count == 0)
+ {
+ elog(WARNING, "Nothing to PREPARE on Datanodes and Coordinators");
+ goto finish;
+ }
+
+ res = pgxc_node_implicit_commit_prepared(prepare_xid, commit_xid,
+ pgxc_connections, gid, is_commit);
+
+finish:
+ /* Clear nodes, signals are clear */
+ if (!autocommit)
+ stat_transaction(pgxc_connections->dn_conn_count);
+
+ /*
+ * If an error happened, do not release handles yet. This is done when transaction
+ * is aborted after the list of nodes in error state has been saved to be sent to GTM
+ */
+ if (!PersistentConnections && res == 0)
+ release_handles(false);
+ autocommit = true;
+ clear_write_node_list();
+
+ if (res != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not commit prepared transaction implicitely")));
+
+ /*
+ * Commit on GTM is made once we are sure that Nodes are not only partially committed
+ * If an error happens on a Datanode during implicit COMMIT PREPARED, a special handling
+ * is made in AbortTransaction().
+ * The list of datanodes is saved on GTM and the partially committed transaction can be committed
+ * with a COMMIT PREPARED delivered directly from application.
+ * This permits to keep the gxid alive in snapshot and avoids other transactions to see only
+ * partially committed results.
+ */
+ CommitPreparedTranGTM(prepare_xid, commit_xid);
+}
+
+/*
+ * Implicitly commit (or roll back) a prepared transaction on all nodes
+ * Prepared transaction with this gid has reset the datanodes,
+ * so we need a new gxid.
+ *
+ * GXID used for Prepare and Commit are committed at the same time on GTM.
+ * This saves network resources a bit.
+ */
+static int
+pgxc_node_implicit_commit_prepared(GlobalTransactionId prepare_xid,
+ GlobalTransactionId commit_xid,
+ PGXCNodeAllHandles *pgxc_handles,
+ char *gid,
+ bool is_commit)
+{
+ char buffer[256];
+ int result = 0;
+ int co_conn_count = pgxc_handles->co_conn_count;
+ int dn_conn_count = pgxc_handles->dn_conn_count;
+
+ if (is_commit)
+ sprintf(buffer, "COMMIT PREPARED '%s'", gid);
+ else
+ sprintf(buffer, "ROLLBACK PREPARED '%s'", gid);
+
+ if (pgxc_all_handles_send_gxid(pgxc_handles, commit_xid, true))
+ {
+ result = EOF;
+ goto finish;
+ }
+
+ /* Send COMMIT to all handles */
+ if (pgxc_all_handles_send_query(pgxc_handles, buffer, false))
+ result = EOF;
+
+ /* Receive and Combine results from Datanodes and Coordinators */
+ result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false);
+ result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false);
+
+finish:
+ return result;
+}
/*
* Commit prepared transaction on Datanodes and Coordinators (as necessary)
@@ -1684,7 +1863,7 @@ PGXCNodeCommitPrepared(char *gid)
{
int res = 0;
int res_gtm = 0;
- PGXCNodeAllHandles *pgxc_handles;
+ PGXCNodeAllHandles *pgxc_handles = NULL;
List *datanodelist = NIL;
List *coordlist = NIL;
int i, tran_count;
@@ -1812,7 +1991,7 @@ PGXCNodeRollbackPrepared(char *gid)
{
int res = 0;
int res_gtm = 0;
- PGXCNodeAllHandles *pgxc_handles;
+ PGXCNodeAllHandles *pgxc_handles = NULL;
List *datanodelist = NIL;
List *coordlist = NIL;
int i, tran_count;
@@ -1922,6 +2101,8 @@ pgxc_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransactionId prepar
/*
* Commit current transaction on data nodes where it has been started
+ * This function is called when no 2PC is involved implicitly.
+ * So only send a commit to the involved nodes.
*/
void
PGXCNodeCommit(void)
@@ -1930,7 +2111,7 @@ PGXCNodeCommit(void)
int tran_count;
PGXCNodeAllHandles *pgxc_connections;
- pgxc_connections = pgxc_get_all_transaction_nodes();
+ pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT);
tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count;
@@ -1952,7 +2133,7 @@ finish:
autocommit = true;
clear_write_node_list();
- /* Clear up connection */
+ /* Clean up connections */
pfree_pgxc_all_handles(pgxc_connections);
if (res != 0)
ereport(ERROR,
@@ -1969,71 +2150,11 @@ static int
pgxc_node_commit(PGXCNodeAllHandles *pgxc_handles)
{
char buffer[256];
- GlobalTransactionId gxid = InvalidGlobalTransactionId;
int result = 0;
int co_conn_count = pgxc_handles->co_conn_count;
int dn_conn_count = pgxc_handles->dn_conn_count;
- /* can set this to false to disable temporarily */
- /* bool do2PC = conn_count > 1; */
-
- /*
- * Only use 2PC if more than one node was written to. Otherwise, just send
- * COMMIT to all
- */
- bool do2PC = write_node_count > 1;
-
- /* Extra XID for Two Phase Commit */
- GlobalTransactionId two_phase_xid = 0;
-
- if (do2PC)
- {
- stat_2pc();
-
- /*
- * Formally we should be using GetCurrentGlobalTransactionIdIfAny() here,
- * but since we need 2pc, we surely have sent down a command and got
- * gxid for it. Hence GetCurrentGlobalTransactionId() just returns
- * already allocated gxid
- */
- gxid = GetCurrentGlobalTransactionId();
-
- sprintf(buffer, "PREPARE TRANSACTION 'T%d'", gxid);
-
- if (pgxc_all_handles_send_query(pgxc_handles, buffer, false))
- result = EOF;
-
- /* Receive and Combine results from Datanodes and Coordinators */
- result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, true);
- result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, true);
- }
-
- if (!do2PC)
- strcpy(buffer, "COMMIT");
- else
- {
- if (result)
- {
- sprintf(buffer, "ROLLBACK PREPARED 'T%d'", gxid);
- /* Consume any messages on the Datanodes and Coordinators first if necessary */
- PGXCNodeConsumeMessages();
- }
- else
- sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid);
-
- /*
- * We need to use a new xid, the data nodes have reset
- * Timestamp has already been set with BEGIN on remote Datanodes,
- * so don't use it here.
- */
- two_phase_xid = BeginTranGTM(NULL);
-
- if (pgxc_all_handles_send_gxid(pgxc_handles, two_phase_xid, true))
- {
- result = EOF;
- goto finish;
- }
- }
+ strcpy(buffer, "COMMIT");
/* Send COMMIT to all handles */
if (pgxc_all_handles_send_query(pgxc_handles, buffer, false))
@@ -2043,10 +2164,6 @@ pgxc_node_commit(PGXCNodeAllHandles *pgxc_handles)
result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false);
result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false);
-finish:
- if (do2PC)
- CommitTranGTM((GlobalTransactionId) two_phase_xid);
-
return result;
}
@@ -2062,7 +2179,7 @@ PGXCNodeRollback(void)
int tran_count;
PGXCNodeAllHandles *pgxc_connections;
- pgxc_connections = pgxc_get_all_transaction_nodes();
+ pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT);
tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count;
@@ -2099,7 +2216,6 @@ finish:
static int
pgxc_node_rollback(PGXCNodeAllHandles *pgxc_handles)
{
- int i;
int result = 0;
int co_conn_count = pgxc_handles->co_conn_count;
int dn_conn_count = pgxc_handles->dn_conn_count;
@@ -2881,6 +2997,8 @@ ExecRemoteQuery(RemoteQueryState *node)
PGXCNodeAllHandles *pgxc_connections;
TupleTableSlot *innerSlot = NULL;
+ implicit_force_autocommit = force_autocommit;
+
/*
* Inner plan for RemoteQuery supplies parameters.
* We execute inner plan to get a tuple and use values of the tuple as
@@ -3622,6 +3740,8 @@ ExecRemoteUtility(RemoteQuery *node)
bool need_tran;
int i;
+ implicit_force_autocommit = force_autocommit;
+
remotestate = CreateResponseCombiner(0, node->combine_type);
pgxc_connections = get_exec_connections(node->exec_nodes,
@@ -3984,7 +4104,7 @@ finish:
* for both data nodes and coordinators
*/
static PGXCNodeAllHandles *
-pgxc_get_all_transaction_nodes()
+pgxc_get_all_transaction_nodes(PGXCNode_HandleRequested status_requested)
{
PGXCNodeAllHandles *pgxc_connections;
@@ -4009,9 +4129,13 @@ pgxc_get_all_transaction_nodes()
/* gather needed connections */
pgxc_connections->dn_conn_count = get_transaction_nodes(
- pgxc_connections->datanode_handles, REMOTE_CONN_DATANODE);
+ pgxc_connections->datanode_handles,
+ REMOTE_CONN_DATANODE,
+ status_requested);
pgxc_connections->co_conn_count = get_transaction_nodes(
- pgxc_connections->coord_handles, REMOTE_CONN_COORD);
+ pgxc_connections->coord_handles,
+ REMOTE_CONN_COORD,
+ status_requested);
return pgxc_connections;
}
@@ -4032,3 +4156,68 @@ pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles)
pfree(pgxc_handles);
}
+
+/*
+ * Check if an Implicit 2PC is necessary for this transaction.
+ * Check also if it is necessary to prepare transaction locally.
+ */
+bool
+PGXCNodeIsImplicit2PC(bool *prepare_local_coord)
+{
+ PGXCNodeAllHandles *pgxc_handles = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT);
+ int co_conn_count = pgxc_handles->co_conn_count;
+
+ /* Prepare Local Coord only if DDL is involved on multiple nodes */
+ *prepare_local_coord = co_conn_count > 0;
+
+ /*
+ * In case of an autocommit or forced autocommit transaction, 2PC is not involved
+ * This case happens for Utilities using force autocommit (CREATE DATABASE, VACUUM...)
+ */
+ if (implicit_force_autocommit)
+ {
+ implicit_force_autocommit = false;
+ return false;
+ }
+
+ /*
+ * 2PC is necessary at other Nodes if one Datanode or one Coordinator
+ * other than the local one has been involved in a write operation.
+ */
+ return (write_node_count > 1 || co_conn_count > 0);
+}
+
+/*
+ * Return the list of nodes whose connections are in error state
+ */
+void
+PGXCNodeGetNodeList(PGXC_NodeId **datanodes,
+ int *dn_conn_count,
+ PGXC_NodeId **coordinators,
+ int *co_conn_count)
+{
+ PGXCNodeAllHandles *pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_ERROR);
+
+ *dn_conn_count = pgxc_connections->dn_conn_count;
+
+ /* Add in the list local coordinator also if necessary */
+ if (pgxc_connections->co_conn_count == 0)
+ *co_conn_count = pgxc_connections->co_conn_count;
+ else
+ *co_conn_count = pgxc_connections->co_conn_count + 1;
+
+ if (pgxc_connections->dn_conn_count != 0)
+ *datanodes = collect_pgxcnode_numbers(pgxc_connections->dn_conn_count,
+ pgxc_connections->datanode_handles, REMOTE_CONN_DATANODE);
+
+ if (pgxc_connections->co_conn_count != 0)
+ *coordinators = collect_pgxcnode_numbers(pgxc_connections->co_conn_count,
+ pgxc_connections->coord_handles, REMOTE_CONN_COORD);
+
+ /*
+ * Now release handles properly, the list of handles in error state has been saved
+ * and will be sent to GTM.
+ */
+ if (!PersistentConnections)
+ release_handles(false);
+}
diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c
index cbaf68c610..4790f9573a 100644
--- a/src/backend/pgxc/pool/pgxcnode.c
+++ b/src/backend/pgxc/pool/pgxcnode.c
@@ -1579,9 +1579,19 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query)
* to a PGXCNodeHandle structure.
* The function returns number of pointers written to the connections array.
* Remaining items in the array, if any, will be kept unchanged
+ *
+ * In an implicit 2PC, status of connections is set back to idle after preparing
+ * the transaction on each backend.
+ * At commit phase, it is necessary to get backends in idle state to be able to
+ * commit properly the backends.
+ *
+ * In the case of an error occurring with an implicit 2PC that has been partially
+ * committed on nodes, return the list of connections that have an error state
+ * to register on GTM the list of remaining nodes not yet commit-prepared.
*/
int
-get_transaction_nodes(PGXCNodeHandle **connections, char client_conn_type)
+get_transaction_nodes(PGXCNodeHandle **connections, char client_conn_type,
+ PGXCNode_HandleRequested status_requested)
{
int tran_count = 0;
int i;
@@ -1596,16 +1606,42 @@ get_transaction_nodes(PGXCNodeHandle **connections, char client_conn_type)
* DN_CONNECTION_STATE_ERROR_FATAL.
* ERROR_NOT_READY can happen if the data node abruptly disconnects.
*/
- if (dn_handles[i].sock != NO_SOCKET && dn_handles[i].transaction_status != 'I')
- connections[tran_count++] = &dn_handles[i];
+ if (status_requested == HANDLE_IDLE)
+ {
+ if (dn_handles[i].sock != NO_SOCKET && dn_handles[i].transaction_status == 'I')
+ connections[tran_count++] = &dn_handles[i];
+ }
+ else if (status_requested == HANDLE_ERROR)
+ {
+ if (dn_handles[i].transaction_status == 'E')
+ connections[tran_count++] = &dn_handles[i];
+ }
+ else
+ {
+ if (dn_handles[i].sock != NO_SOCKET && dn_handles[i].transaction_status != 'I')
+ connections[tran_count++] = &dn_handles[i];
+ }
}
}
if (coord_count && client_conn_type == REMOTE_CONN_COORD)
{
for (i = 0; i < NumCoords; i++)
{
- if (co_handles[i].sock != NO_SOCKET && co_handles[i].transaction_status != 'I')
- connections[tran_count++] = &co_handles[i];
+ if (status_requested == HANDLE_IDLE)
+ {
+ if (co_handles[i].sock != NO_SOCKET && co_handles[i].transaction_status == 'I')
+ connections[tran_count++] = &co_handles[i];
+ }
+ else if (status_requested == HANDLE_ERROR)
+ {
+ if (co_handles[i].transaction_status == 'E')
+ connections[tran_count++] = &co_handles[i];
+ }
+ else
+ {
+ if (co_handles[i].sock != NO_SOCKET && co_handles[i].transaction_status != 'I')
+ connections[tran_count++] = &co_handles[i];
+ }
}
}
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index cfabce3dbc..93376e97c5 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -280,6 +280,14 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
* taking a snapshot. See discussion in
* src/backend/access/transam/README.
*/
+#ifdef PGXC
+ /*
+ * Remove this assertion check for PGXC on Coordinator
+ * We could abort even after a Coordinator has committed
+ * for a 2PC transaction if Datanodes have failed to commit the transaction
+ */
+ if (IS_PGXC_DATANODE)
+#endif
Assert(TransactionIdIsValid(proc->xid));
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index 4a33842bdf..a3c1868422 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -43,6 +43,7 @@ typedef enum
REQUEST_TYPE_COPY_OUT /* Copy Out response */
} RequestType;
+
/*
* Represents a DataRow message received from a remote node.
* Contains originating node number and message body in DataRow format without
@@ -111,6 +112,19 @@ extern int PGXCNodeRollback(void);
extern bool PGXCNodePrepare(char *gid);
extern bool PGXCNodeRollbackPrepared(char *gid);
extern bool PGXCNodeCommitPrepared(char *gid);
+extern bool PGXCNodeIsImplicit2PC(bool *prepare_local_coord);
+extern int PGXCNodeImplicitPrepare(GlobalTransactionId prepare_xid, char *gid);
+extern void PGXCNodeImplicitCommitPrepared(GlobalTransactionId prepare_xid,
+ GlobalTransactionId commit_xid,
+ char *gid,
+ bool is_commit);
+extern void PGXCNodeConsumeMessages(void);
+
+/* Get list of nodes */
+extern void PGXCNodeGetNodeList(PGXC_NodeId **datanodes,
+ int *dn_conn_count,
+ PGXC_NodeId **coordinators,
+ int *co_conn_count);
/* Copy command just involves Datanodes */
extern PGXCNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from);
diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h
index a57e4f1673..47b0b966eb 100644
--- a/src/include/pgxc/pgxcnode.h
+++ b/src/include/pgxc/pgxcnode.h
@@ -39,6 +39,13 @@ typedef enum
DN_CONNECTION_STATE_COPY_OUT
} DNConnectionState;
+typedef enum
+{
+ HANDLE_IDLE,
+ HANDLE_ERROR,
+ HANDLE_DEFAULT
+} PGXCNode_HandleRequested;
+
#define DN_CONNECTION_STATE_ERROR(dnconn) \
((dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \
|| (dnconn)->transaction_status == 'E')
@@ -97,7 +104,9 @@ extern void PGXCNodeCleanAndRelease(int code, Datum arg);
extern PGXCNodeAllHandles *get_handles(List *datanodelist, List *coordlist, bool is_query_coord_only);
extern void release_handles(bool force_drop);
-extern int get_transaction_nodes(PGXCNodeHandle ** connections, char client_conn_type);
+extern int get_transaction_nodes(PGXCNodeHandle ** connections,
+ char client_conn_type,
+ PGXCNode_HandleRequested type_requested);
extern PGXC_NodeId* collect_pgxcnode_numbers(int conn_count, PGXCNodeHandle ** connections, char client_conn_type);
extern int get_active_nodes(PGXCNodeHandle ** connections);