Upgrade localbuf.c to use a hash table instead of linear search to
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 19 Mar 2005 17:39:43 +0000 (17:39 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 19 Mar 2005 17:39:43 +0000 (17:39 +0000)
find already-allocated local buffers.  This is the last obstacle
in the way of setting NLocBuffer to something reasonably large.

src/backend/storage/buffer/localbuf.c

index eb6db2447a87b15d3a8ddb609f66b51edcb806ae..c2a1ed587e1b00b04285d0a3937f9900cd83e157 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/storage/buffer/localbuf.c,v 1.64 2005/03/18 16:16:09 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/storage/buffer/localbuf.c,v 1.65 2005/03/19 17:39:43 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/smgr.h"
-#include "utils/relcache.h"
+#include "utils/memutils.h"
 #include "utils/resowner.h"
 
 
 /*#define LBDEBUG*/
 
+/* entry for buffer lookup hashtable */
+typedef struct
+{
+   BufferTag   key;            /* Tag of a disk page */
+   int         id;             /* Associated local buffer's index */
+} LocalBufferLookupEnt;
+
 /* Note: this macro only works on local buffers, not shared ones! */
 #define LocalBufHdrGetBlock(bufHdr)    \
    LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
 
-/* should be a GUC parameter some day */
-int            NLocBuffer = 64;
+int            NLocBuffer = 0;     /* until buffers are initialized */
 
 BufferDesc *LocalBufferDescriptors = NULL;
 Block     *LocalBufferBlockPointers = NULL;
@@ -37,10 +43,12 @@ int32      *LocalRefCount = NULL;
 
 static int nextFreeLocalBuf = 0;
 
+static HTAB *LocalBufHash = NULL;
+
 
 /*
  * LocalBufferAlloc -
- *   allocate a local buffer. We do round robin allocation for now.
+ *   Find or create a local buffer for the given page of the given relation.
  *
  * API is similar to bufmgr.c's BufferAlloc, except that we do not need
  * to do any locking since this is all local.  Also, IO_IN_PROGRESS
@@ -50,35 +58,39 @@ BufferDesc *
 LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
 {
    BufferTag   newTag;         /* identity of requested block */
-   int         i;
-   int         trycounter;
+   LocalBufferLookupEnt *hresult;
    BufferDesc *bufHdr;
+   int         b;
+   int         trycounter;
+   bool        found;
 
    INIT_BUFFERTAG(newTag, reln, blockNum);
 
-   /* a low tech search for now -- should use a hashtable */
-   for (i = 0; i < NLocBuffer; i++)
+   /* See if the desired buffer already exists */
+   hresult = (LocalBufferLookupEnt *)
+       hash_search(LocalBufHash, (void *) &newTag, HASH_FIND, NULL);
+
+   if (hresult)
    {
-       bufHdr = &LocalBufferDescriptors[i];
-       if (BUFFERTAGS_EQUAL(bufHdr->tag, newTag))
-       {
+       b = hresult->id;
+       bufHdr = &LocalBufferDescriptors[b];
+       Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag));
 #ifdef LBDEBUG
-           fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
-                   RelationGetRelid(reln), blockNum, -i - 1);
+       fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
+               RelationGetRelid(reln), blockNum, -b - 1);
 #endif
 
-           LocalRefCount[i]++;
-           ResourceOwnerRememberBuffer(CurrentResourceOwner,
-                                     BufferDescriptorGetBuffer(bufHdr));
-           if (bufHdr->flags & BM_VALID)
-               *foundPtr = TRUE;
-           else
-           {
-               /* Previous read attempt must have failed; try again */
-               *foundPtr = FALSE;
-           }
-           return bufHdr;
+       LocalRefCount[b]++;
+       ResourceOwnerRememberBuffer(CurrentResourceOwner,
+                                   BufferDescriptorGetBuffer(bufHdr));
+       if (bufHdr->flags & BM_VALID)
+           *foundPtr = TRUE;
+       else
+       {
+           /* Previous read attempt must have failed; try again */
+           *foundPtr = FALSE;
        }
+       return bufHdr;
    }
 
 #ifdef LBDEBUG
@@ -93,7 +105,7 @@ LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
    trycounter = NLocBuffer;
    for (;;)
    {
-       int         b = nextFreeLocalBuf;
+       b = nextFreeLocalBuf;
 
        if (++nextFreeLocalBuf >= NLocBuffer)
            nextFreeLocalBuf = 0;
@@ -136,31 +148,50 @@ LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
                  (char *) LocalBufHdrGetBlock(bufHdr),
                  true);
 
+       /* Mark not-dirty now in case we error out below */
+       bufHdr->flags &= ~BM_DIRTY;
+
        LocalBufferFlushCount++;
    }
 
    /*
     * lazy memory allocation: allocate space on first use of a buffer.
-    *
-    * Note this path cannot be taken for a buffer that was previously in
-    * use, so it's okay to do it (and possibly error out) before marking
-    * the buffer as not dirty.
     */
    if (LocalBufHdrGetBlock(bufHdr) == NULL)
    {
-       char       *data = (char *) malloc(BLCKSZ);
+       char       *data;
 
-       if (data == NULL)
-           ereport(ERROR,
-                   (errcode(ERRCODE_OUT_OF_MEMORY),
-                    errmsg("out of memory")));
+       data = (char *) MemoryContextAlloc(TopMemoryContext, BLCKSZ);
 
-       /*
-        * Set pointer for use by BufferGetBlock() macro.
-        */
+       /* Set pointer for use by BufferGetBlock() macro */
        LocalBufHdrGetBlock(bufHdr) = (Block) data;
    }
 
+   /*
+    * Update the hash table: remove old entry, if any, and make new one.
+    */
+   if (bufHdr->flags & BM_TAG_VALID)
+   {
+       hresult = (LocalBufferLookupEnt *)
+           hash_search(LocalBufHash, (void *) &bufHdr->tag,
+                       HASH_REMOVE, NULL);
+       if (!hresult)               /* shouldn't happen */
+           elog(ERROR, "local buffer hash table corrupted");
+       /* mark buffer invalid just in case hash insert fails */
+       CLEAR_BUFFERTAG(bufHdr->tag);
+       bufHdr->flags &= ~(BM_VALID | BM_TAG_VALID);
+   }
+
+   hresult = (LocalBufferLookupEnt *)
+       hash_search(LocalBufHash, (void *) &newTag, HASH_ENTER, &found);
+   if (!hresult)
+       ereport(ERROR,
+               (errcode(ERRCODE_OUT_OF_MEMORY),
+                errmsg("out of memory")));
+   if (found)              /* shouldn't happen */
+       elog(ERROR, "local buffer hash table corrupted");
+   hresult->id = b;
+
    /*
     * it's all ours now.
     */
@@ -215,20 +246,39 @@ WriteLocalBuffer(Buffer buffer, bool release)
 void
 InitLocalBuffer(void)
 {
+   int         nbufs = 64;     /* should be from a GUC var */
+   HASHCTL     info;
    int         i;
 
-   /*
-    * these aren't going away. I'm not gonna use palloc.
-    */
+   /* Create the lookup hash table */
+   MemSet(&info, 0, sizeof(info));
+   info.keysize = sizeof(BufferTag);
+   info.entrysize = sizeof(LocalBufferLookupEnt);
+   info.hash = tag_hash;
+
+   LocalBufHash = hash_create("Local Buffer Lookup Table",
+                              nbufs,
+                              &info,
+                              HASH_ELEM | HASH_FUNCTION);
+
+   if (!LocalBufHash)
+       elog(ERROR, "could not initialize local buffer hash table");
+
+   /* Allocate and zero buffer headers and auxiliary arrays */
    LocalBufferDescriptors = (BufferDesc *)
-       calloc(NLocBuffer, sizeof(*LocalBufferDescriptors));
+       MemoryContextAllocZero(TopMemoryContext,
+                              nbufs * sizeof(BufferDesc));
    LocalBufferBlockPointers = (Block *)
-       calloc(NLocBuffer, sizeof(*LocalBufferBlockPointers));
+       MemoryContextAllocZero(TopMemoryContext,
+                              nbufs * sizeof(Block));
    LocalRefCount = (int32 *)
-       calloc(NLocBuffer, sizeof(*LocalRefCount));
+       MemoryContextAllocZero(TopMemoryContext,
+                              nbufs * sizeof(int32));
+
    nextFreeLocalBuf = 0;
 
-   for (i = 0; i < NLocBuffer; i++)
+   /* initialize fields that need to start off nonzero */
+   for (i = 0; i < nbufs; i++)
    {
        BufferDesc *buf = &LocalBufferDescriptors[i];
 
@@ -240,6 +290,9 @@ InitLocalBuffer(void)
         */
        buf->buf_id = -i - 2;
    }
+
+   /* Initialization done, mark buffers allocated */
+   NLocBuffer = nbufs;
 }
 
 /*
@@ -271,5 +324,6 @@ void
 AtProcExit_LocalBuffers(void)
 {
    /* just zero the refcounts ... */
-   MemSet(LocalRefCount, 0, NLocBuffer * sizeof(*LocalRefCount));
+   if (LocalRefCount)
+       MemSet(LocalRefCount, 0, NLocBuffer * sizeof(*LocalRefCount));
 }