Refactor XLogInsert a bit. The rdata entries for backup blocks are now
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 11 Jan 2012 07:46:18 +0000 (09:46 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 11 Jan 2012 09:01:47 +0000 (11:01 +0200)
constructed before acquiring WALInsertLock, which slightly reduces the time
the lock is held. Although I could not measure any benefit in benchmarks,
the code is more readable this way.

src/backend/access/transam/xlog.c

index 8e659620faba41aca7443e99fb6ad33dfb20017c..db7d9930cb6517d03eee9ff277a63e0e191a5a99 100644 (file)
@@ -694,6 +694,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
    uint32      freespace;
    int         curridx;
    XLogRecData *rdt;
+   XLogRecData *rdt_lastnormal;
    Buffer      dtbuf[XLR_MAX_BKP_BLOCKS];
    bool        dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
    BkpBlock    dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
@@ -708,6 +709,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
    bool        updrqst;
    bool        doPageWrites;
    bool        isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+   uint8       info_orig = info;
 
    /* cross-check on whether we should be here or not */
    if (!XLogInsertAllowed())
@@ -731,23 +733,18 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
    }
 
    /*
-    * Here we scan the rdata chain, determine which buffers must be backed
-    * up, and compute the CRC values for the data.  Note that the record
-    * header isn't added into the CRC initially since we don't know the final
-    * length or info bits quite yet.  Thus, the CRC will represent the CRC of
-    * the whole record in the order "rdata, then backup blocks, then record
-    * header".
+    * Here we scan the rdata chain, to determine which buffers must be backed
+    * up.
     *
     * We may have to loop back to here if a race condition is detected below.
     * We could prevent the race by doing all this work while holding the
     * insert lock, but it seems better to avoid doing CRC calculations while
-    * holding the lock.  This means we have to be careful about modifying the
-    * rdata chain until we know we aren't going to loop back again.  The only
-    * change we allow ourselves to make earlier is to set rdt->data = NULL in
-    * chain items we have decided we will have to back up the whole buffer
-    * for.  This is OK because we will certainly decide the same thing again
-    * for those items if we do it over; doing it here saves an extra pass
-    * over the chain later.
+    * holding the lock.
+    *
+    * We add entries for backup blocks to the chain, so that they don't
+    * need any special treatment in the critical section where the chunks are
+    * copied into the WAL buffers. Those entries have to be unlinked from the
+    * chain if we have to loop back here.
     */
 begin:;
    for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
@@ -764,7 +761,6 @@ begin:;
     */
    doPageWrites = fullPageWrites || Insert->forcePageWrites;
 
-   INIT_CRC32(rdata_crc);
    len = 0;
    for (rdt = rdata;;)
    {
@@ -772,7 +768,6 @@ begin:;
        {
            /* Simple data, just include it */
            len += rdt->len;
-           COMP_CRC32(rdata_crc, rdt->data, rdt->len);
        }
        else
        {
@@ -783,12 +778,12 @@ begin:;
                {
                    /* Buffer already referenced by earlier chain item */
                    if (dtbuf_bkp[i])
+                   {
                        rdt->data = NULL;
+                       rdt->len = 0;
+                   }
                    else if (rdt->data)
-                   {
                        len += rdt->len;
-                       COMP_CRC32(rdata_crc, rdt->data, rdt->len);
-                   }
                    break;
                }
                if (dtbuf[i] == InvalidBuffer)
@@ -800,12 +795,10 @@ begin:;
                    {
                        dtbuf_bkp[i] = true;
                        rdt->data = NULL;
+                       rdt->len = 0;
                    }
                    else if (rdt->data)
-                   {
                        len += rdt->len;
-                       COMP_CRC32(rdata_crc, rdt->data, rdt->len);
-                   }
                    break;
                }
            }
@@ -819,39 +812,6 @@ begin:;
        rdt = rdt->next;
    }
 
-   /*
-    * Now add the backup block headers and data into the CRC
-    */
-   for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-   {
-       if (dtbuf_bkp[i])
-       {
-           BkpBlock   *bkpb = &(dtbuf_xlg[i]);
-           char       *page;
-
-           COMP_CRC32(rdata_crc,
-                      (char *) bkpb,
-                      sizeof(BkpBlock));
-           page = (char *) BufferGetBlock(dtbuf[i]);
-           if (bkpb->hole_length == 0)
-           {
-               COMP_CRC32(rdata_crc,
-                          page,
-                          BLCKSZ);
-           }
-           else
-           {
-               /* must skip the hole */
-               COMP_CRC32(rdata_crc,
-                          page,
-                          bkpb->hole_offset);
-               COMP_CRC32(rdata_crc,
-                          page + (bkpb->hole_offset + bkpb->hole_length),
-                          BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
-           }
-       }
-   }
-
    /*
     * NOTE: We disallow len == 0 because it provides a useful bit of extra
     * error checking in ReadRecord.  This means that all callers of
@@ -862,70 +822,20 @@ begin:;
    if (len == 0 && !isLogSwitch)
        elog(PANIC, "invalid xlog record length %u", len);
 
-   START_CRIT_SECTION();
-
-   /* Now wait to get insert lock */
-   LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
-
-   /*
-    * Check to see if my RedoRecPtr is out of date.  If so, may have to go
-    * back and recompute everything.  This can only happen just after a
-    * checkpoint, so it's better to be slow in this case and fast otherwise.
-    *
-    * If we aren't doing full-page writes then RedoRecPtr doesn't actually
-    * affect the contents of the XLOG record, so we'll update our local copy
-    * but not force a recomputation.
-    */
-   if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
-   {
-       Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
-       RedoRecPtr = Insert->RedoRecPtr;
-
-       if (doPageWrites)
-       {
-           for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
-           {
-               if (dtbuf[i] == InvalidBuffer)
-                   continue;
-               if (dtbuf_bkp[i] == false &&
-                   XLByteLE(dtbuf_lsn[i], RedoRecPtr))
-               {
-                   /*
-                    * Oops, this buffer now needs to be backed up, but we
-                    * didn't think so above.  Start over.
-                    */
-                   LWLockRelease(WALInsertLock);
-                   END_CRIT_SECTION();
-                   goto begin;
-               }
-           }
-       }
-   }
-
-   /*
-    * Also check to see if forcePageWrites was just turned on; if we weren't
-    * already doing full-page writes then go back and recompute. (If it was
-    * just turned off, we could recompute the record without full pages, but
-    * we choose not to bother.)
-    */
-   if (Insert->forcePageWrites && !doPageWrites)
-   {
-       /* Oops, must redo it with full-page data */
-       LWLockRelease(WALInsertLock);
-       END_CRIT_SECTION();
-       goto begin;
-   }
-
    /*
     * Make additional rdata chain entries for the backup blocks, so that we
-    * don't need to special-case them in the write loop.  Note that we have
-    * now irrevocably changed the input rdata chain.  At the exit of this
-    * loop, write_len includes the backup block data.
+    * don't need to special-case them in the write loop.  This modifies the
+    * original rdata chain, but we keep a pointer to the last regular entry,
+    * rdt_lastnormal, so that we can undo this if we have to loop back to the
+    * beginning.
+    *
+    * At the exit of this loop, write_len includes the backup block data.
     *
     * Also set the appropriate info bits to show which buffers were backed
     * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
     * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
     */
+   rdt_lastnormal = rdt;
    write_len = len;
    for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
    {
@@ -974,6 +884,76 @@ begin:;
        }
    }
 
+   /*
+    * Calculate CRC of the data, including all the backup blocks
+    *
+    * Note that the record header isn't added into the CRC initially since
+    * we don't know the prev-link yet.  Thus, the CRC will represent the CRC
+    * of the whole record in the order: rdata, then backup blocks, then
+    * record header.
+    */
+   INIT_CRC32(rdata_crc);
+   for (rdt = rdata; rdt != NULL; rdt = rdt->next)
+       COMP_CRC32(rdata_crc, rdt->data, rdt->len);
+
+   START_CRIT_SECTION();
+
+   /* Now wait to get insert lock */
+   LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+
+   /*
+    * Check to see if my RedoRecPtr is out of date.  If so, may have to go
+    * back and recompute everything.  This can only happen just after a
+    * checkpoint, so it's better to be slow in this case and fast otherwise.
+    *
+    * If we aren't doing full-page writes then RedoRecPtr doesn't actually
+    * affect the contents of the XLOG record, so we'll update our local copy
+    * but not force a recomputation.
+    */
+   if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
+   {
+       Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
+       RedoRecPtr = Insert->RedoRecPtr;
+
+       if (doPageWrites)
+       {
+           for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+           {
+               if (dtbuf[i] == InvalidBuffer)
+                   continue;
+               if (dtbuf_bkp[i] == false &&
+                   XLByteLE(dtbuf_lsn[i], RedoRecPtr))
+               {
+                   /*
+                    * Oops, this buffer now needs to be backed up, but we
+                    * didn't think so above.  Start over.
+                    */
+                   LWLockRelease(WALInsertLock);
+                   END_CRIT_SECTION();
+                   rdt_lastnormal->next = NULL;
+                   info = info_orig;
+                   goto begin;
+               }
+           }
+       }
+   }
+
+   /*
+    * Also check to see if forcePageWrites was just turned on; if we weren't
+    * already doing full-page writes then go back and recompute. (If it was
+    * just turned off, we could recompute the record without full pages, but
+    * we choose not to bother.)
+    */
+   if (Insert->forcePageWrites && !doPageWrites)
+   {
+       /* Oops, must redo it with full-page data. */
+       LWLockRelease(WALInsertLock);
+       END_CRIT_SECTION();
+       rdt_lastnormal->next = NULL;
+       info = info_orig;
+       goto begin;
+   }
+
    /*
     * If there isn't enough space on the current XLOG page for a record
     * header, advance to the next page (leaving the unused space as zeroes).