Code review/prettification for generic_xlog.c.
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 9 Apr 2016 19:02:19 +0000 (15:02 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 9 Apr 2016 19:02:19 +0000 (15:02 -0400)
Improve commentary, use more specific names for the delta fields,
const-ify pointer arguments where possible, avoid assuming that
initializing only the first element of a local array will guarantee
that the remaining elements end up as we need them.  (I think that
code in generic_redo actually worked, but only because InvalidBuffer
is zero; this is a particularly ugly way of depending on that ...)

src/backend/access/transam/generic_xlog.c

index bd0a1b908dfe61b97d80d63149350cb044f5326d..a32f1711cf3c1f693442536535016de8a9c9b6a6 100644 (file)
  * - length of page region (OffsetNumber)
  * - data - the data to place into the region ('length' number of bytes)
  *
- * Unchanged regions of a page are not represented in its delta.  As a
- * result, a delta can be more compact than the full page image.  But having
- * an unchanged region in the middle of two fragments that is smaller than
- * the fragment header (offset and length) does not pay off in terms of the
- * overall size of the delta. For this reason, we break fragments only if
- * the unchanged region is bigger than MATCH_THRESHOLD.
+ * Unchanged regions of a page are not represented in its delta.  As a result,
+ * a delta can be more compact than the full page image.  But having an
+ * unchanged region between two fragments that is smaller than the fragment
+ * header (offset+length) does not pay off in terms of the overall size of
+ * the delta.  For this reason, we merge adjacent fragments if the unchanged
+ * region between them is <= MATCH_THRESHOLD bytes.
  *
  * The worst case for delta sizes occurs when we did not find any unchanged
  * region in the page.  The size of the delta will be the size of the page plus
  */
 #define FRAGMENT_HEADER_SIZE   (2 * sizeof(OffsetNumber))
 #define MATCH_THRESHOLD                        FRAGMENT_HEADER_SIZE
-#define MAX_DELTA_SIZE                 BLCKSZ + FRAGMENT_HEADER_SIZE
+#define MAX_DELTA_SIZE                 (BLCKSZ + FRAGMENT_HEADER_SIZE)
 
 /* Struct of generic xlog data for single page */
 typedef struct
 {
        Buffer          buffer;                 /* registered buffer */
-       char            image[BLCKSZ];  /* copy of page image for modification */
-       char            data[MAX_DELTA_SIZE];   /* delta between page images */
-       int                     dataLen;                /* space consumed in data field */
        bool            fullImage;              /* are we taking a full image of this page? */
+       int                     deltaLen;               /* space consumed in delta field */
+       char            image[BLCKSZ];  /* copy of page image for modification */
+       char            delta[MAX_DELTA_SIZE];  /* delta between page images */
 } PageData;
 
 /* State of generic xlog record construction */
@@ -61,22 +61,26 @@ struct GenericXLogState
 };
 
 static void writeFragment(PageData *pageData, OffsetNumber offset,
-                         OffsetNumber len, Pointer data);
-static void writeDelta(PageData *pageData);
-static void applyPageRedo(Page page, Pointer data, Size dataSize);
+                         OffsetNumber len, const char *data);
+static void computeDelta(PageData *pageData);
+static void applyPageRedo(Page page, const char *delta, Size deltaSize);
+
 
 /*
- * Write next fragment into delta.
+ * Write next fragment into pageData's delta.
+ *
+ * The fragment has the given offset and length, and data points to the
+ * actual data (of length length).
  */
 static void
 writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
-                         Pointer data)
+                         const char *data)
 {
-       Pointer         ptr = pageData->data + pageData->dataLen;
+       char       *ptr = pageData->delta + pageData->deltaLen;
 
-       /* Check if we have enough space */
-       Assert(pageData->dataLen + sizeof(offset) +
-                  sizeof(length) + length <= sizeof(pageData->data));
+       /* Verify we have enough space */
+       Assert(pageData->deltaLen + sizeof(offset) +
+                  sizeof(length) + length <= sizeof(pageData->delta));
 
        /* Write fragment data */
        memcpy(ptr, &offset, sizeof(offset));
@@ -86,14 +90,14 @@ writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
        memcpy(ptr, data, length);
        ptr += length;
 
-       pageData->dataLen = ptr - pageData->data;
+       pageData->deltaLen = ptr - pageData->delta;
 }
 
 /*
- * Make delta for given page.
+ * Compute the delta record for given page.
  */
 static void
-writeDelta(PageData *pageData)
+computeDelta(PageData *pageData)
 {
        Page            page = BufferGetPage(pageData->buffer, NULL, NULL,
                                                                         BGP_NO_SNAPSHOT_TEST),
@@ -106,6 +110,8 @@ writeDelta(PageData *pageData)
                                imageLower = ((PageHeader) image)->pd_lower,
                                imageUpper = ((PageHeader) image)->pd_upper;
 
+       pageData->deltaLen = 0;
+
        for (i = 0; i < BLCKSZ; i++)
        {
                bool            match;
@@ -181,22 +187,22 @@ writeDelta(PageData *pageData)
                char            tmp[BLCKSZ];
 
                memcpy(tmp, image, BLCKSZ);
-               applyPageRedo(tmp, pageData->data, pageData->dataLen);
-               if (memcmp(tmp, page, pageLower)
-                       || memcmp(tmp + pageUpper, page + pageUpper, BLCKSZ - pageUpper))
+               applyPageRedo(tmp, pageData->delta, pageData->deltaLen);
+               if (memcmp(tmp, page, pageLower) != 0 ||
+                 memcmp(tmp + pageUpper, page + pageUpper, BLCKSZ - pageUpper) != 0)
                        elog(ERROR, "result of generic xlog apply does not match");
        }
 #endif
 }
 
 /*
- * Start new generic xlog record.
+ * Start new generic xlog record for modifications to specified relation.
  */
 GenericXLogState *
 GenericXLogStart(Relation relation)
 {
-       int                     i;
        GenericXLogState *state;
+       int                     i;
 
        state = (GenericXLogState *) palloc(sizeof(GenericXLogState));
 
@@ -209,24 +215,30 @@ GenericXLogStart(Relation relation)
 
 /*
  * Register new buffer for generic xlog record.
+ *
+ * Returns pointer to the page's image in the GenericXLogState, which
+ * is what the caller should modify.
+ *
+ * If the buffer is already registered, just return its existing entry.
  */
 Page
 GenericXLogRegister(GenericXLogState *state, Buffer buffer, bool isNew)
 {
        int                     block_id;
 
-       /* Place new buffer to unused slot in array */
+       /* Search array for existing entry or first unused slot */
        for (block_id = 0; block_id < MAX_GENERIC_XLOG_PAGES; block_id++)
        {
                PageData   *page = &state->pages[block_id];
 
                if (BufferIsInvalid(page->buffer))
                {
+                       /* Empty slot, so use it (there cannot be a match later) */
                        page->buffer = buffer;
-                       memcpy(page->image, BufferGetPage(buffer, NULL, NULL,
-                                                                                         BGP_NO_SNAPSHOT_TEST), BLCKSZ);
-                       page->dataLen = 0;
                        page->fullImage = isNew;
+                       memcpy(page->image,
+                                  BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST),
+                                  BLCKSZ);
                        return (Page) page->image;
                }
                else if (page->buffer == buffer)
@@ -239,15 +251,16 @@ GenericXLogRegister(GenericXLogState *state, Buffer buffer, bool isNew)
                }
        }
 
-       elog(ERROR, "maximum number of %d generic xlog buffers is exceeded",
+       elog(ERROR, "maximum number %d of generic xlog buffers is exceeded",
                 MAX_GENERIC_XLOG_PAGES);
-
        /* keep compiler quiet */
        return NULL;
 }
 
 /*
  * Unregister particular buffer for generic xlog record.
+ *
+ * XXX this is dangerous and should go away.
  */
 void
 GenericXLogUnregister(GenericXLogState *state, Buffer buffer)
@@ -274,7 +287,8 @@ GenericXLogUnregister(GenericXLogState *state, Buffer buffer)
 }
 
 /*
- * Put all changes in registered buffers to generic xlog record.
+ * Apply changes represented by GenericXLogState to the actual buffers,
+ * and emit a generic xlog record.
  */
 XLogRecPtr
 GenericXLogFinish(GenericXLogState *state)
@@ -291,33 +305,35 @@ GenericXLogFinish(GenericXLogState *state)
 
                for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
                {
+                       PageData   *pageData = &state->pages[i];
+                       Page            page;
                        char            tmp[BLCKSZ];
-                       PageData   *page = &state->pages[i];
 
-                       if (BufferIsInvalid(page->buffer))
+                       if (BufferIsInvalid(pageData->buffer))
                                continue;
 
+                       page = BufferGetPage(pageData->buffer, NULL, NULL,
+                                                                BGP_NO_SNAPSHOT_TEST);
+
                        /* Swap current and saved page image. */
-                       memcpy(tmp, page->image, BLCKSZ);
-                       memcpy(page->image, BufferGetPage(page->buffer, NULL, NULL,
-                                                                                         BGP_NO_SNAPSHOT_TEST), BLCKSZ);
-                       memcpy(BufferGetPage(page->buffer, NULL, NULL,
-                                                                BGP_NO_SNAPSHOT_TEST), tmp, BLCKSZ);
+                       memcpy(tmp, pageData->image, BLCKSZ);
+                       memcpy(pageData->image, page, BLCKSZ);
+                       memcpy(page, tmp, BLCKSZ);
 
-                       if (page->fullImage)
+                       if (pageData->fullImage)
                        {
                                /* A full page image does not require anything special */
-                               XLogRegisterBuffer(i, page->buffer, REGBUF_FORCE_IMAGE);
+                               XLogRegisterBuffer(i, pageData->buffer, REGBUF_FORCE_IMAGE);
                        }
                        else
                        {
                                /*
-                                * In normal mode, calculate delta and write it as data
+                                * In normal mode, calculate delta and write it as xlog data
                                 * associated with this page.
                                 */
-                               XLogRegisterBuffer(i, page->buffer, REGBUF_STANDARD);
-                               writeDelta(page);
-                               XLogRegisterBufData(i, page->data, page->dataLen);
+                               XLogRegisterBuffer(i, pageData->buffer, REGBUF_STANDARD);
+                               computeDelta(pageData);
+                               XLogRegisterBufData(i, pageData->delta, pageData->deltaLen);
                        }
                }
 
@@ -327,13 +343,13 @@ GenericXLogFinish(GenericXLogState *state)
                /* Set LSN and mark buffers dirty */
                for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
                {
-                       PageData   *page = &state->pages[i];
+                       PageData   *pageData = &state->pages[i];
 
-                       if (BufferIsInvalid(page->buffer))
+                       if (BufferIsInvalid(pageData->buffer))
                                continue;
-                       PageSetLSN(BufferGetPage(page->buffer, NULL, NULL,
+                       PageSetLSN(BufferGetPage(pageData->buffer, NULL, NULL,
                                                                         BGP_NO_SNAPSHOT_TEST), lsn);
-                       MarkBufferDirty(page->buffer);
+                       MarkBufferDirty(pageData->buffer);
                }
                END_CRIT_SECTION();
        }
@@ -343,13 +359,15 @@ GenericXLogFinish(GenericXLogState *state)
                START_CRIT_SECTION();
                for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
                {
-                       PageData   *page = &state->pages[i];
+                       PageData   *pageData = &state->pages[i];
 
-                       if (BufferIsInvalid(page->buffer))
+                       if (BufferIsInvalid(pageData->buffer))
                                continue;
-                       memcpy(BufferGetPage(page->buffer, NULL, NULL,
-                                                                BGP_NO_SNAPSHOT_TEST), page->image, BLCKSZ);
-                       MarkBufferDirty(page->buffer);
+                       memcpy(BufferGetPage(pageData->buffer, NULL, NULL,
+                                                                BGP_NO_SNAPSHOT_TEST),
+                                  pageData->image,
+                                  BLCKSZ);
+                       MarkBufferDirty(pageData->buffer);
                }
                END_CRIT_SECTION();
        }
@@ -360,7 +378,9 @@ GenericXLogFinish(GenericXLogState *state)
 }
 
 /*
- * Abort generic xlog record.
+ * Abort generic xlog record construction.  No changes are applied to buffers.
+ *
+ * Note: caller is responsible for releasing locks/pins on buffers, if needed.
  */
 void
 GenericXLogAbort(GenericXLogState *state)
@@ -372,10 +392,10 @@ GenericXLogAbort(GenericXLogState *state)
  * Apply delta to given page image.
  */
 static void
-applyPageRedo(Page page, Pointer data, Size dataSize)
+applyPageRedo(Page page, const char *delta, Size deltaSize)
 {
-       Pointer         ptr = data,
-                               end = data + dataSize;
+       const char *ptr = delta;
+       const char *end = delta + deltaSize;
 
        while (ptr < end)
        {
@@ -399,10 +419,11 @@ applyPageRedo(Page page, Pointer data, Size dataSize)
 void
 generic_redo(XLogReaderState *record)
 {
-       uint8           block_id;
-       Buffer          buffers[MAX_GENERIC_XLOG_PAGES] = {InvalidBuffer};
        XLogRecPtr      lsn = record->EndRecPtr;
+       Buffer          buffers[MAX_GENERIC_XLOG_PAGES];
+       uint8           block_id;
 
+       /* Protect limited size of buffers[] array */
        Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES);
 
        /* Iterate over blocks */
@@ -411,20 +432,24 @@ generic_redo(XLogReaderState *record)
                XLogRedoAction action;
 
                if (!XLogRecHasBlockRef(record, block_id))
+               {
+                       buffers[block_id] = InvalidBuffer;
                        continue;
+               }
 
                action = XLogReadBufferForRedo(record, block_id, &buffers[block_id]);
 
                /* Apply redo to given block if needed */
                if (action == BLK_NEEDS_REDO)
                {
-                       Pointer         blockData;
-                       Size            blockDataSize;
                        Page            page;
+                       char       *blockDelta;
+                       Size            blockDeltaSize;
 
-                       page = BufferGetPage(buffers[block_id], NULL, NULL, BGP_NO_SNAPSHOT_TEST);
-                       blockData = XLogRecGetBlockData(record, block_id, &blockDataSize);
-                       applyPageRedo(page, blockData, blockDataSize);
+                       page = BufferGetPage(buffers[block_id], NULL, NULL,
+                                                                BGP_NO_SNAPSHOT_TEST);
+                       blockDelta = XLogRecGetBlockData(record, block_id, &blockDeltaSize);
+                       applyPageRedo(page, blockDelta, blockDeltaSize);
 
                        PageSetLSN(page, lsn);
                        MarkBufferDirty(buffers[block_id]);