Put the logic to wait for WAL in standby mode to a separate function.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Tue, 9 Oct 2012 16:20:17 +0000 (19:20 +0300)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Tue, 9 Oct 2012 16:20:17 +0000 (19:20 +0300)
This is just refactoring with no user-visible effect, to make the code more
readable.

src/backend/access/transam/xlog.c

index 3e4da46ac24a8817b8f4cf03053e74a81aea5c90..17ceda3b1ad311776e108307a3f4cc2f1247a2b9 100644 (file)
@@ -157,6 +157,9 @@ HotStandbyState standbyState = STANDBY_DISABLED;
 
 static XLogRecPtr LastRec;
 
+/* Local copy of WalRcv->receivedUpto */
+static XLogRecPtr receivedUpto = 0;
+
 /*
  * During recovery, lastFullPageWrites keeps track of full_page_writes that
  * the replayed WAL records indicate. It's initialized with full_page_writes
@@ -538,6 +541,7 @@ static int  readFile = -1;
 static XLogSegNo readSegNo = 0;
 static uint32 readOff = 0;
 static uint32 readLen = 0;
+static bool    readFileHeaderValidated = false;
 static int     readSource = 0;         /* XLOG_FROM_* code */
 
 /*
@@ -628,6 +632,8 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources);
 static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
                         bool randAccess);
+static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
+                                                       bool fetching_ckpt);
 static int     emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
 static void XLogFileClose(void);
 static void PreallocXlogFiles(XLogRecPtr endptr);
@@ -2685,6 +2691,9 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
                if (source != XLOG_FROM_STREAM)
                        XLogReceiptTime = GetCurrentTimestamp();
 
+               /* The file header needs to be validated on first access */
+               readFileHeaderValidated = false;
+
                return fd;
        }
        if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
@@ -9233,12 +9242,9 @@ static bool
 XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
                         bool randAccess)
 {
-       static XLogRecPtr receivedUpto = 0;
-       bool            switched_segment = false;
        uint32          targetPageOff;
        uint32          targetRecOff;
        XLogSegNo       targetSegNo;
-       static pg_time_t last_fail_time = 0;
 
        XLByteToSeg(*RecPtr, targetSegNo);
        targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
@@ -9283,208 +9289,9 @@ retry:
        {
                if (StandbyMode)
                {
-                       /*
-                        * In standby mode, wait for the requested record to become
-                        * available, either via restore_command succeeding to restore the
-                        * segment, or via walreceiver having streamed the record.
-                        */
-                       for (;;)
-                       {
-                               if (WalRcvInProgress())
-                               {
-                                       bool            havedata;
-
-                                       /*
-                                        * If we find an invalid record in the WAL streamed from
-                                        * master, something is seriously wrong. There's little
-                                        * chance that the problem will just go away, but PANIC is
-                                        * not good for availability either, especially in hot
-                                        * standby mode. Disconnect, and retry from
-                                        * archive/pg_xlog again. The WAL in the archive should be
-                                        * identical to what was streamed, so it's unlikely that
-                                        * it helps, but one can hope...
-                                        */
-                                       if (failedSources & XLOG_FROM_STREAM)
-                                       {
-                                               ShutdownWalRcv();
-                                               continue;
-                                       }
-
-                                       /*
-                                        * Walreceiver is active, so see if new data has arrived.
-                                        *
-                                        * We only advance XLogReceiptTime when we obtain fresh
-                                        * WAL from walreceiver and observe that we had already
-                                        * processed everything before the most recent "chunk"
-                                        * that it flushed to disk.  In steady state where we are
-                                        * keeping up with the incoming data, XLogReceiptTime will
-                                        * be updated on each cycle.  When we are behind,
-                                        * XLogReceiptTime will not advance, so the grace time
-                                        * alloted to conflicting queries will decrease.
-                                        */
-                                       if (XLByteLT(*RecPtr, receivedUpto))
-                                               havedata = true;
-                                       else
-                                       {
-                                               XLogRecPtr      latestChunkStart;
-
-                                               receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
-                                               if (XLByteLT(*RecPtr, receivedUpto))
-                                               {
-                                                       havedata = true;
-                                                       if (!XLByteLT(*RecPtr, latestChunkStart))
-                                                       {
-                                                               XLogReceiptTime = GetCurrentTimestamp();
-                                                               SetCurrentChunkStartTime(XLogReceiptTime);
-                                                       }
-                                               }
-                                               else
-                                                       havedata = false;
-                                       }
-                                       if (havedata)
-                                       {
-                                               /*
-                                                * Great, streamed far enough. Open the file if it's
-                                                * not open already.  Use XLOG_FROM_STREAM so that
-                                                * source info is set correctly and XLogReceiptTime
-                                                * isn't changed.
-                                                */
-                                               if (readFile < 0)
-                                               {
-                                                       readFile =
-                                                               XLogFileRead(readSegNo, PANIC,
-                                                                                        recoveryTargetTLI,
-                                                                                        XLOG_FROM_STREAM, false);
-                                                       Assert(readFile >= 0);
-                                                       switched_segment = true;
-                                               }
-                                               else
-                                               {
-                                                       /* just make sure source info is correct... */
-                                                       readSource = XLOG_FROM_STREAM;
-                                                       XLogReceiptSource = XLOG_FROM_STREAM;
-                                               }
-                                               break;
-                                       }
-
-                                       /*
-                                        * Data not here yet, so check for trigger then sleep for
-                                        * five seconds like in the WAL file polling case below.
-                                        */
-                                       if (CheckForStandbyTrigger())
-                                               goto retry;
-
-                                       /*
-                                        * Wait for more WAL to arrive, or timeout to be reached
-                                        */
-                                       WaitLatch(&XLogCtl->recoveryWakeupLatch,
-                                                         WL_LATCH_SET | WL_TIMEOUT,
-                                                         5000L);
-                                       ResetLatch(&XLogCtl->recoveryWakeupLatch);
-                               }
-                               else
-                               {
-                                       int                     sources;
-                                       pg_time_t       now;
-
-                                       /*
-                                        * Until walreceiver manages to reconnect, poll the
-                                        * archive.
-                                        */
-                                       if (readFile >= 0)
-                                       {
-                                               close(readFile);
-                                               readFile = -1;
-                                       }
-                                       /* Reset curFileTLI if random fetch. */
-                                       if (randAccess)
-                                               curFileTLI = 0;
-
-                                       /*
-                                        * Try to restore the file from archive, or read an
-                                        * existing file from pg_xlog.
-                                        */
-                                       sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
-                                       if (!(sources & ~failedSources))
-                                       {
-                                               /*
-                                                * We've exhausted all options for retrieving the
-                                                * file. Retry.
-                                                */
-                                               failedSources = 0;
-
-                                               /*
-                                                * Before we sleep, re-scan for possible new timelines
-                                                * if we were requested to recover to the latest
-                                                * timeline.
-                                                */
-                                               if (recoveryTargetIsLatest)
-                                               {
-                                                       if (rescanLatestTimeLine())
-                                                               continue;
-                                               }
-
-                                               /*
-                                                * If it hasn't been long since last attempt, sleep to
-                                                * avoid busy-waiting.
-                                                */
-                                               now = (pg_time_t) time(NULL);
-                                               if ((now - last_fail_time) < 5)
-                                               {
-                                                       pg_usleep(1000000L * (5 - (now - last_fail_time)));
-                                                       now = (pg_time_t) time(NULL);
-                                               }
-                                               last_fail_time = now;
-
-                                               /*
-                                                * If primary_conninfo is set, launch walreceiver to
-                                                * try to stream the missing WAL, before retrying to
-                                                * restore from archive/pg_xlog.
-                                                *
-                                                * If fetching_ckpt is TRUE, RecPtr points to the
-                                                * initial checkpoint location. In that case, we use
-                                                * RedoStartLSN as the streaming start position
-                                                * instead of RecPtr, so that when we later jump
-                                                * backwards to start redo at RedoStartLSN, we will
-                                                * have the logs streamed already.
-                                                */
-                                               if (PrimaryConnInfo)
-                                               {
-                                                       RequestXLogStreaming(
-                                                                         fetching_ckpt ? RedoStartLSN : *RecPtr,
-                                                                                                PrimaryConnInfo);
-                                                       continue;
-                                               }
-                                       }
-                                       /* Don't try to read from a source that just failed */
-                                       sources &= ~failedSources;
-                                       readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2,
-                                                                                                 sources);
-                                       switched_segment = true;
-                                       if (readFile >= 0)
-                                               break;
-
-                                       /*
-                                        * Nope, not found in archive and/or pg_xlog.
-                                        */
-                                       failedSources |= sources;
-
-                                       /*
-                                        * Check to see if the trigger file exists. Note that we
-                                        * do this only after failure, so when you create the
-                                        * trigger file, we still finish replaying as much as we
-                                        * can from archive and pg_xlog before failover.
-                                        */
-                                       if (CheckForStandbyTrigger())
-                                               goto triggered;
-                               }
-
-                               /*
-                                * This possibly-long loop needs to handle interrupts of
-                                * startup process.
-                                */
-                               HandleStartupProcInterrupts();
-                       }
+                       if (!WaitForWALToBecomeAvailable(*RecPtr, randAccess,
+                                                                                        fetching_ckpt))
+                               goto triggered;
                }
                else
                {
@@ -9502,7 +9309,6 @@ retry:
                                        sources |= XLOG_FROM_ARCHIVE;
 
                                readFile = XLogFileReadAnyTLI(readSegNo, emode, sources);
-                               switched_segment = true;
                                if (readFile < 0)
                                        return false;
                        }
@@ -9533,7 +9339,7 @@ retry:
        else
                readLen = XLOG_BLCKSZ;
 
-       if (switched_segment && targetPageOff != 0)
+       if (!readFileHeaderValidated && targetPageOff != 0)
        {
                /*
                 * Whenever switching to a new WAL segment, we read the first page of
@@ -9582,6 +9388,8 @@ retry:
        if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode))
                goto next_record_is_invalid;
 
+       readFileHeaderValidated = true;
+
        Assert(targetSegNo == readSegNo);
        Assert(targetPageOff == readOff);
        Assert(targetRecOff < readLen);
@@ -9613,6 +9421,213 @@ triggered:
        return false;
 }
 
+/*
+ * In standby mode, wait for the requested record to become available, either
+ * via restore_command succeeding to restore the segment, or via walreceiver
+ * having streamed the record (or via someone copying the segment directly to
+ * pg_xlog, but that is not documented or recommended).
+ *
+ * When the requested record becomes available, the function opens the file
+ * containing it (if not open already), and returns true. When end of standby
+ * mode is triggered by the user, and there is no more WAL available, returns
+ * false.
+ */
+static bool
+WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
+                                                       bool fetching_ckpt)
+{
+       static pg_time_t last_fail_time = 0;
+
+       for (;;)
+       {
+               if (WalRcvInProgress())
+               {
+                       bool            havedata;
+
+                       /*
+                        * If we find an invalid record in the WAL streamed from master,
+                        * something is seriously wrong. There's little chance that the
+                        * problem will just go away, but PANIC is not good for
+                        * availability either, especially in hot standby mode.
+                        * Disconnect, and retry from archive/pg_xlog again. The WAL in
+                        * the archive should be identical to what was streamed, so it's
+                        * unlikely that it helps, but one can hope...
+                        */
+                       if (failedSources & XLOG_FROM_STREAM)
+                       {
+                               ShutdownWalRcv();
+                               continue;
+                       }
+
+                       /*
+                        * Walreceiver is active, so see if new data has arrived.
+                        *
+                        * We only advance XLogReceiptTime when we obtain fresh WAL from
+                        * walreceiver and observe that we had already processed
+                        * everything before the most recent "chunk" that it flushed to
+                        * disk.  In steady state where we are keeping up with the
+                        * incoming data, XLogReceiptTime will be updated on each cycle.
+                        * When we are behind, XLogReceiptTime will not advance, so the
+                        * grace time allotted to conflicting queries will decrease.
+                        */
+                       if (XLByteLT(RecPtr, receivedUpto))
+                               havedata = true;
+                       else
+                       {
+                               XLogRecPtr      latestChunkStart;
+
+                               receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
+                               if (XLByteLT(RecPtr, receivedUpto))
+                               {
+                                       havedata = true;
+                                       if (!XLByteLT(RecPtr, latestChunkStart))
+                                       {
+                                               XLogReceiptTime = GetCurrentTimestamp();
+                                               SetCurrentChunkStartTime(XLogReceiptTime);
+                                       }
+                               }
+                               else
+                                       havedata = false;
+                       }
+                       if (havedata)
+                       {
+                               /*
+                                * Great, streamed far enough.  Open the file if it's not open
+                                * already.  Use XLOG_FROM_STREAM so that source info is set
+                                * correctly and XLogReceiptTime isn't changed.
+                                */
+                               if (readFile < 0)
+                               {
+                                       readFile = XLogFileRead(readSegNo, PANIC,
+                                                                                       curFileTLI,
+                                                                                       XLOG_FROM_STREAM, false);
+                                       Assert(readFile >= 0);
+                               }
+                               else
+                               {
+                                       /* just make sure source info is correct... */
+                                       readSource = XLOG_FROM_STREAM;
+                                       XLogReceiptSource = XLOG_FROM_STREAM;
+                               }
+                               break;
+                       }
+
+                       /*
+                        * Data not here yet, so check for trigger then sleep for five
+                        * seconds like in the WAL file polling case below.
+                        */
+                       if (CheckForStandbyTrigger())
+                               return false;
+
+                       /*
+                        * Wait for more WAL to arrive, or timeout to be reached
+                        */
+                       WaitLatch(&XLogCtl->recoveryWakeupLatch,
+                                         WL_LATCH_SET | WL_TIMEOUT,
+                                         5000L);
+                       ResetLatch(&XLogCtl->recoveryWakeupLatch);
+               }
+               else
+               {
+                       /*
+                        * WAL receiver is not active. Poll the archive.
+                        */
+                       int                     sources;
+                       pg_time_t       now;
+
+                       if (readFile >= 0)
+                       {
+                               close(readFile);
+                               readFile = -1;
+                       }
+                       /* Reset curFileTLI if random fetch. */
+                       if (randAccess)
+                               curFileTLI = 0;
+
+                       /*
+                        * Try to restore the file from archive, or read an existing file
+                        * from pg_xlog.
+                        */
+                       sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
+                       if (!(sources & ~failedSources))
+                       {
+                               /*
+                                * We've exhausted all options for retrieving the file. Retry.
+                                */
+                               failedSources = 0;
+
+                               /*
+                                * Before we sleep, re-scan for possible new timelines if we
+                                * were requested to recover to the latest timeline.
+                                */
+                               if (recoveryTargetIsLatest)
+                               {
+                                       if (rescanLatestTimeLine())
+                                               continue;
+                               }
+
+                               /*
+                                * If it hasn't been long since last attempt, sleep to avoid
+                                * busy-waiting.
+                                */
+                               now = (pg_time_t) time(NULL);
+                               if ((now - last_fail_time) < 5)
+                               {
+                                       pg_usleep(1000000L * (5 - (now - last_fail_time)));
+                                       now = (pg_time_t) time(NULL);
+                               }
+                               last_fail_time = now;
+
+                               /*
+                                * If primary_conninfo is set, launch walreceiver to try to
+                                * stream the missing WAL, before retrying to restore from
+                                * archive/pg_xlog.
+                                *
+                                * If fetching_ckpt is TRUE, RecPtr points to the initial
+                                * checkpoint location. In that case, we use RedoStartLSN as
+                                * the streaming start position instead of RecPtr, so that
+                                * when we later jump backwards to start redo at RedoStartLSN,
+                                * we will have the logs streamed already.
+                                */
+                               if (PrimaryConnInfo)
+                               {
+                                       XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
+
+                                       RequestXLogStreaming(ptr, PrimaryConnInfo);
+                                       continue;
+                               }
+                       }
+                       /* Don't try to read from a source that just failed */
+                       sources &= ~failedSources;
+                       readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, sources);
+                       if (readFile >= 0)
+                               break;
+
+                       /*
+                        * Nope, not found in archive and/or pg_xlog.
+                        */
+                       failedSources |= sources;
+
+                       /*
+                        * Check to see if the trigger file exists. Note that we do this
+                        * only after failure, so when you create the trigger file, we
+                        * still finish replaying as much as we can from archive and
+                        * pg_xlog before failover.
+                        */
+                       if (CheckForStandbyTrigger())
+                               return false;
+               }
+
+               /*
+                * This possibly-long loop needs to handle interrupts of startup
+                * process.
+                */
+               HandleStartupProcInterrupts();
+       }
+
+       return true;
+}
+
 /*
  * Determine what log level should be used to report a corrupt WAL record
  * in the current WAL page, previously read by XLogPageRead().