asyncQueueAddEntries(ListCell *nextNotify)
{
AsyncQueueEntry qe;
+ QueuePosition queue_head;
int pageno;
int offset;
int slotno;
/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
+ /*
+ * We work with a local copy of QUEUE_HEAD, which we write back to shared
+ * memory upon exiting. The reason for this is that if we have to advance
+ * to a new page, SimpleLruZeroPage might fail (out of disk space, for
+ * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
+ * subsequent insertions would try to put entries into a page that slru.c
+ * thinks doesn't exist yet.) So, use a local position variable. Note
+ * that if we do fail, any already-inserted queue entries are forgotten;
+ * this is okay, since they'd be useless anyway after our transaction
+ * rolls back.
+ */
+ queue_head = QUEUE_HEAD;
+
/* Fetch the current page */
- pageno = QUEUE_POS_PAGE(QUEUE_HEAD);
+ pageno = QUEUE_POS_PAGE(queue_head);
slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
/* Note we mark the page dirty before writing in it */
AsyncCtl->shared->page_dirty[slotno] = true;
/* Construct a valid queue entry in local variable qe */
asyncQueueNotificationToEntry(n, &qe);
- offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
+ offset = QUEUE_POS_OFFSET(queue_head);
/* Check whether the entry really fits on the current page */
if (offset + qe.length <= QUEUE_PAGESIZE)
&qe,
qe.length);
- /* Advance QUEUE_HEAD appropriately, and note if page is full */
- if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length))
+ /* Advance queue_head appropriately, and detect if page is full */
+ if (asyncQueueAdvance(&(queue_head), qe.length))
{
/*
* Page is full, so we're done here, but first fill the next page
* asyncQueueIsFull() ensured that there is room to create this
* page without overrunning the queue.
*/
- slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
+ slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
/* And exit the loop */
break;
}
}
+ /* Success, so update the global QUEUE_HEAD */
+ QUEUE_HEAD = queue_head;
+
LWLockRelease(AsyncCtlLock);
return nextNotify;