for (slotno = 0; slotno < max_replication_slots; slotno++)
{
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+ ReplicationSlot slot_contents;
Datum values[PG_GET_REPLICATION_SLOTS_COLS];
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
-
- ReplicationSlotPersistency persistency;
- TransactionId xmin;
- TransactionId catalog_xmin;
- XLogRecPtr restart_lsn;
- XLogRecPtr confirmed_flush_lsn;
- pid_t active_pid;
- Oid database;
- NameData slot_name;
- NameData plugin;
WALAvailability walstate;
XLogSegNo last_removed_seg;
int i;
if (!slot->in_use)
continue;
+ /* Copy slot contents while holding spinlock, then examine at leisure */
SpinLockAcquire(&slot->mutex);
-
- xmin = slot->data.xmin;
- catalog_xmin = slot->data.catalog_xmin;
- database = slot->data.database;
- restart_lsn = slot->data.restart_lsn;
- confirmed_flush_lsn = slot->data.confirmed_flush;
- namecpy(&slot_name, &slot->data.name);
- namecpy(&plugin, &slot->data.plugin);
- active_pid = slot->active_pid;
- persistency = slot->data.persistency;
-
+ slot_contents = *slot;
SpinLockRelease(&slot->mutex);
+ memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
i = 0;
- values[i++] = NameGetDatum(&slot_name);
+ values[i++] = NameGetDatum(&slot_contents.data.name);
- if (database == InvalidOid)
+ if (slot_contents.data.database == InvalidOid)
nulls[i++] = true;
else
- values[i++] = NameGetDatum(&plugin);
+ values[i++] = NameGetDatum(&slot_contents.data.plugin);
- if (database == InvalidOid)
+ if (slot_contents.data.database == InvalidOid)
values[i++] = CStringGetTextDatum("physical");
else
values[i++] = CStringGetTextDatum("logical");
- if (database == InvalidOid)
+ if (slot_contents.data.database == InvalidOid)
nulls[i++] = true;
else
- values[i++] = database;
+ values[i++] = ObjectIdGetDatum(slot_contents.data.database);
- values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
- values[i++] = BoolGetDatum(active_pid != 0);
+ values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
+ values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
- if (active_pid != 0)
- values[i++] = Int32GetDatum(active_pid);
+ if (slot_contents.active_pid != 0)
+ values[i++] = Int32GetDatum(slot_contents.active_pid);
else
nulls[i++] = true;
- if (xmin != InvalidTransactionId)
- values[i++] = TransactionIdGetDatum(xmin);
+ if (slot_contents.data.xmin != InvalidTransactionId)
+ values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
else
nulls[i++] = true;
- if (catalog_xmin != InvalidTransactionId)
- values[i++] = TransactionIdGetDatum(catalog_xmin);
+ if (slot_contents.data.catalog_xmin != InvalidTransactionId)
+ values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
else
nulls[i++] = true;
- if (restart_lsn != InvalidXLogRecPtr)
- values[i++] = LSNGetDatum(restart_lsn);
+ if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
+ values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
else
nulls[i++] = true;
- if (confirmed_flush_lsn != InvalidXLogRecPtr)
- values[i++] = LSNGetDatum(confirmed_flush_lsn);
+ if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
+ values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
else
nulls[i++] = true;
- walstate = GetWALAvailability(restart_lsn);
+ walstate = GetWALAvailability(slot_contents.data.restart_lsn);
switch (walstate)
{
case WALAVAIL_REMOVED:
values[i++] = CStringGetTextDatum("lost");
break;
+
+ default:
+ elog(ERROR, "invalid walstate: %d", (int) walstate);
}
if (max_slot_wal_keep_size_mb >= 0 &&
else
nulls[i++] = true;
+ Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
+
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
+
LWLockRelease(ReplicationSlotControlLock);
tuplestore_donestoring(tupstore);
Name src_name = PG_GETARG_NAME(0);
Name dst_name = PG_GETARG_NAME(1);
ReplicationSlot *src = NULL;
+ ReplicationSlot first_slot_contents;
+ ReplicationSlot second_slot_contents;
XLogRecPtr src_restart_lsn;
bool src_islogical;
bool temporary;
if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
{
+ /* Copy the slot contents while holding spinlock */
SpinLockAcquire(&s->mutex);
- src_islogical = SlotIsLogical(s);
- src_restart_lsn = s->data.restart_lsn;
- temporary = s->data.persistency == RS_TEMPORARY;
- plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
+ first_slot_contents = *s;
SpinLockRelease(&s->mutex);
-
src = s;
break;
}
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
+ src_islogical = SlotIsLogical(&first_slot_contents);
+ src_restart_lsn = first_slot_contents.data.restart_lsn;
+ temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
+ plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
+
/* Check type of replication slot */
if (src_islogical != logical_slot)
ereport(ERROR,
/* Copy data of source slot again */
SpinLockAcquire(&src->mutex);
- copy_effective_xmin = src->effective_xmin;
- copy_effective_catalog_xmin = src->effective_catalog_xmin;
+ second_slot_contents = *src;
+ SpinLockRelease(&src->mutex);
- copy_xmin = src->data.xmin;
- copy_catalog_xmin = src->data.catalog_xmin;
- copy_restart_lsn = src->data.restart_lsn;
- copy_confirmed_flush = src->data.confirmed_flush;
+ copy_effective_xmin = second_slot_contents.effective_xmin;
+ copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
+
+ copy_xmin = second_slot_contents.data.xmin;
+ copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
+ copy_restart_lsn = second_slot_contents.data.restart_lsn;
+ copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
/* for existence check */
- copy_name = pstrdup(NameStr(src->data.name));
- copy_islogical = SlotIsLogical(src);
- SpinLockRelease(&src->mutex);
+ copy_name = NameStr(second_slot_contents.data.name);
+ copy_islogical = SlotIsLogical(&second_slot_contents);
/*
* Check if the source slot still exists and is valid. We regard it as