summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- src/backend/access/nbtree/nbtinsert.c | 42
-rw-r--r-- src/backend/access/nbtree/nbtxlog.c | 106
-rw-r--r-- src/include/access/nbtree.h | 3
3 files changed, 96 insertions, 55 deletions
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index c05f82b209d..6d8cf324ecf 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.134 2006/03/31 23:32:05 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.135 2006/04/13 03:53:05 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -323,9 +323,9 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
* child page on the parent.
* + updates the metapage if a true root or fast root is split.
*
- * On entry, we must have the right buffer on which to do the
- * insertion, and the buffer must be pinned and locked. On return,
- * we will have dropped both the pin and the write lock on the buffer.
+ * On entry, we must have the right buffer in which to do the
+ * insertion, and the buffer must be pinned and write-locked. On return,
+ * we will have dropped both the pin and the lock on the buffer.
*
* If 'afteritem' is >0 then the new tuple must be inserted after the
* existing item of that number, noplace else. If 'afteritem' is 0
@@ -527,6 +527,8 @@ _bt_insertonpg(Relation rel,
*/
if (split_only_page)
{
+ Assert(!P_ISLEAF(lpageop));
+
metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
metapg = BufferGetPage(metabuf);
metad = BTPageGetMeta(metapg);
@@ -557,10 +559,11 @@ _bt_insertonpg(Relation rel,
if (!rel->rd_istemp)
{
xl_btree_insert xlrec;
+ BlockNumber xldownlink;
xl_btree_metadata xlmeta;
uint8 xlinfo;
XLogRecPtr recptr;
- XLogRecData rdata[3];
+ XLogRecData rdata[4];
XLogRecData *nextrdata;
IndexTupleData trunctuple;
@@ -572,6 +575,22 @@ _bt_insertonpg(Relation rel,
rdata[0].buffer = InvalidBuffer;
rdata[0].next = nextrdata = &(rdata[1]);
+ if (P_ISLEAF(lpageop))
+ xlinfo = XLOG_BTREE_INSERT_LEAF;
+ else
+ {
+ xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
+ Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
+
+ nextrdata->data = (char *) &xldownlink;
+ nextrdata->len = sizeof(BlockNumber);
+ nextrdata->buffer = InvalidBuffer;
+ nextrdata->next = nextrdata + 1;
+ nextrdata++;
+
+ xlinfo = XLOG_BTREE_INSERT_UPPER;
+ }
+
if (BufferIsValid(metabuf))
{
xlmeta.root = metad->btm_root;
@@ -584,12 +603,9 @@ _bt_insertonpg(Relation rel,
nextrdata->buffer = InvalidBuffer;
nextrdata->next = nextrdata + 1;
nextrdata++;
+
xlinfo = XLOG_BTREE_INSERT_META;
}
- else if (P_ISLEAF(lpageop))
- xlinfo = XLOG_BTREE_INSERT_LEAF;
- else
- xlinfo = XLOG_BTREE_INSERT_UPPER;
/* Read comments in _bt_pgaddtup */
if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
@@ -633,7 +649,7 @@ _bt_insertonpg(Relation rel,
/*
* _bt_split() -- split a page in the btree.
*
- * On entry, buf is the page to split, and is write-locked and pinned.
+ * On entry, buf is the page to split, and is pinned and write-locked.
* firstright is the item index of the first item to be moved to the
* new right page. newitemoff etc. tell us about the new item that
* must be inserted along with the data from the old page.
@@ -860,7 +876,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
* Direct access to page is not good but faster - we should implement
* some new func in page API. Note we only store the tuples
* themselves, knowing that the item pointers are in the same order
- * and can be reconstructed by scanning the tuples.
+ * and can be reconstructed by scanning the tuples. See comments
+ * for _bt_restore_page().
*/
xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
((PageHeader) leftpage)->pd_upper;
@@ -1445,6 +1462,9 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
* Insert the left page pointer into the new root page. The root page is
* the rightmost page on its level so there is no "high key" in it; the
* two items will go into positions P_HIKEY and P_FIRSTKEY.
+ *
+ * Note: we *must* insert the two items in item-number order, for the
+ * benefit of _bt_restore_page().
*/
if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
elog(PANIC, "failed to add leftkey to new root page");
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index c10936a8e27..7aae27dc853 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.31 2006/04/01 03:03:37 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.32 2006/04/13 03:53:05 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -51,32 +51,16 @@ log_incomplete_split(RelFileNode node, BlockNumber leftblk,
}
static void
-forget_matching_split(Relation reln, RelFileNode node,
- BlockNumber insertblk, OffsetNumber offnum,
- bool is_root)
+forget_matching_split(RelFileNode node, BlockNumber downlink, bool is_root)
{
- Buffer buffer;
- Page page;
- IndexTuple itup;
- BlockNumber rightblk;
ListCell *l;
- /* Get downlink TID from page */
- buffer = XLogReadBuffer(reln, insertblk, false);
- if (!BufferIsValid(buffer))
- return;
- page = (Page) BufferGetPage(buffer);
- itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
- rightblk = ItemPointerGetBlockNumber(&(itup->t_tid));
- Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
- UnlockReleaseBuffer(buffer);
-
foreach(l, incomplete_splits)
{
bt_incomplete_split *split = (bt_incomplete_split *) lfirst(l);
if (RelFileNodeEquals(node, split->node) &&
- rightblk == split->rightblk)
+ downlink == split->rightblk)
{
if (is_root != split->is_root)
elog(LOG, "forget_matching_split: fishy is_root data (expected %d, got %d)",
@@ -87,6 +71,20 @@ forget_matching_split(Relation reln, RelFileNode node,
}
}
+/*
+ * _bt_restore_page -- re-enter all the index tuples on a page
+ *
+ * The page is freshly init'd, and *from (length len) is a copy of what
+ * had been its upper part (pd_upper to pd_special). We assume that the
+ * tuples had been added to the page in item-number order, and therefore
+ * the one with highest item number appears first (lowest on the page).
+ *
+ * NOTE: the way this routine is coded, the rebuilt page will have the items
+ * in correct itemno sequence, but physically the opposite order from the
+ * original, because we insert them in the opposite of itemno order. This
+ * does not matter in any current btree code, but it's something to keep an
+ * eye on. Is it worth changing just on general principles?
+ */
static void
_bt_restore_page(Page page, char *from, int len)
{
@@ -158,9 +156,16 @@ btree_xlog_insert(bool isleaf, bool ismeta,
char *datapos;
int datalen;
xl_btree_metadata md;
+ BlockNumber downlink = 0;
datapos = (char *) xlrec + SizeOfBtreeInsert;
datalen = record->xl_len - SizeOfBtreeInsert;
+ if (!isleaf)
+ {
+ memcpy(&downlink, datapos, sizeof(BlockNumber));
+ datapos += sizeof(BlockNumber);
+ datalen -= sizeof(BlockNumber);
+ }
if (ismeta)
{
memcpy(&md, datapos, sizeof(xl_btree_metadata));
@@ -168,8 +173,7 @@ btree_xlog_insert(bool isleaf, bool ismeta,
datalen -= sizeof(xl_btree_metadata);
}
- if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta &&
- incomplete_splits == NIL)
+ if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
return; /* nothing to do */
reln = XLogOpenRelation(xlrec->target.node);
@@ -208,13 +212,8 @@ btree_xlog_insert(bool isleaf, bool ismeta,
md.fastroot, md.fastlevel);
/* Forget any split this insertion completes */
- if (!isleaf && incomplete_splits != NIL)
- {
- forget_matching_split(reln, xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)),
- ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
- false);
- }
+ if (!isleaf)
+ forget_matching_split(xlrec->target.node, downlink, false);
}
static void
@@ -224,14 +223,17 @@ btree_xlog_split(bool onleft, bool isroot,
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
Relation reln;
BlockNumber targetblk;
+ OffsetNumber targetoff;
BlockNumber leftsib;
BlockNumber rightsib;
+ BlockNumber downlink = 0;
Buffer buffer;
Page page;
BTPageOpaque pageop;
reln = XLogOpenRelation(xlrec->target.node);
targetblk = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+ targetoff = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
leftsib = (onleft) ? targetblk : xlrec->otherblk;
rightsib = (onleft) ? xlrec->otherblk : targetblk;
@@ -252,6 +254,16 @@ btree_xlog_split(bool onleft, bool isroot,
(char *) xlrec + SizeOfBtreeSplit,
xlrec->leftlen);
+ if (onleft && xlrec->level > 0)
+ {
+ IndexTuple itup;
+
+ /* extract downlink in the target tuple */
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff));
+ downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
+ Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
+ }
+
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
@@ -274,6 +286,16 @@ btree_xlog_split(bool onleft, bool isroot,
(char *) xlrec + SizeOfBtreeSplit + xlrec->leftlen,
record->xl_len - SizeOfBtreeSplit - xlrec->leftlen);
+ if (!onleft && xlrec->level > 0)
+ {
+ IndexTuple itup;
+
+ /* extract downlink in the target tuple */
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff));
+ downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
+ Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
+ }
+
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
@@ -308,13 +330,8 @@ btree_xlog_split(bool onleft, bool isroot,
}
/* Forget any split this insertion completes */
- if (xlrec->level > 0 && incomplete_splits != NIL)
- {
- forget_matching_split(reln, xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)),
- ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
- false);
- }
+ if (xlrec->level > 0)
+ forget_matching_split(xlrec->target.node, downlink, false);
/* The job ain't done till the parent link is inserted... */
log_incomplete_split(xlrec->target.node,
@@ -516,6 +533,7 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
BTPageOpaque pageop;
+ BlockNumber downlink = 0;
reln = XLogOpenRelation(xlrec->node);
buffer = XLogReadBuffer(reln, xlrec->rootblk, true);
@@ -532,9 +550,17 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
pageop->btpo_flags |= BTP_LEAF;
if (record->xl_len > SizeOfBtreeNewroot)
+ {
+ IndexTuple itup;
+
_bt_restore_page(page,
(char *) xlrec + SizeOfBtreeNewroot,
record->xl_len - SizeOfBtreeNewroot);
+ /* extract downlink to the right-hand split page */
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY));
+ downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
+ Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
+ }
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
@@ -546,14 +572,8 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
xlrec->rootblk, xlrec->level);
/* Check to see if this satisfies any incomplete insertions */
- if (record->xl_len > SizeOfBtreeNewroot &&
- incomplete_splits != NIL)
- {
- forget_matching_split(reln, xlrec->node,
- xlrec->rootblk,
- P_FIRSTKEY,
- true);
- }
+ if (record->xl_len > SizeOfBtreeNewroot)
+ forget_matching_split(xlrec->node, downlink, true);
}
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index ba4a8441f1c..4b254f1fd12 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/access/nbtree.h,v 1.95 2006/04/01 03:03:37 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/access/nbtree.h,v 1.96 2006/04/13 03:53:05 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -206,6 +206,7 @@ typedef struct xl_btree_metadata
typedef struct xl_btree_insert
{
xl_btreetid target; /* inserted tuple id */
+ /* BlockNumber downlink field FOLLOWS IF NOT XLOG_BTREE_INSERT_LEAF */
/* xl_btree_metadata FOLLOWS IF XLOG_BTREE_INSERT_META */
/* INDEX TUPLE FOLLOWS AT END OF STRUCT */
} xl_btree_insert;