* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.217 2005/08/22 00:41:28 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.218 2005/08/22 23:59:04 tgl Exp $
*
*-------------------------------------------------------------------------
*/
* default method. We assume that fsync() is always available, and that
* configure determined whether fdatasync() is.
*/
-#ifdef O_SYNC
-#define CMP_OPEN_SYNC_FLAG O_SYNC
+#if defined(O_SYNC)
+#define BARE_OPEN_SYNC_FLAG O_SYNC
#elif defined(O_FSYNC)
-#define CMP_OPEN_SYNC_FLAG O_FSYNC
+#define BARE_OPEN_SYNC_FLAG O_FSYNC
#endif
-#ifdef CMP_OPEN_SYNC_FLAG
-#define OPEN_SYNC_FLAG (CMP_OPEN_SYNC_FLAG | PG_O_DIRECT)
+#ifdef BARE_OPEN_SYNC_FLAG
+#define OPEN_SYNC_FLAG (BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
#endif
-#ifdef O_DSYNC
-#ifdef OPEN_SYNC_FLAG
+#if defined(O_DSYNC)
+#if defined(OPEN_SYNC_FLAG)
/* Is O_DSYNC a distinct flag from the O_SYNC/O_FSYNC one selected above? */
-#if O_DSYNC != CMP_OPEN_SYNC_FLAG
+#if O_DSYNC != BARE_OPEN_SYNC_FLAG
#define OPEN_DATASYNC_FLAG (O_DSYNC | PG_O_DIRECT)
#endif
#else /* !defined(OPEN_SYNC_FLAG) */
#endif
#endif
-#ifdef OPEN_DATASYNC_FLAG
+#if defined(OPEN_DATASYNC_FLAG)
#define DEFAULT_SYNC_METHOD_STR "open_datasync"
#define DEFAULT_SYNC_METHOD SYNC_METHOD_OPEN
#define DEFAULT_SYNC_FLAGBIT OPEN_DATASYNC_FLAG
static bool XLogCheckBuffer(XLogRecData *rdata,
XLogRecPtr *lsn, BkpBlock *bkpb);
static bool AdvanceXLInsertBuffer(void);
-static void XLogWrite(XLogwrtRqst WriteRqst);
+static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
static int XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
static char *str_time(time_t tnow);
static void issue_xlog_fsync(void);
-/* XLog gather-write stuff */
-typedef struct XLogPages
-{
- char *head; /* Start of first page to write */
- Size size; /* Total bytes to write == count(pages) * BLCKSZ */
- uint32 offset; /* Starting offset in xlog segment file */
-} XLogPages;
-
-static void XLogPageReset(XLogPages *pages);
-static void XLogPageWrite(XLogPages *pages, int index);
-static void XLogPageFlush(XLogPages *pages, int index);
-
#ifdef WAL_DEBUG
static void xlog_outrec(char *buf, XLogRecord *record);
#endif
{
if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
{
+ /*
+ * Since the amount of data we write here is completely optional
+ * anyway, tell XLogWrite it can be "flexible" and stop at a
+ * convenient boundary. This allows writes triggered by this
+ * mechanism to synchronize with the cache boundaries, so that
+ * in a long transaction we'll basically dump alternating halves
+ * of the buffer array.
+ */
LogwrtResult = XLogCtl->Write.LogwrtResult;
if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
- XLogWrite(LogwrtRqst);
+ XLogWrite(LogwrtRqst, true);
LWLockRelease(WALWriteLock);
}
}
WriteRqst.Write = OldPageRqstPtr;
WriteRqst.Flush.xlogid = 0;
WriteRqst.Flush.xrecoff = 0;
- XLogWrite(WriteRqst);
+ XLogWrite(WriteRqst, false);
LWLockRelease(WALWriteLock);
Insert->LogwrtResult = LogwrtResult;
}
/*
* Write and/or fsync the log at least as far as WriteRqst indicates.
*
+ * If flexible == TRUE, we don't have to write as far as WriteRqst, but
+ * may stop at any convenient boundary (such as a cache or logfile boundary).
+ * This option allows us to avoid uselessly issuing multiple writes when a
+ * single one would do.
+ *
* Must be called with WALWriteLock held.
*/
static void
-XLogWrite(XLogwrtRqst WriteRqst)
+XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
{
XLogCtlWrite *Write = &XLogCtl->Write;
bool ispartialpage;
+ bool finishing_seg;
bool use_existent;
- int currentIndex = Write->curridx;
- XLogPages pages;
+ int curridx;
+ int npages;
+ int startidx;
+ uint32 startoffset;
/* We should always be inside a critical section here */
Assert(CritSectionCount > 0);
*/
LogwrtResult = Write->LogwrtResult;
- XLogPageReset(&pages);
+ /*
+ * Since successive pages in the xlog cache are consecutively allocated,
+ * we can usually gather multiple pages together and issue just one
+ * write() call. npages is the number of pages we have determined can
+ * be written together; startidx is the cache block index of the first
+ * one, and startoffset is the file offset at which it should go.
+ * The latter two variables are only valid when npages > 0, but we must
+ * initialize all of them to keep the compiler quiet.
+ */
+ npages = 0;
+ startidx = 0;
+ startoffset = 0;
+
+ /*
+ * Within the loop, curridx is the cache block index of the page to
+ * consider writing. We advance Write->curridx only after successfully
+ * writing pages. (Right now, this refinement is useless since we are
+ * going to PANIC if any error occurs anyway; but someday it may come
+ * in useful.)
+ */
+ curridx = Write->curridx;
while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
{
* end of the last page that's been initialized by
* AdvanceXLInsertBuffer.
*/
- if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[currentIndex]))
+ if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
- XLogCtl->xlblocks[currentIndex].xlogid,
- XLogCtl->xlblocks[currentIndex].xrecoff);
+ XLogCtl->xlblocks[curridx].xlogid,
+ XLogCtl->xlblocks[curridx].xrecoff);
/* Advance LogwrtResult.Write to end of current buffer page */
- LogwrtResult.Write = XLogCtl->xlblocks[currentIndex];
+ LogwrtResult.Write = XLogCtl->xlblocks[curridx];
ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
{
/*
- * Switch to new logfile segment.
+ * Switch to new logfile segment. We cannot have any pending
+ * pages here (since we dump what we have at segment end).
*/
- XLogPageFlush(&pages, currentIndex);
+ Assert(npages == 0);
if (openLogFile >= 0)
{
if (close(openLogFile))
LWLockRelease(ControlFileLock);
}
+ /* Make sure we have the current logfile open */
if (openLogFile < 0)
{
XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
openLogOff = 0;
}
- /* Add a page to buffer */
- XLogPageWrite(&pages, currentIndex);
+ /* Add current page to the set of pending pages-to-dump */
+ if (npages == 0)
+ {
+ /* first page of a new group: record where the group starts */
+ startidx = curridx;
+ startoffset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
+ }
+ npages++;
/*
- * If we just wrote the whole last page of a logfile segment,
- * fsync the segment immediately. This avoids having to go back
- * and re-open prior segments when an fsync request comes along
- * later. Doing it here ensures that one and only one backend will
- * perform this fsync.
- *
- * This is also the right place to notify the Archiver that the
- * segment is ready to copy to archival storage.
+ * Dump the set if this will be the last loop iteration, or if
+ * we are at the last page of the cache area (since the next page
+ * won't be contiguous in memory), or if we are at the end of the
+ * logfile segment.
*/
- if (openLogOff + pages.size >= XLogSegSize && !ispartialpage)
+ finishing_seg = !ispartialpage &&
+ (startoffset + npages * BLCKSZ) >= XLogSegSize;
+
+ if (!XLByteLT(LogwrtResult.Write, WriteRqst.Write) ||
+ curridx == XLogCtl->XLogCacheBlck ||
+ finishing_seg)
{
- XLogPageFlush(&pages, currentIndex);
- issue_xlog_fsync();
- LogwrtResult.Flush = LogwrtResult.Write; /* end of current page */
+ char *from;
+ Size nbytes;
- if (XLogArchivingActive())
- XLogArchiveNotifySeg(openLogId, openLogSeg);
+ /* Need to seek in the file? */
+ if (openLogOff != startoffset)
+ {
+ if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not seek in log file %u, "
+ "segment %u to offset %u: %m",
+ openLogId, openLogSeg, startoffset)));
+ openLogOff = startoffset;
+ }
+
+ /* OK to write the page(s) */
+ from = XLogCtl->pages + startidx * (Size) BLCKSZ;
+ nbytes = npages * (Size) BLCKSZ;
+ errno = 0;
+ if (write(openLogFile, from, nbytes) != nbytes)
+ {
+ /* if write didn't set errno, assume no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write to log file %u, segment %u "
+ "at offset %u length %lu: %m",
+ openLogId, openLogSeg,
+ openLogOff, (unsigned long) nbytes)));
+ }
+
+ /* Update state for write */
+ openLogOff += nbytes;
+ Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
+ npages = 0;
+
+ /*
+ * If we just wrote the whole last page of a logfile segment,
+ * fsync the segment immediately. This avoids having to go back
+ * and re-open prior segments when an fsync request comes along
+ * later. Doing it here ensures that one and only one backend will
+ * perform this fsync.
+ *
+ * This is also the right place to notify the Archiver that the
+ * segment is ready to copy to archival storage.
+ */
+ if (finishing_seg)
+ {
+ issue_xlog_fsync();
+ LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
+
+ if (XLogArchivingActive())
+ XLogArchiveNotifySeg(openLogId, openLogSeg);
+ }
}
if (ispartialpage)
LogwrtResult.Write = WriteRqst.Write;
break;
}
- currentIndex = NextBufIdx(currentIndex);
+ curridx = NextBufIdx(curridx);
+
+ /* If flexible, break out of loop as soon as we wrote something */
+ if (flexible && npages == 0)
+ break;
}
- XLogPageFlush(&pages, currentIndex);
+
+ Assert(npages == 0);
+ Assert(curridx == Write->curridx);
/*
* If asked to flush, do so
WriteRqst.Write = WriteRqstPtr;
WriteRqst.Flush = record;
}
- XLogWrite(WriteRqst);
+ XLogWrite(WriteRqst, false);
}
LWLockRelease(WALWriteLock);
}
errmsg("could not remove file \"%s\": %m",
BACKUP_LABEL_FILE)));
}
-
-
-/* XLog gather-write stuff */
-
-static void
-XLogPageReset(XLogPages *pages)
-{
- memset(pages, 0, sizeof(*pages));
-}
-
-static void
-XLogPageWrite(XLogPages *pages, int index)
-{
- char *page = XLogCtl->pages + index * (Size) BLCKSZ;
- Size size = BLCKSZ;
- uint32 offset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
-
- if (pages->head + pages->size == page &&
- pages->offset + pages->size == offset)
- { /* Pages are continuous. Append new page. */
- pages->size += size;
- }
- else
- { /* Pages are not continuous. Flush and clear. */
- XLogPageFlush(pages, PrevBufIdx(index));
- pages->head = page;
- pages->size = size;
- pages->offset = offset;
- }
-}
-
-static void
-XLogPageFlush(XLogPages *pages, int index)
-{
- if (!pages->head)
- { /* Nothing to write */
- XLogCtl->Write.curridx = index;
- return;
- }
-
- /* Need to seek in the file? */
- if (openLogOff != pages->offset)
- {
- openLogOff = pages->offset;
- if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
- ereport(PANIC,
- (errcode_for_file_access(),
- errmsg("could not seek in log file %u, segment %u to offset %u: %m",
- openLogId, openLogSeg, openLogOff)));
- }
-
- /* OK to write the page */
- errno = 0;
- if (write(openLogFile, pages->head, pages->size) != pages->size)
- {
- /* if write didn't set errno, assume problem is no disk space */
- if (errno == 0)
- errno = ENOSPC;
- ereport(PANIC,
- (errcode_for_file_access(),
- errmsg("could not write to log file %u, segment %u length %u at offset %u: %m",
- openLogId, openLogSeg,
- (unsigned int) pages->size, openLogOff)));
- }
-
- openLogOff += pages->size;
- XLogCtl->Write.curridx = index;
- XLogPageReset(pages);
-}