WAL
authorVadim B. Mikheev <vadim4o@yahoo.com>
Sat, 28 Oct 2000 16:21:00 +0000 (16:21 +0000)
committerVadim B. Mikheev <vadim4o@yahoo.com>
Sat, 28 Oct 2000 16:21:00 +0000 (16:21 +0000)
25 files changed:
src/backend/access/transam/transsup.c
src/backend/access/transam/varsup.c
src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogutils.c
src/backend/commands/dbcommands.c
src/backend/commands/vacuum.c
src/backend/storage/buffer/bufmgr.c
src/backend/storage/buffer/localbuf.c
src/backend/storage/buffer/xlog_bufmgr.c [new file with mode: 0644]
src/backend/storage/buffer/xlog_localbuf.c [new file with mode: 0644]
src/backend/storage/file/fd.c
src/backend/storage/smgr/md.c
src/backend/storage/smgr/smgr.c
src/backend/utils/cache/relcache.c
src/backend/utils/init/postinit.c
src/include/access/transam.h
src/include/access/xact.h
src/include/access/xlog.h
src/include/access/xlogdefs.h [new file with mode: 0644]
src/include/access/xlogutils.h
src/include/storage/buf_internals.h
src/include/storage/bufmgr.h
src/include/storage/bufpage.h
src/include/storage/smgr.h

index d219f8b6841f75bd4d320b901e3a3fcdb9e29914..74e8c39eae085205d46661c9c0331cd9919b73ed 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/transam/Attic/transsup.c,v 1.25 2000/01/26 05:56:04 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/transam/Attic/transsup.c,v 1.26 2000/10/28 16:20:53 vadim Exp $
  *
  * NOTES
  *   This file contains support functions for the high
@@ -186,6 +186,10 @@ TransBlockGetXidStatus(Block tblock,
    bits8       bit2;
    BitIndex    offset;
 
+#ifdef XLOG
+   tblock = (Block) ((char*) tblock + sizeof(XLogRecPtr));
+#endif
+
    /* ----------------
     *  calculate the index into the transaction data where
     *  our transaction status is located
@@ -227,6 +231,10 @@ TransBlockSetXidStatus(Block tblock,
    Index       index;
    BitIndex    offset;
 
+#ifdef XLOG
+   tblock = (Block) ((char*) tblock + sizeof(XLogRecPtr));
+#endif
+
    /* ----------------
     *  calculate the index into the transaction data where
     *  we sould store our transaction status.
index 029da1d72ca8c3316941f94abed17d2f089ac4bc..49c82b55700ac6c4106aaefabf8764bef27a5263 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/transam/varsup.c,v 1.29 2000/07/25 20:18:19 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/transam/varsup.c,v 1.30 2000/10/28 16:20:53 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -125,7 +125,11 @@ VariableRelationPutNextXid(TransactionId xid)
 
    TransactionIdStore(xid, &(var->nextXidData));
 
+#ifdef XLOG
+   WriteBuffer(buf);   /* temp */
+#else
    FlushBuffer(buf, TRUE);
+#endif
 }
 
 /* --------------------------------
index a0476d97cffeb0236b38ec1821b95e38123c3a2b..6040b262b90c4a1d191e5676008ac1fbea7bc35a 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.77 2000/10/24 20:06:39 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.78 2000/10/28 16:20:53 vadim Exp $
  *
  * NOTES
  *     Transaction aborts can now occur two ways:
 
 extern bool SharedBufferChanged;
 
+void RecordTransactionCommit(void);
+
 static void AbortTransaction(void);
 static void AtAbort_Cache(void);
 static void AtAbort_Locks(void);
@@ -191,7 +193,6 @@ static void AtStart_Memory(void);
 static void CleanupTransaction(void);
 static void CommitTransaction(void);
 static void RecordTransactionAbort(void);
-static void RecordTransactionCommit(void);
 static void StartTransaction(void);
 
 /* ----------------
@@ -220,7 +221,7 @@ int         XactIsoLevel;
 #ifdef XLOG
 #include "access/xlogutils.h"
 
-int            CommitDelay = 100;
+int            CommitDelay = 5;    /* 1/200 sec */
 
 void       xact_redo(XLogRecPtr lsn, XLogRecord *record);
 void       xact_undo(XLogRecPtr lsn, XLogRecord *record);
@@ -658,8 +659,8 @@ AtStart_Memory(void)
  *           -cim 3/18/90
  * --------------------------------
  */
-static void
-RecordTransactionCommit(void)
+void
+RecordTransactionCommit()
 {
    TransactionId xid;
    int         leak;
@@ -683,6 +684,8 @@ RecordTransactionCommit(void)
        struct timeval  delay;
        XLogRecPtr      recptr;
 
+       BufmgrCommit();
+
        xlrec.xtime = time(NULL);
        /*
         * MUST SAVE ARRAY OF RELFILENODE-s TO DROP
index 1343743c0969561b21f189b028585cdfcb33294c..aa952a42ab47eb63757198d4ba5e38ee6405e5da 100644 (file)
@@ -6,7 +6,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.21 2000/10/24 09:56:09 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.22 2000/10/28 16:20:54 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -220,6 +220,8 @@ static uint32 readOff = 0;
 static char readBuf[BLCKSZ];
 static XLogRecord *nextRecord = NULL;
 
+static bool InRedo = false;
+
 XLogRecPtr
 XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 buflen)
 {
@@ -481,6 +483,19 @@ XLogFlush(XLogRecPtr record)
    unsigned    i = 0;
    bool        force_lgwr = false;
 
+   if (XLOG_DEBUG)
+   {
+       fprintf(stderr, "XLogFlush%s%s: rqst %u/%u; wrt %u/%u; flsh %u/%u\n",
+           (IsBootstrapProcessingMode()) ? "(bootstrap)" : "",
+           (InRedo) ? "(redo)" : "",
+           record.xlogid, record.xrecoff,
+           LgwrResult.Write.xlogid, LgwrResult.Write.xrecoff,
+           LgwrResult.Flush.xlogid, LgwrResult.Flush.xrecoff);
+       fflush(stderr);
+   }
+
+   if (IsBootstrapProcessingMode() || InRedo)
+       return;
    if (XLByteLE(record, LgwrResult.Flush))
        return;
    WriteRqst = LgwrRqst.Write;
@@ -894,7 +909,7 @@ ReadRecord(XLogRecPtr *RecPtr, char *buffer)
    record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % BLCKSZ);
 
 got_record:;
-   if (record->xl_len == 0 || record->xl_len >
+   if (record->xl_len >
        (BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord))
    {
        elog(emode, "ReadRecord: invalid record len %u in (%u, %u)",
@@ -1259,7 +1274,6 @@ StartupXLOG()
                LastRec;
    XLogRecord *record;
    char        buffer[MAXLOGRECSZ + SizeOfXLogRecord];
-   int         recovery = 0;
    bool        sie_saved = false;
 
 #endif
@@ -1380,16 +1394,15 @@ StartupXLOG()
            elog(STOP, "Invalid Redo/Undo record in shutdown checkpoint");
        if (ControlFile->state == DB_SHUTDOWNED)
            elog(STOP, "Invalid Redo/Undo record in Shutdowned state");
-       recovery = 1;
+       InRecovery = true;
    }
    else if (ControlFile->state != DB_SHUTDOWNED)
    {
-       if (checkPoint.Shutdown)
-           elog(STOP, "Invalid state in control file");
-       recovery = 1;
+       InRecovery = true;
    }
 
-   if (recovery)
+   /* REDO */
+   if (InRecovery)
    {
        elog(LOG, "The DataBase system was not properly shut down\n"
             "\tAutomatic recovery is in progress...");
@@ -1401,6 +1414,7 @@ StartupXLOG()
        StopIfError = true;
 
        XLogOpenLogRelation();  /* open pg_log */
+       XLogInitRelationCache();
 
        /* Is REDO required ? */
        if (XLByteLT(checkPoint.redo, RecPtr))
@@ -1409,9 +1423,9 @@ StartupXLOG()
 /* read past CheckPoint record */
            record = ReadRecord(NULL, buffer);
 
-       /* REDO */
        if (record->xl_len != 0)
        {
+           InRedo = true;
            elog(LOG, "Redo starts at (%u, %u)",
                 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
            do
@@ -1441,12 +1455,40 @@ StartupXLOG()
            elog(LOG, "Redo done at (%u, %u)",
                 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
            LastRec = ReadRecPtr;
+           InRedo = false;
        }
        else
            elog(LOG, "Redo is not required");
+   }
+
+   /* Init xlog buffer cache */
+   record = ReadRecord(&LastRec, buffer);
+   logId = EndRecPtr.xlogid;
+   logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize;
+   logOff = 0;
+   logFile = XLogFileOpen(logId, logSeg, false);
+   XLogCtl->xlblocks[0].xlogid = logId;
+   XLogCtl->xlblocks[0].xrecoff =
+       ((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
+   Insert = &XLogCtl->Insert;
+   memcpy((char *) (Insert->currpage), readBuf, BLCKSZ);
+   Insert->currpos = ((char *) Insert->currpage) +
+       (EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
+   Insert->PrevRecord = LastRec;
+
+   LgwrRqst.Write = LgwrRqst.Flush =
+   LgwrResult.Write = LgwrResult.Flush = EndRecPtr;
+
+   XLogCtl->Write.LgwrResult = LgwrResult;
+   Insert->LgwrResult = LgwrResult;
+
+   XLogCtl->LgwrRqst = LgwrRqst;
+   XLogCtl->LgwrResult = LgwrResult;
 
 #ifdef NOT_USED
-       /* UNDO */
+   /* UNDO */
+   if (InRecovery)
+   {
        RecPtr = ReadRecPtr;
        if (XLByteLT(checkPoint.undo, RecPtr))
        {
@@ -1465,29 +1507,16 @@ StartupXLOG()
        }
        else
            elog(LOG, "Undo is not required");
-#endif
    }
+#endif
 
-   /* Init xlog buffer cache */
-   record = ReadRecord(&LastRec, buffer);
-   logId = EndRecPtr.xlogid;
-   logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize;
-   logOff = 0;
-   logFile = XLogFileOpen(logId, logSeg, false);
-   XLogCtl->xlblocks[0].xlogid = logId;
-   XLogCtl->xlblocks[0].xrecoff =
-       ((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
-   Insert = &XLogCtl->Insert;
-   memcpy((char *) (Insert->currpage), readBuf, BLCKSZ);
-   Insert->currpos = ((char *) Insert->currpage) +
-       (EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
-   Insert->PrevRecord = ControlFile->checkPoint;
-
-   if (recovery)
+   if (InRecovery)
    {
        CreateCheckPoint(true);
        StopIfError = sie_saved;
+       XLogCloseRelationCache();
    }
+   InRecovery = false;
 
 #endif  /* XLOG */
 
index 2800ff0316f43cf33131f0c4a6ef9fb46f29e4ff..3d15033b940298607fefcc7450cbddebf03a6c8c 100644 (file)
@@ -22,6 +22,7 @@
 #include "access/htup.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_database.h"
+#include "lib/hasht.h"
 
 /*
  * ---------------------------------------------------------------
@@ -240,32 +241,10 @@ static int                    _xlcnt = 0;
 #define    _XLOG_INITRELCACHESIZE  32
 #define    _XLOG_MAXRELCACHESIZE   512
 
-void
-XLogCloseRelationCache(void)
-{
-   int i;
-
-   if (!_xlrelarr)
-       return;
-
-   for (i = 1; i < _xlast; i++)
-   {
-       Relation    reln = &(_xlrelarr[i].reldata);
-       if (reln->rd_fd >= 0)
-           smgrclose(DEFAULT_SMGR, reln);
-   }
-
-   free(_xlrelarr);
-   free(_xlpgcarr);
-
-   hash_destroy(_xlrelcache);
-   _xlrelarr = NULL;
-}
-
 static void
 _xl_init_rel_cache(void)
 {
-   HASHCTL ctl;
+   HASHCTL         ctl;
 
    _xlcnt = _XLOG_INITRELCACHESIZE;
    _xlast = 0;
@@ -286,6 +265,35 @@ _xl_init_rel_cache(void)
                                HASH_ELEM | HASH_FUNCTION);
 }
 
+static void
+_xl_remove_hash_entry(XLogRelDesc **edata, int dummy)
+{
+   XLogRelCacheEntry      *hentry;
+   bool                    found;
+   XLogRelDesc            *rdesc = *edata;
+   Form_pg_class           tpgc = rdesc->reldata.rd_rel;
+
+   rdesc->lessRecently->moreRecently = rdesc->moreRecently;
+   rdesc->moreRecently->lessRecently = rdesc->lessRecently;
+
+   hentry = (XLogRelCacheEntry*) hash_search(_xlrelcache, 
+       (char*)&(rdesc->reldata.rd_node), HASH_REMOVE, &found);
+
+   if (hentry == NULL)
+       elog(STOP, "_xl_remove_hash_entry: can't delete from cache");
+   if (!found)
+       elog(STOP, "_xl_remove_hash_entry: file was not found in cache");
+
+   if (rdesc->reldata.rd_fd >= 0)
+       smgrclose(DEFAULT_SMGR, &(rdesc->reldata));
+
+   memset(rdesc, 0, sizeof(XLogRelDesc));
+   memset(tpgc, 0, sizeof(FormData_pg_class));
+   rdesc->reldata.rd_rel = tpgc;
+
+   return;
+}
+
 static XLogRelDesc*
 _xl_new_reldesc(void)
 {
@@ -310,32 +318,41 @@ _xl_new_reldesc(void)
    }
    else /* reuse */
    {
-       XLogRelCacheEntry      *hentry;
-       bool                    found;
-       XLogRelDesc            *res = _xlrelarr[0].moreRecently;
-       Form_pg_class           tpgc = res->reldata.rd_rel;
+       XLogRelDesc    *res = _xlrelarr[0].moreRecently;
 
-       res->lessRecently->moreRecently = res->moreRecently;
-       res->moreRecently->lessRecently = res->lessRecently;
+       _xl_remove_hash_entry(&res, 0);
 
-       hentry = (XLogRelCacheEntry*) hash_search(_xlrelcache, 
-           (char*)&(res->reldata.rd_node), HASH_REMOVE, &found);
+       _xlast--;
+       return(res);
+   }
+}
 
-       if (hentry == NULL)
-           elog(STOP, "XLogOpenRelation: can't delete from cache");
-       if (!found)
-           elog(STOP, "XLogOpenRelation: file was not found in cache");
+extern void CreateDummyCaches(void);
+extern void DestroyDummyCaches(void);
 
-       if (res->reldata.rd_fd >= 0)
-           smgrclose(DEFAULT_SMGR, &(res->reldata));
+void
+XLogInitRelationCache(void)
+{
+   CreateDummyCaches();
+   _xl_init_rel_cache();
+}
 
-       memset(res, 0, sizeof(XLogRelDesc));
-       memset(tpgc, 0, sizeof(FormData_pg_class));
-       res->reldata.rd_rel = tpgc;
+void
+XLogCloseRelationCache(void)
+{
 
-       _xlast--;
-       return(res);
-   }
+   DestroyDummyCaches();
+
+   if (!_xlrelarr)
+       return;
+
+   HashTableWalk(_xlrelcache, (HashtFunc)_xl_remove_hash_entry, 0);
+   hash_destroy(_xlrelcache);
+
+   free(_xlrelarr);
+   free(_xlpgcarr);
+
+   _xlrelarr = NULL;
 }
 
 Relation
@@ -345,9 +362,6 @@ XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode)
    XLogRelCacheEntry      *hentry;
    bool                    found;
 
-   if (!_xlrelarr)
-       _xl_init_rel_cache();
-
    hentry = (XLogRelCacheEntry*) 
            hash_search(_xlrelcache, (char*)&rnode, HASH_FIND, &found);
 
index d68033d897526b643cb3207e655523c482a8f74a..802e68670153a870d49be0acd97d278704789d4f 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.62 2000/10/22 17:55:36 pjw Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.63 2000/10/28 16:20:54 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -193,6 +193,9 @@ createdb(const char *dbname, const char *dbpath, int encoding)
            elog(ERROR, "CREATE DATABASE: Could not initialize database directory. Delete failed as well");
    }
 
+#ifdef XLOG
+   BufferSync();
+#endif
 }
 
 
index 0905f60b8077f9b652f85d671973a9814de07d9a..3976cb1ab50cf82ffebe8cfbb9b48f7bea4de0fd 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.170 2000/10/24 09:56:15 vadim Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.171 2000/10/28 16:20:54 vadim Exp $
  *
 
  *-------------------------------------------------------------------------
@@ -1787,7 +1787,9 @@ failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
 
    if (num_moved > 0)
    {
-
+#ifdef XLOG
+       RecordTransactionCommit();
+#else
        /*
         * We have to commit our tuple' movings before we'll truncate
         * relation, but we shouldn't lose our locks. And so - quick hack:
@@ -1797,6 +1799,7 @@ failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
        FlushBufferPool();
        TransactionIdCommit(myXID);
        FlushBufferPool();
+#endif
    }
 
    /*
index c0a320986ce9c670756ad66280d59ad654a705bf..9c9bda5035c6685b88bae9a9edfddd9e19b3f4a8 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.91 2000/10/23 04:10:06 vadim Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.92 2000/10/28 16:20:55 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
  *     freelist.c -- chooses victim for buffer replacement
  *     buf_table.c -- manages the buffer lookup table
  */
+
+#ifdef XLOG
+
+#include "xlog_bufmgr.c"
+
+#else
+
 #include <sys/types.h>
 #include <sys/file.h>
 #include <math.h>
@@ -2512,3 +2519,5 @@ MarkBufferForCleanup(Buffer buffer, void (*CleanupFunc)(Buffer))
    SpinRelease(BufMgrLock);
    return;
 }
+
+#endif /* ! XLOG */
index 1d6a416e48e2c95aa2cd9c2512de7dc540dce717..faa3304b4f66d9d5fcc97af2419a955761ca37d5 100644 (file)
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.32 2000/10/23 04:10:06 vadim Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.33 2000/10/28 16:20:56 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
+   
+#ifdef XLOG
+
+#include "xlog_localbuf.c"
+
+#else
+
 #include <sys/types.h>
 #include <sys/file.h>
 #include <math.h>
@@ -247,10 +254,11 @@ InitLocalBuffer(void)
 }
 
 /*
- * LocalBufferSync -
- *   flush all dirty buffers in the local buffer cache. Since the buffer
- *   cache is only used for keeping relations visible during a transaction,
- *   we will not need these buffers again.
+ * LocalBufferSync
+ *
+ * Flush all dirty buffers in the local buffer cache at commit time.
+ * Since the buffer cache is only used for keeping relations visible
+ * during a transaction, we will not need these buffers again.
  */
 void
 LocalBufferSync(void)
@@ -303,3 +311,5 @@ ResetLocalBufferPool(void)
    MemSet(LocalRefCount, 0, sizeof(long) * NLocBuffer);
    nextFreeLocalBuf = 0;
 }
+
+#endif /* XLOG */
diff --git a/src/backend/storage/buffer/xlog_bufmgr.c b/src/backend/storage/buffer/xlog_bufmgr.c
new file mode 100644 (file)
index 0000000..dcd377b
--- /dev/null
@@ -0,0 +1,2205 @@
+/*-------------------------------------------------------------------------
+ *
+ * bufmgr.c
+ *   buffer manager interface routines
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/xlog_bufmgr.c,v 1.1 2000/10/28 16:20:56 vadim Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+/*
+ *
+ * BufferAlloc() -- lookup a buffer in the buffer table.  If
+ *     it isn't there add it, but do not read data into memory.
+ *     This is used when we are about to reinitialize the
+ *     buffer so don't care what the current disk contents are.
+ *     BufferAlloc() also pins the new buffer in memory.
+ *
+ * ReadBuffer() -- like BufferAlloc() but reads the data
+ *     on a buffer cache miss.
+ *
+ * ReleaseBuffer() -- unpin the buffer
+ *
+ * WriteNoReleaseBuffer() -- mark the buffer contents as "dirty"
+ *     but don't unpin.  The disk IO is delayed until buffer
+ *     replacement.
+ *
+ * WriteBuffer() -- WriteNoReleaseBuffer() + ReleaseBuffer()
+ *
+ * BufferSync() -- flush all dirty buffers in the buffer pool.
+ *
+ * InitBufferPool() -- Init the buffer module.
+ *
+ * See other files:
+ *     freelist.c -- chooses victim for buffer replacement
+ *     buf_table.c -- manages the buffer lookup table
+ */
+#include <sys/types.h>
+#include <sys/file.h>
+#include <math.h>
+#include <signal.h>
+
+#include "postgres.h"
+#include "executor/execdebug.h"
+#include "miscadmin.h"
+#include "storage/s_lock.h"
+#include "storage/smgr.h"
+#include "utils/relcache.h"
+
+#ifdef XLOG
+#include "catalog/pg_database.h"
+#endif
+
+#define BufferGetLSN(bufHdr)   \
+   (*((XLogRecPtr*)MAKE_PTR((bufHdr)->data)))
+
+
+extern SPINLOCK BufMgrLock;
+extern long int ReadBufferCount;
+extern long int ReadLocalBufferCount;
+extern long int BufferHitCount;
+extern long int LocalBufferHitCount;
+extern long int BufferFlushCount;
+extern long int LocalBufferFlushCount;
+
+/*
+ * It's used to avoid disk writes for read-only transactions
+ * (i.e. when no one shared buffer was changed by transaction).
+ * We set it to true in WriteBuffer/WriteNoReleaseBuffer when
+ * marking shared buffer as dirty. We set it to false in xact.c
+ * after transaction is committed/aborted.
+ */
+bool       SharedBufferChanged = false;
+
+static void WaitIO(BufferDesc *buf, SPINLOCK spinlock);
+static void StartBufferIO(BufferDesc *buf, bool forInput);
+static void TerminateBufferIO(BufferDesc *buf);
+static void ContinueBufferIO(BufferDesc *buf, bool forInput);
+extern void AbortBufferIO(void);
+
+/*
+ * Macro : BUFFER_IS_BROKEN
+ *     Note that write error doesn't mean the buffer broken
+*/
+#define BUFFER_IS_BROKEN(buf) ((buf->flags & BM_IO_ERROR) && !(buf->flags & BM_DIRTY))
+
+#ifndef HAS_TEST_AND_SET
+static void SignalIO(BufferDesc *buf);
+extern long *NWaitIOBackendP;  /* defined in buf_init.c */
+
+#endif  /* HAS_TEST_AND_SET */
+
+static Buffer ReadBufferWithBufferLock(Relation relation, BlockNumber blockNum,
+                        bool bufferLockHeld);
+static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
+           bool *foundPtr, bool bufferLockHeld);
+static int BufferReplace(BufferDesc *bufHdr);
+void       PrintBufferDescs(void);
+
+/* ---------------------------------------------------
+ * RelationGetBufferWithBuffer
+ *     see if the given buffer is what we want
+ *     if yes, we don't need to bother the buffer manager
+ * ---------------------------------------------------
+ */
+Buffer
+RelationGetBufferWithBuffer(Relation relation,
+                           BlockNumber blockNumber,
+                           Buffer buffer)
+{
+   BufferDesc *bufHdr;
+
+   if (BufferIsValid(buffer))
+   {
+       if (!BufferIsLocal(buffer))
+       {
+           bufHdr = &BufferDescriptors[buffer - 1];
+           SpinAcquire(BufMgrLock);
+           if (bufHdr->tag.blockNum == blockNumber &&
+               RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
+           {
+               SpinRelease(BufMgrLock);
+               return buffer;
+           }
+           return ReadBufferWithBufferLock(relation, blockNumber, true);
+       }
+       else
+       {
+           bufHdr = &LocalBufferDescriptors[-buffer - 1];
+           if (bufHdr->tag.blockNum == blockNumber &&
+               RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
+               return buffer;
+       }
+   }
+   return ReadBuffer(relation, blockNumber);
+}
+
+/*
+ * ReadBuffer -- returns a buffer containing the requested
+ *     block of the requested relation.  If the blknum
+ *     requested is P_NEW, extend the relation file and
+ *     allocate a new block.
+ *
+ * Returns: the buffer number for the buffer containing
+ *     the block read or NULL on an error.
+ *
+ * Assume when this function is called, that reln has been
+ *     opened already.
+ */
+
+#undef ReadBuffer              /* conflicts with macro when BUFMGR_DEBUG
+                                * defined */
+
+/*
+ * ReadBuffer
+ *
+ */
+Buffer
+ReadBuffer(Relation reln, BlockNumber blockNum)
+{
+   return ReadBufferWithBufferLock(reln, blockNum, false);
+}
+
+/*
+ * ReadBufferWithBufferLock -- does the work of
+ *     ReadBuffer() but with the possibility that
+ *     the buffer lock has already been held. this
+ *     is yet another effort to reduce the number of
+ *     semops in the system.
+ */
+static Buffer
+ReadBufferWithBufferLock(Relation reln,
+                        BlockNumber blockNum,
+                        bool bufferLockHeld)
+{
+   BufferDesc *bufHdr;
+   int         extend;         /* extending the file by one block */
+   int         status;
+   bool        found;
+   bool        isLocalBuf;
+
+   extend = (blockNum == P_NEW);
+   isLocalBuf = reln->rd_myxactonly;
+
+   if (isLocalBuf)
+   {
+       ReadLocalBufferCount++;
+       bufHdr = LocalBufferAlloc(reln, blockNum, &found);
+       if (found)
+           LocalBufferHitCount++;
+   }
+   else
+   {
+       ReadBufferCount++;
+
+       /*
+        * lookup the buffer.  IO_IN_PROGRESS is set if the requested
+        * block is not currently in memory.
+        */
+       bufHdr = BufferAlloc(reln, blockNum, &found, bufferLockHeld);
+       if (found)
+           BufferHitCount++;
+   }
+
+   if (!bufHdr)
+       return InvalidBuffer;
+
+   /* if it's already in the buffer pool, we're done */
+   if (found)
+   {
+
+       /*
+        * This happens when a bogus buffer was returned previously and is
+        * floating around in the buffer pool.  A routine calling this
+        * would want this extended.
+        */
+       if (extend)
+       {
+           /* new buffers are zero-filled */
+           MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
+           smgrextend(DEFAULT_SMGR, reln,
+                      (char *) MAKE_PTR(bufHdr->data));
+       }
+       return BufferDescriptorGetBuffer(bufHdr);
+
+   }
+
+   /*
+    * if we have gotten to this point, the reln pointer must be ok and
+    * the relation file must be open.
+    */
+   if (extend)
+   {
+       /* new buffers are zero-filled */
+       MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
+       status = smgrextend(DEFAULT_SMGR, reln,
+                           (char *) MAKE_PTR(bufHdr->data));
+   }
+   else
+   {
+       status = smgrread(DEFAULT_SMGR, reln, blockNum,
+                         (char *) MAKE_PTR(bufHdr->data));
+   }
+
+   if (isLocalBuf)
+       return BufferDescriptorGetBuffer(bufHdr);
+
+   /* lock buffer manager again to update IO IN PROGRESS */
+   SpinAcquire(BufMgrLock);
+
+   if (status == SM_FAIL)
+   {
+       /* IO Failed.  cleanup the data structures and go home */
+
+       if (!BufTableDelete(bufHdr))
+       {
+           SpinRelease(BufMgrLock);
+           elog(FATAL, "BufRead: buffer table broken after IO error\n");
+       }
+       /* remember that BufferAlloc() pinned the buffer */
+       UnpinBuffer(bufHdr);
+
+       /*
+        * Have to reset the flag so that anyone waiting for the buffer
+        * can tell that the contents are invalid.
+        */
+       bufHdr->flags |= BM_IO_ERROR;
+       bufHdr->flags &= ~BM_IO_IN_PROGRESS;
+   }
+   else
+   {
+       /* IO Succeeded.  clear the flags, finish buffer update */
+
+       bufHdr->flags &= ~(BM_IO_ERROR | BM_IO_IN_PROGRESS);
+   }
+
+   /* If anyone was waiting for IO to complete, wake them up now */
+   TerminateBufferIO(bufHdr);
+
+   SpinRelease(BufMgrLock);
+
+   if (status == SM_FAIL)
+       return InvalidBuffer;
+
+   return BufferDescriptorGetBuffer(bufHdr);
+}
+
+/*
+ * BufferAlloc -- Get a buffer from the buffer pool but dont
+ *     read it.
+ *
+ * Returns: descriptor for buffer
+ *
+ * When this routine returns, the BufMgrLock is guaranteed NOT be held.
+ */
+static BufferDesc *
+BufferAlloc(Relation reln,
+           BlockNumber blockNum,
+           bool *foundPtr,
+           bool bufferLockHeld)
+{
+   BufferDesc *buf,
+              *buf2;
+   BufferTag   newTag;         /* identity of requested block */
+   bool        inProgress;     /* buffer undergoing IO */
+   bool        newblock = FALSE;
+
+   /* create a new tag so we can lookup the buffer */
+   /* assume that the relation is already open */
+   if (blockNum == P_NEW)
+   {
+       newblock = TRUE;
+       blockNum = smgrnblocks(DEFAULT_SMGR, reln);
+   }
+
+   INIT_BUFFERTAG(&newTag, reln, blockNum);
+
+   if (!bufferLockHeld)
+       SpinAcquire(BufMgrLock);
+
+   /* see if the block is in the buffer pool already */
+   buf = BufTableLookup(&newTag);
+   if (buf != NULL)
+   {
+
+       /*
+        * Found it.  Now, (a) pin the buffer so no one steals it from the
+        * buffer pool, (b) check IO_IN_PROGRESS, someone may be faulting
+        * the buffer into the buffer pool.
+        */
+
+       PinBuffer(buf);
+       inProgress = (buf->flags & BM_IO_IN_PROGRESS);
+
+       *foundPtr = TRUE;
+       if (inProgress)         /* confirm end of IO */
+       {
+           WaitIO(buf, BufMgrLock);
+           inProgress = (buf->flags & BM_IO_IN_PROGRESS);
+       }
+       if (BUFFER_IS_BROKEN(buf))
+       {
+
+           /*
+            * I couldn't understand the following old comment. If there's
+            * no IO for the buffer and the buffer is BROKEN,it should be
+            * read again. So start a new buffer IO here.
+            *
+            * wierd race condition:
+            *
+            * We were waiting for someone else to read the buffer. While we
+            * were waiting, the reader boof'd in some way, so the
+            * contents of the buffer are still invalid.  By saying that
+            * we didn't find it, we can make the caller reinitialize the
+            * buffer.  If two processes are waiting for this block, both
+            * will read the block.  The second one to finish may
+            * overwrite any updates made by the first.  (Assume higher
+            * level synchronization prevents this from happening).
+            *
+            * This is never going to happen, don't worry about it.
+            */
+           *foundPtr = FALSE;
+       }
+#ifdef BMTRACE
+       _bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), RelationGetRelid(reln), blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCFND);
+#endif  /* BMTRACE */
+
+       if (!(*foundPtr))
+           StartBufferIO(buf, true);
+       SpinRelease(BufMgrLock);
+
+       return buf;
+   }
+
+   *foundPtr = FALSE;
+
+   /*
+    * Didn't find it in the buffer pool.  We'll have to initialize a new
+    * buffer.  First, grab one from the free list.  If it's dirty, flush
+    * it to disk. Remember to unlock BufMgr spinlock while doing the IOs.
+    */
+   inProgress = FALSE;
+   for (buf = (BufferDesc *) NULL; buf == (BufferDesc *) NULL;)
+   {
+       buf = GetFreeBuffer();
+
+       /* GetFreeBuffer will abort if it can't find a free buffer */
+       Assert(buf);
+
+       /*
+        * There should be exactly one pin on the buffer after it is
+        * allocated -- ours.  If it had a pin it wouldn't have been on
+        * the free list.  No one else could have pinned it between
+        * GetFreeBuffer and here because we have the BufMgrLock.
+        */
+       Assert(buf->refcount == 0);
+       buf->refcount = 1;
+       PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 1;
+
+       if (buf->flags & BM_DIRTY || buf->cntxDirty)
+       {
+           bool        smok;
+
+           /*
+            *  skip write error buffers 
+            */
+           if ((buf->flags & BM_IO_ERROR) != 0)
+           {
+               PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
+               buf->refcount--;
+               buf = (BufferDesc *) NULL;
+               continue;
+           }
+           /*
+            * Set BM_IO_IN_PROGRESS to keep anyone from doing anything
+            * with the contents of the buffer while we write it out. We
+            * don't really care if they try to read it, but if they can
+            * complete a BufferAlloc on it they can then scribble into
+            * it, and we'd really like to avoid that while we are
+            * flushing the buffer.  Setting this flag should block them
+            * in WaitIO until we're done.
+            */
+           inProgress = TRUE;
+
+           /*
+            * All code paths that acquire this lock pin the buffer first;
+            * since no one had it pinned (it just came off the free
+            * list), no one else can have this lock.
+            */
+           StartBufferIO(buf, false);
+
+           /*
+            * Write the buffer out, being careful to release BufMgrLock
+            * before starting the I/O.
+            */
+           smok = BufferReplace(buf);
+
+           if (smok == FALSE)
+           {
+               elog(NOTICE, "BufferAlloc: cannot write block %u for %s/%s",
+               buf->tag.blockNum, buf->blind.dbname, buf->blind.relname);
+               inProgress = FALSE;
+               buf->flags |= BM_IO_ERROR;
+               buf->flags &= ~BM_IO_IN_PROGRESS;
+               TerminateBufferIO(buf);
+               PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
+               Assert(buf->refcount > 0);
+               buf->refcount--;
+               if (buf->refcount == 0)
+               {
+                   AddBufferToFreelist(buf);
+                   buf->flags |= BM_FREE;
+               }
+               buf = (BufferDesc *) NULL;
+           }
+           else
+           {
+               /*
+                * BM_JUST_DIRTIED cleared by BufferReplace and shouldn't
+                * be setted by anyone.     - vadim 01/17/97
+                */
+               if (buf->flags & BM_JUST_DIRTIED)
+               {
+                   elog(STOP, "BufferAlloc: content of block %u (%s) changed while flushing",
+                        buf->tag.blockNum, buf->blind.relname);
+               }
+               else
+                   buf->flags &= ~BM_DIRTY;
+               buf->cntxDirty = false;
+           }
+
+           /*
+            * Somebody could have pinned the buffer while we were doing
+            * the I/O and had given up the BufMgrLock (though they would
+            * be waiting for us to clear the BM_IO_IN_PROGRESS flag).
+            * That's why this is a loop -- if so, we need to clear the
+            * I/O flags, remove our pin and start all over again.
+            *
+            * People may be making buffers free at any time, so there's no
+            * reason to think that we have an immediate disaster on our
+            * hands.
+            */
+           if (buf && buf->refcount > 1)
+           {
+               inProgress = FALSE;
+               buf->flags &= ~BM_IO_IN_PROGRESS;
+               TerminateBufferIO(buf);
+               PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
+               buf->refcount--;
+               buf = (BufferDesc *) NULL;
+           }
+
+           /*
+            * Somebody could have allocated another buffer for the same
+            * block we are about to read in. (While we flush out the
+            * dirty buffer, we don't hold the lock and someone could have
+            * allocated another buffer for the same block. The problem is
+            * we haven't gotten around to insert the new tag into the
+            * buffer table. So we need to check here.      -ay 3/95
+            */
+           buf2 = BufTableLookup(&newTag);
+           if (buf2 != NULL)
+           {
+
+               /*
+                * Found it. Someone has already done what we're about to
+                * do. We'll just handle this as if it were found in the
+                * buffer pool in the first place.
+                */
+               if (buf != NULL)
+               {
+                   buf->flags &= ~BM_IO_IN_PROGRESS;
+                   TerminateBufferIO(buf);
+                   /* give up the buffer since we don't need it any more */
+                   PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
+                   Assert(buf->refcount > 0);
+                   buf->refcount--;
+                   if (buf->refcount == 0)
+                   {
+                       AddBufferToFreelist(buf);
+                       buf->flags |= BM_FREE;
+                   }
+               }
+
+               PinBuffer(buf2);
+               inProgress = (buf2->flags & BM_IO_IN_PROGRESS);
+
+               *foundPtr = TRUE;
+               if (inProgress)
+               {
+                   WaitIO(buf2, BufMgrLock);
+                   inProgress = (buf2->flags & BM_IO_IN_PROGRESS);
+               }
+               if (BUFFER_IS_BROKEN(buf2))
+                   *foundPtr = FALSE;
+
+               if (!(*foundPtr))
+                   StartBufferIO(buf2, true);
+               SpinRelease(BufMgrLock);
+
+               return buf2;
+           }
+       }
+   }
+
+   /*
+    * At this point we should have the sole pin on a non-dirty buffer and
+    * we may or may not already have the BM_IO_IN_PROGRESS flag set.
+    */
+
+   /*
+    * Change the name of the buffer in the lookup table:
+    *
+    * Need to update the lookup table before the read starts. If someone
+    * comes along looking for the buffer while we are reading it in, we
+    * don't want them to allocate a new buffer.  For the same reason, we
+    * didn't want to erase the buf table entry for the buffer we were
+    * writing back until now, either.
+    */
+
+   if (!BufTableDelete(buf))
+   {
+       SpinRelease(BufMgrLock);
+       elog(FATAL, "buffer wasn't in the buffer table\n");
+   }
+
+   /* record the database name and relation name for this buffer */
+   strcpy(buf->blind.dbname, (DatabaseName) ? DatabaseName : "Recovery");
+   strcpy(buf->blind.relname, RelationGetPhysicalRelationName(reln));
+
+   INIT_BUFFERTAG(&(buf->tag), reln, blockNum);
+   if (!BufTableInsert(buf))
+   {
+       SpinRelease(BufMgrLock);
+       elog(FATAL, "Buffer in lookup table twice \n");
+   }
+
+   /*
+    * Buffer contents are currently invalid.  Have to mark IO IN PROGRESS
+    * so no one fiddles with them until the read completes.  If this
+    * routine has been called simply to allocate a buffer, no io will be
+    * attempted, so the flag isnt set.
+    */
+   if (!inProgress)
+       StartBufferIO(buf, true);
+   else
+       ContinueBufferIO(buf, true);
+
+#ifdef BMTRACE
+   _bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), RelationGetRelid(reln), blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCNOTFND);
+#endif  /* BMTRACE */
+
+   SpinRelease(BufMgrLock);
+
+   return buf;
+}
+
+/*
+ * WriteBuffer
+ *
+ *     Marks buffer contents as dirty (actual write happens later).
+ *
+ * Assume that buffer is pinned.  Assume that reln is
+ *     valid.
+ *
+ * Side Effects:
+ *     Pin count is decremented.
+ */
+
+#undef WriteBuffer
+
+int
+WriteBuffer(Buffer buffer)
+{
+   BufferDesc *bufHdr;
+
+   if (BufferIsLocal(buffer))
+       return WriteLocalBuffer(buffer, TRUE);
+
+   if (BAD_BUFFER_ID(buffer))
+       return FALSE;
+
+   bufHdr = &BufferDescriptors[buffer - 1];
+
+   SharedBufferChanged = true;
+
+   SpinAcquire(BufMgrLock);
+   Assert(bufHdr->refcount > 0);
+
+   bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+
+   UnpinBuffer(bufHdr);
+   SpinRelease(BufMgrLock);
+
+   return TRUE;
+}
+
+/*
+ * WriteNoReleaseBuffer -- like WriteBuffer, but do not unpin the buffer
+ *                        when the operation is complete.
+ */
+int
+WriteNoReleaseBuffer(Buffer buffer)
+{
+   BufferDesc *bufHdr;
+
+   if (BufferIsLocal(buffer))
+       return WriteLocalBuffer(buffer, FALSE);
+
+   if (BAD_BUFFER_ID(buffer))
+       return STATUS_ERROR;
+
+   bufHdr = &BufferDescriptors[buffer - 1];
+
+   SharedBufferChanged = true;
+
+   SpinAcquire(BufMgrLock);
+   Assert(bufHdr->refcount > 0);
+
+   bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+
+   SpinRelease(BufMgrLock);
+
+   return STATUS_OK;
+}
+
+
+#undef ReleaseAndReadBuffer
+/*
+ * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
+ *     so that only one semop needs to be called.
+ *
+ */
+Buffer
+ReleaseAndReadBuffer(Buffer buffer,
+                    Relation relation,
+                    BlockNumber blockNum)
+{
+   BufferDesc *bufHdr;
+   Buffer      retbuf;
+
+   if (BufferIsLocal(buffer))
+   {
+       Assert(LocalRefCount[-buffer - 1] > 0);
+       LocalRefCount[-buffer - 1]--;
+   }
+   else
+   {
+       if (BufferIsValid(buffer))
+       {
+           bufHdr = &BufferDescriptors[buffer - 1];
+           Assert(PrivateRefCount[buffer - 1] > 0);
+           PrivateRefCount[buffer - 1]--;
+           if (PrivateRefCount[buffer - 1] == 0)
+           {
+               SpinAcquire(BufMgrLock);
+               Assert(bufHdr->refcount > 0);
+               bufHdr->refcount--;
+               if (bufHdr->refcount == 0)
+               {
+                   AddBufferToFreelist(bufHdr);
+                   bufHdr->flags |= BM_FREE;
+               }
+               retbuf = ReadBufferWithBufferLock(relation, blockNum, true);
+               return retbuf;
+           }
+       }
+   }
+
+   return ReadBuffer(relation, blockNum);
+}
+
+/*
+ * BufferSync -- Write all dirty buffers in the pool.
+ *
+ * This is called at checkpoint time and write out all dirty buffers.
+ */
+void
+BufferSync()
+{
+   int         i;
+   BufferDesc *bufHdr;
+   Buffer      buffer;
+   int         status;
+   RelFileNode rnode;
+   XLogRecPtr  recptr;
+   Relation    reln = NULL;
+
+   for (i = 0, bufHdr = BufferDescriptors; i < NBuffers; i++, bufHdr++)
+   {
+
+       SpinAcquire(BufMgrLock);
+
+       if (!(bufHdr->flags & BM_VALID))
+       {
+           SpinRelease(BufMgrLock);
+           continue;
+       }
+
+       /*
+        * Pin buffer and ensure that no one reads it from disk
+        */
+       PinBuffer(bufHdr);
+       /* Synchronize with BufferAlloc */
+       if (bufHdr->flags & BM_IO_IN_PROGRESS)
+           WaitIO(bufHdr, BufMgrLock);
+
+       buffer = BufferDescriptorGetBuffer(bufHdr);
+       rnode = bufHdr->tag.rnode;
+
+       SpinRelease(BufMgrLock);
+
+       /*
+        * Try to find relation for buffer
+        */
+       reln = RelationNodeCacheGetRelation(rnode);
+
+       /*
+        * Protect buffer content against concurrent update
+        */
+       LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+       /*
+        * Force XLOG flush for buffer' LSN
+        */
+       recptr = BufferGetLSN(bufHdr);
+       XLogFlush(recptr);
+
+       /*
+        * Now it's safe to write buffer to disk
+        * (if needed at all -:))
+        */
+
+       SpinAcquire(BufMgrLock);
+       if (bufHdr->flags & BM_IO_IN_PROGRESS)
+           WaitIO(bufHdr, BufMgrLock);
+
+       if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+       {
+           bufHdr->flags &= ~BM_JUST_DIRTIED;
+           StartBufferIO(bufHdr, false);       /* output IO start */
+
+           SpinRelease(BufMgrLock);
+
+           if (reln == (Relation) NULL)
+           {
+               status = smgrblindwrt(DEFAULT_SMGR,
+                                   bufHdr->tag.rnode,
+                                   bufHdr->tag.blockNum,
+                                   (char *) MAKE_PTR(bufHdr->data),
+                                   true);  /* must fsync */
+           }
+           else
+           {
+               status = smgrwrite(DEFAULT_SMGR, reln,
+                               bufHdr->tag.blockNum,
+                               (char *) MAKE_PTR(bufHdr->data));
+           }
+
+           if (status == SM_FAIL)  /* disk failure ?! */
+               elog(STOP, "BufferSync: cannot write %u for %s",
+                    bufHdr->tag.blockNum, bufHdr->blind.relname);
+
+           /*
+            * Note that it's safe to change cntxDirty here because of
+            * we protect it from upper writers by share lock and from
+            * other bufmgr routines by BM_IO_IN_PROGRESS
+            */
+           bufHdr->cntxDirty = false;
+
+           /*
+            * Release the per-buffer readlock, reacquire BufMgrLock.
+            */
+           LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+           BufferFlushCount++;
+
+           SpinAcquire(BufMgrLock);
+
+           bufHdr->flags &= ~BM_IO_IN_PROGRESS;    /* mark IO finished */
+           TerminateBufferIO(bufHdr);              /* Sync IO finished */
+
+           /*
+            * If this buffer was marked by someone as DIRTY while
+            * we were flushing it out we must not clear DIRTY
+            * flag - vadim 01/17/97
+            */
+           if (!(bufHdr->flags & BM_JUST_DIRTIED))
+               bufHdr->flags &= ~BM_DIRTY;
+       }
+       else
+           LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+
+       UnpinBuffer(bufHdr);
+
+       SpinRelease(BufMgrLock);
+
+       /* drop refcnt obtained by RelationIdCacheGetRelation */
+       if (reln != (Relation) NULL)
+       {
+           RelationDecrementReferenceCount(reln);
+           reln = NULL;
+       }
+   }
+
+}
+
+/*
+ * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
+ *
+ * Should be entered with buffer manager spinlock held; releases it before
+ * waiting and re-acquires it afterwards.
+ *
+ * OLD NOTES:
+ *     Because IO_IN_PROGRESS conflicts are
+ *     expected to be rare, there is only one BufferIO
+ *     lock in the entire system.  All processes block
+ *     on this semaphore when they try to use a buffer
+ *     that someone else is faulting in.  Whenever a
+ *     process finishes an IO and someone is waiting for
+ *     the buffer, BufferIO is signaled (SignalIO).  All
+ *     waiting processes then wake up and check to see
+ *     if their buffer is now ready.  This implementation
+ *     is simple, but efficient enough if WaitIO is
+ *     rarely called by multiple processes simultaneously.
+ *
+ * NEW NOTES:
+ *     The above is true only on machines without test-and-set
+ *     semaphores (which we hope are few, these days).  On better
+ *     hardware, each buffer has a spinlock that we can wait on.
+ */
+#ifdef HAS_TEST_AND_SET
+
+static void
+WaitIO(BufferDesc *buf, SPINLOCK spinlock)
+{
+
+   /*
+    * Changed to wait until there's no IO - Inoue 01/13/2000
+    */
+   while ((buf->flags & BM_IO_IN_PROGRESS) != 0)
+   {
+       SpinRelease(spinlock);
+       S_LOCK(&(buf->io_in_progress_lock));
+       S_UNLOCK(&(buf->io_in_progress_lock));
+       SpinAcquire(spinlock);
+   }
+}
+
+#else                          /* !HAS_TEST_AND_SET */
+
+IpcSemaphoreId WaitIOSemId;
+IpcSemaphoreId WaitCLSemId;
+
+static void
+WaitIO(BufferDesc *buf, SPINLOCK spinlock)
+{
+   bool        inProgress;
+
+   for (;;)
+   {
+
+       /* wait until someone releases IO lock */
+       (*NWaitIOBackendP)++;
+       SpinRelease(spinlock);
+       IpcSemaphoreLock(WaitIOSemId, 0, 1);
+       SpinAcquire(spinlock);
+       inProgress = (buf->flags & BM_IO_IN_PROGRESS);
+       if (!inProgress)
+           break;
+   }
+}
+
+/*
+ * SignalIO
+ */
+static void
+SignalIO(BufferDesc *buf)
+{
+   /* somebody better be waiting. */
+   Assert(buf->refcount > 1);
+   IpcSemaphoreUnlock(WaitIOSemId, 0, *NWaitIOBackendP);
+   *NWaitIOBackendP = 0;
+}
+
+#endif  /* HAS_TEST_AND_SET */
+
+long       NDirectFileRead;    /* some I/O's are direct file access.
+                                * bypass bufmgr */
+long       NDirectFileWrite;   /* e.g., I/O in psort and hashjoin.                 */
+
+void
+PrintBufferUsage(FILE *statfp)
+{
+   float       hitrate;
+   float       localhitrate;
+
+   if (ReadBufferCount == 0)
+       hitrate = 0.0;
+   else
+       hitrate = (float) BufferHitCount *100.0 / ReadBufferCount;
+
+   if (ReadLocalBufferCount == 0)
+       localhitrate = 0.0;
+   else
+       localhitrate = (float) LocalBufferHitCount *100.0 / ReadLocalBufferCount;
+
+   fprintf(statfp, "!\tShared blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
+           ReadBufferCount - BufferHitCount, BufferFlushCount, hitrate);
+   fprintf(statfp, "!\tLocal  blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
+           ReadLocalBufferCount - LocalBufferHitCount, LocalBufferFlushCount, localhitrate);
+   fprintf(statfp, "!\tDirect blocks: %10ld read, %10ld written\n",
+           NDirectFileRead, NDirectFileWrite);
+}
+
+void
+ResetBufferUsage()
+{
+   BufferHitCount = 0;
+   ReadBufferCount = 0;
+   BufferFlushCount = 0;
+   LocalBufferHitCount = 0;
+   ReadLocalBufferCount = 0;
+   LocalBufferFlushCount = 0;
+   NDirectFileRead = 0;
+   NDirectFileWrite = 0;
+}
+
+/* ----------------------------------------------
+ *     ResetBufferPool
+ *
+ *     This routine is supposed to be called when a transaction aborts.
+ *     it will release all the buffer pins held by the transaction.
+ *     Currently, we also call it during commit if BufferPoolCheckLeak
+ *     detected a problem --- in that case, isCommit is TRUE, and we
+ *     only clean up buffer pin counts.
+ *
+ * During abort, we also forget any pending fsync requests.  Dirtied buffers
+ * will still get written, eventually, but there will be no fsync for them.
+ *
+ * ----------------------------------------------
+ */
+void
+ResetBufferPool(bool isCommit)
+{
+   int         i;
+
+   for (i = 0; i < NBuffers; i++)
+   {
+       if (PrivateRefCount[i] != 0)
+       {
+           BufferDesc *buf = &BufferDescriptors[i];
+
+           SpinAcquire(BufMgrLock);
+           Assert(buf->refcount > 0);
+           buf->refcount--;
+           if (buf->refcount == 0)
+           {
+               AddBufferToFreelist(buf);
+               buf->flags |= BM_FREE;
+           }
+           SpinRelease(BufMgrLock);
+       }
+       PrivateRefCount[i] = 0;
+   }
+
+   ResetLocalBufferPool();
+
+   if (!isCommit)
+       smgrabort();
+}
+
+/* -----------------------------------------------
+ *     BufferPoolCheckLeak
+ *
+ *     check if there is buffer leak
+ *
+ * -----------------------------------------------
+ */
+int
+BufferPoolCheckLeak()
+{
+   int         i;
+   int         result = 0;
+
+   for (i = 1; i <= NBuffers; i++)
+   {
+       if (PrivateRefCount[i - 1] != 0)
+       {
+           BufferDesc *buf = &(BufferDescriptors[i - 1]);
+
+           elog(NOTICE,
+                "Buffer Leak: [%03d] (freeNext=%ld, freePrev=%ld, \
+relname=%s, blockNum=%d, flags=0x%x, refcount=%d %ld)",
+                i - 1, buf->freeNext, buf->freePrev,
+                buf->blind.relname, buf->tag.blockNum, buf->flags,
+                buf->refcount, PrivateRefCount[i - 1]);
+           result = 1;
+       }
+   }
+   return result;
+}
+
+/* ------------------------------------------------
+ * FlushBufferPool
+ *
+ * Flush all dirty blocks in buffer pool to disk
+ * at the checkpoint time
+ * ------------------------------------------------
+ */
+void
+FlushBufferPool(void)
+{
+   BufferSync();
+   smgrsync();
+}
+
+/*
+ * At the commit time we have to flush local buffer pool only
+ */
+void
+BufmgrCommit(void)
+{
+   LocalBufferSync();
+   smgrcommit();
+}
+
+/*
+ * BufferGetBlockNumber
+ *     Returns the block number associated with a buffer.
+ *
+ * Note:
+ *     Assumes that the buffer is valid.
+ */
+BlockNumber
+BufferGetBlockNumber(Buffer buffer)
+{
+   Assert(BufferIsValid(buffer));
+
+   /* XXX should be a critical section */
+   if (BufferIsLocal(buffer))
+       return LocalBufferDescriptors[-buffer - 1].tag.blockNum;
+   else
+       return BufferDescriptors[buffer - 1].tag.blockNum;
+}
+
+/*
+ * BufferReplace
+ *
+ * Write out the buffer corresponding to 'bufHdr'
+ *
+ * BufMgrLock must be held at entry, and the buffer must be pinned.
+ */
+static int
+BufferReplace(BufferDesc *bufHdr)
+{
+   Relation    reln;
+   XLogRecPtr  recptr;
+   int         status;
+
+   /* To check if block content changed while flushing. - vadim 01/17/97 */
+   bufHdr->flags &= ~BM_JUST_DIRTIED;
+
+   SpinRelease(BufMgrLock);
+
+   /*
+    * No need to lock buffer context - no one should be able to
+    * end ReadBuffer
+    */
+   recptr = BufferGetLSN(bufHdr);
+   XLogFlush(recptr);
+
+   reln = RelationNodeCacheGetRelation(bufHdr->tag.rnode);
+
+   if (reln != (Relation) NULL)
+   {
+       status = smgrwrite(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
+                          (char *) MAKE_PTR(bufHdr->data));
+   }
+   else
+   {
+       status = smgrblindwrt(DEFAULT_SMGR, bufHdr->tag.rnode,
+                             bufHdr->tag.blockNum,
+                             (char *) MAKE_PTR(bufHdr->data),
+                             false);   /* no fsync */
+   }
+
+   /* drop relcache refcnt incremented by RelationIdCacheGetRelation */
+   if (reln != (Relation) NULL)
+       RelationDecrementReferenceCount(reln);
+
+   SpinAcquire(BufMgrLock);
+
+   if (status == SM_FAIL)
+       return FALSE;
+
+   BufferFlushCount++;
+
+   return TRUE;
+}
+
+/*
+ * RelationGetNumberOfBlocks
+ *     Returns the buffer descriptor associated with a page in a relation.
+ *
+ * Note:
+ *     XXX may fail for huge relations.
+ *     XXX should be elsewhere.
+ *     XXX maybe should be hidden
+ */
+BlockNumber
+RelationGetNumberOfBlocks(Relation relation)
+{
+   return ((relation->rd_myxactonly) ? relation->rd_nblocks :
+           smgrnblocks(DEFAULT_SMGR, relation));
+}
+
+/* ---------------------------------------------------------------------
+ *     ReleaseRelationBuffers
+ *
+ *     This function removes all the buffered pages for a relation
+ *     from the buffer pool.  Dirty pages are simply dropped, without
+ *     bothering to write them out first.  This is used when the
+ *     relation is about to be deleted.  We assume that the caller
+ *     holds an exclusive lock on the relation, which should assure
+ *     that no new buffers will be acquired for the rel meanwhile.
+ *
+ *     XXX currently it sequentially searches the buffer pool, should be
+ *     changed to more clever ways of searching.
+ * --------------------------------------------------------------------
+ */
+void
+ReleaseRelationBuffers(Relation rel)
+{
+   int         i;
+   BufferDesc *bufHdr;
+
+   if (rel->rd_myxactonly)
+   {
+       for (i = 0; i < NLocBuffer; i++)
+       {
+           bufHdr = &LocalBufferDescriptors[i];
+           if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+           {
+               bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+               bufHdr->cntxDirty = false;
+               LocalRefCount[i] = 0;
+               bufHdr->tag.rnode.relNode = InvalidOid;
+           }
+       }
+       return;
+   }
+
+   SpinAcquire(BufMgrLock);
+   for (i = 1; i <= NBuffers; i++)
+   {
+       bufHdr = &BufferDescriptors[i - 1];
+recheck:
+       if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+       {
+
+           /*
+            * If there is I/O in progress, better wait till it's done;
+            * don't want to delete the relation out from under someone
+            * who's just trying to flush the buffer!
+            */
+           if (bufHdr->flags & BM_IO_IN_PROGRESS)
+           {
+               WaitIO(bufHdr, BufMgrLock);
+
+               /*
+                * By now, the buffer very possibly belongs to some other
+                * rel, so check again before proceeding.
+                */
+               goto recheck;
+           }
+           /* Now we can do what we came for */
+           bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+           bufHdr->cntxDirty = false;
+
+           /*
+            * Release any refcount we may have.
+            *
+            * This is very probably dead code, and if it isn't then it's
+            * probably wrong.  I added the Assert to find out --- tgl
+            * 11/99.
+            */
+           if (!(bufHdr->flags & BM_FREE))
+           {
+               /* Assert checks that buffer will actually get freed! */
+               Assert(PrivateRefCount[i - 1] == 1 &&
+                      bufHdr->refcount == 1);
+               /* ReleaseBuffer expects we do not hold the lock at entry */
+               SpinRelease(BufMgrLock);
+               ReleaseBuffer(i);
+               SpinAcquire(BufMgrLock);
+           }
+           /*
+            * And mark the buffer as no longer occupied by this rel.
+            */
+           BufTableDelete(bufHdr);
+       }
+   }
+
+   SpinRelease(BufMgrLock);
+}
+
+/* ---------------------------------------------------------------------
+ *     DropBuffers
+ *
+ *     This function removes all the buffers in the buffer cache for a
+ *     particular database.  Dirty pages are simply dropped, without
+ *     bothering to write them out first.  This is used when we destroy a
+ *     database, to avoid trying to flush data to disk when the directory
+ *     tree no longer exists.  Implementation is pretty similar to
+ *     ReleaseRelationBuffers() which is for destroying just one relation.
+ * --------------------------------------------------------------------
+ */
+void
+DropBuffers(Oid dbid)
+{
+   int         i;
+   BufferDesc *bufHdr;
+
+   SpinAcquire(BufMgrLock);
+   for (i = 1; i <= NBuffers; i++)
+   {
+       bufHdr = &BufferDescriptors[i - 1];
+recheck:
+       /*
+        * We know that currently database OID is tblNode but
+        * this probably will be changed in future and this
+        * func will be used to drop tablespace buffers.
+        */
+       if (bufHdr->tag.rnode.tblNode == dbid)
+       {
+
+           /*
+            * If there is I/O in progress, better wait till it's done;
+            * don't want to delete the database out from under someone
+            * who's just trying to flush the buffer!
+            */
+           if (bufHdr->flags & BM_IO_IN_PROGRESS)
+           {
+               WaitIO(bufHdr, BufMgrLock);
+
+               /*
+                * By now, the buffer very possibly belongs to some other
+                * DB, so check again before proceeding.
+                */
+               goto recheck;
+           }
+           /* Now we can do what we came for */
+           bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+           bufHdr->cntxDirty = false;
+
+           /*
+            * The thing should be free, if caller has checked that no
+            * backends are running in that database.
+            */
+           Assert(bufHdr->flags & BM_FREE);
+           /*
+            * And mark the buffer as no longer occupied by this page.
+            */
+           BufTableDelete(bufHdr);
+       }
+   }
+   SpinRelease(BufMgrLock);
+}
+
+/* -----------------------------------------------------------------
+ *     PrintBufferDescs
+ *
+ *     this function prints all the buffer descriptors, for debugging
+ *     use only.
+ * -----------------------------------------------------------------
+ */
+void
+PrintBufferDescs()
+{
+   int         i;
+   BufferDesc *buf = BufferDescriptors;
+
+   if (IsUnderPostmaster)
+   {
+       SpinAcquire(BufMgrLock);
+       for (i = 0; i < NBuffers; ++i, ++buf)
+       {
+           elog(DEBUG, "[%02d] (freeNext=%ld, freePrev=%ld, relname=%s, \
+blockNum=%d, flags=0x%x, refcount=%d %ld)",
+                i, buf->freeNext, buf->freePrev,
+                buf->blind.relname, buf->tag.blockNum, buf->flags,
+                buf->refcount, PrivateRefCount[i]);
+       }
+       SpinRelease(BufMgrLock);
+   }
+   else
+   {
+       /* interactive backend */
+       for (i = 0; i < NBuffers; ++i, ++buf)
+       {
+           printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld)\n",
+                   i, buf->blind.relname, buf->tag.blockNum,
+                   buf->flags, buf->refcount, PrivateRefCount[i]);
+       }
+   }
+}
+
+void
+PrintPinnedBufs()
+{
+   int         i;
+   BufferDesc *buf = BufferDescriptors;
+
+   SpinAcquire(BufMgrLock);
+   for (i = 0; i < NBuffers; ++i, ++buf)
+   {
+       if (PrivateRefCount[i] > 0)
+           elog(NOTICE, "[%02d] (freeNext=%ld, freePrev=%ld, relname=%s, \
+blockNum=%d, flags=0x%x, refcount=%d %ld)\n",
+                i, buf->freeNext, buf->freePrev, buf->blind.relname,
+                buf->tag.blockNum, buf->flags,
+                buf->refcount, PrivateRefCount[i]);
+   }
+   SpinRelease(BufMgrLock);
+}
+
+/*
+ * BufferPoolBlowaway
+ *
+ * this routine is solely for the purpose of experiments -- sometimes
+ * you may want to blowaway whatever is left from the past in buffer
+ * pool and start measuring some performance with a clean empty buffer
+ * pool.
+ */
+#ifdef NOT_USED
+void
+BufferPoolBlowaway()
+{
+   int         i;
+
+   BufferSync();
+   for (i = 1; i <= NBuffers; i++)
+   {
+       if (BufferIsValid(i))
+       {
+           while (BufferIsValid(i))
+               ReleaseBuffer(i);
+       }
+       BufTableDelete(&BufferDescriptors[i - 1]);
+   }
+}
+
+#endif
+
+/* ---------------------------------------------------------------------
+ *     FlushRelationBuffers
+ *
+ *     This function flushes all dirty pages of a relation out to disk.
+ *     Furthermore, pages that have blocknumber >= firstDelBlock are
+ *     actually removed from the buffer pool.  An error code is returned
+ *     if we fail to dump a dirty buffer or if we find one of
+ *     the target pages is pinned into the cache.
+ *
+ *     This is used by VACUUM before truncating the relation to the given
+ *     number of blocks.  (TRUNCATE TABLE also uses it in the same way.)
+ *     It might seem unnecessary to flush dirty pages before firstDelBlock,
+ *     since VACUUM should already have committed its changes.  However,
+ *     it is possible for there still to be dirty pages: if some page
+ *     had unwritten on-row tuple status updates from a prior transaction,
+ *     and VACUUM had no additional changes to make to that page, then
+ *     VACUUM won't have written it.  This is harmless in most cases but
+ *     will break pg_upgrade, which relies on VACUUM to ensure that *all*
+ *     tuples have correct on-row status.  So, we check and flush all
+ *     dirty pages of the rel regardless of block number.
+ *
+ *     This is also used by RENAME TABLE (with firstDelBlock = 0)
+ *     to clear out the buffer cache before renaming the physical files of
+ *     a relation.  Without that, some other backend might try to do a
+ *     blind write of a buffer page (relying on the BlindId of the buffer)
+ *     and fail because it's not got the right filename anymore.
+ *
+ *     In all cases, the caller should be holding AccessExclusiveLock on
+ *     the target relation to ensure that no other backend is busy reading
+ *     more blocks of the relation.
+ *
+ *     Formerly, we considered it an error condition if we found dirty
+ *     buffers here.   However, since BufferSync no longer forces out all
+ *     dirty buffers at every xact commit, it's possible for dirty buffers
+ *     to still be present in the cache due to failure of an earlier
+ *     transaction.  So, must flush dirty buffers without complaint.
+ *
+ *     Returns: 0 - Ok, -1 - FAILED TO WRITE DIRTY BUFFER, -2 - PINNED
+ *
+ *     XXX currently it sequentially searches the buffer pool, should be
+ *     changed to more clever ways of searching.
+ * --------------------------------------------------------------------
+ */
+int
+FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
+{
+   int         i;
+   BufferDesc *bufHdr;
+   XLogRecPtr  recptr;
+   int         status;
+
+   if (rel->rd_myxactonly)
+   {
+       for (i = 0; i < NLocBuffer; i++)
+       {
+           bufHdr = &LocalBufferDescriptors[i];
+           if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+           {
+               if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+               {
+                   status = smgrwrite(DEFAULT_SMGR, rel, 
+                               bufHdr->tag.blockNum,
+                               (char *) MAKE_PTR(bufHdr->data));
+                   if (status == SM_FAIL)
+                   {
+                       elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty, could not flush it",
+                            RelationGetRelationName(rel), firstDelBlock,
+                            bufHdr->tag.blockNum);
+                       return(-1);
+                   }
+                   bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+                   bufHdr->cntxDirty = false;
+               }
+               if (LocalRefCount[i] > 0)
+               {
+                   elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is referenced (%ld)",
+                        RelationGetRelationName(rel), firstDelBlock,
+                        bufHdr->tag.blockNum, LocalRefCount[i]);
+                   return(-2);
+               }
+               if (bufHdr->tag.blockNum >= firstDelBlock)
+               {
+                   bufHdr->tag.rnode.relNode = InvalidOid;
+               }
+           }
+       }
+       return 0;
+   }
+
+   SpinAcquire(BufMgrLock);
+   for (i = 0; i < NBuffers; i++)
+   {
+       bufHdr = &BufferDescriptors[i];
+       if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+       {
+           if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+           {
+               PinBuffer(bufHdr);
+               if (bufHdr->flags & BM_IO_IN_PROGRESS)
+                   WaitIO(bufHdr, BufMgrLock);
+               SpinRelease(BufMgrLock);
+
+               /*
+                * Force XLOG flush for buffer' LSN
+                */
+               recptr = BufferGetLSN(bufHdr);
+               XLogFlush(recptr);
+
+               /*
+                * Now it's safe to write buffer to disk
+                */
+
+               SpinAcquire(BufMgrLock);
+               if (bufHdr->flags & BM_IO_IN_PROGRESS)
+                   WaitIO(bufHdr, BufMgrLock);
+
+               if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+               {
+                   bufHdr->flags &= ~BM_JUST_DIRTIED;
+                   StartBufferIO(bufHdr, false);       /* output IO start */
+
+                   SpinRelease(BufMgrLock);
+
+                   status = smgrwrite(DEFAULT_SMGR, rel,
+                                   bufHdr->tag.blockNum,
+                                   (char *) MAKE_PTR(bufHdr->data));
+
+                   if (status == SM_FAIL)  /* disk failure ?! */
+                       elog(STOP, "FlushRelationBuffers: cannot write %u for %s",
+                            bufHdr->tag.blockNum, bufHdr->blind.relname);
+
+                   BufferFlushCount++;
+
+                   SpinAcquire(BufMgrLock);
+                   bufHdr->flags &= ~BM_IO_IN_PROGRESS;
+                   TerminateBufferIO(bufHdr);
+                   Assert(!(bufHdr->flags & BM_JUST_DIRTIED));
+                   bufHdr->flags &= ~BM_DIRTY;
+                   /*
+                    * Note that it's safe to change cntxDirty here because
+                    * of we protect it from upper writers by
+                    * AccessExclusiveLock and from other bufmgr routines
+                    * by BM_IO_IN_PROGRESS
+                    */
+                   bufHdr->cntxDirty = false;
+               }
+               UnpinBuffer(bufHdr);
+           }
+           if (!(bufHdr->flags & BM_FREE))
+           {
+               SpinRelease(BufMgrLock);
+               elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is referenced (private %ld, global %d)",
+                    RelationGetRelationName(rel), firstDelBlock,
+                    bufHdr->tag.blockNum,
+                    PrivateRefCount[i], bufHdr->refcount);
+               return -2;
+           }
+           if (bufHdr->tag.blockNum >= firstDelBlock)
+           {
+               BufTableDelete(bufHdr);
+           }
+       }
+   }
+   SpinRelease(BufMgrLock);
+   return 0;
+}
+
+#undef ReleaseBuffer
+
+/*
+ * ReleaseBuffer -- remove the pin on a buffer without
+ *     marking it dirty.
+ *
+ */
+int
+ReleaseBuffer(Buffer buffer)
+{
+   BufferDesc *bufHdr;
+
+   if (BufferIsLocal(buffer))
+   {
+       Assert(LocalRefCount[-buffer - 1] > 0);
+       LocalRefCount[-buffer - 1]--;
+       return STATUS_OK;
+   }
+
+   if (BAD_BUFFER_ID(buffer))
+       return STATUS_ERROR;
+
+   bufHdr = &BufferDescriptors[buffer - 1];
+
+   Assert(PrivateRefCount[buffer - 1] > 0);
+   PrivateRefCount[buffer - 1]--;
+   if (PrivateRefCount[buffer - 1] == 0)
+   {
+       SpinAcquire(BufMgrLock);
+       Assert(bufHdr->refcount > 0);
+       bufHdr->refcount--;
+       if (bufHdr->refcount == 0)
+       {
+           AddBufferToFreelist(bufHdr);
+           bufHdr->flags |= BM_FREE;
+       }
+       SpinRelease(BufMgrLock);
+   }
+
+   return STATUS_OK;
+}
+
+#ifdef NOT_USED
+void
+IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)
+{
+   IncrBufferRefCount(buffer);
+   if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
+   {
+       BufferDesc *buf = &BufferDescriptors[buffer - 1];
+
+       fprintf(stderr, "PIN(Incr) %ld relname = %s, blockNum = %d, \
+refcount = %ld, file: %s, line: %d\n",
+               buffer, buf->blind.relname, buf->tag.blockNum,
+               PrivateRefCount[buffer - 1], file, line);
+   }
+}
+
+#endif
+
+#ifdef NOT_USED
+void
+ReleaseBuffer_Debug(char *file, int line, Buffer buffer)
+{
+   ReleaseBuffer(buffer);
+   if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
+   {
+       BufferDesc *buf = &BufferDescriptors[buffer - 1];
+
+       fprintf(stderr, "UNPIN(Rel) %ld relname = %s, blockNum = %d, \
+refcount = %ld, file: %s, line: %d\n",
+               buffer, buf->blind.relname, buf->tag.blockNum,
+               PrivateRefCount[buffer - 1], file, line);
+   }
+}
+
+#endif
+
+#ifdef NOT_USED
+int
+ReleaseAndReadBuffer_Debug(char *file,
+                          int line,
+                          Buffer buffer,
+                          Relation relation,
+                          BlockNumber blockNum)
+{
+   bool        bufferValid;
+   Buffer      b;
+
+   bufferValid = BufferIsValid(buffer);
+   b = ReleaseAndReadBuffer(buffer, relation, blockNum);
+   if (ShowPinTrace && bufferValid && BufferIsLocal(buffer)
+       && is_userbuffer(buffer))
+   {
+       BufferDesc *buf = &BufferDescriptors[buffer - 1];
+
+       fprintf(stderr, "UNPIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
+refcount = %ld, file: %s, line: %d\n",
+               buffer, buf->blind.relname, buf->tag.blockNum,
+               PrivateRefCount[buffer - 1], file, line);
+   }
+   if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer))
+   {
+       BufferDesc *buf = &BufferDescriptors[b - 1];
+
+       fprintf(stderr, "PIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
+refcount = %ld, file: %s, line: %d\n",
+               b, buf->blind.relname, buf->tag.blockNum,
+               PrivateRefCount[b - 1], file, line);
+   }
+   return b;
+}
+
+#endif
+
+#ifdef BMTRACE
+
+/*
+ * trace allocations and deallocations in a circular buffer in
+ * shared memory.  check the buffer before doing the allocation,
+ * and die if there's anything fishy.
+ */
+
+_bm_trace(Oid dbId, Oid relId, int blkNo, int bufNo, int allocType)
+{
+   long        start,
+               cur;
+   bmtrace    *tb;
+
+   start = *CurTraceBuf;
+
+   if (start > 0)
+       cur = start - 1;
+   else
+       cur = BMT_LIMIT - 1;
+
+   for (;;)
+   {
+       tb = &TraceBuf[cur];
+       if (tb->bmt_op != BMT_NOTUSED)
+       {
+           if (tb->bmt_buf == bufNo)
+           {
+               if ((tb->bmt_op == BMT_DEALLOC)
+                   || (tb->bmt_dbid == dbId && tb->bmt_relid == relId
+                       && tb->bmt_blkno == blkNo))
+                   goto okay;
+
+               /* die holding the buffer lock */
+               _bm_die(dbId, relId, blkNo, bufNo, allocType, start, cur);
+           }
+       }
+
+       if (cur == start)
+           goto okay;
+
+       if (cur == 0)
+           cur = BMT_LIMIT - 1;
+       else
+           cur--;
+   }
+
+okay:
+   tb = &TraceBuf[start];
+   tb->bmt_pid = MyProcPid;
+   tb->bmt_buf = bufNo;
+   tb->bmt_dbid = dbId;
+   tb->bmt_relid = relId;
+   tb->bmt_blkno = blkNo;
+   tb->bmt_op = allocType;
+
+   *CurTraceBuf = (start + 1) % BMT_LIMIT;
+}
+
+_bm_die(Oid dbId, Oid relId, int blkNo, int bufNo,
+       int allocType, long start, long cur)
+{
+   FILE       *fp;
+   bmtrace    *tb;
+   int         i;
+
+   tb = &TraceBuf[cur];
+
+   if ((fp = AllocateFile("/tmp/death_notice", "w")) == NULL)
+       elog(FATAL, "buffer alloc trace error and can't open log file");
+
+   fprintf(fp, "buffer alloc trace detected the following error:\n\n");
+   fprintf(fp, "    buffer %d being %s inconsistently with a previous %s\n\n",
+        bufNo, (allocType == BMT_DEALLOC ? "deallocated" : "allocated"),
+           (tb->bmt_op == BMT_DEALLOC ? "deallocation" : "allocation"));
+
+   fprintf(fp, "the trace buffer contains:\n");
+
+   i = start;
+   for (;;)
+   {
+       tb = &TraceBuf[i];
+       if (tb->bmt_op != BMT_NOTUSED)
+       {
+           fprintf(fp, "     [%3d]%spid %d buf %2d for <%d,%u,%d> ",
+                   i, (i == cur ? " ---> " : "\t"),
+                   tb->bmt_pid, tb->bmt_buf,
+                   tb->bmt_dbid, tb->bmt_relid, tb->bmt_blkno);
+
+           switch (tb->bmt_op)
+           {
+               case BMT_ALLOCFND:
+                   fprintf(fp, "allocate (found)\n");
+                   break;
+
+               case BMT_ALLOCNOTFND:
+                   fprintf(fp, "allocate (not found)\n");
+                   break;
+
+               case BMT_DEALLOC:
+                   fprintf(fp, "deallocate\n");
+                   break;
+
+               default:
+                   fprintf(fp, "unknown op type %d\n", tb->bmt_op);
+                   break;
+           }
+       }
+
+       i = (i + 1) % BMT_LIMIT;
+       if (i == start)
+           break;
+   }
+
+   fprintf(fp, "\noperation causing error:\n");
+   fprintf(fp, "\tpid %d buf %d for <%d,%u,%d> ",
+           getpid(), bufNo, dbId, relId, blkNo);
+
+   switch (allocType)
+   {
+       case BMT_ALLOCFND:
+           fprintf(fp, "allocate (found)\n");
+           break;
+
+       case BMT_ALLOCNOTFND:
+           fprintf(fp, "allocate (not found)\n");
+           break;
+
+       case BMT_DEALLOC:
+           fprintf(fp, "deallocate\n");
+           break;
+
+       default:
+           fprintf(fp, "unknown op type %d\n", allocType);
+           break;
+   }
+
+   FreeFile(fp);
+
+   kill(getpid(), SIGILL);
+}
+
+#endif  /* BMTRACE */
+
+/*
+ * SetBufferCommitInfoNeedsSave
+ *
+ * Mark a buffer dirty when we have updated tuple commit-status bits in it.
+ *
+ * This is similar to WriteNoReleaseBuffer, except that we do not set
+ * SharedBufferChanged or BufferDirtiedByMe, because we have not made a
+ * critical change that has to be flushed to disk before xact commit --- the
+ * status-bit update could be redone by someone else just as easily.  The
+ * buffer will be marked dirty, but it will not be written to disk until
+ * there is another reason to write it.
+ *
+ * This routine might get called many times on the same page, if we are making
+ * the first scan after commit of an xact that added/deleted many tuples.
+ * So, be as quick as we can if the buffer is already dirty.
+ */
+void
+SetBufferCommitInfoNeedsSave(Buffer buffer)
+{
+   BufferDesc *bufHdr;
+
+   if (BufferIsLocal(buffer))
+       return;
+
+   if (BAD_BUFFER_ID(buffer))
+       return;
+
+   bufHdr = &BufferDescriptors[buffer - 1];
+
+   if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
+       (BM_DIRTY | BM_JUST_DIRTIED))
+   {
+       SpinAcquire(BufMgrLock);
+       Assert(bufHdr->refcount > 0);
+       bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+       SpinRelease(BufMgrLock);
+   }
+}
+
+void
+UnlockBuffers()
+{
+   BufferDesc *buf;
+   int         i;
+
+   for (i = 0; i < NBuffers; i++)
+   {
+       if (BufferLocks[i] == 0)
+           continue;
+
+       Assert(BufferIsValid(i + 1));
+       buf = &(BufferDescriptors[i]);
+
+#ifdef HAS_TEST_AND_SET
+       S_LOCK(&(buf->cntx_lock));
+#else
+       IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+
+       if (BufferLocks[i] & BL_R_LOCK)
+       {
+           Assert(buf->r_locks > 0);
+           (buf->r_locks)--;
+       }
+       if (BufferLocks[i] & BL_RI_LOCK)
+       {
+
+           /*
+            * Someone else could remove our RI lock when acquiring W
+            * lock. This is possible if we came here from elog(ERROR)
+            * from IpcSemaphore{Lock|Unlock}(WaitCLSemId). And so we
+            * don't do Assert(buf->ri_lock) here.
+            */
+           buf->ri_lock = false;
+       }
+       if (BufferLocks[i] & BL_W_LOCK)
+       {
+           Assert(buf->w_lock);
+           buf->w_lock = false;
+       }
+#ifdef HAS_TEST_AND_SET
+       S_UNLOCK(&(buf->cntx_lock));
+#else
+       IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+       BufferLocks[i] = 0;
+   }
+}
+
+void
+LockBuffer(Buffer buffer, int mode)
+{
+   BufferDesc *buf;
+   bits8      *buflock;
+
+   Assert(BufferIsValid(buffer));
+   if (BufferIsLocal(buffer))
+       return;
+
+   buf = &(BufferDescriptors[buffer - 1]);
+   buflock = &(BufferLocks[buffer - 1]);
+
+#ifdef HAS_TEST_AND_SET
+   S_LOCK(&(buf->cntx_lock));
+#else
+   IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+
+   if (mode == BUFFER_LOCK_UNLOCK)
+   {
+       if (*buflock & BL_R_LOCK)
+       {
+           Assert(buf->r_locks > 0);
+           Assert(!(buf->w_lock));
+           Assert(!(*buflock & (BL_W_LOCK | BL_RI_LOCK)));
+           (buf->r_locks)--;
+           *buflock &= ~BL_R_LOCK;
+       }
+       else if (*buflock & BL_W_LOCK)
+       {
+           Assert(buf->w_lock);
+           Assert(buf->r_locks == 0);
+           Assert(!(*buflock & (BL_R_LOCK | BL_RI_LOCK)));
+           buf->w_lock = false;
+           *buflock &= ~BL_W_LOCK;
+       }
+       else
+           elog(ERROR, "UNLockBuffer: buffer %lu is not locked", buffer);
+   }
+   else if (mode == BUFFER_LOCK_SHARE)
+   {
+       unsigned    i = 0;
+
+       Assert(!(*buflock & (BL_R_LOCK | BL_W_LOCK | BL_RI_LOCK)));
+       while (buf->ri_lock || buf->w_lock)
+       {
+#ifdef HAS_TEST_AND_SET
+           S_UNLOCK(&(buf->cntx_lock));
+           s_lock_sleep(i++);
+           S_LOCK(&(buf->cntx_lock));
+#else
+           IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
+           s_lock_sleep(i++);
+           IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+       }
+       (buf->r_locks)++;
+       *buflock |= BL_R_LOCK;
+   }
+   else if (mode == BUFFER_LOCK_EXCLUSIVE)
+   {
+       unsigned    i = 0;
+
+       Assert(!(*buflock & (BL_R_LOCK | BL_W_LOCK | BL_RI_LOCK)));
+       while (buf->r_locks > 0 || buf->w_lock)
+       {
+           if (buf->r_locks > 3 || (*buflock & BL_RI_LOCK))
+           {
+
+               /*
+                * Our RI lock might be removed by concurrent W lock
+                * acquiring (see what we do with RI locks below when our
+                * own W acquiring succeeded) and so we set RI lock again
+                * if we already did this.
+                */
+               *buflock |= BL_RI_LOCK;
+               buf->ri_lock = true;
+           }
+#ifdef HAS_TEST_AND_SET
+           S_UNLOCK(&(buf->cntx_lock));
+           s_lock_sleep(i++);
+           S_LOCK(&(buf->cntx_lock));
+#else
+           IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
+           s_lock_sleep(i++);
+           IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+       }
+       buf->w_lock = true;
+       *buflock |= BL_W_LOCK;
+
+       buf->cntxDirty = true;
+
+       if (*buflock & BL_RI_LOCK)
+       {
+
+           /*
+            * It's possible to remove RI locks acquired by another W
+            * lockers here, but they'll take care about it.
+            */
+           buf->ri_lock = false;
+           *buflock &= ~BL_RI_LOCK;
+       }
+   }
+   else
+       elog(ERROR, "LockBuffer: unknown lock mode %d", mode);
+
+#ifdef HAS_TEST_AND_SET
+   S_UNLOCK(&(buf->cntx_lock));
+#else
+   IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+
+}
+
+/*
+ * Functions for IO error handling
+ *
+ * Note : We assume that nested buffer IO never occur.
+ * i.e at most one io_in_progress spinlock is held
+ * per proc.
+*/
+static BufferDesc *InProgressBuf = (BufferDesc *) NULL;
+static bool IsForInput;
+
+/*
+ * Function:StartBufferIO
+ * (Assumptions)
+ * My process is executing no IO
+ * BufMgrLock is held
+ * BM_IO_IN_PROGRESS mask is not set for the buffer
+ * The buffer is Pinned
+ *
+*/
+static void
+StartBufferIO(BufferDesc *buf, bool forInput)
+{
+   Assert(!InProgressBuf);
+   Assert(!(buf->flags & BM_IO_IN_PROGRESS));
+   buf->flags |= BM_IO_IN_PROGRESS;
+#ifdef HAS_TEST_AND_SET
+
+   /*
+    * There used to be
+    *
+    * Assert(S_LOCK_FREE(&(buf->io_in_progress_lock)));
+    *
+    * here, but that's wrong because of the way WaitIO works: someone else
+    * waiting for the I/O to complete will succeed in grabbing the lock
+    * for a few instructions, and if we context-swap back to here the
+    * Assert could fail.  Tiny window for failure, but I've seen it
+    * happen -- tgl
+    */
+   S_LOCK(&(buf->io_in_progress_lock));
+#endif  /* HAS_TEST_AND_SET */
+   InProgressBuf = buf;
+   IsForInput = forInput;
+}
+
+/*
+ * Function:TerminateBufferIO
+ * (Assumptions)
+ * My process is executing IO for the buffer
+ * BufMgrLock is held
+ * The buffer is Pinned
+ *
+*/
+static void
+TerminateBufferIO(BufferDesc *buf)
+{
+   Assert(buf == InProgressBuf);
+#ifdef HAS_TEST_AND_SET
+   S_UNLOCK(&(buf->io_in_progress_lock));
+#else
+   if (buf->refcount > 1)
+       SignalIO(buf);
+#endif  /* HAS_TEST_AND_SET */
+   InProgressBuf = (BufferDesc *) 0;
+}
+
+/*
+ * Function:ContinueBufferIO
+ * (Assumptions)
+ * My process is executing IO for the buffer
+ * BufMgrLock is held
+ * The buffer is Pinned
+ *
+*/
+static void
+ContinueBufferIO(BufferDesc *buf, bool forInput)
+{
+   Assert(buf == InProgressBuf);
+   Assert(buf->flags & BM_IO_IN_PROGRESS);
+   IsForInput = forInput;
+}
+
+#ifdef NOT_USED
+void
+InitBufferIO(void)
+{
+   InProgressBuf = (BufferDesc *) 0;
+}
+#endif
+
+/*
+ * This function is called from ProcReleaseSpins().
+ * BufMgrLock isn't held when this function is called.
+ * BM_IO_ERROR is always set. If BM_IO_ERROR was already
+ * set in case of output,this routine would kill all
+ * backends and reset postmaster.
+ */
+void
+AbortBufferIO(void)
+{
+   BufferDesc *buf = InProgressBuf;
+
+   if (buf)
+   {
+       Assert(buf->flags & BM_IO_IN_PROGRESS);
+       SpinAcquire(BufMgrLock);
+       if (IsForInput)
+           Assert(!(buf->flags & BM_DIRTY) && !(buf->cntxDirty));
+       else
+       {
+           Assert(buf->flags & BM_DIRTY || buf->cntxDirty);
+           if (buf->flags & BM_IO_ERROR)
+           {
+               elog(NOTICE, "write error may be permanent: cannot write block %u for %s/%s",
+               buf->tag.blockNum, buf->blind.dbname, buf->blind.relname);
+           }
+           buf->flags |= BM_DIRTY;
+       }
+       buf->flags |= BM_IO_ERROR;
+       buf->flags &= ~BM_IO_IN_PROGRESS;
+       TerminateBufferIO(buf);
+       SpinRelease(BufMgrLock);
+   }
+}
+
+/*
+ * Cleanup buffer or mark it for cleanup. Buffer may be cleaned
+ * up if it's pinned only once.
+ *
+ * NOTE: buffer must be excl locked.
+ */
+void
+MarkBufferForCleanup(Buffer buffer, void (*CleanupFunc)(Buffer))
+{
+   BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];
+
+   Assert(PrivateRefCount[buffer - 1] > 0);
+
+   if (PrivateRefCount[buffer - 1] > 1)
+   {
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       PrivateRefCount[buffer - 1]--;
+       SpinAcquire(BufMgrLock);
+       Assert(bufHdr->refcount > 0);
+       bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+       bufHdr->CleanupFunc = CleanupFunc;
+       SpinRelease(BufMgrLock);
+       return;
+   }
+
+   SpinAcquire(BufMgrLock);
+   Assert(bufHdr->refcount > 0);
+   if (bufHdr->refcount == 1)
+   {
+       SpinRelease(BufMgrLock);
+       CleanupFunc(buffer);
+       CleanupFunc = NULL;
+   }
+   else
+       SpinRelease(BufMgrLock);
+
+   LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+   PrivateRefCount[buffer - 1]--;
+
+   SpinAcquire(BufMgrLock);
+   Assert(bufHdr->refcount > 0);
+   bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+   bufHdr->CleanupFunc = CleanupFunc;
+   bufHdr->refcount--;
+   if (bufHdr->refcount == 0)
+   {
+       AddBufferToFreelist(bufHdr);
+       bufHdr->flags |= BM_FREE;
+   }
+   SpinRelease(BufMgrLock);
+   return;
+}
diff --git a/src/backend/storage/buffer/xlog_localbuf.c b/src/backend/storage/buffer/xlog_localbuf.c
new file mode 100644 (file)
index 0000000..cb14a32
--- /dev/null
@@ -0,0 +1,274 @@
+/*-------------------------------------------------------------------------
+ *
+ * localbuf.c
+ *   local buffer manager. Fast buffer manager for temporary tables
+ *   or special cases when the operation is not visible to other backends.
+ *
+ *   When a relation is being created, the descriptor will have rd_islocal
+ *   set to indicate that the local buffer manager should be used. During
+ *   the same transaction the relation is being created, any inserts or
+ *   selects from the newly created relation will use the local buffer
+ *   pool. rd_islocal is reset at the end of a transaction (commit/abort).
+ *   This is useful for queries like SELECT INTO TABLE and create index.
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994-5, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *   $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/xlog_localbuf.c,v 1.1 2000/10/28 16:20:56 vadim Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include <sys/types.h>
+#include <sys/file.h>
+#include <math.h>
+#include <signal.h>
+
+#include "postgres.h"
+
+#include "executor/execdebug.h"
+#include "storage/smgr.h"
+#include "utils/relcache.h"
+
+extern long int LocalBufferFlushCount;
+
+int            NLocBuffer = 64;
+BufferDesc *LocalBufferDescriptors = NULL;
+long      *LocalRefCount = NULL;
+
+static int nextFreeLocalBuf = 0;
+
+/*#define LBDEBUG*/
+
+/*
+ * LocalBufferAlloc -
+ *   allocate a local buffer. We do round robin allocation for now.
+ */
+BufferDesc *
+LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
+{
+   int         i;
+   BufferDesc *bufHdr = (BufferDesc *) NULL;
+
+   if (blockNum == P_NEW)
+   {
+       blockNum = reln->rd_nblocks;
+       reln->rd_nblocks++;
+   }
+
+   /* a low tech search for now -- not optimized for scans */
+   for (i = 0; i < NLocBuffer; i++)
+   {
+       if (LocalBufferDescriptors[i].tag.rnode.relNode == 
+           reln->rd_node.relNode &&
+           LocalBufferDescriptors[i].tag.blockNum == blockNum)
+       {
+
+#ifdef LBDEBUG
+           fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
+                   RelationGetRelid(reln), blockNum, -i - 1);
+#endif
+           LocalRefCount[i]++;
+           *foundPtr = TRUE;
+           return &LocalBufferDescriptors[i];
+       }
+   }
+
+#ifdef LBDEBUG
+   fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
+           RelationGetRelid(reln), blockNum, -nextFreeLocalBuf - 1);
+#endif
+
+   /* need to get a new buffer (round robin for now) */
+   for (i = 0; i < NLocBuffer; i++)
+   {
+       int         b = (nextFreeLocalBuf + i) % NLocBuffer;
+
+       if (LocalRefCount[b] == 0)
+       {
+           bufHdr = &LocalBufferDescriptors[b];
+           LocalRefCount[b]++;
+           nextFreeLocalBuf = (b + 1) % NLocBuffer;
+           break;
+       }
+   }
+   if (bufHdr == NULL)
+       elog(ERROR, "no empty local buffer.");
+
+   /*
+    * this buffer is not referenced but it might still be dirty (the last
+    * transaction to touch it doesn't need its contents but has not
+    * flushed it).  if that's the case, write it out before reusing it!
+    */
+   if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+   {
+       Relation    bufrel = RelationNodeCacheGetRelation(bufHdr->tag.rnode);
+
+       Assert(bufrel != NULL);
+
+       /* flush this page */
+       smgrwrite(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
+                 (char *) MAKE_PTR(bufHdr->data));
+       LocalBufferFlushCount++;
+
+       /*
+        * drop relcache refcount incremented by
+        * RelationIdCacheGetRelation
+        */
+       RelationDecrementReferenceCount(bufrel);
+   }
+
+   /*
+    * it's all ours now.
+    *
+    * We need not in tblNode currently but will in future I think,
+    * when we'll give up rel->rd_fd to fmgr cache.
+    */
+   bufHdr->tag.rnode = reln->rd_node;
+   bufHdr->tag.blockNum = blockNum;
+   bufHdr->flags &= ~BM_DIRTY;
+   bufHdr->cntxDirty = false;
+
+   /*
+    * lazy memory allocation. (see MAKE_PTR for why we need to do
+    * MAKE_OFFSET.)
+    */
+   if (bufHdr->data == (SHMEM_OFFSET) 0)
+   {
+       char       *data = (char *) malloc(BLCKSZ);
+
+       bufHdr->data = MAKE_OFFSET(data);
+   }
+
+   *foundPtr = FALSE;
+   return bufHdr;
+}
+
+/*
+ * WriteLocalBuffer -
+ *   writes out a local buffer
+ */
+int
+WriteLocalBuffer(Buffer buffer, bool release)
+{
+   int         bufid;
+
+   Assert(BufferIsLocal(buffer));
+
+#ifdef LBDEBUG
+   fprintf(stderr, "LB WRITE %d\n", buffer);
+#endif
+
+   bufid = -(buffer + 1);
+   LocalBufferDescriptors[bufid].flags |= BM_DIRTY;
+
+   if (release)
+   {
+       Assert(LocalRefCount[bufid] > 0);
+       LocalRefCount[bufid]--;
+   }
+
+   return true;
+}
+
+/*
+ * InitLocalBuffer -
+ *   init the local buffer cache. Since most queries (esp. multi-user ones)
+ *   don't involve local buffers, we delay allocating memory for actual the
+ *   buffer until we need it.
+ */
+void
+InitLocalBuffer(void)
+{
+   int         i;
+
+   /*
+    * these aren't going away. I'm not gonna use palloc.
+    */
+   LocalBufferDescriptors =
+       (BufferDesc *) malloc(sizeof(BufferDesc) * NLocBuffer);
+   MemSet(LocalBufferDescriptors, 0, sizeof(BufferDesc) * NLocBuffer);
+   nextFreeLocalBuf = 0;
+
+   for (i = 0; i < NLocBuffer; i++)
+   {
+       BufferDesc *buf = &LocalBufferDescriptors[i];
+
+       /*
+        * negative to indicate local buffer. This is tricky: shared
+        * buffers start with 0. We have to start with -2. (Note that the
+        * routine BufferDescriptorGetBuffer adds 1 to buf_id so our first
+        * buffer id is -1.)
+        */
+       buf->buf_id = -i - 2;
+   }
+
+   LocalRefCount = (long *) malloc(sizeof(long) * NLocBuffer);
+   MemSet(LocalRefCount, 0, sizeof(long) * NLocBuffer);
+}
+
+/*
+ * LocalBufferSync
+ *
+ * Flush all dirty buffers in the local buffer cache at commit time.
+ * Since the buffer cache is only used for keeping relations visible
+ * during a transaction, we will not need these buffers again.
+ *
+ * Note that we have to *flush* local buffers because of them are not
+ * visible to checkpoint makers. But we can skip XLOG flush check.
+ */
+void
+LocalBufferSync(void)
+{
+   int         i;
+
+   for (i = 0; i < NLocBuffer; i++)
+   {
+       BufferDesc *buf = &LocalBufferDescriptors[i];
+       Relation    bufrel;
+
+       if (buf->flags & BM_DIRTY || buf->cntxDirty)
+       {
+#ifdef LBDEBUG
+           fprintf(stderr, "LB SYNC %d\n", -i - 1);
+#endif
+           bufrel = RelationNodeCacheGetRelation(buf->tag.rnode);
+
+           Assert(bufrel != NULL);
+
+           smgrwrite(DEFAULT_SMGR, bufrel, buf->tag.blockNum,
+                       (char *) MAKE_PTR(buf->data));
+           smgrmarkdirty(DEFAULT_SMGR, bufrel, buf->tag.blockNum);
+           LocalBufferFlushCount++;
+
+           /* drop relcache refcount from RelationIdCacheGetRelation */
+           RelationDecrementReferenceCount(bufrel);
+
+           buf->flags &= ~BM_DIRTY;
+           buf->cntxDirty = false;
+       }
+   }
+
+   MemSet(LocalRefCount, 0, sizeof(long) * NLocBuffer);
+   nextFreeLocalBuf = 0;
+}
+
+void
+ResetLocalBufferPool(void)
+{
+   int         i;
+
+   for (i = 0; i < NLocBuffer; i++)
+   {
+       BufferDesc *buf = &LocalBufferDescriptors[i];
+
+       buf->tag.rnode.relNode = InvalidOid;
+       buf->flags &= ~BM_DIRTY;
+       buf->cntxDirty = false;
+       buf->buf_id = -i - 2;
+   }
+
+   MemSet(LocalRefCount, 0, sizeof(long) * NLocBuffer);
+   nextFreeLocalBuf = 0;
+}
index 128c49989a060f703cbac97f3554e8921b01ab34..84c4e76c09d626cb2b95cf3e8021bc9b15007c5e 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/file/fd.c,v 1.64 2000/10/02 19:42:47 petere Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/file/fd.c,v 1.65 2000/10/28 16:20:56 vadim Exp $
  *
  * NOTES:
  *
@@ -823,8 +823,10 @@ FileWrite(File file, char *buffer, int amount)
    if (returnCode > 0)
    {
        VfdCache[file].seekPos += returnCode;
+#ifndef XLOG
        /* mark the file as needing fsync */
        VfdCache[file].fdstate |= FD_DIRTY;
+#endif
    }
    else
        VfdCache[file].seekPos = FileUnknownPos;
index ff8b4ce52fe74c76bfbed51ef7fd6939a34dd532..da466afe9f8aefa5ed3c70cd616f887672cebee4 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.76 2000/10/20 11:01:11 vadim Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.77 2000/10/28 16:20:57 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -76,12 +76,7 @@ static int   _mdfd_getrelnfd(Relation reln);
 static MdfdVec *_mdfd_openseg(Relation reln, int segno, int oflags);
 static MdfdVec *_mdfd_getseg(Relation reln, int blkno);
 
-#ifdef OLD_FILE_NAMING
-static int _mdfd_blind_getseg(char *dbname, char *relname,
-                  Oid dbid, Oid relid, int blkno);
-#else
 static int _mdfd_blind_getseg(RelFileNode rnode, int blkno);
-#endif
 
 static int _fdvec_alloc(void);
 static void _fdvec_free(int);
@@ -134,11 +129,7 @@ mdcreate(Relation reln)
 
    Assert(reln->rd_unlinked && reln->rd_fd < 0);
 
-#ifdef OLD_FILE_NAMING
-   path = relpath(RelationGetPhysicalRelationName(reln));
-#else
    path = relpath(reln->rd_node);
-#endif
    fd = FileNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, 0600);
 
    /*
@@ -336,11 +327,7 @@ mdopen(Relation reln)
    int         vfd;
 
    Assert(reln->rd_fd < 0);
-#ifdef OLD_FILE_NAMING
-   path = relpath(RelationGetPhysicalRelationName(reln));
-#else
    path = relpath(reln->rd_node);
-#endif
 
    fd = FileNameOpenFile(path, O_RDWR | PG_BINARY, 0600);
    if (fd < 0)
@@ -579,30 +566,16 @@ mdflush(Relation reln, BlockNumber blocknum, char *buffer)
  *     the file, making it more like mdflush().
  */
 int
-#ifdef OLD_FILE_NAMING
-mdblindwrt(char *dbname,
-          char *relname,
-          Oid dbid,
-          Oid relid,
-          BlockNumber blkno,
-          char *buffer,
-          bool dofsync)
-#else
 mdblindwrt(RelFileNode rnode,
           BlockNumber blkno,
           char *buffer,
           bool dofsync)
-#endif
 {
    int         status;
    long        seekpos;
    int         fd;
 
-#ifdef OLD_FILE_NAMING
-   fd = _mdfd_blind_getseg(dbname, relname, dbid, relid, blkno);
-#else
    fd = _mdfd_blind_getseg(rnode, blkno);
-#endif
 
    if (fd < 0)
        return SM_FAIL;
@@ -676,25 +649,13 @@ mdmarkdirty(Relation reln, BlockNumber blkno)
  *     rather than building md/fd datastructures to postpone it till later.
  */
 int
-#ifdef OLD_FILE_NAMING
-mdblindmarkdirty(char *dbname,
-                char *relname,
-                Oid dbid,
-                Oid relid,
-                BlockNumber blkno)
-#else
 mdblindmarkdirty(RelFileNode rnode,
                 BlockNumber blkno)
-#endif
 {
    int         status;
    int         fd;
 
-#ifdef OLD_FILE_NAMING
-   fd = _mdfd_blind_getseg(dbname, relname, dbid, relid, blkno);
-#else
    fd = _mdfd_blind_getseg(rnode, blkno);
-#endif
 
    if (fd < 0)
        return SM_FAIL;
@@ -915,6 +876,22 @@ mdabort()
    return SM_SUCCESS;
 }
 
+#ifdef XLOG
+/*
+ * mdsync() -- Sync storage.
+ *
+ */
+int
+mdsync()
+{
+   sync();
+   if (IsUnderPostmaster)
+       sleep(2);
+   sync();
+   return SM_SUCCESS;
+}
+#endif
+
 /*
  * _fdvec_alloc () -- grab a free (or new) md file descriptor vector.
  *
@@ -996,11 +973,7 @@ _mdfd_openseg(Relation reln, int segno, int oflags)
               *fullpath;
 
    /* be sure we have enough space for the '.segno', if any */
-#ifdef OLD_FILE_NAMING
-   path = relpath(RelationGetPhysicalRelationName(reln));
-#else
    path = relpath(reln->rd_node);
-#endif
 
    if (segno > 0)
    {
@@ -1115,12 +1088,7 @@ _mdfd_getseg(Relation reln, int blkno)
  */
 
 static int
-#ifdef OLD_FILE_NAMING
-_mdfd_blind_getseg(char *dbname, char *relname, Oid dbid, Oid relid,
-                  int blkno)
-#else
 _mdfd_blind_getseg(RelFileNode rnode, int blkno)
-#endif
 {
    char       *path;
    int         fd;
@@ -1130,12 +1098,7 @@ _mdfd_blind_getseg(RelFileNode rnode, int blkno)
 
 #endif
 
-#ifdef OLD_FILE_NAMING
-   /* construct the path to the relation */
-   path = relpath_blind(dbname, relname, dbid, relid);
-#else
    path = relpath(rnode);
-#endif
 
 #ifndef LET_OS_MANAGE_FILESIZE
    /* append the '.segno', if needed */
index 65bc5595a8595a75d351e7a990c22224a6240eb4..d2a940a76e530bd410659e9da88ff702dee871a6 100644 (file)
@@ -11,7 +11,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.41 2000/10/21 15:43:31 vadim Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.42 2000/10/28 16:20:57 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -36,27 +36,17 @@ typedef struct f_smgr
                                           char *buffer);
    int         (*smgr_flush) (Relation reln, BlockNumber blocknum,
                                           char *buffer);
-#ifdef OLD_FILE_NAMING
-   int         (*smgr_blindwrt) (char *dbname, char *relname,
-                                             Oid dbid, Oid relid,
-                                        BlockNumber blkno, char *buffer,
-                                             bool dofsync);
-#else
    int         (*smgr_blindwrt) (RelFileNode rnode, BlockNumber blkno, 
                                        char *buffer, bool dofsync);
-#endif
    int         (*smgr_markdirty) (Relation reln, BlockNumber blkno);
-#ifdef OLD_FILE_NAMING
-   int         (*smgr_blindmarkdirty) (char *dbname, char *relname,
-                                                   Oid dbid, Oid relid,
-                                                   BlockNumber blkno);
-#else
    int         (*smgr_blindmarkdirty) (RelFileNode, BlockNumber blkno);
-#endif
    int         (*smgr_nblocks) (Relation reln);
    int         (*smgr_truncate) (Relation reln, int nblocks);
    int         (*smgr_commit) (void);  /* may be NULL */
    int         (*smgr_abort) (void);   /* may be NULL */
+#ifdef XLOG
+   int         (*smgr_sync) (void);
+#endif
 } f_smgr;
 
 /*
@@ -69,7 +59,11 @@ static f_smgr smgrsw[] = {
    /* magnetic disk */
    {mdinit, NULL, mdcreate, mdunlink, mdextend, mdopen, mdclose,
        mdread, mdwrite, mdflush, mdblindwrt, mdmarkdirty, mdblindmarkdirty,
+#ifdef XLOG
+   mdnblocks, mdtruncate, mdcommit, mdabort, mdsync},
+#else
    mdnblocks, mdtruncate, mdcommit, mdabort},
+#endif
 
 #ifdef STABLE_MEMORY_STORAGE
    /* main memory */
@@ -310,40 +304,6 @@ smgrflush(int16 which, Relation reln, BlockNumber blocknum, char *buffer)
  *     this page down to stable storage in this circumstance.  The
  *     write should be synchronous if dofsync is true.
  */
-#ifdef OLD_FILE_NAMING
-int
-smgrblindwrt(int16 which,
-            char *dbname,
-            char *relname,
-            Oid dbid,
-            Oid relid,
-            BlockNumber blkno,
-            char *buffer,
-            bool dofsync)
-{
-   char       *dbstr;
-   char       *relstr;
-   int         status;
-
-   /* strdup here is probably redundant */
-   dbstr = pstrdup(dbname);
-   relstr = pstrdup(relname);
-
-   status = (*(smgrsw[which].smgr_blindwrt)) (dbstr, relstr, dbid, relid,
-                                              blkno, buffer, dofsync);
-
-   if (status == SM_FAIL)
-       elog(ERROR, "cannot write block %d of %s [%s] blind: %m",
-            blkno, relstr, dbstr);
-
-   pfree(dbstr);
-   pfree(relstr);
-
-   return status;
-}
-
-#else
-
 int
 smgrblindwrt(int16 which,
             RelFileNode rnode,
@@ -361,7 +321,6 @@ smgrblindwrt(int16 which,
 
    return status;
 }
-#endif
 
 /*
  * smgrmarkdirty() -- Mark a page dirty (needs fsync).
@@ -394,39 +353,6 @@ smgrmarkdirty(int16 which,
  *
  *     Just like smgrmarkdirty, except we don't have a reldesc.
  */
-#ifdef OLD_FILE_NAMING
-int
-smgrblindmarkdirty(int16 which,
-                  char *dbname,
-                  char *relname,
-                  Oid dbid,
-                  Oid relid,
-                  BlockNumber blkno)
-{
-   char       *dbstr;
-   char       *relstr;
-   int         status;
-
-   /* strdup here is probably redundant */
-   dbstr = pstrdup(dbname);
-   relstr = pstrdup(relname);
-
-   status = (*(smgrsw[which].smgr_blindmarkdirty)) (dbstr, relstr,
-                                                    dbid, relid,
-                                                    blkno);
-
-   if (status == SM_FAIL)
-       elog(ERROR, "cannot mark block %d of %s [%s] blind: %m",
-            blkno, relstr, dbstr);
-
-   pfree(dbstr);
-   pfree(relstr);
-
-   return status;
-}
-
-#else
-
 int
 smgrblindmarkdirty(int16 which,
                   RelFileNode rnode,
@@ -442,7 +368,6 @@ smgrblindmarkdirty(int16 which,
 
    return status;
 }
-#endif
 
 /*
  * smgrnblocks() -- Calculate the number of POSTGRES blocks in the
@@ -528,6 +453,27 @@ smgrabort()
    return SM_SUCCESS;
 }
 
+#ifdef XLOG
+int
+smgrsync()
+{
+   int         i;
+
+   for (i = 0; i < NSmgr; i++)
+   {
+       if (smgrsw[i].smgr_sync)
+       {
+           if ((*(smgrsw[i].smgr_sync)) () == SM_FAIL)
+               elog(STOP, "storage sync failed on %s: %m",
+                    DatumGetCString(DirectFunctionCall1(smgrout,
+                                                        Int16GetDatum(i))));
+       }
+   }
+
+   return SM_SUCCESS;
+}
+#endif
+
 #ifdef NOT_USED
 bool
 smgriswo(int16 smgrno)
index de3e3c4a8d30d2ae20443168169e3ce7a3255dfe..ea7a8d0212c3f4cb6bedf1ee97c29b3fcd465827 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.113 2000/10/23 04:10:08 vadim Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.114 2000/10/28 16:20:57 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -2064,7 +2064,62 @@ RelationCacheInitializePhase2(void)
    }
 }
 
+#ifdef XLOG        /* used by XLogInitCache */
 
+void CreateDummyCaches(void);
+void DestroyDummyCaches(void);
+
+void
+CreateDummyCaches(void)
+{
+   MemoryContext   oldcxt;
+   HASHCTL         ctl;
+
+   if (!CacheMemoryContext)
+       CreateCacheMemoryContext();
+
+   oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+   MemSet(&ctl, 0, (int) sizeof(ctl));
+   ctl.keysize = sizeof(NameData);
+   ctl.datasize = sizeof(Relation);
+   RelationNameCache = hash_create(INITRELCACHESIZE, &ctl, HASH_ELEM);
+
+   ctl.keysize = sizeof(Oid);
+   ctl.hash = tag_hash;
+   RelationIdCache = hash_create(INITRELCACHESIZE, &ctl,
+                                 HASH_ELEM | HASH_FUNCTION);
+
+   ctl.keysize = sizeof(RelFileNode);
+   ctl.hash = tag_hash;
+   RelationNodeCache = hash_create(INITRELCACHESIZE, &ctl,
+                                 HASH_ELEM | HASH_FUNCTION);
+   MemoryContextSwitchTo(oldcxt);
+}
+
+void
+DestroyDummyCaches(void)
+{
+   MemoryContext   oldcxt;
+
+   if (!CacheMemoryContext)
+       return;
+
+   oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+   if (RelationNameCache)
+       hash_destroy(RelationNameCache);
+   if (RelationIdCache)
+       hash_destroy(RelationIdCache);
+   if (RelationNodeCache)
+       hash_destroy(RelationNodeCache);
+
+   RelationNameCache = RelationIdCache = RelationNodeCache = NULL;
+
+   MemoryContextSwitchTo(oldcxt);
+}
+
+#endif /* XLOG */
 
 static void
 AttrDefaultFetch(Relation relation)
index cee8dfaac90acf553bc1250ce755c53adcc5641d..fbc9cc2ab2cc21cb7ac8c63cd9d52ec9f1ca7d69 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/utils/init/postinit.c,v 1.68 2000/10/16 14:52:15 vadim Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/utils/init/postinit.c,v 1.69 2000/10/28 16:20:58 vadim Exp $
  *
  *
  *-------------------------------------------------------------------------
@@ -231,9 +231,6 @@ InitPostgres(const char *dbname, const char *username)
 {
    bool        bootstrap = IsBootstrapProcessingMode();
 
-   /* initialize the local buffer manager */
-   InitLocalBuffer();
-
 #ifndef XLOG
    if (!TransactionFlushEnabled())
        on_shmem_exit(FlushBufferPool, 0);
@@ -414,4 +411,8 @@ BaseInit(void)
    smgrinit();
 
    EnablePortalManager();      /* memory for portal/transaction stuff */
+
+   /* initialize the local buffer manager */
+   InitLocalBuffer();
+
 }
index 752682ca969d97b5df0c582d026dfc9f679144b6..415ad56b959c4449aea705ff86b91539ecf861dd 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: transam.h,v 1.24 2000/01/26 05:57:51 momjian Exp $
+ * $Id: transam.h,v 1.25 2000/10/28 16:20:59 vadim Exp $
  *
  *  NOTES
  *     Transaction System Version 101 now support proper oid
@@ -67,7 +67,11 @@ typedef unsigned char XidStatus;/* (2 bits) */
  *     transaction page definitions
  * ----------------
  */
+#ifdef XLOG
+#define TP_DataSize                (BLCKSZ - sizeof(XLogRecPtr))
+#else
 #define TP_DataSize                BLCKSZ
+#endif
 #define TP_NumXidStatusPerBlock (TP_DataSize * 4)
 
 /* ----------------
@@ -84,6 +88,10 @@ typedef unsigned char XidStatus;/* (2 bits) */
  */
 typedef struct LogRelationContentsData
 {
+#ifdef XLOG
+   XLogRecPtr  LSN;        /* temp hack: LSN is member of any block */
+                           /* so should be described in bufmgr */
+#endif
    int         TransSystemVersion;
 } LogRelationContentsData;
 
@@ -107,6 +115,9 @@ typedef LogRelationContentsData *LogRelationContents;
  */
 typedef struct VariableRelationContentsData
 {
+#ifdef XLOG
+   XLogRecPtr  LSN;
+#endif
    int         TransSystemVersion;
    TransactionId nextXidData;
    TransactionId lastXidData;  /* unused */
index 712e88b6005828a1b2529e899db7811270650c31..18ca96f3d837758a0317b962c385a038b4fbd381 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: xact.h,v 1.28 2000/10/20 11:01:14 vadim Exp $
+ * $Id: xact.h,v 1.29 2000/10/28 16:20:59 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -135,6 +135,8 @@ extern bool IsTransactionBlock(void);
 extern void UserAbortTransactionBlock(void);
 extern void AbortOutOfAnyTransaction(void);
 
+extern void RecordTransactionCommit(void);
+
 extern TransactionId DisabledTransactionId;
 
 extern void XactPushRollback(void (*func) (void *), void* data);
index c77c1cac02a6431293c34d4beec857c7d982b20b..02998755c328e889f53a0d8d9057c9cf9637861e 100644 (file)
 
 #include "access/rmgr.h"
 #include "access/transam.h"
-
-typedef struct XLogRecPtr
-{
-   uint32      xlogid;         /* log file #, 0 based */
-   uint32      xrecoff;        /* offset of record in log file */
-} XLogRecPtr;
+#include "access/xlogdefs.h"
 
 typedef struct XLogRecord
 {
@@ -83,12 +78,7 @@ typedef XLogPageHeaderData *XLogPageHeader;
 #define XLByteEQ(left, right)      \
            (right.xlogid == left.xlogid && right.xrecoff ==  left.xrecoff)
 
-/*
- * StartUpID (SUI) - system startups counter.
- * It's to allow removing pg_log after shutdown.
- */
-typedef    uint32      StartUpID;
-extern StartUpID   ThisStartUpID;
+extern StartUpID   ThisStartUpID;  /* current SUI */
 extern bool        InRecovery;
 extern XLogRecPtr  MyLastRecPtr;
 
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
new file mode 100644 (file)
index 0000000..ce1b3ef
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ *
+ * xlogdefs.h
+ *
+ * Postgres transaction log manager record pointer and
+ * system stratup number definitions
+ *
+ */
+#ifndef XLOG_DEFS_H
+#define XLOG_DEFS_H
+
+typedef struct XLogRecPtr
+{
+   uint32      xlogid;         /* log file #, 0 based */
+   uint32      xrecoff;        /* offset of record in log file */
+} XLogRecPtr;
+
+/*
+ * StartUpID (SUI) - system startups counter. It's to allow removing
+ * pg_log after shutdown, in future.
+ */
+typedef    uint32      StartUpID;
+
+#endif  /* XLOG_DEFS_H */
index f62f726d8313f27ad1649321f55789c6f3b98d3e..b8fa3549f42991a1f26b305a90436dd2cd14dc8f 100644 (file)
@@ -9,8 +9,10 @@ extern bool XLogIsValidTuple(RelFileNode hnode, ItemPointer iptr);
 
 extern void XLogOpenLogRelation(void);
 
-extern Buffer XLogReadBuffer(bool extend, Relation reln, BlockNumber blkno);
+extern void XLogInitRelationCache(void);
 extern void XLogCloseRelationCache(void);
+
 extern Relation XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode);
+extern Buffer XLogReadBuffer(bool extend, Relation reln, BlockNumber blkno);
 
 #endif
index 65abe9b8ceb5a1502f1d64225d65ddcb54d48a07..80aca7c57e96f9035797916007e5742fea6607cc 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: buf_internals.h,v 1.41 2000/10/23 04:10:14 vadim Exp $
+ * $Id: buf_internals.h,v 1.42 2000/10/28 16:21:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -109,6 +109,10 @@ typedef struct sbufdesc
    bool        ri_lock;        /* read-intent lock */
    bool        w_lock;         /* context exclusively locked */
 
+#ifdef XLOG
+   bool        cntxDirty;      /* new way to mark block as dirty */
+#endif
+
    BufferBlindId blind;        /* was used to support blind write */
 
    /*
index 551f98e75f94437a20b8a1a9764e847d7b6b1eeb..0ed4837305d5c031aeaa30601ab275774b744bc8 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: bufmgr.h,v 1.41 2000/10/20 11:01:21 vadim Exp $
+ * $Id: bufmgr.h,v 1.42 2000/10/28 16:21:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -15,7 +15,7 @@
 #define BUFMGR_H
 
 #include "storage/buf_internals.h"
-
+#include "access/xlogdefs.h"
 
 typedef void *Block;
 
@@ -177,4 +177,9 @@ extern void AbortBufferIO(void);
 extern bool BufferIsUpdatable(Buffer buffer);
 extern void MarkBufferForCleanup(Buffer buffer, void (*CleanupFunc)(Buffer));
 
+#ifdef XLOG
+extern void BufmgrCommit(void);
+extern void BufferSync(void);
+#endif
+
 #endif
index d547f71b736a9a4959a95a868d5801a0506cdacb..78b22f392cff48f5588939f83d8bffe0341ffc67 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: bufpage.h,v 1.34 2000/10/21 15:43:36 vadim Exp $
+ * $Id: bufpage.h,v 1.35 2000/10/28 16:21:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -118,7 +118,8 @@ typedef OpaqueData *Opaque;
  */
 typedef struct PageHeaderData
 {
-#ifdef XLOG
+#ifdef XLOG                        /* XXX LSN is member of *any* block, not */
+                               /* only page-organized - 'll change later */
    XLogRecPtr  pd_lsn;         /* LSN: next byte after last byte of xlog */
                                /* record for last change of this page */
    StartUpID   pd_sui;         /* SUI of last changes (currently it's */
index 7caac813e9acb7bafb999dc0e3075bfe59b86dba..49a2e3e5e922da16fb829891d1312a37c14aa577 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: smgr.h,v 1.22 2000/10/16 14:52:28 vadim Exp $
+ * $Id: smgr.h,v 1.23 2000/10/28 16:21:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -36,26 +36,19 @@ extern int smgrwrite(int16 which, Relation reln, BlockNumber blocknum,
          char *buffer);
 extern int smgrflush(int16 which, Relation reln, BlockNumber blocknum,
          char *buffer);
-#ifdef OLD_FILE_NAMING
-extern int smgrblindwrt(int16 which, char *dbname, char *relname,
-            Oid dbid, Oid relid,
-            BlockNumber blkno, char *buffer,
-            bool dofsync);
-extern int smgrblindmarkdirty(int16 which, char *dbname, char *relname,
-                  Oid dbid, Oid relid,
-                  BlockNumber blkno);
-#else
 extern int smgrblindwrt(int16 which, RelFileNode rnode,
                        BlockNumber blkno, char *buffer, bool dofsync);
 extern int smgrblindmarkdirty(int16 which, RelFileNode rnode,
                        BlockNumber blkno);
-#endif
 extern int smgrmarkdirty(int16 which, Relation reln, BlockNumber blkno);
 extern int smgrnblocks(int16 which, Relation reln);
 extern int smgrtruncate(int16 which, Relation reln, int nblocks);
 extern int smgrcommit(void);
 extern int smgrabort(void);
 
+#ifdef XLOG
+extern int smgrsync(void);
+#endif
 
 
 /* internals: move me elsewhere -- ay 7/94 */
@@ -71,22 +64,18 @@ extern int  mdread(Relation reln, BlockNumber blocknum, char *buffer);
 extern int mdwrite(Relation reln, BlockNumber blocknum, char *buffer);
 extern int mdflush(Relation reln, BlockNumber blocknum, char *buffer);
 extern int mdmarkdirty(Relation reln, BlockNumber blkno);
-#ifdef OLD_FILE_NAMING
-extern int mdblindwrt(char *dbname, char *relname, Oid dbid, Oid relid,
-          BlockNumber blkno, char *buffer,
-          bool dofsync);
-extern int mdblindmarkdirty(char *dbname, char *relname, Oid dbid, Oid relid,
-                BlockNumber blkno);
-#else
 extern int mdblindwrt(RelFileNode rnode, BlockNumber blkno,
                        char *buffer, bool dofsync);
 extern int mdblindmarkdirty(RelFileNode rnode, BlockNumber blkno);
-#endif
 extern int mdnblocks(Relation reln);
 extern int mdtruncate(Relation reln, int nblocks);
 extern int mdcommit(void);
 extern int mdabort(void);
 
+#ifdef XLOG
+extern int mdsync(void);
+#endif
+
 /* mm.c */
 extern SPINLOCK MMCacheLock;