diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/system_views.sql | 1 | ||||
-rw-r--r-- | src/backend/replication/logical/slotsync.c | 14 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 8 |
3 files changed, 18 insertions, 5 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 64a7240aa77..273008db37f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1025,6 +1025,7 @@ CREATE VIEW pg_replication_slots AS L.wal_status, L.safe_wal_size, L.two_phase, + L.two_phase_at, L.inactive_since, L.conflicting, L.invalidation_reason, diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 2c0a7439be4..e22d41891e6 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -139,6 +139,7 @@ typedef struct RemoteSlot bool failover; XLogRecPtr restart_lsn; XLogRecPtr confirmed_lsn; + XLogRecPtr two_phase_at; TransactionId catalog_xmin; /* RS_INVAL_NONE if valid, or the reason of invalidation */ @@ -276,7 +277,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, if (remote_dbid != slot->data.database || remote_slot->two_phase != slot->data.two_phase || remote_slot->failover != slot->data.failover || - strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0) + strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0 || + remote_slot->two_phase_at != slot->data.two_phase_at) { NameData plugin_name; @@ -287,6 +289,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, slot->data.plugin = plugin_name; slot->data.database = remote_dbid; slot->data.two_phase = remote_slot->two_phase; + slot->data.two_phase_at = remote_slot->two_phase_at; slot->data.failover = remote_slot->failover; SpinLockRelease(&slot->mutex); @@ -788,9 +791,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) static bool synchronize_slots(WalReceiverConn *wrconn) { -#define SLOTSYNC_COLUMN_COUNT 9 +#define SLOTSYNC_COLUMN_COUNT 10 Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, - LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID}; + LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID}; WalRcvExecResult *res; TupleTableSlot *tupslot; @@ -798,7 +801,7 @@ synchronize_slots(WalReceiverConn *wrconn) bool some_slot_updated = false; bool started_tx = false; const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," - " restart_lsn, catalog_xmin, two_phase, failover," + " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover," " database, invalidation_reason" " FROM pg_catalog.pg_replication_slots" " WHERE failover and NOT temporary"; @@ -853,6 +856,9 @@ synchronize_slots(WalReceiverConn *wrconn) &isnull)); Assert(!isnull); + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d); + remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col, &isnull)); Assert(!isnull); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 146eef5871e..8a314b5ff3b 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -235,7 +235,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 19 +#define PG_GET_REPLICATION_SLOTS_COLS 20 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -406,6 +406,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.two_phase); + if (slot_contents.data.two_phase && + !XLogRecPtrIsInvalid(slot_contents.data.two_phase_at)) + values[i++] = LSNGetDatum(slot_contents.data.two_phase_at); + else + nulls[i++] = true; + if (slot_contents.inactive_since > 0) values[i++] = TimestampTzGetDatum(slot_contents.inactive_since); else |