*/
TransactionId *xip;
} committed;
+
+ /*
+ * Array of transactions and subtransactions that had modified catalogs
+ * and were running when the snapshot was serialized.
+ *
+ * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
+ * if the transaction has changed the catalog. But it could happen that
+ * the logical decoding decodes only the commit record of the transaction
+ * after restoring the previously serialized snapshot in which case we
+ * will miss adding the xid to the snapshot and end up looking at the
+ * catalogs with the wrong snapshot.
+ *
+ * Now to avoid the above problem, we serialize the transactions that had
+ * modified the catalogs and are still running at the time of snapshot
+ * serialization. We fill this array while restoring the snapshot and then
+ * refer it while decoding commit to ensure if the xact has modified the
+ * catalog. We discard this array when all the xids in the list become old
+ * enough to matter. See SnapBuildPurgeOlderTxn for details.
+ */
+ struct
+ {
+ /* number of transactions */
+ size_t xcnt;
+
+ /* This array must be sorted in xidComparator order */
+ TransactionId *xip;
+ } catchange;
};
/*
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
static bool ExportInProgress = false;
-/* ->committed manipulation */
-static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
+/* ->committed and ->catchange manipulation */
+static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
/* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
+ uint32 xinfo);
+
/* xlog reading helper functions for SnapBuildProcessRunningXacts */
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
/* serialization functions */
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path);
/*
* Allocate a new snapshot builder.
palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
builder->committed.includes_all_transactions = true;
+ builder->catchange.xcnt = 0;
+ builder->catchange.xip = NULL;
+
builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn;
builder->building_full_snapshot = need_full_snapshot;
}
/*
- * Remove knowledge about transactions we treat as committed that are smaller
- * than ->xmin. Those won't ever get checked via the ->committed array but via
- * the clog machinery, so we don't need to waste memory on them.
+ * Remove knowledge about transactions we treat as committed or containing catalog
+ * changes that are smaller than ->xmin. Those won't ever get checked via
+ * the ->committed or ->catchange array, respectively. The committed xids will
+ * get checked via the clog machinery.
+ *
+ * We can ideally remove the transaction from catchange array once it is
+ * finished (committed/aborted) but that could be costly as we need to maintain
+ * the xids order in the array.
*/
static void
-SnapBuildPurgeCommittedTxn(SnapBuild *builder)
+SnapBuildPurgeOlderTxn(SnapBuild *builder)
{
int off;
TransactionId *workspace;
builder->committed.xcnt = surviving_xids;
pfree(workspace);
+
+ /*
+ * Either all the xacts got purged or none. It is only possible to
+ * partially remove the xids from this array if one or more of the xids
+ * are still running but not all. That can happen if we start decoding
+ * from a point (LSN where the snapshot state became consistent) where all
+ * the xacts in this were running and then at least one of those got
+ * committed and a few are still running. We will never start from such a
+ * point because we won't move the slot's restart_lsn past the point where
+ * the oldest running transaction's restart_decoding_lsn is.
+ */
+ if (builder->catchange.xcnt == 0 ||
+ TransactionIdFollowsOrEquals(builder->catchange.xip[0],
+ builder->xmin))
+ return;
+
+ Assert(TransactionIdFollows(builder->xmin,
+ builder->catchange.xip[builder->catchange.xcnt - 1]));
+ pfree(builder->catchange.xip);
+ builder->catchange.xip = NULL;
+ builder->catchange.xcnt = 0;
+
+ elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u",
+ builder->xmin);
}
/*
*/
void
SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
- int nsubxacts, TransactionId *subxacts)
+ int nsubxacts, TransactionId *subxacts, uint32 xinfo)
{
int nxact;
* Add subtransaction to base snapshot if catalog modifying, we don't
* distinguish to toplevel transactions there.
*/
- if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
+ if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
{
sub_needs_timetravel = true;
needs_snapshot = true;
}
/* if top-level modified catalog, it'll need a snapshot */
- if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
+ if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
{
elog(DEBUG2, "found top level transaction %u, with catalog changes",
xid);
}
}
+/*
+ * Check the reorder buffer and the snapshot to see if the given transaction has
+ * modified catalogs.
+ */
+static inline bool
+SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
+ uint32 xinfo)
+{
+ if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
+ return true;
+
+ /*
+ * The transactions that have changed catalogs must have invalidation
+ * info.
+ */
+ if (!(xinfo & XACT_XINFO_HAS_INVALS))
+ return false;
+
+ /* Check the catchange XID array */
+ return ((builder->catchange.xcnt > 0) &&
+ (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
+ sizeof(TransactionId), xidComparator) != NULL));
+}
/* -----------------------------------
* Snapshot building functions dealing with xlog records
builder->xmin = running->oldestRunningXid;
/* Remove transactions we don't need to keep track off anymore */
- SnapBuildPurgeCommittedTxn(builder);
+ SnapBuildPurgeOlderTxn(builder);
/*
* Advance the xmin limit for the current replication slot, to allow
*
* struct SnapBuildOnDisk;
* TransactionId * committed.xcnt; (*not xcnt_space*)
+ * TransactionId * catchange.xcnt;
*
*/
typedef struct SnapBuildOnDisk
offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 4
+#define SNAPBUILD_VERSION 5
/*
* Store/Load a snapshot from disk, depending on the snapshot builder's state.
{
Size needed_length;
SnapBuildOnDisk *ondisk = NULL;
+ TransactionId *catchange_xip = NULL;
+ MemoryContext old_ctx;
+ size_t catchange_xcnt;
char *ondisk_c;
int fd;
char tmppath[MAXPGPATH];
(errcode_for_file_access(),
errmsg("could not remove file \"%s\": %m", tmppath)));
+ old_ctx = MemoryContextSwitchTo(builder->context);
+
+ /* Get the catalog modifying transactions that are yet not committed */
+ catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
+ catchange_xcnt = builder->reorder->catchange_ntxns;
+
needed_length = sizeof(SnapBuildOnDisk) +
- sizeof(TransactionId) * builder->committed.xcnt;
+ sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
- ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
+ ondisk_c = palloc0(needed_length);
ondisk = (SnapBuildOnDisk *) ondisk_c;
ondisk->magic = SNAPBUILD_MAGIC;
ondisk->version = SNAPBUILD_VERSION;
ondisk->builder.snapshot = NULL;
ondisk->builder.reorder = NULL;
ondisk->builder.committed.xip = NULL;
+ ondisk->builder.catchange.xip = NULL;
+ /* update catchange only on disk data */
+ ondisk->builder.catchange.xcnt = catchange_xcnt;
COMP_CRC32C(ondisk->checksum,
&ondisk->builder,
sizeof(SnapBuild));
/* copy committed xacts */
- sz = sizeof(TransactionId) * builder->committed.xcnt;
- memcpy(ondisk_c, builder->committed.xip, sz);
- COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
- ondisk_c += sz;
+ if (builder->committed.xcnt > 0)
+ {
+ sz = sizeof(TransactionId) * builder->committed.xcnt;
+ memcpy(ondisk_c, builder->committed.xip, sz);
+ COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
+ ondisk_c += sz;
+ }
+
+ /* copy catalog modifying xacts */
+ if (catchange_xcnt > 0)
+ {
+ sz = sizeof(TransactionId) * catchange_xcnt;
+ memcpy(ondisk_c, catchange_xip, sz);
+ COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
+ ondisk_c += sz;
+ }
FIN_CRC32C(ondisk->checksum);
*/
builder->last_serialized_snapshot = lsn;
+ MemoryContextSwitchTo(old_ctx);
+
out:
ReorderBufferSetRestartPoint(builder->reorder,
builder->last_serialized_snapshot);
/* be tidy */
if (ondisk)
pfree(ondisk);
+ if (catchange_xip)
+ pfree(catchange_xip);
}
/*
int fd;
char path[MAXPGPATH];
Size sz;
- int readBytes;
pg_crc32c checksum;
/* no point in loading a snapshot if we're already there */
/* read statically sized portion of snapshot */
- pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
- readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
- pgstat_report_wait_end();
- if (readBytes != SnapBuildOnDiskConstantSize)
- {
- int save_errno = errno;
-
- CloseTransientFile(fd);
-
- if (readBytes < 0)
- {
- errno = save_errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read file \"%s\": %m", path)));
- }
- else
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("could not read file \"%s\": read %d of %zu",
- path, readBytes,
- (Size) SnapBuildOnDiskConstantSize)));
- }
+ SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path);
if (ondisk.magic != SNAPBUILD_MAGIC)
ereport(ERROR,
SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
/* read SnapBuild */
- pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
- readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
- pgstat_report_wait_end();
- if (readBytes != sizeof(SnapBuild))
- {
- int save_errno = errno;
-
- CloseTransientFile(fd);
-
- if (readBytes < 0)
- {
- errno = save_errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read file \"%s\": %m", path)));
- }
- else
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("could not read file \"%s\": read %d of %zu",
- path, readBytes, sizeof(SnapBuild))));
- }
+ SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path);
COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
/* restore committed xacts information */
- sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
- ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
- pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
- readBytes = read(fd, ondisk.builder.committed.xip, sz);
- pgstat_report_wait_end();
- if (readBytes != sz)
+ if (ondisk.builder.committed.xcnt > 0)
{
- int save_errno = errno;
-
- CloseTransientFile(fd);
+ sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
+ ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
+ SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path);
+ COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
+ }
- if (readBytes < 0)
- {
- errno = save_errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read file \"%s\": %m", path)));
- }
- else
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("could not read file \"%s\": read %d of %zu",
- path, readBytes, sz)));
+ /* restore catalog modifying xacts information */
+ if (ondisk.builder.catchange.xcnt > 0)
+ {
+ sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt;
+ ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz);
+ SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path);
+ COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz);
}
- COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
if (CloseTransientFile(fd) != 0)
ereport(ERROR,
}
ondisk.builder.committed.xip = NULL;
+ /* set catalog modifying transactions */
+ if (builder->catchange.xip)
+ pfree(builder->catchange.xip);
+ builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
+ builder->catchange.xip = ondisk.builder.catchange.xip;
+ ondisk.builder.catchange.xip = NULL;
+
/* our snapshot is not interesting anymore, build a new one */
if (builder->snapshot != NULL)
{
snapshot_not_interesting:
if (ondisk.builder.committed.xip != NULL)
pfree(ondisk.builder.committed.xip);
+ if (ondisk.builder.catchange.xip != NULL)
+ pfree(ondisk.builder.catchange.xip);
return false;
}
+/*
+ * Read the contents of the serialized snapshot to 'dest'.
+ */
+static void
+SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
+{
+ int readBytes;
+
+ pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
+ readBytes = read(fd, dest, size);
+ pgstat_report_wait_end();
+ if (readBytes != size)
+ {
+ int save_errno = errno;
+
+ CloseTransientFile(fd);
+
+ if (readBytes < 0)
+ {
+ errno = save_errno;
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": %m", path)));
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("could not read file \"%s\": read %d of %zu",
+ path, readBytes, sizeof(SnapBuild))));
+ }
+}
+
/*
* Remove all serialized snapshots that are not required anymore because no
* slot can need them. This doesn't actually have to run during a checkpoint,