<para>
The time since the slot has become inactive.
<literal>NULL</literal> if the slot is currently being used.
+ Note that for slots on the standby that are being synced from a
+ primary server (whose <structfield>synced</structfield> field is
+ <literal>true</literal>), the
+ <structfield>inactive_since</structfield> indicates the last
+ synchronization (see
+ <xref linkend="logicaldecoding-replication-slots-synchronization"/>)
+ time.
</para></entry>
</row>
} RemoteSlot;
static void slotsync_failure_callback(int code, Datum arg);
+static void update_synced_slots_inactive_since(void);
/*
* If necessary, update the local synced slot's metadata based on the data
* overwriting 'invalidated' flag to remote_slot's value. See
* InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
* if the slot is not acquired by other processes.
+ *
+ * XXX: If it ever turns out that slot acquire/release is costly for
+ * cases when none of the slot properties is changed then we can do a
+ * pre-check to ensure that at least one of the slot properties is
+ * changed before acquiring the slot.
*/
ReplicationSlotAcquire(remote_slot->name, true);
Assert(false);
}
+/*
+ * Update the inactive_since property for synced slots.
+ *
+ * Note that this function is currently called when we shut down the slot
+ * sync machinery.
+ */
+static void
+update_synced_slots_inactive_since(void)
+{
+ TimestampTz now = 0;
+
+ /*
+ * We need to update inactive_since only when we are promoting standby to
+ * correctly interpret the inactive_since if the standby gets promoted
+ * without a restart. We don't want the slots to appear inactive for a
+ * long time after promotion if they haven't been synchronized recently.
+ * Whoever acquires the slot i.e. makes the slot active will reset it.
+ */
+ if (!StandbyMode)
+ return;
+
+ /* The slot sync worker mustn't be running by now */
+ Assert(SlotSyncCtx->pid == InvalidPid);
+
+ /*
+ * A shared lock is sufficient here because each slot's inactive_since
+ * is written only while holding that slot's own spinlock below.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+ for (int i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ /* Check if it is a synchronized slot */
+ if (s->in_use && s->data.synced)
+ {
+ Assert(SlotIsLogical(s));
+
+ /* Use the same inactive_since time for all the slots. */
+ if (now == 0)
+ now = GetCurrentTimestamp();
+
+ SpinLockAcquire(&s->mutex);
+ s->inactive_since = now;
+ SpinLockRelease(&s->mutex);
+ }
+ }
+
+ LWLockRelease(ReplicationSlotControlLock);
+}
+
/*
* Shut down the slot sync worker.
*/
if (SlotSyncCtx->pid == InvalidPid)
{
SpinLockRelease(&SlotSyncCtx->mutex);
+ update_synced_slots_inactive_since();
return;
}
SpinLockRelease(&SlotSyncCtx->mutex);
}
SpinLockRelease(&SlotSyncCtx->mutex);
+
+ update_synced_slots_inactive_since();
}
/*
}
/*
- * Set the last inactive time after marking the slot inactive. We don't
- * set it for the slots currently being synced from the primary to the
- * standby because such slots are typically inactive as decoding is not
- * allowed on those.
+ * Set the time since the slot has become inactive. We get the current
+ * time beforehand to avoid system call while holding the spinlock.
*/
- if (!(RecoveryInProgress() && slot->data.synced))
- now = GetCurrentTimestamp();
+ now = GetCurrentTimestamp();
if (slot->data.persistency == RS_PERSISTENT)
{
slot->active_pid = 0;
/*
- * We set the last inactive time after loading the slot from the disk
- * into memory. Whoever acquires the slot i.e. makes the slot active
- * will reset it. We don't set it for the slots currently being synced
- * from the primary to the standby because such slots are typically
- * inactive as decoding is not allowed on those.
+ * Set the time since the slot has become inactive after loading the
+ * slot from the disk into memory. Whoever acquires the slot i.e.
+ * makes the slot active will reset it.
*/
- if (!(RecoveryInProgress() && slot->data.synced))
- slot->inactive_since = GetCurrentTimestamp();
- else
- slot->inactive_since = 0;
+ slot->inactive_since = GetCurrentTimestamp();
restored = true;
break;
=pod
+=item $node->validate_slot_inactive_since(slot_name, reference_time)
+
+Validate inactive_since value of a given replication slot against the reference
+time and return it.
+
+=cut
+
+sub validate_slot_inactive_since
+{
+ my ($self, $slot_name, $reference_time) = @_;
+ my $name = $self->name;
+
+ # Fetch inactive_since for the slot; the IS NOT NULL filter skips a
+ # currently-active slot, for which inactive_since is NULL.
+ my $inactive_since = $self->safe_psql('postgres',
+ qq(SELECT inactive_since FROM pg_replication_slots
+ WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
+ );
+
+ # Check that the inactive_since is sane, i.e. strictly later than both
+ # the epoch and the caller-supplied reference time.
+ is($self->safe_psql('postgres',
+ qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
+ '$inactive_since'::timestamptz > '$reference_time'::timestamptz;]
+ ),
+ 't',
+ "last inactive time for slot $slot_name is valid on node $name")
+ or die "could not validate captured inactive_since for slot $slot_name";
+
+ return $inactive_since;
+}
+
+=pod
+
=item $node->advance_wal(num)
Advance WAL of node by given number of segments.
# Get inactive_since value after the slot's creation. Note that the slot is
# still inactive till it's used by the standby below.
my $inactive_since =
- capture_and_validate_slot_inactive_since($primary4, $sb4_slot, $slot_creation_time);
+ $primary4->validate_slot_inactive_since($sb4_slot, $slot_creation_time);
$standby4->start;
# Get inactive_since value after the slot's creation. Note that the slot is
# still inactive till it's used by the subscriber below.
$inactive_since =
- capture_and_validate_slot_inactive_since($publisher4, $lsub4_slot, $slot_creation_time);
+ $publisher4->validate_slot_inactive_since($lsub4_slot, $slot_creation_time);
$subscriber4->start;
$subscriber4->safe_psql('postgres',
$publisher4->stop;
$subscriber4->stop;
-# Capture and validate inactive_since of a given slot.
-sub capture_and_validate_slot_inactive_since
-{
- my ($node, $slot_name, $slot_creation_time) = @_;
-
- my $inactive_since = $node->safe_psql('postgres',
- qq(SELECT inactive_since FROM pg_replication_slots
- WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
- );
-
- # Check that the captured time is sane
- is( $node->safe_psql(
- 'postgres',
- qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
- '$inactive_since'::timestamptz >= '$slot_creation_time'::timestamptz;]
- ),
- 't',
- "last inactive time for an active slot $slot_name is sane");
-
- return $inactive_since;
-}
-
done_testing();
$subscriber1->init;
$subscriber1->start;
+# Capture the time before the logical failover slot is created on the
+# primary.  Note that the publisher is later referred to as the primary.
+my $slot_creation_time_on_primary = $publisher->safe_psql(
+ 'postgres', qq[
+ SELECT current_timestamp;
+]);
+
# Create a slot on the publisher with failover disabled
$publisher->safe_psql('postgres',
"SELECT 'init' FROM pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, false);"
"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
1);
+# Capture the inactive_since of the slot from the primary. Note that the slot
+# will be inactive since the corresponding subscription is disabled.
+my $inactive_since_on_primary =
+ $primary->validate_slot_inactive_since('lsub1_slot', $slot_creation_time_on_primary);
+
# Wait for the standby to catch up so that the standby is not lagging behind
# the subscriber.
$primary->wait_for_replay_catchup($standby1);
"t",
'logical slots have synced as true on standby');
+# Capture the inactive_since of the synced slot on the standby
+my $inactive_since_on_standby =
+ $standby1->validate_slot_inactive_since('lsub1_slot', $slot_creation_time_on_primary);
+
+# Synced slot on the standby must get its own inactive_since
+is( $standby1->safe_psql(
+ 'postgres',
+ "SELECT '$inactive_since_on_primary'::timestamptz < '$inactive_since_on_standby'::timestamptz;"
+ ),
+ "t",
+ 'synchronized slot has got its own inactive_since');
+
##################################################
# Test that the synchronized slot will be dropped if the corresponding remote
# slot on the primary server has been dropped.
$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = -1');
$standby1->reload;
+# Capture the time before the logical failover slot is created on the primary.
+# Note that the subscription creates the slot again on the primary.
+$slot_creation_time_on_primary = $publisher->safe_psql(
+ 'postgres', qq[
+ SELECT current_timestamp;
+]);
+
# To ensure that restart_lsn has moved to a recent WAL position, we re-create
# the subscription and the logical slot.
$subscriber1->safe_psql(
"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
1);
+# Capture the inactive_since of the slot from the primary. Note that the slot
+# will be inactive since the corresponding subscription is disabled.
+$inactive_since_on_primary =
+ $primary->validate_slot_inactive_since('lsub1_slot', $slot_creation_time_on_primary);
+
# Wait for the standby to catch up so that the standby is not lagging behind
# the subscriber.
$primary->wait_for_replay_catchup($standby1);
$standby1->start;
$primary->wait_for_replay_catchup($standby1);
+# Capture the time before the standby is promoted
+my $promotion_time_on_primary = $standby1->safe_psql(
+ 'postgres', qq[
+ SELECT current_timestamp;
+]);
+
$standby1->promote;
+# Capture the inactive_since of the synced slot after the promotion.
+# The expectation here is that the slot gets its own inactive_since as
+# part of the promotion.  We check this before the slot is enabled on the
+# new primary below; otherwise the slot becomes active, which sets
+# inactive_since to NULL.
+my $inactive_since_on_new_primary =
+ $standby1->validate_slot_inactive_since('lsub1_slot', $promotion_time_on_primary);
+
+is( $standby1->safe_psql(
+ 'postgres',
+ "SELECT '$inactive_since_on_new_primary'::timestamptz > '$inactive_since_on_primary'::timestamptz"
+ ),
+ "t",
+ 'synchronized slot has got its own inactive_since on the new primary after promotion');
+
# Update subscription with the new primary's connection info
my $standby1_conninfo = $standby1->connstr . ' dbname=postgres';
$subscriber1->safe_psql('postgres',