Fix assorted bugs in GIN's WAL replay logic.
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 11 Oct 2010 23:04:37 +0000 (19:04 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 11 Oct 2010 23:04:37 +0000 (19:04 -0400)
The original coding was quite sloppy about handling the case where
XLogReadBuffer fails (because the page has since been deleted).  This
would result in either "bad buffer id: 0" or an Assert failure during
replay, if indeed the page were no longer there.  In a couple of places
it also neglected to check whether the change had already been applied,
which would probably result in corrupted index contents.  I believe that
bug #5703 is an instance of the first problem.  These issues could show up
without replication, but only if you were unfortunate enough to crash
between modification of a GIN index and the next checkpoint.

Back-patch to 8.2, which is as far back as GIN has WAL support.

src/backend/access/gin/ginxlog.c

index 7a581334f1466a767950ff7e3a2c18047cdb5d9d..75997d9534b4e03558a21e9f1a9fd6534fa5cf81 100644 (file)
@@ -77,11 +77,13 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
 
    MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
    Assert(BufferIsValid(MetaBuffer));
+   page = (Page) BufferGetPage(MetaBuffer);
+
    GinInitMetabuffer(MetaBuffer);
 
-   page = (Page) BufferGetPage(MetaBuffer);
    PageSetLSN(page, lsn);
    PageSetTLI(page, ThisTimeLineID);
+   MarkBufferDirty(MetaBuffer);
 
    RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
    Assert(BufferIsValid(RootBuffer));
@@ -91,11 +93,10 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
 
    PageSetLSN(page, lsn);
    PageSetTLI(page, ThisTimeLineID);
-
-   MarkBufferDirty(MetaBuffer);
-   UnlockReleaseBuffer(MetaBuffer);
    MarkBufferDirty(RootBuffer);
+
    UnlockReleaseBuffer(RootBuffer);
+   UnlockReleaseBuffer(MetaBuffer);
 }
 
 static void
@@ -128,21 +129,49 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
    Buffer      buffer;
    Page        page;
 
+   /* first, forget any incomplete split this insertion completes */
+   if (data->isData)
+   {
+       Assert(data->isDelete == FALSE);
+       if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+       {
+           PostingItem *pitem;
+
+           pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+           forgetIncompleteSplit(data->node,
+                                 PostingItemGetBlockNumber(pitem),
+                                 data->updateBlkno);
+       }
+
+   }
+   else
+   {
+       if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+       {
+           IndexTuple  itup;
+
+           itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+           forgetIncompleteSplit(data->node,
+                                 GinItemPointerGetBlockNumber(&itup->t_tid),
+                                 data->updateBlkno);
+       }
+   }
+
    /* nothing else to do if page was backed up */
    if (record->xl_info & XLR_BKP_BLOCK_1)
        return;
 
    buffer = XLogReadBuffer(data->node, data->blkno, false);
-   Assert(BufferIsValid(buffer));
+   if (!BufferIsValid(buffer))
+       return;                 /* page was deleted, nothing to do */
    page = (Page) BufferGetPage(buffer);
 
-   if (data->isData)
+   if (!XLByteLE(lsn, PageGetLSN(page)))
    {
-       Assert(data->isDelete == FALSE);
-       Assert(GinPageIsData(page));
-
-       if (!XLByteLE(lsn, PageGetLSN(page)))
+       if (data->isData)
        {
+           Assert(GinPageIsData(page));
+
            if (data->isLeaf)
            {
                OffsetNumber i;
@@ -172,23 +201,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
                GinDataPageAddItem(page, pitem, data->offset);
            }
        }
-
-       if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+       else
        {
-           PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+           IndexTuple  itup;
 
-           forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno);
-       }
+           Assert(!GinPageIsData(page));
 
-   }
-   else
-   {
-       IndexTuple  itup;
-
-       Assert(!GinPageIsData(page));
-
-       if (!XLByteLE(lsn, PageGetLSN(page)))
-       {
            if (data->updateBlkno != InvalidBlockNumber)
            {
                /* update link to right page after split */
@@ -212,20 +230,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
                  data->node.spcNode, data->node.dbNode, data->node.relNode);
        }
 
-       if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
-       {
-           itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-           forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno);
-       }
-   }
-
-   if (!XLByteLE(lsn, PageGetLSN(page)))
-   {
        PageSetLSN(page, lsn);
        PageSetTLI(page, ThisTimeLineID);
 
        MarkBufferDirty(buffer);
    }
+
    UnlockReleaseBuffer(buffer);
 }
 
@@ -244,7 +254,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
    if (data->isData)
        flags |= GIN_DATA;
 
-   lbuffer = XLogReadBuffer(data->node, data->lblkno, data->isRootSplit);
+   lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
    Assert(BufferIsValid(lbuffer));
    lpage = (Page) BufferGetPage(lbuffer);
    GinInitBuffer(lbuffer, flags);
@@ -321,7 +331,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
 
    if (data->isRootSplit)
    {
-       Buffer      rootBuf = XLogReadBuffer(data->node, data->rootBlkno, false);
+       Buffer      rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true);
        Page        rootPage = BufferGetPage(rootBuf);
 
        GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
@@ -357,45 +367,49 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
    Buffer      buffer;
    Page        page;
 
-   /* nothing else to do if page was backed up (and no info to do it with) */
+   /* nothing to do if page was backed up (and no info to do it with) */
    if (record->xl_info & XLR_BKP_BLOCK_1)
        return;
 
    buffer = XLogReadBuffer(data->node, data->blkno, false);
-   Assert(BufferIsValid(buffer));
+   if (!BufferIsValid(buffer))
+       return;
    page = (Page) BufferGetPage(buffer);
 
-   if (GinPageIsData(page))
-   {
-       memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
-              GinSizeOfItem(page) *data->nitem);
-       GinPageGetOpaque(page)->maxoff = data->nitem;
-   }
-   else
+   if (!XLByteLE(lsn, PageGetLSN(page)))
    {
-       OffsetNumber i,
-                  *tod;
-       IndexTuple  itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
+       if (GinPageIsData(page))
+       {
+           memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
+                  GinSizeOfItem(page) *data->nitem);
+           GinPageGetOpaque(page)->maxoff = data->nitem;
+       }
+       else
+       {
+           OffsetNumber i,
+               *tod;
+           IndexTuple  itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
 
-       tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
-       for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
-           tod[i - 1] = i;
+           tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
+           for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
+               tod[i - 1] = i;
 
-       PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
+           PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
 
-       for (i = 0; i < data->nitem; i++)
-       {
-           if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
-               elog(ERROR, "failed to add item to index page in %u/%u/%u",
-                 data->node.spcNode, data->node.dbNode, data->node.relNode);
-           itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+           for (i = 0; i < data->nitem; i++)
+           {
+               if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+                   elog(ERROR, "failed to add item to index page in %u/%u/%u",
+                        data->node.spcNode, data->node.dbNode, data->node.relNode);
+               itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+           }
        }
-   }
 
-   PageSetLSN(page, lsn);
-   PageSetTLI(page, ThisTimeLineID);
+       PageSetLSN(page, lsn);
+       PageSetTLI(page, ThisTimeLineID);
+       MarkBufferDirty(buffer);
+   }
 
-   MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);
 }
 
@@ -409,38 +423,56 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
    if (!(record->xl_info & XLR_BKP_BLOCK_1))
    {
        buffer = XLogReadBuffer(data->node, data->blkno, false);
-       page = BufferGetPage(buffer);
-       Assert(GinPageIsData(page));
-       GinPageGetOpaque(page)->flags = GIN_DELETED;
-       PageSetLSN(page, lsn);
-       PageSetTLI(page, ThisTimeLineID);
-       MarkBufferDirty(buffer);
-       UnlockReleaseBuffer(buffer);
+       if (BufferIsValid(buffer))
+       {
+           page = BufferGetPage(buffer);
+           if (!XLByteLE(lsn, PageGetLSN(page)))
+           {
+               Assert(GinPageIsData(page));
+               GinPageGetOpaque(page)->flags = GIN_DELETED;
+               PageSetLSN(page, lsn);
+               PageSetTLI(page, ThisTimeLineID);
+               MarkBufferDirty(buffer);
+           }
+           UnlockReleaseBuffer(buffer);
+       }
    }
 
    if (!(record->xl_info & XLR_BKP_BLOCK_2))
    {
        buffer = XLogReadBuffer(data->node, data->parentBlkno, false);
-       page = BufferGetPage(buffer);
-       Assert(GinPageIsData(page));
-       Assert(!GinPageIsLeaf(page));
-       PageDeletePostingItem(page, data->parentOffset);
-       PageSetLSN(page, lsn);
-       PageSetTLI(page, ThisTimeLineID);
-       MarkBufferDirty(buffer);
-       UnlockReleaseBuffer(buffer);
+       if (BufferIsValid(buffer))
+       {
+           page = BufferGetPage(buffer);
+           if (!XLByteLE(lsn, PageGetLSN(page)))
+           {
+               Assert(GinPageIsData(page));
+               Assert(!GinPageIsLeaf(page));
+               PageDeletePostingItem(page, data->parentOffset);
+               PageSetLSN(page, lsn);
+               PageSetTLI(page, ThisTimeLineID);
+               MarkBufferDirty(buffer);
+           }
+           UnlockReleaseBuffer(buffer);
+       }
    }
 
    if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
    {
        buffer = XLogReadBuffer(data->node, data->leftBlkno, false);
-       page = BufferGetPage(buffer);
-       Assert(GinPageIsData(page));
-       GinPageGetOpaque(page)->rightlink = data->rightLink;
-       PageSetLSN(page, lsn);
-       PageSetTLI(page, ThisTimeLineID);
-       MarkBufferDirty(buffer);
-       UnlockReleaseBuffer(buffer);
+       if (BufferIsValid(buffer))
+       {
+           page = BufferGetPage(buffer);
+           if (!XLByteLE(lsn, PageGetLSN(page)))
+           {
+               Assert(GinPageIsData(page));
+               GinPageGetOpaque(page)->rightlink = data->rightLink;
+               PageSetLSN(page, lsn);
+               PageSetTLI(page, ThisTimeLineID);
+               MarkBufferDirty(buffer);
+           }
+           UnlockReleaseBuffer(buffer);
+       }
    }
 }
 
@@ -450,8 +482,11 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
    ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
    Buffer      metabuffer;
    Page        metapage;
+   Buffer      buffer;
 
    metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
+   if (!BufferIsValid(metabuffer))
+       elog(PANIC, "GIN metapage disappeared");
    metapage = BufferGetPage(metabuffer);
 
    if (!XLByteLE(lsn, PageGetLSN(metapage)))
@@ -469,40 +504,43 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
         */
        if (!(record->xl_info & XLR_BKP_BLOCK_1))
        {
-           Buffer      buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
-           Page        page = BufferGetPage(buffer);
-
-           if (!XLByteLE(lsn, PageGetLSN(page)))
+           buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
+           if (BufferIsValid(buffer))
            {
-               OffsetNumber l,
-                           off = (PageIsEmpty(page)) ? FirstOffsetNumber :
-               OffsetNumberNext(PageGetMaxOffsetNumber(page));
-               int         i,
-                           tupsize;
-               IndexTuple  tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
-
-               for (i = 0; i < data->ntuples; i++)
+               Page        page = BufferGetPage(buffer);
+
+               if (!XLByteLE(lsn, PageGetLSN(page)))
                {
-                   tupsize = IndexTupleSize(tuples);
+                   OffsetNumber l,
+                       off = (PageIsEmpty(page)) ? FirstOffsetNumber :
+                       OffsetNumberNext(PageGetMaxOffsetNumber(page));
+                   int         i,
+                       tupsize;
+                   IndexTuple  tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
 
-                   l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
+                   for (i = 0; i < data->ntuples; i++)
+                   {
+                       tupsize = IndexTupleSize(tuples);
 
-                   if (l == InvalidOffsetNumber)
-                       elog(ERROR, "failed to add item to index page");
+                       l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
 
-                   tuples = (IndexTuple) (((char *) tuples) + tupsize);
-               }
+                       if (l == InvalidOffsetNumber)
+                           elog(ERROR, "failed to add item to index page");
 
-               /*
-                * Increase counter of heap tuples
-                */
-               GinPageGetOpaque(page)->maxoff++;
+                       tuples = (IndexTuple) (((char *) tuples) + tupsize);
+                   }
 
-               PageSetLSN(page, lsn);
-               PageSetTLI(page, ThisTimeLineID);
-               MarkBufferDirty(buffer);
+                   /*
+                    * Increase counter of heap tuples
+                    */
+                   GinPageGetOpaque(page)->maxoff++;
+
+                   PageSetLSN(page, lsn);
+                   PageSetTLI(page, ThisTimeLineID);
+                   MarkBufferDirty(buffer);
+               }
+               UnlockReleaseBuffer(buffer);
            }
-           UnlockReleaseBuffer(buffer);
        }
    }
    else if (data->prevTail != InvalidBlockNumber)
@@ -510,19 +548,21 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
        /*
         * New tail
         */
-
-       Buffer      buffer = XLogReadBuffer(data->node, data->prevTail, false);
-       Page        page = BufferGetPage(buffer);
-
-       if (!XLByteLE(lsn, PageGetLSN(page)))
+       buffer = XLogReadBuffer(data->node, data->prevTail, false);
+       if (BufferIsValid(buffer))
        {
-           GinPageGetOpaque(page)->rightlink = data->newRightlink;
+           Page        page = BufferGetPage(buffer);
 
-           PageSetLSN(page, lsn);
-           PageSetTLI(page, ThisTimeLineID);
-           MarkBufferDirty(buffer);
+           if (!XLByteLE(lsn, PageGetLSN(page)))
+           {
+               GinPageGetOpaque(page)->rightlink = data->newRightlink;
+
+               PageSetLSN(page, lsn);
+               PageSetTLI(page, ThisTimeLineID);
+               MarkBufferDirty(buffer);
+           }
+           UnlockReleaseBuffer(buffer);
        }
-       UnlockReleaseBuffer(buffer);
    }
 
    UnlockReleaseBuffer(metabuffer);
@@ -544,6 +584,7 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)
        return;
 
    buffer = XLogReadBuffer(data->node, data->blkno, true);
+   Assert(BufferIsValid(buffer));
    page = BufferGetPage(buffer);
 
    GinInitBuffer(buffer, GIN_LIST);
@@ -587,6 +628,8 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
    int         i;
 
    metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
+   if (!BufferIsValid(metabuffer))
+       elog(PANIC, "GIN metapage disappeared");
    metapage = BufferGetPage(metabuffer);
 
    if (!XLByteLE(lsn, PageGetLSN(metapage)))
@@ -600,18 +643,22 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
    for (i = 0; i < data->ndeleted; i++)
    {
        Buffer      buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
-       Page        page = BufferGetPage(buffer);
 
-       if (!XLByteLE(lsn, PageGetLSN(page)))
+       if (BufferIsValid(buffer))
        {
-           GinPageGetOpaque(page)->flags = GIN_DELETED;
+           Page        page = BufferGetPage(buffer);
 
-           PageSetLSN(page, lsn);
-           PageSetTLI(page, ThisTimeLineID);
-           MarkBufferDirty(buffer);
-       }
+           if (!XLByteLE(lsn, PageGetLSN(page)))
+           {
+               GinPageGetOpaque(page)->flags = GIN_DELETED;
 
-       UnlockReleaseBuffer(buffer);
+               PageSetLSN(page, lsn);
+               PageSetTLI(page, ThisTimeLineID);
+               MarkBufferDirty(buffer);
+           }
+
+           UnlockReleaseBuffer(buffer);
+       }
    }
    UnlockReleaseBuffer(metabuffer);
 }
@@ -755,6 +802,13 @@ ginContinueSplit(ginIncompleteSplit *split)
     */
    buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
 
+   /*
+    * Failure should be impossible here, because we wrote the page earlier.
+    */
+   if (!BufferIsValid(buffer))
+       elog(PANIC, "ginContinueSplit: left block %u not found",
+            split->leftBlkno);
+
    reln = CreateFakeRelcacheEntry(split->node);
 
    if (split->rootBlkno == GIN_ROOT_BLKNO)