Fix race condition in CheckTargetForConflictsIn.
author	Robert Haas <rhaas@postgresql.org>
Thu, 19 May 2011 16:11:05 +0000 (12:11 -0400)
committer	Robert Haas <rhaas@postgresql.org>
Thu, 19 May 2011 16:12:04 +0000 (12:12 -0400)
Dan Ports

src/backend/storage/lmgr/predicate.c

index 3b3158efe55d24cb86d6e4d04d64dfc61647842f..dc53a7ab5b130a0263ef10b826c0edd1943d74c3 100644
@@ -3638,6 +3638,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
    LWLockId    partitionLock;
    PREDICATELOCKTARGET *target;
    PREDICATELOCK *predlock;
+   PREDICATELOCK *mypredlock = NULL;
+   PREDICATELOCKTAG mypredlocktag;
 
    Assert(MySerializableXact != InvalidSerializableXact);
 
@@ -3683,139 +3685,21 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
        if (sxact == MySerializableXact)
        {
            /*
-            * If we're getting a write lock on the tuple and we're not in a
-            * subtransaction, we don't need a predicate (SIREAD) lock.  We
-            * can't use this optimization within a subtransaction because the
-            * subtransaction could be rolled back, and we would be left
-            * without any lock at the top level.
+            * If we're getting a write lock on a tuple, we don't need
+            * a predicate (SIREAD) lock on the same tuple. We can
+            * safely remove our SIREAD lock, but we'll defer doing so
+            * until after the loop because that requires upgrading to
+            * an exclusive partition lock.
             *
-            * At this point our transaction already has an ExclusiveRowLock
-            * on the relation, so we are OK to drop the predicate lock on the
-            * tuple, if found, without fearing that another write against the
-            * tuple will occur before the MVCC information makes it to the
-            * buffer.
+            * We can't use this optimization within a subtransaction
+            * because the subtransaction could roll back, and we
+            * would be left without any lock at the top level.
             */
            if (!IsSubTransaction()
                && GET_PREDICATELOCKTARGETTAG_OFFSET(*targettag))
            {
-               uint32      predlockhashcode;
-               PREDICATELOCKTARGET *rmtarget = NULL;
-               PREDICATELOCK *rmpredlock;
-               LOCALPREDICATELOCK *locallock,
-                          *rmlocallock;
-
-               /*
-                * This is a tuple on which we have a tuple predicate lock. We
-                * only have shared LW locks now; release those, and get
-                * exclusive locks only while we modify things.
-                */
-               LWLockRelease(SerializableXactHashLock);
-               LWLockRelease(partitionLock);
-               LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
-               LWLockAcquire(partitionLock, LW_EXCLUSIVE);
-               LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
-
-               /*
-                * Remove the predicate lock from shared memory, if it wasn't
-                * removed while the locks were released.  One way that could
-                * happen is from autovacuum cleaning up an index.
-                */
-               predlockhashcode = PredicateLockHashCodeFromTargetHashCode
-                   (&(predlock->tag), targettaghash);
-               rmpredlock = (PREDICATELOCK *)
-                   hash_search_with_hash_value(PredicateLockHash,
-                                               &(predlock->tag),
-                                               predlockhashcode,
-                                               HASH_FIND, NULL);
-               if (rmpredlock)
-               {
-                   Assert(rmpredlock == predlock);
-
-                   SHMQueueDelete(predlocktargetlink);
-                   SHMQueueDelete(&(predlock->xactLink));
-
-                   rmpredlock = (PREDICATELOCK *)
-                       hash_search_with_hash_value(PredicateLockHash,
-                                                   &(predlock->tag),
-                                                   predlockhashcode,
-                                                   HASH_REMOVE, NULL);
-                   Assert(rmpredlock == predlock);
-
-                   RemoveTargetIfNoLongerUsed(target, targettaghash);
-
-                   LWLockRelease(SerializableXactHashLock);
-                   LWLockRelease(partitionLock);
-                   LWLockRelease(SerializablePredicateLockListLock);
-
-                   locallock = (LOCALPREDICATELOCK *)
-                       hash_search_with_hash_value(LocalPredicateLockHash,
-                                                   targettag, targettaghash,
-                                                   HASH_FIND, NULL);
-
-                   /*
-                    * Remove entry in local lock table if it exists and has
-                    * no children. It's OK if it doesn't exist; that means
-                    * the lock was transferred to a new target by a different
-                    * backend.
-                    */
-                   if (locallock != NULL)
-                   {
-                       locallock->held = false;
-
-                       if (locallock->childLocks == 0)
-                       {
-                           rmlocallock = (LOCALPREDICATELOCK *)
-                               hash_search_with_hash_value(LocalPredicateLockHash,
-                                                   targettag, targettaghash,
-                                                         HASH_REMOVE, NULL);
-                           Assert(rmlocallock == locallock);
-                       }
-                   }
-
-                   DecrementParentLocks(targettag);
-
-                   /*
-                    * If we've cleaned up the last of the predicate locks for
-                    * the target, bail out before re-acquiring the locks.
-                    */
-                   if (rmtarget)
-                       return;
-
-                   /*
-                    * The list has been altered.  Start over at the front.
-                    */
-                   LWLockAcquire(partitionLock, LW_SHARED);
-                   nextpredlock = (PREDICATELOCK *)
-                       SHMQueueNext(&(target->predicateLocks),
-                                    &(target->predicateLocks),
-                                    offsetof(PREDICATELOCK, targetLink));
-
-                   LWLockAcquire(SerializableXactHashLock, LW_SHARED);
-               }
-               else
-               {
-                   /*
-                    * The predicate lock was cleared while we were attempting
-                    * to upgrade our lightweight locks. Revert to the shared
-                    * locks.
-                    */
-                   LWLockRelease(SerializableXactHashLock);
-                   LWLockRelease(partitionLock);
-                   LWLockRelease(SerializablePredicateLockListLock);
-                   LWLockAcquire(partitionLock, LW_SHARED);
-
-                   /*
-                    * The list may have been altered by another process while
-                    * we weren't holding the partition lock.  Start over at
-                    * the front.
-                    */
-                   nextpredlock = (PREDICATELOCK *)
-                       SHMQueueNext(&(target->predicateLocks),
-                                    &(target->predicateLocks),
-                                    offsetof(PREDICATELOCK, targetLink));
-
-                   LWLockAcquire(SerializableXactHashLock, LW_SHARED);
-               }
+               mypredlock = predlock;
+               mypredlocktag = predlock->tag;
            }
        }
        else if (!SxactIsRolledBack(sxact)
@@ -3849,6 +3733,73 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
    }
    LWLockRelease(SerializableXactHashLock);
    LWLockRelease(partitionLock);
+
+   /*
+    * If we found one of our own SIREAD locks to remove, remove it
+    * now.
+    *
+    * At this point our transaction already has an ExclusiveRowLock
+    * on the relation, so we are OK to drop the predicate lock on the
+    * tuple, if found, without fearing that another write against the
+    * tuple will occur before the MVCC information makes it to the
+    * buffer.
+    */
+   if (mypredlock != NULL)
+   {
+       uint32      predlockhashcode;
+       PREDICATELOCK *rmpredlock;
+
+       LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+       LWLockAcquire(partitionLock, LW_EXCLUSIVE);
+       LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+       /*
+        * Remove the predicate lock from shared memory, if it wasn't
+        * removed while the locks were released.  One way that could
+        * happen is from autovacuum cleaning up an index.
+        */
+       predlockhashcode = PredicateLockHashCodeFromTargetHashCode
+           (&mypredlocktag, targettaghash);
+       rmpredlock = (PREDICATELOCK *)
+           hash_search_with_hash_value(PredicateLockHash,
+                                       &mypredlocktag,
+                                       predlockhashcode,
+                                       HASH_FIND, NULL);
+       if (rmpredlock != NULL)
+       {
+           Assert(rmpredlock == mypredlock);
+
+           SHMQueueDelete(&(mypredlock->targetLink));
+           SHMQueueDelete(&(mypredlock->xactLink));
+
+           rmpredlock = (PREDICATELOCK *)
+               hash_search_with_hash_value(PredicateLockHash,
+                                           &mypredlocktag,
+                                           predlockhashcode,
+                                           HASH_REMOVE, NULL);
+           Assert(rmpredlock == mypredlock);
+
+           RemoveTargetIfNoLongerUsed(target, targettaghash);
+       }
+
+       LWLockRelease(SerializableXactHashLock);
+       LWLockRelease(partitionLock);
+       LWLockRelease(SerializablePredicateLockListLock);
+
+       if (rmpredlock != NULL)
+       {
+           /*
+            * Remove entry in local lock table if it exists. It's OK
+            * if it doesn't exist; that means the lock was
+            * transferred to a new target by a different backend.
+            */
+           hash_search_with_hash_value(LocalPredicateLockHash,
+                                       targettag, targettaghash,
+                                       HASH_REMOVE, NULL);
+
+           DecrementParentLocks(targettag);
+       }
+   }
 }
 
 /*
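
The patch's core idea is to stop releasing and re-acquiring the lightweight locks in the middle of the scan: while iterating under shared locks it only records the pointer and tag of its own SIREAD lock, and only after the loop does it upgrade to exclusive locks and re-verify, by a fresh hash lookup on the saved tag, that the lock still exists before deleting it. Below is a minimal, standalone sketch of that deferred-removal-and-recheck pattern; it is not PostgreSQL code. It uses a pthread rwlock in place of the LWLocks and a plain linked list in place of the shared hash table, and names such as scan_and_defer_removal, find_entry, and remove_entry are illustrative only.

/*
 * Sketch: scan under a shared lock, remember (don't remove) our own entry,
 * then drop it afterwards under an exclusive lock, re-checking by key first
 * because another process could have removed it while no lock was held.
 */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct Entry
{
	uint32_t	key;			/* stands in for PREDICATELOCKTAG */
	struct Entry *next;
} Entry;

static pthread_rwlock_t table_lock = PTHREAD_RWLOCK_INITIALIZER;
static Entry *head = NULL;

/* Look up an entry by key; caller must hold table_lock in some mode. */
static Entry *
find_entry(uint32_t key)
{
	for (Entry *e = head; e != NULL; e = e->next)
		if (e->key == key)
			return e;
	return NULL;
}

/* Unlink and free an entry; caller must hold table_lock exclusively. */
static void
remove_entry(Entry *victim)
{
	for (Entry **p = &head; *p != NULL; p = &(*p)->next)
	{
		if (*p == victim)
		{
			*p = victim->next;
			free(victim);
			return;
		}
	}
}

static void
scan_and_defer_removal(uint32_t my_key)
{
	Entry	   *my_entry = NULL;
	uint32_t	my_tag = 0;		/* saved copy, like mypredlocktag */

	pthread_rwlock_rdlock(&table_lock);
	for (Entry *e = head; e != NULL; e = e->next)
	{
		/* ... conflict checks against other entries would go here ... */
		if (e->key == my_key)
		{
			my_entry = e;
			my_tag = e->key;	/* remember the tag, not just the pointer */
		}
	}
	pthread_rwlock_unlock(&table_lock);

	if (my_entry != NULL)
	{
		pthread_rwlock_wrlock(&table_lock);

		/*
		 * Re-check by key: the entry may have been removed by someone else
		 * in the window where we held no lock, so the saved pointer alone
		 * cannot be trusted.
		 */
		Entry	   *still_there = find_entry(my_tag);

		if (still_there != NULL)
			remove_entry(still_there);

		pthread_rwlock_unlock(&table_lock);
	}
}

int
main(void)
{
	/* Populate a tiny list and exercise the deferred removal. */
	for (uint32_t k = 1; k <= 3; k++)
	{
		Entry	   *e = malloc(sizeof(Entry));

		e->key = k;
		e->next = head;
		head = e;
	}
	scan_and_defer_removal(2);
	for (Entry *e = head; e != NULL; e = e->next)
		printf("remaining key: %u\n", (unsigned) e->key);
	return 0;
}

As in the patch, the tag is copied out during the scan rather than relying on the remembered pointer, so the later re-check is done against the table itself; the real code additionally asserts that the entry found on re-check is the same one it remembered.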