summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Haas2016-02-20 08:14:53 +0000
committerRobert Haas2016-08-11 19:18:11 +0000
commiteebc8cf56d9343f59b6a0e542d9709bcfc667547 (patch)
tree80d8e5a7cd25391dc27f0c73d1c472e423fe9d33
parent9be428f900b234510ac10a34e91da671958fc0e3 (diff)
Replace buffer I/O locks with condition variables.cv
-rw-r--r--src/backend/storage/buffer/buf_init.c25
-rw-r--r--src/backend/storage/buffer/bufmgr.c59
-rw-r--r--src/include/storage/buf_internals.h7
-rw-r--r--src/include/storage/condition_variable.h11
4 files changed, 38 insertions, 64 deletions
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index a4163cf717..37ebd37e4f 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -20,7 +20,7 @@
BufferDescPadded *BufferDescriptors;
char *BufferBlocks;
-LWLockMinimallyPadded *BufferIOLWLockArray = NULL;
+ConditionVariableMinimallyPadded *BufferIOCVArray = NULL;
LWLockTranche BufferIOLWLockTranche;
LWLockTranche BufferContentLWLockTranche;
WritebackContext BackendWritebackContext;
@@ -71,7 +71,7 @@ InitBufferPool(void)
{
bool foundBufs,
foundDescs,
- foundIOLocks,
+ foundIOCV,
foundBufCkpt;
/* Align descriptors to a cacheline boundary. */
@@ -85,16 +85,10 @@ InitBufferPool(void)
NBuffers * (Size) BLCKSZ, &foundBufs);
/* Align lwlocks to cacheline boundary */
- BufferIOLWLockArray = (LWLockMinimallyPadded *)
- ShmemInitStruct("Buffer IO Locks",
- NBuffers * (Size) sizeof(LWLockMinimallyPadded),
- &foundIOLocks);
-
- BufferIOLWLockTranche.name = "buffer_io";
- BufferIOLWLockTranche.array_base = BufferIOLWLockArray;
- BufferIOLWLockTranche.array_stride = sizeof(LWLockMinimallyPadded);
- LWLockRegisterTranche(LWTRANCHE_BUFFER_IO_IN_PROGRESS,
- &BufferIOLWLockTranche);
+ BufferIOCVArray = (ConditionVariableMinimallyPadded *)
+ ShmemInitStruct("Buffer IO Condition Variables",
+ NBuffers * (Size) sizeof(ConditionVariableMinimallyPadded),
+ &foundIOCV);
BufferContentLWLockTranche.name = "buffer_content";
BufferContentLWLockTranche.array_base =
@@ -114,10 +108,10 @@ InitBufferPool(void)
ShmemInitStruct("Checkpoint BufferIds",
NBuffers * sizeof(CkptSortItem), &foundBufCkpt);
- if (foundDescs || foundBufs || foundIOLocks || foundBufCkpt)
+ if (foundDescs || foundBufs || foundIOCV || foundBufCkpt)
{
/* should find all of these, or none of them */
- Assert(foundDescs && foundBufs && foundIOLocks && foundBufCkpt);
+ Assert(foundDescs && foundBufs && foundIOCV && foundBufCkpt);
/* note: this path is only taken in EXEC_BACKEND case */
}
else
@@ -147,8 +141,7 @@ InitBufferPool(void)
LWLockInitialize(BufferDescriptorGetContentLock(buf),
LWTRANCHE_BUFFER_CONTENT);
- LWLockInitialize(BufferDescriptorGetIOLock(buf),
- LWTRANCHE_BUFFER_IO_IN_PROGRESS);
+ ConditionVariableInit(BufferDescriptorGetIOCV(buf));
}
/* Correct last entry of linked list */
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 76ade3727c..403db6e291 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1314,8 +1314,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
LWLockRelease(newPartitionLock);
/*
- * Buffer contents are currently invalid. Try to get the io_in_progress
- * lock. If StartBufferIO returns false, then someone else managed to
+ * Buffer contents are currently invalid. Try to obtain the right to start
+ * I/O. If StartBufferIO returns false, then someone else managed to
* read it before we did, so there's nothing left for BufferAlloc() to do.
*/
if (StartBufferIO(buf, true))
@@ -1693,9 +1693,8 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
uint32 buf_state;
uint32 old_buf_state;
- /* I'd better not still hold any locks on the buffer */
+ /* I'd better not still hold the buffer content lock */
Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
- Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
/*
* Decrement the shared reference count.
@@ -2656,9 +2655,9 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
uint32 buf_state;
/*
- * Acquire the buffer's io_in_progress lock. If StartBufferIO returns
- * false, then someone else flushed the buffer before we could, so we need
- * not do anything.
+ * Try to start an I/O operation. If StartBufferIO returns false, then
+ * someone else flushed the buffer before we could, so we need not do
+ * anything.
*/
if (!StartBufferIO(buf, false))
return;
@@ -2714,7 +2713,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
/*
* Now it's safe to write buffer to disk. Note that no one else should
* have been able to write it while we were busy with log flushing because
- * we have the io_in_progress lock.
+ * only one process at a time can set the BM_IO_IN_PROGRESS bit.
*/
bufBlock = BufHdrGetBlock(buf);
@@ -2749,7 +2748,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
/*
* Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
- * end the io_in_progress state.
+ * end the BM_IO_IN_PROGRESS state.
*/
TerminateBufferIO(buf, true, 0);
@@ -3755,7 +3754,7 @@ ConditionalLockBufferForCleanup(Buffer buffer)
* Functions for buffer I/O handling
*
* Note: We assume that nested buffer I/O never occurs.
- * i.e at most one io_in_progress lock is held per proc.
+ * i.e at most one BM_IO_IN_PROGRESS bit is set per proc.
*
* Also note that these are used only for shared buffers, not local ones.
*/
@@ -3766,13 +3765,6 @@ ConditionalLockBufferForCleanup(Buffer buffer)
static void
WaitIO(BufferDesc *buf)
{
- /*
- * Changed to wait until there's no IO - Inoue 01/13/2000
- *
- * Note this is *necessary* because an error abort in the process doing
- * I/O could release the io_in_progress_lock prematurely. See
- * AbortBufferIO.
- */
for (;;)
{
uint32 buf_state;
@@ -3782,14 +3774,15 @@ WaitIO(BufferDesc *buf)
* here, but since this test is essential for correctness, we'd better
* play it safe.
*/
+ ConditionVariablePrepareToSleep(BufferDescriptorGetIOCV(buf));
buf_state = LockBufHdr(buf);
UnlockBufHdr(buf, buf_state);
if (!(buf_state & BM_IO_IN_PROGRESS))
break;
- LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
- LWLockRelease(BufferDescriptorGetIOLock(buf));
+ ConditionVariableSleep();
}
+ ConditionVariableCancelSleep();
}
/*
@@ -3801,7 +3794,7 @@ WaitIO(BufferDesc *buf)
* In some scenarios there are race conditions in which multiple backends
* could attempt the same I/O operation concurrently. If someone else
* has already started I/O on this buffer then we will block on the
- * io_in_progress lock until he's done.
+ * I/O condition variable until he's done.
*
* Input operations are only attempted on buffers that are not BM_VALID,
* and output operations only on buffers that are BM_VALID and BM_DIRTY,
@@ -3819,25 +3812,11 @@ StartBufferIO(BufferDesc *buf, bool forInput)
for (;;)
{
- /*
- * Grab the io_in_progress lock so that other processes can wait for
- * me to finish the I/O.
- */
- LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
-
buf_state = LockBufHdr(buf);
if (!(buf_state & BM_IO_IN_PROGRESS))
break;
-
- /*
- * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
- * lock isn't held is if the process doing the I/O is recovering from
- * an error (see AbortBufferIO). If that's the case, we must wait for
- * him to get unwedged.
- */
UnlockBufHdr(buf, buf_state);
- LWLockRelease(BufferDescriptorGetIOLock(buf));
WaitIO(buf);
}
@@ -3847,7 +3826,6 @@ StartBufferIO(BufferDesc *buf, bool forInput)
{
/* someone else already did the I/O */
UnlockBufHdr(buf, buf_state);
- LWLockRelease(BufferDescriptorGetIOLock(buf));
return false;
}
@@ -3865,7 +3843,6 @@ StartBufferIO(BufferDesc *buf, bool forInput)
* (Assumptions)
* My process is executing IO for the buffer
* BM_IO_IN_PROGRESS bit is set for the buffer
- * We hold the buffer's io_in_progress lock
* The buffer is Pinned
*
* If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
@@ -3897,7 +3874,7 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
InProgressBuf = NULL;
- LWLockRelease(BufferDescriptorGetIOLock(buf));
+ ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
}
/*
@@ -3918,14 +3895,6 @@ AbortBufferIO(void)
{
uint32 buf_state;
- /*
- * Since LWLockReleaseAll has already been called, we're not holding
- * the buffer's io_in_progress_lock. We have to re-acquire it so that
- * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
- * buffer will be in a busy spin until we succeed in doing this.
- */
- LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
-
buf_state = LockBufHdr(buf);
Assert(buf_state & BM_IO_IN_PROGRESS);
if (IsForInput)
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index e0dfb2f5b0..18ef400f41 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -17,6 +17,7 @@
#include "storage/buf.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
@@ -221,12 +222,12 @@ typedef union BufferDescPadded
#define BufferDescriptorGetBuffer(bdesc) ((bdesc)->buf_id + 1)
-#define BufferDescriptorGetIOLock(bdesc) \
- (&(BufferIOLWLockArray[(bdesc)->buf_id]).lock)
+#define BufferDescriptorGetIOCV(bdesc) \
+ (&(BufferIOCVArray[(bdesc)->buf_id]).cv)
#define BufferDescriptorGetContentLock(bdesc) \
((LWLock*) (&(bdesc)->content_lock))
-extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray;
+extern PGDLLIMPORT ConditionVariableMinimallyPadded *BufferIOCVArray;
/*
* The freeNext field is either the index of the next freelist entry,
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 54b7fba05b..38626d5acd 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -31,6 +31,17 @@ typedef struct
proclist_head wakeup;
} ConditionVariable;
+/*
+ * Pad a condition variable to a power-of-two size so that an array of
+ * condition variables does not cross a cache line boundary.
+ */
+#define CV_MINIMAL_SIZE (sizeof(ConditionVariable) <= 16 ? 16 : 32)
+typedef union ConditionVariableMinimallyPadded
+{
+ ConditionVariable cv;
+ char pad[CV_MINIMAL_SIZE];
+} ConditionVariableMinimallyPadded;
+
/* Initialize a condition variable. */
extern void ConditionVariableInit(ConditionVariable *);