Immediately WAL-log subtransaction and top-level XID association.
authorAmit Kapila <akapila@postgresql.org>
Mon, 20 Jul 2020 03:18:26 +0000 (08:48 +0530)
committerAmit Kapila <akapila@postgresql.org>
Mon, 20 Jul 2020 03:18:26 +0000 (08:48 +0530)
The logical decoding infrastructure needs to know which top-level
transaction the subxact belongs to, in order to decode all the
changes. Until now that might be delayed until commit, due to the
caching (GPROC_MAX_CACHED_SUBXIDS), preventing features requiring
incremental decoding.

So we also write the assignment info into WAL immediately, as part
of the next WAL record (to minimize overhead) only when wal_level=logical.
We can not remove the existing XLOG_XACT_ASSIGNMENT WAL as that is
required for avoiding overflow in the hot standby snapshot.

Bump XLOG_PAGE_MAGIC, since this introduces XLR_BLOCK_ID_TOPLEVEL_XID.

Author: Tomas Vondra, Dilip Kumar, Amit Kapila
Reviewed-by: Amit Kapila
Tested-by: Neha Sharma and Mahendra Singh Thalor
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com

src/backend/access/transam/xact.c
src/backend/access/transam/xloginsert.c
src/backend/access/transam/xlogreader.c
src/backend/replication/logical/decode.c
src/include/access/xact.h
src/include/access/xlog.h
src/include/access/xlog_internal.h
src/include/access/xlogreader.h
src/include/access/xlogrecord.h

index b3ee7fa7ea041d237c90bf688f3de280be5347fd..bd4c3cf32585fb46ab3792f0e550ba97e0691876 100644 (file)
@@ -191,6 +191,7 @@ typedef struct TransactionStateData
        bool            didLogXid;              /* has xid been included in WAL record? */
        int                     parallelModeLevel;      /* Enter/ExitParallelMode counter */
        bool            chain;                  /* start a new block after this one */
+       bool            assigned;               /* assigned to top-level XID */
        struct TransactionStateData *parent;    /* back link to parent */
 } TransactionStateData;
 
@@ -223,6 +224,7 @@ typedef struct SerializedTransactionState
 static TransactionStateData TopTransactionStateData = {
        .state = TRANS_DEFAULT,
        .blockState = TBLOCK_DEFAULT,
+       .assigned = false,
 };
 
 /*
@@ -5120,6 +5122,7 @@ PushTransaction(void)
        GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
        s->prevXactReadOnly = XactReadOnly;
        s->parallelModeLevel = 0;
+       s->assigned = false;
 
        CurrentTransactionState = s;
 
@@ -6022,3 +6025,50 @@ xact_redo(XLogReaderState *record)
        else
                elog(PANIC, "xact_redo: unknown op code %u", info);
 }
+
+/*
+ * IsSubTransactionAssignmentPending
+ *
+ * This is used to decide whether we need to WAL log the top-level XID for
+ * operation in a subtransaction.  We require that for logical decoding, see
+ * LogicalDecodingProcessRecord.
+ *
+ * This returns true if wal_level >= logical and we are inside a valid
+ * subtransaction, for which the assignment was not yet written to any WAL
+ * record.
+ */
+bool
+IsSubTransactionAssignmentPending(void)
+{
+       /* wal_level has to be logical */
+       if (!XLogLogicalInfoActive())
+               return false;
+
+       /* we need to be in a transaction state */
+       if (!IsTransactionState())
+               return false;
+
+       /* it has to be a subtransaction */
+       if (!IsSubTransaction())
+               return false;
+
+       /* the subtransaction has to have a XID assigned */
+       if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+               return false;
+
+       /* and it should not be already 'assigned' */
+       return !CurrentTransactionState->assigned;
+}
+
+/*
+ * MarkSubTransactionAssigned
+ *
+ * Mark the subtransaction assignment as completed.
+ */
+void
+MarkSubTransactionAssigned(void)
+{
+       Assert(IsSubTransactionAssignmentPending());
+
+       CurrentTransactionState->assigned = true;
+}
index b21679f09eb458a36e6552cfb4fd72f0d519a4f0..c526bb19281e05d9957071cd70219d80a3155ca9 100644 (file)
@@ -89,11 +89,13 @@ static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
 #define SizeOfXlogOrigin       (sizeof(RepOriginId) + sizeof(char))
+#define SizeOfXLogTransactionId        (sizeof(TransactionId) + sizeof(char))
 
 #define HEADER_SCRATCH_SIZE \
        (SizeOfXLogRecord + \
         MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
-        SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
+        SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
+        SizeOfXLogTransactionId)
 
 /*
  * An array of XLogRecData structs, to hold registered data.
@@ -195,6 +197,10 @@ XLogResetInsertion(void)
 {
        int                     i;
 
+       /* reset the subxact assignment flag (if needed) */
+       if (curinsert_flags & XLOG_INCLUDE_XID)
+               MarkSubTransactionAssigned();
+
        for (i = 0; i < max_registered_block_id; i++)
                registered_buffers[i].in_use = false;
 
@@ -398,7 +404,7 @@ void
 XLogSetRecordFlags(uint8 flags)
 {
        Assert(begininsert_called);
-       curinsert_flags = flags;
+       curinsert_flags |= flags;
 }
 
 /*
@@ -748,6 +754,19 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
                scratch += sizeof(replorigin_session_origin);
        }
 
+       /* followed by toplevel XID, if not already included in previous record */
+       if (IsSubTransactionAssignmentPending())
+       {
+               TransactionId xid = GetTopTransactionIdIfAny();
+
+               /* update the flag (later used by XLogResetInsertion) */
+               XLogSetRecordFlags(XLOG_INCLUDE_XID);
+
+               *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
+               memcpy(scratch, &xid, sizeof(TransactionId));
+               scratch += sizeof(TransactionId);
+       }
+
        /* followed by main data, if any */
        if (mainrdata_len > 0)
        {
index cb76be4f4696f9db7f86636baa81aca94d7672c0..a757baccfc55102d4d4efa25188afba6b7db4706 100644 (file)
@@ -1197,6 +1197,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 
        state->decoded_record = record;
        state->record_origin = InvalidRepOriginId;
+       state->toplevel_xid = InvalidTransactionId;
 
        ptr = (char *) record;
        ptr += SizeOfXLogRecord;
@@ -1235,6 +1236,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
                {
                        COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
                }
+               else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
+               {
+                       COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
+               }
                else if (block_id <= XLR_MAX_BLOCK_ID)
                {
                        /* XLogRecordBlockHeader */
index c2e5e3abf82eab84a9ec4986addac45728d285bf..0c0c3717391994d9eeb6e68f9cb88caa803b1d3b 100644 (file)
@@ -94,11 +94,27 @@ void
 LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
 {
        XLogRecordBuffer buf;
+       TransactionId txid;
 
        buf.origptr = ctx->reader->ReadRecPtr;
        buf.endptr = ctx->reader->EndRecPtr;
        buf.record = record;
 
+       txid = XLogRecGetTopXid(record);
+
+       /*
+        * If the top-level xid is valid, we need to assign the subxact to the
+        * top-level xact. We need to do this for all records, hence we do it
+        * before the switch.
+        */
+       if (TransactionIdIsValid(txid))
+       {
+               ReorderBufferAssignChild(ctx->reorder,
+                                                                txid,
+                                                                record->decoded_record->xl_xid,
+                                                                buf.origptr);
+       }
+
        /* cast so we get a warning when new rmgrs are added */
        switch ((RmgrId) XLogRecGetRmid(record))
        {
@@ -216,13 +232,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
        /*
         * If the snapshot isn't yet fully built, we cannot decode anything, so
         * bail out.
-        *
-        * However, it's critical to process XLOG_XACT_ASSIGNMENT records even
-        * when the snapshot is being built: it is possible to get later records
-        * that require subxids to be properly assigned.
         */
-       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT &&
-               info != XLOG_XACT_ASSIGNMENT)
+       if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
                return;
 
        switch (info)
@@ -264,22 +275,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
                                break;
                        }
                case XLOG_XACT_ASSIGNMENT:
-                       {
-                               xl_xact_assignment *xlrec;
-                               int                     i;
-                               TransactionId *sub_xid;
 
-                               xlrec = (xl_xact_assignment *) XLogRecGetData(r);
-
-                               sub_xid = &xlrec->xsub[0];
-
-                               for (i = 0; i < xlrec->nsubxacts; i++)
-                               {
-                                       ReorderBufferAssignChild(reorder, xlrec->xtop,
-                                                                                        *(sub_xid++), buf->origptr);
-                               }
-                               break;
-                       }
+                       /*
+                        * We assign subxact to the toplevel xact while processing each
+                        * record if required.  So, we don't need to do anything here.
+                        * See LogicalDecodingProcessRecord.
+                        */
+                       break;
                case XLOG_XACT_PREPARE:
 
                        /*
index db191879b9d228a9f4f03768223567eafacb94c5..aef8555367449745d5ea73f53e20b1c73a6d8ca8 100644 (file)
@@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg);
 extern void RegisterSubXactCallback(SubXactCallback callback, void *arg);
 extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
 
+extern bool IsSubTransactionAssignmentPending(void);
+extern void MarkSubTransactionAssigned(void);
+
 extern int     xactGetCommittedChildren(TransactionId **ptr);
 
 extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
index 5b143348879bb29e6de78401d9039dcdffbc773f..d8391aa3783ac264b54e6e4b432b2892cbd77cb4 100644 (file)
@@ -237,6 +237,7 @@ extern bool XLOG_DEBUG;
  */
 #define XLOG_INCLUDE_ORIGIN            0x01    /* include the replication origin */
 #define XLOG_MARK_UNIMPORTANT  0x02    /* record not important for durability */
+#define XLOG_INCLUDE_XID               0x04    /* include XID of top-level xact */
 
 
 /* Checkpoint statistics */
index 88f3d767007beab115fad687dbc53a65b872b828..b9490a3afeff26d62e8bdcc66bb2e3bf260c9848 100644 (file)
@@ -31,7 +31,7 @@
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD106 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD107 /* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
index b0f2a6ed43a991f0a97dbb8937298892c107c3f6..b97688222910c1715799388832e4d610cb1ee456 100644 (file)
@@ -191,6 +191,8 @@ struct XLogReaderState
 
        RepOriginId record_origin;
 
+       TransactionId toplevel_xid; /* XID of top-level transaction */
+
        /* information about blocks referenced by the record. */
        DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
 
@@ -304,6 +306,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
 #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
 #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
 #define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
+#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
 #define XLogRecGetData(decoder) ((decoder)->main_data)
 #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
 #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
index acd9af0194d4fc63b720b9513aca7462b19e7eaa..2f0c8bf58966c4479c28af30ced8971557f14221 100644 (file)
@@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong
 #define XLR_BLOCK_ID_DATA_SHORT                255
 #define XLR_BLOCK_ID_DATA_LONG         254
 #define XLR_BLOCK_ID_ORIGIN                    253
+#define XLR_BLOCK_ID_TOPLEVEL_XID      252
 
 #endif                                                 /* XLOGRECORD_H */