summaryrefslogtreecommitdiff
path: root/src/include/replication
diff options
context:
space:
mode:
authorAmit Kapila2024-02-14 04:15:36 +0000
committerAmit Kapila2024-02-14 04:15:36 +0000
commitddd5f4f54a026db6a6692876d0d44aef902ab686 (patch)
tree68d374eb80a2a16eb0b011f58e3df25de7878d50 /src/include/replication
parent06bd311bce24083c76d9741ae89c98750aaf4b41 (diff)
Add a slot synchronization function.
This commit introduces a new SQL function pg_sync_replication_slots() which is used to synchronize the logical replication slots from the primary server to the physical standby so that logical replication can be resumed after a failover or planned switchover. A new 'synced' flag is introduced in pg_replication_slots view, indicating whether the slot has been synchronized from the primary server. On a standby, synced slots cannot be dropped or consumed, and any attempt to perform logical decoding on them will result in an error. The logical replication slots on the primary can be synchronized to the hot standby by using the 'failover' parameter of pg_create_logical_replication_slot(), or by using the 'failover' option of CREATE SUBSCRIPTION during slot creation, and then calling pg_sync_replication_slots() on standby. For the synchronization to work, it is mandatory to have a physical replication slot between the primary and the standby aka 'primary_slot_name' should be configured on the standby, and 'hot_standby_feedback' must be enabled on the standby. It is also necessary to specify a valid 'dbname' in the 'primary_conninfo'. If a logical slot is invalidated on the primary, then that slot on the standby is also invalidated. If a logical slot on the primary is valid but is invalidated on the standby, then that slot is dropped but will be recreated on the standby in the next pg_sync_replication_slots() call provided the slot still exists on the primary server. It is okay to recreate such slots as long as these are not consumable on standby (which is the case currently). This situation may occur due to the following reasons: - The 'max_slot_wal_keep_size' on the standby is insufficient to retain WAL records from the restart_lsn of the slot. - 'primary_slot_name' is temporarily reset to null and the physical slot is removed. The slot synchronization status on the standby can be monitored using the 'synced' column of pg_replication_slots view. 
Functionality to automatically synchronize slots by a background worker, and to allow logical walsenders to wait for the physical standbys, will be added in subsequent commits. Author: Hou Zhijie, Shveta Malik, Ajin Cherian based on an earlier version by Peter Eisentraut Reviewed-by: Masahiko Sawada, Bertrand Drouvot, Peter Smith, Dilip Kumar, Nisha Moond, Kuroda Hayato, Amit Kapila Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
Diffstat (limited to 'src/include/replication')
-rw-r--r--src/include/replication/slot.h19
-rw-r--r--src/include/replication/slotsync.h23
-rw-r--r--src/include/replication/walsender.h3
3 files changed, 44 insertions, 1 deletions
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index da4c7764921..e706ca834c0 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -53,6 +53,14 @@ typedef enum ReplicationSlotInvalidationCause
} ReplicationSlotInvalidationCause;
/*
+ * The possible values for 'conflict_reason' returned in
+ * pg_get_replication_slots.
+ */
+#define SLOT_INVAL_WAL_REMOVED_TEXT "wal_removed"
+#define SLOT_INVAL_HORIZON_TEXT "rows_removed"
+#define SLOT_INVAL_WAL_LEVEL_TEXT "wal_level_insufficient"
+
+/*
* On-Disk data of a replication slot, preserved across restarts.
*/
typedef struct ReplicationSlotPersistentData
@@ -113,6 +121,11 @@ typedef struct ReplicationSlotPersistentData
NameData plugin;
/*
+ * Was this slot synchronized from the primary server?
+ */
+ bool synced;
+
+ /*
* Is this a failover slot (sync candidate for standbys)? Only relevant
* for logical slots on the primary server.
*/
@@ -224,9 +237,11 @@ extern void ReplicationSlotsShmemInit(void);
/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,
- bool two_phase, bool failover);
+ bool two_phase, bool failover,
+ bool synced);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
+extern void ReplicationSlotDropAcquired(void);
extern void ReplicationSlotAlter(const char *name, bool failover);
extern void ReplicationSlotAcquire(const char *name, bool nowait);
@@ -259,5 +274,7 @@ extern void CheckPointReplicationSlots(bool is_shutdown);
extern void CheckSlotRequirements(void);
extern void CheckSlotPermissions(void);
+extern ReplicationSlotInvalidationCause
+ GetSlotInvalidationCause(const char *conflict_reason);
#endif /* SLOT_H */
diff --git a/src/include/replication/slotsync.h b/src/include/replication/slotsync.h
new file mode 100644
index 00000000000..e86d8a47b85
--- /dev/null
+++ b/src/include/replication/slotsync.h
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ *
+ * slotsync.h
+ * Exports for slot synchronization.
+ *
+ * Portions Copyright (c) 2016-2024, PostgreSQL Global Development Group
+ *
+ * src/include/replication/slotsync.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOTSYNC_H
+#define SLOTSYNC_H
+
+#include "replication/walreceiver.h"
+
+extern void ValidateSlotSyncParams(void);
+extern bool IsSyncingReplicationSlots(void);
+extern Size SlotSyncShmemSize(void);
+extern void SlotSyncShmemInit(void);
+extern void SyncReplicationSlots(WalReceiverConn *wrconn);
+
+#endif /* SLOTSYNC_H */
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 1b58d50b3b9..0c3996e9263 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -12,6 +12,8 @@
#ifndef _WALSENDER_H
#define _WALSENDER_H
+#include "access/xlogdefs.h"
+
/*
* What to do with a snapshot in create replication slot command.
*/
@@ -37,6 +39,7 @@ extern void InitWalSender(void);
extern bool exec_replication_command(const char *cmd_string);
extern void WalSndErrorCleanup(void);
extern void WalSndResourceCleanup(bool isCommit);
+extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);