diff options
| author | Amit Kapila | 2024-02-14 04:15:36 +0000 |
|---|---|---|
| committer | Amit Kapila | 2024-02-14 04:15:36 +0000 |
| commit | ddd5f4f54a026db6a6692876d0d44aef902ab686 (patch) | |
| tree | 68d374eb80a2a16eb0b011f58e3df25de7878d50 /src/include/replication | |
| parent | 06bd311bce24083c76d9741ae89c98750aaf4b41 (diff) | |
Add a slot synchronization function.
This commit introduces a new SQL function pg_sync_replication_slots()
which is used to synchronize the logical replication slots from the
primary server to the physical standby so that logical replication can be
resumed after a failover or planned switchover.
A new 'synced' flag is introduced in pg_replication_slots view, indicating
whether the slot has been synchronized from the primary server. On a
standby, synced slots cannot be dropped or consumed, and any attempt to
perform logical decoding on them will result in an error.
The logical replication slots on the primary can be synchronized to the
hot standby by using the 'failover' parameter of
pg-create-logical-replication-slot(), or by using the 'failover' option of
CREATE SUBSCRIPTION during slot creation, and then calling
pg_sync_replication_slots() on standby. For the synchronization to work,
it is mandatory to have a physical replication slot between the primary
and the standby aka 'primary_slot_name' should be configured on the
standby, and 'hot_standby_feedback' must be enabled on the standby. It is
also necessary to specify a valid 'dbname' in the 'primary_conninfo'.
If a logical slot is invalidated on the primary, then that slot on the
standby is also invalidated.
If a logical slot on the primary is valid but is invalidated on the
standby, then that slot is dropped but will be recreated on the standby in
the next pg_sync_replication_slots() call provided the slot still exists
on the primary server. It is okay to recreate such slots as long as these
are not consumable on standby (which is the case currently). This
situation may occur due to the following reasons:
- The 'max_slot_wal_keep_size' on the standby is insufficient to retain
WAL records from the restart_lsn of the slot.
- 'primary_slot_name' is temporarily reset to null and the physical slot
is removed.
The slot synchronization status on the standby can be monitored using the
'synced' column of pg_replication_slots view.
A functionality to automatically synchronize slots by a background worker
and allow logical walsenders to wait for the physical will be done in
subsequent commits.
Author: Hou Zhijie, Shveta Malik, Ajin Cherian based on an earlier version by Peter Eisentraut
Reviewed-by: Masahiko Sawada, Bertrand Drouvot, Peter Smith, Dilip Kumar, Nisha Moond, Kuroda Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
Diffstat (limited to 'src/include/replication')
| -rw-r--r-- | src/include/replication/slot.h | 19 | ||||
| -rw-r--r-- | src/include/replication/slotsync.h | 23 | ||||
| -rw-r--r-- | src/include/replication/walsender.h | 3 |
3 files changed, 44 insertions, 1 deletions
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index da4c7764921..e706ca834c0 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -53,6 +53,14 @@ typedef enum ReplicationSlotInvalidationCause } ReplicationSlotInvalidationCause; /* + * The possible values for 'conflict_reason' returned in + * pg_get_replication_slots. + */ +#define SLOT_INVAL_WAL_REMOVED_TEXT "wal_removed" +#define SLOT_INVAL_HORIZON_TEXT "rows_removed" +#define SLOT_INVAL_WAL_LEVEL_TEXT "wal_level_insufficient" + +/* * On-Disk data of a replication slot, preserved across restarts. */ typedef struct ReplicationSlotPersistentData @@ -113,6 +121,11 @@ typedef struct ReplicationSlotPersistentData NameData plugin; /* + * Was this slot synchronized from the primary server? + */ + char synced; + + /* * Is this a failover slot (sync candidate for standbys)? Only relevant * for logical slots on the primary server. */ @@ -224,9 +237,11 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover); + bool two_phase, bool failover, + bool synced); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); +extern void ReplicationSlotDropAcquired(void); extern void ReplicationSlotAlter(const char *name, bool failover); extern void ReplicationSlotAcquire(const char *name, bool nowait); @@ -259,5 +274,7 @@ extern void CheckPointReplicationSlots(bool is_shutdown); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); +extern ReplicationSlotInvalidationCause + GetSlotInvalidationCause(char *conflict_reason); #endif /* SLOT_H */ diff --git a/src/include/replication/slotsync.h b/src/include/replication/slotsync.h new file mode 100644 index 00000000000..e86d8a47b85 --- /dev/null +++ b/src/include/replication/slotsync.h @@ -0,0 +1,23 @@ +/*------------------------------------------------------------------------- + * + * slotsync.h + * Exports for slot synchronization. + * + * Portions Copyright (c) 2016-2024, PostgreSQL Global Development Group + * + * src/include/replication/slotsync.h + * + *------------------------------------------------------------------------- + */ +#ifndef SLOTSYNC_H +#define SLOTSYNC_H + +#include "replication/walreceiver.h" + +extern void ValidateSlotSyncParams(void); +extern bool IsSyncingReplicationSlots(void); +extern Size SlotSyncShmemSize(void); +extern void SlotSyncShmemInit(void); +extern void SyncReplicationSlots(WalReceiverConn *wrconn); + +#endif /* SLOTSYNC_H */ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 1b58d50b3b9..0c3996e9263 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -12,6 +12,8 @@ #ifndef _WALSENDER_H #define _WALSENDER_H +#include "access/xlogdefs.h" + /* * What to do with a snapshot in create replication slot command. */ @@ -37,6 +39,7 @@ extern void InitWalSender(void); extern bool exec_replication_command(const char *cmd_string); extern void WalSndErrorCleanup(void); extern void WalSndResourceCleanup(bool isCommit); +extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli); extern void WalSndSignals(void); extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); |
