diff options
| author | Alvaro Herrera | 2020-05-08 19:30:34 +0000 |
|---|---|---|
| committer | Alvaro Herrera | 2020-05-08 19:40:11 +0000 |
| commit | b060dbe0001a1d6bf26cd294710f3cb203868d46 (patch) | |
| tree | 6e9e980aa63ec1ec3655b93c92b9b5caa6689d38 /src/include | |
| parent | 871696ba20e0251e86041576373809d1c7ca161d (diff) | |
Rework XLogReader callback system
Code review for 0dc8ead46363, prompted by a bug closed by 91c40548d5f7.
XLogReader's system for opening and closing segments had gotten too
complicated, with callbacks being passed at both the XLogReaderAllocate
level (read_page) as well as at the WALRead level (segment_open). This
was confusing and hard to follow, so restructure things so that these
callbacks are passed together at XLogReaderAllocate time, and add
another callback to the set (segment_close) to make it a coherent whole.
Also, ensure XLogReaderState is an argument to all the callbacks, so
that they can grab at the ->private data if necessary.
Document the whole arrangement more clearly.
Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Discussion: https://postgr.es/m/20200422175754.GA19858@alvherre.pgsql
Diffstat (limited to 'src/include')
| -rw-r--r-- | src/include/access/xlogreader.h | 119 | ||||
| -rw-r--r-- | src/include/access/xlogutils.h | 5 | ||||
| -rw-r--r-- | src/include/replication/logical.h | 4 |
3 files changed, 82 insertions, 46 deletions
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 4582196e185..81af200f5e6 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -17,6 +17,13 @@ * XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord() * until it returns NULL. * + * Callers supply a page_read callback if they want to to call + * XLogReadRecord or XLogFindNextRecord; it can be passed in as NULL + * otherwise. The WALRead function can be used as a helper to write + * page_read callbacks, but it is not mandatory; callers that use it, + * must supply open_segment callbacks. The close_segment callback + * must always be supplied. + * * After reading a record with XLogReadRecord(), it's decomposed into * the per-block and main data parts, and the parts can be accessed * with the XLogRec* macros and functions. You can also decode a @@ -50,12 +57,69 @@ typedef struct WALSegmentContext typedef struct XLogReaderState XLogReaderState; -/* Function type definition for the read_page callback */ +/* Function type definitions for various xlogreader interactions */ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf); +typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader, + XLogSegNo nextSegNo, + WALSegmentContext *segcxt, + TimeLineID *tli_p); +typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader); + +typedef struct XLogReaderRoutine +{ + /* + * Data input callback + * + * This callback shall read at least reqLen valid bytes of the xlog page + * starting at targetPagePtr, and store them in readBuf. The callback + * shall return the number of bytes read (never more than XLOG_BLCKSZ), or + * -1 on failure. The callback shall sleep, if necessary, to wait for the + * requested bytes to become available. The callback will not be invoked + * again for the same page unless more than the returned number of bytes + * are needed. + * + * targetRecPtr is the position of the WAL record we're reading. Usually + * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs + * to read and verify the page or segment header, before it reads the + * actual WAL record it's interested in. In that case, targetRecPtr can + * be used to determine which timeline to read the page from. + * + * The callback shall set ->seg.ws_tli to the TLI of the file the page was + * read from. + */ + XLogPageReadCB page_read; + + /* + * Callback to open the specified WAL segment for reading. The file + * descriptor of the opened segment shall be returned. In case of + * failure, an error shall be raised by the callback and it shall not + * return. + * + * "nextSegNo" is the number of the segment to be opened. + * + * "segcxt" is additional information about the segment. + * + * "tli_p" is an input/output argument. XLogRead() uses it to pass the + * timeline in which the new segment should be found, but the callback can + * use it to return the TLI that it actually opened. + * + * BasicOpenFile() is the preferred way to open the segment file in + * backend code, whereas open(2) should be used in frontend. + */ + WALSegmentOpenCB segment_open; + + /* + * WAL segment close callback. ->seg.ws_file shall be set to a negative + * number. + */ + WALSegmentCloseCB segment_close; +} XLogReaderRoutine; + +#define XL_ROUTINE(...) &(XLogReaderRoutine){__VA_ARGS__} typedef struct { @@ -88,34 +152,17 @@ typedef struct struct XLogReaderState { + /* + * Operational callbacks + */ + XLogReaderRoutine routine; + /* ---------------------------------------- * Public parameters * ---------------------------------------- */ /* - * Data input callback (mandatory). - * - * This callback shall read at least reqLen valid bytes of the xlog page - * starting at targetPagePtr, and store them in readBuf. The callback - * shall return the number of bytes read (never more than XLOG_BLCKSZ), or - * -1 on failure. The callback shall sleep, if necessary, to wait for the - * requested bytes to become available. The callback will not be invoked - * again for the same page unless more than the returned number of bytes - * are needed. - * - * targetRecPtr is the position of the WAL record we're reading. Usually - * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs - * to read and verify the page or segment header, before it reads the - * actual WAL record it's interested in. In that case, targetRecPtr can - * be used to determine which timeline to read the page from. - * - * The callback shall set ->seg.ws_tli to the TLI of the file the page was - * read from. - */ - XLogPageReadCB read_page; - - /* * System identifier of the xlog files we're about to read. Set to zero * (the default value) if unknown or unimportant. */ @@ -214,30 +261,13 @@ struct XLogReaderState /* Get a new XLogReader */ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, const char *waldir, - XLogPageReadCB pagereadfunc, + XLogReaderRoutine *routine, void *private_data); +extern XLogReaderRoutine *LocalXLogReaderRoutine(void); /* Free an XLogReader */ extern void XLogReaderFree(XLogReaderState *state); -/* - * Callback to open the specified WAL segment for reading. Returns a valid - * file descriptor when the file was opened successfully. - * - * "nextSegNo" is the number of the segment to be opened. - * - * "segcxt" is additional information about the segment. - * - * "tli_p" is an input/output argument. XLogRead() uses it to pass the - * timeline in which the new segment should be found, but the callback can use - * it to return the TLI that it actually opened. - * - * BasicOpenFile() is the preferred way to open the segment file in backend - * code, whereas open(2) should be used in frontend. - */ -typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt, - TimeLineID *tli_p); - /* Initialize supporting structures */ extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir); @@ -269,9 +299,10 @@ typedef struct WALReadError WALOpenSegment wre_seg; /* Segment we tried to read from. */ } WALReadError; -extern bool WALRead(char *buf, XLogRecPtr startptr, Size count, +extern bool WALRead(XLogReaderState *state, + char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALOpenSegment *seg, - WALSegmentContext *segcxt, WALSegmentOpen openSegment, + WALSegmentContext *segcxt, WALReadError *errinfo); /* Functions for decoding an XLogRecord */ diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 5181a077d96..68ce815476c 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -50,6 +50,11 @@ extern void FreeFakeRelcacheEntry(Relation fakerel); extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); +extern int wal_segment_open(XLogReaderState *state, + XLogSegNo nextSegNo, + WALSegmentContext *segcxt, + TimeLineID *tli_p); +extern void wal_segment_close(XLogReaderState *state); extern void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 3b7ca7f1da4..c2f2475e5d3 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -95,14 +95,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, - XLogPageReadCB read_page, + XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress); extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, - XLogPageReadCB read_page, + XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress); |
