Convert the PGPROC->lwWaitLink list into a dlist instead of open coding it.
authorAndres Freund <andres@anarazel.de>
Thu, 25 Dec 2014 16:24:30 +0000 (17:24 +0100)
committerAndres Freund <andres@anarazel.de>
Thu, 25 Dec 2014 16:24:30 +0000 (17:24 +0100)
Besides being shorter and much easier to read it changes the logic in
LWLockRelease() to release all shared lockers when waking up any. This
can yield some significant performance improvements - and the fairness
isn't really much worse than before, as we always allowed new shared
lockers to jump the queue.

src/backend/access/transam/twophase.c
src/backend/storage/lmgr/lwlock.c
src/backend/storage/lmgr/proc.c
src/include/storage/lwlock.h
src/include/storage/proc.h

index 40de84e934ed28a49ba2dcb4e15f5ca893b56c60..ad3e87271816e8773b3832480d0a0eb0be65d4b8 100644 (file)
@@ -390,7 +390,6 @@ MarkAsPreparing(TransactionId xid, const char *gid,
        proc->roleId = owner;
        proc->lwWaiting = false;
        proc->lwWaitMode = 0;
-       proc->lwWaitLink = NULL;
        proc->waitLock = NULL;
        proc->waitProcLock = NULL;
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
index 43f4d6b6534bc46de60762c1e8c1bce165069688..d9d704461b7aebb97f2f7192ff46e36a828a6d78 100644 (file)
@@ -117,9 +117,9 @@ inline static void
 PRINT_LWDEBUG(const char *where, const LWLock *lock)
 {
        if (Trace_lwlocks)
-               elog(LOG, "%s(%s %d): excl %d shared %d head %p rOK %d",
+               elog(LOG, "%s(%s %d): excl %d shared %d rOK %d",
                         where, T_NAME(lock), T_ID(lock),
-                        (int) lock->exclusive, lock->shared, lock->head,
+                        (int) lock->exclusive, lock->shared,
                         (int) lock->releaseOK);
 }
 
@@ -479,8 +479,7 @@ LWLockInitialize(LWLock *lock, int tranche_id)
        lock->exclusive = 0;
        lock->shared = 0;
        lock->tranche = tranche_id;
-       lock->head = NULL;
-       lock->tail = NULL;
+       dlist_init(&lock->waiters);
 }
 
 
@@ -619,12 +618,7 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 
                proc->lwWaiting = true;
                proc->lwWaitMode = mode;
-               proc->lwWaitLink = NULL;
-               if (lock->head == NULL)
-                       lock->head = proc;
-               else
-                       lock->tail->lwWaitLink = proc;
-               lock->tail = proc;
+               dlist_push_head(&lock->waiters, &proc->lwWaitLink);
 
                /* Can release the mutex now */
                SpinLockRelease(&lock->mutex);
@@ -840,12 +834,7 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
 
                proc->lwWaiting = true;
                proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-               proc->lwWaitLink = NULL;
-               if (lock->head == NULL)
-                       lock->head = proc;
-               else
-                       lock->tail->lwWaitLink = proc;
-               lock->tail = proc;
+               dlist_push_head(&lock->waiters, &proc->lwWaitLink);
 
                /* Can release the mutex now */
                SpinLockRelease(&lock->mutex);
@@ -1002,10 +991,7 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
                proc->lwWaiting = true;
                proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
                /* waiters are added to the front of the queue */
-               proc->lwWaitLink = lock->head;
-               if (lock->head == NULL)
-                       lock->tail = proc;
-               lock->head = proc;
+               dlist_push_head(&lock->waiters, &proc->lwWaitLink);
 
                /*
                 * Set releaseOK, to make sure we get woken up as soon as the lock is
@@ -1087,9 +1073,10 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 void
 LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 {
-       PGPROC     *head;
-       PGPROC     *proc;
-       PGPROC     *next;
+       dlist_head      wakeup;
+       dlist_mutable_iter iter;
+
+       dlist_init(&wakeup);
 
        /* Acquire mutex.  Time spent holding mutex should be short! */
        SpinLockAcquire(&lock->mutex);
@@ -1104,24 +1091,16 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
         * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
         * up. They are always in the front of the queue.
         */
-       head = lock->head;
-
-       if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE)
+       dlist_foreach_modify(iter, &lock->waiters)
        {
-               proc = head;
-               next = proc->lwWaitLink;
-               while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
-               {
-                       proc = next;
-                       next = next->lwWaitLink;
-               }
+               PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+
+               if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
+                       break;
 
-               /* proc is now the last PGPROC to be released */
-               lock->head = next;
-               proc->lwWaitLink = NULL;
+               dlist_delete(&waiter->lwWaitLink);
+               dlist_push_tail(&wakeup, &waiter->lwWaitLink);
        }
-       else
-               head = NULL;
 
        /* We are done updating shared state of the lock itself. */
        SpinLockRelease(&lock->mutex);
@@ -1129,15 +1108,14 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
        /*
         * Awaken any waiters I removed from the queue.
         */
-       while (head != NULL)
+       dlist_foreach_modify(iter, &wakeup)
        {
-               proc = head;
-               head = proc->lwWaitLink;
-               proc->lwWaitLink = NULL;
+               PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+               dlist_delete(&waiter->lwWaitLink);
                /* check comment in LWLockRelease() about this barrier */
                pg_write_barrier();
-               proc->lwWaiting = false;
-               PGSemaphoreUnlock(&proc->sem);
+               waiter->lwWaiting = false;
+               PGSemaphoreUnlock(&waiter->sem);
        }
 }
 
@@ -1148,10 +1126,12 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 void
 LWLockRelease(LWLock *lock)
 {
-       PGPROC     *head;
-       PGPROC     *proc;
+       dlist_head      wakeup;
+       dlist_mutable_iter iter;
        int                     i;
 
+       dlist_init(&wakeup);
+
        PRINT_LWDEBUG("LWLockRelease", lock);
 
        /*
@@ -1187,58 +1167,39 @@ LWLockRelease(LWLock *lock)
         * if someone has already awakened waiters that haven't yet acquired the
         * lock.
         */
-       head = lock->head;
-       if (head != NULL)
+       if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
        {
-               if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
+               /*
+                * Remove the to-be-awakened PGPROCs from the queue.
+                */
+               bool            releaseOK = true;
+               bool            wokeup_somebody = false;
+
+               dlist_foreach_modify(iter, &lock->waiters)
                {
-                       /*
-                        * Remove the to-be-awakened PGPROCs from the queue.
-                        */
-                       bool            releaseOK = true;
+                       PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
 
-                       proc = head;
+                       if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
+                               continue;
 
-                       /*
-                        * First wake up any backends that want to be woken up without
-                        * acquiring the lock.
-                        */
-                       while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
-                               proc = proc->lwWaitLink;
+                       dlist_delete(&waiter->lwWaitLink);
+                       dlist_push_tail(&wakeup, &waiter->lwWaitLink);
 
                        /*
-                        * If the front waiter wants exclusive lock, awaken him only.
-                        * Otherwise awaken as many waiters as want shared access.
+                        * Prevent additional wakeups until retryer gets to
+                        * run. Backends that are just waiting for the lock to become
+                        * free don't retry automatically.
                         */
-                       if (proc->lwWaitMode != LW_EXCLUSIVE)
+                       if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
                        {
-                               while (proc->lwWaitLink != NULL &&
-                                          proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
-                               {
-                                       if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
-                                               releaseOK = false;
-                                       proc = proc->lwWaitLink;
-                               }
-                       }
-                       /* proc is now the last PGPROC to be released */
-                       lock->head = proc->lwWaitLink;
-                       proc->lwWaitLink = NULL;
-
-                       /*
-                        * Prevent additional wakeups until retryer gets to run. Backends
-                        * that are just waiting for the lock to become free don't retry
-                        * automatically.
-                        */
-                       if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
                                releaseOK = false;
+                               wokeup_somebody = true;
+                       }
 
-                       lock->releaseOK = releaseOK;
-               }
-               else
-               {
-                       /* lock is still held, can't awaken anything */
-                       head = NULL;
+                       if(waiter->lwWaitMode == LW_EXCLUSIVE)
+                               break;
                }
+               lock->releaseOK = releaseOK;
        }
 
        /* We are done updating shared state of the lock itself. */
@@ -1249,13 +1210,12 @@ LWLockRelease(LWLock *lock)
        /*
         * Awaken any waiters I removed from the queue.
         */
-       while (head != NULL)
+       dlist_foreach_modify(iter, &wakeup)
        {
+               PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
                LOG_LWDEBUG("LWLockRelease", T_NAME(lock), T_ID(lock),
                                        "release waiter");
-               proc = head;
-               head = proc->lwWaitLink;
-               proc->lwWaitLink = NULL;
+               dlist_delete(&waiter->lwWaitLink);
                /*
                 * Guarantee that lwWaiting being unset only becomes visible once the
                 * unlink from the link has completed. Otherwise the target backend
@@ -1267,8 +1227,8 @@ LWLockRelease(LWLock *lock)
                 * another lock.
                 */
                pg_write_barrier();
-               proc->lwWaiting = false;
-               PGSemaphoreUnlock(&proc->sem);
+               waiter->lwWaiting = false;
+               PGSemaphoreUnlock(&waiter->sem);
        }
 
        /*
index ea88a241443f393995fc1a316f13cc41ae270af3..a4789fcdc87a9cbf253892b98be9e3bae75c1883 100644 (file)
@@ -372,7 +372,6 @@ InitProcess(void)
                MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
        MyProc->lwWaiting = false;
        MyProc->lwWaitMode = 0;
-       MyProc->lwWaitLink = NULL;
        MyProc->waitLock = NULL;
        MyProc->waitProcLock = NULL;
 #ifdef USE_ASSERT_CHECKING
@@ -535,7 +534,6 @@ InitAuxiliaryProcess(void)
        MyPgXact->vacuumFlags = 0;
        MyProc->lwWaiting = false;
        MyProc->lwWaitMode = 0;
-       MyProc->lwWaitLink = NULL;
        MyProc->waitLock = NULL;
        MyProc->waitProcLock = NULL;
 #ifdef USE_ASSERT_CHECKING
index 09654a873e3ded3c170cdbf327c0d7f598722b25..c84970a7add8c51c99cd3de75c163d5a41f352d8 100644 (file)
@@ -14,6 +14,7 @@
 #ifndef LWLOCK_H
 #define LWLOCK_H
 
+#include "lib/ilist.h"
 #include "storage/s_lock.h"
 
 struct PGPROC;
@@ -50,9 +51,7 @@ typedef struct LWLock
        char            exclusive;              /* # of exclusive holders (0 or 1) */
        int                     shared;                 /* # of shared holders (0..MaxBackends) */
        int                     tranche;                /* tranche ID */
-       struct PGPROC *head;            /* head of list of waiting PGPROCs */
-       struct PGPROC *tail;            /* tail of list of waiting PGPROCs */
-       /* tail is undefined when head is NULL */
+       dlist_head      waiters;                /* list of waiting PGPROCs */
 } LWLock;
 
 /*
index 4ad4164927ecd76f3250438b48c1c0e05287b7b9..e868f0cd9f1e0486a7659b5e8ff669d44fa457e2 100644 (file)
@@ -15,6 +15,7 @@
 #define _PROC_H_
 
 #include "access/xlogdefs.h"
+#include "lib/ilist.h"
 #include "storage/latch.h"
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
@@ -104,7 +105,7 @@ struct PGPROC
        /* Info about LWLock the process is currently waiting for, if any. */
        bool            lwWaiting;              /* true if waiting for an LW lock */
        uint8           lwWaitMode;             /* lwlock mode being waited for */
-       struct PGPROC *lwWaitLink;      /* next waiter for same LW lock */
+       dlist_node      lwWaitLink;             /* position in LW lock wait list */
 
        /* Info about lock the process is currently waiting for, if any. */
        /* waitLock and waitProcLock are NULL if not currently waiting. */