Keep WAL segments by slot's last saved restart LSN
author: Alexander Korotkov <akorotkov@postgresql.org>
Sat, 14 Jun 2025 00:36:04 +0000 (03:36 +0300)
committer: Alexander Korotkov <akorotkov@postgresql.org>
Sat, 14 Jun 2025 00:36:04 +0000 (03:36 +0300)
This patch fixes the unexpected removal of old WAL segments when a checkpoint
is followed by an immediate restart.  The issue occurs when a slot is advanced
after the start of the checkpoint but before the old WAL segments are removed
at the end of the checkpoint.

The patch introduces a new in-memory state for slots: last_saved_restart_lsn,
which is used to calculate the oldest LSN for removing WAL segments.  This
state is set to the slot's current restart_lsn each time the slot is saved
to disk.

This fix changes the shared memory layout.  It's applied to HEAD only because
we don't have to preserve ABI compatibility during the beta stage.  Another
fix that doesn't affect the ABI is committed to back branches.

Discussion: https://postgr.es/m/1d12d2-67235980-35-19a406a0%4063439497
Author: Vitaly Davydov <v.davydov@postgrespro.ru>
Author: Alexander Korotkov <aekorotkov@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
src/backend/replication/slot.c
src/include/replication/slot.h

index 600b87fa9cb65af8093d298c53c3f50918785052..c64f020742f8f8eb552dfb3f5e6283cc04969ed4 100644 (file)
@@ -424,6 +424,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
    slot->candidate_restart_valid = InvalidXLogRecPtr;
    slot->candidate_restart_lsn = InvalidXLogRecPtr;
    slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
+   slot->last_saved_restart_lsn = InvalidXLogRecPtr;
    slot->inactive_since = 0;
 
    /*
@@ -1165,20 +1166,41 @@ ReplicationSlotsComputeRequiredLSN(void)
    {
        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
        XLogRecPtr  restart_lsn;
+       XLogRecPtr  last_saved_restart_lsn;
        bool        invalidated;
+       ReplicationSlotPersistency persistency;
 
        if (!s->in_use)
            continue;
 
        SpinLockAcquire(&s->mutex);
+       persistency = s->data.persistency;
        restart_lsn = s->data.restart_lsn;
        invalidated = s->data.invalidated != RS_INVAL_NONE;
+       last_saved_restart_lsn = s->last_saved_restart_lsn;
        SpinLockRelease(&s->mutex);
 
        /* invalidated slots need not apply */
        if (invalidated)
            continue;
 
+       /*
+        * For persistent slot use last_saved_restart_lsn to compute the
+        * oldest LSN for removal of WAL segments.  The segments between
+        * last_saved_restart_lsn and restart_lsn might be needed by a
+        * persistent slot in the case of database crash.  Non-persistent
+        * slots can't survive the database crash, so we don't care about
+        * last_saved_restart_lsn for them.
+        */
+       if (persistency == RS_PERSISTENT)
+       {
+           if (last_saved_restart_lsn != InvalidXLogRecPtr &&
+               restart_lsn > last_saved_restart_lsn)
+           {
+               restart_lsn = last_saved_restart_lsn;
+           }
+       }
+
        if (restart_lsn != InvalidXLogRecPtr &&
            (min_required == InvalidXLogRecPtr ||
             restart_lsn < min_required))
@@ -1216,7 +1238,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
    {
        ReplicationSlot *s;
        XLogRecPtr  restart_lsn;
+       XLogRecPtr  last_saved_restart_lsn;
        bool        invalidated;
+       ReplicationSlotPersistency persistency;
 
        s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -1230,14 +1254,33 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 
        /* read once, it's ok if it increases while we're checking */
        SpinLockAcquire(&s->mutex);
+       persistency = s->data.persistency;
        restart_lsn = s->data.restart_lsn;
        invalidated = s->data.invalidated != RS_INVAL_NONE;
+       last_saved_restart_lsn = s->last_saved_restart_lsn;
        SpinLockRelease(&s->mutex);
 
        /* invalidated slots need not apply */
        if (invalidated)
            continue;
 
+       /*
+        * For persistent slot use last_saved_restart_lsn to compute the
+        * oldest LSN for removal of WAL segments.  The segments between
+        * last_saved_restart_lsn and restart_lsn might be needed by a
+        * persistent slot in the case of database crash.  Non-persistent
+        * slots can't survive the database crash, so we don't care about
+        * last_saved_restart_lsn for them.
+        */
+       if (persistency == RS_PERSISTENT)
+       {
+           if (last_saved_restart_lsn != InvalidXLogRecPtr &&
+               restart_lsn > last_saved_restart_lsn)
+           {
+               restart_lsn = last_saved_restart_lsn;
+           }
+       }
+
        if (restart_lsn == InvalidXLogRecPtr)
            continue;
 
@@ -1455,6 +1498,7 @@ ReplicationSlotReserveWal(void)
 
    Assert(slot != NULL);
    Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+   Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);
 
    /*
     * The replication slot mechanism is used to prevent removal of required
@@ -1766,6 +1810,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
         */
        SpinLockAcquire(&s->mutex);
 
+       Assert(s->data.restart_lsn >= s->last_saved_restart_lsn);
+
        restart_lsn = s->data.restart_lsn;
 
        /* we do nothing if the slot is already invalid */
@@ -1835,7 +1881,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
             * just rely on .invalidated.
             */
            if (invalidation_cause == RS_INVAL_WAL_REMOVED)
+           {
                s->data.restart_lsn = InvalidXLogRecPtr;
+               s->last_saved_restart_lsn = InvalidXLogRecPtr;
+           }
 
            /* Let caller know */
            *invalidated = true;
@@ -2079,6 +2128,12 @@ CheckPointReplicationSlots(bool is_shutdown)
        SaveSlotToPath(s, path, LOG);
    }
    LWLockRelease(ReplicationSlotAllocationLock);
+
+   /*
+    * Recompute the required LSN as SaveSlotToPath() updated
+    * last_saved_restart_lsn for slots.
+    */
+   ReplicationSlotsComputeRequiredLSN();
 }
 
 /*
@@ -2354,6 +2409,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    if (!slot->just_dirtied)
        slot->dirty = false;
    slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+   slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    SpinLockRelease(&slot->mutex);
 
    LWLockRelease(&slot->io_in_progress_lock);
@@ -2569,6 +2625,7 @@ RestoreSlotFromDisk(const char *name)
        slot->effective_xmin = cp.slotdata.xmin;
        slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
        slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+       slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
 
        slot->candidate_catalog_xmin = InvalidTransactionId;
        slot->candidate_xmin_lsn = InvalidXLogRecPtr;
index eb0b93b11141ddbf6a6c6fa857c7a055c80f8c74..ffacba9d2ae52301100b62ca49046c516e67c9b7 100644 (file)
@@ -215,6 +215,14 @@ typedef struct ReplicationSlot
     * recently stopped.
     */
    TimestampTz inactive_since;
+
+   /*
+    * Latest restart_lsn that has been flushed to disk. For persistent slots
+    * the flushed LSN should be taken into account when calculating the
+    * oldest LSN for WAL segments removal.
+    */
+   XLogRecPtr  last_saved_restart_lsn;
+
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)