Refactor WAL file-reading code into WALRead()
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Mon, 25 Nov 2019 18:04:54 +0000 (15:04 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Mon, 25 Nov 2019 18:04:54 +0000 (15:04 -0300)
XLogReader, walsender and pg_waldump all had their own routines to read
data from WAL files to memory, with slightly different approaches
according to the particular conditions of each environment.  There's a
lot of commonality, so we can refactor that into a single routine
WALRead in XLogReader, and move the differences to a separate (simpler)
callback that just opens the next WAL-segment.  This results in a
clearer (ahem) code flow.

The error reporting needs are covered by filling in a new error-info
struct, WALReadError, and it's the caller's responsibility to act on it.
The backend has WALReadRaiseError() to do so.

We no longer ever need to seek in this interface; switch to using
pg_pread().

Author: Antonin Houska, with contributions from Álvaro Herrera
Reviewed-by: Michaël Paquier, Kyotaro Horiguchi
Discussion: https://postgr.es/m/14984.1554998742@spoje.net

src/backend/access/transam/xlogreader.c
src/backend/access/transam/xlogutils.c
src/backend/replication/walsender.c
src/bin/pg_waldump/pg_waldump.c
src/include/access/xlogreader.h
src/include/access/xlogutils.h

index 7f24f0cb95f2344f3e20ea605e6b71a318ce0150..67418b05f1535a9f458f75afc58089d7b4a3b2e1 100644 (file)
@@ -17,6 +17,8 @@
  */
 #include "postgres.h"
 
+#include <unistd.h>
+
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "access/xlogreader.h"
@@ -27,6 +29,7 @@
 
 #ifndef FRONTEND
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "utils/memutils.h"
 #endif
 
@@ -208,7 +211,6 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 {
    seg->ws_file = -1;
    seg->ws_segno = 0;
-   seg->ws_off = 0;
    seg->ws_tli = 0;
 
    segcxt->ws_segsize = segsize;
@@ -295,8 +297,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
     * byte to cover the whole record header, or at least the part of it that
     * fits on the same page.
     */
-   readOff = ReadPageInternal(state,
-                              targetPagePtr,
+   readOff = ReadPageInternal(state, targetPagePtr,
                               Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
    if (readOff < 0)
        goto err;
@@ -556,7 +557,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 
    /* check whether we have all the requested data already */
    if (targetSegNo == state->seg.ws_segno &&
-       targetPageOff == state->seg.ws_off && reqLen <= state->readLen)
+       targetPageOff == state->segoff && reqLen <= state->readLen)
        return state->readLen;
 
    /*
@@ -627,7 +628,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 
    /* update read state information */
    state->seg.ws_segno = targetSegNo;
-   state->seg.ws_off = targetPageOff;
+   state->segoff = targetPageOff;
    state->readLen = readLen;
 
    return readLen;
@@ -644,7 +645,7 @@ static void
 XLogReaderInvalReadState(XLogReaderState *state)
 {
    state->seg.ws_segno = 0;
-   state->seg.ws_off = 0;
+   state->segoff = 0;
    state->readLen = 0;
 }
 
@@ -1015,6 +1016,99 @@ out:
 
 #endif                         /* FRONTEND */
 
+/*
+ * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
+ * fetched from timeline 'tli'.
+ *
+ * 'seg/segcxt' identify the last segment used.  'openSegment' is a callback
+ * to open the next segment, if necessary.
+ *
+ * Returns true if succeeded, false if an error occurs, in which case
+ * 'errinfo' receives error details.
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
+ */
+bool
+WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
+       WALOpenSegment *seg, WALSegmentContext *segcxt,
+       WALSegmentOpen openSegment, WALReadError *errinfo)
+{
+   char       *p;
+   XLogRecPtr  recptr;
+   Size        nbytes;
+
+   p = buf;
+   recptr = startptr;
+   nbytes = count;
+
+   while (nbytes > 0)
+   {
+       uint32      startoff;
+       int         segbytes;
+       int         readbytes;
+
+       startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
+
+       /*
+        * If the data we want is not in a segment we have open, close what we
+        * have (if anything) and open the next one, using the caller's
+        * provided openSegment callback.
+        */
+       if (seg->ws_file < 0 ||
+           !XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) ||
+           tli != seg->ws_tli)
+       {
+           XLogSegNo   nextSegNo;
+
+           if (seg->ws_file >= 0)
+               close(seg->ws_file);
+
+           XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
+           seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
+
+           /* Update the current segment info. */
+           seg->ws_tli = tli;
+           seg->ws_segno = nextSegNo;
+       }
+
+       /* How many bytes are within this segment? */
+       if (nbytes > (segcxt->ws_segsize - startoff))
+           segbytes = segcxt->ws_segsize - startoff;
+       else
+           segbytes = nbytes;
+
+#ifndef FRONTEND
+       pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+#endif
+
+       /* Reset errno first; eases reporting non-errno-affecting errors */
+       errno = 0;
+       readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff);
+
+#ifndef FRONTEND
+       pgstat_report_wait_end();
+#endif
+
+       if (readbytes <= 0)
+       {
+           errinfo->wre_errno = errno;
+           errinfo->wre_req = segbytes;
+           errinfo->wre_read = readbytes;
+           errinfo->wre_off = startoff;
+           errinfo->wre_seg = *seg;
+           return false;
+       }
+
+       /* Update state for read */
+       recptr += readbytes;
+       nbytes -= readbytes;
+       p += readbytes;
+   }
+
+   return true;
+}
+
 /* ----------------------------------------
  * Functions for decoding the data and block references in a record.
  * ----------------------------------------
index 5f1e5ba75d5804ec9b5fcc40cba6e9e4ea3599f4..446760ed6e7b771c4ae32b4abe1412f8fc464b8e 100644 (file)
@@ -639,128 +639,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
    forget_invalid_pages(rnode, forkNum, nblocks);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- * in timeline 'tli'.
- *
- * Will open, and keep open, one WAL segment stored in the static file
- * descriptor 'sendFile'. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- *
- * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead
- * in walsender.c but for small differences (such as lack of elog() in
- * frontend).  Probably these should be merged at some point.
- */
-static void
-XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
-        Size count)
-{
-   char       *p;
-   XLogRecPtr  recptr;
-   Size        nbytes;
-
-   /* state maintained across calls */
-   static int  sendFile = -1;
-   static XLogSegNo sendSegNo = 0;
-   static TimeLineID sendTLI = 0;
-   static uint32 sendOff = 0;
-
-   Assert(segsize == wal_segment_size);
-
-   p = buf;
-   recptr = startptr;
-   nbytes = count;
-
-   while (nbytes > 0)
-   {
-       uint32      startoff;
-       int         segbytes;
-       int         readbytes;
-
-       startoff = XLogSegmentOffset(recptr, segsize);
-
-       /* Do we need to switch to a different xlog segment? */
-       if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) ||
-           sendTLI != tli)
-       {
-           char        path[MAXPGPATH];
-
-           if (sendFile >= 0)
-               close(sendFile);
-
-           XLByteToSeg(recptr, sendSegNo, segsize);
-
-           XLogFilePath(path, tli, sendSegNo, segsize);
-
-           sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-
-           if (sendFile < 0)
-           {
-               if (errno == ENOENT)
-                   ereport(ERROR,
-                           (errcode_for_file_access(),
-                            errmsg("requested WAL segment %s has already been removed",
-                                   path)));
-               else
-                   ereport(ERROR,
-                           (errcode_for_file_access(),
-                            errmsg("could not open file \"%s\": %m",
-                                   path)));
-           }
-           sendOff = 0;
-           sendTLI = tli;
-       }
-
-       /* Need to seek in the file? */
-       if (sendOff != startoff)
-       {
-           if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-           {
-               char        path[MAXPGPATH];
-               int         save_errno = errno;
-
-               XLogFilePath(path, tli, sendSegNo, segsize);
-               errno = save_errno;
-               ereport(ERROR,
-                       (errcode_for_file_access(),
-                        errmsg("could not seek in log segment %s to offset %u: %m",
-                               path, startoff)));
-           }
-           sendOff = startoff;
-       }
-
-       /* How many bytes are within this segment? */
-       if (nbytes > (segsize - startoff))
-           segbytes = segsize - startoff;
-       else
-           segbytes = nbytes;
-
-       pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-       readbytes = read(sendFile, p, segbytes);
-       pgstat_report_wait_end();
-       if (readbytes <= 0)
-       {
-           char        path[MAXPGPATH];
-           int         save_errno = errno;
-
-           XLogFilePath(path, tli, sendSegNo, segsize);
-           errno = save_errno;
-           ereport(ERROR,
-                   (errcode_for_file_access(),
-                    errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-                           path, sendOff, (unsigned long) segbytes)));
-       }
-
-       /* Update state for read */
-       recptr += readbytes;
-
-       sendOff += readbytes;
-       nbytes -= readbytes;
-       p += readbytes;
-   }
-}
-
 /*
  * Determine which timeline to read an xlog page from and set the
  * XLogReaderState's currTLI to that timeline ID.
@@ -802,8 +680,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
 void
 XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
 {
-   const XLogRecPtr lastReadPage = state->seg.ws_segno *
-   state->segcxt.ws_segsize + state->seg.ws_off;
+   const XLogRecPtr lastReadPage = (state->seg.ws_segno *
+                                    state->segcxt.ws_segsize + state->segoff);
 
    Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
    Assert(wantLength <= XLOG_BLCKSZ);
@@ -896,6 +774,34 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
    }
 }
 
+/* openSegment callback for WALRead */
+static int
+wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                TimeLineID *tli_p)
+{
+   TimeLineID  tli = *tli_p;
+   char        path[MAXPGPATH];
+   int         fd;
+
+   XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
+   fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+   if (fd >= 0)
+       return fd;
+
+   if (errno == ENOENT)
+       ereport(ERROR,
+               (errcode_for_file_access(),
+                errmsg("requested WAL segment %s has already been removed",
+                       path)));
+   else
+       ereport(ERROR,
+               (errcode_for_file_access(),
+                errmsg("could not open file \"%s\": %m",
+                       path)));
+
+   return -1;                  /* keep compiler quiet */
+}
+
 /*
  * read_page callback for reading local xlog files
  *
@@ -913,7 +819,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 {
    XLogRecPtr  read_upto,
                loc;
+   TimeLineID  tli;
    int         count;
+   WALReadError errinfo;
 
    loc = targetPagePtr + reqLen;
 
@@ -932,7 +840,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
            read_upto = GetFlushRecPtr();
        else
            read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
-       state->seg.ws_tli = ThisTimeLineID;
+       tli = ThisTimeLineID;
 
        /*
         * Check which timeline to get the record from.
@@ -982,14 +890,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
            read_upto = state->currTLIValidUntil;
 
            /*
-            * Setting ws_tli to our wanted record's TLI is slightly wrong;
-            * the page might begin on an older timeline if it contains a
-            * timeline switch, since its xlog segment will have been copied
-            * from the prior timeline. This is pretty harmless though, as
-            * nothing cares so long as the timeline doesn't go backwards.  We
-            * should read the page header instead; FIXME someday.
+            * Setting tli to our wanted record's TLI is slightly wrong; the
+            * page might begin on an older timeline if it contains a timeline
+            * switch, since its xlog segment will have been copied from the
+            * prior timeline. This is pretty harmless though, as nothing
+            * cares so long as the timeline doesn't go backwards.  We should
+            * read the page header instead; FIXME someday.
             */
-           state->seg.ws_tli = state->currTLI;
+           tli = state->currTLI;
 
            /* No need to wait on a historical timeline */
            break;
@@ -1020,9 +928,38 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
     * as 'count', read the whole page anyway. It's guaranteed to be
     * zero-padded up to the page boundary if it's incomplete.
     */
-   XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr,
-            XLOG_BLCKSZ);
+   if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
+                &state->segcxt, wal_segment_open, &errinfo))
+       WALReadRaiseError(&errinfo);
 
    /* number of valid bytes in the buffer */
    return count;
 }
+
+/*
+ * Backend-specific convenience code to handle read errors encountered by
+ * WALRead().
+ */
+void
+WALReadRaiseError(WALReadError *errinfo)
+{
+   WALOpenSegment *seg = &errinfo->wre_seg;
+   char       *fname = XLogFileNameP(seg->ws_tli, seg->ws_segno);
+
+   if (errinfo->wre_read < 0)
+   {
+       errno = errinfo->wre_errno;
+       ereport(ERROR,
+               (errcode_for_file_access(),
+                errmsg("could not read from log segment %s, offset %u: %m",
+                       fname, errinfo->wre_off)));
+   }
+   else if (errinfo->wre_read == 0)
+   {
+       ereport(ERROR,
+               (errcode(ERRCODE_DATA_CORRUPTED),
+                errmsg("could not read from log segment %s, offset %u: read %d of %zu",
+                       fname, errinfo->wre_off, errinfo->wre_read,
+                       (Size) errinfo->wre_req)));
+   }
+}
index cbc928501af45d40d198721dcea9b9e31d604a1b..ac9209747a4a1d931d1091a836a03211f4794ad0 100644 (file)
@@ -248,8 +248,9 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
+static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                             TimeLineID *tli_p);
 static void UpdateSpillStats(LogicalDecodingContext *ctx);
-static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -767,6 +768,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 {
    XLogRecPtr  flushptr;
    int         count;
+   WALReadError errinfo;
+   XLogSegNo   segno;
 
    XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
    sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
@@ -787,7 +790,27 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
        count = flushptr - targetPagePtr;   /* part of the page available */
 
    /* now actually read the data, we know it's there */
-   XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
+   if (!WALRead(cur_page,
+                targetPagePtr,
+                XLOG_BLCKSZ,
+                sendSeg->ws_tli,   /* Pass the current TLI because only
+                                    * WalSndSegmentOpen controls whether new
+                                    * TLI is needed. */
+                sendSeg,
+                sendCxt,
+                WalSndSegmentOpen,
+                &errinfo))
+       WALReadRaiseError(&errinfo);
+
+   /*
+    * After reading into the buffer, check that what we read was valid. We do
+    * this after reading, because even though the segment was present when we
+    * opened it, it might get recycled or removed while we read it. The
+    * read() succeeds in that case, but the data we tried to read might
+    * already have been overwritten with new WAL records.
+    */
+   XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
+   CheckXLogRemoved(segno, sendSeg->ws_tli);
 
    return count;
 }
@@ -2360,189 +2383,68 @@ WalSndKill(int code, Datum arg)
    SpinLockRelease(&walsnd->mutex);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
- *
- * Will open, and keep open, one WAL segment stored in the global file
- * descriptor sendFile. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- */
-static void
-XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
+/* walsender's openSegment callback for WALRead */
+static int
+WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                 TimeLineID *tli_p)
 {
-   char       *p;
-   XLogRecPtr  recptr;
-   Size        nbytes;
-   XLogSegNo   segno;
-
-retry:
-   p = buf;
-   recptr = startptr;
-   nbytes = count;
+   char        path[MAXPGPATH];
+   int         fd;
 
-   while (nbytes > 0)
+   /*-------
+    * When reading from a historic timeline, and there is a timeline switch
+    * within this segment, read from the WAL segment belonging to the new
+    * timeline.
+    *
+    * For example, imagine that this server is currently on timeline 5, and
+    * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
+    * 0/13002088. In pg_wal, we have these files:
+    *
+    * ...
+    * 000000040000000000000012
+    * 000000040000000000000013
+    * 000000050000000000000013
+    * 000000050000000000000014
+    * ...
+    *
+    * In this situation, when requested to send the WAL from segment 0x13, on
+    * timeline 4, we read the WAL from file 000000050000000000000013. Archive
+    * recovery prefers files from newer timelines, so if the segment was
+    * restored from the archive on this server, the file belonging to the old
+    * timeline, 000000040000000000000013, might not exist. Their contents are
+    * equal up to the switchpoint, because at a timeline switch, the used
+    * portion of the old segment is copied to the new file.  -------
+    */
+   *tli_p = sendTimeLine;
+   if (sendTimeLineIsHistoric)
    {
-       uint32      startoff;
-       int         segbytes;
-       int         readbytes;
-
-       startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
-
-       if (sendSeg->ws_file < 0 ||
-           !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
-       {
-           char        path[MAXPGPATH];
-
-           /* Switch to another logfile segment */
-           if (sendSeg->ws_file >= 0)
-               close(sendSeg->ws_file);
-
-           XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
-
-           /*-------
-            * When reading from a historic timeline, and there is a timeline
-            * switch within this segment, read from the WAL segment belonging
-            * to the new timeline.
-            *
-            * For example, imagine that this server is currently on timeline
-            * 5, and we're streaming timeline 4. The switch from timeline 4
-            * to 5 happened at 0/13002088. In pg_wal, we have these files:
-            *
-            * ...
-            * 000000040000000000000012
-            * 000000040000000000000013
-            * 000000050000000000000013
-            * 000000050000000000000014
-            * ...
-            *
-            * In this situation, when requested to send the WAL from
-            * segment 0x13, on timeline 4, we read the WAL from file
-            * 000000050000000000000013. Archive recovery prefers files from
-            * newer timelines, so if the segment was restored from the
-            * archive on this server, the file belonging to the old timeline,
-            * 000000040000000000000013, might not exist. Their contents are
-            * equal up to the switchpoint, because at a timeline switch, the
-            * used portion of the old segment is copied to the new file.
-            *-------
-            */
-           sendSeg->ws_tli = sendTimeLine;
-           if (sendTimeLineIsHistoric)
-           {
-               XLogSegNo   endSegNo;
-
-               XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
-               if (sendSeg->ws_segno == endSegNo)
-                   sendSeg->ws_tli = sendTimeLineNextTLI;
-           }
-
-           XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
-
-           sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-           if (sendSeg->ws_file < 0)
-           {
-               /*
-                * If the file is not found, assume it's because the standby
-                * asked for a too old WAL segment that has already been
-                * removed or recycled.
-                */
-               if (errno == ENOENT)
-                   ereport(ERROR,
-                           (errcode_for_file_access(),
-                            errmsg("requested WAL segment %s has already been removed",
-                                   XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
-               else
-                   ereport(ERROR,
-                           (errcode_for_file_access(),
-                            errmsg("could not open file \"%s\": %m",
-                                   path)));
-           }
-           sendSeg->ws_off = 0;
-       }
-
-       /* Need to seek in the file? */
-       if (sendSeg->ws_off != startoff)
-       {
-           if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
-               ereport(ERROR,
-                       (errcode_for_file_access(),
-                        errmsg("could not seek in log segment %s to offset %u: %m",
-                               XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-                               startoff)));
-           sendSeg->ws_off = startoff;
-       }
-
-       /* How many bytes are within this segment? */
-       if (nbytes > (segcxt->ws_segsize - startoff))
-           segbytes = segcxt->ws_segsize - startoff;
-       else
-           segbytes = nbytes;
-
-       pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-       readbytes = read(sendSeg->ws_file, p, segbytes);
-       pgstat_report_wait_end();
-       if (readbytes < 0)
-       {
-           ereport(ERROR,
-                   (errcode_for_file_access(),
-                    errmsg("could not read from log segment %s, offset %u, length %zu: %m",
-                           XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-                           sendSeg->ws_off, (Size) segbytes)));
-       }
-       else if (readbytes == 0)
-       {
-           ereport(ERROR,
-                   (errcode(ERRCODE_DATA_CORRUPTED),
-                    errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-                           XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-                           sendSeg->ws_off, readbytes, (Size) segbytes)));
-       }
+       XLogSegNo   endSegNo;
 
-       /* Update state for read */
-       recptr += readbytes;
-
-       sendSeg->ws_off += readbytes;
-       nbytes -= readbytes;
-       p += readbytes;
+       XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
+       if (sendSeg->ws_segno == endSegNo)
+           *tli_p = sendTimeLineNextTLI;
    }
 
-   /*
-    * After reading into the buffer, check that what we read was valid. We do
-    * this after reading, because even though the segment was present when we
-    * opened it, it might get recycled or removed while we read it. The
-    * read() succeeds in that case, but the data we tried to read might
-    * already have been overwritten with new WAL records.
-    */
-   XLByteToSeg(startptr, segno, segcxt->ws_segsize);
-   CheckXLogRemoved(segno, ThisTimeLineID);
+   XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
+   fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+   if (fd >= 0)
+       return fd;
 
    /*
-    * During recovery, the currently-open WAL file might be replaced with the
-    * file of the same name retrieved from archive. So we always need to
-    * check what we read was valid after reading into the buffer. If it's
-    * invalid, we try to open and read the file again.
+    * If the file is not found, assume it's because the standby asked for a
+    * too old WAL segment that has already been removed or recycled.
     */
-   if (am_cascading_walsender)
-   {
-       WalSnd     *walsnd = MyWalSnd;
-       bool        reload;
-
-       SpinLockAcquire(&walsnd->mutex);
-       reload = walsnd->needreload;
-       walsnd->needreload = false;
-       SpinLockRelease(&walsnd->mutex);
-
-       if (reload && sendSeg->ws_file >= 0)
-       {
-           close(sendSeg->ws_file);
-           sendSeg->ws_file = -1;
-
-           goto retry;
-       }
-   }
+   if (errno == ENOENT)
+       ereport(ERROR,
+               (errcode_for_file_access(),
+                errmsg("requested WAL segment %s has already been removed",
+                       XLogFileNameP(*tli_p, nextSegNo))));
+   else
+       ereport(ERROR,
+               (errcode_for_file_access(),
+                errmsg("could not open file \"%s\": %m",
+                       path)));
+   return -1;                  /* keep compiler quiet */
 }
 
 /*
@@ -2562,6 +2464,8 @@ XLogSendPhysical(void)
    XLogRecPtr  startptr;
    XLogRecPtr  endptr;
    Size        nbytes;
+   XLogSegNo   segno;
+   WALReadError errinfo;
 
    /* If requested switch the WAL sender to the stopping state. */
    if (got_STOPPING)
@@ -2777,7 +2681,49 @@ XLogSendPhysical(void)
     * calls.
     */
    enlargeStringInfo(&output_message, nbytes);
-   XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
+
+retry:
+   if (!WALRead(&output_message.data[output_message.len],
+                startptr,
+                nbytes,
+                sendSeg->ws_tli,   /* Pass the current TLI because only
+                                    * WalSndSegmentOpen controls whether new
+                                    * TLI is needed. */
+                sendSeg,
+                sendCxt,
+                WalSndSegmentOpen,
+                &errinfo))
+       WALReadRaiseError(&errinfo);
+
+   /* See logical_read_xlog_page(). */
+   XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
+   CheckXLogRemoved(segno, sendSeg->ws_tli);
+
+   /*
+    * During recovery, the currently-open WAL file might be replaced with the
+    * file of the same name retrieved from archive. So we always need to
+    * check what we read was valid after reading into the buffer. If it's
+    * invalid, we try to open and read the file again.
+    */
+   if (am_cascading_walsender)
+   {
+       WalSnd     *walsnd = MyWalSnd;
+       bool        reload;
+
+       SpinLockAcquire(&walsnd->mutex);
+       reload = walsnd->needreload;
+       walsnd->needreload = false;
+       SpinLockRelease(&walsnd->mutex);
+
+       if (reload && sendSeg->ws_file >= 0)
+       {
+           close(sendSeg->ws_file);
+           sendSeg->ws_file = -1;
+
+           goto retry;
+       }
+   }
+
    output_message.len += nbytes;
    output_message.data[output_message.len] = '\0';
 
index d6695f7196f5d633a4c8b12ea8e578c58ae819d8..30a5851d87cbaf3eaaf02fdbed7e4485b6c3173d 100644 (file)
@@ -280,137 +280,57 @@ identify_target_directory(char *directory, char *fname)
    return NULL;                /* not reached */
 }
 
-/*
- * Read count bytes from a segment file in the specified directory, for the
- * given timeline, containing the specified record pointer; store the data in
- * the passed buffer.
- */
-static void
-XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
-                XLogRecPtr startptr, char *buf, Size count)
+/* pg_waldump's openSegment callback for WALRead */
+static int
+WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                  TimeLineID *tli_p)
 {
-   char       *p;
-   XLogRecPtr  recptr;
-   Size        nbytes;
-
-   static int  sendFile = -1;
-   static XLogSegNo sendSegNo = 0;
-   static uint32 sendOff = 0;
+   TimeLineID  tli = *tli_p;
+   char        fname[MAXPGPATH];
+   int         fd;
+   int         tries;
 
-   p = buf;
-   recptr = startptr;
-   nbytes = count;
+   XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize);
 
-   while (nbytes > 0)
+   /*
+    * In follow mode there is a short period of time after the server has
+    * written the end of the previous file before the new file is available.
+    * So we loop for 5 seconds looking for the file to appear before giving
+    * up.
+    */
+   for (tries = 0; tries < 10; tries++)
    {
-       uint32      startoff;
-       int         segbytes;
-       int         readbytes;
-
-       startoff = XLogSegmentOffset(recptr, WalSegSz);
-
-       if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz))
-       {
-           char        fname[MAXFNAMELEN];
-           int         tries;
-
-           /* Switch to another logfile segment */
-           if (sendFile >= 0)
-               close(sendFile);
-
-           XLByteToSeg(recptr, sendSegNo, WalSegSz);
-
-           XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-           /*
-            * In follow mode there is a short period of time after the server
-            * has written the end of the previous file before the new file is
-            * available. So we loop for 5 seconds looking for the file to
-            * appear before giving up.
-            */
-           for (tries = 0; tries < 10; tries++)
-           {
-               sendFile = open_file_in_directory(directory, fname);
-               if (sendFile >= 0)
-                   break;
-               if (errno == ENOENT)
-               {
-                   int         save_errno = errno;
-
-                   /* File not there yet, try again */
-                   pg_usleep(500 * 1000);
-
-                   errno = save_errno;
-                   continue;
-               }
-               /* Any other error, fall through and fail */
-               break;
-           }
-
-           if (sendFile < 0)
-               fatal_error("could not find file \"%s\": %s",
-                           fname, strerror(errno));
-           sendOff = 0;
-       }
-
-       /* Need to seek in the file? */
-       if (sendOff != startoff)
-       {
-           if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-           {
-               int         err = errno;
-               char        fname[MAXPGPATH];
-
-               XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-               fatal_error("could not seek in log file %s to offset %u: %s",
-                           fname, startoff, strerror(err));
-           }
-           sendOff = startoff;
-       }
-
-       /* How many bytes are within this segment? */
-       if (nbytes > (WalSegSz - startoff))
-           segbytes = WalSegSz - startoff;
-       else
-           segbytes = nbytes;
-
-       readbytes = read(sendFile, p, segbytes);
-       if (readbytes <= 0)
+       fd = open_file_in_directory(segcxt->ws_dir, fname);
+       if (fd >= 0)
+           return fd;
+       if (errno == ENOENT)
        {
-           int         err = errno;
-           char        fname[MAXPGPATH];
            int         save_errno = errno;
 
-           XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-           errno = save_errno;
+           /* File not there yet, try again */
+           pg_usleep(500 * 1000);
 
-           if (readbytes < 0)
-               fatal_error("could not read from log file %s, offset %u, length %d: %s",
-                           fname, sendOff, segbytes, strerror(err));
-           else if (readbytes == 0)
-               fatal_error("could not read from log file %s, offset %u: read %d of %zu",
-                           fname, sendOff, readbytes, (Size) segbytes);
+           errno = save_errno;
+           continue;
        }
-
-       /* Update state for read */
-       recptr += readbytes;
-
-       sendOff += readbytes;
-       nbytes -= readbytes;
-       p += readbytes;
+       /* Any other error, fall through and fail */
+       break;
    }
+
+   fatal_error("could not find file \"%s\": %s", fname, strerror(errno));
+   return -1;                  /* keep compiler quiet */
 }
 
 /*
  * XLogReader read_page callback
  */
 static int
-XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-                XLogRecPtr targetPtr, char *readBuff)
+WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
+               XLogRecPtr targetPtr, char *readBuff)
 {
    XLogDumpPrivate *private = state->private_data;
    int         count = XLOG_BLCKSZ;
+   WALReadError errinfo;
 
    if (private->endptr != InvalidXLogRecPtr)
    {
@@ -425,8 +345,26 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
        }
    }
 
-   XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr,
-                    readBuff, count);
+   if (!WALRead(readBuff, targetPagePtr, count, private->timeline,
+                &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo))
+   {
+       WALOpenSegment *seg = &errinfo.wre_seg;
+       char        fname[MAXPGPATH];
+
+       XLogFileName(fname, seg->ws_tli, seg->ws_segno,
+                    state->segcxt.ws_segsize);
+
+       if (errinfo.wre_errno != 0)
+       {
+           errno = errinfo.wre_errno;
+           fatal_error("could not read from file %s, offset %u: %m",
+                       fname, errinfo.wre_off);
+       }
+       else
+           fatal_error("could not read from file %s, offset %u: read %d of %zu",
+                       fname, errinfo.wre_off, errinfo.wre_read,
+                       (Size) errinfo.wre_req);
+   }
 
    return count;
 }
@@ -1089,7 +1027,7 @@ main(int argc, char **argv)
    /* done with argument parsing, do the actual work */
 
    /* we have everything we need, start reading */
-   xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, XLogDumpReadPage,
+   xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage,
                                          &private);
    if (!xlogreader_state)
        fatal_error("out of memory");
index 1bbee386e8da95ee05c3684ab064fa022b7c4631..0193611b7fd2394043cb68fcc2fde0d7ac00ae14 100644 (file)
@@ -36,7 +36,6 @@ typedef struct WALOpenSegment
 {
    int         ws_file;        /* segment file descriptor */
    XLogSegNo   ws_segno;       /* segment number */
-   uint32      ws_off;         /* offset in the segment */
    TimeLineID  ws_tli;         /* timeline ID of the currently open file */
 } WALOpenSegment;
 
@@ -168,6 +167,7 @@ struct XLogReaderState
    /* last read XLOG position for data currently in readBuf */
    WALSegmentContext segcxt;
    WALOpenSegment seg;
+   uint32      segoff;
 
    /*
     * beginning of prior page read, and its TLI.  Doesn't necessarily
@@ -217,6 +217,24 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
 /* Free an XLogReader */
 extern void XLogReaderFree(XLogReaderState *state);
 
+/*
+ * Callback to open the specified WAL segment for reading.  Returns a valid
+ * file descriptor when the file was opened successfully.
+ *
+ * "nextSegNo" is the number of the segment to be opened.
+ *
+ * "segcxt" is additional information about the segment.
+ *
+ * "tli_p" is an input/output argument. XLogRead() uses it to pass the
+ * timeline in which the new segment should be found, but the callback can use
+ * it to return the TLI that it actually opened.
+ *
+ * BasicOpenFile() is the preferred way to open the segment file in backend
+ * code, whereas open(2) should be used in frontend.
+ */
+typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                              TimeLineID *tli_p);
+
 /* Initialize supporting structures */
 extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
                               int segsize, const char *waldir);
@@ -232,6 +250,25 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
 #ifdef FRONTEND
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif                         /* FRONTEND */
+
+/*
+ * Error information from WALRead that both backend and frontend caller can
+ * process.  Currently only errors from pg_pread can be reported.
+ */
+typedef struct WALReadError
+{
+   int         wre_errno;      /* errno set by the last pg_pread() */
+   int         wre_off;        /* Offset we tried to read from. */
+   int         wre_req;        /* Bytes requested to be read. */
+   int         wre_read;       /* Bytes read by the last read(). */
+   WALOpenSegment wre_seg;     /* Segment we tried to read from. */
+} WALReadError;
+
+extern bool WALRead(char *buf, XLogRecPtr startptr, Size count,
+                   TimeLineID tli, WALOpenSegment *seg,
+                   WALSegmentContext *segcxt, WALSegmentOpen openSegment,
+                   WALReadError *errinfo);
+
 /* Functions for decoding an XLogRecord */
 
 extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
index 2df98e45b204a59c593f4638d0ca262bcaab40e0..0572b24192782b8825d5fb9b08f1a11592c2fde9 100644 (file)
@@ -54,4 +54,6 @@ extern int    read_local_xlog_page(XLogReaderState *state,
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
                                      XLogRecPtr wantPage, uint32 wantLength);
 
+extern void WALReadRaiseError(WALReadError *errinfo);
+
 #endif