--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * slot.c
+ * Replication slot management.
+ *
+ *
+ * Copyright (c) 2012-2013, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/slot.c
+ *
+ * NOTES
+ *
+ * Replication slots are used to keep state about replicas of the cluster
+ * they are allocated on, primarily to avoid removing resources that are
+ * still required on replicas, but also to keep information for monitoring
+ * purposes.
+ * They need to be permanent (to allow restarts), crash-safe and
+ * allocatable on standbys (to support cascading setups). Thus they need to
+ * be stored outside the catalog as writing to the catalog in a crashsafe
+ * manner isn't possible on standbys.
+ * Such slots are used both for streaming replication and changeset
+ * extraction. For the latter sometimes slot specific data needs to be
+ * serialized to disk to avoid exhausting memory.
+ * For both, changeset extraction and streaming replication we need to
+ * prevent that still required WAL will be removed/recycled and that rows
+ * we still need get vacuumed away.
+ *
+ * To allow slots to be created on standbys and to allow additional data to
+ * be stored per slot, each replication slot gets its own directory inside
+ * the $PGDATA/pg_replslot directory. Inside that directory the /state file
+ * will contain the slot's own data. Additional data can be stored
+ * alongside that file if required.
+ *
+ * While the server is running it would be inefficient to always read the
+ * state files from disk; instead the data is loaded into memory at startup
+ * and most of the time only that data is accessed. Individual slots are
+ * serialized to disk only when a certain value is required to be the same
+ * after a restart.
+ *
+ * Since the individual resources that need to be managed can be described
+ * by a simple number (xmin horizon, minimal required LSN), we compute the
+ * minimum value across all slots and store it in a separate struct, so we
+ * don't have to access all slots when accessing the global minimum.
+ *
+ * The shared memory data structures are protected by two kinds of locks.
+ * The ReplicationSlotCtlLock lwlock protects the global values in
+ * ReplicationSlotCtlData and each slot's name and in_use fields.
+ * Additionally each slot has a spinlock protecting the remaining
+ * values. That means that the existence of slots and their names can be
+ * tested while holding the lwlock in shared mode, without holding the
+ * individual spinlocks.
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "access/transam.h"
+
+#include "fmgr.h"
+#include "miscadmin.h"
+
+#include "replication/slot.h"
+
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+
+/*
+ * Replication slot on-disk data structure.
+ *
+ * This is the image SaveSlotGuts() writes to a slot's "state" file and
+ * RestoreSlot() reads back: a fixed header followed by a copy of the slot.
+ */
+typedef struct ReplicationSlotOnDisk
+{
+ /* first part of this struct needs to be version independent */
+
+ /* data not covered by checksum */
+ /* magic: format identifier, compared against SLOT_MAGIC on restore */
+ uint32 magic;
+ /* checksum: CRC32 computed by SaveSlotGuts(), verified by RestoreSlot() */
+ pg_crc32 checksum;
+
+ /*
+  * data covered by checksum
+  *
+  * NOTE(review): SaveSlotGuts() and RestoreSlot() start the CRC at
+  * offsetof(ReplicationSlotOnDisk, slot), so as written version and length
+  * are NOT included in the checksum - confirm which is intended.
+  */
+ /* version: on-disk format version, compared against SLOT_VERSION */
+ uint32 version;
+ /* length: number of bytes that follow this fixed header */
+ uint32 length;
+
+ /* data with potentially evolving format */
+ ReplicationSlot slot;
+} ReplicationSlotOnDisk;
+
+/* size of the part of the slot that is version independent */
+#define ReplicationSlotOnDiskConstantSize \
+    offsetof(ReplicationSlotOnDisk, slot)
+/*
+ * size of the part of the slot that is not version independent
+ *
+ * The expansion is parenthesized so that the subtraction cannot be regrouped
+ * by operators surrounding the macro at its use sites.
+ */
+#define ReplicationSlotOnDiskDynamicSize \
+    (sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)
+
+#define SLOT_MAGIC 0x1051CA1 /* format identifier */
+#define SLOT_VERSION 1 /* version for new files */
+
+/* Control array for replication slot management */
+/* (stays NULL when max_replication_slots is 0, see ReplicationSlotsShmemInit) */
+ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
+
+/* My backend's replication slot in the shared memory array */
+/* (NULL while this backend has no slot created/acquired) */
+ReplicationSlot *MyReplicationSlot = NULL;
+
+/* GUCs */
+int max_replication_slots = 0; /* the maximum number of replication slots */
+
+/* internal persistency functions */
+/* RestoreSlot: load one slot's state file into a free in-memory slot */
+static void RestoreSlot(const char *name);
+/* CreateSlot: create the on-disk directory and state file for a new slot */
+static void CreateSlot(ReplicationSlot *slot);
+/* SaveSlotGuts: write a slot's state file inside the given directory */
+static void SaveSlotGuts(ReplicationSlot *slot, const char *path);
+/* DeleteSlot: remove a slot's on-disk directory */
+static void DeleteSlot(ReplicationSlot *slot);
+/* SaveSlot: write a slot's state file into its regular directory */
+static void SaveSlot(ReplicationSlot *slot);
+
+/*
+ * Compute the amount of shared memory ReplicationSlotsShmemInit() asks for.
+ *
+ * Returns 0 when max_replication_slots is 0; otherwise the size of the
+ * ReplicationSlotCtlData header plus one ReplicationSlot per configured slot.
+ */
+Size
+ReplicationSlotsShmemSize(void)
+{
+    Size        slots_bytes;
+
+    /* no slots configured, no memory needed */
+    if (max_replication_slots == 0)
+        return 0;
+
+    /* use the overflow-checking helpers for the array part */
+    slots_bytes = mul_size(max_replication_slots, sizeof(ReplicationSlot));
+
+    return add_size(offsetof(ReplicationSlotCtlData, replication_slots),
+                    slots_bytes);
+}
+
+/*
+ * Allocate and initialize the shared memory used for replication slots.
+ *
+ * Does nothing when max_replication_slots is 0. If the shared struct already
+ * exists it is only attached to, not reinitialized.
+ */
+void
+ReplicationSlotsShmemInit(void)
+{
+    bool        already_exists;
+    int         slotno;
+
+    if (max_replication_slots == 0)
+        return;
+
+    ReplicationSlotCtl = (ReplicationSlotCtlData *)
+        ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
+                        &already_exists);
+
+    /* somebody else set it up already, nothing more to do */
+    if (already_exists)
+        return;
+
+    /* First time through: zero everything, then set up each slot's lock */
+    MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
+
+    for (slotno = 0; slotno < max_replication_slots; slotno++)
+        SpinLockInit(&ReplicationSlotCtl->replication_slots[slotno].mutex);
+}
+
+/*
+ * Check whether the passed slot name is valid and report errors at elevel.
+ *
+ * Slot names must match [a-z0-9_]{1,NAMEDATALEN-1}, which should allow the
+ * name to be used as a directory name on every supported OS. Note that upper
+ * case letters are rejected.
+ *
+ * name: NUL-terminated candidate slot name
+ * elevel: level passed to ereport() when the name is rejected
+ *
+ * Returns true if the name is valid. A false return is only observable by the
+ * caller if elevel < ERROR.
+ */
+bool
+ReplicationSlotValidateName(const char *name, int elevel)
+{
+    const char *cp;
+    size_t      len = strlen(name);
+
+    if (len == 0)
+    {
+        ereport(elevel,
+                (errcode(ERRCODE_INVALID_NAME),
+                 errmsg("replication slot name \"%s\" is too short",
+                        name)));
+        return false;
+    }
+
+    if (len >= NAMEDATALEN)
+    {
+        ereport(elevel,
+                (errcode(ERRCODE_NAME_TOO_LONG),
+                 errmsg("replication slot name \"%s\" is too long",
+                        name)));
+        return false;
+    }
+
+    for (cp = name; *cp; cp++)
+    {
+        if (!((*cp >= 'a' && *cp <= 'z')
+              || (*cp >= '0' && *cp <= '9')
+              || (*cp == '_')))
+        {
+            /* the hint spells out "lower case": 'A'..'Z' fail the test above */
+            ereport(elevel,
+                    (errcode(ERRCODE_INVALID_NAME),
+                     errmsg("replication slot name \"%s\" contains invalid character",
+                            name),
+                     errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
+            return false;
+        }
+    }
+    return true;
+}
+
+/*
+ * Create a new replication slot and make it this backend's current slot.
+ *
+ * name: Name of the slot; validated with ReplicationSlotValidateName().
+ * db_specific: pass true if the slot is going to be used for changeset
+ *      extraction, which is database specific; the slot is then tied to
+ *      MyDatabaseId. Otherwise pass false.
+ *
+ * Raises an ERROR if the name is invalid or already taken, if no free slot
+ * is left, or if the on-disk state cannot be created. In the last case the
+ * in-memory slot is given back before the error is re-thrown.
+ */
+void
+ReplicationSlotCreate(const char *name, bool db_specific)
+{
+    ReplicationSlot *free_slot = NULL;
+    int         idx;
+
+    Assert(MyReplicationSlot == NULL);
+
+    ReplicationSlotValidateName(name, ERROR);
+
+    /*
+     * Holding the control lock exclusively keeps anybody else from creating
+     * a slot, which is what makes the duplicate check below reliable.
+     */
+    LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+
+    /* Pass one: refuse a name that is already taken. */
+    for (idx = 0; idx < max_replication_slots; idx++)
+    {
+        ReplicationSlot *cur = &ReplicationSlotCtl->replication_slots[idx];
+
+        if (!cur->in_use || strcmp(name, NameStr(cur->name)) != 0)
+            continue;
+
+        LWLockRelease(ReplicationSlotCtlLock);
+        ereport(ERROR,
+                (errcode(ERRCODE_DUPLICATE_OBJECT),
+                 errmsg("replication slot \"%s\" already exists", name)));
+    }
+
+    /* Pass two: pick the first unused (and therefore inactive) entry. */
+    for (idx = 0; idx < max_replication_slots && free_slot == NULL; idx++)
+    {
+        ReplicationSlot *cur = &ReplicationSlotCtl->replication_slots[idx];
+
+        if (cur->in_use)
+            continue;
+
+        Assert(!cur->active);
+        /* the control lock stays held for now */
+        free_slot = cur;
+    }
+
+    if (free_slot == NULL)
+    {
+        LWLockRelease(ReplicationSlotCtlLock);
+        ereport(ERROR,
+                (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+                 errmsg("all replication slots are in use"),
+                 errhint("Free one or increase max_replication_slots.")));
+    }
+
+    /*
+     * Claim the entry in shared memory: mark it used, active and fill in its
+     * identity.
+     */
+    SpinLockAcquire(&free_slot->mutex);
+
+    free_slot->in_use = true;
+    free_slot->active = true;
+    free_slot->database = db_specific ? MyDatabaseId : InvalidOid;
+
+    strncpy(NameStr(free_slot->name), name, NAMEDATALEN);
+    NameStr(free_slot->name)[NAMEDATALEN - 1] = '\0';
+
+    /* let others look at the entry again */
+    SpinLockRelease(&free_slot->mutex);
+
+    LWLockRelease(ReplicationSlotCtlLock);
+
+    /*
+     * Make the slot persistent by creating its on-disk state. Should that
+     * raise an error, undo the in-memory reservation and pass the error on.
+     */
+    PG_TRY();
+    {
+        CreateSlot(free_slot);
+    }
+    PG_CATCH();
+    {
+        LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+        SpinLockAcquire(&free_slot->mutex);
+        free_slot->in_use = false;
+        free_slot->active = false;
+        SpinLockRelease(&free_slot->mutex);
+        LWLockRelease(ReplicationSlotCtlLock);
+
+        PG_RE_THROW();
+    }
+    PG_END_TRY();
+
+    MyReplicationSlot = free_slot;
+}
+
+/*
+ * Find a previously created slot and mark it as used by this backend.
+ *
+ * name: Name of the slot to acquire; validated first.
+ *
+ * Raises an ERROR if the name is invalid, if no slot of that name exists, if
+ * the slot is already active, or if saving the slot fails. On success
+ * MyReplicationSlot points to the acquired slot.
+ */
+void
+ReplicationSlotAcquire(const char *name)
+{
+    ReplicationSlot *slot = NULL;
+    int         i;
+
+    Assert(MyReplicationSlot == NULL);
+
+    ReplicationSlotValidateName(name, ERROR);
+
+    LWLockAcquire(ReplicationSlotCtlLock, LW_SHARED);
+
+    /*
+     * Search for the slot. "slot" is only assigned on a match so that it
+     * stays NULL when nothing matches (or when there are no slots at all),
+     * which the existence check below relies on.
+     */
+    for (i = 0; i < max_replication_slots; i++)
+    {
+        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+        if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
+        {
+            /* NOT releasing the lock yet */
+            slot = s;
+            break;
+        }
+    }
+
+    if (slot == NULL)
+    {
+        LWLockRelease(ReplicationSlotCtlLock);
+        ereport(ERROR,
+                (errcode(ERRCODE_UNDEFINED_OBJECT),
+                 errmsg("replication slot \"%s\" does not exist", name)));
+    }
+
+    /* acquire spinlock so we can test and set ->active safely */
+    SpinLockAcquire(&slot->mutex);
+
+    if (slot->active)
+    {
+        SpinLockRelease(&slot->mutex);
+        LWLockRelease(ReplicationSlotCtlLock);
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_IN_USE),
+                 errmsg("replication slot \"%s\" is already active", name)));
+    }
+
+    /* the slot is ours now */
+    slot->active = true;
+    SpinLockRelease(&slot->mutex);
+
+    LWLockRelease(ReplicationSlotCtlLock);
+
+    PG_TRY();
+    {
+        /*
+         * We don't really need to save here, but doing so guarantees the slot
+         * is in a good state.
+         */
+        SaveSlot(slot);
+    }
+    PG_CATCH();
+    {
+        /* give the slot back before propagating the error */
+        SpinLockAcquire(&slot->mutex);
+        slot->active = false;
+        SpinLockRelease(&slot->mutex);
+        PG_RE_THROW();
+    }
+    PG_END_TRY();
+
+    MyReplicationSlot = slot;
+}
+
+/*
+ * Give up this backend's replication slot without dropping it, so that this
+ * or another backend can acquire it again later. The slot stays in use, only
+ * its active flag is cleared.
+ */
+void
+ReplicationSlotRelease(void)
+{
+    ReplicationSlot *released = MyReplicationSlot;
+
+    Assert(released != NULL && released->active);
+
+    /*
+     * The spinlock is enough here: the lwlock covers in_use and the name,
+     * and neither of those changes when a slot merely becomes inactive.
+     */
+    SpinLockAcquire(&released->mutex);
+    released->active = false;
+    MyReplicationSlot = NULL;
+    SpinLockRelease(&released->mutex);
+
+    /*
+     * XXX: Saving is not strictly required, after a crash/restart the slot
+     * is marked inactive anyway. But without it all data would have to be
+     * serialized at shutdowns or checkpoints...
+     */
+    SaveSlot(released);
+}
+
+/*
+ * Permanently drop the replication slot with the passed in name.
+ *
+ * Raises an ERROR if the name is invalid, if no such slot exists, if the slot
+ * is currently active, or if its on-disk state cannot be deleted. In the
+ * last case the in-memory slot is left in place, merely inactive again.
+ */
+void
+ReplicationSlotDrop(const char *name)
+{
+    ReplicationSlot *victim = NULL;
+    bool        busy;
+    int         idx;
+
+    ReplicationSlotValidateName(name, ERROR);
+
+    LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+
+    /* look the slot up by name; victim stays NULL if there is no match */
+    for (idx = 0; idx < max_replication_slots && victim == NULL; idx++)
+    {
+        ReplicationSlot *cur = &ReplicationSlotCtl->replication_slots[idx];
+
+        if (cur->in_use && strcmp(name, NameStr(cur->name)) == 0)
+            victim = cur;
+    }
+
+    if (victim == NULL)
+    {
+        LWLockRelease(ReplicationSlotCtlLock);
+        ereport(ERROR,
+                (errcode(ERRCODE_UNDEFINED_OBJECT),
+                 errmsg("replication slot \"%s\" does not exist", name)));
+    }
+
+    /*
+     * Under the spinlock, check whether somebody has the slot acquired. If
+     * nobody does, mark it active ourselves so that it stays reserved for us
+     * once the control lock is gone.
+     */
+    SpinLockAcquire(&victim->mutex);
+    busy = victim->active;
+    if (!busy)
+        victim->active = true;
+    SpinLockRelease(&victim->mutex);
+
+    /*
+     * Let go of the control lock in either case. When continuing, the slot
+     * is protected by the active flag set above; holding the lwlock across
+     * the PG_TRY/CATCH below would be awkward since elog.c manipulates
+     * InterruptHoldoffCount.
+     */
+    LWLockRelease(ReplicationSlotCtlLock);
+
+    if (busy)
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_IN_USE),
+                 errmsg("could not drop active replication slot \"%s\"", name)));
+
+    /*
+     * Remove the on-disk state. A failure short of PANIC leaves the
+     * in-memory state valid, so just release our reservation and re-throw.
+     */
+    PG_TRY();
+    {
+        DeleteSlot(victim);
+    }
+    PG_CATCH();
+    {
+        SpinLockAcquire(&victim->mutex);
+        victim->active = false;
+        SpinLockRelease(&victim->mutex);
+        PG_RE_THROW();
+    }
+    PG_END_TRY();
+
+    /*
+     * The on-disk state is gone, so a restart would not bring the slot back;
+     * free the in-memory entry too.
+     */
+    LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+    SpinLockAcquire(&victim->mutex);
+    victim->active = false;
+    victim->in_use = false;
+    SpinLockRelease(&victim->mutex);
+    LWLockRelease(ReplicationSlotCtlLock);
+
+    /* the dropped slot no longer holds back the xmin, recompute horizon */
+    ReplicationSlotsComputeRequiredXmin(false);
+}
+
+/*
+ * Serialize the currently acquired slot's state from memory to disk, thereby
+ * guaranteeing the current state will survive a crash.
+ *
+ * The caller must have a slot acquired (MyReplicationSlot != NULL). Errors
+ * while writing are raised by the underlying save routine.
+ */
+void
+ReplicationSlotSave(void)
+{
+    Assert(MyReplicationSlot != NULL);
+
+    /* SaveSlot() builds the slot's directory path and writes its state */
+    SaveSlot(MyReplicationSlot);
+}
+
+/*
+ * Determine the oldest valid effective_data_xmin among all slots that are in
+ * use and hand it to ProcArraySetPeggedXmin(). If no slot has a valid value,
+ * InvalidTransactionId is passed on.
+ *
+ * already_locked: forwarded to ProcArraySetPeggedXmin(); pass true if
+ *      ProcArrayLock is already held exclusively.
+ */
+void
+ReplicationSlotsComputeRequiredXmin(bool already_locked)
+{
+    TransactionId oldest_xmin = InvalidTransactionId;
+    int         slotno;
+
+    Assert(ReplicationSlotCtl != NULL);
+
+    /* shared lock: slots must not be created or dropped while we scan */
+    LWLockAcquire(ReplicationSlotCtlLock, LW_SHARED);
+
+    for (slotno = 0; slotno < max_replication_slots; slotno++)
+    {
+        ReplicationSlot *cur = &ReplicationSlotCtl->replication_slots[slotno];
+        TransactionId slot_xmin;
+
+        if (!cur->in_use)
+            continue;
+
+        /* copy the value out under the spinlock, evaluate it afterwards */
+        SpinLockAcquire(&cur->mutex);
+        slot_xmin = cur->effective_data_xmin;
+        SpinLockRelease(&cur->mutex);
+
+        /* slots without a valid xmin don't hold anything back */
+        if (!TransactionIdIsValid(slot_xmin))
+            continue;
+
+        if (!TransactionIdIsValid(oldest_xmin) ||
+            TransactionIdPrecedes(slot_xmin, oldest_xmin))
+            oldest_xmin = slot_xmin;
+    }
+
+    LWLockRelease(ReplicationSlotCtlLock);
+
+    ProcArraySetPeggedXmin(oldest_xmin, already_locked);
+}
+
+/*
+ * Determine the smallest valid restart_decoding LSN among all slots that are
+ * in use and hand it to XLogSetPeggedLSN(). If no slot has a valid value,
+ * InvalidXLogRecPtr is passed on.
+ */
+void
+ReplicationSlotsComputeRequiredLSN(void)
+{
+    XLogRecPtr  oldest_lsn = InvalidXLogRecPtr;
+    int         slotno;
+
+    Assert(ReplicationSlotCtl != NULL);
+
+    /* shared lock: slots must not be created or dropped while we scan */
+    LWLockAcquire(ReplicationSlotCtlLock, LW_SHARED);
+
+    for (slotno = 0; slotno < max_replication_slots; slotno++)
+    {
+        ReplicationSlot *cur = &ReplicationSlotCtl->replication_slots[slotno];
+        XLogRecPtr  slot_lsn;
+
+        if (!cur->in_use)
+            continue;
+
+        /* copy the value out under the spinlock, evaluate it afterwards */
+        SpinLockAcquire(&cur->mutex);
+        slot_lsn = cur->restart_decoding;
+        SpinLockRelease(&cur->mutex);
+
+        /* slots without a valid LSN don't hold anything back */
+        if (slot_lsn == InvalidXLogRecPtr)
+            continue;
+
+        if (oldest_lsn == InvalidXLogRecPtr || slot_lsn < oldest_lsn)
+            oldest_lsn = slot_lsn;
+    }
+
+    LWLockRelease(ReplicationSlotCtlLock);
+
+    XLogSetPeggedLSN(oldest_lsn);
+}
+
+/*
+ * Raise an ERROR unless the server configuration allows replication slots
+ * to be used at all: max_replication_slots must be nonzero and wal_level
+ * must be at least archive.
+ *
+ * Passing this check is no guarantee; creating or using a slot can still
+ * fail afterwards.
+ */
+void
+CheckSlotRequirements(void)
+{
+    if (max_replication_slots == 0)
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("replication slot usage requires max_replication_slots > 0")));
+    }
+
+    if (wal_level < WAL_LEVEL_ARCHIVE)
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("replication slot usage requires wal_level >= archive")));
+    }
+}
+
+/*
+ * Exit-time cleanup: if this backend still has a slot marked active, mark it
+ * inactive, and in any case forget about it.
+ */
+void
+ReplicationSlotAtProcExit(void)
+{
+    ReplicationSlot *mine = MyReplicationSlot;
+
+    if (mine != NULL && mine->active)
+    {
+        /*
+         * Use the spinlock so other backends are guaranteed to see the
+         * change in time. The lwlock cannot generally be taken here, an
+         * error path might have left us still holding it.
+         */
+        SpinLockAcquire(&mine->mutex);
+        mine->active = false;
+        SpinLockRelease(&mine->mutex);
+    }
+
+    MyReplicationSlot = NULL;
+}
+
+/*
+ * Test whether `end' is a suffix of `str'. The empty string is a suffix of
+ * every string.
+ */
+static bool
+string_endswith(const char *str, const char *end)
+{
+    size_t      str_len = strlen(str);
+    size_t      suffix_len = strlen(end);
+
+    /* a suffix can never be longer than the string itself */
+    if (suffix_len > str_len)
+        return false;
+
+    /* the tail of str has exactly suffix_len bytes, compare those */
+    return memcmp(str + (str_len - suffix_len), end, suffix_len) == 0;
+}
+
+/*
+ * Load all replication slots from disk into memory at server startup. This
+ * needs to be run before we start crash recovery.
+ *
+ * checkPointRedo: only used for the debug message emitted here.
+ *
+ * Every directory below pg_replslot is either restored as a slot or, if its
+ * name ends in ".tmp", removed as the leftover of an interrupted creation or
+ * deletion. Non-directories are skipped.
+ */
+void
+StartupReplicationSlots(XLogRecPtr checkPointRedo)
+{
+    DIR        *replication_dir;
+    struct dirent *replication_de;
+
+    ereport(DEBUG1,
+            (errmsg("starting up replication slots from LSN %X/%X",
+                    (uint32) (checkPointRedo >> 32), (uint32) checkPointRedo)));
+
+    /* restore all slots by iterating over all on-disk entries */
+    replication_dir = AllocateDir("pg_replslot");
+    while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
+    {
+        struct stat statbuf;
+        char        path[MAXPGPATH];
+
+        if (strcmp(replication_de->d_name, ".") == 0 ||
+            strcmp(replication_de->d_name, "..") == 0)
+            continue;
+
+        snprintf(path, MAXPGPATH, "pg_replslot/%s", replication_de->d_name);
+
+        /* we're only creating directories here, skip if it's not ours */
+        if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+            continue;
+
+        /* we crashed while a slot was being setup or deleted, clean up */
+        if (string_endswith(replication_de->d_name, ".tmp"))
+        {
+            if (!rmtree(path, true))
+            {
+                /*
+                 * The explanation has to be an errhint: a second errmsg()
+                 * would replace the primary message instead of adding to it.
+                 */
+                ereport(WARNING,
+                        (errcode_for_file_access(),
+                         errmsg("could not remove directory \"%s\"", path),
+                         errhint("Some useless files may be left behind in that directory, preventing creation of new slots.")));
+                continue;
+            }
+            fsync_fname("pg_replslot", true);
+            continue;
+        }
+
+        /* looks like a slot in a normal state, restore */
+        RestoreSlot(replication_de->d_name);
+    }
+    FreeDir(replication_dir);
+
+    /* currently no slots exist, we're done. */
+    if (max_replication_slots <= 0)
+        return;
+
+    /* Now that we have recovered all the data, compute replication xmin */
+    ReplicationSlotsComputeRequiredXmin(false);
+    ReplicationSlotsComputeRequiredLSN();
+}
+
+/* ----
+ * Manipulation of ondisk state of replication slots
+ *
+ * NB: none of the routines below should take any notice whether a slot is the
+ * current one or not, that's all handled a layer above.
+ * ----
+ */
+/*
+ * Create the on-disk state for a new slot.
+ *
+ * The slot's directory is first built under a ".tmp" name, filled with the
+ * state file, and only then renamed to its final name, so a directory with
+ * the final name always contains a state file.
+ *
+ * slot: the in-memory slot to persist; its name determines the paths.
+ *
+ * Raises an ERROR if the directory cannot be created or renamed, or if
+ * writing the state file fails.
+ */
+static void
+CreateSlot(ReplicationSlot *slot)
+{
+    char        tmppath[MAXPGPATH];
+    char        path[MAXPGPATH];
+    struct stat st;
+
+    snprintf(path, sizeof(path), "pg_replslot/%s", NameStr(slot->name));
+    snprintf(tmppath, sizeof(tmppath), "pg_replslot/%s.tmp",
+             NameStr(slot->name));
+
+    /*
+     * A temporary directory of this name can be left over from an earlier
+     * creation attempt that failed after mkdir(), or from a drop whose
+     * directory removal failed. Get rid of it, otherwise mkdir() below would
+     * keep failing until the next server restart. This assumes the caller
+     * has reserved the slot name in shared memory, as ReplicationSlotCreate()
+     * does, so nobody else is working on this directory.
+     */
+    if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
+        rmtree(tmppath, true);
+
+    if (mkdir(tmppath, S_IRWXU) < 0)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not create directory \"%s\": %m",
+                        tmppath)));
+
+    fsync_fname(tmppath, true);
+
+    SaveSlotGuts(slot, tmppath);
+
+    if (rename(tmppath, path) != 0)
+    {
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not rename file \"%s\" to \"%s\": %m",
+                        tmppath, path)));
+    }
+
+    /*
+     * If we'd now fail - really unlikely - we wouldn't know whether this slot
+     * would persist after an OS crash or not - so, force a restart. The
+     * restart would try to fsync this again till it works.
+     */
+    START_CRIT_SECTION();
+
+    fsync_fname(path, true);
+    fsync_fname("pg_replslot", true);
+
+    END_CRIT_SECTION();
+}
+
+/*
+ * Serialize the passed slot to disk, into its regular directory
+ * pg_replslot/<slot name>.
+ */
+static void
+SaveSlot(ReplicationSlot *slot)
+{
+    char        slotdir[MAXPGPATH];
+
+    /* the directory is derived from the slot's name */
+    sprintf(slotdir, "pg_replslot/%s", NameStr(slot->name));
+
+    SaveSlotGuts(slot, slotdir);
+}
+
+/*
+ * Shared functionality between saving and creating a replication slot.
+ *
+ * Writes the slot's state to "<dir>/state.tmp", fsyncs it and renames it to
+ * "<dir>/state", then fsyncs the file and the directories above it.
+ *
+ * slot: in-memory slot to serialize; its fields are copied under its spinlock
+ * dir: directory the state file is written into
+ *
+ * Raises an ERROR if the temporary file cannot be created, written, fsynced
+ * or renamed. Once created, the temporary file is removed again on those
+ * failures, so that a later save is not blocked by the O_EXCL open below.
+ */
+static void
+SaveSlotGuts(ReplicationSlot *slot, const char *dir)
+{
+    char        tmppath[MAXPGPATH];
+    char        path[MAXPGPATH];
+    int         fd;
+    int         save_errno;
+    ReplicationSlotOnDisk cp;
+
+    /* silence valgrind :( */
+    memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
+
+    snprintf(tmppath, sizeof(tmppath), "%s/state.tmp", dir);
+    snprintf(path, sizeof(path), "%s/state", dir);
+
+    fd = OpenTransientFile(tmppath,
+                           O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+                           S_IRUSR | S_IWUSR);
+    if (fd < 0)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not create file \"%s\": %m",
+                        tmppath)));
+
+    cp.magic = SLOT_MAGIC;
+    INIT_CRC32(cp.checksum);
+    cp.version = SLOT_VERSION;
+    cp.length = ReplicationSlotOnDiskDynamicSize;
+
+    /* take a consistent copy of the slot's fields */
+    SpinLockAcquire(&slot->mutex);
+
+    cp.slot.data_xmin = slot->data_xmin;
+    cp.slot.effective_data_xmin = slot->effective_data_xmin;
+
+    strcpy(NameStr(cp.slot.name), NameStr(slot->name));
+
+    cp.slot.database = slot->database;
+    cp.slot.confirmed_flush = slot->confirmed_flush;
+    cp.slot.restart_decoding = slot->restart_decoding;
+    cp.slot.in_use = slot->in_use;
+    /* whether a slot is active is never persisted */
+    cp.slot.active = false;
+
+    SpinLockRelease(&slot->mutex);
+
+    /* the checksum covers everything following the fixed header */
+    COMP_CRC32(cp.checksum,
+               (char *) (&cp) + ReplicationSlotOnDiskConstantSize,
+               ReplicationSlotOnDiskDynamicSize);
+
+    errno = 0;
+    if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
+    {
+        /* if write didn't set errno, assume problem is no disk space */
+        save_errno = errno ? errno : ENOSPC;
+
+        /* cleanup may clobber errno, so restore it for %m afterwards */
+        CloseTransientFile(fd);
+        unlink(tmppath);
+        errno = save_errno;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not write to file \"%s\": %m",
+                        tmppath)));
+    }
+
+    /* fsync the temporary file */
+    if (pg_fsync(fd) != 0)
+    {
+        save_errno = errno;
+
+        CloseTransientFile(fd);
+        unlink(tmppath);
+        errno = save_errno;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not fsync file \"%s\": %m",
+                        tmppath)));
+    }
+
+    CloseTransientFile(fd);
+
+    /* rename to permanent file, fsync file and directory */
+    if (rename(tmppath, path) != 0)
+    {
+        save_errno = errno;
+
+        unlink(tmppath);
+        errno = save_errno;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not rename \"%s\" to \"%s\": %m",
+                        tmppath, path)));
+    }
+
+    /* Check CreateSlot() for the reasoning of using a crit. section. */
+    START_CRIT_SECTION();
+
+    fsync_fname(path, false);
+    fsync_fname((char *) dir, true);
+    fsync_fname("pg_replslot", true);
+
+    END_CRIT_SECTION();
+}
+
+/*
+ * Delete a single slot from disk, not touching the in-memory state.
+ *
+ * The slot's directory is first renamed to a ".tmp" name and that rename is
+ * made durable; only then are the contents removed.
+ *
+ * Raises an ERROR if the rename fails. A failure to remove the renamed
+ * directory is only reported as a WARNING.
+ */
+static void
+DeleteSlot(ReplicationSlot *slot)
+{
+    char        path[MAXPGPATH];
+    char        tmppath[MAXPGPATH];
+
+    snprintf(path, sizeof(path), "pg_replslot/%s", NameStr(slot->name));
+    snprintf(tmppath, sizeof(tmppath), "pg_replslot/%s.tmp",
+             NameStr(slot->name));
+
+    if (rename(path, tmppath) != 0)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not rename \"%s\" to \"%s\": %m",
+                        path, tmppath)));
+
+    /* Check CreateSlot() for the reasoning of using a crit. section. */
+    START_CRIT_SECTION();
+
+    fsync_fname(tmppath, true);
+    fsync_fname("pg_replslot", true);
+
+    END_CRIT_SECTION();
+
+    /*
+     * If we fail during removal of the directory, we'll just be unable to
+     * create new slots of this name till a restart of the server. Don't
+     * panic.
+     */
+    if (!rmtree(tmppath, true))
+    {
+        /*
+         * The explanation has to be an errhint: a second errmsg() would
+         * replace the primary message instead of adding to it.
+         */
+        ereport(WARNING,
+                (errcode_for_file_access(),
+                 errmsg("could not remove directory \"%s\"", tmppath),
+                 errhint("Some useless files may be left behind in that directory, preventing creation of new slots.")));
+    }
+}
+
+/*
+ * Load a single slot from disk into memory.
+ *
+ * name: name of the slot's directory below pg_replslot
+ *
+ * Removes a leftover state.tmp file, then reads and validates the state file
+ * (magic, version, length, checksum) and copies its contents into the first
+ * unused in-memory slot. Every failure, including not finding a free slot,
+ * is a PANIC.
+ */
+static void
+RestoreSlot(const char *name)
+{
+    ReplicationSlotOnDisk cp;
+    int         i;
+    char        slotdir[MAXPGPATH];
+    char        path[MAXPGPATH];
+    int         fd;
+    bool        restored = false;
+    int         readBytes;
+    pg_crc32    checksum;
+
+    /* delete temp file if it exists */
+    sprintf(path, "pg_replslot/%s/state.tmp", name);
+    if (unlink(path) < 0 && errno != ENOENT)
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("could not unlink file \"%s\": %m", path)));
+
+    sprintf(slotdir, "pg_replslot/%s", name);
+    sprintf(path, "pg_replslot/%s/state", name);
+
+    elog(DEBUG1, "restoring replication slot from \"%s\"", path);
+
+    fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+
+    /*
+     * We do not need to handle this as we are rename()ing the directory into
+     * place only after we fsync()ed the state file.
+     */
+    if (fd < 0)
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("could not open file \"%s\": %m", path)));
+
+    /*
+     * Sync state file before we're reading from it. We might have crashed
+     * while it wasn't synced yet and we shouldn't continue on that basis.
+     */
+    if (pg_fsync(fd) != 0)
+    {
+        CloseTransientFile(fd);
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("could not fsync file \"%s\": %m",
+                        path)));
+    }
+
+    /*
+     * Also sync the parent directory. This has to be given the slot's
+     * directory, not the path of the state file opened above.
+     */
+    START_CRIT_SECTION();
+    fsync_fname(slotdir, true);
+    END_CRIT_SECTION();
+
+    /* read part of statefile that's guaranteed to be version independent */
+    readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
+    if (readBytes != ReplicationSlotOnDiskConstantSize)
+    {
+        int         saved_errno = errno;
+
+        CloseTransientFile(fd);
+        errno = saved_errno;
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("could not read file \"%s\", read %d of %u: %m",
+                        path, readBytes,
+                        (uint32) ReplicationSlotOnDiskConstantSize)));
+    }
+
+    /* verify magic */
+    if (cp.magic != SLOT_MAGIC)
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("replication slot file \"%s\" has wrong magic %u instead of %u",
+                        path, cp.magic, SLOT_MAGIC)));
+
+    /* verify version */
+    if (cp.version != SLOT_VERSION)
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("replication slot file \"%s\" has unsupported version %u",
+                        path, cp.version)));
+
+    /* boundary check on length */
+    if (cp.length != ReplicationSlotOnDiskDynamicSize)
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("replication slot file \"%s\" has corrupted length %u",
+                        path, cp.length)));
+
+    /* Now that we know the size, read the entire file */
+    readBytes = read(fd,
+                     (char *) &cp + ReplicationSlotOnDiskConstantSize,
+                     cp.length);
+    if (readBytes != cp.length)
+    {
+        int         saved_errno = errno;
+
+        CloseTransientFile(fd);
+        errno = saved_errno;
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("could not read file \"%s\", read %d of %u: %m",
+                        path, readBytes, cp.length)));
+    }
+
+    CloseTransientFile(fd);
+
+    /* now verify the CRC32 */
+    INIT_CRC32(checksum);
+    COMP_CRC32(checksum,
+               (char *) &cp + ReplicationSlotOnDiskConstantSize,
+               ReplicationSlotOnDiskDynamicSize);
+
+    if (!EQ_CRC32(checksum, cp.checksum))
+        ereport(PANIC,
+                (errmsg("replication slot file %s: checksum mismatch, is %u, should be %u",
+                        path, checksum, cp.checksum)));
+
+    /* nothing can be active yet, don't lock anything */
+    for (i = 0; i < max_replication_slots; i++)
+    {
+        ReplicationSlot *slot;
+
+        slot = &ReplicationSlotCtl->replication_slots[i];
+
+        if (slot->in_use)
+            continue;
+
+        slot->data_xmin = cp.slot.data_xmin;
+
+        /*
+         * after a crash, always use xmin, not effective_xmin, the slot
+         * obviously survived
+         */
+        slot->effective_data_xmin = cp.slot.data_xmin;
+        strcpy(NameStr(slot->name), NameStr(cp.slot.name));
+        slot->database = cp.slot.database;
+        slot->restart_decoding = cp.slot.restart_decoding;
+        slot->confirmed_flush = cp.slot.confirmed_flush;
+        /* ignore previous values, they are transient */
+        slot->in_use = true;
+        slot->active = false;
+        restored = true;
+        break;
+    }
+
+    if (!restored)
+        ereport(PANIC,
+                (errmsg("too many replication slots active before shutdown"),
+                 errhint("Increase max_replication_slots and try again.")));
+}