Hi, Bruce!
authorBruce Momjian <bruce@momjian.us>
Thu, 13 May 1999 15:55:45 +0000 (15:55 +0000)
committerBruce Momjian <bruce@momjian.us>
Thu, 13 May 1999 15:55:45 +0000 (15:55 +0000)
These are my last changes to lmgr fixing deadlock handling.
Please apply them to cvs...

Vadim

src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/proc.c
src/include/storage/lock.h
src/include/storage/proc.h

index 6c3437837df504be3774d5adcd8cf86c3e006d04..09e85e5e1341b0f867f641346ae2e54820ec7615 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.51 1999/05/10 00:45:43 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.52 1999/05/13 15:55:44 momjian Exp $
  *
  * NOTES
  *       Outside modules can create a lock table and acquire/release
@@ -83,9 +83,9 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
 
 #define LOCK_PRINT_AUX(where,lock,type) \
        TPRINTF(TRACE_ALL, \
-                "%s: lock(%x) tbl(%d) rel(%u) db(%d) obj(%u) mask(%x) " \
-                "hold(%d,%d,%d,%d,%d)=%d " \
-                "act(%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
+                "%s: lock(%x) tbl(%d) rel(%u) db(%u) obj(%u) mask(%x) " \
+                "hold(%d,%d,%d,%d,%d,%d,%d)=%d " \
+                "act(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
                 where, \
                 MAKE_OFFSET(lock), \
                 lock->tag.lockmethod, \
@@ -98,12 +98,16 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
                 lock->holders[3], \
                 lock->holders[4], \
                 lock->holders[5], \
+                lock->holders[6], \
+                lock->holders[7], \
                 lock->nHolding, \
                 lock->activeHolders[1], \
                 lock->activeHolders[2], \
                 lock->activeHolders[3], \
                 lock->activeHolders[4], \
                 lock->activeHolders[5], \
+                lock->activeHolders[6], \
+                lock->activeHolders[7], \
                 lock->nActive, \
                 lock->waitProcs.size, \
                 lock_types[type])
@@ -119,8 +123,8 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
 
 #define XID_PRINT_AUX(where,xidentP) \
        TPRINTF(TRACE_ALL, \
-                "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%d) " \
-                "hold(%d,%d,%d,%d,%d)=%d", \
+                "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%u) " \
+                "hold(%d,%d,%d,%d,%d,%d,%d)=%d", \
                 where, \
                 MAKE_OFFSET(xidentP), \
                 xidentP->tag.lock, \
@@ -132,6 +136,8 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
                 xidentP->holders[3], \
                 xidentP->holders[4], \
                 xidentP->holders[5], \
+                xidentP->holders[6], \
+                xidentP->holders[7], \
                 xidentP->nHolding)
 
 #else                                                  /* !LOCK_MGR_DEBUG */
@@ -1561,19 +1567,26 @@ LockingDisabled()
  * We have already locked the master lock before being called.
  */
 bool
-DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
+DeadLockCheck(void *proc, LOCK *findlock)
 {
-       int                                             done;
        XIDLookupEnt               *xidLook = NULL;
        XIDLookupEnt               *tmp = NULL;
+       PROC                               *thisProc = (PROC*) proc,
+                                                  *waitProc;
+       SHM_QUEUE                          *lockQueue = &(thisProc->lockQueue);
        SHMEM_OFFSET                    end = MAKE_OFFSET(lockQueue);
        LOCK                               *lock;
+       PROC_QUEUE                         *waitQueue;
+       int                                             i,
+                                                       j;
+       bool                                    first_run = (thisProc == MyProc),
+                                                       done;
 
        static PROC                        *checked_procs[MAXBACKENDS];
        static int                              nprocs;
 
        /* initialize at start of recursion */
-       if (skip_check)
+       if (first_run)
        {
                checked_procs[0] = MyProc;
                nprocs = 1;
@@ -1593,74 +1606,186 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
 
                LOCK_PRINT("DeadLockCheck", lock, 0);
 
+               if (lock->tag.relId == 0)       /* user' lock */
+                       goto nxtl;
+
                /*
-                * This is our only check to see if we found the lock we want.
-                *
-                * The lock we are waiting for is already in MyProc->lockQueue so we
-                * need to skip it here.  We are trying to find it in someone
-                * else's lockQueue.   bjm
+                * waitLock is always in lockQueue of waiting proc,
+                * if !first_run then upper caller will handle waitProcs
+                * queue of waitLock.
                 */
-               if (lock == findlock && !skip_check)
-                       return true;
+               if (thisProc->waitLock == lock && !first_run)
+                       goto nxtl;
 
+               /*
+                * If we found proc holding findlock and sleeping on some my 
+                * other lock then we have to check does it block me or
+                * another waiters.
+                */
+               if (lock == findlock && !first_run)
                {
-                       PROC_QUEUE *waitQueue = &(lock->waitProcs);
-                       PROC       *proc;
-                       int                     i;
-                       int                     j;
+                       LOCKMETHODCTL  *lockctl = 
+                                                       LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
+                       int                             lm;
 
-                       proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
-                       for (i = 0; i < waitQueue->size; i++)
+                       Assert(xidLook->nHolding > 0);
+                       for (lm = 1; lm <= lockctl->numLockModes; lm++)
                        {
+                               if (xidLook->holders[lm] > 0 && 
+                                       lockctl->conflictTab[lm] & findlock->waitMask)
+                                       return true;
+                       }
+                       /*
+                        * Else - get the next lock from thisProc's lockQueue
+                        */
+                       goto nxtl;      
+               }
+
+               waitQueue = &(lock->waitProcs);
+               waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev);
+
+               for (i = 0; i < waitQueue->size; i++)
+               {
+                       if (waitProc == thisProc)
+                       {
+                               Assert(waitProc->waitLock == lock);
+                               Assert(waitProc == MyProc);
+                               waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+                               continue;
+                       }
+                       if (lock == findlock)   /* first_run also true */
+                       {
+                               LOCKMETHODCTL  *lockctl = 
+                                               LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
+
                                /*
-                                * If I hold some locks on findlock and another proc 
-                                * waits on it holding locks too - check if we are
-                                * waiting one another.
+                                * If me blocked by his holdlock...
                                 */
-                               if (proc != MyProc &&
-                                       lock == findlock &&     /* skip_check also true */
-                                       MyProc->holdLock)
+                               if (lockctl->conflictTab[MyProc->token] & waitProc->holdLock)
                                {
-                                       LOCKMETHODCTL  *lockctl = 
-                                                       LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
-
-                                       Assert(skip_check);
-                                       if (lockctl->conflictTab[MyProc->token] & proc->holdLock && 
-                                               lockctl->conflictTab[proc->token] & MyProc->holdLock)
+                                       /* and he blocked by me -> deadlock */
+                                       if (lockctl->conflictTab[waitProc->token] & MyProc->holdLock)
                                                return true;
+                                       /* we shouldn't look at lockQueue of our blockers */
+                                       waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+                                       continue;
                                }
-
                                /*
-                                * No sense in looking at the wait queue of the lock we
-                                * are looking for. If lock == findlock, and I got here,
-                                * skip_check must be true too.
+                                * If he isn't blocked by me and we request non-conflicting 
+                                * lock modes - no deadlock here because of he isn't
+                                * blocked by me in any sence (explicitle or implicitly). 
+                                * Note that we don't do like test if !first_run
+                                * (when thisProc is holder and non-waiter on lock) and so
+                                * we call DeadLockCheck below for every waitProc in
+                                * thisProc->lockQueue, even for waitProc-s un-blocked
+                                * by thisProc. Should we? This could save us some time...
                                 */
-                               if (lock != findlock)
+                               if (!(lockctl->conflictTab[waitProc->token] & MyProc->holdLock) && 
+                                       !(lockctl->conflictTab[waitProc->token] & (1 << MyProc->token)))
                                {
-                                       for (j = 0; j < nprocs; j++)
-                                               if (checked_procs[j] == proc)
-                                                       break;
-                                       if (j >= nprocs && lock != findlock)
-                                       {
-                                               Assert(nprocs < MAXBACKENDS);
-                                               checked_procs[nprocs++] = proc;
+                                       waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+                                       continue;
+                               }
+                       }
 
+                       /*
+                        * Look in lockQueue of this waitProc, if didn't do this before.
+                        */
+                       for (j = 0; j < nprocs; j++)
+                       {
+                               if (checked_procs[j] == waitProc)
+                                       break;
+                       }
+                       if (j >= nprocs)
+                       {
+                               Assert(nprocs < MAXBACKENDS);
+                               checked_procs[nprocs++] = waitProc;
+
+                               if (DeadLockCheck(waitProc, findlock))
+                               {
+                                       LOCKMETHODCTL  *lockctl = 
+                                                       LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
+                                       int                             holdLock;
+
+                                       /*
+                                        * Ok, but is waitProc waiting for me (thisProc) ?
+                                        */
+                                       if (thisProc->waitLock == lock)
+                                       {
+                                               Assert(first_run);
+                                               holdLock = thisProc->holdLock;
+                                       }
+                                       else    /* should we cache holdLock ? */
+                                       {
+                                               int     lm, tmpMask = 2;
+
+                                               Assert(xidLook->nHolding > 0);
+                                               for (holdLock = 0, lm = 1; 
+                                                               lm <= lockctl->numLockModes; 
+                                                               lm++, tmpMask <<= 1)
+                                               {
+                                                       if (xidLook->holders[lm] > 0)
+                                                               holdLock |= tmpMask;
+                                               }
+                                               Assert(holdLock != 0);
+                                       }
+                                       if (lockctl->conflictTab[waitProc->token] & holdLock)
+                                       {
                                                /*
-                                                * For non-MyProc entries, we are looking only
-                                                * waiters, not necessarily people who already
-                                                * hold locks and are waiting. Now we check for
-                                                * cases where we have two or more tables in a
-                                                * deadlock.  We do this by continuing to search
-                                                * for someone holding a lock       bjm
+                                                * Last attempt to avoid deadlock - try to wakeup
+                                                * myself.
                                                 */
-                                               if (DeadLockCheck(&(proc->lockQueue), findlock, false))
-                                                       return true;
+                                               if (first_run)
+                                               {
+                                                       if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
+                                                                                                        MyProc->waitLock,
+                                                                                                        MyProc->token,
+                                                                                                        MyProc->xid,
+                                                                                                        NULL) == STATUS_OK)
+                                                       {
+                                                               GrantLock(MyProc->waitLock, MyProc->token);
+                                                               (MyProc->waitLock->waitProcs.size)--;
+                                                               ProcWakeup(MyProc, NO_ERROR);
+                                                               return false;
+                                                       }
+                                               }
+                                               return true;
+                                       }
+                                       /*
+                                        * Hell! Is he blocked by any (other) holder ?
+                                        */
+                                       if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
+                                                                                        lock,
+                                                                                        waitProc->token,
+                                                                                        waitProc->xid,
+                                                                                        NULL) != STATUS_OK)
+                                       {
+                                               /*
+                                                * Blocked by others - no deadlock...
+                                                */
+#ifdef DEADLOCK_DEBUG
+                                               LOCK_PRINT("DeadLockCheck: blocked by others", 
+                                                                               lock, waitProc->token);
+#endif
+                                               waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+                                               continue;
                                        }
+                                       /*
+                                        * Well - wakeup this guy! This is the case of
+                                        * implicit blocking: thisProc blocked someone who blocked
+                                        * waitProc by the fact that he (someone) is already
+                                        * waiting for lock (we do this for anti-starving).
+                                        */
+                                       GrantLock(lock, waitProc->token);
+                                       waitQueue->size--;
+                                       waitProc = ProcWakeup(waitProc, NO_ERROR);
+                                       continue;
                                }
-                               proc = (PROC *) MAKE_PTR(proc->links.prev);
                        }
+                       waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
                }
 
+nxtl:;
                if (done)
                        break;
                SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
index b80d32e1b44e727c3b4cbb9568936c884781f283..4eb97b8a6a86ff3f3b3b1c36ed5b84b15b7daff4 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.55 1999/05/13 15:55:44 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,7 +46,7 @@
  *             This is so that we can support more backends. (system-wide semaphore
  *             sets run out pretty fast.)                                -ay 4/95
  *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.55 1999/05/13 15:55:44 momjian Exp $
  */
 #include <sys/time.h>
 #include <unistd.h>
@@ -78,7 +78,6 @@
 #include "utils/trace.h"
 
 static void HandleDeadLock(int sig);
-static PROC *ProcWakeup(PROC *proc, int errType);
 static void ProcFreeAllSemaphores(void);
 
 #define DeadlockCheckTimer pg_options[OPT_DEADLOCKTIMEOUT]
@@ -640,7 +639,7 @@ rt:;
  *      remove the process from the wait queue and set its links invalid.
  *      RETURN: the next process in the wait queue.
  */
-static PROC *
+PROC *
 ProcWakeup(PROC *proc, int errType)
 {
        PROC       *retProc;
@@ -806,10 +805,10 @@ HandleDeadLock(int sig)
        DumpAllLocks();
 #endif
 
-       if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true))
+       MyProc->errType = STATUS_NOT_FOUND;
+       if (!DeadLockCheck(MyProc, MyProc->waitLock))
        {
                UnlockLockTable();
-               MyProc->errType = STATUS_NOT_FOUND;
                return;
        }
 
index 387f164247cacbad8b8457c646d7390f2e76c38c..ab084eb94fdb8db5d6e383a5a70cb6317016595a 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.25 1999/05/07 01:23:07 vadim Exp $
+ * $Id: lock.h,v 1.26 1999/05/13 15:55:44 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -249,8 +249,7 @@ extern void GrantLock(LOCK *lock, LOCKMODE lockmode);
 extern bool LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue);
 extern int     LockShmemSize(int maxBackends);
 extern bool LockingDisabled(void);
-extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock,
-                         bool skip_check);
+extern bool DeadLockCheck(void *proc, LOCK *findlock);
 
 #ifdef DEADLOCK_DEBUG
 extern void DumpLocks(void);
index 53b677858fd59c4d88bef2fdcc7c742addf42219..3dcf1e281b5c6772baa09a9a7a67d23eef1aa244 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: proc.h,v 1.21 1999/05/07 01:23:07 vadim Exp $
+ * $Id: proc.h,v 1.22 1999/05/13 15:55:45 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -119,6 +119,7 @@ extern bool ProcRemove(int pid);
 extern void ProcQueueInit(PROC_QUEUE *queue);
 extern int ProcSleep(PROC_QUEUE *queue, LOCKMETHODCTL *lockctl, int token, 
                                        LOCK *lock);
+extern PROC *ProcWakeup(PROC *proc, int errType);
 extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod,
                           LOCK *lock);
 extern void ProcAddLock(SHM_QUEUE *elem);