summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Haas2015-10-15 21:45:22 +0000
committerRobert Haas2015-10-15 21:45:22 +0000
commit91945e758095f496642464652bedb22905647501 (patch)
treeab42ee07af16bf9a7c984c9560b46f96c68b81ae
parent08fbad0afd62690cc82990c0504529ef238ac24d (diff)
-rw-r--r--contrib/parallel_dummy/Makefile19
-rw-r--r--contrib/parallel_dummy/parallel_dummy--1.0.sql7
-rw-r--r--contrib/parallel_dummy/parallel_dummy.c137
-rw-r--r--contrib/parallel_dummy/parallel_dummy.control4
-rw-r--r--src/backend/access/heap/heapam.c251
-rw-r--r--src/include/access/heapam.h8
-rw-r--r--src/include/access/relscan.h20
7 files changed, 425 insertions, 21 deletions
diff --git a/contrib/parallel_dummy/Makefile b/contrib/parallel_dummy/Makefile
new file mode 100644
index 0000000000..de00f50381
--- /dev/null
+++ b/contrib/parallel_dummy/Makefile
@@ -0,0 +1,19 @@
+MODULE_big = parallel_dummy
+OBJS = parallel_dummy.o $(WIN32RES)
+PGFILEDESC = "parallel_dummy - dummy use of parallel infrastructure"
+
+EXTENSION = parallel_dummy
+DATA = parallel_dummy--1.0.sql
+
+REGRESS = parallel_dummy
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/parallel_dummy
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/parallel_dummy/parallel_dummy--1.0.sql b/contrib/parallel_dummy/parallel_dummy--1.0.sql
new file mode 100644
index 0000000000..d49bd0fab0
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy--1.0.sql
@@ -0,0 +1,7 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION parallel_dummy" to load this file. \quit
+
+CREATE FUNCTION parallel_count(rel pg_catalog.regclass,
+ nworkers pg_catalog.int4)
+ RETURNS pg_catalog.int8 STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/parallel_dummy/parallel_dummy.c b/contrib/parallel_dummy/parallel_dummy.c
new file mode 100644
index 0000000000..84c863cae8
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy.c
@@ -0,0 +1,137 @@
+/*--------------------------------------------------------------------------
+ *
+ * parallel_dummy.c
+ * Test harness code for parallel mode code.
+ *
+ * Copyright (C) 2013-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/parallel_dummy/parallel_dummy.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/parallel.h"
+#include "access/relscan.h"
+#include "access/xact.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(parallel_count);
+
+#define TOC_SCAN_KEY 1
+#define TOC_RESULT_KEY 2
+
+void _PG_init(void);
+void count_worker_main(dsm_segment *seg, shm_toc *toc);
+
+static void count_helper(Relation rel, ParallelHeapScanDesc pscan,
+ int64 *resultp);
+
+Datum
+parallel_count(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ int32 nworkers = PG_GETARG_INT32(1);
+ int64 *resultp;
+ int64 result;
+ ParallelContext *pcxt;
+ ParallelHeapScanDesc pscan;
+ Relation rel;
+ Size sz;
+
+ if (nworkers < 0)
+ ereport(ERROR,
+ (errmsg("number of parallel workers must be non-negative")));
+
+ rel = relation_open(relid, AccessShareLock);
+
+ EnterParallelMode();
+
+ pcxt = CreateParallelContextForExternalFunction("parallel_dummy",
+ "count_worker_main",
+ nworkers);
+ sz = heap_parallelscan_estimate(GetActiveSnapshot());
+ shm_toc_estimate_chunk(&pcxt->estimator, sz);
+ shm_toc_estimate_chunk(&pcxt->estimator, sizeof(int64));
+ shm_toc_estimate_keys(&pcxt->estimator, 2);
+ InitializeParallelDSM(pcxt);
+ pscan = shm_toc_allocate(pcxt->toc, sz);
+ heap_parallelscan_initialize(pscan, rel, GetActiveSnapshot());
+ shm_toc_insert(pcxt->toc, TOC_SCAN_KEY, pscan);
+ resultp = shm_toc_allocate(pcxt->toc, sizeof(int64));
+ shm_toc_insert(pcxt->toc, TOC_RESULT_KEY, resultp);
+
+ LaunchParallelWorkers(pcxt);
+
+ /* here's where we do the "real work" ... */
+ count_helper(rel, pscan, resultp);
+
+ WaitForParallelWorkersToFinish(pcxt);
+
+ result = *resultp;
+
+ DestroyParallelContext(pcxt);
+
+ relation_close(rel, AccessShareLock);
+
+ ExitParallelMode();
+
+ PG_RETURN_INT64(result);
+}
+
+void
+count_worker_main(dsm_segment *seg, shm_toc *toc)
+{
+ ParallelHeapScanDesc pscan;
+ Relation rel;
+ int64 *resultp;
+
+ pscan = shm_toc_lookup(toc, TOC_SCAN_KEY);
+ resultp = shm_toc_lookup(toc, TOC_RESULT_KEY);
+ Assert(pscan != NULL && resultp != NULL);
+
+ rel = relation_open(pscan->phs_relid, AccessShareLock);
+ count_helper(rel, pscan, resultp);
+ relation_close(rel, AccessShareLock);
+}
+
+static void
+count_helper(Relation rel, ParallelHeapScanDesc pscan, int64 *resultp)
+{
+ int64 mytuples = 0;
+ HeapScanDesc scan;
+ BlockNumber firstblock = InvalidBlockNumber;
+
+ scan = heap_beginscan_parallel(rel, pscan);
+
+ for (;;)
+ {
+ HeapTuple tup = heap_getnext(scan, ForwardScanDirection);
+
+ if (!HeapTupleIsValid(tup))
+ break;
+ if (firstblock == InvalidBlockNumber)
+ firstblock = scan->rs_cblock;
+
+ ++mytuples;
+ }
+
+ heap_endscan(scan);
+
+ SpinLockAcquire(&pscan->phs_mutex); /* dirty hack */
+ *resultp += mytuples;
+ SpinLockRelease(&pscan->phs_mutex);
+
+ elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples starting at block %u",
+ MyProcPid, mytuples, firstblock);
+}
diff --git a/contrib/parallel_dummy/parallel_dummy.control b/contrib/parallel_dummy/parallel_dummy.control
new file mode 100644
index 0000000000..90bae3f6e0
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy.control
@@ -0,0 +1,4 @@
+comment = 'Dummy parallel code'
+default_version = '1.0'
+module_pathname = '$libdir/parallel_dummy'
+relocatable = true
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index bcf987124f..164b6b55eb 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -63,6 +63,7 @@
#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
+#include "storage/spin.h"
#include "storage/standby.h"
#include "utils/datum.h"
#include "utils/inval.h"
@@ -80,12 +81,14 @@ bool synchronize_seqscans = true;
static HeapScanDesc heap_beginscan_internal(Relation relation,
Snapshot snapshot,
int nkeys, ScanKey key,
+ ParallelHeapScanDesc parallel_scan,
bool allow_strat,
bool allow_sync,
bool allow_pagemode,
bool is_bitmapscan,
bool is_samplescan,
bool temp_snap);
+static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
TransactionId xid, CommandId cid, int options);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@@ -226,7 +229,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* results for a non-MVCC snapshot, the caller must hold some higher-level
* lock that ensures the interesting tuple(s) won't change.)
*/
- scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
+ if (scan->rs_parallel != NULL)
+ scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
+ else
+ scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
/*
* If the table is large relative to NBuffers, use a bulk-read access
@@ -237,7 +243,8 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* behaviors, independently of the size of the table; also there is a GUC
* variable that can disable synchronized scanning.)
*
- * During a rescan, don't make a new strategy object if we don't have to.
+ * Note that heap_parallelscan_initialize has a very similar test; if you
+ * change this, consider changing that one, too.
*/
if (!RelationUsesLocalBuffers(scan->rs_rd) &&
scan->rs_nblocks > NBuffers / 4)
@@ -250,6 +257,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
if (allow_strat)
{
+ /* During a rescan, keep the previous strategy object. */
if (scan->rs_strategy == NULL)
scan->rs_strategy = GetAccessStrategy(BAS_BULKREAD);
}
@@ -260,7 +268,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
scan->rs_strategy = NULL;
}
- if (keep_startblock)
+ if (scan->rs_parallel != NULL)
+ {
+ /* For parallel scan, believe whatever ParallelHeapScanDesc says. */
+ scan->rs_syncscan = scan->rs_parallel->phs_syncscan;
+ }
+ else if (keep_startblock)
{
/*
* When rescanning, we want to keep the previous startblock setting,
@@ -496,7 +509,20 @@ heapgettup(HeapScanDesc scan,
tuple->t_data = NULL;
return;
}
- page = scan->rs_startblock; /* first page */
+ if (scan->rs_parallel != NULL)
+ {
+ page = heap_parallelscan_nextpage(scan);
+
+ /* Other processes might have already finished the scan. */
+ if (page == InvalidBlockNumber)
+ {
+ Assert(!BufferIsValid(scan->rs_cbuf));
+ tuple->t_data = NULL;
+ return;
+ }
+ }
+ else
+ page = scan->rs_startblock; /* first page */
heapgetpage(scan, page);
lineoff = FirstOffsetNumber; /* first offnum */
scan->rs_inited = true;
@@ -519,6 +545,9 @@ heapgettup(HeapScanDesc scan,
}
else if (backward)
{
+ /* backward parallel scan not supported */
+ Assert(scan->rs_parallel == NULL);
+
if (!scan->rs_inited)
{
/*
@@ -671,11 +700,20 @@ heapgettup(HeapScanDesc scan,
}
else
{
- page++;
- if (page >= scan->rs_nblocks)
- page = 0;
- finished = (page == scan->rs_startblock) ||
- (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);
+ if (scan->rs_parallel != NULL)
+ {
+ page = heap_parallelscan_nextpage(scan);
+ finished = (page == InvalidBlockNumber);
+ }
+ else
+ {
+ page++;
+ if (page >= scan->rs_nblocks)
+ page = 0;
+
+ finished = (page == scan->rs_startblock) ||
+ (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
+ }
/*
* Report our new scan position for synchronization purposes. We
@@ -773,7 +811,20 @@ heapgettup_pagemode(HeapScanDesc scan,
tuple->t_data = NULL;
return;
}
- page = scan->rs_startblock; /* first page */
+ if (scan->rs_parallel != NULL)
+ {
+ page = heap_parallelscan_nextpage(scan);
+
+ /* Other processes might have already finished the scan. */
+ if (page == InvalidBlockNumber)
+ {
+ Assert(!BufferIsValid(scan->rs_cbuf));
+ tuple->t_data = NULL;
+ return;
+ }
+ }
+ else
+ page = scan->rs_startblock; /* first page */
heapgetpage(scan, page);
lineindex = 0;
scan->rs_inited = true;
@@ -793,6 +844,9 @@ heapgettup_pagemode(HeapScanDesc scan,
}
else if (backward)
{
+ /* backward parallel scan not supported */
+ Assert(scan->rs_parallel == NULL);
+
if (!scan->rs_inited)
{
/*
@@ -934,11 +988,20 @@ heapgettup_pagemode(HeapScanDesc scan,
}
else
{
- page++;
- if (page >= scan->rs_nblocks)
- page = 0;
- finished = (page == scan->rs_startblock) ||
- (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false);
+ if (scan->rs_parallel != NULL)
+ {
+ page = heap_parallelscan_nextpage(scan);
+ finished = (page == InvalidBlockNumber);
+ }
+ else
+ {
+ page++;
+ if (page >= scan->rs_nblocks)
+ page = 0;
+
+ finished = (page == scan->rs_startblock) ||
+ (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false);
+ }
/*
* Report our new scan position for synchronization purposes. We
@@ -1341,7 +1404,7 @@ HeapScanDesc
heap_beginscan(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key)
{
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
true, true, true, false, false, false);
}
@@ -1351,7 +1414,7 @@ heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
Oid relid = RelationGetRelid(relation);
Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
true, true, true, false, false, true);
}
@@ -1360,7 +1423,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key,
bool allow_strat, bool allow_sync)
{
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
allow_strat, allow_sync, true,
false, false, false);
}
@@ -1369,7 +1432,7 @@ HeapScanDesc
heap_beginscan_bm(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key)
{
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
false, false, true, true, false, false);
}
@@ -1378,7 +1441,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key,
bool allow_strat, bool allow_sync, bool allow_pagemode)
{
- return heap_beginscan_internal(relation, snapshot, nkeys, key,
+ return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
allow_strat, allow_sync, allow_pagemode,
false, true, false);
}
@@ -1386,6 +1449,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot,
static HeapScanDesc
heap_beginscan_internal(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key,
+ ParallelHeapScanDesc parallel_scan,
bool allow_strat,
bool allow_sync,
bool allow_pagemode,
@@ -1418,6 +1482,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
scan->rs_allow_strat = allow_strat;
scan->rs_allow_sync = allow_sync;
scan->rs_temp_snap = temp_snap;
+ scan->rs_parallel = parallel_scan;
/*
* we can use page-at-a-time mode if it's an MVCC-safe snapshot
@@ -1473,6 +1538,25 @@ heap_rescan(HeapScanDesc scan,
* reinitialize scan descriptor
*/
initscan(scan, key, true);
+
+ /*
+ * reset parallel scan, if present
+ */
+ if (scan->rs_parallel != NULL)
+ {
+ ParallelHeapScanDesc parallel_scan;
+
+ /*
+ * Caller is responsible for making sure that all workers have
+ * finished the scan before calling this, so it really shouldn't be
+ * necessary to acquire the mutex at all. We acquire it anyway, just
+ * to be tidy.
+ */
+ parallel_scan = scan->rs_parallel;
+ SpinLockAcquire(&parallel_scan->phs_mutex);
+ parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+ SpinLockRelease(&parallel_scan->phs_mutex);
+ }
}
/* ----------------
@@ -1532,6 +1616,133 @@ heap_endscan(HeapScanDesc scan)
}
/* ----------------
+ * heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
+ *
+ * Sadly, this doesn't reduce to a constant, because the size required
+ * to serialize the snapshot can vary.
+ * ----------------
+ */
+Size
+heap_parallelscan_estimate(Snapshot snapshot)
+{
+ return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data),
+ EstimateSnapshotSpace(snapshot));
+}
+
+/* ----------------
+ * heap_parallelscan_initialize - initialize ParallelHeapScanDesc
+ *
+ * Must allow as many bytes of shared memory as returned by
+ * heap_parallelscan_estimate. Call this just once in the leader
+ * process; then, individual workers attach via heap_beginscan_parallel.
+ * ----------------
+ */
+void
+heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
+ Snapshot snapshot)
+{
+ target->phs_relid = RelationGetRelid(relation);
+ target->phs_nblocks = RelationGetNumberOfBlocks(relation);
+ /* compare phs_syncscan initialization to similar logic in initscan */
+ target->phs_syncscan = synchronize_seqscans &&
+ !RelationUsesLocalBuffers(relation) &&
+ target->phs_nblocks > NBuffers / 4;
+ SpinLockInit(&target->phs_mutex);
+ target->phs_cblock = InvalidBlockNumber;
+ target->phs_startblock = InvalidBlockNumber;
+ SerializeSnapshot(snapshot, target->phs_snapshot_data);
+}
+
+/* ----------------
+ * heap_beginscan_parallel - join a parallel scan
+ *
+ * Caller must hold a suitable lock on the correct relation.
+ * ----------------
+ */
+HeapScanDesc
+heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
+{
+ Snapshot snapshot;
+
+ Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
+ snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
+ RegisterSnapshot(snapshot);
+
+ return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
+ true, true, true, false, false, true);
+}
+
+/* ----------------
+ * heap_parallelscan_nextpage - get the next page to scan
+ *
+ * Get the next page to scan. Even if there are no pages left to scan,
+ * another backend could have grabbed a page to scan and not yet finished
+ * looking at it, so it doesn't follow that the scan is done when the
+ * first backend gets an InvalidBlockNumber return.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(HeapScanDesc scan)
+{
+ BlockNumber page = InvalidBlockNumber;
+ BlockNumber sync_startpage = InvalidBlockNumber;
+ ParallelHeapScanDesc parallel_scan;
+
+ Assert(scan->rs_parallel);
+ parallel_scan = scan->rs_parallel;
+
+retry:
+ /* Grab the spinlock. */
+ SpinLockAcquire(&parallel_scan->phs_mutex);
+
+ /*
+ * If the scan's startblock has not yet been initialized, we must do so
+ * now. If this is not a synchronized scan, we just start at block 0, but
+ * if it is a synchronized scan, we must get the starting position from
+ * the synchronized scan machinery. We can't hold the spinlock while
+ * doing that, though, so release the spinlock, get the information we
+ * need, and retry. If nobody else has initialized the scan in the
+ * meantime, we'll fill in the value we fetched on the second time
+ * through.
+ */
+ if (parallel_scan->phs_startblock == InvalidBlockNumber)
+ {
+ if (!parallel_scan->phs_syncscan)
+ parallel_scan->phs_startblock = 0;
+ else if (sync_startpage != InvalidBlockNumber)
+ parallel_scan->phs_startblock = sync_startpage;
+ else
+ {
+ SpinLockRelease(&parallel_scan->phs_mutex);
+ sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
+ goto retry;
+ }
+ parallel_scan->phs_cblock = parallel_scan->phs_startblock;
+ }
+
+ /*
+ * The current block number is the next one that needs to be scanned,
+ * unless it's InvalidBlockNumber already, in which case there are no more
+ * blocks to scan. After remembering the current value, we must advance
+ * it so that the next call to this function returns the next block to be
+ * scanned.
+ */
+ page = parallel_scan->phs_cblock;
+ if (page != InvalidBlockNumber)
+ {
+ parallel_scan->phs_cblock++;
+ if (parallel_scan->phs_cblock >= scan->rs_nblocks)
+ parallel_scan->phs_cblock = 0;
+ if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
+ parallel_scan->phs_cblock = InvalidBlockNumber;
+ }
+
+ /* Release the lock and return. */
+ SpinLockRelease(&parallel_scan->phs_mutex);
+ return page;
+}
+
+/* ----------------
* heap_getnext - retrieve next tuple in scan
*
* Fix to work with index relations.
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 75e6b72f9e..98eeadd23f 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -96,8 +96,9 @@ extern Relation heap_openrv_extended(const RangeVar *relation,
#define heap_close(r,l) relation_close(r,l)
-/* struct definition appears in relscan.h */
+/* struct definitions appear in relscan.h */
typedef struct HeapScanDescData *HeapScanDesc;
+typedef struct ParallelHeapScanDescData *ParallelHeapScanDesc;
/*
* HeapScanIsValid
@@ -126,6 +127,11 @@ extern void heap_rescan_set_params(HeapScanDesc scan, ScanKey key,
extern void heap_endscan(HeapScanDesc scan);
extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction);
+extern Size heap_parallelscan_estimate(Snapshot snapshot);
+extern void heap_parallelscan_initialize(ParallelHeapScanDesc target,
+ Relation relation, Snapshot snapshot);
+extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc);
+
extern bool heap_fetch(Relation relation, Snapshot snapshot,
HeapTuple tuple, Buffer *userbuf, bool keep_buf,
Relation stats_relation);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 6e6231971f..356c7e6b04 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -20,6 +20,25 @@
#include "access/itup.h"
#include "access/tupdesc.h"
+/*
+ * Shared state for parallel heap scan.
+ *
+ * Each backend participating in a parallel heap scan has its own
+ * HeapScanDesc in backend-private memory, and those objects all contain
+ * a pointer to this structure. The information here must be sufficient
+ * to properly initialize each new HeapScanDesc as workers join the scan,
+ * and it must act as a font of block numbers for those workers.
+ */
+typedef struct ParallelHeapScanDescData
+{
+ Oid phs_relid; /* OID of relation to scan */
+ bool phs_syncscan; /* report location to syncscan logic? */
+ BlockNumber phs_nblocks; /* # blocks in relation at start of scan */
+ slock_t phs_mutex; /* mutual exclusion for block number fields */
+ BlockNumber phs_startblock; /* starting block number */
+ BlockNumber phs_cblock; /* current block number */
+ char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
+} ParallelHeapScanDescData;
typedef struct HeapScanDescData
{
@@ -49,6 +68,7 @@ typedef struct HeapScanDescData
BlockNumber rs_cblock; /* current block # in scan, if any */
Buffer rs_cbuf; /* current buffer in scan, if any */
/* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */
+ ParallelHeapScanDesc rs_parallel; /* parallel scan information */
/* these fields only used in page-at-a-time mode and for bitmap scans */
int rs_cindex; /* current tuple's index in vistuples */