summaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/system_views.sql1
-rw-r--r--src/backend/replication/logical/slotsync.c14
-rw-r--r--src/backend/replication/slotfuncs.c8
3 files changed, 18 insertions, 5 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 64a7240aa77..273008db37f 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1025,6 +1025,7 @@ CREATE VIEW pg_replication_slots AS
L.wal_status,
L.safe_wal_size,
L.two_phase,
+ L.two_phase_at,
L.inactive_since,
L.conflicting,
L.invalidation_reason,
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 2c0a7439be4..e22d41891e6 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -139,6 +139,7 @@ typedef struct RemoteSlot
bool failover;
XLogRecPtr restart_lsn;
XLogRecPtr confirmed_lsn;
+ XLogRecPtr two_phase_at;
TransactionId catalog_xmin;
/* RS_INVAL_NONE if valid, or the reason of invalidation */
@@ -276,7 +277,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
if (remote_dbid != slot->data.database ||
remote_slot->two_phase != slot->data.two_phase ||
remote_slot->failover != slot->data.failover ||
- strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
+ strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0 ||
+ remote_slot->two_phase_at != slot->data.two_phase_at)
{
NameData plugin_name;
@@ -287,6 +289,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
slot->data.plugin = plugin_name;
slot->data.database = remote_dbid;
slot->data.two_phase = remote_slot->two_phase;
+ slot->data.two_phase_at = remote_slot->two_phase_at;
slot->data.failover = remote_slot->failover;
SpinLockRelease(&slot->mutex);
@@ -788,9 +791,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
static bool
synchronize_slots(WalReceiverConn *wrconn)
{
-#define SLOTSYNC_COLUMN_COUNT 9
+#define SLOTSYNC_COLUMN_COUNT 10
Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
- LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
+ LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
WalRcvExecResult *res;
TupleTableSlot *tupslot;
@@ -798,7 +801,7 @@ synchronize_slots(WalReceiverConn *wrconn)
bool some_slot_updated = false;
bool started_tx = false;
const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
- " restart_lsn, catalog_xmin, two_phase, failover,"
+ " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
" database, invalidation_reason"
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
@@ -853,6 +856,9 @@ synchronize_slots(WalReceiverConn *wrconn)
&isnull));
Assert(!isnull);
+ d = slot_getattr(tupslot, ++col, &isnull);
+ remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+
remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
&isnull));
Assert(!isnull);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 146eef5871e..8a314b5ff3b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -235,7 +235,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_GET_REPLICATION_SLOTS_COLS 19
+#define PG_GET_REPLICATION_SLOTS_COLS 20
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn;
int slotno;
@@ -406,6 +406,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
+ if (slot_contents.data.two_phase &&
+ !XLogRecPtrIsInvalid(slot_contents.data.two_phase_at))
+ values[i++] = LSNGetDatum(slot_contents.data.two_phase_at);
+ else
+ nulls[i++] = true;
+
if (slot_contents.inactive_since > 0)
values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
else