wal_decoding: Introduce the replication slot interface for resource reservation
author Andres Freund <andres@anarazel.de>
Mon, 27 Jan 2014 16:13:48 +0000 (17:13 +0100)
committer Robert Haas <rhaas@postgresql.org>
Mon, 27 Jan 2014 22:15:25 +0000 (17:15 -0500)
18 files changed:
src/backend/access/transam/xlog.c
src/backend/catalog/system_views.sql
src/backend/replication/Makefile
src/backend/replication/slot.c [new file with mode: 0644]
src/backend/replication/slotfuncs.c [new file with mode: 0644]
src/backend/storage/ipc/ipci.c
src/backend/storage/ipc/procarray.c
src/backend/storage/lmgr/proc.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/bin/initdb/initdb.c
src/include/access/xlog.h
src/include/catalog/pg_proc.h
src/include/nodes/replnodes.h
src/include/replication/slot.h [new file with mode: 0644]
src/include/storage/lwlock.h
src/include/storage/procarray.h
src/test/regress/expected/rules.out

index b333d820c7236e9df0ed5472857fe94f2484fa56..8d52bdcd9e85438304f7c8f3abb1436ba7a6cb06 100644 (file)
@@ -39,6 +39,7 @@
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/startup.h"
+#include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/barrier.h"
@@ -485,6 +486,8 @@ typedef struct XLogCtlData
        uint32          ckptXidEpoch;   /* nextXID & epoch of latest checkpoint */
        TransactionId ckptXid;
        XLogRecPtr      asyncXactLSN;   /* LSN of newest async commit/abort */
+       XLogRecPtr      peggedLSN;              /* LSN after which segments can be removed */
+
        XLogSegNo       lastRemovedSegNo;               /* latest removed/recycled XLOG
                                                                                 * segment */
 
@@ -748,6 +751,7 @@ static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
+static XLogRecPtr XLogGetPeggedLSN(void);
 
 static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
                                XLogRecPtr *lsn, BkpBlock *bkpb);
@@ -2908,6 +2912,39 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
                SetLatch(ProcGlobal->walwriterLatch);
 }
 
+/*
+ * Publish the LSN up to which WAL may be removed because replication does
+ * not require it anymore. The value is stored in XLogCtl under info_lck.
+ */
+void
+XLogSetPeggedLSN(XLogRecPtr lsn)
+{
+       /* volatile access keeps the store between the spinlock calls */
+       volatile XLogCtlData *shared = XLogCtl;
+
+       SpinLockAcquire(&shared->info_lck);
+       shared->peggedLSN = lsn;
+       SpinLockRelease(&shared->info_lck);
+}
+
+
+/*
+ * Fetch the LSN up to which WAL may be removed because replication does not
+ * require it anymore. The value is read from XLogCtl under info_lck.
+ */
+static XLogRecPtr
+XLogGetPeggedLSN(void)
+{
+       /* volatile access keeps the load between the spinlock calls */
+       volatile XLogCtlData *shared = XLogCtl;
+       XLogRecPtr      pegged;
+
+       SpinLockAcquire(&shared->info_lck);
+       pegged = shared->peggedLSN;
+       SpinLockRelease(&shared->info_lck);
+
+       return pegged;
+}
+
 /*
  * Advance minRecoveryPoint in control file.
  *
@@ -8938,24 +8975,39 @@ CreateRestartPoint(int flags)
 
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
- * wal_keep_segments. This is calculated by subtracting wal_keep_segments
- * from the given xlog location, recptr.
+ * either wal_keep_segments or replication slots.
+ *
+ * This is calculated by subtracting wal_keep_segments from the given xlog
+ * location, recptr, and by making sure that the result does not exceed the
+ * requirement of replication slots.
  */
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
-       XLogSegNo       segno;
-
-       if (wal_keep_segments == 0)
-               return;
+       XLogSegNo       segno, slotSegNo;
+       XLogRecPtr      keep;
 
        XLByteToSeg(recptr, segno);
+       keep = XLogGetPeggedLSN();
 
-       /* avoid underflow, don't go below 1 */
-       if (segno <= wal_keep_segments)
-               segno = 1;
-       else
-               segno = segno - wal_keep_segments;
+       /* compute limit for wal_keep_segments first */
+       if (wal_keep_segments > 0)
+       {
+               /* avoid underflow, don't go below 1 */
+               if (segno <= wal_keep_segments)
+                       segno = 1;
+               else
+                       segno = segno - wal_keep_segments;
+       }
+
+       /* then check whether slots limit removal further */
+       if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
+       {
+               XLByteToPrevSeg(keep, slotSegNo);
+
+               if (slotSegNo < segno)
+                       segno = slotSegNo;
+       }
 
        /* don't delete WAL segments newer than the calculated segment */
        if (segno < *logSegNo)
index 043d1181fd73a91a1135641eb210049b675503ea..e17eeeb9aef0734d1716368b2f210896ff25877d 100644 (file)
@@ -613,6 +613,18 @@ CREATE VIEW pg_stat_replication AS
     WHERE S.usesysid = U.oid AND
             S.pid = W.pid;
 
+-- One row per replication slot as returned by pg_get_replication_slots().
+-- The LEFT JOIN adds the database name where datoid matches a pg_database
+-- row; slots without a matching row are still listed, with a NULL database.
+CREATE VIEW pg_replication_slots AS
+    SELECT
+            L.slot_name,
+            L.slot_type,
+            L.datoid,
+            D.datname AS database,
+            L.active,
+            L.data_xmin,
+            L.restart_decoding_lsn
+    FROM pg_get_replication_slots() AS L
+            LEFT JOIN pg_database D ON (L.datoid = D.oid);
+
 CREATE VIEW pg_stat_database AS
     SELECT
             D.oid AS datid,
index 2dde0118a47185d6de954725c55abc7007afd20d..7941cb8d5e7663d4b4bb72f53b685a1f326de0cb 100644 (file)
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
 OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
-       repl_gram.o syncrep.o
+       repl_gram.o slot.o slotfuncs.o syncrep.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
new file mode 100644 (file)
index 0000000..ca51eef
--- /dev/null
@@ -0,0 +1,1090 @@
+/*-------------------------------------------------------------------------
+ *
+ * slot.c
+ *        Replication slot management.
+ *
+ *
+ * Copyright (c) 2012-2013, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *       src/backend/replication/slot.c
+ *
+ * NOTES
+ *
+ *       Replication slots are used to keep state about replicas of the cluster
+ *       they are allocated on, primarily to avoid removing resources that are
+ *       still required on replicas, but also to keep information for monitoring
+ *       purposes.
+ *       They need to be permanent (to allow restarts), crash-safe and
+ *       allocatable on standbys (to support cascading setups). Thus they need to
+ *       be stored outside the catalog as writing to the catalog in a crashsafe
+ *       manner isn't possible on standbys.
+ *    Such slots are used both for streaming replication and changeset
+ *    extraction. For the latter sometimes slot specific data needs to be
+ *    serialized to disk to avoid exhausting memory.
+ *    For both, changeset extraction and streaming replication we need to
+ *    prevent that still required WAL will be removed/recycled and that rows
+ *    we still need get vacuumed away.
+ *
+ *    To allow slots to be created on standbys and to allow additional data to
+ *    be stored per slot, each replication slot gets its own directory inside
+ *    the $PGDATA/pg_replslot directory. Inside that directory the /state file
+ *    will contain the slot's own data. Additional data can be stored
+ *    alongside that file if required.
+ *
+ *    While the server is running it would be inefficient to always read the
+ *    state files from disk; instead the data is loaded into memory at startup
+ *    and most of the time only that data is accessed. Individual slots are
+ *    only serialized to disk when a certain value needs to be the same after
+ *    a restart.
+ *
+ *    Since the individual resources that need to be managed can be described
+ *    by a simple number (xmin horizon, minimal required LSN), we compute the
+ *    minimum value across all slots and store it in a separate struct, so we
+ *    don't have to access all slots when accessing the global minimum.
+ *
+ *    The shared memory data structures are protected by the
+ *    ReplicationSlotCtlLock lwlock, which covers the global values in
+ *    ReplicationSlotCtlData and each slot's name and in_use
+ *    fields. Additionally each slot has a spinlock protecting the remaining
+ *    values. That means that the existence of slots and their names can be
+ *    tested while holding the lwlock in shared mode, without holding the
+ *    individual spinlocks.
+ *    -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "access/transam.h"
+
+#include "fmgr.h"
+#include "miscadmin.h"
+
+#include "replication/slot.h"
+
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+
+/*
+ * Replication slot on-disk data structure.
+ *
+ * The struct consists of a fixed leading part (magic, checksum, version,
+ * length) followed by the slot contents themselves. The boundary between
+ * the two is the offset of the "slot" member.
+ */
+typedef struct ReplicationSlotOnDisk
+{
+       /* first part of this struct needs to be version independent */
+
+       /* data not covered by checksum */
+       uint32          magic;          /* presumably SLOT_MAGIC -- TODO confirm */
+       pg_crc32        checksum;
+
+       /* data covered by checksum */
+       uint32          version;        /* presumably SLOT_VERSION -- TODO confirm */
+       uint32          length;
+
+       /* data with potentially evolving format */
+       ReplicationSlot slot;
+} ReplicationSlotOnDisk;
+
+/* size of the part of the slot that is version independent */
+#define ReplicationSlotOnDiskConstantSize \
+       offsetof(ReplicationSlotOnDisk, slot)
+/*
+ * size of the part of the slot that is not version independent
+ *
+ * The expansion is parenthesized so the macro can be used safely as an
+ * operand of a larger expression.
+ */
+#define ReplicationSlotOnDiskDynamicSize \
+       (sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)
+
+#define SLOT_MAGIC             0x1051CA1               /* format identifier */
+#define SLOT_VERSION   1                               /* version for new files */
+
+/*
+ * Control array for replication slot management. Stays NULL when
+ * max_replication_slots is zero, as no shared memory is set up then.
+ */
+ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
+
+/*
+ * My backend's replication slot in the shared memory array, NULL while this
+ * backend has not created or acquired one.
+ */
+ReplicationSlot *MyReplicationSlot = NULL;
+
+/* GUCs */
+int                    max_replication_slots = 0;      /* the maximum number of replication slots */
+
+/*
+ * internal persistency functions, operating on a slot's on-disk state below
+ * pg_replslot/
+ */
+static void RestoreSlot(const char *name);
+static void CreateSlot(ReplicationSlot *slot);
+static void SaveSlotGuts(ReplicationSlot *slot, const char *path);
+static void DeleteSlot(ReplicationSlot *slot);
+static void SaveSlot(ReplicationSlot *slot);
+
+/*
+ * Report shared-memory space needed by ReplicationSlotsShmemInit.
+ *
+ * That is the header of ReplicationSlotCtlData plus one ReplicationSlot per
+ * configured slot, or nothing at all if max_replication_slots is zero.
+ */
+Size
+ReplicationSlotsShmemSize(void)
+{
+       Size            slots_bytes;
+
+       /* no slots configured, no shared memory required */
+       if (max_replication_slots == 0)
+               return 0;
+
+       slots_bytes = mul_size(max_replication_slots, sizeof(ReplicationSlot));
+
+       return add_size(offsetof(ReplicationSlotCtlData, replication_slots),
+                                       slots_bytes);
+}
+
+/*
+ * Allocate and initialize the shared memory used for replication slots.
+ *
+ * Nothing is allocated when max_replication_slots is zero.
+ */
+void
+ReplicationSlotsShmemInit(void)
+{
+       bool            already_exists;
+       int                     slotno;
+
+       if (max_replication_slots == 0)
+               return;
+
+       ReplicationSlotCtl = (ReplicationSlotCtlData *)
+               ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
+                                               &already_exists);
+
+       /* the structure was found already set up, nothing left to do */
+       if (already_exists)
+               return;
+
+       /* First time through: zero everything, then set up the spinlocks */
+       MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
+
+       for (slotno = 0; slotno < max_replication_slots; slotno++)
+               SpinLockInit(&ReplicationSlotCtl->replication_slots[slotno].mutex);
+}
+
+/*
+ * Check whether the passed slot name is valid and report errors at elevel.
+ *
+ * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1}, which should allow
+ * the name to be used as a directory name on every supported OS.
+ *
+ * name: candidate slot name, must not be NULL
+ * elevel: level at which an invalid name is reported
+ *
+ * Returns true if the name is valid. Otherwise the problem is reported at
+ * elevel and, provided elevel < ERROR so that ereport() returns, false is
+ * returned.
+ */
+bool
+ReplicationSlotValidateName(const char *name, int elevel)
+{
+       const char *cp;
+       size_t          namelen = strlen(name);
+
+       if (namelen == 0)
+       {
+               ereport(elevel,
+                               (errcode(ERRCODE_INVALID_NAME),
+                                errmsg("replication slot name \"%s\" is too short",
+                                               name)));
+               return false;
+       }
+
+       if (namelen >= NAMEDATALEN)
+       {
+               ereport(elevel,
+                               (errcode(ERRCODE_NAME_TOO_LONG),
+                                errmsg("replication slot name \"%s\" is too long",
+                                               name)));
+               return false;
+       }
+
+       for (cp = name; *cp; cp++)
+       {
+               /* only lower case ASCII letters, digits and '_' are accepted */
+               if (!((*cp >= 'a' && *cp <= 'z')
+                         || (*cp >= '0' && *cp <= '9')
+                         || (*cp == '_')))
+               {
+                       ereport(elevel,
+                                       (errcode(ERRCODE_INVALID_NAME),
+                                        errmsg("replication slot name \"%s\" contains invalid character",
+                                                       name),
+                                        errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
+                       return false;
+               }
+       }
+       return true;
+}
+
+/*
+ * Create a new replication slot and mark it as used by this backend.
+ *
+ * name: Name of the slot
+ * db_specific: changeset extraction is db specific, if the slot is going to
+ *     be used for that pass true, otherwise false.
+ *
+ * Raises an ERROR if the name is invalid, a slot of that name already exists
+ * or no free slot is left. On success MyReplicationSlot points to the new
+ * slot, which is marked both in use and active.
+ */
+void
+ReplicationSlotCreate(const char *name, bool db_specific)
+{
+       ReplicationSlot *slot;
+       bool            name_in_use;
+       int                     i;
+
+       Assert(MyReplicationSlot == NULL);
+
+       ReplicationSlotValidateName(name, ERROR);
+
+       /*
+        * Prevent concurrent creation of slots, so we can safely prevent
+        * duplicate names.
+        */
+       LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+
+       /*
+        * First, make sure the requested name is not in use. No other slots can
+        * be created while we're holding ReplicationSlotCtlLock.
+        */
+       name_in_use = false;
+       for (i = 0; i < max_replication_slots && !name_in_use; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+               if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
+                       name_in_use = true;
+       }
+
+       if (name_in_use)
+       {
+               LWLockRelease(ReplicationSlotCtlLock);
+               ereport(ERROR,
+                               (errcode(ERRCODE_DUPLICATE_OBJECT),
+                                errmsg("replication slot \"%s\" already exists", name)));
+       }
+
+       /*
+        * Find the first slot which is not in use (and thus not active either).
+        */
+       slot = NULL;
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+               if (!s->in_use)
+               {
+                       Assert(!s->active);
+                       /* NOT releasing the lock yet */
+                       slot = s;
+                       break;
+               }
+       }
+
+       if (!slot)
+       {
+               LWLockRelease(ReplicationSlotCtlLock);
+               ereport(ERROR,
+                               (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+                                errmsg("all replication slots are in use"),
+                                errhint("Free one or increase max_replication_slots.")));
+       }
+
+       /*
+        * The in-memory part of the creation cannot fail anymore, so reserve the
+        * slot by marking it as in use and active. The on-disk part below can
+        * still fail, in which case the reservation is undone again.
+        */
+       SpinLockAcquire(&slot->mutex);
+
+       slot->in_use = true;
+       slot->active = true;
+       if (db_specific)
+               slot->database = MyDatabaseId;
+       else
+               slot->database = InvalidOid;
+
+       /* terminate explicitly, strncpy() does not guarantee termination */
+       strncpy(NameStr(slot->name), name, NAMEDATALEN);
+       NameStr(slot->name)[NAMEDATALEN - 1] = '\0';
+
+       /* release spinlock so it can be examined by others */
+       SpinLockRelease(&slot->mutex);
+
+       LWLockRelease(ReplicationSlotCtlLock);
+
+       /*
+        * Now create the on-disk data structures for this replication slot,
+        * thereby guaranteeing its persistency. If we fail while manipulating
+        * the on-disk state in an acceptable manner, we cleanup and mark this
+        * slot as unused.
+        */
+       PG_TRY();
+       {
+               CreateSlot(slot);
+       }
+       PG_CATCH();
+       {
+               /* undo the in-memory reservation made above, then pass the error on */
+               LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+               SpinLockAcquire(&slot->mutex);
+               slot->in_use = false;
+               slot->active = false;
+               SpinLockRelease(&slot->mutex);
+               LWLockRelease(ReplicationSlotCtlLock);
+
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
+
+       MyReplicationSlot = slot;
+}
+
+/*
+ * Find a previously created slot and mark it as used by this backend.
+ *
+ * name: name of the slot to acquire
+ *
+ * Raises an ERROR if the name is invalid, no slot of that name exists, or
+ * the slot is already active. On success MyReplicationSlot points to the
+ * slot.
+ */
+void
+ReplicationSlotAcquire(const char *name)
+{
+       ReplicationSlot *slot = NULL;
+       int                     i;
+
+       Assert(MyReplicationSlot == NULL);
+
+       ReplicationSlotValidateName(name, ERROR);
+
+       LWLockAcquire(ReplicationSlotCtlLock, LW_SHARED);
+
+       /*
+        * Only remember a slot once it actually matched, so that "slot" stays
+        * NULL if the search comes up empty (or there are no slots at all).
+        */
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+               if (s->in_use && strcmp(name, NameStr(s->name)) == 0)
+               {
+                       /* NOT releasing the lock yet */
+                       slot = s;
+                       break;
+               }
+       }
+
+       if (slot == NULL)
+       {
+               LWLockRelease(ReplicationSlotCtlLock);
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                errmsg("replication slot \"%s\" does not exist", name)));
+       }
+
+       /* acquire spinlock so we can test and set ->active safely */
+       SpinLockAcquire(&slot->mutex);
+
+       if (slot->active)
+       {
+               SpinLockRelease(&slot->mutex);
+               LWLockRelease(ReplicationSlotCtlLock);
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_IN_USE),
+                                errmsg("replication slot \"%s\" is already active", name)));
+       }
+
+       /* the slot is ours now as far as the in-memory state is concerned */
+       slot->active = true;
+       SpinLockRelease(&slot->mutex);
+
+       LWLockRelease(ReplicationSlotCtlLock);
+
+       PG_TRY();
+       {
+               /*
+                * We don't really need to save here, but doing so guarantees the slot
+                * is in a good state.
+                */
+               SaveSlot(slot);
+       }
+       PG_CATCH();
+       {
+               /* saving failed, hand the slot back before passing the error on */
+               SpinLockAcquire(&slot->mutex);
+               slot->active = false;
+               SpinLockRelease(&slot->mutex);
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
+
+       MyReplicationSlot = slot;
+}
+
+/*
+ * Give up the slot currently held by this backend. The slot itself stays
+ * around, so this or another backend can acquire it again later, and the
+ * resources it requires remain preserved.
+ */
+void
+ReplicationSlotRelease(void)
+{
+       ReplicationSlot *released = MyReplicationSlot;
+
+       Assert(released != NULL && released->active);
+
+       /*
+        * The lwlock isn't needed here: the slot is only flagged as inactive, it
+        * remains in use.
+        */
+       SpinLockAcquire(&released->mutex);
+       released->active = false;
+       MyReplicationSlot = NULL;
+       SpinLockRelease(&released->mutex);
+
+       /*
+        * XXX: Saving the slot to disk isn't strictly needed, it will be marked
+        * inactive after a crash/restart anyway. But otherwise we'd need to
+        * serialize all data at shutdowns or checkpoints...
+        */
+       SaveSlot(released);
+}
+
+/*
+ * Permanently drop replication slot identified by the passed in name.
+ *
+ * name: name of the slot to drop
+ *
+ * Raises an ERROR if the name is invalid, no slot of that name exists, the
+ * slot is currently active, or removing the on-disk state fails. In the last
+ * case the in-memory slot is left intact, merely marked inactive again.
+ */
+void
+ReplicationSlotDrop(const char *name)
+{
+       ReplicationSlot *slot = NULL;
+       int                     i;
+
+       ReplicationSlotValidateName(name, ERROR);
+
+       LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               slot = &ReplicationSlotCtl->replication_slots[i];
+
+               if (slot->in_use && strcmp(name, NameStr(slot->name)) == 0)
+                       break;
+
+               /* no match, make sure we don't leave the loop with a stale pointer */
+               slot = NULL;
+       }
+
+       if (slot == NULL)
+       {
+               LWLockRelease(ReplicationSlotCtlLock);
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                errmsg("replication slot \"%s\" does not exist", name)));
+       }
+
+       /*
+        * Test whether the slot's currently acquired. We only need the spinlock
+        * for that test; it cannot be newly acquired as that would require
+        * taking the lwlock.
+        */
+       SpinLockAcquire(&slot->mutex);
+       if (slot->active)
+       {
+               SpinLockRelease(&slot->mutex);
+               LWLockRelease(ReplicationSlotCtlLock);
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_IN_USE),
+                                errmsg("could not drop active replication slot \"%s\"", name)));
+       }
+
+       /* mark slot as active while we're releasing the SlotCtl lock */
+       slot->active = true;
+       SpinLockRelease(&slot->mutex);
+
+       /*
+        * Intermittently release the slot control lock. We've marked the slot as
+        * active by this backend so it is safe to do so. We can't easily hold
+        * the ctl lock across the PG_TRY/CATCH below since elog.c manipulates
+        * InterruptHoldoffCount.
+        */
+       LWLockRelease(ReplicationSlotCtlLock);
+
+       /*
+        * Now try to delete the slot's on-disk state. If we fail but not PANIC
+        * the in-memory state will still be valid.
+        */
+       PG_TRY();
+       {
+               DeleteSlot(slot);
+       }
+       PG_CATCH();
+       {
+               SpinLockAcquire(&slot->mutex);
+               slot->active = false;
+               SpinLockRelease(&slot->mutex);
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
+
+       /*
+        * Ok, everything gone, after a crash we now wouldn't restore this slot,
+        * so remove the in-memory state as well.
+        */
+       LWLockAcquire(ReplicationSlotCtlLock, LW_EXCLUSIVE);
+       SpinLockAcquire(&slot->mutex);
+       slot->active = false;
+       slot->in_use = false;
+       SpinLockRelease(&slot->mutex);
+       LWLockRelease(ReplicationSlotCtlLock);
+
+       /*
+        * The slot is dead and neither nails the xmin nor pegs WAL anymore, so
+        * recompute both aggregates. Without recomputing the LSN the WAL needed
+        * by the dropped slot would be retained for no reason.
+        */
+       ReplicationSlotsComputeRequiredXmin(false);
+       ReplicationSlotsComputeRequiredLSN();
+}
+
+/*
+ * Serialize the currently acquired slot's state from memory to disk, thereby
+ * guaranteeing the current state will survive a crash.
+ *
+ * Must only be called while this backend holds a slot, i.e. while
+ * MyReplicationSlot is set.
+ */
+void
+ReplicationSlotSave(void)
+{
+       Assert(MyReplicationSlot != NULL);
+
+       /* SaveSlot() builds the slot's directory path and writes the state */
+       SaveSlot(MyReplicationSlot);
+}
+
+/*
+ * Compute the oldest effective data xmin across all in-use slots and hand it
+ * to ProcArraySetPeggedXmin(). InvalidTransactionId is passed on if no slot
+ * has a valid xmin.
+ *
+ * already_locked is forwarded to ProcArraySetPeggedXmin(); pass true if
+ * ProcArrayLock is already held exclusively.
+ */
+void
+ReplicationSlotsComputeRequiredXmin(bool already_locked)
+{
+       TransactionId oldest_xmin = InvalidTransactionId;
+       int                     slotno;
+
+       Assert(ReplicationSlotCtl != NULL);
+
+       /* make sure slots aren't concurrently dropped/created */
+       LWLockAcquire(ReplicationSlotCtlLock, LW_SHARED);
+
+       for (slotno = 0; slotno < max_replication_slots; slotno++)
+       {
+               ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+               TransactionId slot_xmin;
+
+               if (!slot->in_use)
+                       continue;
+
+               /* copy the value out so it is only ever read under the spinlock */
+               SpinLockAcquire(&slot->mutex);
+               slot_xmin = slot->effective_data_xmin;
+               SpinLockRelease(&slot->mutex);
+
+               /* slots without a valid xmin don't hold anything back */
+               if (!TransactionIdIsValid(slot_xmin))
+                       continue;
+
+               if (!TransactionIdIsValid(oldest_xmin) ||
+                       TransactionIdPrecedes(slot_xmin, oldest_xmin))
+                       oldest_xmin = slot_xmin;
+       }
+
+       LWLockRelease(ReplicationSlotCtlLock);
+
+       ProcArraySetPeggedXmin(oldest_xmin, already_locked);
+}
+
+/*
+ * Compute the oldest restart_decoding LSN across all in-use slots and hand
+ * it to XLogSetPeggedLSN(). InvalidXLogRecPtr is passed on if no slot has a
+ * valid position.
+ */
+void
+ReplicationSlotsComputeRequiredLSN(void)
+{
+       XLogRecPtr      oldest_lsn = InvalidXLogRecPtr;
+       int                     slotno;
+
+       Assert(ReplicationSlotCtl != NULL);
+
+       /* make sure slots aren't concurrently dropped/created */
+       LWLockAcquire(ReplicationSlotCtlLock, LW_SHARED);
+
+       for (slotno = 0; slotno < max_replication_slots; slotno++)
+       {
+               ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+               XLogRecPtr      slot_lsn;
+
+               if (!slot->in_use)
+                       continue;
+
+               /* copy the value out so it is only ever read under the spinlock */
+               SpinLockAcquire(&slot->mutex);
+               slot_lsn = slot->restart_decoding;
+               SpinLockRelease(&slot->mutex);
+
+               /* slots without a valid position don't hold anything back */
+               if (slot_lsn == InvalidXLogRecPtr)
+                       continue;
+
+               if (oldest_lsn == InvalidXLogRecPtr || slot_lsn < oldest_lsn)
+                       oldest_lsn = slot_lsn;
+       }
+
+       LWLockRelease(ReplicationSlotCtlLock);
+
+       XLogSetPeggedLSN(oldest_lsn);
+}
+
+/*
+ * Verify that the server is configured such that replication slots can be
+ * used at all, raising an ERROR otherwise. Passing this check is no
+ * guarantee: creating or using a slot may still fail later on.
+ */
+void
+CheckSlotRequirements(void)
+{
+       /* without any configured slots there is nothing that could be used */
+       if (max_replication_slots == 0)
+       {
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("replication slot usage requires max_replication_slots > 0")));
+       }
+
+       /* the WAL level has to be at least archive */
+       if (wal_level < WAL_LEVEL_ARCHIVE)
+       {
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("replication slot usage requires wal_level >= archive")));
+       }
+}
+
+/*
+ * At backend exit, make sure a slot this backend still holds is marked
+ * inactive again and forget about it locally.
+ */
+void
+ReplicationSlotAtProcExit(void)
+{
+       ReplicationSlot *held = MyReplicationSlot;
+
+       if (held != NULL && held->active)
+       {
+               /*
+                * Use the spinlock so other backends are guaranteed to see the
+                * change in time. The lwlock cannot generally be taken here, since
+                * we might still be holding it in an error path.
+                */
+               SpinLockAcquire(&held->mutex);
+               held->active = false;
+               SpinLockRelease(&held->mutex);
+       }
+
+       MyReplicationSlot = NULL;
+}
+
+/*
+ * Test whether the string `str' ends with the suffix `end'. An empty `end'
+ * matches every string.
+ */
+static bool
+string_endswith(const char *str, const char *end)
+{
+       size_t          str_len = strlen(str);
+       size_t          suffix_len = strlen(end);
+
+       /* a suffix longer than the string itself can never match */
+       if (suffix_len > str_len)
+               return false;
+
+       /* both tails have the same length, so comparing the bytes suffices */
+       return memcmp(str + (str_len - suffix_len), end, suffix_len) == 0;
+}
+
+/*
+ * Load all replication slots from disk into memory at server startup. This
+ * needs to be run before we start crash recovery.
+ *
+ * checkPointRedo: only used for the DEBUG1 message emitted here.
+ *
+ * Every directory below pg_replslot is examined: leftovers ending in ".tmp"
+ * are removed, everything else is handed to RestoreSlot(). Afterwards the
+ * required xmin and LSN are recomputed, unless max_replication_slots is
+ * zero.
+ */
+void
+StartupReplicationSlots(XLogRecPtr checkPointRedo)
+{
+       DIR                *replication_dir;
+       struct dirent *replication_de;
+
+       ereport(DEBUG1,
+                       (errmsg("starting up replication slots from LSN %X/%X",
+                                       (uint32) (checkPointRedo >> 32), (uint32) checkPointRedo)));
+
+       /* restore all slots by iterating over all on-disk entries */
+       replication_dir = AllocateDir("pg_replslot");
+       while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
+       {
+               struct stat     statbuf;
+               char            path[MAXPGPATH];
+
+               if (strcmp(replication_de->d_name, ".") == 0 ||
+                       strcmp(replication_de->d_name, "..") == 0)
+                       continue;
+
+               snprintf(path, MAXPGPATH, "pg_replslot/%s", replication_de->d_name);
+
+               /* we're only creating directories here, skip if it's not ours */
+               if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+                       continue;
+
+               /* we crashed while a slot was being setup or deleted, clean up */
+               if (string_endswith(replication_de->d_name, ".tmp"))
+               {
+                       if (!rmtree(path, true))
+                       {
+                               /*
+                                * A report has a single primary message, a second errmsg()
+                                * would replace the first. The explanation therefore goes
+                                * into the detail field.
+                                */
+                               ereport(WARNING,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not remove directory \"%s\"", path),
+                                                errdetail("Some useless files may be left behind in that directory, preventing creation of new slots.")));
+                               continue;
+                       }
+                       fsync_fname("pg_replslot", true);
+                       continue;
+               }
+
+               /* looks like a slot in a normal state, restore */
+               RestoreSlot(replication_de->d_name);
+       }
+       FreeDir(replication_dir);
+
+       /* slots are disabled, so there are no aggregates to compute */
+       if (max_replication_slots <= 0)
+               return;
+
+       /* Now that we have recovered all the data, compute replication xmin */
+       ReplicationSlotsComputeRequiredXmin(false);
+       ReplicationSlotsComputeRequiredLSN();
+}
+
+/* ----
+ * Manipulation of ondisk state of replication slots
+ *
+ * NB: none of the routines below should take any notice whether a slot is the
+ * current one or not, that's all handled a layer above.
+ * ----
+ */
+/*
+ * Create the on-disk state for the passed slot.
+ *
+ * The slot's directory is first created under a ".tmp" name, filled via
+ * SaveSlotGuts() and only then renamed to its final name. Failures up to and
+ * including the rename are reported as ERROR; the final fsyncs happen inside
+ * a critical section.
+ */
+static void
+CreateSlot(ReplicationSlot *slot)
+{
+       char            tmppath[MAXPGPATH];
+       char            path[MAXPGPATH];
+
+       sprintf(path, "pg_replslot/%s", NameStr(slot->name));
+       sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->name));
+
+       if (mkdir(tmppath, S_IRWXU) < 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not create directory \"%s\": %m",
+                                               tmppath)));
+
+       fsync_fname(tmppath, true);
+
+       /* write the slot's state file into the temporary directory */
+       SaveSlotGuts(slot, tmppath);
+
+       /* move the completed directory to its final name */
+       if (rename(tmppath, path) != 0)
+       {
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not rename file \"%s\" to \"%s\": %m",
+                                               tmppath, path)));
+       }
+
+       /*
+        * If we'd now fail - really unlikely - we wouldn't know whether this slot
+        * would persist after an OS crash or not - so, force a restart. The
+        * restart would try to fsync this again till it works.
+        */
+       START_CRIT_SECTION();
+
+       fsync_fname(path, true);
+       fsync_fname("pg_replslot", true);
+
+       END_CRIT_SECTION();
+
+}
+
+/*
+ * Serialize the passed slot to its directory below pg_replslot.
+ */
+static void
+SaveSlot(ReplicationSlot *slot)
+{
+       char            slot_dir[MAXPGPATH];
+
+       sprintf(slot_dir, "pg_replslot/%s", NameStr(slot->name));
+
+       /* the helper shared with CreateSlot() does the actual writing */
+       SaveSlotGuts(slot, slot_dir);
+}
+
+/*
+ * Shared functionality between saving and creating a replication slot.
+ *
+ * Writes the slot's persistent fields to "<dir>/state.tmp", fsyncs that
+ * file, renames it to "<dir>/state" and finally fsyncs the state file, 'dir'
+ * and pg_replslot. Failures up to and including the rename are reported via
+ * ereport(ERROR); the final fsyncs run inside a critical section.
+ *
+ * slot: slot to serialize; its fields are copied while holding slot->mutex.
+ * dir:  directory the state file is written into.
+ *
+ * NB: the temporary file is opened with O_EXCL, so an existing "state.tmp"
+ * in 'dir' makes this function fail.
+ */
+static void
+SaveSlotGuts(ReplicationSlot *slot, const char *dir)
+{
+       char            tmppath[MAXPGPATH];
+       char            path[MAXPGPATH];
+       int                     fd;
+       ReplicationSlotOnDisk cp;
+
+       /* silence valgrind :( */
+       memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
+
+       sprintf(tmppath, "%s/state.tmp", dir);
+       sprintf(path, "%s/state", dir);
+
+       fd = OpenTransientFile(tmppath,
+                                                  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+                                                  S_IRUSR | S_IWUSR);
+       if (fd < 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not create file \"%s\": %m",
+                                               tmppath)));
+
+       cp.magic = SLOT_MAGIC;
+       INIT_CRC32(cp.checksum);
+       /* must match the version RestoreSlot() insists on */
+       cp.version = SLOT_VERSION;
+       cp.length = ReplicationSlotOnDiskDynamicSize;
+
+       /* copy the slot's contents while nobody can change them */
+       SpinLockAcquire(&slot->mutex);
+
+       cp.slot.data_xmin = slot->data_xmin;
+       cp.slot.effective_data_xmin = slot->effective_data_xmin;
+
+       strcpy(NameStr(cp.slot.name), NameStr(slot->name));
+
+       cp.slot.database = slot->database;
+       cp.slot.confirmed_flush = slot->confirmed_flush;
+       cp.slot.restart_decoding = slot->restart_decoding;
+       cp.slot.in_use = slot->in_use;
+       /* 'active' is never persisted as true */
+       cp.slot.active = false;
+
+       SpinLockRelease(&slot->mutex);
+
+       /* the checksum covers everything after the constant-size header */
+       COMP_CRC32(cp.checksum,
+                          (char *)(&cp) + ReplicationSlotOnDiskConstantSize,
+                          ReplicationSlotOnDiskDynamicSize);
+
+       errno = 0;
+       if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
+       {
+               /* a short write does not set errno; assume we ran out of space */
+               int                     saved_errno = errno ? errno : ENOSPC;
+
+               CloseTransientFile(fd);
+               /* closing may clobber errno, restore it for %m */
+               errno = saved_errno;
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not write to file \"%s\": %m",
+                                               tmppath)));
+       }
+
+       /* fsync the temporary file */
+       if (pg_fsync(fd) != 0)
+       {
+               int                     saved_errno = errno;
+
+               CloseTransientFile(fd);
+               /* closing may clobber errno, restore it for %m */
+               errno = saved_errno;
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not fsync file \"%s\": %m",
+                                               tmppath)));
+       }
+
+       CloseTransientFile(fd);
+
+       /* rename to permanent file, fsync file and directory */
+       if (rename(tmppath, path) != 0)
+       {
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not rename \"%s\" to \"%s\": %m",
+                                               tmppath, path)));
+       }
+
+       /* Check CreateSlot() for the reasoning of using a crit. section. */
+       START_CRIT_SECTION();
+
+       fsync_fname(path, false);
+       fsync_fname((char *) dir, true);
+       fsync_fname("pg_replslot", true);
+
+       END_CRIT_SECTION();
+}
+
+/*
+ * Delete a single slot from disk, not touching the in-memory state.
+ *
+ * The slot's directory is first renamed to "<name>.tmp" and that rename is
+ * made durable; only then is the directory tree removed. A failing rename is
+ * reported via ereport(ERROR), a failing removal only via WARNING.
+ */
+static void
+DeleteSlot(ReplicationSlot *slot)
+{
+       char            path[MAXPGPATH];
+       char            tmppath[MAXPGPATH];
+
+       sprintf(path, "pg_replslot/%s", NameStr(slot->name));
+       sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->name));
+
+       if (rename(path, tmppath) != 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not rename \"%s\" to \"%s\": %m",
+                                               path, tmppath)));
+
+       /* Check CreateSlot() for the reasoning of using a crit. section. */
+       START_CRIT_SECTION();
+
+       fsync_fname(tmppath, true);
+       fsync_fname("pg_replslot", true);
+
+       END_CRIT_SECTION();
+
+       /*
+        * If we fail during removal of the directory, we'll just be unable to
+        * create new slots of this name till a restart of the server. Don't
+        * panic.
+        */
+       if (!rmtree(tmppath, true))
+       {
+               /*
+                * The explanation has to be an errhint(): a second errmsg() would
+                * replace the primary message naming the directory.
+                */
+               ereport(WARNING,
+                               (errcode_for_file_access(),
+                                errmsg("could not remove directory \"%s\"", tmppath),
+                                errhint("some useless files may be left behind in that directory preventing creation of new slots")));
+       }
+}
+
+/*
+ * Load a single slot from disk into memory.
+ *
+ * name: name of the slot's directory below pg_replslot.
+ *
+ * Removes a leftover "state.tmp", fsyncs the state file and the slot's
+ * directory, validates magic, version, length and checksum of the state file
+ * and copies its contents into the first shared memory slot not in use. Every
+ * failure, including there being no unused slot, is reported as PANIC.
+ */
+static void
+RestoreSlot(const char *name)
+{
+       ReplicationSlotOnDisk cp;
+       int                     i;
+       char            path[MAXPGPATH];
+       char            slotdir[MAXPGPATH];
+       int                     fd;
+       bool            restored = false;
+       int                     readBytes;
+       pg_crc32        checksum;
+
+       /* delete temp file if it exists */
+       sprintf(path, "pg_replslot/%s/state.tmp", name);
+       if (unlink(path) < 0 && errno != ENOENT)
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("could not unlink file \"%s\": %m", path)));
+
+       sprintf(slotdir, "pg_replslot/%s", name);
+       sprintf(path, "pg_replslot/%s/state", name);
+
+       elog(DEBUG1, "restoring replication slot from \"%s\"", path);
+
+       fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+
+       /*
+        * A missing state file needs no graceful handling: the slot directory is
+        * only rename()d into place after the state file has been fsync()ed.
+        */
+       if (fd < 0)
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("could not open file \"%s\": %m", path)));
+
+       /*
+        * Sync state file before we're reading from it. We might have crashed
+        * while it wasn't synced yet and we shouldn't continue on that basis.
+        */
+       if (pg_fsync(fd) != 0)
+       {
+               CloseTransientFile(fd);
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("could not fsync file \"%s\": %m",
+                                               path)));
+       }
+
+       /*
+        * Also sync the parent directory. This has to be the slot's directory,
+        * not the state file itself, which was synced just above.
+        */
+       START_CRIT_SECTION();
+       fsync_fname(slotdir, true);
+       END_CRIT_SECTION();
+
+       /* read part of statefile that's guaranteed to be version independent */
+       readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
+       if (readBytes != ReplicationSlotOnDiskConstantSize)
+       {
+               int                     saved_errno = errno;
+
+               CloseTransientFile(fd);
+               errno = saved_errno;
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("could not read file \"%s\", read %d of %u: %m",
+                                               path, readBytes,
+                                               (uint32) ReplicationSlotOnDiskConstantSize)));
+       }
+
+       /* verify magic */
+       if (cp.magic != SLOT_MAGIC)
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("replication slot file \"%s\" has wrong magic %u instead of %u",
+                                               path, cp.magic, SLOT_MAGIC)));
+
+       /* verify version */
+       if (cp.version != SLOT_VERSION)
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("replication slot file \"%s\" has unsupported version %u",
+                                               path, cp.version)));
+
+       /* boundary check on length */
+       if (cp.length != ReplicationSlotOnDiskDynamicSize)
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("replication slot file \"%s\" has corrupted length %u",
+                                               path, cp.length)));
+
+       /* Now that we know the size, read the entire file */
+       readBytes = read(fd,
+                                        (char *)&cp + ReplicationSlotOnDiskConstantSize,
+                                        cp.length);
+       if (readBytes != cp.length)
+       {
+               int                     saved_errno = errno;
+
+               CloseTransientFile(fd);
+               errno = saved_errno;
+               ereport(PANIC,
+                               (errcode_for_file_access(),
+                                errmsg("could not read file \"%s\", read %d of %u: %m",
+                                               path, readBytes, cp.length)));
+       }
+
+       CloseTransientFile(fd);
+
+       /* now verify the CRC32, computed the same way SaveSlotGuts() does */
+       INIT_CRC32(checksum);
+       COMP_CRC32(checksum,
+                          (char *)&cp + ReplicationSlotOnDiskConstantSize,
+                          ReplicationSlotOnDiskDynamicSize);
+
+       if (!EQ_CRC32(checksum, cp.checksum))
+               ereport(PANIC,
+                               (errmsg("replication slot file %s: checksum mismatch, is %u, should be %u",
+                                               path, checksum, cp.checksum)));
+
+       /* nothing can be active yet, don't lock anything */
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *slot;
+
+               slot = &ReplicationSlotCtl->replication_slots[i];
+
+               /* look for the first entry that is still free */
+               if (slot->in_use)
+                       continue;
+
+               slot->data_xmin = cp.slot.data_xmin;
+               /*
+                * after a crash, always use xmin, not effective_xmin, the
+                * slot obviously survived
+                */
+               slot->effective_data_xmin = cp.slot.data_xmin;
+               strcpy(NameStr(slot->name), NameStr(cp.slot.name));
+               slot->database = cp.slot.database;
+               slot->restart_decoding = cp.slot.restart_decoding;
+               slot->confirmed_flush = cp.slot.confirmed_flush;
+               /* ignore previous values, they are transient */
+               slot->in_use = true;
+               slot->active = false;
+               restored = true;
+               break;
+       }
+
+       if (!restored)
+               ereport(PANIC,
+                               (errmsg("too many replication slots active before shutdown"),
+                                errhint("Increase max_replication_slots and try again.")));
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
new file mode 100644 (file)
index 0000000..5abac8d
--- /dev/null
@@ -0,0 +1,191 @@
+/*-------------------------------------------------------------------------
+ *
+ * slotfuncs.c
+ *
+ *        Support functions for using replication slots
+ *
+
+ * Copyright (c) 2012-2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/backend/replication/slotfuncs.c
+ *
+ */
+
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "access/htup_details.h"
+
+#include "nodes/makefuncs.h"
+
+#include "utils/builtins.h"
+
+#include "replication/slot.h"
+
+#include "storage/fd.h"
+
+Datum          create_physical_replication_slot(PG_FUNCTION_ARGS);
+Datum          drop_replication_slot(PG_FUNCTION_ARGS);
+
+/*
+ * Raise an error unless the current user is a superuser or passes
+ * has_rolreplication().
+ */
+static void
+check_permissions(void)
+{
+       /* either privilege suffices */
+       if (superuser() || has_rolreplication(GetUserId()))
+               return;
+
+       ereport(ERROR,
+                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                        errmsg("must be superuser or replication role to use replication slots")));
+}
+
+/*
+ * SQL function for creating a new physical (streaming replication)
+ * replication slot.
+ *
+ * Takes the slot name as its only argument and returns a row whose first
+ * column is the name of the created slot; the second column is always NULL.
+ * The slot is released again before returning.
+ */
+Datum
+create_physical_replication_slot(PG_FUNCTION_ARGS)
+{
+       Name            name = PG_GETARG_NAME(0);
+       Datum           values[2];
+       bool            nulls[2];
+       TupleDesc       tupdesc;
+       HeapTuple       tuple;
+       Datum           result;
+
+       check_permissions();
+
+       CheckSlotRequirements();
+
+       /*
+        * The function is not declared strict in pg_proc, so a NULL argument
+        * reaches us and must not be dereferenced.
+        */
+       if (PG_ARGISNULL(0))
+               ereport(ERROR,
+                               (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+                                errmsg("replication slot name must not be null")));
+
+       if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+               elog(ERROR, "return type must be a row type");
+
+       /*
+        * Create a slot that is not database specific, i.e. a physical one.
+        * Afterwards the new slot is available as MyReplicationSlot.
+        */
+       ReplicationSlotCreate(NameStr(*name), false);
+
+       values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->name));
+
+       /* values[1] stays unset, the column is returned as NULL */
+       nulls[0] = false;
+       nulls[1] = true;
+
+       tuple = heap_form_tuple(tupdesc, values, nulls);
+       result = HeapTupleGetDatum(tuple);
+
+       ReplicationSlotRelease();
+
+       PG_RETURN_DATUM(result);
+}
+
+/*
+ * SQL function for dropping a replication slot, be it logical or physical.
+ *
+ * Takes the slot name as its only argument and always returns 0.
+ */
+Datum
+drop_replication_slot(PG_FUNCTION_ARGS)
+{
+       Name            name = PG_GETARG_NAME(0);
+
+       check_permissions();
+
+       CheckSlotRequirements();
+
+       /*
+        * The function is not declared strict in pg_proc, so a NULL argument
+        * reaches us and must not be dereferenced.
+        */
+       if (PG_ARGISNULL(0))
+               ereport(ERROR,
+                               (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+                                errmsg("replication slot name must not be null")));
+
+       ReplicationSlotDrop(NameStr(*name));
+
+       PG_RETURN_INT32(0);
+}
+
+/*
+ * pg_get_replication_slots - SQL SRF showing all replication slots that are
+ * currently in use, whether or not somebody is streaming from them.
+ *
+ * The result is materialized into a tuplestore with one row per slot,
+ * consisting of: slot name, slot type ("physical" or "logical"), database
+ * oid, active flag, data_xmin and the restart LSN formatted as text.
+ */
+Datum
+pg_get_replication_slots(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_LOGICAL_DECODING_SLOTS_COLS 6
+       ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+       TupleDesc       tupdesc;
+       Tuplestorestate *tupstore;
+       MemoryContext per_query_ctx;
+       MemoryContext oldcontext;
+       int                     slotno;
+
+       /* check to see if caller supports us returning a tuplestore */
+       if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("set-valued function called in context that cannot accept a set")));
+       if (!(rsinfo->allowedModes & SFRM_Materialize))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("materialize mode required, but it is not " \
+                                               "allowed in this context")));
+
+       /* Build a tuple descriptor for our result type */
+       if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+               elog(ERROR, "return type must be a row type");
+
+       /* FIXME: what permissions do we require? */
+
+       /* the tuplestore has to be created in the per-query context */
+       per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+       oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+       tupstore = tuplestore_begin_heap(true, false, work_mem);
+       rsinfo->returnMode = SFRM_Materialize;
+       rsinfo->setResult = tupstore;
+       rsinfo->setDesc = tupdesc;
+
+       MemoryContextSwitchTo(oldcontext);
+
+       for (slotno = 0; slotno < max_replication_slots; slotno++)
+       {
+               ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+               Datum           values[PG_STAT_GET_LOGICAL_DECODING_SLOTS_COLS];
+               bool            nulls[PG_STAT_GET_LOGICAL_DECODING_SLOTS_COLS];
+               char            location[MAXFNAMELEN];
+               NameData        slot_name;
+               TransactionId data_xmin;
+               XLogRecPtr      last_lsn;
+               bool            active;
+               Oid                     database;
+               int                     i;
+
+               /*
+                * Copy everything we need into local variables while holding the
+                * spinlock. Only plain assignments are allowed in here; in
+                * particular no memory may be allocated, as that could throw an
+                * error with the spinlock still held.
+                */
+               SpinLockAcquire(&slot->mutex);
+               if (!slot->in_use)
+               {
+                       SpinLockRelease(&slot->mutex);
+                       continue;
+               }
+               data_xmin = slot->data_xmin;
+               active = slot->active;
+               database = slot->database;
+               last_lsn = slot->restart_decoding;
+               slot_name = slot->name;
+               SpinLockRelease(&slot->mutex);
+
+               /* no column is ever returned as NULL */
+               memset(nulls, 0, sizeof(nulls));
+
+               snprintf(location, sizeof(location), "%X/%X",
+                                (uint32) (last_lsn >> 32), (uint32) last_lsn);
+
+               i = 0;
+               values[i++] = CStringGetTextDatum(NameStr(slot_name));
+               /* slots without a database are physical ones */
+               if (database == InvalidOid)
+                       values[i++] = CStringGetTextDatum("physical");
+               else
+                       values[i++] = CStringGetTextDatum("logical");
+               values[i++] = ObjectIdGetDatum(database);
+               values[i++] = BoolGetDatum(active);
+               values[i++] = TransactionIdGetDatum(data_xmin);
+               values[i++] = CStringGetTextDatum(location);
+
+               tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+       }
+
+       tuplestore_donestoring(tupstore);
+
+       return (Datum) 0;
+}
index 2e717457b123f600eb60d6315ac4624f78c8dacf..c392d4fa228a3e1403ea4d340aec2b7d9d8e1f86 100644 (file)
@@ -27,6 +27,7 @@
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
+#include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
@@ -126,6 +127,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
                size = add_size(size, ProcSignalShmemSize());
                size = add_size(size, CheckpointerShmemSize());
                size = add_size(size, AutoVacuumShmemSize());
+               size = add_size(size, ReplicationSlotsShmemSize());
                size = add_size(size, WalSndShmemSize());
                size = add_size(size, WalRcvShmemSize());
                size = add_size(size, BTreeShmemSize());
@@ -230,6 +232,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
        ProcSignalShmemInit();
        CheckpointerShmemInit();
        AutoVacuumShmemInit();
+       ReplicationSlotsShmemInit();
        WalSndShmemInit();
        WalRcvShmemInit();
 
index b68c95612c5397d1b12396e915d431b8032616d8..a6ced4dd05165c4b2e407ec2077db4e02cf8ebad 100644 (file)
@@ -82,6 +82,9 @@ typedef struct ProcArrayStruct
         */
        TransactionId lastOverflowedXid;
 
+       /* globally "pegged" xmin horizons */
+       TransactionId pegged_xmin;
+
        /*
         * We declare pgprocnos[] as 1 entry because C wants a fixed-size array,
         * but actually it is maxProcs entries long.
@@ -228,6 +231,7 @@ CreateSharedProcArray(void)
                 */
                procArray->numProcs = 0;
                procArray->maxProcs = PROCARRAY_MAXPROCS;
+               procArray->pegged_xmin = InvalidTransactionId;
                procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
                procArray->numKnownAssignedXids = 0;
                procArray->tailKnownAssignedXids = 0;
@@ -1153,6 +1157,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
        ProcArrayStruct *arrayP = procArray;
        TransactionId result;
        int                     index;
+       volatile TransactionId pegged_xmin = InvalidTransactionId;
 
        /* Cannot look for individual databases during recovery */
        Assert(allDbs || !RecoveryInProgress());
@@ -1204,6 +1209,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
                }
        }
 
+       /* fetch into volatile var while ProcArrayLock is held */
+       pegged_xmin = procArray->pegged_xmin;
+
        if (RecoveryInProgress())
        {
                /*
@@ -1244,6 +1252,13 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
                        result = FirstNormalTransactionId;
        }
 
+       /*
+        * Check whether there's physical slots requiring xmin to be pegged.
+        */
+       if (TransactionIdIsValid(pegged_xmin) &&
+               NormalTransactionIdPrecedes(pegged_xmin, result))
+               result = pegged_xmin;
+
        return result;
 }
 
@@ -1313,6 +1328,7 @@ GetSnapshotData(Snapshot snapshot)
        int                     count = 0;
        int                     subcount = 0;
        bool            suboverflowed = false;
+       volatile TransactionId pegged_xmin = InvalidTransactionId;
 
        Assert(snapshot != NULL);
 
@@ -1490,8 +1506,13 @@ GetSnapshotData(Snapshot snapshot)
                        suboverflowed = true;
        }
 
+
+       /* fetch into volatile var while ProcArrayLock is held */
+       pegged_xmin = procArray->pegged_xmin;
+
        if (!TransactionIdIsValid(MyPgXact->xmin))
                MyPgXact->xmin = TransactionXmin = xmin;
+
        LWLockRelease(ProcArrayLock);
 
        /*
@@ -1506,6 +1527,15 @@ GetSnapshotData(Snapshot snapshot)
        RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
        if (!TransactionIdIsNormal(RecentGlobalXmin))
                RecentGlobalXmin = FirstNormalTransactionId;
+
+       /*
+        * Check whether there's a physical slot requiring the global xmin to be
+        * pegged for all tables.
+        */
+       if (TransactionIdIsValid(pegged_xmin) &&
+               NormalTransactionIdPrecedes(pegged_xmin, RecentGlobalXmin))
+               RecentGlobalXmin = pegged_xmin;
+
        RecentXmin = xmin;
 
        snapshot->xmin = xmin;
@@ -2491,6 +2521,24 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
        return true;                            /* timed out, still conflicts */
 }
 
+/*
+ * ProcArraySetPeggedXmin
+ *
+ * Store 'xmin' as the globally pegged xmin horizon, limiting future
+ * computations of the xmin horizon to prevent vacuum and HOT pruning from
+ * removing affected rows.
+ *
+ * If already_locked is true the caller has to hold ProcArrayLock already,
+ * otherwise it is acquired exclusively for the duration of the update.
+ */
+void
+ProcArraySetPeggedXmin(TransactionId xmin, bool already_locked)
+{
+       if (already_locked)
+       {
+               /* caller did the locking for us */
+               procArray->pegged_xmin = xmin;
+               return;
+       }
+
+       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       procArray->pegged_xmin = xmin;
+       LWLockRelease(ProcArrayLock);
+}
+
 
 #define XidCacheRemove(i) \
        do { \
index 1a683b83361029c1783c115586698ca0aeac3902..8f7f6c9a9c7eed0bedcf97862abeebba3ad4bed4 100644 (file)
@@ -40,6 +40,7 @@
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "postmaster/autovacuum.h"
+#include "replication/slot.h"
 #include "replication/syncrep.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -779,6 +780,9 @@ ProcKill(int code, Datum arg)
        /* Make sure we're out of the sync rep lists */
        SyncRepCleanupAtProcExit();
 
+       /* Make sure active replication slots are released */
+       ReplicationSlotAtProcExit();
+
 #ifdef USE_ASSERT_CHECKING
        if (assert_enabled)
        {
index 2cc8f90e6d4b3562171198607abcd3738b12b121..75826d309079b9c3a2ee0f542b489751dddc98c0 100644 (file)
@@ -57,6 +57,7 @@
 #include "postmaster/postmaster.h"
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
+#include "replication/slot.h"
 #include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
@@ -2099,6 +2100,17 @@ static struct config_int ConfigureNamesInt[] =
                NULL, NULL, NULL
        },
 
+       {
+               /* see max_connections */
+               {"max_replication_slots", PGC_POSTMASTER, REPLICATION_SENDING,
+                       gettext_noop("Sets the maximum number of simultaneously defined replication slots."),
+                       NULL
+               },
+               &max_replication_slots,
+               0, 0, MAX_BACKENDS /* XXX?*/,
+               NULL, NULL, NULL
+       },
+
        {
                {"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
                        gettext_noop("Sets the maximum time to wait for WAL replication."),
index 7ad6b7cb4578c458cbbf429a7c2942300f416da2..135ceebf86e7e892b7dd31022152388c248f1f11 100644 (file)
 
 # Set these on the master and on any standby that will send replication data.
 
-#max_wal_senders = 0           # max number of walsender processes
+#max_wal_senders = 0           # max number of walsender processes, including
+                               # both physical and logical replication senders.
                                # (change requires restart)
 #wal_keep_segments = 0         # in logfile segments, 16MB each; 0 disables
 #wal_sender_timeout = 60s      # in milliseconds; 0 disables
 
+#max_replication_slots = 0     # max number of replication slots.
+                               # (change requires restart)
+
 # - Master Server -
 
 # These settings are ignored on a standby server.
index 6b5302f6fd3826314b4b38fc51c4b1db9bca7302..a71320d94581fad5d4cba46448b2b2f85e868d2c 100644 (file)
@@ -195,6 +195,7 @@ const char *subdirs[] = {
        "pg_multixact/offsets",
        "base",
        "base/1",
+       "pg_replslot",
        "pg_tblspc",
        "pg_stat",
        "pg_stat_tmp"
index 47e302276b4e977e25ec1b473b1d330dfb4b89eb..aab3d40512d69e2067cf0813b9cc06d9306ae478 100644 (file)
@@ -289,6 +289,7 @@ extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std);
 
 extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
 extern void XLogSetAsyncXactLSN(XLogRecPtr record);
+extern void XLogSetPeggedLSN(XLogRecPtr lsn);
 
 extern Buffer RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record,
                                   int block_index,
index ad9774c28560dd0126d8dde20629af8e3030b416..48d4123c04b125b81a81d30a7beeaa4901d00947 100644 (file)
@@ -4752,6 +4752,14 @@ DESCR("SP-GiST support for quad tree over range");
 DATA(insert OID = 3473 (  spg_range_quad_leaf_consistent       PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 16 "2281 2281" _null_ _null_ _null_ _null_  spg_range_quad_leaf_consistent _null_ _null_ _null_ ));
 DESCR("SP-GiST support for quad tree over range");
 
+/* replication slots */
+DATA(insert OID = 3780 (  create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,25,25}" "{i,o,o}" "{slotname,slotname,xlog_position}" _null_ create_physical_replication_slot _null_ _null_ _null_ ));
+DESCR("create a physical replication slot");
+DATA(insert OID = 3781 (  drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 23 "19" _null_ _null_ _null_ _null_ drop_replication_slot _null_ _null_ _null_ ));
+DESCR("drop a replication slot");
+DATA(insert OID = 3475 (  pg_get_replication_slots     PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{25,25,26,16,28,25}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,data_xmin,restart_decoding_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DESCR("information about replication slots currently in use");
+
 /* event triggers */
 DATA(insert OID = 3566 (  pg_event_trigger_dropped_objects             PGNSP PGUID 12 10 100 0 0 f f f f t t s 0 0 2249 "" "{26,26,23,25,25,25,25}" "{o,o,o,o,o,o,o}" "{classid, objid, objsubid, object_type, schema_name, object_name, object_identity}" _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
 DESCR("list objects dropped by the current command");
index 40971153b0349e6ce198c0505f32fe5bdd638b01..2f57c7de4dc52ac1946a303bbe5f68d8b8e19c5c 100644 (file)
 #include "access/xlogdefs.h"
 #include "nodes/pg_list.h"
 
+/*
+ * Kind of replication, either physical or logical.
+ * NOTE(review): no user of this enum is visible in this part of the patch.
+ */
+typedef enum ReplicationKind {
+       REPLICATION_KIND_PHYSICAL,
+       REPLICATION_KIND_LOGICAL
+} ReplicationKind;
+
 
 /* ----------------------
  *             IDENTIFY_SYSTEM command
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
new file mode 100644 (file)
index 0000000..2cd37c0
--- /dev/null
@@ -0,0 +1,107 @@
+/*-------------------------------------------------------------------------
+ * slot.h
+ *        Replication slot management.
+ *
+ * Copyright (c) 2012-2013, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOT_H
+#define SLOT_H
+
+#include "fmgr.h"
+#include "access/xlog.h"
+#include "access/xlogreader.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+
+
+/*
+ * Shared memory state of a single replication slot.
+ *
+ * The fields are read and copied while holding 'mutex' when a slot is
+ * serialized to disk or displayed.
+ */
+typedef struct ReplicationSlot
+{
+       /* lock, on same cacheline as effective_data_xmin */
+       slock_t         mutex;
+
+       /* on-disk xmin horizon, updated first */
+       TransactionId data_xmin;
+
+       /*
+        * in-memory xmin horizon, updated after syncing to disk, used for
+        * computations
+        */
+       TransactionId effective_data_xmin;
+
+       /* is this slot defined */
+       bool            in_use;
+
+       /* is somebody streaming out changes for this slot */
+       bool            active;
+
+       /* The slot's identifier */
+       NameData        name;
+
+       /* database the slot is active on, InvalidOid for physical slots */
+       Oid                     database;
+
+       /* ----
+        * For logical decoding, this contains the point from where, after a
+        * shutdown or crash, we have to restart decoding in order to
+        * a) find a valid & ready snapshot
+        * b) get the complete content for all in-progress xacts
+        *
+        * For streaming replication, this contains the oldest LSN (in any
+        * timeline) the standby might ask for.
+        *
+        * For both, only WAL segments that are smaller than restart_decoding will
+        * be removed.
+        * ----
+        */
+       XLogRecPtr      restart_decoding;
+
+       /*
+        * Last location we know the client has confirmed to have safely received
+        * data to. No earlier data can be decoded after a restart/crash.
+        */
+       XLogRecPtr      confirmed_flush;
+
+} ReplicationSlot;
+
+/*
+ * Shared memory control area for all of the replication slots.
+ */
+typedef struct ReplicationSlotCtlData
+{
+       /*
+        * Declared with a single entry, but indexed up to max_replication_slots
+        * by the code using it; presumably sized accordingly by
+        * ReplicationSlotsShmemSize() - TODO confirm.
+        */
+       ReplicationSlot replication_slots[1];
+} ReplicationSlotCtlData;
+
+/*
+ * Pointers to shared memory
+ */
+extern ReplicationSlotCtlData *ReplicationSlotCtl;
+extern ReplicationSlot *MyReplicationSlot;
+
+/* GUCs */
+extern PGDLLIMPORT int max_replication_slots;
+
+/* shmem initialization functions */
+extern Size ReplicationSlotsShmemSize(void);
+extern void ReplicationSlotsShmemInit(void);
+
+/* management of individual slots */
+extern void ReplicationSlotCreate(const char *name, bool db_specific);
+extern void ReplicationSlotDrop(const char *name);
+extern void ReplicationSlotAcquire(const char *name);
+extern void ReplicationSlotRelease(void);
+extern void ReplicationSlotSave(void);
+
+/* misc stuff */
+extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
+extern void ReplicationSlotsComputeRequiredLSN(void);
+extern void StartupReplicationSlots(XLogRecPtr checkPointRedo);
+extern void CheckSlotRequirements(void);
+extern void ReplicationSlotAtProcExit(void);
+
+/* SQL callable functions */
+extern Datum pg_get_replication_slots(PG_FUNCTION_ARGS);
+
+#endif /* SLOT_H */
index 45079262740f7c8d93952502ccd4b841605e929e..cc638360810a3c8c33a0b4c0fee507839b2f582d 100644 (file)
@@ -125,7 +125,8 @@ extern LWLockPadded *MainLWLockArray;
 #define BackgroundWorkerLock           (&MainLWLockArray[33].lock)
 #define DynamicSharedMemoryControlLock         (&MainLWLockArray[34].lock)
 #define AutoFileLock                           (&MainLWLockArray[35].lock)
-#define NUM_INDIVIDUAL_LWLOCKS         36
+#define ReplicationSlotCtlLock         (&MainLWLockArray[36].lock)
+#define NUM_INDIVIDUAL_LWLOCKS         37
 
 /*
  * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
index 2947cc4af9f334cee986fff232c5389d8f25f408..f6c6685072dde8ad194c6e2316fcd13a585501ce 100644 (file)
@@ -77,4 +77,6 @@ extern void XidCacheRemoveRunningXids(TransactionId xid,
                                                  int nxids, const TransactionId *xids,
                                                  TransactionId latestXid);
 
+extern void ProcArraySetPeggedXmin(TransactionId xmin, bool already_locked);
+
 #endif   /* PROCARRAY_H */
index 9f089e3f4f9911fa2b439c68c65c2e5bc1be9c00..40d7cbd06096772ce43bdd77be2542a5ab4689e2 100644 (file)
@@ -1367,6 +1367,15 @@ pg_prepared_xacts| SELECT p.transaction,
    FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
    LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
    LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_replication_slots| SELECT l.slot_name,
+    l.slot_type,
+    l.datoid,
+    d.datname AS database,
+    l.active,
+    l.data_xmin,
+    l.restart_decoding_lsn
+   FROM (pg_get_replication_slots() l(slot_name, slot_type, datoid, active, data_xmin, restart_decoding_lsn)
+   LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
     pg_authid.rolinherit,