From 12605414a7d63ccbe36de2b530847bdfc9cf7447 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 18 Jan 2023 12:15:05 -0800 Subject: [PATCH] Use dlists instead of SHM_QUEUE for syncrep queue Part of a series to remove SHM_QUEUE. ilist.h style lists are more widely used and have an easier to use interface. Reviewed-by: Thomas Munro (in an older version) Discussion: https://postgr.es/m/20221120055930.t6kl3tyivzhlrzu2@awork3.anarazel.de Discussion: https://postgr.es/m/20200211042229.msv23badgqljrdg2@alap3.anarazel.de --- src/backend/replication/syncrep.c | 89 +++++++++------------ src/backend/replication/walsender.c | 2 +- src/backend/storage/lmgr/proc.c | 2 +- src/include/replication/walsender_private.h | 3 +- src/include/storage/proc.h | 2 +- 5 files changed, 41 insertions(+), 57 deletions(-) diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 51e92fc8c7..80d681b71c 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -182,7 +182,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) else mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH); - Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); + Assert(dlist_node_is_detached(&MyProc->syncRepLinks)); Assert(WalSndCtl != NULL); LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); @@ -318,7 +318,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) * assertions, but better safe than sorry). */ pg_read_barrier(); - Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); + Assert(dlist_node_is_detached(&MyProc->syncRepLinks)); MyProc->syncRepState = SYNC_REP_NOT_WAITING; MyProc->waitLSN = 0; @@ -339,31 +339,32 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) static void SyncRepQueueInsert(int mode) { - PGPROC *proc; + dlist_head *queue; + dlist_iter iter; Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); - proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), - &(WalSndCtl->SyncRepQueue[mode]), - offsetof(PGPROC, syncRepLinks)); + queue = &WalSndCtl->SyncRepQueue[mode]; - while (proc) + dlist_reverse_foreach(iter, queue) { + PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); + /* - * Stop at the queue element that we should after to ensure the queue - * is ordered by LSN. + * Stop at the queue element that we should insert after to ensure the + * queue is ordered by LSN. */ if (proc->waitLSN < MyProc->waitLSN) - break; - - proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), - &(proc->syncRepLinks), - offsetof(PGPROC, syncRepLinks)); + { + dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks); + return; + } } - if (proc) - SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks)); - else - SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks)); + /* + * If we get here, the list was either empty, or this process needs to be + * at the head. + */ + dlist_push_head(queue, &MyProc->syncRepLinks); } /* @@ -373,8 +374,8 @@ static void SyncRepCancelWait(void) { LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) - SHMQueueDelete(&(MyProc->syncRepLinks)); + if (!dlist_node_is_detached(&MyProc->syncRepLinks)) + dlist_delete_thoroughly(&MyProc->syncRepLinks); MyProc->syncRepState = SYNC_REP_NOT_WAITING; LWLockRelease(SyncRepLock); } @@ -386,13 +387,13 @@ SyncRepCleanupAtProcExit(void) * First check if we are removed from the queue without the lock to not * slow down backend exit. */ - if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) + if (!dlist_node_is_detached(&MyProc->syncRepLinks)) { LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* maybe we have just been removed, so recheck */ - if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) - SHMQueueDelete(&(MyProc->syncRepLinks)); + if (!dlist_node_is_detached(&MyProc->syncRepLinks)) + dlist_delete_thoroughly(&MyProc->syncRepLinks); LWLockRelease(SyncRepLock); } @@ -879,20 +880,17 @@ static int SyncRepWakeQueue(bool all, int mode) { volatile WalSndCtlData *walsndctl = WalSndCtl; - PGPROC *proc = NULL; - PGPROC *thisproc = NULL; int numprocs = 0; + dlist_mutable_iter iter; Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE)); Assert(SyncRepQueueIsOrderedByLSN(mode)); - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), - &(WalSndCtl->SyncRepQueue[mode]), - offsetof(PGPROC, syncRepLinks)); - - while (proc) + dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode]) { + PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); + /* * Assume the queue is ordered by LSN */ @@ -900,18 +898,9 @@ SyncRepWakeQueue(bool all, int mode) return numprocs; /* - * Move to next proc, so we can delete thisproc from the queue. - * thisproc is valid, proc may be NULL after this. - */ - thisproc = proc; - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), - &(proc->syncRepLinks), - offsetof(PGPROC, syncRepLinks)); - - /* - * Remove thisproc from queue. + * Remove from queue. */ - SHMQueueDelete(&(thisproc->syncRepLinks)); + dlist_delete_thoroughly(&proc->syncRepLinks); /* * SyncRepWaitForLSN() reads syncRepState without holding the lock, so @@ -924,12 +913,12 @@ SyncRepWakeQueue(bool all, int mode) * Set state to complete; see SyncRepWaitForLSN() for discussion of * the various states. */ - thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE; + proc->syncRepState = SYNC_REP_WAIT_COMPLETE; /* * Wake only when we have set state and removed from queue. */ - SetLatch(&(thisproc->procLatch)); + SetLatch(&(proc->procLatch)); numprocs++; } @@ -983,19 +972,17 @@ SyncRepUpdateSyncStandbysDefined(void) static bool SyncRepQueueIsOrderedByLSN(int mode) { - PGPROC *proc = NULL; XLogRecPtr lastLSN; + dlist_iter iter; Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); lastLSN = 0; - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), - &(WalSndCtl->SyncRepQueue[mode]), - offsetof(PGPROC, syncRepLinks)); - - while (proc) + dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode]) { + PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); + /* * Check the queue is ordered by LSN and that multiple procs don't * have matching LSNs @@ -1004,10 +991,6 @@ SyncRepQueueIsOrderedByLSN(int mode) return false; lastLSN = proc->waitLSN; - - proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), - &(proc->syncRepLinks), - offsetof(PGPROC, syncRepLinks)); } return true; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 015ae2995d..4ed3747e3f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3275,7 +3275,7 @@ WalSndShmemInit(void) MemSet(WalSndCtl, 0, WalSndShmemSize()); for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) - SHMQueueInit(&(WalSndCtl->SyncRepQueue[i])); + dlist_init(&(WalSndCtl->SyncRepQueue[i])); for (i = 0; i < max_wal_senders; i++) { diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index e2adf493ca..f8ac4edd6f 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -410,7 +410,7 @@ InitProcess(void) /* Initialize fields for sync rep */ MyProc->waitLSN = 0; MyProc->syncRepState = SYNC_REP_NOT_WAITING; - SHMQueueElemInit(&(MyProc->syncRepLinks)); + dlist_node_init(&MyProc->syncRepLinks); /* Initialize fields for group XID clearing. */ MyProc->procArrayGroupMember = false; diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index e5e779aeb7..5310e054c4 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -13,6 +13,7 @@ #define _WALSENDER_PRIVATE_H #include "access/xlog.h" +#include "lib/ilist.h" #include "nodes/nodes.h" #include "replication/syncrep.h" #include "storage/latch.h" @@ -89,7 +90,7 @@ typedef struct * Synchronous replication queue with one queue per request type. * Protected by SyncRepLock. */ - SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]; + dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]; /* * Current location of the head of the queue. All waiters should have a diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 6eb46a7ee8..dd45b8ee9b 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -248,7 +248,7 @@ struct PGPROC */ XLogRecPtr waitLSN; /* waiting for this LSN or higher */ int syncRepState; /* wait state for sync rep */ - SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */ + dlist_node syncRepLinks; /* list link if process is in syncrep queue */ /* * All PROCLOCK objects for locks held or awaited by this backend are -- 2.30.2