Fix NOTIFY to cope with I/O problems, such as out-of-disk-space.
authorTom Lane <tgl@sss.pgh.pa.us>
Fri, 29 Jun 2012 04:51:34 +0000 (00:51 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Fri, 29 Jun 2012 04:51:34 +0000 (00:51 -0400)
The LISTEN/NOTIFY subsystem got confused if SimpleLruZeroPage failed,
which would typically happen as a result of a write() failure while
attempting to dump a dirty pg_notify page out of memory.  Subsequently,
all attempts to send more NOTIFY messages would fail with messages like
"Could not read from file "pg_notify/nnnn" at offset nnnnn: Success".
Only restarting the server would clear this condition.  Per reports from
Kevin Grittner and Christoph Berg.

Back-patch to 9.0, where the problem was introduced during the
LISTEN/NOTIFY rewrite.

src/backend/commands/async.c

index fcb087ed15dfbb4d567ee0c742e1db8ec1353a2d..6fcd9093268e7bc5aaf6102fa9d6adcdcced1d6d 100644 (file)
@@ -1285,6 +1285,7 @@ static ListCell *
 asyncQueueAddEntries(ListCell *nextNotify)
 {
        AsyncQueueEntry qe;
+       QueuePosition queue_head;
        int                     pageno;
        int                     offset;
        int                     slotno;
@@ -1292,8 +1293,21 @@ asyncQueueAddEntries(ListCell *nextNotify)
        /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
        LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
 
+       /*
+        * We work with a local copy of QUEUE_HEAD, which we write back to shared
+        * memory upon exiting.  The reason for this is that if we have to advance
+        * to a new page, SimpleLruZeroPage might fail (out of disk space, for
+        * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
+        * subsequent insertions would try to put entries into a page that slru.c
+        * thinks doesn't exist yet.)  So, use a local position variable.  Note
+        * that if we do fail, any already-inserted queue entries are forgotten;
+        * this is okay, since they'd be useless anyway after our transaction
+        * rolls back.
+        */
+       queue_head = QUEUE_HEAD;
+
        /* Fetch the current page */
-       pageno = QUEUE_POS_PAGE(QUEUE_HEAD);
+       pageno = QUEUE_POS_PAGE(queue_head);
        slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
        /* Note we mark the page dirty before writing in it */
        AsyncCtl->shared->page_dirty[slotno] = true;
@@ -1305,7 +1319,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
                /* Construct a valid queue entry in local variable qe */
                asyncQueueNotificationToEntry(n, &qe);
 
-               offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
+               offset = QUEUE_POS_OFFSET(queue_head);
 
                /* Check whether the entry really fits on the current page */
                if (offset + qe.length <= QUEUE_PAGESIZE)
@@ -1331,8 +1345,8 @@ asyncQueueAddEntries(ListCell *nextNotify)
                           &qe,
                           qe.length);
 
-               /* Advance QUEUE_HEAD appropriately, and note if page is full */
-               if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length))
+               /* Advance queue_head appropriately, and detect if page is full */
+               if (asyncQueueAdvance(&(queue_head), qe.length))
                {
                        /*
                         * Page is full, so we're done here, but first fill the next page
@@ -1342,12 +1356,15 @@ asyncQueueAddEntries(ListCell *nextNotify)
                         * asyncQueueIsFull() ensured that there is room to create this
                         * page without overrunning the queue.
                         */
-                       slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
+                       slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
                        /* And exit the loop */
                        break;
                }
        }
 
+       /* Success, so update the global QUEUE_HEAD */
+       QUEUE_HEAD = queue_head;
+
        LWLockRelease(AsyncCtlLock);
 
        return nextNotify;