uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */
TransactionId ckptXid;
XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
- XLogRecPtr peggedLSN; /* LSN after which segments can be removed */
+ XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */
XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG
* segment */
static void CreateEndOfRecoveryRecord(void);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
-static XLogRecPtr XLogGetPeggedLSN(void);
+static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
XLogRecPtr *lsn, BkpBlock *bkpb);
}
/*
- * Record the LSN up to which we can remove WAL because it's not required for
- * replication.
+ * Record the LSN up to which we can remove WAL because it's not required by
+ * any replication slot.
*/
void
-XLogSetPeggedLSN(XLogRecPtr lsn)
+XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->peggedLSN = lsn;
+ xlogctl->replicationSlotMinLSN = lsn;
SpinLockRelease(&xlogctl->info_lck);
}
/*
- * Return the LSN up to which we can remove WAL because it's not required for
- * replication.
+ * Return the oldest LSN we must retain to satisfy the needs of some
+ * replication slot.
*/
static XLogRecPtr
-XLogGetPeggedLSN(void)
+XLogGetReplicationSlotMinimumLSN(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr retval;
SpinLockAcquire(&xlogctl->info_lck);
- retval = xlogctl->peggedLSN;
+ retval = xlogctl->replicationSlotMinLSN;
SpinLockRelease(&xlogctl->info_lck);
return retval;
XLogRecPtr keep;
XLByteToSeg(recptr, segno);
- keep = XLogGetPeggedLSN();
+ keep = XLogGetReplicationSlotMinimumLSN();
/* compute limit for wal_keep_segments first */
if (wal_keep_segments > 0)
LWLockRelease(ReplicationSlotControlLock);
/* slot is dead and doesn't nail the xmin anymore, recompute horizon */
- ReplicationSlotsComputeRequiredXmin(false);
+ ReplicationSlotsComputeRequiredXmin();
/*
* If removing the directory fails, the worst thing that will happen is
* If already_locked is passed in ProcArrayLock is already held exlusively.
*/
void
-ReplicationSlotsComputeRequiredXmin(bool already_locked)
+ReplicationSlotsComputeRequiredXmin(void)
{
int i;
TransactionId agg_data_xmin = InvalidTransactionId;
}
LWLockRelease(ReplicationSlotControlLock);
- ProcArraySetPeggedXmin(agg_data_xmin, already_locked);
+ ProcArraySetReplicationSlotXmin(agg_data_xmin);
}
/*
}
LWLockRelease(ReplicationSlotControlLock);
- XLogSetPeggedLSN(min_required);
+ XLogSetReplicationSlotMinimumLSN(min_required);
}
/*
return;
/* Now that we have recovered all the data, compute replication xmin */
- ReplicationSlotsComputeRequiredXmin(false);
+ ReplicationSlotsComputeRequiredXmin();
ReplicationSlotsComputeRequiredLSN();
}
if (changed)
{
ReplicationSlotSave();
- ReplicationSlotsComputeRequiredXmin(false);
+ ReplicationSlotsComputeRequiredXmin();
}
}
*/
TransactionId lastOverflowedXid;
- /* globally "pegged" xmin horizons */
- TransactionId pegged_xmin;
+ /* oldest xmin of any replication slot */
+ TransactionId replication_slot_xmin;
/*
* We declare pgprocnos[] as 1 entry because C wants a fixed-size array,
*/
procArray->numProcs = 0;
procArray->maxProcs = PROCARRAY_MAXPROCS;
- procArray->pegged_xmin = InvalidTransactionId;
+ procArray->replication_slot_xmin = InvalidTransactionId;
procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
procArray->numKnownAssignedXids = 0;
procArray->tailKnownAssignedXids = 0;
ProcArrayStruct *arrayP = procArray;
TransactionId result;
int index;
- volatile TransactionId pegged_xmin = InvalidTransactionId;
+ volatile TransactionId replication_slot_xmin = InvalidTransactionId;
/* Cannot look for individual databases during recovery */
Assert(allDbs || !RecoveryInProgress());
}
/* fetch into volatile var while ProcArrayLock is held */
- pegged_xmin = procArray->pegged_xmin;
+ replication_slot_xmin = procArray->replication_slot_xmin;
if (RecoveryInProgress())
{
}
/*
- * Check whether there's physical slots requiring xmin to be pegged.
+ * Check whether there are replication slots requiring an older xmin.
*/
- if (TransactionIdIsValid(pegged_xmin) &&
- NormalTransactionIdPrecedes(pegged_xmin, result))
- result = pegged_xmin;
+ if (TransactionIdIsValid(replication_slot_xmin) &&
+ NormalTransactionIdPrecedes(replication_slot_xmin, result))
+ result = replication_slot_xmin;
return result;
}
int count = 0;
int subcount = 0;
bool suboverflowed = false;
- volatile TransactionId pegged_xmin = InvalidTransactionId;
+ volatile TransactionId replication_slot_xmin = InvalidTransactionId;
Assert(snapshot != NULL);
/* fetch into volatile var while ProcArrayLock is held */
- pegged_xmin = procArray->pegged_xmin;
+ replication_slot_xmin = procArray->replication_slot_xmin;
if (!TransactionIdIsValid(MyPgXact->xmin))
MyPgXact->xmin = TransactionXmin = xmin;
if (!TransactionIdIsNormal(RecentGlobalXmin))
RecentGlobalXmin = FirstNormalTransactionId;
- /*
- * Check whether there's a physical slot requiring the global xmin to be
- * pegged for all tables.
- */
- if (TransactionIdIsValid(pegged_xmin) &&
- NormalTransactionIdPrecedes(pegged_xmin, RecentGlobalXmin))
- RecentGlobalXmin = pegged_xmin;
+ /* Check whether there's a replication slot requiring an older xmin. */
+ if (TransactionIdIsValid(replication_slot_xmin) &&
+ NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
+ RecentGlobalXmin = replication_slot_xmin;
RecentXmin = xmin;
}
/*
- * ProcArraySetPeggedXmin
+ * ProcArraySetReplicationSlotXmin
*
* Install limits to future computations of the xmin horizon to prevent vacuum
- * and HOT pruning from removing affected rows.
+ * and HOT pruning from removing affected rows still needed by clients with
+ * replicaton slots.
*/
void
-ProcArraySetPeggedXmin(TransactionId xmin, bool already_locked)
+ProcArraySetReplicationSlotXmin(TransactionId xmin)
{
- if (!already_locked)
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-
- procArray->pegged_xmin = xmin;
-
- if (!already_locked)
- LWLockRelease(ProcArrayLock);
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ procArray->replication_slot_xmin = xmin;
+ LWLockRelease(ProcArrayLock);
}
extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
extern void XLogSetAsyncXactLSN(XLogRecPtr record);
-extern void XLogSetPeggedLSN(XLogRecPtr lsn);
+extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn);
extern Buffer RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record,
int block_index,
/* misc stuff */
extern bool ReplicationSlotValidateName(const char *name, int elevel);
-extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
+extern void ReplicationSlotsComputeRequiredXmin(void);
extern void ReplicationSlotsComputeRequiredLSN(void);
extern void StartupReplicationSlots(XLogRecPtr checkPointRedo);
extern void CheckSlotRequirements(void);
int nxids, const TransactionId *xids,
TransactionId latestXid);
-extern void ProcArraySetPeggedXmin(TransactionId xmin, bool already_locked);
+extern void ProcArraySetReplicationSlotXmin(TransactionId xmin);
#endif /* PROCARRAY_H */