From 9cd558add3c2d0fe68a77263c3ecaa38d0b09c27 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Fri, 29 Oct 2021 16:55:04 -0400 Subject: [PATCH] walsender.c: Don't rely on the global variable ThisTimeLineID. IdentifySystem(), StartReplication(), and read_local_xlog_page() rely on xlog.c to set ThisTimeLineID to the current timeline ID when not in recovery; and when in recovery, they set ThisTimeLineID themselves. Instead, have them rely only on local variables. When not in recovery, they now obtain the current timeline via GetFlushRecPtr(), and when in recovery, they obtain it just as they did before. As part of this change, GetStandbyFlushRecPtr() now returns the TLI via an out parameter rather than storing it into ThisTimeLineID. logical_read_xlog_page() and ReadReplicationSlot() rely on ThisTimeLineID to determine the current timeline, but only when in normal running, since ReadReplicationSlot() uses another method when in recovery, and logical_read_xlog_page() can't run during recovery. Use GetWALInsertionTimeLine() instead, and add some comments highlighting the need for logical_read_xlog_page() to be changed if we ever want to run logical decoding on standbys. Because read_local_xlog_page() and logical_read_xlog_page() both call XLogReadDetermineTimeline() which considers ThisTimeLineID as a sort of implicit argument, update that function to take the current system timeline as an explicit argument instead. Remove the logic in XlogReadTwoPhaseData() which saves and restores the value of ThisTimeLineID. Since the walsender.c functions are no longer changing it, this isn't required any more. 
--- src/backend/access/transam/twophase.c | 14 +----- src/backend/access/transam/xlogutils.c | 39 ++++++++------- src/backend/replication/walsender.c | 67 +++++++++++++------------- src/include/access/xlogutils.h | 4 +- 4 files changed, 60 insertions(+), 64 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index f6e7fa71d8..ef4b5f639c 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1373,11 +1373,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok) * twophase files and ReadTwoPhaseFile should be used instead. * * Note clearly that this function can access WAL during normal operation, - * similarly to the way WALSender or Logical Decoding would do. While - * accessing WAL, read_local_xlog_page() may change ThisTimeLineID, - * particularly if this routine is called for the end-of-recovery checkpoint - * in the checkpointer itself, so save the current timeline number value - * and restore it once done. + * similarly to the way WALSender or Logical Decoding would do. */ static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) @@ -1385,7 +1381,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogRecord *record; XLogReaderState *xlogreader; char *errormsg; - TimeLineID save_currtli = ThisTimeLineID; xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.page_read = &read_local_xlog_page, @@ -1401,13 +1396,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) XLogBeginRead(xlogreader, lsn); record = XLogReadRecord(xlogreader, &errormsg); - /* - * Restore immediately the timeline where it was previously, as - * read_local_xlog_page() could have changed it if the record was read - * while recovery was finishing or if the timeline has jumped in-between. 
- */ - ThisTimeLineID = save_currtli; - if (record == NULL) ereport(ERROR, (errcode_for_file_access(), diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index c40500a6f2..b33e0531ed 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -678,6 +678,10 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, * wantLength to the amount of the page that will be read, up to * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ. * + * The currTLI argument should be the system-wide current timeline. + * Note that this may be different from state->currTLI, which is the timeline + * from which the caller is currently reading previous xlog records. + * * We switch to an xlog segment from the new timeline eagerly when on a * historical timeline, as soon as we reach the start of the xlog segment * containing the timeline switch. The server copied the segment to the new @@ -699,12 +703,11 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, * * The caller must also make sure it doesn't read past the current replay * position (using GetXLogReplayRecPtr) if executing in recovery, so it - * doesn't fail to notice that the current timeline became historical. The - * caller must also update ThisTimeLineID with the result of - * GetXLogReplayRecPtr and must check RecoveryInProgress(). + * doesn't fail to notice that the current timeline became historical. 
*/ void -XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) +XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, + uint32 wantLength, TimeLineID currTLI) { const XLogRecPtr lastReadPage = (state->seg.ws_segno * state->segcxt.ws_segsize + state->segoff); @@ -712,6 +715,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ); + Assert(currTLI != 0); /* * If the desired page is currently read in and valid, we have nothing to @@ -732,12 +736,12 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * just carry on. (Seeking backwards requires a check to make sure the * older page isn't on a prior timeline). * - * ThisTimeLineID might've become historical since we last looked, but the - * caller is required not to read past the flush limit it saw at the time - * it looked up the timeline. There's nothing we can do about it if - * StartupXLOG() renames it to .partial concurrently. + * currTLI might've become historical since the caller obtained the value, + * but the caller is required not to read past the flush limit it saw at + * the time it looked up the timeline. There's nothing we can do about it + * if StartupXLOG() renames it to .partial concurrently. */ - if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage) + if (state->currTLI == currTLI && wantPage >= lastReadPage) { Assert(state->currTLIValidUntil == InvalidXLogRecPtr); return; @@ -749,7 +753,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * the current segment we can just keep reading. 
*/ if (state->currTLIValidUntil != InvalidXLogRecPtr && - state->currTLI != ThisTimeLineID && + state->currTLI != currTLI && state->currTLI != 0 && ((wantPage + wantLength) / state->segcxt.ws_segsize) < (state->currTLIValidUntil / state->segcxt.ws_segsize)) @@ -772,7 +776,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * We need to re-read the timeline history in case it's been changed * by a promotion or replay from a cascaded replica. */ - List *timelineHistory = readTimeLineHistory(ThisTimeLineID); + List *timelineHistory = readTimeLineHistory(currTLI); XLogRecPtr endOfSegment; endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) * @@ -853,6 +857,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, TimeLineID tli; int count; WALReadError errinfo; + TimeLineID currTLI; loc = targetPagePtr + reqLen; @@ -864,10 +869,10 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * most recent timeline is. */ if (!RecoveryInProgress()) - read_upto = GetFlushRecPtr(&ThisTimeLineID); + read_upto = GetFlushRecPtr(&currTLI); else - read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); - tli = ThisTimeLineID; + read_upto = GetXLogReplayRecPtr(&currTLI); + tli = currTLI; /* * Check which timeline to get the record from. @@ -886,16 +891,16 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * archive in the timeline will get renamed to .partial by * StartupXLOG(). * - * If that happens after our caller updated ThisTimeLineID but before + * If that happens after our caller determined the TLI but before * we actually read the xlog page, we might still try to read from the * old (now renamed) segment and fail. There's not much we can do * about this, but it can only happen when we're a leaf of a cascading * standby whose primary gets promoted while we're decoding, so a * one-off ERROR isn't too bad. 
*/ - XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + XLogReadDetermineTimeline(state, targetPagePtr, reqLen, tli); - if (state->currTLI == ThisTimeLineID) + if (state->currTLI == currTLI) { if (loc <= read_upto) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d09bffaa9d..fff7dfc640 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -230,7 +230,7 @@ static void WalSndShutdown(void) pg_attribute_noreturn(); static void XLogSendPhysical(void); static void XLogSendLogical(void); static void WalSndDone(WalSndSendDataCallback send_data); -static XLogRecPtr GetStandbyFlushRecPtr(void); +static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli); static void IdentifySystem(void); static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd); static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd); @@ -385,6 +385,7 @@ IdentifySystem(void) TupleDesc tupdesc; Datum values[4]; bool nulls[4]; + TimeLineID currTLI; /* * Reply with a result set with one row, four columns. 
First col is system @@ -397,12 +398,9 @@ IdentifySystem(void) am_cascading_walsender = RecoveryInProgress(); if (am_cascading_walsender) - { - /* this also updates ThisTimeLineID */ - logptr = GetStandbyFlushRecPtr(); - } + logptr = GetStandbyFlushRecPtr(&currTLI); else - logptr = GetFlushRecPtr(&ThisTimeLineID); + logptr = GetFlushRecPtr(&currTLI); snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr)); @@ -441,7 +439,7 @@ IdentifySystem(void) values[0] = CStringGetTextDatum(sysid); /* column 2: timeline */ - values[1] = Int32GetDatum(ThisTimeLineID); + values[1] = Int32GetDatum(currTLI); /* column 3: wal location */ values[2] = CStringGetTextDatum(xloc); @@ -537,7 +535,7 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd) if (RecoveryInProgress()) (void) GetXLogReplayRecPtr(&current_timeline); else - current_timeline = ThisTimeLineID; + current_timeline = GetWALInsertionTimeLine(); timeline_history = readTimeLineHistory(current_timeline); slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn, @@ -671,6 +669,7 @@ StartReplication(StartReplicationCmd *cmd) { StringInfoData buf; XLogRecPtr FlushPtr; + TimeLineID FlushTLI; /* create xlogreader for physical replication */ xlogreader = @@ -710,24 +709,20 @@ StartReplication(StartReplicationCmd *cmd) /* * Select the timeline. If it was given explicitly by the client, use - * that. Otherwise use the timeline of the last replayed record, which is - * kept in ThisTimeLineID. + * that. Otherwise use the timeline of the last replayed record. 
*/ am_cascading_walsender = RecoveryInProgress(); if (am_cascading_walsender) - { - /* this also updates ThisTimeLineID */ - FlushPtr = GetStandbyFlushRecPtr(); - } + FlushPtr = GetStandbyFlushRecPtr(&FlushTLI); else - FlushPtr = GetFlushRecPtr(&ThisTimeLineID); + FlushPtr = GetFlushRecPtr(&FlushTLI); if (cmd->timeline != 0) { XLogRecPtr switchpoint; sendTimeLine = cmd->timeline; - if (sendTimeLine == ThisTimeLineID) + if (sendTimeLine == FlushTLI) { sendTimeLineIsHistoric = false; sendTimeLineValidUpto = InvalidXLogRecPtr; @@ -742,7 +737,7 @@ StartReplication(StartReplicationCmd *cmd) * Check that the timeline the client requested exists, and the * requested start location is on that timeline. */ - timeLineHistory = readTimeLineHistory(ThisTimeLineID); + timeLineHistory = readTimeLineHistory(FlushTLI); switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory, &sendTimeLineNextTLI); list_free_deep(timeLineHistory); @@ -781,7 +776,7 @@ StartReplication(StartReplicationCmd *cmd) } else { - sendTimeLine = ThisTimeLineID; + sendTimeLine = FlushTLI; sendTimeLineValidUpto = InvalidXLogRecPtr; sendTimeLineIsHistoric = false; } @@ -909,9 +904,16 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req int count; WALReadError errinfo; XLogSegNo segno; + TimeLineID currTLI = GetWALInsertionTimeLine(); - XLogReadDetermineTimeline(state, targetPagePtr, reqLen); - sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID); + /* + * Since logical decoding is only permitted on a primary server, we know + * that the current timeline ID can't be changing any more. If we did this + * on a standby, we'd have to worry about the values we compute here + * becoming invalid due to a promotion or timeline change. 
+ */ + XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI); + sendTimeLineIsHistoric = (state->currTLI != currTLI); sendTimeLine = state->currTLI; sendTimeLineValidUpto = state->currTLIValidUntil; sendTimeLineNextTLI = state->nextTLI; @@ -2683,6 +2685,8 @@ XLogSendPhysical(void) } else if (am_cascading_walsender) { + TimeLineID SendRqstTLI; + /* * Streaming the latest timeline on a standby. * @@ -2702,14 +2706,12 @@ XLogSendPhysical(void) */ bool becameHistoric = false; - SendRqstPtr = GetStandbyFlushRecPtr(); + SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI); if (!RecoveryInProgress()) { - /* - * We have been promoted. RecoveryInProgress() updated - * ThisTimeLineID to the new current timeline. - */ + /* We have been promoted. */ + SendRqstTLI = GetWALInsertionTimeLine(); am_cascading_walsender = false; becameHistoric = true; } @@ -2717,10 +2719,9 @@ XLogSendPhysical(void) { /* * Still a cascading standby. But is the timeline we're sending - * still the one recovery is recovering from? ThisTimeLineID was - * updated by the GetStandbyFlushRecPtr() call above. + * still the one recovery is recovering from? */ - if (sendTimeLine != ThisTimeLineID) + if (sendTimeLine != SendRqstTLI) becameHistoric = true; } @@ -2733,7 +2734,7 @@ XLogSendPhysical(void) */ List *history; - history = readTimeLineHistory(ThisTimeLineID); + history = readTimeLineHistory(SendRqstTLI); sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); Assert(sendTimeLine < sendTimeLineNextTLI); @@ -3069,11 +3070,11 @@ WalSndDone(WalSndSendDataCallback send_data) * can be sent to the standby. This should only be called when in recovery, * ie. we're streaming to a cascaded standby. * - * As a side-effect, ThisTimeLineID is updated to the TLI of the last + * As a side-effect, *tli is updated to the TLI of the last * replayed WAL record. 
*/ static XLogRecPtr -GetStandbyFlushRecPtr(void) +GetStandbyFlushRecPtr(TimeLineID *tli) { XLogRecPtr replayPtr; TimeLineID replayTLI; @@ -3090,10 +3091,10 @@ GetStandbyFlushRecPtr(void) receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI); - ThisTimeLineID = replayTLI; + *tli = replayTLI; result = replayPtr; - if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr) + if (receiveTLI == replayTLI && receivePtr > replayPtr) result = receivePtr; return result; diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index a5cb3d322c..eebc91f3a5 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -98,7 +98,9 @@ extern void wal_segment_open(XLogReaderState *state, extern void wal_segment_close(XLogReaderState *state); extern void XLogReadDetermineTimeline(XLogReaderState *state, - XLogRecPtr wantPage, uint32 wantLength); + XLogRecPtr wantPage, + uint32 wantLength, + TimeLineID currTLI); extern void WALReadRaiseError(WALReadError *errinfo); -- 2.39.5