Use ProcNumbers instead of direct Latch pointers to address other procs
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 1 Nov 2024 11:47:20 +0000 (13:47 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 1 Nov 2024 11:47:20 +0000 (13:47 +0200)
This is in preparation for replacing Latches with a new abstraction.
That's still work in progress, but this seems a little tidier anyway,
so let's get this refactoring out of the way already.

Discussion: https://www.postgresql.org/message-id/391abe21-413e-4d91-a650-b663af49500c%40iki.fi

src/backend/access/transam/xlog.c
src/backend/access/transam/xlogwait.c
src/backend/postmaster/checkpointer.c
src/backend/postmaster/walwriter.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/backend/storage/lmgr/proc.c
src/include/access/xlogwait.h
src/include/replication/walreceiver.h
src/include/storage/proc.h

index 3ecaf1813927ebfebbd7f889ed6c7c6838a3f843..00fe8c8ae7241b42d9bd9bb70d515971bd3e5340 100644 (file)
@@ -2671,8 +2671,14 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
                        wakeup = true;
        }
 
-       if (wakeup && ProcGlobal->walwriterLatch)
-               SetLatch(ProcGlobal->walwriterLatch);
+       if (wakeup)
+       {
+               volatile PROC_HDR *procglobal = ProcGlobal;
+               ProcNumber      walwriterProc = procglobal->walwriterProc;
+
+               if (walwriterProc != INVALID_PROC_NUMBER)
+                       SetLatch(&GetPGProcByNumber(walwriterProc)->procLatch);
+       }
 }
 
 /*
index 0ec0898cfbff9788c273b57640b9a6c81c62d8d0..9b8c2ae794f272405ff9ba02cf0f8ad648f010ac 100644 (file)
@@ -112,7 +112,7 @@ addLSNWaiter(XLogRecPtr lsn)
 
        Assert(!procInfo->inHeap);
 
-       procInfo->latch = MyLatch;
+       procInfo->procno = MyProcNumber;
        procInfo->waitLSN = lsn;
 
        pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode);
@@ -154,16 +154,17 @@ void
 WaitLSNSetLatches(XLogRecPtr currentLSN)
 {
        int                     i;
-       Latch     **wakeUpProcLatches;
+       ProcNumber *wakeUpProcs;
        int                     numWakeUpProcs = 0;
 
-       wakeUpProcLatches = palloc(sizeof(Latch *) * MaxBackends);
+       wakeUpProcs = palloc(sizeof(ProcNumber) * MaxBackends);
 
        LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
 
        /*
         * Iterate the pairing heap of waiting processes till we find LSN not yet
-        * replayed.  Record the process latches to set them later.
+        * replayed.  Record the process numbers to wake up, but to avoid holding
+        * the lock for too long, send the wakeups only after releasing the lock.
         */
        while (!pairingheap_is_empty(&waitLSNState->waitersHeap))
        {
@@ -174,7 +175,7 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
                        procInfo->waitLSN > currentLSN)
                        break;
 
-               wakeUpProcLatches[numWakeUpProcs++] = procInfo->latch;
+               wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
                (void) pairingheap_remove_first(&waitLSNState->waitersHeap);
                procInfo->inHeap = false;
        }
@@ -191,9 +192,9 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
         */
        for (i = 0; i < numWakeUpProcs; i++)
        {
-               SetLatch(wakeUpProcLatches[i]);
+               SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
        }
-       pfree(wakeUpProcLatches);
+       pfree(wakeUpProcs);
 }
 
 /*
index 9087e3f8db13502921cb103ac4e632e411a49c7e..982572a75dbda3c265343f7d9f1a48af4ff8aa86 100644 (file)
@@ -324,10 +324,10 @@ CheckpointerMain(char *startup_data, size_t startup_data_len)
        UpdateSharedMemoryConfig();
 
        /*
-        * Advertise our latch that backends can use to wake us up while we're
-        * sleeping.
+        * Advertise our proc number that backends can use to wake us up while
+        * we're sleeping.
         */
-       ProcGlobal->checkpointerLatch = &MyProc->procLatch;
+       ProcGlobal->checkpointerProc = MyProcNumber;
 
        /*
         * Loop forever
@@ -1139,8 +1139,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
        LWLockRelease(CheckpointerCommLock);
 
        /* ... but not till after we release the lock */
-       if (too_full && ProcGlobal->checkpointerLatch)
-               SetLatch(ProcGlobal->checkpointerLatch);
+       if (too_full)
+       {
+               volatile PROC_HDR *procglobal = ProcGlobal;
+               ProcNumber      checkpointerProc = procglobal->checkpointerProc;
+
+               if (checkpointerProc != INVALID_PROC_NUMBER)
+                       SetLatch(&GetPGProcByNumber(checkpointerProc)->procLatch);
+       }
 
        return true;
 }
index 91013f6e93660e4c28dc0a180a705aa66849b302..5a3cb8946526409a1fd37e0cce3862e09abe42ea 100644 (file)
@@ -208,10 +208,10 @@ WalWriterMain(char *startup_data, size_t startup_data_len)
        SetWalWriterSleeping(false);
 
        /*
-        * Advertise our latch that backends can use to wake us up while we're
-        * sleeping.
+        * Advertise our proc number that backends can use to wake us up while
+        * we're sleeping.
         */
-       ProcGlobal->walwriterLatch = &MyProc->procLatch;
+       ProcGlobal->walwriterProc = MyProcNumber;
 
        /*
         * Loop forever
index 97f957cd87b48424171ec792a1a84e078dd5ff9e..c74369953f81d6c46aea54b4fd6ec684255f142d 100644 (file)
@@ -30,6 +30,7 @@
 #include "pgstat.h"
 #include "pqexpbuffer.h"
 #include "replication/walreceiver.h"
+#include "storage/latch.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
index a27aee63defca301f1078a9f113d27b437bd0a13..5f641d279057ed1a979628df256cc768515d0ef2 100644 (file)
@@ -266,8 +266,8 @@ WalReceiverMain(char *startup_data, size_t startup_data_len)
        walrcv->lastMsgSendTime =
                walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
 
-       /* Report the latch to use to awaken this process */
-       walrcv->latch = &MyProc->procLatch;
+       /* Report our proc number so that others can wake us up */
+       walrcv->procno = MyProcNumber;
 
        SpinLockRelease(&walrcv->mutex);
 
@@ -819,8 +819,8 @@ WalRcvDie(int code, Datum arg)
        Assert(walrcv->pid == MyProcPid);
        walrcv->walRcvState = WALRCV_STOPPED;
        walrcv->pid = 0;
+       walrcv->procno = INVALID_PROC_NUMBER;
        walrcv->ready_to_display = false;
-       walrcv->latch = NULL;
        SpinLockRelease(&walrcv->mutex);
 
        ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
@@ -1358,15 +1358,15 @@ WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
 void
 WalRcvForceReply(void)
 {
-       Latch      *latch;
+       ProcNumber      procno;
 
        WalRcv->force_reply = true;
-       /* fetching the latch pointer might not be atomic, so use spinlock */
+       /* fetching the proc number is probably atomic, but don't rely on it */
        SpinLockAcquire(&WalRcv->mutex);
-       latch = WalRcv->latch;
+       procno = WalRcv->procno;
        SpinLockRelease(&WalRcv->mutex);
-       if (latch)
-               SetLatch(latch);
+       if (procno != INVALID_PROC_NUMBER)
+               SetLatch(&GetPGProcByNumber(procno)->procLatch);
 }
 
 /*
index 85a19cdfa5c63397661522e7fa62f2f9a51a3d07..8557d10cf9d27ca53677fab62b3508baafa44c90 100644 (file)
@@ -27,6 +27,7 @@
 #include "pgstat.h"
 #include "replication/walreceiver.h"
 #include "storage/pmsignal.h"
+#include "storage/proc.h"
 #include "storage/shmem.h"
 #include "utils/timestamp.h"
 
@@ -66,7 +67,7 @@ WalRcvShmemInit(void)
                ConditionVariableInit(&WalRcv->walRcvStoppedCV);
                SpinLockInit(&WalRcv->mutex);
                pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
-               WalRcv->latch = NULL;
+               WalRcv->procno = INVALID_PROC_NUMBER;
        }
 }
 
@@ -248,7 +249,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
        WalRcvData *walrcv = WalRcv;
        bool            launch = false;
        pg_time_t       now = (pg_time_t) time(NULL);
-       Latch      *latch;
+       ProcNumber      walrcv_proc;
 
        /*
         * We always start at the beginning of the segment. That prevents a broken
@@ -309,14 +310,14 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
        walrcv->receiveStart = recptr;
        walrcv->receiveStartTLI = tli;
 
-       latch = walrcv->latch;
+       walrcv_proc = walrcv->procno;
 
        SpinLockRelease(&walrcv->mutex);
 
        if (launch)
                SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
-       else if (latch)
-               SetLatch(latch);
+       else if (walrcv_proc != INVALID_PROC_NUMBER)
+               SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
 }
 
 /*
index 260e7029f5039d6fe05e3bf6ef6f61b14ee4dc3e..84a660a1438cc85b72ee0ad2d0287e89085f7a6f 100644 (file)
@@ -194,8 +194,8 @@ InitProcGlobal(void)
        dlist_init(&ProcGlobal->bgworkerFreeProcs);
        dlist_init(&ProcGlobal->walsenderFreeProcs);
        ProcGlobal->startupBufferPinWaitBufId = -1;
-       ProcGlobal->walwriterLatch = NULL;
-       ProcGlobal->checkpointerLatch = NULL;
+       ProcGlobal->walwriterProc = INVALID_PROC_NUMBER;
+       ProcGlobal->checkpointerProc = INVALID_PROC_NUMBER;
        pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PROC_NUMBER);
        pg_atomic_init_u32(&ProcGlobal->clogGroupFirst, INVALID_PROC_NUMBER);
 
index eb2260aa2ecff4c85861270c4354ba3cb95090a8..7fd24c22bec561aa0e00e7e85dc13bf2c62ae44d 100644 (file)
@@ -15,7 +15,7 @@
 #include "lib/pairingheap.h"
 #include "postgres.h"
 #include "port/atomics.h"
-#include "storage/latch.h"
+#include "storage/procnumber.h"
 #include "storage/spin.h"
 #include "tcop/dest.h"
 
@@ -29,11 +29,8 @@ typedef struct WaitLSNProcInfo
        /* LSN, which this process is waiting for */
        XLogRecPtr      waitLSN;
 
-       /*
-        * A pointer to the latch, which should be set once the waitLSN is
-        * replayed.
-        */
-       Latch      *latch;
+       /* Process to wake up once the waitLSN is replayed */
+       ProcNumber      procno;
 
        /* A pairing heap node for participation in waitLSNState->waitersHeap */
        pairingheap_node phNode;
index 132e789948bb924de0c0b6db7b02b3b7279bdf55..3342286d2a3252964dfb9f9bb0f3cb90178f03a7 100644 (file)
@@ -21,7 +21,6 @@
 #include "replication/logicalproto.h"
 #include "replication/walsender.h"
 #include "storage/condition_variable.h"
-#include "storage/latch.h"
 #include "storage/spin.h"
 #include "utils/tuplestore.h"
 
@@ -58,13 +57,24 @@ typedef enum
 typedef struct
 {
        /*
-        * PID of currently active walreceiver process, its current state and
-        * start time (actually, the time at which it was requested to be
-        * started).
+        * Currently active walreceiver process's proc number and PID.
+        *
+        * The startup process uses the proc number to wake it up after telling it
+        * where to start streaming (after setting receiveStart and
+        * receiveStartTLI), and also to tell it to send apply feedback to the
+        * primary whenever specially marked commit records are applied.
         */
+       ProcNumber      procno;
        pid_t           pid;
+
+       /* Its current state */
        WalRcvState walRcvState;
        ConditionVariable walRcvStoppedCV;
+
+       /*
+        * Its start time (actually, the time at which it was requested to be
+        * started).
+        */
        pg_time_t       startTime;
 
        /*
@@ -134,15 +144,6 @@ typedef struct
        /* set true once conninfo is ready to display (obfuscated pwds etc) */
        bool            ready_to_display;
 
-       /*
-        * Latch used by startup process to wake up walreceiver after telling it
-        * where to start streaming (after setting receiveStart and
-        * receiveStartTLI), and also to tell it to send apply feedback to the
-        * primary whenever specially marked commit records are applied. This is
-        * normally mapped to procLatch when walreceiver is running.
-        */
-       Latch      *latch;
-
        slock_t         mutex;                  /* locks shared variables shown above */
 
        /*
index ebcf0ad4036fd04c1c74a4d9555619922d99e1f1..d119465fa0d24037502b64d731a6125c8ffd3322 100644 (file)
@@ -418,10 +418,14 @@ typedef struct PROC_HDR
        pg_atomic_uint32 procArrayGroupFirst;
        /* First pgproc waiting for group transaction status update */
        pg_atomic_uint32 clogGroupFirst;
-       /* WALWriter process's latch */
-       Latch      *walwriterLatch;
-       /* Checkpointer process's latch */
-       Latch      *checkpointerLatch;
+
+       /*
+        * Current slot numbers of some auxiliary processes. There can be only one
+        * of each of these running at a time.
+        */
+       ProcNumber      walwriterProc;
+       ProcNumber      checkpointerProc;
+
        /* Current shared estimate of appropriate spins_per_delay value */
        int                     spins_per_delay;
        /* Buffer id of the buffer that Startup process waits for pin on, or -1 */