Improve gist XLOG code to follow the coding rules needed to prevent
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 30 Mar 2006 23:03:10 +0000 (23:03 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 30 Mar 2006 23:03:10 +0000 (23:03 +0000)
torn-page problems.  This introduces some issues of its own, mainly
that there are now some critical sections of unreasonably broad scope,
but it's a step forward anyway.  Further cleanup will require some
code refactoring that I'd prefer to get Oleg and Teodor involved in.

src/backend/access/gist/gist.c
src/backend/access/gist/gistvacuum.c
src/backend/access/gist/gistxlog.c
src/include/access/gist_private.h

index de880831bf47c7ec62ca99141197a1375700e5e2..d997db37efbd031e16552824a047ce87b65c0a05 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.129 2006/03/05 15:58:20 momjian Exp $
+ *   $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.130 2006/03/30 23:03:09 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -90,6 +90,7 @@ gistbuild(PG_FUNCTION_ARGS)
    double      reltuples;
    GISTBuildState buildstate;
    Buffer      buffer;
+   Page        page;
 
    /*
     * We expect to be called exactly once for any index relation. If that's
@@ -104,33 +105,33 @@ gistbuild(PG_FUNCTION_ARGS)
 
    /* initialize the root page */
    buffer = gistNewBuffer(index);
+   Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
+   page = BufferGetPage(buffer);
+
+   START_CRIT_SECTION();
+
    GISTInitBuffer(buffer, F_LEAF);
    if (!index->rd_istemp)
    {
        XLogRecPtr  recptr;
        XLogRecData rdata;
-       Page        page;
 
-       rdata.buffer = InvalidBuffer;
        rdata.data = (char *) &(index->rd_node);
        rdata.len = sizeof(RelFileNode);
+       rdata.buffer = InvalidBuffer;
        rdata.next = NULL;
 
-       page = BufferGetPage(buffer);
-
-       START_CRIT_SECTION();
-
        recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
        PageSetLSN(page, recptr);
        PageSetTLI(page, ThisTimeLineID);
-
-       END_CRIT_SECTION();
    }
    else
-       PageSetLSN(BufferGetPage(buffer), XLogRecPtrForTemp);
+       PageSetLSN(page, XLogRecPtrForTemp);
    LockBuffer(buffer, GIST_UNLOCK);
    WriteBuffer(buffer);
 
+   END_CRIT_SECTION();
+
    /* build the index */
    buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
    buildstate.indtuples = 0;
@@ -305,6 +306,15 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
    bool        is_splitted = false;
    bool        is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
 
+   /*
+    * XXX this code really ought to work by locking, but not modifying,
+    * all the buffers it needs; then starting a critical section; then
+    * modifying the buffers in an already-determined way and writing an
+    * XLOG record to reflect that.  Since it doesn't, we've got to put
+    * a critical section around the entire process, which is horrible
+    * from a robustness point of view.
+    */
+   START_CRIT_SECTION();
 
    if (!is_leaf)
 
@@ -312,6 +322,11 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
         * This node's key has been modified, either because a child split
         * occurred or because we needed to adjust our key for an insert in a
         * child node. Therefore, remove the old version of this node's key.
+        *
+        * Note: for WAL replay, in the non-split case we handle this by
+        * setting up a one-element todelete array; in the split case, it's
+        * handled implicitly because the tuple vector passed to gistSplit
+        * won't include this tuple.
         */
 
        PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
@@ -336,9 +351,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
            XLogRecData *rdata;
 
            rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
-                                  &(state->key), dist);
-
-           START_CRIT_SECTION();
+                                  is_leaf, &(state->key), dist);
 
            recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
            ptr = dist;
@@ -348,8 +361,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
                ptr = ptr->next;
            }
-
-           END_CRIT_SECTION();
        }
        else
        {
@@ -410,7 +421,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
            else
                ourpage = dist;
 
-
            /* now gets all needed data, and sets nsn's */
            page = (Page) BufferGetPage(ourpage->buffer);
            opaque = GistPageGetOpaque(page);
@@ -437,8 +447,11 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                WriteBuffer(ptr->buffer);
                ptr = ptr->next;
            }
+
+           WriteNoReleaseBuffer(state->stack->buffer);
        }
-       WriteNoReleaseBuffer(state->stack->buffer);
+
+       END_CRIT_SECTION();
    }
    else
    {
@@ -451,7 +464,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
        if (!state->r->rd_istemp)
        {
            OffsetNumber noffs = 0,
-                       offs[MAXALIGN(sizeof(OffsetNumber)) / sizeof(OffsetNumber)];
+                       offs[1];
            XLogRecPtr  recptr;
            XLogRecData *rdata;
 
@@ -462,17 +475,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
                noffs = 1;
            }
 
-           rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno,
-                            offs, noffs, false, state->itup, state->ituplen,
+           rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
+                                   offs, noffs, false,
+                                   state->itup, state->ituplen,
                                    &(state->key));
 
-           START_CRIT_SECTION();
-
-           recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+           recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
            PageSetLSN(state->stack->page, recptr);
            PageSetTLI(state->stack->page, ThisTimeLineID);
-
-           END_CRIT_SECTION();
        }
        else
            PageSetLSN(state->stack->page, XLogRecPtrForTemp);
@@ -481,6 +491,8 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
            state->needInsertComplete = false;
        WriteNoReleaseBuffer(state->stack->buffer);
 
+       END_CRIT_SECTION();
+
        if (!is_leaf)           /* small optimization: inform scan ablout
                                 * deleting... */
            gistadjscans(state->r, GISTOP_DEL, state->stack->blkno,
@@ -636,30 +648,14 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
 }
 
 /*
- * Should have the same interface as XLogReadBuffer
- */
-static Buffer
-gistReadAndLockBuffer(Relation r, BlockNumber blkno)
-{
-   Buffer      buffer = ReadBuffer(r, blkno);
-
-   LockBuffer(buffer, GIST_SHARE);
-   return buffer;
-}
-
-/*
- * Traverse the tree to find path from root page.
+ * Traverse the tree to find path from root page to specified "child" block.
  *
  * returns from the begining of closest parent;
  *
- * Function is used in both regular and recovery mode, so must work with
- * different read functions (gistReadAndLockBuffer and XLogReadBuffer)
- *
  * To prevent deadlocks, this should lock only one page simultaneously.
  */
 GISTInsertStack *
-gistFindPath(Relation r, BlockNumber child,
-            Buffer (*myReadBuffer) (Relation, BlockNumber))
+gistFindPath(Relation r, BlockNumber child)
 {
    Page        page;
    Buffer      buffer;
@@ -677,7 +673,8 @@ gistFindPath(Relation r, BlockNumber child,
 
    while (top && top->blkno != child)
    {
-       buffer = myReadBuffer(r, top->blkno);   /* locks buffer */
+       buffer = ReadBuffer(r, top->blkno);
+       LockBuffer(buffer, GIST_SHARE);
        gistcheckpage(r, buffer);
        page = (Page) BufferGetPage(buffer);
 
@@ -833,7 +830,7 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child)
        }
 
        /* ok, find new path */
-       ptr = parent = gistFindPath(r, child->blkno, gistReadAndLockBuffer);
+       ptr = parent = gistFindPath(r, child->blkno);
        Assert(ptr != NULL);
 
        /* read all buffers as expected by caller */
@@ -1192,27 +1189,31 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke
 
    Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
    page = BufferGetPage(buffer);
-   GISTInitBuffer(buffer, 0);
 
+   START_CRIT_SECTION();
+
+   GISTInitBuffer(buffer, 0);  /* XXX not F_LEAF? */
    gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
+
    if (!r->rd_istemp)
    {
        XLogRecPtr  recptr;
        XLogRecData *rdata;
 
-       rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO,
-                               NULL, 0, false, itup, len, key);
-
-       START_CRIT_SECTION();
+       rdata = formUpdateRdata(r->rd_node, buffer,
+                               NULL, 0, false,
+                               itup, len, key);
 
        recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
        PageSetLSN(page, recptr);
        PageSetTLI(page, ThisTimeLineID);
-
-       END_CRIT_SECTION();
    }
    else
        PageSetLSN(page, XLogRecPtrForTemp);
+
+   WriteNoReleaseBuffer(buffer);
+
+   END_CRIT_SECTION();
 }
 
 void
index 664ba47e40dbe1fd1518bb1f1df4655c8da3fbd3..e7925c2c15138994e3b289d0722361546a0b5a4f 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.16 2006/03/05 15:58:20 momjian Exp $
+ *   $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.17 2006/03/30 23:03:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -80,6 +80,12 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
    page = (Page) BufferGetPage(buffer);
    maxoff = PageGetMaxOffsetNumber(page);
 
+   /*
+    * XXX need to reduce scope of changes to page so we can make this
+    * critical section less extensive
+    */
+   START_CRIT_SECTION();
+
    if (GistPageIsLeaf(page))
    {
        if (GistTuplesDeleted(page))
@@ -188,11 +194,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
                    ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
 
                    rdata = formSplitRdata(gv->index->rd_node, blkno,
-                                          &key, dist);
+                                          false, &key, dist);
                    xlinfo = rdata->data;
 
-                   START_CRIT_SECTION();
-
                    recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
                    ptr = dist;
                    while (ptr)
@@ -202,7 +206,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
                        ptr = ptr->next;
                    }
 
-                   END_CRIT_SECTION();
                    pfree(xlinfo);
                    pfree(rdata);
                }
@@ -235,8 +238,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
                    oldCtx = MemoryContextSwitchTo(gv->opCtx);
                    gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
                    MemoryContextSwitchTo(oldCtx);
-
-                   WriteNoReleaseBuffer(buffer);
                }
 
                needwrite = false;
@@ -302,15 +303,14 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
            XLogRecPtr  recptr;
            char       *xlinfo;
 
-           rdata = formUpdateRdata(gv->index->rd_node, blkno, todelete, ntodelete,
-                                   res.emptypage, addon, curlenaddon, NULL);
+           rdata = formUpdateRdata(gv->index->rd_node, buffer,
+                                   todelete, ntodelete, res.emptypage,
+                                   addon, curlenaddon, NULL);
            xlinfo = rdata->data;
 
-           START_CRIT_SECTION();
-           recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+           recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
            PageSetLSN(page, recptr);
            PageSetTLI(page, ThisTimeLineID);
-           END_CRIT_SECTION();
 
            pfree(xlinfo);
            pfree(rdata);
@@ -322,6 +322,8 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
    else
        ReleaseBuffer(buffer);
 
+   END_CRIT_SECTION();
+
    if (ncompleted && !gv->index->rd_istemp)
        gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
 
@@ -579,6 +581,17 @@ gistbulkdelete(PG_FUNCTION_ARGS)
             */
            pushStackIfSplited(page, stack);
 
+           /*
+            * Remove deletable tuples from page
+            *
+            * XXX try to make this critical section shorter.  Could do it
+            * by separating the callback loop from the actual tuple deletion,
+            * but that would affect the definition of the todelete[] array
+            * passed into the WAL record (because the indexes would all be
+            * pre-deletion).
+            */
+           START_CRIT_SECTION();
+
            maxoff = PageGetMaxOffsetNumber(page);
 
            for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
@@ -608,17 +621,17 @@ gistbulkdelete(PG_FUNCTION_ARGS)
                {
                    XLogRecData *rdata;
                    XLogRecPtr  recptr;
-                   gistxlogEntryUpdate *xlinfo;
+                   gistxlogPageUpdate *xlinfo;
 
-                   rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete,
-                                           false, NULL, 0, NULL);
-                   xlinfo = (gistxlogEntryUpdate *) rdata->data;
+                   rdata = formUpdateRdata(rel->rd_node, buffer,
+                                           todelete, ntodelete, false,
+                                           NULL, 0,
+                                           NULL);
+                   xlinfo = (gistxlogPageUpdate *) rdata->data;
 
-                   START_CRIT_SECTION();
-                   recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
+                   recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
                    PageSetLSN(page, recptr);
                    PageSetTLI(page, ThisTimeLineID);
-                   END_CRIT_SECTION();
 
                    pfree(xlinfo);
                    pfree(rdata);
@@ -627,6 +640,8 @@ gistbulkdelete(PG_FUNCTION_ARGS)
                    PageSetLSN(page, XLogRecPtrForTemp);
                WriteNoReleaseBuffer(buffer);
            }
+
+           END_CRIT_SECTION();
        }
        else
        {
index 9a15061484f9d7cd17286e38f11f7b6cba3dbb8a..12a521c75c96c0b01e1ef4e8afa9f1d6ecb329ba 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *          $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.12 2006/03/29 21:17:36 tgl Exp $
+ *          $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.13 2006/03/30 23:03:10 tgl Exp $
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
 typedef struct
 {
-   gistxlogEntryUpdate *data;
+   gistxlogPageUpdate *data;
    int         len;
    IndexTuple *itup;
    OffsetNumber *todelete;
-} EntryUpdateRecord;
+} PageUpdateRecord;
 
 typedef struct
 {
@@ -58,16 +58,15 @@ typedef struct gistIncompleteInsert
 } gistIncompleteInsert;
 
 
-MemoryContext opCtx;
-MemoryContext insertCtx;
+static MemoryContext opCtx;        /* working memory for operations */
+static MemoryContext insertCtx;    /* holds incomplete_inserts list */
 static List *incomplete_inserts;
 
 
-#define ItemPointerEQ( a, b )  \
-   ( \
-   ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
-   ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
-       )
+#define ItemPointerEQ(a, b)    \
+   ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
+     ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) )
+
 
 static void
 pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
@@ -101,7 +100,13 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
    }
    Assert(ninsert->lenblk > 0);
 
-   incomplete_inserts = lappend(incomplete_inserts, ninsert);
+   /*
+    * Stick the new incomplete insert onto the front of the list, not the
+    * back.  This is so that gist_xlog_cleanup will process incompletions
+    * in last-in-first-out order.
+    */
+   incomplete_inserts = lcons(ninsert, incomplete_inserts);
+
    MemoryContextSwitchTo(oldCxt);
 }
 
@@ -116,10 +121,9 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
 
        if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
        {
-
            /* found */
-           pfree(insert->blkno);
            incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
+           pfree(insert->blkno);
            pfree(insert);
            break;
        }
@@ -127,25 +131,25 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
 }
 
 static void
-decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record)
+decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
 {
    char       *begin = XLogRecGetData(record),
               *ptr;
    int         i = 0,
                addpath = 0;
 
-   decoded->data = (gistxlogEntryUpdate *) begin;
+   decoded->data = (gistxlogPageUpdate *) begin;
 
    if (decoded->data->ntodelete)
    {
-       decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogEntryUpdate) + addpath);
+       decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath);
        addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
    }
    else
        decoded->todelete = NULL;
 
    decoded->len = 0;
-   ptr = begin + sizeof(gistxlogEntryUpdate) + addpath;
+   ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
    while (ptr - begin < record->xl_len)
    {
        decoded->len++;
@@ -154,7 +158,7 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record)
 
    decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);
 
-   ptr = begin + sizeof(gistxlogEntryUpdate) + addpath;
+   ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
    while (ptr - begin < record->xl_len)
    {
        decoded->itup[i] = (IndexTuple) ptr;
@@ -167,38 +171,30 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record)
  * redo any page update (except page split)
  */
 static void
-gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
+gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
 {
-   EntryUpdateRecord xlrec;
+   PageUpdateRecord xlrec;
    Relation    reln;
    Buffer      buffer;
    Page        page;
 
-   decodeEntryUpdateRecord(&xlrec, record);
+   /* nothing to do if whole page was backed up (and no info to do it with) */
+   if (record->xl_info & XLR_BKP_BLOCK_1)
+       return;
+
+   decodePageUpdateRecord(&xlrec, record);
 
    reln = XLogOpenRelation(xlrec.data->node);
    buffer = XLogReadBuffer(reln, xlrec.data->blkno, false);
    if (!BufferIsValid(buffer))
-       elog(PANIC, "block %u unfound", xlrec.data->blkno);
+       return;
    page = (Page) BufferGetPage(buffer);
 
-   if (isnewroot)
+   if (XLByteLE(lsn, PageGetLSN(page)))
    {
-       if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)))
-       {
-           LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-           ReleaseBuffer(buffer);
-           return;
-       }
-   }
-   else
-   {
-       if (XLByteLE(lsn, PageGetLSN(page)))
-       {
-           LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-           ReleaseBuffer(buffer);
-           return;
-       }
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       ReleaseBuffer(buffer);
+       return;
    }
 
    if (xlrec.data->isemptypage)
@@ -237,9 +233,9 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
            GistClearTuplesDeleted(page);
    }
 
+   GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
    PageSetLSN(page, lsn);
    PageSetTLI(page, ThisTimeLineID);
-   GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
    WriteBuffer(buffer);
 
@@ -294,38 +290,21 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
    Buffer      buffer;
    Page        page;
    int         i;
-   int         flags = 0;
+   int         flags;
 
    decodePageSplitRecord(&xlrec, record);
    reln = XLogOpenRelation(xlrec.data->node);
-
-   /* first of all wee need get F_LEAF flag from original page */
-   buffer = XLogReadBuffer(reln, xlrec.data->origblkno, false);
-   if (!BufferIsValid(buffer))
-       elog(PANIC, "block %u unfound", xlrec.data->origblkno);
-   page = (Page) BufferGetPage(buffer);
-   flags = (GistPageIsLeaf(page)) ? F_LEAF : 0;
-   LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-   ReleaseBuffer(buffer);
+   flags = xlrec.data->origleaf ? F_LEAF : 0;
 
    /* loop around all pages */
    for (i = 0; i < xlrec.data->npage; i++)
    {
        NewPage    *newpage = xlrec.page + i;
-       bool        isorigpage = (xlrec.data->origblkno == newpage->header->blkno) ? true : false;
 
-       buffer = XLogReadBuffer(reln, newpage->header->blkno, !isorigpage);
-       if (!BufferIsValid(buffer))
-           elog(PANIC, "block %u unfound", newpage->header->blkno);
+       buffer = XLogReadBuffer(reln, newpage->header->blkno, true);
+       Assert(BufferIsValid(buffer));
        page = (Page) BufferGetPage(buffer);
 
-       if (XLByteLE(lsn, PageGetLSN(page)))
-       {
-           LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-           ReleaseBuffer(buffer);
-           continue;
-       }
-
        /* ok, clear buffer */
        GISTInitBuffer(buffer, flags);
 
@@ -399,12 +378,11 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
    oldCxt = MemoryContextSwitchTo(opCtx);
    switch (info)
    {
-       case XLOG_GIST_ENTRY_UPDATE:
-       case XLOG_GIST_ENTRY_DELETE:
-           gistRedoEntryUpdateRecord(lsn, record, false);
+       case XLOG_GIST_PAGE_UPDATE:
+           gistRedoPageUpdateRecord(lsn, record, false);
            break;
        case XLOG_GIST_NEW_ROOT:
-           gistRedoEntryUpdateRecord(lsn, record, true);
+           gistRedoPageUpdateRecord(lsn, record, true);
            break;
        case XLOG_GIST_PAGE_SPLIT:
            gistRedoPageSplitRecord(lsn, record);
@@ -433,7 +411,7 @@ out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
 }
 
 static void
-out_gistxlogEntryUpdate(StringInfo buf, gistxlogEntryUpdate *xlrec)
+out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
 {
    out_target(buf, xlrec->node, xlrec->key);
    appendStringInfo(buf, "; block number %u", xlrec->blkno);
@@ -455,17 +433,13 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec)
 
    switch (info)
    {
-       case XLOG_GIST_ENTRY_UPDATE:
-           appendStringInfo(buf, "entry_update: ");
-           out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate *) rec);
-           break;
-       case XLOG_GIST_ENTRY_DELETE:
-           appendStringInfo(buf, "entry_delete: ");
-           out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate *) rec);
+       case XLOG_GIST_PAGE_UPDATE:
+           appendStringInfo(buf, "page_update: ");
+           out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
            break;
        case XLOG_GIST_NEW_ROOT:
            appendStringInfo(buf, "new_root: ");
-           out_target(buf, ((gistxlogEntryUpdate *) rec)->node, ((gistxlogEntryUpdate *) rec)->key);
+           out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
            break;
        case XLOG_GIST_PAGE_SPLIT:
            out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
@@ -506,60 +480,47 @@ gist_form_invalid_tuple(BlockNumber blkno)
    return tuple;
 }
 
-static Buffer
-gistXLogReadAndLockBuffer(Relation r, BlockNumber blkno)
-{
-   Buffer      buffer = XLogReadBuffer(r, blkno, false);
-
-   if (!BufferIsValid(buffer))
-       elog(PANIC, "block %u unfound", blkno);
-
-   return buffer;
-}
-
 
 static void
-gixtxlogFindPath(Relation index, gistIncompleteInsert *insert)
+gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
 {
    GISTInsertStack *top;
 
    insert->pathlen = 0;
    insert->path = NULL;
 
-   if ((top = gistFindPath(index, insert->origblkno, gistXLogReadAndLockBuffer)) != NULL)
+   if ((top = gistFindPath(index, insert->origblkno)) != NULL)
    {
        int         i;
-       GISTInsertStack *ptr = top;
+       GISTInsertStack *ptr;
 
-       while (ptr)
-       {
+       for (ptr = top; ptr; ptr = ptr->parent)
            insert->pathlen++;
-           ptr = ptr->parent;
-       }
 
        insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);
 
        i = 0;
-       ptr = top;
-       while (ptr)
-       {
-           insert->path[i] = ptr->blkno;
-           i++;
-           ptr = ptr->parent;
-       }
+       for (ptr = top; ptr; ptr = ptr->parent)
+           insert->path[i++] = ptr->blkno;
    }
    else
        elog(LOG, "lost parent for block %u", insert->origblkno);
 }
 
 /*
- * Continue insert after crash. In normal situation, there isn't any incomplete
- * inserts, but if it might be after crash, WAL may has not a record of completetion.
+ * Continue insert after crash.  In normal situations, there aren't any
+ * incomplete inserts, but if a crash occurs partway through an insertion
+ * sequence, we'll need to finish making the index valid at the end of WAL
+ * replay.
+ *
+ * Note that we assume the index is now in a valid state, except for the
+ * unfinished insertion.  In particular it's safe to invoke gistFindPath();
+ * there shouldn't be any garbage pages for it to run into.
  *
  * Although stored LSN in gistIncompleteInsert is a LSN of child page,
  * we can compare it with LSN of parent, because parent is always locked
  * while we change child page (look at gistmakedeal). So if parent's LSN is
- * lesser than stored lsn then changes in parent doesn't do yet.
+ * less than stored lsn then changes in parent aren't done yet.
  */
 static void
 gistContinueInsert(gistIncompleteInsert *insert)
@@ -602,6 +563,12 @@ gistContinueInsert(gistIncompleteInsert *insert)
 
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        WriteBuffer(buffer);
+
+       /*
+        * XXX fall out to avoid making LOG message at bottom of routine.
+        * I think the logic for when to emit that message is all wrong...
+        */
+       return;
    }
    else
    {
@@ -610,7 +577,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
        int         numbuffer;
 
        /* construct path */
-       gixtxlogFindPath(index, insert);
+       gistxlogFindPath(index, insert);
 
        Assert(insert->pathlen > 0);
 
@@ -625,9 +592,8 @@ gistContinueInsert(gistIncompleteInsert *insert)
                        childfound = 0;
 
            numbuffer = 1;
-           buffers[numbuffer - 1] = XLogReadBuffer(index, insert->path[i], false);
-           if (!BufferIsValid(buffers[numbuffer - 1]))
-               elog(PANIC, "block %u unfound", insert->path[i]);
+           buffers[numbuffer - 1] = ReadBuffer(index, insert->path[i]);
+           LockBuffer(buffers[numbuffer - 1], GIST_EXCLUSIVE);
            pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]);
 
            if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1])))
@@ -661,10 +627,9 @@ gistContinueInsert(gistIncompleteInsert *insert)
 
            if (gistnospace(pages[numbuffer - 1], itup, lenitup))
            {
-               /* no space left on page, so we should split */
-               buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true);
-               if (!BufferIsValid(buffers[numbuffer]))
-                   elog(PANIC, "could not obtain new block");
+               /* no space left on page, so we must split */
+               buffers[numbuffer] = ReadBuffer(index, P_NEW);
+               LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
                GISTInitBuffer(buffers[numbuffer], 0);
                pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
                gistfillbuffer(index, pages[numbuffer], itup, lenitup, FirstOffsetNumber);
@@ -678,7 +643,8 @@ gistContinueInsert(gistIncompleteInsert *insert)
                     * we split root, just copy tuples from old root to new
                     * page
                     */
-                   parentitup = gistextractbuffer(buffers[numbuffer - 1], &pituplen);
+                   parentitup = gistextractbuffer(buffers[numbuffer - 1],
+                                                  &pituplen);
 
                    /* sanity check */
                    if (i + 1 != insert->pathlen)
@@ -686,9 +652,8 @@ gistContinueInsert(gistIncompleteInsert *insert)
                             RelationGetRelationName(index));
 
                    /* fill new page */
-                   buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true);
-                   if (!BufferIsValid(buffers[numbuffer]))
-                       elog(PANIC, "could not obtain new block");
+                   buffers[numbuffer] = ReadBuffer(index, P_NEW);
+                   LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
                    GISTInitBuffer(buffers[numbuffer], 0);
                    pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
                    gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
@@ -748,16 +713,10 @@ void
 gist_xlog_cleanup(void)
 {
    ListCell   *l;
-   List       *reverse = NIL;
-   MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
-
-   /* we should call gistContinueInsert in reverse order */
+   MemoryContext oldCxt;
 
+   oldCxt = MemoryContextSwitchTo(opCtx);
    foreach(l, incomplete_inserts)
-       reverse = lappend(reverse, lfirst(l));
-
-   MemoryContextSwitchTo(opCtx);
-   foreach(l, reverse)
    {
        gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
 
@@ -772,10 +731,9 @@ gist_xlog_cleanup(void)
 
 
 XLogRecData *
-formSplitRdata(RelFileNode node, BlockNumber blkno,
+formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
               ItemPointer key, SplitedPageLayout *dist)
 {
-
    XLogRecData *rdata;
    gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
    SplitedPageLayout *ptr;
@@ -793,6 +751,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno,
 
    xlrec->node = node;
    xlrec->origblkno = blkno;
+   xlrec->origleaf = page_is_leaf;
    xlrec->npage = (uint16) npage;
    if (key)
        xlrec->key = *key;
@@ -825,68 +784,64 @@ formSplitRdata(RelFileNode node, BlockNumber blkno,
    return rdata;
 }
 
-
+/*
+ * Construct the rdata array for an XLOG record describing a page update
+ * (deletion and/or insertion of tuples on a single index page).
+ *
+ * Note that both the todelete array and the tuples are marked as belonging
+ * to the target buffer; they need not be stored in XLOG if XLogInsert decides
+ * to log the whole buffer contents instead.  Also, we take care that there's
+ * at least one rdata item referencing the buffer, even when ntodelete and
+ * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
+ */
 XLogRecData *
-formUpdateRdata(RelFileNode node, BlockNumber blkno,
+formUpdateRdata(RelFileNode node, Buffer buffer,
                OffsetNumber *todelete, int ntodelete, bool emptypage,
                IndexTuple *itup, int ituplen, ItemPointer key)
 {
    XLogRecData *rdata;
-   gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate *) palloc(sizeof(gistxlogEntryUpdate));
+   gistxlogPageUpdate *xlrec;
+   int         cur,
+               i;
+
+   /* ugly wart in API: emptypage causes us to ignore other inputs */
+   if (emptypage)
+       ntodelete = ituplen = 0;
+
+   rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen));
+   xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
 
    xlrec->node = node;
-   xlrec->blkno = blkno;
+   xlrec->blkno = BufferGetBlockNumber(buffer);
+   xlrec->ntodelete = ntodelete;
+   xlrec->isemptypage = emptypage;
    if (key)
        xlrec->key = *key;
    else
        ItemPointerSetInvalid(&(xlrec->key));
 
-   if (emptypage)
-   {
-       xlrec->isemptypage = true;
-       xlrec->ntodelete = 0;
-
-       rdata = (XLogRecData *) palloc(sizeof(XLogRecData));
-       rdata->buffer = InvalidBuffer;
-       rdata->data = (char *) xlrec;
-       rdata->len = sizeof(gistxlogEntryUpdate);
-       rdata->next = NULL;
-   }
-   else
-   {
-       int         cur = 1,
-                   i;
-
-       xlrec->isemptypage = false;
-       xlrec->ntodelete = ntodelete;
-
-       rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen));
-
-       rdata->buffer = InvalidBuffer;
-       rdata->data = (char *) xlrec;
-       rdata->len = sizeof(gistxlogEntryUpdate);
-       rdata->next = NULL;
+   rdata[0].data = (char *) xlrec;
+   rdata[0].len = sizeof(gistxlogPageUpdate);
+   rdata[0].buffer = InvalidBuffer;
+   rdata[0].next = &(rdata[1]);
 
-       if (ntodelete)
-       {
-           rdata[cur - 1].next = &(rdata[cur]);
-           rdata[cur].buffer = InvalidBuffer;
-           rdata[cur].data = (char *) todelete;
-           rdata[cur].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
-           rdata[cur].next = NULL;
-           cur++;
-       }
+   rdata[1].data = (char *) todelete;
+   rdata[1].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
+   rdata[1].buffer = buffer;
+   rdata[1].buffer_std = true;
+   rdata[1].next = NULL;
 
-       /* new tuples */
-       for (i = 0; i < ituplen; i++)
-       {
-           rdata[cur].buffer = InvalidBuffer;
-           rdata[cur].data = (char *) (itup[i]);
-           rdata[cur].len = IndexTupleSize(itup[i]);
-           rdata[cur].next = NULL;
-           rdata[cur - 1].next = &(rdata[cur]);
-           cur++;
-       }
+   /* new tuples */
+   cur = 2;
+   for (i = 0; i < ituplen; i++)
+   {
+       rdata[cur - 1].next = &(rdata[cur]);
+       rdata[cur].data = (char *) (itup[i]);
+       rdata[cur].len = IndexTupleSize(itup[i]);
+       rdata[cur].buffer = buffer;
+       rdata[cur].buffer_std = true;
+       rdata[cur].next = NULL;
+       cur++;
    }
 
    return rdata;
index 3b072da637615628767306badc61d7023d112562..1bfc90abbcedf69641e0d7bd00d4ce1dcd6ee736 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.11 2006/03/24 04:32:13 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.12 2006/03/30 23:03:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -80,11 +80,13 @@ typedef GISTScanOpaqueData *GISTScanOpaque;
 /* XLog stuff */
 extern const XLogRecPtr XLogRecPtrForTemp;
 
-#define XLOG_GIST_ENTRY_UPDATE 0x00
-#define XLOG_GIST_ENTRY_DELETE 0x10
-#define XLOG_GIST_NEW_ROOT 0x20
+#define XLOG_GIST_PAGE_UPDATE      0x00
+#define XLOG_GIST_NEW_ROOT         0x20
+#define XLOG_GIST_PAGE_SPLIT       0x30
+#define XLOG_GIST_INSERT_COMPLETE  0x40
+#define XLOG_GIST_CREATE_INDEX     0x50
 
-typedef struct gistxlogEntryUpdate
+typedef struct gistxlogPageUpdate
 {
    RelFileNode node;
    BlockNumber blkno;
@@ -100,17 +102,16 @@ typedef struct gistxlogEntryUpdate
    /*
     * follow: 1. todelete OffsetNumbers 2. tuples to insert
     */
-} gistxlogEntryUpdate;
-
-#define XLOG_GIST_PAGE_SPLIT   0x30
+} gistxlogPageUpdate;
 
 typedef struct gistxlogPageSplit
 {
    RelFileNode node;
    BlockNumber origblkno;      /* splitted page */
+   bool        origleaf;       /* was splitted page a leaf page? */
    uint16      npage;
 
-   /* see comments on gistxlogEntryUpdate */
+   /* see comments on gistxlogPageUpdate */
    ItemPointerData key;
 
    /*
@@ -118,22 +119,19 @@ typedef struct gistxlogPageSplit
     */
 } gistxlogPageSplit;
 
-#define XLOG_GIST_INSERT_COMPLETE  0x40
-
 typedef struct gistxlogPage
 {
    BlockNumber blkno;
-   int         num;
+   int         num;            /* number of index tuples following */
 } gistxlogPage;
 
-#define XLOG_GIST_CREATE_INDEX 0x50
-
 typedef struct gistxlogInsertComplete
 {
    RelFileNode node;
    /* follows ItemPointerData key to clean */
 } gistxlogInsertComplete;
 
+
 /* SplitedPageLayout - gistSplit function result */
 typedef struct SplitedPageLayout
 {
@@ -239,8 +237,7 @@ extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, It
 extern IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
          int *len, SplitedPageLayout **dist, GISTSTATE *giststate);
 
-extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child,
-            Buffer (*myReadBuffer) (Relation, BlockNumber));
+extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child);
 
 /* gistxlog.c */
 extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
@@ -249,11 +246,12 @@ extern void gist_xlog_startup(void);
 extern void gist_xlog_cleanup(void);
 extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
 
-extern XLogRecData *formUpdateRdata(RelFileNode node, BlockNumber blkno,
+extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer,
                OffsetNumber *todelete, int ntodelete, bool emptypage,
                IndexTuple *itup, int ituplen, ItemPointer key);
 
-extern XLogRecData *formSplitRdata(RelFileNode node, BlockNumber blkno,
+extern XLogRecData *formSplitRdata(RelFileNode node,
+              BlockNumber blkno, bool page_is_leaf,
               ItemPointer key, SplitedPageLayout *dist);
 
 extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);