summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c64
-rw-r--r--src/backend/replication/logical/snapbuild.c63
-rw-r--r--src/include/replication/reorderbuffer.h4
3 files changed, 111 insertions, 20 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 56c25e3a6da..fa9413fa2a0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2264,20 +2264,45 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
SharedInvalidationMessage *msgs)
{
ReorderBufferTXN *txn;
+ MemoryContext oldcontext;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
- if (txn->ninvalidations != 0)
- elog(ERROR, "only ever add one set of invalidations");
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ /*
+ * Collect all the invalidations under the top transaction, if available,
+ * so that we can execute them all together.
+ */
+ if (txn->toplevel_xid)
+ {
+ txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, true, NULL, lsn,
+ true);
+ }
Assert(nmsgs > 0);
- txn->ninvalidations = nmsgs;
- txn->invalidations = (SharedInvalidationMessage *)
- MemoryContextAlloc(rb->context,
- sizeof(SharedInvalidationMessage) * nmsgs);
- memcpy(txn->invalidations, msgs,
- sizeof(SharedInvalidationMessage) * nmsgs);
+ /* Accumulate invalidations. */
+ if (txn->ninvalidations == 0)
+ {
+ txn->ninvalidations = nmsgs;
+ txn->invalidations = (SharedInvalidationMessage *)
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+ memcpy(txn->invalidations, msgs,
+ sizeof(SharedInvalidationMessage) * nmsgs);
+ }
+ else
+ {
+ txn->invalidations = (SharedInvalidationMessage *)
+ repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
+ (txn->ninvalidations + nmsgs));
+
+ memcpy(txn->invalidations + txn->ninvalidations, msgs,
+ nmsgs * sizeof(SharedInvalidationMessage));
+ txn->ninvalidations += nmsgs;
+ }
+
+ MemoryContextSwitchTo(oldcontext);
}
/*
@@ -3895,3 +3920,26 @@ restart:
*cmax = ent->cmax;
return true;
}
+
+/*
+ * Count invalidation messages of specified transaction.
+ *
+ * Returns number of messages, and msgs is set to the pointer of the linked
+ * list for the messages.
+ */
+uint32
+ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
+ SharedInvalidationMessage **msgs)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ if (txn == NULL)
+ return 0;
+
+ *msgs = txn->invalidations;
+
+ return txn->ninvalidations;
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 7546de96763..3bda41c5251 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -292,7 +292,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
static void SnapBuildSnapIncRefcount(Snapshot snap);
-static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
/* xlog reading helper functions for SnapBuildProcessRunningXacts */
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
@@ -861,15 +861,15 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
}
/*
- * Add a new Snapshot to all transactions we're decoding that currently are
- * in-progress so they can see new catalog contents made by the transaction
- * that just committed. This is necessary because those in-progress
- * transactions will use the new catalog's contents from here on (at the very
- * least everything they do needs to be compatible with newer catalog
- * contents).
+ * Add a new Snapshot and invalidation messages to all transactions we're
+ * decoding that currently are in-progress so they can see new catalog contents
+ * made by the transaction that just committed. This is necessary because those
+ * in-progress transactions will use the new catalog's contents from here on
+ * (at the very least everything they do needs to be compatible with newer
+ * catalog contents).
*/
static void
-SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
{
dlist_iter txn_i;
ReorderBufferTXN *txn;
@@ -877,7 +877,8 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
/*
* Iterate through all toplevel transactions. This can include
* subtransactions which we just don't yet know to be that, but that's
- * fine, they will just get an unnecessary snapshot queued.
+ * fine, they will just get an unnecessary snapshot and invalidations
+ * queued.
*/
dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
{
@@ -890,6 +891,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
* transaction which in turn implies we don't yet need a snapshot at
* all. We'll add a snapshot when the first change gets queued.
*
+ * Similarly, we don't need to add invalidations to a transaction whose
+ * base snapshot is not yet set. Once a base snapshot is built, it will
+ * include the xids of committed transactions that have modified the
+ * catalog, thus reflecting the new catalog contents. The existing
+ * catalog cache will have already been invalidated after processing
+ * the invalidations in the transaction that modified catalogs,
+ * ensuring that a fresh cache is constructed during decoding.
+ *
* NB: This works correctly even for subtransactions because
* ReorderBufferAssignChild() takes care to transfer the base snapshot
* to the top-level transaction, and while iterating the changequeue
@@ -898,7 +907,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
continue;
- elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
+ elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
/*
@@ -908,6 +917,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
SnapBuildSnapIncRefcount(builder->snapshot);
ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
builder->snapshot);
+
+ /*
+ * Add invalidation messages to the reorder buffer of in-progress
+ * transactions except the current committed transaction, for which we
+ * will execute invalidations at the end.
+ *
+ * It is required, otherwise, we will end up using the stale catcache
+ * contents built by the current transaction even after its decoding,
+ * which should have been invalidated due to concurrent catalog
+ * changing transaction.
+ */
+ if (txn->xid != xid)
+ {
+ uint32 ninvalidations;
+ SharedInvalidationMessage *msgs = NULL;
+
+ ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
+ xid, &msgs);
+
+ if (ninvalidations > 0)
+ {
+ Assert(msgs != NULL);
+
+ ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
+ ninvalidations, msgs);
+ }
+ }
}
}
@@ -1186,8 +1222,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
/* refcount of the snapshot builder for the new snapshot */
SnapBuildSnapIncRefcount(builder->snapshot);
- /* add a new catalog snapshot to all currently running transactions */
- SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+ /*
+ * Add a new catalog snapshot and invalidations messages to all
+ * currently running transactions.
+ */
+ SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
}
}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 5347597e92b..545cee891ed 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -463,6 +463,10 @@ TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
+uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb,
+ TransactionId xid,
+ SharedInvalidationMessage **msgs);
+
void StartupReorderBuffer(void);
#endif