Rewrite the sinval messaging mechanism to reduce contention and avoid
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 19 Jun 2008 21:32:56 +0000 (21:32 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 19 Jun 2008 21:32:56 +0000 (21:32 +0000)
unnecessary cache resets.  The major changes are:

* When the queue overflows, we only issue a cache reset to the specific
backend or backends that still haven't read the oldest message, rather
than resetting everyone as in the original coding.

* When we observe backend(s) falling well behind, we signal SIGUSR1
to only one backend, the one that is furthest behind and doesn't already
have a signal outstanding for it.  When it finishes catching up, it will
in turn signal SIGUSR1 to the next-furthest-back guy, if there is one that
is far enough behind to justify a signal.  The PMSIGNAL_WAKEN_CHILDREN
mechanism is removed.

* We don't attempt to clean out dead messages after every message-receipt
operation; rather, we do it on the insertion side, and only when the queue
fullness passes certain thresholds.

* Split SInvalLock into SInvalReadLock and SInvalWriteLock so that readers
don't block writers nor vice versa (except during the infrequent queue
cleanout operations).

* Transfer multiple sinval messages for each acquisition of a read or
write lock.

src/backend/postmaster/postmaster.c
src/backend/storage/ipc/sinval.c
src/backend/storage/ipc/sinvaladt.c
src/backend/utils/cache/inval.c
src/include/storage/lwlock.h
src/include/storage/pmsignal.h
src/include/storage/sinval.h
src/include/storage/sinvaladt.h

index 751bb8244d383d46ce3dd5366d9906dea395c8ca..73d6dae56d8f7ea9674bc8235c01132e1aee05c3 100644 (file)
@@ -37,7 +37,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.558 2008/06/06 22:35:22 alvherre Exp $
+ *   $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.559 2008/06/19 21:32:56 tgl Exp $
  *
  * NOTES
  *
@@ -3829,16 +3829,6 @@ sigusr1_handler(SIGNAL_ARGS)
        load_role();
    }
 
-   if (CheckPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN))
-   {
-       /*
-        * Send SIGUSR1 to all children (triggers CatchupInterruptHandler).
-        * See storage/ipc/sinval[adt].c for the use of this.
-        */
-       if (Shutdown <= SmartShutdown)
-           SignalChildren(SIGUSR1);
-   }
-
    if (CheckPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER) &&
        PgArchPID != 0)
    {
index 4b8a8f1afbdc487131c8738a1648f1b5c2aa340f..e2c6ca2aec9b976105015a6ed34bbe7b09bcd1df 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.85 2008/03/17 11:50:26 alvherre Exp $
+ *   $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.86 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -17,9 +17,7 @@
 #include "access/xact.h"
 #include "commands/async.h"
 #include "miscadmin.h"
-#include "storage/backendid.h"
 #include "storage/ipc.h"
-#include "storage/proc.h"
 #include "storage/sinvaladt.h"
 #include "utils/inval.h"
 
@@ -27,9 +25,9 @@
 /*
  * Because backends sitting idle will not be reading sinval events, we
  * need a way to give an idle backend a swift kick in the rear and make
- * it catch up before the sinval queue overflows and forces everyone
- * through a cache reset exercise. This is done by broadcasting SIGUSR1
- * to all backends when the queue is threatening to become full.
+ * it catch up before the sinval queue overflows and forces it to go
+ * through a cache reset exercise. This is done by sending SIGUSR1
+ * to any backend that gets too far behind.
  *
  * State for catchup events consists of two flags: one saying whether
  * the signal handler is currently allowed to call ProcessCatchupEvent
@@ -47,67 +45,101 @@ static void ProcessCatchupEvent(void);
 
 
 /*
- * SendSharedInvalidMessage
- * Add a shared-cache-invalidation message to the global SI message queue.
+ * SendSharedInvalidMessages
+ * Add shared-cache-invalidation message(s) to the global SI message queue.
  */
 void
-SendSharedInvalidMessage(SharedInvalidationMessage *msg)
+SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
 {
-   bool        insertOK;
-
-   insertOK = SIInsertDataEntry(msg);
-   if (!insertOK)
-       elog(DEBUG4, "SI buffer overflow");
+   SIInsertDataEntries(msgs, n);
 }
 
 /*
  * ReceiveSharedInvalidMessages
  *     Process shared-cache-invalidation messages waiting for this backend
  *
+ * We guarantee to process all messages that had been queued before the
+ * routine was entered.  It is of course possible for more messages to get
+ * queued right after our last SIGetDataEntries call.
+ *
  * NOTE: it is entirely possible for this routine to be invoked recursively
  * as a consequence of processing inside the invalFunction or resetFunction.
- * Hence, we must be holding no SI resources when we call them.  The only
- * bad side-effect is that SIDelExpiredDataEntries might be called extra
- * times on the way out of a nested call.
+ * Furthermore, such a recursive call must guarantee that all outstanding
+ * inval messages have been processed before it exits.  This is the reason
+ * for the strange-looking choice to use a statically allocated buffer array
+ * and counters; it's so that a recursive call can process messages already
+ * sucked out of sinvaladt.c.
  */
 void
 ReceiveSharedInvalidMessages(
                      void (*invalFunction) (SharedInvalidationMessage *msg),
                             void (*resetFunction) (void))
 {
-   SharedInvalidationMessage data;
-   int         getResult;
-   bool        gotMessage = false;
+#define MAXINVALMSGS 32
+   static SharedInvalidationMessage messages[MAXINVALMSGS];
+   /*
+    * We use volatile here to prevent bugs if a compiler doesn't realize
+    * that recursion is a possibility ...
+    */
+   static volatile int nextmsg = 0;
+   static volatile int nummsgs = 0;
 
-   for (;;)
+   /* Deal with any messages still pending from an outer recursion */
+   while (nextmsg < nummsgs)
    {
-       /*
-        * We can discard any pending catchup event, since we will not exit
-        * this loop until we're fully caught up.
-        */
-       catchupInterruptOccurred = 0;
+       SharedInvalidationMessage *msg = &messages[nextmsg++];
 
-       getResult = SIGetDataEntry(MyBackendId, &data);
+       invalFunction(msg);
+   }
+
+   do
+   {
+       int         getResult;
+
+       nextmsg = nummsgs = 0;
+
+       /* Try to get some more messages */
+       getResult = SIGetDataEntries(messages, MAXINVALMSGS);
 
-       if (getResult == 0)
-           break;              /* nothing more to do */
        if (getResult < 0)
        {
            /* got a reset message */
            elog(DEBUG4, "cache state reset");
            resetFunction();
+           break;              /* nothing more to do */
        }
-       else
+
+       /* Process them, being wary that a recursive call might eat some */
+       nextmsg = 0;
+       nummsgs = getResult;
+
+       while (nextmsg < nummsgs)
        {
-           /* got a normal data message */
-           invalFunction(&data);
+           SharedInvalidationMessage *msg = &messages[nextmsg++];
+
+           invalFunction(msg);
        }
-       gotMessage = true;
-   }
 
-   /* If we got any messages, try to release dead messages */
-   if (gotMessage)
-       SIDelExpiredDataEntries(false);
+       /*
+        * We only need to loop if the last SIGetDataEntries call (which
+        * might have been within a recursive call) returned a full buffer.
+        */
+   } while (nummsgs == MAXINVALMSGS);
+
+   /*
+    * We are now caught up.  If we received a catchup signal, reset that
+    * flag, and call SICleanupQueue().  This is not so much because we
+    * need to flush dead messages right now, as that we want to pass on
+    * the catchup signal to the next slowest backend.  "Daisy chaining" the
+    * catchup signal this way avoids creating spikes in system load for
+    * what should be just a background maintenance activity.
+    */
+   if (catchupInterruptOccurred)
+   {
+       catchupInterruptOccurred = 0;
+       elog(DEBUG4, "sinval catchup complete, cleaning queue");
+       SICleanupQueue(false, 0);
+   }
 }
 
 
index ddbc08ef55f608eebf6d2146a82c9e30291a4eb2..0befc4a93419a526d857219c7a80dc60a38e3436 100644 (file)
@@ -1,24 +1,25 @@
 /*-------------------------------------------------------------------------
  *
  * sinvaladt.c
- *   POSTGRES shared cache invalidation segment definitions.
+ *   POSTGRES shared cache invalidation data manager.
  *
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.70 2008/06/17 20:07:08 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.71 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
+#include <signal.h>
+#include <unistd.h>
+
 #include "miscadmin.h"
 #include "storage/backendid.h"
 #include "storage/ipc.h"
-#include "storage/lwlock.h"
-#include "storage/pmsignal.h"
 #include "storage/proc.h"
 #include "storage/shmem.h"
 #include "storage/sinvaladt.h"
 /*
  * Conceptually, the shared cache invalidation messages are stored in an
  * infinite array, where maxMsgNum is the next array subscript to store a
- * submitted message in, minMsgNum is the smallest array subscript containing a
- * message not yet read by all backends, and we always have maxMsgNum >=
+ * submitted message in, minMsgNum is the smallest array subscript containing
+ * message not yet read by all backends, and we always have maxMsgNum >=
  * minMsgNum.  (They are equal when there are no messages pending.)  For each
  * active backend, there is a nextMsgNum pointer indicating the next message it
  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
  * backend.
  *
+ * (In the current implementation, minMsgNum is a lower bound for the
+ * per-process nextMsgNum values, but it isn't rigorously kept equal to the
+ * smallest nextMsgNum --- it may lag behind.  We only update it when
+ * SICleanupQueue is called, and we try not to do that often.)
+ *
  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
  * entries.  We translate MsgNum values into circular-buffer indexes by
  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
  * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
- * in the buffer.  If the buffer does overflow, we reset it to empty and
- * force each backend to "reset", ie, discard all its invalidatable state.
+ * in the buffer.  If the buffer does overflow, we recover by setting the
+ * "reset" flag for each backend that has fallen too far behind.  A backend
+ * that is in "reset" state is ignored while determining minMsgNum.  When
+ * it does finally attempt to receive inval messages, it must discard all
+ * its invalidatable state, since it won't know what it missed.
+ *
+ * To reduce the probability of needing resets, we send a "catchup" interrupt
+ * to any backend that seems to be falling unreasonably far behind.  The
+ * normal behavior is that at most one such interrupt is in flight at a time;
+ * when a backend completes processing a catchup interrupt, it executes
+ * SICleanupQueue, which will signal the next-furthest-behind backend if
+ * needed.  This avoids undue contention from multiple backends all trying
+ * to catch up at once.  However, the furthest-back backend might be stuck
+ * in a state where it can't catch up.  Eventually it will get reset, so it
+ * won't cause any more problems for anyone but itself.  But we don't want
+ * to find that a bunch of other backends are now too close to the reset
+ * threshold to be saved.  So SICleanupQueue is designed to occasionally
+ * send extra catchup interrupts as the queue gets fuller, to backends that
+ * are far behind and haven't gotten one yet.  As long as there aren't a lot
+ * of "stuck" backends, we won't need a lot of extra interrupts, since ones
+ * that aren't stuck will propagate their interrupts to the next guy.
  *
  * We would have problems if the MsgNum values overflow an integer, so
  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
  * large so that we don't need to do this often.  It must be a multiple of
  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
  * to be moved when we do it.
+ *
+ * Access to the shared sinval array is protected by two locks, SInvalReadLock
+ * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
+ * authorizes them to modify their own ProcState but not to modify or even
+ * look at anyone else's.  When we need to perform array-wide updates,
+ * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
+ * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
+ * mode) to serialize adding messages to the queue.  Note that a writer
+ * can operate in parallel with one or more readers, because the writer
+ * has no need to touch anyone's ProcState, except in the infrequent cases
+ * when SICleanupQueue is needed.  The only point of overlap is that
+ * the writer might change maxMsgNum while readers are looking at it.
+ * This should be okay: we are assuming that fetching or storing an int
+ * is atomic, an assumption also made elsewhere in Postgres.  However
+ * readers mustn't assume that maxMsgNum isn't changing under them.
  */
 
 
  *
  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
  * Must be a multiple of MAXNUMMESSAGES.  Should be large.
+ *
+ * CLEANUP_MIN: the minimum number of messages that must be in the buffer
+ * before we bother to call SICleanupQueue.
+ *
+ * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
+ * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
+ *
+ * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
+ * behind before we'll send it SIGUSR1.
+ *
+ * WRITE_QUANTUM: the max number of messages to push into the buffer per
+ * iteration of SIInsertDataEntries.  Noncritical but should be less than
+ * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
+ * per iteration.
  */
 
 #define MAXNUMMESSAGES 4096
-#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)
+#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
+#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
+#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
+#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
+#define WRITE_QUANTUM 64
 
 /* Per-backend state in shared invalidation structure */
 typedef struct ProcState
 {
-   /* nextMsgNum is -1 in an inactive ProcState array entry. */
-   int         nextMsgNum;     /* next message number to read, or -1 */
-   bool        resetState;     /* true, if backend has to reset its state */
+   /* procPid is zero in an inactive ProcState array entry. */
+   pid_t       procPid;        /* PID of backend, for signaling */
+   /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
+   int         nextMsgNum;     /* next message number to read */
+   bool        resetState;     /* backend needs to reset its state */
+   bool        signaled;       /* backend has been sent catchup signal */
+
+   /*
+    * Next LocalTransactionId to use for each idle backend slot.  We keep
+    * this here because it is indexed by BackendId and it is convenient to
+    * copy the value to and from local memory when MyBackendId is set.
+    * It's meaningless in an active ProcState entry.
+    */
+   LocalTransactionId nextLXID;
 } ProcState;
 
 /* Shared cache invalidation memory segment */
@@ -80,16 +149,10 @@ typedef struct SISeg
     */
    int         minMsgNum;      /* oldest message still needed */
    int         maxMsgNum;      /* next message number to be assigned */
+   int         nextThreshold;  /* # of messages to call SICleanupQueue */
    int         lastBackend;    /* index of last active procState entry, +1 */
    int         maxBackends;    /* size of procState array */
 
-   /*
-    * Next LocalTransactionId to use for each idle backend slot.  We keep
-    * this here because it is indexed by BackendId and it is convenient to
-    * copy the value to and from local memory when MyBackendId is set.
-    */
-   LocalTransactionId *nextLXID;       /* array of maxBackends entries */
-
    /*
     * Circular buffer holding shared-inval messages
     */
@@ -110,7 +173,6 @@ static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */
 static LocalTransactionId nextLocalTransactionId;
 
 static void CleanupInvalidationState(int status, Datum arg);
-static void SISetProcStateInvalid(SISeg *segP);
 
 
 /*
@@ -124,8 +186,6 @@ SInvalShmemSize(void)
    size = offsetof(SISeg, procState);
    size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
 
-   size = add_size(size, mul_size(sizeof(LocalTransactionId), MaxBackends));
-
    return size;
 }
 
@@ -149,11 +209,10 @@ CreateSharedInvalidationState(void)
    if (found)
        return;
 
-   shmInvalBuffer->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends);
-
    /* Clear message counters, save size of procState array */
    shmInvalBuffer->minMsgNum = 0;
    shmInvalBuffer->maxMsgNum = 0;
+   shmInvalBuffer->nextThreshold = CLEANUP_MIN;
    shmInvalBuffer->lastBackend = 0;
    shmInvalBuffer->maxBackends = MaxBackends;
 
@@ -162,9 +221,11 @@ CreateSharedInvalidationState(void)
    /* Mark all backends inactive, and initialize nextLXID */
    for (i = 0; i < shmInvalBuffer->maxBackends; i++)
    {
-       shmInvalBuffer->procState[i].nextMsgNum = -1;       /* inactive */
+       shmInvalBuffer->procState[i].procPid = 0;           /* inactive */
+       shmInvalBuffer->procState[i].nextMsgNum = 0;        /* meaningless */
        shmInvalBuffer->procState[i].resetState = false;
-       shmInvalBuffer->nextLXID[i] = InvalidLocalTransactionId;
+       shmInvalBuffer->procState[i].signaled = false;
+       shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
    }
 }
 
@@ -179,12 +240,19 @@ SharedInvalBackendInit(void)
    ProcState  *stateP = NULL;
    SISeg      *segP = shmInvalBuffer;
 
-   LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+   /*
+    * This can run in parallel with read operations, and for that matter
+    * with write operations; but not in parallel with additions and removals
+    * of backends, nor in parallel with SICleanupQueue.  It doesn't seem
+    * worth having a third lock, so we choose to use SInvalWriteLock to
+    * serialize additions/removals.
+    */
+   LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
 
    /* Look for a free entry in the procState array */
    for (index = 0; index < segP->lastBackend; index++)
    {
-       if (segP->procState[index].nextMsgNum < 0)      /* inactive slot? */
+       if (segP->procState[index].procPid == 0)        /* inactive slot? */
        {
            stateP = &segP->procState[index];
            break;
@@ -196,7 +264,7 @@ SharedInvalBackendInit(void)
        if (segP->lastBackend < segP->maxBackends)
        {
            stateP = &segP->procState[segP->lastBackend];
-           Assert(stateP->nextMsgNum < 0);
+           Assert(stateP->procPid == 0);
            segP->lastBackend++;
        }
        else
@@ -205,7 +273,7 @@ SharedInvalBackendInit(void)
             * out of procState slots: MaxBackends exceeded -- report normally
             */
            MyBackendId = InvalidBackendId;
-           LWLockRelease(SInvalLock);
+           LWLockRelease(SInvalWriteLock);
            ereport(FATAL,
                    (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
                     errmsg("sorry, too many clients already")));
@@ -214,21 +282,21 @@ SharedInvalBackendInit(void)
 
    MyBackendId = (stateP - &segP->procState[0]) + 1;
 
-#ifdef INVALIDDEBUG
-   elog(DEBUG2, "my backend id is %d", MyBackendId);
-#endif   /* INVALIDDEBUG */
+   elog(DEBUG4, "my backend id is %d", MyBackendId);
 
    /* Advertise assigned backend ID in MyProc */
    MyProc->backendId = MyBackendId;
 
    /* Fetch next local transaction ID into local memory */
-   nextLocalTransactionId = segP->nextLXID[MyBackendId - 1];
+   nextLocalTransactionId = stateP->nextLXID;
 
    /* mark myself active, with all extant messages already read */
+   stateP->procPid = MyProcPid;
    stateP->nextMsgNum = segP->maxMsgNum;
    stateP->resetState = false;
+   stateP->signaled = false;
 
-   LWLockRelease(SInvalLock);
+   LWLockRelease(SInvalWriteLock);
 
    /* register exit routine to mark my entry inactive at exit */
    on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
@@ -238,8 +306,7 @@ SharedInvalBackendInit(void)
  * CleanupInvalidationState
  *     Mark the current backend as no longer active.
  *
- * This function is called via on_shmem_exit() during backend shutdown,
- * so the caller has NOT acquired the lock for us.
+ * This function is called via on_shmem_exit() during backend shutdown.
  *
  * arg is really of type "SISeg*".
  */
@@ -247,227 +314,247 @@ static void
 CleanupInvalidationState(int status, Datum arg)
 {
    SISeg      *segP = (SISeg *) DatumGetPointer(arg);
+   ProcState  *stateP;
    int         i;
 
    Assert(PointerIsValid(segP));
 
-   LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+   LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+
+   stateP = &segP->procState[MyBackendId - 1];
 
    /* Update next local transaction ID for next holder of this backendID */
-   segP->nextLXID[MyBackendId - 1] = nextLocalTransactionId;
+   stateP->nextLXID = nextLocalTransactionId;
 
    /* Mark myself inactive */
-   segP->procState[MyBackendId - 1].nextMsgNum = -1;
-   segP->procState[MyBackendId - 1].resetState = false;
+   stateP->procPid = 0;
+   stateP->nextMsgNum = 0;
+   stateP->resetState = false;
+   stateP->signaled = false;
 
    /* Recompute index of last active backend */
    for (i = segP->lastBackend; i > 0; i--)
    {
-       if (segP->procState[i - 1].nextMsgNum >= 0)
+       if (segP->procState[i - 1].procPid != 0)
            break;
    }
    segP->lastBackend = i;
 
-   LWLockRelease(SInvalLock);
+   LWLockRelease(SInvalWriteLock);
 }
 
 /*
- * SIInsertDataEntry
- *     Add a new invalidation message to the buffer.
- *
- * If we are unable to insert the message because the buffer is full,
- * then clear the buffer and assert the "reset" flag to each backend.
- * This will cause all the backends to discard *all* invalidatable state.
- *
- * Returns true for normal successful insertion, false if had to reset.
+ * SIInsertDataEntries
+ *     Add new invalidation message(s) to the buffer.
  */
-bool
-SIInsertDataEntry(SharedInvalidationMessage *data)
+void
+SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
 {
-   int         numMsgs;
-   bool        signal_postmaster = false;
-   SISeg      *segP;
+   SISeg      *segP = shmInvalBuffer;
+
+   /*
+    * N can be arbitrarily large.  We divide the work into groups of no more
+    * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
+    * an unreasonably long time.  (This is not so much because we care about
+    * letting in other writers, as that some just-caught-up backend might be
+    * trying to do SICleanupQueue to pass on its signal, and we don't want it
+    * to have to wait a long time.)  Also, we need to consider calling
+    * SICleanupQueue every so often.
+    */
+   while (n > 0)
+   {
+       int     nthistime = Min(n, WRITE_QUANTUM);
+       int     numMsgs;
 
-   LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+       n -= nthistime;
 
-   segP = shmInvalBuffer;
-   numMsgs = segP->maxMsgNum - segP->minMsgNum;
+       LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
 
-   /* Is the buffer full? */
-   if (numMsgs >= MAXNUMMESSAGES)
-   {
        /*
-        * Don't panic just yet: slowest backend might have consumed some
-        * messages but not yet have done SIDelExpiredDataEntries() to advance
-        * minMsgNum.  So, make sure minMsgNum is up-to-date.
+        * If the buffer is full, we *must* acquire some space.  Clean the
+        * queue and reset anyone who is preventing space from being freed.
+        * Otherwise, clean the queue only when it's exceeded the next
+        * fullness threshold.
         */
-       SIDelExpiredDataEntries(true);
        numMsgs = segP->maxMsgNum - segP->minMsgNum;
-       if (numMsgs >= MAXNUMMESSAGES)
+       if (numMsgs + nthistime > MAXNUMMESSAGES)
        {
-           /* Yup, it's definitely full, no choice but to reset */
-           SISetProcStateInvalid(segP);
-           LWLockRelease(SInvalLock);
-           return false;
+           SICleanupQueue(true, nthistime);
+           Assert((segP->maxMsgNum - segP->minMsgNum + nthistime) <= MAXNUMMESSAGES);
        }
-   }
-
-   /*
-    * Try to prevent table overflow.  When the table is 70% full send a
-    * WAKEN_CHILDREN request to the postmaster.  The postmaster will send a
-    * SIGUSR1 signal to all the backends, which will cause sinval.c to read
-    * any pending SI entries.
-    *
-    * This should never happen if all the backends are actively executing
-    * queries, but if a backend is sitting idle then it won't be starting
-    * transactions and so won't be reading SI entries.
-    */
-   if (numMsgs == (MAXNUMMESSAGES * 70 / 100) && IsUnderPostmaster)
-       signal_postmaster = true;
-
-   /*
-    * Insert new message into proper slot of circular buffer
-    */
-   segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
-   segP->maxMsgNum++;
-
-   LWLockRelease(SInvalLock);
-
-   if (signal_postmaster)
-   {
-       elog(DEBUG4, "SI table is 70%% full, signaling postmaster");
-       SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
-   }
-
-   return true;
-}
-
-/*
- * SISetProcStateInvalid
- *     Flush pending messages from buffer, assert reset flag for each backend
- *
- * This is used only to recover from SI buffer overflow.
- */
-static void
-SISetProcStateInvalid(SISeg *segP)
-{
-   int         i;
-
-   segP->minMsgNum = 0;
-   segP->maxMsgNum = 0;
+       else if (numMsgs >= segP->nextThreshold)
+           SICleanupQueue(true, 0);
 
-   for (i = 0; i < segP->lastBackend; i++)
-   {
-       if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
+       /*
+        * Insert new message(s) into proper slot of circular buffer
+        */
+       while (nthistime-- > 0)
        {
-           segP->procState[i].resetState = true;
-           segP->procState[i].nextMsgNum = 0;
+           segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data++;
+           segP->maxMsgNum++;
        }
+
+       LWLockRelease(SInvalWriteLock);
    }
 }
 
 /*
- * SIGetDataEntry
- *     get next SI message for specified backend, if there is one
+ * SIGetDataEntries
+ *     get next SI message(s) for current backend, if there are any
  *
  * Possible return values:
- * 0: no SI message available
- * 1: next SI message has been extracted into *data
- *     (there may be more messages available after this one!)
- * -1: SI reset message extracted
+ * 0:   no SI message available
+ * n>0: next n SI messages have been extracted into data[]
+ * -1:   SI reset message extracted
+ *
+ * If the return value is less than the array size "datasize", the caller
+ * can assume that there are no more SI messages after the one(s) returned.
+ * Otherwise, another call is needed to collect more messages.
  *
- * NB: this can run in parallel with other instances of SIGetDataEntry
+ * NB: this can run in parallel with other instances of SIGetDataEntries
  * executing on behalf of other backends, since each instance will modify only
  * fields of its own backend's ProcState, and no instance will look at fields
- * of other backends' ProcStates.  We express this by grabbing SInvalLock in
- * shared mode.  Note that this is not exactly the normal (read-only)
+ * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
+ * in shared mode.  Note that this is not exactly the normal (read-only)
  * interpretation of a shared lock! Look closely at the interactions before
- * allowing SInvalLock to be grabbed in shared mode for any other reason!
+ * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
+ *
+ * NB: this can also run in parallel with SIInsertDataEntries.  It is not
+ * guaranteed that we will return any messages added after the routine is
+ * entered.
+ *
+ * Note: we assume that "datasize" is not so large that it might be important
+ * to break our hold on SInvalReadLock into segments.
  */
 int
-SIGetDataEntry(int backendId, SharedInvalidationMessage *data)
+SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
 {
-   ProcState  *stateP;
    SISeg      *segP;
+   ProcState  *stateP;
+   int         n;
    
-   LWLockAcquire(SInvalLock, LW_SHARED);
+   LWLockAcquire(SInvalReadLock, LW_SHARED);
 
    segP = shmInvalBuffer;
-   stateP = &segP->procState[backendId - 1];
+   stateP = &segP->procState[MyBackendId - 1];
 
    if (stateP->resetState)
    {
        /*
         * Force reset.  We can say we have dealt with any messages added
-        * since the reset, as well...
+        * since the reset, as well; and that means we should clear the
+        * signaled flag, too.
         */
-       stateP->resetState = false;
        stateP->nextMsgNum = segP->maxMsgNum;
-       LWLockRelease(SInvalLock);
+       stateP->resetState = false;
+       stateP->signaled = false;
+       LWLockRelease(SInvalReadLock);
        return -1;
    }
 
-   if (stateP->nextMsgNum >= segP->maxMsgNum)
-   {
-       LWLockRelease(SInvalLock);
-       return 0;               /* nothing to read */
-   }
-
    /*
-    * Retrieve message and advance my counter.
+    * Retrieve messages and advance backend's counter, until data array is
+    * full or there are no more messages.
+    *
+    * There may be other backends that haven't read the message(s), so we
+    * cannot delete them here.  SICleanupQueue() will eventually remove them
+    * from the queue.
+    *
+    * Note: depending on the compiler, we might read maxMsgNum only once
+    * here, or each time through the loop.  It doesn't matter (as long as
+    * each fetch is atomic).
     */
-   *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
-   stateP->nextMsgNum++;
+   n = 0;
+   while (n < datasize && stateP->nextMsgNum < segP->maxMsgNum)
+   {
+       data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
+       stateP->nextMsgNum++;
+   }
 
    /*
-    * There may be other backends that haven't read the message, so we cannot
-    * delete it here. SIDelExpiredDataEntries() should be called to remove
-    * dead messages.
+    * Reset our "signaled" flag whenever we have caught up completely.
     */
+   if (stateP->nextMsgNum >= segP->maxMsgNum)
+       stateP->signaled = false;
 
-   LWLockRelease(SInvalLock);
-   return 1;                   /* got a message */
+   LWLockRelease(SInvalReadLock);
+   return n;
 }
 
 /*
- * SIDelExpiredDataEntries
+ * SICleanupQueue
  *     Remove messages that have been consumed by all active backends
+ *
+ * callerHasWriteLock is TRUE if caller is holding SInvalWriteLock.
+ * minFree is the minimum number of free message slots required at completion.
+ *
+ * Possible side effects of this routine include marking one or more
+ * backends as "reset" in the array, and sending a catchup interrupt (SIGUSR1)
+ * to some backend that seems to be getting too far behind.  We signal at
+ * most one backend at a time, for reasons explained at the top of the file.
  */
 void
-SIDelExpiredDataEntries(bool locked)
+SICleanupQueue(bool callerHasWriteLock, int minFree)
 {
    SISeg      *segP = shmInvalBuffer;
    int         min,
-               i,
-               h;
+               minsig,
+               lowbound,
+               numMsgs,
+               i;
+   ProcState  *needSig = NULL;
 
-   if (!locked)
-       LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+   /* Lock out all writers and readers */
+   if (!callerHasWriteLock)
+       LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+   LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
 
+   /*
+    * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify
+    * the furthest-back backend that needs signaling (if any), and reset
+    * any backends that are too far back.
+    */
    min = segP->maxMsgNum;
-   if (min == segP->minMsgNum)
-   {
-       if (!locked)
-           LWLockRelease(SInvalLock);
-       return;                 /* fast path if no messages exist */
-   }
-
-   /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
+   minsig = min - SIG_THRESHOLD;
+   lowbound = min - MAXNUMMESSAGES + minFree;
 
    for (i = 0; i < segP->lastBackend; i++)
    {
-       h = segP->procState[i].nextMsgNum;
-       if (h >= 0)
-       {                       /* backend active */
-           if (h < min)
-               min = h;
+       ProcState  *stateP = &segP->procState[i];
+       int     n = stateP->nextMsgNum;
+
+       /* Ignore if inactive or already in reset state */
+       if (stateP->procPid == 0 || stateP->resetState)
+           continue;
+
+       /*
+        * If we must free some space and this backend is preventing it,
+        * force him into reset state and then ignore until he catches up.
+        */
+       if (n < lowbound)
+       {
+           stateP->resetState = true;
+           /* no point in signaling him ... */
+           continue;
+       }
+
+       /* Track the global minimum nextMsgNum */
+       if (n < min)
+           min = n;
+
+       /* Also see who's furthest back of the unsignaled backends */
+       if (n < minsig && !stateP->signaled)
+       {
+           minsig = n;
+           needSig = stateP;
        }
    }
    segP->minMsgNum = min;
 
    /*
     * When minMsgNum gets really large, decrement all message counters so as
-    * to forestall overflow of the counters.
+    * to forestall overflow of the counters.  This happens seldom enough
+    * that folding it into the previous loop would be a loser.
     */
    if (min >= MSGNUMWRAPAROUND)
    {
@@ -475,13 +562,43 @@ SIDelExpiredDataEntries(bool locked)
        segP->maxMsgNum -= MSGNUMWRAPAROUND;
        for (i = 0; i < segP->lastBackend; i++)
        {
-           if (segP->procState[i].nextMsgNum >= 0)
-               segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
+           /* we don't bother skipping inactive entries here */
+           segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
        }
    }
 
-   if (!locked)
-       LWLockRelease(SInvalLock);
+   /*
+    * Determine how many messages are still in the queue, and set the
+    * threshold at which we should repeat SICleanupQueue().
+    */
+   numMsgs = segP->maxMsgNum - segP->minMsgNum;
+   if (numMsgs < CLEANUP_MIN)
+       segP->nextThreshold = CLEANUP_MIN;
+   else
+       segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
+
+   /*
+    * Lastly, signal anyone who needs a catchup interrupt.  Since kill()
+    * might not be fast, we don't want to hold locks while executing it.
+    */
+   if (needSig)
+   {
+       pid_t   his_pid = needSig->procPid;
+
+       needSig->signaled = true;
+       LWLockRelease(SInvalReadLock);
+       LWLockRelease(SInvalWriteLock);
+       elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
+       kill(his_pid, SIGUSR1);
+       if (callerHasWriteLock)
+           LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+   }
+   else
+   {
+       LWLockRelease(SInvalReadLock);
+       if (!callerHasWriteLock)
+           LWLockRelease(SInvalWriteLock);
+   }
 }
 
 
index 50e27923566493f32df7cc2ff0d600f1f16e0cad..050d7cc88de2a8eb4afff32ceaff49f9f0a5fdec 100644 (file)
@@ -80,7 +80,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.85 2008/06/19 00:46:05 alvherre Exp $
+ *   $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.86 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -203,7 +203,7 @@ AddInvalidationMessage(InvalidationChunk **listHdr,
    if (chunk == NULL)
    {
        /* First time through; create initial chunk */
-#define FIRSTCHUNKSIZE 16
+#define FIRSTCHUNKSIZE 32
        chunk = (InvalidationChunk *)
            MemoryContextAlloc(CurTransactionContext,
                               sizeof(InvalidationChunk) +
@@ -275,6 +275,23 @@ AppendInvalidationMessageList(InvalidationChunk **destHdr,
        } \
    } while (0)
 
+/*
+ * Process a list of invalidation messages group-wise.
+ *
+ * As above, but the code fragment can handle an array of messages.
+ * The fragment should refer to the messages as msgs[], with n entries.
+ */
+#define ProcessMessageListMulti(listHdr, codeFragment) \
+   do { \
+       InvalidationChunk *_chunk; \
+       for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \
+       { \
+           SharedInvalidationMessage *msgs = _chunk->msgs; \
+           int     n = _chunk->nitems; \
+           codeFragment; \
+       } \
+   } while (0)
+
 
 /* ----------------------------------------------------------------
  *             Invalidation set support functions
@@ -371,6 +388,18 @@ ProcessInvalidationMessages(InvalidationListHeader *hdr,
    ProcessMessageList(hdr->rclist, func(msg));
 }
 
+/*
+ * As above, but the function is able to process an array of messages
+ * rather than just one at a time.
+ */
+static void
+ProcessInvalidationMessagesMulti(InvalidationListHeader *hdr,
+                                void (*func) (const SharedInvalidationMessage *msgs, int n))
+{
+   ProcessMessageListMulti(hdr->cclist, func(msgs, n));
+   ProcessMessageListMulti(hdr->rclist, func(msgs, n));
+}
+
 /* ----------------------------------------------------------------
  *                   private support functions
  * ----------------------------------------------------------------
@@ -792,7 +821,7 @@ inval_twophase_postcommit(TransactionId xid, uint16 info,
        case TWOPHASE_INFO_MSG:
            msg = (SharedInvalidationMessage *) recdata;
            Assert(len == sizeof(SharedInvalidationMessage));
-           SendSharedInvalidMessage(msg);
+           SendSharedInvalidMessages(msg, 1);
            break;
        case TWOPHASE_INFO_FILE_BEFORE:
            RelationCacheInitFileInvalidate(true);
@@ -850,8 +879,8 @@ AtEOXact_Inval(bool isCommit)
        AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
                                   &transInvalInfo->CurrentCmdInvalidMsgs);
 
-       ProcessInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
-                                   SendSharedInvalidMessage);
+       ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs,
+                                        SendSharedInvalidMessages);
 
        if (transInvalInfo->RelcacheInitFileInval)
            RelationCacheInitFileInvalidate(false);
@@ -1033,8 +1062,8 @@ EndNonTransactionalInvalidation(void)
    /* Send out the invals */
    ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
                                LocalExecuteInvalidationMessage);
-   ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
-                               SendSharedInvalidMessage);
+   ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+                                    SendSharedInvalidMessages);
 
    /* Clean up and release memory */
    for (chunk = transInvalInfo->CurrentCmdInvalidMsgs.cclist;
index baccfbf5a68233670124516a291c8040cd9da3da..b1088fcd33d985883408a362bf6f8d07eb3782a3 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.38 2008/01/01 19:45:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.39 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -43,7 +43,8 @@ typedef enum LWLockId
    OidGenLock,
    XidGenLock,
    ProcArrayLock,
-   SInvalLock,
+   SInvalReadLock,
+   SInvalWriteLock,
    FreeSpaceLock,
    WALInsertLock,
    WALWriteLock,
index c02593e5a86914d67bbafbaa6b7d57219a82367f..94f1770ffce7a471826326ce0437b6b02ce91425 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.19 2008/01/01 19:45:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.20 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -23,7 +23,6 @@
 typedef enum
 {
    PMSIGNAL_PASSWORD_CHANGE,   /* pg_auth file has changed */
-   PMSIGNAL_WAKEN_CHILDREN,    /* send a SIGUSR1 signal to all backends */
    PMSIGNAL_WAKEN_ARCHIVER,    /* send a NOTIFY signal to xlog archiver */
    PMSIGNAL_ROTATE_LOGFILE,    /* send SIGUSR1 to syslogger to rotate logfile */
    PMSIGNAL_START_AUTOVAC_LAUNCHER,    /* start an autovacuum launcher */
index 343c8d94bdb4c84e645e9d36154e21083caf86a9..3601216f1b61052af7f9cde57bf60a7a3cdb591f 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/sinval.h,v 1.47 2008/03/16 19:47:34 alvherre Exp $
+ * $PostgreSQL: pgsql/src/include/storage/sinval.h,v 1.48 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -83,7 +83,8 @@ typedef union
 } SharedInvalidationMessage;
 
 
-extern void SendSharedInvalidMessage(SharedInvalidationMessage *msg);
+extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs,
+                                     int n);
 extern void ReceiveSharedInvalidMessages(
                      void (*invalFunction) (SharedInvalidationMessage *msg),
                             void (*resetFunction) (void));
index 8535cba0f065350bc3badd2e969dd1e0e61f68d2..1748f8821b42d64233eafd9c182af63ed4e35ba3 100644 (file)
@@ -1,12 +1,13 @@
 /*-------------------------------------------------------------------------
  *
  * sinvaladt.h
- *   POSTGRES shared cache invalidation segment definitions.
+ *   POSTGRES shared cache invalidation data manager.
  *
  * The shared cache invalidation manager is responsible for transmitting
  * invalidation messages between backends. Any message sent by any backend
  * must be delivered to all already-running backends before it can be
- * forgotten.
+ * forgotten.  (If we run out of space, we instead deliver a "RESET"
+ * message to backends that have fallen too far behind.)
  * 
  * The struct type SharedInvalidationMessage, defining the contents of
  * a single message, is defined in sinval.h.
@@ -14,7 +15,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/sinvaladt.h,v 1.47 2008/03/17 11:50:27 alvherre Exp $
+ * $PostgreSQL: pgsql/src/include/storage/sinvaladt.h,v 1.48 2008/06/19 21:32:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -23,7 +24,6 @@
 
 #include "storage/sinval.h"
 
-
 /*
  * prototypes for functions in sinvaladt.c
  */
@@ -31,9 +31,9 @@ extern Size SInvalShmemSize(void);
 extern void CreateSharedInvalidationState(void);
 extern void SharedInvalBackendInit(void);
 
-extern bool SIInsertDataEntry(SharedInvalidationMessage *data);
-extern int SIGetDataEntry(int backendId, SharedInvalidationMessage *data);
-extern void SIDelExpiredDataEntries(bool locked);
+extern void SIInsertDataEntries(const SharedInvalidationMessage *data, int n);
+extern int SIGetDataEntries(SharedInvalidationMessage *data, int datasize);
+extern void SICleanupQueue(bool callerHasWriteLock, int minFree);
 
 extern LocalTransactionId GetNextLocalTransactionId(void);