diff options
author | Robert Haas | 2015-10-15 21:45:22 +0000 |
---|---|---|
committer | Robert Haas | 2015-10-15 21:45:22 +0000 |
commit | 91945e758095f496642464652bedb22905647501 (patch) | |
tree | ab42ee07af16bf9a7c984c9560b46f96c68b81ae | |
parent | 08fbad0afd62690cc82990c0504529ef238ac24d (diff) |
Thunk.parallel_count
-rw-r--r-- | contrib/parallel_dummy/Makefile | 19 | ||||
-rw-r--r-- | contrib/parallel_dummy/parallel_dummy--1.0.sql | 7 | ||||
-rw-r--r-- | contrib/parallel_dummy/parallel_dummy.c | 137 | ||||
-rw-r--r-- | contrib/parallel_dummy/parallel_dummy.control | 4 | ||||
-rw-r--r-- | src/backend/access/heap/heapam.c | 251 | ||||
-rw-r--r-- | src/include/access/heapam.h | 8 | ||||
-rw-r--r-- | src/include/access/relscan.h | 20 |
7 files changed, 425 insertions, 21 deletions
diff --git a/contrib/parallel_dummy/Makefile b/contrib/parallel_dummy/Makefile new file mode 100644 index 0000000000..de00f50381 --- /dev/null +++ b/contrib/parallel_dummy/Makefile @@ -0,0 +1,19 @@ +MODULE_big = parallel_dummy +OBJS = parallel_dummy.o $(WIN32RES) +PGFILEDESC = "parallel_dummy - dummy use of parallel infrastructure" + +EXTENSION = parallel_dummy +DATA = parallel_dummy--1.0.sql + +REGRESS = parallel_dummy + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/parallel_dummy +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/parallel_dummy/parallel_dummy--1.0.sql b/contrib/parallel_dummy/parallel_dummy--1.0.sql new file mode 100644 index 0000000000..d49bd0fab0 --- /dev/null +++ b/contrib/parallel_dummy/parallel_dummy--1.0.sql @@ -0,0 +1,7 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION parallel_dummy" to load this file. \quit + +CREATE FUNCTION parallel_count(rel pg_catalog.regclass, + nworkers pg_catalog.int4) + RETURNS pg_catalog.int8 STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/contrib/parallel_dummy/parallel_dummy.c b/contrib/parallel_dummy/parallel_dummy.c new file mode 100644 index 0000000000..84c863cae8 --- /dev/null +++ b/contrib/parallel_dummy/parallel_dummy.c @@ -0,0 +1,137 @@ +/*-------------------------------------------------------------------------- + * + * parallel_dummy.c + * Test harness code for parallel mode code. + * + * Copyright (C) 2013-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/parallel_dummy/parallel_dummy.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/heapam.h" +#include "access/parallel.h" +#include "access/relscan.h" +#include "access/xact.h" +#include "fmgr.h" +#include "miscadmin.h" +#include "storage/spin.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/snapmgr.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(parallel_count); + +#define TOC_SCAN_KEY 1 +#define TOC_RESULT_KEY 2 + +void _PG_init(void); +void count_worker_main(dsm_segment *seg, shm_toc *toc); + +static void count_helper(Relation rel, ParallelHeapScanDesc pscan, + int64 *resultp); + +Datum +parallel_count(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + int32 nworkers = PG_GETARG_INT32(1); + int64 *resultp; + int64 result; + ParallelContext *pcxt; + ParallelHeapScanDesc pscan; + Relation rel; + Size sz; + + if (nworkers < 0) + ereport(ERROR, + (errmsg("number of parallel workers must be non-negative"))); + + rel = relation_open(relid, AccessShareLock); + + EnterParallelMode(); + + pcxt = CreateParallelContextForExternalFunction("parallel_dummy", + "count_worker_main", + nworkers); + sz = heap_parallelscan_estimate(GetActiveSnapshot()); + shm_toc_estimate_chunk(&pcxt->estimator, sz); + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(int64)); + shm_toc_estimate_keys(&pcxt->estimator, 2); + InitializeParallelDSM(pcxt); + pscan = shm_toc_allocate(pcxt->toc, sz); + heap_parallelscan_initialize(pscan, rel, GetActiveSnapshot()); + shm_toc_insert(pcxt->toc, TOC_SCAN_KEY, pscan); + resultp = shm_toc_allocate(pcxt->toc, sizeof(int64)); + shm_toc_insert(pcxt->toc, TOC_RESULT_KEY, resultp); + + LaunchParallelWorkers(pcxt); + + /* here's where we do the "real work" ... */ + count_helper(rel, pscan, resultp); + + WaitForParallelWorkersToFinish(pcxt); + + result = *resultp; + + DestroyParallelContext(pcxt); + + relation_close(rel, AccessShareLock); + + ExitParallelMode(); + + PG_RETURN_INT64(result); +} + +void +count_worker_main(dsm_segment *seg, shm_toc *toc) +{ + ParallelHeapScanDesc pscan; + Relation rel; + int64 *resultp; + + pscan = shm_toc_lookup(toc, TOC_SCAN_KEY); + resultp = shm_toc_lookup(toc, TOC_RESULT_KEY); + Assert(pscan != NULL && resultp != NULL); + + rel = relation_open(pscan->phs_relid, AccessShareLock); + count_helper(rel, pscan, resultp); + relation_close(rel, AccessShareLock); +} + +static void +count_helper(Relation rel, ParallelHeapScanDesc pscan, int64 *resultp) +{ + int64 mytuples = 0; + HeapScanDesc scan; + BlockNumber firstblock = InvalidBlockNumber; + + scan = heap_beginscan_parallel(rel, pscan); + + for (;;) + { + HeapTuple tup = heap_getnext(scan, ForwardScanDirection); + + if (!HeapTupleIsValid(tup)) + break; + if (firstblock == InvalidBlockNumber) + firstblock = scan->rs_cblock; + + ++mytuples; + } + + heap_endscan(scan); + + SpinLockAcquire(&pscan->phs_mutex); /* dirty hack */ + *resultp += mytuples; + SpinLockRelease(&pscan->phs_mutex); + + elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples starting at block %u", + MyProcPid, mytuples, firstblock); +} diff --git a/contrib/parallel_dummy/parallel_dummy.control b/contrib/parallel_dummy/parallel_dummy.control new file mode 100644 index 0000000000..90bae3f6e0 --- /dev/null +++ b/contrib/parallel_dummy/parallel_dummy.control @@ -0,0 +1,4 @@ +comment = 'Dummy parallel code' +default_version = '1.0' +module_pathname = '$libdir/parallel_dummy' +relocatable = true diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index bcf987124f..164b6b55eb 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -63,6 +63,7 @@ #include "storage/predicate.h" #include "storage/procarray.h" #include "storage/smgr.h" +#include "storage/spin.h" #include "storage/standby.h" #include "utils/datum.h" #include "utils/inval.h" @@ -80,12 +81,14 @@ bool synchronize_seqscans = true; static HeapScanDesc heap_beginscan_internal(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, + ParallelHeapScanDesc parallel_scan, bool allow_strat, bool allow_sync, bool allow_pagemode, bool is_bitmapscan, bool is_samplescan, bool temp_snap); +static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan); static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, @@ -226,7 +229,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * results for a non-MVCC snapshot, the caller must hold some higher-level * lock that ensures the interesting tuple(s) won't change.) */ - scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd); + if (scan->rs_parallel != NULL) + scan->rs_nblocks = scan->rs_parallel->phs_nblocks; + else + scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd); /* * If the table is large relative to NBuffers, use a bulk-read access @@ -237,7 +243,8 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * behaviors, independently of the size of the table; also there is a GUC * variable that can disable synchronized scanning.) * - * During a rescan, don't make a new strategy object if we don't have to. + * Note that heap_parallelscan_initialize has a very similar test; if you + * change this, consider changing that one, too. */ if (!RelationUsesLocalBuffers(scan->rs_rd) && scan->rs_nblocks > NBuffers / 4) @@ -250,6 +257,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) if (allow_strat) { + /* During a rescan, keep the previous strategy object. */ if (scan->rs_strategy == NULL) scan->rs_strategy = GetAccessStrategy(BAS_BULKREAD); } @@ -260,7 +268,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) scan->rs_strategy = NULL; } - if (keep_startblock) + if (scan->rs_parallel != NULL) + { + /* For parallel scan, believe whatever ParallelHeapScanDesc says. */ + scan->rs_syncscan = scan->rs_parallel->phs_syncscan; + } + else if (keep_startblock) { /* * When rescanning, we want to keep the previous startblock setting, @@ -496,7 +509,20 @@ heapgettup(HeapScanDesc scan, tuple->t_data = NULL; return; } - page = scan->rs_startblock; /* first page */ + if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan); + + /* Other processes might have already finished the scan. */ + if (page == InvalidBlockNumber) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; + } + } + else + page = scan->rs_startblock; /* first page */ heapgetpage(scan, page); lineoff = FirstOffsetNumber; /* first offnum */ scan->rs_inited = true; @@ -519,6 +545,9 @@ heapgettup(HeapScanDesc scan, } else if (backward) { + /* backward parallel scan not supported */ + Assert(scan->rs_parallel == NULL); + if (!scan->rs_inited) { /* @@ -671,11 +700,20 @@ heapgettup(HeapScanDesc scan, } else { - page++; - if (page >= scan->rs_nblocks) - page = 0; - finished = (page == scan->rs_startblock) || - (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false); + if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan); + finished = (page == InvalidBlockNumber); + } + else + { + page++; + if (page >= scan->rs_nblocks) + page = 0; + + finished = (page == scan->rs_startblock) || + (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false); + } /* * Report our new scan position for synchronization purposes. We @@ -773,7 +811,20 @@ heapgettup_pagemode(HeapScanDesc scan, tuple->t_data = NULL; return; } - page = scan->rs_startblock; /* first page */ + if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan); + + /* Other processes might have already finished the scan. */ + if (page == InvalidBlockNumber) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; + } + } + else + page = scan->rs_startblock; /* first page */ heapgetpage(scan, page); lineindex = 0; scan->rs_inited = true; @@ -793,6 +844,9 @@ heapgettup_pagemode(HeapScanDesc scan, } else if (backward) { + /* backward parallel scan not supported */ + Assert(scan->rs_parallel == NULL); + if (!scan->rs_inited) { /* @@ -934,11 +988,20 @@ heapgettup_pagemode(HeapScanDesc scan, } else { - page++; - if (page >= scan->rs_nblocks) - page = 0; - finished = (page == scan->rs_startblock) || - (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false); + if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan); + finished = (page == InvalidBlockNumber); + } + else + { + page++; + if (page >= scan->rs_nblocks) + page = 0; + + finished = (page == scan->rs_startblock) || + (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false); + } /* * Report our new scan position for synchronization purposes. We @@ -1341,7 +1404,7 @@ HeapScanDesc heap_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, true, true, true, false, false, false); } @@ -1351,7 +1414,7 @@ heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key) Oid relid = RelationGetRelid(relation); Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid)); - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, true, true, true, false, false, true); } @@ -1360,7 +1423,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, bool allow_strat, bool allow_sync) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, allow_strat, allow_sync, true, false, false, false); } @@ -1369,7 +1432,7 @@ HeapScanDesc heap_beginscan_bm(Relation relation, Snapshot snapshot, int nkeys, ScanKey key) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, false, false, true, true, false, false); } @@ -1378,7 +1441,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, bool allow_strat, bool allow_sync, bool allow_pagemode) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, allow_strat, allow_sync, allow_pagemode, false, true, false); } @@ -1386,6 +1449,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot, static HeapScanDesc heap_beginscan_internal(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, + ParallelHeapScanDesc parallel_scan, bool allow_strat, bool allow_sync, bool allow_pagemode, @@ -1418,6 +1482,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot, scan->rs_allow_strat = allow_strat; scan->rs_allow_sync = allow_sync; scan->rs_temp_snap = temp_snap; + scan->rs_parallel = parallel_scan; /* * we can use page-at-a-time mode if it's an MVCC-safe snapshot @@ -1473,6 +1538,25 @@ heap_rescan(HeapScanDesc scan, * reinitialize scan descriptor */ initscan(scan, key, true); + + /* + * reset parallel scan, if present + */ + if (scan->rs_parallel != NULL) + { + ParallelHeapScanDesc parallel_scan; + + /* + * Caller is responsible for making sure that all workers have + * finished the scan before calling this, so it really shouldn't be + * necessary to acquire the mutex at all. We acquire it anyway, just + * to be tidy. + */ + parallel_scan = scan->rs_parallel; + SpinLockAcquire(¶llel_scan->phs_mutex); + parallel_scan->phs_cblock = parallel_scan->phs_startblock; + SpinLockRelease(¶llel_scan->phs_mutex); + } } /* ---------------- @@ -1532,6 +1616,133 @@ heap_endscan(HeapScanDesc scan) } /* ---------------- + * heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc + * + * Sadly, this doesn't reduce to a constant, because the size required + * to serialize the snapshot can vary. + * ---------------- + */ +Size +heap_parallelscan_estimate(Snapshot snapshot) +{ + return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data), + EstimateSnapshotSpace(snapshot)); +} + +/* ---------------- + * heap_parallelscan_initialize - initialize ParallelHeapScanDesc + * + * Must allow as many bytes of shared memory as returned by + * heap_parallelscan_estimate. Call this just once in the leader + * process; then, individual workers attach via heap_beginscan_parallel. + * ---------------- + */ +void +heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, + Snapshot snapshot) +{ + target->phs_relid = RelationGetRelid(relation); + target->phs_nblocks = RelationGetNumberOfBlocks(relation); + /* compare phs_syncscan initialization to similar logic in initscan */ + target->phs_syncscan = synchronize_seqscans && + !RelationUsesLocalBuffers(relation) && + target->phs_nblocks > NBuffers / 4; + SpinLockInit(&target->phs_mutex); + target->phs_cblock = InvalidBlockNumber; + target->phs_startblock = InvalidBlockNumber; + SerializeSnapshot(snapshot, target->phs_snapshot_data); +} + +/* ---------------- + * heap_beginscan_parallel - join a parallel scan + * + * Caller must hold a suitable lock on the correct relation. + * ---------------- + */ +HeapScanDesc +heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan) +{ + Snapshot snapshot; + + Assert(RelationGetRelid(relation) == parallel_scan->phs_relid); + snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data); + RegisterSnapshot(snapshot); + + return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan, + true, true, true, false, false, true); +} + +/* ---------------- + * heap_parallelscan_nextpage - get the next page to scan + * + * Get the next page to scan. Even if there are no pages left to scan, + * another backend could have grabbed a page to scan and not yet finished + * looking at it, so it doesn't follow that the scan is done when the + * first backend gets an InvalidBlockNumber return. + * ---------------- + */ +static BlockNumber +heap_parallelscan_nextpage(HeapScanDesc scan) +{ + BlockNumber page = InvalidBlockNumber; + BlockNumber sync_startpage = InvalidBlockNumber; + ParallelHeapScanDesc parallel_scan; + + Assert(scan->rs_parallel); + parallel_scan = scan->rs_parallel; + +retry: + /* Grab the spinlock. */ + SpinLockAcquire(¶llel_scan->phs_mutex); + + /* + * If the scan's startblock has not yet been initialized, we must do so + * now. If this is not a synchronized scan, we just start at block 0, but + * if it is a synchronized scan, we must get the starting position from + * the synchronized scan machinery. We can't hold the spinlock while + * doing that, though, so release the spinlock, get the information we + * need, and retry. If nobody else has initialized the scan in the + * meantime, we'll fill in the value we fetched on the second time + * through. + */ + if (parallel_scan->phs_startblock == InvalidBlockNumber) + { + if (!parallel_scan->phs_syncscan) + parallel_scan->phs_startblock = 0; + else if (sync_startpage != InvalidBlockNumber) + parallel_scan->phs_startblock = sync_startpage; + else + { + SpinLockRelease(¶llel_scan->phs_mutex); + sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks); + goto retry; + } + parallel_scan->phs_cblock = parallel_scan->phs_startblock; + } + + /* + * The current block number is the next one that needs to be scanned, + * unless it's InvalidBlockNumber already, in which case there are no more + * blocks to scan. After remembering the current value, we must advance + * it so that the next call to this function returns the next block to be + * scanned. + */ + page = parallel_scan->phs_cblock; + if (page != InvalidBlockNumber) + { + parallel_scan->phs_cblock++; + if (parallel_scan->phs_cblock >= scan->rs_nblocks) + parallel_scan->phs_cblock = 0; + if (parallel_scan->phs_cblock == parallel_scan->phs_startblock) + parallel_scan->phs_cblock = InvalidBlockNumber; + } + + /* Release the lock and return. */ + SpinLockRelease(¶llel_scan->phs_mutex); + return page; +} + +/* ---------------- * heap_getnext - retrieve next tuple in scan * * Fix to work with index relations. diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 75e6b72f9e..98eeadd23f 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -96,8 +96,9 @@ extern Relation heap_openrv_extended(const RangeVar *relation, #define heap_close(r,l) relation_close(r,l) -/* struct definition appears in relscan.h */ +/* struct definitions appear in relscan.h */ typedef struct HeapScanDescData *HeapScanDesc; +typedef struct ParallelHeapScanDescData *ParallelHeapScanDesc; /* * HeapScanIsValid @@ -126,6 +127,11 @@ extern void heap_rescan_set_params(HeapScanDesc scan, ScanKey key, extern void heap_endscan(HeapScanDesc scan); extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction); +extern Size heap_parallelscan_estimate(Snapshot snapshot); +extern void heap_parallelscan_initialize(ParallelHeapScanDesc target, + Relation relation, Snapshot snapshot); +extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc); + extern bool heap_fetch(Relation relation, Snapshot snapshot, HeapTuple tuple, Buffer *userbuf, bool keep_buf, Relation stats_relation); diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 6e6231971f..356c7e6b04 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -20,6 +20,25 @@ #include "access/itup.h" #include "access/tupdesc.h" +/* + * Shared state for parallel heap scan. + * + * Each backend participating in a parallel heap scan has its own + * HeapScanDesc in backend-private memory, and those objects all contain + * a pointer to this structure. The information here must be sufficient + * to properly initialize each new HeapScanDesc as workers join the scan, + * and it must act as a font of block numbers for those workers. + */ +typedef struct ParallelHeapScanDescData +{ + Oid phs_relid; /* OID of relation to scan */ + bool phs_syncscan; /* report location to syncscan logic? */ + BlockNumber phs_nblocks; /* # blocks in relation at start of scan */ + slock_t phs_mutex; /* mutual exclusion for block number fields */ + BlockNumber phs_startblock; /* starting block number */ + BlockNumber phs_cblock; /* current block number */ + char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER]; +} ParallelHeapScanDescData; typedef struct HeapScanDescData { @@ -49,6 +68,7 @@ typedef struct HeapScanDescData BlockNumber rs_cblock; /* current block # in scan, if any */ Buffer rs_cbuf; /* current buffer in scan, if any */ /* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */ + ParallelHeapScanDesc rs_parallel; /* parallel scan information */ /* these fields only used in page-at-a-time mode and for bitmap scans */ int rs_cindex; /* current tuple's index in vistuples */ |