Use streaming I/O in sequential scans.
authorThomas Munro <tmunro@postgresql.org>
Sun, 7 Apr 2024 13:48:27 +0000 (01:48 +1200)
committerThomas Munro <tmunro@postgresql.org>
Sun, 7 Apr 2024 13:53:57 +0000 (01:53 +1200)
Instead of calling ReadBuffer() for each block, heap sequential scans
and TID range scans now use the streaming API introduced in b5a9b18cd0.

Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/flat/CAAKRu_YtXJiYKQvb5JsA2SkwrsizYLugs4sSOZh3EAjKUg%3DgEQ%40mail.gmail.com

src/backend/access/heap/heapam.c
src/include/access/heapam.h

index a32acc90473f1a6430faaa5d57ee65b260897698..2663f52d1a7caf60ce0b2996b7d14979ad151830 100644 (file)
@@ -223,6 +223,68 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
  * ----------------------------------------------------------------
  */
 
+/*
+ * Streaming read API callback for parallel sequential scans. Returns the next
+ * block the caller wants from the read stream or InvalidBlockNumber when done.
+ */
+static BlockNumber
+heap_scan_stream_read_next_parallel(ReadStream *stream,
+                                                                       void *callback_private_data,
+                                                                       void *per_buffer_data)
+{
+       HeapScanDesc scan = (HeapScanDesc) callback_private_data;
+
+       Assert(ScanDirectionIsForward(scan->rs_dir));
+       Assert(scan->rs_base.rs_parallel);
+
+       if (unlikely(!scan->rs_inited))
+       {
+               /* parallel scan */
+               table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
+                                                                                                scan->rs_parallelworkerdata,
+                                                                                                (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+
+               /* may return InvalidBlockNumber if there are no more blocks */
+               scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+                                                                                                                                       scan->rs_parallelworkerdata,
+                                                                                                                                       (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+               scan->rs_inited = true;
+       }
+       else
+       {
+               scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+                                                                                                                                       scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
+                                                                                                                                       scan->rs_base.rs_parallel);
+       }
+
+       return scan->rs_prefetch_block;
+}
+
+/*
+ * Streaming read API callback for serial sequential and TID range scans.
+ * Returns the next block the caller wants from the read stream or
+ * InvalidBlockNumber when done.
+ */
+static BlockNumber
+heap_scan_stream_read_next_serial(ReadStream *stream,
+                                                                 void *callback_private_data,
+                                                                 void *per_buffer_data)
+{
+       HeapScanDesc scan = (HeapScanDesc) callback_private_data;
+
+       if (unlikely(!scan->rs_inited))
+       {
+               scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir);
+               scan->rs_inited = true;
+       }
+       else
+               scan->rs_prefetch_block = heapgettup_advance_block(scan,
+                                                                                                                  scan->rs_prefetch_block,
+                                                                                                                  scan->rs_dir);
+
+       return scan->rs_prefetch_block;
+}
+
 /* ----------------
  *             initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -325,6 +387,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
        scan->rs_cbuf = InvalidBuffer;
        scan->rs_cblock = InvalidBlockNumber;
 
+       /*
+        * Initialize to ForwardScanDirection because it is most common and
+        * because heap scans go forward before going backward (e.g. CURSORs).
+        */
+       scan->rs_dir = ForwardScanDirection;
+       scan->rs_prefetch_block = InvalidBlockNumber;
+
        /* page-at-a-time fields are always invalid when not rs_inited */
 
        /*
@@ -508,12 +577,14 @@ heap_prepare_pagescan(TableScanDesc sscan)
 /*
  * heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
  *
- * Read the next block of the scan relation into a buffer and pin that buffer
- * before saving it in the scan descriptor.
+ * Read the next block of the scan relation from the read stream and save it
+ * in the scan descriptor.  It is already pinned.
  */
 static inline void
 heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
 {
+       Assert(scan->rs_read_stream);
+
        /* release previous scan buffer, if any */
        if (BufferIsValid(scan->rs_cbuf))
        {
@@ -528,25 +599,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
         */
        CHECK_FOR_INTERRUPTS();
 
-       if (unlikely(!scan->rs_inited))
+       /*
+        * If the scan direction is changing, reset the prefetch block to the
+        * current block. Otherwise, we will incorrectly prefetch the blocks
+        * between the prefetch block and the current block again before
+        * prefetching blocks in the new, correct scan direction.
+        */
+       if (unlikely(scan->rs_dir != dir))
        {
-               scan->rs_cblock = heapgettup_initial_block(scan, dir);
+               scan->rs_prefetch_block = scan->rs_cblock;
+               read_stream_reset(scan->rs_read_stream);
+       }
 
-               /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
-               Assert(scan->rs_cblock != InvalidBlockNumber ||
-                          !BufferIsValid(scan->rs_cbuf));
+       scan->rs_dir = dir;
 
-               scan->rs_inited = true;
-       }
-       else
-               scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock,
-                                                                                                  dir);
-
-       /* read block if valid */
-       if (BlockNumberIsValid(scan->rs_cblock))
-               scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
-                                                                                  scan->rs_cblock, RBM_NORMAL,
-                                                                                  scan->rs_strategy);
+       scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
+       if (BufferIsValid(scan->rs_cbuf))
+               scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
 }
 
 /*
@@ -560,6 +629,7 @@ static pg_noinline BlockNumber
 heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
 {
        Assert(!scan->rs_inited);
+       Assert(scan->rs_base.rs_parallel == NULL);
 
        /* When there are no pages to scan, return InvalidBlockNumber */
        if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
@@ -567,27 +637,10 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
 
        if (ScanDirectionIsForward(dir))
        {
-               /* serial scan */
-               if (scan->rs_base.rs_parallel == NULL)
-                       return scan->rs_startblock;
-               else
-               {
-                       /* parallel scan */
-                       table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
-                                                                                                        scan->rs_parallelworkerdata,
-                                                                                                        (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
-
-                       /* may return InvalidBlockNumber if there are no more blocks */
-                       return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-                                                                                                        scan->rs_parallelworkerdata,
-                                                                                                        (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
-               }
+               return scan->rs_startblock;
        }
        else
        {
-               /* backward parallel scan not supported */
-               Assert(scan->rs_base.rs_parallel == NULL);
-
                /*
                 * Disable reporting to syncscan logic in a backwards scan; it's not
                 * very likely anyone else is doing the same thing at the same time,
@@ -699,50 +752,43 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
 static inline BlockNumber
 heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir)
 {
-       if (ScanDirectionIsForward(dir))
+       Assert(scan->rs_base.rs_parallel == NULL);
+
+       if (likely(ScanDirectionIsForward(dir)))
        {
-               if (scan->rs_base.rs_parallel == NULL)
-               {
-                       block++;
+               block++;
 
-                       /* wrap back to the start of the heap */
-                       if (block >= scan->rs_nblocks)
-                               block = 0;
+               /* wrap back to the start of the heap */
+               if (block >= scan->rs_nblocks)
+                       block = 0;
 
-                       /*
-                        * Report our new scan position for synchronization purposes. We
-                        * don't do that when moving backwards, however. That would just
-                        * mess up any other forward-moving scanners.
-                        *
-                        * Note: we do this before checking for end of scan so that the
-                        * final state of the position hint is back at the start of the
-                        * rel.  That's not strictly necessary, but otherwise when you run
-                        * the same query multiple times the starting position would shift
-                        * a little bit backwards on every invocation, which is confusing.
-                        * We don't guarantee any specific ordering in general, though.
-                        */
-                       if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
-                               ss_report_location(scan->rs_base.rs_rd, block);
-
-                       /* we're done if we're back at where we started */
-                       if (block == scan->rs_startblock)
-                               return InvalidBlockNumber;
+               /*
+                * Report our new scan position for synchronization purposes. We don't
+                * do that when moving backwards, however. That would just mess up any
+                * other forward-moving scanners.
+                *
+                * Note: we do this before checking for end of scan so that the final
+                * state of the position hint is back at the start of the rel.  That's
+                * not strictly necessary, but otherwise when you run the same query
+                * multiple times the starting position would shift a little bit
+                * backwards on every invocation, which is confusing. We don't
+                * guarantee any specific ordering in general, though.
+                */
+               if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
+                       ss_report_location(scan->rs_base.rs_rd, block);
 
-                       /* check if the limit imposed by heap_setscanlimits() is met */
-                       if (scan->rs_numblocks != InvalidBlockNumber)
-                       {
-                               if (--scan->rs_numblocks == 0)
-                                       return InvalidBlockNumber;
-                       }
+               /* we're done if we're back at where we started */
+               if (block == scan->rs_startblock)
+                       return InvalidBlockNumber;
 
-                       return block;
-               }
-               else
+               /* check if the limit imposed by heap_setscanlimits() is met */
+               if (scan->rs_numblocks != InvalidBlockNumber)
                {
-                       return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-                                                                                                        scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
-                                                                                                        scan->rs_base.rs_parallel);
+                       if (--scan->rs_numblocks == 0)
+                               return InvalidBlockNumber;
                }
+
+               return block;
        }
        else
        {
@@ -879,6 +925,7 @@ continue_page:
 
        scan->rs_cbuf = InvalidBuffer;
        scan->rs_cblock = InvalidBlockNumber;
+       scan->rs_prefetch_block = InvalidBlockNumber;
        tuple->t_data = NULL;
        scan->rs_inited = false;
 }
@@ -974,6 +1021,7 @@ continue_page:
                ReleaseBuffer(scan->rs_cbuf);
        scan->rs_cbuf = InvalidBuffer;
        scan->rs_cblock = InvalidBlockNumber;
+       scan->rs_prefetch_block = InvalidBlockNumber;
        tuple->t_data = NULL;
        scan->rs_inited = false;
 }
@@ -1069,6 +1117,33 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 
        initscan(scan, key, false);
 
+       scan->rs_read_stream = NULL;
+
+       /*
+        * Set up a read stream for sequential scans and TID range scans. This
+        * should be done after initscan() because initscan() allocates the
+        * BufferAccessStrategy object passed to the streaming read API.
+        */
+       if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN ||
+               scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN)
+       {
+               ReadStreamBlockNumberCB cb;
+
+               if (scan->rs_base.rs_parallel)
+                       cb = heap_scan_stream_read_next_parallel;
+               else
+                       cb = heap_scan_stream_read_next_serial;
+
+               scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+                                                                                                                 scan->rs_strategy,
+                                                                                                                 scan->rs_base.rs_rd,
+                                                                                                                 MAIN_FORKNUM,
+                                                                                                                 cb,
+                                                                                                                 scan,
+                                                                                                                 0);
+       }
+
+
        return (TableScanDesc) scan;
 }
 
@@ -1111,6 +1186,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 
        Assert(scan->rs_empty_tuples_pending == 0);
 
+       /*
+        * The read stream is reset on rescan. This must be done before
+        * initscan(), as some state referred to by read_stream_reset() is reset
+        * in initscan().
+        */
+       if (scan->rs_read_stream)
+               read_stream_reset(scan->rs_read_stream);
+
        /*
         * reinitialize scan descriptor
         */
@@ -1135,6 +1218,12 @@ heap_endscan(TableScanDesc sscan)
 
        Assert(scan->rs_empty_tuples_pending == 0);
 
+       /*
+        * Must free the read stream before freeing the BufferAccessStrategy.
+        */
+       if (scan->rs_read_stream)
+               read_stream_end(scan->rs_read_stream);
+
        /*
         * decrement relation reference count and free scan descriptor storage
         */
index 750ea30852e3aae4b5278b967875ebd845fb00ac..48936826bcc2e62caab043bd81bb4616694a2994 100644 (file)
@@ -25,6 +25,7 @@
 #include "storage/bufpage.h"
 #include "storage/dsm.h"
 #include "storage/lockdefs.h"
+#include "storage/read_stream.h"
 #include "storage/shm_toc.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
@@ -70,6 +71,20 @@ typedef struct HeapScanDescData
 
        HeapTupleData rs_ctup;          /* current tuple in scan, if any */
 
+       /* For scans that stream reads */
+       ReadStream *rs_read_stream;
+
+       /*
+        * For sequential scans and TID range scans to stream reads. The read
+        * stream is allocated at the beginning of the scan and reset on rescan or
+        * when the scan direction changes. The scan direction is saved each time
+        * a new page is requested. If the scan direction changes from one page to
+        * the next, the read stream releases all previously pinned buffers and
+        * resets the prefetch block.
+        */
+       ScanDirection rs_dir;
+       BlockNumber rs_prefetch_block;
+
        /*
         * For parallel scans to store page allocation data.  NULL when not
         * performing a parallel scan.