Use atomics to avoid locking in InjectionPointRun()
author Heikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 15 Jul 2024 07:21:16 +0000 (10:21 +0300)
committer Heikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 15 Jul 2024 07:22:11 +0000 (10:22 +0300)
This allows using injection points without having a PGPROC, like early
at backend startup, or in the postmaster.

The injection points facility is new in v17, so backpatch there.

Reviewed-by: Michael Paquier <michael@paquier.xyz>
Discussion: https://www.postgresql.org/message-id/4317a7f7-8d24-435e-9e49-29b72a3dc418@iki.fi

src/backend/utils/misc/injection_point.c
src/tools/pgindent/typedefs.list

index 48f29e9b60ab79390187921262abec6245c16ff3..84ad5e470d7eb723bf0a8501033ba35b08970c4f 100644 (file)
@@ -21,7 +21,6 @@
 
 #include "fmgr.h"
 #include "miscadmin.h"
-#include "port/pg_bitutils.h"
 #include "storage/fd.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 
 #ifdef USE_INJECTION_POINTS
 
-/*
- * Hash table for storing injection points.
- *
- * InjectionPointHash is used to find an injection point by name.
- */
-static HTAB *InjectionPointHash;       /* find points from names */
-
 /* Field sizes */
 #define INJ_NAME_MAXLEN                64
 #define INJ_LIB_MAXLEN         128
 #define INJ_FUNC_MAXLEN                128
 #define INJ_PRIVATE_MAXLEN     1024
 
-/* Single injection point stored in InjectionPointHash */
+/* Single injection point stored in shared memory */
 typedef struct InjectionPointEntry
 {
+       /*
+        * Because injection points need to be usable without LWLocks, we use a
+        * generation counter on each entry to allow safe, lock-free reading.
+        *
+        * To read an entry, first read the current 'generation' value.  If it's
+        * even, then the slot is currently unused, and odd means it's in use.
+        * When reading the other fields, beware that they may change while
+        * reading them, if the entry is released and reused!  After reading the
+        * other fields, read 'generation' again: if its value hasn't changed, you
+        * can be certain that the other fields you read are valid.  Otherwise,
+        * the slot was concurrently recycled, and you should ignore it.
+        *
+        * When adding an entry, you must store all the other fields first, and
+        * then update the generation number, with an appropriate memory barrier
+        * in between. In addition to that protocol, you must also hold
+        * InjectionPointLock, to prevent two backends from modifying the array at
+        * the same time.
+        */
+       pg_atomic_uint64 generation;
+
        char            name[INJ_NAME_MAXLEN];  /* hash key */
        char            library[INJ_LIB_MAXLEN];        /* library */
        char            function[INJ_FUNC_MAXLEN];      /* function */
@@ -58,8 +70,22 @@ typedef struct InjectionPointEntry
        char            private_data[INJ_PRIVATE_MAXLEN];
 } InjectionPointEntry;
 
-#define INJECTION_POINT_HASH_INIT_SIZE 16
-#define INJECTION_POINT_HASH_MAX_SIZE  128
+#define MAX_INJECTION_POINTS   128
+
+/*
+ * Shared memory array of active injection points.
+ *
+ * 'max_inuse' is the highest index currently in use, plus one.  It's just an
+ * optimization to avoid scanning through the whole array, in the common case
+ * that there are no injection points, or only a few.
+ */
+typedef struct InjectionPointsCtl
+{
+       pg_atomic_uint32 max_inuse;
+       InjectionPointEntry entries[MAX_INJECTION_POINTS];
+} InjectionPointsCtl;
+
+static InjectionPointsCtl *ActiveInjectionPoints;
 
 /*
  * Backend local cache of injection callbacks already loaded, stored in
@@ -70,6 +96,14 @@ typedef struct InjectionPointCacheEntry
        char            name[INJ_NAME_MAXLEN];
        char            private_data[INJ_PRIVATE_MAXLEN];
        InjectionPointCallback callback;
+
+       /*
+        * Shmem slot and copy of its generation number when this cache entry was
+        * created.  They can be used to validate if the cached entry is still
+        * valid.
+        */
+       int                     slot_idx;
+       uint64          generation;
 } InjectionPointCacheEntry;
 
 static HTAB *InjectionPointCache = NULL;
@@ -79,8 +113,10 @@ static HTAB *InjectionPointCache = NULL;
  *
  * Add an injection point to the local cache.
  */
-static void
+static InjectionPointCacheEntry *
 injection_point_cache_add(const char *name,
+                                                 int slot_idx,
+                                                 uint64 generation,
                                                  InjectionPointCallback callback,
                                                  const void *private_data)
 {
@@ -97,7 +133,7 @@ injection_point_cache_add(const char *name,
                hash_ctl.hcxt = TopMemoryContext;
 
                InjectionPointCache = hash_create("InjectionPoint cache hash",
-                                                                                 INJECTION_POINT_HASH_MAX_SIZE,
+                                                                                 MAX_INJECTION_POINTS,
                                                                                  &hash_ctl,
                                                                                  HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
        }
@@ -107,9 +143,12 @@ injection_point_cache_add(const char *name,
 
        Assert(!found);
        strlcpy(entry->name, name, sizeof(entry->name));
+       entry->slot_idx = slot_idx;
+       entry->generation = generation;
        entry->callback = callback;
-       if (private_data != NULL)
-               memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN);
+       memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN);
+
+       return entry;
 }
 
 /*
@@ -122,11 +161,10 @@ injection_point_cache_add(const char *name,
 static void
 injection_point_cache_remove(const char *name)
 {
-       /* leave if no cache */
-       if (InjectionPointCache == NULL)
-               return;
+       bool            found PG_USED_FOR_ASSERTS_ONLY;
 
-       (void) hash_search(InjectionPointCache, name, HASH_REMOVE, NULL);
+       (void) hash_search(InjectionPointCache, name, HASH_REMOVE, &found);
+       Assert(found);
 }
 
 /*
@@ -134,29 +172,32 @@ injection_point_cache_remove(const char *name)
  *
  * Load an injection point into the local cache.
  */
-static void
-injection_point_cache_load(InjectionPointEntry *entry_by_name)
+static InjectionPointCacheEntry *
+injection_point_cache_load(InjectionPointEntry *entry, int slot_idx, uint64 generation)
 {
        char            path[MAXPGPATH];
        void       *injection_callback_local;
 
        snprintf(path, MAXPGPATH, "%s/%s%s", pkglib_path,
-                        entry_by_name->library, DLSUFFIX);
+                        entry->library, DLSUFFIX);
 
        if (!pg_file_exists(path))
                elog(ERROR, "could not find library \"%s\" for injection point \"%s\"",
-                        path, entry_by_name->name);
+                        path, entry->name);
 
        injection_callback_local = (void *)
-               load_external_function(path, entry_by_name->function, false, NULL);
+               load_external_function(path, entry->function, false, NULL);
 
        if (injection_callback_local == NULL)
                elog(ERROR, "could not find function \"%s\" in library \"%s\" for injection point \"%s\"",
-                        entry_by_name->function, path, entry_by_name->name);
-
-       /* add it to the local cache when found */
-       injection_point_cache_add(entry_by_name->name, injection_callback_local,
-                                                         entry_by_name->private_data);
+                        entry->function, path, entry->name);
+
+       /* add it to the local cache */
+       return injection_point_cache_add(entry->name,
+                                                                        slot_idx,
+                                                                        generation,
+                                                                        injection_callback_local,
+                                                                        entry->private_data);
 }
 
 /*
@@ -193,8 +234,7 @@ InjectionPointShmemSize(void)
 #ifdef USE_INJECTION_POINTS
        Size            sz = 0;
 
-       sz = add_size(sz, hash_estimate_size(INJECTION_POINT_HASH_MAX_SIZE,
-                                                                                sizeof(InjectionPointEntry)));
+       sz = add_size(sz, sizeof(InjectionPointsCtl));
        return sz;
 #else
        return 0;
@@ -208,16 +248,20 @@ void
 InjectionPointShmemInit(void)
 {
 #ifdef USE_INJECTION_POINTS
-       HASHCTL         info;
-
-       /* key is a NULL-terminated string */
-       info.keysize = sizeof(char[INJ_NAME_MAXLEN]);
-       info.entrysize = sizeof(InjectionPointEntry);
-       InjectionPointHash = ShmemInitHash("InjectionPoint hash",
-                                                                          INJECTION_POINT_HASH_INIT_SIZE,
-                                                                          INJECTION_POINT_HASH_MAX_SIZE,
-                                                                          &info,
-                                                                          HASH_ELEM | HASH_FIXED_SIZE | HASH_STRINGS);
+       bool            found;
+
+       ActiveInjectionPoints = ShmemInitStruct("InjectionPoint hash",
+                                                                                       sizeof(InjectionPointsCtl),
+                                                                                       &found);
+       if (!IsUnderPostmaster)
+       {
+               Assert(!found);
+               pg_atomic_init_u32(&ActiveInjectionPoints->max_inuse, 0);
+               for (int i = 0; i < MAX_INJECTION_POINTS; i++)
+                       pg_atomic_init_u64(&ActiveInjectionPoints->entries[i].generation, 0);
+       }
+       else
+               Assert(found);
 #endif
 }
 
@@ -232,8 +276,10 @@ InjectionPointAttach(const char *name,
                                         int private_data_size)
 {
 #ifdef USE_INJECTION_POINTS
-       InjectionPointEntry *entry_by_name;
-       bool            found;
+       InjectionPointEntry *entry;
+       uint64          generation;
+       uint32          max_inuse;
+       int                     free_idx;
 
        if (strlen(name) >= INJ_NAME_MAXLEN)
                elog(ERROR, "injection point name %s too long (maximum of %u)",
@@ -253,21 +299,51 @@ InjectionPointAttach(const char *name,
         * exist.  For testing purposes this should be fine.
         */
        LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE);
-       entry_by_name = (InjectionPointEntry *)
-               hash_search(InjectionPointHash, name,
-                                       HASH_ENTER, &found);
-       if (found)
-               elog(ERROR, "injection point \"%s\" already defined", name);
+       max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+       free_idx = -1;
+
+       for (int idx = 0; idx < max_inuse; idx++)
+       {
+               entry = &ActiveInjectionPoints->entries[idx];
+               generation = pg_atomic_read_u64(&entry->generation);
+               if (generation % 2 == 0)
+               {
+                       /*
+                        * Remember the first free slot, but keep scanning for duplicates.
+                        * Skip the name check: a detached slot still holds its old name.
+                        */
+                       if (free_idx == -1)
+                               free_idx = idx;
+                       continue;
+               }
+               if (strcmp(entry->name, name) == 0)
+                       elog(ERROR, "injection point \"%s\" already defined", name);
+       }
+       if (free_idx == -1)
+       {
+               if (max_inuse == MAX_INJECTION_POINTS)
+                       elog(ERROR, "too many injection points");
+               free_idx = max_inuse;
+       }
+       entry = &ActiveInjectionPoints->entries[free_idx];
+       generation = pg_atomic_read_u64(&entry->generation);
+       Assert(generation % 2 == 0);
 
        /* Save the entry */
-       strlcpy(entry_by_name->name, name, sizeof(entry_by_name->name));
-       entry_by_name->name[INJ_NAME_MAXLEN - 1] = '\0';
-       strlcpy(entry_by_name->library, library, sizeof(entry_by_name->library));
-       entry_by_name->library[INJ_LIB_MAXLEN - 1] = '\0';
-       strlcpy(entry_by_name->function, function, sizeof(entry_by_name->function));
-       entry_by_name->function[INJ_FUNC_MAXLEN - 1] = '\0';
+       strlcpy(entry->name, name, sizeof(entry->name));
+       entry->name[INJ_NAME_MAXLEN - 1] = '\0';
+       strlcpy(entry->library, library, sizeof(entry->library));
+       entry->library[INJ_LIB_MAXLEN - 1] = '\0';
+       strlcpy(entry->function, function, sizeof(entry->function));
+       entry->function[INJ_FUNC_MAXLEN - 1] = '\0';
        if (private_data != NULL)
-               memcpy(entry_by_name->private_data, private_data, private_data_size);
+               memcpy(entry->private_data, private_data, private_data_size);
+
+       pg_write_barrier();
+       pg_atomic_write_u64(&entry->generation, generation + 1);
+
+       if (free_idx + 1 > max_inuse)
+               pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, free_idx + 1);
 
        LWLockRelease(InjectionPointLock);
 
@@ -285,63 +361,177 @@ bool
 InjectionPointDetach(const char *name)
 {
 #ifdef USE_INJECTION_POINTS
-       bool            found;
+       bool            found = false;
+       int                     idx;
+       int                     max_inuse;
 
        LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE);
-       hash_search(InjectionPointHash, name, HASH_REMOVE, &found);
-       LWLockRelease(InjectionPointLock);
 
-       if (!found)
-               return false;
+       /* Find it in the shmem array, and mark the slot as unused */
+       max_inuse = (int) pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+       for (idx = max_inuse - 1; idx >= 0; --idx)
+       {
+               InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+               uint64          generation;
+
+               generation = pg_atomic_read_u64(&entry->generation);
+               if (generation % 2 == 0)
+                       continue;                       /* empty slot */
+
+               if (strcmp(entry->name, name) == 0)
+               {
+                       Assert(!found);
+                       found = true;
+                       pg_atomic_write_u64(&entry->generation, generation + 1);
+                       break;
+               }
+       }
+
+       /* If we just removed the highest-numbered entry, update 'max_inuse' */
+       if (found && idx == max_inuse - 1)
+       {
+               for (; idx >= 0; --idx)
+               {
+                       InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+                       uint64          generation;
+
+                       generation = pg_atomic_read_u64(&entry->generation);
+                       if (generation % 2 != 0)
+                               break;
+               }
+               pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, idx + 1);
+       }
+       LWLockRelease(InjectionPointLock);
 
-       return true;
+       return found;
 #else
        elog(ERROR, "Injection points are not supported by this build");
        return true;                            /* silence compiler */
 #endif
 }
 
+#ifdef USE_INJECTION_POINTS
 /*
- * Load an injection point into the local cache.
+ * Common workhorse of InjectionPointRun() and InjectionPointLoad()
  *
- * This is useful to be able to load an injection point before running it,
- * especially if the injection point is called in a code path where memory
- * allocations cannot happen, like critical sections.
+ * Checks if an injection point exists in shared memory, and updates
+ * the local cache entry accordingly.
  */
-void
-InjectionPointLoad(const char *name)
+static InjectionPointCacheEntry *
+InjectionPointCacheRefresh(const char *name)
 {
-#ifdef USE_INJECTION_POINTS
-       InjectionPointEntry *entry_by_name;
-       bool            found;
+       uint32          max_inuse;
+       int                     namelen;
+       InjectionPointEntry local_copy;
+       InjectionPointCacheEntry *cached;
 
-       LWLockAcquire(InjectionPointLock, LW_SHARED);
-       entry_by_name = (InjectionPointEntry *)
-               hash_search(InjectionPointHash, name,
-                                       HASH_FIND, &found);
+       /*
+        * First read the number of in-use slots.  More entries can be added or
+        * existing ones can be removed while we're reading them.  If the entry
+        * we're looking for is concurrently added or removed, we might or might
+        * not see it.  That's OK.
+        */
+       max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+       if (max_inuse == 0)
+       {
+               if (InjectionPointCache)
+               {
+                       hash_destroy(InjectionPointCache);
+                       InjectionPointCache = NULL;
+               }
+               return NULL;
+       }
 
        /*
-        * If not found, do nothing and remove it from the local cache if it
-        * existed there.
+        * If we have this entry in the local cache already, check if the cached
+        * entry is still valid.
         */
-       if (!found)
+       cached = injection_point_cache_get(name);
+       if (cached)
        {
+               int                     idx = cached->slot_idx;
+               InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+
+               if (pg_atomic_read_u64(&entry->generation) == cached->generation)
+               {
+                       /* still good */
+                       return cached;
+               }
                injection_point_cache_remove(name);
-               LWLockRelease(InjectionPointLock);
-               return;
+               cached = NULL;
        }
 
-       /* Check first the local cache, and leave if this entry exists. */
-       if (injection_point_cache_get(name) != NULL)
+       /*
+        * Search the shared memory array.
+        *
+        * It's possible that the entry we're looking for is concurrently detached
+        * or attached.  Or detached *and* re-attached, to the same slot or a
+        * different slot.  Detach and re-attach is not an atomic operation, so
+        * it's OK for us to return the old value, NULL, or the new value in such
+        * cases.
+        */
+       namelen = strlen(name);
+       for (int idx = 0; idx < max_inuse; idx++)
        {
-               LWLockRelease(InjectionPointLock);
-               return;
+               InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+               uint64          generation;
+
+               /*
+                * Read the generation number so that we can detect concurrent
+                * modifications.  The read barrier ensures that the generation number
+                * is loaded before any of the other fields.
+                */
+               generation = pg_atomic_read_u64(&entry->generation);
+               if (generation % 2 == 0)
+                       continue;                       /* empty slot */
+               pg_read_barrier();
+
+               /* Is this the injection point we're looking for? */
+               if (memcmp(entry->name, name, namelen + 1) != 0)
+                       continue;
+
+               /*
+                * The entry can change at any time, if the injection point is
+                * concurrently detached.  Copy it to local memory, and re-check the
+                * generation.  If the generation hasn't changed, we know our local
+                * copy is coherent.
+                */
+               memcpy(&local_copy, entry, sizeof(InjectionPointEntry));
+
+               pg_read_barrier();
+               if (pg_atomic_read_u64(&entry->generation) != generation)
+               {
+                       /*
+                        * The entry was concurrently detached.
+                        *
+                        * Continue the search, because if the generation number changed,
+                        * we cannot trust the result of the name comparison we did above.
+                        * It's theoretically possible that it falsely matched a mixed-up
+                        * state of the old and new name, if the slot was recycled with a
+                        * different name.
+                        */
+                       continue;
+               }
+
+               /* Success! Load it into the cache and return it */
+               return injection_point_cache_load(&local_copy, idx, generation);
        }
+       return NULL;
+}
+#endif
 
-       /* Nothing?  Then load it and leave */
-       injection_point_cache_load(entry_by_name);
-
-       LWLockRelease(InjectionPointLock);
+/*
+ * Load an injection point into the local cache.
+ *
+ * This is useful to be able to load an injection point before running it,
+ * especially if the injection point is called in a code path where memory
+ * allocations cannot happen, like critical sections.
+ */
+void
+InjectionPointLoad(const char *name)
+{
+#ifdef USE_INJECTION_POINTS
+       InjectionPointCacheRefresh(name);
 #else
        elog(ERROR, "Injection points are not supported by this build");
 #endif
@@ -349,50 +539,16 @@ InjectionPointLoad(const char *name)
 
 /*
  * Execute an injection point, if defined.
- *
- * Check first the shared hash table, and adapt the local cache depending
- * on that as it could be possible that an entry to run has been removed.
  */
 void
 InjectionPointRun(const char *name)
 {
 #ifdef USE_INJECTION_POINTS
-       InjectionPointEntry *entry_by_name;
-       bool            found;
        InjectionPointCacheEntry *cache_entry;
 
-       LWLockAcquire(InjectionPointLock, LW_SHARED);
-       entry_by_name = (InjectionPointEntry *)
-               hash_search(InjectionPointHash, name,
-                                       HASH_FIND, &found);
-
-       /*
-        * If not found, do nothing and remove it from the local cache if it
-        * existed there.
-        */
-       if (!found)
-       {
-               injection_point_cache_remove(name);
-               LWLockRelease(InjectionPointLock);
-               return;
-       }
-
-       /*
-        * Check if the callback exists in the local cache, to avoid unnecessary
-        * external loads.
-        */
-       if (injection_point_cache_get(name) == NULL)
-       {
-               /* not found in local cache, so load and register it */
-               injection_point_cache_load(entry_by_name);
-       }
-
-       /* Now loaded, so get it. */
-       cache_entry = injection_point_cache_get(name);
-
-       LWLockRelease(InjectionPointLock);
-
-       cache_entry->callback(name, cache_entry->private_data);
+       cache_entry = InjectionPointCacheRefresh(name);
+       if (cache_entry)
+               cache_entry->callback(name, cache_entry->private_data);
 #else
        elog(ERROR, "Injection points are not supported by this build");
 #endif
index 635e6d6e21545954a9332fa55af26b0b0ab81e3d..b4d7f9217cec458de320d2ba640f9bcbc0e17b51 100644 (file)
@@ -1239,6 +1239,7 @@ InjectionPointCallback
 InjectionPointCondition
 InjectionPointConditionType
 InjectionPointEntry
+InjectionPointsCtl
 InjectionPointSharedState
 InlineCodeBlock
 InsertStmt