Use the pairing heap instead of a flat array for LSN replay waiters
authorAlexander Korotkov <akorotkov@postgresql.org>
Wed, 3 Apr 2024 15:15:17 +0000 (18:15 +0300)
committerAlexander Korotkov <akorotkov@postgresql.org>
Wed, 3 Apr 2024 15:15:41 +0000 (18:15 +0300)
06c418e163 introduced the pg_wal_replay_wait() procedure, which allows waiting
for a particular LSN to be replayed on a standby.  The waiters were stored in
a flat array.  Even though scanning small arrays is fast, that might be a
problem at scale (a lot of waiting processes).

This commit replaces the flat shared memory array with a pairing heap,
which holds the waiter with the least LSN at the top.  This gives us amortized
O(log N) complexity for both inserting and removing waiters.

Reported-by: Alvaro Herrera
Discussion: https://postgr.es/m/202404030658.hhj3vfxeyhft%40alvherre.pgsql

src/backend/access/transam/xlogrecovery.c
src/backend/commands/waitlsn.c
src/backend/lib/pairingheap.c
src/include/commands/waitlsn.h
src/include/lib/pairingheap.h

index 24ab1b2b21364adf6be913022ad13cf5a053ad36..b2fe2d04ccf8aa57b776b53fdb43e43669f4cff9 100644 (file)
@@ -1836,7 +1836,7 @@ PerformWalRecovery(void)
             */
            if (waitLSN &&
                (XLogRecoveryCtl->lastReplayedEndRecPtr >=
-                pg_atomic_read_u64(&waitLSN->minLSN)))
+                pg_atomic_read_u64(&waitLSN->minWaitedLSN)))
                WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
 
            /* Else, try to fetch the next WAL record */
index 63e9ebf1730c2ee5ce726d6d197bc9e7a631a24e..51a34d422e237d4eab5b12ceaf124b0d8b900d19 100644 (file)
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * waitlsn.c
- *   Implements waiting for the given LSN, which is used in
+ *   Implements waiting for the given replay LSN, which is used in
  *   CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8).
  *
  * Copyright (c) 2024, PostgreSQL Global Development Group
 #include "storage/latch.h"
 #include "storage/proc.h"
 #include "storage/shmem.h"
+#include "utils/fmgrprotos.h"
 #include "utils/pg_lsn.h"
 #include "utils/snapmgr.h"
-#include "utils/fmgrprotos.h"
 #include "utils/wait_event_types.h"
 
-/* Add to / delete from shared memory array */
-static void addLSNWaiter(XLogRecPtr lsn);
-static void deleteLSNWaiter(void);
+static int lsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
+                   void *arg);
 
 struct WaitLSNState *waitLSN = NULL;
-static volatile sig_atomic_t haveShmemItem = false;
 
-/*
- * Report the amount of shared memory space needed for WaitLSNState
- */
+/* Report the amount of shared memory space needed for WaitLSNState. */
 Size
 WaitLSNShmemSize(void)
 {
@@ -51,7 +47,7 @@ WaitLSNShmemSize(void)
    return size;
 }
 
-/* Initialize the WaitLSNState in the shared memory */
+/* Initialize the WaitLSNState in the shared memory. */
 void
 WaitLSNShmemInit(void)
 {
@@ -62,81 +58,93 @@ WaitLSNShmemInit(void)
                                               &found);
    if (!found)
    {
-       SpinLockInit(&waitLSN->mutex);
-       waitLSN->numWaitedProcs = 0;
-       pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+       SpinLockInit(&waitLSN->waitersHeapMutex);
+       pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX);
+       pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL);
+       memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo));
    }
 }
 
 /*
- * Add the information about the LSN waiter backend to the shared memory
- * array.
+ * Comparison function for the waitLSN->waitersHeap heap.  Waiting processes are
+ * ordered by LSN, so that the waiter with the smallest LSN is at the top.
  */
-static void
-addLSNWaiter(XLogRecPtr lsn)
+static int
+lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
 {
-   WaitLSNProcInfo cur;
-   int         i;
+   const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a);
+   const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b);
 
-   cur.procnum = MyProcNumber;
-   cur.waitLSN = lsn;
+   if (aproc->waitLSN < bproc->waitLSN)
+       return 1;
+   else if (aproc->waitLSN > bproc->waitLSN)
+       return -1;
+   else
+       return 0;
+}
 
-   SpinLockAcquire(&waitLSN->mutex);
+/*
+ * Update waitLSN->minWaitedLSN according to the current state of
+ * waitLSN->waitersHeap.
+ */
+static void
+updateMinWaitedLSN(void)
+{
+   XLogRecPtr  minWaitedLSN = PG_UINT64_MAX;
 
-   for (i = 0; i < waitLSN->numWaitedProcs; i++)
+   if (!pairingheap_is_empty(&waitLSN->waitersHeap))
    {
-       if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN)
-       {
-           WaitLSNProcInfo tmp;
+       pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap);
 
-           tmp = waitLSN->procInfos[i];
-           waitLSN->procInfos[i] = cur;
-           cur = tmp;
-       }
+       minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN;
    }
-   waitLSN->procInfos[i] = cur;
-   waitLSN->numWaitedProcs++;
 
-   pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
-   SpinLockRelease(&waitLSN->mutex);
+   pg_atomic_write_u64(&waitLSN->minWaitedLSN, minWaitedLSN);
 }
 
 /*
- * Delete the information about the LSN waiter backend from the shared memory
- * array.
+ * Put the current process into the heap of LSN waiters.
  */
 static void
-deleteLSNWaiter(void)
+addLSNWaiter(XLogRecPtr lsn)
 {
-   int         i;
-   bool        found = false;
+   WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber];
 
-   SpinLockAcquire(&waitLSN->mutex);
+   Assert(!procInfo->inHeap);
 
-   for (i = 0; i < waitLSN->numWaitedProcs; i++)
-   {
-       if (waitLSN->procInfos[i].procnum == MyProcNumber)
-           found = true;
+   procInfo->procnum = MyProcNumber;
+   procInfo->waitLSN = lsn;
 
-       if (found && i < waitLSN->numWaitedProcs - 1)
-       {
-           waitLSN->procInfos[i] = waitLSN->procInfos[i + 1];
-       }
-   }
+   SpinLockAcquire(&waitLSN->waitersHeapMutex);
 
-   if (!found)
+   pairingheap_add(&waitLSN->waitersHeap, &procInfo->phNode);
+   procInfo->inHeap = true;
+   updateMinWaitedLSN();
+
+   SpinLockRelease(&waitLSN->waitersHeapMutex);
+}
+
+/*
+ * Remove the current process from the heap of LSN waiters if it's there.
+ */
+static void
+deleteLSNWaiter(void)
+{
+   WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber];
+
+   SpinLockAcquire(&waitLSN->waitersHeapMutex);
+
+   if (!procInfo->inHeap)
    {
-       SpinLockRelease(&waitLSN->mutex);
+       SpinLockRelease(&waitLSN->waitersHeapMutex);
        return;
    }
-   waitLSN->numWaitedProcs--;
 
-   if (waitLSN->numWaitedProcs != 0)
-       pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
-   else
-       pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+   pairingheap_remove(&waitLSN->waitersHeap, &procInfo->phNode);
+   procInfo->inHeap = false;
+   updateMinWaitedLSN();
 
-   SpinLockRelease(&waitLSN->mutex);
+   SpinLockRelease(&waitLSN->waitersHeapMutex);
 }
 
 /*
@@ -148,41 +156,33 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
 {
    int         i;
    int        *wakeUpProcNums;
-   int         numWakeUpProcs;
+   int         numWakeUpProcs = 0;
 
    wakeUpProcNums = palloc(sizeof(int) * MaxBackends);
 
-   SpinLockAcquire(&waitLSN->mutex);
+   SpinLockAcquire(&waitLSN->waitersHeapMutex);
 
    /*
-    * Remember processes, whose waited LSNs are already replayed.  We should
-    * set their latches later after spinlock release.
+    * Iterate the pairing heap of waiting processes until we find an LSN not
+    * yet replayed.  Record the process numbers to set their latches later.
     */
-   for (i = 0; i < waitLSN->numWaitedProcs; i++)
+   while (!pairingheap_is_empty(&waitLSN->waitersHeap))
    {
+       pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap);
+       WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node);
+
        if (!XLogRecPtrIsInvalid(currentLSN) &&
-           waitLSN->procInfos[i].waitLSN > currentLSN)
+           procInfo->waitLSN > currentLSN)
            break;
 
-       wakeUpProcNums[i] = waitLSN->procInfos[i].procnum;
+       wakeUpProcNums[numWakeUpProcs++] = procInfo->procnum;
+       (void) pairingheap_remove_first(&waitLSN->waitersHeap);
+       procInfo->inHeap = false;
    }
 
-   /*
-    * Immediately remove those processes from the shmem array.  Otherwise,
-    * shmem array items will be here till corresponding processes wake up and
-    * delete themselves.
-    */
-   numWakeUpProcs = i;
-   for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++)
-       waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs];
-   waitLSN->numWaitedProcs -= numWakeUpProcs;
-
-   if (waitLSN->numWaitedProcs != 0)
-       pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
-   else
-       pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+   updateMinWaitedLSN();
 
-   SpinLockRelease(&waitLSN->mutex);
+   SpinLockRelease(&waitLSN->waitersHeapMutex);
 
    /*
     * Set latches for processes, whose waited LSNs are already replayed. This
@@ -204,7 +204,7 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
 void
 WaitLSNCleanup(void)
 {
-   if (haveShmemItem)
+   if (waitLSN->procInfos[MyProcNumber].inHeap)
        deleteLSNWaiter();
 }
 
@@ -222,7 +222,7 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
    Assert(waitLSN);
 
-   /* Should be only called by a backend */
-   Assert(MyBackendType == B_BACKEND);
+   /* Only backends may call this; MyProcNumber must index procInfos[] */
+   Assert(MyBackendType == B_BACKEND && MyProcNumber < MaxBackends);
 
    if (!RecoveryInProgress())
        ereport(ERROR,
@@ -238,7 +238,6 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
        endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
 
    addLSNWaiter(targetLSN);
-   haveShmemItem = true;
 
    for (;;)
    {
@@ -280,17 +279,12 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
    if (targetLSN > currentLSN)
    {
        deleteLSNWaiter();
-       haveShmemItem = false;
        ereport(ERROR,
                (errcode(ERRCODE_QUERY_CANCELED),
                 errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
                        LSN_FORMAT_ARGS(targetLSN),
                        LSN_FORMAT_ARGS(currentLSN))));
    }
-   else
-   {
-       haveShmemItem = false;
-   }
 }
 
 Datum
index fe1deba13ec3cc4b6bb503d7fc68d216a19fafdb..7858e5e076b4baeaef2f61dad318b74502cec50a 100644 (file)
@@ -44,12 +44,26 @@ pairingheap_allocate(pairingheap_comparator compare, void *arg)
    pairingheap *heap;
 
    heap = (pairingheap *) palloc(sizeof(pairingheap));
+   pairingheap_initialize(heap, compare, arg);
+
+   return heap;
+}
+
+/*
+ * pairingheap_initialize
+ *
+ * Same as pairingheap_allocate(), but initializes the pairing heap in-place
+ * rather than allocating a new chunk of memory.  Useful for storing the
+ * pairing heap in shared memory.
+ */
+void
+pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare,
+                      void *arg)
+{
    heap->ph_compare = compare;
    heap->ph_arg = arg;
 
    heap->ph_root = NULL;
-
-   return heap;
 }
 
 /*
index 10ef63f0c096c7298f1d5241301beb792e173078..0d80248682ccb5cb85b867b6771d9eb9228a363d 100644 (file)
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * waitlsn.h
- *   Declarations for LSN waiting routines.
+ *   Declarations for LSN replay waiting routines.
  *
  * Copyright (c) 2024, PostgreSQL Global Development Group
  *
 #ifndef WAIT_LSN_H
 #define WAIT_LSN_H
 
 #include "postgres.h"
+#include "lib/pairingheap.h"
 #include "port/atomics.h"
 #include "storage/spin.h"
 #include "tcop/dest.h"
 
-/* Shared memory structures */
+/*
+ * WaitLSNProcInfo - the shared memory structure representing information
+ * about a single process that may wait for LSN replay.  An item of the
+ * waitLSN->procInfos array.
+ */
 typedef struct WaitLSNProcInfo
 {
+   /*
+    * A process number, same as the index of this item in waitLSN->procInfos.
+    * Stored for convenience.
+    */
    int         procnum;
+
+   /* LSN, which this process is waiting for */
    XLogRecPtr  waitLSN;
+
+   /* A pairing heap node for participation in waitLSN->waitersHeap */
+   pairingheap_node phNode;
+
+   /* A flag indicating that this item is added to waitLSN->waitersHeap */
+   bool        inHeap;
 } WaitLSNProcInfo;
 
+/*
+ * WaitLSNState - the shared memory state for the replay LSN waiting facility.
+ */
 typedef struct WaitLSNState
 {
-   pg_atomic_uint64 minLSN;
-   slock_t     mutex;
-   int         numWaitedProcs;
+   /*
+    * The minimum LSN value some process is waiting for.  Used for the
+    * fast-path checking if we need to wake up any waiters after replaying a
+    * WAL record.
+    */
+   pg_atomic_uint64 minWaitedLSN;
+
+   /*
+    * A pairing heap of waiting processes ordered by LSN values (least LSN is
+    * on top).
+    */
+   pairingheap waitersHeap;
+
+   /* A mutex protecting the pairing heap above */
+   slock_t     waitersHeapMutex;
+
+   /* An array with per-process information, indexed by the process number */
    WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
 } WaitLSNState;
 
index 7eade81535a5e6eab44a0bc62c255b1a06e9da2f..9e1c26033a1ef173b378db2110f24d380a74bd12 100644 (file)
@@ -77,6 +77,9 @@ typedef struct pairingheap
 
 extern pairingheap *pairingheap_allocate(pairingheap_comparator compare,
                                         void *arg);
+extern void pairingheap_initialize(pairingheap *heap,
+                                  pairingheap_comparator compare,
+                                  void *arg);
 extern void pairingheap_free(pairingheap *heap);
 extern void pairingheap_add(pairingheap *heap, pairingheap_node *node);
 extern pairingheap_node *pairingheap_first(pairingheap *heap);