summaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/slotsync.c57
-rw-r--r--src/backend/replication/slot.c22
2 files changed, 64 insertions, 15 deletions
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 9ac847b7806..d18e2c7342a 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -150,6 +150,7 @@ typedef struct RemoteSlot
} RemoteSlot;
static void slotsync_failure_callback(int code, Datum arg);
+static void update_synced_slots_inactive_since(void);
/*
* If necessary, update the local synced slot's metadata based on the data
@@ -584,6 +585,11 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
* overwriting 'invalidated' flag to remote_slot's value. See
* InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
* if the slot is not acquired by other processes.
+ *
+ * XXX: If it ever turns out that slot acquire/release is costly for
+ * cases when none of the slot properties is changed then we can do a
+ * pre-check to ensure that at least one of the slot properties is
+ * changed before acquiring the slot.
*/
ReplicationSlotAcquire(remote_slot->name, true);
@@ -1356,6 +1362,54 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
}
/*
+ * Update the inactive_since property for synced slots.
+ *
+ * Note that this function is currently called when we shutdown the slot
+ * sync machinery.
+ */
+static void
+update_synced_slots_inactive_since(void)
+{
+ TimestampTz now = 0;
+
+ /*
+ * We need to update inactive_since only when we are promoting standby to
+ * correctly interpret the inactive_since if the standby gets promoted
+ * without a restart. We don't want the slots to appear inactive for a
+ * long time after promotion if they haven't been synchronized recently.
+ * Whoever acquires the slot i.e.makes the slot active will reset it.
+ */
+ if (!StandbyMode)
+ return;
+
+ /* The slot sync worker mustn't be running by now */
+ Assert(SlotSyncCtx->pid == InvalidPid);
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+ for (int i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ /* Check if it is a synchronized slot */
+ if (s->in_use && s->data.synced)
+ {
+ Assert(SlotIsLogical(s));
+
+ /* Use the same inactive_since time for all the slots. */
+ if (now == 0)
+ now = GetCurrentTimestamp();
+
+ SpinLockAcquire(&s->mutex);
+ s->inactive_since = now;
+ SpinLockRelease(&s->mutex);
+ }
+ }
+
+ LWLockRelease(ReplicationSlotControlLock);
+}
+
+/*
* Shut down the slot sync worker.
*/
void
@@ -1368,6 +1422,7 @@ ShutDownSlotSync(void)
if (SlotSyncCtx->pid == InvalidPid)
{
SpinLockRelease(&SlotSyncCtx->mutex);
+ update_synced_slots_inactive_since();
return;
}
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1400,6 +1455,8 @@ ShutDownSlotSync(void)
}
SpinLockRelease(&SlotSyncCtx->mutex);
+
+ update_synced_slots_inactive_since();
}
/*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d778c0b921a..3bddaae022a 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -690,13 +690,10 @@ ReplicationSlotRelease(void)
}
/*
- * Set the last inactive time after marking the slot inactive. We don't
- * set it for the slots currently being synced from the primary to the
- * standby because such slots are typically inactive as decoding is not
- * allowed on those.
+ * Set the time since the slot has become inactive. We get the current
+ * time beforehand to avoid system call while holding the spinlock.
*/
- if (!(RecoveryInProgress() && slot->data.synced))
- now = GetCurrentTimestamp();
+ now = GetCurrentTimestamp();
if (slot->data.persistency == RS_PERSISTENT)
{
@@ -2369,16 +2366,11 @@ RestoreSlotFromDisk(const char *name)
slot->active_pid = 0;
/*
- * We set the last inactive time after loading the slot from the disk
- * into memory. Whoever acquires the slot i.e. makes the slot active
- * will reset it. We don't set it for the slots currently being synced
- * from the primary to the standby because such slots are typically
- * inactive as decoding is not allowed on those.
+ * Set the time since the slot has become inactive after loading the
+ * slot from the disk into memory. Whoever acquires the slot i.e.
+ * makes the slot active will reset it.
*/
- if (!(RecoveryInProgress() && slot->data.synced))
- slot->inactive_since = GetCurrentTimestamp();
- else
- slot->inactive_since = 0;
+ slot->inactive_since = GetCurrentTimestamp();
restored = true;
break;