| author | Simon Riggs | 2017-03-28 14:05:21 +0000 |
|---|---|---|
| committer | Simon Riggs | 2017-03-28 14:05:21 +0000 |
| commit | ff539da31691f2cd2694360250571c5c5fb7415e | |
| tree | 494ffccc465ca33af0efd5dc764a169c66fb4bc1 | |
| parent | 4d33a7f2e714848ca7b5b7ef8e244eead078ca13 | |
Cleanup slots during drop database
Automatically drop all logical replication slots associated with a
database when the database is dropped. Previously we threw an ERROR
if a slot existed. Now we throw ERROR only if a slot is active in
the database being dropped.
Craig Ringer
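For illustration only, the behaviour change described above corresponds roughly to the following session; the database and slot names are hypothetical and not part of the commit:

```sql
-- Hypothetical names, used only to illustrate the behaviour change.
-- While connected to database "appdb", create a logical slot, then disconnect;
-- the slot remains but is not active:
SELECT pg_create_logical_replication_slot('appdb_slot', 'test_decoding');

-- Later, from a different database:
DROP DATABASE appdb;
-- Before this commit: fails with
--   ERROR:  database "appdb" is used by a logical replication slot
-- After this commit: succeeds, and appdb_slot is dropped along with the database.
-- DROP DATABASE still fails if a slot in the target database is currently active.
```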
Diffstat (limited to 'src/backend')
| -rw-r--r-- | src/backend/commands/dbcommands.c | 32 |
| -rw-r--r-- | src/backend/replication/slot.c | 88 |
2 files changed, 111 insertions, 9 deletions
```diff
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 5a63b1abcbe..c0ba2b451a7 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -845,19 +845,22 @@ dropdb(const char *dbname, bool missing_ok)
 				 errmsg("cannot drop the currently open database")));
 
 	/*
-	 * Check whether there are, possibly unconnected, logical slots that refer
-	 * to the to-be-dropped database. The database lock we are holding
-	 * prevents the creation of new slots using the database.
+	 * Check whether there are active logical slots that refer to the
+	 * to-be-dropped database. The database lock we are holding prevents the
+	 * creation of new slots using the database or existing slots becoming
+	 * active.
 	 */
-	if (ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active))
+	(void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
+	if (nslots_active)
+	{
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
-				 errmsg("database \"%s\" is used by a logical replication slot",
+				 errmsg("database \"%s\" is used by an active logical replication slot",
 						dbname),
-				 errdetail_plural("There is %d slot, %d of them active.",
-								  "There are %d slots, %d of them active.",
-								  nslots,
-								  nslots, nslots_active)));
+				 errdetail_plural("There is %d active slot",
+								  "There are %d active slots",
+								  nslots_active, nslots_active)));
+	}
 
 	/*
 	 * Check for other backends in the target database.  (Because we hold the
@@ -915,6 +918,11 @@ dropdb(const char *dbname, bool missing_ok)
 	dropDatabaseDependencies(db_id);
 
 	/*
+	 * Drop db-specific replication slots.
+	 */
+	ReplicationSlotsDropDBSlots(db_id);
+
+	/*
 	 * Drop pages for this database that are in the shared buffer cache. This
 	 * is important to ensure that no remaining backend tries to write out a
 	 * dirty buffer to the dead database later...
@@ -2124,11 +2132,17 @@ dbase_redo(XLogReaderState *record)
 		 * InitPostgres() cannot fully re-execute concurrently. This
 		 * avoids backends re-connecting automatically to same database,
 		 * which can happen in some cases.
+		 *
+		 * This will lock out walsenders trying to connect to db-specific
+		 * slots for logical decoding too, so it's safe for us to drop slots.
 		 */
 		LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0,
 								   AccessExclusiveLock);
 		ResolveRecoveryConflictWithDatabase(xlrec->db_id);
 	}
 
+	/* Drop any database-specific replication slots */
+	ReplicationSlotsDropDBSlots(xlrec->db_id);
+
 	/* Drop pages for this database that are in the shared buffer cache */
 	DropDatabaseBuffers(xlrec->db_id);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 5237a9fb078..6c5ec7a00e8 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -796,6 +796,94 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 	return false;
 }
 
+/*
+ * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
+ * passed database oid. The caller should hold an exclusive lock on the
+ * pg_database oid for the database to prevent creation of new slots on the db
+ * or replay from existing slots.
+ *
+ * This routine isn't as efficient as it could be - but we don't drop databases
+ * often, especially databases with lots of slots.
+ *
+ * Another session that concurrently acquires an existing slot on the target DB
+ * (most likely to drop it) may cause this function to ERROR. If that happens
+ * it may have dropped some but not all slots.
+ */
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+	int			i;
+
+	if (max_replication_slots <= 0)
+		return;
+
+restart:
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+		NameData	slotname;
+		int			active_pid;
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* cannot change while ReplicationSlotCtlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* only logical slots are database specific, skip */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* not our database, skip */
+		if (s->data.database != dboid)
+			continue;
+
+		/* Claim the slot, as if ReplicationSlotAcquire()ing. */
+		SpinLockAcquire(&s->mutex);
+		strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
+		NameStr(slotname)[NAMEDATALEN-1] = '\0';
+		active_pid = s->active_pid;
+		if (active_pid == 0)
+		{
+			MyReplicationSlot = s;
+			s->active_pid = MyProcPid;
+		}
+		SpinLockRelease(&s->mutex);
+
+		/*
+		 * We might fail here if the slot was active. Even though we hold an
+		 * exclusive lock on the database object a logical slot for that DB can
+		 * still be active if it's being dropped by a backend connected to
+		 * another DB or is otherwise acquired.
+		 *
+		 * It's an unlikely race that'll only arise from concurrent user action,
+		 * so we'll just bail out.
+		 */
+		if (active_pid)
+			elog(ERROR, "replication slot %s is in use by pid %d",
+				 NameStr(slotname), active_pid);
+
+		/*
+		 * To avoid largely duplicating ReplicationSlotDropAcquired() or
+		 * complicating it with already_locked flags for ProcArrayLock,
+		 * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
+		 * just release our ReplicationSlotControlLock to drop the slot.
+		 *
+		 * For safety we'll restart our scan from the beginning each
+		 * time we release the lock.
+		 */
+		LWLockRelease(ReplicationSlotControlLock);
+		ReplicationSlotDropAcquired();
+		goto restart;
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/* recompute limits once after all slots are dropped */
+	ReplicationSlotsComputeRequiredXmin(false);
+	ReplicationSlotsComputeRequiredLSN();
+}
+
 /*
  * Check whether the server's configuration supports using replication
```
