diff options
| -rw-r--r-- | src/backend/access/transam/gtm.c | 14 | ||||
| -rw-r--r-- | src/backend/access/transam/varsup.c | 49 | ||||
| -rw-r--r-- | src/backend/access/transam/xact.c | 36 | ||||
| -rw-r--r-- | src/backend/executor/execMain.c | 6 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 51 | ||||
| -rw-r--r-- | src/backend/utils/init/globals.c | 1 | ||||
| -rw-r--r-- | src/backend/utils/misc/guc.c | 13 | ||||
| -rw-r--r-- | src/gtm/client/fe-protocol.c | 5 | ||||
| -rw-r--r-- | src/gtm/client/gtm_client.c | 43 | ||||
| -rw-r--r-- | src/gtm/main/gtm_txn.c | 198 | ||||
| -rw-r--r-- | src/gtm/proxy/proxy_main.c | 14 | ||||
| -rw-r--r-- | src/include/access/gtm.h | 2 | ||||
| -rw-r--r-- | src/include/access/xact.h | 1 | ||||
| -rw-r--r-- | src/include/gtm/gtm_client.h | 13 | ||||
| -rw-r--r-- | src/include/gtm/gtm_proxy.h | 1 | ||||
| -rw-r--r-- | src/include/gtm/gtm_txn.h | 17 | ||||
| -rw-r--r-- | src/include/pgxc/execRemote.h | 1 | ||||
| -rw-r--r-- | src/include/storage/backendid.h | 1 |
18 files changed, 354 insertions, 112 deletions
diff --git a/src/backend/access/transam/gtm.c b/src/backend/access/transam/gtm.c index 29e4dc3dbb..ad4097c355 100644 --- a/src/backend/access/transam/gtm.c +++ b/src/backend/access/transam/gtm.c @@ -143,7 +143,7 @@ CloseGTM(void) } GlobalTransactionId -BeginTranGTM(GTM_Timestamp *timestamp) +BeginTranGTM(GTM_Timestamp *timestamp, const char *globalSession) { GlobalTransactionId xid = InvalidGlobalTransactionId; struct rusage start_r; @@ -155,7 +155,7 @@ BeginTranGTM(GTM_Timestamp *timestamp) CheckConnection(); // TODO Isolation level if (conn) - xid = begin_transaction(conn, GTM_ISOLATION_RC, timestamp); + xid = begin_transaction(conn, GTM_ISOLATION_RC, globalSession, timestamp); /* If something went wrong (timeout), try and reset GTM connection * and retry. This is safe at the beginning of a transaction. @@ -165,12 +165,14 @@ BeginTranGTM(GTM_Timestamp *timestamp) CloseGTM(); InitGTM(); if (conn) - xid = begin_transaction(conn, GTM_ISOLATION_RC, timestamp); + xid = begin_transaction(conn, GTM_ISOLATION_RC, globalSession, timestamp); } if (xid) IsXidFromGTM = true; currentGxid = xid; + elog(DEBUG2, "BeginTranGTM - session:%s, xid: %d", globalSession, xid); + if (log_gtm_stats) ShowUsageCommon("BeginTranGTM", &start_r, &start_t); return xid; @@ -198,6 +200,8 @@ BeginTranAutovacuumGTM(void) xid = begin_transaction_autovacuum(conn, GTM_ISOLATION_RC); } currentGxid = xid; + + elog(DEBUG3, "BeginTranGTM - %d", xid); return xid; } @@ -215,6 +219,8 @@ CommitTranGTM(GlobalTransactionId gxid, int waited_xid_count, if (log_gtm_stats) ResetUsageCommon(&start_r, &start_t); + elog(DEBUG3, "CommitTranGTM: %d", gxid); + CheckConnection(); ret = -1; if (conn) @@ -263,6 +269,8 @@ CommitPreparedTranGTM(GlobalTransactionId gxid, if (log_gtm_stats) ResetUsageCommon(&start_r, &start_t); + elog(DEBUG3, "CommitPreparedTranGTM: %d:%d", gxid, prepared_gxid); + CheckConnection(); ret = -1; if (conn) diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 40432be15f..0aa88faf87 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -34,7 +34,10 @@ #ifdef PGXC #include "pgxc/pgxc.h" #include "access/gtm.h" +#include "libpq/libpq.h" #include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/lsyscache.h" #endif @@ -184,7 +187,23 @@ GetNewTransactionId(bool isSubXact) if (MyPgXact->vacuumFlags & PROC_IN_VACUUM) next_xid = xid = (TransactionId) BeginTranAutovacuumGTM(); else - next_xid = xid = (TransactionId) BeginTranGTM(timestamp); + { + char global_session[NAMEDATALEN + 13 + 13]; + + /* + * Generate unique global session identifier using coordinator + * name, backend pid and virtual XID. + * + * For global transactions i.e. those which may involve more than + * one node, this code will be executed only on the coordinator and + * hence its correct to use PGXCNodeName and fields from MyProc to + * generate the global session identifier + */ + sprintf(global_session, "%s_%d_%d", PGXCNodeName, MyProc->pid, + MyProc->lxid); + next_xid = xid = (TransactionId) BeginTranGTM(timestamp, + global_session); + } *timestamp_received = true; } #endif /* PGXC */ @@ -206,6 +225,34 @@ GetNewTransactionId(bool isSubXact) if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->nextXid)) ShmemVariableCache->nextXid = xid; } + else if ((IsConnFromCoord() || IsConnFromDatanode()) && + MyCoordId != InvalidOid && MyCoordPid != 0 && + MyCoordLxid != InvalidLocalTransactionId) + { + char global_session[NAMEDATALEN + 13 + 13]; + + /* + * If we are running on a remote coordinator or a datanode, + * start a new transaction and associate it with a global session + * identifier which is guaranteed to be unique across the cluster + */ + sprintf(global_session, "%s_%d_%d", get_pgxc_nodename(MyCoordId), MyCoordPid, + MyCoordLxid); + xid = (TransactionId) BeginTranGTM(timestamp, global_session); + if (TransactionIdIsValid(xid)) + { + /* + * Let the coordinator know about GXID assigned to this + * transaction + */ + if (whereToSendOutput == DestRemote && + !IS_PGXC_LOCAL_COORDINATOR) + { + pq_putmessage('x', (const char *) &xid, sizeof (GlobalTransactionId)); + IsXidFromGTM = false; + } + } + } else ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 636466560d..7d42c6a406 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -31,6 +31,7 @@ #include "access/gtm.h" /* PGXC_COORD */ #include "gtm/gtm_c.h" +#include "gtm/gtm_gxid.h" #include "pgxc/execRemote.h" #include "pgxc/pause.h" /* PGXC_DATANODE */ @@ -782,7 +783,7 @@ GetAuxilliaryTransactionId() { TransactionState s = CurrentTransactionState; if (!GlobalTransactionIdIsValid(s->auxilliaryTransactionId)) - s->auxilliaryTransactionId = BeginTranGTM(NULL); + s->auxilliaryTransactionId = BeginTranGTM(NULL, NULL); return s->auxilliaryTransactionId; } @@ -2253,6 +2254,17 @@ CommitTransaction(void) break; } + /* + * Insert notifications sent by NOTIFY commands into the queue. This + * should be late in the pre-commit sequence to minimize time spent + * holding the notify-insertion lock. + * + * XXX XL: Since PreCommit_Notify may assign transaction ID for a + * transaction which till now doesn't have one, we want to process this + * before doing any XL specific transaction handling + */ + PreCommit_Notify(); + #ifdef PGXC /* * If we are a Coordinator and currently serving the client, @@ -2370,13 +2382,6 @@ CommitTransaction(void) */ PreCommit_CheckForSerializationFailure(); - /* - * Insert notifications sent by NOTIFY commands into the queue. This - * should be late in the pre-commit sequence to minimize time spent - * holding the notify-insertion lock. - */ - PreCommit_Notify(); - #ifdef PGXC if (IS_PGXC_DATANODE || !IsConnFromCoord()) { @@ -6716,5 +6721,20 @@ AtEOXact_WaitedXids(void) } } + +/* + * Remember the XID assigned to the top transaction. Even if multiple datanodes + * report XIDs, they should always report the same XID given that they are tied + * by an unique global session identifier + */ +void +SetTopTransactionId(GlobalTransactionId xid) +{ + TransactionState s = CurrentTransactionState; + Assert(!GlobalTransactionIdIsValid(s->transactionId) || + GlobalTransactionIdEquals(s->transactionId, xid)); + s->transactionId = xid; + elog(DEBUG2, "Assigning XID received from the remote node - %d", xid); +} #endif #endif diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 6cd78c6188..119d07e785 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -181,6 +181,12 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) ExecCheckXactReadOnly(queryDesc->plannedstmt); +#ifdef XCP + if (queryDesc->plannedstmt->commandType != CMD_SELECT || + queryDesc->plannedstmt->hasModifyingCTE) + GetTopTransactionId(); +#endif + /* * Build EState, switch into per-query memory context for startup. */ diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index a3014f8578..d3ef43122d 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -47,6 +47,7 @@ #include "pgxc/nodemgr.h" #include "pgxc/poolmgr.h" #include "storage/ipc.h" +#include "storage/proc.h" #include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -854,6 +855,17 @@ HandleWaitXids(char *msg_body, size_t len) } } +static void +HandleGlobalTransactionId(char *msg_body, size_t len) +{ + GlobalTransactionId xid; + + Assert(len == sizeof (GlobalTransactionId)); + memcpy(&xid, &msg_body[0], sizeof (GlobalTransactionId)); + + SetTopTransactionId(xid); +} + /* * Examine the specified combiner state and determine if command was completed * successfully @@ -1494,6 +1506,10 @@ FetchTuple(ResponseCombiner *combiner) /* Now slot is responsible for freeng the descriptor */ combiner->tuple_desc = NULL; } + else if (res == RESPONSE_ASSIGN_GXID) + { + /* Do nothing. It must have been handled in handle_response() */ + } else { // Can not get here? @@ -1558,6 +1574,10 @@ pgxc_node_receive_responses(const int conn_count, PGXCNodeHandle ** connections, case RESPONSE_WAITXIDS: break; + + case RESPONSE_ASSIGN_GXID: + break; + default: /* Inconsistent responses */ add_error_message(to_receive[i], "Unexpected response from the Datanodes"); @@ -1719,6 +1739,9 @@ handle_response(PGXCNodeHandle *conn, ResponseCombiner *combiner) case 'W': HandleWaitXids(msg, msg_len); return RESPONSE_WAITXIDS; + case 'x': + HandleGlobalTransactionId(msg, msg_len); + return RESPONSE_ASSIGN_GXID; default: /* sync lost? */ elog(WARNING, "Received unsupported message type: %c", msg_type); @@ -1797,6 +1820,7 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle **connections, PGXCNodeHandle *new_connections[conn_count]; int new_count = 0; char *init_str; + char lxid[13]; /* * If no remote connections, we don't have anything to do @@ -1861,6 +1885,10 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle **connections, if (!ValidateAndCloseCombiner(&combiner)) return EOF; + /* Send virtualXID to the remote nodes using SET command */ + sprintf(lxid, "%d", MyProc->lxid); + PGXCNodeSetParam(true, "coordinator_lxid", lxid); + /* after transactions are started send down local set commands */ init_str = PGXCNodeGetTransactionParamStr(); if (init_str) @@ -4360,16 +4388,7 @@ ExecRemoteQuery(RemoteQueryState *node) stat_statement(); stat_transaction(total_conn_count); - gxid = GetCurrentTransactionId(); - - if (!GlobalTransactionIdIsValid(gxid)) - { - pfree_pgxc_all_handles(pgxc_connections); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to get next transaction ID"))); - } - + gxid = GetCurrentTransactionIdIfAny(); /* See if we have a primary node, execute on it first before the others */ if (primaryconnection) { @@ -4402,6 +4421,8 @@ ExecRemoteQuery(RemoteQueryState *node) else if (res == RESPONSE_COMPLETE || res == RESPONSE_ERROR) /* Get ReadyForQuery */ continue; + else if (res == RESPONSE_ASSIGN_GXID) + continue; else ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), @@ -5334,15 +5355,7 @@ ExecFinishInitRemoteSubplan(RemoteSubplanState *node) combiner->current_conn = 0; } - gxid = GetCurrentTransactionId(); - if (!GlobalTransactionIdIsValid(gxid)) - { - combiner->conn_count = 0; - pfree(combiner->connections); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to get next transaction ID"))); - } + gxid = GetCurrentTransactionIdIfAny(); /* extract parameter data types */ if (node->nParamRemote > 0) diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index ee346463a6..d91257f152 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -77,6 +77,7 @@ char postgres_exec_path[MAXPGPATH]; /* full path to backend */ Oid MyCoordId = InvalidOid; int MyCoordPid = 0; +LocalTransactionId MyCoordLxid = 0; BackendId MyFirstBackendId = InvalidBackendId; #endif diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 027add0015..ae251089cc 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2951,6 +2951,18 @@ static struct config_int ConfigureNamesInt[] = 64, 1, MAX_KILOBYTES, NULL, NULL, NULL }, + + { + {"coordinator_lxid", PGC_USERSET, UNGROUPED, + gettext_noop("Sets the coordinator local transaction identifier."), + NULL, + GUC_IS_NAME | GUC_REPORT | GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE | GUC_NOT_WHILE_SEC_REST + }, + &MyCoordLxid, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + #endif #endif /* PGXC */ @@ -3414,6 +3426,7 @@ static struct config_string ConfigureNamesString[] = check_global_session, assign_global_session, NULL }, + { {"pgxc_catalog_remap", PGC_SIGHUP, XC_HOUSEKEEPING_OPTIONS, gettext_noop("List of catalog tables/views that always need to be " diff --git a/src/gtm/client/fe-protocol.c b/src/gtm/client/fe-protocol.c index abce635ed7..0259868447 100644 --- a/src/gtm/client/fe-protocol.c +++ b/src/gtm/client/fe-protocol.c @@ -441,8 +441,9 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result) result->gr_status = GTM_RESULT_ERROR; break; } - if (gtmpqGetnchar((char *)&result->gr_resdata.grd_txn_get_multi.start_gxid, - sizeof (GlobalTransactionId), conn)) + if (gtmpqGetnchar((char *)result->gr_resdata.grd_txn_get_multi.txn_gxid, + sizeof (GlobalTransactionId) * result->gr_resdata.grd_txn_get_multi.txn_count, + conn)) { result->gr_status = GTM_RESULT_ERROR; break; diff --git a/src/gtm/client/gtm_client.c b/src/gtm/client/gtm_client.c index 8910e68320..ea27093b3a 100644 --- a/src/gtm/client/gtm_client.c +++ b/src/gtm/client/gtm_client.c @@ -420,13 +420,22 @@ send_failed: int bkup_begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, - bool read_only, uint32 client_id, GTM_Timestamp timestamp) + bool read_only, + char *global_sessionid, + uint32 client_id, GTM_Timestamp timestamp) { + uint32 global_sessionid_len = global_sessionid ? + strlen(global_sessionid) + 1 : 1; + char *eos = "\0"; + /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn) || gtmpqPutInt(MSG_BKUP_TXN_BEGIN, sizeof (GTM_MessageType), conn) || gtmpqPutInt(isolevel, sizeof (GTM_IsolationLevel), conn) || gtmpqPutc(read_only, conn) || + gtmpqPutInt(global_sessionid_len, sizeof (uint32), conn) || + gtmpqPutnchar(global_sessionid ? global_sessionid : eos, + global_sessionid_len, conn) || gtmpqPutInt(client_id, sizeof (uint32), conn) || gtmpqPutnchar((char *)×tamp, sizeof(GTM_Timestamp), conn)) goto send_failed; @@ -449,14 +458,22 @@ send_failed: int bkup_begin_transaction_gxid(GTM_Conn *conn, GlobalTransactionId gxid, GTM_IsolationLevel isolevel, bool read_only, + char *global_sessionid, uint32 client_id, GTM_Timestamp timestamp) { + uint32 global_sessionid_len = global_sessionid ? + strlen(global_sessionid) + 1 : 1; + char *eos = "\0"; + /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn) || gtmpqPutInt(MSG_BKUP_TXN_BEGIN_GETGXID, sizeof (GTM_MessageType), conn) || gtmpqPutInt(gxid, sizeof(GlobalTransactionId), conn) || gtmpqPutInt(isolevel, sizeof (GTM_IsolationLevel), conn) || gtmpqPutc(read_only, conn) || + gtmpqPutInt(global_sessionid_len, sizeof (uint32), conn) || + gtmpqPutnchar(global_sessionid ? global_sessionid : eos, + global_sessionid_len, conn) || gtmpqPutInt(client_id, sizeof (uint32), conn) || gtmpqPutnchar((char *)×tamp, sizeof(GTM_Timestamp), conn)) goto send_failed; @@ -476,17 +493,25 @@ send_failed: } GlobalTransactionId -begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, GTM_Timestamp *timestamp) +begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, + char *global_sessionid, + GTM_Timestamp *timestamp) { bool txn_read_only = false; GTM_Result *res = NULL; time_t finish_time; + uint32 global_sessionid_len = global_sessionid ? + strlen(global_sessionid) + 1 : 1; + char *eos = "\0"; /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn) || gtmpqPutInt(MSG_TXN_BEGIN_GETGXID, sizeof (GTM_MessageType), conn) || gtmpqPutInt(isolevel, sizeof (GTM_IsolationLevel), conn) || - gtmpqPutc(txn_read_only, conn)) + gtmpqPutc(txn_read_only, conn) || + gtmpqPutInt(global_sessionid_len, sizeof (uint32), conn) || + gtmpqPutnchar(global_sessionid ? global_sessionid : eos, + global_sessionid_len, conn)) goto send_failed; /* Finish the message. */ @@ -612,7 +637,6 @@ commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid, int status; status = commit_transaction_multi(conn, 1, &gxid, &txn_count_out, &status_out); - Assert(txn_count_out == 1); return status; } else @@ -1937,7 +1961,9 @@ begin_transaction_multi(GTM_Conn *conn, int txn_count, GTM_IsolationLevel *txn_i if (res->gr_status == GTM_RESULT_OK) { memcpy(txn_count_out, &res->gr_resdata.grd_txn_get_multi.txn_count, sizeof(int)); - memcpy(gxid_out, &res->gr_resdata.grd_txn_get_multi.start_gxid, sizeof(GlobalTransactionId)); + memcpy(gxid_out, res->gr_resdata.grd_txn_get_multi.txn_gxid, + sizeof(GlobalTransactionId) * + res->gr_resdata.grd_txn_get_multi.txn_count); memcpy(ts_out, &res->gr_resdata.grd_txn_get_multi.timestamp, sizeof(GTM_Timestamp)); } @@ -1953,12 +1979,11 @@ send_failed: int bkup_begin_transaction_multi(GTM_Conn *conn, int txn_count, - GlobalTransactionId start_gxid, GTM_IsolationLevel *isolevel, + GlobalTransactionId *gxid, GTM_IsolationLevel *isolevel, bool *read_only, uint32 *client_id, GTMProxy_ConnID *txn_connid) { int ii; - GlobalTransactionId gxid = start_gxid; /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn)) /* FIXME: no proxy header */ @@ -1970,9 +1995,7 @@ bkup_begin_transaction_multi(GTM_Conn *conn, int txn_count, for (ii = 0; ii < txn_count; ii++, gxid++) { - if (gxid == InvalidGlobalTransactionId) - gxid = FirstNormalGlobalTransactionId; - if (gtmpqPutInt(gxid, sizeof(GlobalTransactionId), conn) || + if (gtmpqPutInt(gxid[ii], sizeof(GlobalTransactionId), conn) || gtmpqPutInt(isolevel[ii], sizeof(GTM_IsolationLevel), conn) || gtmpqPutc(read_only[ii], conn) || gtmpqPutInt(client_id[ii], sizeof (uint32), conn) || diff --git a/src/gtm/main/gtm_txn.c b/src/gtm/main/gtm_txn.c index 0185e4d151..1fc18e4d48 100644 --- a/src/gtm/main/gtm_txn.c +++ b/src/gtm/main/gtm_txn.c @@ -44,8 +44,11 @@ static void init_GTM_TransactionInfo(GTM_TransactionInfo *gtm_txninfo, GTM_IsolationLevel isolevel, uint32 client_id, GTMProxy_ConnID connid, + const char *global_sessionid, bool readonly); static void clean_GTM_TransactionInfo(GTM_TransactionInfo *gtm_txninfo); +static GTM_TransactionHandle GTM_GlobalSessionIDToHandle( + const char *global_sessionid); GlobalTransactionId ControlXid; /* last one written to control file */ GTM_Transactions GTMTransactions; @@ -184,6 +187,28 @@ GTM_GXIDToHandle(GlobalTransactionId gxid) return GTM_GXIDToHandle_Internal(gxid, true); } +static GTM_TransactionHandle +GTM_GlobalSessionIDToHandle(const char *global_sessionid) +{ + gtm_ListCell *elem = NULL; + GTM_TransactionInfo *gtm_txninfo = NULL; + + if (global_sessionid == NULL || global_sessionid[0] == '\0') + return InvalidTransactionHandle; + + gtm_foreach(elem, GTMTransactions.gt_open_transactions) + { + gtm_txninfo = (GTM_TransactionInfo *)gtm_lfirst(elem); + if (strcmp(gtm_txninfo->gti_global_session_id, global_sessionid) == 0) + break; + gtm_txninfo = NULL; + } + if (gtm_txninfo != NULL) + return gtm_txninfo->gti_handle; + + return InvalidTransactionHandle; +} + bool GTM_IsGXIDInProgress(GlobalTransactionId gxid) { @@ -500,11 +525,12 @@ GTM_SetDoVacuum(GTM_TransactionHandle handle) * The new XID is also stored into the transaction info structure of the given * transaction before returning. */ -GlobalTransactionId -GTM_GetGlobalTransactionIdMulti(GTM_TransactionHandle handle[], int txn_count) +bool +GTM_GetGlobalTransactionIdMulti(GTM_TransactionHandle handle[], int txn_count, + GlobalTransactionId gxid[], GTM_TransactionHandle new_handle[], + int *new_txn_count) { GlobalTransactionId xid = InvalidGlobalTransactionId; - GlobalTransactionId start_xid = InvalidGlobalTransactionId; GTM_TransactionInfo *gtm_txninfo = NULL; int ii; bool save_control = false; @@ -512,7 +538,7 @@ GTM_GetGlobalTransactionIdMulti(GTM_TransactionHandle handle[], int txn_count) if (Recovery_IsStandby()) { ereport(ERROR, (EINVAL, errmsg("GTM is running in STANDBY mode -- can not issue new transaction ids"))); - return InvalidGlobalTransactionId; + return false; } GTM_RWLockAcquire(>MTransactions.gt_XidGenLock, GTM_LOCKMODE_WRITE); @@ -521,20 +547,10 @@ GTM_GetGlobalTransactionIdMulti(GTM_TransactionHandle handle[], int txn_count) { GTM_RWLockRelease(>MTransactions.gt_XidGenLock); ereport(ERROR, (EINVAL, errmsg("GTM shutting down -- can not issue new transaction ids"))); - return InvalidGlobalTransactionId; + return false; } - - /* - * If we are allocating the first XID of a new page of the commit log, - * zero out that commit-log page before returning. We must do this while - * holding XidGenLock, else another xact could acquire and commit a later - * XID before we zero the page. Fortunately, a page of the commit log - * holds 32K or more transactions, so we don't have to do this very often. - * - ExtendCLOG(xid); - */ - + *new_txn_count = 0; /* * Now advance the nextXid counter. This must not happen until after we * have successfully completed ExtendCLOG() --- if that routine fails, we @@ -543,10 +559,18 @@ GTM_GetGlobalTransactionIdMulti(GTM_TransactionHandle handle[], int txn_count) */ for (ii = 0; ii < txn_count; ii++) { - xid = GTMTransactions.gt_nextXid; + gtm_txninfo = GTM_HandleToTransactionInfo(handle[ii]); + Assert(gtm_txninfo); - if (!GlobalTransactionIdIsValid(start_xid)) - start_xid = xid; + if (GlobalTransactionIdIsValid(gtm_txninfo->gti_gxid)) + { + gxid[ii] = gtm_txninfo->gti_gxid; + elog(DEBUG2, "GTM_TransactionInfo has XID already assgined - %s:%d", + gtm_txninfo->gti_global_session_id, gxid[ii]); + continue; + } + + xid = GTMTransactions.gt_nextXid; /*---------- * Check to see if it's safe to assign another XID. This protects against @@ -579,11 +603,12 @@ GTM_GetGlobalTransactionIdMulti(GTM_TransactionHandle handle[], int txn_count) } GlobalTransactionIdAdvance(GTMTransactions.gt_nextXid); - gtm_txninfo = GTM_HandleToTransactionInfo(handle[ii]); - Assert(gtm_txninfo); - elog(INFO, "Assigning new transaction ID = %d", xid); - gtm_txninfo->gti_gxid = xid; + elog(DEBUG2, "Assigning new transaction ID = %s:%d", + gtm_txninfo->gti_global_session_id, xid); + gxid[ii] = gtm_txninfo->gti_gxid = xid; + new_handle[*new_txn_count] = gtm_txninfo->gti_handle; + *new_txn_count = *new_txn_count + 1; } /* Periodically write the xid and sequence info out to the control file. @@ -604,7 +629,7 @@ GTM_GetGlobalTransactionIdMulti(GTM_TransactionHandle handle[], int txn_count) if (save_control) SaveControlInfo(); - return start_xid; + return true; } /* @@ -616,7 +641,13 @@ GTM_GetGlobalTransactionIdMulti(GTM_TransactionHandle handle[], int txn_count) GlobalTransactionId GTM_GetGlobalTransactionId(GTM_TransactionHandle handle) { - return GTM_GetGlobalTransactionIdMulti(&handle, 1); + GlobalTransactionId gxid; + GTM_TransactionHandle new_handle; + int new_count; + + GTM_GetGlobalTransactionIdMulti(&handle, 1, &gxid, &new_handle, + &new_count); + return gxid; } /* @@ -662,6 +693,7 @@ SetNextGlobalTransactionId(GlobalTransactionId gxid) int GTM_BeginTransactionMulti(GTM_IsolationLevel isolevel[], bool readonly[], + const char *global_sessionid[], GTMProxy_ConnID connid[], int txn_count, GTM_TransactionHandle txns[]) @@ -687,6 +719,18 @@ GTM_BeginTransactionMulti(GTM_IsolationLevel isolevel[], for (kk = 0; kk < txn_count; kk++) { int ii, jj, startslot; + GTM_TransactionHandle txn = + GTM_GlobalSessionIDToHandle(global_sessionid[kk]); + + if (txn != InvalidTransactionHandle) + { + gtm_txninfo[kk] = GTM_HandleToTransactionInfo(txn); + elog(DEBUG2, "Existing transaction found: %s:%d", + gtm_txninfo[kk]->gti_global_session_id, + gtm_txninfo[kk]->gti_gxid); + txns[kk] = txn; + continue; + } /* * We had no cached slots. Now find a free slot in the transation array @@ -715,7 +759,9 @@ GTM_BeginTransactionMulti(GTM_IsolationLevel isolevel[], } init_GTM_TransactionInfo(gtm_txninfo[kk], ii, isolevel[kk], - GetMyThreadInfo->thr_client_id, connid[kk], readonly[kk]); + GetMyThreadInfo->thr_client_id, connid[kk], + global_sessionid[kk], + readonly[kk]); GTMTransactions.gt_lastslot = ii; @@ -740,12 +786,13 @@ GTM_BeginTransactionMulti(GTM_IsolationLevel isolevel[], /* Transaction Control */ GTM_TransactionHandle GTM_BeginTransaction(GTM_IsolationLevel isolevel, - bool readonly) + bool readonly, + const char *global_sessionid) { GTM_TransactionHandle txn; GTMProxy_ConnID connid = -1; - GTM_BeginTransactionMulti(&isolevel, &readonly, &connid, 1, &txn); + GTM_BeginTransactionMulti(&isolevel, &readonly, &global_sessionid, &connid, 1, &txn); return txn; } @@ -755,6 +802,7 @@ init_GTM_TransactionInfo(GTM_TransactionInfo *gtm_txninfo, GTM_IsolationLevel isolevel, uint32 client_id, GTMProxy_ConnID connid, + const char *global_sessionid, bool readonly) { gtm_txninfo->gti_gxid = InvalidGlobalTransactionId; @@ -765,6 +813,12 @@ init_GTM_TransactionInfo(GTM_TransactionInfo *gtm_txninfo, gtm_txninfo->gti_readonly = readonly; gtm_txninfo->gti_in_use = true; + if (global_sessionid) + strncpy(gtm_txninfo->gti_global_session_id, global_sessionid, + GTM_MAX_SESSION_ID_LEN); + else + gtm_txninfo->gti_global_session_id[0] = '\0'; + gtm_txninfo->nodestring = NULL; gtm_txninfo->gti_gid = NULL; @@ -822,6 +876,7 @@ clean_GTM_TransactionInfo(GTM_TransactionInfo *gtm_txninfo) void GTM_BkupBeginTransactionMulti(GTM_IsolationLevel *isolevel, bool *readonly, + const char **global_sessionid, uint32 *client_id, GTMProxy_ConnID *connid, int txn_count) @@ -836,7 +891,8 @@ GTM_BkupBeginTransactionMulti(GTM_IsolationLevel *isolevel, oldContext = MemoryContextSwitchTo(TopMostMemoryContext); - count = GTM_BeginTransactionMulti(isolevel, readonly, connid, + count = GTM_BeginTransactionMulti(isolevel, readonly, + global_sessionid, connid, txn_count, txn); if (count != txn_count) ereport(ERROR, @@ -849,11 +905,13 @@ GTM_BkupBeginTransactionMulti(GTM_IsolationLevel *isolevel, void GTM_BkupBeginTransaction(GTM_IsolationLevel isolevel, bool readonly, + const char *global_sessionid, uint32 client_id) { GTMProxy_ConnID connid = -1; GTM_BkupBeginTransactionMulti(&isolevel, &readonly, + &global_sessionid, &client_id, &connid, 1); } /* @@ -1139,16 +1197,21 @@ ProcessBeginTransactionCommand(Port *myport, StringInfo message) GTM_TransactionHandle txn; GTM_Timestamp timestamp; MemoryContext oldContext; + uint32 global_sessionid_len; + const char *global_sessionid; txn_isolation_level = pq_getmsgint(message, sizeof (GTM_IsolationLevel)); txn_read_only = pq_getmsgbyte(message); + global_sessionid_len = pq_getmsgint(message, sizeof (uint32)); + global_sessionid = pq_getmsgbytes(message, global_sessionid_len); oldContext = MemoryContextSwitchTo(TopMemoryContext); /* * Start a new transaction */ - txn = GTM_BeginTransaction(txn_isolation_level, txn_read_only); + txn = GTM_BeginTransaction(txn_isolation_level, txn_read_only, + global_sessionid); if (txn == InvalidTransactionHandle) ereport(ERROR, (EINVAL, @@ -1164,6 +1227,7 @@ ProcessBeginTransactionCommand(Port *myport, StringInfo message) { bkup_begin_transaction(GetMyThreadInfo->thr_conn->standby, txn_isolation_level, txn_read_only, + global_sessionid, GetMyThreadInfo->thr_client_id, timestamp); /* Synch. with standby */ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) @@ -1203,9 +1267,13 @@ ProcessBkupBeginTransactionCommand(Port *myport, StringInfo message) GTM_Timestamp timestamp; MemoryContext oldContext; uint32 client_id; + uint32 global_sessionid_len; + const char *global_sessionid; txn_isolation_level = pq_getmsgint(message, sizeof(GTM_IsolationLevel)); txn_read_only = pq_getmsgbyte(message); + global_sessionid_len = pq_getmsgint(message, sizeof (uint32)); + global_sessionid = pq_getmsgbytes(message, global_sessionid_len); client_id = pq_getmsgint(message, sizeof (uint32)); memcpy(×tamp, pq_getmsgbytes(message, sizeof(GTM_Timestamp)), sizeof(GTM_Timestamp)); pq_getmsgend(message); @@ -1213,6 +1281,7 @@ ProcessBkupBeginTransactionCommand(Port *myport, StringInfo message) oldContext = MemoryContextSwitchTo(TopMemoryContext); GTM_BkupBeginTransaction(txn_isolation_level, txn_read_only, + global_sessionid, client_id); MemoryContextSwitchTo(oldContext); @@ -1231,9 +1300,13 @@ ProcessBeginTransactionGetGXIDCommand(Port *myport, StringInfo message) GlobalTransactionId gxid; GTM_Timestamp timestamp; MemoryContext oldContext; + uint32 global_sessionid_len; + const char *global_sessionid; txn_isolation_level = pq_getmsgint(message, sizeof (GTM_IsolationLevel)); txn_read_only = pq_getmsgbyte(message); + global_sessionid_len = pq_getmsgint(message, sizeof (uint32)); + global_sessionid = pq_getmsgbytes(message, global_sessionid_len); oldContext = MemoryContextSwitchTo(TopMemoryContext); @@ -1243,7 +1316,8 @@ ProcessBeginTransactionGetGXIDCommand(Port *myport, StringInfo message) /* * Start a new transaction */ - txn = GTM_BeginTransaction(txn_isolation_level, txn_read_only); + txn = GTM_BeginTransaction(txn_isolation_level, txn_read_only, + global_sessionid); if (txn == InvalidTransactionHandle) ereport(ERROR, (EINVAL, @@ -1271,6 +1345,7 @@ retry: bkup_begin_transaction_gxid(GetMyThreadInfo->thr_conn->standby, gxid, txn_isolation_level, txn_read_only, + global_sessionid, GetMyThreadInfo->thr_client_id, timestamp); @@ -1311,6 +1386,7 @@ static void GTM_BkupBeginTransactionGetGXIDMulti(GlobalTransactionId *gxid, GTM_IsolationLevel *isolevel, bool *readonly, + const char **global_sessionid, uint32 *client_id, GTMProxy_ConnID *connid, int txn_count) @@ -1326,8 +1402,8 @@ GTM_BkupBeginTransactionGetGXIDMulti(GlobalTransactionId *gxid, oldContext = MemoryContextSwitchTo(TopMostMemoryContext); - count = GTM_BeginTransactionMulti(isolevel, readonly, connid, - txn_count, txn); + count = GTM_BeginTransactionMulti(isolevel, readonly, global_sessionid, + connid, txn_count, txn); if (count != txn_count) ereport(ERROR, (EINVAL, @@ -1378,12 +1454,13 @@ static void GTM_BkupBeginTransactionGetGXID(GlobalTransactionId gxid, GTM_IsolationLevel isolevel, bool readonly, + const char *global_sessionid, uint32 client_id) { GTMProxy_ConnID connid = -1; GTM_BkupBeginTransactionGetGXIDMulti(&gxid, &isolevel, - &readonly, &client_id, &connid, 1); + &readonly, &global_sessionid, &client_id, &connid, 1); } /* @@ -1397,16 +1474,21 @@ ProcessBkupBeginTransactionGetGXIDCommand(Port *myport, StringInfo message) bool txn_read_only; uint32 txn_client_id; GTM_Timestamp timestamp; + uint32 txn_global_sessionid_len; + const char *txn_global_sessionid; gxid = pq_getmsgint(message, sizeof(GlobalTransactionId)); txn_isolation_level = pq_getmsgint(message, sizeof(GTM_IsolationLevel)); txn_read_only = pq_getmsgbyte(message); + txn_global_sessionid_len = pq_getmsgint(message, sizeof (uint32)); + txn_global_sessionid = pq_getmsgbytes(message, + txn_global_sessionid_len); txn_client_id = pq_getmsgint(message, sizeof (uint32)); memcpy(×tamp, pq_getmsgbytes(message, sizeof(GTM_Timestamp)), sizeof(GTM_Timestamp)); pq_getmsgend(message); GTM_BkupBeginTransactionGetGXID(gxid, txn_isolation_level, - txn_read_only, txn_client_id); + txn_read_only, txn_global_sessionid, txn_client_id); } /* @@ -1425,7 +1507,7 @@ ProcessBkupBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo mes pq_getmsgend(message); GTM_BkupBeginTransactionGetGXID(gxid, txn_isolation_level, - false, txn_client_id); + false, NULL, txn_client_id); GTM_SetDoVacuum(GTM_GXIDToHandle(gxid)); } @@ -1450,7 +1532,7 @@ ProcessBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message /* * Start a new transaction */ - txn = GTM_BeginTransaction(txn_isolation_level, txn_read_only); + txn = GTM_BeginTransaction(txn_isolation_level, txn_read_only, NULL); if (txn == InvalidTransactionHandle) ereport(ERROR, (EINVAL, @@ -1523,10 +1605,13 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) { GTM_IsolationLevel txn_isolation_level[GTM_MAX_GLOBAL_TRANSACTIONS]; bool txn_read_only[GTM_MAX_GLOBAL_TRANSACTIONS]; - int txn_count; + uint32 txn_global_sessionid_len; + char *txn_global_sessionid[GTM_MAX_GLOBAL_TRANSACTIONS]; + int txn_count, new_txn_count; StringInfoData buf; GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS]; - GlobalTransactionId start_gxid, end_gxid; + GTM_TransactionHandle new_txn[GTM_MAX_GLOBAL_TRANSACTIONS]; + GlobalTransactionId txn_gxid[GTM_MAX_GLOBAL_TRANSACTIONS]; GTM_Timestamp timestamp; GTMProxy_ConnID txn_connid[GTM_MAX_GLOBAL_TRANSACTIONS]; uint32 txn_client_id[GTM_MAX_GLOBAL_TRANSACTIONS]; @@ -1543,6 +1628,9 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) { txn_isolation_level[ii] = pq_getmsgint(message, sizeof (GTM_IsolationLevel)); txn_read_only[ii] = pq_getmsgbyte(message); + txn_global_sessionid_len = pq_getmsgint(message, sizeof (uint32)); + txn_global_sessionid[ii] = pq_getmsgbytes(message, + txn_global_sessionid_len); txn_connid[ii] = pq_getmsgint(message, sizeof (GTMProxy_ConnID)); txn_client_id[ii] = GetMyThreadInfo->thr_client_id; } @@ -1554,30 +1642,23 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) * * XXX Port should contain Coordinator name - replace NULL with that */ - count = GTM_BeginTransactionMulti(txn_isolation_level, txn_read_only, txn_connid, + count = GTM_BeginTransactionMulti(txn_isolation_level, txn_read_only, + txn_global_sessionid, txn_connid, txn_count, txn); if (count != txn_count) ereport(ERROR, (EINVAL, errmsg("Failed to start %d new transactions", txn_count))); - start_gxid = GTM_GetGlobalTransactionIdMulti(txn, txn_count); - if (start_gxid == InvalidGlobalTransactionId) - ereport(ERROR, - (EINVAL, - errmsg("Failed to get a new transaction id"))); + if (!GTM_GetGlobalTransactionIdMulti(txn, txn_count, txn_gxid, new_txn, + &new_txn_count)) + elog(ERROR, "Failed to get global transaction identifiers"); MemoryContextSwitchTo(oldContext); /* GXID has been received, now it's time to get a GTM timestamp */ timestamp = GTM_TimestampGetCurrent(); - end_gxid = start_gxid + (txn_count - 1); - if (end_gxid < start_gxid) - end_gxid += FirstNormalGlobalTransactionId; - - elog(DEBUG1, "Sending transaction ids from %u to %u", start_gxid, end_gxid); - /* Backup first */ if (GetMyThreadInfo->thr_conn->standby) { @@ -1591,7 +1672,7 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) retry: _rc = bkup_begin_transaction_multi(GetMyThreadInfo->thr_conn->standby, txn_count, - start_gxid, + txn_gxid, txn_isolation_level, txn_read_only, txn_client_id, @@ -1616,7 +1697,7 @@ retry: pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); } pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count)); - pq_sendbytes(&buf, (char *)&start_gxid, sizeof(start_gxid)); + pq_sendbytes(&buf, (char *)txn_gxid, sizeof(GlobalTransactionId) * txn_count); pq_sendbytes(&buf, (char *)&(timestamp), sizeof (GTM_Timestamp)); pq_endmessage(myport, &buf); @@ -1641,6 +1722,8 @@ ProcessBkupBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) GlobalTransactionId gxid[GTM_MAX_GLOBAL_TRANSACTIONS]; GTM_IsolationLevel txn_isolation_level[GTM_MAX_GLOBAL_TRANSACTIONS]; bool txn_read_only[GTM_MAX_GLOBAL_TRANSACTIONS]; + uint32 txn_global_sessionid_len; + char *txn_global_sessionid[GTM_MAX_GLOBAL_TRANSACTIONS]; GTMProxy_ConnID txn_connid[GTM_MAX_GLOBAL_TRANSACTIONS]; uint32 txn_client_id[GTM_MAX_GLOBAL_TRANSACTIONS]; int ii; @@ -1654,12 +1737,16 @@ ProcessBkupBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) gxid[ii] = pq_getmsgint(message, sizeof(GlobalTransactionId)); txn_isolation_level[ii] = pq_getmsgint(message, sizeof(GTM_IsolationLevel)); txn_read_only[ii] = pq_getmsgbyte(message); + txn_global_sessionid_len = pq_getmsgint(message, sizeof (uint32)); + txn_global_sessionid[ii] = pq_getmsgbytes(message, + txn_global_sessionid_len); txn_client_id[ii] = pq_getmsgint(message, sizeof(uint32)); txn_connid[ii] = pq_getmsgint(message, sizeof(GTMProxy_ConnID)); } GTM_BkupBeginTransactionGetGXIDMulti(gxid, txn_isolation_level, - txn_read_only, txn_client_id, txn_connid, txn_count); + txn_read_only, txn_global_sessionid, + txn_client_id, txn_connid, txn_count); } /* @@ -1916,7 +2003,7 @@ ProcessGetGIDDataTransactionCommand(Port *myport, StringInfo message) errmsg("Failed to get GID Data for prepared transaction"))); /* First get the GXID for the new transaction */ - txn = GTM_BeginTransaction(txn_isolation_level, txn_read_only); + txn = GTM_BeginTransaction(txn_isolation_level, txn_read_only, NULL); if (txn == InvalidTransactionHandle) ereport(ERROR, (EINVAL, @@ -1963,6 +2050,7 @@ retry: gxid, txn_isolation_level, false, + NULL, -1, timestamp); diff --git a/src/gtm/proxy/proxy_main.c b/src/gtm/proxy/proxy_main.c index 9765a29e3b..62f70465d1 100644 --- a/src/gtm/proxy/proxy_main.c +++ b/src/gtm/proxy/proxy_main.c @@ -1780,11 +1780,7 @@ ProcessResponse(GTMProxy_ThreadInfo *thrinfo, GTMProxy_CommandInfo *cmdinfo, elog(ERROR, "Too few GXIDs"); } - gxid = res->gr_resdata.grd_txn_get_multi.start_gxid + cmdinfo->ci_res_index; - - /* Handle wraparound */ - if (gxid < res->gr_resdata.grd_txn_get_multi.start_gxid) - gxid += FirstNormalGlobalTransactionId; + gxid = res->gr_resdata.grd_txn_get_multi.txn_gxid[cmdinfo->ci_res_index]; /* Send back to each client the same timestamp value asked in this message */ timestamp = res->gr_resdata.grd_txn_get_multi.timestamp; @@ -2116,12 +2112,16 @@ ProcessTransactionCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn, GTM_MessageType mtype, StringInfo message) { GTMProxy_CommandData cmd_data; + uint32 global_sessionid_len; switch (mtype) { case MSG_TXN_BEGIN_GETGXID: cmd_data.cd_beg.iso_level = pq_getmsgint(message, sizeof (GTM_IsolationLevel)); cmd_data.cd_beg.rdonly = pq_getmsgbyte(message); + global_sessionid_len = pq_getmsgint(message, sizeof (uint32)); + memcpy(cmd_data.cd_beg.global_sessionid, pq_getmsgbytes(message, + global_sessionid_len), global_sessionid_len); GTMProxy_CommandPending(conninfo, mtype, cmd_data); break; @@ -2422,6 +2422,10 @@ GTMProxy_ProcessPendingCommands(GTMProxy_ThreadInfo *thrinfo) if (gtmpqPutInt(cmdinfo->ci_data.cd_beg.iso_level, sizeof (GTM_IsolationLevel), gtm_conn) || gtmpqPutc(cmdinfo->ci_data.cd_beg.rdonly, gtm_conn) || + gtmpqPutInt(strlen(cmdinfo->ci_data.cd_beg.global_sessionid) + 1, + sizeof (uint32), gtm_conn) || + gtmpqPutnchar(cmdinfo->ci_data.cd_beg.global_sessionid, + strlen(cmdinfo->ci_data.cd_beg.global_sessionid) + 1, gtm_conn) || gtmpqPutInt(cmdinfo->ci_conn->con_id, sizeof (GTMProxy_ConnID), gtm_conn)) elog(ERROR, "Error sending data"); diff --git a/src/include/access/gtm.h b/src/include/access/gtm.h index 1e948b6b02..67cea55bd4 100644 --- a/src/include/access/gtm.h +++ b/src/include/access/gtm.h @@ -23,7 +23,7 @@ extern GlobalTransactionId currentGxid; extern bool IsGTMConnected(void); extern void InitGTM(void); extern void CloseGTM(void); -extern GlobalTransactionId BeginTranGTM(GTM_Timestamp *timestamp); +extern GlobalTransactionId BeginTranGTM(GTM_Timestamp *timestamp, const char *globalSession); extern GlobalTransactionId BeginTranAutovacuumGTM(void); extern int CommitTranGTM(GlobalTransactionId gxid, int waited_xid_count, GlobalTransactionId *waited_xids); diff --git a/src/include/access/xact.h b/src/include/access/xact.h index f64c2110bc..839337821e 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -336,6 +336,7 @@ extern GlobalTransactionId GetAuxilliaryTransactionId(void); extern GlobalTransactionId GetTopGlobalTransactionId(void); extern void SetAuxilliaryTransactionId(GlobalTransactionId gxid); extern void SetTopGlobalTransactionId(GlobalTransactionId gxid); +extern void SetTopTransactionId(GlobalTransactionId xid); #endif extern TransactionId GetStableLatestTransactionId(void); extern SubTransactionId GetCurrentSubTransactionId(void); diff --git a/src/include/gtm/gtm_client.h b/src/include/gtm/gtm_client.h index ade4b682f3..1b6517a390 100644 --- a/src/include/gtm/gtm_client.h +++ b/src/include/gtm/gtm_client.h @@ -75,7 +75,7 @@ typedef union GTM_ResultData struct { int txn_count; /* TXN_BEGIN_GETGXID_MULTI */ - GlobalTransactionId start_gxid; + GlobalTransactionId txn_gxid[GTM_MAX_GLOBAL_TRANSACTIONS]; GTM_Timestamp timestamp; } grd_txn_get_multi; @@ -184,12 +184,15 @@ size_t get_sequence_list(GTM_Conn *, GTM_SeqInfo **); /* * Transaction Management API */ -GlobalTransactionId begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, GTM_Timestamp *timestamp); +GlobalTransactionId begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, + char *global_sessionid, + GTM_Timestamp *timestamp); int bkup_begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, - bool read_only, uint32 client_id, - GTM_Timestamp timestamp); + bool read_only, char *global_sessionid, + uint32 client_id, GTM_Timestamp timestamp); int bkup_begin_transaction_gxid(GTM_Conn *conn, GlobalTransactionId gxid, GTM_IsolationLevel isolevel, bool read_only, + char *global_sessionid, uint32 client_id, GTM_Timestamp timestamp); GlobalTransactionId begin_transaction_autovacuum(GTM_Conn *conn, GTM_IsolationLevel isolevel); @@ -226,7 +229,7 @@ begin_transaction_multi(GTM_Conn *conn, int txn_count, GTM_IsolationLevel *txn_i int *txn_count_out, GlobalTransactionId *gxid_out, GTM_Timestamp *ts_out); int bkup_begin_transaction_multi(GTM_Conn *conn, int txn_count, - GlobalTransactionId start_gxid, GTM_IsolationLevel *isolevel, + GlobalTransactionId *gxid, GTM_IsolationLevel *isolevel, bool *read_only, uint32 *client_id, GTMProxy_ConnID *txn_connid); diff --git a/src/include/gtm/gtm_proxy.h b/src/include/gtm/gtm_proxy.h index 068d59e479..d4e8543378 100644 --- a/src/include/gtm/gtm_proxy.h +++ b/src/include/gtm/gtm_proxy.h @@ -144,6 +144,7 @@ typedef union GTMProxy_CommandData { bool rdonly; GTM_IsolationLevel iso_level; + char global_sessionid[GTM_MAX_SESSION_ID_LEN]; } cd_beg; struct diff --git a/src/include/gtm/gtm_txn.h b/src/include/gtm/gtm_txn.h index b48e26f7a8..d746105af3 100644 --- a/src/include/gtm/gtm_txn.h +++ b/src/include/gtm/gtm_txn.h @@ -37,7 +37,12 @@ extern void GlobalTransactionIdAbort(GlobalTransactionId transactionId); /* in transam/varsup.c */ extern GlobalTransactionId GTM_GetGlobalTransactionId(GTM_TransactionHandle handle); -extern GlobalTransactionId GTM_GetGlobalTransactionIdMulti(GTM_TransactionHandle handle[], int txn_count); +extern bool GTM_GetGlobalTransactionIdMulti( + GTM_TransactionHandle handle[], + int txn_count, + GlobalTransactionId gxids[], + GTM_TransactionHandle new_handle[], + int *new_txn_count); extern GlobalTransactionId ReadNewGlobalTransactionId(void); extern GlobalTransactionId GTM_GetLatestCompletedXID(void); extern void SetGlobalTransactionIdLimit(GlobalTransactionId oldest_datfrozenxid); @@ -68,11 +73,13 @@ typedef enum GTM_TransactionStates GTM_TXN_ABORTED } GTM_TransactionStates; +#define GTM_MAX_SESSION_ID_LEN 64 + typedef struct GTM_TransactionInfo { GTM_TransactionHandle gti_handle; uint32 gti_client_id; - + char gti_global_session_id[GTM_MAX_SESSION_ID_LEN]; bool gti_in_use; GlobalTransactionId gti_gxid; GTM_TransactionStates gti_state; @@ -149,9 +156,11 @@ bool GTM_IsGXIDInProgress(GlobalTransactionId gxid); /* Transaction Control */ void GTM_InitTxnManager(void); GTM_TransactionHandle GTM_BeginTransaction(GTM_IsolationLevel isolevel, - bool readonly); + bool readonly, + const char *global_sessionid); int GTM_BeginTransactionMulti(GTM_IsolationLevel isolevel[], bool readonly[], + const char *global_sessionid[], GTMProxy_ConnID connid[], int txn_count, GTM_TransactionHandle txns[]); @@ -191,6 +200,7 @@ void ProcessBeginTransactionCommand(Port *myport, StringInfo message); void ProcessBkupBeginTransactionCommand(Port *myport, StringInfo message); void GTM_BkupBeginTransactionMulti(GTM_IsolationLevel *isolevel, bool *readonly, + const char **global_sessionid, uint32 *client_id, GTMProxy_ConnID *connid, int txn_count); @@ -219,6 +229,7 @@ void GTM_SaveTxnInfo(FILE *ctlf); void GTM_RestoreTxnInfo(FILE *ctlf, GlobalTransactionId next_gxid); void GTM_BkupBeginTransaction(GTM_IsolationLevel isolevel, bool readonly, + const char *global_sessionid, uint32 client_id); void ProcessBkupBeginTransactionGetGXIDCommand(Port *myport, StringInfo message); void ProcessBkupBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message); diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index 2a26394144..069218da9e 100644 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -51,6 +51,7 @@ extern bool EnforceTwoPhaseCommit; #define RESPONSE_ERROR 6 #define RESPONSE_READY 10 #define RESPONSE_WAITXIDS 11 +#define RESPONSE_ASSIGN_GXID 12 #endif typedef enum diff --git a/src/include/storage/backendid.h b/src/include/storage/backendid.h index bb8a7c8af9..a0a77233a9 100644 --- a/src/include/storage/backendid.h +++ b/src/include/storage/backendid.h @@ -39,6 +39,7 @@ extern PGDLLIMPORT BackendId MyBackendId; /* backend id of this backend */ extern PGDLLIMPORT Oid MyCoordId; extern PGDLLIMPORT int MyCoordPid; +extern PGDLLIMPORT LocalTransactionId MyCoordLxid; /* BackendId of the first backend of the distributed session on the node */ extern PGDLLIMPORT BackendId MyFirstBackendId; |
