Refactor WAL file-reading code into WALRead()
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Mon, 25 Nov 2019 18:04:54 +0000 (15:04 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Mon, 25 Nov 2019 18:04:54 +0000 (15:04 -0300)
XLogReader, walsender and pg_waldump all had their own routines to read
data from WAL files to memory, with slightly different approaches
according to the particular conditions of each environment.  There's a
lot of commonality, so we can refactor that into a single routine
WALRead in XLogReader, and move the differences to a separate (simpler)
callback that just opens the next WAL-segment.  This results in a
clearer (ahem) code flow.

The error reporting needs are covered by filling in a new error-info
struct, WALReadError, and it's the caller's responsibility to act on it.
The backend has WALReadRaiseError() to do so.

We no longer ever need to seek in this interface; switch to using
pg_pread().

Author: Antonin Houska, with contributions from Álvaro Herrera
Reviewed-by: Michaël Paquier, Kyotaro Horiguchi
Discussion: https://postgr.es/m/14984.1554998742@spoje.net

src/backend/access/transam/xlogreader.c
src/backend/access/transam/xlogutils.c
src/backend/replication/walsender.c
src/bin/pg_waldump/pg_waldump.c
src/include/access/xlogreader.h
src/include/access/xlogutils.h

index 7f24f0cb95f2344f3e20ea605e6b71a318ce0150..67418b05f1535a9f458f75afc58089d7b4a3b2e1 100644 (file)
@@ -17,6 +17,8 @@
  */
 #include "postgres.h"
 
+#include <unistd.h>
+
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "access/xlogreader.h"
@@ -27,6 +29,7 @@
 
 #ifndef FRONTEND
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "utils/memutils.h"
 #endif
 
@@ -208,7 +211,6 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 {
        seg->ws_file = -1;
        seg->ws_segno = 0;
-       seg->ws_off = 0;
        seg->ws_tli = 0;
 
        segcxt->ws_segsize = segsize;
@@ -295,8 +297,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
         * byte to cover the whole record header, or at least the part of it that
         * fits on the same page.
         */
-       readOff = ReadPageInternal(state,
-                                                          targetPagePtr,
+       readOff = ReadPageInternal(state, targetPagePtr,
                                                           Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
        if (readOff < 0)
                goto err;
@@ -556,7 +557,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 
        /* check whether we have all the requested data already */
        if (targetSegNo == state->seg.ws_segno &&
-               targetPageOff == state->seg.ws_off && reqLen <= state->readLen)
+               targetPageOff == state->segoff && reqLen <= state->readLen)
                return state->readLen;
 
        /*
@@ -627,7 +628,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 
        /* update read state information */
        state->seg.ws_segno = targetSegNo;
-       state->seg.ws_off = targetPageOff;
+       state->segoff = targetPageOff;
        state->readLen = readLen;
 
        return readLen;
@@ -644,7 +645,7 @@ static void
 XLogReaderInvalReadState(XLogReaderState *state)
 {
        state->seg.ws_segno = 0;
-       state->seg.ws_off = 0;
+       state->segoff = 0;
        state->readLen = 0;
 }
 
@@ -1015,6 +1016,99 @@ out:
 
 #endif                                                 /* FRONTEND */
 
+/*
+ * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
+ * fetched from timeline 'tli'.
+ *
+ * 'seg/segcxt' identify the last segment used.  'openSegment' is a callback
+ * to open the next segment, if necessary.
+ *
+ * Returns true if succeeded, false if an error occurs, in which case
+ * 'errinfo' receives error details.
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
+ */
+bool
+WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
+               WALOpenSegment *seg, WALSegmentContext *segcxt,
+               WALSegmentOpen openSegment, WALReadError *errinfo)
+{
+       char       *p;
+       XLogRecPtr      recptr;
+       Size            nbytes;
+
+       p = buf;
+       recptr = startptr;
+       nbytes = count;
+
+       while (nbytes > 0)
+       {
+               uint32          startoff;
+               int                     segbytes;
+               int                     readbytes;
+
+               startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
+
+               /*
+                * If the data we want is not in a segment we have open, close what we
+                * have (if anything) and open the next one, using the caller's
+                * provided openSegment callback.
+                */
+               if (seg->ws_file < 0 ||
+                       !XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) ||
+                       tli != seg->ws_tli)
+               {
+                       XLogSegNo       nextSegNo;
+
+                       if (seg->ws_file >= 0)
+                               close(seg->ws_file);
+
+                       XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
+                       seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
+
+                       /* Update the current segment info. */
+                       seg->ws_tli = tli;
+                       seg->ws_segno = nextSegNo;
+               }
+
+               /* How many bytes are within this segment? */
+               if (nbytes > (segcxt->ws_segsize - startoff))
+                       segbytes = segcxt->ws_segsize - startoff;
+               else
+                       segbytes = nbytes;
+
+#ifndef FRONTEND
+               pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+#endif
+
+               /* Reset errno first; eases reporting non-errno-affecting errors */
+               errno = 0;
+               readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff);
+
+#ifndef FRONTEND
+               pgstat_report_wait_end();
+#endif
+
+               if (readbytes <= 0)
+               {
+                       errinfo->wre_errno = errno;
+                       errinfo->wre_req = segbytes;
+                       errinfo->wre_read = readbytes;
+                       errinfo->wre_off = startoff;
+                       errinfo->wre_seg = *seg;
+                       return false;
+               }
+
+               /* Update state for read */
+               recptr += readbytes;
+               nbytes -= readbytes;
+               p += readbytes;
+       }
+
+       return true;
+}
+
 /* ----------------------------------------
  * Functions for decoding the data and block references in a record.
  * ----------------------------------------
index 5f1e5ba75d5804ec9b5fcc40cba6e9e4ea3599f4..446760ed6e7b771c4ae32b4abe1412f8fc464b8e 100644 (file)
@@ -639,128 +639,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
        forget_invalid_pages(rnode, forkNum, nblocks);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- * in timeline 'tli'.
- *
- * Will open, and keep open, one WAL segment stored in the static file
- * descriptor 'sendFile'. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- *
- * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead
- * in walsender.c but for small differences (such as lack of elog() in
- * frontend).  Probably these should be merged at some point.
- */
-static void
-XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
-                Size count)
-{
-       char       *p;
-       XLogRecPtr      recptr;
-       Size            nbytes;
-
-       /* state maintained across calls */
-       static int      sendFile = -1;
-       static XLogSegNo sendSegNo = 0;
-       static TimeLineID sendTLI = 0;
-       static uint32 sendOff = 0;
-
-       Assert(segsize == wal_segment_size);
-
-       p = buf;
-       recptr = startptr;
-       nbytes = count;
-
-       while (nbytes > 0)
-       {
-               uint32          startoff;
-               int                     segbytes;
-               int                     readbytes;
-
-               startoff = XLogSegmentOffset(recptr, segsize);
-
-               /* Do we need to switch to a different xlog segment? */
-               if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) ||
-                       sendTLI != tli)
-               {
-                       char            path[MAXPGPATH];
-
-                       if (sendFile >= 0)
-                               close(sendFile);
-
-                       XLByteToSeg(recptr, sendSegNo, segsize);
-
-                       XLogFilePath(path, tli, sendSegNo, segsize);
-
-                       sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-
-                       if (sendFile < 0)
-                       {
-                               if (errno == ENOENT)
-                                       ereport(ERROR,
-                                                       (errcode_for_file_access(),
-                                                        errmsg("requested WAL segment %s has already been removed",
-                                                                       path)));
-                               else
-                                       ereport(ERROR,
-                                                       (errcode_for_file_access(),
-                                                        errmsg("could not open file \"%s\": %m",
-                                                                       path)));
-                       }
-                       sendOff = 0;
-                       sendTLI = tli;
-               }
-
-               /* Need to seek in the file? */
-               if (sendOff != startoff)
-               {
-                       if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-                       {
-                               char            path[MAXPGPATH];
-                               int                     save_errno = errno;
-
-                               XLogFilePath(path, tli, sendSegNo, segsize);
-                               errno = save_errno;
-                               ereport(ERROR,
-                                               (errcode_for_file_access(),
-                                                errmsg("could not seek in log segment %s to offset %u: %m",
-                                                               path, startoff)));
-                       }
-                       sendOff = startoff;
-               }
-
-               /* How many bytes are within this segment? */
-               if (nbytes > (segsize - startoff))
-                       segbytes = segsize - startoff;
-               else
-                       segbytes = nbytes;
-
-               pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-               readbytes = read(sendFile, p, segbytes);
-               pgstat_report_wait_end();
-               if (readbytes <= 0)
-               {
-                       char            path[MAXPGPATH];
-                       int                     save_errno = errno;
-
-                       XLogFilePath(path, tli, sendSegNo, segsize);
-                       errno = save_errno;
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-                                                       path, sendOff, (unsigned long) segbytes)));
-               }
-
-               /* Update state for read */
-               recptr += readbytes;
-
-               sendOff += readbytes;
-               nbytes -= readbytes;
-               p += readbytes;
-       }
-}
-
 /*
  * Determine which timeline to read an xlog page from and set the
  * XLogReaderState's currTLI to that timeline ID.
@@ -802,8 +680,8 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
 void
 XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
 {
-       const XLogRecPtr lastReadPage = state->seg.ws_segno *
-       state->segcxt.ws_segsize + state->seg.ws_off;
+       const XLogRecPtr lastReadPage = (state->seg.ws_segno *
+                                                                        state->segcxt.ws_segsize + state->segoff);
 
        Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
        Assert(wantLength <= XLOG_BLCKSZ);
@@ -896,6 +774,34 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
        }
 }
 
+/* openSegment callback for WALRead */
+static int
+wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                                TimeLineID *tli_p)
+{
+       TimeLineID      tli = *tli_p;
+       char            path[MAXPGPATH];
+       int                     fd;
+
+       XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
+       fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+       if (fd >= 0)
+               return fd;
+
+       if (errno == ENOENT)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("requested WAL segment %s has already been removed",
+                                               path)));
+       else
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not open file \"%s\": %m",
+                                               path)));
+
+       return -1;                                      /* keep compiler quiet */
+}
+
 /*
  * read_page callback for reading local xlog files
  *
@@ -913,7 +819,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 {
        XLogRecPtr      read_upto,
                                loc;
+       TimeLineID      tli;
        int                     count;
+       WALReadError errinfo;
 
        loc = targetPagePtr + reqLen;
 
@@ -932,7 +840,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
                        read_upto = GetFlushRecPtr();
                else
                        read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
-               state->seg.ws_tli = ThisTimeLineID;
+               tli = ThisTimeLineID;
 
                /*
                 * Check which timeline to get the record from.
@@ -982,14 +890,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
                        read_upto = state->currTLIValidUntil;
 
                        /*
-                        * Setting ws_tli to our wanted record's TLI is slightly wrong;
-                        * the page might begin on an older timeline if it contains a
-                        * timeline switch, since its xlog segment will have been copied
-                        * from the prior timeline. This is pretty harmless though, as
-                        * nothing cares so long as the timeline doesn't go backwards.  We
-                        * should read the page header instead; FIXME someday.
+                        * Setting tli to our wanted record's TLI is slightly wrong; the
+                        * page might begin on an older timeline if it contains a timeline
+                        * switch, since its xlog segment will have been copied from the
+                        * prior timeline. This is pretty harmless though, as nothing
+                        * cares so long as the timeline doesn't go backwards.  We should
+                        * read the page header instead; FIXME someday.
                         */
-                       state->seg.ws_tli = state->currTLI;
+                       tli = state->currTLI;
 
                        /* No need to wait on a historical timeline */
                        break;
@@ -1020,9 +928,38 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
         * as 'count', read the whole page anyway. It's guaranteed to be
         * zero-padded up to the page boundary if it's incomplete.
         */
-       XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr,
-                        XLOG_BLCKSZ);
+       if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
+                                &state->segcxt, wal_segment_open, &errinfo))
+               WALReadRaiseError(&errinfo);
 
        /* number of valid bytes in the buffer */
        return count;
 }
+
+/*
+ * Backend-specific convenience code to handle read errors encountered by
+ * WALRead().
+ */
+void
+WALReadRaiseError(WALReadError *errinfo)
+{
+       WALOpenSegment *seg = &errinfo->wre_seg;
+       char       *fname = XLogFileNameP(seg->ws_tli, seg->ws_segno);
+
+       if (errinfo->wre_read < 0)
+       {
+               errno = errinfo->wre_errno;
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not read from log segment %s, offset %u: %m",
+                                               fname, errinfo->wre_off)));
+       }
+       else if (errinfo->wre_read == 0)
+       {
+               ereport(ERROR,
+                               (errcode(ERRCODE_DATA_CORRUPTED),
+                                errmsg("could not read from log segment %s, offset %u: read %d of %zu",
+                                               fname, errinfo->wre_off, errinfo->wre_read,
+                                               (Size) errinfo->wre_req)));
+       }
+}
index cbc928501af45d40d198721dcea9b9e31d604a1b..ac9209747a4a1d931d1091a836a03211f4794ad0 100644 (file)
@@ -248,8 +248,9 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
+static int     WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                                                         TimeLineID *tli_p);
 static void UpdateSpillStats(LogicalDecodingContext *ctx);
-static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -767,6 +768,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 {
        XLogRecPtr      flushptr;
        int                     count;
+       WALReadError errinfo;
+       XLogSegNo       segno;
 
        XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
        sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
@@ -787,7 +790,27 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
                count = flushptr - targetPagePtr;       /* part of the page available */
 
        /* now actually read the data, we know it's there */
-       XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
+       if (!WALRead(cur_page,
+                                targetPagePtr,
+                                XLOG_BLCKSZ,
+                                sendSeg->ws_tli,       /* Pass the current TLI because only
+                                                                        * WalSndSegmentOpen controls whether new
+                                                                        * TLI is needed. */
+                                sendSeg,
+                                sendCxt,
+                                WalSndSegmentOpen,
+                                &errinfo))
+               WALReadRaiseError(&errinfo);
+
+       /*
+        * After reading into the buffer, check that what we read was valid. We do
+        * this after reading, because even though the segment was present when we
+        * opened it, it might get recycled or removed while we read it. The
+        * read() succeeds in that case, but the data we tried to read might
+        * already have been overwritten with new WAL records.
+        */
+       XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
+       CheckXLogRemoved(segno, sendSeg->ws_tli);
 
        return count;
 }
@@ -2360,189 +2383,68 @@ WalSndKill(int code, Datum arg)
        SpinLockRelease(&walsnd->mutex);
 }
 
-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
- *
- * Will open, and keep open, one WAL segment stored in the global file
- * descriptor sendFile. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- */
-static void
-XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
+/* walsender's openSegment callback for WALRead */
+static int
+WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                                 TimeLineID *tli_p)
 {
-       char       *p;
-       XLogRecPtr      recptr;
-       Size            nbytes;
-       XLogSegNo       segno;
-
-retry:
-       p = buf;
-       recptr = startptr;
-       nbytes = count;
+       char            path[MAXPGPATH];
+       int                     fd;
 
-       while (nbytes > 0)
+       /*-------
+        * When reading from a historic timeline, and there is a timeline switch
+        * within this segment, read from the WAL segment belonging to the new
+        * timeline.
+        *
+        * For example, imagine that this server is currently on timeline 5, and
+        * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
+        * 0/13002088. In pg_wal, we have these files:
+        *
+        * ...
+        * 000000040000000000000012
+        * 000000040000000000000013
+        * 000000050000000000000013
+        * 000000050000000000000014
+        * ...
+        *
+        * In this situation, when requested to send the WAL from segment 0x13, on
+        * timeline 4, we read the WAL from file 000000050000000000000013. Archive
+        * recovery prefers files from newer timelines, so if the segment was
+        * restored from the archive on this server, the file belonging to the old
+        * timeline, 000000040000000000000013, might not exist. Their contents are
+        * equal up to the switchpoint, because at a timeline switch, the used
+        * portion of the old segment is copied to the new file.  -------
+        */
+       *tli_p = sendTimeLine;
+       if (sendTimeLineIsHistoric)
        {
-               uint32          startoff;
-               int                     segbytes;
-               int                     readbytes;
-
-               startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
-
-               if (sendSeg->ws_file < 0 ||
-                       !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
-               {
-                       char            path[MAXPGPATH];
-
-                       /* Switch to another logfile segment */
-                       if (sendSeg->ws_file >= 0)
-                               close(sendSeg->ws_file);
-
-                       XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
-
-                       /*-------
-                        * When reading from a historic timeline, and there is a timeline
-                        * switch within this segment, read from the WAL segment belonging
-                        * to the new timeline.
-                        *
-                        * For example, imagine that this server is currently on timeline
-                        * 5, and we're streaming timeline 4. The switch from timeline 4
-                        * to 5 happened at 0/13002088. In pg_wal, we have these files:
-                        *
-                        * ...
-                        * 000000040000000000000012
-                        * 000000040000000000000013
-                        * 000000050000000000000013
-                        * 000000050000000000000014
-                        * ...
-                        *
-                        * In this situation, when requested to send the WAL from
-                        * segment 0x13, on timeline 4, we read the WAL from file
-                        * 000000050000000000000013. Archive recovery prefers files from
-                        * newer timelines, so if the segment was restored from the
-                        * archive on this server, the file belonging to the old timeline,
-                        * 000000040000000000000013, might not exist. Their contents are
-                        * equal up to the switchpoint, because at a timeline switch, the
-                        * used portion of the old segment is copied to the new file.
-                        *-------
-                        */
-                       sendSeg->ws_tli = sendTimeLine;
-                       if (sendTimeLineIsHistoric)
-                       {
-                               XLogSegNo       endSegNo;
-
-                               XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
-                               if (sendSeg->ws_segno == endSegNo)
-                                       sendSeg->ws_tli = sendTimeLineNextTLI;
-                       }
-
-                       XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
-
-                       sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-                       if (sendSeg->ws_file < 0)
-                       {
-                               /*
-                                * If the file is not found, assume it's because the standby
-                                * asked for a too old WAL segment that has already been
-                                * removed or recycled.
-                                */
-                               if (errno == ENOENT)
-                                       ereport(ERROR,
-                                                       (errcode_for_file_access(),
-                                                        errmsg("requested WAL segment %s has already been removed",
-                                                                       XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
-                               else
-                                       ereport(ERROR,
-                                                       (errcode_for_file_access(),
-                                                        errmsg("could not open file \"%s\": %m",
-                                                                       path)));
-                       }
-                       sendSeg->ws_off = 0;
-               }
-
-               /* Need to seek in the file? */
-               if (sendSeg->ws_off != startoff)
-               {
-                       if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
-                               ereport(ERROR,
-                                               (errcode_for_file_access(),
-                                                errmsg("could not seek in log segment %s to offset %u: %m",
-                                                               XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-                                                               startoff)));
-                       sendSeg->ws_off = startoff;
-               }
-
-               /* How many bytes are within this segment? */
-               if (nbytes > (segcxt->ws_segsize - startoff))
-                       segbytes = segcxt->ws_segsize - startoff;
-               else
-                       segbytes = nbytes;
-
-               pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-               readbytes = read(sendSeg->ws_file, p, segbytes);
-               pgstat_report_wait_end();
-               if (readbytes < 0)
-               {
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not read from log segment %s, offset %u, length %zu: %m",
-                                                       XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-                                                       sendSeg->ws_off, (Size) segbytes)));
-               }
-               else if (readbytes == 0)
-               {
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_DATA_CORRUPTED),
-                                        errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-                                                       XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-                                                       sendSeg->ws_off, readbytes, (Size) segbytes)));
-               }
+               XLogSegNo       endSegNo;
 
-               /* Update state for read */
-               recptr += readbytes;
-
-               sendSeg->ws_off += readbytes;
-               nbytes -= readbytes;
-               p += readbytes;
+               XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
+               if (sendSeg->ws_segno == endSegNo)
+                       *tli_p = sendTimeLineNextTLI;
        }
 
-       /*
-        * After reading into the buffer, check that what we read was valid. We do
-        * this after reading, because even though the segment was present when we
-        * opened it, it might get recycled or removed while we read it. The
-        * read() succeeds in that case, but the data we tried to read might
-        * already have been overwritten with new WAL records.
-        */
-       XLByteToSeg(startptr, segno, segcxt->ws_segsize);
-       CheckXLogRemoved(segno, ThisTimeLineID);
+       XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
+       fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+       if (fd >= 0)
+               return fd;
 
        /*
-        * During recovery, the currently-open WAL file might be replaced with the
-        * file of the same name retrieved from archive. So we always need to
-        * check what we read was valid after reading into the buffer. If it's
-        * invalid, we try to open and read the file again.
+        * If the file is not found, assume it's because the standby asked for a
+        * too old WAL segment that has already been removed or recycled.
         */
-       if (am_cascading_walsender)
-       {
-               WalSnd     *walsnd = MyWalSnd;
-               bool            reload;
-
-               SpinLockAcquire(&walsnd->mutex);
-               reload = walsnd->needreload;
-               walsnd->needreload = false;
-               SpinLockRelease(&walsnd->mutex);
-
-               if (reload && sendSeg->ws_file >= 0)
-               {
-                       close(sendSeg->ws_file);
-                       sendSeg->ws_file = -1;
-
-                       goto retry;
-               }
-       }
+       if (errno == ENOENT)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("requested WAL segment %s has already been removed",
+                                               XLogFileNameP(*tli_p, nextSegNo))));
+       else
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not open file \"%s\": %m",
+                                               path)));
+       return -1;                                      /* keep compiler quiet */
 }
 
 /*
@@ -2562,6 +2464,8 @@ XLogSendPhysical(void)
        XLogRecPtr      startptr;
        XLogRecPtr      endptr;
        Size            nbytes;
+       XLogSegNo       segno;
+       WALReadError errinfo;
 
        /* If requested switch the WAL sender to the stopping state. */
        if (got_STOPPING)
@@ -2777,7 +2681,49 @@ XLogSendPhysical(void)
         * calls.
         */
        enlargeStringInfo(&output_message, nbytes);
-       XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
+
+retry:
+       if (!WALRead(&output_message.data[output_message.len],
+                                startptr,
+                                nbytes,
+                                sendSeg->ws_tli,       /* Pass the current TLI because only
+                                                                        * WalSndSegmentOpen controls whether new
+                                                                        * TLI is needed. */
+                                sendSeg,
+                                sendCxt,
+                                WalSndSegmentOpen,
+                                &errinfo))
+               WALReadRaiseError(&errinfo);
+
+       /* See logical_read_xlog_page(). */
+       XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
+       CheckXLogRemoved(segno, sendSeg->ws_tli);
+
+       /*
+        * During recovery, the currently-open WAL file might be replaced with the
+        * file of the same name retrieved from archive. So we always need to
+        * check what we read was valid after reading into the buffer. If it's
+        * invalid, we try to open and read the file again.
+        */
+       if (am_cascading_walsender)
+       {
+               WalSnd     *walsnd = MyWalSnd;
+               bool            reload;
+
+               SpinLockAcquire(&walsnd->mutex);
+               reload = walsnd->needreload;
+               walsnd->needreload = false;
+               SpinLockRelease(&walsnd->mutex);
+
+               if (reload && sendSeg->ws_file >= 0)
+               {
+                       close(sendSeg->ws_file);
+                       sendSeg->ws_file = -1;
+
+                       goto retry;
+               }
+       }
+
        output_message.len += nbytes;
        output_message.data[output_message.len] = '\0';
 
index d6695f7196f5d633a4c8b12ea8e578c58ae819d8..30a5851d87cbaf3eaaf02fdbed7e4485b6c3173d 100644 (file)
@@ -280,137 +280,57 @@ identify_target_directory(char *directory, char *fname)
        return NULL;                            /* not reached */
 }
 
-/*
- * Read count bytes from a segment file in the specified directory, for the
- * given timeline, containing the specified record pointer; store the data in
- * the passed buffer.
- */
-static void
-XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
-                                XLogRecPtr startptr, char *buf, Size count)
+/* pg_waldump's openSegment callback for WALRead */
+static int
+WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                                  TimeLineID *tli_p)
 {
-       char       *p;
-       XLogRecPtr      recptr;
-       Size            nbytes;
-
-       static int      sendFile = -1;
-       static XLogSegNo sendSegNo = 0;
-       static uint32 sendOff = 0;
+       TimeLineID      tli = *tli_p;
+       char            fname[MAXPGPATH];
+       int                     fd;
+       int                     tries;
 
-       p = buf;
-       recptr = startptr;
-       nbytes = count;
+       XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize);
 
-       while (nbytes > 0)
+       /*
+        * In follow mode there is a short period of time after the server has
+        * written the end of the previous file before the new file is available.
+        * So we loop for 5 seconds looking for the file to appear before giving
+        * up.
+        */
+       for (tries = 0; tries < 10; tries++)
        {
-               uint32          startoff;
-               int                     segbytes;
-               int                     readbytes;
-
-               startoff = XLogSegmentOffset(recptr, WalSegSz);
-
-               if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz))
-               {
-                       char            fname[MAXFNAMELEN];
-                       int                     tries;
-
-                       /* Switch to another logfile segment */
-                       if (sendFile >= 0)
-                               close(sendFile);
-
-                       XLByteToSeg(recptr, sendSegNo, WalSegSz);
-
-                       XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-                       /*
-                        * In follow mode there is a short period of time after the server
-                        * has written the end of the previous file before the new file is
-                        * available. So we loop for 5 seconds looking for the file to
-                        * appear before giving up.
-                        */
-                       for (tries = 0; tries < 10; tries++)
-                       {
-                               sendFile = open_file_in_directory(directory, fname);
-                               if (sendFile >= 0)
-                                       break;
-                               if (errno == ENOENT)
-                               {
-                                       int                     save_errno = errno;
-
-                                       /* File not there yet, try again */
-                                       pg_usleep(500 * 1000);
-
-                                       errno = save_errno;
-                                       continue;
-                               }
-                               /* Any other error, fall through and fail */
-                               break;
-                       }
-
-                       if (sendFile < 0)
-                               fatal_error("could not find file \"%s\": %s",
-                                                       fname, strerror(errno));
-                       sendOff = 0;
-               }
-
-               /* Need to seek in the file? */
-               if (sendOff != startoff)
-               {
-                       if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-                       {
-                               int                     err = errno;
-                               char            fname[MAXPGPATH];
-
-                               XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-                               fatal_error("could not seek in log file %s to offset %u: %s",
-                                                       fname, startoff, strerror(err));
-                       }
-                       sendOff = startoff;
-               }
-
-               /* How many bytes are within this segment? */
-               if (nbytes > (WalSegSz - startoff))
-                       segbytes = WalSegSz - startoff;
-               else
-                       segbytes = nbytes;
-
-               readbytes = read(sendFile, p, segbytes);
-               if (readbytes <= 0)
+               fd = open_file_in_directory(segcxt->ws_dir, fname);
+               if (fd >= 0)
+                       return fd;
+               if (errno == ENOENT)
                {
-                       int                     err = errno;
-                       char            fname[MAXPGPATH];
                        int                     save_errno = errno;
 
-                       XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-                       errno = save_errno;
+                       /* File not there yet, try again */
+                       pg_usleep(500 * 1000);
 
-                       if (readbytes < 0)
-                               fatal_error("could not read from log file %s, offset %u, length %d: %s",
-                                                       fname, sendOff, segbytes, strerror(err));
-                       else if (readbytes == 0)
-                               fatal_error("could not read from log file %s, offset %u: read %d of %zu",
-                                                       fname, sendOff, readbytes, (Size) segbytes);
+                       errno = save_errno;
+                       continue;
                }
-
-               /* Update state for read */
-               recptr += readbytes;
-
-               sendOff += readbytes;
-               nbytes -= readbytes;
-               p += readbytes;
+               /* Any other error, fall through and fail */
+               break;
        }
+
+       fatal_error("could not find file \"%s\": %s", fname, strerror(errno));
+       return -1;                                      /* keep compiler quiet */
 }
 
 /*
  * XLogReader read_page callback
  */
 static int
-XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-                                XLogRecPtr targetPtr, char *readBuff)
+WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
+                               XLogRecPtr targetPtr, char *readBuff)
 {
        XLogDumpPrivate *private = state->private_data;
        int                     count = XLOG_BLCKSZ;
+       WALReadError errinfo;
 
        if (private->endptr != InvalidXLogRecPtr)
        {
@@ -425,8 +345,26 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
                }
        }
 
-       XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr,
-                                        readBuff, count);
+       if (!WALRead(readBuff, targetPagePtr, count, private->timeline,
+                                &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo))
+       {
+               WALOpenSegment *seg = &errinfo.wre_seg;
+               char            fname[MAXPGPATH];
+
+               XLogFileName(fname, seg->ws_tli, seg->ws_segno,
+                                        state->segcxt.ws_segsize);
+
+               if (errinfo.wre_errno != 0)
+               {
+                       errno = errinfo.wre_errno;
+                       fatal_error("could not read from file %s, offset %u: %m",
+                                               fname, errinfo.wre_off);
+               }
+               else
+                       fatal_error("could not read from file %s, offset %u: read %d of %zu",
+                                               fname, errinfo.wre_off, errinfo.wre_read,
+                                               (Size) errinfo.wre_req);
+       }
 
        return count;
 }
@@ -1089,7 +1027,7 @@ main(int argc, char **argv)
        /* done with argument parsing, do the actual work */
 
        /* we have everything we need, start reading */
-       xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, XLogDumpReadPage,
+       xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage,
                                                                                  &private);
        if (!xlogreader_state)
                fatal_error("out of memory");
index 1bbee386e8da95ee05c3684ab064fa022b7c4631..0193611b7fd2394043cb68fcc2fde0d7ac00ae14 100644 (file)
@@ -36,7 +36,6 @@ typedef struct WALOpenSegment
 {
        int                     ws_file;                /* segment file descriptor */
        XLogSegNo       ws_segno;               /* segment number */
-       uint32          ws_off;                 /* offset in the segment */
        TimeLineID      ws_tli;                 /* timeline ID of the currently open file */
 } WALOpenSegment;
 
@@ -168,6 +167,7 @@ struct XLogReaderState
        /* last read XLOG position for data currently in readBuf */
        WALSegmentContext segcxt;
        WALOpenSegment seg;
+       uint32          segoff;
 
        /*
         * beginning of prior page read, and its TLI.  Doesn't necessarily
@@ -217,6 +217,24 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
 /* Free an XLogReader */
 extern void XLogReaderFree(XLogReaderState *state);
 
+/*
+ * Callback to open the specified WAL segment for reading.  Returns a valid
+ * file descriptor when the file was opened successfully.
+ *
+ * "nextSegNo" is the number of the segment to be opened.
+ *
+ * "segcxt" is additional information about the segment.
+ *
+ * "tli_p" is an input/output argument. XLogRead() uses it to pass the
+ * timeline in which the new segment should be found, but the callback can use
+ * it to return the TLI that it actually opened.
+ *
+ * BasicOpenFile() is the preferred way to open the segment file in backend
+ * code, whereas open(2) should be used in frontend.
+ */
+typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                                                          TimeLineID *tli_p);
+
 /* Initialize supporting structures */
 extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
                                                           int segsize, const char *waldir);
@@ -232,6 +250,25 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
 #ifdef FRONTEND
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif                                                 /* FRONTEND */
+
+/*
+ * Error information from WALRead that both backend and frontend caller can
+ * process.  Currently only errors from pg_pread can be reported.
+ */
+typedef struct WALReadError
+{
+       int                     wre_errno;              /* errno set by the last pg_pread() */
+       int                     wre_off;                /* Offset we tried to read from. */
+       int                     wre_req;                /* Bytes requested to be read. */
+       int                     wre_read;               /* Bytes read by the last read(). */
+       WALOpenSegment wre_seg;         /* Segment we tried to read from. */
+} WALReadError;
+
+extern bool WALRead(char *buf, XLogRecPtr startptr, Size count,
+                                       TimeLineID tli, WALOpenSegment *seg,
+                                       WALSegmentContext *segcxt, WALSegmentOpen openSegment,
+                                       WALReadError *errinfo);
+
 /* Functions for decoding an XLogRecord */
 
 extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
index 2df98e45b204a59c593f4638d0ca262bcaab40e0..0572b24192782b8825d5fb9b08f1a11592c2fde9 100644 (file)
@@ -54,4 +54,6 @@ extern int    read_local_xlog_page(XLogReaderState *state,
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
                                                                          XLogRecPtr wantPage, uint32 wantLength);
 
+extern void WALReadRaiseError(WALReadError *errinfo);
+
 #endif