Reimplement hash index locking algorithms, per my recent proposal to
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 4 Sep 2003 22:06:27 +0000 (22:06 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 4 Sep 2003 22:06:27 +0000 (22:06 +0000)
pghackers.  This fixes the problem recently reported by Markus KrÌutner
(hash bucket split corrupts the state of scans being done concurrently),
and I believe it also fixes all the known problems with deadlocks in
hash index operations.  Hash indexes are still not really ready for prime
time (since they aren't WAL-logged), but this is a step forward.

src/backend/access/hash/README
src/backend/access/hash/hash.c
src/backend/access/hash/hashinsert.c
src/backend/access/hash/hashovfl.c
src/backend/access/hash/hashpage.c
src/backend/access/hash/hashscan.c
src/backend/access/hash/hashsearch.c
src/backend/access/hash/hashutil.c
src/backend/storage/lmgr/lmgr.c
src/include/access/hash.h
src/include/storage/lmgr.h

index 118d4348796c5bdbc5bd149a0d4ffae5bb0311e9..ce195eae2cd7814aa3d12f0adbedb295c6846b9a 100644 (file)
@@ -1,4 +1,4 @@
-$Header: /cvsroot/pgsql/src/backend/access/hash/README,v 1.2 2003/09/02 03:29:01 tgl Exp $
+$Header: /cvsroot/pgsql/src/backend/access/hash/README,v 1.3 2003/09/04 22:06:27 tgl Exp $
 
 This directory contains an implementation of hash indexing for Postgres.
 
@@ -229,8 +229,8 @@ existing bucket in two, thereby lowering the fill ratio:
        check split still needed
        if split not needed anymore, drop locks and exit
        decide which bucket to split
-       Attempt to X-lock new bucket number (shouldn't fail, but...)
        Attempt to X-lock old bucket number (definitely could fail)
+       Attempt to X-lock new bucket number (shouldn't fail, but...)
        if above fail, drop locks and exit
        update meta page to reflect new number of buckets
        write/release meta page
@@ -261,12 +261,6 @@ not be overfull and split attempts will stop.  (We could make a successful
 splitter loop to see if the index is still overfull, but it seems better to
 distribute the split overhead across successive insertions.)
 
-It may be wise to make the initial exclusive-lock-page-zero operation a
-conditional one as well, although the odds of a deadlock failure are quite
-low.  (AFAICS it could only deadlock against a VACUUM operation that is
-trying to X-lock a bucket that the current process has a stopped indexscan
-in.)
-
 A problem is that if a split fails partway through (eg due to insufficient
 disk space) the index is left corrupt.  The probability of that could be
 made quite low if we grab a free page or two before we update the meta
index 7e30754c88f7d25408a623dcc45d805773afb2da..190c95e2c85919943d59bb1acef5cb4ee077cd2a 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.67 2003/09/02 18:13:29 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.68 2003/09/04 22:06:27 tgl Exp $
  *
  * NOTES
  *       This file contains only the public interface routines.
@@ -27,9 +27,6 @@
 #include "miscadmin.h"
 
 
-bool           BuildingHash = false;
-
-
 /* Working state for hashbuild and its callback */
 typedef struct
 {
@@ -61,9 +58,6 @@ hashbuild(PG_FUNCTION_ARGS)
        double          reltuples;
        HashBuildState buildstate;
 
-       /* set flag to disable locking */
-       BuildingHash = true;
-
        /*
         * We expect to be called exactly once for any index relation. If
         * that's not the case, big trouble's what we have.
@@ -82,9 +76,6 @@ hashbuild(PG_FUNCTION_ARGS)
        reltuples = IndexBuildHeapScan(heap, index, indexInfo,
                                                                hashbuildCallback, (void *) &buildstate);
 
-       /* all done */
-       BuildingHash = false;
-
        /*
         * Since we just counted the tuples in the heap, we update its stats
         * in pg_class to guarantee that the planner takes advantage of the
@@ -212,10 +203,18 @@ hashgettuple(PG_FUNCTION_ARGS)
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
+       Relation        rel = scan->indexRelation;
        Page            page;
        OffsetNumber offnum;
        bool            res;
 
+       /*
+        * We hold pin but not lock on current buffer while outside the hash AM.
+        * Reacquire the read lock here.
+        */
+       if (BufferIsValid(so->hashso_curbuf))
+               _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_NOLOCK, HASH_READ);
+
        /*
         * If we've already initialized this scan, we can just advance it in
         * the appropriate direction.  If we haven't done so yet, we call a
@@ -267,6 +266,10 @@ hashgettuple(PG_FUNCTION_ARGS)
                }
        }
 
+       /* Release read lock on current buffer, but keep it pinned */
+       if (BufferIsValid(so->hashso_curbuf))
+               _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_READ, HASH_NOLOCK);
+
        PG_RETURN_BOOL(res);
 }
 
@@ -285,6 +288,8 @@ hashbeginscan(PG_FUNCTION_ARGS)
 
        scan = RelationGetIndexScan(rel, keysz, scankey);
        so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
+       so->hashso_bucket_valid = false;
+       so->hashso_bucket_blkno = 0;
        so->hashso_curbuf = so->hashso_mrkbuf = InvalidBuffer;
        scan->opaque = so;
 
@@ -303,28 +308,38 @@ hashrescan(PG_FUNCTION_ARGS)
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        ScanKey         scankey = (ScanKey) PG_GETARG_POINTER(1);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
-       ItemPointer iptr;
+       Relation        rel = scan->indexRelation;
 
-       /* we hold a read lock on the current page in the scan */
-       if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
+       /* if we are called from beginscan, so is still NULL */
+       if (so)
        {
-               _hash_relbuf(scan->indexRelation, so->hashso_curbuf, HASH_READ);
+               /* release any pins we still hold */
+               if (BufferIsValid(so->hashso_curbuf))
+                       _hash_dropbuf(rel, so->hashso_curbuf);
                so->hashso_curbuf = InvalidBuffer;
-               ItemPointerSetInvalid(iptr);
-       }
-       if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
-       {
-               _hash_relbuf(scan->indexRelation, so->hashso_mrkbuf, HASH_READ);
+
+               if (BufferIsValid(so->hashso_mrkbuf))
+                       _hash_dropbuf(rel, so->hashso_mrkbuf);
                so->hashso_mrkbuf = InvalidBuffer;
-               ItemPointerSetInvalid(iptr);
+
+               /* release lock on bucket, too */
+               if (so->hashso_bucket_blkno)
+                       _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
+               so->hashso_bucket_blkno = 0;
        }
 
+       /* set positions invalid (this will cause _hash_first call) */
+       ItemPointerSetInvalid(&(scan->currentItemData));
+       ItemPointerSetInvalid(&(scan->currentMarkData));
+
        /* Update scan key, if a new one is given */
        if (scankey && scan->numberOfKeys > 0)
        {
                memmove(scan->keyData,
                                scankey,
                                scan->numberOfKeys * sizeof(ScanKeyData));
+               if (so)
+                       so->hashso_bucket_valid = false;
        }
 
        PG_RETURN_VOID();
@@ -337,32 +352,32 @@ Datum
 hashendscan(PG_FUNCTION_ARGS)
 {
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
-       ItemPointer iptr;
-       HashScanOpaque so;
+       HashScanOpaque so = (HashScanOpaque) scan->opaque;
+       Relation        rel = scan->indexRelation;
 
-       so = (HashScanOpaque) scan->opaque;
+       /* don't need scan registered anymore */
+       _hash_dropscan(scan);
 
-       /* release any locks we still hold */
-       if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
-       {
-               _hash_relbuf(scan->indexRelation, so->hashso_curbuf, HASH_READ);
-               so->hashso_curbuf = InvalidBuffer;
-               ItemPointerSetInvalid(iptr);
-       }
+       /* release any pins we still hold */
+       if (BufferIsValid(so->hashso_curbuf))
+               _hash_dropbuf(rel, so->hashso_curbuf);
+       so->hashso_curbuf = InvalidBuffer;
 
-       if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
-       {
-               if (BufferIsValid(so->hashso_mrkbuf))
-                       _hash_relbuf(scan->indexRelation, so->hashso_mrkbuf, HASH_READ);
-               so->hashso_mrkbuf = InvalidBuffer;
-               ItemPointerSetInvalid(iptr);
-       }
+       if (BufferIsValid(so->hashso_mrkbuf))
+               _hash_dropbuf(rel, so->hashso_mrkbuf);
+       so->hashso_mrkbuf = InvalidBuffer;
 
-       /* don't need scan registered anymore */
-       _hash_dropscan(scan);
+       /* release lock on bucket, too */
+       if (so->hashso_bucket_blkno)
+               _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
+       so->hashso_bucket_blkno = 0;
 
        /* be tidy */
-       pfree(scan->opaque);
+       ItemPointerSetInvalid(&(scan->currentItemData));
+       ItemPointerSetInvalid(&(scan->currentMarkData));
+
+       pfree(so);
+       scan->opaque = NULL;
 
        PG_RETURN_VOID();
 }
@@ -374,25 +389,21 @@ Datum
 hashmarkpos(PG_FUNCTION_ARGS)
 {
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
-       ItemPointer iptr;
-       HashScanOpaque so;
-
-       so = (HashScanOpaque) scan->opaque;
+       HashScanOpaque so = (HashScanOpaque) scan->opaque;
+       Relation        rel = scan->indexRelation;
 
-       /* release lock on old marked data, if any */
-       if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
-       {
-               _hash_relbuf(scan->indexRelation, so->hashso_mrkbuf, HASH_READ);
-               so->hashso_mrkbuf = InvalidBuffer;
-               ItemPointerSetInvalid(iptr);
-       }
+       /* release pin on old marked data, if any */
+       if (BufferIsValid(so->hashso_mrkbuf))
+               _hash_dropbuf(rel, so->hashso_mrkbuf);
+       so->hashso_mrkbuf = InvalidBuffer;
+       ItemPointerSetInvalid(&(scan->currentMarkData));
 
-       /* bump lock on currentItemData and copy to currentMarkData */
+       /* bump pin count on currentItemData and copy to currentMarkData */
        if (ItemPointerIsValid(&(scan->currentItemData)))
        {
-               so->hashso_mrkbuf = _hash_getbuf(scan->indexRelation,
+               so->hashso_mrkbuf = _hash_getbuf(rel,
                                                                 BufferGetBlockNumber(so->hashso_curbuf),
-                                                                                HASH_READ);
+                                                                                HASH_NOLOCK);
                scan->currentMarkData = scan->currentItemData;
        }
 
@@ -406,26 +417,21 @@ Datum
 hashrestrpos(PG_FUNCTION_ARGS)
 {
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
-       ItemPointer iptr;
-       HashScanOpaque so;
+       HashScanOpaque so = (HashScanOpaque) scan->opaque;
+       Relation        rel = scan->indexRelation;
 
-       so = (HashScanOpaque) scan->opaque;
+       /* release pin on current data, if any */
+       if (BufferIsValid(so->hashso_curbuf))
+               _hash_dropbuf(rel, so->hashso_curbuf);
+       so->hashso_curbuf = InvalidBuffer;
+       ItemPointerSetInvalid(&(scan->currentItemData));
 
-       /* release lock on current data, if any */
-       if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
-       {
-               _hash_relbuf(scan->indexRelation, so->hashso_curbuf, HASH_READ);
-               so->hashso_curbuf = InvalidBuffer;
-               ItemPointerSetInvalid(iptr);
-       }
-
-       /* bump lock on currentMarkData and copy to currentItemData */
+       /* bump pin count on currentMarkData and copy to currentItemData */
        if (ItemPointerIsValid(&(scan->currentMarkData)))
        {
-               so->hashso_curbuf = _hash_getbuf(scan->indexRelation,
+               so->hashso_curbuf = _hash_getbuf(rel,
                                                                 BufferGetBlockNumber(so->hashso_mrkbuf),
-                                                                                HASH_READ);
-
+                                                                                HASH_NOLOCK);
                scan->currentItemData = scan->currentMarkData;
        }
 
@@ -474,7 +480,7 @@ hashbulkdelete(PG_FUNCTION_ARGS)
        orig_maxbucket = metap->hashm_maxbucket;
        orig_ntuples = metap->hashm_ntuples;
        memcpy(&local_metapage, metap, sizeof(local_metapage));
-       _hash_relbuf(rel, metabuf, HASH_READ);
+       _hash_relbuf(rel, metabuf);
 
        /* Scan the buckets that we know exist */
        cur_bucket = 0;
@@ -490,7 +496,12 @@ loop_top:
                /* Get address of bucket's start page */
                bucket_blkno = BUCKET_TO_BLKNO(&local_metapage, cur_bucket);
 
-               /* XXX lock bucket here */
+               /* Exclusive-lock the bucket so we can shrink it */
+               _hash_getlock(rel, bucket_blkno, HASH_EXCLUSIVE);
+
+               /* Shouldn't have any active scans locally, either */
+               if (_hash_has_active_scan(rel, cur_bucket))
+                       elog(ERROR, "hash index has active scan during VACUUM");
 
                /* Scan each page in bucket */
                blkno = bucket_blkno;
@@ -522,13 +533,6 @@ loop_top:
                                htup = &(hitem->hash_itup.t_tid);
                                if (callback(htup, callback_state))
                                {
-                                       ItemPointerData indextup;
-
-                                       /* adjust any active scans that will be affected */
-                                       /* (this should be unnecessary) */
-                                       ItemPointerSet(&indextup, blkno, offno);
-                                       _hash_adjscans(rel, &indextup);
-
                                        /* delete the item from the page */
                                        PageIndexTupleDelete(page, offno);
                                        bucket_dirty = page_dirty = true;
@@ -547,24 +551,22 @@ loop_top:
                        }
 
                        /*
-                        * Write or free page if needed, advance to next page.  We want
-                        * to preserve the invariant that overflow pages are nonempty.
+                        * Write page if needed, advance to next page.
                         */
                        blkno = opaque->hasho_nextblkno;
 
-                       if (PageIsEmpty(page) && (opaque->hasho_flag & LH_OVERFLOW_PAGE))
-                               _hash_freeovflpage(rel, buf);
-                       else if (page_dirty)
+                       if (page_dirty)
                                _hash_wrtbuf(rel, buf);
                        else
-                               _hash_relbuf(rel, buf, HASH_WRITE);
+                               _hash_relbuf(rel, buf);
                }
 
                /* If we deleted anything, try to compact free space */
                if (bucket_dirty)
                        _hash_squeezebucket(rel, cur_bucket, bucket_blkno);
 
-               /* XXX unlock bucket here */
+               /* Release bucket lock */
+               _hash_droplock(rel, bucket_blkno, HASH_EXCLUSIVE);
 
                /* Advance to next bucket */
                cur_bucket++;
@@ -580,7 +582,7 @@ loop_top:
                /* There's been a split, so process the additional bucket(s) */
                cur_maxbucket = metap->hashm_maxbucket;
                memcpy(&local_metapage, metap, sizeof(local_metapage));
-               _hash_relbuf(rel, metabuf, HASH_WRITE);
+               _hash_relbuf(rel, metabuf);
                goto loop_top;
        }
 
index 20cdcabfaa4083414916168744235760e292ddf6..00b3d60b28c11df69b147f074b169ebdf148e08f 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.29 2003/09/02 18:13:30 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.30 2003/09/04 22:06:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
 #include "access/hash.h"
+#include "storage/lmgr.h"
+
+
+static OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
+                                                                  Size itemsize, HashItem hitem);
 
-static InsertIndexResult _hash_insertonpg(Relation rel, Buffer buf, int keysz, ScanKey scankey, HashItem hitem, Buffer metabuf);
-static OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, HashItem hitem);
 
 /*
  *     _hash_doinsert() -- Handle insertion of a single HashItem in the table.
  *
  *             This routine is called by the public interface routines, hashbuild
- *             and hashinsert.  By here, hashitem is filled in, and has a unique
- *             (xid, seqno) pair. The datum to be used as a "key" is in the
- *             hashitem.
+ *             and hashinsert.  By here, hashitem is completely filled in.
+ *             The datum to be used as a "key" is in the hashitem.
  */
 InsertIndexResult
 _hash_doinsert(Relation rel, HashItem hitem)
 {
        Buffer          buf;
        Buffer          metabuf;
-       BlockNumber blkno;
        HashMetaPage metap;
        IndexTuple      itup;
+       BlockNumber itup_blkno;
+       OffsetNumber itup_off;
        InsertIndexResult res;
-       ScanKey         itup_scankey;
-       int                     natts;
+       BlockNumber blkno;
        Page            page;
-
-       metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
-       metap = (HashMetaPage) BufferGetPage(metabuf);
-       _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
-
-       /* we need a scan key to do our search, so build one */
-       itup = &(hitem->hash_itup);
-       if ((natts = rel->rd_rel->relnatts) != 1)
-               elog(ERROR, "Hash indexes support only one index key");
-       itup_scankey = _hash_mkscankey(rel, itup);
+       HashPageOpaque pageopaque;
+       Size            itemsz;
+       bool            do_expand;
+       uint32          hashkey;
+       Bucket          bucket;
+       Datum           datum;
+       bool            isnull;
 
        /*
-        * find the first page in the bucket chain containing this key and
-        * place it in buf.  _hash_search obtains a read lock for us.
+        * Compute the hash key for the item.  We do this first so as not to
+        * need to hold any locks while running the hash function.
         */
-       _hash_search(rel, natts, itup_scankey, &buf, metap);
-       page = BufferGetPage(buf);
-       _hash_checkpage(rel, page, LH_BUCKET_PAGE);
+       itup = &(hitem->hash_itup);
+       if (rel->rd_rel->relnatts != 1)
+               elog(ERROR, "hash indexes support only one index key");
+       datum = index_getattr(itup, 1, RelationGetDescr(rel), &isnull);
+       Assert(!isnull);
+       hashkey = _hash_datum2hashkey(rel, datum);
+
+       /* compute item size too */
+       itemsz = IndexTupleDSize(hitem->hash_itup)
+               + (sizeof(HashItemData) - sizeof(IndexTupleData));
+
+       itemsz = MAXALIGN(itemsz);      /* be safe, PageAddItem will do this but
+                                                                * we need to be consistent */
 
        /*
-        * trade in our read lock for a write lock so that we can do the
-        * insertion.
+        * Acquire shared split lock so we can compute the target bucket
+        * safely (see README).
         */
-       blkno = BufferGetBlockNumber(buf);
-       _hash_relbuf(rel, buf, HASH_READ);
-       buf = _hash_getbuf(rel, blkno, HASH_WRITE);
+       _hash_getlock(rel, 0, HASH_SHARE);
 
+       /* Read the metapage */
+       metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
+       metap = (HashMetaPage) BufferGetPage(metabuf);
+       _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
 
        /*
-        * XXX btree comment (haven't decided what to do in hash): don't think
-        * the bucket can be split while we're reading the metapage.
-        *
-        * If the page was split between the time that we surrendered our read
-        * lock and acquired our write lock, then this page may no longer be
-        * the right place for the key we want to insert.
+        * Check whether the item can fit on a hash page at all. (Eventually,
+        * we ought to try to apply TOAST methods if not.)  Note that at this
+        * point, itemsz doesn't include the ItemId.
         */
+       if (itemsz > HashMaxItemSize((Page) metap))
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                                errmsg("index tuple size %lu exceeds hash maximum, %lu",
+                                               (unsigned long) itemsz,
+                                               (unsigned long) HashMaxItemSize((Page) metap))));
 
-       /* do the insertion */
-       res = _hash_insertonpg(rel, buf, natts, itup_scankey,
-                                                  hitem, metabuf);
+       /*
+        * Compute the target bucket number, and convert to block number.
+        */
+       bucket = _hash_hashkey2bucket(hashkey,
+                                                                 metap->hashm_maxbucket,
+                                                                 metap->hashm_highmask,
+                                                                 metap->hashm_lowmask);
 
-       /* be tidy */
-       _hash_freeskey(itup_scankey);
+       blkno = BUCKET_TO_BLKNO(metap, bucket);
 
-       return res;
-}
+       /* release lock on metapage, but keep pin since we'll need it again */
+       _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
 
-/*
- *     _hash_insertonpg() -- Insert a tuple on a particular page in the table.
- *
- *             This recursive procedure does the following things:
- *
- *                     +  if necessary, splits the target page.
- *                     +  inserts the tuple.
- *
- *             On entry, we must have the right buffer on which to do the
- *             insertion, and the buffer must be pinned and locked.  On return,
- *             we will have dropped both the pin and the write lock on the buffer.
- *
- */
-static InsertIndexResult
-_hash_insertonpg(Relation rel,
-                                Buffer buf,
-                                int keysz,
-                                ScanKey scankey,
-                                HashItem hitem,
-                                Buffer metabuf)
-{
-       InsertIndexResult res;
-       Page            page;
-       BlockNumber itup_blkno;
-       OffsetNumber itup_off;
-       Size            itemsz;
-       HashPageOpaque pageopaque;
-       bool            do_expand = false;
-       Buffer          ovflbuf;
-       HashMetaPage metap;
-       Bucket          bucket;
+       /*
+        * Acquire share lock on target bucket; then we can release split lock.
+        */
+       _hash_getlock(rel, blkno, HASH_SHARE);
 
-       metap = (HashMetaPage) BufferGetPage(metabuf);
-       _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
+       _hash_droplock(rel, 0, HASH_SHARE);
 
+       /* Fetch the primary bucket page for the bucket */
+       buf = _hash_getbuf(rel, blkno, HASH_WRITE);
        page = BufferGetPage(buf);
-       _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+       _hash_checkpage(rel, page, LH_BUCKET_PAGE);
        pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
-       bucket = pageopaque->hasho_bucket;
-
-       itemsz = IndexTupleDSize(hitem->hash_itup)
-               + (sizeof(HashItemData) - sizeof(IndexTupleData));
-       itemsz = MAXALIGN(itemsz);
+       Assert(pageopaque->hasho_bucket == bucket);
 
+       /* Do the insertion */
        while (PageGetFreeSpace(page) < itemsz)
        {
                /*
                 * no space on this page; check for an overflow page
                 */
-               if (BlockNumberIsValid(pageopaque->hasho_nextblkno))
+               BlockNumber     nextblkno = pageopaque->hasho_nextblkno;
+
+               if (BlockNumberIsValid(nextblkno))
                {
                        /*
                         * ovfl page exists; go get it.  if it doesn't have room,
                         * we'll find out next pass through the loop test above.
                         */
-                       ovflbuf = _hash_getbuf(rel, pageopaque->hasho_nextblkno,
-                                                                  HASH_WRITE);
-                       _hash_relbuf(rel, buf, HASH_WRITE);
-                       buf = ovflbuf;
+                       _hash_relbuf(rel, buf);
+                       buf = _hash_getbuf(rel, nextblkno, HASH_WRITE);
                        page = BufferGetPage(buf);
                }
                else
@@ -154,65 +142,72 @@ _hash_insertonpg(Relation rel,
                         * we're at the end of the bucket chain and we haven't found a
                         * page with enough room.  allocate a new overflow page.
                         */
-                       do_expand = true;
-                       ovflbuf = _hash_addovflpage(rel, metabuf, buf);
-                       _hash_relbuf(rel, buf, HASH_WRITE);
-                       buf = ovflbuf;
+
+                       /* release our write lock without modifying buffer */
+                       _hash_chgbufaccess(rel, buf, HASH_READ, HASH_NOLOCK);
+
+                       /* chain to a new overflow page */
+                       buf = _hash_addovflpage(rel, metabuf, buf);
                        page = BufferGetPage(buf);
 
-                       if (PageGetFreeSpace(page) < itemsz)
-                       {
-                               /* it doesn't fit on an empty page -- give up */
-                               elog(ERROR, "hash item too large");
-                       }
+                       /* should fit now, given test above */
+                       Assert(PageGetFreeSpace(page) >= itemsz);
                }
                _hash_checkpage(rel, page, LH_OVERFLOW_PAGE);
                pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
                Assert(pageopaque->hasho_bucket == bucket);
        }
 
-       itup_off = _hash_pgaddtup(rel, buf, keysz, scankey, itemsz, hitem);
+       /* found page with enough space, so add the item here */
+       itup_off = _hash_pgaddtup(rel, buf, itemsz, hitem);
        itup_blkno = BufferGetBlockNumber(buf);
 
-       /* by here, the new tuple is inserted */
-       res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
+       /* write and release the modified page */
+       _hash_wrtbuf(rel, buf);
 
-       ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
+       /* We can drop the bucket lock now */
+       _hash_droplock(rel, blkno, HASH_SHARE);
 
-       if (res != NULL)
-       {
-               /*
-                * Increment the number of keys in the table. We switch lock
-                * access type just for a moment to allow greater accessibility to
-                * the metapage.
-                */
-               _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE);
-               metap->hashm_ntuples += 1;
-               _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ);
-       }
+       /*
+        * Write-lock the metapage so we can increment the tuple count.
+        * After incrementing it, check to see if it's time for a split.
+        */
+       _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
 
-       _hash_wrtbuf(rel, buf);
+       metap->hashm_ntuples += 1;
 
-       if (do_expand ||
-               (metap->hashm_ntuples / (metap->hashm_maxbucket + 1))
-               > (double) metap->hashm_ffactor)
+       /* Make sure this stays in sync with _hash_expandtable() */
+       do_expand = metap->hashm_ntuples >
+               (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1);
+
+       /* Write out the metapage and drop lock, but keep pin */
+       _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
+
+       /* Attempt to split if a split is needed */
+       if (do_expand)
                _hash_expandtable(rel, metabuf);
-       _hash_relbuf(rel, metabuf, HASH_READ);
+
+       /* Finally drop our pin on the metapage */
+       _hash_dropbuf(rel, metabuf);
+
+       /* Create the return data structure */
+       res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
+
+       ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
+
        return res;
 }
 
 /*
  *     _hash_pgaddtup() -- add a tuple to a particular page in the index.
  *
- *             This routine adds the tuple to the page as requested, and keeps the
- *             write lock and reference associated with the page's buffer.  It is
- *             an error to call pgaddtup() without a write lock and reference.
+ *             This routine adds the tuple to the page as requested; it does
+ *             not write out the page.  It is an error to call pgaddtup() without
+ *             a write lock and pin.
  */
 static OffsetNumber
 _hash_pgaddtup(Relation rel,
                           Buffer buf,
-                          int keysz,
-                          ScanKey itup_scankey,
                           Size itemsize,
                           HashItem hitem)
 {
@@ -228,8 +223,5 @@ _hash_pgaddtup(Relation rel,
                elog(ERROR, "failed to add index item to \"%s\"",
                         RelationGetRelationName(rel));
 
-       /* write the buffer, but hold our lock */
-       _hash_wrtnorelbuf(buf);
-
        return itup_off;
 }
index 388a711832a9814c03a0450f847bdcaeab1ed8cc..fe5e5e95958704aa6fb219530c33bcbc5e0026f9 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.40 2003/09/02 18:13:30 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.41 2003/09/04 22:06:27 tgl Exp $
  *
  * NOTES
  *       Overflow pages look like ordinary relation pages.
@@ -77,39 +77,68 @@ blkno_to_bitno(HashMetaPage metap, BlockNumber ovflblkno)
 /*
  *     _hash_addovflpage
  *
- *     Add an overflow page to the page currently pointed to by the buffer
- *     argument 'buf'.
+ *     Add an overflow page to the bucket whose last page is pointed to by 'buf'.
  *
- *     metabuf has a read lock upon entering the function; buf has a
- *     write lock.  The same is true on exit.  The returned overflow page
- *     is write-locked.
+ *     On entry, the caller must hold a pin but no lock on 'buf'.  The pin is
+ *     dropped before exiting (we assume the caller is not interested in 'buf'
+ *     anymore).  The returned overflow page will be pinned and write-locked;
+ *     it is guaranteed to be empty.
+ *
+ *     The caller must hold a pin, but no lock, on the metapage buffer.
+ *     That buffer is returned in the same state.
+ *
+ *     The caller must hold at least share lock on the bucket, to ensure that
+ *     no one else tries to compact the bucket meanwhile.  This guarantees that
+ *     'buf' won't stop being part of the bucket while it's unlocked.
+ *
+ * NB: since this could be executed concurrently by multiple processes,
+ * one should not assume that the returned overflow page will be the
+ * immediate successor of the originally passed 'buf'.  Additional overflow
+ * pages might have been added to the bucket chain in between.
  */
 Buffer
 _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
 {
        BlockNumber ovflblkno;
        Buffer          ovflbuf;
-       HashMetaPage metap;
-       HashPageOpaque ovflopaque;
-       HashPageOpaque pageopaque;
        Page            page;
        Page            ovflpage;
-
-       /* this had better be the last page in a bucket chain */
-       page = BufferGetPage(buf);
-       _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
-       pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
-       Assert(!BlockNumberIsValid(pageopaque->hasho_nextblkno));
-
-       metap = (HashMetaPage) BufferGetPage(metabuf);
-       _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
+       HashPageOpaque pageopaque;
+       HashPageOpaque ovflopaque;
 
        /* allocate an empty overflow page */
        ovflblkno = _hash_getovflpage(rel, metabuf);
+
+       /* lock the overflow page */
        ovflbuf = _hash_getbuf(rel, ovflblkno, HASH_WRITE);
        ovflpage = BufferGetPage(ovflbuf);
 
-       /* initialize the new overflow page */
+       /*
+        * Write-lock the tail page.  It is okay to hold two buffer locks here
+        * since there cannot be anyone else contending for access to ovflbuf.
+        */
+       _hash_chgbufaccess(rel, buf, HASH_NOLOCK, HASH_WRITE);
+
+       /* loop to find current tail page, in case someone else inserted too */
+       for (;;)
+       {
+               BlockNumber nextblkno;
+
+               page = BufferGetPage(buf);
+               _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+               pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+               nextblkno = pageopaque->hasho_nextblkno;
+
+               if (!BlockNumberIsValid(nextblkno))
+                       break;
+
+               /* we assume we do not need to write the unmodified page */
+               _hash_relbuf(rel, buf);
+
+               buf = _hash_getbuf(rel, nextblkno, HASH_WRITE);
+       }
+
+       /* now that we have correct backlink, initialize new overflow page */
        _hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
        ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
        ovflopaque->hasho_prevblkno = BufferGetBlockNumber(buf);
@@ -117,11 +146,12 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
        ovflopaque->hasho_bucket = pageopaque->hasho_bucket;
        ovflopaque->hasho_flag = LH_OVERFLOW_PAGE;
        ovflopaque->hasho_filler = HASHO_FILL;
-       _hash_wrtnorelbuf(ovflbuf);
+       _hash_wrtnorelbuf(rel, ovflbuf);
 
        /* logically chain overflow page to previous page */
        pageopaque->hasho_nextblkno = ovflblkno;
-       _hash_wrtnorelbuf(buf);
+       _hash_wrtbuf(rel, buf);
+
        return ovflbuf;
 }
 
@@ -130,9 +160,8 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
  *
  *     Find an available overflow page and return its block number.
  *
- *     When we enter this function, we have a read lock on metabuf which
- *     we change to a write lock immediately. Before exiting, the write lock
- *     is exchanged for a read lock.
+ * The caller must hold a pin, but no lock, on the metapage buffer.
+ * The buffer is returned in the same state.
  */
 static BlockNumber
 _hash_getovflpage(Relation rel, Buffer metabuf)
@@ -140,6 +169,7 @@ _hash_getovflpage(Relation rel, Buffer metabuf)
        HashMetaPage metap;
        Buffer          mapbuf = 0;
        BlockNumber blkno;
+       uint32          orig_firstfree;
        uint32          splitnum;
        uint32     *freep = NULL;
        uint32          max_ovflpg;
@@ -150,51 +180,66 @@ _hash_getovflpage(Relation rel, Buffer metabuf)
        uint32          i,
                                j;
 
-       _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE);
-       metap = (HashMetaPage) BufferGetPage(metabuf);
-       splitnum = metap->hashm_ovflpoint;
+       /* Get exclusive lock on the meta page */
+       _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
 
-       /* end search with the last existing overflow page */
-       max_ovflpg = metap->hashm_spares[splitnum] - 1;
-       last_page = max_ovflpg >> BMPG_SHIFT(metap);
-       last_bit = max_ovflpg & BMPG_MASK(metap);
+       metap = (HashMetaPage) BufferGetPage(metabuf);
+       _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
 
        /* start search at hashm_firstfree */
-       first_page = metap->hashm_firstfree >> BMPG_SHIFT(metap);
-       bit = metap->hashm_firstfree & BMPG_MASK(metap);
+       orig_firstfree = metap->hashm_firstfree;
+       first_page = orig_firstfree >> BMPG_SHIFT(metap);
+       bit = orig_firstfree & BMPG_MASK(metap);
+       i = first_page;
        j = bit / BITS_PER_MAP;
        bit &= ~(BITS_PER_MAP - 1);
 
-       for (i = first_page; i <= last_page; i++)
+       /* outer loop iterates once per bitmap page */
+       for (;;)
        {
                BlockNumber     mapblkno;
                Page            mappage;
                uint32          last_inpage;
 
-               mapblkno = metap->hashm_mapp[i];
-               mapbuf = _hash_getbuf(rel, mapblkno, HASH_WRITE);
-               mappage = BufferGetPage(mapbuf);
-               _hash_checkpage(rel, mappage, LH_BITMAP_PAGE);
-               freep = HashPageGetBitmap(mappage);
+               /* want to end search with the last existing overflow page */
+               splitnum = metap->hashm_ovflpoint;
+               max_ovflpg = metap->hashm_spares[splitnum] - 1;
+               last_page = max_ovflpg >> BMPG_SHIFT(metap);
+               last_bit = max_ovflpg & BMPG_MASK(metap);
 
-               if (i != first_page)
-               {
-                       bit = 0;
-                       j = 0;
-               }
+               if (i > last_page)
+                       break;
+
+               Assert(i < metap->hashm_nmaps);
+               mapblkno = metap->hashm_mapp[i];
 
                if (i == last_page)
                        last_inpage = last_bit;
                else
                        last_inpage = BMPGSZ_BIT(metap) - 1;
 
+               /* Release exclusive lock on metapage while reading bitmap page */
+               _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+
+               mapbuf = _hash_getbuf(rel, mapblkno, HASH_WRITE);
+               mappage = BufferGetPage(mapbuf);
+               _hash_checkpage(rel, mappage, LH_BITMAP_PAGE);
+               freep = HashPageGetBitmap(mappage);
+
                for (; bit <= last_inpage; j++, bit += BITS_PER_MAP)
                {
                        if (freep[j] != ALL_SET)
                                goto found;
                }
 
-               _hash_relbuf(rel, mapbuf, HASH_WRITE);
+               /* No free space here, try to advance to next map page */
+               _hash_relbuf(rel, mapbuf);
+               i++;
+               j = 0;                                  /* scan from start of next map page */
+               bit = 0;
+
+               /* Reacquire exclusive lock on the meta page */
+               _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
        }
 
        /* No Free Page Found - have to allocate a new page */
@@ -225,13 +270,19 @@ _hash_getovflpage(Relation rel, Buffer metabuf)
                 */
        }
 
-       /* mark new page as first free so we don't search much next time */
-       metap->hashm_firstfree = bit;
-
        /* Calculate address of the new overflow page */
        blkno = bitno_to_blkno(metap, bit);
 
-       _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ);
+       /*
+        * Adjust hashm_firstfree to avoid redundant searches.  But don't
+        * risk changing it if someone moved it while we were searching
+        * bitmap pages.
+        */
+       if (metap->hashm_firstfree == orig_firstfree)
+               metap->hashm_firstfree = bit + 1;
+
+       /* Write updated metapage and release lock, but not pin */
+       _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
 
        return blkno;
 
@@ -239,20 +290,36 @@ found:
        /* convert bit to bit number within page */
        bit += _hash_firstfreebit(freep[j]);
 
-       /* mark page "in use" */
+       /* mark page "in use" in the bitmap */
        SETBIT(freep, bit);
        _hash_wrtbuf(rel, mapbuf);
 
+       /* Reacquire exclusive lock on the meta page */
+       _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
+
        /* convert bit to absolute bit number */
        bit += (i << BMPG_SHIFT(metap));
 
-       /* adjust hashm_firstfree to avoid redundant searches */
-       if (bit > metap->hashm_firstfree)
-               metap->hashm_firstfree = bit;
-
+       /* Calculate address of the new overflow page */
        blkno = bitno_to_blkno(metap, bit);
 
-       _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ);
+       /*
+        * Adjust hashm_firstfree to avoid redundant searches.  But don't
+        * risk changing it if someone moved it while we were searching
+        * bitmap pages.
+        */
+       if (metap->hashm_firstfree == orig_firstfree)
+       {
+               metap->hashm_firstfree = bit + 1;
+
+               /* Write updated metapage and release lock, but not pin */
+               _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
+       }
+       else
+       {
+               /* We didn't change the metapage, so no need to write */
+               _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+       }
 
        return blkno;
 }
@@ -275,7 +342,10 @@ _hash_firstfreebit(uint32 map)
                        return i;
                mask <<= 1;
        }
-       return i;
+
+       elog(ERROR, "firstfreebit found no free bit");
+
+       return 0;                                       /* keep compiler quiet */
 }
 
 /*
@@ -287,7 +357,9 @@ _hash_firstfreebit(uint32 map)
  *     Returns the block number of the page that followed the given page
  *     in the bucket, or InvalidBlockNumber if no following page.
  *
- *     NB: caller must not hold lock on metapage.
+ *     NB: caller must not hold lock on metapage, nor on either page that's
+ *     adjacent in the bucket chain.  The caller had better hold exclusive lock
+ *     on the bucket, too.
  */
 BlockNumber
 _hash_freeovflpage(Relation rel, Buffer ovflbuf)
@@ -308,10 +380,7 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
                                bitmapbit;
        Bucket          bucket;
 
-       metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
-       metap = (HashMetaPage) BufferGetPage(metabuf);
-       _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
-
+       /* Get information from the doomed page */
        ovflblkno = BufferGetBlockNumber(ovflbuf);      
        ovflpage = BufferGetPage(ovflbuf);
        _hash_checkpage(rel, ovflpage, LH_OVERFLOW_PAGE);
@@ -319,17 +388,16 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
        nextblkno = ovflopaque->hasho_nextblkno;
        prevblkno = ovflopaque->hasho_prevblkno;
        bucket = ovflopaque->hasho_bucket;
+
+       /* Zero the page for debugging's sake; then write and release it */
        MemSet(ovflpage, 0, BufferGetPageSize(ovflbuf));
        _hash_wrtbuf(rel, ovflbuf);
 
        /*
-        * fix up the bucket chain.  this is a doubly-linked list, so we must
+        * Fix up the bucket chain.  this is a doubly-linked list, so we must
         * fix up the bucket chain members behind and ahead of the overflow
-        * page being deleted.
-        *
-        * XXX this should look like: - lock prev/next - modify/write prev/next
-        * (how to do write ordering with a doubly-linked list?) - unlock
-        * prev/next
+        * page being deleted.  No concurrency issues since we hold exclusive
+        * lock on the entire bucket.
         */
        if (BlockNumberIsValid(prevblkno))
        {
@@ -354,9 +422,12 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
                _hash_wrtbuf(rel, nextbuf);
        }
 
-       /*
-        * Clear the bitmap bit to indicate that this overflow page is free.
-        */
+       /* Read the metapage so we can determine which bitmap page to use */
+       metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
+       metap = (HashMetaPage) BufferGetPage(metabuf);
+       _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
+
+       /* Identify which bit to set */
        ovflbitno = blkno_to_bitno(metap, ovflblkno);
 
        bitmappage = ovflbitno >> BMPG_SHIFT(metap);
@@ -366,18 +437,32 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
                elog(ERROR, "invalid overflow bit number %u", ovflbitno);
        blkno = metap->hashm_mapp[bitmappage];
 
+       /* Release metapage lock while we access the bitmap page */
+       _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+
+       /* Clear the bitmap bit to indicate that this overflow page is free */
        mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE);
        mappage = BufferGetPage(mapbuf);
        _hash_checkpage(rel, mappage, LH_BITMAP_PAGE);
        freep = HashPageGetBitmap(mappage);
+       Assert(ISSET(freep, bitmapbit));
        CLRBIT(freep, bitmapbit);
        _hash_wrtbuf(rel, mapbuf);
 
+       /* Get write-lock on metapage to update firstfree */
+       _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
+
        /* if this is now the first free page, update hashm_firstfree */
        if (ovflbitno < metap->hashm_firstfree)
+       {
                metap->hashm_firstfree = ovflbitno;
-
-       _hash_wrtbuf(rel, metabuf);
+               _hash_wrtbuf(rel, metabuf);
+       }
+       else
+       {
+               /* no need to change metapage */
+               _hash_relbuf(rel, metabuf);
+       }
 
        return nextblkno;
 }
@@ -401,9 +486,18 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno)
        HashPageOpaque op;
        uint32     *freep;
 
-       /* initialize the page */
+       /*
+        * It is okay to write-lock the new bitmap page while holding metapage
+        * write lock, because no one else could be contending for the new page.
+        *
+        * There is some loss of concurrency in possibly doing I/O for the new
+        * page while holding the metapage lock, but this path is taken so
+        * seldom that it's not worth worrying about.
+        */
        buf = _hash_getbuf(rel, blkno, HASH_WRITE);
        pg = BufferGetPage(buf);
+
+       /* initialize the page */
        _hash_pageinit(pg, BufferGetPageSize(buf));
        op = (HashPageOpaque) PageGetSpecialPointer(pg);
        op->hasho_prevblkno = InvalidBlockNumber;
@@ -416,7 +510,7 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno)
        freep = HashPageGetBitmap(pg);
        MemSet((char *) freep, 0xFF, BMPGSZ_BYTE(metap));
 
-       /* write out the new bitmap page (releasing write lock) */
+       /* write out the new bitmap page (releasing write lock and pin) */
        _hash_wrtbuf(rel, buf);
 
        /* add the new bitmap page to the metapage's list of bitmaps */
@@ -445,7 +539,14 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno)
  *     the write page works forward; the procedure terminates when the
  *     read page and write page are the same page.
  *
- *     Caller must hold exclusive lock on the target bucket.
+ *     At completion of this procedure, it is guaranteed that all pages in
+ *     the bucket are nonempty, unless the bucket is totally empty (in
+ *     which case all overflow pages will be freed).  The original implementation
+ *     required that to be true on entry as well, but it's a lot easier for
+ *     callers to leave empty overflow pages and let this guy clean it up.
+ *
+ *     Caller must hold exclusive lock on the target bucket.  This allows
+ *     us to safely lock multiple pages in the bucket.
  */
 void
 _hash_squeezebucket(Relation rel,
@@ -479,7 +580,7 @@ _hash_squeezebucket(Relation rel,
         */
        if (!BlockNumberIsValid(wopaque->hasho_nextblkno))
        {
-               _hash_relbuf(rel, wbuf, HASH_WRITE);
+               _hash_relbuf(rel, wbuf);
                return;
        }
 
@@ -492,11 +593,10 @@ _hash_squeezebucket(Relation rel,
        {
                rblkno = ropaque->hasho_nextblkno;
                if (ropaque != wopaque)
-                       _hash_relbuf(rel, rbuf, HASH_WRITE);
+                       _hash_relbuf(rel, rbuf);
                rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE);
                rpage = BufferGetPage(rbuf);
                _hash_checkpage(rel, rpage, LH_OVERFLOW_PAGE);
-               Assert(!PageIsEmpty(rpage));
                ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
                Assert(ropaque->hasho_bucket == bucket);
        } while (BlockNumberIsValid(ropaque->hasho_nextblkno));
@@ -507,81 +607,97 @@ _hash_squeezebucket(Relation rel,
        roffnum = FirstOffsetNumber;
        for (;;)
        {
-               hitem = (HashItem) PageGetItem(rpage, PageGetItemId(rpage, roffnum));
-               itemsz = IndexTupleDSize(hitem->hash_itup)
-                       + (sizeof(HashItemData) - sizeof(IndexTupleData));
-               itemsz = MAXALIGN(itemsz);
-
-               /*
-                * walk up the bucket chain, looking for a page big enough for
-                * this item.
-                */
-               while (PageGetFreeSpace(wpage) < itemsz)
+               /* this test is needed in case page is empty on entry */
+               if (roffnum <= PageGetMaxOffsetNumber(rpage))
                {
-                       wblkno = wopaque->hasho_nextblkno;
+                       hitem = (HashItem) PageGetItem(rpage,
+                                                                                  PageGetItemId(rpage, roffnum));
+                       itemsz = IndexTupleDSize(hitem->hash_itup)
+                               + (sizeof(HashItemData) - sizeof(IndexTupleData));
+                       itemsz = MAXALIGN(itemsz);
+
+                       /*
+                        * Walk up the bucket chain, looking for a page big enough for
+                        * this item.  Exit if we reach the read page.
+                        */
+                       while (PageGetFreeSpace(wpage) < itemsz)
+                       {
+                               Assert(!PageIsEmpty(wpage));
 
-                       _hash_wrtbuf(rel, wbuf);
+                               wblkno = wopaque->hasho_nextblkno;
+                               Assert(BlockNumberIsValid(wblkno));
 
-                       if (!BlockNumberIsValid(wblkno) || (rblkno == wblkno))
-                       {
-                               _hash_wrtbuf(rel, rbuf);
-                               /* wbuf is already released */
-                               return;
+                               _hash_wrtbuf(rel, wbuf);
+
+                               if (rblkno == wblkno)
+                               {
+                                       /* wbuf is already released */
+                                       _hash_wrtbuf(rel, rbuf);
+                                       return;
+                               }
+
+                               wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE);
+                               wpage = BufferGetPage(wbuf);
+                               _hash_checkpage(rel, wpage, LH_OVERFLOW_PAGE);
+                               wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
+                               Assert(wopaque->hasho_bucket == bucket);
                        }
 
-                       wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE);
-                       wpage = BufferGetPage(wbuf);
-                       _hash_checkpage(rel, wpage, LH_OVERFLOW_PAGE);
-                       Assert(!PageIsEmpty(wpage));
-                       wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
-                       Assert(wopaque->hasho_bucket == bucket);
+                       /*
+                        * we have found room so insert on the "write" page.
+                        */
+                       woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage));
+                       if (PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED)
+                               == InvalidOffsetNumber)
+                               elog(ERROR, "failed to add index item to \"%s\"",
+                                        RelationGetRelationName(rel));
+
+                       /*
+                        * delete the tuple from the "read" page. PageIndexTupleDelete
+                        * repacks the ItemId array, so 'roffnum' will be "advanced" to
+                        * the "next" ItemId.
+                        */
+                       PageIndexTupleDelete(rpage, roffnum);
                }
 
                /*
-                * if we're here, we have found room so insert on the "write"
-                * page.
+                * if the "read" page is now empty because of the deletion (or
+                * because it was empty when we got to it), free it.
+                *
+                * Tricky point here: if our read and write pages are adjacent in the
+                * bucket chain, our write lock on wbuf will conflict with
+                * _hash_freeovflpage's attempt to update the sibling links of the
+                * removed page.  However, in that case we are done anyway, so we can
+                * simply drop the write lock before calling _hash_freeovflpage.
                 */
-               woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage));
-               if (PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED)
-                       == InvalidOffsetNumber)
-                       elog(ERROR, "failed to add index item to \"%s\"",
-                                RelationGetRelationName(rel));
-
-               /*
-                * delete the tuple from the "read" page. PageIndexTupleDelete
-                * repacks the ItemId array, so 'roffnum' will be "advanced" to
-                * the "next" ItemId.
-                */
-               PageIndexTupleDelete(rpage, roffnum);
-               _hash_wrtnorelbuf(rbuf);
-
-               /*
-                * if the "read" page is now empty because of the deletion, free
-                * it.
-                */
-               if (PageIsEmpty(rpage) && (ropaque->hasho_flag & LH_OVERFLOW_PAGE))
+               if (PageIsEmpty(rpage))
                {
                        rblkno = ropaque->hasho_prevblkno;
                        Assert(BlockNumberIsValid(rblkno));
 
-                       /* free this overflow page */
-                       _hash_freeovflpage(rel, rbuf);
-
+                       /* are we freeing the page adjacent to wbuf? */
                        if (rblkno == wblkno)
                        {
-                               /* rbuf is already released */
+                               /* yes, so release wbuf lock first */
                                _hash_wrtbuf(rel, wbuf);
+                               /* free this overflow page (releases rbuf) */
+                               _hash_freeovflpage(rel, rbuf);
+                               /* done */
                                return;
                        }
 
+                       /* free this overflow page, then get the previous one */
+                       _hash_freeovflpage(rel, rbuf);
+
                        rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE);
                        rpage = BufferGetPage(rbuf);
                        _hash_checkpage(rel, rpage, LH_OVERFLOW_PAGE);
-                       Assert(!PageIsEmpty(rpage));
                        ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
                        Assert(ropaque->hasho_bucket == bucket);
 
                        roffnum = FirstOffsetNumber;
                }
        }
+
+       /* NOTREACHED */
 }
index 1c16df33cd350b3ac0b59967f11a75b3a5cfd710..5b9d19acf1b9eadee8e19b81955c83f1cd3eaeb4 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.41 2003/09/02 18:13:31 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.42 2003/09/04 22:06:27 tgl Exp $
  *
  * NOTES
  *       Postgres hash pages look like ordinary relation pages.  The opaque
  *
  *-------------------------------------------------------------------------
  */
-
 #include "postgres.h"
 
 #include "access/genam.h"
 #include "access/hash.h"
-#include "miscadmin.h"
 #include "storage/lmgr.h"
+#include "utils/lsyscache.h"
+
+
+static void _hash_splitbucket(Relation rel, Buffer metabuf,
+                                                         Bucket obucket, Bucket nbucket,
+                                                         BlockNumber start_oblkno,
+                                                         BlockNumber start_nblkno,
+                                                         uint32 maxbucket,
+                                                         uint32 highmask, uint32 lowmask);
+
+
+/*
+ * We use high-concurrency locking on hash indexes (see README for an overview
+ * of the locking rules).  There are two cases in which we don't do locking.
+ * One is when the index is newly created in the current transaction.  Since
+ * the creating transaction has not committed, no one else can see the index,
+ * and there's no reason to take locks.  The second case is for temp
+ * relations, which no one else can see either.  (We still take buffer-level
+ * locks, but not lmgr locks.)
+ */
+#define USELOCKING(rel)                (!((rel)->rd_isnew || (rel)->rd_istemp))
 
 
 /*
- *     We use high-concurrency locking on hash indices.  There are two cases in
- *     which we don't do locking.  One is when we're building the index.
- *     Since the creating transaction has not committed, no one can see
- *     the index, and there's no reason to share locks.  The second case
- *     is when we're just starting up the database system.  We use some
- *     special-purpose initialization code in the relation cache manager
- *     (see utils/cache/relcache.c) to allow us to do indexed scans on
- *     the system catalogs before we'd normally be able to.  This happens
- *     before the lock table is fully initialized, so we can't use it.
- *     Strictly speaking, this violates 2pl, but we don't do 2pl on the
- *     system catalogs anyway.
+ * _hash_getlock() -- Acquire an lmgr lock.
  *
- *     Note that our page locks are actual lockmanager locks, not buffer
- *     locks (as are used by btree, for example).      This is a good idea because
- *     the algorithms are not deadlock-free, and we'd better be able to detect
- *     and recover from deadlocks.
+ * 'whichlock' should be zero to acquire the split-control lock, or the
+ * block number of a bucket's primary bucket page to acquire the per-bucket
+ * lock.  (See README for details of the use of these locks.)
  *
- *     Another important difference from btree is that a hash indexscan
- *     retains both a lock and a buffer pin on the current index page
- *     between hashgettuple() calls (btree keeps only a buffer pin).
- *     Because of this, it's safe to do item deletions with only a regular
- *     write lock on a hash page --- there cannot be an indexscan stopped on
- *     the page being deleted, other than an indexscan of our own backend,
- *     which will be taken care of by _hash_adjscans.
+ * 'access' must be HASH_SHARE or HASH_EXCLUSIVE.
  */
-#define USELOCKING             (!BuildingHash && !IsInitProcessingMode())
+void
+_hash_getlock(Relation rel, BlockNumber whichlock, int access)
+{
+       if (USELOCKING(rel))
+               LockPage(rel, whichlock, access);
+}
 
+/*
+ * _hash_try_getlock() -- Acquire an lmgr lock, but only if it's free.
+ *
+ * Same as above except we return FALSE without blocking if lock isn't free.
+ */
+bool
+_hash_try_getlock(Relation rel, BlockNumber whichlock, int access)
+{
+       if (USELOCKING(rel))
+               return ConditionalLockPage(rel, whichlock, access);
+       else
+               return true;
+}
 
-static void _hash_setpagelock(Relation rel, BlockNumber blkno, int access);
-static void _hash_unsetpagelock(Relation rel, BlockNumber blkno, int access);
-static void _hash_splitbucket(Relation rel, Buffer metabuf,
-                                                         Bucket obucket, Bucket nbucket);
+/*
+ * _hash_droplock() -- Release an lmgr lock.
+ */
+void
+_hash_droplock(Relation rel, BlockNumber whichlock, int access)
+{
+       if (USELOCKING(rel))
+               UnlockPage(rel, whichlock, access);
+}
+
+/*
+ *     _hash_getbuf() -- Get a buffer by block number for read or write.
+ *
+ *             'access' must be HASH_READ, HASH_WRITE, or HASH_NOLOCK.
+ *
+ *             When this routine returns, the appropriate lock is set on the
+ *             requested buffer and its reference count has been incremented
+ *             (ie, the buffer is "locked and pinned").
+ *
+ *             XXX P_NEW is not used because, unlike the tree structures, we
+ *             need the bucket blocks to be at certain block numbers.  we must
+ *             depend on the caller to call _hash_pageinit on the block if it
+ *             knows that this is a new block.
+ */
+Buffer
+_hash_getbuf(Relation rel, BlockNumber blkno, int access)
+{
+       Buffer          buf;
+
+       if (blkno == P_NEW)
+               elog(ERROR, "hash AM does not use P_NEW");
+
+       buf = ReadBuffer(rel, blkno);
+
+       if (access != HASH_NOLOCK)
+               LockBuffer(buf, access);
+
+       /* ref count and lock type are correct */
+       return buf;
+}
+
+/*
+ *     _hash_relbuf() -- release a locked buffer.
+ *
+ * Lock and pin (refcount) are both dropped.  Note that either read or
+ * write lock can be dropped this way, but if we modified the buffer,
+ * this is NOT the right way to release a write lock.
+ */
+void
+_hash_relbuf(Relation rel, Buffer buf)
+{
+       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+       ReleaseBuffer(buf);
+}
+
+/*
+ *     _hash_dropbuf() -- release an unlocked buffer.
+ *
+ * This is used to unpin a buffer on which we hold no lock.  It is assumed
+ * that the buffer is not dirty.
+ */
+void
+_hash_dropbuf(Relation rel, Buffer buf)
+{
+       ReleaseBuffer(buf);
+}
+
+/*
+ *     _hash_wrtbuf() -- write a hash page to disk.
+ *
+ *             This routine releases the lock held on the buffer and our refcount
+ *             for it.  It is an error to call _hash_wrtbuf() without a write lock
+ *             and a pin on the buffer.
+ *
+ * NOTE: actually, the buffer manager just marks the shared buffer page
+ * dirty here; the real I/O happens later.     This is okay since we are not
+ * relying on write ordering anyway.  The WAL mechanism is responsible for
+ * guaranteeing correctness after a crash.
+ */
+void
+_hash_wrtbuf(Relation rel, Buffer buf)
+{
+       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+       WriteBuffer(buf);
+}
+
+/*
+ *     _hash_wrtnorelbuf() -- write a hash page to disk, but do not release
+ *                                              our reference or lock.
+ *
+ *             It is an error to call _hash_wrtnorelbuf() without a write lock
+ *             and a pin on the buffer.
+ *
+ * See above NOTE.
+ */
+void
+_hash_wrtnorelbuf(Relation rel, Buffer buf)
+{
+       WriteNoReleaseBuffer(buf);
+}
+
+/*
+ * _hash_chgbufaccess() -- Change the lock type on a buffer, without
+ *                     dropping our pin on it.
+ *
+ * from_access and to_access may be HASH_READ, HASH_WRITE, or HASH_NOLOCK,
+ * the last indicating that no buffer-level lock is held or wanted.
+ *
+ * When from_access == HASH_WRITE, we assume the buffer is dirty and tell
+ * bufmgr it must be written out.  If the caller wants to release a write
+ * lock on a page that's not been modified, it's okay to pass from_access
+ * as HASH_READ (a bit ugly, but handy in some places).
+ */
+void
+_hash_chgbufaccess(Relation rel,
+                                  Buffer buf,
+                                  int from_access,
+                                  int to_access)
+{
+       if (from_access != HASH_NOLOCK)
+               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+       if (from_access == HASH_WRITE)
+               WriteNoReleaseBuffer(buf);
+
+       if (to_access != HASH_NOLOCK)
+               LockBuffer(buf, to_access);
+}
 
 
 /*
  *     _hash_metapinit() -- Initialize the metadata page of a hash index,
  *                             the two buckets that we begin with and the initial
  *                             bitmap page.
+ *
+ * We are fairly cavalier about locking here, since we know that no one else
+ * could be accessing this index.  In particular the rule about not holding
+ * multiple buffer locks is ignored.
  */
 void
 _hash_metapinit(Relation rel)
@@ -83,16 +230,31 @@ _hash_metapinit(Relation rel)
        Buffer          metabuf;
        Buffer          buf;
        Page            pg;
+       int32           data_width;
+       int32           item_width;
+       int32           ffactor;
        uint16          i;
 
-       /* can't be sharing this with anyone, now... */
-       if (USELOCKING)
-               LockRelation(rel, AccessExclusiveLock);
-
+       /* safety check */
        if (RelationGetNumberOfBlocks(rel) != 0)
                elog(ERROR, "cannot initialize non-empty hash index \"%s\"",
                         RelationGetRelationName(rel));
 
+       /*
+        * Determine the target fill factor (tuples per bucket) for this index.
+        * The idea is to make the fill factor correspond to pages about 3/4ths
+        * full.  We can compute it exactly if the index datatype is fixed-width,
+        * but for var-width there's some guessing involved.
+        */
+       data_width = get_typavgwidth(RelationGetDescr(rel)->attrs[0]->atttypid,
+                                                                RelationGetDescr(rel)->attrs[0]->atttypmod);
+       item_width = MAXALIGN(sizeof(HashItemData)) + MAXALIGN(data_width) +
+               sizeof(ItemIdData);             /* include the line pointer */
+       ffactor = (BLCKSZ * 3 / 4) / item_width;
+       /* keep to a sane range */
+       if (ffactor < 10)
+               ffactor = 10;
+
        metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
        pg = BufferGetPage(metabuf);
        _hash_pageinit(pg, BufferGetPageSize(metabuf));
@@ -110,7 +272,7 @@ _hash_metapinit(Relation rel)
        metap->hashm_version = HASH_VERSION;
        metap->hashm_ntuples = 0;
        metap->hashm_nmaps = 0;
-       metap->hashm_ffactor = DEFAULT_FFACTOR;
+       metap->hashm_ffactor = ffactor;
        metap->hashm_bsize = BufferGetPageSize(metabuf);
        /* find largest bitmap array size that will fit in page size */
        for (i = _hash_log2(metap->hashm_bsize); i > 0; --i)
@@ -142,7 +304,7 @@ _hash_metapinit(Relation rel)
        metap->hashm_firstfree = 0;
 
        /*
-        * initialize the first two buckets
+        * Initialize the first two buckets
         */
        for (i = 0; i <= 1; i++)
        {
@@ -159,135 +321,17 @@ _hash_metapinit(Relation rel)
        }
 
        /*
-        * Initialize bitmap page.  Can't do this until we
+        * Initialize first bitmap page.  Can't do this until we
         * create the first two buckets, else smgr will complain.
         */
        _hash_initbitmap(rel, metap, 3);
 
        /* all done */
        _hash_wrtbuf(rel, metabuf);
-
-       if (USELOCKING)
-               UnlockRelation(rel, AccessExclusiveLock);
 }
 
 /*
- *     _hash_getbuf() -- Get a buffer by block number for read or write.
- *
- *             When this routine returns, the appropriate lock is set on the
- *             requested buffer its reference count is correct.
- *
- *             XXX P_NEW is not used because, unlike the tree structures, we
- *             need the bucket blocks to be at certain block numbers.  we must
- *             depend on the caller to call _hash_pageinit on the block if it
- *             knows that this is a new block.
- */
-Buffer
-_hash_getbuf(Relation rel, BlockNumber blkno, int access)
-{
-       Buffer          buf;
-
-       if (blkno == P_NEW)
-               elog(ERROR, "hash AM does not use P_NEW");
-       switch (access)
-       {
-               case HASH_WRITE:
-               case HASH_READ:
-                       _hash_setpagelock(rel, blkno, access);
-                       break;
-               default:
-                       elog(ERROR, "unrecognized hash access code: %d", access);
-                       break;
-       }
-       buf = ReadBuffer(rel, blkno);
-
-       /* ref count and lock type are correct */
-       return buf;
-}
-
-/*
- *     _hash_relbuf() -- release a locked buffer.
- */
-void
-_hash_relbuf(Relation rel, Buffer buf, int access)
-{
-       BlockNumber blkno;
-
-       blkno = BufferGetBlockNumber(buf);
-
-       switch (access)
-       {
-               case HASH_WRITE:
-               case HASH_READ:
-                       _hash_unsetpagelock(rel, blkno, access);
-                       break;
-               default:
-                       elog(ERROR, "unrecognized hash access code: %d", access);
-                       break;
-       }
-
-       ReleaseBuffer(buf);
-}
-
-/*
- *     _hash_wrtbuf() -- write a hash page to disk.
- *
- *             This routine releases the lock held on the buffer and our reference
- *             to it.  It is an error to call _hash_wrtbuf() without a write lock
- *             or a reference to the buffer.
- */
-void
-_hash_wrtbuf(Relation rel, Buffer buf)
-{
-       BlockNumber blkno;
-
-       blkno = BufferGetBlockNumber(buf);
-       WriteBuffer(buf);
-       _hash_unsetpagelock(rel, blkno, HASH_WRITE);
-}
-
-/*
- *     _hash_wrtnorelbuf() -- write a hash page to disk, but do not release
- *                                              our reference or lock.
- *
- *             It is an error to call _hash_wrtnorelbuf() without a write lock
- *             or a reference to the buffer.
- */
-void
-_hash_wrtnorelbuf(Buffer buf)
-{
-       BlockNumber blkno;
-
-       blkno = BufferGetBlockNumber(buf);
-       WriteNoReleaseBuffer(buf);
-}
-
-/*
- * _hash_chgbufaccess() -- Change from read to write access or vice versa.
- *
- * When changing from write to read, we assume the buffer is dirty and tell
- * bufmgr it must be written out.
- */
-void
-_hash_chgbufaccess(Relation rel,
-                                  Buffer buf,
-                                  int from_access,
-                                  int to_access)
-{
-       BlockNumber blkno;
-
-       blkno = BufferGetBlockNumber(buf);
-
-       if (from_access == HASH_WRITE)
-               _hash_wrtnorelbuf(buf);
-
-       _hash_unsetpagelock(rel, blkno, from_access);
-
-       _hash_setpagelock(rel, blkno, to_access);
-}
-
-/*
- *     _hash_pageinit() -- Initialize a new page.
+ *     _hash_pageinit() -- Initialize a new hash index page.
  */
 void
 _hash_pageinit(Page page, Size size)
@@ -297,57 +341,14 @@ _hash_pageinit(Page page, Size size)
 }
 
 /*
- *  _hash_setpagelock() -- Acquire the requested type of lock on a page.
- */
-static void
-_hash_setpagelock(Relation rel,
-                                 BlockNumber blkno,
-                                 int access)
-{
-       if (USELOCKING)
-       {
-               switch (access)
-               {
-                       case HASH_WRITE:
-                               LockPage(rel, blkno, ExclusiveLock);
-                               break;
-                       case HASH_READ:
-                               LockPage(rel, blkno, ShareLock);
-                               break;
-                       default:
-                               elog(ERROR, "unrecognized hash access code: %d", access);
-                               break;
-               }
-       }
-}
-
-/*
- *  _hash_unsetpagelock() -- Release the specified type of lock on a page.
- */
-static void
-_hash_unsetpagelock(Relation rel,
-                                       BlockNumber blkno,
-                                       int access)
-{
-       if (USELOCKING)
-       {
-               switch (access)
-               {
-                       case HASH_WRITE:
-                               UnlockPage(rel, blkno, ExclusiveLock);
-                               break;
-                       case HASH_READ:
-                               UnlockPage(rel, blkno, ShareLock);
-                               break;
-                       default:
-                               elog(ERROR, "unrecognized hash access code: %d", access);
-                               break;
-               }
-       }
-}
-
-/*
- * Expand the hash table by creating one new bucket.
+ * Attempt to expand the hash table by creating one new bucket.
+ *
+ * This will silently do nothing if it cannot get the needed locks.
+ *
+ * The caller should hold no locks on the hash index.
+ *
+ * The caller must hold a pin, but no lock, on the metapage buffer.
+ * The buffer is returned in the same state.
  */
 void
 _hash_expandtable(Relation rel, Buffer metabuf)
@@ -356,15 +357,72 @@ _hash_expandtable(Relation rel, Buffer metabuf)
        Bucket          old_bucket;
        Bucket          new_bucket;
        uint32          spare_ndx;
+       BlockNumber start_oblkno;
+       BlockNumber start_nblkno;
+       uint32          maxbucket;
+       uint32          highmask;
+       uint32          lowmask;
+
+       /*
+        * Obtain the page-zero lock to assert the right to begin a split
+        * (see README).
+        *
+        * Note: deadlock should be impossible here. Our own backend could only
+        * be holding bucket sharelocks due to stopped indexscans; those will not
+        * block other holders of the page-zero lock, who are only interested in
+        * acquiring bucket sharelocks themselves.  Exclusive bucket locks are
+        * only taken here and in hashbulkdelete, and neither of these operations
+        * needs any additional locks to complete.  (If, due to some flaw in this
+        * reasoning, we manage to deadlock anyway, it's okay to error out; the
+        * index will be left in a consistent state.)
+        */
+       _hash_getlock(rel, 0, HASH_EXCLUSIVE);
+
+       /* Write-lock the meta page */
+       _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
 
        metap = (HashMetaPage) BufferGetPage(metabuf);
        _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
 
-       _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE);
+       /*
+        * Check to see if split is still needed; someone else might have already
+        * done one while we waited for the lock.
+        *
+        * Make sure this stays in sync with_hash_doinsert()
+        */
+       if (metap->hashm_ntuples <=
+               (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1))
+               goto fail;
 
-       new_bucket = ++metap->hashm_maxbucket;
+       /*
+        * Determine which bucket is to be split, and attempt to lock the old
+        * bucket.  If we can't get the lock, give up.
+        *
+        * The lock protects us against other backends, but not against our own
+        * backend.  Must check for active scans separately.
+        *
+        * Ideally we would lock the new bucket too before proceeding, but if
+        * we are about to cross a splitpoint then the BUCKET_TO_BLKNO mapping
+        * isn't correct yet.  For simplicity we update the metapage first and
+        * then lock.  This should be okay because no one else should be trying
+        * to lock the new bucket yet...
+        */
+       new_bucket = metap->hashm_maxbucket + 1;
        old_bucket = (new_bucket & metap->hashm_lowmask);
 
+       start_oblkno = BUCKET_TO_BLKNO(metap, old_bucket);
+
+       if (_hash_has_active_scan(rel, old_bucket))
+               goto fail;
+
+       if (!_hash_try_getlock(rel, start_oblkno, HASH_EXCLUSIVE))
+               goto fail;
+
+       /*
+        * Okay to proceed with split.  Update the metapage bucket mapping info.
+        */
+       metap->hashm_maxbucket = new_bucket;
+
        if (new_bucket > metap->hashm_highmask)
        {
                /* Starting a new doubling */
@@ -379,7 +437,7 @@ _hash_expandtable(Relation rel, Buffer metabuf)
         * this new batch of bucket pages.
         *
         * XXX should initialize new bucket pages to prevent out-of-order
-        * page creation.
+        * page creation?  Don't wanna do it right here though.
         */
        spare_ndx = _hash_log2(metap->hashm_maxbucket + 1);
        if (spare_ndx > metap->hashm_ovflpoint)
@@ -389,10 +447,50 @@ _hash_expandtable(Relation rel, Buffer metabuf)
                metap->hashm_ovflpoint = spare_ndx;
        }
 
-       _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_READ);
+       /* now we can compute the new bucket's primary block number */
+       start_nblkno = BUCKET_TO_BLKNO(metap, new_bucket);
+
+       Assert(!_hash_has_active_scan(rel, new_bucket));
+
+       if (!_hash_try_getlock(rel, start_nblkno, HASH_EXCLUSIVE))
+               elog(PANIC, "could not get lock on supposedly new bucket");
+
+       /*
+        * Copy bucket mapping info now; this saves re-accessing the meta page
+        * inside _hash_splitbucket's inner loop.  Note that once we drop the
+        * split lock, other splits could begin, so these values might be out of
+        * date before _hash_splitbucket finishes.  That's okay, since all it
+        * needs is to tell which of these two buckets to map hashkeys into.
+        */
+       maxbucket = metap->hashm_maxbucket;
+       highmask = metap->hashm_highmask;
+       lowmask = metap->hashm_lowmask;
+
+       /* Write out the metapage and drop lock, but keep pin */
+       _hash_chgbufaccess(rel, metabuf, HASH_WRITE, HASH_NOLOCK);
+
+       /* Release split lock; okay for other splits to occur now */
+       _hash_droplock(rel, 0, HASH_EXCLUSIVE);
 
        /* Relocate records to the new bucket */
-       _hash_splitbucket(rel, metabuf, old_bucket, new_bucket);
+       _hash_splitbucket(rel, metabuf, old_bucket, new_bucket,
+                                         start_oblkno, start_nblkno,
+                                         maxbucket, highmask, lowmask);
+
+       /* Release bucket locks, allowing others to access them */
+       _hash_droplock(rel, start_oblkno, HASH_EXCLUSIVE);
+       _hash_droplock(rel, start_nblkno, HASH_EXCLUSIVE);
+
+       return;
+
+       /* Here if decide not to split or fail to acquire old bucket lock */
+fail:
+
+       /* We didn't write the metapage, so just drop lock */
+       _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+
+       /* Release split lock */
+       _hash_droplock(rel, 0, HASH_EXCLUSIVE);
 }
 
 
@@ -403,27 +501,35 @@ _hash_expandtable(Relation rel, Buffer metabuf)
  * or more overflow (bucket chain) pages.  We must relocate tuples that
  * belong in the new bucket, and compress out any free space in the old
  * bucket.
+ *
+ * The caller must hold exclusive locks on both buckets to ensure that
+ * no one else is trying to access them (see README).
+ *
+ * The caller must hold a pin, but no lock, on the metapage buffer.
+ * The buffer is returned in the same state.  (The metapage is only
+ * touched if it becomes necessary to add or remove overflow pages.)
  */
 static void
 _hash_splitbucket(Relation rel,
                                  Buffer metabuf,
                                  Bucket obucket,
-                                 Bucket nbucket)
+                                 Bucket nbucket,
+                                 BlockNumber start_oblkno,
+                                 BlockNumber start_nblkno,
+                                 uint32 maxbucket,
+                                 uint32 highmask,
+                                 uint32 lowmask)
 {
        Bucket          bucket;
        Buffer          obuf;
        Buffer          nbuf;
-       Buffer          ovflbuf;
        BlockNumber oblkno;
        BlockNumber nblkno;
-       BlockNumber start_oblkno;
-       BlockNumber start_nblkno;
        bool            null;
        Datum           datum;
        HashItem        hitem;
        HashPageOpaque oopaque;
        HashPageOpaque nopaque;
-       HashMetaPage metap;
        IndexTuple      itup;
        Size            itemsz;
        OffsetNumber ooffnum;
@@ -433,12 +539,11 @@ _hash_splitbucket(Relation rel,
        Page            npage;
        TupleDesc       itupdesc = RelationGetDescr(rel);
 
-       metap = (HashMetaPage) BufferGetPage(metabuf);
-       _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
-
-       /* get the buffers & pages */
-       start_oblkno = BUCKET_TO_BLKNO(metap, obucket);
-       start_nblkno = BUCKET_TO_BLKNO(metap, nbucket);
+       /*
+        * It should be okay to simultaneously write-lock pages from each
+        * bucket, since no one else can be trying to acquire buffer lock
+        * on pages of either bucket.
+        */
        oblkno = start_oblkno;
        nblkno = start_nblkno;
        obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
@@ -446,7 +551,10 @@ _hash_splitbucket(Relation rel,
        opage = BufferGetPage(obuf);
        npage = BufferGetPage(nbuf);
 
-       /* initialize the new bucket page */
+       _hash_checkpage(rel, opage, LH_BUCKET_PAGE);
+       oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+
+       /* initialize the new bucket's primary page */
        _hash_pageinit(npage, BufferGetPageSize(nbuf));
        nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
        nopaque->hasho_prevblkno = InvalidBlockNumber;
@@ -454,44 +562,11 @@ _hash_splitbucket(Relation rel,
        nopaque->hasho_bucket = nbucket;
        nopaque->hasho_flag = LH_BUCKET_PAGE;
        nopaque->hasho_filler = HASHO_FILL;
-       _hash_wrtnorelbuf(nbuf);
-
-       /*
-        * make sure the old bucket isn't empty.  advance 'opage' and friends
-        * through the overflow bucket chain until we find a non-empty page.
-        *
-        * XXX we should only need this once, if we are careful to preserve the
-        * invariant that overflow pages are never empty.
-        */
-       _hash_checkpage(rel, opage, LH_BUCKET_PAGE);
-       oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
-       if (PageIsEmpty(opage))
-       {
-               oblkno = oopaque->hasho_nextblkno;
-               _hash_relbuf(rel, obuf, HASH_WRITE);
-               if (!BlockNumberIsValid(oblkno))
-               {
-                       /*
-                        * the old bucket is completely empty; of course, the new
-                        * bucket will be as well, but since it's a base bucket page
-                        * we don't care.
-                        */
-                       _hash_relbuf(rel, nbuf, HASH_WRITE);
-                       return;
-               }
-               obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
-               opage = BufferGetPage(obuf);
-               _hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
-               if (PageIsEmpty(opage))
-                       elog(ERROR, "empty hash overflow page %u", oblkno);
-               oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
-       }
 
        /*
-        * we are now guaranteed that 'opage' is not empty.  partition the
-        * tuples in the old bucket between the old bucket and the new bucket,
-        * advancing along their respective overflow bucket chains and adding
-        * overflow pages as needed.
+        * Partition the tuples in the old bucket between the old bucket and the
+        * new bucket, advancing along the old bucket's overflow bucket chain
+        * and adding overflow pages to the new bucket as needed.
         */
        ooffnum = FirstOffsetNumber;
        omaxoffnum = PageGetMaxOffsetNumber(opage);
@@ -505,48 +580,39 @@ _hash_splitbucket(Relation rel,
                /* check if we're at the end of the page */
                if (ooffnum > omaxoffnum)
                {
-                       /* at end of page, but check for overflow page */
+                       /* at end of page, but check for an(other) overflow page */
                        oblkno = oopaque->hasho_nextblkno;
-                       if (BlockNumberIsValid(oblkno))
-                       {
-                               /*
-                                * we ran out of tuples on this particular page, but we
-                                * have more overflow pages; re-init values.
-                                */
-                               _hash_wrtbuf(rel, obuf);
-                               obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
-                               opage = BufferGetPage(obuf);
-                               _hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
-                               oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
-                               /* we're guaranteed that an ovfl page has at least 1 tuple */
-                               if (PageIsEmpty(opage))
-                                       elog(ERROR, "empty hash overflow page %u", oblkno);
-                               ooffnum = FirstOffsetNumber;
-                               omaxoffnum = PageGetMaxOffsetNumber(opage);
-                       }
-                       else
-                       {
-                               /*
-                                * We're at the end of the bucket chain, so now we're
-                                * really done with everything.  Before quitting, call
-                                * _hash_squeezebucket to ensure the tuples remaining in the
-                                * old bucket (including the overflow pages) are packed as
-                                * tightly as possible.  The new bucket is already tight.
-                                */
-                               _hash_wrtbuf(rel, obuf);
-                               _hash_wrtbuf(rel, nbuf);
-                               _hash_squeezebucket(rel, obucket, start_oblkno);
-                               return;
-                       }
+                       if (!BlockNumberIsValid(oblkno))
+                               break;
+                       /*
+                        * we ran out of tuples on this particular page, but we
+                        * have more overflow pages; advance to next page.
+                        */
+                       _hash_wrtbuf(rel, obuf);
+
+                       obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
+                       opage = BufferGetPage(obuf);
+                       _hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
+                       oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+                       ooffnum = FirstOffsetNumber;
+                       omaxoffnum = PageGetMaxOffsetNumber(opage);
+                       continue;
                }
 
-               /* hash on the tuple */
+               /*
+                * Re-hash the tuple to determine which bucket it now belongs in.
+                *
+                * It is annoying to call the hash function while holding locks,
+                * but releasing and relocking the page for each tuple is unappealing
+                * too.
+                */
                hitem = (HashItem) PageGetItem(opage, PageGetItemId(opage, ooffnum));
                itup = &(hitem->hash_itup);
                datum = index_getattr(itup, 1, itupdesc, &null);
                Assert(!null);
 
-               bucket = _hash_call(rel, metap, datum);
+               bucket = _hash_hashkey2bucket(_hash_datum2hashkey(rel, datum),
+                                                                         maxbucket, highmask, lowmask);
 
                if (bucket == nbucket)
                {
@@ -562,11 +628,13 @@ _hash_splitbucket(Relation rel,
 
                        if (PageGetFreeSpace(npage) < itemsz)
                        {
-                               ovflbuf = _hash_addovflpage(rel, metabuf, nbuf);
-                               _hash_wrtbuf(rel, nbuf);
-                               nbuf = ovflbuf;
+                               /* write out nbuf and drop lock, but keep pin */
+                               _hash_chgbufaccess(rel, nbuf, HASH_WRITE, HASH_NOLOCK);
+                               /* chain to a new overflow page */
+                               nbuf = _hash_addovflpage(rel, metabuf, nbuf);
                                npage = BufferGetPage(nbuf);
-                               _hash_checkpage(rel, npage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+                               _hash_checkpage(rel, npage, LH_OVERFLOW_PAGE);
+                               /* we don't need nopaque within the loop */
                        }
 
                        noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage));
@@ -574,7 +642,6 @@ _hash_splitbucket(Relation rel,
                                == InvalidOffsetNumber)
                                elog(ERROR, "failed to add index item to \"%s\"",
                                         RelationGetRelationName(rel));
-                       _hash_wrtnorelbuf(nbuf);
 
                        /*
                         * now delete the tuple from the old bucket.  after this
@@ -586,40 +653,7 @@ _hash_splitbucket(Relation rel,
                         * instead of calling PageGetMaxOffsetNumber.
                         */
                        PageIndexTupleDelete(opage, ooffnum);
-                       _hash_wrtnorelbuf(obuf);
                        omaxoffnum = OffsetNumberPrev(omaxoffnum);
-
-                       /*
-                        * tidy up.  if the old page was an overflow page and it is
-                        * now empty, we must free it (we want to preserve the
-                        * invariant that overflow pages cannot be empty).
-                        */
-                       if (PageIsEmpty(opage) &&
-                               (oopaque->hasho_flag & LH_OVERFLOW_PAGE))
-                       {
-                               oblkno = _hash_freeovflpage(rel, obuf);
-
-                               /* check that we're not through the bucket chain */
-                               if (!BlockNumberIsValid(oblkno))
-                               {
-                                       _hash_wrtbuf(rel, nbuf);
-                                       _hash_squeezebucket(rel, obucket, start_oblkno);
-                                       return;
-                               }
-
-                               /*
-                                * re-init. again, we're guaranteed that an ovfl page has
-                                * at least one tuple.
-                                */
-                               obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
-                               opage = BufferGetPage(obuf);
-                               _hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
-                               oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
-                               if (PageIsEmpty(opage))
-                                       elog(ERROR, "empty hash overflow page %u", oblkno);
-                               ooffnum = FirstOffsetNumber;
-                               omaxoffnum = PageGetMaxOffsetNumber(opage);
-                       }
                }
                else
                {
@@ -632,5 +666,15 @@ _hash_splitbucket(Relation rel,
                        ooffnum = OffsetNumberNext(ooffnum);
                }
        }
-       /* NOTREACHED */
+
+       /*
+        * We're at the end of the old bucket chain, so we're done partitioning
+        * the tuples.  Before quitting, call _hash_squeezebucket to ensure the
+        * tuples remaining in the old bucket (including the overflow pages) are
+        * packed as tightly as possible.  The new bucket is already tight.
+        */
+       _hash_wrtbuf(rel, obuf);
+       _hash_wrtbuf(rel, nbuf);
+
+       _hash_squeezebucket(rel, obucket, start_oblkno);
 }
index a0b124cbee40d24ea6ae0c974dd7f9e8087b2096..35ac0622b5051ecaf94ec36b88fc85bef4deadf4 100644 (file)
@@ -8,22 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashscan.c,v 1.30 2003/08/04 02:39:57 momjian Exp $
- *
- * NOTES
- *       Because we can be doing an index scan on a relation while we
- *       update it, we need to avoid missing data that moves around in
- *       the index.  The routines and global variables in this file
- *       guarantee that all scans in the local address space stay
- *       correctly positioned.  This is all we need to worry about, since
- *       write locking guarantees that no one else will be on the same
- *       page at the same time as we are.
- *
- *       The scheme is to manage a list of active scans in the current
- *       backend.      Whenever we add or remove records from an index, we
- *       check the list of active scans to see if any has been affected.
- *       A scan is affected only if it is on the same relation, and the
- *       same page, as the update.
+ *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashscan.c,v 1.31 2003/09/04 22:06:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -44,10 +29,6 @@ typedef HashScanListData *HashScanList;
 static HashScanList HashScans = (HashScanList) NULL;
 
 
-static void _hash_scandel(IndexScanDesc scan,
-                         BlockNumber blkno, OffsetNumber offno);
-
-
 /*
  * AtEOXact_hash() --- clean up hash subsystem at xact abort or commit.
  *
@@ -67,9 +48,6 @@ AtEOXact_hash(void)
         * at end of transaction anyway.
         */
        HashScans = NULL;
-
-       /* If we were building a hash, we ain't anymore. */
-       BuildingHash = false;
 }
 
 /*
@@ -112,70 +90,26 @@ _hash_dropscan(IndexScanDesc scan)
        pfree(chk);
 }
 
-void
-_hash_adjscans(Relation rel, ItemPointer tid)
+/*
+ * Is there an active scan in this bucket?
+ */
+bool
+_hash_has_active_scan(Relation rel, Bucket bucket)
 {
+       Oid                     relid = RelationGetRelid(rel);
        HashScanList l;
-       Oid                     relid;
 
-       relid = RelationGetRelid(rel);
-       for (l = HashScans; l != (HashScanList) NULL; l = l->hashsl_next)
+       for (l = HashScans; l != NULL; l = l->hashsl_next)
        {
                if (relid == l->hashsl_scan->indexRelation->rd_id)
-                       _hash_scandel(l->hashsl_scan, ItemPointerGetBlockNumber(tid),
-                                                 ItemPointerGetOffsetNumber(tid));
-       }
-}
+               {
+                       HashScanOpaque so = (HashScanOpaque) l->hashsl_scan->opaque;
 
-static void
-_hash_scandel(IndexScanDesc scan, BlockNumber blkno, OffsetNumber offno)
-{
-       ItemPointer current;
-       ItemPointer mark;
-       Buffer          buf;
-       Buffer          metabuf;
-       HashScanOpaque so;
-
-       so = (HashScanOpaque) scan->opaque;
-       current = &(scan->currentItemData);
-       mark = &(scan->currentMarkData);
-
-       if (ItemPointerIsValid(current)
-               && ItemPointerGetBlockNumber(current) == blkno
-               && ItemPointerGetOffsetNumber(current) >= offno)
-       {
-               metabuf = _hash_getbuf(scan->indexRelation, HASH_METAPAGE, HASH_READ);
-               buf = so->hashso_curbuf;
-               _hash_step(scan, &buf, BackwardScanDirection, metabuf);
+                       if (so->hashso_bucket_valid &&
+                               so->hashso_bucket == bucket)
+                               return true;
+               }
        }
 
-       if (ItemPointerIsValid(mark)
-               && ItemPointerGetBlockNumber(mark) == blkno
-               && ItemPointerGetOffsetNumber(mark) >= offno)
-       {
-               /*
-                * The idea here is to exchange the current and mark positions,
-                * then step backwards (affecting current), then exchange again.
-                */
-               ItemPointerData tmpitem;
-               Buffer          tmpbuf;
-
-               tmpitem = *mark;
-               *mark = *current;
-               *current = tmpitem;
-               tmpbuf = so->hashso_mrkbuf;
-               so->hashso_mrkbuf = so->hashso_curbuf;
-               so->hashso_curbuf = tmpbuf;
-
-               metabuf = _hash_getbuf(scan->indexRelation, HASH_METAPAGE, HASH_READ);
-               buf = so->hashso_curbuf;
-               _hash_step(scan, &buf, BackwardScanDirection, metabuf);
-
-               tmpitem = *mark;
-               *mark = *current;
-               *current = tmpitem;
-               tmpbuf = so->hashso_mrkbuf;
-               so->hashso_mrkbuf = so->hashso_curbuf;
-               so->hashso_curbuf = tmpbuf;
-       }
+       return false;
 }
index c5321e4b6b479c59cca7fefa0a90370f60028966..d8982ffdbc9a1f8f92e82cf4dc377ef17f026045 100644 (file)
@@ -8,55 +8,16 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashsearch.c,v 1.33 2003/09/02 18:13:31 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashsearch.c,v 1.34 2003/09/04 22:06:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
-
 #include "postgres.h"
 
 #include "access/hash.h"
+#include "storage/lmgr.h"
 
 
-/*
- *     _hash_search() -- Find the bucket that contains the scankey
- *             and fetch its primary bucket page into *bufP.
- *
- * the buffer has a read lock.
- */
-void
-_hash_search(Relation rel,
-                        int keysz,
-                        ScanKey scankey,
-                        Buffer *bufP,
-                        HashMetaPage metap)
-{
-       BlockNumber blkno;
-       Bucket          bucket;
-
-       if (scankey == NULL ||
-               (scankey[0].sk_flags & SK_ISNULL))
-       {
-               /*
-                * If the scankey is empty, all tuples will satisfy the
-                * scan so we start the scan at the first bucket (bucket 0).
-                *
-                * If the scankey is NULL, no tuples will satisfy the search;
-                * this should have been checked already, but arbitrarily return
-                * bucket zero.
-                */
-               bucket = 0;
-       }
-       else
-       {
-               bucket = _hash_call(rel, metap, scankey[0].sk_argument);
-       }
-
-       blkno = BUCKET_TO_BLKNO(metap, bucket);
-
-       *bufP = _hash_getbuf(rel, blkno, HASH_READ);
-}
-
 /*
  *     _hash_next() -- Get the next item in a scan.
  *
@@ -69,31 +30,23 @@ _hash_search(Relation rel,
 bool
 _hash_next(IndexScanDesc scan, ScanDirection dir)
 {
-       Relation        rel;
+       Relation        rel = scan->indexRelation;
+       HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Buffer          buf;
-       Buffer          metabuf;
        Page            page;
        OffsetNumber offnum;
        ItemPointer current;
        HashItem        hitem;
        IndexTuple      itup;
-       HashScanOpaque so;
-
-       rel = scan->indexRelation;
-       so = (HashScanOpaque) scan->opaque;
 
-       /* we still have the buffer pinned and locked */
+       /* we still have the buffer pinned and read-locked */
        buf = so->hashso_curbuf;
        Assert(BufferIsValid(buf));
 
-       metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
-
        /*
-        * step to next valid tuple.  note that _hash_step releases our lock
-        * on 'metabuf'; if we switch to a new 'buf' while looking for the
-        * next tuple, we come back with a lock on that buffer.
+        * step to next valid tuple.
         */
-       if (!_hash_step(scan, &buf, dir, metabuf))
+       if (!_hash_step(scan, &buf, dir))
                return false;
 
        /* if we're here, _hash_step found a valid tuple */
@@ -108,6 +61,9 @@ _hash_next(IndexScanDesc scan, ScanDirection dir)
        return true;
 }
 
+/*
+ * Advance to next page in a bucket, if any.
+ */
 static void
 _hash_readnext(Relation rel,
                           Buffer *bufp, Page *pagep, HashPageOpaque *opaquep)
@@ -115,7 +71,7 @@ _hash_readnext(Relation rel,
        BlockNumber blkno;
 
        blkno = (*opaquep)->hasho_nextblkno;
-       _hash_relbuf(rel, *bufp, HASH_READ);
+       _hash_relbuf(rel, *bufp);
        *bufp = InvalidBuffer;
        if (BlockNumberIsValid(blkno))
        {
@@ -123,10 +79,12 @@ _hash_readnext(Relation rel,
                *pagep = BufferGetPage(*bufp);
                _hash_checkpage(rel, *pagep, LH_OVERFLOW_PAGE);
                *opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep);
-               Assert(!PageIsEmpty(*pagep));
        }
 }
 
+/*
+ * Advance to previous page in a bucket, if any.
+ */
 static void
 _hash_readprev(Relation rel,
                           Buffer *bufp, Page *pagep, HashPageOpaque *opaquep)
@@ -134,7 +92,7 @@ _hash_readprev(Relation rel,
        BlockNumber blkno;
 
        blkno = (*opaquep)->hasho_prevblkno;
-       _hash_relbuf(rel, *bufp, HASH_READ);
+       _hash_relbuf(rel, *bufp);
        *bufp = InvalidBuffer;
        if (BlockNumberIsValid(blkno))
        {
@@ -142,28 +100,26 @@ _hash_readprev(Relation rel,
                *pagep = BufferGetPage(*bufp);
                _hash_checkpage(rel, *pagep, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
                *opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep);
-               if (PageIsEmpty(*pagep))
-               {
-                       Assert((*opaquep)->hasho_flag & LH_BUCKET_PAGE);
-                       _hash_relbuf(rel, *bufp, HASH_READ);
-                       *bufp = InvalidBuffer;
-               }
        }
 }
 
 /*
  *     _hash_first() -- Find the first item in a scan.
  *
- *             Find the first item in the tree that
+ *             Find the first item in the index that
  *             satisfies the qualification associated with the scan descriptor. On
- *             exit, the page containing the current index tuple is read locked
+ *             success, the page containing the current index tuple is read locked
  *             and pinned, and the scan's opaque data entry is updated to
  *             include the buffer.
  */
 bool
 _hash_first(IndexScanDesc scan, ScanDirection dir)
 {
-       Relation        rel;
+       Relation        rel = scan->indexRelation;
+       HashScanOpaque so = (HashScanOpaque) scan->opaque;
+       uint32          hashkey;
+       Bucket          bucket;
+       BlockNumber blkno;
        Buffer          buf;
        Buffer          metabuf;
        Page            page;
@@ -173,70 +129,89 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
        IndexTuple      itup;
        ItemPointer current;
        OffsetNumber offnum;
-       HashScanOpaque so;
 
-       rel = scan->indexRelation;
-       so = (HashScanOpaque) scan->opaque;
        current = &(scan->currentItemData);
+       ItemPointerSetInvalid(current);
+
+       /*
+        * We do not support hash scans with no index qualification, because
+        * we would have to read the whole index rather than just one bucket.
+        * That creates a whole raft of problems, since we haven't got a
+        * practical way to lock all the buckets against splits or compactions.
+        */
+       if (scan->numberOfKeys < 1)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("hash indexes do not support whole-index scans")));
+
+       /*
+        * If the constant in the index qual is NULL, assume it cannot match
+        * any items in the index.
+        */
+       if (scan->keyData[0].sk_flags & SK_ISNULL)
+               return false;
+
+       /*
+        * Okay to compute the hash key.  We want to do this before acquiring
+        * any locks, in case a user-defined hash function happens to be slow.
+        */
+       hashkey = _hash_datum2hashkey(rel, scan->keyData[0].sk_argument);
 
+       /*
+        * Acquire shared split lock so we can compute the target bucket
+        * safely (see README).
+        */
+       _hash_getlock(rel, 0, HASH_SHARE);
+
+       /* Read the metapage */
        metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
        metap = (HashMetaPage) BufferGetPage(metabuf);
        _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
 
        /*
-        * XXX -- The attribute number stored in the scan key is the attno in
-        * the heap relation.  We need to transmogrify this into the index
-        * relation attno here.  For the moment, we have hardwired attno == 1.
+        * Compute the target bucket number, and convert to block number.
         */
+       bucket = _hash_hashkey2bucket(hashkey,
+                                                                 metap->hashm_maxbucket,
+                                                                 metap->hashm_highmask,
+                                                                 metap->hashm_lowmask);
+
+       blkno = BUCKET_TO_BLKNO(metap, bucket);
 
-       /* find the correct bucket page and load it into buf */
-       _hash_search(rel, 1, scan->keyData, &buf, metap);
+       /* done with the metapage */
+       _hash_relbuf(rel, metabuf);
+
+       /*
+        * Acquire share lock on target bucket; then we can release split lock.
+        */
+       _hash_getlock(rel, blkno, HASH_SHARE);
+
+       _hash_droplock(rel, 0, HASH_SHARE);
+
+       /* Update scan opaque state to show we have lock on the bucket */
+       so->hashso_bucket = bucket;
+       so->hashso_bucket_valid = true;
+       so->hashso_bucket_blkno = blkno;
+
+       /* Fetch the primary bucket page for the bucket */
+       buf = _hash_getbuf(rel, blkno, HASH_READ);
        page = BufferGetPage(buf);
        _hash_checkpage(rel, page, LH_BUCKET_PAGE);
        opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+       Assert(opaque->hasho_bucket == bucket);
 
-       /*
-        * if we are scanning forward, we need to find the first non-empty
-        * page (if any) in the bucket chain.  since overflow pages are never
-        * empty, this had better be either the bucket page or the first
-        * overflow page.
-        *
-        * if we are scanning backward, we always go all the way to the end of
-        * the bucket chain.
-        */
-       if (PageIsEmpty(page))
-       {
-               if (BlockNumberIsValid(opaque->hasho_nextblkno))
-                       _hash_readnext(rel, &buf, &page, &opaque);
-               else
-               {
-                       ItemPointerSetInvalid(current);
-                       so->hashso_curbuf = InvalidBuffer;
-
-                       /*
-                        * If there is no scankeys, all tuples will satisfy the scan -
-                        * so we continue in _hash_step to get tuples from all
-                        * buckets. - vadim 04/29/97
-                        */
-                       if (scan->numberOfKeys >= 1)
-                       {
-                               _hash_relbuf(rel, buf, HASH_READ);
-                               _hash_relbuf(rel, metabuf, HASH_READ);
-                               return false;
-                       }
-               }
-       }
+       /* If a backwards scan is requested, move to the end of the chain */
        if (ScanDirectionIsBackward(dir))
        {
                while (BlockNumberIsValid(opaque->hasho_nextblkno))
                        _hash_readnext(rel, &buf, &page, &opaque);
        }
 
-       if (!_hash_step(scan, &buf, dir, metabuf))
+       /* Now find the first tuple satisfying the qualification */
+       if (!_hash_step(scan, &buf, dir))
                return false;
 
        /* if we're here, _hash_step found a valid tuple */
-       current = &(scan->currentItemData);
        offnum = ItemPointerGetOffsetNumber(current);
        page = BufferGetPage(buf);
        _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
@@ -254,19 +229,16 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
  *             false.  Else, return true and set the CurrentItemData for the
  *             scan to the right thing.
  *
- *             'bufP' points to the buffer which contains the current page
- *             that we'll step through.
- *
- *             'metabuf' is released when this returns.
+ *             'bufP' points to the current buffer, which is pinned and read-locked.
+ *             On success exit, we have pin and read-lock on whichever page
+ *             contains the right item; on failure, we have released all buffers.
  */
 bool
-_hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
+_hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 {
-       Relation        rel;
+       Relation        rel = scan->indexRelation;
+       HashScanOpaque so = (HashScanOpaque) scan->opaque;
        ItemPointer current;
-       HashScanOpaque so;
-       int                     allbuckets;
-       HashMetaPage metap;
        Buffer          buf;
        Page            page;
        HashPageOpaque opaque;
@@ -277,18 +249,13 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
        HashItem        hitem;
        IndexTuple      itup;
 
-       rel = scan->indexRelation;
        current = &(scan->currentItemData);
-       so = (HashScanOpaque) scan->opaque;
-       allbuckets = (scan->numberOfKeys < 1);
-
-       metap = (HashMetaPage) BufferGetPage(metabuf);
-       _hash_checkpage(rel, (Page) metap, LH_META_PAGE);
 
        buf = *bufP;
        page = BufferGetPage(buf);
        _hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
        opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+       bucket = opaque->hasho_bucket;
 
        /*
         * If _hash_step is called from _hash_first, current will not be
@@ -309,107 +276,63 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
         */
        do
        {
-               bucket = opaque->hasho_bucket;
-
                switch (dir)
                {
                        case ForwardScanDirection:
                                if (offnum != InvalidOffsetNumber)
-                               {
                                        offnum = OffsetNumberNext(offnum);      /* move forward */
-                               }
                                else
-                               {
                                        offnum = FirstOffsetNumber; /* new page */
-                               }
+
                                while (offnum > maxoff)
                                {
-
-                                       /*--------
+                                       /*
                                         * either this page is empty
                                         * (maxoff == InvalidOffsetNumber)
                                         * or we ran off the end.
-                                        *--------
                                         */
                                        _hash_readnext(rel, &buf, &page, &opaque);
-                                       if (BufferIsInvalid(buf))
-                                       {                       /* end of chain */
-                                               if (allbuckets && bucket < metap->hashm_maxbucket)
-                                               {
-                                                       ++bucket;
-                                                       blkno = BUCKET_TO_BLKNO(metap, bucket);
-                                                       buf = _hash_getbuf(rel, blkno, HASH_READ);
-                                                       page = BufferGetPage(buf);
-                                                       _hash_checkpage(rel, page, LH_BUCKET_PAGE);
-                                                       opaque = (HashPageOpaque) PageGetSpecialPointer(page);
-                                                       Assert(opaque->hasho_bucket == bucket);
-                                                       while (PageIsEmpty(page) &&
-                                                        BlockNumberIsValid(opaque->hasho_nextblkno))
-                                                               _hash_readnext(rel, &buf, &page, &opaque);
-                                                       maxoff = PageGetMaxOffsetNumber(page);
-                                                       offnum = FirstOffsetNumber;
-                                               }
-                                               else
-                                               {
-                                                       maxoff = offnum = InvalidOffsetNumber;
-                                                       break;          /* while */
-                                               }
-                                       }
-                                       else
+                                       if (BufferIsValid(buf))
                                        {
-                                               /* _hash_readnext never returns an empty page */
                                                maxoff = PageGetMaxOffsetNumber(page);
                                                offnum = FirstOffsetNumber;
                                        }
+                                       else
+                                       {
+                                               /* end of bucket */
+                                               maxoff = offnum = InvalidOffsetNumber;
+                                               break;  /* exit while */
+                                       }
                                }
                                break;
+
                        case BackwardScanDirection:
                                if (offnum != InvalidOffsetNumber)
-                               {
                                        offnum = OffsetNumberPrev(offnum);      /* move back */
-                               }
                                else
-                               {
                                        offnum = maxoff;        /* new page */
-                               }
+
                                while (offnum < FirstOffsetNumber)
                                {
-
-                                       /*---------
+                                       /*
                                         * either this page is empty
                                         * (offnum == InvalidOffsetNumber)
                                         * or we ran off the end.
-                                        *---------
                                         */
                                        _hash_readprev(rel, &buf, &page, &opaque);
-                                       if (BufferIsInvalid(buf))
-                                       {                       /* end of chain */
-                                               if (allbuckets && bucket > 0)
-                                               {
-                                                       --bucket;
-                                                       blkno = BUCKET_TO_BLKNO(metap, bucket);
-                                                       buf = _hash_getbuf(rel, blkno, HASH_READ);
-                                                       page = BufferGetPage(buf);
-                                                       _hash_checkpage(rel, page, LH_BUCKET_PAGE);
-                                                       opaque = (HashPageOpaque) PageGetSpecialPointer(page);
-                                                       Assert(opaque->hasho_bucket == bucket);
-                                                       while (BlockNumberIsValid(opaque->hasho_nextblkno))
-                                                               _hash_readnext(rel, &buf, &page, &opaque);
-                                                       maxoff = offnum = PageGetMaxOffsetNumber(page);
-                                               }
-                                               else
-                                               {
-                                                       maxoff = offnum = InvalidOffsetNumber;
-                                                       break;          /* while */
-                                               }
+                                       if (BufferIsValid(buf))
+                                       {
+                                               maxoff = offnum = PageGetMaxOffsetNumber(page);
                                        }
                                        else
                                        {
-                                               /* _hash_readprev never returns an empty page */
-                                               maxoff = offnum = PageGetMaxOffsetNumber(page);
+                                               /* end of bucket */
+                                               maxoff = offnum = InvalidOffsetNumber;
+                                               break;  /* exit while */
                                        }
                                }
                                break;
+
                        default:
                                /* NoMovementScanDirection */
                                /* this should not be reached */
@@ -419,7 +342,6 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
                /* we ran off the end of the world without finding a match */
                if (offnum == InvalidOffsetNumber)
                {
-                       _hash_relbuf(rel, metabuf, HASH_READ);
                        *bufP = so->hashso_curbuf = InvalidBuffer;
                        ItemPointerSetInvalid(current);
                        return false;
@@ -431,7 +353,6 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
        } while (!_hash_checkqual(scan, itup));
 
        /* if we made it to here, we've found a valid tuple */
-       _hash_relbuf(rel, metabuf, HASH_READ);
        blkno = BufferGetBlockNumber(buf);
        *bufP = so->hashso_curbuf = buf;
        ItemPointerSet(current, blkno, offnum);
index ce62a3a84415167d75bcb21692f8982a5816f6d0..0cfbe5e7a12ce675355582d12eaea98c6ab4df76 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashutil.c,v 1.35 2003/09/02 18:13:31 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/hash/hashutil.c,v 1.36 2003/09/04 22:06:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "access/iqual.h"
 
 
-/*
- * _hash_mkscankey -- build a scan key matching the given indextuple
- *
- * Note: this is prepared for multiple index columns, but very little
- * else in access/hash is ...
- */
-ScanKey
-_hash_mkscankey(Relation rel, IndexTuple itup)
-{
-       ScanKey         skey;
-       TupleDesc       itupdesc = RelationGetDescr(rel);
-       int                     natts = rel->rd_rel->relnatts;
-       AttrNumber      i;
-       Datum           arg;
-       FmgrInfo   *procinfo;
-       bool            isnull;
-
-       skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
-
-       for (i = 0; i < natts; i++)
-       {
-               arg = index_getattr(itup, i + 1, itupdesc, &isnull);
-               procinfo = index_getprocinfo(rel, i + 1, HASHPROC);
-               ScanKeyEntryInitializeWithInfo(&skey[i],
-                                                                          isnull ? SK_ISNULL : 0x0,
-                                                                          (AttrNumber) (i + 1),
-                                                                          procinfo,
-                                                                          CurrentMemoryContext,
-                                                                          arg);
-       }
-
-       return skey;
-}
-
-void
-_hash_freeskey(ScanKey skey)
-{
-       pfree(skey);
-}
-
 /*
  * _hash_checkqual -- does the index tuple satisfy the scan conditions?
  */
@@ -102,24 +62,31 @@ _hash_formitem(IndexTuple itup)
 }
 
 /*
- * _hash_call -- given a Datum, call the index's hash procedure
- *
- * Returns the bucket number that the hash key maps to.
+ * _hash_datum2hashkey -- given a Datum, call the index's hash procedure
  */
-Bucket
-_hash_call(Relation rel, HashMetaPage metap, Datum key)
+uint32
+_hash_datum2hashkey(Relation rel, Datum key)
 {
        FmgrInfo   *procinfo;
-       uint32          n;
-       Bucket          bucket;
 
        /* XXX assumes index has only one attribute */
        procinfo = index_getprocinfo(rel, 1, HASHPROC);
-       n = DatumGetUInt32(FunctionCall1(procinfo, key));
 
-       bucket = n & metap->hashm_highmask;
-       if (bucket > metap->hashm_maxbucket)
-               bucket = bucket & metap->hashm_lowmask;
+       return DatumGetUInt32(FunctionCall1(procinfo, key));
+}
+
+/*
+ * _hash_hashkey2bucket -- determine which bucket the hashkey maps to.
+ */
+Bucket
+_hash_hashkey2bucket(uint32 hashkey, uint32 maxbucket,
+                                        uint32 highmask, uint32 lowmask)
+{
+       Bucket          bucket;
+
+       bucket = hashkey & highmask;
+       if (bucket > maxbucket)
+               bucket = bucket & lowmask;
 
        return bucket;
 }
index 12845f5593d24b7454aa8f10f72caccfeb93b78e..c4fceb009658cebd66db53c7f8c4d36122975411 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.59 2003/08/17 22:41:12 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.60 2003/09/04 22:06:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -153,7 +153,7 @@ LockRelation(Relation relation, LOCKMODE lockmode)
  * As above, but only lock if we can get the lock without blocking.
  * Returns TRUE iff the lock was acquired.
  *
- * NOTE: we do not currently need conditional versions of the other
+ * NOTE: we do not currently need conditional versions of all the
  * LockXXX routines in this file, but they could easily be added if needed.
  */
 bool
@@ -264,6 +264,26 @@ LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
                elog(ERROR, "LockAcquire failed");
 }
 
+/*
+ *             ConditionalLockPage
+ *
+ * As above, but only lock if we can get the lock without blocking.
+ * Returns TRUE iff the lock was acquired.
+ */
+bool
+ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
+{
+       LOCKTAG         tag;
+
+       MemSet(&tag, 0, sizeof(tag));
+       tag.relId = relation->rd_lockInfo.lockRelId.relId;
+       tag.dbId = relation->rd_lockInfo.lockRelId.dbId;
+       tag.objId.blkno = blkno;
+
+       return LockAcquire(LockTableId, &tag, GetCurrentTransactionId(),
+                                          lockmode, true);
+}
+
 /*
  *             UnlockPage
  */
index 7edbdad09846399f81fa0623ab4957afa7842c75..beffa806ea1e72a7bf56f48bcd4288e94d81a4b8 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: hash.h,v 1.52 2003/09/02 18:13:32 tgl Exp $
+ * $Id: hash.h,v 1.53 2003/09/04 22:06:27 tgl Exp $
  *
  * NOTES
  *             modeled after Margo Seltzer's hash implementation for unix.
@@ -70,13 +70,27 @@ typedef HashPageOpaqueData *HashPageOpaque;
 #define HASHO_FILL             0x1234
 
 /*
- *     ScanOpaqueData is used to remember which buffers we're currently
- *     examining in the scan.  We keep these buffers locked and pinned and
- *     recorded in the opaque entry of the scan in order to avoid doing a
- *     ReadBuffer() for every tuple in the index.
+ *     HashScanOpaqueData is private state for a hash index scan.
  */
 typedef struct HashScanOpaqueData
 {
+       /*
+        * By definition, a hash scan should be examining only one bucket.
+        * We record the bucket number here as soon as it is known.
+        */
+       Bucket          hashso_bucket;
+       bool            hashso_bucket_valid;
+       /*
+        * If we have a share lock on the bucket, we record it here.  When
+        * hashso_bucket_blkno is zero, we have no such lock.
+        */
+       BlockNumber     hashso_bucket_blkno;
+       /*
+        * We also want to remember which buffers we're currently examining in the
+        * scan. We keep these buffers pinned (but not locked) across hashgettuple
+        * calls, in order to avoid doing a ReadBuffer() for every tuple in the
+        * index.
+        */
        Buffer          hashso_curbuf;
        Buffer          hashso_mrkbuf;
 } HashScanOpaqueData;
@@ -148,10 +162,18 @@ typedef struct HashItemData
 
 typedef HashItemData *HashItem;
 
+/*
+ * Maximum size of a hash index item (it's okay to have only one per page)
+ */
+#define HashMaxItemSize(page) \
+       (PageGetPageSize(page) - \
+        sizeof(PageHeaderData) - \
+        MAXALIGN(sizeof(HashPageOpaqueData)) - \
+        sizeof(ItemIdData))
+
 /*
  * Constants
  */
-#define DEFAULT_FFACTOR                        300
 #define BYTE_TO_BIT                            3               /* 2^3 bits/byte */
 #define ALL_SET                                        ((uint32) ~0)
 
@@ -180,10 +202,14 @@ typedef HashItemData *HashItem;
 #define ISSET(A, N)            ((A)[(N)/BITS_PER_MAP] & (1<<((N)%BITS_PER_MAP)))
 
 /*
- * page locking modes
+ * page-level and high-level locking modes (see README)
  */
-#define HASH_READ              0
-#define HASH_WRITE             1
+#define HASH_READ              BUFFER_LOCK_SHARE
+#define HASH_WRITE             BUFFER_LOCK_EXCLUSIVE
+#define HASH_NOLOCK            (-1)
+
+#define HASH_SHARE             ShareLock
+#define HASH_EXCLUSIVE ExclusiveLock
 
 /*
  *     Strategy number. There's only one valid strategy for hashing: equality.
@@ -199,8 +225,6 @@ typedef HashItemData *HashItem;
 #define HASHPROC               1
 
 
-extern bool BuildingHash;
-
 /* public routines */
 
 extern Datum hashbuild(PG_FUNCTION_ARGS);
@@ -250,36 +274,37 @@ extern void _hash_squeezebucket(Relation rel,
                                                                Bucket bucket, BlockNumber bucket_blkno);
 
 /* hashpage.c */
-extern void _hash_metapinit(Relation rel);
+extern void _hash_getlock(Relation rel, BlockNumber whichlock, int access);
+extern bool _hash_try_getlock(Relation rel, BlockNumber whichlock, int access);
+extern void _hash_droplock(Relation rel, BlockNumber whichlock, int access);
 extern Buffer _hash_getbuf(Relation rel, BlockNumber blkno, int access);
-extern void _hash_relbuf(Relation rel, Buffer buf, int access);
+extern void _hash_relbuf(Relation rel, Buffer buf);
+extern void _hash_dropbuf(Relation rel, Buffer buf);
 extern void _hash_wrtbuf(Relation rel, Buffer buf);
-extern void _hash_wrtnorelbuf(Buffer buf);
+extern void _hash_wrtnorelbuf(Relation rel, Buffer buf);
 extern void _hash_chgbufaccess(Relation rel, Buffer buf, int from_access,
                                   int to_access);
+extern void _hash_metapinit(Relation rel);
 extern void _hash_pageinit(Page page, Size size);
 extern void _hash_expandtable(Relation rel, Buffer metabuf);
 
 /* hashscan.c */
 extern void _hash_regscan(IndexScanDesc scan);
 extern void _hash_dropscan(IndexScanDesc scan);
-extern void _hash_adjscans(Relation rel, ItemPointer tid);
+extern bool _hash_has_active_scan(Relation rel, Bucket bucket);
 extern void AtEOXact_hash(void);
 
 /* hashsearch.c */
-extern void _hash_search(Relation rel, int keysz, ScanKey scankey,
-                        Buffer *bufP, HashMetaPage metap);
 extern bool _hash_next(IndexScanDesc scan, ScanDirection dir);
 extern bool _hash_first(IndexScanDesc scan, ScanDirection dir);
-extern bool _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir,
-                  Buffer metabuf);
+extern bool _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir);
 
 /* hashutil.c */
-extern ScanKey _hash_mkscankey(Relation rel, IndexTuple itup);
-extern void _hash_freeskey(ScanKey skey);
 extern bool _hash_checkqual(IndexScanDesc scan, IndexTuple itup);
 extern HashItem _hash_formitem(IndexTuple itup);
-extern Bucket _hash_call(Relation rel, HashMetaPage metap, Datum key);
+extern uint32 _hash_datum2hashkey(Relation rel, Datum key);
+extern Bucket _hash_hashkey2bucket(uint32 hashkey, uint32 maxbucket,
+                                                                  uint32 highmask, uint32 lowmask);
 extern uint32 _hash_log2(uint32 num);
 extern void _hash_checkpage(Relation rel, Page page, int flags);
 
index d7a557d2b5764bac63c4abaccfd856e1a6828586..19bda76d725c7f979dc7435850b0f00012d6137f 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lmgr.h,v 1.39 2003/08/04 02:40:14 momjian Exp $
+ * $Id: lmgr.h,v 1.40 2003/09/04 22:06:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -54,8 +54,9 @@ extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
 extern void LockRelationForSession(LockRelId *relid, LOCKMODE lockmode);
 extern void UnlockRelationForSession(LockRelId *relid, LOCKMODE lockmode);
 
-/* Lock a page (mainly used for indices) */
+/* Lock a page (mainly used for indexes) */
 extern void LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode);
+extern bool ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode);
 extern void UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode);
 
 /* Lock an XID (used to wait for a transaction to finish) */