}
/*
- * Mark any slot that points to an LSN older than the given segment
- * as invalid; it requires WAL that's about to be removed.
+ * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
+ * and marks it invalid, if necessary and possible.
*
- * NB - this runs as part of checkpoint, so avoid raising errors if possible.
+ * Returns whether ReplicationSlotControlLock was released in the interim (and
+ * in that case we're not holding the lock at return, otherwise we are).
+ *
+ * This is inherently racy, because we release the LWLock
+ * for syscalls, so caller must restart if we return true.
*/
-void
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+static bool
+InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
{
- XLogRecPtr oldestLSN;
-
- XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+ int last_signaled_pid = 0;
+ bool released_lock = false;
-restart:
- LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (int i = 0; i < max_replication_slots; i++)
+ for (;;)
{
- ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
- XLogRecPtr restart_lsn = InvalidXLogRecPtr;
+ XLogRecPtr restart_lsn;
NameData slotname;
- int wspid;
- int last_signaled_pid = 0;
+ int active_pid = 0;
+
+ Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
if (!s->in_use)
- continue;
+ {
+ if (released_lock)
+ LWLockRelease(ReplicationSlotControlLock);
+ break;
+ }
+ /*
+ * Check if the slot needs to be invalidated. If it needs to be
+ * invalidated, and is not currently acquired, acquire it and mark it
+ * as having been invalidated. We do this with the spinlock held to
+ * avoid race conditions -- for example the restart_lsn could move
+ * forward, or the slot could be dropped.
+ */
SpinLockAcquire(&s->mutex);
- slotname = s->data.name;
+
restart_lsn = s->data.restart_lsn;
- SpinLockRelease(&s->mutex);
+ /*
+ * If the slot is already invalid or is fresh enough, we don't need to
+ * do anything.
+ */
if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
- continue;
- LWLockRelease(ReplicationSlotControlLock);
- CHECK_FOR_INTERRUPTS();
+ {
+ SpinLockRelease(&s->mutex);
+ if (released_lock)
+ LWLockRelease(ReplicationSlotControlLock);
+ break;
+ }
+
+ slotname = s->data.name;
+ active_pid = s->active_pid;
+
+ /*
+ * If the slot can be acquired, do so and mark it invalidated
+ * immediately. Otherwise we'll signal the owning process, below, and
+ * retry.
+ */
+ if (active_pid == 0)
+ {
+ MyReplicationSlot = s;
+ s->active_pid = MyProcPid;
+ s->data.invalidated_at = restart_lsn;
+ s->data.restart_lsn = InvalidXLogRecPtr;
+ }
- /* Get ready to sleep on the slot in case it is active */
- ConditionVariablePrepareToSleep(&s->active_cv);
+ SpinLockRelease(&s->mutex);
- for (;;)
+ if (active_pid != 0)
{
/*
- * Try to mark this slot as used by this process.
- *
- * Note that ReplicationSlotAcquireInternal(SAB_Inquire) should
- * not cancel the prepared condition variable if this slot is
- * active in other process. Because in this case we have to wait
- * on that CV for the process owning the slot to be terminated,
- * later.
+ * Prepare to sleep on the slot's condition variable before
+ * releasing ReplicationSlotControlLock, to close a possible race
+ * condition if the slot is released before the sleep below.
*/
- wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
+ ConditionVariablePrepareToSleep(&s->active_cv);
- /*
- * Exit the loop if we successfully acquired the slot or the slot
- * was dropped during waiting for the owning process to be
- * terminated. For example, the latter case is likely to happen
- * when the slot is temporary because it's automatically dropped
- * by the termination of the owning process.
- */
- if (wspid <= 0)
- break;
+ LWLockRelease(ReplicationSlotControlLock);
+ released_lock = true;
/*
- * Signal to terminate the process that owns the slot.
+ * Signal to terminate the process that owns the slot, if we
+ * haven't already signalled it. (Avoidance of repeated
+ * signalling is the only reason for there to be a loop in this
+ * routine; otherwise we could rely on caller's restart loop.)
*
- * There is the race condition where other process may own the
- * slot after the process using it was terminated and before this
- * process owns it. To handle this case, we signal again if the
- * PID of the owning process is changed than the last.
- *
- * XXX This logic assumes that the same PID is not reused very
- * quickly.
+ * There is a race condition in which another process may acquire
+ * the slot after its current owner is terminated and before this
+ * process acquires it. To handle that, we signal again whenever the
+ * owning PID differs from the one we last signalled. (This
+ * logic assumes that the same PID is not reused very quickly.)
*/
- if (last_signaled_pid != wspid)
+ if (last_signaled_pid != active_pid)
{
ereport(LOG,
- (errmsg("terminating process %d because replication slot \"%s\" is too far behind",
- wspid, NameStr(slotname))));
- (void) kill(wspid, SIGTERM);
- last_signaled_pid = wspid;
+ (errmsg("terminating process %d to release replication slot \"%s\"",
+ active_pid, NameStr(slotname))));
+
+ (void) kill(active_pid, SIGTERM);
+ last_signaled_pid = active_pid;
}
- ConditionVariableTimedSleep(&s->active_cv, 10,
- WAIT_EVENT_REPLICATION_SLOT_DROP);
+ /* Wait until the slot is released. */
+ ConditionVariableSleep(&s->active_cv,
+ WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+ /*
+ * Re-acquire lock and start over; we expect to invalidate the slot
+ * next time (unless another process acquires the slot in the
+ * meantime).
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ continue;
}
- ConditionVariableCancelSleep();
+ else
+ {
+ /*
+ * We hold the slot now and have already invalidated it; flush it
+ * to ensure that state persists.
+ *
+ * Don't want to hold ReplicationSlotControlLock across file
+ * system operations, so release it now but be sure to tell caller
+ * to restart from scratch.
+ */
+ LWLockRelease(ReplicationSlotControlLock);
+ released_lock = true;
- /*
- * Do nothing here and start from scratch if the slot has already been
- * dropped.
- */
- if (wspid == -1)
- goto restart;
+ /* Make sure the invalidated state persists across server restart */
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ ReplicationSlotRelease();
- ereport(LOG,
- (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
- NameStr(slotname),
- LSN_FORMAT_ARGS(restart_lsn))));
+ ereport(LOG,
+ (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+ NameStr(slotname),
+ LSN_FORMAT_ARGS(restart_lsn))));
- SpinLockAcquire(&s->mutex);
- s->data.invalidated_at = s->data.restart_lsn;
- s->data.restart_lsn = InvalidXLogRecPtr;
- SpinLockRelease(&s->mutex);
+ /* done with this slot for now */
+ break;
+ }
+ }
- /* Make sure the invalidated state persists across server restart */
- ReplicationSlotMarkDirty();
- ReplicationSlotSave();
- ReplicationSlotRelease();
+ Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
- /* if we did anything, start from scratch */
- goto restart;
+ return released_lock;
+}
+
+/*
+ * Mark any slot that points to an LSN older than the given segment
+ * as invalid; it requires WAL that's about to be removed.
+ *
+ * NB - this runs as part of checkpoint, so avoid raising errors if possible.
+ */
+void
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+{
+ XLogRecPtr oldestLSN;
+
+ XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+
+restart:
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (int i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (!s->in_use)
+ continue;
+
+ if (InvalidatePossiblyObsoleteSlot(s, oldestLSN))
+ {
+ /* if the lock was released, start from scratch */
+ goto restart;
+ }
}
LWLockRelease(ReplicationSlotControlLock);
}