walsender.c: Don't rely on the global variable ThisTimeLineID.
authorRobert Haas <rhaas@postgresql.org>
Fri, 29 Oct 2021 20:55:04 +0000 (16:55 -0400)
committerRobert Haas <rhaas@postgresql.org>
Fri, 29 Oct 2021 21:05:21 +0000 (17:05 -0400)
IdentifySystem(), StartReplication(), and read_local_xlog_page() rely
on xlog.c to set ThisTimeLineID to the current timeline ID when not in
recovery; and when in recovery, they set ThisTimeLineID themsleves.
Instead, have them rely on only on local variables.  When not in
recovery, they now obtain the current timeline via GetFlushRecPtr(),
and when in recovery, they obtain it just as they did before.  As part
of this change, GetStandbyFlushRecPtr() now returns the TLI via an out
parameter rather than storing it into ThisTimeLineID.

logical_read_xlog_page() and ReadReplicationSlot() rely on
ThisTimeLineID to determine the current timeline, but only when
in normal running, since ReadReplicationSlot() uses an another
method when in recovery, and logical_read_xlog_page() can't run
during recovery. Use GetWALInsertionTimeLine() instead, and add
some comments highlighting the need for logical_read_xlog_page()
to be changed if we ever want to run logical decoding on standbys.

Because read_local_xlog_page() and logical_read_xlog_page() both
call XLogReadDetermineTimeline() which considers ThisTimeLineID
as a sort of implicit argument, update that function to take
the current system timeline as an explicit argument instead.

Remove the logic in XlogReadTwoPhaseData() which saves and restores
the value of ThisTimeLineID. Since the walsender.c functions are
no longer changing it, this isn't required any more.

src/backend/access/transam/twophase.c
src/backend/access/transam/xlogutils.c
src/backend/replication/walsender.c
src/include/access/xlogutils.h

index f6e7fa71d8887554febe5afe7f419e39a027838b..ef4b5f639ced42ee44b16b6db94f3a1593284aa4 100644 (file)
@@ -1373,11 +1373,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
  * twophase files and ReadTwoPhaseFile should be used instead.
  *
  * Note clearly that this function can access WAL during normal operation,
- * similarly to the way WALSender or Logical Decoding would do.  While
- * accessing WAL, read_local_xlog_page() may change ThisTimeLineID,
- * particularly if this routine is called for the end-of-recovery checkpoint
- * in the checkpointer itself, so save the current timeline number value
- * and restore it once done.
+ * similarly to the way WALSender or Logical Decoding would do.
  */
 static void
 XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
@@ -1385,7 +1381,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
        XLogRecord *record;
        XLogReaderState *xlogreader;
        char       *errormsg;
-       TimeLineID      save_currtli = ThisTimeLineID;
 
        xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
                                                                        XL_ROUTINE(.page_read = &read_local_xlog_page,
@@ -1401,13 +1396,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
        XLogBeginRead(xlogreader, lsn);
        record = XLogReadRecord(xlogreader, &errormsg);
 
-       /*
-        * Restore immediately the timeline where it was previously, as
-        * read_local_xlog_page() could have changed it if the record was read
-        * while recovery was finishing or if the timeline has jumped in-between.
-        */
-       ThisTimeLineID = save_currtli;
-
        if (record == NULL)
                ereport(ERROR,
                                (errcode_for_file_access(),
index c40500a6f2bad0f0b96fbc98a128169dfd829a5c..b33e0531ed1375019ec3b385e332f16231c2cac1 100644 (file)
@@ -678,6 +678,10 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
  * wantLength to the amount of the page that will be read, up to
  * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
  *
+ * The currTLI argument should be the system-wide current timeline.
+ * Note that this may be different from state->currTLI, which is the timeline
+ * from which the caller is currently reading previous xlog records.
+ *
  * We switch to an xlog segment from the new timeline eagerly when on a
  * historical timeline, as soon as we reach the start of the xlog segment
  * containing the timeline switch.  The server copied the segment to the new
@@ -699,12 +703,11 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
  *
  * The caller must also make sure it doesn't read past the current replay
  * position (using GetXLogReplayRecPtr) if executing in recovery, so it
- * doesn't fail to notice that the current timeline became historical. The
- * caller must also update ThisTimeLineID with the result of
- * GetXLogReplayRecPtr and must check RecoveryInProgress().
+ * doesn't fail to notice that the current timeline became historical.
  */
 void
-XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
+XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage,
+                                                 uint32 wantLength, TimeLineID currTLI)
 {
        const XLogRecPtr lastReadPage = (state->seg.ws_segno *
                                                                         state->segcxt.ws_segsize + state->segoff);
@@ -712,6 +715,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
        Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
        Assert(wantLength <= XLOG_BLCKSZ);
        Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
+       Assert(currTLI != 0);
 
        /*
         * If the desired page is currently read in and valid, we have nothing to
@@ -732,12 +736,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
         * just carry on. (Seeking backwards requires a check to make sure the
         * older page isn't on a prior timeline).
         *
-        * ThisTimeLineID might've become historical since we last looked, but the
-        * caller is required not to read past the flush limit it saw at the time
-        * it looked up the timeline. There's nothing we can do about it if
-        * StartupXLOG() renames it to .partial concurrently.
+        * currTLI might've become historical since the caller obtained the value,
+        * but the caller is required not to read past the flush limit it saw at
+        * the time it looked up the timeline. There's nothing we can do about it
+        * if StartupXLOG() renames it to .partial concurrently.
         */
-       if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
+       if (state->currTLI == currTLI && wantPage >= lastReadPage)
        {
                Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
                return;
@@ -749,7 +753,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
         * the current segment we can just keep reading.
         */
        if (state->currTLIValidUntil != InvalidXLogRecPtr &&
-               state->currTLI != ThisTimeLineID &&
+               state->currTLI != currTLI &&
                state->currTLI != 0 &&
                ((wantPage + wantLength) / state->segcxt.ws_segsize) <
                (state->currTLIValidUntil / state->segcxt.ws_segsize))
@@ -772,7 +776,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
                 * We need to re-read the timeline history in case it's been changed
                 * by a promotion or replay from a cascaded replica.
                 */
-               List       *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+               List       *timelineHistory = readTimeLineHistory(currTLI);
                XLogRecPtr      endOfSegment;
 
                endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) *
@@ -853,6 +857,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
        TimeLineID      tli;
        int                     count;
        WALReadError errinfo;
+       TimeLineID      currTLI;
 
        loc = targetPagePtr + reqLen;
 
@@ -864,10 +869,10 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
                 * most recent timeline is.
                 */
                if (!RecoveryInProgress())
-                       read_upto = GetFlushRecPtr(&ThisTimeLineID);
+                       read_upto = GetFlushRecPtr(&currTLI);
                else
-                       read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
-               tli = ThisTimeLineID;
+                       read_upto = GetXLogReplayRecPtr(&currTLI);
+               tli = currTLI;
 
                /*
                 * Check which timeline to get the record from.
@@ -886,16 +891,16 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
                 * archive in the timeline will get renamed to .partial by
                 * StartupXLOG().
                 *
-                * If that happens after our caller updated ThisTimeLineID but before
+                * If that happens after our caller determined the TLI but before
                 * we actually read the xlog page, we might still try to read from the
                 * old (now renamed) segment and fail. There's not much we can do
                 * about this, but it can only happen when we're a leaf of a cascading
                 * standby whose primary gets promoted while we're decoding, so a
                 * one-off ERROR isn't too bad.
                 */
-               XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+               XLogReadDetermineTimeline(state, targetPagePtr, reqLen, tli);
 
-               if (state->currTLI == ThisTimeLineID)
+               if (state->currTLI == currTLI)
                {
 
                        if (loc <= read_upto)
index d09bffaa9dc03ca401296e20922589be6f78d262..fff7dfc64098fe63ca7ec6eeed6bace9757565ad 100644 (file)
@@ -230,7 +230,7 @@ static void WalSndShutdown(void) pg_attribute_noreturn();
 static void XLogSendPhysical(void);
 static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
-static XLogRecPtr GetStandbyFlushRecPtr(void);
+static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
 static void IdentifySystem(void);
 static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
@@ -385,6 +385,7 @@ IdentifySystem(void)
        TupleDesc       tupdesc;
        Datum           values[4];
        bool            nulls[4];
+       TimeLineID      currTLI;
 
        /*
         * Reply with a result set with one row, four columns. First col is system
@@ -397,12 +398,9 @@ IdentifySystem(void)
 
        am_cascading_walsender = RecoveryInProgress();
        if (am_cascading_walsender)
-       {
-               /* this also updates ThisTimeLineID */
-               logptr = GetStandbyFlushRecPtr();
-       }
+               logptr = GetStandbyFlushRecPtr(&currTLI);
        else
-               logptr = GetFlushRecPtr(&ThisTimeLineID);
+               logptr = GetFlushRecPtr(&currTLI);
 
        snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
 
@@ -441,7 +439,7 @@ IdentifySystem(void)
        values[0] = CStringGetTextDatum(sysid);
 
        /* column 2: timeline */
-       values[1] = Int32GetDatum(ThisTimeLineID);
+       values[1] = Int32GetDatum(currTLI);
 
        /* column 3: wal location */
        values[2] = CStringGetTextDatum(xloc);
@@ -537,7 +535,7 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
                        if (RecoveryInProgress())
                                (void) GetXLogReplayRecPtr(&current_timeline);
                        else
-                               current_timeline = ThisTimeLineID;
+                               current_timeline = GetWALInsertionTimeLine();
 
                        timeline_history = readTimeLineHistory(current_timeline);
                        slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
@@ -671,6 +669,7 @@ StartReplication(StartReplicationCmd *cmd)
 {
        StringInfoData buf;
        XLogRecPtr      FlushPtr;
+       TimeLineID      FlushTLI;
 
        /* create xlogreader for physical replication */
        xlogreader =
@@ -710,24 +709,20 @@ StartReplication(StartReplicationCmd *cmd)
 
        /*
         * Select the timeline. If it was given explicitly by the client, use
-        * that. Otherwise use the timeline of the last replayed record, which is
-        * kept in ThisTimeLineID.
+        * that. Otherwise use the timeline of the last replayed record.
         */
        am_cascading_walsender = RecoveryInProgress();
        if (am_cascading_walsender)
-       {
-               /* this also updates ThisTimeLineID */
-               FlushPtr = GetStandbyFlushRecPtr();
-       }
+               FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
        else
-               FlushPtr = GetFlushRecPtr(&ThisTimeLineID);
+               FlushPtr = GetFlushRecPtr(&FlushTLI);
 
        if (cmd->timeline != 0)
        {
                XLogRecPtr      switchpoint;
 
                sendTimeLine = cmd->timeline;
-               if (sendTimeLine == ThisTimeLineID)
+               if (sendTimeLine == FlushTLI)
                {
                        sendTimeLineIsHistoric = false;
                        sendTimeLineValidUpto = InvalidXLogRecPtr;
@@ -742,7 +737,7 @@ StartReplication(StartReplicationCmd *cmd)
                         * Check that the timeline the client requested exists, and the
                         * requested start location is on that timeline.
                         */
-                       timeLineHistory = readTimeLineHistory(ThisTimeLineID);
+                       timeLineHistory = readTimeLineHistory(FlushTLI);
                        switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
                                                                                 &sendTimeLineNextTLI);
                        list_free_deep(timeLineHistory);
@@ -781,7 +776,7 @@ StartReplication(StartReplicationCmd *cmd)
        }
        else
        {
-               sendTimeLine = ThisTimeLineID;
+               sendTimeLine = FlushTLI;
                sendTimeLineValidUpto = InvalidXLogRecPtr;
                sendTimeLineIsHistoric = false;
        }
@@ -909,9 +904,16 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
        int                     count;
        WALReadError errinfo;
        XLogSegNo       segno;
+       TimeLineID      currTLI = GetWALInsertionTimeLine();
 
-       XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
-       sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
+       /*
+        * Since logical decoding is only permitted on a primary server, we know
+        * that the current timeline ID can't be changing any more. If we did this
+        * on a standby, we'd have to worry about the values we compute here
+        * becoming invalid due to a promotion or timeline change.
+        */
+       XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
+       sendTimeLineIsHistoric = (state->currTLI != currTLI);
        sendTimeLine = state->currTLI;
        sendTimeLineValidUpto = state->currTLIValidUntil;
        sendTimeLineNextTLI = state->nextTLI;
@@ -2683,6 +2685,8 @@ XLogSendPhysical(void)
        }
        else if (am_cascading_walsender)
        {
+               TimeLineID      SendRqstTLI;
+
                /*
                 * Streaming the latest timeline on a standby.
                 *
@@ -2702,14 +2706,12 @@ XLogSendPhysical(void)
                 */
                bool            becameHistoric = false;
 
-               SendRqstPtr = GetStandbyFlushRecPtr();
+               SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
 
                if (!RecoveryInProgress())
                {
-                       /*
-                        * We have been promoted. RecoveryInProgress() updated
-                        * ThisTimeLineID to the new current timeline.
-                        */
+                       /* We have been promoted. */
+                       SendRqstTLI = GetWALInsertionTimeLine();
                        am_cascading_walsender = false;
                        becameHistoric = true;
                }
@@ -2717,10 +2719,9 @@ XLogSendPhysical(void)
                {
                        /*
                         * Still a cascading standby. But is the timeline we're sending
-                        * still the one recovery is recovering from? ThisTimeLineID was
-                        * updated by the GetStandbyFlushRecPtr() call above.
+                        * still the one recovery is recovering from?
                         */
-                       if (sendTimeLine != ThisTimeLineID)
+                       if (sendTimeLine != SendRqstTLI)
                                becameHistoric = true;
                }
 
@@ -2733,7 +2734,7 @@ XLogSendPhysical(void)
                         */
                        List       *history;
 
-                       history = readTimeLineHistory(ThisTimeLineID);
+                       history = readTimeLineHistory(SendRqstTLI);
                        sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
 
                        Assert(sendTimeLine < sendTimeLineNextTLI);
@@ -3069,11 +3070,11 @@ WalSndDone(WalSndSendDataCallback send_data)
  * can be sent to the standby. This should only be called when in recovery,
  * ie. we're streaming to a cascaded standby.
  *
- * As a side-effect, ThisTimeLineID is updated to the TLI of the last
+ * As a side-effect, *tli is updated to the TLI of the last
  * replayed WAL record.
  */
 static XLogRecPtr
-GetStandbyFlushRecPtr(void)
+GetStandbyFlushRecPtr(TimeLineID *tli)
 {
        XLogRecPtr      replayPtr;
        TimeLineID      replayTLI;
@@ -3090,10 +3091,10 @@ GetStandbyFlushRecPtr(void)
        receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
        replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
-       ThisTimeLineID = replayTLI;
+       *tli = replayTLI;
 
        result = replayPtr;
-       if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
+       if (receiveTLI == replayTLI && receivePtr > replayPtr)
                result = receivePtr;
 
        return result;
index a5cb3d322c52f9c6827a7a98c670e01dbdc48d09..eebc91f3a5061edb1fb93f9d220767b97fa8cb07 100644 (file)
@@ -98,7 +98,9 @@ extern void wal_segment_open(XLogReaderState *state,
 extern void wal_segment_close(XLogReaderState *state);
 
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
-                                                                         XLogRecPtr wantPage, uint32 wantLength);
+                                                                         XLogRecPtr wantPage,
+                                                                         uint32 wantLength,
+                                                                         TimeLineID currTLI);
 
 extern void WALReadRaiseError(WALReadError *errinfo);