summaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorAmit Kapila2025-04-03 06:37:46 +0000
committerAmit Kapila2025-04-03 06:56:54 +0000
commit4868c96bc8c60145958f1a83bbe9409718a696cb (patch)
treedd6b5c3f0d809d26aaa2f800011b36b92ecf095b /src/backend
parenta7187c3723b41057522038c5e5db329d84f41ac4 (diff)
Fix slot synchronization for two_phase enabled slots.
The issue is that the transactions prepared before two-phase decoding is enabled can fail to replicate to the subscriber after being committed on a promoted standby following a failover. This is because the two_phase_at field of a slot, which tracks the LSN from which two-phase decoding starts, is not synchronized to standby servers. Without two_phase_at, the logical decoding might incorrectly identify prepared transaction as already replicated to the subscriber after promotion of standby server, causing them to be skipped. To address the issue on HEAD, the two_phase_at field of the slot is exposed by the pg_replication_slots view and allows the slot synchronization to copy this value to the corresponding synced slot on the standby server. This bug is likely to occur if the user toggles the two_phase option to true after initial slot creation. Given that altering the two_phase option of a replication slot is not allowed in PostgreSQL 17, this bug is less likely to occur. We can't change the view/function definition in backbranch so we can't push the same fix but we are brainstorming an appropriate solution for PG17. Author: Zhijie Hou <houzj.fnst@fujitsu.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Discussion: https://postgr.es/m/TYAPR01MB5724CC7C288535BBCEEE65DA94A72@TYAPR01MB5724.jpnprd01.prod.outlook.com
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/system_views.sql1
-rw-r--r--src/backend/replication/logical/slotsync.c14
-rw-r--r--src/backend/replication/slotfuncs.c8
3 files changed, 18 insertions, 5 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 64a7240aa77..273008db37f 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1025,6 +1025,7 @@ CREATE VIEW pg_replication_slots AS
L.wal_status,
L.safe_wal_size,
L.two_phase,
+ L.two_phase_at,
L.inactive_since,
L.conflicting,
L.invalidation_reason,
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 2c0a7439be4..e22d41891e6 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -139,6 +139,7 @@ typedef struct RemoteSlot
bool failover;
XLogRecPtr restart_lsn;
XLogRecPtr confirmed_lsn;
+ XLogRecPtr two_phase_at;
TransactionId catalog_xmin;
/* RS_INVAL_NONE if valid, or the reason of invalidation */
@@ -276,7 +277,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
if (remote_dbid != slot->data.database ||
remote_slot->two_phase != slot->data.two_phase ||
remote_slot->failover != slot->data.failover ||
- strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
+ strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0 ||
+ remote_slot->two_phase_at != slot->data.two_phase_at)
{
NameData plugin_name;
@@ -287,6 +289,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
slot->data.plugin = plugin_name;
slot->data.database = remote_dbid;
slot->data.two_phase = remote_slot->two_phase;
+ slot->data.two_phase_at = remote_slot->two_phase_at;
slot->data.failover = remote_slot->failover;
SpinLockRelease(&slot->mutex);
@@ -788,9 +791,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
static bool
synchronize_slots(WalReceiverConn *wrconn)
{
-#define SLOTSYNC_COLUMN_COUNT 9
+#define SLOTSYNC_COLUMN_COUNT 10
Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
- LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
+ LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
WalRcvExecResult *res;
TupleTableSlot *tupslot;
@@ -798,7 +801,7 @@ synchronize_slots(WalReceiverConn *wrconn)
bool some_slot_updated = false;
bool started_tx = false;
const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
- " restart_lsn, catalog_xmin, two_phase, failover,"
+ " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
" database, invalidation_reason"
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
@@ -853,6 +856,9 @@ synchronize_slots(WalReceiverConn *wrconn)
&isnull));
Assert(!isnull);
+ d = slot_getattr(tupslot, ++col, &isnull);
+ remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+
remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
&isnull));
Assert(!isnull);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 146eef5871e..8a314b5ff3b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -235,7 +235,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_GET_REPLICATION_SLOTS_COLS 19
+#define PG_GET_REPLICATION_SLOTS_COLS 20
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn;
int slotno;
@@ -406,6 +406,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
+ if (slot_contents.data.two_phase &&
+ !XLogRecPtrIsInvalid(slot_contents.data.two_phase_at))
+ values[i++] = LSNGetDatum(slot_contents.data.two_phase_at);
+ else
+ nulls[i++] = true;
+
if (slot_contents.inactive_since > 0)
values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
else