Don’t push nextid too far forwards in recovery
authorSimon Riggs <simon@2ndQuadrant.com>
Tue, 18 Apr 2017 10:14:05 +0000 (11:14 +0100)
committerSimon Riggs <simon@2ndQuadrant.com>
Tue, 18 Apr 2017 10:14:05 +0000 (11:14 +0100)
Doing so allows various crash possibilities. Fix by avoiding
having PrescanPreparedTransactions() increment
ShmemVariableCache->nextXid when it has no 2PC files

Bug found by Jeff Janes, diagnosis and patch by Pavan Deolasee,
then patch re-designed for clarity and full accuracy by
Michael Paquier.

Reported-by: Jeff Janes
Author: Pavan Deolasee, Michael Paquier
Discussion: https://postgr.es/m/CAMkU=1zMLnH_i1-PVQ-biZzvNx7VcuatriquEnh7HNk6K8Ss3Q@mail.gmail.com

src/backend/access/transam/twophase.c

index 7982e16abcc6ee162c3a4f49e68b7be7a320c829..6e6678cfff69df7a50b27a2a02064a523a6086ea 100644 (file)
@@ -222,7 +222,7 @@ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
 static char *ProcessTwoPhaseBuffer(TransactionId xid,
                            XLogRecPtr  prepare_start_lsn,
                            bool fromdisk, bool overwriteOK, bool setParent,
-                           TransactionId *result, TransactionId *maxsubxid);
+                           bool setNextXid);
 static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
                const char *gid, TimestampTz prepared_at, Oid owner,
                Oid databaseid);
@@ -1744,7 +1744,7 @@ restoreTwoPhaseData(void)
 
            buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
                                        true, false, false,
-                                       NULL, NULL);
+                                       false);
            if (buf == NULL)
                continue;
 
@@ -1786,7 +1786,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 {
    TransactionId origNextXid = ShmemVariableCache->nextXid;
    TransactionId result = origNextXid;
-   TransactionId maxsubxid = origNextXid;
    TransactionId *xids = NULL;
    int         nxids = 0;
    int         allocsize = 0;
@@ -1806,11 +1805,18 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
        buf = ProcessTwoPhaseBuffer(xid,
                gxact->prepare_start_lsn,
                gxact->ondisk, false, false,
-               &result, &maxsubxid);
+               true);
 
        if (buf == NULL)
            continue;
 
+       /*
+        * OK, we think this file is valid.  Incorporate xid into the
+        * running-minimum result.
+        */
+       if (TransactionIdPrecedes(xid, result))
+           result = xid;
+
        if (xids_p)
        {
            if (nxids == allocsize)
@@ -1839,15 +1845,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
        *nxids_p = nxids;
    }
 
-   /* update nextXid if needed */
-   if (TransactionIdFollowsOrEquals(maxsubxid, ShmemVariableCache->nextXid))
-   {
-       LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
-       ShmemVariableCache->nextXid = maxsubxid;
-       TransactionIdAdvance(ShmemVariableCache->nextXid);
-       LWLockRelease(XidGenLock);
-   }
-
    return result;
 }
 
@@ -1884,7 +1881,7 @@ StandbyRecoverPreparedTransactions(bool overwriteOK)
        buf = ProcessTwoPhaseBuffer(xid,
                gxact->prepare_start_lsn,
                gxact->ondisk, overwriteOK, true,
-               NULL, NULL);
+               false);
        if (buf != NULL)
            pfree(buf);
    }
@@ -1924,7 +1921,7 @@ RecoverPreparedTransactions(void)
        buf = ProcessTwoPhaseBuffer(xid,
                gxact->prepare_start_lsn,
                gxact->ondisk, false, false,
-               NULL, NULL);
+               false);
        if (buf == NULL)
            continue;
 
@@ -2012,20 +2009,16 @@ RecoverPreparedTransactions(void)
  * If setParent is true, then use the overwriteOK parameter to set up
  * subtransaction parent linkages.
  *
- * If result and maxsubxid are not NULL, fill them up with smallest
- * running transaction id (lesser than ShmemVariableCache->nextXid)
- * and largest subtransaction id for this transaction respectively.
+ * If setNextXid is true, set ShmemVariableCache->nextXid to the newest
+ * value scanned.
  */
 static char *
 ProcessTwoPhaseBuffer(TransactionId xid,
                      XLogRecPtr prepare_start_lsn,
                      bool fromdisk, bool overwriteOK,
-                     bool setParent, TransactionId *result,
-                     TransactionId *maxsubxid)
+                     bool setParent, bool setNextXid)
 {
    TransactionId origNextXid = ShmemVariableCache->nextXid;
-   TransactionId res = InvalidTransactionId;
-   TransactionId maxsub = InvalidTransactionId;
    TransactionId *subxids;
    char       *buf;
    TwoPhaseFileHeader *hdr;
@@ -2034,11 +2027,6 @@ ProcessTwoPhaseBuffer(TransactionId xid,
    if (!fromdisk)
        Assert(prepare_start_lsn != InvalidXLogRecPtr);
 
-   if (result)
-       res = *result;
-   if (maxsubxid)
-       maxsub = *maxsubxid;
-
    /* Already processed? */
    if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
    {
@@ -2120,13 +2108,6 @@ ProcessTwoPhaseBuffer(TransactionId xid,
        return NULL;
    }
 
-   /*
-    * OK, we think this buffer is valid.  Incorporate xid into the
-    * running-minimum result.
-    */
-   if (TransactionIdPrecedes(xid, res))
-       res = xid;
-
    /*
     * Examine subtransaction XIDs ... they should all follow main
     * XID, and they may force us to advance nextXid.
@@ -2139,17 +2120,31 @@ ProcessTwoPhaseBuffer(TransactionId xid,
        TransactionId subxid = subxids[i];
 
        Assert(TransactionIdFollows(subxid, xid));
-       if (TransactionIdFollowsOrEquals(subxid, maxsub))
-           maxsub = subxid;
+
+       /* update nextXid if needed */
+       if (setNextXid &&
+           TransactionIdFollowsOrEquals(subxid,
+                                        ShmemVariableCache->nextXid))
+       {
+           /*
+            * We don't expect anyone else to modify nextXid, hence we don't
+            * need to hold a lock while examining it.  We still acquire the
+            * lock to modify it, though, so we recheck.
+            */
+           LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
+           if (TransactionIdFollowsOrEquals(subxid,
+                                        ShmemVariableCache->nextXid))
+           {
+               ShmemVariableCache->nextXid = subxid;
+               TransactionIdAdvance(ShmemVariableCache->nextXid);
+           }
+           LWLockRelease(XidGenLock);
+       }
+
        if (setParent)
            SubTransSetParent(xid, subxid, overwriteOK);
    }
 
-   if (result)
-       *result = res;
-   if (maxsubxid)
-       *maxsubxid = maxsub;
-
    return buf;
 }