Rewrite gather-write patch into something less obviously bolted on
author: Tom Lane <tgl@sss.pgh.pa.us>
Mon, 22 Aug 2005 23:59:04 +0000 (23:59 +0000)
committer: Tom Lane <tgl@sss.pgh.pa.us>
Mon, 22 Aug 2005 23:59:04 +0000 (23:59 +0000)
after the fact.  Fix bug with incorrect test for whether we are at end
of logfile segment.  Arrange for writes triggered by XLogInsert's
is-cache-more-than-half-full test to synchronize with the cache boundaries,
so that in long transactions we tend to write alternating halves of the
cache rather than randomly chosen portions of it; this saves one more
write syscall per cache load.

src/backend/access/transam/xlog.c

index e16ac0cb9171fc8f6b1476edca4042f72efeeaac..14490a918e7983b14254947267453c4882216ca6 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.217 2005/08/22 00:41:28 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.218 2005/08/22 23:59:04 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
  * default method.     We assume that fsync() is always available, and that
  * configure determined whether fdatasync() is.
  */
-#ifdef O_SYNC
-#define CMP_OPEN_SYNC_FLAG             O_SYNC
+#if defined(O_SYNC)
+#define BARE_OPEN_SYNC_FLAG            O_SYNC
 #elif defined(O_FSYNC)
-#define CMP_OPEN_SYNC_FLAG             O_FSYNC
+#define BARE_OPEN_SYNC_FLAG            O_FSYNC
 #endif
-#ifdef CMP_OPEN_SYNC_FLAG
-#define OPEN_SYNC_FLAG                 (CMP_OPEN_SYNC_FLAG | PG_O_DIRECT)
+#ifdef BARE_OPEN_SYNC_FLAG
+#define OPEN_SYNC_FLAG                 (BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
 #endif
 
-#ifdef O_DSYNC
-#ifdef OPEN_SYNC_FLAG
+#if defined(O_DSYNC)
+#if defined(OPEN_SYNC_FLAG)
 /* O_DSYNC is distinct? */
-#if O_DSYNC != CMP_OPEN_SYNC_FLAG
+#if O_DSYNC != BARE_OPEN_SYNC_FLAG
 #define OPEN_DATASYNC_FLAG             (O_DSYNC | PG_O_DIRECT)
 #endif
 #else /* !defined(OPEN_SYNC_FLAG) */
@@ -91,7 +91,7 @@
 #endif
 #endif
 
-#ifdef OPEN_DATASYNC_FLAG
+#if defined(OPEN_DATASYNC_FLAG)
 #define DEFAULT_SYNC_METHOD_STR        "open_datasync"
 #define DEFAULT_SYNC_METHOD            SYNC_METHOD_OPEN
 #define DEFAULT_SYNC_FLAGBIT   OPEN_DATASYNC_FLAG
@@ -469,7 +469,7 @@ static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
 static bool XLogCheckBuffer(XLogRecData *rdata,
                                                        XLogRecPtr *lsn, BkpBlock *bkpb);
 static bool AdvanceXLInsertBuffer(void);
-static void XLogWrite(XLogwrtRqst WriteRqst);
+static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
 static int XLogFileInit(uint32 log, uint32 seg,
                         bool *use_existent, bool use_lock);
 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
@@ -497,18 +497,6 @@ static void ReadControlFile(void);
 static char *str_time(time_t tnow);
 static void issue_xlog_fsync(void);
 
-/* XLog gather-write stuff */
-typedef struct XLogPages
-{
-       char    *head;          /* Start of first page to write */
-       Size     size;          /* Total bytes to write == count(pages) * BLCKSZ */
-       uint32   offset;        /* Starting offset in xlog segment file */
-} XLogPages;
-
-static void XLogPageReset(XLogPages *pages);
-static void XLogPageWrite(XLogPages *pages, int index);
-static void XLogPageFlush(XLogPages *pages, int index);
-
 #ifdef WAL_DEBUG
 static void xlog_outrec(char *buf, XLogRecord *record);
 #endif
@@ -726,9 +714,17 @@ begin:;
        {
                if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
                {
+                       /*
+                        * Since the amount of data we write here is completely optional
+                        * anyway, tell XLogWrite it can be "flexible" and stop at a
+                        * convenient boundary.  This allows writes triggered by this
+                        * mechanism to synchronize with the cache boundaries, so that
+                        * in a long transaction we'll basically dump alternating halves
+                        * of the buffer array.
+                        */
                        LogwrtResult = XLogCtl->Write.LogwrtResult;
                        if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
-                               XLogWrite(LogwrtRqst);
+                               XLogWrite(LogwrtRqst, true);
                        LWLockRelease(WALWriteLock);
                }
        }
@@ -1219,7 +1215,7 @@ AdvanceXLInsertBuffer(void)
                                WriteRqst.Write = OldPageRqstPtr;
                                WriteRqst.Flush.xlogid = 0;
                                WriteRqst.Flush.xrecoff = 0;
-                               XLogWrite(WriteRqst);
+                               XLogWrite(WriteRqst, false);
                                LWLockRelease(WALWriteLock);
                                Insert->LogwrtResult = LogwrtResult;
                        }
@@ -1279,16 +1275,24 @@ AdvanceXLInsertBuffer(void)
 /*
  * Write and/or fsync the log at least as far as WriteRqst indicates.
  *
+ * If flexible == TRUE, we don't have to write as far as WriteRqst, but
+ * may stop at any convenient boundary (such as a cache or logfile boundary).
+ * This option allows us to avoid uselessly issuing multiple writes when a
+ * single one would do.
+ *
  * Must be called with WALWriteLock held.
  */
 static void
-XLogWrite(XLogwrtRqst WriteRqst)
+XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 {
        XLogCtlWrite *Write = &XLogCtl->Write;
        bool            ispartialpage;
+       bool            finishing_seg;
        bool            use_existent;
-       int                     currentIndex = Write->curridx;
-       XLogPages       pages;
+       int                     curridx;
+       int                     npages;
+       int                     startidx;
+       uint32          startoffset;
 
        /* We should always be inside a critical section here */
        Assert(CritSectionCount > 0);
@@ -1299,7 +1303,27 @@ XLogWrite(XLogwrtRqst WriteRqst)
         */
        LogwrtResult = Write->LogwrtResult;
 
-       XLogPageReset(&pages);
+       /*
+        * Since successive pages in the xlog cache are consecutively allocated,
+        * we can usually gather multiple pages together and issue just one
+        * write() call.  npages is the number of pages we have determined can
+        * be written together; startidx is the cache block index of the first
+        * one, and startoffset is the file offset at which it should go.
+        * The latter two variables are only valid when npages > 0, but we must
+        * initialize all of them to keep the compiler quiet.
+        */
+       npages = 0;
+       startidx = 0;
+       startoffset = 0;
+
+       /*
+        * Within the loop, curridx is the cache block index of the page to
+        * consider writing.  We advance Write->curridx only after successfully
+        * writing pages.  (Right now, this refinement is useless since we are
+        * going to PANIC if any error occurs anyway; but someday it may come
+        * in useful.)
+        */
+       curridx = Write->curridx;
 
        while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
        {
@@ -1309,22 +1333,23 @@ XLogWrite(XLogwrtRqst WriteRqst)
                 * end of the last page that's been initialized by
                 * AdvanceXLInsertBuffer.
                 */
-               if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[currentIndex]))
+               if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
                        elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
                                 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
-                                XLogCtl->xlblocks[currentIndex].xlogid,
-                                XLogCtl->xlblocks[currentIndex].xrecoff);
+                                XLogCtl->xlblocks[curridx].xlogid,
+                                XLogCtl->xlblocks[curridx].xrecoff);
 
                /* Advance LogwrtResult.Write to end of current buffer page */
-               LogwrtResult.Write = XLogCtl->xlblocks[currentIndex];
+               LogwrtResult.Write = XLogCtl->xlblocks[curridx];
                ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
 
                if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
                {
                        /*
-                        * Switch to new logfile segment.
+                        * Switch to new logfile segment.  We cannot have any pending
+                        * pages here (since we dump what we have at segment end).
                         */
-                       XLogPageFlush(&pages, currentIndex);
+                       Assert(npages == 0);
                        if (openLogFile >= 0)
                        {
                                if (close(openLogFile))
@@ -1391,6 +1416,7 @@ XLogWrite(XLogwrtRqst WriteRqst)
                        LWLockRelease(ControlFileLock);
                }
 
+               /* Make sure we have the current logfile open */
                if (openLogFile < 0)
                {
                        XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
@@ -1398,27 +1424,83 @@ XLogWrite(XLogwrtRqst WriteRqst)
                        openLogOff = 0;
                }
 
-               /* Add a page to buffer */
-               XLogPageWrite(&pages, currentIndex);
+               /* Add current page to the set of pending pages-to-dump */
+               if (npages == 0)
+               {
+                       /* first of group */
+                       startidx = curridx;
+                       startoffset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
+               }
+               npages++;
 
                /*
-                * If we just wrote the whole last page of a logfile segment,
-                * fsync the segment immediately.  This avoids having to go back
-                * and re-open prior segments when an fsync request comes along
-                * later. Doing it here ensures that one and only one backend will
-                * perform this fsync.
-                *
-                * This is also the right place to notify the Archiver that the
-                * segment is ready to copy to archival storage.
+                * Dump the set if this will be the last loop iteration, or if
+                * we are at the last page of the cache area (since the next page
+                * won't be contiguous in memory), or if we are at the end of the
+                * logfile segment.
                 */
-               if (openLogOff + pages.size >= XLogSegSize && !ispartialpage)
+               finishing_seg = !ispartialpage &&
+                       (startoffset + npages * BLCKSZ) >= XLogSegSize;
+
+               if (!XLByteLT(LogwrtResult.Write, WriteRqst.Write) ||
+                       curridx == XLogCtl->XLogCacheBlck ||
+                       finishing_seg)
                {
-                       XLogPageFlush(&pages, currentIndex);
-                       issue_xlog_fsync();
-                       LogwrtResult.Flush = LogwrtResult.Write;        /* end of current page */
+                       char       *from;
+                       Size            nbytes;
 
-                       if (XLogArchivingActive())
-                               XLogArchiveNotifySeg(openLogId, openLogSeg);
+                       /* Need to seek in the file? */
+                       if (openLogOff != startoffset)
+                       {
+                               if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
+                                       ereport(PANIC,
+                                                       (errcode_for_file_access(),
+                                                        errmsg("could not seek in log file %u, "
+                                                                       "segment %u to offset %u: %m",
+                                                                       openLogId, openLogSeg, startoffset)));
+                               openLogOff = startoffset;
+                       }
+
+                       /* OK to write the page(s) */
+                       from = XLogCtl->pages + startidx * (Size) BLCKSZ;
+                       nbytes = npages * (Size) BLCKSZ;
+                       errno = 0;
+                       if (write(openLogFile, from, nbytes) != nbytes)
+                       {
+                               /* if write didn't set errno, assume no disk space */
+                               if (errno == 0)
+                                       errno = ENOSPC;
+                               ereport(PANIC,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not write to log file %u, segment %u "
+                                                               "at offset %u length %lu: %m",
+                                                               openLogId, openLogSeg,
+                                                               openLogOff, (unsigned long) nbytes)));
+                       }
+
+                       /* Update state for write */
+                       openLogOff += nbytes;
+                       Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
+                       npages = 0;
+
+                       /*
+                        * If we just wrote the whole last page of a logfile segment,
+                        * fsync the segment immediately.  This avoids having to go back
+                        * and re-open prior segments when an fsync request comes along
+                        * later. Doing it here ensures that one and only one backend will
+                        * perform this fsync.
+                        *
+                        * This is also the right place to notify the Archiver that the
+                        * segment is ready to copy to archival storage.
+                        */
+                       if (finishing_seg)
+                       {
+                               issue_xlog_fsync();
+                               LogwrtResult.Flush = LogwrtResult.Write;        /* end of page */
+
+                               if (XLogArchivingActive())
+                                       XLogArchiveNotifySeg(openLogId, openLogSeg);
+                       }
                }
 
                if (ispartialpage)
@@ -1427,9 +1509,15 @@ XLogWrite(XLogwrtRqst WriteRqst)
                        LogwrtResult.Write = WriteRqst.Write;
                        break;
                }
-               currentIndex = NextBufIdx(currentIndex);
+               curridx = NextBufIdx(curridx);
+
+               /* If flexible, break out of loop as soon as we wrote something */
+               if (flexible && npages == 0)
+                       break;
        }
-       XLogPageFlush(&pages, currentIndex);
+
+       Assert(npages == 0);
+       Assert(curridx == Write->curridx);
 
        /*
         * If asked to flush, do so
@@ -1572,7 +1660,7 @@ XLogFlush(XLogRecPtr record)
                                WriteRqst.Write = WriteRqstPtr;
                                WriteRqst.Flush = record;
                        }
-                       XLogWrite(WriteRqst);
+                       XLogWrite(WriteRqst, false);
                }
                LWLockRelease(WALWriteLock);
        }
@@ -5898,72 +5986,3 @@ remove_backup_label(void)
                                         errmsg("could not remove file \"%s\": %m",
                                                        BACKUP_LABEL_FILE)));
 }
-
-
-/* XLog gather-write stuff */
-
-static void
-XLogPageReset(XLogPages *pages)
-{
-       memset(pages, 0, sizeof(*pages));
-}
-
-static void
-XLogPageWrite(XLogPages *pages, int index)
-{
-       char *page = XLogCtl->pages + index * (Size) BLCKSZ;
-       Size size = BLCKSZ;
-       uint32 offset = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
-
-       if (pages->head + pages->size == page &&
-               pages->offset + pages->size == offset)
-       {       /* Pages are continuous. Append new page. */
-               pages->size += size;
-       }
-       else
-       {       /* Pages are not continuous. Flush and clear. */
-               XLogPageFlush(pages, PrevBufIdx(index));
-               pages->head = page;
-               pages->size = size;
-               pages->offset = offset;
-       }
-}
-
-static void
-XLogPageFlush(XLogPages *pages, int index)
-{
-       if (!pages->head)
-       {       /* Nothing to write */
-               XLogCtl->Write.curridx = index;
-               return;
-       }
-
-       /* Need to seek in the file? */
-       if (openLogOff != pages->offset)
-       {
-               openLogOff = pages->offset;
-               if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
-                       ereport(PANIC,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not seek in log file %u, segment %u to offset %u: %m",
-                                                       openLogId, openLogSeg, openLogOff)));
-       }
-
-       /* OK to write the page */
-       errno = 0;
-       if (write(openLogFile, pages->head, pages->size) != pages->size)
-       {
-               /* if write didn't set errno, assume problem is no disk space */
-               if (errno == 0)
-                       errno = ENOSPC;
-               ereport(PANIC,
-                               (errcode_for_file_access(),
-                                errmsg("could not write to log file %u, segment %u length %u at offset %u: %m",
-                                               openLogId, openLogSeg,
-                                               (unsigned int) pages->size, openLogOff)));
-       }
-
-       openLogOff += pages->size;
-       XLogCtl->Write.curridx = index;
-       XLogPageReset(pages);
-}