Upgrade localbuf.c to use a hash table instead of linear search to
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 19 Mar 2005 17:39:43 +0000 (17:39 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 19 Mar 2005 17:39:43 +0000 (17:39 +0000)
find already-allocated local buffers.  This is the last obstacle
in the way of setting NLocBuffer to something reasonably large.

src/backend/storage/buffer/localbuf.c

index eb6db2447a87b15d3a8ddb609f66b51edcb806ae..c2a1ed587e1b00b04285d0a3937f9900cd83e157 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/storage/buffer/localbuf.c,v 1.64 2005/03/18 16:16:09 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/storage/buffer/localbuf.c,v 1.65 2005/03/19 17:39:43 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/smgr.h"
-#include "utils/relcache.h"
+#include "utils/memutils.h"
 #include "utils/resowner.h"
 
 
 /*#define LBDEBUG*/
 
+/* entry for buffer lookup hashtable */
+typedef struct
+{
+       BufferTag       key;                    /* Tag of a disk page */
+       int                     id;                             /* Associated local buffer's index */
+} LocalBufferLookupEnt;
+
 /* Note: this macro only works on local buffers, not shared ones! */
 #define LocalBufHdrGetBlock(bufHdr)    \
        LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
 
-/* should be a GUC parameter some day */
-int                    NLocBuffer = 64;
+int                    NLocBuffer = 0;         /* until buffers are initialized */
 
 BufferDesc *LocalBufferDescriptors = NULL;
 Block     *LocalBufferBlockPointers = NULL;
@@ -37,10 +43,12 @@ int32          *LocalRefCount = NULL;
 
 static int     nextFreeLocalBuf = 0;
 
+static HTAB *LocalBufHash = NULL;
+
 
 /*
  * LocalBufferAlloc -
- *       allocate a local buffer. We do round robin allocation for now.
+ *       Find or create a local buffer for the given page of the given relation.
  *
  * API is similar to bufmgr.c's BufferAlloc, except that we do not need
  * to do any locking since this is all local.  Also, IO_IN_PROGRESS
@@ -50,35 +58,39 @@ BufferDesc *
 LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
 {
        BufferTag       newTag;                 /* identity of requested block */
-       int                     i;
-       int                     trycounter;
+       LocalBufferLookupEnt *hresult;
        BufferDesc *bufHdr;
+       int                     b;
+       int                     trycounter;
+       bool            found;
 
        INIT_BUFFERTAG(newTag, reln, blockNum);
 
-       /* a low tech search for now -- should use a hashtable */
-       for (i = 0; i < NLocBuffer; i++)
+       /* See if the desired buffer already exists */
+       hresult = (LocalBufferLookupEnt *)
+               hash_search(LocalBufHash, (void *) &newTag, HASH_FIND, NULL);
+
+       if (hresult)
        {
-               bufHdr = &LocalBufferDescriptors[i];
-               if (BUFFERTAGS_EQUAL(bufHdr->tag, newTag))
-               {
+               b = hresult->id;
+               bufHdr = &LocalBufferDescriptors[b];
+               Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag));
 #ifdef LBDEBUG
-                       fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
-                                       RelationGetRelid(reln), blockNum, -i - 1);
+               fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
+                               RelationGetRelid(reln), blockNum, -b - 1);
 #endif
 
-                       LocalRefCount[i]++;
-                       ResourceOwnerRememberBuffer(CurrentResourceOwner,
-                                                                         BufferDescriptorGetBuffer(bufHdr));
-                       if (bufHdr->flags & BM_VALID)
-                               *foundPtr = TRUE;
-                       else
-                       {
-                               /* Previous read attempt must have failed; try again */
-                               *foundPtr = FALSE;
-                       }
-                       return bufHdr;
+               LocalRefCount[b]++;
+               ResourceOwnerRememberBuffer(CurrentResourceOwner,
+                                                                       BufferDescriptorGetBuffer(bufHdr));
+               if (bufHdr->flags & BM_VALID)
+                       *foundPtr = TRUE;
+               else
+               {
+                       /* Previous read attempt must have failed; try again */
+                       *foundPtr = FALSE;
                }
+               return bufHdr;
        }
 
 #ifdef LBDEBUG
@@ -93,7 +105,7 @@ LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
        trycounter = NLocBuffer;
        for (;;)
        {
-               int                     b = nextFreeLocalBuf;
+               b = nextFreeLocalBuf;
 
                if (++nextFreeLocalBuf >= NLocBuffer)
                        nextFreeLocalBuf = 0;
@@ -136,31 +148,50 @@ LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
                                  (char *) LocalBufHdrGetBlock(bufHdr),
                                  true);
 
+               /* Mark not-dirty now in case we error out below */
+               bufHdr->flags &= ~BM_DIRTY;
+
                LocalBufferFlushCount++;
        }
 
        /*
         * lazy memory allocation: allocate space on first use of a buffer.
-        *
-        * Note this path cannot be taken for a buffer that was previously in
-        * use, so it's okay to do it (and possibly error out) before marking
-        * the buffer as not dirty.
         */
        if (LocalBufHdrGetBlock(bufHdr) == NULL)
        {
-               char       *data = (char *) malloc(BLCKSZ);
+               char       *data;
 
-               if (data == NULL)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_OUT_OF_MEMORY),
-                                        errmsg("out of memory")));
+               data = (char *) MemoryContextAlloc(TopMemoryContext, BLCKSZ);
 
-               /*
-                * Set pointer for use by BufferGetBlock() macro.
-                */
+               /* Set pointer for use by BufferGetBlock() macro */
                LocalBufHdrGetBlock(bufHdr) = (Block) data;
        }
 
+       /*
+        * Update the hash table: remove old entry, if any, and make new one.
+        */
+       if (bufHdr->flags & BM_TAG_VALID)
+       {
+               hresult = (LocalBufferLookupEnt *)
+                       hash_search(LocalBufHash, (void *) &bufHdr->tag,
+                                               HASH_REMOVE, NULL);
+               if (!hresult)                           /* shouldn't happen */
+                       elog(ERROR, "local buffer hash table corrupted");
+               /* mark buffer invalid just in case hash insert fails */
+               CLEAR_BUFFERTAG(bufHdr->tag);
+               bufHdr->flags &= ~(BM_VALID | BM_TAG_VALID);
+       }
+
+       hresult = (LocalBufferLookupEnt *)
+               hash_search(LocalBufHash, (void *) &newTag, HASH_ENTER, &found);
+       if (!hresult)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("out of memory")));
+       if (found)                              /* shouldn't happen */
+               elog(ERROR, "local buffer hash table corrupted");
+       hresult->id = b;
+
        /*
         * it's all ours now.
         */
@@ -215,20 +246,39 @@ WriteLocalBuffer(Buffer buffer, bool release)
 void
 InitLocalBuffer(void)
 {
+       int                     nbufs = 64;             /* should be from a GUC var */
+       HASHCTL         info;
        int                     i;
 
-       /*
-        * these aren't going away. I'm not gonna use palloc.
-        */
+       /* Create the lookup hash table */
+       MemSet(&info, 0, sizeof(info));
+       info.keysize = sizeof(BufferTag);
+       info.entrysize = sizeof(LocalBufferLookupEnt);
+       info.hash = tag_hash;
+
+       LocalBufHash = hash_create("Local Buffer Lookup Table",
+                                                          nbufs,
+                                                          &info,
+                                                          HASH_ELEM | HASH_FUNCTION);
+
+       if (!LocalBufHash)
+               elog(ERROR, "could not initialize local buffer hash table");
+
+       /* Allocate and zero buffer headers and auxiliary arrays */
        LocalBufferDescriptors = (BufferDesc *)
-               calloc(NLocBuffer, sizeof(*LocalBufferDescriptors));
+               MemoryContextAllocZero(TopMemoryContext,
+                                                          nbufs * sizeof(BufferDesc));
        LocalBufferBlockPointers = (Block *)
-               calloc(NLocBuffer, sizeof(*LocalBufferBlockPointers));
+               MemoryContextAllocZero(TopMemoryContext,
+                                                          nbufs * sizeof(Block));
        LocalRefCount = (int32 *)
-               calloc(NLocBuffer, sizeof(*LocalRefCount));
+               MemoryContextAllocZero(TopMemoryContext,
+                                                          nbufs * sizeof(int32));
+
        nextFreeLocalBuf = 0;
 
-       for (i = 0; i < NLocBuffer; i++)
+       /* initialize fields that need to start off nonzero */
+       for (i = 0; i < nbufs; i++)
        {
                BufferDesc *buf = &LocalBufferDescriptors[i];
 
@@ -240,6 +290,9 @@ InitLocalBuffer(void)
                 */
                buf->buf_id = -i - 2;
        }
+
+       /* Initialization done, mark buffers allocated */
+       NLocBuffer = nbufs;
 }
 
 /*
@@ -271,5 +324,6 @@ void
 AtProcExit_LocalBuffers(void)
 {
        /* just zero the refcounts ... */
-       MemSet(LocalRefCount, 0, NLocBuffer * sizeof(*LocalRefCount));
+       if (LocalRefCount)
+               MemSet(LocalRefCount, 0, NLocBuffer * sizeof(*LocalRefCount));
 }