Repair sometimes-incorrect computation of StartUpID after a crash, per
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 22 May 2003 14:39:28 +0000 (14:39 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 22 May 2003 14:39:28 +0000 (14:39 +0000)
example from Rao Kumar.  This is a very corner corner-case, requiring
a minimum of three closely-spaced database crashes and an unlucky
positioning of the second recovery's checkpoint record before you'd notice
any problem.  But the consequences are dire enough that it's a must-fix.

src/backend/access/transam/xlog.c

index f68fc7053669851eb381646c6b87ea533804dd9a..7fa274922b0a8564120b56c1f3777f7dc8d38a64 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.115 2003/05/10 18:01:31 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.116 2003/05/22 14:39:28 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -2568,7 +2568,18 @@ StartupXLOG(void)
    ShmemVariableCache->nextOid = checkPoint.nextOid;
    ShmemVariableCache->oidCount = 0;
 
-   ThisStartUpID = checkPoint.ThisStartUpID;
+   /*
+    * If it was a shutdown checkpoint, then any following WAL entries were
+    * created under the next StartUpID; if it was a regular checkpoint then
+    * any following WAL entries were created under the same StartUpID.
+    * We must replay WAL entries using the same StartUpID they were created
+    * under, so temporarily adopt that SUI (see also xlog_redo()).
+    */
+   if (wasShutdown)
+       ThisStartUpID = checkPoint.ThisStartUpID + 1;
+   else
+       ThisStartUpID = checkPoint.ThisStartUpID;
+
    RedoRecPtr = XLogCtl->Insert.RedoRecPtr =
        XLogCtl->SavedRedoRecPtr = checkPoint.redo;
 
@@ -2672,55 +2683,21 @@ StartupXLOG(void)
    ControlFile->logSeg = openLogSeg + 1;
    Insert = &XLogCtl->Insert;
    Insert->PrevRecord = LastRec;
+   XLogCtl->xlblocks[0].xlogid = openLogId;
+   XLogCtl->xlblocks[0].xrecoff =
+       ((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
 
    /*
-    * If the next record will go to the new page then initialize for that
-    * one.
+    * Tricky point here: readBuf contains the *last* block that the
+    * LastRec record spans, not the one it starts in.  The last block
+    * is indeed the one we want to use.
     */
-   if ((BLCKSZ - EndOfLog.xrecoff % BLCKSZ) < SizeOfXLogRecord)
-       EndOfLog.xrecoff += (BLCKSZ - EndOfLog.xrecoff % BLCKSZ);
-   if (EndOfLog.xrecoff % BLCKSZ == 0)
-   {
-       XLogRecPtr  NewPageEndPtr;
-
-       NewPageEndPtr = EndOfLog;
-       if (NewPageEndPtr.xrecoff >= XLogFileSize)
-       {
-           /* crossing a logid boundary */
-           NewPageEndPtr.xlogid += 1;
-           NewPageEndPtr.xrecoff = BLCKSZ;
-       }
-       else
-           NewPageEndPtr.xrecoff += BLCKSZ;
-       XLogCtl->xlblocks[0] = NewPageEndPtr;
-       Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
-       if (InRecovery)
-           Insert->currpage->xlp_sui = ThisStartUpID;
-       else
-           Insert->currpage->xlp_sui = ThisStartUpID + 1;
-       Insert->currpage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
-       Insert->currpage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
-       /* rest of buffer was zeroed in XLOGShmemInit */
-       Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
-   }
-   else
-   {
-       XLogCtl->xlblocks[0].xlogid = openLogId;
-       XLogCtl->xlblocks[0].xrecoff =
-           ((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
-
-       /*
-        * Tricky point here: readBuf contains the *last* block that the
-        * LastRec record spans, not the one it starts in.  The last block
-        * is indeed the one we want to use.
-        */
-       Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize);
-       memcpy((char *) Insert->currpage, readBuf, BLCKSZ);
-       Insert->currpos = (char *) Insert->currpage +
-           (EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
-       /* Make sure rest of page is zero */
-       memset(Insert->currpos, 0, INSERT_FREESPACE(Insert));
-   }
+   Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize);
+   memcpy((char *) Insert->currpage, readBuf, BLCKSZ);
+   Insert->currpos = (char *) Insert->currpage +
+       (EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
+   /* Make sure rest of page is zero */
+   MemSet(Insert->currpos, 0, INSERT_FREESPACE(Insert));
 
    LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
@@ -2774,9 +2751,22 @@ StartupXLOG(void)
        MyXactMadeXLogEntry = false;
        MyXactMadeTempRelUpdate = false;
 
+       /*
+        * At this point, ThisStartUpID is the largest SUI that we could
+        * find evidence for in the WAL entries.  But check it against
+        * pg_control's latest checkpoint, to make sure that we can't
+        * accidentally re-use an already-used SUI.
+        */
+       if (ThisStartUpID < ControlFile->checkPointCopy.ThisStartUpID)
+           ThisStartUpID = ControlFile->checkPointCopy.ThisStartUpID;
+
        /*
         * Perform a new checkpoint to update our recovery activity to disk.
         *
+        * Note that we write a shutdown checkpoint.  This is correct since
+        * the records following it will use SUI one more than what is shown
+        * in the checkpoint's ThisStartUpID.
+        *
         * In case we had to use the secondary checkpoint, make sure that
         * it will still be shown as the secondary checkpoint after this
         * CreateCheckPoint operation; we don't want the broken primary
@@ -2790,21 +2780,39 @@ StartupXLOG(void)
         */
        XLogCloseRelationCache();
    }
+   else
+   {
+       /*
+        * If we are not doing recovery, then we saw a checkpoint with nothing
+        * after it, and we can safely use StartUpID equal to one more than
+        * the checkpoint's SUI.  But just for paranoia's sake, check against
+        * pg_control too.
+        */
+       ThisStartUpID = checkPoint.ThisStartUpID;
+       if (ThisStartUpID < ControlFile->checkPointCopy.ThisStartUpID)
+           ThisStartUpID = ControlFile->checkPointCopy.ThisStartUpID;
+   }
 
    /*
     * Preallocate additional log files, if wanted.
     */
    PreallocXlogFiles(EndOfLog);
 
+   /*
+    * Advance StartUpID to one more than the highest value used previously.
+    */
+   ThisStartUpID++;
+   XLogCtl->ThisStartUpID = ThisStartUpID;
+
+   /*
+    * Okay, we're officially UP.
+    */
    InRecovery = false;
 
    ControlFile->state = DB_IN_PRODUCTION;
    ControlFile->time = time(NULL);
    UpdateControlFile();
 
-   ThisStartUpID++;
-   XLogCtl->ThisStartUpID = ThisStartUpID;
-
    /* Start up the commit log, too */
    StartupCLOG();
 
@@ -3000,7 +3008,7 @@ CreateCheckPoint(bool shutdown, bool force)
        UpdateControlFile();
    }
 
-   memset(&checkPoint, 0, sizeof(checkPoint));
+   MemSet(&checkPoint, 0, sizeof(checkPoint));
    checkPoint.ThisStartUpID = ThisStartUpID;
    checkPoint.time = time(NULL);
 
@@ -3259,6 +3267,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
        ShmemVariableCache->nextXid = checkPoint.nextXid;
        ShmemVariableCache->nextOid = checkPoint.nextOid;
        ShmemVariableCache->oidCount = 0;
+       /* Any later WAL records should be run with shutdown SUI plus 1 */
+       ThisStartUpID = checkPoint.ThisStartUpID + 1;
    }
    else if (info == XLOG_CHECKPOINT_ONLINE)
    {
@@ -3274,6 +3284,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
            ShmemVariableCache->nextOid = checkPoint.nextOid;
            ShmemVariableCache->oidCount = 0;
        }
+       /* Any later WAL records should be run with the then-active SUI */
+       ThisStartUpID = checkPoint.ThisStartUpID;
    }
 }