Refactor XLogOpenRelation() and XLogReadBuffer() in preparation for relation
authorHeikki Linnakangas <heikki@enterprisedb.com>
Thu, 12 Jun 2008 09:12:31 +0000 (09:12 +0000)
committerHeikki Linnakangas <heikki@enterprisedb.com>
Thu, 12 Jun 2008 09:12:31 +0000 (09:12 +0000)
forks. XLogOpenRelation() and the associated light-weight relation cache in
xlogutils.c is gone, and XLogReadBuffer() now takes a RelFileNode as argument,
instead of Relation.

For functions that still need a Relation struct during WAL replay, there's a
new function called CreateFakeRelcacheEntry() that returns a fake entry like
XLogOpenRelation() used to.

20 files changed:
src/backend/access/gin/ginxlog.c
src/backend/access/gist/gist.c
src/backend/access/gist/gistutil.c
src/backend/access/gist/gistvacuum.c
src/backend/access/gist/gistxlog.c
src/backend/access/heap/heapam.c
src/backend/access/heap/pruneheap.c
src/backend/access/nbtree/nbtxlog.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogutils.c
src/backend/commands/sequence.c
src/backend/storage/buffer/bufmgr.c
src/backend/storage/buffer/localbuf.c
src/backend/storage/smgr/smgr.c
src/backend/utils/init/flatfiles.c
src/include/access/gist_private.h
src/include/access/heapam.h
src/include/access/xlogutils.h
src/include/storage/buf_internals.h
src/include/storage/bufmgr.h

index d3dd8d69b179f9f1066be16a9b29be65c90836ad..e8311f1831e9a1336c92dc212f831f52d9b75faf 100644 (file)
@@ -71,12 +71,10 @@ static void
 ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
 {
        RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          buffer;
        Page            page;
 
-       reln = XLogOpenRelation(*node);
-       buffer = XLogReadBuffer(reln, GIN_ROOT_BLKNO, true);
+       buffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
        Assert(BufferIsValid(buffer));
        page = (Page) BufferGetPage(buffer);
 
@@ -94,12 +92,10 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
 {
        ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
        ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
-       Relation        reln;
        Buffer          buffer;
        Page            page;
 
-       reln = XLogOpenRelation(data->node);
-       buffer = XLogReadBuffer(reln, data->blkno, true);
+       buffer = XLogReadBuffer(data->node, data->blkno, true);
        Assert(BufferIsValid(buffer));
        page = (Page) BufferGetPage(buffer);
 
@@ -118,7 +114,6 @@ static void
 ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
 {
        ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          buffer;
        Page            page;
 
@@ -126,8 +121,7 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
-       reln = XLogOpenRelation(data->node);
-       buffer = XLogReadBuffer(reln, data->blkno, false);
+       buffer = XLogReadBuffer(data->node, data->blkno, false);
        Assert(BufferIsValid(buffer));
        page = (Page) BufferGetPage(buffer);
 
@@ -228,26 +222,23 @@ static void
 ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
 {
        ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          lbuffer,
                                rbuffer;
        Page            lpage,
                                rpage;
        uint32          flags = 0;
 
-       reln = XLogOpenRelation(data->node);
-
        if (data->isLeaf)
                flags |= GIN_LEAF;
        if (data->isData)
                flags |= GIN_DATA;
 
-       lbuffer = XLogReadBuffer(reln, data->lblkno, data->isRootSplit);
+       lbuffer = XLogReadBuffer(data->node, data->lblkno, data->isRootSplit);
        Assert(BufferIsValid(lbuffer));
        lpage = (Page) BufferGetPage(lbuffer);
        GinInitBuffer(lbuffer, flags);
 
-       rbuffer = XLogReadBuffer(reln, data->rblkno, true);
+       rbuffer = XLogReadBuffer(data->node, data->rblkno, true);
        Assert(BufferIsValid(rbuffer));
        rpage = (Page) BufferGetPage(rbuffer);
        GinInitBuffer(rbuffer, flags);
@@ -319,7 +310,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
 
        if (data->isRootSplit)
        {
-               Buffer          rootBuf = XLogReadBuffer(reln, data->rootBlkno, false);
+               Buffer          rootBuf = XLogReadBuffer(data->node, data->rootBlkno, false);
                Page            rootPage = BufferGetPage(rootBuf);
 
                GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
@@ -352,7 +343,6 @@ static void
 ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
 {
        ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          buffer;
        Page            page;
 
@@ -360,8 +350,7 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
-       reln = XLogOpenRelation(data->node);
-       buffer = XLogReadBuffer(reln, data->blkno, false);
+       buffer = XLogReadBuffer(data->node, data->blkno, false);
        Assert(BufferIsValid(buffer));
        page = (Page) BufferGetPage(buffer);
 
@@ -403,15 +392,12 @@ static void
 ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
 {
        ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          buffer;
        Page            page;
 
-       reln = XLogOpenRelation(data->node);
-
        if (!(record->xl_info & XLR_BKP_BLOCK_1))
        {
-               buffer = XLogReadBuffer(reln, data->blkno, false);
+               buffer = XLogReadBuffer(data->node, data->blkno, false);
                page = BufferGetPage(buffer);
                Assert(GinPageIsData(page));
                GinPageGetOpaque(page)->flags = GIN_DELETED;
@@ -423,7 +409,7 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
 
        if (!(record->xl_info & XLR_BKP_BLOCK_2))
        {
-               buffer = XLogReadBuffer(reln, data->parentBlkno, false);
+               buffer = XLogReadBuffer(data->node, data->parentBlkno, false);
                page = BufferGetPage(buffer);
                Assert(GinPageIsData(page));
                Assert(!GinPageIsLeaf(page));
@@ -436,7 +422,7 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
 
        if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
        {
-               buffer = XLogReadBuffer(reln, data->leftBlkno, false);
+               buffer = XLogReadBuffer(data->node, data->leftBlkno, false);
                page = BufferGetPage(buffer);
                Assert(GinPageIsData(page));
                GinPageGetOpaque(page)->rightlink = data->rightLink;
@@ -557,9 +543,9 @@ ginContinueSplit(ginIncompleteSplit *split)
         * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u",  split->rootBlkno,
         * split->leftBlkno, split->rightBlkno);
         */
-       reln = XLogOpenRelation(split->node);
+       buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
 
-       buffer = XLogReadBuffer(reln, split->leftBlkno, false);
+       reln = CreateFakeRelcacheEntry(split->node);
 
        if (split->rootBlkno == GIN_ROOT_BLKNO)
        {
@@ -581,6 +567,8 @@ ginContinueSplit(ginIncompleteSplit *split)
                                                                           GinPageGetOpaque(page)->maxoff))->key;
        }
 
+       FreeFakeRelcacheEntry(reln);
+
        btree.rightblkno = split->rightBlkno;
 
        stack.blkno = split->leftBlkno;
index 77ff12ddcf4e8f9660fbbf80ecc99a339d244205..e8f837776aa66f2a306a729a488c7cd6ea5ec4c4 100644 (file)
@@ -462,7 +462,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
 
                if (!is_leaf)
                        PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
-               gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
+               gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
 
                MarkBufferDirty(state->stack->buffer);
 
@@ -1008,7 +1008,7 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke
        START_CRIT_SECTION();
 
        GISTInitBuffer(buffer, 0);
-       gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
+       gistfillbuffer(page, itup, len, FirstOffsetNumber);
 
        MarkBufferDirty(buffer);
 
index a66ea9e6be87915c1121e34cd0c2d27d97cbf24c..bf80127b01a9e9c5ac9e01ee6831e4f8cc1a587f 100644 (file)
@@ -27,11 +27,10 @@ static Datum attrS[INDEX_MAX_KEYS];
 static bool isnullS[INDEX_MAX_KEYS];
 
 /*
- * Write itup vector to page, has no control of free space
+ * Write itup vector to page, has no control of free space.
  */
-OffsetNumber
-gistfillbuffer(Relation r, Page page, IndexTuple *itup,
-                          int len, OffsetNumber off)
+void
+gistfillbuffer(Page page, IndexTuple *itup, int len, OffsetNumber off)
 {
        OffsetNumber l = InvalidOffsetNumber;
        int                     i;
@@ -42,14 +41,13 @@ gistfillbuffer(Relation r, Page page, IndexTuple *itup,
 
        for (i = 0; i < len; i++)
        {
-               l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]),
-                                               off, false, false);
+               Size sz = IndexTupleSize(itup[i]);
+               l = PageAddItem(page, (Item) itup[i], sz, off, false, false);
                if (l == InvalidOffsetNumber)
-                       elog(ERROR, "failed to add item to index page in \"%s\"",
-                                RelationGetRelationName(r));
+                       elog(ERROR, "failed to add item to GiST index page, item %d out of %d, size %d bytes",
+                                i, len, sz);
                off++;
        }
-       return l;
 }
 
 /*
index a5b70af67bc92fc843cd359364e81053b1ebfd90..85e95edc1952537ce827b5fee78412adb33c4682 100644 (file)
@@ -403,7 +403,7 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
                        }
                        else
                                /* enough free space */
-                               gistfillbuffer(gv->index, tempPage, addon, curlenaddon, InvalidOffsetNumber);
+                               gistfillbuffer(tempPage, addon, curlenaddon, InvalidOffsetNumber);
                }
        }
 
index 81bb7c59ff17289b9e0bfb716ca9d439c8ad1420..b97ab2fb76310dd21388e6e292bb1a4d095736bf 100644 (file)
@@ -189,7 +189,6 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
 {
        gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
        PageUpdateRecord xlrec;
-       Relation        reln;
        Buffer          buffer;
        Page            page;
 
@@ -208,8 +207,7 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
 
        decodePageUpdateRecord(&xlrec, record);
 
-       reln = XLogOpenRelation(xlrec.data->node);
-       buffer = XLogReadBuffer(reln, xlrec.data->blkno, false);
+       buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false);
        if (!BufferIsValid(buffer))
                return;
        page = (Page) BufferGetPage(buffer);
@@ -234,7 +232,7 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
 
        /* add tuples */
        if (xlrec.len > 0)
-               gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
+               gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
 
        /*
         * special case: leafpage, nothing to insert, nothing to delete, then
@@ -262,7 +260,6 @@ static void
 gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
 {
        gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          buffer;
        Page            page;
 
@@ -270,8 +267,7 @@ gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
-       reln = XLogOpenRelation(xldata->node);
-       buffer = XLogReadBuffer(reln, xldata->blkno, false);
+       buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
        if (!BufferIsValid(buffer))
                return;
 
@@ -319,14 +315,12 @@ static void
 gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
 {
        PageSplitRecord xlrec;
-       Relation        reln;
        Buffer          buffer;
        Page            page;
        int                     i;
        int                     flags;
 
        decodePageSplitRecord(&xlrec, record);
-       reln = XLogOpenRelation(xlrec.data->node);
        flags = xlrec.data->origleaf ? F_LEAF : 0;
 
        /* loop around all pages */
@@ -334,7 +328,7 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
        {
                NewPage    *newpage = xlrec.page + i;
 
-               buffer = XLogReadBuffer(reln, newpage->header->blkno, true);
+               buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
                Assert(BufferIsValid(buffer));
                page = (Page) BufferGetPage(buffer);
 
@@ -342,7 +336,7 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
                GISTInitBuffer(buffer, flags);
 
                /* and fill it */
-               gistfillbuffer(reln, page, newpage->itup, newpage->header->num, FirstOffsetNumber);
+               gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
 
                PageSetLSN(page, lsn);
                PageSetTLI(page, ThisTimeLineID);
@@ -361,12 +355,10 @@ static void
 gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
 {
        RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          buffer;
        Page            page;
 
-       reln = XLogOpenRelation(*node);
-       buffer = XLogReadBuffer(reln, GIST_ROOT_BLKNO, true);
+       buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
        Assert(BufferIsValid(buffer));
        page = (Page) BufferGetPage(buffer);
 
@@ -602,7 +594,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
                                lenitup;
        Relation        index;
 
-       index = XLogOpenRelation(insert->node);
+       index = CreateFakeRelcacheEntry(insert->node);
 
        /*
         * needed vector itup never will be more than initial lenblkno+2, because
@@ -624,7 +616,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
                 * it was split root, so we should only make new root. it can't be
                 * simple insert into root, we should replace all content of root.
                 */
-               Buffer          buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true);
+               Buffer          buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true);
 
                gistnewroot(index, buffer, itup, lenitup, NULL);
                UnlockReleaseBuffer(buffer);
@@ -703,7 +695,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
                                LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
                                GISTInitBuffer(buffers[numbuffer], 0);
                                pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
-                               gistfillbuffer(index, pages[numbuffer], itup, lenitup, FirstOffsetNumber);
+                               gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber);
                                numbuffer++;
 
                                if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
@@ -749,7 +741,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
 
                                for (j = 0; j < ntodelete; j++)
                                        PageIndexTupleDelete(pages[0], todelete[j]);
-                               gistfillbuffer(index, pages[0], itup, lenitup, InvalidOffsetNumber);
+                               gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber);
 
                                rdata = formUpdateRdata(index->rd_node, buffers[0],
                                                                                todelete, ntodelete,
@@ -794,6 +786,8 @@ gistContinueInsert(gistIncompleteInsert *insert)
                }
        }
 
+       FreeFakeRelcacheEntry(index);
+
        ereport(LOG,
                        (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
                        insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
index 6c20b48cc4806892a756c87c80caf46b3f33b197..205771ec69debb4d881b6e29b92f8a40220495a8 100644 (file)
@@ -3944,7 +3944,6 @@ static void
 heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
 {
        xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          buffer;
        Page            page;
        OffsetNumber *end;
@@ -3958,8 +3957,7 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
-       reln = XLogOpenRelation(xlrec->node);
-       buffer = XLogReadBuffer(reln, xlrec->block, false);
+       buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
        if (!BufferIsValid(buffer))
                return;
        page = (Page) BufferGetPage(buffer);
@@ -3980,7 +3978,7 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
        Assert(nunused >= 0);
 
        /* Update all item pointers per the record, and repair fragmentation */
-       heap_page_prune_execute(reln, buffer,
+       heap_page_prune_execute(buffer,
                                                        redirected, nredirected,
                                                        nowdead, ndead,
                                                        nowunused, nunused,
@@ -4002,15 +4000,13 @@ heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
 {
        xl_heap_freeze *xlrec = (xl_heap_freeze *) XLogRecGetData(record);
        TransactionId cutoff_xid = xlrec->cutoff_xid;
-       Relation        reln;
        Buffer          buffer;
        Page            page;
 
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
-       reln = XLogOpenRelation(xlrec->node);
-       buffer = XLogReadBuffer(reln, xlrec->block, false);
+       buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
        if (!BufferIsValid(buffer))
                return;
        page = (Page) BufferGetPage(buffer);
@@ -4050,7 +4046,6 @@ static void
 heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
 {
        xl_heap_newpage *xlrec = (xl_heap_newpage *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          buffer;
        Page            page;
 
@@ -4058,8 +4053,7 @@ heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
         * Note: the NEWPAGE log record is used for both heaps and indexes, so do
         * not do anything that assumes we are touching a heap.
         */
-       reln = XLogOpenRelation(xlrec->node);
-       buffer = XLogReadBuffer(reln, xlrec->blkno, true);
+       buffer = XLogReadBuffer(xlrec->node, xlrec->blkno, true);
        Assert(BufferIsValid(buffer));
        page = (Page) BufferGetPage(buffer);
 
@@ -4076,7 +4070,6 @@ static void
 heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 {
        xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          buffer;
        Page            page;
        OffsetNumber offnum;
@@ -4086,8 +4079,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
-       reln = XLogOpenRelation(xlrec->target.node);
-       buffer = XLogReadBuffer(reln,
+       buffer = XLogReadBuffer(xlrec->target.node,
                                                        ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                                        false);
        if (!BufferIsValid(buffer))
@@ -4133,7 +4125,6 @@ static void
 heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 {
        xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          buffer;
        Page            page;
        OffsetNumber offnum;
@@ -4149,11 +4140,9 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
-       reln = XLogOpenRelation(xlrec->target.node);
-
        if (record->xl_info & XLOG_HEAP_INIT_PAGE)
        {
-               buffer = XLogReadBuffer(reln,
+               buffer = XLogReadBuffer(xlrec->target.node,
                                                         ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                                                true);
                Assert(BufferIsValid(buffer));
@@ -4163,7 +4152,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
        }
        else
        {
-               buffer = XLogReadBuffer(reln,
+               buffer = XLogReadBuffer(xlrec->target.node,
                                                         ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                                                false);
                if (!BufferIsValid(buffer))
@@ -4216,7 +4205,6 @@ static void
 heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
 {
        xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
-       Relation        reln = XLogOpenRelation(xlrec->target.node);
        Buffer          buffer;
        bool            samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
                                                        ItemPointerGetBlockNumber(&(xlrec->target.tid)));
@@ -4242,7 +4230,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
 
        /* Deal with old tuple version */
 
-       buffer = XLogReadBuffer(reln,
+       buffer = XLogReadBuffer(xlrec->target.node,
                                                        ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                                        false);
        if (!BufferIsValid(buffer))
@@ -4317,7 +4305,7 @@ newt:;
 
        if (record->xl_info & XLOG_HEAP_INIT_PAGE)
        {
-               buffer = XLogReadBuffer(reln,
+               buffer = XLogReadBuffer(xlrec->target.node,
                                                                ItemPointerGetBlockNumber(&(xlrec->newtid)),
                                                                true);
                Assert(BufferIsValid(buffer));
@@ -4327,7 +4315,7 @@ newt:;
        }
        else
        {
-               buffer = XLogReadBuffer(reln,
+               buffer = XLogReadBuffer(xlrec->target.node,
                                                                ItemPointerGetBlockNumber(&(xlrec->newtid)),
                                                                false);
                if (!BufferIsValid(buffer))
@@ -4399,7 +4387,6 @@ static void
 heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
 {
        xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          buffer;
        Page            page;
        OffsetNumber offnum;
@@ -4409,8 +4396,7 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
-       reln = XLogOpenRelation(xlrec->target.node);
-       buffer = XLogReadBuffer(reln,
+       buffer = XLogReadBuffer(xlrec->target.node,
                                                        ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                                        false);
        if (!BufferIsValid(buffer))
@@ -4458,7 +4444,6 @@ static void
 heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
 {
        xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
-       Relation        reln = XLogOpenRelation(xlrec->target.node);
        Buffer          buffer;
        Page            page;
        OffsetNumber offnum;
@@ -4470,7 +4455,7 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
        if (record->xl_info & XLR_BKP_BLOCK_1)
                return;
 
-       buffer = XLogReadBuffer(reln,
+       buffer = XLogReadBuffer(xlrec->target.node,
                                                        ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                                        false);
        if (!BufferIsValid(buffer))
index 50e61b3771c1368a6b40636fe7f9bde32dd9f425..62a2ba5059133a7f906be67554074ba384552873 100644 (file)
@@ -225,7 +225,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
                 * and update the page's hint bit about whether it has free line
                 * pointers.
                 */
-               heap_page_prune_execute(relation, buffer,
+               heap_page_prune_execute(buffer,
                                                                prstate.redirected, prstate.nredirected,
                                                                prstate.nowdead, prstate.ndead,
                                                                prstate.nowunused, prstate.nunused,
@@ -696,7 +696,7 @@ heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum)
  * arguments are identical to those of log_heap_clean().
  */
 void
-heap_page_prune_execute(Relation reln, Buffer buffer,
+heap_page_prune_execute(Buffer buffer,
                                                OffsetNumber *redirected, int nredirected,
                                                OffsetNumber *nowdead, int ndead,
                                                OffsetNumber *nowunused, int nunused,
index d56f73cb6d164f2f306a8df14475437058737350..f6d7cf2fe44ceacd42fb85d5d137ee21ba160cd7 100644 (file)
@@ -150,7 +150,7 @@ _bt_restore_page(Page page, char *from, int len)
 }
 
 static void
-_bt_restore_meta(Relation reln, XLogRecPtr lsn,
+_bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
                                 BlockNumber root, uint32 level,
                                 BlockNumber fastroot, uint32 fastlevel)
 {
@@ -159,7 +159,7 @@ _bt_restore_meta(Relation reln, XLogRecPtr lsn,
        BTMetaPageData *md;
        BTPageOpaque pageop;
 
-       metabuf = XLogReadBuffer(reln, BTREE_METAPAGE, true);
+       metabuf = XLogReadBuffer(rnode, BTREE_METAPAGE, true);
        Assert(BufferIsValid(metabuf));
        metapg = BufferGetPage(metabuf);
 
@@ -194,7 +194,6 @@ btree_xlog_insert(bool isleaf, bool ismeta,
                                  XLogRecPtr lsn, XLogRecord *record)
 {
        xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          buffer;
        Page            page;
        char       *datapos;
@@ -220,11 +219,9 @@ btree_xlog_insert(bool isleaf, bool ismeta,
        if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
                return;                                 /* nothing to do */
 
-       reln = XLogOpenRelation(xlrec->target.node);
-
        if (!(record->xl_info & XLR_BKP_BLOCK_1))
        {
-               buffer = XLogReadBuffer(reln,
+               buffer = XLogReadBuffer(xlrec->target.node,
                                                         ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                                                false);
                if (BufferIsValid(buffer))
@@ -251,7 +248,7 @@ btree_xlog_insert(bool isleaf, bool ismeta,
        }
 
        if (ismeta)
-               _bt_restore_meta(reln, lsn,
+               _bt_restore_meta(xlrec->target.node, lsn,
                                                 md.root, md.level,
                                                 md.fastroot, md.fastlevel);
 
@@ -265,7 +262,6 @@ btree_xlog_split(bool onleft, bool isroot,
                                 XLogRecPtr lsn, XLogRecord *record)
 {
        xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          rbuf;
        Page            rpage;
        BTPageOpaque ropaque;
@@ -277,8 +273,6 @@ btree_xlog_split(bool onleft, bool isroot,
        Item            left_hikey = NULL;
        Size            left_hikeysz = 0;
 
-       reln = XLogOpenRelation(xlrec->node);
-
        datapos = (char *) xlrec + SizeOfBtreeSplit;
        datalen = record->xl_len - SizeOfBtreeSplit;
 
@@ -328,7 +322,7 @@ btree_xlog_split(bool onleft, bool isroot,
        }
 
        /* Reconstruct right (new) sibling from scratch */
-       rbuf = XLogReadBuffer(reln, xlrec->rightsib, true);
+       rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
        Assert(BufferIsValid(rbuf));
        rpage = (Page) BufferGetPage(rbuf);
 
@@ -369,7 +363,7 @@ btree_xlog_split(bool onleft, bool isroot,
         */
        if (!(record->xl_info & XLR_BKP_BLOCK_1))
        {
-               Buffer          lbuf = XLogReadBuffer(reln, xlrec->leftsib, false);
+               Buffer          lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
 
                if (BufferIsValid(lbuf))
                {
@@ -439,7 +433,7 @@ btree_xlog_split(bool onleft, bool isroot,
        /* Fix left-link of the page to the right of the new right sibling */
        if (xlrec->rnext != P_NONE && !(record->xl_info & XLR_BKP_BLOCK_2))
        {
-               Buffer          buffer = XLogReadBuffer(reln, xlrec->rnext, false);
+               Buffer          buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
 
                if (BufferIsValid(buffer))
                {
@@ -468,7 +462,6 @@ static void
 btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 {
        xl_btree_delete *xlrec;
-       Relation        reln;
        Buffer          buffer;
        Page            page;
        BTPageOpaque opaque;
@@ -477,8 +470,7 @@ btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
                return;
 
        xlrec = (xl_btree_delete *) XLogRecGetData(record);
-       reln = XLogOpenRelation(xlrec->node);
-       buffer = XLogReadBuffer(reln, xlrec->block, false);
+       buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
        if (!BufferIsValid(buffer))
                return;
        page = (Page) BufferGetPage(buffer);
@@ -517,7 +509,6 @@ static void
 btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
 {
        xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record);
-       Relation        reln;
        BlockNumber parent;
        BlockNumber target;
        BlockNumber leftsib;
@@ -526,7 +517,6 @@ btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
        Page            page;
        BTPageOpaque pageop;
 
-       reln = XLogOpenRelation(xlrec->target.node);
        parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));
        target = xlrec->deadblk;
        leftsib = xlrec->leftblk;
@@ -535,7 +525,7 @@ btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
        /* parent page */
        if (!(record->xl_info & XLR_BKP_BLOCK_1))
        {
-               buffer = XLogReadBuffer(reln, parent, false);
+               buffer = XLogReadBuffer(xlrec->target.node, parent, false);
                if (BufferIsValid(buffer))
                {
                        page = (Page) BufferGetPage(buffer);
@@ -581,7 +571,7 @@ btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
        /* Fix left-link of right sibling */
        if (!(record->xl_info & XLR_BKP_BLOCK_2))
        {
-               buffer = XLogReadBuffer(reln, rightsib, false);
+               buffer = XLogReadBuffer(xlrec->target.node, rightsib, false);
                if (BufferIsValid(buffer))
                {
                        page = (Page) BufferGetPage(buffer);
@@ -607,7 +597,7 @@ btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
        {
                if (leftsib != P_NONE)
                {
-                       buffer = XLogReadBuffer(reln, leftsib, false);
+                       buffer = XLogReadBuffer(xlrec->target.node, leftsib, false);
                        if (BufferIsValid(buffer))
                        {
                                page = (Page) BufferGetPage(buffer);
@@ -630,7 +620,7 @@ btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
        }
 
        /* Rewrite target page as empty deleted page */
-       buffer = XLogReadBuffer(reln, target, true);
+       buffer = XLogReadBuffer(xlrec->target.node, target, true);
        Assert(BufferIsValid(buffer));
        page = (Page) BufferGetPage(buffer);
 
@@ -655,7 +645,7 @@ btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
 
                memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
                           sizeof(xl_btree_metadata));
-               _bt_restore_meta(reln, lsn,
+               _bt_restore_meta(xlrec->target.node, lsn,
                                                 md.root, md.level,
                                                 md.fastroot, md.fastlevel);
        }
@@ -672,14 +662,12 @@ static void
 btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
 {
        xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
-       Relation        reln;
        Buffer          buffer;
        Page            page;
        BTPageOpaque pageop;
        BlockNumber downlink = 0;
 
-       reln = XLogOpenRelation(xlrec->node);
-       buffer = XLogReadBuffer(reln, xlrec->rootblk, true);
+       buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
        Assert(BufferIsValid(buffer));
        page = (Page) BufferGetPage(buffer);
 
@@ -711,7 +699,7 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
        MarkBufferDirty(buffer);
        UnlockReleaseBuffer(buffer);
 
-       _bt_restore_meta(reln, lsn,
+       _bt_restore_meta(xlrec->node, lsn,
                                         xlrec->rootblk, xlrec->level,
                                         xlrec->rootblk, xlrec->level);
 
@@ -904,9 +892,7 @@ btree_xlog_cleanup(void)
        foreach(l, incomplete_actions)
        {
                bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
-               Relation        reln;
 
-               reln = XLogOpenRelation(action->node);
                if (action->is_split)
                {
                        /* finish an incomplete split */
@@ -917,14 +903,15 @@ btree_xlog_cleanup(void)
                        BTPageOpaque lpageop,
                                                rpageop;
                        bool            is_only;
+                       Relation        reln;
 
-                       lbuf = XLogReadBuffer(reln, action->leftblk, false);
+                       lbuf = XLogReadBuffer(action->node, action->leftblk, false);
                        /* failure is impossible because we wrote this page earlier */
                        if (!BufferIsValid(lbuf))
                                elog(PANIC, "btree_xlog_cleanup: left block unfound");
                        lpage = (Page) BufferGetPage(lbuf);
                        lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
-                       rbuf = XLogReadBuffer(reln, action->rightblk, false);
+                       rbuf = XLogReadBuffer(action->node, action->rightblk, false);
                        /* failure is impossible because we wrote this page earlier */
                        if (!BufferIsValid(rbuf))
                                elog(PANIC, "btree_xlog_cleanup: right block unfound");
@@ -934,18 +921,26 @@ btree_xlog_cleanup(void)
                        /* if the pages are all of their level, it's a only-page split */
                        is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);
 
+                       reln = CreateFakeRelcacheEntry(action->node);
                        _bt_insert_parent(reln, lbuf, rbuf, NULL,
                                                          action->is_root, is_only);
+                       FreeFakeRelcacheEntry(reln);
                }
                else
                {
                        /* finish an incomplete deletion (of a half-dead page) */
                        Buffer          buf;
 
-                       buf = XLogReadBuffer(reln, action->delblk, false);
+                       buf = XLogReadBuffer(action->node, action->delblk, false);
                        if (BufferIsValid(buf))
+                       {
+                               Relation reln;
+
+                               reln = CreateFakeRelcacheEntry(action->node);
                                if (_bt_pagedel(reln, buf, NULL, true) == 0)
                                        elog(PANIC, "btree_xlog_cleanup: _bt_pagdel failed");
+                               FreeFakeRelcacheEntry(reln);
+                       }
                }
        }
        incomplete_actions = NIL;
index 0ceb9f579e4ee00c57a7b2679898943ade59ed86..dfe8375d1f2822159a01ff195f26ec2919b28cc2 100644 (file)
@@ -2840,7 +2840,6 @@ CleanupBackupHistory(void)
 static void
 RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
 {
-       Relation        reln;
        Buffer          buffer;
        Page            page;
        BkpBlock        bkpb;
@@ -2856,8 +2855,7 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
                memcpy(&bkpb, blk, sizeof(BkpBlock));
                blk += sizeof(BkpBlock);
 
-               reln = XLogOpenRelation(bkpb.node);
-               buffer = XLogReadBuffer(reln, bkpb.block, true);
+               buffer = XLogReadBuffer(bkpb.node, bkpb.block, true);
                Assert(BufferIsValid(buffer));
                page = (Page) BufferGetPage(buffer);
 
@@ -5064,9 +5062,7 @@ StartupXLOG(void)
                                                                BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
                }
 
-               /* Start up the recovery environment */
-               XLogInitRelationCache();
-
+               /* Initialize resource managers */
                for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
                {
                        if (RmgrTable[rmid].rm_startup != NULL)
@@ -5330,11 +5326,6 @@ StartupXLOG(void)
                 * allows some extra error checking in xlog_redo.
                 */
                CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
-
-               /*
-                * Close down recovery environment
-                */
-               XLogCloseRelationCache();
        }
 
        /*
index ef441872b571235a68c40a857a02ca86dedcf87c..27f1501fb1b050ad33b70f993d0091674058a8c3 100644 (file)
@@ -190,6 +190,9 @@ XLogCheckInvalidPages(void)
 
        if (foundone)
                elog(PANIC, "WAL contains references to invalid pages");
+
+       hash_destroy(invalid_page_tab);
+       invalid_page_tab = NULL;
 }
 
 
@@ -218,27 +221,40 @@ XLogCheckInvalidPages(void)
  * at the end of WAL replay.)
  */
 Buffer
-XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
+XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
 {
-       BlockNumber lastblock = RelationGetNumberOfBlocks(reln);
+       BlockNumber lastblock;
        Buffer          buffer;
+       SMgrRelation smgr;
 
        Assert(blkno != P_NEW);
 
+       /* Open the relation at smgr level */
+       smgr = smgropen(rnode);
+
+       /*
+        * Create the target file if it doesn't already exist.  This lets us cope
+        * if the replay sequence contains writes to a relation that is later
+        * deleted.  (The original coding of this routine would instead suppress
+        * the writes, but that seems like it risks losing valuable data if the
+        * filesystem loses an inode during a crash.  Better to write the data
+        * until we are actually told to delete the file.)
+        */
+       smgrcreate(smgr, false, true);
+
+       lastblock = smgrnblocks(smgr);
+
        if (blkno < lastblock)
        {
                /* page exists in file */
-               if (init)
-                       buffer = ReadOrZeroBuffer(reln, blkno);
-               else
-                       buffer = ReadBuffer(reln, blkno);
+               buffer = ReadBufferWithoutRelcache(rnode, false, blkno, init);
        }
        else
        {
                /* hm, page doesn't exist in file */
                if (!init)
                {
-                       log_invalid_page(reln->rd_node, blkno, false);
+                       log_invalid_page(rnode, blkno, false);
                        return InvalidBuffer;
                }
                /* OK to extend the file */
@@ -249,7 +265,7 @@ XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
                {
                        if (buffer != InvalidBuffer)
                                ReleaseBuffer(buffer);
-                       buffer = ReadBuffer(reln, P_NEW);
+                       buffer = ReadBufferWithoutRelcache(rnode, false, P_NEW, false);
                        lastblock++;
                }
                Assert(BufferGetBlockNumber(buffer) == blkno);
@@ -265,7 +281,7 @@ XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
                if (PageIsNew((PageHeader) page))
                {
                        UnlockReleaseBuffer(buffer);
-                       log_invalid_page(reln->rd_node, blkno, true);
+                       log_invalid_page(rnode, blkno, true);
                        return InvalidBuffer;
                }
        }
@@ -275,226 +291,81 @@ XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
 
 
 /*
- * Lightweight "Relation" cache --- this substitutes for the normal relcache
- * during XLOG replay.
+ * Struct actually returned by XLogFakeRelcacheEntry, though the declared
+ * return type is Relation.
  */
-
-typedef struct XLogRelDesc
-{
-       RelationData reldata;
-       struct XLogRelDesc *lessRecently;
-       struct XLogRelDesc *moreRecently;
-} XLogRelDesc;
-
-typedef struct XLogRelCacheEntry
+typedef struct
 {
-       RelFileNode rnode;
-       XLogRelDesc *rdesc;
-} XLogRelCacheEntry;
+       RelationData            reldata;        /* Note: this must be first */
+       FormData_pg_class       pgc;
+} FakeRelCacheEntryData;
 
-static HTAB *_xlrelcache;
-static XLogRelDesc *_xlrelarr = NULL;
-static Form_pg_class _xlpgcarr = NULL;
-static int     _xlast = 0;
-static int     _xlcnt = 0;
+typedef FakeRelCacheEntryData *FakeRelCacheEntry;
 
-#define _XLOG_RELCACHESIZE     512
-
-static void
-_xl_init_rel_cache(void)
-{
-       HASHCTL         ctl;
-
-       _xlcnt = _XLOG_RELCACHESIZE;
-       _xlast = 0;
-       _xlrelarr = (XLogRelDesc *) malloc(sizeof(XLogRelDesc) * _xlcnt);
-       memset(_xlrelarr, 0, sizeof(XLogRelDesc) * _xlcnt);
-       _xlpgcarr = (Form_pg_class) malloc(sizeof(FormData_pg_class) * _xlcnt);
-       memset(_xlpgcarr, 0, sizeof(FormData_pg_class) * _xlcnt);
-
-       _xlrelarr[0].moreRecently = &(_xlrelarr[0]);
-       _xlrelarr[0].lessRecently = &(_xlrelarr[0]);
-
-       memset(&ctl, 0, sizeof(ctl));
-       ctl.keysize = sizeof(RelFileNode);
-       ctl.entrysize = sizeof(XLogRelCacheEntry);
-       ctl.hash = tag_hash;
-
-       _xlrelcache = hash_create("XLOG relcache", _XLOG_RELCACHESIZE,
-                                                         &ctl, HASH_ELEM | HASH_FUNCTION);
-}
-
-static void
-_xl_remove_hash_entry(XLogRelDesc *rdesc)
-{
-       Form_pg_class tpgc = rdesc->reldata.rd_rel;
-       XLogRelCacheEntry *hentry;
-
-       rdesc->lessRecently->moreRecently = rdesc->moreRecently;
-       rdesc->moreRecently->lessRecently = rdesc->lessRecently;
-
-       hentry = (XLogRelCacheEntry *) hash_search(_xlrelcache,
-                                         (void *) &(rdesc->reldata.rd_node), HASH_REMOVE, NULL);
-       if (hentry == NULL)
-               elog(PANIC, "_xl_remove_hash_entry: file was not found in cache");
-
-       RelationCloseSmgr(&(rdesc->reldata));
-
-       memset(rdesc, 0, sizeof(XLogRelDesc));
-       memset(tpgc, 0, sizeof(FormData_pg_class));
-       rdesc->reldata.rd_rel = tpgc;
-}
-
-static XLogRelDesc *
-_xl_new_reldesc(void)
-{
-       XLogRelDesc *res;
-
-       _xlast++;
-       if (_xlast < _xlcnt)
-       {
-               _xlrelarr[_xlast].reldata.rd_rel = &(_xlpgcarr[_xlast]);
-               return &(_xlrelarr[_xlast]);
-       }
-
-       /* reuse */
-       res = _xlrelarr[0].moreRecently;
-
-       _xl_remove_hash_entry(res);
-
-       _xlast--;
-       return res;
-}
-
-
-void
-XLogInitRelationCache(void)
-{
-       _xl_init_rel_cache();
-       invalid_page_tab = NULL;
-}
-
-void
-XLogCloseRelationCache(void)
+/*
+ * Create a fake relation cache entry for a physical relation
+ *
+ * It's often convenient to use the same functions in XLOG replay as in the
+ * main codepath, but those functions typically work with a relcache entry. 
+ * We don't have a working relation cache during XLOG replay, but this 
+ * function can be used to create a fake relcache entry instead. Only the 
+ * fields related to physical storage, like rd_rel, are initialized, so the 
+ * fake entry is only usable in low-level operations like ReadBuffer().
+ *
+ * Caller must free the returned entry with FreeFakeRelcacheEntry().
+ */
+Relation
+CreateFakeRelcacheEntry(RelFileNode rnode)
 {
-       HASH_SEQ_STATUS status;
-       XLogRelCacheEntry *hentry;
+       FakeRelCacheEntry fakeentry;
+       Relation rel;
 
-       if (!_xlrelarr)
-               return;
+       /* Allocate the Relation struct and all related space in one block. */
+       fakeentry = palloc0(sizeof(FakeRelCacheEntryData));
+       rel = (Relation) fakeentry;
 
-       hash_seq_init(&status, _xlrelcache);
+       rel->rd_rel = &fakeentry->pgc;
+       rel->rd_node = rnode;
 
-       while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
-               _xl_remove_hash_entry(hentry->rdesc);
+       /* We don't know the name of the relation; use relfilenode instead */
+       sprintf(RelationGetRelationName(rel), "%u", rnode.relNode);
 
-       hash_destroy(_xlrelcache);
+       /*
+        * We set up the lockRelId in case anything tries to lock the dummy
+        * relation.  Note that this is fairly bogus since relNode may be
+        * different from the relation's OID.  It shouldn't really matter
+        * though, since we are presumably running by ourselves and can't have
+        * any lock conflicts ...
+        */
+       rel->rd_lockInfo.lockRelId.dbId = rnode.dbNode;
+       rel->rd_lockInfo.lockRelId.relId = rnode.relNode;
 
-       free(_xlrelarr);
-       free(_xlpgcarr);
+       rel->rd_targblock = InvalidBlockNumber;
+       rel->rd_smgr = NULL;
 
-       _xlrelarr = NULL;
+       return rel;
 }
 
 /*
- * Open a relation during XLOG replay
- *
- * Note: this once had an API that allowed NULL return on failure, but it
- * no longer does; any failure results in elog().
+ * Free a fake relation cache entry.
  */
-Relation
-XLogOpenRelation(RelFileNode rnode)
+void
+FreeFakeRelcacheEntry(Relation fakerel)
 {
-       XLogRelDesc *res;
-       XLogRelCacheEntry *hentry;
-       bool            found;
-
-       hentry = (XLogRelCacheEntry *)
-               hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);
-
-       if (hentry)
-       {
-               res = hentry->rdesc;
-
-               res->lessRecently->moreRecently = res->moreRecently;
-               res->moreRecently->lessRecently = res->lessRecently;
-       }
-       else
-       {
-               res = _xl_new_reldesc();
-
-               sprintf(RelationGetRelationName(&(res->reldata)), "%u", rnode.relNode);
-
-               res->reldata.rd_node = rnode;
-
-               /*
-                * We set up the lockRelId in case anything tries to lock the dummy
-                * relation.  Note that this is fairly bogus since relNode may be
-                * different from the relation's OID.  It shouldn't really matter
-                * though, since we are presumably running by ourselves and can't have
-                * any lock conflicts ...
-                */
-               res->reldata.rd_lockInfo.lockRelId.dbId = rnode.dbNode;
-               res->reldata.rd_lockInfo.lockRelId.relId = rnode.relNode;
-
-               hentry = (XLogRelCacheEntry *)
-                       hash_search(_xlrelcache, (void *) &rnode, HASH_ENTER, &found);
-
-               if (found)
-                       elog(PANIC, "xlog relation already present on insert into cache");
-
-               hentry->rdesc = res;
-
-               res->reldata.rd_targblock = InvalidBlockNumber;
-               res->reldata.rd_smgr = NULL;
-               RelationOpenSmgr(&(res->reldata));
-
-               /*
-                * Create the target file if it doesn't already exist.  This lets us
-                * cope if the replay sequence contains writes to a relation that is
-                * later deleted.  (The original coding of this routine would instead
-                * return NULL, causing the writes to be suppressed. But that seems
-                * like it risks losing valuable data if the filesystem loses an inode
-                * during a crash.      Better to write the data until we are actually
-                * told to delete the file.)
-                */
-               smgrcreate(res->reldata.rd_smgr, res->reldata.rd_istemp, true);
-       }
-
-       res->moreRecently = &(_xlrelarr[0]);
-       res->lessRecently = _xlrelarr[0].lessRecently;
-       _xlrelarr[0].lessRecently = res;
-       res->lessRecently->moreRecently = res;
-
-       return &(res->reldata);
+       pfree(fakerel);
 }
 
 /*
  * Drop a relation during XLOG replay
  *
- * This is called when the relation is about to be deleted; we need to ensure
- * that there is no dangling smgr reference in the xlog relation cache.
- *
- * Currently, we don't bother to physically remove the relation from the
- * cache, we just let it age out normally.
- *
- * This also takes care of removing any open "invalid-page" records for
- * the relation.
+ * This is called when the relation is about to be deleted; we need to remove
+ * any open "invalid-page" records for the relation.
  */
 void
 XLogDropRelation(RelFileNode rnode)
 {
-       XLogRelCacheEntry *hentry;
-
-       hentry = (XLogRelCacheEntry *)
-               hash_search(_xlrelcache, (void *) &rnode, HASH_FIND, NULL);
-
-       if (hentry)
-       {
-               XLogRelDesc *rdesc = hentry->rdesc;
-
-               RelationCloseSmgr(&(rdesc->reldata));
-       }
+       /* Tell smgr to forget about this relation as well */
+       smgrclosenode(rnode);
 
        forget_invalid_pages(rnode, 0);
 }
@@ -507,18 +378,14 @@ XLogDropRelation(RelFileNode rnode)
 void
 XLogDropDatabase(Oid dbid)
 {
-       HASH_SEQ_STATUS status;
-       XLogRelCacheEntry *hentry;
-
-       hash_seq_init(&status, _xlrelcache);
-
-       while ((hentry = (XLogRelCacheEntry *) hash_seq_search(&status)) != NULL)
-       {
-               XLogRelDesc *rdesc = hentry->rdesc;
-
-               if (hentry->rnode.dbNode == dbid)
-                       RelationCloseSmgr(&(rdesc->reldata));
-       }
+       /*
+        * This is unnecessarily heavy-handed, as it will close SMgrRelation
+        * objects for other databases as well. DROP DATABASE occurs seldom
+        * enough that it's not worth introducing a variant of smgrclose for
+        * just this purpose. XXX: Or should we rather leave the smgr entries
+        * dangling?
+        */
+       smgrcloseall();
 
        forget_invalid_pages_db(dbid);
 }
@@ -526,8 +393,7 @@ XLogDropDatabase(Oid dbid)
 /*
  * Truncate a relation during XLOG replay
  *
- * We don't need to do anything to the fake relcache, but we do need to
- * clean up any open "invalid-page" records for the dropped pages.
+ * We need to clean up any open "invalid-page" records for the dropped pages.
  */
 void
 XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks)
index ab8aa306f236af671ac785e5c54d459d2067df1a..6333fdcddae6ab7552702905acb7f1bc02f500c4 100644 (file)
@@ -1332,7 +1332,6 @@ void
 seq_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
-       Relation        reln;
        Buffer          buffer;
        Page            page;
        char       *item;
@@ -1343,8 +1342,7 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record)
        if (info != XLOG_SEQ_LOG)
                elog(PANIC, "seq_redo: unknown op code %u", info);
 
-       reln = XLogOpenRelation(xlrec->node);
-       buffer = XLogReadBuffer(reln, 0, true);
+       buffer = XLogReadBuffer(xlrec->node, 0, true);
        Assert(BufferIsValid(buffer));
        page = (Page) BufferGetPage(buffer);
 
index 10ebdf44cdee71150a655684e207d594ece96e16..406de7206594ce0eed9d928b00bbef144bd8ae9c 100644 (file)
@@ -76,9 +76,10 @@ static bool IsForInput;
 static volatile BufferDesc *PinCountWaitBuf = NULL;
 
 
-static Buffer ReadBuffer_common(Relation reln, BlockNumber blockNum,
-                                 bool zeroPage,
-                                 BufferAccessStrategy strategy);
+static Buffer ReadBuffer_relcache(Relation reln, BlockNumber blockNum,
+                                 bool zeroPage, BufferAccessStrategy strategy);
+static Buffer ReadBuffer_common(SMgrRelation reln, bool isLocalBuf, BlockNumber blockNum,
+                                 bool zeroPage, BufferAccessStrategy strategy, bool *hit);
 static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
 static void PinBuffer_Locked(volatile BufferDesc *buf);
 static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
@@ -89,7 +90,7 @@ static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
 static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
                                  int set_flag_bits);
 static void buffer_write_error_callback(void *arg);
-static volatile BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
+static volatile BufferDesc *BufferAlloc(SMgrRelation smgr, BlockNumber blockNum,
                        BufferAccessStrategy strategy,
                        bool *foundPtr);
 static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
@@ -114,7 +115,7 @@ static void AtProcExit_Buffers(int code, Datum arg);
 Buffer
 ReadBuffer(Relation reln, BlockNumber blockNum)
 {
-       return ReadBuffer_common(reln, blockNum, false, NULL);
+       return ReadBuffer_relcache(reln, blockNum, false, NULL);
 }
 
 /*
@@ -125,7 +126,7 @@ Buffer
 ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
                                           BufferAccessStrategy strategy)
 {
-       return ReadBuffer_common(reln, blockNum, false, strategy);
+       return ReadBuffer_relcache(reln, blockNum, false, strategy);
 }
 
 /*
@@ -142,41 +143,79 @@ ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
 Buffer
 ReadOrZeroBuffer(Relation reln, BlockNumber blockNum)
 {
-       return ReadBuffer_common(reln, blockNum, true, NULL);
+       return ReadBuffer_relcache(reln, blockNum, true, NULL);
 }
 
 /*
- * ReadBuffer_common -- common logic for ReadBuffer variants
+ * ReadBufferWithoutRelcache -- like ReadBuffer, but doesn't require a 
+ *             relcache entry for the relation. If zeroPage is true, this behaves
+ *             like ReadOrZeroBuffer rather than ReadBuffer.
+ */
+Buffer
+ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp, 
+                                                 BlockNumber blockNum, bool zeroPage)
+{
+       bool hit;
+
+       SMgrRelation smgr = smgropen(rnode);
+       return ReadBuffer_common(smgr, isTemp, blockNum, zeroPage, NULL, &hit);
+}
+
+/*
+ * ReadBuffer_relcache -- common logic for ReadBuffer-variants that 
+ *             operate on a Relation.
+ */
+static Buffer
+ReadBuffer_relcache(Relation reln, BlockNumber blockNum, 
+                                       bool zeroPage, BufferAccessStrategy strategy)
+{
+       bool hit;
+       Buffer buf;
+
+       /* Open it at the smgr level if not already done */
+       RelationOpenSmgr(reln);
+
+       /*
+        * Read the buffer, and update pgstat counters to reflect a cache
+        * hit or miss.
+        */
+       pgstat_count_buffer_read(reln);
+       buf = ReadBuffer_common(reln->rd_smgr, reln->rd_istemp, blockNum, 
+                                                       zeroPage, strategy, &hit);
+       if (hit)
+               pgstat_count_buffer_hit(reln);
+       return buf;
+}
+
+/*
+ * ReadBuffer_common -- common logic for all ReadBuffer variants
+ *
+ * *hit is set to true if the request was satisfied from shared buffer cache.
  */
 static Buffer
-ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
-                                 BufferAccessStrategy strategy)
+ReadBuffer_common(SMgrRelation smgr, bool isLocalBuf, BlockNumber blockNum, 
+                                 bool zeroPage, BufferAccessStrategy strategy, bool *hit)
 {
        volatile BufferDesc *bufHdr;
        Block           bufBlock;
        bool            found;
        bool            isExtend;
-       bool            isLocalBuf;
+
+       *hit = false;
 
        /* Make sure we will have room to remember the buffer pin */
        ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
 
        isExtend = (blockNum == P_NEW);
-       isLocalBuf = reln->rd_istemp;
-
-       /* Open it at the smgr level if not already done */
-       RelationOpenSmgr(reln);
 
        /* Substitute proper block number if caller asked for P_NEW */
        if (isExtend)
-               blockNum = smgrnblocks(reln->rd_smgr);
-
-       pgstat_count_buffer_read(reln);
+               blockNum = smgrnblocks(smgr);
 
        if (isLocalBuf)
        {
                ReadLocalBufferCount++;
-               bufHdr = LocalBufferAlloc(reln, blockNum, &found);
+               bufHdr = LocalBufferAlloc(smgr, blockNum, &found);
                if (found)
                        LocalBufferHitCount++;
        }
@@ -188,7 +227,7 @@ ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
                 * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
                 * not currently in memory.
                 */
-               bufHdr = BufferAlloc(reln, blockNum, strategy, &found);
+               bufHdr = BufferAlloc(smgr, blockNum, strategy, &found);
                if (found)
                        BufferHitCount++;
        }
@@ -201,7 +240,7 @@ ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
                if (!isExtend)
                {
                        /* Just need to update stats before we exit */
-                       pgstat_count_buffer_hit(reln);
+                       *hit = true;
 
                        if (VacuumCostActive)
                                VacuumCostBalance += VacuumCostPageHit;
@@ -225,8 +264,8 @@ ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
                bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
                if (!PageIsNew((PageHeader) bufBlock))
                        ereport(ERROR,
-                                       (errmsg("unexpected data beyond EOF in block %u of relation \"%s\"",
-                                                       blockNum, RelationGetRelationName(reln)),
+                                       (errmsg("unexpected data beyond EOF in block %u of relation %u/%u/%u",
+                                                       blockNum, smgr->smgr_rnode.spcNode, smgr->smgr_rnode.dbNode, smgr->smgr_rnode.relNode),
                                         errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
 
                /*
@@ -278,8 +317,7 @@ ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
        {
                /* new buffers are zero-filled */
                MemSet((char *) bufBlock, 0, BLCKSZ);
-               smgrextend(reln->rd_smgr, blockNum, (char *) bufBlock,
-                                  reln->rd_istemp);
+               smgrextend(smgr, blockNum, (char *) bufBlock, isLocalBuf);
        }
        else
        {
@@ -290,7 +328,7 @@ ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
                if (zeroPage)
                        MemSet((char *) bufBlock, 0, BLCKSZ);
                else
-                       smgrread(reln->rd_smgr, blockNum, (char *) bufBlock);
+                       smgrread(smgr, blockNum, (char *) bufBlock);
                /* check for garbage data */
                if (!PageHeaderIsValid((PageHeader) bufBlock))
                {
@@ -298,15 +336,20 @@ ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
                        {
                                ereport(WARNING,
                                                (errcode(ERRCODE_DATA_CORRUPTED),
-                                                errmsg("invalid page header in block %u of relation \"%s\"; zeroing out page",
-                                                               blockNum, RelationGetRelationName(reln))));
+                                                errmsg("invalid page header in block %u of relation %u/%u/%u; zeroing out page",
+                                                               blockNum, 
+                                                               smgr->smgr_rnode.spcNode,
+                                                               smgr->smgr_rnode.dbNode,
+                                                               smgr->smgr_rnode.relNode)));
                                MemSet((char *) bufBlock, 0, BLCKSZ);
                        }
                        else
                                ereport(ERROR,
                                                (errcode(ERRCODE_DATA_CORRUPTED),
-                                errmsg("invalid page header in block %u of relation \"%s\"",
-                                               blockNum, RelationGetRelationName(reln))));
+                                errmsg("invalid page header in block %u of relation %u/%u/%u",
+                                               blockNum, smgr->smgr_rnode.spcNode,
+                                               smgr->smgr_rnode.dbNode,
+                                               smgr->smgr_rnode.relNode)));
                }
        }
 
@@ -347,7 +390,7 @@ ReadBuffer_common(Relation reln, BlockNumber blockNum, bool zeroPage,
  * No locks are held either at entry or exit.
  */
 static volatile BufferDesc *
-BufferAlloc(Relation reln,
+BufferAlloc(SMgrRelation smgr,
                        BlockNumber blockNum,
                        BufferAccessStrategy strategy,
                        bool *foundPtr)
@@ -364,7 +407,7 @@ BufferAlloc(Relation reln,
        bool            valid;
 
        /* create a tag so we can lookup the buffer */
-       INIT_BUFFERTAG(newTag, reln, blockNum);
+       INIT_BUFFERTAG(newTag, smgr->smgr_rnode, blockNum);
 
        /* determine its hash code and partition lock ID */
        newHash = BufTableHashCode(&newTag);
index 703c70eb11bd8e07607904282770a4aad9e9b335..3f8247e5cbe7b13539790a39d1b535848dce946b 100644 (file)
@@ -61,7 +61,7 @@ static Block GetLocalBufferStorage(void);
  * (hence, usage_count is always advanced).
  */
 BufferDesc *
-LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
+LocalBufferAlloc(SMgrRelation smgr, BlockNumber blockNum, bool *foundPtr)
 {
        BufferTag       newTag;                 /* identity of requested block */
        LocalBufferLookupEnt *hresult;
@@ -70,7 +70,7 @@ LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
        int                     trycounter;
        bool            found;
 
-       INIT_BUFFERTAG(newTag, reln, blockNum);
+       INIT_BUFFERTAG(newTag, smgr->smgr_rnode, blockNum);
 
        /* Initialize local buffers if first request in this session */
        if (LocalBufHash == NULL)
@@ -87,7 +87,7 @@ LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
                Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag));
 #ifdef LBDEBUG
                fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
-                               RelationGetRelid(reln), blockNum, -b - 1);
+                               smgr->smgr_rnode.relNode, blockNum, -b - 1);
 #endif
                /* this part is equivalent to PinBuffer for a shared buffer */
                if (LocalRefCount[b] == 0)
index 936c6ccd8f3977aef2fde1982a9e91eb32c314a3..21e3dd22f66958afb705f3dcbec057df9d482149 100644 (file)
@@ -330,6 +330,13 @@ smgrcreate(SMgrRelation reln, bool isTemp, bool isRedo)
        xl_smgr_create xlrec;
        PendingRelDelete *pending;
 
+       /*
+        * Exit quickly in WAL replay mode if we've already opened the file. 
+        * If it's open, it surely must exist.
+        */ 
+       if (isRedo && reln->md_fd != NULL)
+               return;
+
        /*
         * We may be using the target table space for the first time in this
         * database, so create a per-database subdirectory if needed.
index 4ec268026530a956e0d625fcf8a695f5cf472b34..05f59a79b89b232cfb70d816b20a64b8f063dcaa 100644 (file)
@@ -704,12 +704,6 @@ BuildFlatFiles(bool database_only)
                                rel_authid,
                                rel_authmem;
 
-       /*
-        * We don't have any hope of running a real relcache, but we can use the
-        * same fake-relcache facility that WAL replay uses.
-        */
-       XLogInitRelationCache();
-
        /* Need a resowner to keep the heapam and buffer code happy */
        owner = ResourceOwnerCreate(NULL, "BuildFlatFiles");
        CurrentResourceOwner = owner;
@@ -719,9 +713,15 @@ BuildFlatFiles(bool database_only)
        rnode.dbNode = 0;
        rnode.relNode = DatabaseRelationId;
 
-       /* No locking is needed because no one else is alive yet */
-       rel_db = XLogOpenRelation(rnode);
+       /*
+        * We don't have any hope of running a real relcache, but we can use the
+        * same fake-relcache facility that WAL replay uses.
+        *
+        * No locking is needed because no one else is alive yet.
+        */
+       rel_db = CreateFakeRelcacheEntry(rnode);
        write_database_file(rel_db, true);
+       FreeFakeRelcacheEntry(rel_db);
 
        if (!database_only)
        {
@@ -729,21 +729,21 @@ BuildFlatFiles(bool database_only)
                rnode.spcNode = GLOBALTABLESPACE_OID;
                rnode.dbNode = 0;
                rnode.relNode = AuthIdRelationId;
-               rel_authid = XLogOpenRelation(rnode);
+               rel_authid = CreateFakeRelcacheEntry(rnode);
 
                /* hard-wired path to pg_auth_members */
                rnode.spcNode = GLOBALTABLESPACE_OID;
                rnode.dbNode = 0;
                rnode.relNode = AuthMemRelationId;
-               rel_authmem = XLogOpenRelation(rnode);
+               rel_authmem = CreateFakeRelcacheEntry(rnode);
 
                write_auth_file(rel_authid, rel_authmem);
+               FreeFakeRelcacheEntry(rel_authid);
+               FreeFakeRelcacheEntry(rel_authmem);
        }
 
        CurrentResourceOwner = NULL;
        ResourceOwnerDelete(owner);
-
-       XLogCloseRelationCache();
 }
 
 
index 1f83b54109cf42744955a19a816bd6cd3df97b67..d220751a5136571de6c97465ac1f1d0e4924ace8 100644 (file)
@@ -284,8 +284,8 @@ extern bool gistfitpage(IndexTuple *itvec, int len);
 extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace);
 extern void gistcheckpage(Relation rel, Buffer buf);
 extern Buffer gistNewBuffer(Relation r);
-extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
-                          int len, OffsetNumber off);
+extern void gistfillbuffer(Page page, IndexTuple *itup, int len,
+                                                  OffsetNumber off);
 extern IndexTuple *gistextractpage(Page page, int *len /* out */ );
 extern IndexTuple *gistjoinvector(
                           IndexTuple *itvec, int *len,
index 6088c7f8403da44e73358c68c96201735afb81af..0acb633439b7b2ac40242446438f2ff3416d10d0 100644 (file)
@@ -124,7 +124,7 @@ extern void heap_page_prune_opt(Relation relation, Buffer buffer,
 extern int heap_page_prune(Relation relation, Buffer buffer,
                                TransactionId OldestXmin,
                                bool redirect_move, bool report_stats);
-extern void heap_page_prune_execute(Relation reln, Buffer buffer,
+extern void heap_page_prune_execute(Buffer buffer,
                                                OffsetNumber *redirected, int nredirected,
                                                OffsetNumber *nowdead, int ndead,
                                                OffsetNumber *nowunused, int nunused,
index 6ef588871310c755a462c170d09902202248757f..88e2b127cf3a81126255f68751dde949042922e2 100644 (file)
 #include "utils/rel.h"
 
 
-extern void XLogInitRelationCache(void);
 extern void XLogCheckInvalidPages(void);
-extern void XLogCloseRelationCache(void);
 
-extern Relation XLogOpenRelation(RelFileNode rnode);
 extern void XLogDropRelation(RelFileNode rnode);
 extern void XLogDropDatabase(Oid dbid);
 extern void XLogTruncateRelation(RelFileNode rnode, BlockNumber nblocks);
 
-extern Buffer XLogReadBuffer(Relation reln, BlockNumber blkno, bool init);
+extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
+
+extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
+extern void FreeFakeRelcacheEntry(Relation fakerel);
 
 #endif
index 1111df14df3243cbd8b49c76837bdd8dd53744a7..356dbe695fde1ea9df804abb16c2a8a9045d98c7 100644 (file)
@@ -18,6 +18,7 @@
 #include "storage/buf.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
+#include "storage/smgr.h"
 #include "storage/spin.h"
 #include "utils/rel.h"
 
@@ -75,9 +76,9 @@ typedef struct buftag
        (a).blockNum = InvalidBlockNumber \
 )
 
-#define INIT_BUFFERTAG(a,xx_reln,xx_blockNum) \
+#define INIT_BUFFERTAG(a,xx_rnode,xx_blockNum) \
 ( \
-       (a).rnode = (xx_reln)->rd_node, \
+       (a).rnode = (xx_rnode), \
        (a).blockNum = (xx_blockNum) \
 )
 
@@ -201,7 +202,7 @@ extern int  BufTableInsert(BufferTag *tagPtr, uint32 hashcode, int buf_id);
 extern void BufTableDelete(BufferTag *tagPtr, uint32 hashcode);
 
 /* localbuf.c */
-extern BufferDesc *LocalBufferAlloc(Relation reln, BlockNumber blockNum,
+extern BufferDesc *LocalBufferAlloc(SMgrRelation reln, BlockNumber blockNum,
                                 bool *foundPtr);
 extern void MarkLocalBufferDirty(Buffer buffer);
 extern void DropRelFileNodeLocalBuffers(RelFileNode rnode,
index 153f28def477a87df602aba9d0a7d47538eebe47..c860f30b953cfd8055d05f1b31ef48b4f75d61aa 100644 (file)
@@ -145,6 +145,8 @@ extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
 extern Buffer ReadBufferWithStrategy(Relation reln, BlockNumber blockNum,
                                           BufferAccessStrategy strategy);
 extern Buffer ReadOrZeroBuffer(Relation reln, BlockNumber blockNum);
+extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode, bool isTemp,
+                                                        BlockNumber blockNum, bool zeroPage);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern void MarkBufferDirty(Buffer buffer);