diff options
-rw-r--r-- | src/backend/access/transam/clog.c | 83 |
1 files changed, 51 insertions, 32 deletions
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index b72fb5d651e..3f50c79331b 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -13,7 +13,7 @@ * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Header: /cvsroot/pgsql/src/backend/access/transam/clog.c,v 1.11 2002/09/26 22:58:33 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/transam/clog.c,v 1.11.2.1 2005/11/03 00:23:57 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -78,10 +78,8 @@ * that is reading in or writing out a page buffer does not hold the control * lock, only the per-buffer lock for the buffer it is working on. * - * To change the page number or state of a buffer, one must normally hold - * the control lock. (The sole exception to this rule is that a writer - * process changes the state from DIRTY to WRITE_IN_PROGRESS while holding - * only the per-buffer lock.) If the buffer's state is neither EMPTY nor + * To change the page number or state of a buffer, one must hold + * the control lock. If the buffer's state is neither EMPTY nor * CLEAN, then there may be processes doing (or waiting to do) I/O on the * buffer, so the page number may not be changed, and the only allowed state * transition is to change WRITE_IN_PROGRESS to DIRTY after dirtying the page. @@ -95,10 +93,6 @@ * the read, while the early marking prevents someone else from trying to * read the same page into a different buffer. * - * Note we are assuming that read and write of the state value is atomic, - * since I/O processes may examine and change the state while not holding - * the control lock. - * * As with the regular buffer manager, it is possible for another process * to re-dirty a page that is currently being written out. This is handled * by setting the page's state from WRITE_IN_PROGRESS to DIRTY. The writing @@ -407,9 +401,19 @@ ReadCLOGPage(int pageno) */ ClogCtl->page_lru_count[slotno] = 0; - /* Release shared lock, grab per-buffer lock instead */ - LWLockRelease(CLogControlLock); - LWLockAcquire(ClogBufferLocks[slotno], LW_EXCLUSIVE); + /* + * We must grab the per-buffer lock to do I/O. To avoid deadlock, + * must release ControlLock while waiting for per-buffer lock. + * Fortunately, most of the time the per-buffer lock shouldn't be + * already held, so we can do this: + */ + if (!LWLockConditionalAcquire(ClogBufferLocks[slotno], + LW_EXCLUSIVE)) + { + LWLockRelease(CLogControlLock); + LWLockAcquire(ClogBufferLocks[slotno], LW_EXCLUSIVE); + LWLockAcquire(CLogControlLock, LW_EXCLUSIVE); + } /* * Check to see if someone else already did the read, or took the @@ -419,11 +423,12 @@ ReadCLOGPage(int pageno) ClogCtl->page_status[slotno] != CLOG_PAGE_READ_IN_PROGRESS) { LWLockRelease(ClogBufferLocks[slotno]); - LWLockAcquire(CLogControlLock, LW_EXCLUSIVE); continue; } - /* Okay, do the read */ + /* Okay, release control lock and do the read */ + LWLockRelease(CLogControlLock); + CLOGPhysicalReadPage(pageno, slotno); /* Re-acquire shared control lock and update page state */ @@ -464,9 +469,19 @@ WriteCLOGPage(int slotno) pageno = ClogCtl->page_number[slotno]; - /* Release shared lock, grab per-buffer lock instead */ - LWLockRelease(CLogControlLock); - LWLockAcquire(ClogBufferLocks[slotno], LW_EXCLUSIVE); + /* + * We must grab the per-buffer lock to do I/O. To avoid deadlock, + * must release ControlLock while waiting for per-buffer lock. + * Fortunately, most of the time the per-buffer lock shouldn't be + * already held, so we can do this: + */ + if (!LWLockConditionalAcquire(ClogBufferLocks[slotno], + LW_EXCLUSIVE)) + { + LWLockRelease(CLogControlLock); + LWLockAcquire(ClogBufferLocks[slotno], LW_EXCLUSIVE); + LWLockAcquire(CLogControlLock, LW_EXCLUSIVE); + } /* * Check to see if someone else already did the write, or took the @@ -480,25 +495,18 @@ WriteCLOGPage(int slotno) ClogCtl->page_status[slotno] != CLOG_PAGE_WRITE_IN_PROGRESS)) { LWLockRelease(ClogBufferLocks[slotno]); - LWLockAcquire(CLogControlLock, LW_EXCLUSIVE); return; } /* * Mark the slot write-busy. After this point, a transaction status - * update on this page will mark it dirty again. NB: we are assuming - * that read/write of the page status field is atomic, since we change - * the state while not holding control lock. However, we cannot set - * this state any sooner, or we'd possibly fool a previous writer into - * thinking he's successfully dumped the page when he hasn't. - * (Scenario: other writer starts, page is redirtied, we come along - * and set WRITE_IN_PROGRESS again, other writer completes and sets - * CLEAN because redirty info has been lost, then we think it's clean - * too.) + * update on this page will mark it dirty again. */ ClogCtl->page_status[slotno] = CLOG_PAGE_WRITE_IN_PROGRESS; - /* Okay, do the write */ + /* Okay, release the control lock and do the write */ + LWLockRelease(CLogControlLock); + CLOGPhysicalWritePage(pageno, slotno); /* Re-acquire shared control lock and update page state */ @@ -675,11 +683,17 @@ SelectLRUCLOGPage(int pageno) /* * We need to do I/O. Normal case is that we have to write it * out, but it's possible in the worst case to have selected a - * read-busy page. In that case we use ReadCLOGPage to wait for - * the read to complete. + * read-busy page. In that case we just wait for someone else to + * complete the I/O, which we can do by waiting for the per-buffer + * lock. */ if (ClogCtl->page_status[bestslot] == CLOG_PAGE_READ_IN_PROGRESS) - (void) ReadCLOGPage(ClogCtl->page_number[bestslot]); + { + LWLockRelease(CLogControlLock); + LWLockAcquire(ClogBufferLocks[bestslot], LW_SHARED); + LWLockRelease(ClogBufferLocks[bestslot]); + LWLockAcquire(CLogControlLock, LW_EXCLUSIVE); + } else WriteCLOGPage(bestslot); @@ -857,7 +871,12 @@ restart:; * This is the same logic as in SelectLRUCLOGPage. */ if (ClogCtl->page_status[slotno] == CLOG_PAGE_READ_IN_PROGRESS) - (void) ReadCLOGPage(ClogCtl->page_number[slotno]); + { + LWLockRelease(CLogControlLock); + LWLockAcquire(ClogBufferLocks[slotno], LW_SHARED); + LWLockRelease(ClogBufferLocks[slotno]); + LWLockAcquire(CLogControlLock, LW_EXCLUSIVE); + } else WriteCLOGPage(slotno); goto restart; |