Port single-page btree vacuum logic to hash indexes.
author     Robert Haas <rhaas@postgresql.org>
           Thu, 16 Mar 2017 02:18:56 +0000 (22:18 -0400)
committer  Robert Haas <rhaas@postgresql.org>
           Thu, 16 Mar 2017 02:18:56 +0000 (22:18 -0400)
This is advantageous for hash indexes for the same reasons it's good
for btrees: it accelerates space recycling, reducing bloat.

Ashutosh Sharma, reviewed by Amit Kapila and by me.  A bit of
additional hacking by me.

Discussion: http://postgr.es/m/CAE9k0PkRSyzx8dOnokEpUi2A-RFZK72WN0h9DEMv_ut9q6bPRw@mail.gmail.com
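
For readers skimming the patch, the control flow it introduces is: scans now
remember killed tuples and set LP_DEAD (plus the page-level
LH_PAGE_HAS_DEAD_TUPLES hint) via _hash_kill_items just before leaving a page,
and _hash_doinsert prunes LP_DEAD items from a full page through
_hash_vacuum_one_page before moving on to the next page in the bucket chain.
The standalone toy program below is only an illustrative sketch of that flow;
it is not PostgreSQL code, and names such as toy_page and toy_insert are
hypothetical.

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define PAGE_SLOTS 4

typedef struct
{
    int   value;
    bool  used;
    bool  dead;                 /* analogue of an LP_DEAD line pointer */
} toy_slot;

typedef struct
{
    toy_slot slots[PAGE_SLOTS];
    bool     has_dead_tuples;   /* analogue of LH_PAGE_HAS_DEAD_TUPLES */
} toy_page;

/* Scan side: remember that a slot's tuple is dead (cf. _hash_kill_items). */
static void
toy_kill_item(toy_page *page, int slot)
{
    page->slots[slot].dead = true;
    page->has_dead_tuples = true;
}

/* Insert side: reclaim dead slots (cf. _hash_vacuum_one_page). */
static void
toy_vacuum_one_page(toy_page *page)
{
    int i;

    for (i = 0; i < PAGE_SLOTS; i++)
        if (page->slots[i].dead)
            page->slots[i].used = page->slots[i].dead = false;
    page->has_dead_tuples = false;
}

/*
 * Insert into the page; if it is full and flagged as holding dead tuples,
 * prune once and retry.  On failure the real code would move on to the
 * next page in the bucket chain.
 */
static bool
toy_insert(toy_page *page, int value)
{
    int pass, i;

    for (pass = 0; pass < 2; pass++)
    {
        for (i = 0; i < PAGE_SLOTS; i++)
        {
            if (!page->slots[i].used)
            {
                page->slots[i].used = true;
                page->slots[i].value = value;
                return true;
            }
        }
        if (pass == 0 && page->has_dead_tuples)
            toy_vacuum_one_page(page);
        else
            break;
    }
    return false;
}

int
main(void)
{
    toy_page page;
    int      v;

    memset(&page, 0, sizeof(page));
    for (v = 1; v <= PAGE_SLOTS; v++)
        toy_insert(&page, v);   /* fill the page completely */
    toy_kill_item(&page, 1);    /* a scan reports slot 1 as no longer needed */

    printf("insert after kill: %s\n",
           toy_insert(&page, 99) ? "fits, dead slot reclaimed" : "no space");
    return 0;
}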

src/backend/access/hash/README
src/backend/access/hash/hash.c
src/backend/access/hash/hash_xlog.c
src/backend/access/hash/hashinsert.c
src/backend/access/hash/hashsearch.c
src/backend/access/hash/hashsort.c
src/backend/access/hash/hashutil.c
src/backend/access/rmgrdesc/hashdesc.c
src/include/access/hash.h
src/include/access/hash_xlog.h

index 53b0e0def1536bf670470a9d498b7c9e0fec2d2f..15414383540f04e7a6c8a67166efd4dfbe15086b 100644 (file)
@@ -284,7 +284,10 @@ The insertion algorithm is rather similar:
                if we get the lock on both the buckets
                        finish the split using algorithm mentioned below for split
                release the pin on old bucket and restart the insert from beginning.
-       if current page is full, release lock but not pin, read/exclusive-lock
+       if current page is full, first check whether the page contains any dead tuples.
+       if yes, remove the dead tuples from the current page and recheck whether
+       there is now enough space for the new tuple. if so, insert the tuple; else
+       release lock but not pin, read/exclusive-lock
      next page; repeat as needed
        >> see below if no space in any page of bucket
        take buffer content lock in exclusive mode on metapage
index 641676964bb351fb8ff1888dbb066748a4996873..cfcec3475d420effd4eeb87051cb4e7618c4dca7 100644 (file)
@@ -36,6 +36,7 @@ typedef struct
 {
        HSpool     *spool;                      /* NULL if not using spooling */
        double          indtuples;              /* # tuples accepted into index */
+       Relation        heapRel;                /* heap relation descriptor */
 } HashBuildState;
 
 static void hashbuildCallback(Relation index,
@@ -154,6 +155,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 
        /* prepare to build the index */
        buildstate.indtuples = 0;
+       buildstate.heapRel = heap;
 
        /* do the heap scan */
        reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
@@ -162,7 +164,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
        if (buildstate.spool)
        {
                /* sort the tuples and insert them into the index */
-               _h_indexbuild(buildstate.spool);
+               _h_indexbuild(buildstate.spool, buildstate.heapRel);
                _h_spooldestroy(buildstate.spool);
        }
 
@@ -218,7 +220,7 @@ hashbuildCallback(Relation index,
                itup = index_form_tuple(RelationGetDescr(index),
                                                                index_values, index_isnull);
                itup->t_tid = htup->t_self;
-               _hash_doinsert(index, itup);
+               _hash_doinsert(index, itup, buildstate->heapRel);
                pfree(itup);
        }
 
@@ -251,7 +253,7 @@ hashinsert(Relation rel, Datum *values, bool *isnull,
        itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
        itup->t_tid = *ht_ctid;
 
-       _hash_doinsert(rel, itup);
+       _hash_doinsert(rel, itup, heapRel);
 
        pfree(itup);
 
@@ -331,14 +333,24 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
                if (scan->kill_prior_tuple)
                {
                        /*
-                        * Yes, so mark it by setting the LP_DEAD state in the item flags.
+                        * Yes, so remember it for later. (We'll deal with all such
+                        * tuples at once right before leaving the index page or at
+                        * end of scan.) If the caller reverses the indexscan
+                        * direction, it is quite possible that the same item might
+                        * get entered multiple times. But we don't detect that;
+                        * instead, we just forget any excess entries.
                         */
-                       ItemIdMarkDead(PageGetItemId(page, offnum));
+                       if (so->killedItems == NULL)
+                               so->killedItems = palloc(MaxIndexTuplesPerPage *
+                                                                                sizeof(HashScanPosItem));
 
-                       /*
-                        * Since this can be redone later if needed, mark as a hint.
-                        */
-                       MarkBufferDirtyHint(buf, true);
+                       if (so->numKilled < MaxIndexTuplesPerPage)
+                       {
+                               so->killedItems[so->numKilled].heapTid = so->hashso_heappos;
+                               so->killedItems[so->numKilled].indexOffset =
+                                                       ItemPointerGetOffsetNumber(&(so->hashso_curpos));
+                               so->numKilled++;
+                       }
                }
 
                /*
@@ -446,6 +458,9 @@ hashbeginscan(Relation rel, int nkeys, int norderbys)
        so->hashso_buc_populated = false;
        so->hashso_buc_split = false;
 
+       so->killedItems = NULL;
+       so->numKilled = 0;
+
        scan->opaque = so;
 
        return scan;
@@ -461,6 +476,10 @@ hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;
 
+       /* Before leaving current page, deal with any killed items */
+       if (so->numKilled > 0)
+               _hash_kill_items(scan);
+
        _hash_dropscanbuf(rel, so);
 
        /* set position invalid (this will cause _hash_first call) */
@@ -488,8 +507,14 @@ hashendscan(IndexScanDesc scan)
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;
 
+       /* Before leaving current page, deal with any killed items */
+       if (so->numKilled > 0)
+               _hash_kill_items(scan);
+
        _hash_dropscanbuf(rel, so);
 
+       if (so->killedItems != NULL)
+               pfree(so->killedItems);
        pfree(so);
        scan->opaque = NULL;
 }
@@ -848,6 +873,16 @@ hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
 
                        PageIndexMultiDelete(page, deletable, ndeletable);
                        bucket_dirty = true;
+
+                       /*
+                        * Mark the page as clean if vacuum removed any DEAD tuples from
+                        * it, by clearing the LH_PAGE_HAS_DEAD_TUPLES flag. Clearing
+                        * this flag is just a hint; replay won't redo it.
+                        */
+                       if (tuples_removed && *tuples_removed > 0 &&
+                               opaque->hasho_flag & LH_PAGE_HAS_DEAD_TUPLES)
+                               opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+
                        MarkBufferDirty(buf);
 
                        /* XLOG stuff */
index 0c830ab595268390b720a0a79e6fefbe4a51a9d1..8647e8c6adca5cb9dfbda70d87f20c44cf5819b8 100644 (file)
  */
 #include "postgres.h"
 
+#include "access/heapam_xlog.h"
 #include "access/bufmask.h"
 #include "access/hash.h"
 #include "access/hash_xlog.h"
 #include "access/xlogutils.h"
+#include "access/xlog.h"
+#include "access/transam.h"
+#include "storage/procarray.h"
+#include "miscadmin.h"
 
 /*
  * replay a hash index meta page
@@ -915,6 +920,235 @@ hash_xlog_update_meta_page(XLogReaderState *record)
                UnlockReleaseBuffer(metabuf);
 }
 
+/*
+ * Get the latestRemovedXid from the heap pages pointed at by the index
+ * tuples being deleted. See also btree_xlog_delete_get_latestRemovedXid,
+ * on which this function is based.
+ */
+static TransactionId
+hash_xlog_vacuum_get_latestRemovedXid(XLogReaderState *record)
+{
+       xl_hash_vacuum_one_page *xlrec;
+       OffsetNumber    *unused;
+       Buffer          ibuffer,
+                               hbuffer;
+       Page            ipage,
+                               hpage;
+       RelFileNode     rnode;
+       BlockNumber     blkno;
+       ItemId          iitemid,
+                               hitemid;
+       IndexTuple      itup;
+       HeapTupleHeader htuphdr;
+       BlockNumber     hblkno;
+       OffsetNumber    hoffnum;
+       TransactionId   latestRemovedXid = InvalidTransactionId;
+       int             i;
+       char *ptr;
+       Size len;
+
+       xlrec = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
+
+       /*
+        * If there's nothing running on the standby we don't need to derive a
+        * full latestRemovedXid value, so use a fast path out of here.  This
+        * returns InvalidTransactionId, and so will conflict with all HS
+        * transactions; but since we just worked out that that's zero people,
+        * it's OK.
+        *
+        * XXX There is a race condition here, which is that a new backend might
+        * start just after we look.  If so, it cannot need to conflict, but this
+        * coding will result in throwing a conflict anyway.
+        */
+       if (CountDBBackends(InvalidOid) == 0)
+               return latestRemovedXid;
+
+       /*
+        * Get index page.  If the DB is consistent, this should not fail, nor
+        * should any of the heap page fetches below.  If one does, we return
+        * InvalidTransactionId to cancel all HS transactions.  That's probably
+        * overkill, but it's safe, and certainly better than panicking here.
+        */
+       XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
+       ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);
+
+       if (!BufferIsValid(ibuffer))
+               return InvalidTransactionId;
+       LockBuffer(ibuffer, HASH_READ);
+       ipage = (Page) BufferGetPage(ibuffer);
+
+       /*
+        * Loop through the deleted index items to obtain the TransactionId from
+        * the heap items they point to.
+        */
+       ptr = XLogRecGetBlockData(record, 0, &len);
+
+       unused = (OffsetNumber *) ptr;
+
+       for (i = 0; i < xlrec->ntuples; i++)
+       {
+               /*
+                * Identify the index tuple about to be deleted.
+                */
+               iitemid = PageGetItemId(ipage, unused[i]);
+               itup = (IndexTuple) PageGetItem(ipage, iitemid);
+
+               /*
+                * Locate the heap page that the index tuple points at
+                */
+               hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
+               hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM,
+                                                                                hblkno, RBM_NORMAL);
+
+               if (!BufferIsValid(hbuffer))
+               {
+                       UnlockReleaseBuffer(ibuffer);
+                       return InvalidTransactionId;
+               }
+               LockBuffer(hbuffer, HASH_READ);
+               hpage = (Page) BufferGetPage(hbuffer);
+
+               /*
+                * Look up the heap tuple header that the index tuple points at by
+                * using the heap node supplied with the xlrec. We can't use
+                * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
+                * Note that we are not looking at tuple data here, just headers.
+                */
+               hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
+               hitemid = PageGetItemId(hpage, hoffnum);
+
+               /*
+                * Follow any redirections until we find something useful.
+                */
+               while (ItemIdIsRedirected(hitemid))
+               {
+                       hoffnum = ItemIdGetRedirect(hitemid);
+                       hitemid = PageGetItemId(hpage, hoffnum);
+                       CHECK_FOR_INTERRUPTS();
+               }
+
+               /*
+                * If the heap item has storage, then read the header and use that to
+                * set latestRemovedXid.
+                *
+                * Some LP_DEAD items may not be accessible, so we ignore them.
+                */
+               if (ItemIdHasStorage(hitemid))
+               {
+                       htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
+                       HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
+               }
+               else if (ItemIdIsDead(hitemid))
+               {
+                       /*
+                        * Conjecture: if hitemid is dead then it had xids before the xids
+                        * marked on LP_NORMAL items. So we just ignore this item and move
+                        * on to the next, for the purposes of calculating
+                        * latestRemovedXid.
+                        */
+               }
+               else
+                       Assert(!ItemIdIsUsed(hitemid));
+
+               UnlockReleaseBuffer(hbuffer);
+       }
+
+       UnlockReleaseBuffer(ibuffer);
+
+       /*
+        * If all heap tuples were LP_DEAD then we will be returning
+        * InvalidTransactionId here, which avoids conflicts. This matches
+        * existing logic which assumes that LP_DEAD tuples must already be older
+        * than the latestRemovedXid on the cleanup record that set them as
+        * LP_DEAD, hence must already have generated a conflict.
+        */
+       return latestRemovedXid;
+}
+
+/*
+ * replay the deletion of index tuples marked as DEAD, which is
+ * performed during hash index tuple insertion.
+ */
+static void
+hash_xlog_vacuum_one_page(XLogReaderState *record)
+{
+       XLogRecPtr lsn = record->EndRecPtr;
+       xl_hash_vacuum_one_page *xldata;
+       Buffer buffer;
+       Buffer metabuf;
+       Page page;
+       XLogRedoAction action;
+
+       xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
+
+       /*
+        * If we have any conflict processing to do, it must happen before we
+        * update the page.
+        *
+        * Hash index entries that are marked as LP_DEAD and removed during
+        * hash index tuple insertion can conflict with standby queries. You might
+        * think that vacuum records would conflict as well, but we've handled
+        * that already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
+        * cleaned by the vacuum of the heap and so we can resolve any conflicts
+        * just once when that arrives.  After that we know that no conflicts
+        * exist from individual hash index vacuum records on that index.
+        */
+       if (InHotStandby)
+       {
+               TransactionId latestRemovedXid =
+                                       hash_xlog_vacuum_get_latestRemovedXid(record);
+               RelFileNode rnode;
+
+               XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
+               ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+       }
+
+       action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
+
+       if (action == BLK_NEEDS_REDO)
+       {
+               char *ptr;
+               Size len;
+
+               ptr = XLogRecGetBlockData(record, 0, &len);
+
+               page = (Page) BufferGetPage(buffer);
+
+               if (len > 0)
+               {
+                       OffsetNumber *unused;
+                       OffsetNumber *unend;
+
+                       unused = (OffsetNumber *) ptr;
+                       unend = (OffsetNumber *) ((char *) ptr + len);
+
+                       if ((unend - unused) > 0)
+                               PageIndexMultiDelete(page, unused, unend - unused);
+               }
+
+               PageSetLSN(page, lsn);
+               MarkBufferDirty(buffer);
+       }
+       if (BufferIsValid(buffer))
+               UnlockReleaseBuffer(buffer);
+
+       if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
+       {
+               Page metapage;
+               HashMetaPage metap;
+
+               metapage = BufferGetPage(metabuf);
+               metap = HashPageGetMeta(metapage);
+
+               metap->hashm_ntuples -= xldata->ntuples;
+
+               PageSetLSN(metapage, lsn);
+               MarkBufferDirty(metabuf);
+       }
+       if (BufferIsValid(metabuf))
+               UnlockReleaseBuffer(metabuf);
+}
+
 void
 hash_redo(XLogReaderState *record)
 {
@@ -958,6 +1192,9 @@ hash_redo(XLogReaderState *record)
                case XLOG_HASH_UPDATE_META_PAGE:
                        hash_xlog_update_meta_page(record);
                        break;
+               case XLOG_HASH_VACUUM_ONE_PAGE:
+                       hash_xlog_vacuum_one_page(record);
+                       break;
                default:
                        elog(PANIC, "hash_redo: unknown op code %u", info);
        }
index 241728fe6b1db7ea0f3e273c0e68f1a07e536694..8b6d0a0ff7821bbbe2717369e21f2108d06a9e65 100644 (file)
 
 #include "access/hash.h"
 #include "access/hash_xlog.h"
+#include "access/heapam.h"
 #include "miscadmin.h"
 #include "utils/rel.h"
+#include "storage/lwlock.h"
+#include "storage/buf_internals.h"
 
+static void _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
+                                                                 RelFileNode hnode);
 
 /*
  *     _hash_doinsert() -- Handle insertion of a single index tuple.
@@ -28,7 +33,7 @@
  *             and hashinsert.  By here, itup is completely filled in.
  */
 void
-_hash_doinsert(Relation rel, IndexTuple itup)
+_hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel)
 {
        Buffer          buf = InvalidBuffer;
        Buffer          bucket_buf;
@@ -118,10 +123,30 @@ restart_insert:
        /* Do the insertion */
        while (PageGetFreeSpace(page) < itemsz)
        {
+               BlockNumber nextblkno;
+
+               /*
+                * Check if the current page has any DEAD tuples. If yes,
+                * delete those tuples and see if that makes enough space for
+                * the new item to be inserted, before moving on to the next
+                * page in the bucket chain.
+                */
+               if (H_HAS_DEAD_TUPLES(pageopaque))
+               {
+                       if (IsBufferCleanupOK(buf))
+                       {
+                               _hash_vacuum_one_page(rel, metabuf, buf, heapRel->rd_node);
+
+                               if (PageGetFreeSpace(page) >= itemsz)
+                                       break;                          /* OK, now we have enough space */
+                       }
+               }
+
                /*
                 * no space on this page; check for an overflow page
                 */
-               BlockNumber nextblkno = pageopaque->hasho_nextblkno;
+               nextblkno = pageopaque->hasho_nextblkno;
 
                if (BlockNumberIsValid(nextblkno))
                {
@@ -157,7 +182,7 @@ restart_insert:
                        Assert(PageGetFreeSpace(page) >= itemsz);
                }
                pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
-               Assert(pageopaque->hasho_flag == LH_OVERFLOW_PAGE);
+               Assert((pageopaque->hasho_flag & LH_PAGE_TYPE) == LH_OVERFLOW_PAGE);
                Assert(pageopaque->hasho_bucket == bucket);
        }
 
@@ -300,3 +325,93 @@ _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
                                 RelationGetRelationName(rel));
        }
 }
+
+/*
+ * _hash_vacuum_one_page - vacuum just one index page.
+ *
+ * Try to remove LP_DEAD items from the given page.  The caller must hold a
+ * cleanup lock on the buffer containing the given page before calling this
+ * function.
+ */
+static void
+_hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
+                                         RelFileNode hnode)
+{
+       OffsetNumber    deletable[MaxOffsetNumber];
+       int ndeletable = 0;
+       OffsetNumber offnum,
+                                maxoff;
+       Page    page = BufferGetPage(buf);
+       HashPageOpaque  pageopaque;
+       HashMetaPage    metap;
+       double tuples_removed = 0;
+
+       /* Scan each tuple in page to see if it is marked as LP_DEAD */
+       maxoff = PageGetMaxOffsetNumber(page);
+       for (offnum = FirstOffsetNumber;
+                offnum <= maxoff;
+                offnum = OffsetNumberNext(offnum))
+       {
+               ItemId  itemId = PageGetItemId(page, offnum);
+
+               if (ItemIdIsDead(itemId))
+               {
+                       deletable[ndeletable++] = offnum;
+                       tuples_removed += 1;
+               }
+       }
+
+       if (ndeletable > 0)
+       {
+               /*
+                * Write-lock the meta page so that we can decrement the
+                * tuple count.
+                */
+               LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+
+               /* No ereport(ERROR) until changes are logged */
+               START_CRIT_SECTION();
+
+               PageIndexMultiDelete(page, deletable, ndeletable);
+
+               pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+               pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+
+               metap = HashPageGetMeta(BufferGetPage(metabuf));
+               metap->hashm_ntuples -= tuples_removed;
+
+               MarkBufferDirty(buf);
+               MarkBufferDirty(metabuf);
+
+               /* XLOG stuff */
+               if (RelationNeedsWAL(rel))
+               {
+                       xl_hash_vacuum_one_page xlrec;
+                       XLogRecPtr      recptr;
+
+                       xlrec.hnode = hnode;
+                       xlrec.ntuples = tuples_removed;
+
+                       XLogBeginInsert();
+                       XLogRegisterData((char *) &xlrec, SizeOfHashVacuumOnePage);
+
+                       XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+                       XLogRegisterBufData(0, (char *) deletable,
+                                               ndeletable * sizeof(OffsetNumber));
+
+                       XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
+
+                       recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_VACUUM_ONE_PAGE);
+
+                       PageSetLSN(BufferGetPage(buf), recptr);
+                       PageSetLSN(BufferGetPage(metabuf), recptr);
+               }
+
+               END_CRIT_SECTION();
+
+               /*
+                * Release the write lock on the meta page, now that we have
+                * updated the tuple count.
+                */
+               LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+       }
+}
index d7337703b0b17c870609fd1ecd1ffa2d4cfc95ec..2d9204903fac91346958e21758ea19ee013cddb1 100644 (file)
@@ -465,6 +465,10 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
                                                        break;          /* yes, so exit for-loop */
                                        }
 
+                                       /* Before leaving current page, deal with any killed items */
+                                       if (so->numKilled > 0)
+                                               _hash_kill_items(scan);
+
                                        /*
                                         * ran off the end of this page, try the next
                                         */
@@ -518,6 +522,10 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
                                                        break;          /* yes, so exit for-loop */
                                        }
 
+                                       /* Before leaving current page, deal with any killed items */
+                                       if (so->numKilled > 0)
+                                               _hash_kill_items(scan);
+
                                        /*
                                         * ran off the end of this page, try the next
                                         */
index ea8f109a575d09573a032deef841bf248192ff64..0e0f3937117637ac866382da486fc59559576099 100644 (file)
@@ -101,7 +101,7 @@ _h_spool(HSpool *hspool, ItemPointer self, Datum *values, bool *isnull)
  * create an entire index.
  */
 void
-_h_indexbuild(HSpool *hspool)
+_h_indexbuild(HSpool *hspool, Relation heapRel)
 {
        IndexTuple      itup;
 #ifdef USE_ASSERT_CHECKING
@@ -126,6 +126,6 @@ _h_indexbuild(HSpool *hspool)
                Assert(hashkey >= lasthashkey);
 #endif
 
-               _hash_doinsert(hspool->index, itup);
+               _hash_doinsert(hspool->index, itup, heapRel);
        }
 }
index c705531f04a813d3c710cc17ffc4d9c784567208..2e9971920bca9f4b77e9d0059e4dfafd17090a6e 100644 (file)
@@ -19,6 +19,7 @@
 #include "access/relscan.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
+#include "storage/buf_internals.h"
 
 #define CALC_NEW_BUCKET(old_bucket, lowmask) \
                        old_bucket | (lowmask + 1)
@@ -446,3 +447,70 @@ _hash_get_newbucket_from_oldbucket(Relation rel, Bucket old_bucket,
 
        return new_bucket;
 }
+
+/*
+ * _hash_kill_items - set LP_DEAD state for items an indexscan caller has
+ * told us were killed.
+ *
+ * scan->opaque, referenced locally through so, contains information about the
+ * current page and killed tuples thereon (generally, this should only be
+ * called if so->numKilled > 0).
+ *
+ * We match items by heap TID before assuming they are the right ones to
+ * delete.
+ */
+void
+_hash_kill_items(IndexScanDesc scan)
+{
+       HashScanOpaque  so = (HashScanOpaque) scan->opaque;
+       Page    page;
+       HashPageOpaque  opaque;
+       OffsetNumber    offnum, maxoff;
+       int     numKilled = so->numKilled;
+       int             i;
+       bool    killedsomething = false;
+
+       Assert(so->numKilled > 0);
+       Assert(so->killedItems != NULL);
+
+       /*
+        * Always reset the scan state, so we don't look for the same
+        * items on other pages.
+        */
+       so->numKilled = 0;
+
+       page = BufferGetPage(so->hashso_curbuf);
+       opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+       maxoff = PageGetMaxOffsetNumber(page);
+
+       for (i = 0; i < numKilled; i++)
+       {
+               offnum = so->killedItems[i].indexOffset;
+
+               while (offnum <= maxoff)
+               {
+                       ItemId  iid = PageGetItemId(page, offnum);
+                       IndexTuple      ituple = (IndexTuple) PageGetItem(page, iid);
+
+                       if (ItemPointerEquals(&ituple->t_tid, &so->killedItems[i].heapTid))
+                       {
+                               /* found the item */
+                               ItemIdMarkDead(iid);
+                               killedsomething = true;
+                               break;          /* out of inner search loop */
+                       }
+                       offnum = OffsetNumberNext(offnum);
+               }
+       }
+
+       /*
+        * Since this can be redone later if needed, mark the buffer dirty
+        * as a hint. Whenever we mark anything LP_DEAD, we also set the
+        * page's LH_PAGE_HAS_DEAD_TUPLES flag, which is likewise just a hint.
+        */
+       if (killedsomething)
+       {
+               opaque->hasho_flag |= LH_PAGE_HAS_DEAD_TUPLES;
+               MarkBufferDirtyHint(so->hashso_curbuf, true);
+       }
+}
index f1cc9ff9514a9dd94d58be192f66a08acc1311a8..5bd5c8dc0103cdf5902f32fd00bbd131d90529df 100644 (file)
@@ -154,6 +154,8 @@ hash_identify(uint8 info)
                case XLOG_HASH_UPDATE_META_PAGE:
                        id = "UPDATE_META_PAGE";
                        break;
+               case XLOG_HASH_VACUUM_ONE_PAGE:
+                       id = "VACUUM_ONE_PAGE";
+                       break;
        }
 
        return id;
index bfdfed8657f58ec3e5e18b1e39e4ac25f1880620..eb1df57291bade4f633fd70372a01e945c66377e 100644 (file)
@@ -57,6 +57,7 @@ typedef uint32 Bucket;
 #define LH_BUCKET_BEING_POPULATED      (1 << 4)
 #define LH_BUCKET_BEING_SPLIT  (1 << 5)
 #define LH_BUCKET_NEEDS_SPLIT_CLEANUP  (1 << 6)
+#define LH_PAGE_HAS_DEAD_TUPLES        (1 << 7)
 
 #define LH_PAGE_TYPE \
        (LH_OVERFLOW_PAGE|LH_BUCKET_PAGE|LH_BITMAP_PAGE|LH_META_PAGE)
@@ -86,6 +87,7 @@ typedef HashPageOpaqueData *HashPageOpaque;
 #define H_NEEDS_SPLIT_CLEANUP(opaque)  ((opaque)->hasho_flag & LH_BUCKET_NEEDS_SPLIT_CLEANUP)
 #define H_BUCKET_BEING_SPLIT(opaque)   ((opaque)->hasho_flag & LH_BUCKET_BEING_SPLIT)
 #define H_BUCKET_BEING_POPULATED(opaque)       ((opaque)->hasho_flag & LH_BUCKET_BEING_POPULATED)
+#define H_HAS_DEAD_TUPLES(opaque)              ((opaque)->hasho_flag & LH_PAGE_HAS_DEAD_TUPLES)
 
 /*
  * The page ID is for the convenience of pg_filedump and similar utilities,
@@ -95,6 +97,13 @@ typedef HashPageOpaqueData *HashPageOpaque;
  */
 #define HASHO_PAGE_ID          0xFF80
 
+typedef struct HashScanPosItem    /* what we remember about each match */
+{
+       ItemPointerData heapTid;        /* TID of referenced heap item */
+       OffsetNumber indexOffset;       /* index item's location within page */
+} HashScanPosItem;
+
+
 /*
  *     HashScanOpaqueData is private state for a hash index scan.
  */
@@ -135,6 +144,9 @@ typedef struct HashScanOpaqueData
         * referred only when hashso_buc_populated is true.
         */
        bool            hashso_buc_split;
+       /* info about killed items if any (killedItems is NULL if never used) */
+       HashScanPosItem *killedItems;   /* tids and offset numbers of killed items */
+       int                     numKilled;                      /* number of currently stored items */
 } HashScanOpaqueData;
 
 typedef HashScanOpaqueData *HashScanOpaque;
@@ -300,7 +312,7 @@ extern Datum hash_uint32(uint32 k);
 /* private routines */
 
 /* hashinsert.c */
-extern void _hash_doinsert(Relation rel, IndexTuple itup);
+extern void _hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel);
 extern OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
                           Size itemsize, IndexTuple itup);
 extern void _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
@@ -361,7 +373,7 @@ extern HSpool *_h_spoolinit(Relation heap, Relation index, uint32 num_buckets);
 extern void _h_spooldestroy(HSpool *hspool);
 extern void _h_spool(HSpool *hspool, ItemPointer self,
                 Datum *values, bool *isnull);
-extern void _h_indexbuild(HSpool *hspool);
+extern void _h_indexbuild(HSpool *hspool, Relation heapRel);
 
 /* hashutil.c */
 extern bool _hash_checkqual(IndexScanDesc scan, IndexTuple itup);
@@ -381,6 +393,7 @@ extern BlockNumber _hash_get_oldblock_from_newbucket(Relation rel, Bucket new_bu
 extern BlockNumber _hash_get_newblock_from_oldbucket(Relation rel, Bucket old_bucket);
 extern Bucket _hash_get_newbucket_from_oldbucket(Relation rel, Bucket old_bucket,
                                                                   uint32 lowmask, uint32 maxbucket);
+extern void _hash_kill_items(IndexScanDesc scan);
 
 /* hash.c */
 extern void hashbucketcleanup(Relation rel, Bucket cur_bucket,
index 552d6428cad84758e629f343f845a9318b504ab1..dfd923781997e4586b90c492c2dff700d79fc40c 100644 (file)
@@ -44,6 +44,7 @@
 #define XLOG_HASH_UPDATE_META_PAGE     0xB0            /* update meta page after
                                                                                                 * vacuum */
 
+#define XLOG_HASH_VACUUM_ONE_PAGE      0xC0    /* remove dead tuples from index page */
 
 /*
  * xl_hash_split_allocate_page flag values, 8 bits are available.
@@ -250,6 +251,24 @@ typedef struct xl_hash_init_bitmap_page
 #define SizeOfHashInitBitmapPage       \
        (offsetof(xl_hash_init_bitmap_page, bmsize) + sizeof(uint16))
 
+/*
+ * This is what we need to delete index tuples from a page and to
+ * update the meta page.
+ *
+ * This data record is used for XLOG_HASH_VACUUM_ONE_PAGE.
+ *
+ * Backup Blk 0: bucket page
+ * Backup Blk 1: meta page
+ */
+typedef struct xl_hash_vacuum_one_page
+{
+       RelFileNode     hnode;
+       double          ntuples;
+}      xl_hash_vacuum_one_page;
+
+#define SizeOfHashVacuumOnePage        \
+       (offsetof(xl_hash_vacuum_one_page, ntuples) + sizeof(double))
+
 extern void hash_redo(XLogReaderState *record);
 extern void hash_desc(StringInfo buf, XLogReaderState *record);
 extern const char *hash_identify(uint8 info);