Replace replication slot's invalidated_at LSN with an enum
authorAndres Freund <andres@anarazel.de>
Sat, 8 Apr 2023 04:47:25 +0000 (21:47 -0700)
committerAndres Freund <andres@anarazel.de>
Sat, 8 Apr 2023 04:47:25 +0000 (21:47 -0700)
This is mainly useful because the upcoming logical-decoding-on-standby feature
adds further reasons for invalidating slots, and we don't want to end up with
multiple invalidated_* fields, or check different attributes.

Eventually we should consider not resetting restart_lsn when invalidating a
slot due to max_slot_wal_keep_size. But that's a user visible change, so left
for later.

Increases SLOT_VERSION, due to the changed field (with a different alignment,
no less).

Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de

src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/include/replication/slot.h
src/tools/pgindent/typedefs.list

index 2293c0c6fc348b0a1541c2ed666977b73fd67639..f969f7c083f97c2fb8f8af17763332e8d639f4ea 100644 (file)
@@ -89,7 +89,7 @@ typedef struct ReplicationSlotOnDisk
        sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
 #define SLOT_MAGIC             0x1051CA1       /* format identifier */
-#define SLOT_VERSION   2               /* version for new files */
+#define SLOT_VERSION   3               /* version for new files */
 
 /* Control array for replication slot management */
 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -855,8 +855,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
                SpinLockAcquire(&s->mutex);
                effective_xmin = s->effective_xmin;
                effective_catalog_xmin = s->effective_catalog_xmin;
-               invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
-                                          XLogRecPtrIsInvalid(s->data.restart_lsn));
+               invalidated = s->data.invalidated != RS_INVAL_NONE;
                SpinLockRelease(&s->mutex);
 
                /* invalidated slots need not apply */
@@ -901,14 +900,20 @@ ReplicationSlotsComputeRequiredLSN(void)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
                XLogRecPtr      restart_lsn;
+               bool            invalidated;
 
                if (!s->in_use)
                        continue;
 
                SpinLockAcquire(&s->mutex);
                restart_lsn = s->data.restart_lsn;
+               invalidated = s->data.invalidated != RS_INVAL_NONE;
                SpinLockRelease(&s->mutex);
 
+               /* invalidated slots need not apply */
+               if (invalidated)
+                       continue;
+
                if (restart_lsn != InvalidXLogRecPtr &&
                        (min_required == InvalidXLogRecPtr ||
                         restart_lsn < min_required))
@@ -946,6 +951,7 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
        {
                ReplicationSlot *s;
                XLogRecPtr      restart_lsn;
+               bool            invalidated;
 
                s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -960,8 +966,13 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
                /* read once, it's ok if it increases while we're checking */
                SpinLockAcquire(&s->mutex);
                restart_lsn = s->data.restart_lsn;
+               invalidated = s->data.invalidated != RS_INVAL_NONE;
                SpinLockRelease(&s->mutex);
 
+               /* invalidated slots need not apply */
+               if (invalidated)
+                       continue;
+
                if (restart_lsn == InvalidXLogRecPtr)
                        continue;
 
@@ -1012,6 +1023,8 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
                if (s->data.database != dboid)
                        continue;
 
+               /* NB: intentionally counting invalidated slots */
+
                /* count slots with spinlock held */
                SpinLockAcquire(&s->mutex);
                (*nslots)++;
@@ -1069,6 +1082,8 @@ restart:
                if (s->data.database != dboid)
                        continue;
 
+               /* NB: intentionally including invalidated slots */
+
                /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
                SpinLockAcquire(&s->mutex);
                /* can't change while ReplicationSlotControlLock is held */
@@ -1294,7 +1309,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
                {
                        MyReplicationSlot = s;
                        s->active_pid = MyProcPid;
-                       s->data.invalidated_at = restart_lsn;
+                       s->data.invalidated = RS_INVAL_WAL_REMOVED;
+
+                       /*
+                        * XXX: We should consider not overwriting restart_lsn and instead
+                        * just rely on .invalidated.
+                        */
                        s->data.restart_lsn = InvalidXLogRecPtr;
 
                        /* Let caller know */
index 2f3c96482413324de00675765229351733d8c54b..ad3e72be5ee2634983ab39804843aa48a9e2194e 100644 (file)
@@ -315,12 +315,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                        nulls[i++] = true;
 
                /*
-                * If invalidated_at is valid and restart_lsn is invalid, we know for
-                * certain that the slot has been invalidated.  Otherwise, test
-                * availability from restart_lsn.
+                * If the slot has not been invalidated, test availability from
+                * restart_lsn.
                 */
-               if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
-                       !XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
+               if (slot_contents.data.invalidated != RS_INVAL_NONE)
                        walstate = WALAVAIL_REMOVED;
                else
                        walstate = GetWALAvailability(slot_contents.data.restart_lsn);
index 8872c80cdfe7f6db9602b3152119c2e1844767eb..34ce055dd503d4a59c52065cd52d55806f48fec3 100644 (file)
@@ -37,6 +37,17 @@ typedef enum ReplicationSlotPersistency
        RS_TEMPORARY
 } ReplicationSlotPersistency;
 
+/*
+ * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
+ * 'invalidated' field is set to a value other than _NONE.
+ */
+typedef enum ReplicationSlotInvalidationCause
+{
+       RS_INVAL_NONE,
+       /* required WAL has been removed */
+       RS_INVAL_WAL_REMOVED,
+} ReplicationSlotInvalidationCause;
+
 /*
  * On-Disk data of a replication slot, preserved across restarts.
  */
@@ -72,8 +83,8 @@ typedef struct ReplicationSlotPersistentData
        /* oldest LSN that might be required by this replication slot */
        XLogRecPtr      restart_lsn;
 
-       /* restart_lsn is copied here when the slot is invalidated */
-       XLogRecPtr      invalidated_at;
+       /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
+       ReplicationSlotInvalidationCause invalidated;
 
        /*
         * Oldest LSN that the client has acked receipt for.  This is used as the
index df960883c5ce3f69338683e58eb07219bb799555..b4058b88c3e9ef319d1677e13bbbf6bb3165ab4c 100644 (file)
@@ -2339,6 +2339,7 @@ ReplicaIdentityStmt
 ReplicationKind
 ReplicationSlot
 ReplicationSlotCtlData
+ReplicationSlotInvalidationCause
 ReplicationSlotOnDisk
 ReplicationSlotPersistency
 ReplicationSlotPersistentData