summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/xact.c26
-rw-r--r--src/backend/pgxc/pool/execRemote.c26
-rw-r--r--src/backend/storage/ipc/procarray.c6
3 files changed, 51 insertions, 7 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 9b9a4c8409..0b0662bf7d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2463,7 +2463,10 @@ CommitTransaction(void)
* We need to mark our XIDs as committed in pg_clog. This is where we
* durably commit.
*/
- latestXid = RecordTransactionCommit();
+#ifdef XCP
+ if (!IsConnFromDatanode())
+#endif
+ latestXid = RecordTransactionCommit();
}
else
{
@@ -3236,7 +3239,12 @@ AbortTransaction(void)
* record.
*/
if (!is_parallel_worker)
- latestXid = RecordTransactionAbort(false);
+ {
+#ifdef XCP
+ if (!IsConnFromDatanode())
+#endif
+ latestXid = RecordTransactionAbort(false);
+ }
else
{
latestXid = InvalidTransactionId;
@@ -6766,8 +6774,18 @@ SetTopTransactionId(GlobalTransactionId xid)
TransactionState s = CurrentTransactionState;
Assert(!GlobalTransactionIdIsValid(s->transactionId) ||
GlobalTransactionIdEquals(s->transactionId, xid));
- XactTopTransactionId = s->transactionId = xid;
- elog(DEBUG2, "Assigning XID received from the remote node - %d", xid);
+
+ if (!IsConnFromDatanode())
+ {
+ XactTopTransactionId = s->transactionId = xid;
+ elog(DEBUG2, "Assigning XID received from the remote node - %d", xid);
+ }
+ else if (!TransactionIdIsValid(GetNextTransactionId()))
+ {
+ SetNextTransactionId(xid);
+ if (whereToSendOutput == DestRemote)
+ pq_putmessage('x', (const char *) &xid, sizeof (GlobalTransactionId));
+ }
}
#endif
#endif
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 913563b5e1..9657ab0157 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -2319,9 +2319,20 @@ prepare_err:
if (conn->ck_resp_rollback)
{
conn->ck_resp_rollback = false;
+
+ if (conn->state != DN_CONNECTION_STATE_IDLE)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Error while PREPARING transaction %s on "
+ "node %s. Administrative action may be required "
+ "to abort this transaction on the node",
+ prepareGID, conn->nodename)));
+ continue;
+ }
+
/* sanity checks */
Assert(conn->sock != NO_SOCKET);
- Assert(conn->state == DN_CONNECTION_STATE_IDLE);
/* Send down abort prepared command */
if (pgxc_node_send_gxid(conn, auxXid))
{
@@ -2359,9 +2370,20 @@ prepare_err:
if (conn->ck_resp_rollback)
{
conn->ck_resp_rollback = false;
+
+ if (conn->state != DN_CONNECTION_STATE_IDLE)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Error while PREPARING transaction %s on "
+ "node %s. Administrative action may be required "
+ "to abort this transaction on the node",
+ prepareGID, conn->nodename)));
+ continue;
+ }
+
/* sanity checks */
Assert(conn->sock != NO_SOCKET);
- Assert(conn->state == DN_CONNECTION_STATE_IDLE);
/* Send down abort prepared command */
if (pgxc_node_send_gxid(conn, auxXid))
{
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 84b6a87d0d..b7fc75d52d 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -496,6 +496,10 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
* anyone else's calculation of a snapshot. We might change their
* estimate of global xmin, but that's OK.
*/
+#ifdef XCP
+ if (IsConnFromDatanode())
+ allPgXact[proc->pgprocno].xid = InvalidTransactionId;
+#endif
Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
proc->lxid = InvalidLocalTransactionId;
@@ -4371,7 +4375,7 @@ KnownAssignedXidsReset(void)
void
ProcArrayCheckXminConsistency(TransactionId global_xmin)
{
- ProcArrayStruct *arrayP = procArray;
+ volatile ProcArrayStruct *arrayP = procArray;
int index;
for (index = 0; index < arrayP->numProcs; index++)