/* everything else is zeroed by the memset above */
SpinLockInit(&slot->mutex);
+ slot->io_in_progress_lock = LWLockAssign();
}
}
}
/* We made this slot active, so it's ours now. */
MyReplicationSlot = slot;
-
- /*
- * XXX. There used to be code to save the state here with the following
- * comment: "We don't really need to save here, but doing so guarantees
- * the slot is in a good state." It's not clear to me what is meant by
- * a good state, and it seems like the slot had better already be in
- * such a state, or we're screwed anyway. From the point of view of
- * crash recovery, merely acquiring the slot (without updating its
- * contents) is a non-event.
- */
}
/*
return strcmp(str, end) == 0;
}
+/*
+ * Flush all replication slots to disk.
+ *
+ * This needn't actually be part of a checkpoint, but it's a convenient
+ * location.
+ */
+void
+CheckPointReplicationSlots(void)
+{
+ int i;
+
+ ereport(DEBUG1, (errmsg("performing replication slot checkpoint")));
+
+ /*
+ * Prevent any slot from being created/dropped while we're active. As we
+ * explicitly do *not* want to block iterating over replication_slots or
+ * acquiring a slot we cannot take the control lock.
+ */
+ LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
+
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+ char path[MAXPGPATH];
+
+ if (!s->in_use)
+ continue;
+
+ /* save the slot to disk, locking is handled in SaveSlotToPath() */
+ sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
+ SaveSlotToPath(s, path);
+ }
+ LWLockRelease(ReplicationSlotAllocationLock);
+}
+
/*
* Load all replication slots from disk into memory at server startup. This
* needs to be run before we start crash recovery.
char tmppath[MAXPGPATH];
char path[MAXPGPATH];
+ /*
+ * No need to take out the io_in_progress_lock, nobody else can see this
+ * slot yet, so nobody else wil write. We're reusing SaveSlotToPath which
+ * takes out the lock, if we'd take the lock here, we'd deadlock.
+ */
+
sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
fsync_fname("pg_replslot", true);
END_CRIT_SECTION();
-
}
/*
int fd;
ReplicationSlotOnDisk cp;
+ LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE);
+
/* silence valgrind :( */
memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
SpinLockAcquire(&slot->mutex);
- cp.slotdata.data_xmin = slot->data.data_xmin;
-
- strcpy(NameStr(cp.slotdata.name), NameStr(slot->data.name));
-
- cp.slotdata.database = slot->data.database;
- cp.slotdata.restart_lsn = slot->data.restart_lsn;
+ memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
SpinLockRelease(&slot->mutex);
fsync_fname("pg_replslot", true);
END_CRIT_SECTION();
+
+ LWLockRelease(slot->io_in_progress_lock);
}
/*
int readBytes;
pg_crc32 checksum;
+ /* no need to lock here, no concurrent access allowed yet */
+
/* delete temp file if it exists */
sprintf(path, "pg_replslot/%s/state.tmp", name);
if (unlink(path) < 0 && errno != ENOENT)
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogreader.h"
+#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
/* is somebody streaming out changes for this slot */
bool active;
+ /* any outstanding modifications? */
+ bool dirty;
+
/* data surviving shutdowns and crashes */
ReplicationSlotPersistentData data;
/* in-memory xmin horizon, updated after syncing to disk, used for computations */
TransactionId effective_data_xmin;
+ /* is somebody performing io on this slot? */
+ LWLock *io_in_progress_lock;
} ReplicationSlot;
/*
extern void ReplicationSlotsComputeRequiredXmin(void);
extern void ReplicationSlotsComputeRequiredLSN(void);
extern void StartupReplicationSlots(XLogRecPtr checkPointRedo);
+extern void CheckPointReplicationSlots(void);
+
extern void CheckSlotRequirements(void);
extern void ReplicationSlotAtProcExit(void);