From b898d46652a0214e9f2ec83a05abf50b6ac635ce Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Fri, 29 Oct 2021 13:01:09 -0400 Subject: [PATCH] walreceiver.c: Don't depend on ThisTimeLineID. Instead, pass the TLI around explicitly, as a function parameter. Since this calls a few xlog.c functions that used ThisTimeLineID, it was necessary to also change those functions to take a TimeLineID as a parameter. --- src/backend/access/transam/xlog.c | 52 ++++++++++++++++---------- src/backend/replication/walreceiver.c | 54 ++++++++++++++++----------- src/include/access/xlog.h | 4 +- 3 files changed, 66 insertions(+), 44 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index ab9cb2093c..23a3d49f77 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -927,7 +927,8 @@ static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic); static bool XLogCheckpointNeeded(XLogSegNo new_segno); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible); static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, - bool find_free, XLogSegNo max_segno); + bool find_free, XLogSegNo max_segno, + TimeLineID tli); static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); @@ -2517,7 +2518,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) wal_segment_size); /* create/use new log file */ - openLogFile = XLogFileInit(openLogSegNo); + openLogFile = XLogFileInit(openLogSegNo, ThisTimeLineID); ReserveExternalFD(); } @@ -2632,7 +2633,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) */ if (finishing_seg) { - issue_xlog_fsync(openLogFile, openLogSegNo); + issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID); /* signal that we need to wakeup walsenders later */ WalSndWakeupRequest(); @@ -2703,7 +2704,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) ReserveExternalFD(); } - issue_xlog_fsync(openLogFile, openLogSegNo); + issue_xlog_fsync(openLogFile, openLogSegNo, ThisTimeLineID); } /* signal that we need to wakeup walsenders later */ @@ -3295,7 +3296,8 @@ XLogNeedsFlush(XLogRecPtr record) * succeed. (This is weird, but it's efficient for the callers.) */ static int -XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path) +XLogFileInitInternal(XLogSegNo logsegno, TimeLineID logtli, + bool *added, char *path) { char tmppath[MAXPGPATH]; PGAlignedXLogBlock zbuffer; @@ -3304,7 +3306,9 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path) int fd; int save_errno; - XLogFilePath(path, ThisTimeLineID, logsegno, wal_segment_size); + Assert(logtli != 0); + + XLogFilePath(path, logtli, logsegno, wal_segment_size); /* * Try to use existent file (checkpoint maker may have created it already) @@ -3448,7 +3452,8 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path) * CheckPointSegments. */ max_segno = logsegno + CheckPointSegments; - if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno)) + if (InstallXLogFileSegment(&installed_segno, tmppath, true, max_segno, + logtli)) { *added = true; elog(DEBUG2, "done creating and filling new WAL file"); @@ -3480,13 +3485,15 @@ XLogFileInitInternal(XLogSegNo logsegno, bool *added, char *path) * in a critical section. */ int -XLogFileInit(XLogSegNo logsegno) +XLogFileInit(XLogSegNo logsegno, TimeLineID logtli) { bool ignore_added; char path[MAXPGPATH]; int fd; - fd = XLogFileInitInternal(logsegno, &ignore_added, path); + Assert(logtli != 0); + + fd = XLogFileInitInternal(logsegno, logtli, &ignore_added, path); if (fd >= 0) return fd; @@ -3628,7 +3635,7 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno, /* * Now move the segment into place with its final name. */ - if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0)) + if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0, ThisTimeLineID)) elog(ERROR, "InstallXLogFileSegment should not have failed"); } @@ -3652,18 +3659,22 @@ XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno, * free slot is found between *segno and max_segno. (Ignored when find_free * is false.) * + * tli: The timeline on which the new segment should be installed. + * * Returns true if the file was installed successfully. false indicates that * max_segno limit was exceeded, the startup process has disabled this * function for now, or an error occurred while renaming the file into place. */ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, - bool find_free, XLogSegNo max_segno) + bool find_free, XLogSegNo max_segno, TimeLineID tli) { char path[MAXPGPATH]; struct stat stat_buf; - XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size); + Assert(tli != 0); + + XLogFilePath(path, tli, *segno, wal_segment_size); LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); if (!XLogCtl->InstallXLogFileSegmentActive) @@ -3689,7 +3700,7 @@ InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, return false; } (*segno)++; - XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size); + XLogFilePath(path, tli, *segno, wal_segment_size); } } @@ -3986,7 +3997,7 @@ PreallocXlogFiles(XLogRecPtr endptr) if (offset >= (uint32) (0.75 * wal_segment_size)) { _logSegNo++; - lf = XLogFileInitInternal(_logSegNo, &added, path); + lf = XLogFileInitInternal(_logSegNo, ThisTimeLineID, &added, path); if (lf >= 0) close(lf); if (added) @@ -4265,7 +4276,7 @@ RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo, XLogCtl->InstallXLogFileSegmentActive && /* callee rechecks this */ lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) && InstallXLogFileSegment(endlogSegNo, path, - true, recycleSegNo)) + true, recycleSegNo, ThisTimeLineID)) { ereport(DEBUG2, (errmsg_internal("recycled write-ahead log file \"%s\"", @@ -5400,7 +5411,7 @@ BootStrapXLOG(void) record->xl_crc = crc; /* Create first XLOG segment file */ - openLogFile = XLogFileInit(1); + openLogFile = XLogFileInit(1, ThisTimeLineID); /* * We needn't bother with Reserve/ReleaseExternalFD here, since we'll @@ -5708,7 +5719,7 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog) */ int fd; - fd = XLogFileInit(startLogSegNo); + fd = XLogFileInit(startLogSegNo, ThisTimeLineID); if (close(fd) != 0) { @@ -10864,11 +10875,13 @@ assign_xlog_sync_method(int new_sync_method, void *extra) * 'segno' is for error reporting purposes. */ void -issue_xlog_fsync(int fd, XLogSegNo segno) +issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli) { char *msg = NULL; instr_time start; + Assert(tli != 0); + /* * Quick exit if fsync is disabled or write() has already synced the WAL * file. @@ -10917,8 +10930,7 @@ issue_xlog_fsync(int fd, XLogSegNo segno) char xlogfname[MAXFNAMELEN]; int save_errno = errno; - XLogFileName(xlogfname, ThisTimeLineID, segno, - wal_segment_size); + XLogFileName(xlogfname, tli, segno, wal_segment_size); errno = save_errno; ereport(PANIC, (errcode_for_file_access(), diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b90e5ca98e..7a7eb3784e 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -122,10 +122,12 @@ static StringInfoData incoming_message; static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI); static void WalRcvDie(int code, Datum arg); -static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); -static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); -static void XLogWalRcvFlush(bool dying); -static void XLogWalRcvClose(XLogRecPtr recptr); +static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, + TimeLineID tli); +static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, + TimeLineID tli); +static void XLogWalRcvFlush(bool dying, TimeLineID tli); +static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); @@ -255,7 +257,7 @@ WalReceiverMain(void) pg_atomic_write_u64(&WalRcv->writtenUpto, 0); /* Arrange to clean up at walreceiver exit */ - on_shmem_exit(WalRcvDie, 0); + on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI)); /* Properly accept or ignore signals the postmaster might send us */ pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config @@ -394,7 +396,6 @@ WalReceiverMain(void) options.startpoint = startpoint; options.slotname = slotname[0] != '\0' ? slotname : NULL; options.proto.physical.startpointTLI = startpointTLI; - ThisTimeLineID = startpointTLI; if (walrcv_startstreaming(wrconn, &options)) { if (first_stream) @@ -462,7 +463,8 @@ WalReceiverMain(void) */ last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; - XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1); + XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1, + startpointTLI); } else if (len == 0) break; @@ -487,7 +489,7 @@ WalReceiverMain(void) * let the startup process and primary server know about * them. */ - XLogWalRcvFlush(false); + XLogWalRcvFlush(false, startpointTLI); } /* Check if we need to exit the streaming loop. */ @@ -608,7 +610,7 @@ WalReceiverMain(void) { char xlogfname[MAXFNAMELEN]; - XLogWalRcvFlush(false); + XLogWalRcvFlush(false, startpointTLI); XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); if (close(recvFile) != 0) ereport(PANIC, @@ -776,9 +778,12 @@ static void WalRcvDie(int code, Datum arg) { WalRcvData *walrcv = WalRcv; + TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg); + + Assert(*startpointTLI_p != 0); /* Ensure that all WAL records received are flushed to disk */ - XLogWalRcvFlush(true); + XLogWalRcvFlush(true, *startpointTLI_p); /* Mark ourselves inactive in shared memory */ SpinLockAcquire(&walrcv->mutex); @@ -808,7 +813,7 @@ WalRcvDie(int code, Datum arg) * Accept the message from XLOG stream, and process it. */ static void -XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) +XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) { int hdrlen; XLogRecPtr dataStart; @@ -838,7 +843,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) buf += hdrlen; len -= hdrlen; - XLogWalRcvWrite(buf, len, dataStart); + XLogWalRcvWrite(buf, len, dataStart, tli); break; } case 'k': /* Keepalive */ @@ -875,25 +880,27 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) * Write XLOG data to disk. */ static void -XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) +XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) { int startoff; int byteswritten; + Assert(tli != 0); + while (nbytes > 0) { int segbytes; /* Close the current segment if it's completed */ if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) - XLogWalRcvClose(recptr); + XLogWalRcvClose(recptr, tli); if (recvFile < 0) { /* Create/use new log file */ XLByteToSeg(recptr, recvSegNo, wal_segment_size); - recvFile = XLogFileInit(recvSegNo); - recvFileTLI = ThisTimeLineID; + recvFile = XLogFileInit(recvSegNo, tli); + recvFileTLI = tli; } /* Calculate the start offset of the received logs */ @@ -946,7 +953,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) * segment is received and written. */ if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) - XLogWalRcvClose(recptr); + XLogWalRcvClose(recptr, tli); } /* @@ -956,13 +963,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) * an error, so we skip sending a reply in that case. */ static void -XLogWalRcvFlush(bool dying) +XLogWalRcvFlush(bool dying, TimeLineID tli) { + Assert(tli != 0); + if (LogstreamResult.Flush < LogstreamResult.Write) { WalRcvData *walrcv = WalRcv; - issue_xlog_fsync(recvFile, recvSegNo); + issue_xlog_fsync(recvFile, recvSegNo, tli); LogstreamResult.Flush = LogstreamResult.Write; @@ -972,7 +981,7 @@ XLogWalRcvFlush(bool dying) { walrcv->latestChunkStart = walrcv->flushedUpto; walrcv->flushedUpto = LogstreamResult.Flush; - walrcv->receivedTLI = ThisTimeLineID; + walrcv->receivedTLI = tli; } SpinLockRelease(&walrcv->mutex); @@ -1009,17 +1018,18 @@ XLogWalRcvFlush(bool dying) * Create an archive notification file since the segment is known completed. */ static void -XLogWalRcvClose(XLogRecPtr recptr) +XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli) { char xlogfname[MAXFNAMELEN]; Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)); + Assert(tli != 0); /* * fsync() and close current file before we switch to next one. We would * otherwise have to reopen this file to fsync it later */ - XLogWalRcvFlush(false); + XLogWalRcvFlush(false, tli); XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 2941265017..c36d688401 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -262,7 +262,7 @@ extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata, extern void XLogFlush(XLogRecPtr RecPtr); extern bool XLogBackgroundFlush(void); extern bool XLogNeedsFlush(XLogRecPtr RecPtr); -extern int XLogFileInit(XLogSegNo segno); +extern int XLogFileInit(XLogSegNo segno, TimeLineID tli); extern int XLogFileOpen(XLogSegNo segno); extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli); @@ -274,7 +274,7 @@ extern void xlog_redo(XLogReaderState *record); extern void xlog_desc(StringInfo buf, XLogReaderState *record); extern const char *xlog_identify(uint8 info); -extern void issue_xlog_fsync(int fd, XLogSegNo segno); +extern void issue_xlog_fsync(int fd, XLogSegNo segno, TimeLineID tli); extern bool RecoveryInProgress(void); extern RecoveryState GetRecoveryState(void); -- 2.30.2