Fix catalog lookup with the wrong snapshot during logical decoding.
authorAmit Kapila <akapila@postgresql.org>
Thu, 11 Aug 2022 04:39:24 +0000 (10:09 +0530)
committerAmit Kapila <akapila@postgresql.org>
Thu, 11 Aug 2022 04:39:24 +0000 (10:09 +0530)
Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATION
records to know if the transaction has modified the catalog, and that
information is not serialized to snapshot. Therefore, after the restart,
if the logical decoding decodes only the commit record of the transaction
that has actually modified a catalog, we will miss adding its XID to the
snapshot. Thus, we will end up looking at catalogs with the wrong
snapshot.

To fix this problem, this change adds the list of transaction IDs and
sub-transaction IDs, that have modified catalogs and are running during
snapshot serialization, to the serialized snapshot. After restart or
otherwise, when we restore from such a serialized snapshot, the
corresponding list is restored in memory. Now, when decoding a COMMIT
record, we check both the list and the ReorderBuffer to see if the
transaction has modified catalogs.

Since this adds additional information to the serialized snapshot, we
cannot backpatch it. For back branches, we took another approach.
We remember the last-running-xacts list of the decoded RUNNING_XACTS
record after restoring the previously serialized snapshot. Then, we mark
the transaction as containing catalog changes if it's in the list of
initial running transactions and its commit record has
XACT_XINFO_HAS_INVALS. This doesn't require any file format changes but
the transaction will end up being added to the snapshot even if it has
only relcache invalidations. But that won't be a problem since we use
snapshot built during decoding only to read system catalogs.

This commit bumps SNAPBUILD_VERSION because of a change in SnapBuild.

Reported-by: Mike Oh
Author: Masahiko Sawada
Reviewed-by: Amit Kapila, Shi yu, Takamichi Osumi, Kyotaro Horiguchi, Bertrand Drouvot, Ahsan Hadi
Backpatch-through: 10
Discussion: https://postgr.es/m/81D0D8B0-E7C4-4999-B616-1E5004DBDCD2%40amazon.com

contrib/test_decoding/Makefile
contrib/test_decoding/expected/catalog_change_snapshot.out [new file with mode: 0644]
contrib/test_decoding/specs/catalog_change_snapshot.spec [new file with mode: 0644]
src/backend/replication/logical/decode.c
src/backend/replication/logical/reorderbuffer.c
src/backend/replication/logical/snapbuild.c
src/include/replication/reorderbuffer.h
src/include/replication/snapbuild.h

index b2209064790b8a0ddb7187c3f6933c76bb6243b8..c7ce6037064d848adb0c6fed4c6b40f5e6eaee90 100644 (file)
@@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
    spill slot truncate stream stats twophase twophase_stream
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
    oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
-   twophase_snapshot slot_creation_error
+   twophase_snapshot slot_creation_error catalog_change_snapshot
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
new file mode 100644 (file)
index 0000000..dc4f9b7
--- /dev/null
@@ -0,0 +1,44 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_truncate: TRUNCATE tbl1;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                   
+---------------------------------------
+BEGIN                                  
+table public.tbl1: TRUNCATE: (no-flags)
+COMMIT                                 
+(3 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT                                                       
+(3 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
new file mode 100644 (file)
index 0000000..2971ddc
--- /dev/null
@@ -0,0 +1,39 @@
+# Test decoding only the commit record of the transaction that have
+# modified catalogs.
+setup
+{
+    DROP TABLE IF EXISTS tbl1;
+    CREATE TABLE tbl1 (val1 integer, val2 integer);
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_savepoint" { SAVEPOINT sp1; }
+step "s0_truncate" { TRUNCATE tbl1; }
+step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_checkpoint" { CHECKPOINT; }
+step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
+# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
+# during the first checkpoint execution.  This transaction must be marked as
+# containing catalog changes while decoding the COMMIT record and the decoding
+# of the INSERT record must read the pg_class with the correct historic snapshot.
+#
+# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit"
+# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS
+# record written by bgwriter.  One might think we can either stop the bgwriter or
+# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
index c5c6a2ba68974c6f641307c9565e5a1333fc9192..1667d720b11e3ecf1019509a010f9e0ea87d253c 100644 (file)
@@ -628,7 +628,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    }
 
    SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
-                      parsed->nsubxacts, parsed->subxacts);
+                      parsed->nsubxacts, parsed->subxacts,
+                      parsed->xinfo);
 
    /* ----
     * Check whether we are interested in this specific transaction, and tell
index 88a37fde722e9bb34382f71869e5b674d1e42028..1c21a1d14b605a10f4302e945fb28cac7721d491 100644 (file)
@@ -349,6 +349,8 @@ ReorderBufferAllocate(void)
    buffer->by_txn_last_xid = InvalidTransactionId;
    buffer->by_txn_last_txn = NULL;
 
+   buffer->catchange_ntxns = 0;
+
    buffer->outbuf = NULL;
    buffer->outbufsize = 0;
    buffer->size = 0;
@@ -366,6 +368,7 @@ ReorderBufferAllocate(void)
 
    dlist_init(&buffer->toplevel_by_lsn);
    dlist_init(&buffer->txns_by_base_snapshot_lsn);
+   dlist_init(&buffer->catchange_txns);
 
    /*
     * Ensure there's no stale data from prior uses of this slot, in case some
@@ -1526,14 +1529,22 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
    }
 
    /*
-    * Remove TXN from its containing list.
+    * Remove TXN from its containing lists.
     *
     * Note: if txn is known as subxact, we are deleting the TXN from its
     * parent's list of known subxacts; this leaves the parent's nsubxacts
     * count too high, but we don't care.  Otherwise, we are deleting the TXN
-    * from the LSN-ordered list of toplevel TXNs.
+    * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
+    * list of catalog modifying transactions as well.
     */
    dlist_delete(&txn->node);
+   if (rbtxn_has_catalog_changes(txn))
+   {
+       dlist_delete(&txn->catchange_node);
+       rb->catchange_ntxns--;
+
+       Assert(rb->catchange_ntxns >= 0);
+   }
 
    /* now remove reference from buffer */
    hash_search(rb->by_txn,
@@ -3275,10 +3286,16 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
                                  XLogRecPtr lsn)
 {
    ReorderBufferTXN *txn;
+   ReorderBufferTXN *toptxn;
 
    txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
-   txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+   if (!rbtxn_has_catalog_changes(txn))
+   {
+       txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+       dlist_push_tail(&rb->catchange_txns, &txn->catchange_node);
+       rb->catchange_ntxns++;
+   }
 
    /*
     * Mark top-level transaction as having catalog changes too if one of its
@@ -3286,8 +3303,52 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
     * conveniently check just top-level transaction and decide whether to
     * build the hash table or not.
     */
-   if (txn->toptxn != NULL)
-       txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+   toptxn = txn->toptxn;
+   if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn))
+   {
+       toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+       dlist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
+       rb->catchange_ntxns++;
+   }
+}
+
+/*
+ * Return palloc'ed array of the transactions that have changed catalogs.
+ * The returned array is sorted in xidComparator order.
+ *
+ * The caller must free the returned array when done with it.
+ */
+TransactionId *
+ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
+{
+   dlist_iter  iter;
+   TransactionId *xids = NULL;
+   size_t      xcnt = 0;
+
+   /* Quick return if the list is empty */
+   if (rb->catchange_ntxns == 0)
+   {
+       Assert(dlist_is_empty(&rb->catchange_txns));
+       return NULL;
+   }
+
+   /* Initialize XID array */
+   xids = (TransactionId *) palloc(sizeof(TransactionId) * rb->catchange_ntxns);
+   dlist_foreach(iter, &rb->catchange_txns)
+   {
+       ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN,
+                                               catchange_node,
+                                               iter.cur);
+
+       Assert(rbtxn_has_catalog_changes(txn));
+
+       xids[xcnt++] = txn->xid;
+   }
+
+   qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
+
+   Assert(xcnt == rb->catchange_ntxns);
+   return xids;
 }
 
 /*
index 73c0f15214a2123871c7c87905e74777ef97001c..1ff2c12240d58f5c4647316dd168e3feaa5d2f4c 100644 (file)
@@ -241,6 +241,33 @@ struct SnapBuild
         */
        TransactionId *xip;
    }           committed;
+
+   /*
+    * Array of transactions and subtransactions that had modified catalogs
+    * and were running when the snapshot was serialized.
+    *
+    * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
+    * if the transaction has changed the catalog. But it could happen that
+    * the logical decoding decodes only the commit record of the transaction
+    * after restoring the previously serialized snapshot in which case we
+    * will miss adding the xid to the snapshot and end up looking at the
+    * catalogs with the wrong snapshot.
+    *
+    * Now to avoid the above problem, we serialize the transactions that had
+    * modified the catalogs and are still running at the time of snapshot
+    * serialization. We fill this array while restoring the snapshot and then
+    * refer it while decoding commit to ensure if the xact has modified the
+    * catalog. We discard this array when all the xids in the list become old
+    * enough to matter. See SnapBuildPurgeOlderTxn for details.
+    */
+   struct
+   {
+       /* number of transactions */
+       size_t      xcnt;
+
+       /* This array must be sorted in xidComparator order */
+       TransactionId *xip;
+   }           catchange;
 };
 
 /*
@@ -250,8 +277,8 @@ struct SnapBuild
 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
 static bool ExportInProgress = false;
 
-/* ->committed manipulation */
-static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
+/* ->committed and ->catchange manipulation */
+static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
 
 /* snapshot building/manipulation/distribution functions */
 static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
@@ -262,6 +289,9 @@ static void SnapBuildSnapIncRefcount(Snapshot snap);
 
 static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
 
+static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
+                                                uint32 xinfo);
+
 /* xlog reading helper functions for SnapBuildProcessRunningXacts */
 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
 static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
@@ -269,6 +299,7 @@ static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutof
 /* serialization functions */
 static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
 static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path);
 
 /*
  * Allocate a new snapshot builder.
@@ -306,6 +337,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
        palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
    builder->committed.includes_all_transactions = true;
 
+   builder->catchange.xcnt = 0;
+   builder->catchange.xip = NULL;
+
    builder->initial_xmin_horizon = xmin_horizon;
    builder->start_decoding_at = start_lsn;
    builder->building_full_snapshot = need_full_snapshot;
@@ -888,12 +922,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
 }
 
 /*
- * Remove knowledge about transactions we treat as committed that are smaller
- * than ->xmin. Those won't ever get checked via the ->committed array but via
- * the clog machinery, so we don't need to waste memory on them.
+ * Remove knowledge about transactions we treat as committed or containing catalog
+ * changes that are smaller than ->xmin. Those won't ever get checked via
+ * the ->committed or ->catchange array, respectively. The committed xids will
+ * get checked via the clog machinery.
+ *
+ * We can ideally remove the transaction from catchange array once it is
+ * finished (committed/aborted) but that could be costly as we need to maintain
+ * the xids order in the array.
  */
 static void
-SnapBuildPurgeCommittedTxn(SnapBuild *builder)
+SnapBuildPurgeOlderTxn(SnapBuild *builder)
 {
    int         off;
    TransactionId *workspace;
@@ -928,6 +967,30 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
    builder->committed.xcnt = surviving_xids;
 
    pfree(workspace);
+
+   /*
+    * Either all the xacts got purged or none. It is only possible to
+    * partially remove the xids from this array if one or more of the xids
+    * are still running but not all. That can happen if we start decoding
+    * from a point (LSN where the snapshot state became consistent) where all
+    * the xacts in this were running and then at least one of those got
+    * committed and a few are still running. We will never start from such a
+    * point because we won't move the slot's restart_lsn past the point where
+    * the oldest running transaction's restart_decoding_lsn is.
+    */
+   if (builder->catchange.xcnt == 0 ||
+       TransactionIdFollowsOrEquals(builder->catchange.xip[0],
+                                    builder->xmin))
+       return;
+
+   Assert(TransactionIdFollows(builder->xmin,
+                               builder->catchange.xip[builder->catchange.xcnt - 1]));
+   pfree(builder->catchange.xip);
+   builder->catchange.xip = NULL;
+   builder->catchange.xcnt = 0;
+
+   elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u",
+        builder->xmin);
 }
 
 /*
@@ -935,7 +998,7 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
  */
 void
 SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
-                  int nsubxacts, TransactionId *subxacts)
+                  int nsubxacts, TransactionId *subxacts, uint32 xinfo)
 {
    int         nxact;
 
@@ -983,7 +1046,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
         * Add subtransaction to base snapshot if catalog modifying, we don't
         * distinguish to toplevel transactions there.
         */
-       if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
+       if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
        {
            sub_needs_timetravel = true;
            needs_snapshot = true;
@@ -1012,7 +1075,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
    }
 
    /* if top-level modified catalog, it'll need a snapshot */
-   if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
+   if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
    {
        elog(DEBUG2, "found top level transaction %u, with catalog changes",
             xid);
@@ -1089,6 +1152,29 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
    }
 }
 
+/*
+ * Check the reorder buffer and the snapshot to see if the given transaction has
+ * modified catalogs.
+ */
+static inline bool
+SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
+                             uint32 xinfo)
+{
+   if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
+       return true;
+
+   /*
+    * The transactions that have changed catalogs must have invalidation
+    * info.
+    */
+   if (!(xinfo & XACT_XINFO_HAS_INVALS))
+       return false;
+
+   /* Check the catchange XID array */
+   return ((builder->catchange.xcnt > 0) &&
+           (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
+                    sizeof(TransactionId), xidComparator) != NULL));
+}
 
 /* -----------------------------------
  * Snapshot building functions dealing with xlog records
@@ -1135,7 +1221,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
    builder->xmin = running->oldestRunningXid;
 
    /* Remove transactions we don't need to keep track off anymore */
-   SnapBuildPurgeCommittedTxn(builder);
+   SnapBuildPurgeOlderTxn(builder);
 
    /*
     * Advance the xmin limit for the current replication slot, to allow
@@ -1438,6 +1524,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
  *
  * struct SnapBuildOnDisk;
  * TransactionId * committed.xcnt; (*not xcnt_space*)
+ * TransactionId * catchange.xcnt;
  *
  */
 typedef struct SnapBuildOnDisk
@@ -1467,7 +1554,7 @@ typedef struct SnapBuildOnDisk
    offsetof(SnapBuildOnDisk, version)
 
 #define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 4
+#define SNAPBUILD_VERSION 5
 
 /*
  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
@@ -1493,6 +1580,9 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 {
    Size        needed_length;
    SnapBuildOnDisk *ondisk = NULL;
+   TransactionId *catchange_xip = NULL;
+   MemoryContext old_ctx;
+   size_t      catchange_xcnt;
    char       *ondisk_c;
    int         fd;
    char        tmppath[MAXPGPATH];
@@ -1578,10 +1668,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
                (errcode_for_file_access(),
                 errmsg("could not remove file \"%s\": %m", tmppath)));
 
+   old_ctx = MemoryContextSwitchTo(builder->context);
+
+   /* Get the catalog modifying transactions that are yet not committed */
+   catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
+   catchange_xcnt = builder->reorder->catchange_ntxns;
+
    needed_length = sizeof(SnapBuildOnDisk) +
-       sizeof(TransactionId) * builder->committed.xcnt;
+       sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
 
-   ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
+   ondisk_c = palloc0(needed_length);
    ondisk = (SnapBuildOnDisk *) ondisk_c;
    ondisk->magic = SNAPBUILD_MAGIC;
    ondisk->version = SNAPBUILD_VERSION;
@@ -1598,16 +1694,31 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
    ondisk->builder.snapshot = NULL;
    ondisk->builder.reorder = NULL;
    ondisk->builder.committed.xip = NULL;
+   ondisk->builder.catchange.xip = NULL;
+   /* update catchange only on disk data */
+   ondisk->builder.catchange.xcnt = catchange_xcnt;
 
    COMP_CRC32C(ondisk->checksum,
                &ondisk->builder,
                sizeof(SnapBuild));
 
    /* copy committed xacts */
-   sz = sizeof(TransactionId) * builder->committed.xcnt;
-   memcpy(ondisk_c, builder->committed.xip, sz);
-   COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
-   ondisk_c += sz;
+   if (builder->committed.xcnt > 0)
+   {
+       sz = sizeof(TransactionId) * builder->committed.xcnt;
+       memcpy(ondisk_c, builder->committed.xip, sz);
+       COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
+       ondisk_c += sz;
+   }
+
+   /* copy catalog modifying xacts */
+   if (catchange_xcnt > 0)
+   {
+       sz = sizeof(TransactionId) * catchange_xcnt;
+       memcpy(ondisk_c, catchange_xip, sz);
+       COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
+       ondisk_c += sz;
+   }
 
    FIN_CRC32C(ondisk->checksum);
 
@@ -1688,12 +1799,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
     */
    builder->last_serialized_snapshot = lsn;
 
+   MemoryContextSwitchTo(old_ctx);
+
 out:
    ReorderBufferSetRestartPoint(builder->reorder,
                                 builder->last_serialized_snapshot);
    /* be tidy */
    if (ondisk)
        pfree(ondisk);
+   if (catchange_xip)
+       pfree(catchange_xip);
 }
 
 /*
@@ -1707,7 +1822,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
    int         fd;
    char        path[MAXPGPATH];
    Size        sz;
-   int         readBytes;
    pg_crc32c   checksum;
 
    /* no point in loading a snapshot if we're already there */
@@ -1739,29 +1853,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 
 
    /* read statically sized portion of snapshot */
-   pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
-   readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
-   pgstat_report_wait_end();
-   if (readBytes != SnapBuildOnDiskConstantSize)
-   {
-       int         save_errno = errno;
-
-       CloseTransientFile(fd);
-
-       if (readBytes < 0)
-       {
-           errno = save_errno;
-           ereport(ERROR,
-                   (errcode_for_file_access(),
-                    errmsg("could not read file \"%s\": %m", path)));
-       }
-       else
-           ereport(ERROR,
-                   (errcode(ERRCODE_DATA_CORRUPTED),
-                    errmsg("could not read file \"%s\": read %d of %zu",
-                           path, readBytes,
-                           (Size) SnapBuildOnDiskConstantSize)));
-   }
+   SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path);
 
    if (ondisk.magic != SNAPBUILD_MAGIC)
        ereport(ERROR,
@@ -1781,56 +1873,26 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
                SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
 
    /* read SnapBuild */
-   pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
-   readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
-   pgstat_report_wait_end();
-   if (readBytes != sizeof(SnapBuild))
-   {
-       int         save_errno = errno;
-
-       CloseTransientFile(fd);
-
-       if (readBytes < 0)
-       {
-           errno = save_errno;
-           ereport(ERROR,
-                   (errcode_for_file_access(),
-                    errmsg("could not read file \"%s\": %m", path)));
-       }
-       else
-           ereport(ERROR,
-                   (errcode(ERRCODE_DATA_CORRUPTED),
-                    errmsg("could not read file \"%s\": read %d of %zu",
-                           path, readBytes, sizeof(SnapBuild))));
-   }
+   SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path);
    COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
 
    /* restore committed xacts information */
-   sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
-   ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
-   pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
-   readBytes = read(fd, ondisk.builder.committed.xip, sz);
-   pgstat_report_wait_end();
-   if (readBytes != sz)
+   if (ondisk.builder.committed.xcnt > 0)
    {
-       int         save_errno = errno;
-
-       CloseTransientFile(fd);
+       sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
+       ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
+       SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path);
+       COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
+   }
 
-       if (readBytes < 0)
-       {
-           errno = save_errno;
-           ereport(ERROR,
-                   (errcode_for_file_access(),
-                    errmsg("could not read file \"%s\": %m", path)));
-       }
-       else
-           ereport(ERROR,
-                   (errcode(ERRCODE_DATA_CORRUPTED),
-                    errmsg("could not read file \"%s\": read %d of %zu",
-                           path, readBytes, sz)));
+   /* restore catalog modifying xacts information */
+   if (ondisk.builder.catchange.xcnt > 0)
+   {
+       sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt;
+       ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz);
+       SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path);
+       COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz);
    }
-   COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
 
    if (CloseTransientFile(fd) != 0)
        ereport(ERROR,
@@ -1885,6 +1947,13 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
    }
    ondisk.builder.committed.xip = NULL;
 
+   /* set catalog modifying transactions */
+   if (builder->catchange.xip)
+       pfree(builder->catchange.xip);
+   builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
+   builder->catchange.xip = ondisk.builder.catchange.xip;
+   ondisk.builder.catchange.xip = NULL;
+
    /* our snapshot is not interesting anymore, build a new one */
    if (builder->snapshot != NULL)
    {
@@ -1906,9 +1975,43 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 snapshot_not_interesting:
    if (ondisk.builder.committed.xip != NULL)
        pfree(ondisk.builder.committed.xip);
+   if (ondisk.builder.catchange.xip != NULL)
+       pfree(ondisk.builder.catchange.xip);
    return false;
 }
 
+/*
+ * Read the contents of the serialized snapshot to 'dest'.
+ */
+static void
+SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
+{
+   int         readBytes;
+
+   pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
+   readBytes = read(fd, dest, size);
+   pgstat_report_wait_end();
+   if (readBytes != size)
+   {
+       int         save_errno = errno;
+
+       CloseTransientFile(fd);
+
+       if (readBytes < 0)
+       {
+           errno = save_errno;
+           ereport(ERROR,
+                   (errcode_for_file_access(),
+                    errmsg("could not read file \"%s\": %m", path)));
+       }
+       else
+           ereport(ERROR,
+                   (errcode(ERRCODE_DATA_CORRUPTED),
+                    errmsg("could not read file \"%s\": read %d of %zu",
+                           path, readBytes, sizeof(SnapBuild))));
+   }
+}
+
 /*
  * Remove all serialized snapshots that are not required anymore because no
  * slot can need them. This doesn't actually have to run during a checkpoint,
index 2c9206ace41ab2f7bc11e40de378645854ca97e3..8695901ba7132429ced70a7c4ae584b5884b41dd 100644 (file)
@@ -380,6 +380,11 @@ typedef struct ReorderBufferTXN
     */
    dlist_node  node;
 
+   /*
+    * A node in the list of catalog modifying transactions
+    */
+   dlist_node  catchange_node;
+
    /*
     * Size of this transaction (changes currently in memory, in bytes).
     */
@@ -526,6 +531,12 @@ struct ReorderBuffer
     */
    dlist_head  txns_by_base_snapshot_lsn;
 
+   /*
+    * Transactions and subtransactions that have modified system catalogs.
+    */
+   dlist_head  catchange_txns;
+   int         catchange_ntxns;
+
    /*
     * one-entry sized cache for by_txn. Very frequently the same txn gets
     * looked up over and over again.
@@ -677,6 +688,7 @@ extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
 extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
 extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
 extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
+extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb);
 
 extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
index d179251aad99e0b016f8cf1a45bed090b600304f..e6adea24f22f0dfcfe639f35f51db5414aef1a56 100644 (file)
@@ -82,7 +82,7 @@ extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr);
 
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
                               TransactionId xid, int nsubxacts,
-                              TransactionId *subxacts);
+                              TransactionId *subxacts, uint32 xinfo);
 extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid,
                                   XLogRecPtr lsn);
 extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,