More hacking.
author      Robert Haas <rhaas@postgresql.org>
            Fri, 14 Feb 2014 13:09:32 +0000 (08:09 -0500)
committer   Robert Haas <rhaas@postgresql.org>
            Fri, 14 Feb 2014 13:09:32 +0000 (08:09 -0500)
src/backend/utils/mmgr/freepage.c
src/include/utils/freepage.h

diff --git a/src/backend/utils/mmgr/freepage.c b/src/backend/utils/mmgr/freepage.c
index 221025b4d61f438771f89938512ce0e501e2de7e..b86e8cd8ec446319c79f798e0984765113aed79e 100644
--- a/src/backend/utils/mmgr/freepage.c
+++ b/src/backend/utils/mmgr/freepage.c
@@ -82,8 +82,9 @@ typedef struct FreePageBtreeSearchResult
 } FreePageBtreeSearchResult;
 
 /* Helper functions */
-static void FreePageBtreeRemove(FreePageManager *fpm, FreePageBtree *btp,
-                                       Size index);
+static void FreePageBtreeReduceAncestorKeys(FreePageManager *fpm,
+                                       FreePageBtree *btp);
+static void FreePageBtreePageRemove(FreePageBtree *btp, Size index);
 static void FreePageBtreeSearch(FreePageManager *fpm, Size first_page,
                                        FreePageBtreeSearchResult *result);
 static Size FreePageBtreeSearchInternal(FreePageBtree *btp, Size first_page);
@@ -122,6 +123,11 @@ FreePageManagerInitialize(FreePageManager *fpm, char *base, LWLock *lock,
        relptr_store(base, fpm->lock, lock);
        fpm->lock_address_is_fixed = lock_address_is_fixed;
        relptr_store(base, fpm->btree_root, (FreePageBtree *) NULL);
+       relptr_store(base, fpm->btree_recycle, (FreePageSpanLeader *) NULL);
+       fpm->btree_depth = 0;
+       fpm->btree_recycle_count = 0;
+       fpm->singleton_first_page = 0;
+       fpm->singleton_npages = 0;
 
        for (f = 0; f < FPM_NUM_FREELISTS; f++)
                relptr_store(base, fpm->freelist[f], (FreePageSpanLeader *) NULL);
@@ -223,104 +229,214 @@ FreePageManagerGet(FreePageManager *fpm, Size npages, Size *first_page)
 }
 
 /*
- * Insert an item into the btree in the given position on the given page.
+ * Transfer a run of pages to the free page manager.
  */
-static void
-FreePageBtreeInsert(FreePageManager *fpm, FreePageBtree *btp, Size index,
-                                       Size first_page, Size npages)
+void
+FreePageManagerPut(FreePageManager *fpm, Size first_page, Size npages)
 {
-       char *base = fpm_segment_base(fpm);
-       FreePageBtree *splitroot;
-       int             nsplits = 0;
+       LWLock *lock = fpm_lock(fpm);
+       Assert(npages > 0);
 
-       Assert(btp->hdr.magic == FREE_PAGE_LEAF_MAGIC);
-       Assert(index < btp->hdr.nused);
-       Assert(btp->hdr.nused <= FPM_ITEMS_PER_LEAF_PAGE);
+       /* Acquire lock (if there is one). */
+       if (lock != NULL)
+               LWLockAcquire(lock, LW_EXCLUSIVE);
+
+       /*
+        * As a special case, we store the very first range in the FreePageManager
+        * itself, so that a request for the entire number of pages will succeed.
+        * Otherwise, we must build or update a btree.
+        */
+       if (fpm->btree_depth == 0 && fpm->singleton_npages == 0)
+       {
+               fpm->singleton_first_page = first_page;
+               fpm->singleton_npages = npages;
+       }
+       else
+       {
+               /* XXX */
+       }
+
+       /* Release lock (if there is one). */
+       if (lock != NULL)
+               LWLockRelease(lock);
+}
+
+/*
+ * Put a range of pages into the btree and freelists, consolidating it with
+ * existing free spans just before and/or after it.  If 'soft' is true,
+ * only perform the insertion if it can be done without allocating new btree
+ * pages; if false, do it always.  Returns true if the insertion was performed.
+ */
+bool
+FreePageManagerPutInternal(FreePageManager *fpm, Size first_page, Size npages,
+                                                  bool soft)
+{
+       FreePageBtreeSearchResult result;
+       FreePageBtreeLeafKey *prevkey = NULL;
+       FreePageBtreeLeafKey *nextkey = NULL;
+
+       /* Search the btree. */
+       FreePageBtreeSearch(fpm, first_page, &result);
+       Assert(result.page_exact == NULL);              /* can't already be there */
+       if (result.page_prev != NULL)
+               prevkey = &result.page_prev->u.leaf_key[result.index_prev];
+       if (result.page_next != NULL)
+               nextkey = &result.page_next->u.leaf_key[result.index_next];
+
+       /* Consolidate with the previous entry if possible. */
+       if (prevkey != NULL && prevkey->first_page + prevkey->npages >= first_page)
+       {
+               bool    remove_next = false;
+
+               Assert(prevkey->first_page + prevkey->npages == first_page);
+               prevkey->npages = (first_page - prevkey->first_page) + npages;
+
+               /* Check whether we can *also* consolidate with the following entry. */
+               if (nextkey != NULL && prevkey->first_page + prevkey->npages >= nextkey->first_page)
+               {
+                       Assert(prevkey->first_page + prevkey->npages ==
+                                       nextkey->first_page);
+                       prevkey->npages = (nextkey->first_page - prevkey->first_page)
+                               + nextkey->npages;
+                       remove_next = true;
+               }
+
+               /* Put the span on the correct freelist. */
+               FreePagePopSpanLeader(fpm, prevkey->first_page);
+               FreePagePushSpanLeader(fpm, prevkey->first_page, prevkey->npages);
+
+               /*
+                * If we consolidated with both the preceding and following entries,
+                * we must remove the following entry.  We do this last, because
+                * removing an element from the btree may invalidate pointers we hold
+                * into the current data structure.
+                *
+                * NB: The btree is technically in an invalid state at this point
+                * because we've already updated prevkey to cover the same key space
+                * as nextkey.  FreePageBtreeRemove() shouldn't notice that, though.
+                */
+               if (remove_next)
+                       FreePageBtreeRemove(fpm, result.page_next, result.index_next);
 
-       /* If the page is not full, this is easy as pie. */
-       if (btp->hdr.nused < FPM_ITEMS_PER_LEAF_PAGE)
+               return true;
+       }
+
+       /* Consolidate with the next entry if possible. */
+       if (nextkey != NULL && first_page + npages >= nextkey->first_page)
+       {
+               Size    newpages;
+
+               /* Compute new size for span. */
+               Assert(first_page + npages == nextkey->first_page);
+               newpages = (nextkey->first_page - first_page) + nextkey->npages;
+
+               /* Put span on correct free list. */
+               FreePagePopSpanLeader(fpm, nextkey->first_page);
+               FreePagePushSpanLeader(fpm, first_page, newpages);
+
+               /* Update key in place. */
+               nextkey->first_page = first_page;
+               nextkey->npages = newpages;
+
+               /* If reducing first key on page, ancestors might need adjustment. */
+               if (result.index_next == 0)
+                       FreePageBtreeReduceAncestorKeys(fpm, result.page_next);
+
+               return true;
+       }
+
+       /*
+        * At this point, we know that the item can't be consolidated with either
+        * the preceding or following span, so we need to insert it.  If there's
+        * space on the page that contains the following key, then we can just
+        * insert it there.
+        *
+        * Note that it's not so easy to insert on the page that contains the
+        * preceding key, because the new key we're inserting is greater than
+        * anything that's on that page right now and might also be greater than
+        * the upper bound for that page.
+        */
+       if (result.page_next->hdr.nused < FPM_ITEMS_PER_LEAF_PAGE)
        {
+               FreePageBtree *btp = result.page_next;
+               Size    index = result.index_next;
+
                memmove(&btp->u.leaf_key[index + 1], &btp->u.leaf_key[index],
                                sizeof(FreePageBtreeLeafKey) * (btp->hdr.nused - index));
                btp->u.leaf_key[index].first_page = first_page;
                btp->u.leaf_key[index].npages = npages;
                ++btp->hdr.nused;
-               return;
+
+               /* If new first key on page, ancestors might need adjustment. */
+               if (index == 0)
+                       FreePageBtreeReduceAncestorKeys(fpm, result.page_next);
+
+               return true;
        }
+}
+
+/*
+ * When the first key on a leaf page is reduced, the first_page value stored
+ * in the parent's key also needs to be reduced.  We assume here that the key
+ * is not reduced to a value less than the first key of the next-lower leaf
+ * page, since that would badly hose the btree.  If the parent's key is the
+ * first one on the internal page that contains it, then we need to update
+ * its parent as well, and so on until we either reach the root of the btree
+ * or reduce a key that's not the first one on the page.
+ */
+static void
+FreePageBtreeReduceAncestorKeys(FreePageManager *fpm, FreePageBtree *btp)
+{
+       char *base = fpm_segment_base(fpm);
+       Size    first_page;
+       FreePageBtree *parent;
+       FreePageBtree *child;
+
+       Assert(btp->hdr.magic == FREE_PAGE_LEAF_MAGIC);
+       Assert(btp->hdr.nused > 0 && btp->hdr.nused <= FPM_ITEMS_PER_LEAF_PAGE);
+       first_page = btp->u.leaf_key[0].first_page;
+       child = btp;
 
-       /*
-        * Move upward until we find a page that isn't full.  We'll need to split
-        * everything below that point.
-        */
-       ++nsplits;
-       splitroot = relptr_access(base, btp->parent);
        for (;;)
        {
-               ++nsplits;
-               if (splitroot == NULL)
-                       break;
-
-               Assert(splitroot->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
-               Assert(btp->hdr.nused <= FPM_ITEMS_PER_INTERNAL_PAGE);
+               Size    s;
 
-               if (splitroot->hdr.nused < FPM_ITEMS_PER_INTERNAL_PAGE)
+               parent = relptr_access(base, child->hdr.parent);
+               if (parent == NULL)
                        break;
-               splitroot = relptr_access(base, splitroot->parent);
-       }
+               s = FreePageBtreeSearchInternal(parent, first_page);
 
-       /*
-        * XXX. Ensure that we have at least nsplit spages in fpm->btree_recycle.
-        * How?
-        */
+#ifdef USE_ASSERT_CHECKING
+               if (assert_enabled)
+               {
+                       FreePageBtree *check;
 
-       /*
-        * If everything up to and including the root was full, split the root.
-        * The depth of the btree increases by one.
-        */
-       if (splitroot == NULL)
-       {
-               /* XXX. Perform root split. */
-       }
+                       Assert(s < parent->hdr.nused);
+                       check = relptr_access(base, parent->u.internal_key[s].child);
+                       Assert(child == check);
+               }
+#endif
 
-       /* Work our way down the tree, splitting as we go. */
-       while (splitroot->hdr.magic == FREE_PAGE_INTERNAL_MAGIC)
+               parent->u.internal_key[s].first_page = first_page;
+               if (s > 0)
+                       break;
+               child = parent;
+       }
 }
 
 /*
- * Remove from the btree the item in the given position on the given page.
+ * Remove an item from the btree at the given position on the given page.
  */
 static void
-FreePageBtreeRemove(FreePageManager *fpm, FreePageBtree *btp, Size index)
+FreePageBtreeRemoveLeaf(FreePageBtree *btp, Size index)
 {
-       char *base = fpm_segment_base(fpm);
        Assert(btp->hdr.magic == FREE_PAGE_LEAF_MAGIC);
        Assert(index < btp->hdr.nused);
 
-       /* Shuffle remaining keys. */
        --btp->hdr.nused;
        if (index < btp->hdr.nused)
                memmove(&btp->u.leaf_key[index], &btp->u.leaf_key[index + 1],
                                sizeof(FreePageBtreeLeafKey) * (btp->hdr.nused - index));
-
-       /*
-        * XXX.  At this point, the key is gone, but the node may be empty or
-        * may contain very few keys.   If we're the root page, that's life.
-        * Otherwise, unlink this page from the parent.  Alternatively, if this
-        * page is non-empty but less than half full, try to merge it with a
-        * sibling, which will likewise delete one key from the parent.
-        *
-        * Either way, we free up a page; make that into a free span.  Attempt
-        * a "no-split" insertion of that span into the btree: that is, succeed if
-        * the page can be combined with an existing span with which it is
-        * contiguous or if the btree page into which it needs to be inserted
-        * isn't full.  Otherwise, skip the insertion, which will lose the ability
-        * to consolidate that span with adjacent spans, but that's life sometimes.
-        * Put the span on the free list, setting a flag to indicate whether or not
-        * we managed to insert it into the btree.
-        *
-        * After freeing the page, repeat this process for our parent, which may
-        * now be empty or underfull.
-        */
 }
 
 /*
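
To make the consolidation logic above easier to follow, here is a minimal
standalone sketch of the three cases FreePageManagerPutInternal distinguishes:
extend the preceding span (possibly absorbing the following one as well), grow
the following span downward, or fall through to inserting a new key.  All
names below (demo_span, demo_put) are hypothetical illustrations, not code
from this patch; the real function also moves spans between freelists and
fixes up ancestor keys via FreePageBtreeReduceAncestorKeys.

#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical, simplified stand-in for one btree leaf key. */
typedef struct demo_span
{
    size_t  first_page;
    size_t  npages;
} demo_span;

/*
 * Fold the run [first_page, first_page + npages) into its neighbours where
 * possible.  Returns 1 if a brand-new key must be inserted, 0 if the run was
 * absorbed; *remove_next is set when the caller must also delete the
 * following key (deferred because removal can invalidate btree pointers).
 */
static int
demo_put(demo_span *prev, demo_span *next, size_t first_page, size_t npages,
         int *remove_next)
{
    *remove_next = 0;

    if (prev != NULL && prev->first_page + prev->npages == first_page)
    {
        /* Contiguous with the preceding span: extend it upward. */
        prev->npages += npages;

        /* It may now also touch the following span: absorb that too. */
        if (next != NULL &&
            prev->first_page + prev->npages == next->first_page)
        {
            prev->npages += next->npages;
            *remove_next = 1;
        }
        return 0;
    }

    if (next != NULL && first_page + npages == next->first_page)
    {
        /*
         * Contiguous with the following span: grow it downward in place.
         * In the real code, shrinking the first key on a leaf page also
         * requires FreePageBtreeReduceAncestorKeys.
         */
        next->npages += npages;
        next->first_page = first_page;
        return 0;
    }

    /* Not contiguous with either neighbour: a new key is required. */
    return 1;
}

int
main(void)
{
    demo_span   prev = {10, 5};     /* pages 10..14 */
    demo_span   next = {20, 3};     /* pages 20..22 */
    int         remove_next;

    /* Pages 15..19 bridge the gap, so both neighbours collapse into one. */
    assert(demo_put(&prev, &next, 15, 5, &remove_next) == 0);
    assert(remove_next == 1);
    printf("merged span: %zu..%zu\n", prev.first_page,
           prev.first_page + prev.npages - 1);      /* prints 10..22 */
    return 0;
}
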
@@ -345,7 +461,7 @@ FreePageBtreeSearch(FreePageManager *fpm, Size first_page,
        FreePageBtree *btp = relptr_access(base, fpm->btree_root);
        Size    index;
 
-       /* If the btree is empty, then this would be the only item. */
+       /* If the btree is empty, there's nothing to find. */
        if (btp == NULL)
        {
                result->page_exact = NULL;
@@ -358,24 +474,38 @@ FreePageBtreeSearch(FreePageManager *fpm, Size first_page,
        while (btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC)
        {
                index = FreePageBtreeSearchInternal(btp, first_page);
+               /*
+                * If the index is 0, we're not going to find it, but we keep
+                * descending anyway so that we can find the element that follows it.
+                */
+               if (index > 0)
+                       --index;
                btp = relptr_access(base, btp->u.internal_key[index].child);
        }
 
        /* Search leaf page. */
        index = FreePageBtreeSearchLeaf(btp, first_page);
-
-       /* Did we get an exact match?  If so, return it. */
-       if (first_page == btp->u.leaf_key[index].first_page)
+       if (index >= btp->hdr.nused)
+       {
+               /* Bigger than every key on the page. */
+               Assert(index == btp->hdr.nused);
+               result->page_exact = NULL;
+               result->page_next = NULL;
+       }
+       else if (first_page == btp->u.leaf_key[index].first_page)
        {
+               /* Exact match. */
                result->page_exact = btp;
                result->index_exact = index;
-               return;
+               return;                 /* No need to set previous key in this case. */
+       }
+       else
+       {
+               /* Not equal to any key and before at least one key. */
+               result->page_exact = NULL;
+               result->page_next = btp;
+               result->index_next = index;
        }
-
-       /* No exact match, so we have the next key. */
-       result->page_exact = NULL;
-       result->page_next = btp;
-       result->index_next = index;
 
        /* Find the previous key. */
        if (index > 0)
@@ -403,7 +533,7 @@ FreePageBtreeSearch(FreePageManager *fpm, Size first_page,
                btp = relptr_access(base, btp->u.internal_key[index - 1].child);
 
                /* Descend right. */
-               while (btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC)
+               while (btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC)
                {
                        Size    nused = btp->hdr.nused;
 
@@ -418,25 +548,29 @@ FreePageBtreeSearch(FreePageManager *fpm, Size first_page,
 
 /*
  * Search an internal page for the first key greater than or equal to a given
- * page number.
+ * page number.  Returns the index of that key, or one greater than the number
+ * of keys on the page if none.
  */
 static Size
 FreePageBtreeSearchInternal(FreePageBtree *btp, Size first_page)
 {
        Size    low = 0;
-       Size    high = btp->hdr.nused - 1;
+       Size    high = btp->hdr.nused;
 
        Assert(btp->hdr.magic == FREE_PAGE_INTERNAL_MAGIC);
        Assert(high > 0 && high < FPM_ITEMS_PER_INTERNAL_PAGE);
 
        while (low < high)
        {
-               Size    mid = (low + high + 1) / 2;
+               Size    mid = (low + high) / 2;
+               Size    val = btp->u.internal_key[mid].first_page;
 
-               if (first_page < btp->u.internal_key[mid].first_page)
-                       high = mid - 1;
+               if (first_page == val)
+                       return mid;
+               else if (first_page < val)
+                       high = mid;
                else
-                       low = mid;
+                       low = mid + 1;
        }
 
        return low;
@@ -444,25 +578,29 @@ FreePageBtreeSearchInternal(FreePageBtree *btp, Size first_page)
 
 /*
  * Search a leaf page for the first key greater than or equal to a given
- * page number.
+ * page number.  Returns the index of that key, or one greater than the number
+ * of keys on the page if none.
  */
 static Size
 FreePageBtreeSearchLeaf(FreePageBtree *btp, Size first_page)
 {
        Size    low = 0;
-       Size    high = btp->hdr.nused - 1;
+       Size    high = btp->hdr.nused;
 
        Assert(btp->hdr.magic == FREE_PAGE_LEAF_MAGIC);
        Assert(high > 0 && high < FPM_ITEMS_PER_LEAF_PAGE);
 
        while (low < high)
        {
-               Size    mid = (low + high + 1) / 2;
+               Size    mid = (low + high) / 2;
+               Size    val = btp->u.leaf_key[mid].first_page;
 
-               if (first_page < btp->u.leaf_key[mid].first_page)
-                       high = mid - 1;
+               if (first_page == val)
+                       return mid;
+               else if (first_page < val)
+                       high = mid;
                else
-                       low = mid;
+                       low = mid + 1;
        }
 
        return low;
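
Both rewritten binary searches above now follow the same contract: return the
index of the first key greater than or equal to the target, or nused if every
key is smaller, which is what FreePageBtreeSearch needs in order to report
exact, next, and previous entries.  A self-contained sketch of the pattern;
demo_lower_bound is a hypothetical name, not part of the patch:

#include <assert.h>
#include <stddef.h>

/* Index of the first element >= target, or nused if none qualifies. */
static size_t
demo_lower_bound(const size_t *keys, size_t nused, size_t target)
{
    size_t  low = 0;
    size_t  high = nused;           /* note: nused, not nused - 1 */

    while (low < high)
    {
        size_t  mid = (low + high) / 2;

        if (keys[mid] == target)
            return mid;             /* exact hit */
        else if (target < keys[mid])
            high = mid;             /* answer is at or before mid */
        else
            low = mid + 1;          /* answer is after mid */
    }
    return low;
}

int
main(void)
{
    size_t  keys[] = {10, 20, 40};

    assert(demo_lower_bound(keys, 3, 20) == 1);     /* exact match */
    assert(demo_lower_bound(keys, 3, 25) == 2);     /* key 40 is "next" */
    assert(demo_lower_bound(keys, 3, 99) == 3);     /* larger than all keys */
    return 0;
}

The removed variant, with high starting at nused - 1 and mid computed as
(low + high + 1) / 2, instead homed in on the last key not greater than the
target, which no longer matches how FreePageBtreeSearch interprets the result.
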
diff --git a/src/include/utils/freepage.h b/src/include/utils/freepage.h
index e33d35ae54b7a5a30d622b82df1c82d86350face..f14705669e0b4f31c60da33dccf22f2e854d89ec 100644
--- a/src/include/utils/freepage.h
+++ b/src/include/utils/freepage.h
@@ -54,6 +54,10 @@ struct FreePageManager
        bool                    lock_address_is_fixed;
        relptr(FreePageBtree)   btree_root;
        relptr(FreePageSpanLeader)      btree_recycle;
+       unsigned                btree_depth;
+       unsigned                btree_recycle_count;
+       unsigned                singleton_first_page;
+       unsigned                singleton_npages;
        relptr(FreePageSpanLeader)  freelist[FPM_NUM_FREELISTS];
 };
 
@@ -84,5 +88,7 @@ extern void FreePageManagerInitialize(FreePageManager *fpm, char *base,
                                                  LWLock *lock, bool lock_address_is_fixed);
 extern bool FreePageManagerGet(FreePageManager *fpm, Size npages,
                                                Size *first_page);
+extern void FreePageManagerPut(FreePageManager *fpm, Size first_page,
+                                               Size npages);
 
 #endif   /* FREEPAGE_H */
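
Finally, a hypothetical caller sketch based only on the declarations that
freepage.h now exports; placing the FreePageManager at the start of the
segment, passing a NULL lock, and the particular page counts are assumptions
made for illustration, not usage prescribed by this patch.

#include "postgres.h"

#include "utils/freepage.h"

/*
 * Donate a run of pages to a freshly initialized manager, then request a few
 * of them back.  Per the new code above, the manager's very first Put is
 * recorded in the singleton fields rather than in a btree.
 */
void
demo_freepage_usage(char *segment_base, Size first_page, Size npages)
{
    FreePageManager *fpm = (FreePageManager *) segment_base;
    Size        got;

    /* NULL lock: FreePageManagerPut only acquires the lock if one was set. */
    FreePageManagerInitialize(fpm, segment_base, NULL, false);

    /* Hand the whole managed range over to the free page manager. */
    FreePageManagerPut(fpm, first_page, npages);

    /* Ask for four contiguous pages; 'got' receives their first page. */
    if (FreePageManagerGet(fpm, 4, &got))
    {
        /* pages got .. got + 3 now belong to the caller */
    }
}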