summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- src/backend/access/transam/xlog.c 40
-rw-r--r-- src/include/port/atomics.h 36
2 files changed, 75 insertions, 1 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b4499cda7b3..e3fb26f5abe 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -469,6 +469,7 @@ typedef struct XLogCtlData
XLogRecPtr lastSegSwitchLSN;
/* These are accessed using atomics -- info_lck not needed */
+ pg_atomic_uint64 logInsertResult; /* last byte + 1 inserted to buffers */
pg_atomic_uint64 logWriteResult; /* last byte + 1 written out */
pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */
@@ -1499,6 +1500,7 @@ static XLogRecPtr
WaitXLogInsertionsToFinish(XLogRecPtr upto)
{
uint64 bytepos;
+ XLogRecPtr inserted;
XLogRecPtr reservedUpto;
XLogRecPtr finishedUpto;
XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -1507,6 +1509,14 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
if (MyProc == NULL)
elog(PANIC, "cannot wait without a PGPROC structure");
+ /*
+ * Check if there's any work to do. Use a barrier to ensure we get the
+ * freshest value.
+ */
+ inserted = pg_atomic_read_membarrier_u64(&XLogCtl->logInsertResult);
+ if (upto <= inserted)
+ return inserted;
+
/* Read the current insert position */
SpinLockAcquire(&Insert->insertpos_lck);
bytepos = Insert->CurrBytePos;
@@ -1586,6 +1596,15 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
finishedUpto = insertingat;
}
+
+ /*
+ * Advance the limit we know to have been inserted and return the freshest
+ * value we know of, which might be beyond what we requested if somebody
+ * is concurrently doing this with an 'upto' pointer ahead of us.
+ */
+ finishedUpto = pg_atomic_monotonic_advance_u64(&XLogCtl->logInsertResult,
+ finishedUpto);
+
return finishedUpto;
}
@@ -1727,13 +1746,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
{
char *pdst = dstbuf;
XLogRecPtr recptr = startptr;
+ XLogRecPtr inserted;
Size nbytes = count;
if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
return 0;
Assert(!XLogRecPtrIsInvalid(startptr));
- Assert(startptr + count <= LogwrtResult.Write);
+
+ /*
+ * Caller should ensure that the requested data has been inserted into WAL
+ * buffers before we try to read it.
+ */
+ inserted = pg_atomic_read_u64(&XLogCtl->logInsertResult);
+ if (startptr + count > inserted)
+ ereport(ERROR,
+ errmsg("cannot read past end of generated WAL: requested %X/%X, current position %X/%X",
+ LSN_FORMAT_ARGS(startptr + count),
+ LSN_FORMAT_ARGS(inserted)));
/*
* Loop through the buffers without a lock. For each buffer, atomically
@@ -2571,13 +2601,19 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
{
XLogRecPtr Flush;
XLogRecPtr Write;
+ XLogRecPtr Insert;
Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult);
pg_read_barrier();
Write = pg_atomic_read_u64(&XLogCtl->logWriteResult);
+ pg_read_barrier();
+ Insert = pg_atomic_read_u64(&XLogCtl->logInsertResult);
/* WAL written to disk is always ahead of WAL flushed */
Assert(Write >= Flush);
+
+ /* WAL inserted to buffers is always ahead of WAL written */
+ Assert(Insert >= Write);
}
#endif
}
@@ -4951,6 +4987,7 @@ XLOGShmemInit(void)
SpinLockInit(&XLogCtl->Insert.insertpos_lck);
SpinLockInit(&XLogCtl->info_lck);
+ pg_atomic_init_u64(&XLogCtl->logInsertResult, InvalidXLogRecPtr);
pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
@@ -5979,6 +6016,7 @@ StartupXLOG(void)
* because no other process can be reading or writing WAL yet.
*/
LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+ pg_atomic_write_u64(&XLogCtl->logInsertResult, EndOfLog);
pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog);
pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog);
XLogCtl->LogwrtRqst.Write = EndOfLog;
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index ff47782cdba..78987f3154a 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -570,6 +570,42 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
}
+/*
+ * Monotonically advance *ptr using only atomic operations until it is at
+ * least target_. Returns the latest value observed: target_ if this call
+ * advanced the variable, or the value found there if it was >= target_.
+ *
+ * Full barrier semantics (even when value is unchanged).
+ */
+static inline uint64
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+ uint64 currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+ AssertPointerAlignment(ptr, 8); /* check is compiled out when 64-bit atomics are simulated */
+#endif
+
+ currval = pg_atomic_read_u64_impl(ptr); /* snapshot; also the expected value for the exchange below */
+ if (currval >= target_)
+ {
+ pg_memory_barrier(); /* no exchange happens on this path, so issue the barrier explicitly */
+ return currval;
+ }
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+ AssertPointerAlignment(&currval, 8); /* currval's address is handed to the exchange below */
+#endif
+
+ while (currval < target_)
+ {
+ if (pg_atomic_compare_exchange_u64_impl(ptr, &currval, target_)) /* NOTE(review): presumably refreshes currval on failure -- confirm */
+ break; /* exchange reported success: *ptr now holds target_ */
+ }
+
+ return Max(target_, currval); /* target_ after a successful exchange, else the >= target_ value that ended the loop */
+}
+
#undef INSIDE_ATOMICS_H
#endif /* ATOMICS_H */