Use dlist/dclist instead of PROC_QUEUE / SHM_QUEUE for heavyweight locks
authorAndres Freund <andres@anarazel.de>
Wed, 18 Jan 2023 19:41:14 +0000 (11:41 -0800)
committerAndres Freund <andres@anarazel.de>
Wed, 18 Jan 2023 19:41:14 +0000 (11:41 -0800)
Part of a series to remove SHM_QUEUE. ilist.h style lists are more widely used
and have an easier to use interface.

As PROC_QUEUE is now unused, remove it.

Reviewed-by: Thomas Munro <thomas.munro@gmail.com> (in an older version)
Discussion: https://postgr.es/m/20221120055930.t6kl3tyivzhlrzu2@awork3.anarazel.de
Discussion: https://postgr.es/m/20200211042229.msv23badgqljrdg2@alap3.anarazel.de

src/backend/access/transam/twophase.c
src/backend/storage/lmgr/deadlock.c
src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/proc.c
src/include/storage/lock.h
src/include/storage/proc.h
src/tools/pgindent/typedefs.list

index 24e29b7998b0f8d482756975678e8a467ec7f623..068e59bec005b8aaece896673f345b7fea7195e2 100644 (file)
@@ -461,7 +461,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
        /* Initialize the PGPROC entry */
        MemSet(proc, 0, sizeof(PGPROC));
        proc->pgprocno = gxact->pgprocno;
-       SHMQueueElemInit(&(proc->links));
+       dlist_node_init(&proc->links);
        proc->waitStatus = PROC_WAIT_STATUS_OK;
        if (LocalTransactionIdIsValid(MyProc->lxid))
        {
@@ -491,7 +491,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
        proc->waitProcLock = NULL;
        pg_atomic_init_u64(&proc->waitStart, 0);
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
-               SHMQueueInit(&(proc->myProcLocks[i]));
+               dlist_init(&proc->myProcLocks[i]);
        /* subxid data must be filled later by GXactLoadSubxactData */
        proc->subxidStatus.overflowed = false;
        proc->subxidStatus.count = 0;
index 2f592341f02b5b9b00b2bad2fbee03149bc629e4..617ebacdd4c75c30b9f69985f48554cf10f90dd9 100644 (file)
@@ -216,9 +216,6 @@ InitDeadLockChecking(void)
 DeadLockState
 DeadLockCheck(PGPROC *proc)
 {
-       int                     i,
-                               j;
-
        /* Initialize to "no constraints" */
        nCurConstraints = 0;
        nPossibleConstraints = 0;
@@ -246,26 +243,23 @@ DeadLockCheck(PGPROC *proc)
        }
 
        /* Apply any needed rearrangements of wait queues */
-       for (i = 0; i < nWaitOrders; i++)
+       for (int i = 0; i < nWaitOrders; i++)
        {
                LOCK       *lock = waitOrders[i].lock;
                PGPROC    **procs = waitOrders[i].procs;
                int                     nProcs = waitOrders[i].nProcs;
-               PROC_QUEUE *waitQueue = &(lock->waitProcs);
+               dclist_head *waitQueue = &lock->waitProcs;
 
-               Assert(nProcs == waitQueue->size);
+               Assert(nProcs == dclist_count(waitQueue));
 
 #ifdef DEBUG_DEADLOCK
                PrintLockQueue(lock, "DeadLockCheck:");
 #endif
 
                /* Reset the queue and re-add procs in the desired order */
-               ProcQueueInit(waitQueue);
-               for (j = 0; j < nProcs; j++)
-               {
-                       SHMQueueInsertBefore(&(waitQueue->links), &(procs[j]->links));
-                       waitQueue->size++;
-               }
+               dclist_init(waitQueue);
+               for (int j = 0; j < nProcs; j++)
+                       dclist_push_tail(waitQueue, &procs[j]->links);
 
 #ifdef DEBUG_DEADLOCK
                PrintLockQueue(lock, "rearranged to:");
@@ -544,11 +538,8 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
 {
        PGPROC     *proc;
        LOCK       *lock = checkProc->waitLock;
-       PROCLOCK   *proclock;
-       SHM_QUEUE  *procLocks;
+       dlist_iter      proclock_iter;
        LockMethod      lockMethodTable;
-       PROC_QUEUE *waitQueue;
-       int                     queue_size;
        int                     conflictMask;
        int                     i;
        int                     numLockModes,
@@ -571,13 +562,9 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
         * Scan for procs that already hold conflicting locks.  These are "hard"
         * edges in the waits-for graph.
         */
-       procLocks = &(lock->procLocks);
-
-       proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
-                                                                                offsetof(PROCLOCK, lockLink));
-
-       while (proclock)
+       dlist_foreach(proclock_iter, &lock->procLocks)
        {
+               PROCLOCK   *proclock = dlist_container(PROCLOCK, lockLink, proclock_iter.cur);
                PGPROC     *leader;
 
                proc = proclock->tag.myProc;
@@ -636,9 +623,6 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
                                }
                        }
                }
-
-               proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
-                                                                                        offsetof(PROCLOCK, lockLink));
        }
 
        /*
@@ -660,8 +644,7 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
        {
                /* Use the given hypothetical wait queue order */
                PGPROC    **procs = waitOrders[i].procs;
-
-               queue_size = waitOrders[i].nProcs;
+               int                     queue_size = waitOrders[i].nProcs;
 
                for (i = 0; i < queue_size; i++)
                {
@@ -711,9 +694,11 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
        else
        {
                PGPROC     *lastGroupMember = NULL;
+               dlist_iter      proc_iter;
+               dclist_head *waitQueue;
 
                /* Use the true lock wait queue order */
-               waitQueue = &(lock->waitProcs);
+               waitQueue = &lock->waitProcs;
 
                /*
                 * Find the last member of the lock group that is present in the wait
@@ -726,13 +711,12 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
                        lastGroupMember = checkProc;
                else
                {
-                       proc = (PGPROC *) waitQueue->links.next;
-                       queue_size = waitQueue->size;
-                       while (queue_size-- > 0)
+                       dclist_foreach(proc_iter, waitQueue)
                        {
+                               proc = dlist_container(PGPROC, links, proc_iter.cur);
+
                                if (proc->lockGroupLeader == checkProcLeader)
                                        lastGroupMember = proc;
-                               proc = (PGPROC *) proc->links.next;
                        }
                        Assert(lastGroupMember != NULL);
                }
@@ -740,12 +724,12 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
                /*
                 * OK, now rescan (or scan) the queue to identify the soft conflicts.
                 */
-               queue_size = waitQueue->size;
-               proc = (PGPROC *) waitQueue->links.next;
-               while (queue_size-- > 0)
+               dclist_foreach(proc_iter, waitQueue)
                {
                        PGPROC     *leader;
 
+                       proc = dlist_container(PGPROC, links, proc_iter.cur);
+
                        leader = proc->lockGroupLeader == NULL ? proc :
                                proc->lockGroupLeader;
 
@@ -779,8 +763,6 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
                                        return true;
                                }
                        }
-
-                       proc = (PGPROC *) proc->links.next;
                }
        }
 
@@ -832,8 +814,8 @@ ExpandConstraints(EDGE *constraints,
                /* No, so allocate a new list */
                waitOrders[nWaitOrders].lock = lock;
                waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs;
-               waitOrders[nWaitOrders].nProcs = lock->waitProcs.size;
-               nWaitOrderProcs += lock->waitProcs.size;
+               waitOrders[nWaitOrders].nProcs = dclist_count(&lock->waitProcs);
+               nWaitOrderProcs += dclist_count(&lock->waitProcs);
                Assert(nWaitOrderProcs <= MaxBackends);
 
                /*
@@ -880,8 +862,8 @@ TopoSort(LOCK *lock,
                 int nConstraints,
                 PGPROC **ordering)             /* output argument */
 {
-       PROC_QUEUE *waitQueue = &(lock->waitProcs);
-       int                     queue_size = waitQueue->size;
+       dclist_head *waitQueue = &lock->waitProcs;
+       int                     queue_size = dclist_count(waitQueue);
        PGPROC     *proc;
        int                     i,
                                j,
@@ -889,14 +871,16 @@ TopoSort(LOCK *lock,
                                k,
                                kk,
                                last;
+       dlist_iter      proc_iter;
 
        /* First, fill topoProcs[] array with the procs in their current order */
-       proc = (PGPROC *) waitQueue->links.next;
-       for (i = 0; i < queue_size; i++)
+       i = 0;
+       dclist_foreach(proc_iter, waitQueue)
        {
-               topoProcs[i] = proc;
-               proc = (PGPROC *) proc->links.next;
+               proc = dlist_container(PGPROC, links, proc_iter.cur);
+               topoProcs[i++] = proc;
        }
+       Assert(i == queue_size);
 
        /*
         * Scan the constraints, and for each proc in the array, generate a count
@@ -1066,17 +1050,16 @@ TopoSort(LOCK *lock,
 static void
 PrintLockQueue(LOCK *lock, const char *info)
 {
-       PROC_QUEUE *waitQueue = &(lock->waitProcs);
-       int                     queue_size = waitQueue->size;
-       PGPROC     *proc;
-       int                     i;
+       dclist_head *waitQueue = &lock->waitProcs;
+       dlist_iter      proc_iter;
 
        printf("%s lock %p queue ", info, lock);
-       proc = (PGPROC *) waitQueue->links.next;
-       for (i = 0; i < queue_size; i++)
+
+       dclist_foreach(proc_iter, waitQueue)
        {
+               PGPROC     *proc = dlist_container(PGPROC, links, proc_iter.cur);
+
                printf(" %d", proc->pid);
-               proc = (PGPROC *) proc->links.next;
        }
        printf("\n");
        fflush(stdout);
index 965049697d19b1ce6b388fc8d45d182a436b18e2..49d62a0dc7e6dc422d7387c7e55ba5b14fb63739 100644 (file)
@@ -345,7 +345,7 @@ LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
                         lock->granted[1], lock->granted[2], lock->granted[3],
                         lock->granted[4], lock->granted[5], lock->granted[6],
                         lock->granted[7], lock->nGranted,
-                        lock->waitProcs.size,
+                        dclist_count(&lock->waitProcs),
                         LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
 }
 
@@ -1058,8 +1058,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
                                uint32          proclock_hashcode;
 
                                proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
-                               SHMQueueDelete(&proclock->lockLink);
-                               SHMQueueDelete(&proclock->procLink);
+                               dlist_delete(&proclock->lockLink);
+                               dlist_delete(&proclock->procLink);
                                if (!hash_search_with_hash_value(LockMethodProcLockHash,
                                                                                                 (void *) &(proclock->tag),
                                                                                                 proclock_hashcode,
@@ -1194,8 +1194,8 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
        {
                lock->grantMask = 0;
                lock->waitMask = 0;
-               SHMQueueInit(&(lock->procLocks));
-               ProcQueueInit(&(lock->waitProcs));
+               dlist_init(&lock->procLocks);
+               dclist_init(&lock->waitProcs);
                lock->nRequested = 0;
                lock->nGranted = 0;
                MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
@@ -1237,7 +1237,7 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
                         * of shared memory, because there won't be anything to cause
                         * anyone to release the lock object later.
                         */
-                       Assert(SHMQueueEmpty(&(lock->procLocks)));
+                       Assert(dlist_is_empty(&(lock->procLocks)));
                        if (!hash_search_with_hash_value(LockMethodLockHash,
                                                                                         (void *) &(lock->tag),
                                                                                         hashcode,
@@ -1270,9 +1270,8 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
                proclock->holdMask = 0;
                proclock->releaseMask = 0;
                /* Add proclock to appropriate lists */
-               SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
-               SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
-                                                        &proclock->procLink);
+               dlist_push_tail(&lock->procLocks, &proclock->lockLink);
+               dlist_push_tail(&proc->myProcLocks[partition], &proclock->procLink);
                PROCLOCK_PRINT("LockAcquire: new", proclock);
        }
        else
@@ -1427,9 +1426,8 @@ LockCheckConflicts(LockMethod lockMethodTable,
        int                     conflictMask = lockMethodTable->conflictTab[lockmode];
        int                     conflictsRemaining[MAX_LOCKMODES];
        int                     totalConflictsRemaining = 0;
+       dlist_iter      proclock_iter;
        int                     i;
-       SHM_QUEUE  *procLocks;
-       PROCLOCK   *otherproclock;
 
        /*
         * first check for global conflicts: If no locks conflict with my request,
@@ -1501,11 +1499,11 @@ LockCheckConflicts(LockMethod lockMethodTable,
         * shared memory state more complex (and larger) but it doesn't seem worth
         * it.
         */
-       procLocks = &(lock->procLocks);
-       otherproclock = (PROCLOCK *)
-               SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
-       while (otherproclock != NULL)
+       dlist_foreach(proclock_iter, &lock->procLocks)
        {
+               PROCLOCK   *otherproclock =
+                       dlist_container(PROCLOCK, lockLink, proclock_iter.cur);
+
                if (proclock != otherproclock &&
                        proclock->groupLeader == otherproclock->groupLeader &&
                        (otherproclock->holdMask & conflictMask) != 0)
@@ -1530,9 +1528,6 @@ LockCheckConflicts(LockMethod lockMethodTable,
                                return false;
                        }
                }
-               otherproclock = (PROCLOCK *)
-                       SHMQueueNext(procLocks, &otherproclock->lockLink,
-                                                offsetof(PROCLOCK, lockLink));
        }
 
        /* Nope, it's a real conflict. */
@@ -1645,8 +1640,8 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock,
                uint32          proclock_hashcode;
 
                PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
-               SHMQueueDelete(&proclock->lockLink);
-               SHMQueueDelete(&proclock->procLink);
+               dlist_delete(&proclock->lockLink);
+               dlist_delete(&proclock->procLink);
                proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
                if (!hash_search_with_hash_value(LockMethodProcLockHash,
                                                                                 (void *) &(proclock->tag),
@@ -1663,7 +1658,7 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock,
                 * object.
                 */
                LOCK_PRINT("CleanUpLock: deleting", lock, 0);
-               Assert(SHMQueueEmpty(&(lock->procLocks)));
+               Assert(dlist_is_empty(&lock->procLocks));
                if (!hash_search_with_hash_value(LockMethodLockHash,
                                                                                 (void *) &(lock->tag),
                                                                                 hashcode,
@@ -1926,12 +1921,11 @@ RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
        Assert(proc->waitStatus == PROC_WAIT_STATUS_WAITING);
        Assert(proc->links.next != NULL);
        Assert(waitLock);
-       Assert(waitLock->waitProcs.size > 0);
+       Assert(!dclist_is_empty(&waitLock->waitProcs));
        Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
 
        /* Remove proc from lock's wait queue */
-       SHMQueueDelete(&(proc->links));
-       waitLock->waitProcs.size--;
+       dclist_delete_from(&waitLock->waitProcs, &proc->links);
 
        /* Undo increments of request counts by waiting process */
        Assert(waitLock->nRequested > 0);
@@ -2185,7 +2179,6 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
                                numLockModes;
        LOCALLOCK  *locallock;
        LOCK       *lock;
-       PROCLOCK   *proclock;
        int                     partition;
        bool            have_fast_path_lwlock = false;
 
@@ -2342,8 +2335,8 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
        for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
        {
                LWLock     *partitionLock;
-               SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
-               PROCLOCK   *nextplock;
+               dlist_head *procLocks = &MyProc->myProcLocks[partition];
+               dlist_mutable_iter proclock_iter;
 
                partitionLock = LockHashPartitionLockByIndex(partition);
 
@@ -2366,24 +2359,16 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
                 * locallock situation, we lose that guarantee for fast-path locks.
                 * This is not ideal.
                 */
-               if (SHMQueueNext(procLocks, procLocks,
-                                                offsetof(PROCLOCK, procLink)) == NULL)
+               if (dlist_is_empty(procLocks))
                        continue;                       /* needn't examine this partition */
 
                LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 
-               for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
-                                                                                                 offsetof(PROCLOCK, procLink));
-                        proclock;
-                        proclock = nextplock)
+               dlist_foreach_modify(proclock_iter, procLocks)
                {
+                       PROCLOCK   *proclock = dlist_container(PROCLOCK, procLink, proclock_iter.cur);
                        bool            wakeupNeeded = false;
 
-                       /* Get link first, since we may unlink/delete this proclock */
-                       nextplock = (PROCLOCK *)
-                               SHMQueueNext(procLocks, &proclock->procLink,
-                                                        offsetof(PROCLOCK, procLink));
-
                        Assert(proclock->tag.myProc == MyProc);
 
                        lock = proclock->tag.myLock;
@@ -2918,7 +2903,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp)
        LockMethod      lockMethodTable;
        LOCK       *lock;
        LOCKMASK        conflictMask;
-       SHM_QUEUE  *procLocks;
+       dlist_iter      proclock_iter;
        PROCLOCK   *proclock;
        uint32          hashcode;
        LWLock     *partitionLock;
@@ -3064,14 +3049,10 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp)
        /*
         * Examine each existing holder (or awaiter) of the lock.
         */
-
-       procLocks = &(lock->procLocks);
-
-       proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
-                                                                                offsetof(PROCLOCK, lockLink));
-
-       while (proclock)
+       dlist_foreach(proclock_iter, &lock->procLocks)
        {
+               proclock = dlist_container(PROCLOCK, lockLink, proclock_iter.cur);
+
                if (conflictMask & proclock->holdMask)
                {
                        PGPROC     *proc = proclock->tag.myProc;
@@ -3097,9 +3078,6 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp)
                                /* else, xact already committed or aborted */
                        }
                }
-
-               proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
-                                                                                        offsetof(PROCLOCK, lockLink));
        }
 
        LWLockRelease(partitionLock);
@@ -3498,8 +3476,8 @@ PostPrepare_Locks(TransactionId xid)
        for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
        {
                LWLock     *partitionLock;
-               SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
-               PROCLOCK   *nextplock;
+               dlist_head *procLocks = &(MyProc->myProcLocks[partition]);
+               dlist_mutable_iter proclock_iter;
 
                partitionLock = LockHashPartitionLockByIndex(partition);
 
@@ -3511,21 +3489,14 @@ PostPrepare_Locks(TransactionId xid)
                 * another backend is adding something to our lists now.  For safety,
                 * though, we code this the same way as in LockReleaseAll.
                 */
-               if (SHMQueueNext(procLocks, procLocks,
-                                                offsetof(PROCLOCK, procLink)) == NULL)
+               if (dlist_is_empty(procLocks))
                        continue;                       /* needn't examine this partition */
 
                LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 
-               for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
-                                                                                                 offsetof(PROCLOCK, procLink));
-                        proclock;
-                        proclock = nextplock)
+               dlist_foreach_modify(proclock_iter, procLocks)
                {
-                       /* Get link first, since we may unlink/relink this proclock */
-                       nextplock = (PROCLOCK *)
-                               SHMQueueNext(procLocks, &proclock->procLink,
-                                                        offsetof(PROCLOCK, procLink));
+                       proclock = dlist_container(PROCLOCK, procLink, proclock_iter.cur);
 
                        Assert(proclock->tag.myProc == MyProc);
 
@@ -3563,7 +3534,7 @@ PostPrepare_Locks(TransactionId xid)
                         * same hash partition, cf proclock_hash().  So the partition lock
                         * we already hold is sufficient for this.
                         */
-                       SHMQueueDelete(&proclock->procLink);
+                       dlist_delete(&proclock->procLink);
 
                        /*
                         * Create the new hash key for the proclock.
@@ -3589,8 +3560,7 @@ PostPrepare_Locks(TransactionId xid)
                                elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks");
 
                        /* Re-link into the new proc's proclock list */
-                       SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
-                                                                &proclock->procLink);
+                       dlist_push_tail(&newproc->myProcLocks[partition], &proclock->procLink);
 
                        PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock);
                }                                               /* loop over PROCLOCKs within this partition */
@@ -3919,12 +3889,10 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
 {
        LOCK       *theLock = blocked_proc->waitLock;
        BlockedProcData *bproc;
-       SHM_QUEUE  *procLocks;
-       PROCLOCK   *proclock;
-       PROC_QUEUE *waitQueue;
-       PGPROC     *queued_proc;
+       dlist_iter      proclock_iter;
+       dlist_iter      proc_iter;
+       dclist_head *waitQueue;
        int                     queue_size;
-       int                     i;
 
        /* Nothing to do if this proc is not blocked */
        if (theLock == NULL)
@@ -3942,11 +3910,10 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
         */
 
        /* Collect all PROCLOCKs associated with theLock */
-       procLocks = &(theLock->procLocks);
-       proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
-                                                                                offsetof(PROCLOCK, lockLink));
-       while (proclock)
+       dlist_foreach(proclock_iter, &theLock->procLocks)
        {
+               PROCLOCK   *proclock =
+                       dlist_container(PROCLOCK, lockLink, proclock_iter.cur);
                PGPROC     *proc = proclock->tag.myProc;
                LOCK       *lock = proclock->tag.myLock;
                LockInstanceData *instance;
@@ -3971,14 +3938,11 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
                instance->leaderPid = proclock->groupLeader->pid;
                instance->fastpath = false;
                data->nlocks++;
-
-               proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
-                                                                                        offsetof(PROCLOCK, lockLink));
        }
 
        /* Enlarge waiter_pids[] if it's too small to hold all wait queue PIDs */
        waitQueue = &(theLock->waitProcs);
-       queue_size = waitQueue->size;
+       queue_size = dclist_count(waitQueue);
 
        if (queue_size > data->maxpids - data->npids)
        {
@@ -3989,9 +3953,9 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
        }
 
        /* Collect PIDs from the lock's wait queue, stopping at blocked_proc */
-       queued_proc = (PGPROC *) waitQueue->links.next;
-       for (i = 0; i < queue_size; i++)
+       dclist_foreach(proc_iter, waitQueue)
        {
+               PGPROC     *queued_proc = dlist_container(PGPROC, links, proc_iter.cur);
                if (queued_proc == blocked_proc)
                        break;
                data->waiter_pids[data->npids++] = queued_proc->pid;
@@ -4113,9 +4077,6 @@ GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
 void
 DumpLocks(PGPROC *proc)
 {
-       SHM_QUEUE  *procLocks;
-       PROCLOCK   *proclock;
-       LOCK       *lock;
        int                     i;
 
        if (proc == NULL)
@@ -4126,23 +4087,17 @@ DumpLocks(PGPROC *proc)
 
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
        {
-               procLocks = &(proc->myProcLocks[i]);
+               dlist_head *procLocks = &proc->myProcLocks[i];
+               dlist_iter      iter;
 
-               proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
-                                                                                        offsetof(PROCLOCK, procLink));
-
-               while (proclock)
+               dlist_foreach(iter, procLocks)
                {
-                       Assert(proclock->tag.myProc == proc);
-
-                       lock = proclock->tag.myLock;
+                       PROCLOCK   *proclock = dlist_container(PROCLOCK, procLink, iter.cur);
+                       LOCK       *lock = proclock->tag.myLock;
 
+                       Assert(proclock->tag.myProc == proc);
                        PROCLOCK_PRINT("DumpLocks", proclock);
                        LOCK_PRINT("DumpLocks", lock, 0);
-
-                       proclock = (PROCLOCK *)
-                               SHMQueueNext(procLocks, &proclock->procLink,
-                                                        offsetof(PROCLOCK, procLink));
                }
        }
 }
@@ -4267,8 +4222,8 @@ lock_twophase_recover(TransactionId xid, uint16 info,
        {
                lock->grantMask = 0;
                lock->waitMask = 0;
-               SHMQueueInit(&(lock->procLocks));
-               ProcQueueInit(&(lock->waitProcs));
+               dlist_init(&lock->procLocks);
+               dclist_init(&lock->waitProcs);
                lock->nRequested = 0;
                lock->nGranted = 0;
                MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
@@ -4310,7 +4265,7 @@ lock_twophase_recover(TransactionId xid, uint16 info,
                         * of shared memory, because there won't be anything to cause
                         * anyone to release the lock object later.
                         */
-                       Assert(SHMQueueEmpty(&(lock->procLocks)));
+                       Assert(dlist_is_empty(&lock->procLocks));
                        if (!hash_search_with_hash_value(LockMethodLockHash,
                                                                                         (void *) &(lock->tag),
                                                                                         hashcode,
@@ -4335,9 +4290,9 @@ lock_twophase_recover(TransactionId xid, uint16 info,
                proclock->holdMask = 0;
                proclock->releaseMask = 0;
                /* Add proclock to appropriate lists */
-               SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
-               SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
-                                                        &proclock->procLink);
+               dlist_push_tail(&lock->procLocks, &proclock->lockLink);
+               dlist_push_tail(&proc->myProcLocks[partition],
+                                               &proclock->procLink);
                PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
        }
        else
index 00d26dc0f68d112967101c7d63c3165c569f50b1..e2adf493ca920ce88aec69f7e3a99c151d8abc05 100644 (file)
@@ -15,8 +15,6 @@
 /*
  * Interface (a):
  *             ProcSleep(), ProcWakeup(),
- *             ProcQueueAlloc() -- create a shm queue for sleeping processes
- *             ProcQueueInit() -- create a queue without allocing memory
  *
  * Waiting for a lock causes the backend to be put to sleep.  Whoever releases
  * the lock wakes the process up again (and gives it an error code so it knows
@@ -173,10 +171,10 @@ InitProcGlobal(void)
         * Initialize the data structures.
         */
        ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
-       ProcGlobal->freeProcs = NULL;
-       ProcGlobal->autovacFreeProcs = NULL;
-       ProcGlobal->bgworkerFreeProcs = NULL;
-       ProcGlobal->walsenderFreeProcs = NULL;
+       dlist_init(&ProcGlobal->freeProcs);
+       dlist_init(&ProcGlobal->autovacFreeProcs);
+       dlist_init(&ProcGlobal->bgworkerFreeProcs);
+       dlist_init(&ProcGlobal->walsenderFreeProcs);
        ProcGlobal->startupBufferPinWaitBufId = -1;
        ProcGlobal->walwriterLatch = NULL;
        ProcGlobal->checkpointerLatch = NULL;
@@ -214,6 +212,8 @@ InitProcGlobal(void)
 
        for (i = 0; i < TotalProcs; i++)
        {
+               PGPROC     *proc = &procs[i];
+
                /* Common initialization for all PGPROCs, regardless of type. */
 
                /*
@@ -223,11 +223,11 @@ InitProcGlobal(void)
                 */
                if (i < MaxBackends + NUM_AUXILIARY_PROCS)
                {
-                       procs[i].sem = PGSemaphoreCreate();
-                       InitSharedLatch(&(procs[i].procLatch));
-                       LWLockInitialize(&(procs[i].fpInfoLock), LWTRANCHE_LOCK_FASTPATH);
+                       proc->sem = PGSemaphoreCreate();
+                       InitSharedLatch(&(proc->procLatch));
+                       LWLockInitialize(&(proc->fpInfoLock), LWTRANCHE_LOCK_FASTPATH);
                }
-               procs[i].pgprocno = i;
+               proc->pgprocno = i;
 
                /*
                 * Newly created PGPROCs for normal backends, autovacuum and bgworkers
@@ -240,46 +240,42 @@ InitProcGlobal(void)
                if (i < MaxConnections)
                {
                        /* PGPROC for normal backend, add to freeProcs list */
-                       procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
-                       ProcGlobal->freeProcs = &procs[i];
-                       procs[i].procgloballist = &ProcGlobal->freeProcs;
+                       dlist_push_head(&ProcGlobal->freeProcs, &proc->links);
+                       proc->procgloballist = &ProcGlobal->freeProcs;
                }
                else if (i < MaxConnections + autovacuum_max_workers + 1)
                {
                        /* PGPROC for AV launcher/worker, add to autovacFreeProcs list */
-                       procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
-                       ProcGlobal->autovacFreeProcs = &procs[i];
-                       procs[i].procgloballist = &ProcGlobal->autovacFreeProcs;
+                       dlist_push_head(&ProcGlobal->autovacFreeProcs, &proc->links);
+                       proc->procgloballist = &ProcGlobal->autovacFreeProcs;
                }
                else if (i < MaxConnections + autovacuum_max_workers + 1 + max_worker_processes)
                {
                        /* PGPROC for bgworker, add to bgworkerFreeProcs list */
-                       procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs;
-                       ProcGlobal->bgworkerFreeProcs = &procs[i];
-                       procs[i].procgloballist = &ProcGlobal->bgworkerFreeProcs;
+                       dlist_push_head(&ProcGlobal->bgworkerFreeProcs, &proc->links);
+                       proc->procgloballist = &ProcGlobal->bgworkerFreeProcs;
                }
                else if (i < MaxBackends)
                {
                        /* PGPROC for walsender, add to walsenderFreeProcs list */
-                       procs[i].links.next = (SHM_QUEUE *) ProcGlobal->walsenderFreeProcs;
-                       ProcGlobal->walsenderFreeProcs = &procs[i];
-                       procs[i].procgloballist = &ProcGlobal->walsenderFreeProcs;
+                       dlist_push_head(&ProcGlobal->walsenderFreeProcs, &proc->links);
+                       proc->procgloballist = &ProcGlobal->walsenderFreeProcs;
                }
 
                /* Initialize myProcLocks[] shared memory queues. */
                for (j = 0; j < NUM_LOCK_PARTITIONS; j++)
-                       SHMQueueInit(&(procs[i].myProcLocks[j]));
+                       dlist_init(&(proc->myProcLocks[j]));
 
                /* Initialize lockGroupMembers list. */
-               dlist_init(&procs[i].lockGroupMembers);
+               dlist_init(&proc->lockGroupMembers);
 
                /*
                 * Initialize the atomic variables, otherwise, it won't be safe to
                 * access them for backends that aren't currently in use.
                 */
-               pg_atomic_init_u32(&(procs[i].procArrayGroupNext), INVALID_PGPROCNO);
-               pg_atomic_init_u32(&(procs[i].clogGroupNext), INVALID_PGPROCNO);
-               pg_atomic_init_u64(&(procs[i].waitStart), 0);
+               pg_atomic_init_u32(&(proc->procArrayGroupNext), INVALID_PGPROCNO);
+               pg_atomic_init_u32(&(proc->clogGroupNext), INVALID_PGPROCNO);
+               pg_atomic_init_u64(&(proc->waitStart), 0);
        }
 
        /*
@@ -300,7 +296,7 @@ InitProcGlobal(void)
 void
 InitProcess(void)
 {
-       PGPROC     *volatile *procgloballist;
+       dlist_head *procgloballist;
 
        /*
         * ProcGlobal should be set up already (if we are a backend, we inherit
@@ -333,11 +329,9 @@ InitProcess(void)
 
        set_spins_per_delay(ProcGlobal->spins_per_delay);
 
-       MyProc = *procgloballist;
-
-       if (MyProc != NULL)
+       if (!dlist_is_empty(procgloballist))
        {
-               *procgloballist = (PGPROC *) MyProc->links.next;
+               MyProc = (PGPROC*) dlist_pop_head_node(procgloballist);
                SpinLockRelease(ProcStructLock);
        }
        else
@@ -378,7 +372,7 @@ InitProcess(void)
         * Initialize all fields of MyProc, except for those previously
         * initialized by InitProcGlobal.
         */
-       SHMQueueElemInit(&(MyProc->links));
+       dlist_node_init(&MyProc->links);
        MyProc->waitStatus = PROC_WAIT_STATUS_OK;
        MyProc->lxid = InvalidLocalTransactionId;
        MyProc->fpVXIDLock = false;
@@ -408,7 +402,7 @@ InitProcess(void)
 
                /* Last process should have released all locks. */
                for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
-                       Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i])));
+                       Assert(dlist_is_empty(&(MyProc->myProcLocks[i])));
        }
 #endif
        MyProc->recoveryConflictPending = false;
@@ -565,7 +559,7 @@ InitAuxiliaryProcess(void)
         * Initialize all fields of MyProc, except for those previously
         * initialized by InitProcGlobal.
         */
-       SHMQueueElemInit(&(MyProc->links));
+       dlist_node_init(&MyProc->links);
        MyProc->waitStatus = PROC_WAIT_STATUS_OK;
        MyProc->lxid = InvalidLocalTransactionId;
        MyProc->fpVXIDLock = false;
@@ -590,7 +584,7 @@ InitAuxiliaryProcess(void)
 
                /* Last process should have released all locks. */
                for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
-                       Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i])));
+                       Assert(dlist_is_empty(&(MyProc->myProcLocks[i])));
        }
 #endif
 
@@ -658,16 +652,15 @@ GetStartupBufferPinWaitBufId(void)
 bool
 HaveNFreeProcs(int n)
 {
-       PGPROC     *proc;
+       dlist_iter      iter;
 
        SpinLockAcquire(ProcStructLock);
 
-       proc = ProcGlobal->freeProcs;
-
-       while (n > 0 && proc != NULL)
+       dlist_foreach(iter, &ProcGlobal->freeProcs)
        {
-               proc = (PGPROC *) proc->links.next;
                n--;
+               if (n == 0)
+                       break;
        }
 
        SpinLockRelease(ProcStructLock);
@@ -730,7 +723,7 @@ LockErrorCleanup(void)
        partitionLock = LockHashPartitionLock(lockAwaited->hashcode);
        LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 
-       if (MyProc->links.next != NULL)
+       if (!dlist_node_is_detached(&MyProc->links))
        {
                /* We could not have been granted the lock yet */
                RemoveFromWaitQueue(MyProc, lockAwaited->hashcode);
@@ -803,7 +796,7 @@ static void
 ProcKill(int code, Datum arg)
 {
        PGPROC     *proc;
-       PGPROC     *volatile *procgloballist;
+       dlist_head *procgloballist;
 
        Assert(MyProc != NULL);
 
@@ -816,7 +809,7 @@ ProcKill(int code, Datum arg)
 
                /* Last process should have released all locks. */
                for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
-                       Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i])));
+                       Assert(dlist_is_empty(&(MyProc->myProcLocks[i])));
        }
 #endif
 
@@ -832,7 +825,7 @@ ProcKill(int code, Datum arg)
 
        /*
         * Detach from any lock group of which we are a member.  If the leader
-        * exist before all other group members, its PGPROC will remain allocated
+        * exits before all other group members, its PGPROC will remain allocated
         * until the last group process exits; that process must return the
         * leader's PGPROC to the appropriate list.
         */
@@ -853,8 +846,7 @@ ProcKill(int code, Datum arg)
 
                                /* Leader exited first; return its PGPROC. */
                                SpinLockAcquire(ProcStructLock);
-                               leader->links.next = (SHM_QUEUE *) *procgloballist;
-                               *procgloballist = leader;
+                               dlist_push_head(procgloballist, &leader->links);
                                SpinLockRelease(ProcStructLock);
                        }
                }
@@ -893,8 +885,7 @@ ProcKill(int code, Datum arg)
                Assert(dlist_is_empty(&proc->lockGroupMembers));
 
                /* Return PGPROC structure (and semaphore) to appropriate freelist */
-               proc->links.next = (SHM_QUEUE *) *procgloballist;
-               *procgloballist = proc;
+               dlist_push_tail(procgloballist, &proc->links);
        }
 
        /* Update shared estimate of spins_per_delay */
@@ -986,44 +977,6 @@ AuxiliaryPidGetProc(int pid)
        return result;
 }
 
-/*
- * ProcQueue package: routines for putting processes to sleep
- *             and  waking them up
- */
-
-/*
- * ProcQueueAlloc -- alloc/attach to a shared memory process queue
- *
- * Returns: a pointer to the queue
- * Side Effects: Initializes the queue if it wasn't there before
- */
-#ifdef NOT_USED
-PROC_QUEUE *
-ProcQueueAlloc(const char *name)
-{
-       PROC_QUEUE *queue;
-       bool            found;
-
-       queue = (PROC_QUEUE *)
-               ShmemInitStruct(name, sizeof(PROC_QUEUE), &found);
-
-       if (!found)
-               ProcQueueInit(queue);
-
-       return queue;
-}
-#endif
-
-/*
- * ProcQueueInit -- initialize a shared memory process queue
- */
-void
-ProcQueueInit(PROC_QUEUE *queue)
-{
-       SHMQueueInit(&(queue->links));
-       queue->size = 0;
-}
-
 
 /*
  * ProcSleep -- put a process to sleep on the specified lock
@@ -1049,8 +1002,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
        PROCLOCK   *proclock = locallock->proclock;
        uint32          hashcode = locallock->hashcode;
        LWLock     *partitionLock = LockHashPartitionLock(hashcode);
-       PROC_QUEUE *waitQueue = &(lock->waitProcs);
-       SHM_QUEUE  *waitQueuePos;
+       dclist_head *waitQueue = &lock->waitProcs;
+       PGPROC     *insert_before = NULL;
        LOCKMASK        myHeldLocks = MyProc->heldLocks;
        TimestampTz standbyWaitStart = 0;
        bool            early_deadlock = false;
@@ -1058,7 +1011,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
        bool            logged_recovery_conflict = false;
        ProcWaitStatus myWaitStatus;
        PGPROC     *leader = MyProc->lockGroupLeader;
-       int                     i;
 
        /*
         * If group locking is in use, locks held by members of my locking group
@@ -1072,18 +1024,16 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
         */
        if (leader != NULL)
        {
-               SHM_QUEUE  *procLocks = &(lock->procLocks);
-               PROCLOCK   *otherproclock;
+               dlist_iter      iter;
 
-               otherproclock = (PROCLOCK *)
-                       SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
-               while (otherproclock != NULL)
+               dlist_foreach(iter, &lock->procLocks)
                {
+                       PROCLOCK   *otherproclock;
+
+                       otherproclock = dlist_container(PROCLOCK, lockLink, iter.cur);
+
                        if (otherproclock->groupLeader == leader)
                                myHeldLocks |= otherproclock->holdMask;
-                       otherproclock = (PROCLOCK *)
-                               SHMQueueNext(procLocks, &otherproclock->lockLink,
-                                                        offsetof(PROCLOCK, lockLink));
                }
        }
 
@@ -1104,15 +1054,14 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
         * we are only considering the part of the wait queue before my insertion
         * point.
         */
-       if (myHeldLocks != 0 && waitQueue->size > 0)
+       if (myHeldLocks != 0 && !dclist_is_empty(waitQueue))
        {
                LOCKMASK        aheadRequests = 0;
-               SHM_QUEUE  *proc_node;
+               dlist_iter      iter;
 
-               proc_node = waitQueue->links.next;
-               for (i = 0; i < waitQueue->size; i++)
+               dclist_foreach(iter, waitQueue)
                {
-                       PGPROC     *proc = (PGPROC *) proc_node;
+                       PGPROC     *proc = dlist_container(PGPROC, links, iter.cur);
 
                        /*
                         * If we're part of the same locking group as this waiter, its
@@ -1120,10 +1069,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
                         * aheadRequests.
                         */
                        if (leader != NULL && leader == proc->lockGroupLeader)
-                       {
-                               proc_node = proc->links.next;
                                continue;
-                       }
+
                        /* Must he wait for me? */
                        if (lockMethodTable->conflictTab[proc->waitLockMode] & myHeldLocks)
                        {
@@ -1151,31 +1098,23 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
                                        GrantAwaitedLock();
                                        return PROC_WAIT_STATUS_OK;
                                }
-                               /* Break out of loop to put myself before him */
+
+                               /* Put myself into wait queue before conflicting process */
+                               insert_before = proc;
                                break;
                        }
                        /* Nope, so advance to next waiter */
                        aheadRequests |= LOCKBIT_ON(proc->waitLockMode);
-                       proc_node = proc->links.next;
                }
-
-               /*
-                * If we iterated through the whole queue, cur points to the waitQueue
-                * head, so we will insert at tail of queue as desired.
-                */
-               waitQueuePos = proc_node;
-       }
-       else
-       {
-               /* I hold no locks, so I can't push in front of anyone. */
-               waitQueuePos = &waitQueue->links;
        }
 
        /*
         * Insert self into queue, at the position determined above.
         */
-       SHMQueueInsertBefore(waitQueuePos, &MyProc->links);
-       waitQueue->size++;
+       if (insert_before)
+               dclist_insert_before(waitQueue, &insert_before->links, &MyProc->links);
+       else
+               dclist_push_tail(waitQueue, &MyProc->links);
 
        lock->waitMask |= LOCKBIT_ON(lockmode);
 
@@ -1453,7 +1392,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
                        long            secs;
                        int                     usecs;
                        long            msecs;
-                       SHM_QUEUE  *procLocks;
+                       dlist_iter      proc_iter;
                        PROCLOCK   *curproclock;
                        bool            first_holder = true,
                                                first_waiter = true;
@@ -1483,12 +1422,11 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
 
                        LWLockAcquire(partitionLock, LW_SHARED);
 
-                       procLocks = &(lock->procLocks);
-                       curproclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
-                                                                                                       offsetof(PROCLOCK, lockLink));
-
-                       while (curproclock)
+                       dlist_foreach(proc_iter, &lock->procLocks)
                        {
+                               curproclock =
+                                       dlist_container(PROCLOCK, lockLink, proc_iter.cur);
+
                                /*
                                 * we are a waiter if myProc->waitProcLock == curproclock; we
                                 * are a holder if it is NULL or something different
@@ -1519,10 +1457,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
 
                                        lockHoldersNum++;
                                }
-
-                               curproclock = (PROCLOCK *) SHMQueueNext(procLocks,
-                                                                                                               &curproclock->lockLink,
-                                                                                                               offsetof(PROCLOCK, lockLink));
                        }
 
                        LWLockRelease(partitionLock);
@@ -1657,7 +1591,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
  * ProcWakeup -- wake up a process by setting its latch.
  *
  *      Also remove the process from the wait queue and set its links invalid.
- *      RETURN: the next process in the wait queue.
  *
  * The appropriate lock partition lock must be held by caller.
  *
@@ -1666,23 +1599,16 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
  * to twiddle the lock's request counts too --- see RemoveFromWaitQueue.
  * Hence, in practice the waitStatus parameter must be PROC_WAIT_STATUS_OK.
  */
-PGPROC *
+void
 ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus)
 {
-       PGPROC     *retProc;
+       if (dlist_node_is_detached(&proc->links))
+               return;
 
-       /* Proc should be sleeping ... */
-       if (proc->links.prev == NULL ||
-               proc->links.next == NULL)
-               return NULL;
        Assert(proc->waitStatus == PROC_WAIT_STATUS_WAITING);
 
-       /* Save next process before we zap the list link */
-       retProc = (PGPROC *) proc->links.next;
-
        /* Remove process from wait queue */
-       SHMQueueDelete(&(proc->links));
-       (proc->waitLock->waitProcs.size)--;
+       dclist_delete_from_thoroughly(&proc->waitLock->waitProcs, &proc->links);
 
        /* Clean up process' state and pass it the ok/fail signal */
        proc->waitLock = NULL;
@@ -1692,8 +1618,6 @@ ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus)
 
        /* And awaken it */
        SetLatch(&proc->procLatch);
-
-       return retProc;
 }
 
 /*
@@ -1706,20 +1630,16 @@ ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus)
 void
 ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock)
 {
-       PROC_QUEUE *waitQueue = &(lock->waitProcs);
-       int                     queue_size = waitQueue->size;
-       PGPROC     *proc;
+       dclist_head *waitQueue = &lock->waitProcs;
        LOCKMASK        aheadRequests = 0;
+       dlist_mutable_iter miter;
 
-       Assert(queue_size >= 0);
-
-       if (queue_size == 0)
+       if (dclist_is_empty(waitQueue))
                return;
 
-       proc = (PGPROC *) waitQueue->links.next;
-
-       while (queue_size-- > 0)
+       dclist_foreach_modify(miter, waitQueue)
        {
+               PGPROC     *proc = dlist_container(PGPROC, links, miter.cur);
                LOCKMODE        lockmode = proc->waitLockMode;
 
                /*
@@ -1732,25 +1652,18 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock)
                {
                        /* OK to waken */
                        GrantLock(lock, proc->waitProcLock, lockmode);
-                       proc = ProcWakeup(proc, PROC_WAIT_STATUS_OK);
-
-                       /*
-                        * ProcWakeup removes proc from the lock's waiting process queue
-                        * and returns the next proc in chain; don't use proc's next-link,
-                        * because it's been cleared.
-                        */
+                       /* removes proc from the lock's waiting process queue */
+                       ProcWakeup(proc, PROC_WAIT_STATUS_OK);
                }
                else
                {
                        /*
-                        * Cannot wake this guy. Remember his request for later checks.
+                        * Lock conflicts: Don't wake, but remember requested mode for
+                        * later checks.
                         */
                        aheadRequests |= LOCKBIT_ON(lockmode);
-                       proc = (PGPROC *) proc->links.next;
                }
        }
-
-       Assert(waitQueue->size >= 0);
 }
 
 /*
index 190bddb767c747455144b0c990682060caabcfe5..6ae434596ad40319648bff84d7d40d5f39ba065a 100644 (file)
@@ -18,6 +18,7 @@
 #error "lock.h may not be included from frontend code"
 #endif
 
+#include "lib/ilist.h"
 #include "storage/backendid.h"
 #include "storage/lockdefs.h"
 #include "storage/lwlock.h"
 /* struct PGPROC is declared in proc.h, but must forward-reference it */
 typedef struct PGPROC PGPROC;
 
-typedef struct PROC_QUEUE
-{
-       SHM_QUEUE       links;                  /* head of list of PGPROC objects */
-       int                     size;                   /* number of entries in list */
-} PROC_QUEUE;
-
 /* GUC variables */
 extern PGDLLIMPORT int max_locks_per_xact;
 
@@ -318,8 +313,8 @@ typedef struct LOCK
        /* data */
        LOCKMASK        grantMask;              /* bitmask for lock types already granted */
        LOCKMASK        waitMask;               /* bitmask for lock types awaited */
-       SHM_QUEUE       procLocks;              /* list of PROCLOCK objects assoc. with lock */
-       PROC_QUEUE      waitProcs;              /* list of PGPROC objects waiting on lock */
+       dlist_head      procLocks;              /* list of PROCLOCK objects assoc. with lock */
+       dclist_head     waitProcs;              /* list of PGPROC objects waiting on lock */
        int                     requested[MAX_LOCKMODES];       /* counts of requested locks */
        int                     nRequested;             /* total of requested[] array */
        int                     granted[MAX_LOCKMODES]; /* counts of granted locks */
@@ -380,8 +375,8 @@ typedef struct PROCLOCK
        PGPROC     *groupLeader;        /* proc's lock group leader, or proc itself */
        LOCKMASK        holdMask;               /* bitmask for lock types currently held */
        LOCKMASK        releaseMask;    /* bitmask for lock types to be released */
-       SHM_QUEUE       lockLink;               /* list link in LOCK's list of proclocks */
-       SHM_QUEUE       procLink;               /* list link in PGPROC's list of proclocks */
+       dlist_node      lockLink;               /* list link in LOCK's list of proclocks */
+       dlist_node      procLink;               /* list link in PGPROC's list of proclocks */
 } PROCLOCK;
 
 #define PROCLOCK_LOCKMETHOD(proclock) \
index b5c6f46d031a968d26a1b2ea8593ebe925892034..6eb46a7ee8e04cdbb31a9d14134b5dbd3d3195e7 100644 (file)
@@ -167,8 +167,8 @@ typedef enum
 struct PGPROC
 {
        /* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */
-       SHM_QUEUE       links;                  /* list link if process is in a list */
-       PGPROC    **procgloballist; /* procglobal list that owns this PGPROC */
+       dlist_node      links;                  /* list link if process is in a list */
+       dlist_head *procgloballist; /* procglobal list that owns this PGPROC */
 
        PGSemaphore sem;                        /* ONE semaphore to sleep on */
        ProcWaitStatus waitStatus;
@@ -255,7 +255,7 @@ struct PGPROC
         * linked into one of these lists, according to the partition number of
         * their lock.
         */
-       SHM_QUEUE       myProcLocks[NUM_LOCK_PARTITIONS];
+       dlist_head      myProcLocks[NUM_LOCK_PARTITIONS];
 
        XidCacheStatus subxidStatus;    /* mirrored with
                                                                         * ProcGlobal->subxidStates[i] */
@@ -385,13 +385,13 @@ typedef struct PROC_HDR
        /* Length of allProcs array */
        uint32          allProcCount;
        /* Head of list of free PGPROC structures */
-       PGPROC     *freeProcs;
+       dlist_head      freeProcs;
        /* Head of list of autovacuum's free PGPROC structures */
-       PGPROC     *autovacFreeProcs;
+       dlist_head autovacFreeProcs;
        /* Head of list of bgworker free PGPROC structures */
-       PGPROC     *bgworkerFreeProcs;
+       dlist_head bgworkerFreeProcs;
        /* Head of list of walsender free PGPROC structures */
-       PGPROC     *walsenderFreeProcs;
+       dlist_head walsenderFreeProcs;
        /* First pgproc waiting for group XID clear */
        pg_atomic_uint32 procArrayGroupFirst;
        /* First pgproc waiting for group transaction status update */
@@ -448,9 +448,8 @@ extern int  GetStartupBufferPinWaitBufId(void);
 extern bool HaveNFreeProcs(int n);
 extern void ProcReleaseLocks(bool isCommit);
 
-extern void ProcQueueInit(PROC_QUEUE *queue);
 extern ProcWaitStatus ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable);
-extern PGPROC *ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus);
+extern void ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus);
 extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock);
 extern void CheckDeadLockAlert(void);
 extern bool IsWaitingForLock(void);
index 23bafec5f795fe364b6cb908b9a3e9bd7b23ad2a..09316039e4a16b42b7c5d78851447e18050e65f7 100644 (file)
@@ -1846,7 +1846,6 @@ PROCESS_INFORMATION
 PROCLOCK
 PROCLOCKTAG
 PROC_HDR
-PROC_QUEUE
 PSID
 PSID_AND_ATTRIBUTES
 PSQL_COMP_CASE