Take care to reprocess an uncommitted notify message.
authorTom Lane <tgl@sss.pgh.pa.us>
Wed, 17 Feb 2010 16:54:06 +0000 (16:54 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Wed, 17 Feb 2010 16:54:06 +0000 (16:54 +0000)
Oversight in my changes to cope with possible errors during message
processing; spotted by Joachim Wieland.

src/backend/commands/async.c

index 23c57cea04453303865503246c8ee0704ca5328f..8a31182c99747e16a533537d9fe6a82b57a69213 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.152 2010/02/17 00:52:09 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.153 2010/02/17 16:54:06 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1951,6 +1951,7 @@ asyncQueueReadAllNotifications(void)
  * The function returns true once we have reached the stop position or an
  * uncommitted notification, and false if we have finished with the page.
  * In other words: once it returns true there is no need to look further.
+ * The QueuePosition *current is advanced past all processed messages.
  */
 static bool
 asyncQueueProcessPageEntries(QueuePosition *current,
@@ -1963,10 +1964,12 @@ asyncQueueProcessPageEntries(QueuePosition *current,
 
    do
    {
-       if (QUEUE_POS_EQUAL(*current, stop))
+       QueuePosition   thisentry = *current;
+
+       if (QUEUE_POS_EQUAL(thisentry, stop))
            break;
 
-       qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(*current));
+       qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
 
        /*
         * Advance *current over this message, possibly to the next page.
@@ -2002,8 +2005,14 @@ asyncQueueProcessPageEntries(QueuePosition *current,
            {
                /*
                 * The transaction has neither committed nor aborted so far,
-                * so we can't process its message yet.  Break out of the loop.
+                * so we can't process its message yet.  Break out of the loop,
+                * but first back up *current so we will reprocess the message
+                * next time.  (Note: it is unlikely but not impossible for
+                * TransactionIdDidCommit to fail, so we can't really avoid
+                * this advance-then-back-up behavior when dealing with an
+                * uncommitted message.)
                 */
+               *current = thisentry;
                reachedStop = true;
                break;
            }