Reduce size of critical section and remove call of user-defined functions in
authorTeodor Sigaev <teodor@sigaev.ru>
Wed, 10 May 2006 09:19:54 +0000 (09:19 +0000)
committerTeodor Sigaev <teodor@sigaev.ru>
Wed, 10 May 2006 09:19:54 +0000 (09:19 +0000)
insertion and deletion; modify gistSplit() so that it does not use buffers.

 TODO: gistvacuumcleanup and XLOG

src/backend/access/gist/gist.c
src/backend/access/gist/gistutil.c
src/backend/access/gist/gistvacuum.c
src/backend/access/gist/gistxlog.c
src/include/access/gist_private.h

index 16468fd35a5bce6b442bd3349c6805a5a7857bbf..2272e3339d14d24e50742dab7cf3d8423b0e3d12 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.132 2006/04/03 13:44:33 teodor Exp $
+ *   $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.133 2006/05/10 09:19:54 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -52,6 +52,8 @@ static void gistfindleaf(GISTInsertState *state,
 #define ROTATEDIST(d) do { \
    SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
    memset(tmp,0,sizeof(SplitedPageLayout)); \
+   tmp->block.blkno = InvalidBlockNumber;  \
+   tmp->buffer = InvalidBuffer;    \
    tmp->next = (d); \
    (d)=tmp; \
 } while(0)
@@ -309,52 +311,111 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
    bool        is_splitted = false;
    bool        is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
 
+
    /*
-    * XXX this code really ought to work by locking, but not modifying,
-    * all the buffers it needs; then starting a critical section; then
-    * modifying the buffers in an already-determined way and writing an
-    * XLOG record to reflect that.  Since it doesn't, we've got to put
-    * a critical section around the entire process, which is horrible
-    * from a robustness point of view.
+    * if (!is_leaf) remove old key:
+    * This node's key has been modified, either because a child split
+    * occurred or because we needed to adjust our key for an insert in a
+    * child node. Therefore, remove the old version of this node's key.
+    *
+    * for WAL replay, in the non-split case we handle this by
+    * setting up a one-element todelete array; in the split case, it's
+    * handled implicitly because the tuple vector passed to gistSplit
+    * won't include this tuple.
     */
-   START_CRIT_SECTION();
-
-   if (!is_leaf)
-
-       /*
-        * This node's key has been modified, either because a child split
-        * occurred or because we needed to adjust our key for an insert in a
-        * child node. Therefore, remove the old version of this node's key.
-        *
-        * Note: for WAL replay, in the non-split case we handle this by
-        * setting up a one-element todelete array; in the split case, it's
-        * handled implicitly because the tuple vector passed to gistSplit
-        * won't include this tuple.
-        */
 
-       PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
 
-   if (gistnospace(state->stack->page, state->itup, state->ituplen))
+   if (gistnospace(state->stack->page, state->itup, state->ituplen, (is_leaf) ? InvalidOffsetNumber : state->stack->childoffnum))
    {
        /* no space for insertion */
-       IndexTuple *itvec,
-                  *newitup;
+       IndexTuple *itvec;
        int         tlen;
        SplitedPageLayout *dist = NULL,
                   *ptr;
+       BlockNumber rrlink = InvalidBlockNumber;
+       GistNSN     oldnsn;
 
        is_splitted = true;
+
+       /*
+        * Form index tuples vector to split:
+        * remove old tuple if it's needed and add new tuples to vector
+        */
        itvec = gistextractbuffer(state->stack->buffer, &tlen);
+       if ( !is_leaf ) {
+           /* on inner page we should remove old tuple */
+           int pos = state->stack->childoffnum - FirstOffsetNumber;
+
+           tlen--; 
+           if ( pos != tlen ) 
+               memmove( itvec+pos, itvec + pos + 1, sizeof( IndexTuple ) * (tlen-pos) );
+       }
        itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
-       newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);
+       dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate);
+
+       state->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * tlen);
+       state->ituplen = 0;
+
+       if (state->stack->blkno != GIST_ROOT_BLKNO) {
+           /* if non-root split then we should not allocate new buffer,
+              but we must create temporary page to operate */ 
+           dist->buffer = state->stack->buffer;
+           dist->page = PageGetTempPage( BufferGetPage(dist->buffer), sizeof(GISTPageOpaqueData) );
+
+           /*clean all flags except F_LEAF */ 
+           GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
+       }
+
+       /* make new pages and fill them */
+       for (ptr = dist; ptr; ptr = ptr->next) {
+           int i;
+           char *data;
+
+           /* get new page */
+           if ( ptr->buffer == InvalidBuffer ) {
+               ptr->buffer = gistNewBuffer( state->r );
+               GISTInitBuffer( ptr->buffer, (is_leaf) ? F_LEAF : 0 );
+               ptr->page = BufferGetPage(ptr->buffer);
+           }
+           ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
+
+           /* fill page, we can do it because all these pages are new (i.e. not linked in tree
+              or masked by temp page) */
+           data = (char*)(ptr->list); 
+           for(i=0;i<ptr->block.num;i++) {
+               if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
+                   elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r));
+               data += IndexTupleSize((IndexTuple)data);
+           }
+
+           /* set up ItemPointer and remember it for parent */
+           ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+           state->itup[ state->ituplen ] = ptr->itup;
+           state->ituplen++;
+       }
+
+       /* saves old rightlink */
+       if ( state->stack->blkno != GIST_ROOT_BLKNO )
+           rrlink =  GistPageGetOpaque(dist->page)->rightlink;
+
+       START_CRIT_SECTION();
 
        /*
         * must mark buffers dirty before XLogInsert, even though we'll
-        * still be changing their opaque fields below
+        * still be changing their opaque fields below.
+        * set up right links.
         */
-       for (ptr = dist; ptr; ptr = ptr->next)
+       for (ptr = dist; ptr; ptr = ptr->next) 
        {
            MarkBufferDirty(ptr->buffer);
+           GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ?
+               ptr->next->block.blkno : rrlink;
+       }
+
+       /* restore the split non-root page */
+       if ( state->stack->blkno != GIST_ROOT_BLKNO ) {
+           PageRestoreTempPage( dist->page, BufferGetPage( dist->buffer ) );
+           dist->page = BufferGetPage( dist->buffer );
        }
 
        if (!state->r->rd_istemp)
@@ -366,88 +427,44 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                                   is_leaf, &(state->key), dist);
 
            recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
+
            for (ptr = dist; ptr; ptr = ptr->next)
            {
-               PageSetLSN(BufferGetPage(ptr->buffer), recptr);
-               PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
+               PageSetLSN(ptr->page, recptr);
+               PageSetTLI(ptr->page, ThisTimeLineID);
            }
        }
        else
        {
            for (ptr = dist; ptr; ptr = ptr->next)
            {
-               PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
+               PageSetLSN(ptr->page, XLogRecPtrForTemp);
            }
        }
 
-       state->itup = newitup;
-       state->ituplen = tlen;  /* now tlen >= 2 */
-
-       if (state->stack->blkno == GIST_ROOT_BLKNO)
-       {
-           gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
-           state->needInsertComplete = false;
-           for (ptr = dist; ptr; ptr = ptr->next)
-           {
-               Page        page = (Page) BufferGetPage(ptr->buffer);
+       /* set up NSN */
+       oldnsn = GistPageGetOpaque(dist->page)->nsn;
+       if ( state->stack->blkno == GIST_ROOT_BLKNO )
+           /* if root split we should put initial value */
+           oldnsn = PageGetLSN(dist->page);
 
-               GistPageGetOpaque(page)->rightlink = (ptr->next) ?
-                   ptr->next->block.blkno : InvalidBlockNumber;
-               GistPageGetOpaque(page)->nsn = PageGetLSN(page);
-               UnlockReleaseBuffer(ptr->buffer);
-           }
+       for (ptr = dist; ptr; ptr = ptr->next) {
+           /* only for last set oldnsn */
+           GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ?
+               PageGetLSN(ptr->page) : oldnsn;
        }
-       else
-       {
-           Page        page;
-           BlockNumber rightrightlink = InvalidBlockNumber;
-           SplitedPageLayout *ourpage = NULL;
-           GistNSN     oldnsn;
-           GISTPageOpaque opaque;
-
-           /* move origpage to first in chain */
-           if (dist->block.blkno != state->stack->blkno)
-           {
-               ptr = dist;
-               while (ptr->next)
-               {
-                   if (ptr->next->block.blkno == state->stack->blkno)
-                   {
-                       ourpage = ptr->next;
-                       ptr->next = ptr->next->next;
-                       ourpage->next = dist;
-                       dist = ourpage;
-                       break;
-                   }
-                   ptr = ptr->next;
-               }
-               Assert(ourpage != NULL);
-           }
-           else
-               ourpage = dist;
 
-           /* now gets all needed data, and sets nsn's */
-           page = (Page) BufferGetPage(ourpage->buffer);
-           opaque = GistPageGetOpaque(page);
-           rightrightlink = opaque->rightlink;
-           oldnsn = opaque->nsn;
-           opaque->nsn = PageGetLSN(page);
-           opaque->rightlink = ourpage->next->block.blkno;
+       /* 
+        * release buffers, if it was a root split then
+        * release all buffers because we create all buffers 
+        */
+       ptr = ( state->stack->blkno == GIST_ROOT_BLKNO ) ? dist : dist->next;
+       for(; ptr; ptr = ptr->next)
+           UnlockReleaseBuffer(ptr->buffer);
 
-           /*
-            * fill and release all new pages. They isn't linked into tree yet
-            */
-           for (ptr = ourpage->next; ptr; ptr = ptr->next)
-           {
-               page = (Page) BufferGetPage(ptr->buffer);
-               GistPageGetOpaque(page)->rightlink = (ptr->next) ?
-                   ptr->next->block.blkno : rightrightlink;
-               /* only for last set oldnsn */
-               GistPageGetOpaque(page)->nsn = (ptr->next) ?
-                   opaque->nsn : oldnsn;
-
-               UnlockReleaseBuffer(ptr->buffer);
-           }
+       if (state->stack->blkno == GIST_ROOT_BLKNO) {
+           gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
+           state->needInsertComplete = false;
        }
 
        END_CRIT_SECTION();
@@ -455,13 +472,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
    else
    {
        /* enough space */
-       XLogRecPtr  oldlsn;
+       START_CRIT_SECTION();
 
+       if (!is_leaf)
+           PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
        gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
 
        MarkBufferDirty(state->stack->buffer);
 
-       oldlsn = PageGetLSN(state->stack->page);
        if (!state->r->rd_istemp)
        {
            OffsetNumber noffs = 0,
@@ -921,77 +939,55 @@ gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset)
        arr[i] = reasloffset[arr[i]];
 }
 
+static IndexTupleData *
+gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
+   char *ptr, *ret = palloc(BLCKSZ);
+   int i;
+
+   ptr = ret;
+   for (i = 0; i < veclen; i++) {
+       memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
+       ptr += IndexTupleSize(vec[i]);
+   }
+
+   *memlen = ptr - ret;
+   Assert( *memlen < BLCKSZ );
+   return (IndexTupleData*)ret;
+}
+
 /*
  * gistSplit -- split a page in the tree.
  */
-IndexTuple *
+SplitedPageLayout *
 gistSplit(Relation r,
-         Buffer buffer,
+         Page page,
          IndexTuple *itup,     /* contains compressed entry */
-         int *len,
-         SplitedPageLayout **dist,
+         int len,
          GISTSTATE *giststate)
 {
-   Page        p;
-   Buffer      leftbuf,
-               rightbuf;
-   Page        left,
-               right;
    IndexTuple *lvectup,
-              *rvectup,
-              *newtup;
-   BlockNumber lbknum,
-               rbknum;
-   GISTPageOpaque opaque;
+              *rvectup;
    GIST_SPLITVEC v;
    GistEntryVector *entryvec;
    int         i,
-               fakeoffset,
-               nlen;
+               fakeoffset;
    OffsetNumber *realoffset;
    IndexTuple *cleaneditup = itup;
-   int         lencleaneditup = *len;
-
-   p = (Page) BufferGetPage(buffer);
-   opaque = GistPageGetOpaque(p);
-
-   /*
-    * The root of the tree is the first block in the relation.  If we're
-    * about to split the root, we need to do some hocus-pocus to enforce this
-    * guarantee.
-    */
-   if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO)
-   {
-       leftbuf = gistNewBuffer(r);
-       GISTInitBuffer(leftbuf, opaque->flags & F_LEAF);
-       lbknum = BufferGetBlockNumber(leftbuf);
-       left = (Page) BufferGetPage(leftbuf);
-   }
-   else
-   {
-       leftbuf = buffer;
-       /* IncrBufferRefCount(buffer); */
-       lbknum = BufferGetBlockNumber(buffer);
-       left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData));
-   }
-
-   rightbuf = gistNewBuffer(r);
-   GISTInitBuffer(rightbuf, opaque->flags & F_LEAF);
-   rbknum = BufferGetBlockNumber(rightbuf);
-   right = (Page) BufferGetPage(rightbuf);
+   int         lencleaneditup = len;
+   SplitedPageLayout   *res = NULL;
 
    /* generate the item array */
-   realoffset = palloc((*len + 1) * sizeof(OffsetNumber));
-   entryvec = palloc(GEVHDRSZ + (*len + 1) * sizeof(GISTENTRY));
-   entryvec->n = *len + 1;
+   realoffset = palloc((len + 1) * sizeof(OffsetNumber));
+   entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
+   entryvec->n = len + 1;
 
    fakeoffset = FirstOffsetNumber;
-   for (i = 1; i <= *len; i++)
+   for (i = 1; i <= len; i++)
    {
        Datum       datum;
        bool        IsNull;
 
-       if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup[i - 1]))
+       if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
        {
            entryvec->n--;
            /* remember position of invalid tuple */
@@ -1001,7 +997,7 @@ gistSplit(Relation r,
 
        datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
        gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]),
-                      datum, r, p, i,
+                      datum, r, page, i,
                       ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
                       FALSE, IsNull);
        realoffset[fakeoffset] = i;
@@ -1013,14 +1009,14 @@ gistSplit(Relation r,
     * possible, we move all invalid tuples on right page. We should remember,
     * that union with invalid tuples is a invalid tuple.
     */
-   if (entryvec->n != *len + 1)
+   if (entryvec->n != len + 1)
    {
        lencleaneditup = entryvec->n - 1;
        cleaneditup = (IndexTuple *) palloc(lencleaneditup * sizeof(IndexTuple));
        for (i = 1; i < entryvec->n; i++)
            cleaneditup[i - 1] = itup[realoffset[i] - 1];
 
-       if (gistnospace(left, cleaneditup, lencleaneditup))
+       if (!gistfitpage(cleaneditup, lencleaneditup))
        {
            /* no space on left to put all good tuples, so picksplit */
            gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate);
@@ -1041,8 +1037,8 @@ gistSplit(Relation r,
                v.spl_leftvalid = v.spl_rightvalid = false;
                v.spl_nright = 0;
                v.spl_nleft = 0;
-               for (i = 1; i <= *len; i++)
-                   if (i - 1 < *len / 2)
+               for (i = 1; i <= len; i++)
+                   if (i - 1 < len / 2)
                        v.spl_left[v.spl_nleft++] = i;
                    else
                        v.spl_right[v.spl_nright++] = i;
@@ -1071,14 +1067,14 @@ gistSplit(Relation r,
    else
    {
        /* there is no invalid tuples, so usial processing */
-       gistUserPicksplit(r, entryvec, &v, itup, *len, giststate);
+       gistUserPicksplit(r, entryvec, &v, itup, len, giststate);
        v.spl_leftvalid = v.spl_rightvalid = true;
    }
 
 
    /* form left and right vector */
-   lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1));
-   rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1));
+   lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
+   rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
 
    for (i = 0; i < v.spl_nleft; i++)
        lvectup[i] = itup[v.spl_left[i] - 1];
@@ -1087,87 +1083,48 @@ gistSplit(Relation r,
        rvectup[i] = itup[v.spl_right[i] - 1];
 
    /* place invalid tuples on right page if itsn't done yet */
-   for (fakeoffset = entryvec->n; fakeoffset < *len + 1 && lencleaneditup; fakeoffset++)
+   for (fakeoffset = entryvec->n; fakeoffset < len + 1 && lencleaneditup; fakeoffset++)
    {
        rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1];
    }
 
-   /* write on disk (may need another split) */
-   if (gistnospace(right, rvectup, v.spl_nright))
+   /* finalize splitting (may need another split) */
+   if (!gistfitpage(rvectup, v.spl_nright))
    {
-       nlen = v.spl_nright;
-       newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
-       /* ReleaseBuffer(rightbuf); */
+       res = gistSplit(r, page, rvectup, v.spl_nright, giststate);
    }
    else
    {
-       char       *ptr;
-
-       gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
-       /* XLOG stuff */
-       ROTATEDIST(*dist);
-       (*dist)->block.blkno = BufferGetBlockNumber(rightbuf);
-       (*dist)->block.num = v.spl_nright;
-       (*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
-       ptr = (char *) ((*dist)->list);
-       for (i = 0; i < v.spl_nright; i++)
-       {
-           memcpy(ptr, rvectup[i], IndexTupleSize(rvectup[i]));
-           ptr += IndexTupleSize(rvectup[i]);
-       }
-       (*dist)->lenlist = ptr - ((char *) ((*dist)->list));
-       (*dist)->buffer = rightbuf;
-
-       nlen = 1;
-       newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
-       newtup[0] = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
-           : gist_form_invalid_tuple(rbknum);
-       ItemPointerSetBlockNumber(&(newtup[0]->t_tid), rbknum);
+       ROTATEDIST(res);
+       res->block.num = v.spl_nright;
+       res->list = gistfillitupvec(rvectup, v.spl_nright, &( res->lenlist ) );
+       res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
+           : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
    }
 
-   if (gistnospace(left, lvectup, v.spl_nleft))
+   if (!gistfitpage(lvectup, v.spl_nleft))
    {
-       int         llen = v.spl_nleft;
-       IndexTuple *lntup;
+       SplitedPageLayout *resptr, *subres;
 
-       lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
-       /* ReleaseBuffer(leftbuf); */
+       resptr = subres = gistSplit(r, page, lvectup, v.spl_nleft, giststate);
 
-       newtup = gistjoinvector(newtup, &nlen, lntup, llen);
+       /* install on list's tail */ 
+       while( resptr->next )
+           resptr = resptr->next;
+
+       resptr->next = res;
+       res = subres;
    }
    else
    {
-       char       *ptr;
-
-       gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
-       /* XLOG stuff */
-       ROTATEDIST(*dist);
-       (*dist)->block.blkno = BufferGetBlockNumber(leftbuf);
-       (*dist)->block.num = v.spl_nleft;
-       (*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
-       ptr = (char *) ((*dist)->list);
-       for (i = 0; i < v.spl_nleft; i++)
-       {
-           memcpy(ptr, lvectup[i], IndexTupleSize(lvectup[i]));
-           ptr += IndexTupleSize(lvectup[i]);
-       }
-       (*dist)->lenlist = ptr - ((char *) ((*dist)->list));
-       (*dist)->buffer = leftbuf;
-
-       if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO)
-           PageRestoreTempPage(left, p);
-
-       nlen += 1;
-       newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
-       newtup[nlen - 1] = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
-           : gist_form_invalid_tuple(lbknum);
-       ItemPointerSetBlockNumber(&(newtup[nlen - 1]->t_tid), lbknum);
+       ROTATEDIST(res);
+       res->block.num = v.spl_nleft;
+       res->list = gistfillitupvec(lvectup, v.spl_nleft, &( res->lenlist ) );
+       res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
+           : gist_form_invalid_tuple(GIST_ROOT_BLKNO);
    }
 
-   GistClearTuplesDeleted(p);
-
-   *len = nlen;
-   return newtup;
+   return res;
 }
 
 /*
index bf0a090ff8ba62fb047ba1e815999409a586c12a..d5d6405100b3e803a7022da690026944d3603ad5 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *         $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.10 2006/03/05 15:58:20 momjian Exp $
+ *         $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.11 2006/05/10 09:19:54 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -81,15 +81,31 @@ gistfillbuffer(Relation r, Page page, IndexTuple *itup,
  * Check space for itup vector on page
  */
 bool
-gistnospace(Page page, IndexTuple *itvec, int len)
+gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete)
 {
-   unsigned int size = 0;
+   unsigned int size = 0, deleted = 0;
    int         i;
 
    for (i = 0; i < len; i++)
        size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
 
-   return (PageGetFreeSpace(page) < size);
+   if ( todelete != InvalidOffsetNumber ) {
+       IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));
+       deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
+   }
+
+   return (PageGetFreeSpace(page) + deleted < size);
+}
+
+bool
+gistfitpage(IndexTuple *itvec, int len) {
+   int i;
+   Size size=0;
+
+   for(i=0;i<len;i++)
+       size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
+
+   return (size <= GiSTPageSize);
 }
 
 /*
@@ -107,7 +123,7 @@ gistextractbuffer(Buffer buffer, int *len /* out */ )
    *len = maxoff;
    itvec = palloc(sizeof(IndexTuple) * maxoff);
    for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
-       itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
+       itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
 
    return itvec;
 }
index eafd472c5fd351b05f50bd15804ab0d4fe40f405..e81c0ebf487fb53ffe06e69fd86075152b895953 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.19 2006/05/02 22:25:10 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.20 2006/05/10 09:19:54 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -85,10 +85,7 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
    if (GistPageIsLeaf(page))
    {
        if (GistTuplesDeleted(page))
-       {
            needunion = needwrite = true;
-           GistClearTuplesDeleted(page);
-       }
    }
    else
    {
@@ -157,30 +154,54 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
        if (curlenaddon)
        {
            /* insert updated tuples */
-           if (gistnospace(page, addon, curlenaddon))
+           if (gistnospace(page, addon, curlenaddon, InvalidOffsetNumber))
            {
                /* there is no space on page to insert tuples */
                IndexTuple *vec;
                SplitedPageLayout *dist = NULL,
                           *ptr;
-               int         i;
+               int         i, veclen=0;
                MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
 
-               vec = gistextractbuffer(buffer, &(res.ituplen));
-               vec = gistjoinvector(vec, &(res.ituplen), addon, curlenaddon);
-               res.itup = gistSplit(gv->index, buffer, vec, &(res.ituplen), &dist, &(gv->giststate));
+               vec = gistextractbuffer(buffer, &veclen);
+               vec = gistjoinvector(vec, &veclen, addon, curlenaddon);
+               dist = gistSplit(gv->index, page, vec, veclen, &(gv->giststate));
+
                MemoryContextSwitchTo(oldCtx);
 
-               vec = (IndexTuple *) palloc(sizeof(IndexTuple) * res.ituplen);
-               for (i = 0; i < res.ituplen; i++)
-               {
-                   vec[i] = (IndexTuple) palloc(IndexTupleSize(res.itup[i]));
-                   memcpy(vec[i], res.itup[i], IndexTupleSize(res.itup[i]));
+               if (blkno != GIST_ROOT_BLKNO) {
+                   /* if non-root split then we should not allocate new buffer */
+                   dist->buffer = buffer;
+                   dist->page = BufferGetPage(dist->buffer);
+                   GistPageGetOpaque(dist->page)->flags = 0;
                }
-               res.itup = vec;
 
-               for (ptr = dist; ptr; ptr = ptr->next)
-               {
+               res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
+               res.ituplen = 0;
+
+               /* make new pages and fill them */
+               for (ptr = dist; ptr; ptr = ptr->next) {
+                   char *data;
+
+                   if ( ptr->buffer == InvalidBuffer ) {
+                       ptr->buffer = gistNewBuffer( gv->index );
+                       GISTInitBuffer( ptr->buffer, 0 );
+                       ptr->page = BufferGetPage(ptr->buffer);
+                   }
+                   ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
+
+                   data = (char*)(ptr->list);
+                   for(i=0;i<ptr->block.num;i++) {
+                       if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
+                           elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
+                       data += IndexTupleSize((IndexTuple)data);
+                   }
+
+                   ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+                   res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup));
+                   memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) );
+                   res.ituplen++;
+
                    MarkBufferDirty(ptr->buffer);
                }
 
@@ -218,10 +239,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
 
                for (ptr = dist; ptr; ptr = ptr->next)
                {
-                   /* we must keep the buffer lock on the head page */
+                   /* we must keep the buffer pin on the head page */
                    if (BufferGetBlockNumber(ptr->buffer) != blkno)
-                       LockBuffer(ptr->buffer, GIST_UNLOCK);
-                   ReleaseBuffer(ptr->buffer);
+                       UnlockReleaseBuffer( ptr->buffer );
                }
 
                if (blkno == GIST_ROOT_BLKNO)
@@ -294,6 +314,7 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
    if (needwrite)
    {
        MarkBufferDirty(buffer);
+       GistClearTuplesDeleted(page);
 
        if (!gv->index->rd_istemp)
        {
@@ -570,14 +591,7 @@ gistbulkdelete(PG_FUNCTION_ARGS)
 
            /*
             * Remove deletable tuples from page
-            *
-            * XXX try to make this critical section shorter.  Could do it
-            * by separating the callback loop from the actual tuple deletion,
-            * but that would affect the definition of the todelete[] array
-            * passed into the WAL record (because the indexes would all be
-            * pre-deletion).
             */
-           START_CRIT_SECTION();
 
            maxoff = PageGetMaxOffsetNumber(page);
 
@@ -588,13 +602,9 @@ gistbulkdelete(PG_FUNCTION_ARGS)
 
                if (callback(&(idxtuple->t_tid), callback_state))
                {
-                   PageIndexTupleDelete(page, i);
-                   todelete[ntodelete] = i;
-                   i--;
-                   maxoff--;
+                   todelete[ntodelete] = i-ntodelete;
                    ntodelete++;
                    stats->std.tuples_removed += 1;
-                   Assert(maxoff == PageGetMaxOffsetNumber(page));
                }
                else
                    stats->std.num_index_tuples += 1;
@@ -602,10 +612,14 @@ gistbulkdelete(PG_FUNCTION_ARGS)
 
            if (ntodelete)
            {
-               GistMarkTuplesDeleted(page);
+               START_CRIT_SECTION();
 
                MarkBufferDirty(buffer);
 
+               for(i=0;i<ntodelete;i++)
+                   PageIndexTupleDelete(page, todelete[i]);
+               GistMarkTuplesDeleted(page);
+
                if (!rel->rd_istemp)
                {
                    XLogRecData *rdata;
@@ -627,9 +641,10 @@ gistbulkdelete(PG_FUNCTION_ARGS)
                }
                else
                    PageSetLSN(page, XLogRecPtrForTemp);
+
+               END_CRIT_SECTION();
            }
 
-           END_CRIT_SECTION();
        }
        else
        {
index c74762b7df18762ef978835bd2265570e7a4dcf6..a029d8f1ec5540c7b08f5c28a577cea72cfb77dc 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *          $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.15 2006/04/03 16:45:50 tgl Exp $
+ *          $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.16 2006/05/10 09:19:54 teodor Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -625,7 +625,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
                    }
            }
 
-           if (gistnospace(pages[numbuffer - 1], itup, lenitup))
+           if (gistnospace(pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber))
            {
                /* no space left on page, so we must split */
                buffers[numbuffer] = ReadBuffer(index, P_NEW);
index 1bfc90abbcedf69641e0d7bd00d4ce1dcd6ee736..7e9469f000b9e2b7b2898fea4ef0ee16c40e1bd1 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.12 2006/03/30 23:03:10 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.13 2006/05/10 09:19:54 teodor Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -138,6 +138,8 @@ typedef struct SplitedPageLayout
    gistxlogPage block;
    IndexTupleData *list;
    int         lenlist;
+   IndexTuple  itup;  /* union key for page */
+   Page        page;           /* to operate */
    Buffer      buffer;         /* to write after all proceed */
 
    struct SplitedPageLayout *next;
@@ -234,8 +236,8 @@ extern void freeGISTstate(GISTSTATE *giststate);
 extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
 extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key);
 
-extern IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
-         int *len, SplitedPageLayout **dist, GISTSTATE *giststate);
+extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
+         int len, GISTSTATE *giststate);
 
 extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child);
 
@@ -261,11 +263,16 @@ extern Datum gistgettuple(PG_FUNCTION_ARGS);
 extern Datum gistgetmulti(PG_FUNCTION_ARGS);
 
 /* gistutil.c */
+
+#define GiSTPageSize   \
+    ( BLCKSZ - SizeOfPageHeaderData - MAXALIGN(sizeof(GISTPageOpaqueData)) ) 
+
+extern bool gistfitpage(IndexTuple *itvec, int len);
+extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete);
 extern void gistcheckpage(Relation rel, Buffer buf);
 extern Buffer gistNewBuffer(Relation r);
 extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
               int len, OffsetNumber off);
-extern bool gistnospace(Page page, IndexTuple *itvec, int len);
 extern IndexTuple *gistextractbuffer(Buffer buffer, int *len /* out */ );
 extern IndexTuple *gistjoinvector(
               IndexTuple *itvec, int *len,