#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "storage/procarray.h"
+#include "utils/snapmgr.h"
-/* local state for vacuum operations */
+/* Entry in pending-list of TIDs we need to revisit */
+typedef struct spgVacPendingItem
+{
+ ItemPointerData tid; /* redirection target to visit */
+ bool done; /* have we dealt with this? */
+ struct spgVacPendingItem *next; /* list link */
+} spgVacPendingItem;
+
+/* Local state for vacuum operations */
typedef struct spgBulkDeleteState
{
/* Parameters passed in to spgvacuumscan */
IndexBulkDeleteResult *stats;
IndexBulkDeleteCallback callback;
void *callback_state;
+
/* Additional working state */
- SpGistState spgstate;
- TransactionId OldestXmin;
- BlockNumber lastFilledBlock;
+ SpGistState spgstate; /* for SPGiST operations that need one */
+ spgVacPendingItem *pendingList; /* TIDs we need to (re)visit */
+ TransactionId myXmin; /* for detecting newly-added redirects */
+ TransactionId OldestXmin; /* for deciding a redirect is obsolete */
+ BlockNumber lastFilledBlock; /* last non-deletable block */
} spgBulkDeleteState;
+/*
+ * Add TID to pendingList, but only if not already present.
+ *
+ * Note that new items are always appended at the end of the list; this
+ * ensures that scans of the list don't miss items added during the scan.
+ */
+static void
+spgAddPendingTID(spgBulkDeleteState *bds, ItemPointer tid)
+{
+ spgVacPendingItem *pitem;
+ spgVacPendingItem **listLink;
+
+ /* search the list for pre-existing entry */
+ listLink = &bds->pendingList;
+ while (*listLink != NULL)
+ {
+ pitem = *listLink;
+ if (ItemPointerEquals(tid, &pitem->tid))
+ return; /* already in list, do nothing */
+ listLink = &pitem->next;
+ }
+ /* not there, so append new entry */
+ pitem = (spgVacPendingItem *) palloc(sizeof(spgVacPendingItem));
+ pitem->tid = *tid;
+ pitem->done = false;
+ pitem->next = NULL;
+ *listLink = pitem;
+}
+
+/*
+ * Clear pendingList
+ */
+static void
+spgClearPendingList(spgBulkDeleteState *bds)
+{
+ spgVacPendingItem *pitem;
+ spgVacPendingItem *nitem;
+
+ for (pitem = bds->pendingList; pitem != NULL; pitem = nitem)
+ {
+ nitem = pitem->next;
+ /* All items in list should have been dealt with */
+ Assert(pitem->done);
+ pfree(pitem);
+ }
+ bds->pendingList = NULL;
+}
+
/*
* Vacuum a regular (non-root) leaf page
*
* We must delete tuples that are targeted for deletion by the VACUUM,
* but not move any tuples that are referenced by outside links; we assume
* those are the ones that are heads of chains.
+ *
+ * If we find a REDIRECT that was made by a concurrently-running transaction,
+ * we must add its target TID to pendingList. (We don't try to visit the
+ * target immediately, first because we don't want VACUUM locking more than
+ * one buffer at a time, and second because the duplicate-filtering logic
+ * in spgAddPendingTID is useful to ensure we can't get caught in an infinite
+ * loop in the face of continuous concurrent insertions.)
+ *
+ * If forPending is true, we are examining the page as a consequence of
+ * chasing a redirect link, not as part of the normal sequential scan.
+ * We still vacuum the page normally, but we don't increment the stats
+ * about live tuples; else we'd double-count those tuples, since the page
+ * has been or will be visited in the sequential scan as well.
*/
static void
-vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer)
+vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
+ bool forPending)
{
Page page = BufferGetPage(buffer);
spgxlogVacuumLeaf xlrec;
}
else
{
- bds->stats->num_index_tuples += 1;
+ if (!forPending)
+ bds->stats->num_index_tuples += 1;
}
/* Form predecessor map, too */
predecessor[lt->nextOffset] = i;
}
}
+ else if (lt->tupstate == SPGIST_REDIRECT)
+ {
+ SpGistDeadTuple dt = (SpGistDeadTuple) lt;
+
+ Assert(dt->nextOffset == InvalidOffsetNumber);
+ Assert(ItemPointerIsValid(&dt->pointer));
+
+ /*
+ * Add target TID to pending list if the redirection could have
+ * happened since VACUUM started.
+ *
+ * Note: we could make a tighter test by seeing if the xid is
+ * "running" according to the active snapshot; but tqual.c doesn't
+ * currently export a suitable API, and it's not entirely clear
+ * that a tighter test is worth the cycles anyway.
+ */
+ if (TransactionIdFollowsOrEquals(dt->xid, bds->myXmin))
+ spgAddPendingTID(bds, &dt->pointer);
+ }
else
{
Assert(lt->nextOffset == InvalidOffsetNumber);
}
else
{
- vacuumLeafPage(bds, index, buffer);
+ vacuumLeafPage(bds, index, buffer, false);
vacuumRedirectAndPlaceholder(index, buffer, bds->OldestXmin);
}
}
}
/*
- * The root page must never be deleted, nor marked as available in FSM,
- * because we don't want it ever returned by a search for a place to
+ * The root pages must never be deleted, nor marked as available in FSM,
+ * because we don't want them ever returned by a search for a place to
* put a new tuple. Otherwise, check for empty/deletable page, and
* make sure FSM knows about it.
*/
UnlockReleaseBuffer(buffer);
}
+/*
+ * Process the pending-TID list between pages of the main scan
+ */
+static void
+spgprocesspending(spgBulkDeleteState *bds)
+{
+ Relation index = bds->info->index;
+ spgVacPendingItem *pitem;
+ spgVacPendingItem *nitem;
+ BlockNumber blkno;
+ Buffer buffer;
+ Page page;
+
+ for (pitem = bds->pendingList; pitem != NULL; pitem = pitem->next)
+ {
+ if (pitem->done)
+ continue; /* ignore already-done items */
+
+ /* call vacuum_delay_point while not holding any buffer lock */
+ vacuum_delay_point();
+
+ /* examine the referenced page */
+ blkno = ItemPointerGetBlockNumber(&pitem->tid);
+ buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
+ RBM_NORMAL, bds->info->strategy);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ page = (Page) BufferGetPage(buffer);
+
+ if (PageIsNew(page) || SpGistPageIsDeleted(page))
+ {
+ /* Probably shouldn't happen, but ignore it */
+ }
+ else if (SpGistPageIsLeaf(page))
+ {
+ if (SpGistBlockIsRoot(blkno))
+ {
+ /* this should definitely not happen */
+ elog(ERROR, "redirection leads to root page of index \"%s\"",
+ RelationGetRelationName(index));
+ }
+
+ /* deal with any deletable tuples */
+ vacuumLeafPage(bds, index, buffer, true);
+ /* might as well do this while we are here */
+ vacuumRedirectAndPlaceholder(index, buffer, bds->OldestXmin);
+
+ SpGistSetLastUsedPage(index, buffer);
+
+ /*
+ * We can mark as done not only this item, but any later ones
+ * pointing at the same page, since we vacuumed the whole page.
+ */
+ pitem->done = true;
+ for (nitem = pitem->next; nitem != NULL; nitem = nitem->next)
+ {
+ if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
+ nitem->done = true;
+ }
+ }
+ else
+ {
+ /*
+ * On an inner page, visit the referenced inner tuple and add
+ * all its downlinks to the pending list. We might have pending
+ * items for more than one inner tuple on the same page (in fact
+ * this is pretty likely given the way space allocation works),
+ * so get them all while we are here.
+ */
+ for (nitem = pitem; nitem != NULL; nitem = nitem->next)
+ {
+ if (nitem->done)
+ continue;
+ if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
+ {
+ OffsetNumber offset;
+ SpGistInnerTuple innerTuple;
+
+ offset = ItemPointerGetOffsetNumber(&nitem->tid);
+ innerTuple = (SpGistInnerTuple) PageGetItem(page,
+ PageGetItemId(page, offset));
+ if (innerTuple->tupstate == SPGIST_LIVE)
+ {
+ SpGistNodeTuple node;
+ int i;
+
+ SGITITERATE(innerTuple, i, node)
+ {
+ if (ItemPointerIsValid(&node->t_tid))
+ spgAddPendingTID(bds, &node->t_tid);
+ }
+ }
+ else if (innerTuple->tupstate == SPGIST_REDIRECT)
+ {
+ /* transfer attention to redirect point */
+ spgAddPendingTID(bds,
+ &((SpGistDeadTuple) innerTuple)->pointer);
+ }
+ else
+ elog(ERROR, "unexpected SPGiST tuple state: %d",
+ innerTuple->tupstate);
+
+ nitem->done = true;
+ }
+ }
+ }
+
+ UnlockReleaseBuffer(buffer);
+ }
+
+ spgClearPendingList(bds);
+}
+
/*
* Perform a bulkdelete scan
*/
/* Finish setting up spgBulkDeleteState */
initSpGistState(&bds->spgstate, index);
+ bds->pendingList = NULL;
+ bds->myXmin = GetActiveSnapshot()->xmin;
bds->OldestXmin = GetOldestXmin(true, false);
bds->lastFilledBlock = SPGIST_LAST_FIXED_BLKNO;
for (; blkno < num_pages; blkno++)
{
spgvacuumpage(bds, blkno);
+ /* empty the pending-list after each page */
+ if (bds->pendingList != NULL)
+ spgprocesspending(bds);
}
}
IndexFreeSpaceMapVacuum(index);
/*
- * It's quite possible for us to be fooled by concurrent page splits into
+ * It's quite possible for us to be fooled by concurrent tuple moves into
* double-counting some index tuples, so disbelieve any total that exceeds
* the underlying heap's count ... if we know that accurately. Otherwise
* this might just make matters worse.