Teach xlogreader to follow timeline switches
authorSimon Riggs <simon@2ndQuadrant.com>
Wed, 22 Mar 2017 07:05:12 +0000 (07:05 +0000)
committerSimon Riggs <simon@2ndQuadrant.com>
Wed, 22 Mar 2017 07:05:12 +0000 (07:05 +0000)
Uses page-based mechanism to ensure we’re using the correct timeline.

Tests are included to exercise the functionality using a cold disk-level copy
of the master that's started up as a replica with slots intact, but the
intended use of the functionality is with later features.

Craig Ringer, reviewed by Simon Riggs and Andres Freund

src/backend/access/transam/xlogutils.c
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/walsender.c
src/include/access/xlogreader.h
src/include/access/xlogutils.h
src/test/recovery/Makefile
src/test/recovery/t/010_logical_decoding_timelines.pl [new file with mode: 0644]

index b2b9fcbebb028a3dea348f0fe496cca6f991cc9c..28c07d37c17648cd872263b5eac859905fdc3203 100644 (file)
@@ -19,6 +19,7 @@
 
 #include <unistd.h>
 
+#include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
@@ -662,6 +663,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
    /* state maintained across calls */
    static int  sendFile = -1;
    static XLogSegNo sendSegNo = 0;
+   static TimeLineID sendTLI = 0;
    static uint32 sendOff = 0;
 
    p = buf;
@@ -677,7 +679,8 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
        startoff = recptr % XLogSegSize;
 
        /* Do we need to switch to a different xlog segment? */
-       if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+       if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
+           sendTLI != tli)
        {
            char        path[MAXPGPATH];
 
@@ -704,6 +707,7 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
                                    path)));
            }
            sendOff = 0;
+           sendTLI = tli;
        }
 
        /* Need to seek in the file? */
@@ -753,6 +757,133 @@ XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
    }
 }
 
+/*
+ * Determine which timeline to read an xlog page from and set the
+ * XLogReaderState's currTLI to that timeline ID.
+ *
+ * We care about timelines in xlogreader when we might be reading xlog
+ * generated prior to a promotion, either if we're currently a standby in
+ * recovery or if we're a promoted master reading xlogs generated by the old
+ * master before our promotion.
+ *
+ * wantPage must be set to the start address of the page to read and
+ * wantLength to the amount of the page that will be read, up to
+ * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
+ *
+ * We switch to an xlog segment from the new timeline eagerly when on a
+ * historical timeline, as soon as we reach the start of the xlog segment
+ * containing the timeline switch.  The server copied the segment to the new
+ * timeline so all the data up to the switch point is the same, but there's no
+ * guarantee the old segment will still exist. It may have been deleted or
+ * renamed with a .partial suffix so we can't necessarily keep reading from
+ * the old TLI even though tliSwitchPoint says it's OK.
+ *
+ * We can't just check the timeline when we read a page on a different segment
+ * to the last page. We could've received a timeline switch from a cascading
+ * upstream, so the current segment ends apruptly (possibly getting renamed to
+ * .partial) and we have to switch to a new one.  Even in the middle of reading
+ * a page we could have to dump the cached page and switch to a new TLI.
+ *
+ * Because of this, callers MAY NOT assume that currTLI is the timeline that
+ * will be in a page's xlp_tli; the page may begin on an older timeline or we
+ * might be reading from historical timeline data on a segment that's been
+ * copied to a new timeline.
+ *
+ * The caller must also make sure it doesn't read past the current replay
+ * position (using GetWalRcvWriteRecPtr) if executing in recovery, so it
+ * doesn't fail to notice that the current timeline became historical. The
+ * caller must also update ThisTimeLineID with the result of
+ * GetWalRcvWriteRecPtr and must check RecoveryInProgress().
+ */
+void
+XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
+{
+   const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff;
+
+   Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
+   Assert(wantLength <= XLOG_BLCKSZ);
+   Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
+
+   /*
+    * If the desired page is currently read in and valid, we have nothing to do.
+    *
+    * The caller should've ensured that it didn't previously advance readOff
+    * past the valid limit of this timeline, so it doesn't matter if the current
+    * TLI has since become historical.
+    */
+   if (lastReadPage == wantPage &&
+       state->readLen != 0 &&
+       lastReadPage + state->readLen >= wantPage + Min(wantLength,XLOG_BLCKSZ-1))
+       return;
+
+   /*
+    * If we're reading from the current timeline, it hasn't become historical
+    * and the page we're reading is after the last page read, we can again
+    * just carry on. (Seeking backwards requires a check to make sure the older
+    * page isn't on a prior timeline).
+    *
+    * ThisTimeLineID might've become historical since we last looked, but the
+    * caller is required not to read past the flush limit it saw at the time
+    * it looked up the timeline. There's nothing we can do about it if
+    * StartupXLOG() renames it to .partial concurrently.
+    */
+   if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
+   {
+       Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
+       return;
+   }
+
+   /*
+    * If we're just reading pages from a previously validated historical
+    * timeline and the timeline we're reading from is valid until the
+    * end of the current segment we can just keep reading.
+    */
+   if (state->currTLIValidUntil != InvalidXLogRecPtr &&
+       state->currTLI != ThisTimeLineID &&
+       state->currTLI != 0 &&
+       (wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize)
+       return;
+
+   /*
+    * If we reach this point we're either looking up a page for random access,
+    * the current timeline just became historical, or we're reading from a new
+    * segment containing a timeline switch. In all cases we need to determine
+    * the newest timeline on the segment.
+    *
+    * If it's the current timeline we can just keep reading from here unless
+    * we detect a timeline switch that makes the current timeline historical.
+    * If it's a historical timeline we can read all the segment on the newest
+    * timeline because it contains all the old timelines' data too. So only
+    * one switch check is required.
+    */
+   {
+       /*
+        * We need to re-read the timeline history in case it's been changed
+        * by a promotion or replay from a cascaded replica.
+        */
+       List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+
+       XLogRecPtr endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1;
+
+       Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize);
+
+       /* Find the timeline of the last LSN on the segment containing wantPage. */
+       state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory);
+       state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory,
+           &state->nextTLI);
+
+       Assert(state->currTLIValidUntil == InvalidXLogRecPtr ||
+               wantPage + wantLength < state->currTLIValidUntil);
+
+       list_free_deep(timelineHistory);
+
+       elog(DEBUG3, "switched to timeline %u valid until %X/%X",
+               state->currTLI,
+               (uint32)(state->currTLIValidUntil >> 32),
+               (uint32)(state->currTLIValidUntil));
+   }
+}
+
 /*
  * read_page callback for reading local xlog files
  *
@@ -774,28 +905,84 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
    int         count;
 
    loc = targetPagePtr + reqLen;
+
+   /* Loop waiting for xlog to be available if necessary */
    while (1)
    {
        /*
-        * TODO: we're going to have to do something more intelligent about
-        * timelines on standbys. Use readTimeLineHistory() and
-        * tliOfPointInHistory() to get the proper LSN? For now we'll catch
-        * that case earlier, but the code and TODO is left in here for when
-        * that changes.
+        * Determine the limit of xlog we can currently read to, and what the
+        * most recent timeline is.
+        *
+        * RecoveryInProgress() will update ThisTimeLineID when it first
+        * notices recovery finishes, so we only have to maintain it for the
+        * local process until recovery ends.
         */
        if (!RecoveryInProgress())
-       {
-           *pageTLI = ThisTimeLineID;
            read_upto = GetFlushRecPtr();
-       }
        else
-           read_upto = GetXLogReplayRecPtr(pageTLI);
+           read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-       if (loc <= read_upto)
-           break;
+       *pageTLI = ThisTimeLineID;
+
+       /*
+        * Check which timeline to get the record from.
+        *
+        * We have to do it each time through the loop because if we're in
+        * recovery as a cascading standby, the current timeline might've
+        * become historical. We can't rely on RecoveryInProgress() because
+        * in a standby configuration like
+        *
+        *    A => B => C
+        *
+        * if we're a logical decoding session on C, and B gets promoted, our
+        * timeline will change while we remain in recovery.
+        *
+        * We can't just keep reading from the old timeline as the last WAL
+        * archive in the timeline will get renamed to .partial by StartupXLOG().
+        *
+        * If that happens after our caller updated ThisTimeLineID but before
+        * we actually read the xlog page, we might still try to read from the
+        * old (now renamed) segment and fail. There's not much we can do about
+        * this, but it can only happen when we're a leaf of a cascading
+        * standby whose master gets promoted while we're decoding, so a
+        * one-off ERROR isn't too bad.
+        */
+       XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+
+       if (state->currTLI == ThisTimeLineID)
+       {
 
-       CHECK_FOR_INTERRUPTS();
-       pg_usleep(1000L);
+           if (loc <= read_upto)
+               break;
+
+           CHECK_FOR_INTERRUPTS();
+           pg_usleep(1000L);
+       }
+       else
+       {
+           /*
+            * We're on a historical timeline, so limit reading to the switch
+            * point where we moved to the next timeline.
+            *
+            * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
+            * about the new timeline, so we must've received past the end of
+            * it.
+            */
+           read_upto = state->currTLIValidUntil;
+
+           /*
+            * Setting pageTLI to our wanted record's TLI is slightly wrong;
+            * the page might begin on an older timeline if it contains a
+            * timeline switch, since its xlog segment will have been copied
+            * from the prior timeline. This is pretty harmless though, as
+            * nothing cares so long as the timeline doesn't go backwards.  We
+            * should read the page header instead; FIXME someday.
+            */
+           *pageTLI = state->currTLI;
+
+           /* No need to wait on a historical timeline */
+           break;
+       }
    }
 
    if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
index 41c50005d7ff6b39ba1d98afb44af6b2c95d8ab4..c251b92f57bcbc248a9afde17b47328a03c55978 100644 (file)
@@ -235,11 +235,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
    rsinfo->setResult = p->tupstore;
    rsinfo->setDesc = p->tupdesc;
 
-   /* compute the current end-of-wal */
+   /*
+    * Compute the current end-of-wal and maintain ThisTimeLineID.
+    * RecoveryInProgress() will update ThisTimeLineID on promotion.
+    */
    if (!RecoveryInProgress())
        end_of_wal = GetFlushRecPtr();
    else
-       end_of_wal = GetXLogReplayRecPtr(NULL);
+       end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
    ReplicationSlotAcquire(NameStr(*name));
 
@@ -280,6 +283,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
        /* invalidate non-timetravel entries */
        InvalidateSystemCaches();
 
+       /* Decode until we run out of records */
        while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
               (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
        {
index 0f6b828336f718e7f49aba68a6c45835ad4c8ddd..75617709ecfbfa58cd7a49db1bace68431c25466 100644 (file)
@@ -48,6 +48,7 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogutils.h"
 
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
@@ -721,6 +722,12 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
    XLogRecPtr  flushptr;
    int         count;
 
+   XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+   sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
+   sendTimeLine = state->currTLI;
+   sendTimeLineValidUpto = state->currTLIValidUntil;
+   sendTimeLineNextTLI = state->nextTLI;
+
    /* make sure we have enough WAL available */
    flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
@@ -974,10 +981,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
    pq_endmessage(&buf);
    pq_flush();
 
-   /* setup state for XLogReadPage */
-   sendTimeLineIsHistoric = false;
-   sendTimeLine = ThisTimeLineID;
-
    /*
     * Initialize position to the last ack'ed one, then the xlog records begin
     * to be shipped from that position.
index 663d3e7890b7c2ad14492f1245d06ab20a614781..a1beeb54965c8ef4b3d7e5efa5ef98a52e3dec08 100644 (file)
@@ -161,6 +161,22 @@ struct XLogReaderState
 
    /* beginning of the WAL record being read. */
    XLogRecPtr  currRecPtr;
+   /* timeline to read it from, 0 if a lookup is required */
+   TimeLineID  currTLI;
+   /*
+    * Safe point to read to in currTLI if current TLI is historical
+    * (tliSwitchPoint) or InvalidXLogRecPtr if on current timeline.
+    *
+    * Actually set to the start of the segment containing the timeline
+    * switch that ends currTLI's validity, not the LSN of the switch
+    * its self, since we can't assume the old segment will be present.
+    */
+   XLogRecPtr  currTLIValidUntil;
+   /*
+    * If currTLI is not the most recent known timeline, the next timeline to
+    * read from when currTLIValidUntil is reached.
+    */
+   TimeLineID  nextTLI;
 
    /* Buffer for current ReadRecord result (expandable) */
    char       *readRecordBuf;
index 567a7f3d8710e7e46311e65ff92c522333fd2b00..25a99422c1ae8905f56172e0547aabe3daf2ce0f 100644 (file)
@@ -52,4 +52,7 @@ extern int read_local_xlog_page(XLogReaderState *state,
                     XLogRecPtr targetRecPtr, char *cur_page,
                     TimeLineID *pageTLI);
 
+extern void XLogReadDetermineTimeline(XLogReaderState *state,
+                   XLogRecPtr wantPage, uint32 wantLength);
+
 #endif
index 9d03d337d546beabed6bd4037c301da5395d50f6..142a1b8de2ee4db901be58d5c838f0a1c143f2e1 100644 (file)
@@ -9,6 +9,8 @@
 #
 #-------------------------------------------------------------------------
 
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/test/recovery
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/test/recovery/t/010_logical_decoding_timelines.pl b/src/test/recovery/t/010_logical_decoding_timelines.pl
new file mode 100644 (file)
index 0000000..09830dc
--- /dev/null
@@ -0,0 +1,130 @@
+# Demonstrate that logical can follow timeline switches.
+#
+# Logical replication slots can follow timeline switches but it's
+# normally not possible to have a logical slot on a replica where
+# promotion and a timeline switch can occur. The only ways
+# we can create that circumstance are:
+#
+# * By doing a filesystem-level copy of the DB, since pg_basebackup
+#   excludes pg_replslot but we can copy it directly; or
+#
+# * by creating a slot directly at the C level on the replica and
+#   advancing it as we go using the low level APIs. It can't be done
+#   from SQL since logical decoding isn't allowed on replicas.
+#
+# This module uses the first approach to show that timeline following
+# on a logical slot works.
+#
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+use RecursiveCopy;
+use File::Copy;
+use IPC::Run ();
+use Scalar::Util qw(blessed);
+
+my ($stdout, $stderr, $ret);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
+$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
+$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->dump_info;
+$node_master->start;
+
+diag "Testing logical timeline following with a filesystem-level copy";
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
+$node_master->safe_psql('postgres',
+   "INSERT INTO decoding(blah) VALUES ('beforebb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my $backup_name = 'b1';
+$node_master->backup_fs_hot($backup_name);
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+   $node_master, $backup_name,
+   has_streaming => 1,
+   has_restoring => 1);
+$node_replica->start;
+
+$node_master->safe_psql('postgres',
+"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
+);
+$node_master->safe_psql('postgres',
+   "INSERT INTO decoding(blah) VALUES ('afterbb');");
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+# Verify that only the before base_backup slot is on the replica
+$stdout = $node_replica->safe_psql('postgres',
+   'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
+is($stdout, 'before_basebackup',
+   'Expected to find only slot before_basebackup on replica');
+
+# Boom, crash
+$node_master->stop('immediate');
+
+$node_replica->promote;
+$node_replica->poll_query_until('postgres',
+   "SELECT NOT pg_is_in_recovery();");
+
+$node_replica->safe_psql('postgres',
+   "INSERT INTO decoding(blah) VALUES ('after failover');");
+
+# Shouldn't be able to read from slot created after base backup
+($ret, $stdout, $stderr) = $node_replica->psql('postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
+);
+is($ret, 3, 'replaying from after_basebackup slot fails');
+like(
+   $stderr,
+   qr/replication slot "after_basebackup" does not exist/,
+   'after_basebackup slot missing');
+
+# Should be able to read from slot created before base backup
+($ret, $stdout, $stderr) = $node_replica->psql(
+   'postgres',
+"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
+   timeout => 30);
+is($ret, 0, 'replay from slot before_basebackup succeeds');
+
+my $final_expected_output_bb = q(BEGIN
+table public.decoding: INSERT: blah[text]:'beforebb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'afterbb'
+COMMIT
+BEGIN
+table public.decoding: INSERT: blah[text]:'after failover'
+COMMIT);
+is($stdout, $final_expected_output_bb, 'decoded expected data from slot before_basebackup');
+is($stderr, '', 'replay from slot before_basebackup produces no stderr');
+
+# So far we've peeked the slots, so when we fetch the same info over
+# pg_recvlogical we should get complete results. First, find out the commit lsn
+# of the last transaction. There's no max(pg_lsn), so:
+
+my $endpos = $node_replica->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL) ORDER BY location DESC LIMIT 1;");
+
+# now use the walsender protocol to peek the slot changes and make sure we see
+# the same results.
+
+$stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup',
+   $endpos, 30, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+
+# walsender likes to add a newline
+chomp($stdout);
+is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');
+
+# We don't need the standby anymore
+$node_replica->teardown_node();