author     Simon Riggs    2017-03-28 14:05:21 +0000
committer  Simon Riggs    2017-03-28 14:05:21 +0000
commit     ff539da31691f2cd2694360250571c5c5fb7415e (patch)
tree       494ffccc465ca33af0efd5dc764a169c66fb4bc1 /src/backend
parent     4d33a7f2e714848ca7b5b7ef8e244eead078ca13 (diff)
Cleanup slots during drop database
Automatically drop all logical replication slots associated with a database when the database is dropped. Previously we threw an ERROR if any slot existed for the database; now we throw an ERROR only if a slot is active in the database being dropped.

Craig Ringer
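For illustration, a rough sketch of the user-visible change at the SQL level (the database and slot names below are made up for this example, not taken from the commit):

    -- Create a database and an inactive logical slot bound to it.
    CREATE DATABASE testdb;
    -- ... connect to testdb, then:
    SELECT pg_create_logical_replication_slot('idle_slot', 'test_decoding');
    -- ... disconnect, connect to another database, then:
    DROP DATABASE testdb;
    -- Before this commit: ERROR, because a slot referenced testdb.
    -- After this commit: succeeds, and idle_slot is dropped automatically.
    -- Only a slot that is currently active still raises
    --   ERROR:  database "testdb" is used by an active logical replication slot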
Diffstat (limited to 'src/backend')
-rw-r--r--  src/backend/commands/dbcommands.c  | 32
-rw-r--r--  src/backend/replication/slot.c     | 88
2 files changed, 111 insertions, 9 deletions
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 5a63b1abcbe..c0ba2b451a7 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -845,19 +845,22 @@ dropdb(const char *dbname, bool missing_ok)
errmsg("cannot drop the currently open database")));
/*
- * Check whether there are, possibly unconnected, logical slots that refer
- * to the to-be-dropped database. The database lock we are holding
- * prevents the creation of new slots using the database.
+ * Check whether there are active logical slots that refer to the
+ * to-be-dropped database. The database lock we are holding prevents the
+ * creation of new slots using the database or existing slots becoming
+ * active.
*/
- if (ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active))
+ (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
+ if (nslots_active)
+ {
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("database \"%s\" is used by a logical replication slot",
+ errmsg("database \"%s\" is used by an active logical replication slot",
dbname),
- errdetail_plural("There is %d slot, %d of them active.",
- "There are %d slots, %d of them active.",
- nslots,
- nslots, nslots_active)));
+ errdetail_plural("There is %d active slot",
+ "There are %d active slots",
+ nslots_active, nslots_active)));
+ }
/*
* Check for other backends in the target database. (Because we hold the
@@ -915,6 +918,11 @@ dropdb(const char *dbname, bool missing_ok)
dropDatabaseDependencies(db_id);
/*
+ * Drop db-specific replication slots.
+ */
+ ReplicationSlotsDropDBSlots(db_id);
+
+ /*
* Drop pages for this database that are in the shared buffer cache. This
* is important to ensure that no remaining backend tries to write out a
* dirty buffer to the dead database later...
@@ -2124,11 +2132,17 @@ dbase_redo(XLogReaderState *record)
* InitPostgres() cannot fully re-execute concurrently. This
* avoids backends re-connecting automatically to same database,
* which can happen in some cases.
+ *
+ * This will lock out walsenders trying to connect to db-specific
+ * slots for logical decoding too, so it's safe for us to drop slots.
*/
LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
ResolveRecoveryConflictWithDatabase(xlrec->db_id);
}
+ /* Drop any database-specific replication slots */
+ ReplicationSlotsDropDBSlots(xlrec->db_id);
+
/* Drop pages for this database that are in the shared buffer cache */
DropDatabaseBuffers(xlrec->db_id);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 5237a9fb078..6c5ec7a00e8 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -796,6 +796,94 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
return false;
}
+/*
+ * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
+ * passed database oid. The caller should hold an exclusive lock on the
+ * pg_database oid for the database to prevent creation of new slots on the db
+ * or replay from existing slots.
+ *
+ * This routine isn't as efficient as it could be - but we don't drop databases
+ * often, especially databases with lots of slots.
+ *
+ * Another session that concurrently acquires an existing slot on the target DB
+ * (most likely to drop it) may cause this function to ERROR. If that happens
+ * it may have dropped some but not all slots.
+ */
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+ int i;
+
+ if (max_replication_slots <= 0)
+ return;
+
+restart:
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s;
+ NameData slotname;
+ int active_pid;
+
+ s = &ReplicationSlotCtl->replication_slots[i];
+
+ /* cannot change while ReplicationSlotControlLock is held */
+ if (!s->in_use)
+ continue;
+
+ /* only logical slots are database specific, skip */
+ if (!SlotIsLogical(s))
+ continue;
+
+ /* not our database, skip */
+ if (s->data.database != dboid)
+ continue;
+
+ /* Claim the slot, as if ReplicationSlotAcquire()ing. */
+ SpinLockAcquire(&s->mutex);
+ strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
+ NameStr(slotname)[NAMEDATALEN-1] = '\0';
+ active_pid = s->active_pid;
+ if (active_pid == 0)
+ {
+ MyReplicationSlot = s;
+ s->active_pid = MyProcPid;
+ }
+ SpinLockRelease(&s->mutex);
+
+ /*
+ * We might fail here if the slot was active. Even though we hold an
+ * exclusive lock on the database object a logical slot for that DB can
+ * still be active if it's being dropped by a backend connected to
+ * another DB or is otherwise acquired.
+ *
+ * It's an unlikely race that'll only arise from concurrent user action,
+ * so we'll just bail out.
+ */
+ if (active_pid)
+ elog(ERROR, "replication slot %s is in use by pid %d",
+ NameStr(slotname), active_pid);
+
+ /*
+ * To avoid largely duplicating ReplicationSlotDropAcquired() or
+ * complicating it with already_locked flags for ProcArrayLock,
+ * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
+ * just release our ReplicationSlotControlLock to drop the slot.
+ *
+ * For safety we'll restart our scan from the beginning each
+ * time we release the lock.
+ */
+ LWLockRelease(ReplicationSlotControlLock);
+ ReplicationSlotDropAcquired();
+ goto restart;
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /* recompute limits once after all slots are dropped */
+ ReplicationSlotsComputeRequiredXmin(false);
+ ReplicationSlotsComputeRequiredLSN();
+}
+
/*
* Check whether the server's configuration supports using replication