More refactoring of heapgettup() and heapgettup_pagemode()
authorDavid Rowley <drowley@postgresql.org>
Tue, 7 Feb 2023 04:24:07 +0000 (17:24 +1300)
committerDavid Rowley <drowley@postgresql.org>
Tue, 7 Feb 2023 04:24:07 +0000 (17:24 +1300)
Here we further simplify the code in heapgettup() and
heapgettup_pagemode() to make better use of the helper functions added in
the previous recent refactors in this area.

In passing, remove an unneeded cast added in 8ca6d49f6.

Author: Melanie Plageman
Reviewed-by: Andres Freund, David Rowley
Discussion: https://postgr.es/m/CAAKRu_YSOnhKsDyFcqJsKtBSrd32DP-jjXmv7hL0BPD-z0TGXQ@mail.gmail.com

src/backend/access/heap/heapam.c

index 3bb1d5cff687dff944667604789f59a2945b4069..7eb79cee58d6b26411dd6c388641242d63f33682 100644 (file)
@@ -567,7 +567,7 @@ heapgettup_start_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
 
        TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
 
-       *linesleft = PageGetMaxOffsetNumber((Page) page) - FirstOffsetNumber + 1;
+       *linesleft = PageGetMaxOffsetNumber(page) - FirstOffsetNumber + 1;
 
        if (ScanDirectionIsForward(dir))
                *lineoff = FirstOffsetNumber;
@@ -732,34 +732,17 @@ heapgettup(HeapScanDesc scan,
                   ScanKey key)
 {
        HeapTuple       tuple = &(scan->rs_ctup);
-       bool            backward = ScanDirectionIsBackward(dir);
        BlockNumber block;
        Page            page;
        OffsetNumber lineoff;
        int                     linesleft;
-       ItemId          lpp;
 
        if (unlikely(!scan->rs_inited))
        {
                block = heapgettup_initial_block(scan, dir);
-
-               /*
-                * Check if we have reached the end of the scan already. This could
-                * happen if the table is empty or if the parallel workers have
-                * already finished the scan before we did anything ourselves
-                */
-               if (block == InvalidBlockNumber)
-               {
-                       Assert(!BufferIsValid(scan->rs_cbuf));
-                       tuple->t_data = NULL;
-                       return;
-               }
+               /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
+               Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
                scan->rs_inited = true;
-
-               heapgetpage((TableScanDesc) scan, block);
-
-               LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
-               page = heapgettup_start_page(scan, dir, &linesleft, &lineoff);
        }
        else
        {
@@ -768,15 +751,20 @@ heapgettup(HeapScanDesc scan,
 
                LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
                page = heapgettup_continue_page(scan, dir, &linesleft, &lineoff);
+               goto continue_page;
        }
 
        /*
         * advance the scan until we find a qualifying tuple or run out of stuff
         * to scan
         */
-       lpp = PageGetItemId(page, lineoff);
-       for (;;)
+       while (block != InvalidBlockNumber)
        {
+               heapgetpage((TableScanDesc) scan, block);
+               LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
+               page = heapgettup_start_page(scan, dir, &linesleft, &lineoff);
+continue_page:
+
                /*
                 * Only continue scanning the page while we have lines left.
                 *
@@ -784,53 +772,39 @@ heapgettup(HeapScanDesc scan,
                 * PageGetMaxOffsetNumber(); both for forward scans when we resume the
                 * table scan, and for when we start scanning a new page.
                 */
-               while (linesleft > 0)
+               for (; linesleft > 0; linesleft--, lineoff += dir)
                {
-                       if (ItemIdIsNormal(lpp))
-                       {
-                               bool            valid;
+                       bool            visible;
+                       ItemId          lpp = PageGetItemId(page, lineoff);
 
-                               tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
-                               tuple->t_len = ItemIdGetLength(lpp);
-                               ItemPointerSet(&(tuple->t_self), block, lineoff);
+                       if (!ItemIdIsNormal(lpp))
+                               continue;
 
-                               /*
-                                * if current tuple qualifies, return it.
-                                */
-                               valid = HeapTupleSatisfiesVisibility(tuple,
-                                                                                                        scan->rs_base.rs_snapshot,
-                                                                                                        scan->rs_cbuf);
+                       tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
+                       tuple->t_len = ItemIdGetLength(lpp);
+                       ItemPointerSet(&(tuple->t_self), block, lineoff);
 
-                               HeapCheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
-                                                                                                       tuple, scan->rs_cbuf,
-                                                                                                       scan->rs_base.rs_snapshot);
+                       visible = HeapTupleSatisfiesVisibility(tuple,
+                                                                                                  scan->rs_base.rs_snapshot,
+                                                                                                  scan->rs_cbuf);
 
-                               if (valid && key != NULL)
-                                       valid = HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
-                                                                               nkeys, key);
+                       HeapCheckForSerializableConflictOut(visible, scan->rs_base.rs_rd,
+                                                                                               tuple, scan->rs_cbuf,
+                                                                                               scan->rs_base.rs_snapshot);
 
-                               if (valid)
-                               {
-                                       LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
-                                       scan->rs_coffset = lineoff;
-                                       return;
-                               }
-                       }
+                       /* skip tuples not visible to this snapshot */
+                       if (!visible)
+                               continue;
 
-                       /*
-                        * otherwise move to the next item on the page
-                        */
-                       --linesleft;
-                       if (backward)
-                       {
-                               --lpp;                  /* move back in this page's ItemId array */
-                               --lineoff;
-                       }
-                       else
-                       {
-                               ++lpp;                  /* move forward in this page's ItemId array */
-                               ++lineoff;
-                       }
+                       /* skip any tuples that don't match the scan key */
+                       if (key != NULL &&
+                               !HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
+                                                        nkeys, key))
+                               continue;
+
+                       LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
+                       scan->rs_coffset = lineoff;
+                       return;
                }
 
                /*
@@ -841,40 +815,16 @@ heapgettup(HeapScanDesc scan,
 
                /* get the BlockNumber to scan next */
                block = heapgettup_advance_block(scan, block, dir);
+       }
 
-               /*
-                * return NULL if we've exhausted all the pages
-                */
-               if (block == InvalidBlockNumber)
-               {
-                       if (BufferIsValid(scan->rs_cbuf))
-                               ReleaseBuffer(scan->rs_cbuf);
-                       scan->rs_cbuf = InvalidBuffer;
-                       scan->rs_cblock = InvalidBlockNumber;
-                       tuple->t_data = NULL;
-                       scan->rs_inited = false;
-                       return;
-               }
-
-               heapgetpage((TableScanDesc) scan, block);
-
-               LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
+       /* end of scan */
+       if (BufferIsValid(scan->rs_cbuf))
+               ReleaseBuffer(scan->rs_cbuf);
 
-               page = BufferGetPage(scan->rs_cbuf);
-               TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd,
-                                                  page);
-               linesleft = PageGetMaxOffsetNumber(page);
-               if (backward)
-               {
-                       lineoff = linesleft;
-                       lpp = PageGetItemId(page, linesleft);
-               }
-               else
-               {
-                       lineoff = FirstOffsetNumber;
-                       lpp = PageGetItemId(page, FirstOffsetNumber);
-               }
-       }
+       scan->rs_cbuf = InvalidBuffer;
+       scan->rs_cblock = InvalidBlockNumber;
+       tuple->t_data = NULL;
+       scan->rs_inited = false;
 }
 
 /* ----------------
@@ -897,35 +847,16 @@ heapgettup_pagemode(HeapScanDesc scan,
                                        ScanKey key)
 {
        HeapTuple       tuple = &(scan->rs_ctup);
-       bool            backward = ScanDirectionIsBackward(dir);
        BlockNumber block;
        Page            page;
        int                     lineindex;
-       OffsetNumber lineoff;
        int                     linesleft;
-       ItemId          lpp;
 
        if (unlikely(!scan->rs_inited))
        {
                block = heapgettup_initial_block(scan, dir);
-
-               /*
-                * Check if we have reached the end of the scan already. This could
-                * happen if the table is empty or if the other workers in a parallel
-                * scan have already finished the scan.
-                */
-               if (block == InvalidBlockNumber)
-               {
-                       Assert(!BufferIsValid(scan->rs_cbuf));
-                       tuple->t_data = NULL;
-                       return;
-               }
-
-               heapgetpage((TableScanDesc) scan, block);
-               page = BufferGetPage(scan->rs_cbuf);
-               TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
-               linesleft = scan->rs_ntuples;
-               lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1;
+               /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
+               Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
                scan->rs_inited = true;
        }
        else
@@ -940,16 +871,31 @@ heapgettup_pagemode(HeapScanDesc scan,
                        linesleft = scan->rs_ntuples - lineindex;
                else
                        linesleft = scan->rs_cindex;
+               /* lineindex now references the next or previous visible tid */
+
+               goto continue_page;
        }
 
        /*
         * advance the scan until we find a qualifying tuple or run out of stuff
         * to scan
         */
-       for (;;)
+       while (block != InvalidBlockNumber)
        {
-               while (linesleft > 0)
+               heapgetpage((TableScanDesc) scan, block);
+               page = BufferGetPage(scan->rs_cbuf);
+               TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
+               linesleft = scan->rs_ntuples;
+               lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1;
+
+               /* lineindex now references the next or previous visible tid */
+continue_page:
+
+               for (; linesleft > 0; linesleft--, lineindex += dir)
                {
+                       ItemId          lpp;
+                       OffsetNumber lineoff;
+
                        lineoff = scan->rs_vistuples[lineindex];
                        lpp = PageGetItemId(page, lineoff);
                        Assert(ItemIdIsNormal(lpp));
@@ -958,64 +904,27 @@ heapgettup_pagemode(HeapScanDesc scan,
                        tuple->t_len = ItemIdGetLength(lpp);
                        ItemPointerSet(&(tuple->t_self), block, lineoff);
 
-                       /*
-                        * if current tuple qualifies, return it.
-                        */
-                       if (key != NULL)
-                       {
-                               bool            valid;
-
-                               valid = HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
-                                                                       nkeys, key);
-                               if (valid)
-                               {
-                                       scan->rs_cindex = lineindex;
-                                       return;
-                               }
-                       }
-                       else
-                       {
-                               scan->rs_cindex = lineindex;
-                               return;
-                       }
+                       /* skip any tuples that don't match the scan key */
+                       if (key != NULL &&
+                               !HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
+                                                        nkeys, key))
+                               continue;
 
-                       /*
-                        * otherwise move to the next item on the page
-                        */
-                       --linesleft;
-                       if (backward)
-                               --lineindex;
-                       else
-                               ++lineindex;
+                       scan->rs_cindex = lineindex;
+                       return;
                }
 
                /* get the BlockNumber to scan next */
                block = heapgettup_advance_block(scan, block, dir);
-
-               /*
-                * return NULL if we've exhausted all the pages
-                */
-               if (block == InvalidBlockNumber)
-               {
-                       if (BufferIsValid(scan->rs_cbuf))
-                               ReleaseBuffer(scan->rs_cbuf);
-                       scan->rs_cbuf = InvalidBuffer;
-                       scan->rs_cblock = InvalidBlockNumber;
-                       tuple->t_data = NULL;
-                       scan->rs_inited = false;
-                       return;
-               }
-
-               heapgetpage((TableScanDesc) scan, block);
-
-               page = BufferGetPage(scan->rs_cbuf);
-               TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
-               linesleft = scan->rs_ntuples;
-               if (backward)
-                       lineindex = linesleft - 1;
-               else
-                       lineindex = 0;
        }
+
+       /* end of scan */
+       if (BufferIsValid(scan->rs_cbuf))
+               ReleaseBuffer(scan->rs_cbuf);
+       scan->rs_cbuf = InvalidBuffer;
+       scan->rs_cblock = InvalidBlockNumber;
+       tuple->t_data = NULL;
+       scan->rs_inited = false;
 }