Release proclock immediately in RemoveFromWaitQueue() if it represents
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 1 Mar 2005 21:14:59 +0000 (21:14 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 1 Mar 2005 21:14:59 +0000 (21:14 +0000)
no held locks.  This maintains the invariant that proclocks are present
only for procs that are holding or awaiting a lock; when this is not
true, LockRelease will fail.  Per report from Stephen Clouse.

src/backend/storage/lmgr/lock.c

index 76b2ff9b480451ef5ad04ff2d2b456c865734ae5..a651033352bba67fe12d558c18d82b3bd8a8bf89 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.146 2005/02/04 02:04:53 neilc Exp $
+ *   $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.147 2005/03/01 21:14:59 tgl Exp $
  *
  * NOTES
  *   Outside modules can create a lock table and acquire/release
@@ -993,9 +993,6 @@ UnGrantLock(LOCK *lock, LOCKMODE lockmode,
    }
 
    LOCK_PRINT("UnGrantLock: updated", lock, lockmode);
-   Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
-   Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
-   Assert(lock->nGranted <= lock->nRequested);
 
    /*
     * We need only run ProcLockWakeup if the released lock conflicts with
@@ -1115,8 +1112,7 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
    {
        /*
         * We failed as a result of a deadlock, see CheckDeadLock(). Quit
-        * now.  Removal of the proclock and lock objects, if no longer
-        * needed, will happen in xact cleanup (see above for motivation).
+        * now.
         */
        awaitedLock = NULL;
        LOCK_PRINT("WaitOnLock: aborting on lock",
@@ -1148,22 +1144,21 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
  *
  * Locktable lock must be held by caller.
  *
- * NB: this does not remove the process' proclock object, nor the lock object,
- * even though their counts might now have gone to zero.  That will happen
- * during a subsequent LockReleaseAll call, which we expect will happen
- * during transaction cleanup. (Removal of a proc from its wait queue by
- * this routine can only happen if we are aborting the transaction.)
+ * NB: this does not clean up any locallock object that may exist for the lock.
  */
 void
 RemoveFromWaitQueue(PGPROC *proc)
 {
    LOCK       *waitLock = proc->waitLock;
+   PROCLOCK   *proclock = proc->waitProcLock;
    LOCKMODE    lockmode = proc->waitLockMode;
+   LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
 
    /* Make sure proc is waiting */
    Assert(proc->links.next != INVALID_OFFSET);
    Assert(waitLock);
    Assert(waitLock->waitProcs.size > 0);
+   Assert(0 < lockmethodid && lockmethodid < NumLockMethods);
 
    /* Remove proc from lock's wait queue */
    SHMQueueDelete(&(proc->links));
@@ -1183,8 +1178,32 @@ RemoveFromWaitQueue(PGPROC *proc)
    proc->waitLock = NULL;
    proc->waitProcLock = NULL;
 
+   /*
+    * Delete the proclock immediately if it represents no already-held locks.
+    * This must happen now because if the owner of the lock decides to release
+    * it, and the requested/granted counts then go to zero, LockRelease
+    * expects there to be no remaining proclocks.
+    */
+   if (proclock->holdMask == 0)
+   {
+       PROCLOCK_PRINT("RemoveFromWaitQueue: deleting proclock", proclock);
+       SHMQueueDelete(&proclock->lockLink);
+       SHMQueueDelete(&proclock->procLink);
+       proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+                                           (void *) &(proclock->tag),
+                                           HASH_REMOVE, NULL);
+       if (!proclock)
+           elog(WARNING, "proclock table corrupted");
+   }
+
+   /*
+    * There should still be some requests for the lock ... else what were
+    * we waiting for?  Therefore no need to delete the lock object.
+    */
+   Assert(waitLock->nRequested > 0);
+
    /* See if any other waiters for the lock can be woken up now */
-   ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
+   ProcLockWakeup(LockMethods[lockmethodid], waitLock);
 }
 
 /*
@@ -1207,7 +1226,7 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
    PROCLOCK   *proclock;
    LWLockId    masterLock;
    LockMethod  lockMethodTable;
-   bool        wakeupNeeded = false;
+   bool        wakeupNeeded;
 
 #ifdef LOCK_DEBUG
    if (lockmethodid == USER_LOCKMETHOD && Trace_userlocks)
@@ -1335,7 +1354,7 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
     */
    if (proclock->holdMask == 0)
    {
-       PROCLOCK_PRINT("LockRelease: deleting", proclock);
+       PROCLOCK_PRINT("LockRelease: deleting proclock", proclock);
        SHMQueueDelete(&proclock->lockLink);
        SHMQueueDelete(&proclock->procLink);
        proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
@@ -1356,6 +1375,7 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
         * We've just released the last lock, so garbage-collect the lock
         * object.
         */
+       LOCK_PRINT("LockRelease: deleting lock", lock, lockmode);
        Assert(SHMQueueEmpty(&(lock->procLocks)));
        lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
                                    (void *) &(lock->tag),