diff options
| -rw-r--r-- | src/backend/access/transam/xact.c | 26 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 26 | ||||
| -rw-r--r-- | src/backend/storage/ipc/procarray.c | 6 |
3 files changed, 51 insertions, 7 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 9b9a4c8409..0b0662bf7d 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2463,7 +2463,10 @@ CommitTransaction(void) * We need to mark our XIDs as committed in pg_clog. This is where we * durably commit. */ - latestXid = RecordTransactionCommit(); +#ifdef XCP + if (!IsConnFromDatanode()) +#endif + latestXid = RecordTransactionCommit(); } else { @@ -3236,7 +3239,12 @@ AbortTransaction(void) * record. */ if (!is_parallel_worker) - latestXid = RecordTransactionAbort(false); + { +#ifdef XCP + if (!IsConnFromDatanode()) +#endif + latestXid = RecordTransactionAbort(false); + } else { latestXid = InvalidTransactionId; @@ -6766,8 +6774,18 @@ SetTopTransactionId(GlobalTransactionId xid) TransactionState s = CurrentTransactionState; Assert(!GlobalTransactionIdIsValid(s->transactionId) || GlobalTransactionIdEquals(s->transactionId, xid)); - XactTopTransactionId = s->transactionId = xid; - elog(DEBUG2, "Assigning XID received from the remote node - %d", xid); + + if (!IsConnFromDatanode()) + { + XactTopTransactionId = s->transactionId = xid; + elog(DEBUG2, "Assigning XID received from the remote node - %d", xid); + } + else if (!TransactionIdIsValid(GetNextTransactionId())) + { + SetNextTransactionId(xid); + if (whereToSendOutput == DestRemote) + pq_putmessage('x', (const char *) &xid, sizeof (GlobalTransactionId)); + } } #endif #endif diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 913563b5e1..9657ab0157 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -2319,9 +2319,20 @@ prepare_err: if (conn->ck_resp_rollback) { conn->ck_resp_rollback = false; + + if (conn->state != DN_CONNECTION_STATE_IDLE) + { + ereport(WARNING, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Error while PREPARING transaction %s on " + "node %s. Administrative action may be required " + "to abort this transaction on the node", + prepareGID, conn->nodename))); + continue; + } + /* sanity checks */ Assert(conn->sock != NO_SOCKET); - Assert(conn->state == DN_CONNECTION_STATE_IDLE); /* Send down abort prepared command */ if (pgxc_node_send_gxid(conn, auxXid)) { @@ -2359,9 +2370,20 @@ prepare_err: if (conn->ck_resp_rollback) { conn->ck_resp_rollback = false; + + if (conn->state != DN_CONNECTION_STATE_IDLE) + { + ereport(WARNING, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Error while PREPARING transaction %s on " + "node %s. Administrative action may be required " + "to abort this transaction on the node", + prepareGID, conn->nodename))); + continue; + } + /* sanity checks */ Assert(conn->sock != NO_SOCKET); - Assert(conn->state == DN_CONNECTION_STATE_IDLE); /* Send down abort prepared command */ if (pgxc_node_send_gxid(conn, auxXid)) { diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 84b6a87d0d..b7fc75d52d 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -496,6 +496,10 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid) * anyone else's calculation of a snapshot. We might change their * estimate of global xmin, but that's OK. */ +#ifdef XCP + if (IsConnFromDatanode()) + allPgXact[proc->pgprocno].xid = InvalidTransactionId; +#endif Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid)); proc->lxid = InvalidLocalTransactionId; @@ -4371,7 +4375,7 @@ KnownAssignedXidsReset(void) void ProcArrayCheckXminConsistency(TransactionId global_xmin) { - ProcArrayStruct *arrayP = procArray; + volatile ProcArrayStruct *arrayP = procArray; int index; for (index = 0; index < arrayP->numProcs; index++) |
