188 bool need_full_snapshot,
189 bool in_slot_creation,
198 "snapshot builder context",
265 elog(
ERROR,
"cannot free a copied snapshot");
268 elog(
ERROR,
"cannot free an active snapshot");
279 return builder->
state;
344 elog(
ERROR,
"cannot free a copied snapshot");
406 memcpy(snapshot->
xip,
454 elog(
ERROR,
"cannot build an initial slot snapshot when snapshots exist");
458 elog(
ERROR,
"cannot build an initial slot snapshot before reaching a consistent state");
461 elog(
ERROR,
"cannot build an initial slot snapshot, not all transactions are monitored anymore");
465 elog(
ERROR,
"cannot build an initial slot snapshot when MyProc->xmin already is valid");
483 elog(
ERROR,
"cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
484 safeXid, snap->
xmin);
514 errmsg(
"initial slot snapshot too large")));
516 newxip[newxcnt++] = xid;
524 snap->
xcnt = newxcnt;
545 elog(
ERROR,
"cannot export a snapshot from within a transaction");
548 elog(
ERROR,
"can only export one snapshot at a time");
568 (
errmsg_plural(
"exported logical decoding snapshot: \"%s\" with %u transaction ID",
569 "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
571 snapname, snap->
xcnt)));
609 elog(
ERROR,
"clearing exported snapshot in wrong transaction state");
716 elog(
ERROR,
"xl_heap_new_cid record without a valid CommandId");
777 elog(
DEBUG2,
"adding a new snapshot and invalidations to %u at %X/%X",
806 if (ninvalidations > 0)
811 ninvalidations, msgs);
829 elog(
DEBUG1,
"increasing space for committed transactions to %u",
859 int surviving_xids = 0;
877 workspace[surviving_xids++] = builder->
committed.
xip[off];
884 elog(
DEBUG3,
"purged committed transactions from %u to %u, xmin: %u, xmax: %u",
910 if (surviving_xids > 0)
921 elog(
DEBUG3,
"purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
937 bool needs_snapshot =
false;
938 bool needs_timetravel =
false;
939 bool sub_needs_timetravel =
false;
969 needs_timetravel =
true;
973 for (nxact = 0; nxact < nsubxacts; nxact++)
983 sub_needs_timetravel =
true;
984 needs_snapshot =
true;
986 elog(
DEBUG1,
"found subtransaction %u:%u with catalog changes",
1001 else if (needs_timetravel)
1012 elog(
DEBUG2,
"found top level transaction %u, with catalog changes",
1014 needs_snapshot =
true;
1015 needs_timetravel =
true;
1018 else if (sub_needs_timetravel)
1021 elog(
DEBUG2,
"forced transaction %u to do timetravel due to one of its subtransactions",
1023 needs_timetravel =
true;
1026 else if (needs_timetravel)
1028 elog(
DEBUG2,
"forced transaction %u to do timetravel", xid);
1033 if (!needs_timetravel)
1039 Assert(!needs_snapshot || needs_timetravel);
1046 if (needs_timetravel &&
1050 builder->
xmax = xmax;
1176 elog(
DEBUG3,
"xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1212 else if (txn == NULL &&
1266 (
errmsg_internal(
"skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
1305 (
errmsg(
"logical decoding found consistent point at %X/%X",
1307 errdetail(
"There are no running transactions.")));
1354 (
errmsg(
"logical decoding found initial starting point at %X/%X",
1356 errdetail(
"Waiting for transactions (approximately %d) older than %u to end.",
1378 (
errmsg(
"logical decoding found initial consistent point at %X/%X",
1380 errdetail(
"Waiting for transactions (approximately %d) older than %u to end.",
1402 (
errmsg(
"logical decoding found consistent point at %X/%X",
1404 errdetail(
"There are no old transactions anymore.")));
1431 for (off = 0; off < running->
xcnt; off++)
1461#define SnapBuildOnDiskConstantSize \
1462 offsetof(SnapBuildOnDisk, builder)
1463#define SnapBuildOnDiskNotChecksummedSize \
1464 offsetof(SnapBuildOnDisk, version)
1466#define SNAPBUILD_MAGIC 0x51A1E001
1467#define SNAPBUILD_VERSION 6
1495 size_t catchange_xcnt;
1501 struct stat stat_buf;
1524 sprintf(path,
"%s/%X-%X.snap",
1533 ret =
stat(path, &stat_buf);
1535 if (ret != 0 && errno != ENOENT)
1538 errmsg(
"could not stat file \"%s\": %m", path)));
1565 elog(
DEBUG1,
"serializing snapshot to %s", path);
1568 sprintf(tmppath,
"%s/%X-%X.snap.%d.tmp",
1578 if (unlink(tmppath) != 0 && errno != ENOENT)
1581 errmsg(
"could not remove file \"%s\": %m", tmppath)));
1592 ondisk_c =
palloc0(needed_length);
1596 ondisk->
length = needed_length;
1627 if (catchange_xcnt > 0)
1630 memcpy(ondisk_c, catchange_xip, sz);
1639 O_CREAT | O_EXCL | O_WRONLY |
PG_BINARY);
1643 errmsg(
"could not open file \"%s\": %m", tmppath)));
1647 if ((
write(
fd, ondisk, needed_length)) != needed_length)
1649 int save_errno = errno;
1654 errno = save_errno ? save_errno : ENOSPC;
1657 errmsg(
"could not write to file \"%s\": %m", tmppath)));
1675 int save_errno = errno;
1681 errmsg(
"could not fsync file \"%s\": %m", tmppath)));
1688 errmsg(
"could not close file \"%s\": %m", tmppath)));
1696 if (rename(tmppath, path) != 0)
1700 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
1723 pfree(catchange_xip);
1742 sprintf(path,
"%s/%X-%X.snap",
1750 if (missing_ok && errno == ENOENT)
1755 errmsg(
"could not open file \"%s\": %m", path)));
1775 errmsg(
"snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1781 errmsg(
"snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1814 errmsg(
"could not close file \"%s\": %m", path)));
1822 errmsg(
"checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1823 path, checksum, ondisk->
checksum)));
1856 goto snapshot_not_interesting;
1863 goto snapshot_not_interesting;
1908 (
errmsg(
"logical decoding found consistent point at %X/%X",
1910 errdetail(
"Logical decoding will begin using saved snapshot.")));
1913snapshot_not_interesting:
1932 if (readBytes != size)
1934 int save_errno = errno;
1943 errmsg(
"could not read file \"%s\": %m", path)));
1948 errmsg(
"could not read file \"%s\": read %d of %zu",
1949 path, readBytes, size)));
1992 if (strcmp(snap_de->
d_name,
".") == 0 ||
1993 strcmp(snap_de->
d_name,
"..") == 0)
2001 elog(
DEBUG1,
"only regular files expected: %s", path);
2014 if (sscanf(snap_de->
d_name,
"%X-%X.snap", &hi, &lo) != 2)
2017 (
errmsg(
"could not parse file name \"%s\"", path)));
2021 lsn = ((
uint64) hi) << 32 | lo;
2026 elog(
DEBUG1,
"removing snapbuild snapshot %s", path);
2033 if (unlink(path) < 0)
2037 errmsg(
"could not remove file \"%s\": %m",
2054 struct stat stat_buf;
2056 sprintf(path,
"%s/%X-%X.snap",
2060 ret =
stat(path, &stat_buf);
2062 if (ret != 0 && errno != ENOENT)
2065 errmsg(
"could not stat file \"%s\": %m", path)));
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
int errdetail_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
int CloseTransientFile(int fd)
void fsync_fname(const char *fname, bool isdir)
DIR * AllocateDir(const char *dirname)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int OpenTransientFile(const char *fileName, int fileFlags)
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Assert(PointerIsAligned(start, uint64))
#define dlist_foreach(iter, lhead)
static uint32 dclist_count(const dclist_head *head)
#define dlist_container(type, membername, ptr)
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void * MemoryContextAlloc(MemoryContext context, Size size)
void * MemoryContextAllocZero(MemoryContext context, Size size)
void * repalloc(void *pointer, Size size)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define ERRCODE_DATA_CORRUPTED
#define COMP_CRC32C(crc, data, len)
#define EQ_CRC32C(c1, c2)
#define ERRCODE_T_R_SERIALIZATION_FAILURE
#define qsort(a, b, c, d)
static int fd(const char *x, int i)
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
int GetMaxSnapshotXidCount(void)
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs)
TransactionId * ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
#define rbtxn_is_prepared(txn)
#define PG_LOGICAL_SNAPSHOTS_DIR
ResourceOwner CurrentResourceOwner
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
void SnapBuildSnapDecRefcount(Snapshot snap)
bool SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn, MemoryContext context, bool missing_ok)
#define SNAPBUILD_VERSION
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
void SnapBuildResetExportedSnapshotState(void)
void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
static void SnapBuildSnapIncRefcount(Snapshot snap)
bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
SnapBuild * AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, bool in_slot_creation, XLogRecPtr two_phase_at)
#define SnapBuildOnDiskNotChecksummedSize
void FreeSnapshotBuilder(SnapBuild *builder)
bool SnapBuildSnapshotExists(XLogRecPtr lsn)
void CheckPointSnapBuild(void)
static void SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
static bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, uint32 xinfo)
Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
const char * SnapBuildExportSnapshot(SnapBuild *builder)
static ResourceOwner SavedResourceOwnerDuringExport
void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, TransactionId *subxacts, uint32 xinfo)
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder)
void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn, xl_heap_new_cid *xlrec)
static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
void SnapBuildClearExportedSnapshot(void)
static void SnapBuildFreeSnapshot(Snapshot snap)
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
static bool ExportInProgress
static void SnapBuildPurgeOlderTxn(SnapBuild *builder)
#define SnapBuildOnDiskConstantSize
static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
@ SNAPBUILD_BUILDING_SNAPSHOT
@ SNAPBUILD_FULL_SNAPSHOT
struct SnapBuildOnDisk SnapBuildOnDisk
bool HistoricSnapshotActive(void)
bool HaveRegisteredOrActiveSnapshot(void)
char * ExportSnapshot(Snapshot snapshot)
void InvalidateCatalogSnapshot(void)
struct SnapshotData SnapshotData
XLogRecPtr LogStandbySnapshot(void)
XLogRecPtr restart_decoding_lsn
dclist_head catchange_txns
dlist_head toplevel_by_lsn
XLogRecPtr current_restart_decoding_lsn
XLogRecPtr start_decoding_at
TransactionId initial_xmin_horizon
struct SnapBuild::@117 committed
bool building_full_snapshot
TransactionId next_phase_at
XLogRecPtr last_serialized_snapshot
bool includes_all_transactions
struct SnapBuild::@118 catchange
uint64 snapXactCompletionCount
SnapshotType snapshot_type
ItemPointerData target_tid
RelFileLocator target_locator
TransactionId oldestRunningXid
TransactionId xids[FLEXIBLE_ARRAY_MEMBER]
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define NormalTransactionIdPrecedes(id1, id2)
#define NormalTransactionIdFollows(id1, id2)
#define TransactionIdIsValid(xid)
#define TransactionIdIsNormal(xid)
#define TransactionIdAdvance(dest)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
bool IsTransactionOrTransactionBlock(void)
bool IsTransactionState(void)
void StartTransactionCommand(void)
bool TransactionIdIsCurrentTransactionId(TransactionId xid)
void AbortCurrentTransaction(void)
#define XACT_REPEATABLE_READ
#define XACT_XINFO_HAS_INVALS
int xidComparator(const void *arg1, const void *arg2)
bool RecoveryInProgress(void)
XLogRecPtr GetRedoRecPtr(void)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr