Tighten up error recovery for fast-path locking.
authorRobert Haas <rhaas@postgresql.org>
Wed, 18 Apr 2012 15:17:30 +0000 (11:17 -0400)
committerRobert Haas <rhaas@postgresql.org>
Fri, 1 Jun 2012 12:32:36 +0000 (08:32 -0400)
The previous code could cause a backend crash after BEGIN; SAVEPOINT a;
LOCK TABLE foo (interrupted by ^C or statement timeout); ROLLBACK TO
SAVEPOINT a; LOCK TABLE foo, and might have leaked strong-lock counts
in other situations.

Report by Zoltán Böszörményi; patch review by Jeff Davis.

src/backend/access/transam/xact.c
src/backend/storage/lmgr/README
src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/proc.c
src/backend/tcop/postgres.c
src/include/storage/lock.h
src/include/storage/proc.h

index 184fe28f5c53c602f35a52e1d680254cb50e9644..b96a1d47e92741d58242247c295b21db74ade5aa 100644 (file)
@@ -2252,7 +2252,7 @@ AbortTransaction(void)
         * Also clean up any open wait for lock, since the lock manager will choke
         * if we try to wait for another lock before doing this.
         */
-       LockWaitCancel();
+       LockErrorCleanup();
 
        /*
         * check the current transaction state
@@ -4122,7 +4122,7 @@ AbortSubTransaction(void)
        AbortBufferIO();
        UnlockBuffers();
 
-       LockWaitCancel();
+       LockErrorCleanup();
 
        /*
         * check the current transaction state
index 3165bb17c75ffb11d7c995bf7187589bcba61393..13ccb0b8f93fc658ce2e15194001e09825e33f17 100644 (file)
@@ -560,7 +560,7 @@ examines every member of the wait queue (this was not true in the 7.0
 implementation, BTW).  Therefore, if ProcLockWakeup is always invoked
 after a lock is released or a wait queue is rearranged, there can be no
 failure to wake a wakable process.  One should also note that
-LockWaitCancel (abort a waiter due to outside factors) must run
+LockErrorCleanup (abort a waiter due to outside factors) must run
 ProcLockWakeup, in case the canceled waiter was soft-blocking other
 waiters.
 
index c8a1edb66f42365177a10cb272458081ad018715..65f64ffa7f8c7b9c47a0e06fefc5bdc0488c0504 100644 (file)
@@ -250,7 +250,8 @@ static HTAB *LockMethodProcLockHash;
 static HTAB *LockMethodLocalHash;
 
 
-/* private state for GrantAwaitedLock */
+/* private state for error cleanup */
+static LOCALLOCK *StrongLockInProgress;
 static LOCALLOCK *awaitedLock;
 static ResourceOwner awaitedOwner;
 
@@ -338,6 +339,8 @@ static void RemoveLocalLock(LOCALLOCK *locallock);
 static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
                             const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
 static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
+static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
+static void FinishStrongLockAcquire(void);
 static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
 static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
 static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
@@ -738,22 +741,11 @@ LockAcquireExtended(const LOCKTAG *locktag,
                }
                else if (FastPathStrongMode(lockmode))
                {
-                       /*
-                        * Adding to a memory location is not atomic, so we take a
-                        * spinlock to ensure we don't collide with someone else trying
-                        * to bump the count at the same time.
-                        *
-                        * XXX: It might be worth considering using an atomic fetch-and-add
-                        * instruction here, on architectures where that is supported.
-                        */
-                       Assert(locallock->holdsStrongLockCount == FALSE);
-                       SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
-                       FastPathStrongRelationLocks->count[fasthashcode]++;
-                       locallock->holdsStrongLockCount = TRUE;
-                       SpinLockRelease(&FastPathStrongRelationLocks->mutex);
+                       BeginStrongLockAcquire(locallock, fasthashcode);
                        if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
                                                                                           hashcode))
                        {
+                               AbortStrongLockAcquire();
                                if (reportMemoryError)
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_OUT_OF_MEMORY),
@@ -779,6 +771,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
                                                                hashcode, lockmode);
        if (!proclock)
        {
+               AbortStrongLockAcquire();
                LWLockRelease(partitionLock);
                if (reportMemoryError)
                        ereport(ERROR,
@@ -820,6 +813,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
                 */
                if (dontWait)
                {
+                       AbortStrongLockAcquire();
                        if (proclock->holdMask == 0)
                        {
                                uint32          proclock_hashcode;
@@ -884,6 +878,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
                 */
                if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
                {
+                       AbortStrongLockAcquire();
                        PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
                        LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
                        /* Should we retry ? */
@@ -894,6 +889,12 @@ LockAcquireExtended(const LOCKTAG *locktag,
                LOCK_PRINT("LockAcquire: granted", lock, lockmode);
        }
 
+       /*
+        * Lock state is fully up-to-date now; if we error out after this, no
+        * special error cleanup is required.
+        */
+       FinishStrongLockAcquire();
+
        LWLockRelease(partitionLock);
 
        /*
@@ -1349,6 +1350,64 @@ GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
        locallock->numLockOwners++;
 }
 
+/*
+ * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK,
+ * and arrange for error cleanup if it fails
+ */
+static void
+BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode)
+{
+       Assert(StrongLockInProgress == NULL);
+       Assert(locallock->holdsStrongLockCount == FALSE);
+
+       /*
+        * Adding to a memory location is not atomic, so we take a
+        * spinlock to ensure we don't collide with someone else trying
+        * to bump the count at the same time.
+        *
+        * XXX: It might be worth considering using an atomic fetch-and-add
+        * instruction here, on architectures where that is supported.
+        */
+
+       SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
+       FastPathStrongRelationLocks->count[fasthashcode]++;
+       locallock->holdsStrongLockCount = TRUE;
+       StrongLockInProgress = locallock;
+       SpinLockRelease(&FastPathStrongRelationLocks->mutex);
+}
+
+/*
+ * FinishStrongLockAcquire - cancel pending cleanup for a strong lock
+ * acquisition once it's no longer needed
+ */
+static void
+FinishStrongLockAcquire(void)
+{
+       StrongLockInProgress = NULL;
+}
+
+/*
+ * AbortStrongLockAcquire - undo strong lock state changes performed by
+ * BeginStrongLockAcquire.
+ */
+void
+AbortStrongLockAcquire(void)
+{
+       uint32  fasthashcode;
+       LOCALLOCK  *locallock = StrongLockInProgress;
+       
+       if (locallock == NULL)
+               return;
+
+       fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
+       Assert(locallock->holdsStrongLockCount == TRUE);
+       SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
+       FastPathStrongRelationLocks->count[fasthashcode]--;
+       locallock->holdsStrongLockCount = FALSE;
+       StrongLockInProgress = NULL;
+       SpinLockRelease(&FastPathStrongRelationLocks->mutex);
+}
+
 /*
  * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
  *             WaitOnLock on.
@@ -1414,7 +1473,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
         * We can and do use a PG_TRY block to try to clean up after failure, but
         * this still has a major limitation: elog(FATAL) can occur while waiting
         * (eg, a "die" interrupt), and then control won't come back here. So all
-        * cleanup of essential state should happen in LockWaitCancel, not here.
+        * cleanup of essential state should happen in LockErrorCleanup, not here.
         * We can use PG_TRY to clear the "waiting" status flags, since doing that
         * is unimportant if the process exits.
         */
@@ -1441,7 +1500,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
        }
        PG_CATCH();
        {
-               /* In this path, awaitedLock remains set until LockWaitCancel */
+               /* In this path, awaitedLock remains set until LockErrorCleanup */
 
                /* Report change to non-waiting status */
                pgstat_report_waiting(false);
index 8d8979230329d5175fde11a33cbd0730320b31b0..7c9ac2e5978e40aea5f67a8f50ed3ab5536f0d8c 100644 (file)
@@ -578,17 +578,20 @@ IsWaitingForLock(void)
 }
 
 /*
- * Cancel any pending wait for lock, when aborting a transaction.
+ * Cancel any pending wait for lock, when aborting a transaction, and revert
+ * any strong lock count acquisition for a lock being acquired.
  *
  * (Normally, this would only happen if we accept a cancel/die
- * interrupt while waiting; but an ereport(ERROR) while waiting is
- * within the realm of possibility, too.)
+ * interrupt while waiting; but an ereport(ERROR) before or during the lock
+ * wait is within the realm of possibility, too.)
  */
 void
-LockWaitCancel(void)
+LockErrorCleanup(void)
 {
        LWLockId        partitionLock;
 
+       AbortStrongLockAcquire();
+
        /* Nothing to do if we weren't waiting for a lock */
        if (lockAwaited == NULL)
                return;
@@ -655,7 +658,7 @@ ProcReleaseLocks(bool isCommit)
        if (!MyProc)
                return;
        /* If waiting, get off wait queue (should only be needed after error) */
-       LockWaitCancel();
+       LockErrorCleanup();
        /* Release standard locks, including session-level if aborting */
        LockReleaseAll(DEFAULT_LOCKMETHOD, !isCommit);
        /* Release transaction-level advisory locks */
@@ -953,7 +956,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
         * NOTE: this may also cause us to exit critical-section state, possibly
         * allowing a cancel/die interrupt to be accepted. This is OK because we
         * have recorded the fact that we are waiting for a lock, and so
-        * LockWaitCancel will clean up if cancel/die happens.
+        * LockErrorCleanup will clean up if cancel/die happens.
         */
        LWLockRelease(partitionLock);
 
@@ -996,7 +999,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
         * don't, because we have no shared-state-change work to do after being
         * granted the lock (the grantor did it all).  We do have to worry about
         * updating the locallock table, but if we lose control to an error,
-        * LockWaitCancel will fix that up.
+        * LockErrorCleanup will fix that up.
         */
        do
        {
@@ -1140,7 +1143,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
        LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 
        /*
-        * We no longer want LockWaitCancel to do anything.
+        * We no longer want LockErrorCleanup to do anything.
         */
        lockAwaited = NULL;
 
index dda82358eb6f0c4b6c393e41fd99c6bec01997d2..cc309d6f21d79733e36e281622db8526b3873467 100644 (file)
@@ -2639,7 +2639,7 @@ die(SIGNAL_ARGS)
                        /* bump holdoff count to make ProcessInterrupts() a no-op */
                        /* until we are done getting ready for it */
                        InterruptHoldoffCount++;
-                       LockWaitCancel();       /* prevent CheckDeadLock from running */
+                       LockErrorCleanup();     /* prevent CheckDeadLock from running */
                        DisableNotifyInterrupt();
                        DisableCatchupInterrupt();
                        InterruptHoldoffCount--;
@@ -2681,7 +2681,7 @@ StatementCancelHandler(SIGNAL_ARGS)
                        /* bump holdoff count to make ProcessInterrupts() a no-op */
                        /* until we are done getting ready for it */
                        InterruptHoldoffCount++;
-                       LockWaitCancel();       /* prevent CheckDeadLock from running */
+                       LockErrorCleanup();     /* prevent CheckDeadLock from running */
                        DisableNotifyInterrupt();
                        DisableCatchupInterrupt();
                        InterruptHoldoffCount--;
@@ -2833,7 +2833,7 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
                        /* bump holdoff count to make ProcessInterrupts() a no-op */
                        /* until we are done getting ready for it */
                        InterruptHoldoffCount++;
-                       LockWaitCancel();       /* prevent CheckDeadLock from running */
+                       LockErrorCleanup();     /* prevent CheckDeadLock from running */
                        DisableNotifyInterrupt();
                        DisableCatchupInterrupt();
                        InterruptHoldoffCount--;
index 17b9ed698e6c872ea7faca87dcca33d8ad017498..0b561159ee7a1b5465eb85db94609dff3b8ed5db 100644 (file)
@@ -491,6 +491,7 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag,
                                        bool sessionLock,
                                        bool dontWait,
                                        bool report_memory_error);
+extern void AbortStrongLockAcquire(void);
 extern bool LockRelease(const LOCKTAG *locktag,
                        LOCKMODE lockmode, bool sessionLock);
 extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
index eba7acec9e482f0a674c3cac60985e48d917e8c4..3983f33b50072b55fde6968d4aeaca99c12cce76 100644 (file)
@@ -227,7 +227,7 @@ extern int  ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable);
 extern PGPROC *ProcWakeup(PGPROC *proc, int waitStatus);
 extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock);
 extern bool IsWaitingForLock(void);
-extern void LockWaitCancel(void);
+extern void LockErrorCleanup(void);
 
 extern void ProcWaitForSignal(void);
 extern void ProcSendSignal(int pid);