Improve and cleanup ProcArrayAdd(), ProcArrayRemove().
authorAndres Freund <andres@anarazel.de>
Sat, 12 Jun 2021 04:22:21 +0000 (21:22 -0700)
committerAndres Freund <andres@anarazel.de>
Sat, 12 Jun 2021 04:33:07 +0000 (21:33 -0700)
941697c3c1a changed ProcArrayAdd()/Remove() substantially. As reported by
zhanyi, I missed that due to the introduction of the PGPROC->pgxactoff
ProcArrayRemove() does not need to search for the position in
procArray->pgprocnos anymore - that's pgxactoff. Remove the search loop.

The removal of the search loop reduces assertion coverage a bit - but we can
easily do better than before by adding assertions to other loops over
pgprocnos elements.

Also do a bit of cleanup, mostly by reducing line lengths by introducing local
helper variables and adding newlines.

Author: zhanyi <w@hidva.com>
Author: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/tencent_5624AA3B116B3D1C31CA9744@qq.com

src/backend/storage/ipc/procarray.c

index 42a89fc5dc9ff255c7a114a395d819390add9b3e..085bd1e40778ed4a56b812e3838e3ddf193130dd 100644 (file)
@@ -446,6 +446,7 @@ ProcArrayAdd(PGPROC *proc)
 {
        ProcArrayStruct *arrayP = procArray;
        int                     index;
+       int                     movecount;
 
        /* See ProcGlobal comment explaining why both locks are held */
        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -474,33 +475,48 @@ ProcArrayAdd(PGPROC *proc)
         */
        for (index = 0; index < arrayP->numProcs; index++)
        {
-               /*
-                * If we are the first PGPROC or if we have found our right position
-                * in the array, break
-                */
-               if ((arrayP->pgprocnos[index] == -1) || (arrayP->pgprocnos[index] > proc->pgprocno))
+               int                     procno PG_USED_FOR_ASSERTS_ONLY = arrayP->pgprocnos[index];
+
+               Assert(procno >= 0 && procno < (arrayP->maxProcs + NUM_AUXILIARY_PROCS));
+               Assert(allProcs[procno].pgxactoff == index);
+
+               /* If we have found our right position in the array, break */
+               if (arrayP->pgprocnos[index] > proc->pgprocno)
                        break;
        }
 
-       memmove(&arrayP->pgprocnos[index + 1], &arrayP->pgprocnos[index],
-                       (arrayP->numProcs - index) * sizeof(*arrayP->pgprocnos));
-       memmove(&ProcGlobal->xids[index + 1], &ProcGlobal->xids[index],
-                       (arrayP->numProcs - index) * sizeof(*ProcGlobal->xids));
-       memmove(&ProcGlobal->subxidStates[index + 1], &ProcGlobal->subxidStates[index],
-                       (arrayP->numProcs - index) * sizeof(*ProcGlobal->subxidStates));
-       memmove(&ProcGlobal->statusFlags[index + 1], &ProcGlobal->statusFlags[index],
-                       (arrayP->numProcs - index) * sizeof(*ProcGlobal->statusFlags));
+       movecount = arrayP->numProcs - index;
+       memmove(&arrayP->pgprocnos[index + 1],
+                       &arrayP->pgprocnos[index],
+                       movecount * sizeof(*arrayP->pgprocnos));
+       memmove(&ProcGlobal->xids[index + 1],
+                       &ProcGlobal->xids[index],
+                       movecount * sizeof(*ProcGlobal->xids));
+       memmove(&ProcGlobal->subxidStates[index + 1],
+                       &ProcGlobal->subxidStates[index],
+                       movecount * sizeof(*ProcGlobal->subxidStates));
+       memmove(&ProcGlobal->statusFlags[index + 1],
+                       &ProcGlobal->statusFlags[index],
+                       movecount * sizeof(*ProcGlobal->statusFlags));
 
        arrayP->pgprocnos[index] = proc->pgprocno;
+       proc->pgxactoff = index;
        ProcGlobal->xids[index] = proc->xid;
        ProcGlobal->subxidStates[index] = proc->subxidStatus;
        ProcGlobal->statusFlags[index] = proc->statusFlags;
 
        arrayP->numProcs++;
 
+       /* adjust pgxactoff for all following PGPROCs */
+       index++;
        for (; index < arrayP->numProcs; index++)
        {
-               allProcs[arrayP->pgprocnos[index]].pgxactoff = index;
+               int                     procno = arrayP->pgprocnos[index];
+
+               Assert(procno >= 0 && procno < (arrayP->maxProcs + NUM_AUXILIARY_PROCS));
+               Assert(allProcs[procno].pgxactoff == index - 1);
+
+               allProcs[procno].pgxactoff = index;
        }
 
        /*
@@ -525,7 +541,8 @@ void
 ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
 {
        ProcArrayStruct *arrayP = procArray;
-       int                     index;
+       int                     myoff;
+       int                     movecount;
 
 #ifdef XIDCACHE_DEBUG
        /* dump stats at backend shutdown, but not prepared-xact end */
@@ -537,11 +554,14 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
        LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
 
-       Assert(ProcGlobal->allProcs[arrayP->pgprocnos[proc->pgxactoff]].pgxactoff == proc->pgxactoff);
+       myoff = proc->pgxactoff;
+
+       Assert(myoff >= 0 && myoff < arrayP->numProcs);
+       Assert(ProcGlobal->allProcs[arrayP->pgprocnos[myoff]].pgxactoff == myoff);
 
        if (TransactionIdIsValid(latestXid))
        {
-               Assert(TransactionIdIsValid(ProcGlobal->xids[proc->pgxactoff]));
+               Assert(TransactionIdIsValid(ProcGlobal->xids[myoff]));
 
                /* Advance global latestCompletedXid while holding the lock */
                MaintainLatestCompletedXid(latestXid);
@@ -549,57 +569,60 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
                /* Same with xactCompletionCount  */
                ShmemVariableCache->xactCompletionCount++;
 
-               ProcGlobal->xids[proc->pgxactoff] = 0;
-               ProcGlobal->subxidStates[proc->pgxactoff].overflowed = false;
-               ProcGlobal->subxidStates[proc->pgxactoff].count = 0;
+               ProcGlobal->xids[myoff] = InvalidTransactionId;
+               ProcGlobal->subxidStates[myoff].overflowed = false;
+               ProcGlobal->subxidStates[myoff].count = 0;
        }
        else
        {
                /* Shouldn't be trying to remove a live transaction here */
-               Assert(!TransactionIdIsValid(ProcGlobal->xids[proc->pgxactoff]));
+               Assert(!TransactionIdIsValid(ProcGlobal->xids[myoff]));
        }
 
-       Assert(TransactionIdIsValid(ProcGlobal->xids[proc->pgxactoff] == 0));
-       Assert(TransactionIdIsValid(ProcGlobal->subxidStates[proc->pgxactoff].count == 0));
-       Assert(TransactionIdIsValid(ProcGlobal->subxidStates[proc->pgxactoff].overflowed == false));
-       ProcGlobal->statusFlags[proc->pgxactoff] = 0;
+       Assert(!TransactionIdIsValid(ProcGlobal->xids[myoff]));
+       Assert(ProcGlobal->subxidStates[myoff].count == 0);
+       Assert(ProcGlobal->subxidStates[myoff].overflowed == false);
+
+       ProcGlobal->statusFlags[myoff] = 0;
+
+       /* Keep the PGPROC array sorted. See notes above */
+       movecount = arrayP->numProcs - myoff - 1;
+       memmove(&arrayP->pgprocnos[myoff],
+                       &arrayP->pgprocnos[myoff + 1],
+                       movecount * sizeof(*arrayP->pgprocnos));
+       memmove(&ProcGlobal->xids[myoff],
+                       &ProcGlobal->xids[myoff + 1],
+                       movecount * sizeof(*ProcGlobal->xids));
+       memmove(&ProcGlobal->subxidStates[myoff],
+                       &ProcGlobal->subxidStates[myoff + 1],
+                       movecount * sizeof(*ProcGlobal->subxidStates));
+       memmove(&ProcGlobal->statusFlags[myoff],
+                       &ProcGlobal->statusFlags[myoff + 1],
+                       movecount * sizeof(*ProcGlobal->statusFlags));
+
+       arrayP->pgprocnos[arrayP->numProcs - 1] = -1;   /* for debugging */
+       arrayP->numProcs--;
 
-       for (index = 0; index < arrayP->numProcs; index++)
+       /*
+        * Adjust pgxactoff of following procs for removed PGPROC (note that
+        * numProcs already has been decremented).
+        */
+       for (int index = myoff; index < arrayP->numProcs; index++)
        {
-               if (arrayP->pgprocnos[index] == proc->pgprocno)
-               {
-                       /* Keep the PGPROC array sorted. See notes above */
-                       memmove(&arrayP->pgprocnos[index], &arrayP->pgprocnos[index + 1],
-                                       (arrayP->numProcs - index - 1) * sizeof(*arrayP->pgprocnos));
-                       memmove(&ProcGlobal->xids[index], &ProcGlobal->xids[index + 1],
-                                       (arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->xids));
-                       memmove(&ProcGlobal->subxidStates[index], &ProcGlobal->subxidStates[index + 1],
-                                       (arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->subxidStates));
-                       memmove(&ProcGlobal->statusFlags[index], &ProcGlobal->statusFlags[index + 1],
-                                       (arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->statusFlags));
-
-                       arrayP->pgprocnos[arrayP->numProcs - 1] = -1;   /* for debugging */
-                       arrayP->numProcs--;
-
-                       /* adjust for removed PGPROC */
-                       for (; index < arrayP->numProcs; index++)
-                               allProcs[arrayP->pgprocnos[index]].pgxactoff--;
+               int                     procno = arrayP->pgprocnos[index];
 
-                       /*
-                        * Release in reversed acquisition order, to reduce frequency of
-                        * having to wait for XidGenLock while holding ProcArrayLock.
-                        */
-                       LWLockRelease(XidGenLock);
-                       LWLockRelease(ProcArrayLock);
-                       return;
-               }
+               Assert(procno >= 0 && procno < (arrayP->maxProcs + NUM_AUXILIARY_PROCS));
+               Assert(allProcs[procno].pgxactoff - 1 == index);
+
+               allProcs[procno].pgxactoff = index;
        }
 
-       /* Oops */
+       /*
+        * Release in reversed acquisition order, to reduce frequency of having to
+        * wait for XidGenLock while holding ProcArrayLock.
+        */
        LWLockRelease(XidGenLock);
        LWLockRelease(ProcArrayLock);
-
-       elog(LOG, "failed to find proc %p in ProcArray", proc);
 }