Re-implement deadlock detection and resolution, per design notes posted
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 25 Jan 2001 03:31:16 +0000 (03:31 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 25 Jan 2001 03:31:16 +0000 (03:31 +0000)
to pghackers on 18-Jan-01.

src/backend/storage/lmgr/Makefile
src/backend/storage/lmgr/README
src/backend/storage/lmgr/deadlock.c [new file with mode: 0644]
src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/proc.c
src/include/storage/lock.h
src/include/storage/proc.h

index fca9a24a57331955cf2f1323138f20f699e6b52e..37cf81b1aa7bcbd44683c5e6d483210d8865e739 100644 (file)
@@ -4,7 +4,7 @@
 #    Makefile for storage/lmgr
 #
 # IDENTIFICATION
-#    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Makefile,v 1.14 2000/08/31 16:10:36 petere Exp $
+#    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Makefile,v 1.15 2001/01/25 03:31:16 tgl Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -12,7 +12,7 @@ subdir = src/backend/storage/lmgr
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = lmgr.o lock.o proc.o
+OBJS = lmgr.o lock.o proc.o deadlock.o
 
 all: SUBSYS.o
 
index af9fbc8421b1291d81ca8af14c1a54fcf0d0ab23..72e0d16f128d4c6c816b9d4762a57edac0e949d4 100644 (file)
@@ -1,4 +1,4 @@
-$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.6 2001/01/22 22:30:06 tgl Exp $
+$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.7 2001/01/25 03:31:16 tgl Exp $
 
 There are two fundamental lock structures: the per-lockable-object LOCK
 struct, and the per-lock-holder HOLDER struct.  A LOCK object exists
@@ -373,7 +373,8 @@ time with "C before B", which won't move C far enough up.  So we look for
 soft edges outgoing from C starting at the front of the wait queue.
 
 5. The working data structures needed by the deadlock detection code can
-be proven not to need more than MAXBACKENDS entries.  Therefore the
-working storage can be statically allocated instead of depending on
-palloc().  This is a good thing, since if the deadlock detector could
-fail for extraneous reasons, all the above safety proofs fall down.
+be limited to numbers of entries computed from MaxBackends.  Therefore,
+we can allocate the worst-case space needed during backend startup.
+This seems a safer approach than trying to allocate workspace on the fly;
+we don't want to risk having the deadlock detector run out of memory,
+else we really have no guarantees at all that deadlock will be detected.
diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c
new file mode 100644 (file)
index 0000000..aae635a
--- /dev/null
@@ -0,0 +1,734 @@
+/*-------------------------------------------------------------------------
+ *
+ * deadlock.c
+ *   POSTGRES deadlock detection code
+ *
+ * See src/backend/storage/lmgr/README for a description of the deadlock
+ * detection and resolution algorithms.
+ *
+ *
+ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/deadlock.c,v 1.1 2001/01/25 03:31:16 tgl Exp $
+ *
+ * Interface:
+ *
+ * DeadLockCheck()
+ * InitDeadLockChecking()
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/proc.h"
+#include "utils/memutils.h"
+
+
+/* One edge in the waits-for graph */
+typedef struct {
+   PROC   *waiter;             /* the waiting process */
+   PROC   *blocker;            /* the process it is waiting for */
+   int     pred;               /* workspace for TopoSort */
+   int     link;               /* workspace for TopoSort */
+} EDGE;
+
+/* One potential reordering of a lock's wait queue */
+typedef struct {
+   LOCK   *lock;               /* the lock whose wait queue is described */
+   PROC  **procs;              /* array of PROC *'s in new wait order */
+   int     nProcs;
+} WAIT_ORDER;
+
+
+static bool DeadLockCheckRecurse(PROC *proc);
+static bool TestConfiguration(PROC *startProc);
+static bool FindLockCycle(PROC *checkProc,
+                         EDGE *softEdges, int *nSoftEdges);
+static bool FindLockCycleRecurse(PROC *checkProc,
+                                EDGE *softEdges, int *nSoftEdges);
+static bool ExpandConstraints(EDGE *constraints, int nConstraints);
+static bool TopoSort(LOCK *lock, EDGE *constraints, int nConstraints,
+                    PROC **ordering);
+#ifdef DEBUG_DEADLOCK
+static void PrintLockQueue(LOCK *lock, const char *info);
+#endif
+
+
+/*
+ * Working space for the deadlock detector
+ */
+
+/* Workspace for FindLockCycle */
+static PROC **visitedProcs;        /* Array of visited procs */
+static int nVisitedProcs;
+/* Workspace for TopoSort */
+static PROC **topoProcs;       /* Array of not-yet-output procs */
+static int *beforeConstraints; /* Counts of remaining before-constraints */
+static int *afterConstraints;  /* List head for after-constraints */
+/* Output area for ExpandConstraints */
+static WAIT_ORDER *waitOrders; /* Array of proposed queue rearrangements */
+static int nWaitOrders;
+static PROC **waitOrderProcs;  /* Space for waitOrders queue contents */
+/* Current list of constraints being considered */
+static EDGE *curConstraints;
+static int nCurConstraints;
+static int maxCurConstraints;
+/* Storage space for results from FindLockCycle */
+static EDGE *possibleConstraints;
+static int nPossibleConstraints;
+static int maxPossibleConstraints;
+
+
+/*
+ * InitDeadLockChecking -- initialize deadlock checker during backend startup
+ *
+ * This does per-backend initialization of the deadlock checker; primarily,
+ * allocation of working memory for DeadLockCheck.  We do this per-backend
+ * since there's no percentage in making the kernel do copy-on-write
+ * inheritance of workspace from the postmaster.  We want to allocate the
+ * space at startup because the deadlock checker might be invoked when there's
+ * no free memory left.
+ */
+void
+InitDeadLockChecking(void)
+{
+   MemoryContext   oldcxt;
+
+   /* Make sure allocations are permanent */
+   oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+
+   /*
+    * FindLockCycle needs at most MaxBackends entries in visitedProcs[]
+    */
+   visitedProcs = (PROC **) palloc(MaxBackends * sizeof(PROC *));
+
+   /*
+    * TopoSort needs to consider at most MaxBackends wait-queue entries,
+    * and it needn't run concurrently with FindLockCycle.
+    */
+   topoProcs = visitedProcs;   /* re-use this space */
+   beforeConstraints = (int *) palloc(MaxBackends * sizeof(int));
+   afterConstraints = (int *) palloc(MaxBackends * sizeof(int));
+
+   /*
+    * We need to consider rearranging at most MaxBackends/2 wait queues
+    * (since it takes at least two waiters in a queue to create a soft edge),
+    * and the expanded form of the wait queues can't involve more than
+    * MaxBackends total waiters.
+    */
+   waitOrders = (WAIT_ORDER *) palloc((MaxBackends/2) * sizeof(WAIT_ORDER));
+   waitOrderProcs = (PROC **) palloc(MaxBackends * sizeof(PROC *));
+
+   /*
+    * Allow at most MaxBackends distinct constraints in a configuration.
+    * (Is this enough?  In practice it seems it should be, but I don't quite
+    * see how to prove it.  If we run out, we might fail to find a workable
+    * wait queue rearrangement even though one exists.)  NOTE that this
+    * number limits the maximum recursion depth of DeadLockCheckRecurse.
+    * Making it really big might potentially allow a stack-overflow problem.
+    */
+   maxCurConstraints = MaxBackends;
+   curConstraints = (EDGE *) palloc(maxCurConstraints * sizeof(EDGE));
+
+   /*
+    * Allow up to 3*MaxBackends constraints to be saved without having to
+    * re-run TestConfiguration.  (This is probably more than enough, but
+    * we can survive if we run low on space by doing excess runs of
+    * TestConfiguration to re-compute constraint lists each time needed.)
+    * The last MaxBackends entries in possibleConstraints[] are reserved as
+    * output workspace for FindLockCycle.
+    */
+   maxPossibleConstraints = MaxBackends * 4;
+   possibleConstraints =
+       (EDGE *) palloc(maxPossibleConstraints * sizeof(EDGE));
+
+   MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * DeadLockCheck -- Checks for deadlocks for a given process
+ *
+ * This code looks for deadlocks involving the given process.  If any
+ * are found, it tries to rearrange lock wait queues to resolve the
+ * deadlock.  If resolution is impossible, return TRUE --- the caller
+ * is then expected to abort the given proc's transaction.
+ *
+ * We can't block on user locks, so no sense testing for deadlock
+ * because there is no blocking, and no timer for the block.  So,
+ * only look at regular locks.
+ *
+ * We must have already locked the master lock before being called.
+ * NOTE: although the lockctl structure appears to allow each lock
+ * table to have a different spinlock, all locks that can block had
+ * better use the same spinlock, else this code will not be adequately
+ * interlocked!
+ */
+bool
+DeadLockCheck(PROC *proc)
+{
+   int         i,
+               j;
+
+   /* Initialize to "no constraints" */
+   nCurConstraints = 0;
+   nPossibleConstraints = 0;
+   nWaitOrders = 0;
+
+   /* Search for deadlocks and possible fixes */
+   if (DeadLockCheckRecurse(proc))
+       return true;            /* cannot find a non-deadlocked state */
+
+   /* Apply any needed rearrangements of wait queues */
+   for (i = 0; i < nWaitOrders; i++)
+   {
+       LOCK   *lock = waitOrders[i].lock;
+       PROC  **procs = waitOrders[i].procs;
+       int     nProcs = waitOrders[i].nProcs;
+       PROC_QUEUE *waitQueue = &(lock->waitProcs);
+
+       Assert(nProcs == waitQueue->size);
+
+#ifdef DEBUG_DEADLOCK
+       PrintLockQueue(lock, "DeadLockCheck:");
+#endif
+
+       /* Reset the queue and re-add procs in the desired order */
+       ProcQueueInit(waitQueue);
+       for (j = 0; j < nProcs; j++)
+       {
+           SHMQueueInsertBefore(&(waitQueue->links), &(procs[j]->links));
+           waitQueue->size++;
+       }
+
+#ifdef DEBUG_DEADLOCK
+       PrintLockQueue(lock, "rearranged to:");
+#endif
+   }
+   return false;
+}
+
+/*
+ * DeadLockCheckRecurse -- recursively search for valid orderings
+ *
+ * curConstraints[] holds the current set of constraints being considered
+ * by an outer level of recursion.  Add to this each possible solution
+ * constraint for any cycle detected at this level.
+ *
+ * Returns TRUE if no solution exists.  Returns FALSE if a deadlock-free
+ * state is attainable, in which case waitOrders[] shows the required
+ * rearrangements of lock wait queues (if any).
+ */
+static bool
+DeadLockCheckRecurse(PROC *proc)
+{
+   int         nEdges;
+   int         oldPossibleConstraints;
+   bool        savedList;
+   int         i;
+
+   nEdges = TestConfiguration(proc);
+   if (nEdges < 0)
+       return true;            /* hard deadlock --- no solution */
+   if (nEdges == 0)
+       return false;           /* good configuration found */
+   if (nCurConstraints >= maxCurConstraints)
+       return true;            /* out of room for active constraints? */
+   oldPossibleConstraints = nPossibleConstraints;
+   if (nPossibleConstraints + nEdges + MaxBackends <= maxPossibleConstraints)
+   {
+       /* We can save the edge list in possibleConstraints[] */
+       nPossibleConstraints += nEdges;
+       savedList = true;
+   }
+   else
+   {
+       /* Not room; will need to regenerate the edges on-the-fly */
+       savedList = false;
+   }
+   /*
+    * Try each available soft edge as an addition to the configuration.
+    */
+   for (i = 0; i < nEdges; i++)
+   {
+       if (!savedList && i > 0)
+       {
+           /* Regenerate the list of possible added constraints */
+           if (nEdges != TestConfiguration(proc))
+               elog(FATAL, "DeadLockCheckRecurse: inconsistent results");
+       }
+       curConstraints[nCurConstraints] =
+           possibleConstraints[oldPossibleConstraints+i];
+       nCurConstraints++;
+       if (!DeadLockCheckRecurse(proc))
+           return false;       /* found a valid solution! */
+       /* give up on that added constraint, try again */
+       nCurConstraints--;
+   }
+   nPossibleConstraints = oldPossibleConstraints;
+   return true;                /* no solution found */
+}
+
+
+/*--------------------
+ * Test a configuration (current set of constraints) for validity.
+ *
+ * Returns:
+ *     0: the configuration is good (no deadlocks)
+ *    -1: the configuration has a hard deadlock or is not self-consistent
+ *     >0: the configuration has one or more soft deadlocks
+ *
+ * In the soft-deadlock case, one of the soft cycles is chosen arbitrarily
+ * and a list of its soft edges is returned beginning at
+ * possibleConstraints+nPossibleConstraints.  The return value is the
+ * number of soft edges.
+ *--------------------
+ */
+static bool
+TestConfiguration(PROC *startProc)
+{
+   int     softFound = 0;
+   EDGE   *softEdges = possibleConstraints + nPossibleConstraints;
+   int     nSoftEdges;
+   int     i;
+
+   /*
+    * Make sure we have room for FindLockCycle's output.
+    */
+   if (nPossibleConstraints + MaxBackends > maxPossibleConstraints)
+       return -1;
+   /*
+    * Expand current constraint set into wait orderings.  Fail if the
+    * constraint set is not self-consistent.
+    */
+   if (!ExpandConstraints(curConstraints, nCurConstraints))
+       return -1;
+   /*
+    * Check for cycles involving startProc or any of the procs mentioned
+    * in constraints.  We check startProc last because if it has a soft
+    * cycle still to be dealt with, we want to deal with that first.
+    */
+   for (i = 0; i < nCurConstraints; i++)
+   {
+       if (FindLockCycle(curConstraints[i].waiter, softEdges, &nSoftEdges))
+       {
+           if (nSoftEdges == 0)
+               return -1;      /* hard deadlock detected */
+           softFound = nSoftEdges;
+       }
+       if (FindLockCycle(curConstraints[i].blocker, softEdges, &nSoftEdges))
+       {
+           if (nSoftEdges == 0)
+               return -1;      /* hard deadlock detected */
+           softFound = nSoftEdges;
+       }
+   }
+   if (FindLockCycle(startProc, softEdges, &nSoftEdges))
+   {
+       if (nSoftEdges == 0)
+           return -1;          /* hard deadlock detected */
+       softFound = nSoftEdges;
+   }
+   return softFound;
+}
+
+
+/*
+ * FindLockCycle -- basic check for deadlock cycles
+ *
+ * Scan outward from the given proc to see if there is a cycle in the
+ * waits-for graph that includes this proc.  Return TRUE if a cycle
+ * is found, else FALSE.  If a cycle is found, we also return a list of
+ * the "soft edges", if any, included in the cycle.  These edges could
+ * potentially be eliminated by rearranging wait queues.
+ *
+ * Since we need to be able to check hypothetical configurations that would
+ * exist after wait queue rearrangement, the routine pays attention to the
+ * table of hypothetical queue orders in waitOrders[].  These orders will
+ * be believed in preference to the actual ordering seen in the locktable.
+ */
+static bool
+FindLockCycle(PROC *checkProc,
+             EDGE *softEdges,  /* output argument */
+             int *nSoftEdges)  /* output argument */
+{
+   nVisitedProcs = 0;
+   *nSoftEdges = 0;
+   return FindLockCycleRecurse(checkProc, softEdges, nSoftEdges);
+}
+
+static bool
+FindLockCycleRecurse(PROC *checkProc,
+                    EDGE *softEdges,   /* output argument */
+                    int *nSoftEdges)   /* output argument */
+{
+   PROC       *proc;
+   LOCK       *lock;
+   HOLDER     *holder;
+   SHM_QUEUE  *lockHolders;
+   LOCKMETHODTABLE *lockMethodTable;
+   LOCKMETHODCTL *lockctl;
+   PROC_QUEUE *waitQueue;
+   int         queue_size;
+   int         conflictMask;
+   int         i;
+   int         numLockModes,
+               lm;
+
+   /*
+    * Have we already seen this proc?
+    */
+   for (i = 0; i < nVisitedProcs; i++)
+   {
+       if (visitedProcs[i] == checkProc)
+       {
+           /* If we return to starting point, we have a deadlock cycle */
+           if (i == 0)
+               return true;
+           /*
+            * Otherwise, we have a cycle but it does not include the start
+            * point, so say "no deadlock".
+            */
+           return false;
+       }
+   }
+   /* Mark proc as seen */
+   Assert(nVisitedProcs < MaxBackends);
+   visitedProcs[nVisitedProcs++] = checkProc;
+   /*
+    * If the proc is not waiting, we have no outgoing waits-for edges.
+    */
+   if (checkProc->links.next == INVALID_OFFSET)
+       return false;
+   lock = checkProc->waitLock;
+   if (lock == NULL)
+       return false;
+   lockMethodTable = GetLocksMethodTable(lock);
+   lockctl = lockMethodTable->ctl;
+   numLockModes = lockctl->numLockModes;
+   conflictMask = lockctl->conflictTab[checkProc->waitLockMode];
+   /*
+    * Scan for procs that already hold conflicting locks.  These are
+    * "hard" edges in the waits-for graph.
+    */
+   lockHolders = &(lock->lockHolders);
+
+   holder = (HOLDER *) SHMQueueNext(lockHolders, lockHolders,
+                                    offsetof(HOLDER, lockLink));
+
+   while (holder)
+   {
+       proc = (PROC *) MAKE_PTR(holder->tag.proc);
+
+       /* A proc never blocks itself */
+       if (proc != checkProc)
+       {
+           for (lm = 1; lm <= numLockModes; lm++)
+           {
+               if (holder->holding[lm] > 0 &&
+                   ((1 << lm) & conflictMask) != 0)
+               {
+                   /* This proc hard-blocks checkProc */
+                   if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
+                       return true;
+                   /* If no deadlock, we're done looking at this holder */
+                   break;
+               }
+           }
+       }
+
+       holder = (HOLDER *) SHMQueueNext(lockHolders, &holder->lockLink,
+                                        offsetof(HOLDER, lockLink));
+   }
+
+   /*
+    * Scan for procs that are ahead of this one in the lock's wait queue.
+    * Those that have conflicting requests soft-block this one.  This must
+    * be done after the hard-block search, since if another proc both
+    * hard- and soft-blocks this one, we want to call it a hard edge.
+    *
+    * If there is a proposed re-ordering of the lock's wait order,
+    * use that rather than the current wait order.
+    */
+   for (i = 0; i < nWaitOrders; i++)
+   {
+       if (waitOrders[i].lock == lock)
+           break;
+   }
+
+   if (i < nWaitOrders)
+   {
+       /* Use the given hypothetical wait queue order */
+       PROC  **procs = waitOrders[i].procs;
+
+       queue_size = waitOrders[i].nProcs;
+
+       for (i = 0; i < queue_size; i++)
+       {
+           proc = procs[i];
+
+           /* Done when we reach the target proc */
+           if (proc == checkProc)
+               break;
+
+           /* Is there a conflict with this guy's request? */
+           if (((1 << proc->waitLockMode) & conflictMask) != 0)
+           {
+               /* This proc soft-blocks checkProc */
+               if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
+               {
+                   /* Add this edge to the list of soft edges in the cycle */
+                   Assert(*nSoftEdges < MaxBackends);
+                   softEdges[*nSoftEdges].waiter = checkProc;
+                   softEdges[*nSoftEdges].blocker = proc;
+                   (*nSoftEdges)++;
+                   return true;
+               }
+           }
+       }
+   }
+   else
+   {
+       /* Use the true lock wait queue order */
+       waitQueue = &(lock->waitProcs);
+       queue_size = waitQueue->size;
+
+       proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+
+       while (queue_size-- > 0)
+       {
+           /* Done when we reach the target proc */
+           if (proc == checkProc)
+               break;
+
+           /* Is there a conflict with this guy's request? */
+           if (((1 << proc->waitLockMode) & conflictMask) != 0)
+           {
+               /* This proc soft-blocks checkProc */
+               if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
+               {
+                   /* Add this edge to the list of soft edges in the cycle */
+                   Assert(*nSoftEdges < MaxBackends);
+                   softEdges[*nSoftEdges].waiter = checkProc;
+                   softEdges[*nSoftEdges].blocker = proc;
+                   (*nSoftEdges)++;
+                   return true;
+               }
+           }
+
+           proc = (PROC *) MAKE_PTR(proc->links.next);
+       }
+   }
+
+   /*
+    * No conflict detected here.
+    */
+   return false;
+}
+
+
+/*
+ * ExpandConstraints -- expand a list of constraints into a set of
+ *     specific new orderings for affected wait queues
+ *
+ * Input is a list of soft edges to be reversed.  The output is a list
+ * of nWaitOrders WAIT_ORDER structs in waitOrders[], with PROC array
+ * workspace in waitOrderProcs[].
+ *
+ * Returns TRUE if able to build an ordering that satisfies all the
+ * constraints, FALSE if not (there are contradictory constraints).
+ */
+static bool
+ExpandConstraints(EDGE *constraints,
+                 int nConstraints)
+{
+   int         nWaitOrderProcs = 0;
+   int         i,
+               j;
+
+   nWaitOrders = 0;
+   /*
+    * Scan constraint list backwards.  This is because the last-added
+    * constraint is the only one that could fail, and so we want to test
+    * it for inconsistency first.
+    */
+   for (i = nConstraints; --i >= 0; )
+   {
+       PROC   *proc = constraints[i].waiter;
+       LOCK   *lock = proc->waitLock;
+
+       /* Did we already make a list for this lock? */
+       for (j = nWaitOrders; --j >= 0; )
+       {
+           if (waitOrders[j].lock == lock)
+               break;
+       }
+       if (j >= 0)
+           continue;
+       /* No, so allocate a new list */
+       waitOrders[nWaitOrders].lock = lock;
+       waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs;
+       waitOrders[nWaitOrders].nProcs = lock->waitProcs.size;
+       nWaitOrderProcs += lock->waitProcs.size;
+       Assert(nWaitOrderProcs <= MaxBackends);
+       /*
+        * Do the topo sort.  TopoSort need not examine constraints after
+        * this one, since they must be for different locks.
+        */
+       if (!TopoSort(lock, constraints, i+1,
+                     waitOrders[nWaitOrders].procs))
+           return false;
+       nWaitOrders++;
+   }
+   return true;
+}
+
+
+/*
+ * TopoSort -- topological sort of a wait queue
+ *
+ * Generate a re-ordering of a lock's wait queue that satisfies given
+ * constraints about certain procs preceding others.  (Each such constraint
+ * is a fact of a partial ordering.)  Minimize rearrangement of the queue
+ * not needed to achieve the partial ordering.
+ *
+ * This is a lot simpler and slower than, for example, the topological sort
+ * algorithm shown in Knuth's Volume 1.  However, Knuth's method doesn't
+ * try to minimize the damage to the existing order.  In practice we are
+ * not likely to be working with more than a few constraints, so the apparent
+ * slowness of the algorithm won't really matter.
+ *
+ * The initial queue ordering is taken directly from the lock's wait queue.
+ * The output is an array of PROC pointers, of length equal to the lock's
+ * wait queue length (the caller is responsible for providing this space).
+ * The partial order is specified by an array of EDGE structs.  Each EDGE
+ * is one that we need to reverse, therefore the "waiter" must appear before
+ * the "blocker" in the output array.  The EDGE array may well contain
+ * edges associated with other locks; these should be ignored.
+ *
+ * Returns TRUE if able to build an ordering that satisfies all the
+ * constraints, FALSE if not (there are contradictory constraints).
+ */
+static bool
+TopoSort(LOCK *lock,
+        EDGE *constraints,
+        int nConstraints,
+        PROC **ordering)       /* output argument */
+{
+   PROC_QUEUE *waitQueue = &(lock->waitProcs);
+   int         queue_size = waitQueue->size;
+   PROC       *proc;
+   int         i,
+               j,
+               k,
+               last;
+
+   /* First, fill topoProcs[] array with the procs in their current order */
+   proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+   for (i = 0; i < queue_size; i++)
+   {
+       topoProcs[i] = proc;
+       proc = (PROC *) MAKE_PTR(proc->links.next);
+   }
+
+   /*
+    * Scan the constraints, and for each proc in the array, generate a count
+    * of the number of constraints that say it must be before something else,
+    * plus a list of the constraints that say it must be after something else.
+    * The count for the j'th proc is stored in beforeConstraints[j], and the
+    * head of its list in afterConstraints[j].  Each constraint stores its
+    * list link in constraints[i].link (note any constraint will be in
+    * just one list).  The array index for the before-proc of the i'th
+    * constraint is remembered in constraints[i].pred.
+    */
+   MemSet(beforeConstraints, 0, queue_size * sizeof(int));
+   MemSet(afterConstraints, 0, queue_size * sizeof(int));
+   for (i = 0; i < nConstraints; i++)
+   {
+       proc = constraints[i].waiter;
+       /* Ignore constraint if not for this lock */
+       if (proc->waitLock != lock)
+           continue;
+       /* Find the waiter proc in the array */
+       for (j = queue_size; --j >= 0; )
+       {
+           if (topoProcs[j] == proc)
+               break;
+       }
+       Assert(j >= 0);         /* should have found a match */
+       /* Find the blocker proc in the array */
+       proc = constraints[i].blocker;
+       for (k = queue_size; --k >= 0; )
+       {
+           if (topoProcs[k] == proc)
+               break;
+       }
+       Assert(k >= 0);         /* should have found a match */
+       beforeConstraints[j]++; /* waiter must come before */
+       /* add this constraint to list of after-constraints for blocker */
+       constraints[i].pred = j;
+       constraints[i].link = afterConstraints[k];
+       afterConstraints[k] = i+1;
+   }
+   /*--------------------
+    * Now scan the topoProcs array backwards.  At each step, output the
+    * last proc that has no remaining before-constraints, and decrease
+    * the beforeConstraints count of each of the procs it was constrained
+    * against.
+    * i = index of ordering[] entry we want to output this time
+    * j = search index for topoProcs[]
+    * k = temp for scanning constraint list for proc j
+    * last = last non-null index in topoProcs (avoid redundant searches)
+    *--------------------
+    */
+   last = queue_size-1;
+   for (i = queue_size; --i >= 0; )
+   {
+       /* Find next candidate to output */
+       while (topoProcs[last] == NULL)
+           last--;
+       for (j = last; j >= 0; j--)
+       {
+           if (topoProcs[j] != NULL && beforeConstraints[j] == 0)
+               break;
+       }
+       /* If no available candidate, topological sort fails */
+       if (j < 0)
+           return false;
+       /* Output candidate, and mark it done by zeroing topoProcs[] entry */
+       ordering[i] = topoProcs[j];
+       topoProcs[j] = NULL;
+       /* Update beforeConstraints counts of its predecessors */
+       for (k = afterConstraints[j]; k > 0; k = constraints[k-1].link)
+       {
+           beforeConstraints[constraints[k-1].pred]--;
+       }
+   }
+
+   /* Done */
+   return true;
+}
+
+#ifdef DEBUG_DEADLOCK
+static void
+PrintLockQueue(LOCK *lock, const char *info)
+{
+   PROC_QUEUE *waitQueue = &(lock->waitProcs);
+   int         queue_size = waitQueue->size;
+   PROC       *proc;
+   int         i;
+
+   printf("%s lock %lx queue ", info, MAKE_OFFSET(lock));
+   proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+   for (i = 0; i < queue_size; i++)
+   {
+       printf(" %d", proc->pid);
+       proc = (PROC *) MAKE_PTR(proc->links.next);
+   }
+   printf("\n");
+   fflush(stdout);
+}
+#endif
index 3d77ab2b4d133011e10192355de3b93275487148..08e023718ebc766bd11495bd98ea7c09bdb79454 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.80 2001/01/24 19:43:08 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.81 2001/01/25 03:31:16 tgl Exp $
  *
  * NOTES
  *   Outside modules can create a lock table and acquire/release
  *
  * LockAcquire(), LockRelease(), LockMethodTableInit(),
  * LockMethodTableRename(), LockReleaseAll,
- * LockResolveConflicts(), GrantLock()
+ * LockCheckConflicts(), GrantLock()
  *
  *-------------------------------------------------------------------------
  */
+#include "postgres.h"
+
 #include <sys/types.h>
 #include <unistd.h>
 #include <signal.h>
 
-#include "postgres.h"
-
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "storage/proc.h"
@@ -44,7 +44,6 @@ static int    WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
                       LOCK *lock, HOLDER *holder);
 static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
                             int *myHolding);
-static int LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc);
 
 static char *lock_types[] =
 {
@@ -211,6 +210,18 @@ LockingDisabled(void)
    return LockingIsDisabled;
 }
 
+/*
+ * Fetch the lock method table associated with a given lock
+ */
+LOCKMETHODTABLE *
+GetLocksMethodTable(LOCK *lock)
+{
+   LOCKMETHOD lockmethod = LOCK_LOCKMETHOD(*lock);
+
+   Assert(lockmethod > 0 && lockmethod < NumLockMethods);
+   return LockMethodTable[lockmethod];
+}
+
 
 /*
  * LockMethodInit -- initialize the lock table's lock type
@@ -559,7 +570,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
    if (!holder)
    {
        SpinRelease(masterLock);
-       elog(NOTICE, "LockAcquire: holder table corrupted");
+       elog(FATAL, "LockAcquire: holder table corrupted");
        return FALSE;
    }
 
@@ -623,11 +634,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
    Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
 
    /* --------------------
-    * If I'm the only one holding any lock on this object, then there
-    * cannot be a conflict. The same is true if I already hold this lock.
+    * If I already hold one or more locks of the requested type,
+    * just grant myself another one without blocking.
     * --------------------
     */
-   if (holder->nHolding == lock->nGranted || holder->holding[lockmode] != 0)
+   if (holder->holding[lockmode] > 0)
    {
        GrantLock(lock, holder, lockmode);
        HOLDER_PRINT("LockAcquire: owning", holder);
@@ -637,11 +648,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 
    /* --------------------
     * If this process (under any XID) is a holder of the lock,
-    * then there is no conflict, either.
+    * also grant myself another one without blocking.
     * --------------------
     */
    LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
-   if (myHolding[lockmode] != 0)
+   if (myHolding[lockmode] > 0)
    {
        GrantLock(lock, holder, lockmode);
        HOLDER_PRINT("LockAcquire: my other XID owning", holder);
@@ -649,42 +660,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        return TRUE;
    }
 
-   /*
-    * If lock requested conflicts with locks requested by waiters...
+   /* --------------------
+    * If lock requested conflicts with locks requested by waiters,
+    * must join wait queue.  Otherwise, check for conflict with
+    * already-held locks.  (That's last because most complex check.)
+    * --------------------
     */
    if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
-   {
-       /*
-        * If my process doesn't hold any locks that conflict with waiters
-        * then force to sleep, so that prior waiters get first chance.
-        */
-       for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++)
-       {
-           if (myHolding[i] > 0 &&
-               lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
-               break;          /* yes, there is a conflict */
-       }
-
-       if (i > lockMethodTable->ctl->numLockModes)
-       {
-           HOLDER_PRINT("LockAcquire: another proc already waiting",
-                        holder);
-           status = STATUS_FOUND;
-       }
-       else
-           status = LockResolveConflicts(lockmethod, lockmode,
-                                         lock, holder,
-                                         MyProc, myHolding);
-   }
+       status = STATUS_FOUND;
    else
-       status = LockResolveConflicts(lockmethod, lockmode,
-                                     lock, holder,
-                                     MyProc, myHolding);
+       status = LockCheckConflicts(lockMethodTable, lockmode,
+                                   lock, holder,
+                                   MyProc, myHolding);
 
    if (status == STATUS_OK)
+   {
+       /* No conflict with held or previously requested locks */
        GrantLock(lock, holder, lockmode);
-   else if (status == STATUS_FOUND)
+   }
+   else
    {
+       Assert(status == STATUS_FOUND);
 #ifdef USER_LOCKS
 
        /*
@@ -765,49 +761,50 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 }
 
 /* ----------------------------
- * LockResolveConflicts -- test for lock conflicts
+ * LockCheckConflicts -- test whether requested lock conflicts
+ *     with those already granted
+ *
+ * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
  *
  * NOTES:
- *     Here's what makes this complicated: one transaction's
- * locks don't conflict with one another.  When many processes
- * hold locks, each has to subtract off the other's locks when
- * determining whether or not any new lock acquired conflicts with
- * the old ones.
+ *     Here's what makes this complicated: one process's locks don't
+ * conflict with one another, even if they are held under different
+ * transaction IDs (eg, session and xact locks do not conflict).
+ * So, we must subtract off our own locks when determining whether the
+ * requested new lock conflicts with those already held.
  *
  * The caller can optionally pass the process's total holding counts, if
  * known.  If NULL is passed then these values will be computed internally.
  * ----------------------------
  */
 int
-LockResolveConflicts(LOCKMETHOD lockmethod,
-                    LOCKMODE lockmode,
-                    LOCK *lock,
-                    HOLDER *holder,
-                    PROC *proc,
-                    int *myHolding)        /* myHolding[] array or NULL */
+LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
+                  LOCKMODE lockmode,
+                  LOCK *lock,
+                  HOLDER *holder,
+                  PROC *proc,
+                  int *myHolding)      /* myHolding[] array or NULL */
 {
-   LOCKMETHODCTL *lockctl = LockMethodTable[lockmethod]->ctl;
+   LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
    int         numLockModes = lockctl->numLockModes;
    int         bitmask;
    int         i,
                tmpMask;
    int         localHolding[MAX_LOCKMODES];
 
-   Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
-
    /* ----------------------------
     * first check for global conflicts: If no locks conflict
-    * with mine, then I get the lock.
+    * with my request, then I get the lock.
     *
     * Checking for conflict: lock->grantMask represents the types of
     * currently held locks.  conflictTable[lockmode] has a bit
-    * set for each type of lock that conflicts with mine.  Bitwise
+    * set for each type of lock that conflicts with request.   Bitwise
     * compare tells if there is a conflict.
     * ----------------------------
     */
    if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
    {
-       HOLDER_PRINT("LockResolveConflicts: no conflict", holder);
+       HOLDER_PRINT("LockCheckConflicts: no conflict", holder);
        return STATUS_OK;
    }
 
@@ -844,11 +841,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
    if (!(lockctl->conflictTab[lockmode] & bitmask))
    {
        /* no conflict. OK to get the lock */
-       HOLDER_PRINT("LockResolveConflicts: resolved", holder);
+       HOLDER_PRINT("LockCheckConflicts: resolved", holder);
        return STATUS_OK;
    }
 
-   HOLDER_PRINT("LockResolveConflicts: conflicting", holder);
+   HOLDER_PRINT("LockCheckConflicts: conflicting", holder);
    return STATUS_FOUND;
 }
 
@@ -889,33 +886,12 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
    }
 }
 
-/*
- * LockGetMyHeldLocks -- compute bitmask of lock types held by a process
- *     for a given lockable object.
- */
-static int
-LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc)
-{
-   int         myHolding[MAX_LOCKMODES];
-   int         heldLocks = 0;
-   int         i,
-               tmpMask;
-
-   LockCountMyLocks(lockOffset, proc, myHolding);
-
-   for (i = 1, tmpMask = 2;
-        i < MAX_LOCKMODES;
-        i++, tmpMask <<= 1)
-   {
-       if (myHolding[i] > 0)
-           heldLocks |= tmpMask;
-   }
-   return heldLocks;
-}
-
 /*
  * GrantLock -- update the lock and holder data structures to show
  *     the lock request has been granted.
+ *
+ * NOTE: if proc was blocked, it also needs to be removed from the wait list
+ * and have its waitLock/waitHolder fields cleared.  That's not done here.
  */
 void
 GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
@@ -936,6 +912,9 @@ GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
 /*
  * WaitOnLock -- wait to acquire a lock
  *
+ * Caller must have set MyProc->heldLocks to reflect locks already held
+ * on the lockable object by this process (under all XIDs).
+ *
  * The locktable spinlock must be held at entry.
  */
 static int
@@ -956,7 +935,7 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
    strcat(new_status, " waiting");
    set_ps_display(new_status);
 
-   /*
+   /* -------------------
     * NOTE: Think not to put any lock state cleanup after the call to
     * ProcSleep, in either the normal or failure path.  The lock state
     * must be fully set by the lock grantor, or by HandleDeadLock if we
@@ -965,12 +944,13 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
     * after someone else grants us the lock, but before we've noticed it.
     * Hence, after granting, the locktable state must fully reflect the
     * fact that we own the lock; we can't do additional work on return.
+    * -------------------
     */
 
-   if (ProcSleep(lockMethodTable->ctl,
+   if (ProcSleep(lockMethodTable,
                  lockmode,
                  lock,
-                 holder) != NO_ERROR)
+                 holder) != STATUS_OK)
    {
        /* -------------------
         * We failed as a result of a deadlock, see HandleDeadLock().
@@ -992,14 +972,60 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
    return STATUS_OK;
 }
 
+/*--------------------
+ * Remove a proc from the wait-queue it is on
+ * (caller must know it is on one).
+ *
+ * Locktable lock must be held by caller.
+ *
+ * NB: this does not remove the process' holder object, nor the lock object,
+ * even though their counts might now have gone to zero.  That will happen
+ * during a subsequent LockReleaseAll call, which we expect will happen
+ * during transaction cleanup.  (Removal of a proc from its wait queue by
+ * this routine can only happen if we are aborting the transaction.)
+ *--------------------
+ */
+void
+RemoveFromWaitQueue(PROC *proc)
+{
+   LOCK   *waitLock = proc->waitLock;
+   LOCKMODE lockmode = proc->waitLockMode;
+
+   /* Make sure proc is waiting */
+   Assert(proc->links.next != INVALID_OFFSET);
+   Assert(waitLock);
+   Assert(waitLock->waitProcs.size > 0);
+
+   /* Remove proc from lock's wait queue */
+   SHMQueueDelete(&(proc->links));
+   waitLock->waitProcs.size--;
+
+   /* Undo increments of request counts by waiting process */
+   Assert(waitLock->nRequested > 0);
+   Assert(waitLock->nRequested > proc->waitLock->nGranted);
+   waitLock->nRequested--;
+   Assert(waitLock->requested[lockmode] > 0);
+   waitLock->requested[lockmode]--;
+   /* don't forget to clear waitMask bit if appropriate */
+   if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
+       waitLock->waitMask &= BITS_OFF[lockmode];
+
+   /* Clean up the proc's own state */
+   proc->waitLock = NULL;
+   proc->waitHolder = NULL;
+
+   /* See if any other waiters for the lock can be woken up now */
+   ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
+}
+
 /*
  * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
- *     release it.
+ *     release one 'lockmode' lock on it.
  *
- * Side Effects: if the lock no longer conflicts with the highest
- *     priority waiting process, that process is granted the lock
- *     and awoken. (We have to grant the lock here to avoid a
- *     race between the waking process and any new process to
+ * Side Effects: find any waiting processes that are now wakable,
+ *     grant them their requested locks and awaken them.
+ *     (We have to grant the lock here to avoid a race between
+ *     the waking process and any new process to
  *     come along and request the lock.)
  */
 bool
@@ -1013,7 +1039,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
    HOLDER     *holder;
    HOLDERTAG   holdertag;
    HTAB       *holderTable;
-   bool        wakeupNeeded = true;
+   bool        wakeupNeeded = false;
 
 #ifdef LOCK_DEBUG
    if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
@@ -1086,7 +1112,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        return FALSE;
    }
    HOLDER_PRINT("LockRelease: found", holder);
-   Assert(holder->tag.lock == MAKE_OFFSET(lock));
 
    /*
     * Check that we are actually holding a lock of the type we want to
@@ -1094,11 +1119,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
     */
    if (!(holder->holding[lockmode] > 0))
    {
-       SpinRelease(masterLock);
        HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
+       Assert(holder->holding[lockmode] >= 0);
+       SpinRelease(masterLock);
        elog(NOTICE, "LockRelease: you don't own a lock of type %s",
             lock_types[lockmode]);
-       Assert(holder->holding[lockmode] >= 0);
        return FALSE;
    }
    Assert(holder->nHolding > 0);
@@ -1120,34 +1145,24 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        lock->grantMask &= BITS_OFF[lockmode];
    }
 
-#ifdef NOT_USED
+   LOCK_PRINT("LockRelease: updated", lock, lockmode);
+   Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
+   Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
+   Assert(lock->nGranted <= lock->nRequested);
+
    /* --------------------------
-    * If there are still active locks of the type I just released, no one
-    * should be woken up.  Whoever is asleep will still conflict
-    * with the remaining locks.
+    * We need only run ProcLockWakeup if the released lock conflicts with
+    * at least one of the lock types requested by waiter(s).  Otherwise
+    * whatever conflict made them wait must still exist.  NOTE: before MVCC,
+    * we could skip wakeup if lock->granted[lockmode] was still positive.
+    * But that's not true anymore, because the remaining granted locks might
+    * belong to some waiter, who could now be awakened because he doesn't
+    * conflict with his own locks.
     * --------------------------
     */
-   if (lock->granted[lockmode])
-       wakeupNeeded = false;
-   else
-#endif
-
-       /*
-        * Above is not valid any more (due to MVCC lock modes). Actually
-        * we should compare granted[lockmode] with number of
-        * waiters holding lock of this type and try to wakeup only if
-        * these numbers are equal (and lock released conflicts with locks
-        * requested by waiters). For the moment we only check the last
-        * condition.
-        */
    if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
        wakeupNeeded = true;
 
-   LOCK_PRINT("LockRelease: updated", lock, lockmode);
-   Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
-   Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
-   Assert(lock->nGranted <= lock->nRequested);
-
    if (lock->nRequested == 0)
    {
        /* ------------------
@@ -1161,8 +1176,13 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                                    (Pointer) &(lock->tag),
                                    HASH_REMOVE,
                                    &found);
-       Assert(lock && found);
-       wakeupNeeded = false;
+       if (!lock || !found)
+       {
+           SpinRelease(masterLock);
+           elog(NOTICE, "LockRelease: remove lock, table corrupted");
+           return FALSE;
+       }
+       wakeupNeeded = false;   /* should be false, but make sure */
    }
 
    /*
@@ -1192,12 +1212,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        }
    }
 
+   /*
+    * Wake up waiters if needed.
+    */
    if (wakeupNeeded)
-       ProcLockWakeup(lockmethod, lock);
-#ifdef LOCK_DEBUG
-   else if (LOCK_DEBUG_ENABLED(lock))
-        elog(DEBUG, "LockRelease: no wakeup needed");
-#endif
+       ProcLockWakeup(lockMethodTable, lock);
 
    SpinRelease(masterLock);
    return TRUE;
@@ -1310,8 +1329,8 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
        else
        {
            /* --------------
-            * set nRequested to zero so that we can garbage collect the lock
-            * down below...
+            * This holder accounts for all the requested locks on the object,
+            * so we can be lazy and just zero things out.
             * --------------
             */
            lock->nRequested = 0;
@@ -1347,7 +1366,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
            return FALSE;
        }
 
-       if (!lock->nRequested)
+       if (lock->nRequested == 0)
        {
            /* --------------------
             * We've just released the last lock, so garbage-collect the
@@ -1359,7 +1378,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
            lock = (LOCK *) hash_search(lockMethodTable->lockHash,
                                        (Pointer) &(lock->tag),
                                        HASH_REMOVE, &found);
-           if ((!lock) || (!found))
+           if (!lock || !found)
            {
                SpinRelease(masterLock);
                elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
@@ -1367,7 +1386,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
            }
        }
        else if (wakeupNeeded)
-           ProcLockWakeup(lockmethod, lock);
+           ProcLockWakeup(lockMethodTable, lock);
 
 next_item:
        holder = nextHolder;
@@ -1412,245 +1431,6 @@ LockShmemSize(int maxBackends)
    return size;
 }
 
-/*
- * DeadLockCheck -- Checks for deadlocks for a given process
- *
- * This code takes a list of locks a process holds, and the lock that
- * the process is sleeping on, and tries to find if any of the processes
- * waiting on its locks hold the lock it is waiting for.  If no deadlock
- * is found, it goes on to look at all the processes waiting on their locks.
- *
- * We can't block on user locks, so no sense testing for deadlock
- * because there is no blocking, and no timer for the block.  So,
- * only look at regular locks.
- *
- * We have already locked the master lock before being called.
- */
-bool
-DeadLockCheck(PROC *thisProc, LOCK *findlock)
-{
-   PROC       *waitProc;
-   PROC_QUEUE *waitQueue;
-   SHM_QUEUE  *procHolders = &(thisProc->procHolders);
-   HOLDER     *holder;
-   HOLDER     *nextHolder;
-   LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
-   LOCK       *lock;
-   int         i,
-               j;
-   bool        first_run = (thisProc == MyProc);
-
-   static PROC *checked_procs[MAXBACKENDS];
-   static int  nprocs;
-
-   /* initialize at start of recursion */
-   if (first_run)
-   {
-       checked_procs[0] = thisProc;
-       nprocs = 1;
-   }
-
-   /*
-    * Scan over all the locks held/awaited by thisProc.
-    */
-   holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
-                                    offsetof(HOLDER, procLink));
-
-   while (holder)
-   {
-       /* Get link first, since we may unlink/delete this holder */
-       nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
-                                            offsetof(HOLDER, procLink));
-
-       Assert(holder->tag.proc == MAKE_OFFSET(thisProc));
-
-       lock = (LOCK *) MAKE_PTR(holder->tag.lock);
-
-       /* Ignore user locks */
-       if (lock->tag.lockmethod != DEFAULT_LOCKMETHOD)
-           goto nxtl;
-
-       HOLDER_PRINT("DeadLockCheck", holder);
-       LOCK_PRINT("DeadLockCheck", lock, 0);
-
-       /*
-        * waitLock is always in procHolders of waiting proc, if !first_run
-        * then upper caller will handle waitProcs queue of waitLock.
-        */
-       if (thisProc->waitLock == lock && !first_run)
-           goto nxtl;
-
-       /*
-        * If we found proc holding findlock and sleeping on some my other
-        * lock then we have to check does it block me or another waiters.
-        */
-       if (lock == findlock && !first_run)
-       {
-           int         lm;
-
-           Assert(holder->nHolding > 0);
-           for (lm = 1; lm <= lockctl->numLockModes; lm++)
-           {
-               if (holder->holding[lm] > 0 &&
-                   lockctl->conflictTab[lm] & findlock->waitMask)
-                   return true;
-           }
-
-           /*
-            * Else - get the next lock from thisProc's procHolders
-            */
-           goto nxtl;
-       }
-
-       waitQueue = &(lock->waitProcs);
-       waitProc = (PROC *) MAKE_PTR(waitQueue->links.next);
-
-       /*
-        * Inner loop scans over all processes waiting for this lock.
-        *
-        * NOTE: loop must count down because we want to examine each item
-        * in the queue even if waitQueue->size decreases due to waking up
-        * some of the processes.
-        */
-       for (i = waitQueue->size; --i >= 0; )
-       {
-           Assert(waitProc->waitLock == lock);
-           if (waitProc == thisProc)
-           {
-               /* This should only happen at first level */
-               Assert(waitProc == MyProc);
-               goto nextWaitProc;
-           }
-           if (lock == findlock)       /* first_run also true */
-           {
-               /*
-                * If I'm blocked by his heldLocks...
-                */
-               if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->heldLocks)
-               {
-                   /* and he blocked by me -> deadlock */
-                   if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks)
-                       return true;
-                   /* we shouldn't look at procHolders of our blockers */
-                   goto nextWaitProc;
-               }
-
-               /*
-                * If he isn't blocked by me and we request
-                * non-conflicting lock modes - no deadlock here because
-                * he isn't blocked by me in any sense (explicitly or
-                * implicitly). Note that we don't do like test if
-                * !first_run (when thisProc is holder and non-waiter on
-                * lock) and so we call DeadLockCheck below for every
-                * waitProc in thisProc->procHolders, even for waitProc-s
-                * un-blocked by thisProc. Should we? This could save us
-                * some time...
-                */
-               if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks) &&
-                   !(lockctl->conflictTab[waitProc->waitLockMode] & (1 << MyProc->waitLockMode)))
-                   goto nextWaitProc;
-           }
-
-           /*
-            * Skip this waiter if already checked.
-            */
-           for (j = 0; j < nprocs; j++)
-           {
-               if (checked_procs[j] == waitProc)
-                   goto nextWaitProc;
-           }
-
-           /* Recursively check this process's procHolders. */
-           Assert(nprocs < MAXBACKENDS);
-           checked_procs[nprocs++] = waitProc;
-
-           if (DeadLockCheck(waitProc, findlock))
-           {
-               int         heldLocks;
-
-               /*
-                * Ok, but is waitProc waiting for me (thisProc) ?
-                */
-               if (thisProc->waitLock == lock)
-               {
-                   Assert(first_run);
-                   heldLocks = thisProc->heldLocks;
-               }
-               else
-               {
-                   /* should we cache heldLocks to speed this up? */
-                   heldLocks = LockGetMyHeldLocks(holder->tag.lock, thisProc);
-                   Assert(heldLocks != 0);
-               }
-               if (lockctl->conflictTab[waitProc->waitLockMode] & heldLocks)
-               {
-                   /*
-                    * Last attempt to avoid deadlock: try to wakeup myself.
-                    */
-                   if (first_run)
-                   {
-                       if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
-                                                MyProc->waitLockMode,
-                                                MyProc->waitLock,
-                                                MyProc->waitHolder,
-                                                MyProc,
-                                                NULL) == STATUS_OK)
-                       {
-                           GrantLock(MyProc->waitLock,
-                                     MyProc->waitHolder,
-                                     MyProc->waitLockMode);
-                           ProcWakeup(MyProc, NO_ERROR);
-                           return false;
-                       }
-                   }
-                   return true;
-               }
-
-               /*
-                * Hell! Is he blocked by any (other) holder ?
-                */
-               if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
-                                        waitProc->waitLockMode,
-                                        lock,
-                                        waitProc->waitHolder,
-                                        waitProc,
-                                        NULL) != STATUS_OK)
-               {
-                   /*
-                    * Blocked by others - no deadlock...
-                    */
-                   LOCK_PRINT("DeadLockCheck: blocked by others",
-                              lock, waitProc->waitLockMode);
-                   goto nextWaitProc;
-               }
-
-               /*
-                * Well - wakeup this guy! This is the case of
-                * implicit blocking: thisProc blocked someone who
-                * blocked waitProc by the fact that he/someone is
-                * already waiting for lock.  We do this for
-                * anti-starving.
-                */
-               GrantLock(lock, waitProc->waitHolder, waitProc->waitLockMode);
-               waitProc = ProcWakeup(waitProc, NO_ERROR);
-               /*
-                * Use next-proc link returned by ProcWakeup, since this
-                * proc's own links field is now cleared.
-                */
-               continue;
-           }
-
-nextWaitProc:
-           waitProc = (PROC *) MAKE_PTR(waitProc->links.next);
-       }
-
-nxtl:
-       holder = nextHolder;
-   }
-
-   /* if we got here, no deadlock */
-   return false;
-}
 
 #ifdef LOCK_DEBUG
 /*
index 377e9dbeb58261fd5cf8b8b48eb96c14989fd96b..fd4c4b1485687e78effde87c01a88c48dc684e65 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.96 2001/01/24 19:43:08 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.97 2001/01/25 03:31:16 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -18,7 +18,7 @@
  *
  *
  * Interface (a):
- *     ProcSleep(), ProcWakeup(), ProcWakeupNext(),
+ *     ProcSleep(), ProcWakeup(),
  *     ProcQueueAlloc() -- create a shm queue for sleeping processes
  *     ProcQueueInit() -- create a queue without allocing memory
  *
@@ -47,8 +47,6 @@
  *     shared among backends (we keep a few sets of semaphores around).
  *     This is so that we can support more backends. (system-wide semaphore
  *     sets run out pretty fast.)                -ay 4/95
- *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.96 2001/01/24 19:43:08 momjian Exp $
  */
 #include "postgres.h"
 
@@ -257,7 +255,7 @@ InitProcess(void)
    }
 
    SHMQueueElemInit(&(MyProc->links));
-   MyProc->errType = NO_ERROR;
+   MyProc->errType = STATUS_OK;
    MyProc->pid = MyProcPid;
    MyProc->databaseId = MyDatabaseId;
    MyProc->xid = InvalidTransactionId;
@@ -284,7 +282,16 @@ InitProcess(void)
        (location != MAKE_OFFSET(MyProc)))
        elog(STOP, "InitProcess: ShmemPID table broken");
 
+   /*
+    * Arrange to clean up at backend exit.
+    */
    on_shmem_exit(ProcKill, 0);
+
+   /*
+    * Now that we have a PROC, we could try to acquire locks,
+    * so initialize the deadlock checker.
+    */
+   InitDeadLockChecking();
 }
 
 /*
@@ -304,50 +311,6 @@ ZeroProcSemaphore(PROC *proc)
    }
 }
 
-/*
- * Remove a proc from the wait-queue it is on
- * (caller must know it is on one).
- * Locktable lock must be held by caller.
- *
- * NB: this does not remove the process' holder object, nor the lock object,
- * even though their counts might now have gone to zero.  That will happen
- * during a subsequent LockReleaseAll call, which we expect will happen
- * during transaction cleanup.  (Removal of a proc from its wait queue by
- * this routine can only happen if we are aborting the transaction.)
- */
-static void
-RemoveFromWaitQueue(PROC *proc)
-{
-   LOCK   *waitLock = proc->waitLock;
-   LOCKMODE lockmode = proc->waitLockMode;
-
-   /* Make sure proc is waiting */
-   Assert(proc->links.next != INVALID_OFFSET);
-   Assert(waitLock);
-   Assert(waitLock->waitProcs.size > 0);
-
-   /* Remove proc from lock's wait queue */
-   SHMQueueDelete(&(proc->links));
-   waitLock->waitProcs.size--;
-
-   /* Undo increments of request counts by waiting process */
-   Assert(waitLock->nRequested > 0);
-   Assert(waitLock->nRequested > proc->waitLock->nGranted);
-   waitLock->nRequested--;
-   Assert(waitLock->requested[lockmode] > 0);
-   waitLock->requested[lockmode]--;
-   /* don't forget to clear waitMask bit if appropriate */
-   if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
-       waitLock->waitMask &= ~(1 << lockmode);
-
-   /* Clean up the proc's own state */
-   proc->waitLock = NULL;
-   proc->waitHolder = NULL;
-
-   /* See if any other waiters for the lock can be woken up now */
-   ProcLockWakeup(LOCK_LOCKMETHOD(*waitLock), waitLock);
-}
-
 /*
  * Cancel any pending wait for lock, when aborting a transaction.
  *
@@ -529,34 +492,34 @@ ProcQueueInit(PROC_QUEUE *queue)
 /*
  * ProcSleep -- put a process to sleep
  *
- * P() on the semaphore should put us to sleep.  The process
- * semaphore is normally zero, so when we try to acquire it, we sleep.
+ * Caller must have set MyProc->heldLocks to reflect locks already held
+ * on the lockable object by this process (under all XIDs).
  *
  * Locktable's spinlock must be held at entry, and will be held
  * at exit.
  *
- * Result is NO_ERROR if we acquired the lock, STATUS_ERROR if not (deadlock).
+ * Result: STATUS_OK if we acquired the lock, STATUS_ERROR if not (deadlock).
  *
  * ASSUME: that no one will fiddle with the queue until after
  *     we release the spin lock.
  *
  * NOTES: The process queue is now a priority queue for locking.
+ *
+ * P() on the semaphore should put us to sleep.  The process
+ * semaphore is normally zero, so when we try to acquire it, we sleep.
  */
 int
-ProcSleep(LOCKMETHODCTL *lockctl,
+ProcSleep(LOCKMETHODTABLE *lockMethodTable,
          LOCKMODE lockmode,
          LOCK *lock,
          HOLDER *holder)
 {
-   PROC_QUEUE *waitQueue = &(lock->waitProcs);
+   LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
    SPINLOCK    spinlock = lockctl->masterLock;
-   int         myMask = (1 << lockmode);
-   int         waitMask = lock->waitMask;
+   PROC_QUEUE *waitQueue = &(lock->waitProcs);
+   int         myHeldLocks = MyProc->heldLocks;
    PROC       *proc;
    int         i;
-   int         aheadGranted[MAX_LOCKMODES];
-   bool        selfConflict = (lockctl->conflictTab[lockmode] & myMask),
-               prevSame = false;
 #ifndef __BEOS__
    struct itimerval timeval,
                dummy;
@@ -564,64 +527,63 @@ ProcSleep(LOCKMETHODCTL *lockctl,
     bigtime_t time_interval;
 #endif
 
-   proc = (PROC *) MAKE_PTR(waitQueue->links.next);
-
-   /* if we don't conflict with any waiter - be first in queue */
-   if (!(lockctl->conflictTab[lockmode] & waitMask))
-       goto ins;
-
-   /* otherwise, determine where we should go into the queue */
-   for (i = 1; i < MAX_LOCKMODES; i++)
-       aheadGranted[i] = lock->granted[i];
-   (aheadGranted[lockmode])++;
-
-   for (i = 0; i < waitQueue->size; i++)
+   /* ----------------------
+    * Determine where to add myself in the wait queue.
+    *
+    * Normally I should go at the end of the queue.  However, if I already
+    * hold locks that conflict with the request of any previous waiter,
+    * put myself in the queue just in front of the first such waiter.
+    * This is not a necessary step, since deadlock detection would move
+    * me to before that waiter anyway; but it's relatively cheap to detect
+    * such a conflict immediately, and avoid delaying till deadlock timeout.
+    *
+    * Special case: if I find I should go in front of the first waiter,
+    * and I do not conflict with already-held locks, then just grant myself
+    * the requested lock immediately.
+    * ----------------------
+    */
+   if (myHeldLocks != 0)
    {
-       LOCKMODE    procWaitMode = proc->waitLockMode;
-
-       /* must I wait for him ? */
-       if (lockctl->conflictTab[lockmode] & proc->heldLocks)
+       proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+       for (i = 0; i < waitQueue->size; i++)
        {
-           /* is he waiting for me ? */
-           if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks)
+           /* Must he wait for me? */
+           if (lockctl->conflictTab[proc->waitLockMode] & myHeldLocks)
            {
-               /* Yes, report deadlock failure */
-               MyProc->errType = STATUS_ERROR;
-               return STATUS_ERROR;
-           }
-           /* I must go after him in queue - so continue loop */
-       }
-       /* if he waits for me, go before him in queue */
-       else if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks)
-           break;
-       /* if conflicting locks requested */
-       else if (lockctl->conflictTab[procWaitMode] & myMask)
-       {
-
-           /*
-            * If I request non self-conflicting lock and there are others
-            * requesting the same lock just before this guy - stop here.
-            */
-           if (!selfConflict && prevSame)
+               /* Must I wait for him ? */
+               if (lockctl->conflictTab[lockmode] & proc->heldLocks)
+               {
+                   /* Yes, can report deadlock failure immediately */
+                   MyProc->errType = STATUS_ERROR;
+                   return STATUS_ERROR;
+               }
+               if (i == 0)
+               {
+                   /* I must go before first waiter.  Check special case. */
+                   if (LockCheckConflicts(lockMethodTable,
+                                          lockmode,
+                                          lock,
+                                          holder,
+                                          MyProc,
+                                          NULL) == STATUS_OK)
+                   {
+                       /* Skip the wait and just grant myself the lock. */
+                       GrantLock(lock, holder, lockmode);
+                       return STATUS_OK;
+                   }
+               }
+               /* Break out of loop to put myself before him */
                break;
+           }
+           proc = (PROC *) MAKE_PTR(proc->links.next);
        }
-
-       /*
-        * Last attempt to not move any further to the back of the queue:
-        * if we don't conflict with remaining waiters, stop here.
-        */
-       else if (!(lockctl->conflictTab[lockmode] & waitMask))
-           break;
-
-       /* Move past this guy, and update state accordingly */
-       prevSame = (procWaitMode == lockmode);
-       (aheadGranted[procWaitMode])++;
-       if (aheadGranted[procWaitMode] == lock->requested[procWaitMode])
-           waitMask &= ~(1 << procWaitMode);
-       proc = (PROC *) MAKE_PTR(proc->links.next);
+   }
+   else
+   {
+       /* I hold no locks, so I can't push in front of anyone. */
+       proc = (PROC *) &(waitQueue->links);
    }
 
-ins:;
    /* -------------------
     * Insert self into queue, ahead of the given proc (or at tail of queue).
     * -------------------
@@ -629,15 +591,14 @@ ins:;
    SHMQueueInsertBefore(&(proc->links), &(MyProc->links));
    waitQueue->size++;
 
-   lock->waitMask |= myMask;
+   lock->waitMask |= (1 << lockmode);
 
    /* Set up wait information in PROC object, too */
    MyProc->waitLock = lock;
    MyProc->waitHolder = holder;
    MyProc->waitLockMode = lockmode;
-   /* We assume the caller set up MyProc->heldLocks */
 
-   MyProc->errType = NO_ERROR;     /* initialize result for success */
+   MyProc->errType = STATUS_OK; /* initialize result for success */
 
    /* mark that we are waiting for a lock */
    waitingForLock = true;
@@ -662,7 +623,7 @@ ins:;
     * By delaying the check until we've waited for a bit, we can avoid
     * running the rather expensive deadlock-check code in most cases.
     *
-    * Need to zero out struct to set the interval and the micro seconds fields
+    * Need to zero out struct to set the interval and the microseconds fields
     * to 0.
     * --------------
     */
@@ -768,89 +729,59 @@ ProcWakeup(PROC *proc, int errType)
 
 /*
  * ProcLockWakeup -- routine for waking up processes when a lock is
- *     released.
+ *     released (or a prior waiter is aborted).  Scan all waiters
+ *     for lock, waken any that are no longer blocked.
  */
-int
-ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock)
+void
+ProcLockWakeup(LOCKMETHODTABLE *lockMethodTable, LOCK *lock)
 {
-   PROC_QUEUE *queue = &(lock->waitProcs);
+   LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
+   PROC_QUEUE *waitQueue = &(lock->waitProcs);
+   int         queue_size = waitQueue->size;
    PROC       *proc;
-   int         awoken = 0;
-   LOCKMODE    last_lockmode = 0;
-   int         queue_size = queue->size;
+   int         conflictMask = 0;
 
    Assert(queue_size >= 0);
 
-   if (!queue_size)
-       return STATUS_NOT_FOUND;
+   if (queue_size == 0)
+       return;
 
-   proc = (PROC *) MAKE_PTR(queue->links.next);
+   proc = (PROC *) MAKE_PTR(waitQueue->links.next);
 
    while (queue_size-- > 0)
    {
-       if (proc->waitLockMode == last_lockmode)
-       {
-           /*
-            * This proc will conflict as the previous one did, don't even
-            * try.
-            */
-           goto nextProc;
-       }
+       LOCKMODE lockmode = proc->waitLockMode;
 
        /*
-        * Does this proc conflict with locks held by others ?
+        * Waken if (a) doesn't conflict with requests of earlier waiters,
+        * and (b) doesn't conflict with already-held locks.
         */
-       if (LockResolveConflicts(lockmethod,
-                                proc->waitLockMode,
-                                lock,
-                                proc->waitHolder,
-                                proc,
-                                NULL) != STATUS_OK)
+       if (((1 << lockmode) & conflictMask) == 0 &&
+           LockCheckConflicts(lockMethodTable,
+                              lockmode,
+                              lock,
+                              proc->waitHolder,
+                              proc,
+                              NULL) == STATUS_OK)
        {
-           /* Yes.  Quit if we already awoke at least one process. */
-           if (awoken != 0)
-               break;
-           /* Otherwise, see if any later waiters can be awoken. */
-           last_lockmode = proc->waitLockMode;
-           goto nextProc;
+           /* OK to waken */
+           GrantLock(lock, proc->waitHolder, lockmode);
+           proc = ProcWakeup(proc, STATUS_OK);
+           /*
+            * ProcWakeup removes proc from the lock's waiting process queue
+            * and returns the next proc in chain; don't use proc's next-link,
+            * because it's been cleared.
+            */
        }
-
-       /*
-        * OK to wake up this sleeping process.
-        */
-       GrantLock(lock, proc->waitHolder, proc->waitLockMode);
-       proc = ProcWakeup(proc, NO_ERROR);
-       awoken++;
-
-       /*
-        * ProcWakeup removes proc from the lock's waiting process queue
-        * and returns the next proc in chain; don't use proc's next-link,
-        * because it's been cleared.
-        */
-       continue;
-
-nextProc:
-       proc = (PROC *) MAKE_PTR(proc->links.next);
-   }
-
-   Assert(queue->size >= 0);
-
-   if (awoken)
-       return STATUS_OK;
-   else
-   {
-       /* Something is still blocking us.  May have deadlocked. */
-#ifdef LOCK_DEBUG
-       if (lock->tag.lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
+       else
        {
-           elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process",
-                MAKE_OFFSET(lock));
-           if (Debug_deadlocks)
-               DumpAllLocks();
+           /* Cannot wake this guy.  Add his request to conflict mask. */
+           conflictMask |= lockctl->conflictTab[lockmode];
+           proc = (PROC *) MAKE_PTR(proc->links.next);
        }
-#endif
-       return STATUS_NOT_FOUND;
    }
+
+   Assert(waitQueue->size >= 0);
 }
 
 /* --------------------
@@ -900,7 +831,7 @@ HandleDeadLock(SIGNAL_ARGS)
         DumpAllLocks();
 #endif
 
-   if (!DeadLockCheck(MyProc, MyProc->waitLock))
+   if (!DeadLockCheck(MyProc))
    {
        /* No deadlock, so keep waiting */
        UnlockLockTable();
index 39eefac0ca6f827437a4550e8e9c842a5be0591a..0ada6eaccfebee6e5b18db0934e3746dce7bdae4 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.43 2001/01/24 19:43:27 momjian Exp $
+ * $Id: lock.h,v 1.44 2001/01/25 03:31:16 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -247,6 +247,7 @@ typedef struct HOLDER
 extern void InitLocks(void);
 extern void LockDisable(bool status);
 extern bool LockingDisabled(void);
+extern LOCKMETHODTABLE *GetLocksMethodTable(LOCK *lock);
 extern LOCKMETHOD LockMethodTableInit(char *tabName, LOCKMASK *conflictsP,
                    int *prioP, int numModes, int maxBackends);
 extern LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod);
@@ -256,12 +257,15 @@ extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                        TransactionId xid, LOCKMODE lockmode);
 extern bool LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
                           bool allxids, TransactionId xid);
-extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCKMODE lockmode,
-                               LOCK *lock, HOLDER *holder, PROC *proc,
-                               int *myHolding);
+extern int LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
+                             LOCKMODE lockmode,
+                             LOCK *lock, HOLDER *holder, PROC *proc,
+                             int *myHolding);
 extern void GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode);
+extern void RemoveFromWaitQueue(PROC *proc);
 extern int LockShmemSize(int maxBackends);
-extern bool DeadLockCheck(PROC *thisProc, LOCK *findlock);
+extern bool DeadLockCheck(PROC *proc);
+extern void InitDeadLockChecking(void);
 
 #ifdef LOCK_DEBUG
 extern void DumpLocks(void);
index 3f8902e7d373850032b2cddfadab005bb246aae4..4dd5a8c2a676e04930daa3693586923942f9db96 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: proc.h,v 1.38 2001/01/24 19:43:28 momjian Exp $
+ * $Id: proc.h,v 1.39 2001/01/25 03:31:16 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -41,7 +41,7 @@ struct proc
    SHM_QUEUE   links;          /* list link if process is in a list */
 
    SEMA        sem;            /* ONE semaphore to sleep on */
-   int         errType;        /* error code tells why we woke up */
+   int         errType;        /* STATUS_OK or STATUS_ERROR after wakeup */
 
    TransactionId xid;          /* transaction currently being executed by
                                 * this proc */
@@ -86,13 +86,6 @@ do { \
    if (MyProc) (MyProc->sLocks[(lock)])--; \
 } while (0)
 
-/*
- * flags explaining why process woke up
- */
-#define NO_ERROR       0
-#define ERR_TIMEOUT        1
-#define ERR_BUFFER_IO  2
-
 
 /*
  * There is one ProcGlobal struct for the whole installation.
@@ -134,10 +127,10 @@ extern void ProcReleaseLocks(bool isCommit);
 extern bool ProcRemove(int pid);
 
 extern void ProcQueueInit(PROC_QUEUE *queue);
-extern int ProcSleep(LOCKMETHODCTL *lockctl, LOCKMODE lockmode,
+extern int ProcSleep(LOCKMETHODTABLE *lockMethodTable, LOCKMODE lockmode,
                     LOCK *lock, HOLDER *holder);
 extern PROC *ProcWakeup(PROC *proc, int errType);
-extern int ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock);
+extern void ProcLockWakeup(LOCKMETHODTABLE *lockMethodTable, LOCK *lock);
 extern void ProcReleaseSpins(PROC *proc);
 extern bool LockWaitCancel(void);
 extern void HandleDeadLock(SIGNAL_ARGS);