Refactor to create generic WAL page read callback
authorSimon Riggs <simon@2ndQuadrant.com>
Thu, 21 Jan 2016 01:18:58 +0000 (17:18 -0800)
committerSimon Riggs <simon@2ndQuadrant.com>
Thu, 21 Jan 2016 01:18:58 +0000 (17:18 -0800)
Previously we didn’t have a generic WAL page read callback function,
surprisingly. Logical decoding has logical_read_local_xlog_page(), which was
actually generic, so move that to xlogfunc.c and rename to
read_local_xlog_page().
Maintain logical_read_local_xlog_page() so existing callers still work.

As requested by Michael Paquier, Alvaro Herrera and Andres Freund

src/backend/access/transam/xlogutils.c
src/backend/replication/logical/logicalfuncs.c
src/include/access/xlogutils.h

index 37e9e403fcac60cfbea14878a9d261ccda86a9b8..444e2180b0c2e1dcf861cfd10410cd1dc960ebc6 100644 (file)
  */
 #include "postgres.h"
 
+#include <unistd.h>
+
+#include "miscadmin.h"
+
 #include "access/xlog.h"
+#include "access/xlog_internal.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "storage/smgr.h"
@@ -631,3 +636,164 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 {
    forget_invalid_pages(rnode, forkNum, nblocks);
 }
+
+/*
+ * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
+ * we currently don't have the infrastructure (elog!) to share it.
+ */
+static void
+XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
+{
+   char       *p;
+   XLogRecPtr  recptr;
+   Size        nbytes;
+
+   static int  sendFile = -1;
+   static XLogSegNo sendSegNo = 0;
+   static uint32 sendOff = 0;
+
+   p = buf;
+   recptr = startptr;
+   nbytes = count;
+
+   while (nbytes > 0)
+   {
+       uint32      startoff;
+       int         segbytes;
+       int         readbytes;
+
+       startoff = recptr % XLogSegSize;
+
+       if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+       {
+           char        path[MAXPGPATH];
+
+           /* Switch to another logfile segment */
+           if (sendFile >= 0)
+               close(sendFile);
+
+           XLByteToSeg(recptr, sendSegNo);
+
+           XLogFilePath(path, tli, sendSegNo);
+
+           sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+
+           if (sendFile < 0)
+           {
+               if (errno == ENOENT)
+                   ereport(ERROR,
+                           (errcode_for_file_access(),
+                            errmsg("requested WAL segment %s has already been removed",
+                                   path)));
+               else
+                   ereport(ERROR,
+                           (errcode_for_file_access(),
+                            errmsg("could not open file \"%s\": %m",
+                                   path)));
+           }
+           sendOff = 0;
+       }
+
+       /* Need to seek in the file? */
+       if (sendOff != startoff)
+       {
+           if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
+           {
+               char        path[MAXPGPATH];
+
+               XLogFilePath(path, tli, sendSegNo);
+
+               ereport(ERROR,
+                       (errcode_for_file_access(),
+                 errmsg("could not seek in log segment %s to offset %u: %m",
+                        path, startoff)));
+           }
+           sendOff = startoff;
+       }
+
+       /* How many bytes are within this segment? */
+       if (nbytes > (XLogSegSize - startoff))
+           segbytes = XLogSegSize - startoff;
+       else
+           segbytes = nbytes;
+
+       readbytes = read(sendFile, p, segbytes);
+       if (readbytes <= 0)
+       {
+           char        path[MAXPGPATH];
+
+           XLogFilePath(path, tli, sendSegNo);
+
+           ereport(ERROR,
+                   (errcode_for_file_access(),
+                    errmsg("could not read from log segment %s, offset %u, length %lu: %m",
+                           path, sendOff, (unsigned long) segbytes)));
+       }
+
+       /* Update state for read */
+       recptr += readbytes;
+
+       sendOff += readbytes;
+       nbytes -= readbytes;
+       p += readbytes;
+   }
+}
+
+/*
+ * read_page callback for reading local xlog files
+ *
+ * Public because it would likely be very helpful for someone writing another
+ * output method outside walsender, e.g. in a bgworker.
+ *
+ * TODO: The walsender has it's own version of this, but it relies on the
+ * walsender's latch being set whenever WAL is flushed. No such infrastructure
+ * exists for normal backends, so we have to do a check/sleep/repeat style of
+ * loop for now.
+ */
+int
+read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
+   int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+{
+   XLogRecPtr  flushptr,
+               loc;
+   int         count;
+
+   loc = targetPagePtr + reqLen;
+   while (1)
+   {
+       /*
+        * TODO: we're going to have to do something more intelligent about
+        * timelines on standbys. Use readTimeLineHistory() and
+        * tliOfPointInHistory() to get the proper LSN? For now we'll catch
+        * that case earlier, but the code and TODO is left in here for when
+        * that changes.
+        */
+       if (!RecoveryInProgress())
+       {
+           *pageTLI = ThisTimeLineID;
+           flushptr = GetFlushRecPtr();
+       }
+       else
+           flushptr = GetXLogReplayRecPtr(pageTLI);
+
+       if (loc <= flushptr)
+           break;
+
+       CHECK_FOR_INTERRUPTS();
+       pg_usleep(1000L);
+   }
+
+   /* more than one block available */
+   if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
+       count = XLOG_BLCKSZ;
+   /* not enough data there */
+   else if (targetPagePtr + reqLen > flushptr)
+       return -1;
+   /* part of the page available */
+   else
+       count = flushptr - targetPagePtr;
+
+   XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
+
+   return count;
+}
index 56e47e4b9c442b9f94ffdc11d1a02be20c1f70cd..f789fc127d0288302ae7eef75504b244f3351a1f 100644 (file)
@@ -22,6 +22,7 @@
 #include "miscadmin.h"
 
 #include "access/xlog_internal.h"
+#include "access/xlogutils.h"
 
 #include "catalog/pg_type.h"
 
@@ -100,108 +101,6 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
    p->returned_rows++;
 }
 
-/*
- * TODO: This is duplicate code with pg_xlogdump, similar to walsender.c, but
- * we currently don't have the infrastructure (elog!) to share it.
- */
-static void
-XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
-{
-   char       *p;
-   XLogRecPtr  recptr;
-   Size        nbytes;
-
-   static int  sendFile = -1;
-   static XLogSegNo sendSegNo = 0;
-   static uint32 sendOff = 0;
-
-   p = buf;
-   recptr = startptr;
-   nbytes = count;
-
-   while (nbytes > 0)
-   {
-       uint32      startoff;
-       int         segbytes;
-       int         readbytes;
-
-       startoff = recptr % XLogSegSize;
-
-       if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
-       {
-           char        path[MAXPGPATH];
-
-           /* Switch to another logfile segment */
-           if (sendFile >= 0)
-               close(sendFile);
-
-           XLByteToSeg(recptr, sendSegNo);
-
-           XLogFilePath(path, tli, sendSegNo);
-
-           sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
-
-           if (sendFile < 0)
-           {
-               if (errno == ENOENT)
-                   ereport(ERROR,
-                           (errcode_for_file_access(),
-                            errmsg("requested WAL segment %s has already been removed",
-                                   path)));
-               else
-                   ereport(ERROR,
-                           (errcode_for_file_access(),
-                            errmsg("could not open file \"%s\": %m",
-                                   path)));
-           }
-           sendOff = 0;
-       }
-
-       /* Need to seek in the file? */
-       if (sendOff != startoff)
-       {
-           if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-           {
-               char        path[MAXPGPATH];
-
-               XLogFilePath(path, tli, sendSegNo);
-
-               ereport(ERROR,
-                       (errcode_for_file_access(),
-                 errmsg("could not seek in log segment %s to offset %u: %m",
-                        path, startoff)));
-           }
-           sendOff = startoff;
-       }
-
-       /* How many bytes are within this segment? */
-       if (nbytes > (XLogSegSize - startoff))
-           segbytes = XLogSegSize - startoff;
-       else
-           segbytes = nbytes;
-
-       readbytes = read(sendFile, p, segbytes);
-       if (readbytes <= 0)
-       {
-           char        path[MAXPGPATH];
-
-           XLogFilePath(path, tli, sendSegNo);
-
-           ereport(ERROR,
-                   (errcode_for_file_access(),
-                    errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-                           path, sendOff, (unsigned long) segbytes)));
-       }
-
-       /* Update state for read */
-       recptr += readbytes;
-
-       sendOff += readbytes;
-       nbytes -= readbytes;
-       p += readbytes;
-   }
-}
-
 static void
 check_permissions(void)
 {
@@ -211,63 +110,12 @@ check_permissions(void)
                 (errmsg("must be superuser or replication role to use replication slots"))));
 }
 
-/*
- * read_page callback for logical decoding contexts.
- *
- * Public because it would likely be very helpful for someone writing another
- * output method outside walsender, e.g. in a bgworker.
- *
- * TODO: The walsender has it's own version of this, but it relies on the
- * walsender's latch being set whenever WAL is flushed. No such infrastructure
- * exists for normal backends, so we have to do a check/sleep/repeat style of
- * loop for now.
- */
 int
 logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
    int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
 {
-   XLogRecPtr  flushptr,
-               loc;
-   int         count;
-
-   loc = targetPagePtr + reqLen;
-   while (1)
-   {
-       /*
-        * TODO: we're going to have to do something more intelligent about
-        * timelines on standbys. Use readTimeLineHistory() and
-        * tliOfPointInHistory() to get the proper LSN? For now we'll catch
-        * that case earlier, but the code and TODO is left in here for when
-        * that changes.
-        */
-       if (!RecoveryInProgress())
-       {
-           *pageTLI = ThisTimeLineID;
-           flushptr = GetFlushRecPtr();
-       }
-       else
-           flushptr = GetXLogReplayRecPtr(pageTLI);
-
-       if (loc <= flushptr)
-           break;
-
-       CHECK_FOR_INTERRUPTS();
-       pg_usleep(1000L);
-   }
-
-   /* more than one block available */
-   if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
-       count = XLOG_BLCKSZ;
-   /* not enough data there */
-   else if (targetPagePtr + reqLen > flushptr)
-       return -1;
-   /* part of the page available */
-   else
-       count = flushptr - targetPagePtr;
-
-   XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
-
-   return count;
+   return read_local_xlog_page(state, targetPagePtr, reqLen,
+                        targetRecPtr, cur_page, pageTLI);
 }
 
 /*
index a1c0c82c347339d54d43614e9acf92efb40b1b3f..1b9abce9ad3b61e62f921361432280afc7549817 100644 (file)
@@ -47,4 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
+extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
+   int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI);
+
 #endif