Fix longstanding recursion hazard in sinval message processing.
authorTom Lane <tgl@sss.pgh.pa.us>
Fri, 7 Sep 2018 22:04:38 +0000 (18:04 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Fri, 7 Sep 2018 22:04:38 +0000 (18:04 -0400)
LockRelationOid and sibling routines supposed that, if our session already
holds the lock they were asked to acquire, they could skip calling
AcceptInvalidationMessages on the grounds that we must have already read
any remote sinval messages issued against the relation being locked.
This is normally true, but there's a critical special case where it's not:
processing inside AcceptInvalidationMessages might attempt to access system
relations, resulting in a recursive call to acquire a relation lock.

Hence, if the outer call had acquired that same system catalog lock, we'd
fall through, despite the possibility that there's an as-yet-unread sinval
message for that system catalog.  This could, for example, result in
failure to access a system catalog or index that had just been processed
by VACUUM FULL.  This is the explanation for buildfarm failures we've been
seeing intermittently for the past three months.  The bug is far older
than that, but commits a54e1f158 et al added a new recursion case within
AcceptInvalidationMessages that is apparently easier to hit than any
previous case.

To fix this, we must not skip calling AcceptInvalidationMessages until
we have *finished* a call to it since acquiring a relation lock, not
merely acquired the lock.  (There's already adequate logic inside
AcceptInvalidationMessages to deal with being called recursively.)
Fortunately, we can implement that at trivial cost, by adding a flag
to LOCALLOCK hashtable entries that tracks whether we know we have
completed such a call.

There is an API hazard added by this patch for external callers of
LockAcquire: if anything is testing for LOCKACQUIRE_ALREADY_HELD,
it might be fooled by the new return code LOCKACQUIRE_ALREADY_CLEAR
into thinking the lock wasn't already held.  This should be a fail-soft
condition, though, unless something very bizarre is being done in
response to the test.

Also, I added an additional output argument to LockAcquireExtended,
assuming that that probably isn't called by any outside code given
the very limited usefulness of its additional functionality.

Back-patch to all supported branches.

Discussion: https://postgr.es/m/12259.1532117714@sss.pgh.pa.us

src/backend/storage/ipc/standby.c
src/backend/storage/lmgr/lmgr.c
src/backend/storage/lmgr/lock.c
src/include/storage/lock.h

index 6ccbe58f8f6525fb1394885e4538f5ae03bfe11f..58e1e1c1e3a4467ccbb1bb7a3c976e78bb0e4038 100644 (file)
@@ -395,8 +395,8 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
        ResolveRecoveryConflictWithVirtualXIDs(backends,
                                             PROCSIG_RECOVERY_CONFLICT_LOCK);
 
-       if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
-           != LOCKACQUIRE_NOT_AVAIL)
+       if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true,
+                               false, NULL) != LOCKACQUIRE_NOT_AVAIL)
            lock_acquired = true;
    }
 }
@@ -631,8 +631,8 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
     */
    SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
 
-   if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
-       == LOCKACQUIRE_NOT_AVAIL)
+   if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true,
+                           false, NULL) == LOCKACQUIRE_NOT_AVAIL)
        ResolveRecoveryConflictWithLock(newlock->dbOid, newlock->relOid);
 }
 
index 3e6b8a995aee9bfb8561de815a555dc5c30b51f1..9027cbbba4585ffad58f3b6018430f4f76804eae 100644 (file)
@@ -87,11 +87,12 @@ void
 LockRelationOid(Oid relid, LOCKMODE lockmode)
 {
    LOCKTAG     tag;
+   LOCALLOCK  *locallock;
    LockAcquireResult res;
 
    SetLocktagRelationOid(&tag, relid);
 
-   res = LockAcquire(&tag, lockmode, false, false);
+   res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
 
    /*
     * Now that we have the lock, check for invalidation messages, so that we
@@ -102,9 +103,18 @@ LockRelationOid(Oid relid, LOCKMODE lockmode)
     * relcache entry in an undesirable way.  (In the case where our own xact
     * modifies the rel, the relcache update happens via
     * CommandCounterIncrement, not here.)
+    *
+    * However, in corner cases where code acts on tables (usually catalogs)
+    * recursively, we might get here while still processing invalidation
+    * messages in some outer execution of this function or a sibling.  The
+    * "cleared" status of the lock tells us whether we really are done
+    * absorbing relevant inval messages.
     */
-   if (res != LOCKACQUIRE_ALREADY_HELD)
+   if (res != LOCKACQUIRE_ALREADY_CLEAR)
+   {
        AcceptInvalidationMessages();
+       MarkLockClear(locallock);
+   }
 }
 
 /*
@@ -120,11 +130,12 @@ bool
 ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
 {
    LOCKTAG     tag;
+   LOCALLOCK  *locallock;
    LockAcquireResult res;
 
    SetLocktagRelationOid(&tag, relid);
 
-   res = LockAcquire(&tag, lockmode, false, true);
+   res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
 
    if (res == LOCKACQUIRE_NOT_AVAIL)
        return false;
@@ -133,8 +144,11 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
     * Now that we have the lock, check for invalidation messages; see notes
     * in LockRelationOid.
     */
-   if (res != LOCKACQUIRE_ALREADY_HELD)
+   if (res != LOCKACQUIRE_ALREADY_CLEAR)
+   {
        AcceptInvalidationMessages();
+       MarkLockClear(locallock);
+   }
 
    return true;
 }
@@ -181,20 +195,24 @@ void
 LockRelation(Relation relation, LOCKMODE lockmode)
 {
    LOCKTAG     tag;
+   LOCALLOCK  *locallock;
    LockAcquireResult res;
 
    SET_LOCKTAG_RELATION(tag,
                         relation->rd_lockInfo.lockRelId.dbId,
                         relation->rd_lockInfo.lockRelId.relId);
 
-   res = LockAcquire(&tag, lockmode, false, false);
+   res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
 
    /*
     * Now that we have the lock, check for invalidation messages; see notes
     * in LockRelationOid.
     */
-   if (res != LOCKACQUIRE_ALREADY_HELD)
+   if (res != LOCKACQUIRE_ALREADY_CLEAR)
+   {
        AcceptInvalidationMessages();
+       MarkLockClear(locallock);
+   }
 }
 
 /*
@@ -208,13 +226,14 @@ bool
 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
 {
    LOCKTAG     tag;
+   LOCALLOCK  *locallock;
    LockAcquireResult res;
 
    SET_LOCKTAG_RELATION(tag,
                         relation->rd_lockInfo.lockRelId.dbId,
                         relation->rd_lockInfo.lockRelId.relId);
 
-   res = LockAcquire(&tag, lockmode, false, true);
+   res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
 
    if (res == LOCKACQUIRE_NOT_AVAIL)
        return false;
@@ -223,8 +242,11 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
     * Now that we have the lock, check for invalidation messages; see notes
     * in LockRelationOid.
     */
-   if (res != LOCKACQUIRE_ALREADY_HELD)
+   if (res != LOCKACQUIRE_ALREADY_CLEAR)
+   {
        AcceptInvalidationMessages();
+       MarkLockClear(locallock);
+   }
 
    return true;
 }
index 2e9214236a93b2879f023e9966b0f915ae5abc97..175c29cc2fdeed07df8e5a8324bf69ac93c3b1ba 100644 (file)
@@ -658,6 +658,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
  *     LOCKACQUIRE_NOT_AVAIL       lock not available, and dontWait=true
  *     LOCKACQUIRE_OK              lock successfully acquired
  *     LOCKACQUIRE_ALREADY_HELD    incremented count for lock already held
+ *     LOCKACQUIRE_ALREADY_CLEAR   incremented count for lock already clear
  *
  * In the normal case where dontWait=false and the caller doesn't need to
  * distinguish a freshly acquired lock from one already taken earlier in
@@ -674,7 +675,8 @@ LockAcquire(const LOCKTAG *locktag,
            bool sessionLock,
            bool dontWait)
 {
-   return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
+   return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
+                              true, NULL);
 }
 
 /*
@@ -685,13 +687,17 @@ LockAcquire(const LOCKTAG *locktag,
  * caller to note that the lock table is full and then begin taking
  * extreme action to reduce the number of other lock holders before
  * retrying the action.
+ *
+ * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
+ * table entry if a lock is successfully acquired, or NULL if not.
  */
 LockAcquireResult
 LockAcquireExtended(const LOCKTAG *locktag,
                    LOCKMODE lockmode,
                    bool sessionLock,
                    bool dontWait,
-                   bool reportMemoryError)
+                   bool reportMemoryError,
+                   LOCALLOCK **locallockp)
 {
    LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
    LockMethod  lockMethodTable;
@@ -758,6 +764,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
        locallock->numLockOwners = 0;
        locallock->maxLockOwners = 8;
        locallock->holdsStrongLockCount = FALSE;
+       locallock->lockCleared = false;
        locallock->lockOwners = NULL;   /* in case next line fails */
        locallock->lockOwners = (LOCALLOCKOWNER *)
            MemoryContextAlloc(TopMemoryContext,
@@ -778,13 +785,22 @@ LockAcquireExtended(const LOCKTAG *locktag,
    }
    hashcode = locallock->hashcode;
 
+   if (locallockp)
+       *locallockp = locallock;
+
    /*
     * If we already hold the lock, we can just increase the count locally.
+    *
+    * If lockCleared is already set, caller need not worry about absorbing
+    * sinval messages related to the lock's object.
     */
    if (locallock->nLocks > 0)
    {
        GrantLockLocal(locallock, owner);
-       return LOCKACQUIRE_ALREADY_HELD;
+       if (locallock->lockCleared)
+           return LOCKACQUIRE_ALREADY_CLEAR;
+       else
+           return LOCKACQUIRE_ALREADY_HELD;
    }
 
    /*
@@ -866,6 +882,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
                                           hashcode))
        {
            AbortStrongLockAcquire();
+           if (locallock->nLocks == 0)
+               RemoveLocalLock(locallock);
+           if (locallockp)
+               *locallockp = NULL;
            if (reportMemoryError)
                ereport(ERROR,
                        (errcode(ERRCODE_OUT_OF_MEMORY),
@@ -900,6 +920,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
    {
        AbortStrongLockAcquire();
        LWLockRelease(partitionLock);
+       if (locallock->nLocks == 0)
+           RemoveLocalLock(locallock);
+       if (locallockp)
+           *locallockp = NULL;
        if (reportMemoryError)
            ereport(ERROR,
                    (errcode(ERRCODE_OUT_OF_MEMORY),
@@ -965,6 +989,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
            LWLockRelease(partitionLock);
            if (locallock->nLocks == 0)
                RemoveLocalLock(locallock);
+           if (locallockp)
+               *locallockp = NULL;
            return LOCKACQUIRE_NOT_AVAIL;
        }
 
@@ -1566,6 +1592,20 @@ GrantAwaitedLock(void)
    GrantLockLocal(awaitedLock, awaitedOwner);
 }
 
+/*
+ * MarkLockClear -- mark an acquired lock as "clear"
+ *
+ * This means that we know we have absorbed all sinval messages that other
+ * sessions generated before we acquired this lock, and so we can confidently
+ * assume we know about any catalog changes protected by this lock.
+ */
+void
+MarkLockClear(LOCALLOCK *locallock)
+{
+   Assert(locallock->nLocks > 0);
+   locallock->lockCleared = true;
+}
+
 /*
  * WaitOnLock -- wait to acquire a lock
  *
@@ -1833,6 +1873,15 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
    if (locallock->nLocks > 0)
        return TRUE;
 
+   /*
+    * At this point we can no longer suppose we are clear of invalidation
+    * messages related to this lock.  Although we'll delete the LOCALLOCK
+    * object before any intentional return from this routine, it seems worth
+    * the trouble to explicitly reset lockCleared right now, just in case
+    * some error prevents us from deleting the LOCALLOCK.
+    */
+   locallock->lockCleared = false;
+
    /* Attempt fast release of any lock eligible for the fast path. */
    if (EligibleForRelationFastPath(locktag, lockmode) &&
        FastPathLocalUseCount > 0)
index 4c49e3c6e69ad69ae37a4b222bc4b3eed49a9cf9..4e7d0aeeccd639febc72898eb05ebdc1960231b7 100644 (file)
@@ -425,6 +425,7 @@ typedef struct LOCALLOCK
    int         numLockOwners;  /* # of relevant ResourceOwners */
    int         maxLockOwners;  /* allocated size of array */
    bool        holdsStrongLockCount;   /* bumped FastPathStrongRelationLocks */
+   bool        lockCleared;    /* we read all sinval msgs for lock */
    LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
 } LOCALLOCK;
 
@@ -459,7 +460,8 @@ typedef enum
 {
    LOCKACQUIRE_NOT_AVAIL,      /* lock not available, and dontWait=true */
    LOCKACQUIRE_OK,             /* lock successfully acquired */
-   LOCKACQUIRE_ALREADY_HELD    /* incremented count for lock already held */
+   LOCKACQUIRE_ALREADY_HELD,   /* incremented count for lock already held */
+   LOCKACQUIRE_ALREADY_CLEAR   /* incremented count for lock already clear */
 } LockAcquireResult;
 
 /* Deadlock states identified by DeadLockCheck() */
@@ -503,8 +505,10 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag,
                    LOCKMODE lockmode,
                    bool sessionLock,
                    bool dontWait,
-                   bool report_memory_error);
+                   bool reportMemoryError,
+                   LOCALLOCK **locallockp);
 extern void AbortStrongLockAcquire(void);
+extern void MarkLockClear(LOCALLOCK *locallock);
 extern bool LockRelease(const LOCKTAG *locktag,
            LOCKMODE lockmode, bool sessionLock);
 extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);