summaryrefslogtreecommitdiff
path: root/src/include
diff options
context:
space:
mode:
authorAlvaro Herrera2020-05-08 19:30:34 +0000
committerAlvaro Herrera2020-05-08 19:40:11 +0000
commitb060dbe0001a1d6bf26cd294710f3cb203868d46 (patch)
tree6e9e980aa63ec1ec3655b93c92b9b5caa6689d38 /src/include
parent871696ba20e0251e86041576373809d1c7ca161d (diff)
Rework XLogReader callback system
Code review for 0dc8ead46363, prompted by a bug closed by 91c40548d5f7. XLogReader's system for opening and closing segments had gotten too complicated, with callbacks being passed at both the XLogReaderAllocate level (read_page) as well as at the WALRead level (segment_open). This was confusing and hard to follow, so restructure things so that these callbacks are passed together at XLogReaderAllocate time, and add another callback to the set (segment_close) to make it a coherent whole. Also, ensure XLogReaderState is an argument to all the callbacks, so that they can grab at the ->private data if necessary. Document the whole arrangement more clearly. Author: Álvaro Herrera <alvherre@alvh.no-ip.org> Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Discussion: https://postgr.es/m/20200422175754.GA19858@alvherre.pgsql
Diffstat (limited to 'src/include')
-rw-r--r--src/include/access/xlogreader.h119
-rw-r--r--src/include/access/xlogutils.h5
-rw-r--r--src/include/replication/logical.h4
3 files changed, 82 insertions, 46 deletions
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 4582196e185..81af200f5e6 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -17,6 +17,13 @@
* XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord()
* until it returns NULL.
*
+ * Callers supply a page_read callback if they want to to call
+ * XLogReadRecord or XLogFindNextRecord; it can be passed in as NULL
+ * otherwise. The WALRead function can be used as a helper to write
+ * page_read callbacks, but it is not mandatory; callers that use it,
+ * must supply open_segment callbacks. The close_segment callback
+ * must always be supplied.
+ *
* After reading a record with XLogReadRecord(), it's decomposed into
* the per-block and main data parts, and the parts can be accessed
* with the XLogRec* macros and functions. You can also decode a
@@ -50,12 +57,69 @@ typedef struct WALSegmentContext
typedef struct XLogReaderState XLogReaderState;
-/* Function type definition for the read_page callback */
+/* Function type definitions for various xlogreader interactions */
typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
int reqLen,
XLogRecPtr targetRecPtr,
char *readBuf);
+typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
+ XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt,
+ TimeLineID *tli_p);
+typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
+
+typedef struct XLogReaderRoutine
+{
+ /*
+ * Data input callback
+ *
+ * This callback shall read at least reqLen valid bytes of the xlog page
+ * starting at targetPagePtr, and store them in readBuf. The callback
+ * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
+ * -1 on failure. The callback shall sleep, if necessary, to wait for the
+ * requested bytes to become available. The callback will not be invoked
+ * again for the same page unless more than the returned number of bytes
+ * are needed.
+ *
+ * targetRecPtr is the position of the WAL record we're reading. Usually
+ * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
+ * to read and verify the page or segment header, before it reads the
+ * actual WAL record it's interested in. In that case, targetRecPtr can
+ * be used to determine which timeline to read the page from.
+ *
+ * The callback shall set ->seg.ws_tli to the TLI of the file the page was
+ * read from.
+ */
+ XLogPageReadCB page_read;
+
+ /*
+ * Callback to open the specified WAL segment for reading. The file
+ * descriptor of the opened segment shall be returned. In case of
+ * failure, an error shall be raised by the callback and it shall not
+ * return.
+ *
+ * "nextSegNo" is the number of the segment to be opened.
+ *
+ * "segcxt" is additional information about the segment.
+ *
+ * "tli_p" is an input/output argument. XLogRead() uses it to pass the
+ * timeline in which the new segment should be found, but the callback can
+ * use it to return the TLI that it actually opened.
+ *
+ * BasicOpenFile() is the preferred way to open the segment file in
+ * backend code, whereas open(2) should be used in frontend.
+ */
+ WALSegmentOpenCB segment_open;
+
+ /*
+ * WAL segment close callback. ->seg.ws_file shall be set to a negative
+ * number.
+ */
+ WALSegmentCloseCB segment_close;
+} XLogReaderRoutine;
+
+#define XL_ROUTINE(...) &(XLogReaderRoutine){__VA_ARGS__}
typedef struct
{
@@ -88,34 +152,17 @@ typedef struct
struct XLogReaderState
{
+ /*
+ * Operational callbacks
+ */
+ XLogReaderRoutine routine;
+
/* ----------------------------------------
* Public parameters
* ----------------------------------------
*/
/*
- * Data input callback (mandatory).
- *
- * This callback shall read at least reqLen valid bytes of the xlog page
- * starting at targetPagePtr, and store them in readBuf. The callback
- * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
- * -1 on failure. The callback shall sleep, if necessary, to wait for the
- * requested bytes to become available. The callback will not be invoked
- * again for the same page unless more than the returned number of bytes
- * are needed.
- *
- * targetRecPtr is the position of the WAL record we're reading. Usually
- * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
- * to read and verify the page or segment header, before it reads the
- * actual WAL record it's interested in. In that case, targetRecPtr can
- * be used to determine which timeline to read the page from.
- *
- * The callback shall set ->seg.ws_tli to the TLI of the file the page was
- * read from.
- */
- XLogPageReadCB read_page;
-
- /*
* System identifier of the xlog files we're about to read. Set to zero
* (the default value) if unknown or unimportant.
*/
@@ -214,30 +261,13 @@ struct XLogReaderState
/* Get a new XLogReader */
extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
const char *waldir,
- XLogPageReadCB pagereadfunc,
+ XLogReaderRoutine *routine,
void *private_data);
+extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
/* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state);
-/*
- * Callback to open the specified WAL segment for reading. Returns a valid
- * file descriptor when the file was opened successfully.
- *
- * "nextSegNo" is the number of the segment to be opened.
- *
- * "segcxt" is additional information about the segment.
- *
- * "tli_p" is an input/output argument. XLogRead() uses it to pass the
- * timeline in which the new segment should be found, but the callback can use
- * it to return the TLI that it actually opened.
- *
- * BasicOpenFile() is the preferred way to open the segment file in backend
- * code, whereas open(2) should be used in frontend.
- */
-typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
- TimeLineID *tli_p);
-
/* Initialize supporting structures */
extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
int segsize, const char *waldir);
@@ -269,9 +299,10 @@ typedef struct WALReadError
WALOpenSegment wre_seg; /* Segment we tried to read from. */
} WALReadError;
-extern bool WALRead(char *buf, XLogRecPtr startptr, Size count,
+extern bool WALRead(XLogReaderState *state,
+ char *buf, XLogRecPtr startptr, Size count,
TimeLineID tli, WALOpenSegment *seg,
- WALSegmentContext *segcxt, WALSegmentOpen openSegment,
+ WALSegmentContext *segcxt,
WALReadError *errinfo);
/* Functions for decoding an XLogRecord */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5181a077d96..68ce815476c 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -50,6 +50,11 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
extern int read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *cur_page);
+extern int wal_segment_open(XLogReaderState *state,
+ XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt,
+ TimeLineID *tli_p);
+extern void wal_segment_close(XLogReaderState *state);
extern void XLogReadDetermineTimeline(XLogReaderState *state,
XLogRecPtr wantPage, uint32 wantLength);
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 3b7ca7f1da4..c2f2475e5d3 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -95,14 +95,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);