diff options
| author | Michael P | 2010-12-22 05:48:48 +0000 |
|---|---|---|
| committer | Pavan Deolasee | 2011-05-19 16:45:26 +0000 |
| commit | 5ea02d34bd8584fadd738437bbed1155162a362f (patch) | |
| tree | 118294147f13c7abbf1add0a59c4584667675a5a | |
| parent | f0450714991af7151dc38bc566824355a2914432 (diff) | |
Correction for implicit 2PC
When a COMMIT is issued for a write transaction involving multiple
Postgres-XC nodes, a 2PC is used internally.
For implicit 2PC, the following process is respected for DDL transactions:
1) PREPARE on local Coordinator (if DDL is involved)
2) PREPARE on Postgres-XC nodes
3) COMMIT PREPARED on local Coordinator (if DDL is involved)
4) COMMIT PREPARED on Postgres-XC nodes
For transaction containing no DDL:
1) PREPARE on Datanodes
2) COMMIT on Coordinator
3) COMMIT PREPARED on Datanodes
In case of a Node failure after Coordinator has committed,
transaction becomes partially committed on Nodes.
To maintain data consistency, it is absolutely necessary to COMMIT this transaction on all nodes.
In this case, the remaining list PREPARED nodes is saved on GTM as if it was an explicit 2PC.
And this transaction is kept open to avoid visibility issues.
It is necessary to issue a COMMIT PREPARED from application to finish the COMMIT
of this transaction.
| -rw-r--r-- | src/backend/access/transam/xact.c | 213 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 341 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/pgxcnode.c | 46 | ||||
| -rw-r--r-- | src/backend/storage/ipc/procarray.c | 8 | ||||
| -rw-r--r-- | src/include/pgxc/execRemote.h | 14 | ||||
| -rw-r--r-- | src/include/pgxc/pgxcnode.h | 11 |
6 files changed, 536 insertions, 97 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 5faa4eaf5a..a069d44183 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -26,7 +26,7 @@ #include "access/gtm.h" /* PGXC_COORD */ #include "gtm/gtm_c.h" -#include "pgxc/pgxcnode.h" +#include "pgxc/execRemote.h" /* PGXC_DATANODE */ #include "postmaster/autovacuum.h" #endif @@ -139,6 +139,9 @@ typedef struct TransactionStateData TransactionId transactionId; /* my XID, or Invalid if none */ #ifdef PGXC /* PGXC_COORD */ GlobalTransactionId globalTransactionId; /* my GXID, or Invalid if none */ + GlobalTransactionId globalCommitTransactionId; /* Commit GXID used by implicit 2PC */ + bool ArePGXCNodesPrepared; /* Checks if PGXC Nodes are prepared and + * rollbacks then in case of an Abort */ #endif SubTransactionId subTransactionId; /* my subxact ID */ char *name; /* savepoint name, if any */ @@ -169,6 +172,8 @@ static TransactionStateData TopTransactionStateData = { 0, /* transaction id */ #ifdef PGXC 0, /* global transaction id */ + 0, /* global commit transaction id */ + 0, /* flag if nodes are prepared or not */ #endif 0, /* subtransaction id */ NULL, /* savepoint name */ @@ -307,6 +312,7 @@ static const char *TransStateAsString(TransState state); #ifdef PGXC /* PGXC_COORD */ static GlobalTransactionId GetGlobalTransactionId(TransactionState s); +static void PrepareTransaction(bool write_2pc_file, bool is_implicit); /* ---------------------------------------------------------------- * PG-XC Functions @@ -1631,10 +1637,15 @@ StartTransaction(void) * start processing */ s->state = TRANS_START; -#ifdef PGXC /* PGXC_COORD */ +#ifdef PGXC /* GXID is assigned already by a remote Coordinator */ if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { s->globalTransactionId = InvalidGlobalTransactionId; /* until assigned */ + /* Until assigned by implicit 2PC */ + s->globalCommitTransactionId = InvalidGlobalTransactionId; + s->ArePGXCNodesPrepared = false; + } #endif s->transactionId = InvalidTransactionId; /* until assigned */ /* @@ -1737,7 +1748,31 @@ CommitTransaction(void) { TransactionState s = CurrentTransactionState; TransactionId latestXid; +#ifdef PGXC + bool PrepareLocalCoord = false; + bool PreparePGXCNodes = false; + char implicitgid[256]; + TransactionId xid = GetCurrentTransactionId(); + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + PreparePGXCNodes = PGXCNodeIsImplicit2PC(&PrepareLocalCoord); + + if (PrepareLocalCoord || PreparePGXCNodes) + sprintf(implicitgid, "T%d", xid); + + /* Save GID where PrepareTransaction can find it again */ + if (PrepareLocalCoord) + { + prepareGID = MemoryContextStrdup(TopTransactionContext, implicitgid); + /* + * If current transaction has a DDL, and involves more than 1 Coordinator, + * PREPARE first on local Coordinator. + */ + PrepareTransaction(true, true); + } + else + { +#endif ShowTransactionState("CommitTransaction"); /* @@ -1747,6 +1782,28 @@ CommitTransaction(void) elog(WARNING, "CommitTransaction while in %s state", TransStateAsString(s->state)); Assert(s->parent == NULL); +#ifdef PGXC + } + + /* + * If Transaction has involved several nodes, prepare them before committing on Coordinator. + */ + if (PreparePGXCNodes) + { + /* + * Prepare all the nodes involved in this Implicit 2PC + * If Coordinator COMMIT fails, nodes are also rollbacked during AbortTransaction(). + * + * Track if PGXC Nodes are already prepared + */ + if (PGXCNodeImplicitPrepare(xid, implicitgid) < 0) + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot COMMIT a transaction whose PREPARE has failed on Nodes"))); + else + s->ArePGXCNodesPrepared = true; + } +#endif /* * Do pre-commit processing (most of this stuff requires database access, @@ -1756,6 +1813,10 @@ CommitTransaction(void) * deferred triggers, and it's also possible that triggers create holdable * cursors. So we have to loop until there's nothing left to do. */ +#ifdef PGXC + if (!PrepareLocalCoord) + { +#endif for (;;) { /* @@ -1800,8 +1861,11 @@ CommitTransaction(void) /* * There can be error on the data nodes. So go to data nodes before * changing transaction state and local clean up + * Here simply commit on nodes, we know that 2PC is not involved implicitely. + * + * This is called only if it is not necessary to prepare the nodes. */ - if (IS_PGXC_COORDINATOR) + if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && !PreparePGXCNodes) PGXCNodeCommit(); #endif @@ -1825,8 +1889,10 @@ CommitTransaction(void) /* * Now we can let GTM know about transaction commit. * Only a Remote Coordinator is allowed to do that. + * + * Also do not commit a transaction that has already been prepared on Datanodes */ - if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && !PreparePGXCNodes) { CommitTranGTM(s->globalTransactionId); latestXid = s->globalTransactionId; @@ -1908,6 +1974,46 @@ CommitTransaction(void) AtEOXact_MultiXact(); +#ifdef PGXC + }/* End of !PrepareLocalCoord */ + + /* + * At this point, if no 2pc has been used, we have a transaction that committed on GTM, + * local coord and nodes, so the remaining stuff is only ressource cleanup. + * If 2pc has been used, Coordinator has been prepared (if 2 Coordinators at least are involved + * in current transaction). + * Datanodes have also been prepared if more than 1 Datanode has been written. + * + * Here we complete Implicit 2PC in the following order + * - Commit the prepared transaction on local coordinator (if necessary) + * - Commit on the remaining nodes + */ + + if (PreparePGXCNodes) + { + /* + * Preparing for Commit, transaction has to take a new TransactionID for Commit + * It is considered as in Progress state. + */ + s->state = TRANS_INPROGRESS; + s->globalCommitTransactionId = BeginTranGTM(NULL); + + /* COMMIT local Coordinator */ + if (PrepareLocalCoord) + { + FinishPreparedTransaction(implicitgid, true); + } + + /* + * Commit all the nodes involved in this implicit 2PC. + * COMMIT on GTM is made here and is made at the same time + * for prepared GXID and commit GXID to limit interactions between GTM and Coord. + * This explains why prepared GXID is also in argument. + */ + PGXCNodeImplicitCommitPrepared(xid, s->globalCommitTransactionId, implicitgid, true); + } +#endif + ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_LOCKS, true, true); @@ -1948,7 +2054,11 @@ CommitTransaction(void) #ifdef PGXC if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { s->globalTransactionId = InvalidGlobalTransactionId; + s->globalCommitTransactionId = InvalidGlobalTransactionId; + s->ArePGXCNodesPrepared = false; + } else if (IS_PGXC_DATANODE || IsConnFromCoord()) SetNextTransactionId(InvalidTransactionId); #endif @@ -1972,9 +2082,11 @@ CommitTransaction(void) /* * Only a Postgres-XC Coordinator that received a PREPARE Command from * an application can use this special prepare. + * If PrepareTransaction is called during an implicit 2PC, do not release ressources, + * this is made by CommitTransaction when transaction has been committed on Nodes. */ static void -PrepareTransaction(bool write_2pc_file) +PrepareTransaction(bool write_2pc_file, bool is_implicit) #else static void PrepareTransaction(void) @@ -2169,6 +2281,14 @@ PrepareTransaction(void) } #endif +#ifdef PGXC + /* + * In case of an implicit 2PC, ressources are released by CommitTransaction() + */ + if (!is_implicit) + { +#endif + ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_LOCKS, true, true); @@ -2218,6 +2338,9 @@ PrepareTransaction(void) s->state = TRANS_DEFAULT; RESUME_INTERRUPTS(); +#ifdef PGXC + } /* is_implicit END */ +#endif } @@ -2285,8 +2408,13 @@ AbortTransaction(void) /* * We should rollback on the data nodes before cleaning up portals * to be sure data structures used by connections are not freed yet + * + * It is also necessary to check that node are not partially committed + * in an implicit 2PC, correct handling is made below. */ - if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + if (IS_PGXC_COORDINATOR && + !IsConnFromCoord() && + !TransactionIdIsValid(s->globalCommitTransactionId)) { /* * Make sure this is rolled back on the DataNodes @@ -2309,6 +2437,10 @@ AbortTransaction(void) * Advertise the fact that we aborted in pg_clog (assuming that we got as * far as assigning an XID to advertise). */ +#ifdef PGXC + /* Do not abort a transaction that has already been committed in an implicit 2PC */ + if (!TransactionIdIsValid(s->globalCommitTransactionId)) +#endif latestXid = RecordTransactionAbort(false); TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid); @@ -2316,8 +2448,56 @@ AbortTransaction(void) /* This is done by remote Coordinator */ if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) { - RollbackTranGTM(s->globalTransactionId); + /* + * Rollback the transaction ID only if it is not being used by an implicit 2PC. + */ + if (!s->ArePGXCNodesPrepared) + RollbackTranGTM(s->globalTransactionId); + latestXid = s->globalTransactionId; + + /* Rollback Prepared Nodes if they are totally prepared but not committed at all */ + if (s->ArePGXCNodesPrepared && !TransactionIdIsValid(s->globalCommitTransactionId)) + { + char implicitgid[256]; + + sprintf(implicitgid, "T%d", s->globalTransactionId); + PGXCNodeImplicitCommitPrepared(s->globalTransactionId, + s->globalCommitTransactionId, + implicitgid, false); + } + else if (s->ArePGXCNodesPrepared && TransactionIdIsValid(s->globalCommitTransactionId)) + { + /* + * In this case transaction is partially committed, pick up the list of nodes + * prepared and not committed and register them on GTM as if it is an explicit 2PC. + * This permits to keep the transaction alive in snapshot and other transaction + * don't have any side effects with partially committed transactions + */ + char implicitgid[256]; + int co_conn_count, dn_conn_count; + PGXC_NodeId *datanodes = NULL; + PGXC_NodeId *coordinators = NULL; + + sprintf(implicitgid, "T%d", s->globalTransactionId); + + /* Get the list of nodes in error state */ + PGXCNodeGetNodeList(&datanodes, &dn_conn_count, &coordinators, &co_conn_count); + + /* Save the node list and gid on GTM. */ + StartPreparedTranGTM(s->globalTransactionId, implicitgid, + dn_conn_count, datanodes, co_conn_count, coordinators); + + /* Finish to prepare the transaction. */ + PrepareTranGTM(s->globalTransactionId); + + /* + * Rollback commit GXID as it has been used by an implicit 2PC. + * It is important at this point not to Commit the GXID used for PREPARE + * to keep it visible in snapshot for other transactions. + */ + RollbackTranGTM(s->globalCommitTransactionId); + } } else if (IS_PGXC_DATANODE || IsConnFromCoord()) { @@ -2598,7 +2778,7 @@ CommitTransactionCommand(void) * return to the idle state. */ case TBLOCK_PREPARE: - PrepareTransaction(true); + PrepareTransaction(true, false); s->blockState = TBLOCK_DEFAULT; break; @@ -2608,7 +2788,7 @@ CommitTransactionCommand(void) * that involved DDLs on a Coordinator. */ case TBLOCK_PREPARE_NO_2PC_FILE: - PrepareTransaction(false); + PrepareTransaction(false, false); s->blockState = TBLOCK_DEFAULT; break; #endif @@ -2643,17 +2823,20 @@ CommitTransactionCommand(void) CommitTransaction(); s->blockState = TBLOCK_DEFAULT; } -#ifdef PGXC - else if (s->blockState == TBLOCK_PREPARE || - s->blockState == TBLOCK_PREPARE_NO_2PC_FILE) -#else else if (s->blockState == TBLOCK_PREPARE) -#endif { Assert(s->parent == NULL); - PrepareTransaction(true); + PrepareTransaction(true, false); s->blockState = TBLOCK_DEFAULT; } +#ifdef PGXC + else if (s->blockState == TBLOCK_PREPARE_NO_2PC_FILE) + { + Assert(s->parent == NULL); + PrepareTransaction(false, false); + s->blockState = TBLOCK_DEFAULT; + } +#endif else { Assert(s->blockState == TBLOCK_INPROGRESS || diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index a524c13f6d..b954003db5 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -49,6 +49,7 @@ extern char *deparseSql(RemoteQueryState *scanstate); #define PRIMARY_NODE_WRITEAHEAD 1024 * 1024 static bool autocommit = true; +static bool implicit_force_autocommit = false; static PGXCNodeHandle **write_node_list = NULL; static int write_node_count = 0; @@ -61,6 +62,14 @@ static int pgxc_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransacti PGXCNodeAllHandles * pgxc_handles, char *gid); static int pgxc_node_commit_prepared(GlobalTransactionId gxid, GlobalTransactionId prepared_gxid, PGXCNodeAllHandles * pgxc_handles, char *gid); +static int pgxc_node_implicit_commit_prepared(GlobalTransactionId prepare_xid, + GlobalTransactionId commit_xid, + PGXCNodeAllHandles * pgxc_handles, + char *gid, + bool is_commit); +static int pgxc_node_implicit_prepare(GlobalTransactionId prepare_xid, + PGXCNodeAllHandles * pgxc_handles, char *gid); + static PGXCNodeAllHandles * get_exec_connections(ExecNodes *exec_nodes, RemoteQueryExecType exec_type); static int pgxc_node_receive_and_validate(const int conn_count, @@ -74,7 +83,7 @@ static int handle_response_clear(PGXCNodeHandle * conn); static void close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor); -static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(void); +static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(PGXCNode_HandleRequested status_requested); #define MAX_STATEMENTS_PER_TRAN 10 @@ -1505,7 +1514,7 @@ PGXCNodePrepare(char *gid) PGXCNodeAllHandles *pgxc_connections; bool local_operation = false; - pgxc_connections = pgxc_get_all_transaction_nodes(); + pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT); /* DDL involved in transaction, so make a local prepare too */ if (pgxc_connections->co_conn_count != 0) @@ -1669,6 +1678,176 @@ finish: return result; } +/* + * Prepare all the nodes involved in this implicit Prepare + * Abort transaction if this is not done correctly + */ +int +PGXCNodeImplicitPrepare(GlobalTransactionId prepare_xid, char *gid) +{ + int res = 0; + int tran_count; + PGXCNodeAllHandles *pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT); + + if (!pgxc_connections) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not prepare connection implicitely"))); + + tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count; + + /* + * This should not happen because an implicit 2PC is always using other nodes, + * but it is better to check. + */ + if (tran_count == 0) + { + goto finish; + } + + res = pgxc_node_implicit_prepare(prepare_xid, pgxc_connections, gid); + +finish: + if (!autocommit) + stat_transaction(pgxc_connections->dn_conn_count); + + return res; +} + +/* + * Prepare transaction on dedicated nodes for Implicit 2PC + * This is done inside a Transaction commit if multiple nodes are involved in write operations + * Implicit prepare in done internally on Coordinator, so this does not interact with GTM. + */ +static int +pgxc_node_implicit_prepare(GlobalTransactionId prepare_xid, + PGXCNodeAllHandles *pgxc_handles, + char *gid) +{ + int result = 0; + int co_conn_count = pgxc_handles->co_conn_count; + int dn_conn_count = pgxc_handles->dn_conn_count; + char buffer[256]; + + sprintf(buffer, "PREPARE TRANSACTION '%s'", gid); + + /* Continue even after an error here, to consume the messages */ + result = pgxc_all_handles_send_query(pgxc_handles, buffer, true); + + /* Receive and Combine results from Datanodes and Coordinators */ + result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false); + result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false); + + return result; +} + +/* + * Commit all the nodes involved in this Implicit Commit. + * Prepared XID is committed at the same time as Commit XID on GTM. + */ +void +PGXCNodeImplicitCommitPrepared(GlobalTransactionId prepare_xid, + GlobalTransactionId commit_xid, + char *gid, + bool is_commit) +{ + int res = 0; + int tran_count; + PGXCNodeAllHandles *pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_IDLE); + + if (!pgxc_connections) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not commit prepared transaction implicitely"))); + + tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count; + + /* + * This should not happen because an implicit 2PC is always using other nodes, + * but it is better to check. + */ + if (tran_count == 0) + { + elog(WARNING, "Nothing to PREPARE on Datanodes and Coordinators"); + goto finish; + } + + res = pgxc_node_implicit_commit_prepared(prepare_xid, commit_xid, + pgxc_connections, gid, is_commit); + +finish: + /* Clear nodes, signals are clear */ + if (!autocommit) + stat_transaction(pgxc_connections->dn_conn_count); + + /* + * If an error happened, do not release handles yet. This is done when transaction + * is aborted after the list of nodes in error state has been saved to be sent to GTM + */ + if (!PersistentConnections && res == 0) + release_handles(false); + autocommit = true; + clear_write_node_list(); + + if (res != 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not commit prepared transaction implicitely"))); + + /* + * Commit on GTM is made once we are sure that Nodes are not only partially committed + * If an error happens on a Datanode during implicit COMMIT PREPARED, a special handling + * is made in AbortTransaction(). + * The list of datanodes is saved on GTM and the partially committed transaction can be committed + * with a COMMIT PREPARED delivered directly from application. + * This permits to keep the gxid alive in snapshot and avoids other transactions to see only + * partially committed results. + */ + CommitPreparedTranGTM(prepare_xid, commit_xid); +} + +/* + * Commit a transaction implicitely transaction on all nodes + * Prepared transaction with this gid has reset the datanodes, + * so we need a new gxid. + * + * GXID used for Prepare and Commit are committed at the same time on GTM. + * This saves Network ressource a bit. + */ +static int +pgxc_node_implicit_commit_prepared(GlobalTransactionId prepare_xid, + GlobalTransactionId commit_xid, + PGXCNodeAllHandles *pgxc_handles, + char *gid, + bool is_commit) +{ + char buffer[256]; + int result = 0; + int co_conn_count = pgxc_handles->co_conn_count; + int dn_conn_count = pgxc_handles->dn_conn_count; + + if (is_commit) + sprintf(buffer, "COMMIT PREPARED '%s'", gid); + else + sprintf(buffer, "ROLLBACK PREPARED '%s'", gid); + + if (pgxc_all_handles_send_gxid(pgxc_handles, commit_xid, true)) + { + result = EOF; + goto finish; + } + + /* Send COMMIT to all handles */ + if (pgxc_all_handles_send_query(pgxc_handles, buffer, false)) + result = EOF; + + /* Receive and Combine results from Datanodes and Coordinators */ + result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false); + result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false); + +finish: + return result; +} /* * Commit prepared transaction on Datanodes and Coordinators (as necessary) @@ -1684,7 +1863,7 @@ PGXCNodeCommitPrepared(char *gid) { int res = 0; int res_gtm = 0; - PGXCNodeAllHandles *pgxc_handles; + PGXCNodeAllHandles *pgxc_handles = NULL; List *datanodelist = NIL; List *coordlist = NIL; int i, tran_count; @@ -1812,7 +1991,7 @@ PGXCNodeRollbackPrepared(char *gid) { int res = 0; int res_gtm = 0; - PGXCNodeAllHandles *pgxc_handles; + PGXCNodeAllHandles *pgxc_handles = NULL; List *datanodelist = NIL; List *coordlist = NIL; int i, tran_count; @@ -1922,6 +2101,8 @@ pgxc_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransactionId prepar /* * Commit current transaction on data nodes where it has been started + * This function is called when no 2PC is involved implicitely. + * So only send a commit to the involved nodes. */ void PGXCNodeCommit(void) @@ -1930,7 +2111,7 @@ PGXCNodeCommit(void) int tran_count; PGXCNodeAllHandles *pgxc_connections; - pgxc_connections = pgxc_get_all_transaction_nodes(); + pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT); tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count; @@ -1952,7 +2133,7 @@ finish: autocommit = true; clear_write_node_list(); - /* Clear up connection */ + /* Clean up connections */ pfree_pgxc_all_handles(pgxc_connections); if (res != 0) ereport(ERROR, @@ -1969,71 +2150,11 @@ static int pgxc_node_commit(PGXCNodeAllHandles *pgxc_handles) { char buffer[256]; - GlobalTransactionId gxid = InvalidGlobalTransactionId; int result = 0; int co_conn_count = pgxc_handles->co_conn_count; int dn_conn_count = pgxc_handles->dn_conn_count; - /* can set this to false to disable temporarily */ - /* bool do2PC = conn_count > 1; */ - - /* - * Only use 2PC if more than one node was written to. Otherwise, just send - * COMMIT to all - */ - bool do2PC = write_node_count > 1; - - /* Extra XID for Two Phase Commit */ - GlobalTransactionId two_phase_xid = 0; - - if (do2PC) - { - stat_2pc(); - - /* - * Formally we should be using GetCurrentGlobalTransactionIdIfAny() here, - * but since we need 2pc, we surely have sent down a command and got - * gxid for it. Hence GetCurrentGlobalTransactionId() just returns - * already allocated gxid - */ - gxid = GetCurrentGlobalTransactionId(); - - sprintf(buffer, "PREPARE TRANSACTION 'T%d'", gxid); - - if (pgxc_all_handles_send_query(pgxc_handles, buffer, false)) - result = EOF; - - /* Receive and Combine results from Datanodes and Coordinators */ - result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, true); - result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, true); - } - - if (!do2PC) - strcpy(buffer, "COMMIT"); - else - { - if (result) - { - sprintf(buffer, "ROLLBACK PREPARED 'T%d'", gxid); - /* Consume any messages on the Datanodes and Coordinators first if necessary */ - PGXCNodeConsumeMessages(); - } - else - sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid); - - /* - * We need to use a new xid, the data nodes have reset - * Timestamp has already been set with BEGIN on remote Datanodes, - * so don't use it here. - */ - two_phase_xid = BeginTranGTM(NULL); - - if (pgxc_all_handles_send_gxid(pgxc_handles, two_phase_xid, true)) - { - result = EOF; - goto finish; - } - } + strcpy(buffer, "COMMIT"); /* Send COMMIT to all handles */ if (pgxc_all_handles_send_query(pgxc_handles, buffer, false)) @@ -2043,10 +2164,6 @@ pgxc_node_commit(PGXCNodeAllHandles *pgxc_handles) result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false); result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false); -finish: - if (do2PC) - CommitTranGTM((GlobalTransactionId) two_phase_xid); - return result; } @@ -2062,7 +2179,7 @@ PGXCNodeRollback(void) int tran_count; PGXCNodeAllHandles *pgxc_connections; - pgxc_connections = pgxc_get_all_transaction_nodes(); + pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT); tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count; @@ -2099,7 +2216,6 @@ finish: static int pgxc_node_rollback(PGXCNodeAllHandles *pgxc_handles) { - int i; int result = 0; int co_conn_count = pgxc_handles->co_conn_count; int dn_conn_count = pgxc_handles->dn_conn_count; @@ -2881,6 +2997,8 @@ ExecRemoteQuery(RemoteQueryState *node) PGXCNodeAllHandles *pgxc_connections; TupleTableSlot *innerSlot = NULL; + implicit_force_autocommit = force_autocommit; + /* * Inner plan for RemoteQuery supplies parameters. * We execute inner plan to get a tuple and use values of the tuple as @@ -3622,6 +3740,8 @@ ExecRemoteUtility(RemoteQuery *node) bool need_tran; int i; + implicit_force_autocommit = force_autocommit; + remotestate = CreateResponseCombiner(0, node->combine_type); pgxc_connections = get_exec_connections(node->exec_nodes, @@ -3984,7 +4104,7 @@ finish: * for both data nodes and coordinators */ static PGXCNodeAllHandles * -pgxc_get_all_transaction_nodes() +pgxc_get_all_transaction_nodes(PGXCNode_HandleRequested status_requested) { PGXCNodeAllHandles *pgxc_connections; @@ -4009,9 +4129,13 @@ pgxc_get_all_transaction_nodes() /* gather needed connections */ pgxc_connections->dn_conn_count = get_transaction_nodes( - pgxc_connections->datanode_handles, REMOTE_CONN_DATANODE); + pgxc_connections->datanode_handles, + REMOTE_CONN_DATANODE, + status_requested); pgxc_connections->co_conn_count = get_transaction_nodes( - pgxc_connections->coord_handles, REMOTE_CONN_COORD); + pgxc_connections->coord_handles, + REMOTE_CONN_COORD, + status_requested); return pgxc_connections; } @@ -4032,3 +4156,68 @@ pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles) pfree(pgxc_handles); } + +/* + * Check if an Implicit 2PC is necessary for this transaction. + * Check also if it is necessary to prepare transaction locally. + */ +bool +PGXCNodeIsImplicit2PC(bool *prepare_local_coord) +{ + PGXCNodeAllHandles *pgxc_handles = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT); + int co_conn_count = pgxc_handles->co_conn_count; + + /* Prepare Local Coord only if DDL is involved on multiple nodes */ + *prepare_local_coord = co_conn_count > 0; + + /* + * In case of an autocommit or forced autocommit transaction, 2PC is not involved + * This case happens for Utilities using force autocommit (CREATE DATABASE, VACUUM...) + */ + if (implicit_force_autocommit) + { + implicit_force_autocommit = false; + return false; + } + + /* + * 2PC is necessary at other Nodes if one Datanode or one Coordinator + * other than the local one has been involved in a write operation. + */ + return (write_node_count > 1 || co_conn_count > 0); +} + +/* + * Return the list of active nodes + */ +void +PGXCNodeGetNodeList(PGXC_NodeId **datanodes, + int *dn_conn_count, + PGXC_NodeId **coordinators, + int *co_conn_count) +{ + PGXCNodeAllHandles *pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_ERROR); + + *dn_conn_count = pgxc_connections->dn_conn_count; + + /* Add in the list local coordinator also if necessary */ + if (pgxc_connections->co_conn_count == 0) + *co_conn_count = pgxc_connections->co_conn_count; + else + *co_conn_count = pgxc_connections->co_conn_count + 1; + + if (pgxc_connections->dn_conn_count != 0) + *datanodes = collect_pgxcnode_numbers(pgxc_connections->dn_conn_count, + pgxc_connections->datanode_handles, REMOTE_CONN_DATANODE); + + if (pgxc_connections->co_conn_count != 0) + *coordinators = collect_pgxcnode_numbers(pgxc_connections->co_conn_count, + pgxc_connections->coord_handles, REMOTE_CONN_COORD); + + /* + * Now release handles properly, the list of handles in error state has been saved + * and will be sent to GTM. + */ + if (!PersistentConnections) + release_handles(false); +} diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c index cbaf68c610..4790f9573a 100644 --- a/src/backend/pgxc/pool/pgxcnode.c +++ b/src/backend/pgxc/pool/pgxcnode.c @@ -1579,9 +1579,19 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query) * to a PGXCNodeHandle structure. * The function returns number of pointers written to the connections array. * Remaining items in the array, if any, will be kept unchanged + * + * In an implicit 2PC, status of connections is set back to idle after preparing + * the transaction on each backend. + * At commit phase, it is necessary to get backends in idle state to be able to + * commit properly the backends. + * + * In the case of an error occuring with an implicit 2PC that has been partially + * committed on nodes, return the list of connections that has an error state + * to register the list of remaining nodes not commit prepared on GTM. */ int -get_transaction_nodes(PGXCNodeHandle **connections, char client_conn_type) +get_transaction_nodes(PGXCNodeHandle **connections, char client_conn_type, + PGXCNode_HandleRequested status_requested) { int tran_count = 0; int i; @@ -1596,16 +1606,42 @@ get_transaction_nodes(PGXCNodeHandle **connections, char client_conn_type) * DN_CONNECTION_STATE_ERROR_FATAL. * ERROR_NOT_READY can happen if the data node abruptly disconnects. */ - if (dn_handles[i].sock != NO_SOCKET && dn_handles[i].transaction_status != 'I') - connections[tran_count++] = &dn_handles[i]; + if (status_requested == HANDLE_IDLE) + { + if (dn_handles[i].sock != NO_SOCKET && dn_handles[i].transaction_status == 'I') + connections[tran_count++] = &dn_handles[i]; + } + else if (status_requested == HANDLE_ERROR) + { + if (dn_handles[i].transaction_status == 'E') + connections[tran_count++] = &dn_handles[i]; + } + else + { + if (dn_handles[i].sock != NO_SOCKET && dn_handles[i].transaction_status != 'I') + connections[tran_count++] = &dn_handles[i]; + } } } if (coord_count && client_conn_type == REMOTE_CONN_COORD) { for (i = 0; i < NumCoords; i++) { - if (co_handles[i].sock != NO_SOCKET && co_handles[i].transaction_status != 'I') - connections[tran_count++] = &co_handles[i]; + if (status_requested == HANDLE_IDLE) + { + if (co_handles[i].sock != NO_SOCKET && co_handles[i].transaction_status == 'I') + connections[tran_count++] = &co_handles[i]; + } + else if (status_requested == HANDLE_ERROR) + { + if (co_handles[i].transaction_status == 'E') + connections[tran_count++] = &co_handles[i]; + } + else + { + if (co_handles[i].sock != NO_SOCKET && co_handles[i].transaction_status != 'I') + connections[tran_count++] = &co_handles[i]; + } } } diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index cfabce3dbc..93376e97c5 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -280,6 +280,14 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid) * taking a snapshot. See discussion in * src/backend/access/transam/README. */ +#ifdef PGXC + /* + * Remove this assertion check for PGXC on Coordinator + * We could abort even after a Coordinator has committed + * for a 2PC transaction if Datanodes have failed committed the transaction + */ + if (IS_PGXC_DATANODE) +#endif Assert(TransactionIdIsValid(proc->xid)); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index 4a33842bdf..a3c1868422 100644 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -43,6 +43,7 @@ typedef enum REQUEST_TYPE_COPY_OUT /* Copy Out response */ } RequestType; + /* * Represents a DataRow message received from a remote node. * Contains originating node number and message body in DataRow format without @@ -111,6 +112,19 @@ extern int PGXCNodeRollback(void); extern bool PGXCNodePrepare(char *gid); extern bool PGXCNodeRollbackPrepared(char *gid); extern bool PGXCNodeCommitPrepared(char *gid); +extern bool PGXCNodeIsImplicit2PC(bool *prepare_local_coord); +extern int PGXCNodeImplicitPrepare(GlobalTransactionId prepare_xid, char *gid); +extern void PGXCNodeImplicitCommitPrepared(GlobalTransactionId prepare_xid, + GlobalTransactionId commit_xid, + char *gid, + bool is_commit); +extern void PGXCNodeConsumeMessages(void); + +/* Get list of nodes */ +extern void PGXCNodeGetNodeList(PGXC_NodeId **datanodes, + int *dn_conn_count, + PGXC_NodeId **coordinators, + int *co_conn_count); /* Copy command just involves Datanodes */ extern PGXCNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from); diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h index a57e4f1673..47b0b966eb 100644 --- a/src/include/pgxc/pgxcnode.h +++ b/src/include/pgxc/pgxcnode.h @@ -39,6 +39,13 @@ typedef enum DN_CONNECTION_STATE_COPY_OUT } DNConnectionState; +typedef enum +{ + HANDLE_IDLE, + HANDLE_ERROR, + HANDLE_DEFAULT +} PGXCNode_HandleRequested; + #define DN_CONNECTION_STATE_ERROR(dnconn) \ ((dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \ || (dnconn)->transaction_status == 'E') @@ -97,7 +104,9 @@ extern void PGXCNodeCleanAndRelease(int code, Datum arg); extern PGXCNodeAllHandles *get_handles(List *datanodelist, List *coordlist, bool is_query_coord_only); extern void release_handles(bool force_drop); -extern int get_transaction_nodes(PGXCNodeHandle ** connections, char client_conn_type); +extern int get_transaction_nodes(PGXCNodeHandle ** connections, + char client_conn_type, + PGXCNode_HandleRequested type_requested); extern PGXC_NodeId* collect_pgxcnode_numbers(int conn_count, PGXCNodeHandle ** connections, char client_conn_type); extern int get_active_nodes(PGXCNodeHandle ** connections); |
