Improve scalability of WAL insertions.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 8 Jul 2013 08:23:56 +0000 (11:23 +0300)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 8 Jul 2013 08:23:56 +0000 (11:23 +0300)
This patch replaces WALInsertLock with a number of WAL insertion slots,
allowing multiple backends to insert WAL records to the WAL buffers
concurrently. This is particularly useful for parallel loading large amounts
of data on a system with many CPUs.

This has one user-visible change: switching to a new WAL segment with
pg_switch_xlog() now fills the remaining unused portion of the segment with
zeros. This potentially adds some overhead, but it has been a very common
practice by DBA's to clear the "tail" of the segment with an external
pg_clearxlogtail utility anyway, to make the WAL files compress better.
With this patch, it's no longer necessary to do that.

This patch adds a new GUC, xloginsert_slots, to tune the number of WAL
insertion slots. Performance testing suggests that the default, 8, works
pretty well for all kinds of worklods, but I left the GUC in place to allow
others with different hardware to test that easily. We might want to remove
that before release.

Reviewed by Andres Freund.

src/backend/access/transam/xlog.c
src/backend/storage/lmgr/spin.c
src/backend/utils/misc/guc.c
src/include/access/xlog.h
src/include/access/xlogdefs.h
src/include/storage/lwlock.h

index 77e5c3b5d85ad7907e9b5403bbe93141a297c468..acf0dd187619b03a2fc08387fd61301b05d6dd3c 100644 (file)
@@ -41,6 +41,7 @@
 #include "postmaster/startup.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "storage/barrier.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -83,6 +84,7 @@ int           sync_method = DEFAULT_SYNC_METHOD;
 int            wal_level = WAL_LEVEL_MINIMAL;
 int            CommitDelay = 0;    /* precommit delay in microseconds */
 int            CommitSiblings = 5; /* # concurrent xacts needed to sleep */
+int            num_xloginsert_slots = 8;
 
 #ifdef WAL_DEBUG
 bool       XLOG_DEBUG = false;
@@ -279,8 +281,8 @@ XLogRecPtr  XactLastRecEnd = InvalidXLogRecPtr;
  * (which is almost but not quite the same as a pointer to the most recent
  * CHECKPOINT record). We update this from the shared-memory copy,
  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold the Insert lock).  See XLogInsert for details. We are also allowed
- * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
+ * hold an insertion slot).  See XLogInsert for details.  We are also allowed
+ * to update from XLogCtl->RedoRecPtr if we hold the info_lck;
  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
  * InitXLOGAccess.
  */
@@ -321,7 +323,10 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  * so it's a plain spinlock.  The other locks are held longer (potentially
  * over I/O operations), so we use LWLocks for them.  These locks are:
  *
- * WALInsertLock: must be held to insert a record into the WAL buffers.
+ * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
+ * It is only held while initializing and changing the mapping.  If the
+ * contents of the buffer being replaced haven't been written yet, the mapping
+ * lock is released while the write is done, and reacquired afterwards.
  *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
@@ -348,17 +353,83 @@ typedef struct XLogwrtResult
    XLogRecPtr  Flush;          /* last byte + 1 flushed */
 } XLogwrtResult;
 
+
+/*
+ * A slot for inserting to the WAL. This is similar to an LWLock, the main
+ * difference is that there is an extra xlogInsertingAt field that is protected
+ * by the same mutex. Unlike an LWLock, a slot can only be acquired in
+ * exclusive mode.
+ *
+ * The xlogInsertingAt field is used to advertise to other processes how far
+ * the slot owner has progressed in inserting the record. When a backend
+ * acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't
+ * yet know where it's going to insert the record. That's conservative
+ * but correct; the new insertion is certainly going to go to a byte position
+ * greater than 1. If another backend needs to flush the WAL, it will have to
+ * wait for the new insertion. xlogInsertingAt is updated after finishing the
+ * insert or when crossing a page boundary, which will wake up anyone waiting
+ * for it, whether the wait was necessary in the first place or not.
+ *
+ * A process can wait on a slot in two modes: LW_EXCLUSIVE or
+ * LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is
+ * released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes
+ * waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is
+ * released, or xlogInsertingAt is updated. In other words, a process in
+ * LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress
+ * copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to
+ * the front of the queue, while LW_EXCLUSIVE waiters are appended to the end.
+ *
+ * To join the wait queue, a process must set MyProc->lwWaitMode to the mode
+ * it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head
+ * or tail of the wait queue. The same mechanism is used to wait on an LWLock,
+ * see lwlock.c for details.
+ */
+typedef struct
+{
+   slock_t     mutex;          /* protects the below fields */
+   XLogRecPtr  xlogInsertingAt; /* insert has completed up to this point */
+
+   PGPROC     *owner;          /* for debugging purposes */
+
+   bool        releaseOK;      /* T if ok to release waiters */
+   char        exclusive;      /* # of exclusive holders (0 or 1) */
+   PGPROC     *head;           /* head of list of waiting PGPROCs */
+   PGPROC     *tail;           /* tail of list of waiting PGPROCs */
+   /* tail is undefined when head is NULL */
+} XLogInsertSlot;
+
+/*
+ * All the slots are allocated as an array in shared memory. We force the
+ * array stride to be a power of 2, which saves a few cycles in indexing, but
+ * more importantly also ensures that individual slots don't cross cache line
+ * boundaries. (Of course, we have to also ensure that the array start
+ * address is suitably aligned.)
+ */
+typedef union XLogInsertSlotPadded
+{
+   XLogInsertSlot slot;
+   char        pad[64];
+} XLogInsertSlotPadded;
+
 /*
  * Shared state data for XLogInsert.
  */
 typedef struct XLogCtlInsert
 {
-   XLogRecPtr  PrevRecord;     /* start of previously-inserted record */
-   int         curridx;        /* current block index in cache */
-   XLogPageHeader currpage;    /* points to header of block in cache */
-   char       *currpos;        /* current insertion point in cache */
-   XLogRecPtr  RedoRecPtr;     /* current redo point for insertions */
-   bool        forcePageWrites;    /* forcing full-page writes for PITR? */
+   slock_t     insertpos_lck;  /* protects CurrBytePos and PrevBytePos */
+
+   /*
+    * CurrBytePos is the end of reserved WAL. The next record will be inserted
+    * at that position. PrevBytePos is the start position of the previously
+    * inserted (or rather, reserved) record - it is copied to the the prev-
+    * link of the next record. These are stored as "usable byte positions"
+    * rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
+    */
+   uint64      CurrBytePos;
+   uint64      PrevBytePos;
+
+   /* insertion slots, see above for details */
+   XLogInsertSlotPadded *insertSlots;
 
    /*
     * fullPageWrites is the master copy used by all backends to determine
@@ -366,7 +437,12 @@ typedef struct XLogCtlInsert
     * This is required because, when full_page_writes is changed by SIGHUP,
     * we must WAL-log it before it actually affects WAL-logging by backends.
     * Checkpointer sets at startup or after SIGHUP.
+    *
+    * To read these fields, you must hold an insertion slot. To modify them,
+    * you must hold ALL the slots.
     */
+   XLogRecPtr  RedoRecPtr;     /* current redo point for insertions */
+   bool        forcePageWrites;    /* forcing full-page writes for PITR? */
    bool        fullPageWrites;
 
    /*
@@ -395,11 +471,11 @@ typedef struct XLogCtlWrite
  */
 typedef struct XLogCtlData
 {
-   /* Protected by WALInsertLock: */
    XLogCtlInsert Insert;
 
    /* Protected by info_lck: */
    XLogwrtRqst LogwrtRqst;
+   XLogRecPtr  RedoRecPtr;     /* a recent copy of Insert->RedoRecPtr */
    uint32      ckptXidEpoch;   /* nextXID & epoch of latest checkpoint */
    TransactionId ckptXid;
    XLogRecPtr  asyncXactLSN;   /* LSN of newest async commit/abort */
@@ -419,10 +495,21 @@ typedef struct XLogCtlData
     */
    XLogwrtResult LogwrtResult;
 
+   /*
+    * Latest initialized block index in cache.
+    *
+    * To change curridx and the identity of a buffer, you need to hold
+    * WALBufMappingLock.  To change the identity of a buffer that's still
+    * dirty, the old page needs to be written out first, and for that you
+    * need WALWriteLock, and you need to ensure that there are no in-progress
+    * insertions to the page by calling WaitXLogInsertionsToFinish().
+    */
+   int         curridx;
+
    /*
     * These values do not change after startup, although the pointed-to pages
-    * and xlblocks values certainly do.  Permission to read/write the pages
-    * and xlblocks values depends on WALInsertLock and WALWriteLock.
+    * and xlblocks values certainly do.  xlblock values are protected by
+    * WALBufMappingLock.
     */
    char       *pages;          /* buffers for unwritten XLOG pages */
    XLogRecPtr *xlblocks;       /* 1st byte ptr-s + XLOG_BLCKSZ */
@@ -518,24 +605,34 @@ static XLogCtlData *XLogCtl = NULL;
 static ControlFileData *ControlFile = NULL;
 
 /*
- * Macros for managing XLogInsert state.  In most cases, the calling routine
- * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
- * so these are passed as parameters instead of being fetched via XLogCtl.
+ * Calculate the amount of space left on the page after 'endptr'. Beware
+ * multiple evaluation!
  */
+#define INSERT_FREESPACE(endptr)   \
+   (((endptr) % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr) % XLOG_BLCKSZ))
 
-/* Free space remaining in the current xlog page buffer */
-#define INSERT_FREESPACE(Insert)  \
-   (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
+/* Macro to advance to next buffer index. */
+#define NextBufIdx(idx)        \
+       (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
 
-/* Construct XLogRecPtr value for current insertion point */
-#define INSERT_RECPTR(recptr,Insert,curridx)  \
-       (recptr) = XLogCtl->xlblocks[curridx] - INSERT_FREESPACE(Insert)
+/*
+ * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
+ * would hold if it was in cache, the page containing 'recptr'.
+ *
+ * XLogRecEndPtrToBufIdx is the same, but a pointer to the first byte of a
+ * page is taken to mean the previous page.
+ */
+#define XLogRecPtrToBufIdx(recptr) \
+   (((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
 
-#define PrevBufIdx(idx)        \
-       (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
+#define XLogRecEndPtrToBufIdx(recptr)  \
+   ((((recptr) - 1) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
 
-#define NextBufIdx(idx)        \
-       (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
+/*
+ * These are the number of bytes in a WAL page and segment usable for WAL data.
+ */
+#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
+#define UsableBytesInSegment ((XLOG_SEG_SIZE / XLOG_BLCKSZ) * UsableBytesInPage - (SizeOfXLogLongPHD - SizeOfXLogShortPHD))
 
 /*
  * Private, possibly out-of-date copy of shared LogwrtResult.
@@ -631,6 +728,9 @@ static bool InRedo = false;
 /* Have we launched bgwriter during recovery? */
 static bool bgwriterLaunched = false;
 
+/* For WALInsertSlotAcquire/Release functions */
+static int MySlotNo = 0;
+static bool holdingAllSlots = false;
 
 static void readRecoveryCommandFile(void);
 static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
@@ -651,9 +751,9 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
                XLogRecPtr *lsn, BkpBlock *bkpb);
 static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb,
                         char *blk, bool get_cleanup_lock, bool keep_buffer);
-static bool AdvanceXLInsertBuffer(bool new_segment);
+static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
 static bool XLogCheckpointNeeded(XLogSegNo new_segno);
-static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
+static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
 static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
                       bool find_free, int *max_advance,
                       bool use_lock);
@@ -693,6 +793,24 @@ static bool read_backup_label(XLogRecPtr *checkPointLoc,
 static void rm_redo_error_callback(void *arg);
 static int get_sync_bit(int method);
 
+static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
+                 XLogRecData *rdata,
+                 XLogRecPtr StartPos, XLogRecPtr EndPos);
+static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
+                         XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
+static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
+                 XLogRecPtr *PrevPtr);
+static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
+static void WakeupWaiters(XLogRecPtr EndPos);
+static char *GetXLogBuffer(XLogRecPtr ptr);
+static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
+static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
+static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
+
+static void WALInsertSlotAcquire(bool exclusive);
+static void WALInsertSlotAcquireOne(int slotno);
+static void WALInsertSlotRelease(void);
+static void WALInsertSlotReleaseOne(int slotno);
 
 /*
  * Insert an XLOG record having the specified RMID and info bytes,
@@ -713,10 +831,6 @@ XLogRecPtr
 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
 {
    XLogCtlInsert *Insert = &XLogCtl->Insert;
-   XLogRecPtr  RecPtr;
-   XLogRecPtr  WriteRqst;
-   uint32      freespace;
-   int         curridx;
    XLogRecData *rdt;
    XLogRecData *rdt_lastnormal;
    Buffer      dtbuf[XLR_MAX_BKP_BLOCKS];
@@ -731,11 +845,13 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
    uint32      len,
                write_len;
    unsigned    i;
-   bool        updrqst;
    bool        doPageWrites;
    bool        isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+   bool        inserted;
    uint8       info_orig = info;
    static XLogRecord *rechdr;
+   XLogRecPtr  StartPos;
+   XLogRecPtr  EndPos;
 
    if (rechdr == NULL)
    {
@@ -761,8 +877,8 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
     */
    if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
    {
-       RecPtr = SizeOfXLogLongPHD;     /* start of 1st chkpt record */
-       return RecPtr;
+       EndPos = SizeOfXLogLongPHD;     /* start of 1st chkpt record */
+       return EndPos;
    }
 
    /*
@@ -770,9 +886,9 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
     * up.
     *
     * We may have to loop back to here if a race condition is detected below.
-    * We could prevent the race by doing all this work while holding the
-    * insert lock, but it seems better to avoid doing CRC calculations while
-    * holding the lock.
+    * We could prevent the race by doing all this work while holding an
+    * insertion slot, but it seems better to avoid doing CRC calculations
+    * while holding one.
     *
     * We add entries for backup blocks to the chain, so that they don't need
     * any special treatment in the critical section where the chunks are
@@ -789,8 +905,8 @@ begin:;
    /*
     * Decide if we need to do full-page writes in this XLOG record: true if
     * full_page_writes is on or we have a PITR request for it.  Since we
-    * don't yet have the insert lock, fullPageWrites and forcePageWrites
-    * could change under us, but we'll recheck them once we have the lock.
+    * don't yet have an insertion slot, fullPageWrites and forcePageWrites
+    * could change under us, but we'll recheck them once we have a slot.
     */
    doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
 
@@ -930,25 +1046,60 @@ begin:;
        COMP_CRC32(rdata_crc, rdt->data, rdt->len);
 
    /*
-    * Construct record header (prev-link and CRC are filled in later), and
-    * make that the first chunk in the chain.
+    * Construct record header (prev-link is filled in later, after reserving
+    * the space for the record), and make that the first chunk in the chain.
+    *
+    * The CRC calculated for the header here doesn't include prev-link,
+    * because we don't know it yet. It will be added later.
     */
    rechdr->xl_xid = GetCurrentTransactionIdIfAny();
    rechdr->xl_tot_len = SizeOfXLogRecord + write_len;
    rechdr->xl_len = len;       /* doesn't include backup blocks */
    rechdr->xl_info = info;
    rechdr->xl_rmid = rmid;
+   rechdr->xl_prev = InvalidXLogRecPtr;
+   COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
 
    hdr_rdt.next = rdata;
    hdr_rdt.data = (char *) rechdr;
    hdr_rdt.len = SizeOfXLogRecord;
-
    write_len += SizeOfXLogRecord;
 
+   /*----------
+    *
+    * We have now done all the preparatory work we can without holding a
+    * lock or modifying shared state. From here on, inserting the new WAL
+    * record to the shared WAL buffer cache is a two-step process:
+    *
+    * 1. Reserve the right amount of space from the WAL. The current head of
+    *    reserved space is kept in Insert->CurrBytePos, and is protected by
+    *    insertpos_lck.
+    *
+    * 2. Copy the record to the reserved WAL space. This involves finding the
+    *    correct WAL buffer containing the reserved space, and copying the
+    *    record in place. This can be done concurrently in multiple processes.
+    *
+    * To keep track of which insertions are still in-progress, each concurrent
+    * inserter allocates an "insertion slot", which tells others how far the
+    * inserter has progressed. There is a small fixed number of insertion
+    * slots, determined by the num_xloginsert_slots GUC. When an inserter
+    * finishes, it updates the xlogInsertingAt of its slot to the end of the
+    * record it inserted, to let others know that it's done. xlogInsertingAt
+    * is also updated when crossing over to a new WAL buffer, to allow the
+    * the previous buffer to be flushed.
+    *
+    * Holding onto a slot also protects RedoRecPtr and fullPageWrites from
+    * changing until the insertion is finished.
+    *
+    * Step 2 can usually be done completely in parallel. If the required WAL
+    * page is not initialized yet, you have to grab WALBufMappingLock to
+    * initialize it, but the WAL writer tries to do that ahead of insertions
+    * to avoid that from happening in the critical path.
+    *
+    *----------
+    */
    START_CRIT_SECTION();
-
-   /* Now wait to get insert lock */
-   LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+   WALInsertSlotAcquire(isLogSwitch);
 
    /*
     * Check to see if my RedoRecPtr is out of date.  If so, may have to go
@@ -977,7 +1128,7 @@ begin:;
                     * Oops, this buffer now needs to be backed up, but we
                     * didn't think so above.  Start over.
                     */
-                   LWLockRelease(WALInsertLock);
+                   WALInsertSlotRelease();
                    END_CRIT_SECTION();
                    rdt_lastnormal->next = NULL;
                    info = info_orig;
@@ -996,7 +1147,7 @@ begin:;
    if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
    {
        /* Oops, must redo it with full-page data. */
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
        END_CRIT_SECTION();
        rdt_lastnormal->next = NULL;
        info = info_orig;
@@ -1004,238 +1155,1169 @@ begin:;
    }
 
    /*
-    * If the current page is completely full, the record goes to the next
-    * page, right after the page header.
+    * Reserve space for the record in the WAL. This also sets the xl_prev
+    * pointer.
+    */
+   if (isLogSwitch)
+       inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
+   else
+   {
+       ReserveXLogInsertLocation(write_len, &StartPos, &EndPos,
+                                 &rechdr->xl_prev);
+       inserted = true;
+   }
+
+   if (inserted)
+   {
+       /*
+        * Now that xl_prev has been filled in, finish CRC calculation of the
+        * record header.
+        */
+       COMP_CRC32(rdata_crc, ((char *) &rechdr->xl_prev), sizeof(XLogRecPtr));
+       FIN_CRC32(rdata_crc);
+       rechdr->xl_crc = rdata_crc;
+
+       /*
+        * All the record data, including the header, is now ready to be
+        * inserted. Copy the record in the space reserved.
+        */
+       CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos);
+   }
+   else
+   {
+       /*
+        * This was an xlog-switch record, but the current insert location was
+        * already exactly at the beginning of a segment, so there was no need
+        * to do anything.
+        */
+   }
+
+   /*
+    * Done! Let others know that we're finished.
+    */
+   WALInsertSlotRelease();
+
+   END_CRIT_SECTION();
+
+   /*
+    * Update shared LogwrtRqst.Write, if we crossed page boundary.
+    */
+   if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
+   {
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       /* advance global request to include new block(s) */
+       if (xlogctl->LogwrtRqst.Write < EndPos)
+           xlogctl->LogwrtRqst.Write = EndPos;
+       /* update local result copy while I have the chance */
+       LogwrtResult = xlogctl->LogwrtResult;
+       SpinLockRelease(&xlogctl->info_lck);
+   }
+
+   /*
+    * If this was an XLOG_SWITCH record, flush the record and the empty
+    * padding space that fills the rest of the segment, and perform
+    * end-of-segment actions (eg, notifying archiver).
+    */
+   if (isLogSwitch)
+   {
+       TRACE_POSTGRESQL_XLOG_SWITCH();
+       XLogFlush(EndPos);
+       /*
+        * Even though we reserved the rest of the segment for us, which is
+        * reflected in EndPos, we return a pointer to just the end of the
+        * xlog-switch record.
+        */
+       if (inserted)
+       {
+           EndPos = StartPos + SizeOfXLogRecord;
+           if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
+           {
+               if (EndPos % XLOG_SEG_SIZE == EndPos % XLOG_BLCKSZ)
+                   EndPos += SizeOfXLogLongPHD;
+               else
+                   EndPos += SizeOfXLogShortPHD;
+           }
+       }
+   }
+
+#ifdef WAL_DEBUG
+   if (XLOG_DEBUG)
+   {
+       StringInfoData buf;
+
+       initStringInfo(&buf);
+       appendStringInfo(&buf, "INSERT @ %X/%X: ",
+                        (uint32) (EndPos >> 32), (uint32) EndPos);
+       xlog_outrec(&buf, rechdr);
+       if (rdata->data != NULL)
+       {
+           appendStringInfo(&buf, " - ");
+           RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data);
+       }
+       elog(LOG, "%s", buf.data);
+       pfree(buf.data);
+   }
+#endif
+
+   /*
+    * Update our global variables
+    */
+   ProcLastRecPtr = StartPos;
+   XactLastRecEnd = EndPos;
+
+   return EndPos;
+}
+
+/*
+ * Reserves the right amount of space for a record of given size from the WAL.
+ * *StartPos is set to the beginning of the reserved section, *EndPos to
+ * its end+1. *PrevPtr is set to the beginning of the previous record; it is
+ * used to set the xl_prev of this record.
+ *
+ * This is the performance critical part of XLogInsert that must be serialized
+ * across backends. The rest can happen mostly in parallel. Try to keep this
+ * section as short as possible, insertpos_lck can be heavily contended on a
+ * busy system.
+ *
+ * NB: The space calculation here must match the code in CopyXLogRecordToWAL,
+ * where we actually copy the record to the reserved space.
+ */
+static void
+ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
+                         XLogRecPtr *PrevPtr)
+{
+   volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+   uint64      startbytepos;
+   uint64      endbytepos;
+   uint64      prevbytepos;
+
+   size = MAXALIGN(size);
+
+   /* All (non xlog-switch) records should contain data. */
+   Assert(size > SizeOfXLogRecord);
+
+   /*
+    * The duration the spinlock needs to be held is minimized by minimizing
+    * the calculations that have to be done while holding the lock. The
+    * current tip of reserved WAL is kept in CurrBytePos, as a byte position
+    * that only counts "usable" bytes in WAL, that is, it excludes all WAL
+    * page headers. The mapping between "usable" byte positions and physical
+    * positions (XLogRecPtrs) can be done outside the locked region, and
+    * because the usable byte position doesn't include any headers, reserving
+    * X bytes from WAL is almost as simple as "CurrBytePos += X".
+    */
+   SpinLockAcquire(&Insert->insertpos_lck);
+
+   startbytepos = Insert->CurrBytePos;
+   endbytepos = startbytepos + size;
+   prevbytepos = Insert->PrevBytePos;
+   Insert->CurrBytePos = endbytepos;
+   Insert->PrevBytePos = startbytepos;
+
+   SpinLockRelease(&Insert->insertpos_lck);
+
+   *StartPos = XLogBytePosToRecPtr(startbytepos);
+   *EndPos = XLogBytePosToEndRecPtr(endbytepos);
+   *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
+
+   /*
+    * Check that the conversions between "usable byte positions" and
+    * XLogRecPtrs work consistently in both directions.
+    */
+   Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+   Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+   Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
+}
+
+/*
+ * Like ReserveXLogInsertLocation(), but for an xlog-switch record.
+ *
+ * A log-switch record is handled slightly differently. The rest of the
+ * segment will be reserved for this insertion, as indicated by the returned
+ * *EndPos_p value. However, if we are already at the beginning of the current
+ * segment, *StartPos_p and *EndPos_p are set to the current location without
+ * reserving any space, and the function returns false.
+*/
+static bool
+ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
+{
+   volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+   uint64      startbytepos;
+   uint64      endbytepos;
+   uint64      prevbytepos;
+   uint32      size = SizeOfXLogRecord;
+   XLogRecPtr  ptr;
+   uint32      segleft;
+
+   /*
+    * These calculations are a bit heavy-weight to be done while holding a
+    * spinlock, but since we're holding all the WAL insertion slots, there
+    * are no other inserters competing for it. GetXLogInsertRecPtr() does
+    * compete for it, but that's not called very frequently.
+    */
+   SpinLockAcquire(&Insert->insertpos_lck);
+
+   startbytepos = Insert->CurrBytePos;
+
+   ptr = XLogBytePosToEndRecPtr(startbytepos);
+   if (ptr % XLOG_SEG_SIZE == 0)
+   {
+       SpinLockRelease(&Insert->insertpos_lck);
+       *EndPos = *StartPos = ptr;
+       return false;
+   }
+
+   endbytepos = startbytepos + size;
+   prevbytepos = Insert->PrevBytePos;
+
+   *StartPos = XLogBytePosToRecPtr(startbytepos);
+   *EndPos = XLogBytePosToEndRecPtr(endbytepos);
+
+   segleft = XLOG_SEG_SIZE - ((*EndPos) % XLOG_SEG_SIZE);
+   if (segleft != XLOG_SEG_SIZE)
+   {
+       /* consume the rest of the segment */
+       *EndPos += segleft;
+       endbytepos = XLogRecPtrToBytePos(*EndPos);
+   }
+   Insert->CurrBytePos = endbytepos;
+   Insert->PrevBytePos = startbytepos;
+
+   SpinLockRelease(&Insert->insertpos_lck);
+
+   *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
+
+   Assert((*EndPos) % XLOG_SEG_SIZE == 0);
+   Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
+   Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
+   Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
+
+   return true;
+}
+
+/*
+ * Subroutine of XLogInsert.  Copies a WAL record to an already-reserved
+ * area in the WAL.
+ */
+static void
+CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
+                   XLogRecPtr StartPos, XLogRecPtr EndPos)
+{
+   char       *currpos;
+   int         freespace;
+   int         written;
+   XLogRecPtr  CurrPos;
+   XLogPageHeader pagehdr;
+
+   /* The first chunk is the record header */
+   Assert(rdata->len == SizeOfXLogRecord);
+
+   /*
+    * Get a pointer to the right place in the right WAL buffer to start
+    * inserting to.
+    */
+   CurrPos = StartPos;
+   currpos = GetXLogBuffer(CurrPos);
+   freespace = INSERT_FREESPACE(CurrPos);
+
+   /*
+    * there should be enough space for at least the first field (xl_tot_len)
+    * on this page.
+    */
+   Assert(freespace >= sizeof(uint32));
+
+   /* Copy record data */
+   written = 0;
+   while (rdata != NULL)
+   {
+       char       *rdata_data = rdata->data;
+       int         rdata_len = rdata->len;
+
+       while (rdata_len > freespace)
+       {
+           /*
+            * Write what fits on this page, and continue on the next page.
+            */
+           Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0);
+           memcpy(currpos, rdata_data, freespace);
+           rdata_data += freespace;
+           rdata_len -= freespace;
+           written += freespace;
+           CurrPos += freespace;
+
+           /*
+            * Get pointer to beginning of next page, and set the xlp_rem_len
+            * in the page header. Set XLP_FIRST_IS_CONTRECORD.
+            *
+            * It's safe to set the contrecord flag and xlp_rem_len without a
+            * lock on the page. All the other flags were already set when the
+            * page was initialized, in AdvanceXLInsertBuffer, and we're the
+            * only backend that needs to set the contrecord flag.
+            */
+           currpos = GetXLogBuffer(CurrPos);
+           pagehdr = (XLogPageHeader) currpos;
+           pagehdr->xlp_rem_len = write_len - written;
+           pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
+
+           /* skip over the page header */
+           if (CurrPos % XLogSegSize == 0)
+           {
+               CurrPos += SizeOfXLogLongPHD;
+               currpos += SizeOfXLogLongPHD;
+           }
+           else
+           {
+               CurrPos += SizeOfXLogShortPHD;
+               currpos += SizeOfXLogShortPHD;
+           }
+           freespace = INSERT_FREESPACE(CurrPos);
+       }
+
+       Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
+       memcpy(currpos, rdata_data, rdata_len);
+       currpos += rdata_len;
+       CurrPos += rdata_len;
+       freespace -= rdata_len;
+       written += rdata_len;
+
+       rdata = rdata->next;
+   }
+   Assert(written == write_len);
+
+   /* Align the end position, so that the next record starts aligned */
+   CurrPos = MAXALIGN(CurrPos);
+
+   /*
+    * If this was an xlog-switch, it's not enough to write the switch record,
+    * we also have to consume all the remaining space in the WAL segment.
+    * We have already reserved it for us, but we still need to make sure it's
+    * allocated and zeroed in the WAL buffers so that when the caller (or
+    * someone else) does XLogWrite(), it can really write out all the zeros.
+    */
+   if (isLogSwitch && CurrPos % XLOG_SEG_SIZE != 0)
+   {
+       /* An xlog-switch record doesn't contain any data besides the header */
+       Assert(write_len == SizeOfXLogRecord);
+
+       /*
+        * We do this one page at a time, to make sure we don't deadlock
+        * against ourselves if wal_buffers < XLOG_SEG_SIZE.
+        */
+       Assert(EndPos % XLogSegSize == 0);
+
+       /* Use up all the remaining space on the first page */
+       CurrPos += freespace;
+
+       while (CurrPos < EndPos)
+       {
+           /* initialize the next page (if not initialized already) */
+           WakeupWaiters(CurrPos);
+           AdvanceXLInsertBuffer(CurrPos, false);
+           CurrPos += XLOG_BLCKSZ;
+       }
+   }
+
+   if (CurrPos != EndPos)
+       elog(PANIC, "space reserved for WAL record does not match what was written");
+}
+
+/*
+ * Allocate a slot for insertion.
+ *
+ * In exclusive mode, all slots are reserved for the current process. That
+ * blocks all concurrent insertions.
+ */
+static void
+WALInsertSlotAcquire(bool exclusive)
+{
+   int         i;
+
+   if (exclusive)
+   {
+       for (i = 0; i < num_xloginsert_slots; i++)
+           WALInsertSlotAcquireOne(i);
+       holdingAllSlots = true;
+   }
+   else
+       WALInsertSlotAcquireOne(-1);
+}
+
+/*
+ * Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary
+ * one if slotno == -1. The index of the slot that was acquired is stored in
+ * MySlotNo.
+ *
+ * This is more or less equivalent to LWLockAcquire().
+ */
+static void
+WALInsertSlotAcquireOne(int slotno)
+{
+   volatile XLogInsertSlot *slot;
+   PGPROC     *proc = MyProc;
+   bool        retry = false;
+   int         extraWaits = 0;
+   static int  slotToTry = -1;
+
+   /*
+    * Try to use the slot we used last time. If the system isn't particularly
+    * busy, it's a good bet that it's available, and it's good to have some
+    * affinity to a particular slot so that you don't unnecessarily bounce
+    * cache lines between processes when there is no contention.
+    *
+    * If this is the first time through in this backend, pick a slot
+    * (semi-)randomly. This allows the slots to be used evenly if you have a
+    * lot of very short connections.
+    */
+   if (slotno != -1)
+       MySlotNo = slotno;
+   else
+   {
+       if (slotToTry == -1)
+           slotToTry = MyProc->pgprocno % num_xloginsert_slots;
+       MySlotNo = slotToTry;
+   }
+
+   /*
+    * We can't wait if we haven't got a PGPROC.  This should only occur
+    * during bootstrap or shared memory initialization.  Put an Assert here
+    * to catch unsafe coding practices.
+    */
+   Assert(MyProc != NULL);
+
+   /*
+    * Lock out cancel/die interrupts until we exit the code section protected
+    * by the slot.  This ensures that interrupts will not interfere with
+    * manipulations of data structures in shared memory.
+    */
+   START_CRIT_SECTION();
+
+   /*
+    * Loop here to try to acquire slot after each time we are signaled by
+    * WALInsertSlotRelease.
+    */
+   for (;;)
+   {
+       bool        mustwait;
+
+       slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
+
+       /* Acquire mutex.  Time spent holding mutex should be short! */
+       SpinLockAcquire(&slot->mutex);
+
+       /* If retrying, allow WALInsertSlotRelease to release waiters again */
+       if (retry)
+           slot->releaseOK = true;
+
+       /* If I can get the slot, do so quickly. */
+       if (slot->exclusive == 0)
+       {
+           slot->exclusive++;
+           mustwait = false;
+       }
+       else
+           mustwait = true;
+
+       if (!mustwait)
+           break;              /* got the lock */
+
+       Assert(slot->owner != MyProc);
+
+       /*
+        * Add myself to wait queue.
+        */
+       proc->lwWaiting = true;
+       proc->lwWaitMode = LW_EXCLUSIVE;
+       proc->lwWaitLink = NULL;
+       if (slot->head == NULL)
+           slot->head = proc;
+       else
+           slot->tail->lwWaitLink = proc;
+       slot->tail = proc;
+
+       /* Can release the mutex now */
+       SpinLockRelease(&slot->mutex);
+
+       /*
+        * Wait until awakened.
+        *
+        * Since we share the process wait semaphore with the regular lock
+        * manager and ProcWaitForSignal, and we may need to acquire a slot
+        * while one of those is pending, it is possible that we get awakened
+        * for a reason other than being signaled by WALInsertSlotRelease. If
+        * so, loop back and wait again.  Once we've gotten the slot,
+        * re-increment the sema by the number of additional signals received,
+        * so that the lock manager or signal manager will see the received
+        * signal when it next waits.
+        */
+       for (;;)
+       {
+           /* "false" means cannot accept cancel/die interrupt here. */
+           PGSemaphoreLock(&proc->sem, false);
+           if (!proc->lwWaiting)
+               break;
+           extraWaits++;
+       }
+
+       /* Now loop back and try to acquire lock again. */
+       retry = true;
+   }
+
+   slot->owner = proc;
+
+   /*
+    * Normally, we initialize the xlogInsertingAt value of the slot to 1,
+    * because we don't yet know where in the WAL we're going to insert. It's
+    * not critical what it points to right now - leaving it to a too small
+    * value just means that WaitXlogInsertionsToFinish() might wait on us
+    * unnecessarily, until we update the value (when we finish the insert or
+    * move to next page).
+    *
+    * If we're grabbing all the slots, however, stamp all but the last one
+    * with InvalidXLogRecPtr, meaning there is no insert in progress. The last
+    * slot is the one that we will update as we proceed with the insert, the
+    * rest are held just to keep off other inserters.
+    */
+   if (slotno != -1 && slotno != num_xloginsert_slots - 1)
+       slot->xlogInsertingAt = InvalidXLogRecPtr;
+   else
+       slot->xlogInsertingAt = 1;
+
+   /* We are done updating shared state of the slot itself. */
+   SpinLockRelease(&slot->mutex);
+
+   /*
+    * Fix the process wait semaphore's count for any absorbed wakeups.
+    */
+   while (extraWaits-- > 0)
+       PGSemaphoreUnlock(&proc->sem);
+
+   /*
+    * If we couldn't get the slot immediately, try another slot next time.
+    * On a system with more insertion slots than concurrent inserters, this
+    * causes all the inserters to eventually migrate to a slot that no-one
+    * else is using. On a system with more inserters than slots, it still
+    * causes the inserters to be distributed quite evenly across the slots.
+    */
+   if (slotno != -1 && retry)
+       slotToTry = (slotToTry + 1) % num_xloginsert_slots;
+}
+
+/*
+ * Wait for the given slot to become free, or for its xlogInsertingAt location
+ * to change to something else than 'waitptr'. In other words, wait for the
+ * inserter using the given slot to finish its insertion, or to at least make
+ * some progress.
+ */
+static void
+WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr)
+{
+   PGPROC     *proc = MyProc;
+   int         extraWaits = 0;
+
+   /*
+    * Lock out cancel/die interrupts while we sleep on the slot. There is
+    * no cleanup mechanism to remove us from the wait queue if we got
+    * interrupted.
+    */
+   HOLD_INTERRUPTS();
+
+   /*
+    * Loop here to try to acquire lock after each time we are signaled.
+    */
+   for (;;)
+   {
+       bool        mustwait;
+
+       /* Acquire mutex.  Time spent holding mutex should be short! */
+       SpinLockAcquire(&slot->mutex);
+
+       /* If I can get the lock, do so quickly. */
+       if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr)
+           mustwait = false;
+       else
+           mustwait = true;
+
+       if (!mustwait)
+           break;              /* the lock was free */
+
+       Assert(slot->owner != MyProc);
+
+       /*
+        * Add myself to wait queue.
+        */
+       proc->lwWaiting = true;
+       proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
+       proc->lwWaitLink = NULL;
+
+       /* waiters are added to the front of the queue */
+       proc->lwWaitLink = slot->head;
+       if (slot->head == NULL)
+           slot->tail = proc;
+       slot->head = proc;
+
+       /* Can release the mutex now */
+       SpinLockRelease(&slot->mutex);
+
+       /*
+        * Wait until awakened.
+        *
+        * Since we share the process wait semaphore with other things, like
+        * the regular lock manager and ProcWaitForSignal, and we may need to
+        * acquire an LWLock while one of those is pending, it is possible that
+        * we get awakened for a reason other than being signaled by
+        * LWLockRelease. If so, loop back and wait again.  Once we've gotten
+        * the LWLock, re-increment the sema by the number of additional
+        * signals received, so that the lock manager or signal manager will
+        * see the received signal when it next waits.
+        */
+       for (;;)
+       {
+           /* "false" means cannot accept cancel/die interrupt here. */
+           PGSemaphoreLock(&proc->sem, false);
+           if (!proc->lwWaiting)
+               break;
+           extraWaits++;
+       }
+
+       /* Now loop back and try to acquire lock again. */
+   }
+
+   /* We are done updating shared state of the lock itself. */
+   SpinLockRelease(&slot->mutex);
+
+   /*
+    * Fix the process wait semaphore's count for any absorbed wakeups.
+    */
+   while (extraWaits-- > 0)
+       PGSemaphoreUnlock(&proc->sem);
+
+   /*
+    * Now okay to allow cancel/die interrupts.
+    */
+   RESUME_INTERRUPTS();
+}
+
+/*
+ * Wake up all processes waiting for us with WaitOnSlot(). Sets our
+ * xlogInsertingAt value to EndPos, without releasing the slot.
+ */
+static void
+WakeupWaiters(XLogRecPtr EndPos)
+{
+   volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
+   PGPROC     *head;
+   PGPROC     *proc;
+   PGPROC     *next;
+
+   /*
+    * If we have already reported progress up to the same point, do nothing.
+    * No other process can modify xlogInsertingAt, so we can check this before
+    * grabbing the spinlock.
+    */
+   if (slot->xlogInsertingAt == EndPos)
+       return;
+   /* xlogInsertingAt should not go backwards */
+   Assert(slot->xlogInsertingAt < EndPos);
+
+   /* Acquire mutex.  Time spent holding mutex should be short! */
+   SpinLockAcquire(&slot->mutex);
+
+   /* we should own the slot */
+   Assert(slot->exclusive == 1 && slot->owner == MyProc);
+
+   slot->xlogInsertingAt = EndPos;
+
+   /*
+    * See if there are any waiters that need to be woken up.
+    */
+   head = slot->head;
+
+   if (head != NULL)
+   {
+       proc = head;
+
+       /* LW_WAIT_UNTIL_FREE waiters are always in the front of the queue */
+       next = proc->lwWaitLink;
+       while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
+       {
+           proc = next;
+           next = next->lwWaitLink;
+       }
+
+       /* proc is now the last PGPROC to be released */
+       slot->head = next;
+       proc->lwWaitLink = NULL;
+   }
+
+   /* We are done updating shared state of the lock itself. */
+   SpinLockRelease(&slot->mutex);
+
+   /*
+    * Awaken any waiters I removed from the queue.
+    */
+   while (head != NULL)
+   {
+       proc = head;
+       head = proc->lwWaitLink;
+       proc->lwWaitLink = NULL;
+       proc->lwWaiting = false;
+       PGSemaphoreUnlock(&proc->sem);
+   }
+}
+
+/*
+ * Release our insertion slot (or slots, if we're holding them all).
+ */
+static void
+WALInsertSlotRelease(void)
+{
+   int         i;
+
+   if (holdingAllSlots)
+   {
+       for (i = 0; i < num_xloginsert_slots; i++)
+           WALInsertSlotReleaseOne(i);
+       holdingAllSlots = false;
+   }
+   else
+       WALInsertSlotReleaseOne(MySlotNo);
+}
+
+static void
+WALInsertSlotReleaseOne(int slotno)
+{
+   volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot;
+   PGPROC     *head;
+   PGPROC     *proc;
+
+   /* Acquire mutex.  Time spent holding mutex should be short! */
+   SpinLockAcquire(&slot->mutex);
+
+   /* we must be holding it */
+   Assert(slot->exclusive == 1 && slot->owner == MyProc);
+
+   slot->xlogInsertingAt = InvalidXLogRecPtr;
+
+   /* Release my hold on the slot */
+   slot->exclusive = 0;
+   slot->owner = NULL;
+
+   /*
+    * See if I need to awaken any waiters..
+    */
+   head = slot->head;
+   if (head != NULL)
+   {
+       if (slot->releaseOK)
+       {
+           /*
+            * Remove the to-be-awakened PGPROCs from the queue.
+            */
+           bool        releaseOK = true;
+
+           proc = head;
+
+           /*
+            * First wake up any backends that want to be woken up without
+            * acquiring the lock. These are always in the front of the queue.
+            */
+           while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
+               proc = proc->lwWaitLink;
+
+           /*
+            * Awaken the first exclusive-waiter, if any.
+            */
+           if (proc->lwWaitLink)
+           {
+               Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE);
+               proc = proc->lwWaitLink;
+               releaseOK = false;
+           }
+           /* proc is now the last PGPROC to be released */
+           slot->head = proc->lwWaitLink;
+           proc->lwWaitLink = NULL;
+
+           slot->releaseOK = releaseOK;
+       }
+       else
+           head = NULL;
+   }
+
+   /* We are done updating shared state of the slot itself. */
+   SpinLockRelease(&slot->mutex);
+
+   /*
+    * Awaken any waiters I removed from the queue.
+    */
+   while (head != NULL)
+   {
+       proc = head;
+       head = proc->lwWaitLink;
+       proc->lwWaitLink = NULL;
+       proc->lwWaiting = false;
+       PGSemaphoreUnlock(&proc->sem);
+   }
+
+   /*
+    * Now okay to allow cancel/die interrupts.
+    */
+   END_CRIT_SECTION();
+}
+
+
+/*
+ * Wait for any WAL insertions < upto to finish.
+ *
+ * Returns the location of the oldest insertion that is still in-progress.
+ * Any WAL prior to that point has been fully copied into WAL buffers, and
+ * can be flushed out to disk. Because this waits for any insertions older
+ * than 'upto' to finish, the return value is always >= 'upto'.
+ *
+ * Note: When you are about to write out WAL, you must call this function
+ * *before* acquiring WALWriteLock, to avoid deadlocks. This function might
+ * need to wait for an insertion to finish (or at least advance to next
+ * uninitialized page), and the inserter might need to evict an old WAL buffer
+ * to make room for a new one, which in turn requires WALWriteLock.
+ */
+static XLogRecPtr
+WaitXLogInsertionsToFinish(XLogRecPtr upto)
+{
+   uint64      bytepos;
+   XLogRecPtr  reservedUpto;
+   XLogRecPtr  finishedUpto;
+   volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+   int         i;
+
+   if (MyProc == NULL)
+       elog(PANIC, "cannot wait without a PGPROC structure");
+
+   /* Read the current insert position */
+   SpinLockAcquire(&Insert->insertpos_lck);
+   bytepos = Insert->CurrBytePos;
+   SpinLockRelease(&Insert->insertpos_lck);
+   reservedUpto = XLogBytePosToEndRecPtr(bytepos);
+
+   /*
+    * No-one should request to flush a piece of WAL that hasn't even been
+    * reserved yet. However, it can happen if there is a block with a bogus
+    * LSN on disk, for example. XLogFlush checks for that situation and
+    * complains, but only after the flush. Here we just assume that to mean
+    * that all WAL that has been reserved needs to be finished. In this
+    * corner-case, the return value can be smaller than 'upto' argument.
+    */
+   if (upto > reservedUpto)
+   {
+       elog(LOG, "request to flush past end of generated WAL; request %X/%X, currpos %X/%X",
+            (uint32) (upto >> 32), (uint32) upto,
+            (uint32) (reservedUpto >> 32), (uint32) reservedUpto);
+       upto = reservedUpto;
+   }
+
+   /*
+    * finishedUpto is our return value, indicating the point upto which
+    * all the WAL insertions have been finished. Initialize it to the head
+    * of reserved WAL, and as we iterate through the insertion slots, back it
+    * out for any insertion that's still in progress.
+    */
+   finishedUpto = reservedUpto;
+
+   /*
+    * Loop through all the slots, sleeping on any in-progress insert older
+    * than 'upto'.
+    */
+   for (i = 0; i < num_xloginsert_slots; i++)
+   {
+       volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
+       XLogRecPtr insertingat;
+
+   retry:
+       /*
+        * We can check if the slot is in use without grabbing the spinlock.
+        * The spinlock acquisition of insertpos_lck before this loop acts
+        * as a memory barrier. If someone acquires the slot after that, it
+        * can't possibly be inserting to anything < reservedUpto. If it was
+        * acquired before that, an unlocked test will return true.
+        */
+       if (!slot->exclusive)
+           continue;
+
+       SpinLockAcquire(&slot->mutex);
+       /* re-check now that we have the lock */
+       if (!slot->exclusive)
+       {
+           SpinLockRelease(&slot->mutex);
+           continue;
+       }
+       insertingat = slot->xlogInsertingAt;
+       SpinLockRelease(&slot->mutex);
+
+       if (insertingat == InvalidXLogRecPtr)
+       {
+           /*
+            * slot is reserved just to hold off other inserters, there is no
+            * actual insert in progress.
+            */
+           continue;
+       }
+
+       /*
+        * This insertion is still in progress. Do we need to wait for it?
+        *
+        * When an inserter acquires a slot, it doesn't reset 'insertingat', so
+        * it will initially point to the old value of some already-finished
+        * insertion. The inserter will update the value as soon as it finishes
+        * the insertion, moves to the next page, or has to do I/O to flush an
+        * old dirty buffer. That means that when we see a slot with
+        * insertingat value < upto, we don't know if that insertion is still
+        * truly in progress, or if the slot is reused by a new inserter that
+        * hasn't updated the insertingat value yet. We have to assume it's the
+        * latter, and wait.
+        */
+       if (insertingat < upto)
+       {
+           WaitOnSlot(slot, insertingat);
+           goto retry;
+       }
+       else
+       {
+           /*
+            * We don't need to wait for this insertion, but update the
+            * return value.
+            */
+           if (insertingat < finishedUpto)
+               finishedUpto = insertingat;
+       }
+   }
+   return finishedUpto;
+}
+
+/*
+ * Get a pointer to the right location in the WAL buffer containing the
+ * given XLogRecPtr.
+ *
+ * If the page is not initialized yet, it is initialized. That might require
+ * evicting an old dirty buffer from the buffer cache, which means I/O.
+ *
+ * The caller must ensure that the page containing the requested location
+ * isn't evicted yet, and won't be evicted. The way to ensure that is to
+ * hold onto an XLogInsertSlot with the xlogInsertingAt position set to
+ * something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs
+ * to evict an old page from the buffer. (This means that once you call
+ * GetXLogBuffer() with a given 'ptr', you must not access anything before
+ * that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
+ * later, because older buffers might be recycled already)
+ */
+static char *
+GetXLogBuffer(XLogRecPtr ptr)
+{
+   int         idx;
+   XLogRecPtr  endptr;
+   static uint64 cachedPage = 0;
+   static char *cachedPos = NULL;
+   XLogRecPtr  expectedEndPtr;
+
+   /*
+    * Fast path for the common case that we need to access again the same
+    * page as last time.
     */
-   updrqst = false;
-   freespace = INSERT_FREESPACE(Insert);
-   if (freespace == 0)
+   if (ptr / XLOG_BLCKSZ == cachedPage)
    {
-       updrqst = AdvanceXLInsertBuffer(false);
-       freespace = INSERT_FREESPACE(Insert);
+       Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+       Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
+       return cachedPos + ptr % XLOG_BLCKSZ;
    }
 
-   /* Compute record's XLOG location */
-   curridx = Insert->curridx;
-   INSERT_RECPTR(RecPtr, Insert, curridx);
+   /*
+    * The XLog buffer cache is organized so that a page is always loaded
+    * to a particular buffer.  That way we can easily calculate the buffer
+    * a given page must be loaded into, from the XLogRecPtr alone.
+    */
+   idx = XLogRecPtrToBufIdx(ptr);
 
    /*
-    * If the record is an XLOG_SWITCH, and we are exactly at the start of a
-    * segment, we need not insert it (and don't want to because we'd like
-    * consecutive switch requests to be no-ops).  Instead, make sure
-    * everything is written and flushed through the end of the prior segment,
-    * and return the prior segment's end address.
+    * See what page is loaded in the buffer at the moment. It could be the
+    * page we're looking for, or something older. It can't be anything newer
+    * - that would imply the page we're looking for has already been written
+    * out to disk and evicted, and the caller is responsible for making sure
+    * that doesn't happen.
+    *
+    * However, we don't hold a lock while we read the value. If someone has
+    * just initialized the page, it's possible that we get a "torn read" of
+    * the XLogRecPtr if 64-bit fetches are not atomic on this platform. In
+    * that case we will see a bogus value. That's ok, we'll grab the mapping
+    * lock (in AdvanceXLInsertBuffer) and retry if we see anything else than
+    * the page we're looking for. But it means that when we do this unlocked
+    * read, we might see a value that appears to be ahead of the page we're
+    * looking for. Don't PANIC on that, until we've verified the value while
+    * holding the lock.
     */
-   if (isLogSwitch && (RecPtr % XLogSegSize) == SizeOfXLogLongPHD)
+   expectedEndPtr = ptr;
+   expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+
+   endptr = XLogCtl->xlblocks[idx];
+   if (expectedEndPtr != endptr)
    {
-       /* We can release insert lock immediately */
-       LWLockRelease(WALInsertLock);
+       /*
+        * Let others know that we're finished inserting the record up
+        * to the page boundary.
+        */
+       WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ);
 
-       RecPtr -= SizeOfXLogLongPHD;
+       AdvanceXLInsertBuffer(ptr, false);
+       endptr = XLogCtl->xlblocks[idx];
 
-       LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-       LogwrtResult = XLogCtl->LogwrtResult;
-       if (LogwrtResult.Flush < RecPtr)
-       {
-           XLogwrtRqst FlushRqst;
+       if (expectedEndPtr != endptr)
+           elog(PANIC, "could not find WAL buffer for %X/%X",
+                (uint32) (ptr >> 32) , (uint32) ptr);
+   }
+   else
+   {
+       /*
+        * Make sure the initialization of the page is visible to us, and
+        * won't arrive later to overwrite the WAL data we write on the page.
+        */
+       pg_memory_barrier();
+   }
 
-           FlushRqst.Write = RecPtr;
-           FlushRqst.Flush = RecPtr;
-           XLogWrite(FlushRqst, false, false);
-       }
-       LWLockRelease(WALWriteLock);
+   /*
+    * Found the buffer holding this page. Return a pointer to the right
+    * offset within the page.
+    */
+   cachedPage = ptr / XLOG_BLCKSZ;
+   cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
 
-       END_CRIT_SECTION();
+   Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
+   Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
 
-       /* wake up walsenders now that we've released heavily contended locks */
-       WalSndWakeupProcessRequests();
-       return RecPtr;
-   }
+   return cachedPos + ptr % XLOG_BLCKSZ;
+}
 
-   /* Finish the record header */
-   rechdr->xl_prev = Insert->PrevRecord;
+/*
+ * Converts a "usable byte position" to XLogRecPtr. A usable byte position
+ * is the position starting from the beginning of WAL, excluding all WAL
+ * page headers.
+ */
+static XLogRecPtr
+XLogBytePosToRecPtr(uint64 bytepos)
+{
+   uint64      fullsegs;
+   uint64      fullpages;
+   uint64      bytesleft;
+   uint32      seg_offset;
+   XLogRecPtr  result;
 
-   /* Now we can finish computing the record's CRC */
-   COMP_CRC32(rdata_crc, (char *) rechdr, offsetof(XLogRecord, xl_crc));
-   FIN_CRC32(rdata_crc);
-   rechdr->xl_crc = rdata_crc;
+   fullsegs = bytepos / UsableBytesInSegment;
+   bytesleft = bytepos % UsableBytesInSegment;
 
-#ifdef WAL_DEBUG
-   if (XLOG_DEBUG)
+   if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
    {
-       StringInfoData buf;
-
-       initStringInfo(&buf);
-       appendStringInfo(&buf, "INSERT @ %X/%X: ",
-                        (uint32) (RecPtr >> 32), (uint32) RecPtr);
-       xlog_outrec(&buf, rechdr);
-       if (rdata->data != NULL)
-       {
-           appendStringInfo(&buf, " - ");
-           RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data);
-       }
-       elog(LOG, "%s", buf.data);
-       pfree(buf.data);
+       /* fits on first page of segment */
+       seg_offset = bytesleft + SizeOfXLogLongPHD;
    }
-#endif
-
-   /* Record begin of record in appropriate places */
-   ProcLastRecPtr = RecPtr;
-   Insert->PrevRecord = RecPtr;
-
-   /*
-    * Append the data, including backup blocks if any
-    */
-   rdata = &hdr_rdt;
-   while (write_len)
+   else
    {
-       while (rdata->data == NULL)
-           rdata = rdata->next;
+       /* account for the first page on segment with long header */
+       seg_offset = XLOG_BLCKSZ;
+       bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
 
-       if (freespace > 0)
-       {
-           if (rdata->len > freespace)
-           {
-               memcpy(Insert->currpos, rdata->data, freespace);
-               rdata->data += freespace;
-               rdata->len -= freespace;
-               write_len -= freespace;
-           }
-           else
-           {
-               memcpy(Insert->currpos, rdata->data, rdata->len);
-               freespace -= rdata->len;
-               write_len -= rdata->len;
-               Insert->currpos += rdata->len;
-               rdata = rdata->next;
-               continue;
-           }
-       }
+       fullpages = bytesleft / UsableBytesInPage;
+       bytesleft = bytesleft % UsableBytesInPage;
 
-       /* Use next buffer */
-       updrqst = AdvanceXLInsertBuffer(false);
-       curridx = Insert->curridx;
-       /* Mark page header to indicate this record continues on the page */
-       Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
-       Insert->currpage->xlp_rem_len = write_len;
-       freespace = INSERT_FREESPACE(Insert);
+       seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
    }
 
-   /* Ensure next record will be properly aligned */
-   Insert->currpos = (char *) Insert->currpage +
-       MAXALIGN(Insert->currpos - (char *) Insert->currpage);
-   freespace = INSERT_FREESPACE(Insert);
+   XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
 
-   /*
-    * The recptr I return is the beginning of the *next* record. This will be
-    * stored as LSN for changed data pages...
-    */
-   INSERT_RECPTR(RecPtr, Insert, curridx);
+   return result;
+}
 
-   /*
-    * If the record is an XLOG_SWITCH, we must now write and flush all the
-    * existing data, and then forcibly advance to the start of the next
-    * segment.  It's not good to do this I/O while holding the insert lock,
-    * but there seems too much risk of confusion if we try to release the
-    * lock sooner.  Fortunately xlog switch needn't be a high-performance
-    * operation anyway...
-    */
-   if (isLogSwitch)
+/*
+ * Like XLogBytePosToRecPtr, but if the position is at a page boundary,
+ * returns a pointer to the beginning of the page (ie. before page header),
+ * not to where the first xlog record on that page would go to. This is used
+ * when converting a pointer to the end of a record.
+ */
+static XLogRecPtr
+XLogBytePosToEndRecPtr(uint64 bytepos)
+{
+   uint64      fullsegs;
+   uint64      fullpages;
+   uint64      bytesleft;
+   uint32      seg_offset;
+   XLogRecPtr  result;
+
+   fullsegs = bytepos / UsableBytesInSegment;
+   bytesleft = bytepos % UsableBytesInSegment;
+
+   if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
+   {
+       /* fits on first page of segment */
+       if (bytesleft == 0)
+           seg_offset = 0;
+       else
+           seg_offset = bytesleft + SizeOfXLogLongPHD;
+   }
+   else
    {
-       XLogwrtRqst FlushRqst;
-       XLogRecPtr  OldSegEnd;
+       /* account for the first page on segment with long header */
+       seg_offset = XLOG_BLCKSZ;
+       bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
 
-       TRACE_POSTGRESQL_XLOG_SWITCH();
+       fullpages = bytesleft / UsableBytesInPage;
+       bytesleft = bytesleft % UsableBytesInPage;
 
-       LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+       if (bytesleft == 0)
+           seg_offset += fullpages * XLOG_BLCKSZ + bytesleft;
+       else
+           seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
+   }
 
-       /*
-        * Flush through the end of the page containing XLOG_SWITCH, and
-        * perform end-of-segment actions (eg, notifying archiver).
-        */
-       WriteRqst = XLogCtl->xlblocks[curridx];
-       FlushRqst.Write = WriteRqst;
-       FlushRqst.Flush = WriteRqst;
-       XLogWrite(FlushRqst, false, true);
+   XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result);
 
-       /* Set up the next buffer as first page of next segment */
-       /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
-       (void) AdvanceXLInsertBuffer(true);
+   return result;
+}
 
-       /* There should be no unwritten data */
-       curridx = Insert->curridx;
-       Assert(curridx == XLogCtl->Write.curridx);
+/*
+ * Convert an XLogRecPtr to a "usable byte position".
+ */
+static uint64
+XLogRecPtrToBytePos(XLogRecPtr ptr)
+{
+   uint64      fullsegs;
+   uint32      fullpages;
+   uint32      offset;
+   uint64      result;
 
-       /* Compute end address of old segment */
-       OldSegEnd = XLogCtl->xlblocks[curridx];
-       OldSegEnd -= XLOG_BLCKSZ;
+   XLByteToSeg(ptr, fullsegs);
 
-       /* Make it look like we've written and synced all of old segment */
-       LogwrtResult.Write = OldSegEnd;
-       LogwrtResult.Flush = OldSegEnd;
+   fullpages = (ptr % XLOG_SEG_SIZE) / XLOG_BLCKSZ;
+   offset = ptr % XLOG_BLCKSZ;
 
-       /*
-        * Update shared-memory status --- this code should match XLogWrite
-        */
+   if (fullpages == 0)
+   {
+       result = fullsegs * UsableBytesInSegment;
+       if (offset > 0)
        {
-           /* use volatile pointer to prevent code rearrangement */
-           volatile XLogCtlData *xlogctl = XLogCtl;
-
-           SpinLockAcquire(&xlogctl->info_lck);
-           xlogctl->LogwrtResult = LogwrtResult;
-           if (xlogctl->LogwrtRqst.Write < LogwrtResult.Write)
-               xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
-           if (xlogctl->LogwrtRqst.Flush < LogwrtResult.Flush)
-               xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
-           SpinLockRelease(&xlogctl->info_lck);
+           Assert(offset >= SizeOfXLogLongPHD);
+           result += offset - SizeOfXLogLongPHD;
        }
-
-       LWLockRelease(WALWriteLock);
-
-       updrqst = false;        /* done already */
    }
    else
    {
-       /* normal case, ie not xlog switch */
-
-       /* Need to update shared LogwrtRqst if some block was filled up */
-       if (freespace == 0)
-       {
-           /* curridx is filled and available for writing out */
-           updrqst = true;
-       }
-       else
+       result = fullsegs * UsableBytesInSegment +
+           (XLOG_BLCKSZ - SizeOfXLogLongPHD) +  /* account for first page */
+           (fullpages - 1) * UsableBytesInPage; /* full pages */
+       if (offset > 0)
        {
-           /* if updrqst already set, write through end of previous buf */
-           curridx = PrevBufIdx(curridx);
+           Assert(offset >= SizeOfXLogShortPHD);
+           result += offset - SizeOfXLogShortPHD;
        }
-       WriteRqst = XLogCtl->xlblocks[curridx];
-   }
-
-   LWLockRelease(WALInsertLock);
-
-   if (updrqst)
-   {
-       /* use volatile pointer to prevent code rearrangement */
-       volatile XLogCtlData *xlogctl = XLogCtl;
-
-       SpinLockAcquire(&xlogctl->info_lck);
-       /* advance global request to include new block(s) */
-       if (xlogctl->LogwrtRqst.Write < WriteRqst)
-           xlogctl->LogwrtRqst.Write = WriteRqst;
-       /* update local result copy while I have the chance */
-       LogwrtResult = xlogctl->LogwrtResult;
-       SpinLockRelease(&xlogctl->info_lck);
    }
 
-   XactLastRecEnd = RecPtr;
-
-   END_CRIT_SECTION();
-
-   /* wake up walsenders now that we've released heavily contended locks */
-   WalSndWakeupProcessRequests();
-
-   return RecPtr;
+   return result;
 }
 
 /*
@@ -1303,158 +2385,181 @@ XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
 }
 
 /*
- * Advance the Insert state to the next buffer page, writing out the next
- * buffer if it still contains unwritten data.
- *
- * If new_segment is TRUE then we set up the next buffer page as the first
- * page of the next xlog segment file, possibly but not usually the next
- * consecutive file page.
- *
- * The global LogwrtRqst.Write pointer needs to be advanced to include the
- * just-filled page.  If we can do this for free (without an extra lock),
- * we do so here.  Otherwise the caller must do it.  We return TRUE if the
- * request update still needs to be done, FALSE if we did it internally.
- *
- * Must be called with WALInsertLock held.
+ * Initialize XLOG buffers, writing out old buffers if they still contain
+ * unwritten data, upto the page containing 'upto'. Or if 'opportunistic' is
+ * true, initialize as many pages as we can without having to write out
+ * unwritten data. Any new pages are initialized to zeros, with pages headers
+ * initialized properly.
  */
-static bool
-AdvanceXLInsertBuffer(bool new_segment)
+static void
+AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 {
    XLogCtlInsert *Insert = &XLogCtl->Insert;
-   int         nextidx = NextBufIdx(Insert->curridx);
-   bool        update_needed = true;
+   int         nextidx;
    XLogRecPtr  OldPageRqstPtr;
    XLogwrtRqst WriteRqst;
-   XLogRecPtr  NewPageEndPtr;
+   XLogRecPtr  NewPageEndPtr = InvalidXLogRecPtr;
    XLogRecPtr  NewPageBeginPtr;
    XLogPageHeader NewPage;
+   int         npages = 0;
+
+   LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
 
    /*
-    * Get ending-offset of the buffer page we need to replace (this may be
-    * zero if the buffer hasn't been used yet).  Fall through if it's already
-    * written out.
+    * Now that we have the lock, check if someone initialized the page
+    * already.
     */
-   OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
-   if (LogwrtResult.Write < OldPageRqstPtr)
+   while (upto >= XLogCtl->xlblocks[XLogCtl->curridx] || opportunistic)
    {
-       /* nope, got work to do... */
-       XLogRecPtr  FinishedPageRqstPtr;
-
-       FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-
-       /* Before waiting, get info_lck and update LogwrtResult */
-       {
-           /* use volatile pointer to prevent code rearrangement */
-           volatile XLogCtlData *xlogctl = XLogCtl;
-
-           SpinLockAcquire(&xlogctl->info_lck);
-           if (xlogctl->LogwrtRqst.Write < FinishedPageRqstPtr)
-               xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
-           LogwrtResult = xlogctl->LogwrtResult;
-           SpinLockRelease(&xlogctl->info_lck);
-       }
-
-       update_needed = false;  /* Did the shared-request update */
+       nextidx = NextBufIdx(XLogCtl->curridx);
 
        /*
-        * Now that we have an up-to-date LogwrtResult value, see if we still
-        * need to write it or if someone else already did.
+        * Get ending-offset of the buffer page we need to replace (this may
+        * be zero if the buffer hasn't been used yet).  Fall through if it's
+        * already written out.
         */
+       OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
        if (LogwrtResult.Write < OldPageRqstPtr)
        {
-           /* Must acquire write lock */
-           LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-           LogwrtResult = XLogCtl->LogwrtResult;
-           if (LogwrtResult.Write >= OldPageRqstPtr)
+           /*
+            * Nope, got work to do. If we just want to pre-initialize as much
+            * as we can without flushing, give up now.
+            */
+           if (opportunistic)
+               break;
+
+           /* Before waiting, get info_lck and update LogwrtResult */
            {
-               /* OK, someone wrote it already */
-               LWLockRelease(WALWriteLock);
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               SpinLockAcquire(&xlogctl->info_lck);
+               if (xlogctl->LogwrtRqst.Write < OldPageRqstPtr)
+                   xlogctl->LogwrtRqst.Write = OldPageRqstPtr;
+               LogwrtResult = xlogctl->LogwrtResult;
+               SpinLockRelease(&xlogctl->info_lck);
            }
-           else
+
+           /*
+            * Now that we have an up-to-date LogwrtResult value, see if we
+            * still need to write it or if someone else already did.
+            */
+           if (LogwrtResult.Write < OldPageRqstPtr)
            {
                /*
-                * Have to write buffers while holding insert lock. This is
-                * not good, so only write as much as we absolutely must.
+                * Must acquire write lock. Release WALBufMappingLock first,
+                * to make sure that all insertions that we need to wait for
+                * can finish (up to this same position). Otherwise we risk
+                * deadlock.
                 */
-               TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
-               WriteRqst.Write = OldPageRqstPtr;
-               WriteRqst.Flush = 0;
-               XLogWrite(WriteRqst, false, false);
-               LWLockRelease(WALWriteLock);
-               TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+               LWLockRelease(WALBufMappingLock);
+
+               WaitXLogInsertionsToFinish(OldPageRqstPtr);
+
+               LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+
+               LogwrtResult = XLogCtl->LogwrtResult;
+               if (LogwrtResult.Write >= OldPageRqstPtr)
+               {
+                   /* OK, someone wrote it already */
+                   LWLockRelease(WALWriteLock);
+               }
+               else
+               {
+                   /* Have to write it ourselves */
+                   TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
+                   WriteRqst.Write = OldPageRqstPtr;
+                   WriteRqst.Flush = 0;
+                   XLogWrite(WriteRqst, false);
+                   LWLockRelease(WALWriteLock);
+                   TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
+               }
+               /* Re-acquire WALBufMappingLock and retry */
+               LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+               continue;
            }
        }
-   }
 
-   /*
-    * Now the next buffer slot is free and we can set it up to be the next
-    * output page.
-    */
-   NewPageBeginPtr = XLogCtl->xlblocks[Insert->curridx];
+       /*
+        * Now the next buffer slot is free and we can set it up to be the next
+        * output page.
+        */
+       NewPageBeginPtr = XLogCtl->xlblocks[XLogCtl->curridx];
+       NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
 
-   if (new_segment)
-   {
-       /* force it to a segment start point */
-       if (NewPageBeginPtr % XLogSegSize != 0)
-           NewPageBeginPtr += XLogSegSize - NewPageBeginPtr % XLogSegSize;
-   }
+       Assert(NewPageEndPtr % XLOG_BLCKSZ == 0);
+       Assert(XLogRecEndPtrToBufIdx(NewPageEndPtr) == nextidx);
+       Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
 
-   NewPageEndPtr = NewPageBeginPtr;
-   NewPageEndPtr += XLOG_BLCKSZ;
-   XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
-   NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
+       NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
 
-   Insert->curridx = nextidx;
-   Insert->currpage = NewPage;
+       /*
+        * Be sure to re-zero the buffer so that bytes beyond what we've
+        * written will look like zeroes and not valid XLOG records...
+        */
+       MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
 
-   Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
+       /*
+        * Fill the new page's header
+        */
+       NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
 
-   /*
-    * Be sure to re-zero the buffer so that bytes beyond what we've written
-    * will look like zeroes and not valid XLOG records...
-    */
-   MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
+       /* NewPage->xlp_info = 0; */    /* done by memset */
+       NewPage   ->xlp_tli = ThisTimeLineID;
+       NewPage   ->xlp_pageaddr = NewPageBeginPtr;
+       /* NewPage->xlp_rem_len = 0; */     /* done by memset */
 
-   /*
-    * Fill the new page's header
-    */
-   NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
+       /*
+        * If online backup is not in progress, mark the header to indicate
+        * that* WAL records beginning in this page have removable backup
+        * blocks.  This allows the WAL archiver to know whether it is safe to
+        * compress archived WAL data by transforming full-block records into
+        * the non-full-block format.  It is sufficient to record this at the
+        * page level because we force a page switch (in fact a segment switch)
+        * when starting a backup, so the flag will be off before any records
+        * can be written during the backup.  At the end of a backup, the last
+        * page will be marked as all unsafe when perhaps only part is unsafe,
+        * but at worst the archiver would miss the opportunity to compress a
+        * few records.
+        */
+       if (!Insert->forcePageWrites)
+           NewPage   ->xlp_info |= XLP_BKP_REMOVABLE;
 
-   /* NewPage->xlp_info = 0; */    /* done by memset */
-   NewPage   ->xlp_tli = ThisTimeLineID;
-   NewPage   ->xlp_pageaddr = NewPageBeginPtr;
+       /*
+        * If first page of an XLOG segment file, make it a long header.
+        */
+       if ((NewPage->xlp_pageaddr % XLogSegSize) == 0)
+       {
+           XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
 
-   /*
-    * If online backup is not in progress, mark the header to indicate that
-    * WAL records beginning in this page have removable backup blocks.  This
-    * allows the WAL archiver to know whether it is safe to compress archived
-    * WAL data by transforming full-block records into the non-full-block
-    * format.  It is sufficient to record this at the page level because we
-    * force a page switch (in fact a segment switch) when starting a backup,
-    * so the flag will be off before any records can be written during the
-    * backup.  At the end of a backup, the last page will be marked as all
-    * unsafe when perhaps only part is unsafe, but at worst the archiver
-    * would miss the opportunity to compress a few records.
-    */
-   if (!Insert->forcePageWrites)
-       NewPage   ->xlp_info |= XLP_BKP_REMOVABLE;
+           NewLongPage->xlp_sysid = ControlFile->system_identifier;
+           NewLongPage->xlp_seg_size = XLogSegSize;
+           NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
+           NewPage   ->xlp_info |= XLP_LONG_HEADER;
+       }
 
-   /*
-    * If first page of an XLOG segment file, make it a long header.
-    */
-   if ((NewPage->xlp_pageaddr % XLogSegSize) == 0)
-   {
-       XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
+       /*
+        * Make sure the initialization of the page becomes visible to others
+        * before the xlblocks update. GetXLogBuffer() reads xlblocks without
+        * holding a lock.
+        */
+       pg_write_barrier();
+
+       *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
 
-       NewLongPage->xlp_sysid = ControlFile->system_identifier;
-       NewLongPage->xlp_seg_size = XLogSegSize;
-       NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
-       NewPage   ->xlp_info |= XLP_LONG_HEADER;
+       XLogCtl->curridx = nextidx;
 
-       Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
+       npages++;
    }
+   LWLockRelease(WALBufMappingLock);
 
-   return update_needed;
+#ifdef WAL_DEBUG
+   if (npages > 0)
+   {
+       elog(DEBUG1, "initialized %d pages, upto %X/%X",
+            npages, (uint32) (NewPageEndPtr >> 32), (uint32) NewPageEndPtr);
+   }
+#endif
 }
 
 /*
@@ -1486,16 +2591,12 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
  * This option allows us to avoid uselessly issuing multiple writes when a
  * single one would do.
  *
- * If xlog_switch == TRUE, we are intending an xlog segment switch, so
- * perform end-of-segment actions after writing the last page, even if
- * it's not physically the end of its segment.  (NB: this will work properly
- * only if caller specifies WriteRqst == page-end and flexible == false,
- * and there is some data to write.)
- *
- * Must be called with WALWriteLock held.
+ * Must be called with WALWriteLock held. WaitXLogInsertionsToFinish(WriteRqst)
+ * must be called before grabbing the lock, to make sure the data is ready to
+ * write.
  */
 static void
-XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
+XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 {
    XLogCtlWrite *Write = &XLogCtl->Write;
    bool        ispartialpage;
@@ -1544,15 +2645,15 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
         * if we're passed a bogus WriteRqst.Write that is past the end of the
         * last page that's been initialized by AdvanceXLInsertBuffer.
         */
-       if (LogwrtResult.Write >= XLogCtl->xlblocks[curridx])
+       XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
+       if (LogwrtResult.Write >= EndPtr)
            elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
                 (uint32) (LogwrtResult.Write >> 32),
                 (uint32) LogwrtResult.Write,
-                (uint32) (XLogCtl->xlblocks[curridx] >> 32),
-                (uint32) XLogCtl->xlblocks[curridx]);
+                (uint32) (EndPtr >> 32), (uint32) EndPtr);
 
        /* Advance LogwrtResult.Write to end of current buffer page */
-       LogwrtResult.Write = XLogCtl->xlblocks[curridx];
+       LogwrtResult.Write = EndPtr;
        ispartialpage = WriteRqst.Write < LogwrtResult.Write;
 
        if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo))
@@ -1656,16 +2757,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
             * later. Doing it here ensures that one and only one backend will
             * perform this fsync.
             *
-            * We also do this if this is the last page written for an xlog
-            * switch.
-            *
             * This is also the right place to notify the Archiver that the
             * segment is ready to copy to archival storage, and to update the
             * timer for archive_timeout, and to signal for a checkpoint if
             * too many logfile segments have been used since the last
             * checkpoint.
             */
-           if (finishing_seg || (xlog_switch && last_iteration))
+           if (finishing_seg)
            {
                issue_xlog_fsync(openLogFile, openLogSegNo);
 
@@ -1949,6 +3047,7 @@ XLogFlush(XLogRecPtr record)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
+       XLogRecPtr  insertpos;
 
        /* read LogwrtResult and update local state */
        SpinLockAcquire(&xlogctl->info_lck);
@@ -1961,6 +3060,12 @@ XLogFlush(XLogRecPtr record)
        if (record <= LogwrtResult.Flush)
            break;
 
+       /*
+        * Before actually performing the write, wait for all in-flight
+        * insertions to the pages we're about to write to finish.
+        */
+       insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
+
        /*
         * Try to get the write lock. If we can't get it immediately, wait
         * until it's released, and recheck if we still need to do the flush
@@ -1997,31 +3102,27 @@ XLogFlush(XLogRecPtr record)
         */
        if (CommitDelay > 0 && enableFsync &&
            MinimumActiveBackends(CommitSiblings))
+       {
            pg_usleep(CommitDelay);
 
+           /*
+            * Re-check how far we can now flush the WAL. It's generally not
+            * safe to call WaitXLogInsetionsToFinish while holding
+            * WALWriteLock, because an in-progress insertion might need to
+            * also grab WALWriteLock to make progress. But we know that all
+            * the insertions up to insertpos have already finished, because
+            * that's what the earlier WaitXLogInsertionsToFinish() returned.
+            * We're only calling it again to allow insertpos to be moved
+            * further forward, not to actually wait for anyone.
+            */
+           insertpos = WaitXLogInsertionsToFinish(insertpos);
+       }
+
        /* try to write/flush later additions to XLOG as well */
-       if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
-       {
-           XLogCtlInsert *Insert = &XLogCtl->Insert;
-           uint32      freespace = INSERT_FREESPACE(Insert);
+       WriteRqst.Write = insertpos;
+       WriteRqst.Flush = insertpos;
 
-           if (freespace == 0) /* buffer is full */
-               WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-           else
-           {
-               WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
-               WriteRqstPtr -= freespace;
-           }
-           LWLockRelease(WALInsertLock);
-           WriteRqst.Write = WriteRqstPtr;
-           WriteRqst.Flush = WriteRqstPtr;
-       }
-       else
-       {
-           WriteRqst.Write = WriteRqstPtr;
-           WriteRqst.Flush = record;
-       }
-       XLogWrite(WriteRqst, false, false);
+       XLogWrite(WriteRqst, false);
 
        LWLockRelease(WALWriteLock);
        /* done */
@@ -2142,7 +3243,8 @@ XLogBackgroundFlush(void)
 
    START_CRIT_SECTION();
 
-   /* now wait for the write lock */
+   /* now wait for any in-progress insertions to finish and get write lock */
+   WaitXLogInsertionsToFinish(WriteRqstPtr);
    LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
    LogwrtResult = XLogCtl->LogwrtResult;
    if (WriteRqstPtr > LogwrtResult.Flush)
@@ -2151,7 +3253,7 @@ XLogBackgroundFlush(void)
 
        WriteRqst.Write = WriteRqstPtr;
        WriteRqst.Flush = WriteRqstPtr;
-       XLogWrite(WriteRqst, flexible, false);
+       XLogWrite(WriteRqst, flexible);
        wrote_something = true;
    }
    LWLockRelease(WALWriteLock);
@@ -2161,6 +3263,12 @@ XLogBackgroundFlush(void)
    /* wake up walsenders now that we've released heavily contended locks */
    WalSndWakeupProcessRequests();
 
+   /*
+    * Great, done. To take some work off the critical path, try to initialize
+    * as many of the no-longer-needed WAL buffers for future use as we can.
+    */
+   AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
+
    return wrote_something;
 }
 
@@ -3937,10 +5045,13 @@ XLOGShmemSize(void)
 
    /* XLogCtl */
    size = sizeof(XLogCtlData);
+
+   /* xlog insertion slots, plus alignment */
+   size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1));
    /* xlblocks array */
    size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
    /* extra alignment padding for XLOG I/O buffers */
-   size = add_size(size, ALIGNOF_XLOG_BUFFER);
+   size = add_size(size, XLOG_BLCKSZ);
    /* and the buffers themselves */
    size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
 
@@ -3959,11 +5070,11 @@ XLOGShmemInit(void)
    bool        foundCFile,
                foundXLog;
    char       *allocptr;
+   int         i;
 
    ControlFile = (ControlFileData *)
        ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
-   XLogCtl = (XLogCtlData *)
-       ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
+   allocptr = ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
 
    if (foundCFile || foundXLog)
    {
@@ -3971,7 +5082,7 @@ XLOGShmemInit(void)
        Assert(foundCFile && foundXLog);
        return;
    }
-
+   XLogCtl = (XLogCtlData *) allocptr;
    memset(XLogCtl, 0, sizeof(XLogCtlData));
 
    /*
@@ -3979,15 +5090,23 @@ XLOGShmemInit(void)
     * multiple of the alignment for same, so no extra alignment padding is
     * needed here.
     */
-   allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
+   allocptr += sizeof(XLogCtlData);
    XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
    memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
    allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
 
+   /* Xlog insertion slots. Ensure they're aligned to the full padded size */
+   allocptr += sizeof(XLogInsertSlotPadded) -
+       ((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded);
+   XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr;
+   allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots;
+
    /*
-    * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
+    * Align the start of the page buffers to a full xlog block size boundary.
+    * This simplifies some calculations in XLOG insertion. It is also required
+    * for O_DIRECT.
     */
-   allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
+   allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr);
    XLogCtl->pages = allocptr;
    memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
 
@@ -3999,7 +5118,21 @@ XLOGShmemInit(void)
    XLogCtl->SharedRecoveryInProgress = true;
    XLogCtl->SharedHotStandbyActive = false;
    XLogCtl->WalWriterSleeping = false;
-   XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
+
+   for (i = 0; i < num_xloginsert_slots; i++)
+   {
+       XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
+       SpinLockInit(&slot->mutex);
+       slot->xlogInsertingAt = InvalidXLogRecPtr;
+       slot->owner = NULL;
+
+       slot->releaseOK = true;
+       slot->exclusive = 0;
+       slot->head = NULL;
+       slot->tail = NULL;
+   }
+
+   SpinLockInit(&XLogCtl->Insert.insertpos_lck);
    SpinLockInit(&XLogCtl->info_lck);
    SpinLockInit(&XLogCtl->ulsn_lck);
    InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
@@ -4050,8 +5183,8 @@ BootStrapXLOG(void)
    ThisTimeLineID = 1;
 
    /* page buffer must be aligned suitably for O_DIRECT */
-   buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
-   page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
+   buffer = (char *) palloc(XLOG_BLCKSZ + XLOG_BLCKSZ);
+   page = (XLogPageHeader) TYPEALIGN(XLOG_BLCKSZ, buffer);
    memset(page, 0, XLOG_BLCKSZ);
 
    /*
@@ -4893,6 +6026,7 @@ StartupXLOG(void)
    bool        backupEndRequired = false;
    bool        backupFromStandby = false;
    DBState     dbstate_at_startup;
+   int         firstIdx;
    XLogReaderState *xlogreader;
    XLogPageReadPrivate private;
    bool        fast_promoted = false;
@@ -5257,7 +6391,7 @@ StartupXLOG(void)
 
    lastFullPageWrites = checkPoint.fullPageWrites;
 
-   RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
+   RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
 
    if (RecPtr < checkPoint.redo)
        ereport(PANIC,
@@ -5899,25 +7033,21 @@ StartupXLOG(void)
    openLogFile = XLogFileOpen(openLogSegNo);
    openLogOff = 0;
    Insert = &XLogCtl->Insert;
-   Insert->PrevRecord = LastRec;
-   XLogCtl->xlblocks[0] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
+   Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
+
+   firstIdx = XLogRecEndPtrToBufIdx(EndOfLog);
+   XLogCtl->curridx = firstIdx;
+
+   XLogCtl->xlblocks[firstIdx] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
 
    /*
     * Tricky point here: readBuf contains the *last* block that the LastRec
     * record spans, not the one it starts in.  The last block is indeed the
     * one we want to use.
     */
-   if (EndOfLog % XLOG_BLCKSZ == 0)
-   {
-       memset(Insert->currpage, 0, XLOG_BLCKSZ);
-   }
-   else
-   {
-       Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize);
-       memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ);
-   }
-   Insert->currpos = (char *) Insert->currpage +
-       (EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]);
+   Assert(readOff == (XLogCtl->xlblocks[firstIdx] - XLOG_BLCKSZ) % XLogSegSize);
+   memcpy((char *) &XLogCtl->pages[firstIdx * XLOG_BLCKSZ], xlogreader->readBuf, XLOG_BLCKSZ);
+   Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
 
    LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
@@ -5926,12 +7056,12 @@ StartupXLOG(void)
    XLogCtl->LogwrtRqst.Write = EndOfLog;
    XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
-   freespace = INSERT_FREESPACE(Insert);
+   freespace = INSERT_FREESPACE(EndOfLog);
    if (freespace > 0)
    {
        /* Make sure rest of page is zero */
-       MemSet(Insert->currpos, 0, freespace);
-       XLogCtl->Write.curridx = 0;
+       MemSet(&XLogCtl->pages[firstIdx * XLOG_BLCKSZ] + EndOfLog % XLOG_BLCKSZ, 0, freespace);
+       XLogCtl->Write.curridx = firstIdx;
    }
    else
    {
@@ -5943,7 +7073,7 @@ StartupXLOG(void)
         * this is sufficient.  The first actual attempt to insert a log
         * record will advance the insert state.
         */
-       XLogCtl->Write.curridx = NextBufIdx(0);
+       XLogCtl->Write.curridx = NextBufIdx(firstIdx);
    }
 
    /* Pre-scan prepared transactions to find out the range of XIDs present */
@@ -6504,21 +7634,29 @@ InitXLOGAccess(void)
 }
 
 /*
- * Once spawned, a backend may update its local RedoRecPtr from
- * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
- * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
+ * Return the current Redo pointer from shared memory.
+ *
+ * As a side-effect, the local RedoRecPtr copy is updated.
  */
 XLogRecPtr
 GetRedoRecPtr(void)
 {
    /* use volatile pointer to prevent code rearrangement */
    volatile XLogCtlData *xlogctl = XLogCtl;
+   XLogRecPtr ptr;
 
+   /*
+    * The possibly not up-to-date copy in XlogCtl is enough. Even if we
+    * grabbed a WAL insertion slot to read the master copy, someone might
+    * update it just after we've released the lock.
+    */
    SpinLockAcquire(&xlogctl->info_lck);
-   Assert(RedoRecPtr <= xlogctl->Insert.RedoRecPtr);
-   RedoRecPtr = xlogctl->Insert.RedoRecPtr;
+   ptr = xlogctl->RedoRecPtr;
    SpinLockRelease(&xlogctl->info_lck);
 
+   if (RedoRecPtr < ptr)
+       RedoRecPtr = ptr;
+
    return RedoRecPtr;
 }
 
@@ -6527,9 +7665,8 @@ GetRedoRecPtr(void)
  *
  * NOTE: The value *actually* returned is the position of the last full
  * xlog page. It lags behind the real insert position by at most 1 page.
- * For that, we don't need to acquire WALInsertLock which can be quite
- * heavily contended, and an approximation is enough for the current
- * usage of this function.
+ * For that, we don't need to scan through WAL insertion slots, and an
+ * approximation is enough for the current usage of this function.
  */
 XLogRecPtr
 GetInsertRecPtr(void)
@@ -6806,6 +7943,8 @@ LogCheckpointEnd(bool restartpoint)
 void
 CreateCheckPoint(int flags)
 {
+   /* use volatile pointer to prevent code rearrangement */
+   volatile XLogCtlData *xlogctl = XLogCtl;
    bool        shutdown;
    CheckPoint  checkPoint;
    XLogRecPtr  recptr;
@@ -6813,6 +7952,7 @@ CreateCheckPoint(int flags)
    XLogRecData rdata;
    uint32      freespace;
    XLogSegNo   _logSegNo;
+   XLogRecPtr  curInsert;
    VirtualTransactionId *vxids;
    int         nvxids;
 
@@ -6883,10 +8023,11 @@ CreateCheckPoint(int flags)
        checkPoint.oldestActiveXid = InvalidTransactionId;
 
    /*
-    * We must hold WALInsertLock while examining insert state to determine
-    * the checkpoint REDO pointer.
+    * We must block concurrent insertions while examining insert state to
+    * determine the checkpoint REDO pointer.
     */
-   LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+   WALInsertSlotAcquire(true);
+   curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
 
    /*
     * If this isn't a shutdown or forced checkpoint, and we have not inserted
@@ -6906,14 +8047,11 @@ CreateCheckPoint(int flags)
    if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
                  CHECKPOINT_FORCE)) == 0)
    {
-       XLogRecPtr  curInsert;
-
-       INSERT_RECPTR(curInsert, Insert, Insert->curridx);
        if (curInsert == ControlFile->checkPoint +
            MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
            ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
        {
-           LWLockRelease(WALInsertLock);
+           WALInsertSlotRelease();
            LWLockRelease(CheckpointLock);
            END_CRIT_SECTION();
            return;
@@ -6945,18 +8083,19 @@ CreateCheckPoint(int flags)
     * the buffer flush work.  Those XLOG records are logically after the
     * checkpoint, even though physically before it.  Got that?
     */
-   freespace = INSERT_FREESPACE(Insert);
+   freespace = INSERT_FREESPACE(curInsert);
    if (freespace == 0)
    {
-       (void) AdvanceXLInsertBuffer(false);
-       /* OK to ignore update return flag, since we will do flush anyway */
-       freespace = INSERT_FREESPACE(Insert);
+       if (curInsert % XLogSegSize == 0)
+           curInsert += SizeOfXLogLongPHD;
+       else
+           curInsert += SizeOfXLogShortPHD;
    }
-   INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
+   checkPoint.redo = curInsert;
 
    /*
     * Here we update the shared RedoRecPtr for future XLogInsert calls; this
-    * must be done while holding the insert lock AND the info_lck.
+    * must be done while holding the insertion slots.
     *
     * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
     * pointing past where it really needs to point.  This is okay; the only
@@ -6965,20 +8104,18 @@ CreateCheckPoint(int flags)
     * XLogInserts that happen while we are dumping buffers must assume that
     * their buffer changes are not included in the checkpoint.
     */
-   {
-       /* use volatile pointer to prevent code rearrangement */
-       volatile XLogCtlData *xlogctl = XLogCtl;
-
-       SpinLockAcquire(&xlogctl->info_lck);
-       RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
-       SpinLockRelease(&xlogctl->info_lck);
-   }
+   RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
 
    /*
-    * Now we can release WAL insert lock, allowing other xacts to proceed
-    * while we are flushing disk buffers.
+    * Now we can release the WAL insertion slots, allowing other xacts to
+    * proceed while we are flushing disk buffers.
     */
-   LWLockRelease(WALInsertLock);
+   WALInsertSlotRelease();
+
+   /* Update the info_lck-protected copy of RedoRecPtr as well */
+   SpinLockAcquire(&xlogctl->info_lck);
+   xlogctl->RedoRecPtr = checkPoint.redo;
+   SpinLockRelease(&xlogctl->info_lck);
 
    /*
     * If enabled, log checkpoint start.  We postpone this until now so as not
@@ -7003,10 +8140,11 @@ CreateCheckPoint(int flags)
     * we wait till he's out of his commit critical section before proceeding.
     * See notes in RecordTransactionCommit().
     *
-    * Because we've already released WALInsertLock, this test is a bit fuzzy:
-    * it is possible that we will wait for xacts we didn't really need to
-    * wait for.  But the delay should be short and it seems better to make
-    * checkpoint take a bit longer than to hold locks longer than necessary.
+    * Because we've already released the insertion slots, this test is a bit
+    * fuzzy: it is possible that we will wait for xacts we didn't really need
+    * to wait for.  But the delay should be short and it seems better to make
+    * checkpoint take a bit longer than to hold off insertions longer than
+    * necessary.
     * (In fact, the whole reason we have this issue is that xact.c does
     * commit record XLOG insertion and clog update as two separate steps
     * protected by different locks, but again that seems best on grounds of
@@ -7233,10 +8371,10 @@ CreateEndOfRecoveryRecord(void)
 
    xlrec.end_time = time(NULL);
 
-   LWLockAcquire(WALInsertLock, LW_SHARED);
+   WALInsertSlotAcquire(true);
    xlrec.ThisTimeLineID = ThisTimeLineID;
    xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID;
-   LWLockRelease(WALInsertLock);
+   WALInsertSlotRelease();
 
    LocalSetXLogInsertAllowed();
 
@@ -7437,15 +8575,18 @@ CreateRestartPoint(int flags)
     * the number of segments replayed since last restartpoint, and request a
     * restartpoint if it exceeds checkpoint_segments.
     *
-    * You need to hold WALInsertLock and info_lck to update it, although
-    * during recovery acquiring WALInsertLock is just pro forma, because
-    * there is no other processes updating Insert.RedoRecPtr.
+    * Like in CreateCheckPoint(), hold off insertions to update it, although
+    * during recovery this is just pro forma, because no WAL insertions are
+    * happening.
     */
-   LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
-   SpinLockAcquire(&xlogctl->info_lck);
+   WALInsertSlotAcquire(true);
    xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
+   WALInsertSlotRelease();
+
+   /* Also update the info_lck-protected copy */
+   SpinLockAcquire(&xlogctl->info_lck);
+   xlogctl->RedoRecPtr = lastCheckPoint.redo;
    SpinLockRelease(&xlogctl->info_lck);
-   LWLockRelease(WALInsertLock);
 
    /*
     * Prepare to accumulate statistics.
@@ -7863,9 +9004,9 @@ UpdateFullPageWrites(void)
     */
    if (fullPageWrites)
    {
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(true);
        Insert->fullPageWrites = true;
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
    }
 
    /*
@@ -7886,9 +9027,9 @@ UpdateFullPageWrites(void)
 
    if (!fullPageWrites)
    {
-       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       WALInsertSlotAcquire(true);
        Insert->fullPageWrites = false;
-       LWLockRelease(WALInsertLock);
+       WALInsertSlotRelease();
    }
    END_CRIT_SECTION();
 }
@@ -8520,15 +9661,15 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
     * Note that forcePageWrites has no effect during an online backup from
     * the standby.
     *
-    * We must hold WALInsertLock to change the value of forcePageWrites, to
-    * ensure adequate interlocking against XLogInsert().
+    * We must hold all the insertion slots to change the value of
+    * forcePageWrites, to ensure adequate interlocking against XLogInsert().
     */
-   LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+   WALInsertSlotAcquire(true);
    if (exclusive)
    {
        if (XLogCtl->Insert.exclusiveBackup)
        {
-           LWLockRelease(WALInsertLock);
+           WALInsertSlotRelease();
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("a backup is already in progress"),
@@ -8539,7 +9680,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
    else
        XLogCtl->Insert.nonExclusiveBackups++;
    XLogCtl->Insert.forcePageWrites = true;
-   LWLockRelease(WALInsertLock);
+   WALInsertSlotRelease();
 
    /* Ensure we release forcePageWrites if fail below */
    PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
@@ -8654,13 +9795,13 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
             * taking a checkpoint right after another is not that expensive
             * either because only few buffers have been dirtied yet.
             */
-           LWLockAcquire(WALInsertLock, LW_SHARED);
+           WALInsertSlotAcquire(true);
            if (XLogCtl->Insert.lastBackupStart < startpoint)
            {
                XLogCtl->Insert.lastBackupStart = startpoint;
                gotUniqueStartpoint = true;
            }
-           LWLockRelease(WALInsertLock);
+           WALInsertSlotRelease();
        } while (!gotUniqueStartpoint);
 
        XLByteToSeg(startpoint, _logSegNo);
@@ -8750,7 +9891,7 @@ pg_start_backup_callback(int code, Datum arg)
    bool        exclusive = DatumGetBool(arg);
 
    /* Update backup counters and forcePageWrites on failure */
-   LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+   WALInsertSlotAcquire(true);
    if (exclusive)
    {
        Assert(XLogCtl->Insert.exclusiveBackup);
@@ -8767,7 +9908,7 @@ pg_start_backup_callback(int code, Datum arg)
    {
        XLogCtl->Insert.forcePageWrites = false;
    }
-   LWLockRelease(WALInsertLock);
+   WALInsertSlotRelease();
 }
 
 /*
@@ -8838,7 +9979,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
    /*
     * OK to update backup counters and forcePageWrites
     */
-   LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+   WALInsertSlotAcquire(true);
    if (exclusive)
        XLogCtl->Insert.exclusiveBackup = false;
    else
@@ -8858,7 +9999,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
    {
        XLogCtl->Insert.forcePageWrites = false;
    }
-   LWLockRelease(WALInsertLock);
+   WALInsertSlotRelease();
 
    if (exclusive)
    {
@@ -9143,7 +10284,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
 void
 do_pg_abort_backup(void)
 {
-   LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+   WALInsertSlotAcquire(true);
    Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
    XLogCtl->Insert.nonExclusiveBackups--;
 
@@ -9152,7 +10293,7 @@ do_pg_abort_backup(void)
    {
        XLogCtl->Insert.forcePageWrites = false;
    }
-   LWLockRelease(WALInsertLock);
+   WALInsertSlotRelease();
 }
 
 /*
@@ -9184,14 +10325,14 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI)
 XLogRecPtr
 GetXLogInsertRecPtr(void)
 {
-   XLogCtlInsert *Insert = &XLogCtl->Insert;
-   XLogRecPtr  current_recptr;
+   volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+   uint64      current_bytepos;
 
-   LWLockAcquire(WALInsertLock, LW_SHARED);
-   INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
-   LWLockRelease(WALInsertLock);
+   SpinLockAcquire(&Insert->insertpos_lck);
+   current_bytepos = Insert->CurrBytePos;
+   SpinLockRelease(&Insert->insertpos_lck);
 
-   return current_recptr;
+   return XLogBytePosToRecPtr(current_bytepos);
 }
 
 /*
index 5503925788e400c436266ee15856cb34fd66b5c6..f054be8806f3bee08d7ec0c659925243df92a50b 100644 (file)
@@ -22,6 +22,7 @@
  */
 #include "postgres.h"
 
+#include "access/xlog.h"
 #include "miscadmin.h"
 #include "replication/walsender.h"
 #include "storage/lwlock.h"
@@ -63,6 +64,7 @@ SpinlockSemas(void)
    nsemas = NumLWLocks();      /* one for each lwlock */
    nsemas += NBuffers;         /* one for each buffer header */
    nsemas += max_wal_senders;  /* one for each wal sender process */
+   nsemas += num_xloginsert_slots; /* one for each WAL insertion slot */
    nsemas += 30;               /* plus a bunch for other small-scale use */
 
    return nsemas;
index d6200616de8df517096fa9f9540887ad1a476439..5aefd1b62c56056d9705a027a78a23f222a7ad36 100644 (file)
@@ -2037,6 +2037,17 @@ static struct config_int ConfigureNamesInt[] =
        NULL, NULL, NULL
    },
 
+   {
+       {"xloginsert_slots", PGC_POSTMASTER, WAL_SETTINGS,
+           gettext_noop("Sets the number of slots for concurrent xlog insertions."),
+           NULL,
+           GUC_NOT_IN_SAMPLE
+       },
+       &num_xloginsert_slots,
+       8, 1, 1000,
+       NULL, NULL, NULL
+   },
+
    {
        /* see max_connections */
        {"max_wal_senders", PGC_POSTMASTER, REPLICATION_SENDING,
index 83e583259dd39ec0ad8fceacade42e082efeecc0..002862cca50c6028a0dd18d3a591756018d786da 100644 (file)
@@ -190,6 +190,7 @@ extern char *XLogArchiveCommand;
 extern bool EnableHotStandby;
 extern bool fullPageWrites;
 extern bool log_checkpoints;
+extern int num_xloginsert_slots;
 
 /* WAL levels */
 typedef enum WalLevel
index fa6497adaddc830cfcb4dbaab4deb43a464cea35..bca166ebdcd9ff4f001403bcb960e73f17912d4c 100644 (file)
@@ -93,14 +93,4 @@ typedef uint32 TimeLineID;
 #define DEFAULT_SYNC_METHOD        SYNC_METHOD_FSYNC
 #endif
 
-/*
- * Limitation of buffer-alignment for direct IO depends on OS and filesystem,
- * but XLOG_BLCKSZ is assumed to be enough for it.
- */
-#ifdef O_DIRECT
-#define ALIGNOF_XLOG_BUFFER        XLOG_BLCKSZ
-#else
-#define ALIGNOF_XLOG_BUFFER        ALIGNOF_BUFFER
-#endif
-
 #endif   /* XLOG_DEFS_H */
index d8f7e9d64a080da690826edcc581659bf27aff47..85dc4ffdaa2ece93af5cc23b69dae251e5cf205e 100644 (file)
@@ -53,7 +53,7 @@ typedef enum LWLockId
    ProcArrayLock,
    SInvalReadLock,
    SInvalWriteLock,
-   WALInsertLock,
+   WALBufMappingLock,
    WALWriteLock,
    ControlFileLock,
    CheckpointLock,