Allow synced slots to have their inactive_since.
authorAmit Kapila <akapila@postgresql.org>
Fri, 5 Apr 2024 04:18:49 +0000 (09:48 +0530)
committerAmit Kapila <akapila@postgresql.org>
Fri, 5 Apr 2024 04:18:49 +0000 (09:48 +0530)
This commit does two things:
1) Maintains inactive_since for sync slots whenever the slot is released
just like any other regular slot.

2) Ensures the value is set to the current timestamp during the promotion
of standby to help correctly interpret the time after promotion. We don't
want the slots to appear inactive for a long time after promotion if they
haven't been synchronized recently. This would also avoid the invalidation
of such slots immediately after promotion if tomorrow we have a feature
that invalidates slots based on their inactivity time. Whoever acquires
the slot i.e. makes the slot active will reset it to NULL.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik, Masahiko Sawada
Discussion: https://postgr.es/m/CAA4eK1KrPGwfZV9LYGidjxHeW+rxJ=E2ThjXvwRGLO=iLNuo=Q@mail.gmail.com
Discussion: https://postgr.es/m/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
Discussion: https://postgr.es/m/CA+Tgmob_Ta-t2ty8QrKHBGnNLrf4ZYcwhGHGFsuUoFrAEDw4sA@mail.gmail.com

doc/src/sgml/system-views.sgml
src/backend/replication/logical/slotsync.c
src/backend/replication/slot.c
src/test/perl/PostgreSQL/Test/Cluster.pm
src/test/recovery/t/019_replslot_limit.pl
src/test/recovery/t/040_standby_failover_slots_sync.pl

index 3c8dca8ca300ccf46743ba66cbaa0621a7c457ae..7ed617170f20ab705e8591028487dc3a3036b1ea 100644 (file)
@@ -2530,6 +2530,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       <para>
         The time since the slot has become inactive.
         <literal>NULL</literal> if the slot is currently being used.
+        Note that for slots on the standby that are being synced from a
+        primary server (whose <structfield>synced</structfield> field is
+        <literal>true</literal>), the
+        <structfield>inactive_since</structfield> indicates the last
+        synchronization (see
+        <xref linkend="logicaldecoding-replication-slots-synchronization"/>)
+        time.
       </para></entry>
      </row>
 
index 9ac847b7806756e2fa643af5d89e8d1a0c5f7fb4..d18e2c7342ab7a2b366d778b61cba2814e616fcd 100644 (file)
@@ -150,6 +150,7 @@ typedef struct RemoteSlot
 } RemoteSlot;
 
 static void slotsync_failure_callback(int code, Datum arg);
+static void update_synced_slots_inactive_since(void);
 
 /*
  * If necessary, update the local synced slot's metadata based on the data
@@ -584,6 +585,11 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
         * overwriting 'invalidated' flag to remote_slot's value. See
         * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
         * if the slot is not acquired by other processes.
+        *
+        * XXX: If it ever turns out that slot acquire/release is costly for
+        * cases when none of the slot properties is changed then we can do a
+        * pre-check to ensure that at least one of the slot properties is
+        * changed before acquiring the slot.
         */
        ReplicationSlotAcquire(remote_slot->name, true);
 
@@ -1355,6 +1361,54 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
    Assert(false);
 }
 
+/*
+ * Update the inactive_since property for synced slots.
+ *
+ * Note that this function is currently called when we shutdown the slot
+ * sync machinery.
+ */
+static void
+update_synced_slots_inactive_since(void)
+{
+   TimestampTz now = 0;
+
+   /*
+    * We need to update inactive_since only when we are promoting standby to
+    * correctly interpret the inactive_since if the standby gets promoted
+    * without a restart. We don't want the slots to appear inactive for a
+    * long time after promotion if they haven't been synchronized recently.
+    * Whoever acquires the slot i.e.makes the slot active will reset it.
+    */
+   if (!StandbyMode)
+       return;
+
+   /* The slot sync worker mustn't be running by now */
+   Assert(SlotSyncCtx->pid == InvalidPid);
+
+   LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+   for (int i = 0; i < max_replication_slots; i++)
+   {
+       ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+       /* Check if it is a synchronized slot */
+       if (s->in_use && s->data.synced)
+       {
+           Assert(SlotIsLogical(s));
+
+           /* Use the same inactive_since time for all the slots. */
+           if (now == 0)
+               now = GetCurrentTimestamp();
+
+           SpinLockAcquire(&s->mutex);
+           s->inactive_since = now;
+           SpinLockRelease(&s->mutex);
+       }
+   }
+
+   LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Shut down the slot sync worker.
  */
@@ -1368,6 +1422,7 @@ ShutDownSlotSync(void)
    if (SlotSyncCtx->pid == InvalidPid)
    {
        SpinLockRelease(&SlotSyncCtx->mutex);
+       update_synced_slots_inactive_since();
        return;
    }
    SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1400,6 +1455,8 @@ ShutDownSlotSync(void)
    }
 
    SpinLockRelease(&SlotSyncCtx->mutex);
+
+   update_synced_slots_inactive_since();
 }
 
 /*
index d778c0b921a01e3b4fea3ab46b68dcc2b1eff4c8..3bddaae022adea86168a63c1c58850d6b202cb4e 100644 (file)
@@ -690,13 +690,10 @@ ReplicationSlotRelease(void)
    }
 
    /*
-    * Set the last inactive time after marking the slot inactive. We don't
-    * set it for the slots currently being synced from the primary to the
-    * standby because such slots are typically inactive as decoding is not
-    * allowed on those.
+    * Set the time since the slot has become inactive. We get the current
+    * time beforehand to avoid system call while holding the spinlock.
     */
-   if (!(RecoveryInProgress() && slot->data.synced))
-       now = GetCurrentTimestamp();
+   now = GetCurrentTimestamp();
 
    if (slot->data.persistency == RS_PERSISTENT)
    {
@@ -2369,16 +2366,11 @@ RestoreSlotFromDisk(const char *name)
        slot->active_pid = 0;
 
        /*
-        * We set the last inactive time after loading the slot from the disk
-        * into memory. Whoever acquires the slot i.e. makes the slot active
-        * will reset it. We don't set it for the slots currently being synced
-        * from the primary to the standby because such slots are typically
-        * inactive as decoding is not allowed on those.
+        * Set the time since the slot has become inactive after loading the
+        * slot from the disk into memory. Whoever acquires the slot i.e.
+        * makes the slot active will reset it.
         */
-       if (!(RecoveryInProgress() && slot->data.synced))
-           slot->inactive_since = GetCurrentTimestamp();
-       else
-           slot->inactive_since = 0;
+       slot->inactive_since = GetCurrentTimestamp();
 
        restored = true;
        break;
index b08296605c407ce64b4f604bdc5c6c086f9a81cf..54e1008ae58baaf09e4c606b8e3e0373e9836251 100644 (file)
@@ -3276,6 +3276,37 @@ sub create_logical_slot_on_standby
 
 =pod
 
+=item $node->validate_slot_inactive_since(self, slot_name, reference_time)
+
+Validate inactive_since value of a given replication slot against the reference
+time and return it.
+
+=cut
+
+sub validate_slot_inactive_since
+{
+   my ($self, $slot_name, $reference_time) = @_;
+   my $name = $self->name;
+
+   my $inactive_since = $self->safe_psql('postgres',
+       qq(SELECT inactive_since FROM pg_replication_slots
+           WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
+       );
+
+   # Check that the inactive_since is sane
+   is($self->safe_psql('postgres',
+       qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
+               '$inactive_since'::timestamptz > '$reference_time'::timestamptz;]
+       ),
+       't',
+       "last inactive time for slot $slot_name is valid on node $name")
+       or die "could not validate captured inactive_since for slot $slot_name";
+
+   return $inactive_since;
+}
+
+=pod
+
 =item $node->advance_wal(num)
 
 Advance WAL of node by given number of segments.
index 3b9a306a8bb2caec16ee692680cc02b2074fe1d6..96b60cedbbdbf5f6f471219ead8b7ed3ba3ded77 100644 (file)
@@ -443,7 +443,7 @@ $primary4->safe_psql(
 # Get inactive_since value after the slot's creation. Note that the slot is
 # still inactive till it's used by the standby below.
 my $inactive_since =
-   capture_and_validate_slot_inactive_since($primary4, $sb4_slot, $slot_creation_time);
+   $primary4->validate_slot_inactive_since($sb4_slot, $slot_creation_time);
 
 $standby4->start;
 
@@ -502,7 +502,7 @@ $publisher4->safe_psql('postgres',
 # Get inactive_since value after the slot's creation. Note that the slot is
 # still inactive till it's used by the subscriber below.
 $inactive_since =
-   capture_and_validate_slot_inactive_since($publisher4, $lsub4_slot, $slot_creation_time);
+   $publisher4->validate_slot_inactive_since($lsub4_slot, $slot_creation_time);
 
 $subscriber4->start;
 $subscriber4->safe_psql('postgres',
@@ -540,26 +540,4 @@ is( $publisher4->safe_psql(
 $publisher4->stop;
 $subscriber4->stop;
 
-# Capture and validate inactive_since of a given slot.
-sub capture_and_validate_slot_inactive_since
-{
-   my ($node, $slot_name, $slot_creation_time) = @_;
-
-   my $inactive_since = $node->safe_psql('postgres',
-       qq(SELECT inactive_since FROM pg_replication_slots
-           WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
-       );
-
-   # Check that the captured time is sane
-   is( $node->safe_psql(
-           'postgres',
-           qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
-               '$inactive_since'::timestamptz >= '$slot_creation_time'::timestamptz;]
-       ),
-       't',
-       "last inactive time for an active slot $slot_name is sane");
-
-   return $inactive_since;
-}
-
 done_testing();
index 869e3d2e9146188a52957cb3ddec8ee7018bdf54..3234b1ad7d81d7cce0920067f04523ef87559554 100644 (file)
@@ -35,6 +35,13 @@ my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
 $subscriber1->init;
 $subscriber1->start;
 
+# Capture the time before the logical failover slot is created on the
+# primary. We later call this publisher as primary anyway.
+my $slot_creation_time_on_primary = $publisher->safe_psql(
+   'postgres', qq[
+    SELECT current_timestamp;
+]);
+
 # Create a slot on the publisher with failover disabled
 $publisher->safe_psql('postgres',
    "SELECT 'init' FROM pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, false);"
@@ -174,6 +181,11 @@ $primary->poll_query_until(
    "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
    1);
 
+# Capture the inactive_since of the slot from the primary. Note that the slot
+# will be inactive since the corresponding subscription is disabled.
+my $inactive_since_on_primary =
+   $primary->validate_slot_inactive_since('lsub1_slot', $slot_creation_time_on_primary);
+
 # Wait for the standby to catch up so that the standby is not lagging behind
 # the subscriber.
 $primary->wait_for_replay_catchup($standby1);
@@ -190,6 +202,18 @@ is( $standby1->safe_psql(
    "t",
    'logical slots have synced as true on standby');
 
+# Capture the inactive_since of the synced slot on the standby
+my $inactive_since_on_standby =
+   $standby1->validate_slot_inactive_since('lsub1_slot', $slot_creation_time_on_primary);
+
+# Synced slot on the standby must get its own inactive_since
+is( $standby1->safe_psql(
+       'postgres',
+       "SELECT '$inactive_since_on_primary'::timestamptz < '$inactive_since_on_standby'::timestamptz;"
+   ),
+   "t",
+   'synchronized slot has got its own inactive_since');
+
 ##################################################
 # Test that the synchronized slot will be dropped if the corresponding remote
 # slot on the primary server has been dropped.
@@ -237,6 +261,13 @@ is( $standby1->safe_psql(
 $standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = -1');
 $standby1->reload;
 
+# Capture the time before the logical failover slot is created on the primary.
+# Note that the subscription creates the slot again on the primary.
+$slot_creation_time_on_primary = $publisher->safe_psql(
+   'postgres', qq[
+    SELECT current_timestamp;
+]);
+
 # To ensure that restart_lsn has moved to a recent WAL position, we re-create
 # the subscription and the logical slot.
 $subscriber1->safe_psql(
@@ -257,6 +288,11 @@ $primary->poll_query_until(
    "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
    1);
 
+# Capture the inactive_since of the slot from the primary. Note that the slot
+# will be inactive since the corresponding subscription is disabled.
+$inactive_since_on_primary =
+   $primary->validate_slot_inactive_since('lsub1_slot', $slot_creation_time_on_primary);
+
 # Wait for the standby to catch up so that the standby is not lagging behind
 # the subscriber.
 $primary->wait_for_replay_catchup($standby1);
@@ -808,8 +844,28 @@ $primary->reload;
 $standby1->start;
 $primary->wait_for_replay_catchup($standby1);
 
+# Capture the time before the standby is promoted
+my $promotion_time_on_primary = $standby1->safe_psql(
+   'postgres', qq[
+    SELECT current_timestamp;
+]);
+
 $standby1->promote;
 
+# Capture the inactive_since of the synced slot after the promotion.
+# The expectation here is that the slot gets its inactive_since as part of the
+# promotion. We do this check before the slot is enabled on the new primary
+# below, otherwise, the slot gets active setting inactive_since to NULL.
+my $inactive_since_on_new_primary =
+   $standby1->validate_slot_inactive_since('lsub1_slot', $promotion_time_on_primary);
+
+is( $standby1->safe_psql(
+       'postgres',
+       "SELECT '$inactive_since_on_new_primary'::timestamptz > '$inactive_since_on_primary'::timestamptz"
+   ),
+   "t",
+   'synchronized slot has got its own inactive_since on the new primary after promotion');
+
 # Update subscription with the new primary's connection info
 my $standby1_conninfo = $standby1->connstr . ' dbname=postgres';
 $subscriber1->safe_psql('postgres',