* twophase files and ReadTwoPhaseFile should be used instead.
*
* Note clearly that this function can access WAL during normal operation,
- * similarly to the way WALSender or Logical Decoding would do. While
- * accessing WAL, read_local_xlog_page() may change ThisTimeLineID,
- * particularly if this routine is called for the end-of-recovery checkpoint
- * in the checkpointer itself, so save the current timeline number value
- * and restore it once done.
+ * similarly to the way WALSender or Logical Decoding would do.
*/
static void
XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
XLogRecord *record;
XLogReaderState *xlogreader;
char *errormsg;
- TimeLineID save_currtli = ThisTimeLineID;
xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
XL_ROUTINE(.page_read = &read_local_xlog_page,
XLogBeginRead(xlogreader, lsn);
record = XLogReadRecord(xlogreader, &errormsg);
- /*
- * Restore immediately the timeline where it was previously, as
- * read_local_xlog_page() could have changed it if the record was read
- * while recovery was finishing or if the timeline has jumped in-between.
- */
- ThisTimeLineID = save_currtli;
-
if (record == NULL)
ereport(ERROR,
(errcode_for_file_access(),
* wantLength to the amount of the page that will be read, up to
* XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
*
+ * The currTLI argument should be the system-wide current timeline.
+ * Note that this may be different from state->currTLI, which is the timeline
+ * from which the caller is currently reading previous xlog records.
+ *
* We switch to an xlog segment from the new timeline eagerly when on a
* historical timeline, as soon as we reach the start of the xlog segment
* containing the timeline switch. The server copied the segment to the new
*
* The caller must also make sure it doesn't read past the current replay
* position (using GetXLogReplayRecPtr) if executing in recovery, so it
- * doesn't fail to notice that the current timeline became historical. The
- * caller must also update ThisTimeLineID with the result of
- * GetXLogReplayRecPtr and must check RecoveryInProgress().
+ * doesn't fail to notice that the current timeline became historical.
*/
void
-XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
+XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage,
+ uint32 wantLength, TimeLineID currTLI)
{
const XLogRecPtr lastReadPage = (state->seg.ws_segno *
state->segcxt.ws_segsize + state->segoff);
Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
Assert(wantLength <= XLOG_BLCKSZ);
Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
+ Assert(currTLI != 0);
/*
* If the desired page is currently read in and valid, we have nothing to
* just carry on. (Seeking backwards requires a check to make sure the
* older page isn't on a prior timeline).
*
- * ThisTimeLineID might've become historical since we last looked, but the
- * caller is required not to read past the flush limit it saw at the time
- * it looked up the timeline. There's nothing we can do about it if
- * StartupXLOG() renames it to .partial concurrently.
+ * currTLI might've become historical since the caller obtained the value,
+ * but the caller is required not to read past the flush limit it saw at
+ * the time it looked up the timeline. There's nothing we can do about it
+ * if StartupXLOG() renames it to .partial concurrently.
*/
- if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
+ if (state->currTLI == currTLI && wantPage >= lastReadPage)
{
Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
return;
* the current segment we can just keep reading.
*/
if (state->currTLIValidUntil != InvalidXLogRecPtr &&
- state->currTLI != ThisTimeLineID &&
+ state->currTLI != currTLI &&
state->currTLI != 0 &&
((wantPage + wantLength) / state->segcxt.ws_segsize) <
(state->currTLIValidUntil / state->segcxt.ws_segsize))
* We need to re-read the timeline history in case it's been changed
* by a promotion or replay from a cascaded replica.
*/
- List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
+ List *timelineHistory = readTimeLineHistory(currTLI);
XLogRecPtr endOfSegment;
endOfSegment = ((wantPage / state->segcxt.ws_segsize) + 1) *
TimeLineID tli;
int count;
WALReadError errinfo;
+ TimeLineID currTLI;
loc = targetPagePtr + reqLen;
* most recent timeline is.
*/
if (!RecoveryInProgress())
- read_upto = GetFlushRecPtr(&ThisTimeLineID);
+ read_upto = GetFlushRecPtr(&currTLI);
else
- read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
- tli = ThisTimeLineID;
+ read_upto = GetXLogReplayRecPtr(&currTLI);
+ tli = currTLI;
/*
* Check which timeline to get the record from.
* archive in the timeline will get renamed to .partial by
* StartupXLOG().
*
- * If that happens after our caller updated ThisTimeLineID but before
+ * If that happens after our caller determined the TLI but before
* we actually read the xlog page, we might still try to read from the
* old (now renamed) segment and fail. There's not much we can do
* about this, but it can only happen when we're a leaf of a cascading
* standby whose primary gets promoted while we're decoding, so a
* one-off ERROR isn't too bad.
*/
- XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+ XLogReadDetermineTimeline(state, targetPagePtr, reqLen, tli);
- if (state->currTLI == ThisTimeLineID)
+ if (state->currTLI == currTLI)
{
if (loc <= read_upto)
static void XLogSendPhysical(void);
static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data);
-static XLogRecPtr GetStandbyFlushRecPtr(void);
+static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
static void IdentifySystem(void);
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
TupleDesc tupdesc;
Datum values[4];
bool nulls[4];
+ TimeLineID currTLI;
/*
* Reply with a result set with one row, four columns. First col is system
am_cascading_walsender = RecoveryInProgress();
if (am_cascading_walsender)
- {
- /* this also updates ThisTimeLineID */
- logptr = GetStandbyFlushRecPtr();
- }
+ logptr = GetStandbyFlushRecPtr(&currTLI);
else
- logptr = GetFlushRecPtr(&ThisTimeLineID);
+ logptr = GetFlushRecPtr(&currTLI);
snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
values[0] = CStringGetTextDatum(sysid);
/* column 2: timeline */
- values[1] = Int32GetDatum(ThisTimeLineID);
+ values[1] = Int32GetDatum(currTLI);
/* column 3: wal location */
values[2] = CStringGetTextDatum(xloc);
if (RecoveryInProgress())
(void) GetXLogReplayRecPtr(¤t_timeline);
else
- current_timeline = ThisTimeLineID;
+ current_timeline = GetWALInsertionTimeLine();
timeline_history = readTimeLineHistory(current_timeline);
slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
{
StringInfoData buf;
XLogRecPtr FlushPtr;
+ TimeLineID FlushTLI;
/* create xlogreader for physical replication */
xlogreader =
/*
* Select the timeline. If it was given explicitly by the client, use
- * that. Otherwise use the timeline of the last replayed record, which is
- * kept in ThisTimeLineID.
+ * that. Otherwise use the timeline of the last replayed record.
*/
am_cascading_walsender = RecoveryInProgress();
if (am_cascading_walsender)
- {
- /* this also updates ThisTimeLineID */
- FlushPtr = GetStandbyFlushRecPtr();
- }
+ FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
else
- FlushPtr = GetFlushRecPtr(&ThisTimeLineID);
+ FlushPtr = GetFlushRecPtr(&FlushTLI);
if (cmd->timeline != 0)
{
XLogRecPtr switchpoint;
sendTimeLine = cmd->timeline;
- if (sendTimeLine == ThisTimeLineID)
+ if (sendTimeLine == FlushTLI)
{
sendTimeLineIsHistoric = false;
sendTimeLineValidUpto = InvalidXLogRecPtr;
* Check that the timeline the client requested exists, and the
* requested start location is on that timeline.
*/
- timeLineHistory = readTimeLineHistory(ThisTimeLineID);
+ timeLineHistory = readTimeLineHistory(FlushTLI);
switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
&sendTimeLineNextTLI);
list_free_deep(timeLineHistory);
}
else
{
- sendTimeLine = ThisTimeLineID;
+ sendTimeLine = FlushTLI;
sendTimeLineValidUpto = InvalidXLogRecPtr;
sendTimeLineIsHistoric = false;
}
int count;
WALReadError errinfo;
XLogSegNo segno;
+ TimeLineID currTLI = GetWALInsertionTimeLine();
- XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
- sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
+ /*
+ * Since logical decoding is only permitted on a primary server, we know
+ * that the current timeline ID can't be changing any more. If we did this
+ * on a standby, we'd have to worry about the values we compute here
+ * becoming invalid due to a promotion or timeline change.
+ */
+ XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
+ sendTimeLineIsHistoric = (state->currTLI != currTLI);
sendTimeLine = state->currTLI;
sendTimeLineValidUpto = state->currTLIValidUntil;
sendTimeLineNextTLI = state->nextTLI;
}
else if (am_cascading_walsender)
{
+ TimeLineID SendRqstTLI;
+
/*
* Streaming the latest timeline on a standby.
*
*/
bool becameHistoric = false;
- SendRqstPtr = GetStandbyFlushRecPtr();
+ SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
if (!RecoveryInProgress())
{
- /*
- * We have been promoted. RecoveryInProgress() updated
- * ThisTimeLineID to the new current timeline.
- */
+ /* We have been promoted. */
+ SendRqstTLI = GetWALInsertionTimeLine();
am_cascading_walsender = false;
becameHistoric = true;
}
{
/*
* Still a cascading standby. But is the timeline we're sending
- * still the one recovery is recovering from? ThisTimeLineID was
- * updated by the GetStandbyFlushRecPtr() call above.
+ * still the one recovery is recovering from?
*/
- if (sendTimeLine != ThisTimeLineID)
+ if (sendTimeLine != SendRqstTLI)
becameHistoric = true;
}
*/
List *history;
- history = readTimeLineHistory(ThisTimeLineID);
+ history = readTimeLineHistory(SendRqstTLI);
sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
Assert(sendTimeLine < sendTimeLineNextTLI);
* can be sent to the standby. This should only be called when in recovery,
* ie. we're streaming to a cascaded standby.
*
- * As a side-effect, ThisTimeLineID is updated to the TLI of the last
+ * As a side-effect, *tli is updated to the TLI of the last
* replayed WAL record.
*/
static XLogRecPtr
-GetStandbyFlushRecPtr(void)
+GetStandbyFlushRecPtr(TimeLineID *tli)
{
XLogRecPtr replayPtr;
TimeLineID replayTLI;
receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
- ThisTimeLineID = replayTLI;
+ *tli = replayTLI;
result = replayPtr;
- if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
+ if (receiveTLI == replayTLI && receivePtr > replayPtr)
result = receivePtr;
return result;