Use ProcNumbers instead of direct Latch pointers to address other procs
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 1 Nov 2024 11:47:20 +0000 (13:47 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 1 Nov 2024 11:47:20 +0000 (13:47 +0200)
This is in preparation for replacing Latches with a new abstraction.
That's still work in progress, but this seems a little tidier anyway,
so let's get this refactoring out of the way already.

Discussion: https://www.postgresql.org/message-id/391abe21-413e-4d91-a650-b663af49500c%40iki.fi

src/backend/access/transam/xlog.c
src/backend/access/transam/xlogwait.c
src/backend/postmaster/checkpointer.c
src/backend/postmaster/walwriter.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/backend/storage/lmgr/proc.c
src/include/access/xlogwait.h
src/include/replication/walreceiver.h
src/include/storage/proc.h

index 3ecaf1813927ebfebbd7f889ed6c7c6838a3f843..00fe8c8ae7241b42d9bd9bb70d515971bd3e5340 100644 (file)
@@ -2671,8 +2671,14 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
            wakeup = true;
    }
 
-   if (wakeup && ProcGlobal->walwriterLatch)
-       SetLatch(ProcGlobal->walwriterLatch);
+   if (wakeup)
+   {
+       volatile PROC_HDR *procglobal = ProcGlobal;
+       ProcNumber  walwriterProc = procglobal->walwriterProc;
+
+       if (walwriterProc != INVALID_PROC_NUMBER)
+           SetLatch(&GetPGProcByNumber(walwriterProc)->procLatch);
+   }
 }
 
 /*
index 0ec0898cfbff9788c273b57640b9a6c81c62d8d0..9b8c2ae794f272405ff9ba02cf0f8ad648f010ac 100644 (file)
@@ -112,7 +112,7 @@ addLSNWaiter(XLogRecPtr lsn)
 
    Assert(!procInfo->inHeap);
 
-   procInfo->latch = MyLatch;
+   procInfo->procno = MyProcNumber;
    procInfo->waitLSN = lsn;
 
    pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode);
@@ -154,16 +154,17 @@ void
 WaitLSNSetLatches(XLogRecPtr currentLSN)
 {
    int         i;
-   Latch     **wakeUpProcLatches;
+   ProcNumber *wakeUpProcs;
    int         numWakeUpProcs = 0;
 
-   wakeUpProcLatches = palloc(sizeof(Latch *) * MaxBackends);
+   wakeUpProcs = palloc(sizeof(ProcNumber) * MaxBackends);
 
    LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
 
    /*
     * Iterate the pairing heap of waiting processes till we find LSN not yet
-    * replayed.  Record the process latches to set them later.
+    * replayed.  Record the process numbers to wake up, but to avoid holding
+    * the lock for too long, send the wakeups only after releasing the lock.
     */
    while (!pairingheap_is_empty(&waitLSNState->waitersHeap))
    {
@@ -174,7 +175,7 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
            procInfo->waitLSN > currentLSN)
            break;
 
-       wakeUpProcLatches[numWakeUpProcs++] = procInfo->latch;
+       wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
        (void) pairingheap_remove_first(&waitLSNState->waitersHeap);
        procInfo->inHeap = false;
    }
@@ -191,9 +192,9 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
     */
    for (i = 0; i < numWakeUpProcs; i++)
    {
-       SetLatch(wakeUpProcLatches[i]);
+       SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
    }
-   pfree(wakeUpProcLatches);
+   pfree(wakeUpProcs);
 }
 
 /*
index 9087e3f8db13502921cb103ac4e632e411a49c7e..982572a75dbda3c265343f7d9f1a48af4ff8aa86 100644 (file)
@@ -324,10 +324,10 @@ CheckpointerMain(char *startup_data, size_t startup_data_len)
    UpdateSharedMemoryConfig();
 
    /*
-    * Advertise our latch that backends can use to wake us up while we're
-    * sleeping.
+    * Advertise our proc number that backends can use to wake us up while
+    * we're sleeping.
     */
-   ProcGlobal->checkpointerLatch = &MyProc->procLatch;
+   ProcGlobal->checkpointerProc = MyProcNumber;
 
    /*
     * Loop forever
@@ -1139,8 +1139,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
    LWLockRelease(CheckpointerCommLock);
 
    /* ... but not till after we release the lock */
-   if (too_full && ProcGlobal->checkpointerLatch)
-       SetLatch(ProcGlobal->checkpointerLatch);
+   if (too_full)
+   {
+       volatile PROC_HDR *procglobal = ProcGlobal;
+       ProcNumber  checkpointerProc = procglobal->checkpointerProc;
+
+       if (checkpointerProc != INVALID_PROC_NUMBER)
+           SetLatch(&GetPGProcByNumber(checkpointerProc)->procLatch);
+   }
 
    return true;
 }
index 91013f6e93660e4c28dc0a180a705aa66849b302..5a3cb8946526409a1fd37e0cce3862e09abe42ea 100644 (file)
@@ -208,10 +208,10 @@ WalWriterMain(char *startup_data, size_t startup_data_len)
    SetWalWriterSleeping(false);
 
    /*
-    * Advertise our latch that backends can use to wake us up while we're
-    * sleeping.
+    * Advertise our proc number that backends can use to wake us up while
+    * we're sleeping.
     */
-   ProcGlobal->walwriterLatch = &MyProc->procLatch;
+   ProcGlobal->walwriterProc = MyProcNumber;
 
    /*
     * Loop forever
index 97f957cd87b48424171ec792a1a84e078dd5ff9e..c74369953f81d6c46aea54b4fd6ec684255f142d 100644 (file)
@@ -30,6 +30,7 @@
 #include "pgstat.h"
 #include "pqexpbuffer.h"
 #include "replication/walreceiver.h"
+#include "storage/latch.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
index a27aee63defca301f1078a9f113d27b437bd0a13..5f641d279057ed1a979628df256cc768515d0ef2 100644 (file)
@@ -266,8 +266,8 @@ WalReceiverMain(char *startup_data, size_t startup_data_len)
    walrcv->lastMsgSendTime =
        walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
 
-   /* Report the latch to use to awaken this process */
-   walrcv->latch = &MyProc->procLatch;
+   /* Report our proc number so that others can wake us up */
+   walrcv->procno = MyProcNumber;
 
    SpinLockRelease(&walrcv->mutex);
 
@@ -819,8 +819,8 @@ WalRcvDie(int code, Datum arg)
    Assert(walrcv->pid == MyProcPid);
    walrcv->walRcvState = WALRCV_STOPPED;
    walrcv->pid = 0;
+   walrcv->procno = INVALID_PROC_NUMBER;
    walrcv->ready_to_display = false;
-   walrcv->latch = NULL;
    SpinLockRelease(&walrcv->mutex);
 
    ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
@@ -1358,15 +1358,15 @@ WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
 void
 WalRcvForceReply(void)
 {
-   Latch      *latch;
+   ProcNumber  procno;
 
    WalRcv->force_reply = true;
-   /* fetching the latch pointer might not be atomic, so use spinlock */
+   /* fetching the proc number is probably atomic, but don't rely on it */
    SpinLockAcquire(&WalRcv->mutex);
-   latch = WalRcv->latch;
+   procno = WalRcv->procno;
    SpinLockRelease(&WalRcv->mutex);
-   if (latch)
-       SetLatch(latch);
+   if (procno != INVALID_PROC_NUMBER)
+       SetLatch(&GetPGProcByNumber(procno)->procLatch);
 }
 
 /*
index 85a19cdfa5c63397661522e7fa62f2f9a51a3d07..8557d10cf9d27ca53677fab62b3508baafa44c90 100644 (file)
@@ -27,6 +27,7 @@
 #include "pgstat.h"
 #include "replication/walreceiver.h"
 #include "storage/pmsignal.h"
+#include "storage/proc.h"
 #include "storage/shmem.h"
 #include "utils/timestamp.h"
 
@@ -66,7 +67,7 @@ WalRcvShmemInit(void)
        ConditionVariableInit(&WalRcv->walRcvStoppedCV);
        SpinLockInit(&WalRcv->mutex);
        pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
-       WalRcv->latch = NULL;
+       WalRcv->procno = INVALID_PROC_NUMBER;
    }
 }
 
@@ -248,7 +249,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
    WalRcvData *walrcv = WalRcv;
    bool        launch = false;
    pg_time_t   now = (pg_time_t) time(NULL);
-   Latch      *latch;
+   ProcNumber  walrcv_proc;
 
    /*
     * We always start at the beginning of the segment. That prevents a broken
@@ -309,14 +310,14 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
    walrcv->receiveStart = recptr;
    walrcv->receiveStartTLI = tli;
 
-   latch = walrcv->latch;
+   walrcv_proc = walrcv->procno;
 
    SpinLockRelease(&walrcv->mutex);
 
    if (launch)
        SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
-   else if (latch)
-       SetLatch(latch);
+   else if (walrcv_proc != INVALID_PROC_NUMBER)
+       SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
 }
 
 /*
index 260e7029f5039d6fe05e3bf6ef6f61b14ee4dc3e..84a660a1438cc85b72ee0ad2d0287e89085f7a6f 100644 (file)
@@ -194,8 +194,8 @@ InitProcGlobal(void)
    dlist_init(&ProcGlobal->bgworkerFreeProcs);
    dlist_init(&ProcGlobal->walsenderFreeProcs);
    ProcGlobal->startupBufferPinWaitBufId = -1;
-   ProcGlobal->walwriterLatch = NULL;
-   ProcGlobal->checkpointerLatch = NULL;
+   ProcGlobal->walwriterProc = INVALID_PROC_NUMBER;
+   ProcGlobal->checkpointerProc = INVALID_PROC_NUMBER;
    pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PROC_NUMBER);
    pg_atomic_init_u32(&ProcGlobal->clogGroupFirst, INVALID_PROC_NUMBER);
 
index eb2260aa2ecff4c85861270c4354ba3cb95090a8..7fd24c22bec561aa0e00e7e85dc13bf2c62ae44d 100644 (file)
@@ -15,7 +15,7 @@
 #include "lib/pairingheap.h"
 #include "postgres.h"
 #include "port/atomics.h"
-#include "storage/latch.h"
+#include "storage/procnumber.h"
 #include "storage/spin.h"
 #include "tcop/dest.h"
 
@@ -29,11 +29,8 @@ typedef struct WaitLSNProcInfo
    /* LSN, which this process is waiting for */
    XLogRecPtr  waitLSN;
 
-   /*
-    * A pointer to the latch, which should be set once the waitLSN is
-    * replayed.
-    */
-   Latch      *latch;
+   /* Process to wake up once the waitLSN is replayed */
+   ProcNumber  procno;
 
    /* A pairing heap node for participation in waitLSNState->waitersHeap */
    pairingheap_node phNode;
index 132e789948bb924de0c0b6db7b02b3b7279bdf55..3342286d2a3252964dfb9f9bb0f3cb90178f03a7 100644 (file)
@@ -21,7 +21,6 @@
 #include "replication/logicalproto.h"
 #include "replication/walsender.h"
 #include "storage/condition_variable.h"
-#include "storage/latch.h"
 #include "storage/spin.h"
 #include "utils/tuplestore.h"
 
@@ -58,13 +57,24 @@ typedef enum
 typedef struct
 {
    /*
-    * PID of currently active walreceiver process, its current state and
-    * start time (actually, the time at which it was requested to be
-    * started).
+    * Currently active walreceiver process's proc number and PID.
+    *
+    * The startup process uses the proc number to wake it up after telling it
+    * where to start streaming (after setting receiveStart and
+    * receiveStartTLI), and also to tell it to send apply feedback to the
+    * primary whenever specially marked commit records are applied.
     */
+   ProcNumber  procno;
    pid_t       pid;
+
+   /* Its current state */
    WalRcvState walRcvState;
    ConditionVariable walRcvStoppedCV;
+
+   /*
+    * Its start time (actually, the time at which it was requested to be
+    * started).
+    */
    pg_time_t   startTime;
 
    /*
@@ -134,15 +144,6 @@ typedef struct
    /* set true once conninfo is ready to display (obfuscated pwds etc) */
    bool        ready_to_display;
 
-   /*
-    * Latch used by startup process to wake up walreceiver after telling it
-    * where to start streaming (after setting receiveStart and
-    * receiveStartTLI), and also to tell it to send apply feedback to the
-    * primary whenever specially marked commit records are applied. This is
-    * normally mapped to procLatch when walreceiver is running.
-    */
-   Latch      *latch;
-
    slock_t     mutex;          /* locks shared variables shown above */
 
    /*
index ebcf0ad4036fd04c1c74a4d9555619922d99e1f1..d119465fa0d24037502b64d731a6125c8ffd3322 100644 (file)
@@ -418,10 +418,14 @@ typedef struct PROC_HDR
    pg_atomic_uint32 procArrayGroupFirst;
    /* First pgproc waiting for group transaction status update */
    pg_atomic_uint32 clogGroupFirst;
-   /* WALWriter process's latch */
-   Latch      *walwriterLatch;
-   /* Checkpointer process's latch */
-   Latch      *checkpointerLatch;
+
+   /*
+    * Current slot numbers of some auxiliary processes. There can be only one
+    * of each of these running at a time.
+    */
+   ProcNumber  walwriterProc;
+   ProcNumber  checkpointerProc;
+
    /* Current shared estimate of appropriate spins_per_delay value */
    int         spins_per_delay;
    /* Buffer id of the buffer that Startup process waits for pin on, or -1 */