Improve LISTEN startup time when there are many unread notifications.
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 1 Oct 2015 03:32:23 +0000 (23:32 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 1 Oct 2015 03:32:23 +0000 (23:32 -0400)
If some existing listener is far behind, incoming new listener sessions
would start from that session's read pointer and then need to advance over
many already-committed notification messages, which they have no interest
in.  This was expensive in itself and also thrashed the pg_notify SLRU
buffers a lot more than necessary.  We can improve matters considerably
in typical scenarios, without much added cost, by starting from the
furthest-ahead read pointer, not the furthest-behind one.  We do have to
consider only sessions in our own database when doing this, which requires
an extra field in the data structure, but that's a pretty small cost.

Back-patch to 9.0 where the current LISTEN/NOTIFY logic was introduced.

Matt Newell, slightly adjusted by me

src/backend/commands/async.c

index 2d1ab7ff21f737e114f9cac13642ae6f1b2d6d4d..2b6e7f49aef1cdd0fd915ed17e7a13ebe9887d1e 100644 (file)
@@ -199,12 +199,19 @@ typedef struct QueuePosition
     (x).page != (y).page ? (y) : \
     (x).offset < (y).offset ? (x) : (y))
 
+/* choose logically larger QueuePosition */
+#define QUEUE_POS_MAX(x,y) \
+   (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
+    (x).page != (y).page ? (x) : \
+    (x).offset > (y).offset ? (x) : (y))
+
 /*
  * Struct describing a listening backend's status
  */
 typedef struct QueueBackendStatus
 {
    int32       pid;            /* either a PID or InvalidPid */
+   Oid         dboid;          /* backend's database OID, or InvalidOid */
    QueuePosition pos;          /* backend has read queue up to here */
 } QueueBackendStatus;
 
@@ -223,6 +230,7 @@ typedef struct QueueBackendStatus
  * When holding the lock in EXCLUSIVE mode, backends can inspect the entries
  * of other backends and also change the head and tail pointers.
  *
+ * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
  * In order to avoid deadlocks, whenever we need both locks, we always first
  * get AsyncQueueLock and then AsyncCtlLock.
  *
@@ -233,8 +241,8 @@ typedef struct QueueBackendStatus
 typedef struct AsyncQueueControl
 {
    QueuePosition head;         /* head points to the next free location */
-   QueuePosition tail;         /* the global tail is equivalent to the tail
-                                * of the "slowest" backend */
+   QueuePosition tail;         /* the global tail is equivalent to the pos of
+                                * the "slowest" backend */
    TimestampTz lastQueueFillWarn;      /* time of last queue-full msg */
    QueueBackendStatus backend[1];      /* actually of length MaxBackends+1 */
    /* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
@@ -245,6 +253,7 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_HEAD                 (asyncQueueControl->head)
 #define QUEUE_TAIL                 (asyncQueueControl->tail)
 #define QUEUE_BACKEND_PID(i)       (asyncQueueControl->backend[i].pid)
+#define QUEUE_BACKEND_DBOID(i)     (asyncQueueControl->backend[i].dboid)
 #define QUEUE_BACKEND_POS(i)       (asyncQueueControl->backend[i].pos)
 
 /*
@@ -463,6 +472,7 @@ AsyncShmemInit(void)
        for (i = 0; i <= MaxBackends; i++)
        {
            QUEUE_BACKEND_PID(i) = InvalidPid;
+           QUEUE_BACKEND_DBOID(i) = InvalidOid;
            SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
        }
    }
@@ -908,6 +918,10 @@ AtCommit_Notify(void)
 static void
 Exec_ListenPreCommit(void)
 {
+   QueuePosition head;
+   QueuePosition max;
+   int         i;
+
    /*
     * Nothing to do if we are already listening to something, nor if we
     * already ran this routine in this transaction.
@@ -935,10 +949,34 @@ Exec_ListenPreCommit(void)
     * over already-committed notifications.  This ensures we cannot miss any
     * not-yet-committed notifications.  We might get a few more but that
     * doesn't hurt.
+    *
+    * In some scenarios there might be a lot of committed notifications that
+    * have not yet been pruned away (because some backend is being lazy about
+    * reading them).  To reduce our startup time, we can look at other
+    * backends and adopt the maximum "pos" pointer of any backend that's in
+    * our database; any notifications it's already advanced over are surely
+    * committed and need not be re-examined by us.  (We must consider only
+    * backends connected to our DB, because others will not have bothered to
+    * check committed-ness of notifications in our DB.)  But we only bother
+    * with that if there's more than a page worth of notifications
+    * outstanding, otherwise scanning all the other backends isn't worth it.
+    *
+    * We need exclusive lock here so we can look at other backends' entries.
     */
-   LWLockAcquire(AsyncQueueLock, LW_SHARED);
-   QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
+   LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+   head = QUEUE_HEAD;
+   max = QUEUE_TAIL;
+   if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head))
+   {
+       for (i = 1; i <= MaxBackends; i++)
+       {
+           if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+               max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
+       }
+   }
+   QUEUE_BACKEND_POS(MyBackendId) = max;
    QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
+   QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
    LWLockRelease(AsyncQueueLock);
 
    /* Now we are listed in the global array, so remember we're listening */
@@ -954,7 +992,8 @@ Exec_ListenPreCommit(void)
     *
     * This will also advance the global tail pointer if possible.
     */
-   asyncQueueReadAllNotifications();
+   if (!QUEUE_POS_EQUAL(max, head))
+       asyncQueueReadAllNotifications();
 }
 
 /*
@@ -1157,6 +1196,7 @@ asyncQueueUnregister(void)
        QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
    /* ... then mark it invalid */
    QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
+   QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
    LWLockRelease(AsyncQueueLock);
 
    /* mark ourselves as no longer listed in the global array */