Return ReplicationSlotAcquire API to its original form
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 11 Jun 2021 19:48:26 +0000 (15:48 -0400)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 11 Jun 2021 19:48:26 +0000 (15:48 -0400)
Per 96540f80f833; the awkward API introduced by c6550776394e is no
longer needed.

Author: Andres Freund <andres@anarazel.de>
Reviewed-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/20210408020913.zzprrlvqyvlt5cyy@alap3.anarazel.de

src/backend/replication/logical/logicalfuncs.c
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/include/replication/slot.h

index 01d354829b93671c9dae9f87a65701af66660459..1f38c5b33eaff87591b589eed9e25d5e054369f8 100644 (file)
@@ -225,7 +225,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
        else
                end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-       (void) ReplicationSlotAcquire(NameStr(*name), SAB_Error);
+       ReplicationSlotAcquire(NameStr(*name), true);
 
        PG_TRY();
        {
index 5a0bad97f498b7e846b8382148c9b6df7632d65d..a9a06b9a38a97d2bc02bc407a9f053716b29cb1c 100644 (file)
@@ -99,8 +99,6 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int                    max_replication_slots = 0;      /* the maximum number of replication
                                                                                 * slots */
 
-static int     ReplicationSlotAcquireInternal(ReplicationSlot *slot,
-                                                                                  const char *name, SlotAcquireBehavior behavior);
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -374,34 +372,16 @@ SearchNamedReplicationSlot(const char *name, bool need_lock)
 /*
  * Find a previously created slot and mark it as used by this process.
  *
- * The return value is only useful if behavior is SAB_Inquire, in which
- * it's zero if we successfully acquired the slot, -1 if the slot no longer
- * exists, or the PID of the owning process otherwise.  If behavior is
- * SAB_Error, then trying to acquire an owned slot is an error.
- * If SAB_Block, we sleep until the slot is released by the owning process.
+ * An error is raised if nowait is true and the slot is currently in use. If
+ * nowait is false, we sleep until the slot is released by the owning process.
  */
-int
-ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
-{
-       return ReplicationSlotAcquireInternal(NULL, name, behavior);
-}
-
-/*
- * Mark the specified slot as used by this process.
- *
- * Only one of slot and name can be specified.
- * If slot == NULL, search for the slot with the given name.
- *
- * See comments about the return value in ReplicationSlotAcquire().
- */
-static int
-ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
-                                                          SlotAcquireBehavior behavior)
+void
+ReplicationSlotAcquire(const char *name, bool nowait)
 {
        ReplicationSlot *s;
        int                     active_pid;
 
-       AssertArg((slot == NULL) ^ (name == NULL));
+       AssertArg(name != NULL);
 
 retry:
        Assert(MyReplicationSlot == NULL);
@@ -412,17 +392,15 @@ retry:
         * Search for the slot with the specified name if the slot to acquire is
         * not given. If the slot is not found, we either return -1 or error out.
         */
-       s = slot ? slot : SearchNamedReplicationSlot(name, false);
+       s = SearchNamedReplicationSlot(name, false);
        if (s == NULL || !s->in_use)
        {
                LWLockRelease(ReplicationSlotControlLock);
 
-               if (behavior == SAB_Inquire)
-                       return -1;
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
                                 errmsg("replication slot \"%s\" does not exist",
-                                               name ? name : NameStr(slot->data.name))));
+                                               name)));
        }
 
        /*
@@ -436,7 +414,7 @@ retry:
                 * (We may end up not sleeping, but we don't want to do this while
                 * holding the spinlock.)
                 */
-               if (behavior == SAB_Block)
+               if (!nowait)
                        ConditionVariablePrepareToSleep(&s->active_cv);
 
                SpinLockAcquire(&s->mutex);
@@ -456,13 +434,11 @@ retry:
         */
        if (active_pid != MyProcPid)
        {
-               if (behavior == SAB_Error)
+               if (!nowait)
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_IN_USE),
                                         errmsg("replication slot \"%s\" is active for PID %d",
                                                        NameStr(s->data.name), active_pid)));
-               else if (behavior == SAB_Inquire)
-                       return active_pid;
 
                /* Wait here until we get signaled, and then restart */
                ConditionVariableSleep(&s->active_cv,
@@ -470,7 +446,7 @@ retry:
                ConditionVariableCancelSleep();
                goto retry;
        }
-       else if (behavior == SAB_Block)
+       else if (!nowait)
                ConditionVariableCancelSleep(); /* no sleep needed after all */
 
        /* Let everybody know we've modified this slot */
@@ -478,9 +454,6 @@ retry:
 
        /* We made this slot active, so it's ours now. */
        MyReplicationSlot = s;
-
-       /* success */
-       return 0;
 }
 
 /*
@@ -588,7 +561,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
        Assert(MyReplicationSlot == NULL);
 
-       (void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
+       ReplicationSlotAcquire(name, nowait);
 
        ReplicationSlotDropAcquired();
 }
@@ -1271,8 +1244,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
                                                                   WAIT_EVENT_REPLICATION_SLOT_DROP);
 
                        /*
-                        * Re-acquire lock and start over; we expect to invalidate the slot
-                        * next time (unless another process acquires the slot in the
+                        * Re-acquire lock and start over; we expect to invalidate the
+                        * slot next time (unless another process acquires the slot in the
                         * meantime).
                         */
                        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
index e4e6632f82e04d10bd260ceb28d480754ff85b3d..31e74d38322883dc7ef8c01110ed7868d36d62c4 100644 (file)
@@ -639,7 +639,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
                moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
 
        /* Acquire the slot so we "own" it */
-       (void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error);
+       ReplicationSlotAcquire(NameStr(*slotname), true);
 
        /* A slot whose restart_lsn has never been reserved cannot be advanced */
        if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
index 109c723f4e1e7ac166b62c745382b90e0936e698..322453635613c83553bd34c1858e449d6e00d662 100644 (file)
@@ -601,7 +601,7 @@ StartReplication(StartReplicationCmd *cmd)
 
        if (cmd->slotname)
        {
-               (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
+               ReplicationSlotAcquire(cmd->slotname, true);
                if (SlotIsLogical(MyReplicationSlot))
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1137,7 +1137,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
        Assert(!MyReplicationSlot);
 
-       (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
+       ReplicationSlotAcquire(cmd->slotname, true);
 
        if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
                ereport(ERROR,
index 357068403a11e9997089c62d1afdfda04860127f..2eb7e3a530d438a5c6d10d15dc3029b80e58c4f6 100644 (file)
@@ -37,14 +37,6 @@ typedef enum ReplicationSlotPersistency
        RS_TEMPORARY
 } ReplicationSlotPersistency;
 
-/* For ReplicationSlotAcquire, q.v. */
-typedef enum SlotAcquireBehavior
-{
-       SAB_Error,
-       SAB_Block,
-       SAB_Inquire
-} SlotAcquireBehavior;
-
 /*
  * On-Disk data of a replication slot, preserved across restarts.
  */
@@ -208,7 +200,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 
-extern int     ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior);
+extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);