Use atomic ops to hand out pages to scan in parallel scan.
author Heikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 16 Aug 2017 13:18:41 +0000 (16:18 +0300)
committer Heikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 16 Aug 2017 13:18:41 +0000 (16:18 +0300)
With a lot of CPUs, the spinlock that protects the current scan location
in a parallel scan can become a bottleneck. Use an atomic fetch-and-add
instruction instead.

David Rowley

Discussion: https://www.postgresql.org/message-id/CAKJS1f9tgsPhqBcoPjv9_KUPZvTLCZ4jy%3DB%3DbhqgaKn7cYzm-w@mail.gmail.com
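
The hot path this patch optimizes is the hand-out of the next block number to
each parallel worker: instead of taking a spinlock, reading and advancing a
shared "current block" field, and releasing the lock, each worker now performs
a single pg_atomic_fetch_add_u64() on a shared counter.  The minimal sketch
below shows the same pattern outside PostgreSQL, using C11 atomics and pthreads
in place of port/atomics.h; NBLOCKS, NWORKERS and the worker function are made
up for the illustration.

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    #define NBLOCKS  1000           /* pretend relation size, in blocks */
    #define NWORKERS 4

    static _Atomic uint64_t nallocated;     /* blocks handed out so far */

    static void *
    worker(void *arg)
    {
        uint64_t    scanned = 0;

        for (;;)
        {
            /* one fetch-and-add replaces spinlock acquire/advance/release */
            uint64_t    n = atomic_fetch_add(&nallocated, 1);

            if (n >= NBLOCKS)
                break;              /* every block has been handed out */
            scanned++;              /* a real worker would scan block n here */
        }
        printf("worker %ld scanned %llu blocks\n",
               (long) (intptr_t) arg, (unsigned long long) scanned);
        return NULL;
    }

    int
    main(void)
    {
        pthread_t   threads[NWORKERS];

        for (long i = 0; i < NWORKERS; i++)
            pthread_create(&threads[i], NULL, worker, (void *) (intptr_t) i);
        for (long i = 0; i < NWORKERS; i++)
            pthread_join(threads[i], NULL);
        return 0;
    }

Under contention, each hand-out is now one atomic read-modify-write instruction
rather than a lock/unlock pair around several field updates, which is what made
the spinlock a bottleneck on machines with many CPUs.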

src/backend/access/heap/heapam.c
src/include/access/relscan.h

index e283fe5b1f51ac8dbc2712b16f6774efda96ee87..7dea8472c17243dfd99d7db0ab77a8531d384d55 100644
@@ -58,6 +58,7 @@
 #include "catalog/namespace.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "port/atomics.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
@@ -89,6 +90,7 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
                        bool is_bitmapscan,
                        bool is_samplescan,
                        bool temp_snap);
+static void heap_parallelscan_startblock_init(HeapScanDesc scan);
 static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
                    TransactionId xid, CommandId cid, int options);
@@ -510,6 +512,8 @@ heapgettup(HeapScanDesc scan,
            }
            if (scan->rs_parallel != NULL)
            {
+               heap_parallelscan_startblock_init(scan);
+
                page = heap_parallelscan_nextpage(scan);
 
                /* Other processes might have already finished the scan. */
@@ -812,6 +816,8 @@ heapgettup_pagemode(HeapScanDesc scan,
            }
            if (scan->rs_parallel != NULL)
            {
+               heap_parallelscan_startblock_init(scan);
+
                page = heap_parallelscan_nextpage(scan);
 
                /* Other processes might have already finished the scan. */
@@ -1535,14 +1541,10 @@ heap_rescan(HeapScanDesc scan,
 
        /*
         * Caller is responsible for making sure that all workers have
-        * finished the scan before calling this, so it really shouldn't be
-        * necessary to acquire the mutex at all.  We acquire it anyway, just
-        * to be tidy.
+        * finished the scan before calling this.
         */
        parallel_scan = scan->rs_parallel;
-       SpinLockAcquire(&parallel_scan->phs_mutex);
-       parallel_scan->phs_cblock = parallel_scan->phs_startblock;
-       SpinLockRelease(&parallel_scan->phs_mutex);
+       pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
    }
 }
 
@@ -1635,8 +1637,8 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
        !RelationUsesLocalBuffers(relation) &&
        target->phs_nblocks > NBuffers / 4;
    SpinLockInit(&target->phs_mutex);
-   target->phs_cblock = InvalidBlockNumber;
    target->phs_startblock = InvalidBlockNumber;
+   pg_atomic_write_u64(&target->phs_nallocated, 0);
    SerializeSnapshot(snapshot, target->phs_snapshot_data);
 }
 
@@ -1660,20 +1662,17 @@ heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
 }
 
 /* ----------------
- *     heap_parallelscan_nextpage - get the next page to scan
+ *     heap_parallelscan_startblock_init - find and set the scan's startblock
  *
- *     Get the next page to scan.  Even if there are no pages left to scan,
- *     another backend could have grabbed a page to scan and not yet finished
- *     looking at it, so it doesn't follow that the scan is done when the
- *     first backend gets an InvalidBlockNumber return.
+ *     Determine where the parallel seq scan should start.  This function may
+ *     be called many times, once by each parallel worker.  We must be careful
+ *     only to set the startblock once.
  * ----------------
  */
-static BlockNumber
-heap_parallelscan_nextpage(HeapScanDesc scan)
+static void
+heap_parallelscan_startblock_init(HeapScanDesc scan)
 {
-   BlockNumber page = InvalidBlockNumber;
    BlockNumber sync_startpage = InvalidBlockNumber;
-   BlockNumber report_page = InvalidBlockNumber;
    ParallelHeapScanDesc parallel_scan;
 
    Assert(scan->rs_parallel);
@@ -1705,46 +1704,63 @@ retry:
            sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
            goto retry;
        }
-       parallel_scan->phs_cblock = parallel_scan->phs_startblock;
    }
+   SpinLockRelease(&parallel_scan->phs_mutex);
+}
+
+/* ----------------
+ *     heap_parallelscan_nextpage - get the next page to scan
+ *
+ *     Get the next page to scan.  Even if there are no pages left to scan,
+ *     another backend could have grabbed a page to scan and not yet finished
+ *     looking at it, so it doesn't follow that the scan is done when the
+ *     first backend gets an InvalidBlockNumber return.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(HeapScanDesc scan)
+{
+   BlockNumber page;
+   ParallelHeapScanDesc parallel_scan;
+   uint64      nallocated;
+
+   Assert(scan->rs_parallel);
+   parallel_scan = scan->rs_parallel;
 
    /*
-    * The current block number is the next one that needs to be scanned,
-    * unless it's InvalidBlockNumber already, in which case there are no more
-    * blocks to scan.  After remembering the current value, we must advance
-    * it so that the next call to this function returns the next block to be
-    * scanned.
+    * phs_nallocated tracks how many pages have been allocated to workers
+    * already.  When phs_nallocated >= rs_nblocks, all blocks have been
+    * allocated.
+    *
+    * Because we use an atomic fetch-and-add to fetch the current value, the
+    * phs_nallocated counter will exceed rs_nblocks: workers keep incrementing
+    * it when they try to allocate the next block even after all blocks have
+    * been allocated.  The counter must therefore be 64 bits wide, to avoid
+    * wrapping around when rs_nblocks is close to 2^32.
+    *
+    * The actual page to return is calculated by adding the counter to the
+    * starting block number, modulo nblocks.
     */
-   page = parallel_scan->phs_cblock;
-   if (page != InvalidBlockNumber)
-   {
-       parallel_scan->phs_cblock++;
-       if (parallel_scan->phs_cblock >= scan->rs_nblocks)
-           parallel_scan->phs_cblock = 0;
-       if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
-       {
-           parallel_scan->phs_cblock = InvalidBlockNumber;
-           report_page = parallel_scan->phs_startblock;
-       }
-   }
-
-   /* Release the lock. */
-   SpinLockRelease(&parallel_scan->phs_mutex);
+   nallocated = pg_atomic_fetch_add_u64(&parallel_scan->phs_nallocated, 1);
+   if (nallocated >= scan->rs_nblocks)
+       page = InvalidBlockNumber;  /* all blocks have been allocated */
+   else
+       page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks;
 
    /*
     * Report scan location.  Normally, we report the current page number.
     * When we reach the end of the scan, though, we report the starting page,
     * not the ending page, just so the starting positions for later scans
     * don't slew backwards.  We only report the position at the end of the
-    * scan once, though: subsequent callers will have report nothing, since
-    * they will have page == InvalidBlockNumber.
+    * scan once, though: subsequent callers will report nothing.
     */
    if (scan->rs_syncscan)
    {
-       if (report_page == InvalidBlockNumber)
-           report_page = page;
-       if (report_page != InvalidBlockNumber)
-           ss_report_location(scan->rs_rd, report_page);
+       if (page != InvalidBlockNumber)
+           ss_report_location(scan->rs_rd, page);
+       else if (nallocated == scan->rs_nblocks)
+           ss_report_location(scan->rs_rd, parallel_scan->phs_startblock);
    }
 
    return page;
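
The long comment in heap_parallelscan_nextpage() above is the heart of the new
scheme: the page to return is (phs_startblock + nallocated) modulo rs_nblocks,
every caller past the end of the scan still increments the counter, and that is
why the counter has to be 64 bits wide.  A small stand-alone illustration of
the arithmetic, with simplified types (BlockNumber is a 32-bit unsigned integer
in PostgreSQL) and made-up values:

    #include <stdint.h>
    #include <stdio.h>

    #define InvalidBlockNumber ((uint32_t) 0xFFFFFFFF)

    static uint32_t
    next_page(uint64_t nallocated, uint32_t startblock, uint32_t nblocks)
    {
        if (nallocated >= nblocks)
            return InvalidBlockNumber;  /* all blocks handed out already */
        /* wrap around from the (possibly synchronized-scan) start block */
        return (uint32_t) ((nallocated + startblock) % nblocks);
    }

    int
    main(void)
    {
        /* a 10-block relation whose synchronized scan starts at block 7 */
        for (uint64_t n = 0; n < 12; n++)
            printf("nallocated=%llu -> page %u\n",
                   (unsigned long long) n, next_page(n, 7, 10));
        /* pages 7 8 9 0 1 2 3 4 5 6, then 4294967295 (InvalidBlockNumber) */
        return 0;
    }

If the counter were only 32 bits wide and rs_nblocks were close to 2^32, the
extra increments from workers polling past the end could wrap the counter back
below rs_nblocks and hand the same pages out again; the 64-bit counter rules
that out.
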
index a20646b2b706191b442e98f72420238015488238..147f862a2b85fcf19ab3992b3c5d2eda6ecc6b5b 100644
@@ -35,9 +35,10 @@ typedef struct ParallelHeapScanDescData
    Oid         phs_relid;      /* OID of relation to scan */
    bool        phs_syncscan;   /* report location to syncscan logic? */
    BlockNumber phs_nblocks;    /* # blocks in relation at start of scan */
-   slock_t     phs_mutex;      /* mutual exclusion for block number fields */
+   slock_t     phs_mutex;      /* mutual exclusion for setting startblock */
    BlockNumber phs_startblock; /* starting block number */
-   BlockNumber phs_cblock;     /* current block number */
+   pg_atomic_uint64 phs_nallocated;    /* number of blocks allocated to
+                                        * workers so far. */
    char        phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
 }          ParallelHeapScanDescData;
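
After this change phs_mutex only guards the one-time choice of phs_startblock,
while the per-page hand-out through phs_nallocated is lock-free.  Below is a
rough, runnable sketch of that division of labour, using a pthread mutex and
C11 atomics in place of slock_t and port/atomics.h and skipping the
synchronized-scan lookup (ss_get_location) that the real
heap_parallelscan_startblock_init() performs; only the three field names are
taken from the patch.

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    #define InvalidBlockNumber ((uint32_t) 0xFFFFFFFF)
    #define NBLOCKS 10

    typedef struct
    {
        pthread_mutex_t  phs_mutex;        /* only for setting phs_startblock */
        uint32_t         phs_startblock;   /* starting block, set exactly once */
        _Atomic uint64_t phs_nallocated;   /* blocks handed out so far */
    } scan_state;

    static scan_state shared = {PTHREAD_MUTEX_INITIALIZER, InvalidBlockNumber, 0};

    /* Called by every worker; only the first caller picks the start block. */
    static void
    startblock_init(uint32_t preferred_start)
    {
        pthread_mutex_lock(&shared.phs_mutex);
        if (shared.phs_startblock == InvalidBlockNumber)
            shared.phs_startblock = preferred_start;
        pthread_mutex_unlock(&shared.phs_mutex);
    }

    /* Lock-free page hand-out: one fetch-and-add per request. */
    static uint32_t
    nextpage(void)
    {
        uint64_t    n = atomic_fetch_add(&shared.phs_nallocated, 1);

        if (n >= NBLOCKS)
            return InvalidBlockNumber;
        return (uint32_t) ((n + shared.phs_startblock) % NBLOCKS);
    }

    static void *
    worker(void *arg)
    {
        startblock_init((uint32_t) (uintptr_t) arg);
        while (nextpage() != InvalidBlockNumber)
            ;                              /* a real worker scans the page */
        return NULL;
    }

    int
    main(void)
    {
        pthread_t   t[4];

        for (long i = 0; i < 4; i++)
            pthread_create(&t[i], NULL, worker, (void *) (uintptr_t) (i + 3));
        for (long i = 0; i < 4; i++)
            pthread_join(t[i], NULL);
        printf("start block was %u\n", shared.phs_startblock);
        return 0;
    }

Resetting for a rescan is then just writing zero to the counter, as
heap_rescan() now does with pg_atomic_write_u64(), relying on the caller to
have made sure no worker is still scanning.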