btree: Support parallel index scans.
authorRobert Haas <rhaas@postgresql.org>
Wed, 15 Feb 2017 12:41:14 +0000 (07:41 -0500)
committerRobert Haas <rhaas@postgresql.org>
Wed, 15 Feb 2017 12:41:14 +0000 (07:41 -0500)
This isn't exposed to the optimizer or the executor yet; we'll add
support for those things in a separate patch.  But this puts the
basic mechanism in place: several processes can attach to a parallel
btree index scan, and each one will get a subset of the tuples that
would have been produced by a non-parallel scan.  Each index page
becomes the responsibility of a single worker, which then returns
all of the TIDs on that page.

Rahila Syed, Amit Kapila, Robert Haas, reviewed and tested by
Anastasia Lubennikova, Tushar Ahuja, and Haribabu Kommi.

doc/src/sgml/monitoring.sgml
src/backend/access/nbtree/nbtree.c
src/backend/access/nbtree/nbtsearch.c
src/backend/access/nbtree/nbtutils.c
src/backend/postmaster/pgstat.c
src/include/access/nbtree.h
src/include/pgstat.h
src/tools/pgindent/typedefs.list

index 5b67defdb8918e6718a265b78c7bc8af4fb6bea5..fad5cb05b95dac91a03af111af0b179ea690bbe4 100644 (file)
@@ -1207,7 +1207,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry>Waiting in an extension.</entry>
         </row>
         <row>
-         <entry morerows="9"><literal>IPC</></entry>
+         <entry morerows="10"><literal>IPC</></entry>
          <entry><literal>BgWorkerShutdown</></entry>
          <entry>Waiting for background worker to shut down.</entry>
         </row>
@@ -1215,6 +1215,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry><literal>BgWorkerStartup</></entry>
          <entry>Waiting for background worker to start up.</entry>
         </row>
+        <row>
+         <entry><literal>BtreePage</></entry>
+         <entry>Waiting for the page number needed to continue a parallel btree scan to become available.</entry>
+        </row>
         <row>
          <entry><literal>ExecuteGather</></entry>
          <entry>Waiting for activity from child process when executing <literal>Gather</> node.</entry>
index 945e563fcc50bd5c5a629e792909f6fdca4f1bad..cbc575d5cf2e8b1149f542067e812d8590b0a0c9 100644 (file)
@@ -23,6 +23,8 @@
 #include "access/xlog.h"
 #include "catalog/index.h"
 #include "commands/vacuum.h"
+#include "pgstat.h"
+#include "storage/condition_variable.h"
 #include "storage/indexfsm.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -63,6 +65,45 @@ typedef struct
    MemoryContext pagedelcontext;
 } BTVacState;
 
+/*
+ * BTPARALLEL_NOT_INITIALIZED indicates that the scan has not started.
+ *
+ * BTPARALLEL_ADVANCING indicates that some process is advancing the scan to
+ * a new page; others must wait.
+ *
+ * BTPARALLEL_IDLE indicates that no backend is currently advancing the scan
+ * to a new page; some process can start doing that.
+ *
+ * BTPARALLEL_DONE indicates that the scan is complete (including error exit).
+ * We reach this state once for every distinct combination of array keys.
+ */
+typedef enum
+{
+   BTPARALLEL_NOT_INITIALIZED,
+   BTPARALLEL_ADVANCING,
+   BTPARALLEL_IDLE,
+   BTPARALLEL_DONE
+} BTPS_State;
+
+/*
+ * BTParallelScanDescData contains btree specific shared information required
+ * for parallel scan.
+ */
+typedef struct BTParallelScanDescData
+{
+   BlockNumber btps_scanPage;  /* latest or next page to be scanned */
+   BTPS_State  btps_pageStatus;/* indicates whether next page is available
+                                * for scan. see above for possible states of
+                                * parallel scan. */
+   int         btps_arrayKeyCount;     /* count indicating number of array
+                                        * scan keys processed by parallel
+                                        * scan */
+   slock_t     btps_mutex;     /* protects above variables */
+   ConditionVariable btps_cv;  /* used to synchronize parallel scan */
+} BTParallelScanDescData;
+
+typedef struct BTParallelScanDescData *BTParallelScanDesc;
+
 
 static void btbuildCallback(Relation index,
                HeapTuple htup,
@@ -118,9 +159,9 @@ bthandler(PG_FUNCTION_ARGS)
    amroutine->amendscan = btendscan;
    amroutine->ammarkpos = btmarkpos;
    amroutine->amrestrpos = btrestrpos;
-   amroutine->amestimateparallelscan = NULL;
-   amroutine->aminitparallelscan = NULL;
-   amroutine->amparallelrescan = NULL;
+   amroutine->amestimateparallelscan = btestimateparallelscan;
+   amroutine->aminitparallelscan = btinitparallelscan;
+   amroutine->amparallelrescan = btparallelrescan;
 
    PG_RETURN_POINTER(amroutine);
 }
@@ -491,6 +532,7 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
    }
 
    so->markItemIndex = -1;
+   so->arrayKeyCount = 0;
    BTScanPosUnpinIfPinned(so->markPos);
    BTScanPosInvalidate(so->markPos);
 
@@ -652,6 +694,217 @@ btrestrpos(IndexScanDesc scan)
    }
 }
 
+/*
+ * btestimateparallelscan -- estimate storage for BTParallelScanDescData
+ */
+Size
+btestimateparallelscan(void)
+{
+   return sizeof(BTParallelScanDescData);
+}
+
+/*
+ * btinitparallelscan -- initialize BTParallelScanDesc for parallel btree scan
+ */
+void
+btinitparallelscan(void *target)
+{
+   BTParallelScanDesc bt_target = (BTParallelScanDesc) target;
+
+   SpinLockInit(&bt_target->btps_mutex);
+   bt_target->btps_scanPage = InvalidBlockNumber;
+   bt_target->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+   bt_target->btps_arrayKeyCount = 0;
+   ConditionVariableInit(&bt_target->btps_cv);
+}
+
+/*
+ * btparallelrescan() -- reset parallel scan
+ */
+void
+btparallelrescan(IndexScanDesc scan)
+{
+   BTParallelScanDesc btscan;
+   ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+
+   Assert(parallel_scan);
+
+   btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+                                                 parallel_scan->ps_offset);
+
+   /*
+    * In theory, we don't need to acquire the spinlock here, because there
+    * shouldn't be any other workers running at this point, but we do so for
+    * consistency.
+    */
+   SpinLockAcquire(&btscan->btps_mutex);
+   btscan->btps_scanPage = InvalidBlockNumber;
+   btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+   btscan->btps_arrayKeyCount = 0;
+   SpinLockRelease(&btscan->btps_mutex);
+}
+
+/*
+ * _bt_parallel_seize() -- Begin the process of advancing the scan to a new
+ *     page.  Other scans must wait until we call bt_parallel_release() or
+ *     bt_parallel_done().
+ *
+ * The return value is true if we successfully seized the scan and false
+ * if we did not.  The latter case occurs if no pages remain for the current
+ * set of scankeys.
+ *
+ * If the return value is true, *pageno returns the next or current page
+ * of the scan (depending on the scan direction).  An invalid block number
+ * means the scan hasn't yet started, and P_NONE means we've reached the end.
+ * The first time a participating process reaches the last page, it will return
+ * true and set *pageno to P_NONE; after that, further attempts to seize the
+ * scan will return false.
+ *
+ * Callers should ignore the value of pageno if the return value is false.
+ */
+bool
+_bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno)
+{
+   BTScanOpaque so = (BTScanOpaque) scan->opaque;
+   BTPS_State  pageStatus;
+   bool        exit_loop = false;
+   bool        status = true;
+   ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+   BTParallelScanDesc btscan;
+
+   *pageno = P_NONE;
+
+   btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+                                                 parallel_scan->ps_offset);
+
+   while (1)
+   {
+       SpinLockAcquire(&btscan->btps_mutex);
+       pageStatus = btscan->btps_pageStatus;
+
+       if (so->arrayKeyCount < btscan->btps_arrayKeyCount)
+       {
+           /* Parallel scan has already advanced to a new set of scankeys. */
+           status = false;
+       }
+       else if (pageStatus == BTPARALLEL_DONE)
+       {
+           /*
+            * We're done with this set of scankeys.  This may be the end, or
+            * there could be more sets to try.
+            */
+           status = false;
+       }
+       else if (pageStatus != BTPARALLEL_ADVANCING)
+       {
+           /*
+            * We have successfully seized control of the scan for the purpose
+            * of advancing it to a new page!
+            */
+           btscan->btps_pageStatus = BTPARALLEL_ADVANCING;
+           *pageno = btscan->btps_scanPage;
+           exit_loop = true;
+       }
+       SpinLockRelease(&btscan->btps_mutex);
+       if (exit_loop || !status)
+           break;
+       ConditionVariableSleep(&btscan->btps_cv, WAIT_EVENT_BTREE_PAGE);
+   }
+   ConditionVariableCancelSleep();
+
+   return status;
+}
+
+/*
+ * _bt_parallel_release() -- Complete the process of advancing the scan to a
+ *     new page.  We now have the new value btps_scanPage; some other backend
+ *     can now begin advancing the scan.
+ */
+void
+_bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page)
+{
+   ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+   BTParallelScanDesc btscan;
+
+   btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+                                                 parallel_scan->ps_offset);
+
+   SpinLockAcquire(&btscan->btps_mutex);
+   btscan->btps_scanPage = scan_page;
+   btscan->btps_pageStatus = BTPARALLEL_IDLE;
+   SpinLockRelease(&btscan->btps_mutex);
+   ConditionVariableSignal(&btscan->btps_cv);
+}
+
+/*
+ * _bt_parallel_done() -- Mark the parallel scan as complete.
+ *
+ * When there are no pages left to scan, this function should be called to
+ * notify other workers.  Otherwise, they might wait forever for the scan to
+ * advance to the next page.
+ */
+void
+_bt_parallel_done(IndexScanDesc scan)
+{
+   BTScanOpaque so = (BTScanOpaque) scan->opaque;
+   ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+   BTParallelScanDesc btscan;
+   bool        status_changed = false;
+
+   /* Do nothing, for non-parallel scans */
+   if (parallel_scan == NULL)
+       return;
+
+   btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+                                                 parallel_scan->ps_offset);
+
+   /*
+    * Mark the parallel scan as done for this combination of scan keys,
+    * unless some other process already did so.  See also
+    * _bt_advance_array_keys.
+    */
+   SpinLockAcquire(&btscan->btps_mutex);
+   if (so->arrayKeyCount >= btscan->btps_arrayKeyCount &&
+       btscan->btps_pageStatus != BTPARALLEL_DONE)
+   {
+       btscan->btps_pageStatus = BTPARALLEL_DONE;
+       status_changed = true;
+   }
+   SpinLockRelease(&btscan->btps_mutex);
+
+   /* wake up all the workers associated with this parallel scan */
+   if (status_changed)
+       ConditionVariableBroadcast(&btscan->btps_cv);
+}
+
+/*
+ * _bt_parallel_advance_array_keys() -- Advances the parallel scan for array
+ *         keys.
+ *
+ * Updates the count of array keys processed for both local and parallel
+ * scans.
+ */
+void
+_bt_parallel_advance_array_keys(IndexScanDesc scan)
+{
+   BTScanOpaque so = (BTScanOpaque) scan->opaque;
+   ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+   BTParallelScanDesc btscan;
+
+   btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+                                                 parallel_scan->ps_offset);
+
+   so->arrayKeyCount++;
+   SpinLockAcquire(&btscan->btps_mutex);
+   if (btscan->btps_pageStatus == BTPARALLEL_DONE)
+   {
+       btscan->btps_scanPage = InvalidBlockNumber;
+       btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+       btscan->btps_arrayKeyCount++;
+   }
+   SpinLockRelease(&btscan->btps_mutex);
+}
+
 /*
  * Bulk deletion of all index entries pointing to a set of heap tuples.
  * The set of target tuples is specified via a callback routine that tells
index b6459d2f2a0e434b816e95a34109d35e9f070600..2f32b2e78d26905fcacb96052776ae1d88bf369e 100644 (file)
@@ -30,9 +30,13 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
 static void _bt_saveitem(BTScanOpaque so, int itemIndex,
             OffsetNumber offnum, IndexTuple itup);
 static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
+static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir);
+static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno,
+                     ScanDirection dir);
 static Buffer _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot);
 static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
 static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp);
+static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir);
 
 
 /*
@@ -544,8 +548,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
    ScanKeyData notnullkeys[INDEX_MAX_KEYS];
    int         keysCount = 0;
    int         i;
+   bool        status = true;
    StrategyNumber strat_total;
    BTScanPosItem *currItem;
+   BlockNumber blkno;
 
    Assert(!BTScanPosIsValid(so->currPos));
 
@@ -564,6 +570,30 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
    if (!so->qual_ok)
        return false;
 
+   /*
+    * For parallel scans, get the starting page from shared state. If the
+    * scan has not started, proceed to find out first leaf page in the usual
+    * way while keeping other participating processes waiting.  If the scan
+    * has already begun, use the page number from the shared structure.
+    */
+   if (scan->parallel_scan != NULL)
+   {
+       status = _bt_parallel_seize(scan, &blkno);
+       if (!status)
+           return false;
+       else if (blkno == P_NONE)
+       {
+           _bt_parallel_done(scan);
+           return false;
+       }
+       else if (blkno != InvalidBlockNumber)
+       {
+           if (!_bt_parallel_readpage(scan, blkno, dir))
+               return false;
+           goto readcomplete;
+       }
+   }
+
    /*----------
     * Examine the scan keys to discover where we need to start the scan.
     *
@@ -743,7 +773,19 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
     * there.
     */
    if (keysCount == 0)
-       return _bt_endpoint(scan, dir);
+   {
+       bool        match;
+
+       match = _bt_endpoint(scan, dir);
+
+       if (!match)
+       {
+           /* No match, so mark (parallel) scan finished */
+           _bt_parallel_done(scan);
+       }
+
+       return match;
+   }
 
    /*
     * We want to start the scan somewhere within the index.  Set up an
@@ -773,7 +815,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 
            Assert(subkey->sk_flags & SK_ROW_MEMBER);
            if (subkey->sk_flags & SK_ISNULL)
+           {
+               _bt_parallel_done(scan);
                return false;
+           }
            memcpy(scankeys + i, subkey, sizeof(ScanKeyData));
 
            /*
@@ -993,25 +1038,21 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
         * because nothing finer to lock exists.
         */
        PredicateLockRelation(rel, scan->xs_snapshot);
+
+       /*
+        * mark parallel scan as done, so that all the workers can finish
+        * their scan
+        */
+       _bt_parallel_done(scan);
+       BTScanPosInvalidate(so->currPos);
+
        return false;
    }
    else
        PredicateLockPage(rel, BufferGetBlockNumber(buf),
                          scan->xs_snapshot);
 
-   /* initialize moreLeft/moreRight appropriately for scan direction */
-   if (ScanDirectionIsForward(dir))
-   {
-       so->currPos.moreLeft = false;
-       so->currPos.moreRight = true;
-   }
-   else
-   {
-       so->currPos.moreLeft = true;
-       so->currPos.moreRight = false;
-   }
-   so->numKilled = 0;          /* just paranoia */
-   Assert(so->markItemIndex == -1);
+   _bt_initialize_more_data(so, dir);
 
    /* position to the precise item on the page */
    offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey);
@@ -1060,6 +1101,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
        _bt_drop_lock_and_maybe_pin(scan, &so->currPos);
    }
 
+readcomplete:
    /* OK, itemIndex says what to return */
    currItem = &so->currPos.items[so->currPos.itemIndex];
    scan->xs_ctup.t_self = currItem->heapTid;
@@ -1132,6 +1174,10 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
  * moreLeft or moreRight (as appropriate) is cleared if _bt_checkkeys reports
  * that there can be no more matching tuples in the current scan direction.
  *
+ * In the case of a parallel scan, caller must have called _bt_parallel_seize
+ * prior to calling this function; this function will invoke
+ * _bt_parallel_release before returning.
+ *
  * Returns true if any matching items found on the page, false if none.
  */
 static bool
@@ -1154,6 +1200,16 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 
    page = BufferGetPage(so->currPos.buf);
    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+   /* allow next page be processed by parallel worker */
+   if (scan->parallel_scan)
+   {
+       if (ScanDirectionIsForward(dir))
+           _bt_parallel_release(scan, opaque->btpo_next);
+       else
+           _bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf));
+   }
+
    minoff = P_FIRSTDATAKEY(opaque);
    maxoff = PageGetMaxOffsetNumber(page);
 
@@ -1278,21 +1334,16 @@ _bt_saveitem(BTScanOpaque so, int itemIndex,
  * if pinned, we'll drop the pin before moving to next page.  The buffer is
  * not locked on entry.
  *
- * On success exit, so->currPos is updated to contain data from the next
- * interesting page.  For success on a scan using a non-MVCC snapshot we hold
- * a pin, but not a read lock, on that page.  If we do not hold the pin, we
- * set so->currPos.buf to InvalidBuffer.  We return TRUE to indicate success.
- *
- * If there are no more matching records in the given direction, we drop all
- * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE.
+ * For success on a scan using a non-MVCC snapshot we hold a pin, but not a
+ * read lock, on that page.  If we do not hold the pin, we set so->currPos.buf
+ * to InvalidBuffer.  We return TRUE to indicate success.
  */
 static bool
 _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 {
    BTScanOpaque so = (BTScanOpaque) scan->opaque;
-   Relation    rel;
-   Page        page;
-   BTPageOpaque opaque;
+   BlockNumber blkno = InvalidBlockNumber;
+   bool        status = true;
 
    Assert(BTScanPosIsValid(so->currPos));
 
@@ -1319,25 +1370,103 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
        so->markItemIndex = -1;
    }
 
-   rel = scan->indexRelation;
-
    if (ScanDirectionIsForward(dir))
    {
        /* Walk right to the next page with data */
-       /* We must rely on the previously saved nextPage link! */
-       BlockNumber blkno = so->currPos.nextPage;
+       if (scan->parallel_scan != NULL)
+       {
+           /*
+            * Seize the scan to get the next block number; if the scan has
+            * ended already, bail out.
+            */
+           status = _bt_parallel_seize(scan, &blkno);
+           if (!status)
+           {
+               /* release the previous buffer, if pinned */
+               BTScanPosUnpinIfPinned(so->currPos);
+               BTScanPosInvalidate(so->currPos);
+               return false;
+           }
+       }
+       else
+       {
+           /* Not parallel, so use the previously-saved nextPage link. */
+           blkno = so->currPos.nextPage;
+       }
 
        /* Remember we left a page with data */
        so->currPos.moreLeft = true;
 
        /* release the previous buffer, if pinned */
        BTScanPosUnpinIfPinned(so->currPos);
+   }
+   else
+   {
+       /* Remember we left a page with data */
+       so->currPos.moreRight = true;
+
+       if (scan->parallel_scan != NULL)
+       {
+           /*
+            * Seize the scan to get the current block number; if the scan has
+            * ended already, bail out.
+            */
+           status = _bt_parallel_seize(scan, &blkno);
+           BTScanPosUnpinIfPinned(so->currPos);
+           if (!status)
+           {
+               BTScanPosInvalidate(so->currPos);
+               return false;
+           }
+       }
+       else
+       {
+           /* Not parallel, so just use our own notion of the current page */
+           blkno = so->currPos.currPage;
+       }
+   }
+
+   if (!_bt_readnextpage(scan, blkno, dir))
+       return false;
+
+   /* Drop the lock, and maybe the pin, on the current page */
+   _bt_drop_lock_and_maybe_pin(scan, &so->currPos);
 
+   return true;
+}
+
+/*
+ * _bt_readnextpage() -- Read next page containing valid data for scan
+ *
+ * On success exit, so->currPos is updated to contain data from the next
+ * interesting page.  Caller is responsible to release lock and pin on
+ * buffer on success.  We return TRUE to indicate success.
+ *
+ * If there are no more matching records in the given direction, we drop all
+ * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE.
+ */
+static bool
+_bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
+{
+   BTScanOpaque so = (BTScanOpaque) scan->opaque;
+   Relation    rel;
+   Page        page;
+   BTPageOpaque opaque;
+   bool        status = true;
+
+   rel = scan->indexRelation;
+
+   if (ScanDirectionIsForward(dir))
+   {
        for (;;)
        {
-           /* if we're at end of scan, give up */
+           /*
+            * if we're at end of scan, give up and mark parallel scan as
+            * done, so that all the workers can finish their scan
+            */
            if (blkno == P_NONE || !so->currPos.moreRight)
            {
+               _bt_parallel_done(scan);
                BTScanPosInvalidate(so->currPos);
                return false;
            }
@@ -1359,14 +1488,32 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
            }
 
            /* nope, keep going */
-           blkno = opaque->btpo_next;
+           if (scan->parallel_scan != NULL)
+           {
+               status = _bt_parallel_seize(scan, &blkno);
+               if (!status)
+               {
+                   _bt_relbuf(rel, so->currPos.buf);
+                   BTScanPosInvalidate(so->currPos);
+                   return false;
+               }
+           }
+           else
+               blkno = opaque->btpo_next;
            _bt_relbuf(rel, so->currPos.buf);
        }
    }
    else
    {
-       /* Remember we left a page with data */
-       so->currPos.moreRight = true;
+       /*
+        * Should only happen in parallel cases, when some other backend
+        * advanced the scan.
+        */
+       if (so->currPos.currPage != blkno)
+       {
+           BTScanPosUnpinIfPinned(so->currPos);
+           so->currPos.currPage = blkno;
+       }
 
        /*
         * Walk left to the next page with data.  This is much more complex
@@ -1401,6 +1548,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
            if (!so->currPos.moreLeft)
            {
                _bt_relbuf(rel, so->currPos.buf);
+               _bt_parallel_done(scan);
                BTScanPosInvalidate(so->currPos);
                return false;
            }
@@ -1412,6 +1560,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
            /* if we're physically at end of index, return failure */
            if (so->currPos.buf == InvalidBuffer)
            {
+               _bt_parallel_done(scan);
                BTScanPosInvalidate(so->currPos);
                return false;
            }
@@ -1432,9 +1581,46 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
                if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
                    break;
            }
+
+           /*
+            * For parallel scans, get the last page scanned as it is quite
+            * possible that by the time we try to seize the scan, some other
+            * worker has already advanced the scan to a different page.  We
+            * must continue based on the latest page scanned by any worker.
+            */
+           if (scan->parallel_scan != NULL)
+           {
+               _bt_relbuf(rel, so->currPos.buf);
+               status = _bt_parallel_seize(scan, &blkno);
+               if (!status)
+               {
+                   BTScanPosInvalidate(so->currPos);
+                   return false;
+               }
+               so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
+           }
        }
    }
 
+   return true;
+}
+
+/*
+ * _bt_parallel_readpage() -- Read current page containing valid data for scan
+ *
+ * On success, release lock and maybe pin on buffer.  We return TRUE to
+ * indicate success.
+ */
+static bool
+_bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
+{
+   BTScanOpaque so = (BTScanOpaque) scan->opaque;
+
+   _bt_initialize_more_data(so, dir);
+
+   if (!_bt_readnextpage(scan, blkno, dir))
+       return false;
+
    /* Drop the lock, and maybe the pin, on the current page */
    _bt_drop_lock_and_maybe_pin(scan, &so->currPos);
 
@@ -1712,19 +1898,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
    /* remember which buffer we have pinned */
    so->currPos.buf = buf;
 
-   /* initialize moreLeft/moreRight appropriately for scan direction */
-   if (ScanDirectionIsForward(dir))
-   {
-       so->currPos.moreLeft = false;
-       so->currPos.moreRight = true;
-   }
-   else
-   {
-       so->currPos.moreLeft = true;
-       so->currPos.moreRight = false;
-   }
-   so->numKilled = 0;          /* just paranoia */
-   so->markItemIndex = -1;     /* ditto */
+   _bt_initialize_more_data(so, dir);
 
    /*
     * Now load data from the first page of the scan.
@@ -1753,3 +1927,25 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
 
    return true;
 }
+
+/*
+ * _bt_initialize_more_data() -- initialize moreLeft/moreRight appropriately
+ * for scan direction
+ */
+static inline void
+_bt_initialize_more_data(BTScanOpaque so, ScanDirection dir)
+{
+   /* initialize moreLeft/moreRight appropriately for scan direction */
+   if (ScanDirectionIsForward(dir))
+   {
+       so->currPos.moreLeft = false;
+       so->currPos.moreRight = true;
+   }
+   else
+   {
+       so->currPos.moreLeft = true;
+       so->currPos.moreRight = false;
+   }
+   so->numKilled = 0;          /* just paranoia */
+   so->markItemIndex = -1;     /* ditto */
+}
index da0f330c9680d2b6773b760483945d64869c35b7..5b259a31d99a6e822b9bf5fee4616f9447dc19f6 100644 (file)
@@ -590,6 +590,10 @@ _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
            break;
    }
 
+   /* advance parallel scan */
+   if (scan->parallel_scan != NULL)
+       _bt_parallel_advance_array_keys(scan);
+
    return found;
 }
 
index 7176cf1bbeb52de251c179650b34c02ea66bc673..ada374c0c4402da9e0cccaaf4baf2c7ccf8dd063 100644 (file)
@@ -3374,6 +3374,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
        case WAIT_EVENT_BGWORKER_STARTUP:
            event_name = "BgWorkerStartup";
            break;
+       case WAIT_EVENT_BTREE_PAGE:
+           event_name = "BtreePage";
+           break;
        case WAIT_EVENT_EXECUTE_GATHER:
            event_name = "ExecuteGather";
            break;
index 25a1dc818cf90e890f0eeee454184b7a7523561f..6289ffa9bd44fb75a5aa7313dfe8817893c9851a 100644 (file)
@@ -383,6 +383,8 @@ typedef struct BTScanOpaqueData
    ScanKey     arrayKeyData;   /* modified copy of scan->keyData */
    int         numArrayKeys;   /* number of equality-type array keys (-1 if
                                 * there are any unsatisfiable array keys) */
+   int         arrayKeyCount;  /* count indicating number of array scan keys
+                                * processed */
    BTArrayKeyInfo *arrayKeys;  /* info about each equality-type array key */
    MemoryContext arrayContext; /* scan-lifespan context for array data */
 
@@ -426,7 +428,7 @@ typedef BTScanOpaqueData *BTScanOpaque;
 #define SK_BT_NULLS_FIRST  (INDOPTION_NULLS_FIRST << SK_BT_INDOPTION_SHIFT)
 
 /*
- * prototypes for functions in nbtree.c (external entry points for btree)
+ * external entry points for btree, in nbtree.c
  */
 extern IndexBuildResult *btbuild(Relation heap, Relation index,
        struct IndexInfo *indexInfo);
@@ -436,10 +438,13 @@ extern bool btinsert(Relation rel, Datum *values, bool *isnull,
         IndexUniqueCheck checkUnique,
         struct IndexInfo *indexInfo);
 extern IndexScanDesc btbeginscan(Relation rel, int nkeys, int norderbys);
+extern Size btestimateparallelscan(void);
+extern void btinitparallelscan(void *target);
 extern bool btgettuple(IndexScanDesc scan, ScanDirection dir);
 extern int64 btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm);
 extern void btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
         ScanKey orderbys, int norderbys);
+extern void btparallelrescan(IndexScanDesc scan);
 extern void btendscan(IndexScanDesc scan);
 extern void btmarkpos(IndexScanDesc scan);
 extern void btrestrpos(IndexScanDesc scan);
@@ -451,6 +456,14 @@ extern IndexBulkDeleteResult *btvacuumcleanup(IndexVacuumInfo *info,
                IndexBulkDeleteResult *stats);
 extern bool btcanreturn(Relation index, int attno);
 
+/*
+ * prototypes for internal functions in nbtree.c
+ */
+extern bool _bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno);
+extern void _bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page);
+extern void _bt_parallel_done(IndexScanDesc scan);
+extern void _bt_parallel_advance_array_keys(IndexScanDesc scan);
+
 /*
  * prototypes for functions in nbtinsert.c
  */
index de8225b9890a3b43ffc8f37d63c48b79e2c5bac9..8b710ecb24e2011e2b9861fc7d73c7e2466467b3 100644 (file)
@@ -780,6 +780,7 @@ typedef enum
 {
    WAIT_EVENT_BGWORKER_SHUTDOWN = PG_WAIT_IPC,
    WAIT_EVENT_BGWORKER_STARTUP,
+   WAIT_EVENT_BTREE_PAGE,
    WAIT_EVENT_EXECUTE_GATHER,
    WAIT_EVENT_MQ_INTERNAL,
    WAIT_EVENT_MQ_PUT_MESSAGE,
index c4235ae63a4e544bba1bec004128350adadbcd1b..9f876ae264f5afbd36d182eeecc3ce0e0a3daa22 100644 (file)
@@ -161,6 +161,9 @@ BTPageOpaque
 BTPageOpaqueData
 BTPageStat
 BTPageState
+BTParallelScanDesc
+BTParallelScanDescData
+BTPS_State
 BTScanOpaque
 BTScanOpaqueData
 BTScanPos