summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/xlogreader.c35
-rw-r--r--src/backend/access/transam/xlogutils.c16
-rw-r--r--src/backend/replication/walsender.c98
-rw-r--r--src/bin/pg_waldump/pg_waldump.c16
-rw-r--r--src/include/access/xlogreader.h20
-rw-r--r--src/include/access/xlogutils.h3
6 files changed, 83 insertions, 105 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7cee8b92c90..aae3fee24cd 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1044,14 +1044,12 @@ err:
/*
* Helper function to ease writing of XLogRoutine->page_read callbacks.
- * If this function is used, caller must supply an open_segment callback in
+ * If this function is used, caller must supply a segment_open callback in
* 'state', as that is used here.
*
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'.
*
- * 'seg/segcxt' identify the last segment used.
- *
* Returns true if succeeded, false if an error occurs, in which case
* 'errinfo' receives error details.
*
@@ -1061,7 +1059,6 @@ err:
bool
WALRead(XLogReaderState *state,
char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
- WALOpenSegment *seg, WALSegmentContext *segcxt,
WALReadError *errinfo)
{
char *p;
@@ -1078,34 +1075,36 @@ WALRead(XLogReaderState *state,
int segbytes;
int readbytes;
- startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
+ startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
/*
* If the data we want is not in a segment we have open, close what we
* have (if anything) and open the next one, using the caller's
* provided openSegment callback.
*/
- if (seg->ws_file < 0 ||
- !XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) ||
- tli != seg->ws_tli)
+ if (state->seg.ws_file < 0 ||
+ !XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
+ tli != state->seg.ws_tli)
{
XLogSegNo nextSegNo;
- if (seg->ws_file >= 0)
+ if (state->seg.ws_file >= 0)
state->routine.segment_close(state);
- XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
- seg->ws_file = state->routine.segment_open(state, nextSegNo,
- segcxt, &tli);
+ XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
+ state->routine.segment_open(state, nextSegNo, &tli);
+
+ /* This shouldn't happen -- indicates a bug in segment_open */
+ Assert(state->seg.ws_file >= 0);
/* Update the current segment info. */
- seg->ws_tli = tli;
- seg->ws_segno = nextSegNo;
+ state->seg.ws_tli = tli;
+ state->seg.ws_segno = nextSegNo;
}
/* How many bytes are within this segment? */
- if (nbytes > (segcxt->ws_segsize - startoff))
- segbytes = segcxt->ws_segsize - startoff;
+ if (nbytes > (state->segcxt.ws_segsize - startoff))
+ segbytes = state->segcxt.ws_segsize - startoff;
else
segbytes = nbytes;
@@ -1115,7 +1114,7 @@ WALRead(XLogReaderState *state,
/* Reset errno first; eases reporting non-errno-affecting errors */
errno = 0;
- readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff);
+ readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
#ifndef FRONTEND
pgstat_report_wait_end();
@@ -1127,7 +1126,7 @@ WALRead(XLogReaderState *state,
errinfo->wre_req = segbytes;
errinfo->wre_read = readbytes;
errinfo->wre_off = startoff;
- errinfo->wre_seg = *seg;
+ errinfo->wre_seg = state->seg;
return false;
}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 0bb69447c26..322b0e8ff5b 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -784,18 +784,17 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
}
/* XLogReaderRoutine->segment_open callback for local pg_wal files */
-int
+void
wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
- WALSegmentContext *segcxt, TimeLineID *tli_p)
+ TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
char path[MAXPGPATH];
- int fd;
- XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
- fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
- if (fd >= 0)
- return fd;
+ XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
+ state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+ if (state->seg.ws_file >= 0)
+ return;
if (errno == ENOENT)
ereport(ERROR,
@@ -807,8 +806,6 @@ wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m",
path)));
-
- return -1; /* keep compiler quiet */
}
/* stock XLogReaderRoutine->segment_close callback */
@@ -947,7 +944,6 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
* zero-padded up to the page boundary if it's incomplete.
*/
if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
- &state->seg, &state->segcxt,
&errinfo))
WALReadRaiseError(&errinfo);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9f14b992310..3367aa98f8a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -129,8 +129,14 @@ bool log_replication_commands = false;
*/
bool wake_wal_senders = false;
-static WALOpenSegment *sendSeg = NULL;
-static WALSegmentContext *sendCxt = NULL;
+/*
+ * Physical walsender does not use xlogreader to read WAL, but it does use a
+ * fake one to keep state. Logical walsender uses a proper xlogreader. Both
+ * keep the 'xlogreader' pointer to the right one, for the sake of common
+ * routines.
+ */
+static XLogReaderState fake_xlogreader;
+static XLogReaderState *xlogreader;
/*
* These variables keep track of the state of the timeline we're currently
@@ -248,8 +254,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
-static int WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
- WALSegmentContext *segcxt, TimeLineID *tli_p);
+static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
+ TimeLineID *tli_p);
static void UpdateSpillStats(LogicalDecodingContext *ctx);
@@ -280,12 +286,19 @@ InitWalSender(void)
/* Initialize empty timestamp buffer for lag tracking. */
lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
- /* Make sure we can remember the current read position in XLOG. */
- sendSeg = (WALOpenSegment *)
- MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment));
- sendCxt = (WALSegmentContext *)
- MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext));
- WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL);
+ /*
+ * Prepare physical walsender's fake xlogreader struct. Logical walsender
+ * does this later.
+ */
+ if (!am_db_walsender)
+ {
+ xlogreader = &fake_xlogreader;
+ xlogreader->routine =
+ *XL_ROUTINE(.segment_open = WalSndSegmentOpen,
+ .segment_close = wal_segment_close);
+ WALOpenSegmentInit(&xlogreader->seg, &xlogreader->segcxt,
+ wal_segment_size, NULL);
+ }
}
/*
@@ -302,11 +315,8 @@ WalSndErrorCleanup(void)
ConditionVariableCancelSleep();
pgstat_report_wait_end();
- if (sendSeg->ws_file >= 0)
- {
- close(sendSeg->ws_file);
- sendSeg->ws_file = -1;
- }
+ if (xlogreader->seg.ws_file >= 0)
+ wal_segment_close(xlogreader);
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
@@ -837,11 +847,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
cur_page,
targetPagePtr,
XLOG_BLCKSZ,
- sendSeg->ws_tli, /* Pass the current TLI because only
+ state->seg.ws_tli, /* Pass the current TLI because only
* WalSndSegmentOpen controls whether new
* TLI is needed. */
- sendSeg,
- sendCxt,
&errinfo))
WALReadRaiseError(&errinfo);
@@ -852,8 +860,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
* read() succeeds in that case, but the data we tried to read might
* already have been overwritten with new WAL records.
*/
- XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
- CheckXLogRemoved(segno, sendSeg->ws_tli);
+ XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
+ CheckXLogRemoved(segno, state->seg.ws_tli);
return count;
}
@@ -1176,6 +1184,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
.segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
+ xlogreader = logical_decoding_ctx->reader;
WalSndSetState(WALSNDSTATE_CATCHUP);
@@ -2447,13 +2456,11 @@ WalSndKill(int code, Datum arg)
}
/* XLogReaderRoutine->segment_open callback */
-static int
-WalSndSegmentOpen(XLogReaderState *state,
- XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+static void
+WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
TimeLineID *tli_p)
{
char path[MAXPGPATH];
- int fd;
/*-------
* When reading from a historic timeline, and there is a timeline switch
@@ -2484,15 +2491,15 @@ WalSndSegmentOpen(XLogReaderState *state,
{
XLogSegNo endSegNo;
- XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
- if (sendSeg->ws_segno == endSegNo)
+ XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
+ if (state->seg.ws_segno == endSegNo)
*tli_p = sendTimeLineNextTLI;
}
- XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
- fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
- if (fd >= 0)
- return fd;
+ XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
+ state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+ if (state->seg.ws_file >= 0)
+ return;
/*
* If the file is not found, assume it's because the standby asked for a
@@ -2515,7 +2522,6 @@ WalSndSegmentOpen(XLogReaderState *state,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m",
path)));
- return -1; /* keep compiler quiet */
}
/*
@@ -2537,12 +2543,6 @@ XLogSendPhysical(void)
Size nbytes;
XLogSegNo segno;
WALReadError errinfo;
- static XLogReaderState fake_xlogreader =
- {
- /* Fake xlogreader state for WALRead */
- .routine.segment_open = WalSndSegmentOpen,
- .routine.segment_close = wal_segment_close
- };
/* If requested switch the WAL sender to the stopping state. */
if (got_STOPPING)
@@ -2685,9 +2685,8 @@ XLogSendPhysical(void)
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
{
/* close the current file. */
- if (sendSeg->ws_file >= 0)
- close(sendSeg->ws_file);
- sendSeg->ws_file = -1;
+ if (xlogreader->seg.ws_file >= 0)
+ wal_segment_close(xlogreader);
/* Send CopyDone */
pq_putmessage_noblock('c', NULL, 0);
@@ -2760,21 +2759,19 @@ XLogSendPhysical(void)
enlargeStringInfo(&output_message, nbytes);
retry:
- if (!WALRead(&fake_xlogreader,
+ if (!WALRead(xlogreader,
&output_message.data[output_message.len],
startptr,
nbytes,
- sendSeg->ws_tli, /* Pass the current TLI because only
- * WalSndSegmentOpen controls whether new
- * TLI is needed. */
- sendSeg,
- sendCxt,
+ xlogreader->seg.ws_tli, /* Pass the current TLI because
+ * only WalSndSegmentOpen controls
+ * whether new TLI is needed. */
&errinfo))
WALReadRaiseError(&errinfo);
/* See logical_read_xlog_page(). */
- XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
- CheckXLogRemoved(segno, sendSeg->ws_tli);
+ XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
+ CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
/*
* During recovery, the currently-open WAL file might be replaced with the
@@ -2792,10 +2789,9 @@ retry:
walsnd->needreload = false;
SpinLockRelease(&walsnd->mutex);
- if (reload && sendSeg->ws_file >= 0)
+ if (reload && xlogreader->seg.ws_file >= 0)
{
- close(sendSeg->ws_file);
- sendSeg->ws_file = -1;
+ wal_segment_close(xlogreader);
goto retry;
}
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index e29f65500fb..d1a06789353 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -280,17 +280,15 @@ identify_target_directory(char *directory, char *fname)
}
/* pg_waldump's XLogReaderRoutine->segment_open callback */
-static int
-WALDumpOpenSegment(XLogReaderState *state,
- XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+static void
+WALDumpOpenSegment(XLogReaderState *state, XLogSegNo nextSegNo,
TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
char fname[MAXPGPATH];
- int fd;
int tries;
- XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize);
+ XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize);
/*
* In follow mode there is a short period of time after the server has
@@ -300,9 +298,9 @@ WALDumpOpenSegment(XLogReaderState *state,
*/
for (tries = 0; tries < 10; tries++)
{
- fd = open_file_in_directory(segcxt->ws_dir, fname);
- if (fd >= 0)
- return fd;
+ state->seg.ws_file = open_file_in_directory(state->segcxt.ws_dir, fname);
+ if (state->seg.ws_file >= 0)
+ return;
if (errno == ENOENT)
{
int save_errno = errno;
@@ -318,7 +316,6 @@ WALDumpOpenSegment(XLogReaderState *state,
}
fatal_error("could not find file \"%s\": %m", fname);
- return -1; /* keep compiler quiet */
}
/*
@@ -356,7 +353,6 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
}
if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
- &state->seg, &state->segcxt,
&errinfo))
{
WALOpenSegment *seg = &errinfo.wre_seg;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 80cf62acb7c..c21b0ba9722 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -63,10 +63,9 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
int reqLen,
XLogRecPtr targetRecPtr,
char *readBuf);
-typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
- XLogSegNo nextSegNo,
- WALSegmentContext *segcxt,
- TimeLineID *tli_p);
+typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
+ XLogSegNo nextSegNo,
+ TimeLineID *tli_p);
typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
typedef struct XLogReaderRoutine
@@ -94,21 +93,16 @@ typedef struct XLogReaderRoutine
XLogPageReadCB page_read;
/*
- * Callback to open the specified WAL segment for reading. The file
- * descriptor of the opened segment shall be returned. In case of
+ * Callback to open the specified WAL segment for reading. ->seg.ws_file
+ * shall be set to the file descriptor of the opened segment. In case of
* failure, an error shall be raised by the callback and it shall not
* return.
*
* "nextSegNo" is the number of the segment to be opened.
*
- * "segcxt" is additional information about the segment.
- *
* "tli_p" is an input/output argument. WALRead() uses it to pass the
* timeline in which the new segment should be found, but the callback can
* use it to return the TLI that it actually opened.
- *
- * BasicOpenFile() is the preferred way to open the segment file in
- * backend code, whereas open(2) should be used in frontend.
*/
WALSegmentOpenCB segment_open;
@@ -301,9 +295,7 @@ typedef struct WALReadError
extern bool WALRead(XLogReaderState *state,
char *buf, XLogRecPtr startptr, Size count,
- TimeLineID tli, WALOpenSegment *seg,
- WALSegmentContext *segcxt,
- WALReadError *errinfo);
+ TimeLineID tli, WALReadError *errinfo);
/* Functions for decoding an XLogRecord */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 68ce815476c..e59b6cf3a9f 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -50,9 +50,8 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
extern int read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *cur_page);
-extern int wal_segment_open(XLogReaderState *state,
+extern void wal_segment_open(XLogReaderState *state,
XLogSegNo nextSegNo,
- WALSegmentContext *segcxt,
TimeLineID *tli_p);
extern void wal_segment_close(XLogReaderState *state);