diff options
| -rw-r--r-- | src/backend/access/transam/xact.c | 213 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 341 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/pgxcnode.c | 46 | ||||
| -rw-r--r-- | src/backend/storage/ipc/procarray.c | 8 | ||||
| -rw-r--r-- | src/include/pgxc/execRemote.h | 14 | ||||
| -rw-r--r-- | src/include/pgxc/pgxcnode.h | 11 |
6 files changed, 536 insertions, 97 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 5faa4eaf5a..a069d44183 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -26,7 +26,7 @@ #include "access/gtm.h" /* PGXC_COORD */ #include "gtm/gtm_c.h" -#include "pgxc/pgxcnode.h" +#include "pgxc/execRemote.h" /* PGXC_DATANODE */ #include "postmaster/autovacuum.h" #endif @@ -139,6 +139,9 @@ typedef struct TransactionStateData TransactionId transactionId; /* my XID, or Invalid if none */ #ifdef PGXC /* PGXC_COORD */ GlobalTransactionId globalTransactionId; /* my GXID, or Invalid if none */ + GlobalTransactionId globalCommitTransactionId; /* Commit GXID used by implicit 2PC */ + bool ArePGXCNodesPrepared; /* Checks if PGXC Nodes are prepared and + * rollbacks then in case of an Abort */ #endif SubTransactionId subTransactionId; /* my subxact ID */ char *name; /* savepoint name, if any */ @@ -169,6 +172,8 @@ static TransactionStateData TopTransactionStateData = { 0, /* transaction id */ #ifdef PGXC 0, /* global transaction id */ + 0, /* global commit transaction id */ + 0, /* flag if nodes are prepared or not */ #endif 0, /* subtransaction id */ NULL, /* savepoint name */ @@ -307,6 +312,7 @@ static const char *TransStateAsString(TransState state); #ifdef PGXC /* PGXC_COORD */ static GlobalTransactionId GetGlobalTransactionId(TransactionState s); +static void PrepareTransaction(bool write_2pc_file, bool is_implicit); /* ---------------------------------------------------------------- * PG-XC Functions @@ -1631,10 +1637,15 @@ StartTransaction(void) * start processing */ s->state = TRANS_START; -#ifdef PGXC /* PGXC_COORD */ +#ifdef PGXC /* GXID is assigned already by a remote Coordinator */ if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { s->globalTransactionId = InvalidGlobalTransactionId; /* until assigned */ + /* Until assigned by implicit 2PC */ + s->globalCommitTransactionId = InvalidGlobalTransactionId; + s->ArePGXCNodesPrepared = false; + } #endif s->transactionId = InvalidTransactionId; /* until assigned */ /* @@ -1737,7 +1748,31 @@ CommitTransaction(void) { TransactionState s = CurrentTransactionState; TransactionId latestXid; +#ifdef PGXC + bool PrepareLocalCoord = false; + bool PreparePGXCNodes = false; + char implicitgid[256]; + TransactionId xid = GetCurrentTransactionId(); + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + PreparePGXCNodes = PGXCNodeIsImplicit2PC(&PrepareLocalCoord); + + if (PrepareLocalCoord || PreparePGXCNodes) + sprintf(implicitgid, "T%d", xid); + + /* Save GID where PrepareTransaction can find it again */ + if (PrepareLocalCoord) + { + prepareGID = MemoryContextStrdup(TopTransactionContext, implicitgid); + /* + * If current transaction has a DDL, and involves more than 1 Coordinator, + * PREPARE first on local Coordinator. + */ + PrepareTransaction(true, true); + } + else + { +#endif ShowTransactionState("CommitTransaction"); /* @@ -1747,6 +1782,28 @@ CommitTransaction(void) elog(WARNING, "CommitTransaction while in %s state", TransStateAsString(s->state)); Assert(s->parent == NULL); +#ifdef PGXC + } + + /* + * If Transaction has involved several nodes, prepare them before committing on Coordinator. + */ + if (PreparePGXCNodes) + { + /* + * Prepare all the nodes involved in this Implicit 2PC + * If Coordinator COMMIT fails, nodes are also rollbacked during AbortTransaction(). + * + * Track if PGXC Nodes are already prepared + */ + if (PGXCNodeImplicitPrepare(xid, implicitgid) < 0) + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot COMMIT a transaction whose PREPARE has failed on Nodes"))); + else + s->ArePGXCNodesPrepared = true; + } +#endif /* * Do pre-commit processing (most of this stuff requires database access, @@ -1756,6 +1813,10 @@ CommitTransaction(void) * deferred triggers, and it's also possible that triggers create holdable * cursors. So we have to loop until there's nothing left to do. */ +#ifdef PGXC + if (!PrepareLocalCoord) + { +#endif for (;;) { /* @@ -1800,8 +1861,11 @@ CommitTransaction(void) /* * There can be error on the data nodes. So go to data nodes before * changing transaction state and local clean up + * Here simply commit on nodes, we know that 2PC is not involved implicitely. + * + * This is called only if it is not necessary to prepare the nodes. */ - if (IS_PGXC_COORDINATOR) + if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && !PreparePGXCNodes) PGXCNodeCommit(); #endif @@ -1825,8 +1889,10 @@ CommitTransaction(void) /* * Now we can let GTM know about transaction commit. * Only a Remote Coordinator is allowed to do that. + * + * Also do not commit a transaction that has already been prepared on Datanodes */ - if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && !PreparePGXCNodes) { CommitTranGTM(s->globalTransactionId); latestXid = s->globalTransactionId; @@ -1908,6 +1974,46 @@ CommitTransaction(void) AtEOXact_MultiXact(); +#ifdef PGXC + }/* End of !PrepareLocalCoord */ + + /* + * At this point, if no 2pc has been used, we have a transaction that committed on GTM, + * local coord and nodes, so the remaining stuff is only ressource cleanup. + * If 2pc has been used, Coordinator has been prepared (if 2 Coordinators at least are involved + * in current transaction). + * Datanodes have also been prepared if more than 1 Datanode has been written. + * + * Here we complete Implicit 2PC in the following order + * - Commit the prepared transaction on local coordinator (if necessary) + * - Commit on the remaining nodes + */ + + if (PreparePGXCNodes) + { + /* + * Preparing for Commit, transaction has to take a new TransactionID for Commit + * It is considered as in Progress state. + */ + s->state = TRANS_INPROGRESS; + s->globalCommitTransactionId = BeginTranGTM(NULL); + + /* COMMIT local Coordinator */ + if (PrepareLocalCoord) + { + FinishPreparedTransaction(implicitgid, true); + } + + /* + * Commit all the nodes involved in this implicit 2PC. + * COMMIT on GTM is made here and is made at the same time + * for prepared GXID and commit GXID to limit interactions between GTM and Coord. + * This explains why prepared GXID is also in argument. + */ + PGXCNodeImplicitCommitPrepared(xid, s->globalCommitTransactionId, implicitgid, true); + } +#endif + ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_LOCKS, true, true); @@ -1948,7 +2054,11 @@ CommitTransaction(void) #ifdef PGXC if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { s->globalTransactionId = InvalidGlobalTransactionId; + s->globalCommitTransactionId = InvalidGlobalTransactionId; + s->ArePGXCNodesPrepared = false; + } else if (IS_PGXC_DATANODE || IsConnFromCoord()) SetNextTransactionId(InvalidTransactionId); #endif @@ -1972,9 +2082,11 @@ CommitTransaction(void) /* * Only a Postgres-XC Coordinator that received a PREPARE Command from * an application can use this special prepare. + * If PrepareTransaction is called during an implicit 2PC, do not release ressources, + * this is made by CommitTransaction when transaction has been committed on Nodes. */ static void -PrepareTransaction(bool write_2pc_file) +PrepareTransaction(bool write_2pc_file, bool is_implicit) #else static void PrepareTransaction(void) @@ -2169,6 +2281,14 @@ PrepareTransaction(void) } #endif +#ifdef PGXC + /* + * In case of an implicit 2PC, ressources are released by CommitTransaction() + */ + if (!is_implicit) + { +#endif + ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_LOCKS, true, true); @@ -2218,6 +2338,9 @@ PrepareTransaction(void) s->state = TRANS_DEFAULT; RESUME_INTERRUPTS(); +#ifdef PGXC + } /* is_implicit END */ +#endif } @@ -2285,8 +2408,13 @@ AbortTransaction(void) /* * We should rollback on the data nodes before cleaning up portals * to be sure data structures used by connections are not freed yet + * + * It is also necessary to check that node are not partially committed + * in an implicit 2PC, correct handling is made below. */ - if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + if (IS_PGXC_COORDINATOR && + !IsConnFromCoord() && + !TransactionIdIsValid(s->globalCommitTransactionId)) { /* * Make sure this is rolled back on the DataNodes @@ -2309,6 +2437,10 @@ AbortTransaction(void) * Advertise the fact that we aborted in pg_clog (assuming that we got as * far as assigning an XID to advertise). */ +#ifdef PGXC + /* Do not abort a transaction that has already been committed in an implicit 2PC */ + if (!TransactionIdIsValid(s->globalCommitTransactionId)) +#endif latestXid = RecordTransactionAbort(false); TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid); @@ -2316,8 +2448,56 @@ AbortTransaction(void) /* This is done by remote Coordinator */ if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) { - RollbackTranGTM(s->globalTransactionId); + /* + * Rollback the transaction ID only if it is not being used by an implicit 2PC. + */ + if (!s->ArePGXCNodesPrepared) + RollbackTranGTM(s->globalTransactionId); + latestXid = s->globalTransactionId; + + /* Rollback Prepared Nodes if they are totally prepared but not committed at all */ + if (s->ArePGXCNodesPrepared && !TransactionIdIsValid(s->globalCommitTransactionId)) + { + char implicitgid[256]; + + sprintf(implicitgid, "T%d", s->globalTransactionId); + PGXCNodeImplicitCommitPrepared(s->globalTransactionId, + s->globalCommitTransactionId, + implicitgid, false); + } + else if (s->ArePGXCNodesPrepared && TransactionIdIsValid(s->globalCommitTransactionId)) + { + /* + * In this case transaction is partially committed, pick up the list of nodes + * prepared and not committed and register them on GTM as if it is an explicit 2PC. + * This permits to keep the transaction alive in snapshot and other transaction + * don't have any side effects with partially committed transactions + */ + char implicitgid[256]; + int co_conn_count, dn_conn_count; + PGXC_NodeId *datanodes = NULL; + PGXC_NodeId *coordinators = NULL; + + sprintf(implicitgid, "T%d", s->globalTransactionId); + + /* Get the list of nodes in error state */ + PGXCNodeGetNodeList(&datanodes, &dn_conn_count, &coordinators, &co_conn_count); + + /* Save the node list and gid on GTM. */ + StartPreparedTranGTM(s->globalTransactionId, implicitgid, + dn_conn_count, datanodes, co_conn_count, coordinators); + + /* Finish to prepare the transaction. */ + PrepareTranGTM(s->globalTransactionId); + + /* + * Rollback commit GXID as it has been used by an implicit 2PC. + * It is important at this point not to Commit the GXID used for PREPARE + * to keep it visible in snapshot for other transactions. + */ + RollbackTranGTM(s->globalCommitTransactionId); + } } else if (IS_PGXC_DATANODE || IsConnFromCoord()) { @@ -2598,7 +2778,7 @@ CommitTransactionCommand(void) * return to the idle state. */ case TBLOCK_PREPARE: - PrepareTransaction(true); + PrepareTransaction(true, false); s->blockState = TBLOCK_DEFAULT; break; @@ -2608,7 +2788,7 @@ CommitTransactionCommand(void) * that involved DDLs on a Coordinator. */ case TBLOCK_PREPARE_NO_2PC_FILE: - PrepareTransaction(false); + PrepareTransaction(false, false); s->blockState = TBLOCK_DEFAULT; break; #endif @@ -2643,17 +2823,20 @@ CommitTransactionCommand(void) CommitTransaction(); s->blockState = TBLOCK_DEFAULT; } -#ifdef PGXC - else if (s->blockState == TBLOCK_PREPARE || - s->blockState == TBLOCK_PREPARE_NO_2PC_FILE) -#else else if (s->blockState == TBLOCK_PREPARE) -#endif { Assert(s->parent == NULL); - PrepareTransaction(true); + PrepareTransaction(true, false); s->blockState = TBLOCK_DEFAULT; } +#ifdef PGXC + else if (s->blockState == TBLOCK_PREPARE_NO_2PC_FILE) + { + Assert(s->parent == NULL); + PrepareTransaction(false, false); + s->blockState = TBLOCK_DEFAULT; + } +#endif else { Assert(s->blockState == TBLOCK_INPROGRESS || diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index a524c13f6d..b954003db5 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -49,6 +49,7 @@ extern char *deparseSql(RemoteQueryState *scanstate); #define PRIMARY_NODE_WRITEAHEAD 1024 * 1024 static bool autocommit = true; +static bool implicit_force_autocommit = false; static PGXCNodeHandle **write_node_list = NULL; static int write_node_count = 0; @@ -61,6 +62,14 @@ static int pgxc_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransacti PGXCNodeAllHandles * pgxc_handles, char *gid); static int pgxc_node_commit_prepared(GlobalTransactionId gxid, GlobalTransactionId prepared_gxid, PGXCNodeAllHandles * pgxc_handles, char *gid); +static int pgxc_node_implicit_commit_prepared(GlobalTransactionId prepare_xid, + GlobalTransactionId commit_xid, + PGXCNodeAllHandles * pgxc_handles, + char *gid, + bool is_commit); +static int pgxc_node_implicit_prepare(GlobalTransactionId prepare_xid, + PGXCNodeAllHandles * pgxc_handles, char *gid); + static PGXCNodeAllHandles * get_exec_connections(ExecNodes *exec_nodes, RemoteQueryExecType exec_type); static int pgxc_node_receive_and_validate(const int conn_count, @@ -74,7 +83,7 @@ static int handle_response_clear(PGXCNodeHandle * conn); static void close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor); -static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(void); +static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(PGXCNode_HandleRequested status_requested); #define MAX_STATEMENTS_PER_TRAN 10 @@ -1505,7 +1514,7 @@ PGXCNodePrepare(char *gid) PGXCNodeAllHandles *pgxc_connections; bool local_operation = false; - pgxc_connections = pgxc_get_all_transaction_nodes(); + pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT); /* DDL involved in transaction, so make a local prepare too */ if (pgxc_connections->co_conn_count != 0) @@ -1669,6 +1678,176 @@ finish: return result; } +/* + * Prepare all the nodes involved in this implicit Prepare + * Abort transaction if this is not done correctly + */ +int +PGXCNodeImplicitPrepare(GlobalTransactionId prepare_xid, char *gid) +{ + int res = 0; + int tran_count; + PGXCNodeAllHandles *pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT); + + if (!pgxc_connections) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not prepare connection implicitely"))); + + tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count; + + /* + * This should not happen because an implicit 2PC is always using other nodes, + * but it is better to check. + */ + if (tran_count == 0) + { + goto finish; + } + + res = pgxc_node_implicit_prepare(prepare_xid, pgxc_connections, gid); + +finish: + if (!autocommit) + stat_transaction(pgxc_connections->dn_conn_count); + + return res; +} + +/* + * Prepare transaction on dedicated nodes for Implicit 2PC + * This is done inside a Transaction commit if multiple nodes are involved in write operations + * Implicit prepare in done internally on Coordinator, so this does not interact with GTM. + */ +static int +pgxc_node_implicit_prepare(GlobalTransactionId prepare_xid, + PGXCNodeAllHandles *pgxc_handles, + char *gid) +{ + int result = 0; + int co_conn_count = pgxc_handles->co_conn_count; + int dn_conn_count = pgxc_handles->dn_conn_count; + char buffer[256]; + + sprintf(buffer, "PREPARE TRANSACTION '%s'", gid); + + /* Continue even after an error here, to consume the messages */ + result = pgxc_all_handles_send_query(pgxc_handles, buffer, true); + + /* Receive and Combine results from Datanodes and Coordinators */ + result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false); + result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false); + + return result; +} + +/* + * Commit all the nodes involved in this Implicit Commit. + * Prepared XID is committed at the same time as Commit XID on GTM. + */ +void +PGXCNodeImplicitCommitPrepared(GlobalTransactionId prepare_xid, + GlobalTransactionId commit_xid, + char *gid, + bool is_commit) +{ + int res = 0; + int tran_count; + PGXCNodeAllHandles *pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_IDLE); + + if (!pgxc_connections) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not commit prepared transaction implicitely"))); + + tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count; + + /* + * This should not happen because an implicit 2PC is always using other nodes, + * but it is better to check. + */ + if (tran_count == 0) + { + elog(WARNING, "Nothing to PREPARE on Datanodes and Coordinators"); + goto finish; + } + + res = pgxc_node_implicit_commit_prepared(prepare_xid, commit_xid, + pgxc_connections, gid, is_commit); + +finish: + /* Clear nodes, signals are clear */ + if (!autocommit) + stat_transaction(pgxc_connections->dn_conn_count); + + /* + * If an error happened, do not release handles yet. This is done when transaction + * is aborted after the list of nodes in error state has been saved to be sent to GTM + */ + if (!PersistentConnections && res == 0) + release_handles(false); + autocommit = true; + clear_write_node_list(); + + if (res != 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not commit prepared transaction implicitely"))); + + /* + * Commit on GTM is made once we are sure that Nodes are not only partially committed + * If an error happens on a Datanode during implicit COMMIT PREPARED, a special handling + * is made in AbortTransaction(). + * The list of datanodes is saved on GTM and the partially committed transaction can be committed + * with a COMMIT PREPARED delivered directly from application. + * This permits to keep the gxid alive in snapshot and avoids other transactions to see only + * partially committed results. + */ + CommitPreparedTranGTM(prepare_xid, commit_xid); +} + +/* + * Commit a transaction implicitely transaction on all nodes + * Prepared transaction with this gid has reset the datanodes, + * so we need a new gxid. + * + * GXID used for Prepare and Commit are committed at the same time on GTM. + * This saves Network ressource a bit. + */ +static int +pgxc_node_implicit_commit_prepared(GlobalTransactionId prepare_xid, + GlobalTransactionId commit_xid, + PGXCNodeAllHandles *pgxc_handles, + char *gid, + bool is_commit) +{ + char buffer[256]; + int result = 0; + int co_conn_count = pgxc_handles->co_conn_count; + int dn_conn_count = pgxc_handles->dn_conn_count; + + if (is_commit) + sprintf(buffer, "COMMIT PREPARED '%s'", gid); + else + sprintf(buffer, "ROLLBACK PREPARED '%s'", gid); + + if (pgxc_all_handles_send_gxid(pgxc_handles, commit_xid, true)) + { + result = EOF; + goto finish; + } + + /* Send COMMIT to all handles */ + if (pgxc_all_handles_send_query(pgxc_handles, buffer, false)) + result = EOF; + + /* Receive and Combine results from Datanodes and Coordinators */ + result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false); + result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false); + +finish: + return result; +} /* * Commit prepared transaction on Datanodes and Coordinators (as necessary) @@ -1684,7 +1863,7 @@ PGXCNodeCommitPrepared(char *gid) { int res = 0; int res_gtm = 0; - PGXCNodeAllHandles *pgxc_handles; + PGXCNodeAllHandles *pgxc_handles = NULL; List *datanodelist = NIL; List *coordlist = NIL; int i, tran_count; @@ -1812,7 +1991,7 @@ PGXCNodeRollbackPrepared(char *gid) { int res = 0; int res_gtm = 0; - PGXCNodeAllHandles *pgxc_handles; + PGXCNodeAllHandles *pgxc_handles = NULL; List *datanodelist = NIL; List *coordlist = NIL; int i, tran_count; @@ -1922,6 +2101,8 @@ pgxc_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransactionId prepar /* * Commit current transaction on data nodes where it has been started + * This function is called when no 2PC is involved implicitely. + * So only send a commit to the involved nodes. */ void PGXCNodeCommit(void) @@ -1930,7 +2111,7 @@ PGXCNodeCommit(void) int tran_count; PGXCNodeAllHandles *pgxc_connections; - pgxc_connections = pgxc_get_all_transaction_nodes(); + pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT); tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count; @@ -1952,7 +2133,7 @@ finish: autocommit = true; clear_write_node_list(); - /* Clear up connection */ + /* Clean up connections */ pfree_pgxc_all_handles(pgxc_connections); if (res != 0) ereport(ERROR, @@ -1969,71 +2150,11 @@ static int pgxc_node_commit(PGXCNodeAllHandles *pgxc_handles) { char buffer[256]; - GlobalTransactionId gxid = InvalidGlobalTransactionId; int result = 0; int co_conn_count = pgxc_handles->co_conn_count; int dn_conn_count = pgxc_handles->dn_conn_count; - /* can set this to false to disable temporarily */ - /* bool do2PC = conn_count > 1; */ - - /* - * Only use 2PC if more than one node was written to. Otherwise, just send - * COMMIT to all - */ - bool do2PC = write_node_count > 1; - - /* Extra XID for Two Phase Commit */ - GlobalTransactionId two_phase_xid = 0; - - if (do2PC) - { - stat_2pc(); - - /* - * Formally we should be using GetCurrentGlobalTransactionIdIfAny() here, - * but since we need 2pc, we surely have sent down a command and got - * gxid for it. Hence GetCurrentGlobalTransactionId() just returns - * already allocated gxid - */ - gxid = GetCurrentGlobalTransactionId(); - - sprintf(buffer, "PREPARE TRANSACTION 'T%d'", gxid); - - if (pgxc_all_handles_send_query(pgxc_handles, buffer, false)) - result = EOF; - - /* Receive and Combine results from Datanodes and Coordinators */ - result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, true); - result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, true); - } - - if (!do2PC) - strcpy(buffer, "COMMIT"); - else - { - if (result) - { - sprintf(buffer, "ROLLBACK PREPARED 'T%d'", gxid); - /* Consume any messages on the Datanodes and Coordinators first if necessary */ - PGXCNodeConsumeMessages(); - } - else - sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid); - - /* - * We need to use a new xid, the data nodes have reset - * Timestamp has already been set with BEGIN on remote Datanodes, - * so don't use it here. - */ - two_phase_xid = BeginTranGTM(NULL); - - if (pgxc_all_handles_send_gxid(pgxc_handles, two_phase_xid, true)) - { - result = EOF; - goto finish; - } - } + strcpy(buffer, "COMMIT"); /* Send COMMIT to all handles */ if (pgxc_all_handles_send_query(pgxc_handles, buffer, false)) @@ -2043,10 +2164,6 @@ pgxc_node_commit(PGXCNodeAllHandles *pgxc_handles) result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false); result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false); -finish: - if (do2PC) - CommitTranGTM((GlobalTransactionId) two_phase_xid); - return result; } @@ -2062,7 +2179,7 @@ PGXCNodeRollback(void) int tran_count; PGXCNodeAllHandles *pgxc_connections; - pgxc_connections = pgxc_get_all_transaction_nodes(); + pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT); tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count; @@ -2099,7 +2216,6 @@ finish: static int pgxc_node_rollback(PGXCNodeAllHandles *pgxc_handles) { - int i; int result = 0; int co_conn_count = pgxc_handles->co_conn_count; int dn_conn_count = pgxc_handles->dn_conn_count; @@ -2881,6 +2997,8 @@ ExecRemoteQuery(RemoteQueryState *node) PGXCNodeAllHandles *pgxc_connections; TupleTableSlot *innerSlot = NULL; + implicit_force_autocommit = force_autocommit; + /* * Inner plan for RemoteQuery supplies parameters. * We execute inner plan to get a tuple and use values of the tuple as @@ -3622,6 +3740,8 @@ ExecRemoteUtility(RemoteQuery *node) bool need_tran; int i; + implicit_force_autocommit = force_autocommit; + remotestate = CreateResponseCombiner(0, node->combine_type); pgxc_connections = get_exec_connections(node->exec_nodes, @@ -3984,7 +4104,7 @@ finish: * for both data nodes and coordinators */ static PGXCNodeAllHandles * -pgxc_get_all_transaction_nodes() +pgxc_get_all_transaction_nodes(PGXCNode_HandleRequested status_requested) { PGXCNodeAllHandles *pgxc_connections; @@ -4009,9 +4129,13 @@ pgxc_get_all_transaction_nodes() /* gather needed connections */ pgxc_connections->dn_conn_count = get_transaction_nodes( - pgxc_connections->datanode_handles, REMOTE_CONN_DATANODE); + pgxc_connections->datanode_handles, + REMOTE_CONN_DATANODE, + status_requested); pgxc_connections->co_conn_count = get_transaction_nodes( - pgxc_connections->coord_handles, REMOTE_CONN_COORD); + pgxc_connections->coord_handles, + REMOTE_CONN_COORD, + status_requested); return pgxc_connections; } @@ -4032,3 +4156,68 @@ pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles) pfree(pgxc_handles); } + +/* + * Check if an Implicit 2PC is necessary for this transaction. + * Check also if it is necessary to prepare transaction locally. + */ +bool +PGXCNodeIsImplicit2PC(bool *prepare_local_coord) +{ + PGXCNodeAllHandles *pgxc_handles = pgxc_get_all_transaction_nodes(HANDLE_DEFAULT); + int co_conn_count = pgxc_handles->co_conn_count; + + /* Prepare Local Coord only if DDL is involved on multiple nodes */ + *prepare_local_coord = co_conn_count > 0; + + /* + * In case of an autocommit or forced autocommit transaction, 2PC is not involved + * This case happens for Utilities using force autocommit (CREATE DATABASE, VACUUM...) + */ + if (implicit_force_autocommit) + { + implicit_force_autocommit = false; + return false; + } + + /* + * 2PC is necessary at other Nodes if one Datanode or one Coordinator + * other than the local one has been involved in a write operation. + */ + return (write_node_count > 1 || co_conn_count > 0); +} + +/* + * Return the list of active nodes + */ +void +PGXCNodeGetNodeList(PGXC_NodeId **datanodes, + int *dn_conn_count, + PGXC_NodeId **coordinators, + int *co_conn_count) +{ + PGXCNodeAllHandles *pgxc_connections = pgxc_get_all_transaction_nodes(HANDLE_ERROR); + + *dn_conn_count = pgxc_connections->dn_conn_count; + + /* Add in the list local coordinator also if necessary */ + if (pgxc_connections->co_conn_count == 0) + *co_conn_count = pgxc_connections->co_conn_count; + else + *co_conn_count = pgxc_connections->co_conn_count + 1; + + if (pgxc_connections->dn_conn_count != 0) + *datanodes = collect_pgxcnode_numbers(pgxc_connections->dn_conn_count, + pgxc_connections->datanode_handles, REMOTE_CONN_DATANODE); + + if (pgxc_connections->co_conn_count != 0) + *coordinators = collect_pgxcnode_numbers(pgxc_connections->co_conn_count, + pgxc_connections->coord_handles, REMOTE_CONN_COORD); + + /* + * Now release handles properly, the list of handles in error state has been saved + * and will be sent to GTM. + */ + if (!PersistentConnections) + release_handles(false); +} diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c index cbaf68c610..4790f9573a 100644 --- a/src/backend/pgxc/pool/pgxcnode.c +++ b/src/backend/pgxc/pool/pgxcnode.c @@ -1579,9 +1579,19 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query) * to a PGXCNodeHandle structure. * The function returns number of pointers written to the connections array. * Remaining items in the array, if any, will be kept unchanged + * + * In an implicit 2PC, status of connections is set back to idle after preparing + * the transaction on each backend. + * At commit phase, it is necessary to get backends in idle state to be able to + * commit properly the backends. + * + * In the case of an error occuring with an implicit 2PC that has been partially + * committed on nodes, return the list of connections that has an error state + * to register the list of remaining nodes not commit prepared on GTM. */ int -get_transaction_nodes(PGXCNodeHandle **connections, char client_conn_type) +get_transaction_nodes(PGXCNodeHandle **connections, char client_conn_type, + PGXCNode_HandleRequested status_requested) { int tran_count = 0; int i; @@ -1596,16 +1606,42 @@ get_transaction_nodes(PGXCNodeHandle **connections, char client_conn_type) * DN_CONNECTION_STATE_ERROR_FATAL. * ERROR_NOT_READY can happen if the data node abruptly disconnects. */ - if (dn_handles[i].sock != NO_SOCKET && dn_handles[i].transaction_status != 'I') - connections[tran_count++] = &dn_handles[i]; + if (status_requested == HANDLE_IDLE) + { + if (dn_handles[i].sock != NO_SOCKET && dn_handles[i].transaction_status == 'I') + connections[tran_count++] = &dn_handles[i]; + } + else if (status_requested == HANDLE_ERROR) + { + if (dn_handles[i].transaction_status == 'E') + connections[tran_count++] = &dn_handles[i]; + } + else + { + if (dn_handles[i].sock != NO_SOCKET && dn_handles[i].transaction_status != 'I') + connections[tran_count++] = &dn_handles[i]; + } } } if (coord_count && client_conn_type == REMOTE_CONN_COORD) { for (i = 0; i < NumCoords; i++) { - if (co_handles[i].sock != NO_SOCKET && co_handles[i].transaction_status != 'I') - connections[tran_count++] = &co_handles[i]; + if (status_requested == HANDLE_IDLE) + { + if (co_handles[i].sock != NO_SOCKET && co_handles[i].transaction_status == 'I') + connections[tran_count++] = &co_handles[i]; + } + else if (status_requested == HANDLE_ERROR) + { + if (co_handles[i].transaction_status == 'E') + connections[tran_count++] = &co_handles[i]; + } + else + { + if (co_handles[i].sock != NO_SOCKET && co_handles[i].transaction_status != 'I') + connections[tran_count++] = &co_handles[i]; + } } } diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index cfabce3dbc..93376e97c5 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -280,6 +280,14 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid) * taking a snapshot. See discussion in * src/backend/access/transam/README. */ +#ifdef PGXC + /* + * Remove this assertion check for PGXC on Coordinator + * We could abort even after a Coordinator has committed + * for a 2PC transaction if Datanodes have failed committed the transaction + */ + if (IS_PGXC_DATANODE) +#endif Assert(TransactionIdIsValid(proc->xid)); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index 4a33842bdf..a3c1868422 100644 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -43,6 +43,7 @@ typedef enum REQUEST_TYPE_COPY_OUT /* Copy Out response */ } RequestType; + /* * Represents a DataRow message received from a remote node. * Contains originating node number and message body in DataRow format without @@ -111,6 +112,19 @@ extern int PGXCNodeRollback(void); extern bool PGXCNodePrepare(char *gid); extern bool PGXCNodeRollbackPrepared(char *gid); extern bool PGXCNodeCommitPrepared(char *gid); +extern bool PGXCNodeIsImplicit2PC(bool *prepare_local_coord); +extern int PGXCNodeImplicitPrepare(GlobalTransactionId prepare_xid, char *gid); +extern void PGXCNodeImplicitCommitPrepared(GlobalTransactionId prepare_xid, + GlobalTransactionId commit_xid, + char *gid, + bool is_commit); +extern void PGXCNodeConsumeMessages(void); + +/* Get list of nodes */ +extern void PGXCNodeGetNodeList(PGXC_NodeId **datanodes, + int *dn_conn_count, + PGXC_NodeId **coordinators, + int *co_conn_count); /* Copy command just involves Datanodes */ extern PGXCNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from); diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h index a57e4f1673..47b0b966eb 100644 --- a/src/include/pgxc/pgxcnode.h +++ b/src/include/pgxc/pgxcnode.h @@ -39,6 +39,13 @@ typedef enum DN_CONNECTION_STATE_COPY_OUT } DNConnectionState; +typedef enum +{ + HANDLE_IDLE, + HANDLE_ERROR, + HANDLE_DEFAULT +} PGXCNode_HandleRequested; + #define DN_CONNECTION_STATE_ERROR(dnconn) \ ((dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \ || (dnconn)->transaction_status == 'E') @@ -97,7 +104,9 @@ extern void PGXCNodeCleanAndRelease(int code, Datum arg); extern PGXCNodeAllHandles *get_handles(List *datanodelist, List *coordlist, bool is_query_coord_only); extern void release_handles(bool force_drop); -extern int get_transaction_nodes(PGXCNodeHandle ** connections, char client_conn_type); +extern int get_transaction_nodes(PGXCNodeHandle ** connections, + char client_conn_type, + PGXCNode_HandleRequested type_requested); extern PGXC_NodeId* collect_pgxcnode_numbers(int conn_count, PGXCNodeHandle ** connections, char client_conn_type); extern int get_active_nodes(PGXCNodeHandle ** connections); |
