Return ReplicationSlotAcquire API to its original form
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 11 Jun 2021 19:48:26 +0000 (15:48 -0400)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 11 Jun 2021 19:48:26 +0000 (15:48 -0400)
Per 96540f80f833; the awkward API introduced by c6550776394e is no
longer needed.

Author: Andres Freund <andres@anarazel.de>
Reviewed-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/20210408020913.zzprrlvqyvlt5cyy@alap3.anarazel.de

src/backend/replication/logical/logicalfuncs.c
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/include/replication/slot.h

index 01d354829b93671c9dae9f87a65701af66660459..1f38c5b33eaff87591b589eed9e25d5e054369f8 100644 (file)
@@ -225,7 +225,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
    else
        end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-   (void) ReplicationSlotAcquire(NameStr(*name), SAB_Error);
+   ReplicationSlotAcquire(NameStr(*name), true);
 
    PG_TRY();
    {
index 5a0bad97f498b7e846b8382148c9b6df7632d65d..a9a06b9a38a97d2bc02bc407a9f053716b29cb1c 100644 (file)
@@ -99,8 +99,6 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int            max_replication_slots = 0;  /* the maximum number of replication
                                         * slots */
 
-static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
-                                          const char *name, SlotAcquireBehavior behavior);
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -374,34 +372,16 @@ SearchNamedReplicationSlot(const char *name, bool need_lock)
 /*
  * Find a previously created slot and mark it as used by this process.
  *
- * The return value is only useful if behavior is SAB_Inquire, in which
- * it's zero if we successfully acquired the slot, -1 if the slot no longer
- * exists, or the PID of the owning process otherwise.  If behavior is
- * SAB_Error, then trying to acquire an owned slot is an error.
- * If SAB_Block, we sleep until the slot is released by the owning process.
+ * An error is raised if nowait is true and the slot is currently in use. If
+ * nowait is false, we sleep until the slot is released by the owning process.
  */
-int
-ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
-{
-   return ReplicationSlotAcquireInternal(NULL, name, behavior);
-}
-
-/*
- * Mark the specified slot as used by this process.
- *
- * Only one of slot and name can be specified.
- * If slot == NULL, search for the slot with the given name.
- *
- * See comments about the return value in ReplicationSlotAcquire().
- */
-static int
-ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
-                              SlotAcquireBehavior behavior)
+void
+ReplicationSlotAcquire(const char *name, bool nowait)
 {
    ReplicationSlot *s;
    int         active_pid;
 
-   AssertArg((slot == NULL) ^ (name == NULL));
+   AssertArg(name != NULL);
 
 retry:
    Assert(MyReplicationSlot == NULL);
@@ -412,17 +392,15 @@ retry:
     * Search for the slot with the specified name if the slot to acquire is
     * not given. If the slot is not found, we either return -1 or error out.
     */
-   s = slot ? slot : SearchNamedReplicationSlot(name, false);
+   s = SearchNamedReplicationSlot(name, false);
    if (s == NULL || !s->in_use)
    {
        LWLockRelease(ReplicationSlotControlLock);
 
-       if (behavior == SAB_Inquire)
-           return -1;
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_OBJECT),
                 errmsg("replication slot \"%s\" does not exist",
-                       name ? name : NameStr(slot->data.name))));
+                       name)));
    }
 
    /*
@@ -436,7 +414,7 @@ retry:
         * (We may end up not sleeping, but we don't want to do this while
         * holding the spinlock.)
         */
-       if (behavior == SAB_Block)
+       if (!nowait)
            ConditionVariablePrepareToSleep(&s->active_cv);
 
        SpinLockAcquire(&s->mutex);
@@ -456,13 +434,11 @@ retry:
     */
    if (active_pid != MyProcPid)
    {
-       if (behavior == SAB_Error)
+       if (!nowait)
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_IN_USE),
                     errmsg("replication slot \"%s\" is active for PID %d",
                            NameStr(s->data.name), active_pid)));
-       else if (behavior == SAB_Inquire)
-           return active_pid;
 
        /* Wait here until we get signaled, and then restart */
        ConditionVariableSleep(&s->active_cv,
@@ -470,7 +446,7 @@ retry:
        ConditionVariableCancelSleep();
        goto retry;
    }
-   else if (behavior == SAB_Block)
+   else if (!nowait)
        ConditionVariableCancelSleep(); /* no sleep needed after all */
 
    /* Let everybody know we've modified this slot */
@@ -478,9 +454,6 @@ retry:
 
    /* We made this slot active, so it's ours now. */
    MyReplicationSlot = s;
-
-   /* success */
-   return 0;
 }
 
 /*
@@ -588,7 +561,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
    Assert(MyReplicationSlot == NULL);
 
-   (void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
+   ReplicationSlotAcquire(name, nowait);
 
    ReplicationSlotDropAcquired();
 }
@@ -1271,8 +1244,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
                                   WAIT_EVENT_REPLICATION_SLOT_DROP);
 
            /*
-            * Re-acquire lock and start over; we expect to invalidate the slot
-            * next time (unless another process acquires the slot in the
+            * Re-acquire lock and start over; we expect to invalidate the
+            * slot next time (unless another process acquires the slot in the
             * meantime).
             */
            LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
index e4e6632f82e04d10bd260ceb28d480754ff85b3d..31e74d38322883dc7ef8c01110ed7868d36d62c4 100644 (file)
@@ -639,7 +639,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
        moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
 
    /* Acquire the slot so we "own" it */
-   (void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error);
+   ReplicationSlotAcquire(NameStr(*slotname), true);
 
    /* A slot whose restart_lsn has never been reserved cannot be advanced */
    if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
index 109c723f4e1e7ac166b62c745382b90e0936e698..322453635613c83553bd34c1858e449d6e00d662 100644 (file)
@@ -601,7 +601,7 @@ StartReplication(StartReplicationCmd *cmd)
 
    if (cmd->slotname)
    {
-       (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
+       ReplicationSlotAcquire(cmd->slotname, true);
        if (SlotIsLogical(MyReplicationSlot))
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1137,7 +1137,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
    Assert(!MyReplicationSlot);
 
-   (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
+   ReplicationSlotAcquire(cmd->slotname, true);
 
    if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
        ereport(ERROR,
index 357068403a11e9997089c62d1afdfda04860127f..2eb7e3a530d438a5c6d10d15dc3029b80e58c4f6 100644 (file)
@@ -37,14 +37,6 @@ typedef enum ReplicationSlotPersistency
    RS_TEMPORARY
 } ReplicationSlotPersistency;
 
-/* For ReplicationSlotAcquire, q.v. */
-typedef enum SlotAcquireBehavior
-{
-   SAB_Error,
-   SAB_Block,
-   SAB_Inquire
-} SlotAcquireBehavior;
-
 /*
  * On-Disk data of a replication slot, preserved across restarts.
  */
@@ -208,7 +200,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 
-extern int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior);
+extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);