Refactor the code implementing standby-mode logic.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 3 Dec 2012 10:32:44 +0000 (12:32 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 3 Dec 2012 10:32:44 +0000 (12:32 +0200)
It is now easier to see that it's a state machine, making the code easier
to understand overall.

src/backend/access/transam/xlog.c

index 9208bc21d462d51bd975b560a28e27f467fab39e..c8ac97fbf7fac6cddfd7019e19d252da285c501e 100644 (file)
@@ -506,12 +506,18 @@ static XLogwrtResult LogwrtResult = {0, 0};
 
 /*
  * Codes indicating where we got a WAL file from during recovery, or where
- * to attempt to get one.  These are chosen so that they can be OR'd together
- * in a bitmask state variable.
+ * to attempt to get one.
  */
-#define XLOG_FROM_ARCHIVE      (1<<0)  /* Restored using restore_command */
-#define XLOG_FROM_PG_XLOG      (1<<1)  /* Existing file in pg_xlog */
-#define XLOG_FROM_STREAM       (1<<2)  /* Streamed from master */
+typedef enum
+{
+   XLOG_FROM_ANY = 0,      /* request to read WAL from any source */
+   XLOG_FROM_ARCHIVE,      /* restored using restore_command */
+   XLOG_FROM_PG_XLOG,      /* existing file in pg_xlog */
+   XLOG_FROM_STREAM,       /* streamed from master */
+} XLogSource;
+
+/* human-readable names for XLogSources, for debugging output */
+static const char *xlogSourceNames[] = { "any", "archive", "pg_xlog", "stream" };
 
 /*
  * openLogFile is -1 or a kernel FD for an open log file segment.
@@ -536,22 +542,28 @@ static XLogSegNo readSegNo = 0;
 static uint32 readOff = 0;
 static uint32 readLen = 0;
 static bool    readFileHeaderValidated = false;
-static int readSource = 0;     /* XLOG_FROM_* code */
+static XLogSource readSource = 0;      /* XLOG_FROM_* code */
 
 /*
- * Keeps track of which sources we've tried to read the current WAL
- * record from and failed.
+ * Keeps track of which source we're currently reading from. This is
+ * different from readSource in that this is always set, even when we don't
+ * currently have a WAL file open. If lastSourceFailed is set, our last
+ * attempt to read from currentSource failed, and we should try another source
+ * next.
  */
-static int failedSources = 0;  /* OR of XLOG_FROM_* codes */
+static XLogSource currentSource = 0;   /* XLOG_FROM_* code */
+static bool    lastSourceFailed = false;
 
 /*
  * These variables track when we last obtained some WAL data to process,
  * and where we got it from.  (XLogReceiptSource is initially the same as
  * readSource, but readSource gets reset to zero when we don't have data
- * to process right now.)
+ * to process right now.  It is also different from currentSource, which
+ * also changes when we try to read from a source and fail, while
+ * XLogReceiptSource tracks where we last successfully read some WAL.)
  */
 static TimestampTz XLogReceiptTime = 0;
-static int XLogReceiptSource = 0;      /* XLOG_FROM_* code */
+static XLogSource XLogReceiptSource = 0;   /* XLOG_FROM_* code */
 
 /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
 static char *readBuf = NULL;
@@ -605,7 +617,7 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
                       bool use_lock);
 static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
             int source, bool notexistOk);
-static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources);
+static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
 static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
             bool randAccess);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
@@ -2551,7 +2563,7 @@ XLogFileOpen(XLogSegNo segno)
 /*
  * Open a logfile segment for reading (during recovery).
  *
- * If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
+ * If source == XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
  * Otherwise, it's assumed to be already available in pg_xlog.
  */
 static int
@@ -2697,7 +2709,7 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
  * This version searches for the segment with any TLI listed in expectedTLIs.
  */
 static int
-XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)
+XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)
 {
    char        path[MAXPGPATH];
    ListCell   *cell;
@@ -2720,7 +2732,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)
        if (tli < curFileTLI)
            break;              /* don't bother looking at too-old TLIs */
 
-       if (sources & XLOG_FROM_ARCHIVE)
+       if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE)
        {
            fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true);
            if (fd != -1)
@@ -2730,7 +2742,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)
            }
        }
 
-       if (sources & XLOG_FROM_PG_XLOG)
+       if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_XLOG)
        {
            fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true);
            if (fd != -1)
@@ -3332,7 +3344,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
    }
 
    /* This is the first try to read this page. */
-   failedSources = 0;
+   lastSourceFailed = false;
 retry:
    /* Read the page containing the record */
    if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
@@ -3545,7 +3557,7 @@ retry:
    return record;
 
 next_record_is_invalid:
-   failedSources |= readSource;
+   lastSourceFailed = true;
 
    if (readFile >= 0)
    {
@@ -9162,7 +9174,7 @@ CancelBackup(void)
  * In standby mode, if after a successful return of XLogPageRead() the
  * caller finds the record it's interested in to be broken, it should
  * ereport the error with the level determined by
- * emode_for_corrupt_record(), and then set "failedSources |= readSource"
+ * emode_for_corrupt_record(), and then set lastSourceFailed
  * and call XLogPageRead() again with the same arguments. This lets
  * XLogPageRead() to try fetching the record from another source, or to
  * sleep and retry.
@@ -9180,7 +9192,7 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
    targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
 
    /* Fast exit if we have read the record in the current buffer already */
-   if (failedSources == 0 && targetSegNo == readSegNo &&
+   if (!lastSourceFailed && targetSegNo == readSegNo &&
        targetPageOff == readOff && targetRecOff < readLen)
        return true;
 
@@ -9227,17 +9239,18 @@ retry:
            /* In archive or crash recovery. */
            if (readFile < 0)
            {
-               int         sources;
+               int         source;
 
                /* Reset curFileTLI if random fetch. */
                if (randAccess)
                    curFileTLI = 0;
 
-               sources = XLOG_FROM_PG_XLOG;
                if (InArchiveRecovery)
-                   sources |= XLOG_FROM_ARCHIVE;
+                   source = XLOG_FROM_ANY;
+               else
+                   source = XLOG_FROM_PG_XLOG;
 
-               readFile = XLogFileReadAnyTLI(readSegNo, emode, sources);
+               readFile = XLogFileReadAnyTLI(readSegNo, emode, source);
                if (readFile < 0)
                    return false;
            }
@@ -9326,7 +9339,7 @@ retry:
    return true;
 
 next_record_is_invalid:
-   failedSources |= readSource;
+   lastSourceFailed = true;
 
    if (readFile >= 0)
        close(readFile);
@@ -9366,185 +9379,289 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                            bool fetching_ckpt)
 {
    static pg_time_t last_fail_time = 0;
+   pg_time_t now;
+
+   /*-------
+    * Standby mode is implemented by a state machine:
+    *
+    * 1. Read from archive (XLOG_FROM_ARCHIVE)
+    * 2. Read from pg_xlog (XLOG_FROM_PG_XLOG)
+    * 3. Check trigger file
+    * 4. Read from primary server via walreceiver (XLOG_FROM_STREAM)
+    * 5. Rescan timelines
+    * 6. Sleep 5 seconds, and loop back to 1.
+    *
+    * Failure to read from the current source advances the state machine to
+    * the next state. In addition, successfully reading a file from pg_xlog
+    * moves the state machine from state 2 back to state 1 (we always prefer
+    * files in the archive over files in pg_xlog).
+    *
+    * 'currentSource' indicates the current state. There are no currentSource
+    * values for "check trigger", "rescan timelines", and "sleep" states,
+    * those actions are taken when reading from the previous source fails, as
+    * part of advancing to the next state.
+    *-------
+    */
+   if (currentSource == 0)
+       currentSource = XLOG_FROM_ARCHIVE;
 
    for (;;)
    {
-       if (WalRcvInProgress())
+       int     oldSource = currentSource;
+
+       /*
+        * First check if we failed to read from the current source, and
+        * advance the state machine if so. The failure to read might've
+        * happened outside this function, e.g when a CRC check fails on a
+        * record, or within this loop.
+        */
+       if (lastSourceFailed)
        {
-           bool        havedata;
 
-           /*
-            * If we find an invalid record in the WAL streamed from master,
-            * something is seriously wrong. There's little chance that the
-            * problem will just go away, but PANIC is not good for
-            * availability either, especially in hot standby mode.
-            * Disconnect, and retry from archive/pg_xlog again. The WAL in
-            * the archive should be identical to what was streamed, so it's
-            * unlikely that it helps, but one can hope...
-            */
-           if (failedSources & XLOG_FROM_STREAM)
+           switch (currentSource)
            {
-               ShutdownWalRcv();
-               continue;
-           }
+               case XLOG_FROM_ARCHIVE:
+                   currentSource = XLOG_FROM_PG_XLOG;
+                   break;
 
-           /*
-            * Walreceiver is active, so see if new data has arrived.
-            *
-            * We only advance XLogReceiptTime when we obtain fresh WAL from
-            * walreceiver and observe that we had already processed
-            * everything before the most recent "chunk" that it flushed to
-            * disk.  In steady state where we are keeping up with the
-            * incoming data, XLogReceiptTime will be updated on each cycle.
-            * When we are behind, XLogReceiptTime will not advance, so the
-            * grace time allotted to conflicting queries will decrease.
-            */
-           if (XLByteLT(RecPtr, receivedUpto))
-               havedata = true;
-           else
-           {
-               XLogRecPtr  latestChunkStart;
+               case XLOG_FROM_PG_XLOG:
+                   /*
+                    * Check to see if the trigger file exists. Note that we do
+                    * this only after failure, so when you create the trigger
+                    * file, we still finish replaying as much as we can from
+                    * archive and pg_xlog before failover.
+                    */
+                   if (CheckForStandbyTrigger())
+                       return false;
 
-               receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
-               if (XLByteLT(RecPtr, receivedUpto))
-               {
-                   havedata = true;
-                   if (!XLByteLT(RecPtr, latestChunkStart))
+                   /*
+                    * If primary_conninfo is set, launch walreceiver to try to
+                    * stream the missing WAL.
+                    *
+                    * If fetching_ckpt is TRUE, RecPtr points to the initial
+                    * checkpoint location. In that case, we use RedoStartLSN
+                    * as the streaming start position instead of RecPtr, so
+                    * that when we later jump backwards to start redo at
+                    * RedoStartLSN, we will have the logs streamed already.
+                    */
+                   if (PrimaryConnInfo)
                    {
-                       XLogReceiptTime = GetCurrentTimestamp();
-                       SetCurrentChunkStartTime(XLogReceiptTime);
+                       XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
+
+                       RequestXLogStreaming(ptr, PrimaryConnInfo);
                    }
-               }
-               else
-                   havedata = false;
-           }
-           if (havedata)
-           {
-               /*
-                * Great, streamed far enough.  Open the file if it's not open
-                * already.  Use XLOG_FROM_STREAM so that source info is set
-                * correctly and XLogReceiptTime isn't changed.
-                */
-               if (readFile < 0)
-               {
-                   readFile = XLogFileRead(readSegNo, PANIC,
-                                           recoveryTargetTLI,
-                                           XLOG_FROM_STREAM, false);
-                   Assert(readFile >= 0);
-               }
-               else
-               {
-                   /* just make sure source info is correct... */
-                   readSource = XLOG_FROM_STREAM;
-                   XLogReceiptSource = XLOG_FROM_STREAM;
-               }
-               break;
-           }
+                   /*
+                    * Move to XLOG_FROM_STREAM state in either case. We'll get
+                    * immediate failure if we didn't launch walreceiver, and
+                    * move on to the next state.
+                    */
+                   currentSource = XLOG_FROM_STREAM;
+                   break;
 
-           /*
-            * Data not here yet, so check for trigger then sleep for five
-            * seconds like in the WAL file polling case below.
-            */
-           if (CheckForStandbyTrigger())
-               return false;
+               case XLOG_FROM_STREAM:
+                   /*
+                    * Failure while streaming. Most likely, we got here because
+                    * streaming replication was terminated, or promotion was
+                    * triggered. But we also get here if we find an invalid
+                    * record in the WAL streamed from master, in which case
+                    * something is seriously wrong. There's little chance that
+                    * the problem will just go away, but PANIC is not good for
+                    * availability either, especially in hot standby mode. So,
+                    * we treat that the same as disconnection, and retry from
+                    * archive/pg_xlog again. The WAL in the archive should be
+                    * identical to what was streamed, so it's unlikely that it
+                    * helps, but one can hope...
+                    */
+                   /*
+                    * Before we leave XLOG_FROM_STREAM state, make sure that
+                    * walreceiver is not running, so that it won't overwrite
+                    * any WAL that we restore from archive.
+                    */
+                   if (WalRcvInProgress())
+                       ShutdownWalRcv();
 
-           /*
-            * Wait for more WAL to arrive, or timeout to be reached
-            */
-           WaitLatch(&XLogCtl->recoveryWakeupLatch,
-                     WL_LATCH_SET | WL_TIMEOUT,
-                     5000L);
-           ResetLatch(&XLogCtl->recoveryWakeupLatch);
+                   /*
+                    * Before we sleep, re-scan for possible new timelines if
+                    * we were requested to recover to the latest timeline.
+                    */
+                   if (recoveryTargetIsLatest)
+                   {
+                       if (rescanLatestTimeLine())
+                       {
+                           currentSource = XLOG_FROM_ARCHIVE;
+                           break;
+                       }
+                   }
+
+                   /*
+                    * XLOG_FROM_STREAM is the last state in our state machine,
+                    * so we've exhausted all the options for obtaining the
+                    * requested WAL. We're going to loop back and retry from
+                    * the archive, but if it hasn't been long since last
+                    * attempt, sleep 5 seconds to avoid busy-waiting.
+                    */
+                   now = (pg_time_t) time(NULL);
+                   if ((now - last_fail_time) < 5)
+                   {
+                       pg_usleep(1000000L * (5 - (now - last_fail_time)));
+                       now = (pg_time_t) time(NULL);
+                   }
+                   last_fail_time = now;
+                   currentSource = XLOG_FROM_ARCHIVE;
+                   break;
+
+               default:
+                   elog(ERROR, "unexpected WAL source %d", currentSource);
+           }
        }
-       else
+       else if (currentSource == XLOG_FROM_PG_XLOG)
        {
            /*
-            * WAL receiver is not active. Poll the archive.
+            * We just successfully read a file in pg_xlog. We prefer files
+            * in the archive over ones in pg_xlog, so try the next file
+            * again from the archive first.
             */
-           int         sources;
-           pg_time_t   now;
+           currentSource = XLOG_FROM_ARCHIVE;
+       }
 
-           if (readFile >= 0)
-           {
-               close(readFile);
-               readFile = -1;
-           }
-           /* Reset curFileTLI if random fetch. */
-           if (randAccess)
-               curFileTLI = 0;
+       if (currentSource != oldSource)
+           elog(LOG, "switched WAL source from %s to %s after %s",
+                xlogSourceNames[oldSource], xlogSourceNames[currentSource],
+                lastSourceFailed ? "failure" : "success");
+
+       /*
+        * We've now handled possible failure. Try to read from the chosen
+        * source.
+        */
+       lastSourceFailed = false;
+
+       switch (currentSource)
+       {
+           case XLOG_FROM_ARCHIVE:
+           case XLOG_FROM_PG_XLOG:
+               /* Close any old file we might have open. */
+               if (readFile >= 0)
+               {
+                   close(readFile);
+                   readFile = -1;
+               }
+               /* Reset curFileTLI if random fetch. */
+               if (randAccess)
+                   curFileTLI = 0;
 
-           /*
-            * Try to restore the file from archive, or read an existing file
-            * from pg_xlog.
-            */
-           sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
-           if (!(sources & ~failedSources))
-           {
                /*
-                * We've exhausted all options for retrieving the file. Retry.
+                * Try to restore the file from archive, or read an existing
+                * file from pg_xlog.
                 */
-               failedSources = 0;
+               readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, currentSource);
+               if (readFile >= 0)
+                   return true;    /* success! */
 
                /*
-                * Before we sleep, re-scan for possible new timelines if we
-                * were requested to recover to the latest timeline.
+                * Nope, not found in archive or pg_xlog.
                 */
-               if (recoveryTargetIsLatest)
-               {
-                   if (rescanLatestTimeLine())
-                       continue;
-               }
+               lastSourceFailed = true;
+               break;
+
+           case XLOG_FROM_STREAM:
+           {
+               bool        havedata;
 
                /*
-                * If it hasn't been long since last attempt, sleep to avoid
-                * busy-waiting.
+                * Check if WAL receiver is still active.
                 */
-               now = (pg_time_t) time(NULL);
-               if ((now - last_fail_time) < 5)
+               if (!WalRcvInProgress())
                {
-                   pg_usleep(1000000L * (5 - (now - last_fail_time)));
-                   now = (pg_time_t) time(NULL);
+                   lastSourceFailed = true;
+                   break;
                }
-               last_fail_time = now;
 
                /*
-                * If primary_conninfo is set, launch walreceiver to try to
-                * stream the missing WAL, before retrying to restore from
-                * archive/pg_xlog.
+                * Walreceiver is active, so see if new data has arrived.
                 *
-                * If fetching_ckpt is TRUE, RecPtr points to the initial
-                * checkpoint location. In that case, we use RedoStartLSN as
-                * the streaming start position instead of RecPtr, so that
-                * when we later jump backwards to start redo at RedoStartLSN,
-                * we will have the logs streamed already.
+                * We only advance XLogReceiptTime when we obtain fresh WAL
+                * from walreceiver and observe that we had already processed
+                * everything before the most recent "chunk" that it flushed to
+                * disk.  In steady state where we are keeping up with the
+                * incoming data, XLogReceiptTime will be updated on each cycle.
+                * When we are behind, XLogReceiptTime will not advance, so the
+                * grace time allotted to conflicting queries will decrease.
                 */
-               if (PrimaryConnInfo)
+               if (XLByteLT(RecPtr, receivedUpto))
+                   havedata = true;
+               else
                {
-                   XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
+                   XLogRecPtr  latestChunkStart;
 
-                   RequestXLogStreaming(ptr, PrimaryConnInfo);
-                   continue;
+                   receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
+                   if (XLByteLT(RecPtr, receivedUpto))
+                   {
+                       havedata = true;
+                       if (!XLByteLT(RecPtr, latestChunkStart))
+                       {
+                           XLogReceiptTime = GetCurrentTimestamp();
+                           SetCurrentChunkStartTime(XLogReceiptTime);
+                       }
+                   }
+                   else
+                       havedata = false;
+               }
+               if (havedata)
+               {
+                   /*
+                    * Great, streamed far enough.  Open the file if it's not
+                    * open already.  Use XLOG_FROM_STREAM so that source info
+                    * is set correctly and XLogReceiptTime isn't changed.
+                    */
+                   if (readFile < 0)
+                   {
+                       readFile = XLogFileRead(readSegNo, PANIC,
+                                               recoveryTargetTLI,
+                                               XLOG_FROM_STREAM, false);
+                       Assert(readFile >= 0);
+                   }
+                   else
+                   {
+                       /* just make sure source info is correct... */
+                       readSource = XLOG_FROM_STREAM;
+                       XLogReceiptSource = XLOG_FROM_STREAM;
+                       return true;
+                   }
+                   break;
                }
-           }
-           /* Don't try to read from a source that just failed */
-           sources &= ~failedSources;
-           readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, sources);
-           if (readFile >= 0)
-               break;
 
-           /*
-            * Nope, not found in archive and/or pg_xlog.
-            */
-           failedSources |= sources;
+               /*
+                * Data not here yet. Check for trigger, then wait for
+                * walreceiver to wake us up when new WAL arrives.
+                */
+               if (CheckForStandbyTrigger())
+               {
+                   /*
+                    * Note that we don't "return false" immediately here.
+                    * After being triggered, we still want to replay all the
+                    * WAL that was already streamed. It's in pg_xlog now, so
+                    * we just treat this as a failure, and the state machine
+                    * will move on to replay the streamed WAL from pg_xlog,
+                    * and then recheck the trigger and exit replay.
+                    */
+                   lastSourceFailed = true;
+                   break;
+               }
 
-           /*
-            * Check to see if the trigger file exists. Note that we do this
-            * only after failure, so when you create the trigger file, we
-            * still finish replaying as much as we can from archive and
-            * pg_xlog before failover.
-            */
-           if (CheckForStandbyTrigger())
-               return false;
+               /*
+                * Wait for more WAL to arrive. Time out after 5 seconds, like
+                * when polling the archive, to react to a trigger file
+                * promptly.
+                */
+               WaitLatch(&XLogCtl->recoveryWakeupLatch,
+                         WL_LATCH_SET | WL_TIMEOUT,
+                         5000L);
+               ResetLatch(&XLogCtl->recoveryWakeupLatch);
+               break;
+           }
+
+           default:
+               elog(ERROR, "unexpected WAL source %d", currentSource);
        }
 
        /*
@@ -9554,7 +9671,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
        HandleStartupProcInterrupts();
    }
 
-   return true;
+   return false;   /* not reached */
 }
 
 /*