Split _bt_insertonpg to two functions.
authorBruce Momjian <bruce@momjian.us>
Sat, 3 Mar 2007 20:13:06 +0000 (20:13 +0000)
committerBruce Momjian <bruce@momjian.us>
Sat, 3 Mar 2007 20:13:06 +0000 (20:13 +0000)
Heikki Linnakangas

src/backend/access/nbtree/nbtinsert.c

index e229083b8a1b8f86c8f14fb8a3da3477af4d441a..5ec7def093b7bc3e5fc8d40f9f8d10175f3145dd 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.152 2007/02/21 20:02:17 momjian Exp $
+ *   $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.153 2007/03/03 20:13:06 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,13 +46,18 @@ typedef struct
 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
 
 static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
-                Relation heapRel, Buffer buf,
+                Relation heapRel, Buffer buf, OffsetNumber ioffset,
                 ScanKey itup_scankey);
+static void _bt_findinsertloc(Relation rel,
+                 Buffer *bufptr, 
+                 OffsetNumber *offsetptr,
+                 int keysz,
+                 ScanKey scankey,
+                 IndexTuple newtup);
 static void _bt_insertonpg(Relation rel, Buffer buf,
               BTStack stack,
-              int keysz, ScanKey scankey,
               IndexTuple itup,
-              OffsetNumber afteritem,
+              OffsetNumber newitemoff,
               bool split_only_page);
 static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
          OffsetNumber newitemoff, Size newitemsz,
@@ -86,6 +91,7 @@ _bt_doinsert(Relation rel, IndexTuple itup,
    ScanKey     itup_scankey;
    BTStack     stack;
    Buffer      buf;
+   OffsetNumber offset;
 
    /* we need an insertion scan key to do our search, so build one */
    itup_scankey = _bt_mkscankey(rel, itup);
@@ -94,6 +100,8 @@ top:
    /* find the first page containing this key */
    stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
 
+   offset = InvalidOffsetNumber;
+
    /* trade in our read lock for a write lock */
    LockBuffer(buf, BUFFER_LOCK_UNLOCK);
    LockBuffer(buf, BT_WRITE);
@@ -128,7 +136,8 @@ top:
    {
        TransactionId xwait;
 
-       xwait = _bt_check_unique(rel, itup, heapRel, buf, itup_scankey);
+       offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+       xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey);
 
        if (TransactionIdIsValid(xwait))
        {
@@ -142,7 +151,8 @@ top:
    }
 
    /* do the insertion */
-   _bt_insertonpg(rel, buf, stack, natts, itup_scankey, itup, 0, false);
+   _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup);
+   _bt_insertonpg(rel, buf, stack, itup, offset, false);
 
    /* be tidy */
    _bt_freestack(stack);
@@ -152,18 +162,21 @@ top:
 /*
  * _bt_check_unique() -- Check for violation of unique index constraint
  *
+ * offset points to the first possible item that could conflict. It can
+ * also point to end-of-page, which means that the first tuple to check
+ * is the first tuple on the next page.
+ *
  * Returns InvalidTransactionId if there is no conflict, else an xact ID
  * we must wait for to see if it commits a conflicting tuple.  If an actual
  * conflict is detected, no return --- just ereport().
  */
 static TransactionId
 _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
-                Buffer buf, ScanKey itup_scankey)
+                Buffer buf, OffsetNumber offset, ScanKey itup_scankey)
 {
    TupleDesc   itupdesc = RelationGetDescr(rel);
    int         natts = rel->rd_rel->relnatts;
-   OffsetNumber offset,
-               maxoff;
+   OffsetNumber maxoff;
    Page        page;
    BTPageOpaque opaque;
    Buffer      nbuf = InvalidBuffer;
@@ -172,12 +185,6 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
    maxoff = PageGetMaxOffsetNumber(page);
 
-   /*
-    * Find first item >= proposed new item.  Note we could also get a pointer
-    * to end-of-page here.
-    */
-   offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
-
    /*
     * Scan over all equal tuples, looking for live conflicts.
     */
@@ -342,33 +349,11 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
    return InvalidTransactionId;
 }
 
-/*----------
- * _bt_insertonpg() -- Insert a tuple on a particular page in the index.
- *
- *     This recursive procedure does the following things:
- *
- *         +  finds the right place to insert the tuple.
- *         +  if necessary, splits the target page (making sure that the
- *            split is equitable as far as post-insert free space goes).
- *         +  inserts the tuple.
- *         +  if the page was split, pops the parent stack, and finds the
- *            right place to insert the new child pointer (by walking
- *            right using information stored in the parent stack).
- *         +  invokes itself with the appropriate tuple for the right
- *            child page on the parent.
- *         +  updates the metapage if a true root or fast root is split.
- *
- *     On entry, we must have the right buffer in which to do the
- *     insertion, and the buffer must be pinned and write-locked.  On return,
- *     we will have dropped both the pin and the lock on the buffer.
- *
- *     If 'afteritem' is >0 then the new tuple must be inserted after the
- *     existing item of that number, noplace else.  If 'afteritem' is 0
- *     then the procedure finds the exact spot to insert it by searching.
- *     (keysz and scankey parameters are used ONLY if afteritem == 0.
- *     The scankey must be an insertion-type scankey.)
+
+/*
+ * _bt_findinsertloc() -- Finds an insert location for a tuple
  *
- *     NOTE: if the new key is equal to one or more existing keys, we can
+ *     If the new key is equal to one or more existing keys, we can
  *     legitimately place it anywhere in the series of equal keys --- in fact,
  *     if the new key is equal to the page's "high key" we can place it on
  *     the next page.  If it is equal to the high key, and there's not room
@@ -379,36 +364,40 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
  *     Once we have chosen the page to put the key on, we'll insert it before
  *     any existing equal keys because of the way _bt_binsrch() works.
  *
- *     The locking interactions in this code are critical.  You should
- *     grok Lehman and Yao's paper before making any changes.  In addition,
- *     you need to understand how we disambiguate duplicate keys in this
- *     implementation, in order to be able to find our location using
- *     L&Y "move right" operations.  Since we may insert duplicate user
- *     keys, and since these dups may propagate up the tree, we use the
- *     'afteritem' parameter to position ourselves correctly for the
- *     insertion on internal pages.
- *----------
+ *     If there's not enough room in the space, we try to make room by
+ *     removing any LP_DELETEd tuples.
+ *
+ *     On entry, *buf and *offsetptr point to the first legal position
+ *     where the new tuple could be inserted. The caller should hold an
+ *     exclusive lock on *buf. *offsetptr can also be set to
+ *     InvalidOffsetNumber, in which case the function will search the right
+ *     location within the page if needed. On exit, they point to the chosen
+ *     insert location. If findinsertloc decided to move right, the lock and
+ *     pin on the original page will be released and the new page returned to
+ *     the caller is exclusively locked instead.
+ *
+ *     newtup is the new tuple we're inserting, and scankey is an insertion
+ *     type scan key for it.
  */
 static void
-_bt_insertonpg(Relation rel,
-              Buffer buf,
-              BTStack stack,
-              int keysz,
-              ScanKey scankey,
-              IndexTuple itup,
-              OffsetNumber afteritem,
-              bool split_only_page)
+_bt_findinsertloc(Relation rel,
+                 Buffer *bufptr,
+                 OffsetNumber *offsetptr,
+                 int keysz,
+                 ScanKey scankey,
+                 IndexTuple newtup)
 {
-   Page        page;
+   Buffer buf = *bufptr;
+   Page page = BufferGetPage(buf);
+   Size itemsz;
    BTPageOpaque lpageop;
+   bool movedright, vacuumed;
    OffsetNumber newitemoff;
-   OffsetNumber firstright = InvalidOffsetNumber;
-   Size        itemsz;
+   OffsetNumber firstlegaloff = *offsetptr;
 
-   page = BufferGetPage(buf);
    lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
 
-   itemsz = IndexTupleDSize(*itup);
+   itemsz = IndexTupleDSize(*newtup);
    itemsz = MAXALIGN(itemsz);  /* be safe, PageAddItem will do this but we
                                 * need to be consistent */
 
@@ -429,96 +418,155 @@ _bt_insertonpg(Relation rel,
                "Consider a function index of an MD5 hash of the value, "
                "or use full text indexing.")));
 
-   /*
-    * Determine exactly where new item will go.
+
+
+   /*----------
+    * If we will need to split the page to put the item on this page,
+    * check whether we can put the tuple somewhere to the right,
+    * instead.  Keep scanning right until we
+    *      (a) find a page with enough free space,
+    *      (b) reach the last page where the tuple can legally go, or
+    *      (c) get tired of searching.
+    * (c) is not flippant; it is important because if there are many
+    * pages' worth of equal keys, it's better to split one of the early
+    * pages than to scan all the way to the end of the run of equal keys
+    * on every insert.  We implement "get tired" as a random choice,
+    * since stopping after scanning a fixed number of pages wouldn't work
+    * well (we'd never reach the right-hand side of previously split
+    * pages).  Currently the probability of moving right is set at 0.99,
+    * which may seem too high to change the behavior much, but it does an
+    * excellent job of preventing O(N^2) behavior with many equal keys.
+    *----------
     */
-   if (afteritem > 0)
-       newitemoff = afteritem + 1;
-   else
+   movedright = false;
+   vacuumed = false;
+   while (PageGetFreeSpace(page) < itemsz)
    {
-       /*----------
-        * If we will need to split the page to put the item here,
-        * check whether we can put the tuple somewhere to the right,
-        * instead.  Keep scanning right until we
-        *      (a) find a page with enough free space,
-        *      (b) reach the last page where the tuple can legally go, or
-        *      (c) get tired of searching.
-        * (c) is not flippant; it is important because if there are many
-        * pages' worth of equal keys, it's better to split one of the early
-        * pages than to scan all the way to the end of the run of equal keys
-        * on every insert.  We implement "get tired" as a random choice,
-        * since stopping after scanning a fixed number of pages wouldn't work
-        * well (we'd never reach the right-hand side of previously split
-        * pages).  Currently the probability of moving right is set at 0.99,
-        * which may seem too high to change the behavior much, but it does an
-        * excellent job of preventing O(N^2) behavior with many equal keys.
-        *----------
-        */
-       bool        movedright = false;
+       Buffer      rbuf;
 
-       while (PageGetFreeSpace(page) < itemsz)
+       /*
+        * before considering moving right, see if we can obtain enough
+        * space by erasing LP_DELETE items
+        */
+       if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
        {
-           Buffer      rbuf;
-
-           /*
-            * before considering moving right, see if we can obtain enough
-            * space by erasing LP_DELETE items
-            */
-           if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
-           {
-               _bt_vacuum_one_page(rel, buf);
-               if (PageGetFreeSpace(page) >= itemsz)
-                   break;      /* OK, now we have enough space */
-           }
+           _bt_vacuum_one_page(rel, buf);
 
-           /*
-            * nope, so check conditions (b) and (c) enumerated above
-            */
-           if (P_RIGHTMOST(lpageop) ||
-               _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
-               random() <= (MAX_RANDOM_VALUE / 100))
-               break;
-
-           /*
-            * step right to next non-dead page
-            *
-            * must write-lock that page before releasing write lock on
-            * current page; else someone else's _bt_check_unique scan could
-            * fail to see our insertion.  write locks on intermediate dead
-            * pages won't do because we don't know when they will get
-            * de-linked from the tree.
-            */
-           rbuf = InvalidBuffer;
-
-           for (;;)
-           {
-               BlockNumber rblkno = lpageop->btpo_next;
+           /* remember that we vacuumed this page, because that makes
+            * the hint supplied by the caller invalid */
+           vacuumed = true;
 
-               rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
-               page = BufferGetPage(rbuf);
-               lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
-               if (!P_IGNORE(lpageop))
-                   break;
-               if (P_RIGHTMOST(lpageop))
-                   elog(ERROR, "fell off the end of \"%s\"",
-                        RelationGetRelationName(rel));
-           }
-           _bt_relbuf(rel, buf);
-           buf = rbuf;
-           movedright = true;
+           if (PageGetFreeSpace(page) >= itemsz) 
+               break;      /* OK, now we have enough space */
        }
 
        /*
-        * Now we are on the right page, so find the insert position. If we
-        * moved right at all, we know we should insert at the start of the
-        * page, else must find the position by searching.
+        * nope, so check conditions (b) and (c) enumerated above
         */
-       if (movedright)
-           newitemoff = P_FIRSTDATAKEY(lpageop);
-       else
-           newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
+       if (P_RIGHTMOST(lpageop) ||
+           _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
+           random() <= (MAX_RANDOM_VALUE / 100))
+           break;
+
+       /*
+        * step right to next non-dead page
+        *
+        * must write-lock that page before releasing write lock on
+        * current page; else someone else's _bt_check_unique scan could
+        * fail to see our insertion.  write locks on intermediate dead
+        * pages won't do because we don't know when they will get
+        * de-linked from the tree.
+        */
+       rbuf = InvalidBuffer;
+
+       for (;;)
+       {
+           BlockNumber rblkno = lpageop->btpo_next;
+
+           rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
+           page = BufferGetPage(rbuf);
+           lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+           if (!P_IGNORE(lpageop))
+               break;
+           if (P_RIGHTMOST(lpageop))
+               elog(ERROR, "fell off the end of \"%s\"",
+                    RelationGetRelationName(rel));
+       }
+       _bt_relbuf(rel, buf);
+       buf = rbuf;
+       movedright = true;
+       vacuumed = false;
    }
 
+   /*
+    * Now we are on the right page, so find the insert position. If we
+    * moved right at all, we know we should insert at the start of the
+    * page. If we didn't move right, we can use the firstlegaloff hint
+    * if the caller supplied one, unless we vacuumed the page which
+    * might have moved tuples around making the hint invalid. If we 
+    * didn't move right or can't use the hint, find the position
+    * by searching.
+    */
+   if (movedright)
+       newitemoff = P_FIRSTDATAKEY(lpageop);
+   else if(firstlegaloff != InvalidOffsetNumber && !vacuumed)
+       newitemoff = firstlegaloff;
+   else
+       newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
+
+   *bufptr = buf;
+   *offsetptr = newitemoff;
+}
+
+/*----------
+ * _bt_insertonpg() -- Insert a tuple on a particular page in the index.
+ *
+ *     This recursive procedure does the following things:
+ *
+ *         +  if necessary, splits the target page (making sure that the
+ *            split is equitable as far as post-insert free space goes).
+ *         +  inserts the tuple.
+ *         +  if the page was split, pops the parent stack, and finds the
+ *            right place to insert the new child pointer (by walking
+ *            right using information stored in the parent stack).
+ *         +  invokes itself with the appropriate tuple for the right
+ *            child page on the parent.
+ *         +  updates the metapage if a true root or fast root is split.
+ *
+ *     On entry, we must have the right buffer in which to do the
+ *     insertion, and the buffer must be pinned and write-locked.  On return,
+ *     we will have dropped both the pin and the lock on the buffer.
+ *
+ *     The locking interactions in this code are critical.  You should
+ *     grok Lehman and Yao's paper before making any changes.  In addition,
+ *     you need to understand how we disambiguate duplicate keys in this
+ *     implementation, in order to be able to find our location using
+ *     L&Y "move right" operations.  Since we may insert duplicate user
+ *     keys, and since these dups may propagate up the tree, we use the
+ *     'afteritem' parameter to position ourselves correctly for the
+ *     insertion on internal pages.
+ *----------
+ */
+static void
+_bt_insertonpg(Relation rel,
+              Buffer buf,
+              BTStack stack,
+              IndexTuple itup,
+              OffsetNumber newitemoff,
+              bool split_only_page)
+{
+   Page        page;
+   BTPageOpaque lpageop;
+   OffsetNumber firstright = InvalidOffsetNumber;
+   Size        itemsz;
+
+   page = BufferGetPage(buf);
+   lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+   itemsz = IndexTupleDSize(*itup);
+   itemsz = MAXALIGN(itemsz);  /* be safe, PageAddItem will do this but we
+                                * need to be consistent */
+
    /*
     * Do we need to split the page to fit the item on it?
     *
@@ -1427,7 +1475,7 @@ _bt_insert_parent(Relation rel,
 
        /* Recursively update the parent */
        _bt_insertonpg(rel, pbuf, stack->bts_parent,
-                      0, NULL, new_item, stack->bts_offset,
+                      new_item, stack->bts_offset + 1,
                       is_only);
 
        /* be tidy */