*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.178 2010/03/28 09:27:01 sriggs Exp $
+ * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.179 2010/08/29 19:33:14 tgl Exp $
*
*-------------------------------------------------------------------------
*/
static void _bt_checksplitloc(FindSplitData *state,
OffsetNumber firstoldonright, bool newitemonleft,
int dataitemstoleft, Size firstoldonrightsz);
-static void _bt_pgaddtup(Relation rel, Page page,
- Size itemsize, IndexTuple itup,
- OffsetNumber itup_off, const char *where);
+static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
+ OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
/* Do the update. No ereport(ERROR) until changes are logged */
START_CRIT_SECTION();
- _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page");
+ if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
+ elog(PANIC, "failed to add new item to block %u in index \"%s\"",
+ itup_blkno, RelationGetRelationName(rel));
MarkBufferDirty(buf);
Page origpage;
Page leftpage,
rightpage;
+ BlockNumber origpagenumber,
+ rightpagenumber;
BTPageOpaque ropaque,
lopaque,
oopaque;
OffsetNumber i;
bool isroot;
+ /* Acquire a new page to split into */
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
+
+ /*
+ * origpage is the original page to be split. leftpage is a temporary
+ * buffer that receives the left-sibling data, which will be copied back
+ * into origpage on success. rightpage is the new page that receives
+ * the right-sibling data. If we fail before reaching the critical
+ * section, origpage hasn't been modified and leftpage is only workspace.
+ * In principle we shouldn't need to worry about rightpage either,
+ * because it hasn't been linked into the btree page structure; but to
+ * avoid leaving possibly-confusing junk behind, we are careful to rewrite
+ * rightpage as zeroes before throwing any error.
+ */
origpage = BufferGetPage(buf);
leftpage = PageGetTempPage(origpage);
rightpage = BufferGetPage(rbuf);
+ origpagenumber = BufferGetBlockNumber(buf);
+ rightpagenumber = BufferGetBlockNumber(rbuf);
+
_bt_pageinit(leftpage, BufferGetPageSize(buf));
/* rightpage was already initialized by _bt_getbuf */
lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
ropaque->btpo_flags = lopaque->btpo_flags;
lopaque->btpo_prev = oopaque->btpo_prev;
- lopaque->btpo_next = BufferGetBlockNumber(rbuf);
- ropaque->btpo_prev = BufferGetBlockNumber(buf);
+ lopaque->btpo_next = rightpagenumber;
+ ropaque->btpo_prev = origpagenumber;
ropaque->btpo_next = oopaque->btpo_next;
lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
/* Since we already have write-lock on both pages, ok to read cycleid */
item = (IndexTuple) PageGetItem(origpage, itemid);
if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
false, false) == InvalidOffsetNumber)
- elog(PANIC, "failed to add hikey to the right sibling"
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add hikey to the right sibling"
" while splitting block %u of index \"%s\"",
- BufferGetBlockNumber(buf), RelationGetRelationName(rel));
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
}
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
- elog(PANIC, "failed to add hikey to the left sibling"
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add hikey to the left sibling"
" while splitting block %u of index \"%s\"",
- BufferGetBlockNumber(buf), RelationGetRelationName(rel));
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
/*
{
if (newitemonleft)
{
- _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
- "left sibling");
+ if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
}
else
{
- _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
- "right sibling");
+ if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
}
/* decide which page to put it on */
if (i < firstright)
{
- _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
- "left sibling");
+ if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add old item to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
leftoff = OffsetNumberNext(leftoff);
}
else
{
- _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
- "right sibling");
+ if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add old item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
}
* not be splitting the page).
*/
Assert(!newitemonleft);
- _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
- "right sibling");
+ if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add new item to the right sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
rightoff = OffsetNumberNext(rightoff);
}
* neighbors.
*/
- if (!P_RIGHTMOST(ropaque))
+ if (!P_RIGHTMOST(oopaque))
{
- sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
+ sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
spage = BufferGetPage(sbuf);
sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
- if (sopaque->btpo_prev != ropaque->btpo_prev)
- elog(PANIC, "right sibling's left-link doesn't match: "
- "block %u links to %u instead of expected %u in index \"%s\"",
- ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev,
+ if (sopaque->btpo_prev != origpagenumber)
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "right sibling's left-link doesn't match: "
+ "block %u links to %u instead of expected %u in index \"%s\"",
+ oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
RelationGetRelationName(rel));
+ }
/*
* Check to see if we can set the SPLIT_END flag in the right-hand
*
* NO EREPORT(ERROR) till right sibling is updated. We can get away with
* not starting the critical section till here because we haven't been
- * scribbling on the original page yet, and we don't care about the new
- * sibling until it's linked into the btree.
+ * scribbling on the original page yet; see comments above.
*/
START_CRIT_SECTION();
* (in the page management code) that the center of a page always be
* clean, and the most efficient way to guarantee this is just to compact
* the data by reinserting it into a new left page. (XXX the latter
- * comment is probably obsolete.)
+ * comment is probably obsolete; but in any case it's good to not scribble
+ * on the original page until we enter the critical section.)
*
* We need to do this before writing the WAL record, so that XLogInsert
* can WAL log an image of the page if necessary.
*/
PageRestoreTempPage(leftpage, origpage);
+ /* leftpage, lopaque must not be used below here */
MarkBufferDirty(buf);
MarkBufferDirty(rbuf);
if (!P_RIGHTMOST(ropaque))
{
- sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
+ sopaque->btpo_prev = rightpagenumber;
MarkBufferDirty(sbuf);
}
XLogRecData *lastrdata;
xlrec.node = rel->rd_node;
- xlrec.leftsib = BufferGetBlockNumber(buf);
- xlrec.rightsib = BufferGetBlockNumber(rbuf);
+ xlrec.leftsib = origpagenumber;
+ xlrec.rightsib = rightpagenumber;
xlrec.rnext = ropaque->btpo_next;
xlrec.level = ropaque->btpo.level;
xlrec.firstright = firstright;
* we insert the tuples in order, so that the given itup_off does
* represent the final position of the tuple!
*/
-static void
-_bt_pgaddtup(Relation rel,
- Page page,
+static bool
+_bt_pgaddtup(Page page,
Size itemsize,
IndexTuple itup,
- OffsetNumber itup_off,
- const char *where)
+ OffsetNumber itup_off)
{
BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
IndexTupleData trunctuple;
if (PageAddItem(page, (Item) itup, itemsize, itup_off,
false, false) == InvalidOffsetNumber)
- elog(PANIC, "failed to add item to the %s in index \"%s\"",
- where, RelationGetRelationName(rel));
+ return false;
+
+ return true;
}
/*
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtpage.c,v 1.123 2010/07/06 19:18:55 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtpage.c,v 1.124 2010/08/29 19:33:14 tgl Exp $
*
* NOTES
* Postgres btree pages look like ordinary relation pages. The opaque
*/
rightsib = opaque->btpo_next;
rbuf = _bt_getbuf(rel, rightsib, BT_WRITE);
+ page = BufferGetPage(rbuf);
+ opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ if (opaque->btpo_prev != target)
+ elog(ERROR, "right sibling's left-link doesn't match: "
+ "block %u links to %u instead of expected %u in index \"%s\"",
+ rightsib, opaque->btpo_prev, target,
+ RelationGetRelationName(rel));
/*
* Next find and write-lock the current parent of the target page. This is
}
}
+ /*
+ * Check that the parent-page index items we're about to delete/overwrite
+ * contain what we expect. This can fail if the index has become
+ * corrupt for some reason. We want to throw any error before entering
+ * the critical section --- otherwise it'd be a PANIC.
+ *
+ * The test on the target item is just an Assert because _bt_getstackbuf
+ * should have guaranteed it has the expected contents. The test on the
+ * next-child downlink is known to sometimes fail in the field, though.
+ */
+ page = BufferGetPage(pbuf);
+ opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+#ifdef USE_ASSERT_CHECKING
+ itemid = PageGetItemId(page, poffset);
+ itup = (IndexTuple) PageGetItem(page, itemid);
+ Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target);
+#endif
+
+ if (!parent_half_dead)
+ {
+ OffsetNumber nextoffset;
+
+ nextoffset = OffsetNumberNext(poffset);
+ itemid = PageGetItemId(page, nextoffset);
+ itup = (IndexTuple) PageGetItem(page, itemid);
+ if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib)
+ elog(ERROR, "right sibling %u of block %u is not next child %u of block %u in index \"%s\"",
+ rightsib, target, ItemPointerGetBlockNumber(&(itup->t_tid)),
+ parent, RelationGetRelationName(rel));
+ }
+
/*
* Here we begin doing the deletion.
*/
* to copy the right sibling's downlink over the target downlink, and then
* delete the following item.
*/
- page = BufferGetPage(pbuf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (parent_half_dead)
{
PageIndexTupleDelete(page, poffset);
itemid = PageGetItemId(page, poffset);
itup = (IndexTuple) PageGetItem(page, itemid);
- Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target);
ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
nextoffset = OffsetNumberNext(poffset);
- /* This part is just for double-checking */
- itemid = PageGetItemId(page, nextoffset);
- itup = (IndexTuple) PageGetItem(page, itemid);
- if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib)
- elog(PANIC, "right sibling %u of block %u is not next child of %u in index \"%s\"",
- rightsib, target, BufferGetBlockNumber(pbuf),
- RelationGetRelationName(rel));
PageIndexTupleDelete(page, nextoffset);
}
/*
* Update siblings' side-links. Note the target page's side-links will
- * continue to point to the siblings.
+ * continue to point to the siblings. Asserts here are just rechecking
+ * things we already verified above.
*/
if (BufferIsValid(lbuf))
{