replication slot checkpointing
authorAndres Freund <andres@anarazel.de>
Tue, 28 Jan 2014 19:52:23 +0000 (20:52 +0100)
committerAndres Freund <andres@anarazel.de>
Tue, 28 Jan 2014 20:17:33 +0000 (21:17 +0100)
src/backend/access/transam/xlog.c
src/backend/replication/slot.c
src/backend/storage/lmgr/lwlock.c
src/include/replication/slot.h

index 63a00e378b2f65e5c9aff267664d245a0b0c8bb4..a77ff5fe5e9aade1f949fecfa8091f5718bb8258 100644 (file)
@@ -8672,6 +8672,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
        CheckPointMultiXact();
        CheckPointPredicate();
        CheckPointRelationMap();
+       CheckPointReplicationSlots();
        CheckPointBuffers(flags);       /* performs all required fsyncs */
        /* We deliberately delay 2PC checkpointing as long as possible */
        CheckPointTwoPhase(checkPointRedo);
index 322b79a9c988f80489b141dce24adc86ceb06eab..324d90b28e213d4f67d89ac42d739be464baf1cc 100644 (file)
@@ -133,6 +133,7 @@ ReplicationSlotsShmemInit(void)
 
                        /* everything else is zeroed by the memset above */
                        SpinLockInit(&slot->mutex);
+                       slot->io_in_progress_lock = LWLockAssign();
                }
        }
 }
@@ -334,16 +335,6 @@ ReplicationSlotAcquire(const char *name)
 
        /* We made this slot active, so it's ours now. */
        MyReplicationSlot = slot;
-
-       /*
-        * XXX. There used to be code to save the state here with the following
-        * comment: "We don't really need to save here, but doing so guarantees
-        * the slot is in a good state."  It's not clear to me what is meant by
-        * a good state, and it seems like the slot had better already be in
-        * such a state, or we're screwed anyway.  From the point of view of
-        * crash recovery, merely acquiring the slot (without updating its
-        * contents) is a non-event.
-        */
 }
 
 /*
@@ -619,6 +610,41 @@ string_endswith(const char *str, const char *end)
        return strcmp(str, end) == 0;
 }
 
+/*
+ * Flush all replication slots to disk.
+ *
+ * This needn't actually be part of a checkpoint, but it's a convenient
+ * location.
+ */
+void
+CheckPointReplicationSlots(void)
+{
+       int                     i;
+
+       ereport(DEBUG1, (errmsg("performing replication slot checkpoint")));
+
+       /*
+        * Prevent any slot from being created/dropped while we're active. As we
+        * explicitly do *not* want to block iterating over replication_slots or
+        * acquiring a slot we cannot take the control lock.
+        */
+       LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
+
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+               char            path[MAXPGPATH];
+
+               if (!s->in_use)
+                       continue;
+
+               /* save the slot to disk, locking is handled in SaveSlotToPath() */
+               sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
+               SaveSlotToPath(s, path);
+       }
+       LWLockRelease(ReplicationSlotAllocationLock);
+}
+
 /*
  * Load all replication slots from disk into memory at server startup. This
  * needs to be run before we start crash recovery.
@@ -691,6 +717,12 @@ CreateSlotOnDisk(ReplicationSlot *slot)
        char            tmppath[MAXPGPATH];
        char            path[MAXPGPATH];
 
+       /*
+        * No need to take out the io_in_progress_lock, nobody else can see this
+        * slot yet, so nobody else wil write. We're reusing SaveSlotToPath which
+        * takes out the lock, if we'd take the lock here, we'd deadlock.
+        */
+
        sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
        sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
 
@@ -721,7 +753,6 @@ CreateSlotOnDisk(ReplicationSlot *slot)
        fsync_fname("pg_replslot", true);
 
        END_CRIT_SECTION();
-
 }
 
 /*
@@ -735,6 +766,8 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir)
        int                     fd;
        ReplicationSlotOnDisk cp;
 
+       LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE);
+
        /* silence valgrind :( */
        memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
 
@@ -757,12 +790,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir)
 
        SpinLockAcquire(&slot->mutex);
 
-       cp.slotdata.data_xmin = slot->data.data_xmin;
-
-       strcpy(NameStr(cp.slotdata.name), NameStr(slot->data.name));
-
-       cp.slotdata.database = slot->data.database;
-       cp.slotdata.restart_lsn = slot->data.restart_lsn;
+       memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
 
        SpinLockRelease(&slot->mutex);
 
@@ -808,6 +836,8 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir)
        fsync_fname("pg_replslot", true);
 
        END_CRIT_SECTION();
+
+       LWLockRelease(slot->io_in_progress_lock);
 }
 
 /*
@@ -824,6 +854,8 @@ RestoreSlotFromDisk(const char *name)
        int                     readBytes;
        pg_crc32        checksum;
 
+       /* no need to lock here, no concurrent access allowed yet */
+
        /* delete temp file if it exists */
        sprintf(path, "pg_replslot/%s/state.tmp", name);
        if (unlink(path) < 0 && errno != ENOENT)
index 55d9d7837cabdc019b7f58237ebf8967ddb406d7..82ef44094948afa741c8e40c928395ab2d4a0842 100644 (file)
@@ -27,6 +27,7 @@
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
+#include "replication/slot.h"
 #include "storage/ipc.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -238,6 +239,9 @@ NumLWLocks(void)
        /* predicate.c needs one per old serializable xid buffer */
        numLocks += NUM_OLDSERXID_BUFFERS;
 
+       /* slot.c needs one for each slot */
+       numLocks += max_replication_slots;
+
        /*
         * Add any requested by loadable modules; for backwards-compatibility
         * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
index dffbc5462fa8e383112262b0bfc164f826f08f0a..a332eff3dc296d26bb578977d2c7ade5d66ab8ba 100644 (file)
@@ -12,6 +12,7 @@
 #include "fmgr.h"
 #include "access/xlog.h"
 #include "access/xlogreader.h"
+#include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
 
@@ -57,12 +58,17 @@ typedef struct ReplicationSlot
        /* is somebody streaming out changes for this slot */
        bool            active;
 
+       /* any outstanding modifications? */
+       bool            dirty;
+
        /* data surviving shutdowns and crashes */
        ReplicationSlotPersistentData data;
 
        /* in-memory xmin horizon, updated after syncing to disk, used for computations */
        TransactionId effective_data_xmin;
 
+       /* is somebody performing io on this slot? */
+       LWLock     *io_in_progress_lock;
 } ReplicationSlot;
 
 /*
@@ -98,6 +104,8 @@ extern bool ReplicationSlotValidateName(const char *name, int elevel);
 extern void ReplicationSlotsComputeRequiredXmin(void);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern void StartupReplicationSlots(XLogRecPtr checkPointRedo);
+extern void CheckPointReplicationSlots(void);
+
 extern void CheckSlotRequirements(void);
 extern void ReplicationSlotAtProcExit(void);