Make logical decoding a part of the rmgr.
authorJeff Davis <jdavis@postgresql.org>
Wed, 19 Jan 2022 22:58:04 +0000 (14:58 -0800)
committerJeff Davis <jdavis@postgresql.org>
Wed, 19 Jan 2022 22:58:49 +0000 (14:58 -0800)
Add a new rmgr method, rm_decode, and use that rather than a switch
statement.

In preparation for rmgr extensibility.

Reviewed-by: Julien Rouhaud
Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
Discussion: https://postgr.es/m/20220118095332.6xtlcjoyxobv6cbk@jrouhaud

src/backend/access/transam/rmgr.c
src/backend/replication/logical/decode.c
src/bin/pg_rewind/parsexlog.c
src/bin/pg_waldump/rmgrdesc.c
src/include/access/rmgr.h
src/include/access/rmgrlist.h
src/include/access/xlog_internal.h
src/include/replication/decode.h

index 58091f6b520b90661e2dee6435d676294077eb9a..f8847d5aebfdd33a37514a6a5b696cbd1048f117 100644 (file)
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
 
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
-   { name, redo, desc, identify, startup, cleanup, mask },
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
+   { name, redo, desc, identify, startup, cleanup, mask, decode },
 
 const RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
index 1d22208c1ad6745b218d04f33793bf64e38e28c9..9b450c9f90e2ab5c74a641489e9f96704fcda0dc 100644 (file)
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
 
-typedef struct XLogRecordBuffer
-{
-   XLogRecPtr  origptr;
-   XLogRecPtr  endptr;
-   XLogReaderState *record;
-} XLogRecordBuffer;
-
-/* RMGR Handlers */
-static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 {
    XLogRecordBuffer buf;
    TransactionId txid;
+   RmgrId rmid;
 
    buf.origptr = ctx->reader->ReadRecPtr;
    buf.endptr = ctx->reader->EndRecPtr;
@@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
                                 buf.origptr);
    }
 
-   /* cast so we get a warning when new rmgrs are added */
-   switch ((RmgrId) XLogRecGetRmid(record))
-   {
-           /*
-            * Rmgrs we care about for logical decoding. Add new rmgrs in
-            * rmgrlist.h's order.
-            */
-       case RM_XLOG_ID:
-           DecodeXLogOp(ctx, &buf);
-           break;
-
-       case RM_XACT_ID:
-           DecodeXactOp(ctx, &buf);
-           break;
+   rmid = XLogRecGetRmid(record);
 
-       case RM_STANDBY_ID:
-           DecodeStandbyOp(ctx, &buf);
-           break;
-
-       case RM_HEAP2_ID:
-           DecodeHeap2Op(ctx, &buf);
-           break;
-
-       case RM_HEAP_ID:
-           DecodeHeapOp(ctx, &buf);
-           break;
-
-       case RM_LOGICALMSG_ID:
-           DecodeLogicalMsgOp(ctx, &buf);
-           break;
-
-           /*
-            * Rmgrs irrelevant for logical decoding; they describe stuff not
-            * represented in logical decoding. Add new rmgrs in rmgrlist.h's
-            * order.
-            */
-       case RM_SMGR_ID:
-       case RM_CLOG_ID:
-       case RM_DBASE_ID:
-       case RM_TBLSPC_ID:
-       case RM_MULTIXACT_ID:
-       case RM_RELMAP_ID:
-       case RM_BTREE_ID:
-       case RM_HASH_ID:
-       case RM_GIN_ID:
-       case RM_GIST_ID:
-       case RM_SEQ_ID:
-       case RM_SPGIST_ID:
-       case RM_BRIN_ID:
-       case RM_COMMIT_TS_ID:
-       case RM_REPLORIGIN_ID:
-       case RM_GENERIC_ID:
-           /* just deal with xid, and done */
-           ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
-                                   buf.origptr);
-           break;
-       case RM_NEXT_ID:
-           elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
+   if (RmgrTable[rmid].rm_decode != NULL)
+       RmgrTable[rmid].rm_decode(ctx, &buf);
+   else
+   {
+       /* just deal with xid, and done */
+       ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+                               buf.origptr);
    }
 }
 
 /*
  * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
    SnapBuild  *builder = ctx->snapshot_builder;
    uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
@@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
    SnapBuild  *builder = ctx->snapshot_builder;
    ReorderBuffer *reorder = ctx->reorder;
@@ -391,8 +328,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
    SnapBuild  *builder = ctx->snapshot_builder;
    XLogReaderState *r = buf->record;
@@ -437,8 +374,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
    uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
    TransactionId xid = XLogRecGetXid(buf->record);
@@ -497,8 +434,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
    uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
    TransactionId xid = XLogRecGetXid(buf->record);
@@ -619,8 +556,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 /*
  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
    SnapBuild  *builder = ctx->snapshot_builder;
    XLogReaderState *r = buf->record;
index 91437974584a1c1ab6908994188ea06dfa7f5275..f6cfee4ce8c98cc59fe1e42ba5521a85282ec520 100644 (file)
@@ -28,7 +28,7 @@
  * RmgrNames is an array of resource manager names, to make error messages
  * a bit nicer.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
   name,
 
 static const char *RmgrNames[RM_MAX_ID + 1] = {
index 852d8ca4b1c6cbdeee4601b84fda27dfed396796..6a4ebd1310b790bb43758120bf7035642fb91350 100644 (file)
@@ -32,7 +32,7 @@
 #include "storage/standbydefs.h"
 #include "utils/relmapper.h"
 
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
    { name, desc, identify},
 
 const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
index c9b5c56a4c601535b9396d1ebeddeb2638225044..d9b512630ca5fd82a4d86438b448da6e4df48b1f 100644 (file)
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
  * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
  * file format.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
    symname,
 
 typedef enum RmgrIds
index ed751aaf03957658e49be822696c6a013487e962..9a74721c97cdec6ccb2e3c7474edd1ab3053a232 100644 (file)
  */
 
 /* symbol name, textual name, redo, desc, identify, startup, cleanup */
-PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL)
-PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL)
-PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL)
-PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL)
-PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL)
-PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask)
-PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask)
-PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask)
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask)
-PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask)
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
-PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
-PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
+PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
+PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode)
+PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode)
+PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode)
+PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL)
+PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
+PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
+PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
+PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
index e27fca0cc0ed70f8ec434a886fe5b8271375c2fa..849954a8e5a1c5221d378b92d5e8fb57bbd5d018 100644 (file)
@@ -287,6 +287,9 @@ typedef enum
    RECOVERY_TARGET_ACTION_SHUTDOWN
 }          RecoveryTargetAction;
 
+struct LogicalDecodingContext;
+struct XLogRecordBuffer;
+
 /*
  * Method table for resource managers.
  *
@@ -312,6 +315,8 @@ typedef struct RmgrData
    void        (*rm_startup) (void);
    void        (*rm_cleanup) (void);
    void        (*rm_mask) (char *pagedata, BlockNumber blkno);
+   void        (*rm_decode) (struct LogicalDecodingContext *ctx,
+                             struct XLogRecordBuffer *buf);
 } RmgrData;
 
 extern const RmgrData RmgrTable[];
index 1db73f355497f090b3851e6e79156738f87d3861..a33c2a718a7802787e72d06e27c4302973c8e1d6 100644 (file)
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 
-void       LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
+typedef struct XLogRecordBuffer
+{
+   XLogRecPtr  origptr;
+   XLogRecPtr  endptr;
+   XLogReaderState *record;
+} XLogRecordBuffer;
+
+extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
+extern void    LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
                                         XLogReaderState *record);
 
 #endif