Use dlists instead of SHM_QUEUE for syncrep queue
authorAndres Freund <andres@anarazel.de>
Wed, 18 Jan 2023 20:15:05 +0000 (12:15 -0800)
committerAndres Freund <andres@anarazel.de>
Wed, 18 Jan 2023 20:15:05 +0000 (12:15 -0800)
Part of a series to remove SHM_QUEUE. ilist.h style lists are more widely used
and have an easier to use interface.

Reviewed-by: Thomas Munro <thomas.munro@gmail.com> (in an older version)
Discussion: https://postgr.es/m/20221120055930.t6kl3tyivzhlrzu2@awork3.anarazel.de
Discussion: https://postgr.es/m/20200211042229.msv23badgqljrdg2@alap3.anarazel.de

src/backend/replication/syncrep.c
src/backend/replication/walsender.c
src/backend/storage/lmgr/proc.c
src/include/replication/walsender_private.h
src/include/storage/proc.h

index 51e92fc8c712509e0e33d602ec86619c2a8cec7f..80d681b71c84c97ee3bab8dcc3649169e670ec54 100644 (file)
@@ -182,7 +182,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
        else
                mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
 
-       Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+       Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
        Assert(WalSndCtl != NULL);
 
        LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
@@ -318,7 +318,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
         * assertions, but better safe than sorry).
         */
        pg_read_barrier();
-       Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+       Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
        MyProc->syncRepState = SYNC_REP_NOT_WAITING;
        MyProc->waitLSN = 0;
 
@@ -339,31 +339,32 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 static void
 SyncRepQueueInsert(int mode)
 {
-       PGPROC     *proc;
+       dlist_head *queue;
+       dlist_iter iter;
 
        Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
-       proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
-                                                                  &(WalSndCtl->SyncRepQueue[mode]),
-                                                                  offsetof(PGPROC, syncRepLinks));
+       queue = &WalSndCtl->SyncRepQueue[mode];
 
-       while (proc)
+       dlist_reverse_foreach(iter, queue)
        {
+               PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
                /*
-                * Stop at the queue element that we should after to ensure the queue
-                * is ordered by LSN.
+                * Stop at the queue element that we should insert after to ensure the
+                * queue is ordered by LSN.
                 */
                if (proc->waitLSN < MyProc->waitLSN)
-                       break;
-
-               proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
-                                                                          &(proc->syncRepLinks),
-                                                                          offsetof(PGPROC, syncRepLinks));
+               {
+                       dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
+                       return;
+               }
        }
 
-       if (proc)
-               SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
-       else
-               SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
+       /*
+        * If we get here, the list was either empty, or this process needs to be
+        * at the head.
+        */
+       dlist_push_head(queue, &MyProc->syncRepLinks);
 }
 
 /*
@@ -373,8 +374,8 @@ static void
 SyncRepCancelWait(void)
 {
        LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
-       if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
-               SHMQueueDelete(&(MyProc->syncRepLinks));
+       if (!dlist_node_is_detached(&MyProc->syncRepLinks))
+               dlist_delete_thoroughly(&MyProc->syncRepLinks);
        MyProc->syncRepState = SYNC_REP_NOT_WAITING;
        LWLockRelease(SyncRepLock);
 }
@@ -386,13 +387,13 @@ SyncRepCleanupAtProcExit(void)
         * First check if we are removed from the queue without the lock to not
         * slow down backend exit.
         */
-       if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
+       if (!dlist_node_is_detached(&MyProc->syncRepLinks))
        {
                LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 
                /* maybe we have just been removed, so recheck */
-               if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
-                       SHMQueueDelete(&(MyProc->syncRepLinks));
+               if (!dlist_node_is_detached(&MyProc->syncRepLinks))
+                       dlist_delete_thoroughly(&MyProc->syncRepLinks);
 
                LWLockRelease(SyncRepLock);
        }
@@ -879,20 +880,17 @@ static int
 SyncRepWakeQueue(bool all, int mode)
 {
        volatile WalSndCtlData *walsndctl = WalSndCtl;
-       PGPROC     *proc = NULL;
-       PGPROC     *thisproc = NULL;
        int                     numprocs = 0;
+       dlist_mutable_iter iter;
 
        Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
        Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
        Assert(SyncRepQueueIsOrderedByLSN(mode));
 
-       proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-                                                                  &(WalSndCtl->SyncRepQueue[mode]),
-                                                                  offsetof(PGPROC, syncRepLinks));
-
-       while (proc)
+       dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
        {
+               PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
                /*
                 * Assume the queue is ordered by LSN
                 */
@@ -900,18 +898,9 @@ SyncRepWakeQueue(bool all, int mode)
                        return numprocs;
 
                /*
-                * Move to next proc, so we can delete thisproc from the queue.
-                * thisproc is valid, proc may be NULL after this.
-                */
-               thisproc = proc;
-               proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-                                                                          &(proc->syncRepLinks),
-                                                                          offsetof(PGPROC, syncRepLinks));
-
-               /*
-                * Remove thisproc from queue.
+                * Remove from queue.
                 */
-               SHMQueueDelete(&(thisproc->syncRepLinks));
+               dlist_delete_thoroughly(&proc->syncRepLinks);
 
                /*
                 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
@@ -924,12 +913,12 @@ SyncRepWakeQueue(bool all, int mode)
                 * Set state to complete; see SyncRepWaitForLSN() for discussion of
                 * the various states.
                 */
-               thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
+               proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
 
                /*
                 * Wake only when we have set state and removed from queue.
                 */
-               SetLatch(&(thisproc->procLatch));
+               SetLatch(&(proc->procLatch));
 
                numprocs++;
        }
@@ -983,19 +972,17 @@ SyncRepUpdateSyncStandbysDefined(void)
 static bool
 SyncRepQueueIsOrderedByLSN(int mode)
 {
-       PGPROC     *proc = NULL;
        XLogRecPtr      lastLSN;
+       dlist_iter      iter;
 
        Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
 
        lastLSN = 0;
 
-       proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-                                                                  &(WalSndCtl->SyncRepQueue[mode]),
-                                                                  offsetof(PGPROC, syncRepLinks));
-
-       while (proc)
+       dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
        {
+               PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
                /*
                 * Check the queue is ordered by LSN and that multiple procs don't
                 * have matching LSNs
@@ -1004,10 +991,6 @@ SyncRepQueueIsOrderedByLSN(int mode)
                        return false;
 
                lastLSN = proc->waitLSN;
-
-               proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-                                                                          &(proc->syncRepLinks),
-                                                                          offsetof(PGPROC, syncRepLinks));
        }
 
        return true;
index 015ae2995d5371461652643bb14533c7e8ef84b6..4ed3747e3f9a15b3d683e359214c30c8ea8ec8f2 100644 (file)
@@ -3275,7 +3275,7 @@ WalSndShmemInit(void)
                MemSet(WalSndCtl, 0, WalSndShmemSize());
 
                for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
-                       SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
+                       dlist_init(&(WalSndCtl->SyncRepQueue[i]));
 
                for (i = 0; i < max_wal_senders; i++)
                {
index e2adf493ca920ce88aec69f7e3a99c151d8abc05..f8ac4edd6ffe00450603bd3c02aa6d613f82ecc4 100644 (file)
@@ -410,7 +410,7 @@ InitProcess(void)
        /* Initialize fields for sync rep */
        MyProc->waitLSN = 0;
        MyProc->syncRepState = SYNC_REP_NOT_WAITING;
-       SHMQueueElemInit(&(MyProc->syncRepLinks));
+       dlist_node_init(&MyProc->syncRepLinks);
 
        /* Initialize fields for group XID clearing. */
        MyProc->procArrayGroupMember = false;
index e5e779aeb7dc55d4a2ae53cef8cdc8b65449ca08..5310e054c488777f12ea9d3e60967401d8584f93 100644 (file)
@@ -13,6 +13,7 @@
 #define _WALSENDER_PRIVATE_H
 
 #include "access/xlog.h"
+#include "lib/ilist.h"
 #include "nodes/nodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
@@ -89,7 +90,7 @@ typedef struct
         * Synchronous replication queue with one queue per request type.
         * Protected by SyncRepLock.
         */
-       SHM_QUEUE       SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
+       dlist_head      SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
 
        /*
         * Current location of the head of the queue. All waiters should have a
index 6eb46a7ee8e04cdbb31a9d14134b5dbd3d3195e7..dd45b8ee9b7424a9b56d08fa19054352c5753f9e 100644 (file)
@@ -248,7 +248,7 @@ struct PGPROC
         */
        XLogRecPtr      waitLSN;                /* waiting for this LSN or higher */
        int                     syncRepState;   /* wait state for sync rep */
-       SHM_QUEUE       syncRepLinks;   /* list link if process is in syncrep queue */
+       dlist_node      syncRepLinks;   /* list link if process is in syncrep queue */
 
        /*
         * All PROCLOCK objects for locks held or awaited by this backend are