Don't call palloc() while holding a spinlock, either.
authorTom Lane <tgl@sss.pgh.pa.us>
Wed, 3 Jun 2020 16:36:00 +0000 (12:36 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Wed, 3 Jun 2020 16:36:23 +0000 (12:36 -0400)
Fix some more violations of the "only straight-line code inside a
spinlock" rule.  These are hazardous not only because they risk
holding the lock for an excessively long time, but because it's
possible for palloc to throw elog(ERROR), leaving a stuck spinlock
behind.

copy_replication_slot() had two separate places that did pallocs
while holding a spinlock.  We can make the code simpler and safer
by copying the whole ReplicationSlot struct into a local variable
while holding the spinlock, and then referencing that copy.
(While that's arguably more cycles than we really need to spend
holding the lock, the struct isn't all that big, and this way seems
far more maintainable than copying fields piecemeal.  Anyway this
is surely much cheaper than a palloc.)  That bug goes back to v12.

InvalidateObsoleteReplicationSlots() not only did a palloc while
holding a spinlock, but for extra sloppiness then leaked the memory
--- probably for the lifetime of the checkpointer process, though
I didn't try to verify that.  Fortunately that silliness is new
in HEAD.

pg_get_replication_slots() had a cosmetic violation of the rule,
in that it only assumed it's safe to call namecpy() while holding
a spinlock.  Still, that's a hazard waiting to bite somebody, and
there were some other cosmetic coding-rule violations in the same
function, so clean it up.  I back-patched this as far as v10; the
code exists before that but it looks different, and this didn't
seem important enough to adapt the patch further back.

Discussion: https://postgr.es/m/20200602.161518.1399689010416646074.horikyota.ntt@gmail.com

src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/include/replication/slot.h

index e1a2d8a7ae7cda7f921663c45f4eb6f2cf073177..505445f2dc840fd6d54e33473b325d6b38269f85 100644 (file)
@@ -1099,7 +1099,7 @@ restart:
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
                XLogRecPtr      restart_lsn = InvalidXLogRecPtr;
-               char       *slotname;
+               NameData        slotname;
 
                if (!s->in_use)
                        continue;
@@ -1112,7 +1112,7 @@ restart:
                        continue;
                }
 
-               slotname = pstrdup(NameStr(s->data.name));
+               slotname = s->data.name;
                restart_lsn = s->data.restart_lsn;
 
                SpinLockRelease(&s->mutex);
@@ -1120,7 +1120,8 @@ restart:
 
                for (;;)
                {
-                       int                     wspid = ReplicationSlotAcquire(slotname, SAB_Inquire);
+                       int                     wspid = ReplicationSlotAcquire(NameStr(slotname),
+                                                                                                          SAB_Inquire);
 
                        /* no walsender? success! */
                        if (wspid == 0)
@@ -1128,7 +1129,7 @@ restart:
 
                        ereport(LOG,
                                        (errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
-                                                       wspid, slotname)));
+                                                       wspid, NameStr(slotname))));
                        (void) kill(wspid, SIGTERM);
 
                        ConditionVariableTimedSleep(&s->active_cv, 10,
@@ -1138,7 +1139,7 @@ restart:
 
                ereport(LOG,
                                (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
-                                               slotname,
+                                               NameStr(slotname),
                                                (uint32) (restart_lsn >> 32),
                                                (uint32) restart_lsn)));
 
index 6fed3cfd23b077e260839ee3c7211dec9e3da653..1b929a603e51de5f196b6ed1470e9d40daf07518 100644 (file)
@@ -278,18 +278,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
        for (slotno = 0; slotno < max_replication_slots; slotno++)
        {
                ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+               ReplicationSlot slot_contents;
                Datum           values[PG_GET_REPLICATION_SLOTS_COLS];
                bool            nulls[PG_GET_REPLICATION_SLOTS_COLS];
-
-               ReplicationSlotPersistency persistency;
-               TransactionId xmin;
-               TransactionId catalog_xmin;
-               XLogRecPtr      restart_lsn;
-               XLogRecPtr      confirmed_flush_lsn;
-               pid_t           active_pid;
-               Oid                     database;
-               NameData        slot_name;
-               NameData        plugin;
                WALAvailability walstate;
                XLogSegNo       last_removed_seg;
                int                     i;
@@ -297,69 +288,61 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                if (!slot->in_use)
                        continue;
 
+               /* Copy slot contents while holding spinlock, then examine at leisure */
                SpinLockAcquire(&slot->mutex);
-
-               xmin = slot->data.xmin;
-               catalog_xmin = slot->data.catalog_xmin;
-               database = slot->data.database;
-               restart_lsn = slot->data.restart_lsn;
-               confirmed_flush_lsn = slot->data.confirmed_flush;
-               namecpy(&slot_name, &slot->data.name);
-               namecpy(&plugin, &slot->data.plugin);
-               active_pid = slot->active_pid;
-               persistency = slot->data.persistency;
-
+               slot_contents = *slot;
                SpinLockRelease(&slot->mutex);
 
+               memset(values, 0, sizeof(values));
                memset(nulls, 0, sizeof(nulls));
 
                i = 0;
-               values[i++] = NameGetDatum(&slot_name);
+               values[i++] = NameGetDatum(&slot_contents.data.name);
 
-               if (database == InvalidOid)
+               if (slot_contents.data.database == InvalidOid)
                        nulls[i++] = true;
                else
-                       values[i++] = NameGetDatum(&plugin);
+                       values[i++] = NameGetDatum(&slot_contents.data.plugin);
 
-               if (database == InvalidOid)
+               if (slot_contents.data.database == InvalidOid)
                        values[i++] = CStringGetTextDatum("physical");
                else
                        values[i++] = CStringGetTextDatum("logical");
 
-               if (database == InvalidOid)
+               if (slot_contents.data.database == InvalidOid)
                        nulls[i++] = true;
                else
-                       values[i++] = database;
+                       values[i++] = ObjectIdGetDatum(slot_contents.data.database);
 
-               values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
-               values[i++] = BoolGetDatum(active_pid != 0);
+               values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
+               values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
 
-               if (active_pid != 0)
-                       values[i++] = Int32GetDatum(active_pid);
+               if (slot_contents.active_pid != 0)
+                       values[i++] = Int32GetDatum(slot_contents.active_pid);
                else
                        nulls[i++] = true;
 
-               if (xmin != InvalidTransactionId)
-                       values[i++] = TransactionIdGetDatum(xmin);
+               if (slot_contents.data.xmin != InvalidTransactionId)
+                       values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
                else
                        nulls[i++] = true;
 
-               if (catalog_xmin != InvalidTransactionId)
-                       values[i++] = TransactionIdGetDatum(catalog_xmin);
+               if (slot_contents.data.catalog_xmin != InvalidTransactionId)
+                       values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
                else
                        nulls[i++] = true;
 
-               if (restart_lsn != InvalidXLogRecPtr)
-                       values[i++] = LSNGetDatum(restart_lsn);
+               if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
+                       values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
                else
                        nulls[i++] = true;
 
-               if (confirmed_flush_lsn != InvalidXLogRecPtr)
-                       values[i++] = LSNGetDatum(confirmed_flush_lsn);
+               if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
+                       values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
                else
                        nulls[i++] = true;
 
-               walstate = GetWALAvailability(restart_lsn);
+               walstate = GetWALAvailability(slot_contents.data.restart_lsn);
 
                switch (walstate)
                {
@@ -378,6 +361,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                        case WALAVAIL_REMOVED:
                                values[i++] = CStringGetTextDatum("lost");
                                break;
+
+                       default:
+                               elog(ERROR, "invalid walstate: %d", (int) walstate);
                }
 
                if (max_slot_wal_keep_size_mb >= 0 &&
@@ -393,8 +379,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                else
                        nulls[i++] = true;
 
+               Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
+
                tuplestore_putvalues(tupstore, tupdesc, values, nulls);
        }
+
        LWLockRelease(ReplicationSlotControlLock);
 
        tuplestore_donestoring(tupstore);
@@ -653,6 +642,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
        Name            src_name = PG_GETARG_NAME(0);
        Name            dst_name = PG_GETARG_NAME(1);
        ReplicationSlot *src = NULL;
+       ReplicationSlot first_slot_contents;
+       ReplicationSlot second_slot_contents;
        XLogRecPtr      src_restart_lsn;
        bool            src_islogical;
        bool            temporary;
@@ -692,13 +683,10 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
                if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
                {
+                       /* Copy the slot contents while holding spinlock */
                        SpinLockAcquire(&s->mutex);
-                       src_islogical = SlotIsLogical(s);
-                       src_restart_lsn = s->data.restart_lsn;
-                       temporary = s->data.persistency == RS_TEMPORARY;
-                       plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
+                       first_slot_contents = *s;
                        SpinLockRelease(&s->mutex);
-
                        src = s;
                        break;
                }
@@ -711,6 +699,11 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
                                 errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
 
+       src_islogical = SlotIsLogical(&first_slot_contents);
+       src_restart_lsn = first_slot_contents.data.restart_lsn;
+       temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
+       plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
+
        /* Check type of replication slot */
        if (src_islogical != logical_slot)
                ereport(ERROR,
@@ -775,18 +768,20 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
                /* Copy data of source slot again */
                SpinLockAcquire(&src->mutex);
-               copy_effective_xmin = src->effective_xmin;
-               copy_effective_catalog_xmin = src->effective_catalog_xmin;
+               second_slot_contents = *src;
+               SpinLockRelease(&src->mutex);
 
-               copy_xmin = src->data.xmin;
-               copy_catalog_xmin = src->data.catalog_xmin;
-               copy_restart_lsn = src->data.restart_lsn;
-               copy_confirmed_flush = src->data.confirmed_flush;
+               copy_effective_xmin = second_slot_contents.effective_xmin;
+               copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
+
+               copy_xmin = second_slot_contents.data.xmin;
+               copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
+               copy_restart_lsn = second_slot_contents.data.restart_lsn;
+               copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
 
                /* for existence check */
-               copy_name = pstrdup(NameStr(src->data.name));
-               copy_islogical = SlotIsLogical(src);
-               SpinLockRelease(&src->mutex);
+               copy_name = NameStr(second_slot_contents.data.name);
+               copy_islogical = SlotIsLogical(&second_slot_contents);
 
                /*
                 * Check if the source slot still exists and is valid. We regard it as
index 149210bae45a4fccfbeae94608fe3be0955d399d..e7649d4723e700f08c21e3d1cf1b10ce41f37290 100644 (file)
@@ -158,8 +158,8 @@ typedef struct ReplicationSlot
        XLogRecPtr      candidate_restart_lsn;
 } ReplicationSlot;
 
-#define SlotIsPhysical(slot) (slot->data.database == InvalidOid)
-#define SlotIsLogical(slot) (slot->data.database != InvalidOid)
+#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
+#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
 
 /*
  * Shared memory control area for all of replication slots.