Fix race condition in invalidating obsolete replication slots
author: Alvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 11 Jun 2021 16:16:14 +0000 (12:16 -0400)
committer: Alvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 11 Jun 2021 16:16:14 +0000 (12:16 -0400)
The code added in commit c6550776394e to mark replication slots invalid
had a race condition: a slot could be dropped or advanced concurrently
while the checkpointer was trying to invalidate it.  Rewrite the code to
close those races.

The changes to ReplicationSlotAcquire's API added in c6550776394e are no
longer necessary.  To avoid an ABI break in released branches, this
commit leaves that API unchanged; it will be changed separately in a
master-only commit.

Backpatch to 13, where this code first appeared.

Reported-by: Andres Freund <andres@anarazel.de>
Author: Andres Freund <andres@anarazel.de>
Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/20210408001037.wfmk6jud36auhfqm@alap3.anarazel.de

src/backend/replication/slot.c

index c88b803e5d0b47826bab57b0a2e284431a931cf2..5a0bad97f498b7e846b8382148c9b6df7632d65d 100644 (file)
@@ -1161,116 +1161,183 @@ ReplicationSlotReserveWal(void)
 }
 
 /*
- * Mark any slot that points to an LSN older than the given segment
- * as invalid; it requires WAL that's about to be removed.
+ * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
+ * and mark it invalid, if necessary and possible.
  *
- * NB - this runs as part of checkpoint, so avoid raising errors if possible.
+ * Returns whether ReplicationSlotControlLock was released in the interim (and
+ * in that case we're not holding the lock at return, otherwise we are).
+ *
+ * This is inherently racy, because we release the LWLock
+ * for syscalls, so caller must restart if we return true.
  */
-void
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+static bool
+InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
 {
-   XLogRecPtr  oldestLSN;
-
-   XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+   int         last_signaled_pid = 0;
+   bool        released_lock = false;
 
-restart:
-   LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-   for (int i = 0; i < max_replication_slots; i++)
+   for (;;)
    {
-       ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-       XLogRecPtr  restart_lsn = InvalidXLogRecPtr;
+       XLogRecPtr  restart_lsn;
        NameData    slotname;
-       int         wspid;
-       int         last_signaled_pid = 0;
+       int         active_pid = 0;
+
+       Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
 
        if (!s->in_use)
-           continue;
+       {
+           if (released_lock)
+               LWLockRelease(ReplicationSlotControlLock);
+           break;
+       }
 
+       /*
+        * Check if the slot needs to be invalidated. If it needs to be
+        * invalidated, and is not currently acquired, acquire it and mark it
+        * as having been invalidated.  We do this with the spinlock held to
+        * avoid race conditions -- for example the restart_lsn could move
+        * forward, or the slot could be dropped.
+        */
        SpinLockAcquire(&s->mutex);
-       slotname = s->data.name;
+
        restart_lsn = s->data.restart_lsn;
-       SpinLockRelease(&s->mutex);
 
+       /*
+        * If the slot is already invalid or is fresh enough, we don't need to
+        * do anything.
+        */
        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
-           continue;
-       LWLockRelease(ReplicationSlotControlLock);
-       CHECK_FOR_INTERRUPTS();
+       {
+           SpinLockRelease(&s->mutex);
+           if (released_lock)
+               LWLockRelease(ReplicationSlotControlLock);
+           break;
+       }
+
+       slotname = s->data.name;
+       active_pid = s->active_pid;
+
+       /*
+        * If the slot can be acquired, do so and mark it invalidated
+        * immediately.  Otherwise we'll signal the owning process, below, and
+        * retry.
+        */
+       if (active_pid == 0)
+       {
+           MyReplicationSlot = s;
+           s->active_pid = MyProcPid;
+           s->data.invalidated_at = restart_lsn;
+           s->data.restart_lsn = InvalidXLogRecPtr;
+       }
 
-       /* Get ready to sleep on the slot in case it is active */
-       ConditionVariablePrepareToSleep(&s->active_cv);
+       SpinLockRelease(&s->mutex);
 
-       for (;;)
+       if (active_pid != 0)
        {
            /*
-            * Try to mark this slot as used by this process.
-            *
-            * Note that ReplicationSlotAcquireInternal(SAB_Inquire) should
-            * not cancel the prepared condition variable if this slot is
-            * active in other process. Because in this case we have to wait
-            * on that CV for the process owning the slot to be terminated,
-            * later.
+            * Prepare the sleep on the slot's condition variable before
+            * releasing the lock, to close a possible race condition if the
+            * slot is released before the sleep below.
             */
-           wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
+           ConditionVariablePrepareToSleep(&s->active_cv);
 
-           /*
-            * Exit the loop if we successfully acquired the slot or the slot
-            * was dropped during waiting for the owning process to be
-            * terminated. For example, the latter case is likely to happen
-            * when the slot is temporary because it's automatically dropped
-            * by the termination of the owning process.
-            */
-           if (wspid <= 0)
-               break;
+           LWLockRelease(ReplicationSlotControlLock);
+           released_lock = true;
 
            /*
-            * Signal to terminate the process that owns the slot.
+            * Signal to terminate the process that owns the slot, if we
+            * haven't already signalled it.  (Avoidance of repeated
+            * signalling is the only reason for there to be a loop in this
+            * routine; otherwise we could rely on caller's restart loop.)
             *
-            * There is the race condition where other process may own the
-            * slot after the process using it was terminated and before this
-            * process owns it. To handle this case, we signal again if the
-            * PID of the owning process is changed than the last.
-            *
-            * XXX This logic assumes that the same PID is not reused very
-            * quickly.
+            * There is the race condition that other process may own the slot
+            * after its current owner process is terminated and before this
+            * process owns it. To handle that, we signal only if the PID of
+            * the owning process has changed from the previous time. (This
+            * logic assumes that the same PID is not reused very quickly.)
             */
-           if (last_signaled_pid != wspid)
+           if (last_signaled_pid != active_pid)
            {
                ereport(LOG,
-                       (errmsg("terminating process %d because replication slot \"%s\" is too far behind",
-                               wspid, NameStr(slotname))));
-               (void) kill(wspid, SIGTERM);
-               last_signaled_pid = wspid;
+                       (errmsg("terminating process %d to release replication slot \"%s\"",
+                               active_pid, NameStr(slotname))));
+
+               (void) kill(active_pid, SIGTERM);
+               last_signaled_pid = active_pid;
            }
 
-           ConditionVariableTimedSleep(&s->active_cv, 10,
-                                       WAIT_EVENT_REPLICATION_SLOT_DROP);
+           /* Wait until the slot is released. */
+           ConditionVariableSleep(&s->active_cv,
+                                  WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+           /*
+            * Re-acquire lock and start over; we expect to invalidate the slot
+            * next time (unless another process acquires the slot in the
+            * meantime).
+            */
+           LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+           continue;
        }
-       ConditionVariableCancelSleep();
+       else
+       {
+           /*
+            * We hold the slot now and have already invalidated it; flush it
+            * to ensure that state persists.
+            *
+            * Don't want to hold ReplicationSlotControlLock across file
+            * system operations, so release it now but be sure to tell caller
+            * to restart from scratch.
+            */
+           LWLockRelease(ReplicationSlotControlLock);
+           released_lock = true;
 
-       /*
-        * Do nothing here and start from scratch if the slot has already been
-        * dropped.
-        */
-       if (wspid == -1)
-           goto restart;
+           /* Make sure the invalidated state persists across server restart */
+           ReplicationSlotMarkDirty();
+           ReplicationSlotSave();
+           ReplicationSlotRelease();
 
-       ereport(LOG,
-               (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
-                       NameStr(slotname),
-                       LSN_FORMAT_ARGS(restart_lsn))));
+           ereport(LOG,
+                   (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+                           NameStr(slotname),
+                           LSN_FORMAT_ARGS(restart_lsn))));
 
-       SpinLockAcquire(&s->mutex);
-       s->data.invalidated_at = s->data.restart_lsn;
-       s->data.restart_lsn = InvalidXLogRecPtr;
-       SpinLockRelease(&s->mutex);
+           /* done with this slot for now */
+           break;
+       }
+   }
 
-       /* Make sure the invalidated state persists across server restart */
-       ReplicationSlotMarkDirty();
-       ReplicationSlotSave();
-       ReplicationSlotRelease();
+   Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
 
-       /* if we did anything, start from scratch */
-       goto restart;
+   return released_lock;
+}
+
+/*
+ * Mark any slot that points to an LSN older than the given segment
+ * as invalid; it requires WAL that's about to be removed.
+ *
+ * NB - this runs as part of checkpoint, so avoid raising errors if possible.
+ */
+void
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+{
+   XLogRecPtr  oldestLSN;
+
+   XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+
+restart:
+   LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+   for (int i = 0; i < max_replication_slots; i++)
+   {
+       ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+       if (!s->in_use)
+           continue;
+
+       if (InvalidatePossiblyObsoleteSlot(s, oldestLSN))
+       {
+           /* if the lock was released, start from scratch */
+           goto restart;
+       }
    }
    LWLockRelease(ReplicationSlotControlLock);
 }