Refactor page compactifying code.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Tue, 3 Feb 2015 12:09:29 +0000 (14:09 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Tue, 3 Feb 2015 12:09:29 +0000 (14:09 +0200)
The logic to compact away removed tuples from page was duplicated with
small differences in PageRepairFragmentation, PageIndexMultiDelete, and
PageIndexDeleteNoCompact. Put it into a common function.

Reviewed by Peter Geoghegan.

src/backend/storage/page/bufpage.c

index c0095c7ba18111609858bfe06bd0481f57e3c3a9..61e7437f8ffea19964ce3cca0cc27f7b8c093c6e 100644 (file)
@@ -404,10 +404,9 @@ PageRestoreTempPage(Page tempPage, Page oldPage)
  */
 typedef struct itemIdSortData
 {
-   int         offsetindex;    /* linp array index */
-   int         itemoff;        /* page offset of item data */
-   Size        alignedlen;     /* MAXALIGN(item data len) */
-   ItemIdData  olditemid;      /* used only in PageIndexMultiDelete */
+   uint16      offsetindex;    /* linp array index */
+   int16       itemoff;        /* page offset of item data */
+   uint16      alignedlen;     /* MAXALIGN(item data len) */
 } itemIdSortData;
 typedef itemIdSortData *itemIdSort;
 
@@ -419,6 +418,38 @@ itemoffcompare(const void *itemidp1, const void *itemidp2)
        ((itemIdSort) itemidp1)->itemoff;
 }
 
+/*
+ * After removing or marking some line pointers unused, move the tuples to
+ * remove the gaps caused by the removed items.
+ */
+static void
+compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
+{
+   PageHeader  phdr = (PageHeader) page;
+   Offset      upper;
+   int         i;
+
+   /* sort itemIdSortData array into decreasing itemoff order */
+   qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
+         itemoffcompare);
+
+   upper = phdr->pd_special;
+   for (i = 0; i < nitems; i++)
+   {
+       itemIdSort  itemidptr = &itemidbase[i];
+       ItemId      lp;
+
+       lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+       upper -= itemidptr->alignedlen;
+       memmove((char *) page + upper,
+               (char *) page + itemidptr->itemoff,
+               itemidptr->alignedlen);
+       lp->lp_off = upper;
+   }
+
+   phdr->pd_upper = upper;
+}
+
 /*
  * PageRepairFragmentation
  *
@@ -441,7 +472,6 @@ PageRepairFragmentation(Page page)
                nunused;
    int         i;
    Size        totallen;
-   Offset      upper;
 
    /*
     * It's worth the trouble to be more paranoid here than in most places,
@@ -515,24 +545,7 @@ PageRepairFragmentation(Page page)
               errmsg("corrupted item lengths: total %u, available space %u",
                      (unsigned int) totallen, pd_special - pd_lower)));
 
-       /* sort itemIdSortData array into decreasing itemoff order */
-       qsort((char *) itemidbase, nstorage, sizeof(itemIdSortData),
-             itemoffcompare);
-
-       /* compactify page */
-       upper = pd_special;
-
-       for (i = 0, itemidptr = itemidbase; i < nstorage; i++, itemidptr++)
-       {
-           lp = PageGetItemId(page, itemidptr->offsetindex + 1);
-           upper -= itemidptr->alignedlen;
-           memmove((char *) page + upper,
-                   (char *) page + itemidptr->itemoff,
-                   itemidptr->alignedlen);
-           lp->lp_off = upper;
-       }
-
-       ((PageHeader) page)->pd_upper = upper;
+       compactify_tuples(itemidbase, nstorage, page);
    }
 
    /* Set hint bit for PageAddItem */
@@ -782,13 +795,12 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
    Offset      pd_upper = phdr->pd_upper;
    Offset      pd_special = phdr->pd_special;
    itemIdSortData itemidbase[MaxIndexTuplesPerPage];
+   ItemIdData  newitemids[MaxIndexTuplesPerPage];
    itemIdSort  itemidptr;
    ItemId      lp;
    int         nline,
                nused;
-   int         i;
    Size        totallen;
-   Offset      upper;
    Size        size;
    unsigned    offset;
    int         nextitm;
@@ -857,9 +869,9 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
        {
            itemidptr->offsetindex = nused;     /* where it will go */
            itemidptr->itemoff = offset;
-           itemidptr->olditemid = *lp;
            itemidptr->alignedlen = MAXALIGN(size);
            totallen += itemidptr->alignedlen;
+           newitemids[nused] = *lp;
            itemidptr++;
            nused++;
        }
@@ -875,26 +887,15 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
               errmsg("corrupted item lengths: total %u, available space %u",
                      (unsigned int) totallen, pd_special - pd_lower)));
 
-   /* sort itemIdSortData array into decreasing itemoff order */
-   qsort((char *) itemidbase, nused, sizeof(itemIdSortData),
-         itemoffcompare);
-
-   /* compactify page and install new itemids */
-   upper = pd_special;
-
-   for (i = 0, itemidptr = itemidbase; i < nused; i++, itemidptr++)
-   {
-       lp = PageGetItemId(page, itemidptr->offsetindex + 1);
-       upper -= itemidptr->alignedlen;
-       memmove((char *) page + upper,
-               (char *) page + itemidptr->itemoff,
-               itemidptr->alignedlen);
-       *lp = itemidptr->olditemid;
-       lp->lp_off = upper;
-   }
-
+   /*
+    * Looks good. Overwrite the line pointers with the copy, from which we've
+    * removed all the unused items.
+    */
+   memcpy(phdr->pd_linp, newitemids, nused * sizeof(ItemIdData));
    phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);
-   phdr->pd_upper = upper;
+
+   /* and compactify the tuple data */
+   compactify_tuples(itemidbase, nused, page);
 }
 
 /*
@@ -1000,7 +1001,6 @@ PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, int nitems)
        itemIdSort  itemidptr;
        int         i;
        Size        totallen;
-       Offset      upper;
 
        /*
         * Scan the page taking note of each item that we need to preserve.
@@ -1012,7 +1012,8 @@ PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, int nitems)
         */
        itemidptr = itemidbase;
        totallen = 0;
-       for (i = 0; i < nline; i++, itemidptr++)
+       PageClearHasFreeLinePointers(page);
+       for (i = 0; i < nline; i++)
        {
            ItemId      lp;
 
@@ -1024,13 +1025,15 @@ PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, int nitems)
                itemidptr->itemoff = ItemIdGetOffset(lp);
                itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
                totallen += itemidptr->alignedlen;
+               itemidptr++;
            }
            else
            {
-               itemidptr->itemoff = 0;
-               itemidptr->alignedlen = 0;
+               PageSetHasFreeLinePointers(page);
+               ItemIdSetUnused(lp);
            }
        }
+       nline = itemidptr - itemidbase;
        /* By here, there are exactly nline elements in itemidbase array */
 
        if (totallen > (Size) (pd_special - pd_lower))
@@ -1039,38 +1042,11 @@ PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, int nitems)
                     errmsg("corrupted item lengths: total %u, available space %u",
                            (unsigned int) totallen, pd_special - pd_lower)));
 
-       /* sort itemIdSortData array into decreasing itemoff order */
-       qsort((char *) itemidbase, nline, sizeof(itemIdSortData),
-             itemoffcompare);
-
        /*
         * Defragment the data areas of each tuple, being careful to preserve
         * each item's position in the linp array.
         */
-       upper = pd_special;
-       PageClearHasFreeLinePointers(page);
-       for (i = 0, itemidptr = itemidbase; i < nline; i++, itemidptr++)
-       {
-           ItemId      lp;
-
-           lp = PageGetItemId(page, itemidptr->offsetindex + 1);
-           if (itemidptr->alignedlen == 0)
-           {
-               PageSetHasFreeLinePointers(page);
-               ItemIdSetUnused(lp);
-               continue;
-           }
-           upper -= itemidptr->alignedlen;
-           memmove((char *) page + upper,
-                   (char *) page + itemidptr->itemoff,
-                   itemidptr->alignedlen);
-           lp->lp_off = upper;
-           /* lp_flags and lp_len remain the same as originally */
-       }
-
-       /* Set the new page limits */
-       phdr->pd_upper = upper;
-       phdr->pd_lower = SizeOfPageHeaderData + i * sizeof(ItemIdData);
+       compactify_tuples(itemidbase, nline, page);
    }
 }