diff options
author | Robert Haas | 2015-01-30 13:39:22 +0000 |
---|---|---|
committer | Robert Haas | 2015-01-30 15:19:57 +0000 |
commit | b68b38ab1b648676eb989ddd20de2dbdeb39382b (patch) | |
tree | 1bbe80bfd55ba68ab2095ab2f1d40ff0a5f91a0c | |
parent | 4ad1db0a6c907a56a9aa1ffbce1a1d8e84df80e5 (diff) |
contrib/parallel_dummyparallel
-rw-r--r-- | contrib/parallel_dummy/Makefile | 19 | ||||
-rw-r--r-- | contrib/parallel_dummy/parallel_dummy--1.0.sql | 12 | ||||
-rw-r--r-- | contrib/parallel_dummy/parallel_dummy.c | 285 | ||||
-rw-r--r-- | contrib/parallel_dummy/parallel_dummy.control | 4 |
4 files changed, 320 insertions, 0 deletions
diff --git a/contrib/parallel_dummy/Makefile b/contrib/parallel_dummy/Makefile new file mode 100644 index 0000000000..de00f50381 --- /dev/null +++ b/contrib/parallel_dummy/Makefile @@ -0,0 +1,19 @@ +MODULE_big = parallel_dummy +OBJS = parallel_dummy.o $(WIN32RES) +PGFILEDESC = "parallel_dummy - dummy use of parallel infrastructure" + +EXTENSION = parallel_dummy +DATA = parallel_dummy--1.0.sql + +REGRESS = parallel_dummy + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/parallel_dummy +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/parallel_dummy/parallel_dummy--1.0.sql b/contrib/parallel_dummy/parallel_dummy--1.0.sql new file mode 100644 index 0000000000..3c0ae7d5d0 --- /dev/null +++ b/contrib/parallel_dummy/parallel_dummy--1.0.sql @@ -0,0 +1,12 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION parallel_dummy" to load this file. \quit + +CREATE FUNCTION parallel_sleep(sleep_time pg_catalog.int4, + nworkers pg_catalog.int4) + RETURNS pg_catalog.void STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION parallel_count(rel pg_catalog.regclass, + nworkers pg_catalog.int4) + RETURNS pg_catalog.int8 STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/contrib/parallel_dummy/parallel_dummy.c b/contrib/parallel_dummy/parallel_dummy.c new file mode 100644 index 0000000000..f92030e066 --- /dev/null +++ b/contrib/parallel_dummy/parallel_dummy.c @@ -0,0 +1,285 @@ +/*-------------------------------------------------------------------------- + * + * parallel_dummy.c + * Test harness code for parallel mode code. + * + * Copyright (C) 2013-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/parallel_dummy/parallel_dummy.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/heapam.h" +#include "access/parallel.h" +#include "access/xact.h" +#include "fmgr.h" +#include "miscadmin.h" +#include "storage/bufmgr.h" +#include "storage/spin.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/snapmgr.h" +#include "utils/tqual.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(parallel_sleep); +PG_FUNCTION_INFO_V1(parallel_count); + +#define PARALLEL_DUMMY_KEY 1 + +typedef struct +{ + int32 sleep_time; +} ParallelSleepInfo; + +typedef struct +{ + int32 relid; + slock_t mutex; + BlockNumber lastblock; + BlockNumber currentblock; + BlockNumber prefetchblock; + int64 ntuples; +} ParallelCountInfo; + +void _PG_init(void); +void sleep_worker_main(dsm_segment *seg, shm_toc *toc); +void count_worker_main(dsm_segment *seg, shm_toc *toc); + +static void count_helper(Relation rel, ParallelCountInfo *info); + +int prefetch_distance; +int prefetch_increment; + +void +_PG_init() +{ + DefineCustomIntVariable("parallel_dummy.prefetch_distance", + "Sets the prefetch distance in blocks.", + NULL, &prefetch_distance, + 0, 0, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL); + DefineCustomIntVariable("parallel_dummy.prefetch_increment", + "Sets the prefetch increment in blocks.", + NULL, &prefetch_increment, + 8, 1, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL); +} + + +Datum +parallel_sleep(PG_FUNCTION_ARGS) +{ + int32 sleep_time = PG_GETARG_INT32(0); + int32 nworkers = PG_GETARG_INT32(1); + bool already_in_parallel_mode = IsInParallelMode(); + ParallelContext *pcxt; + ParallelSleepInfo *info; + + if (nworkers < 0) + ereport(ERROR, + (errmsg("number of parallel workers must be non-negative"))); + + if (!already_in_parallel_mode) + EnterParallelMode(); + + pcxt = CreateParallelContextForExtension("parallel_dummy", + "sleep_worker_main", + nworkers); + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelSleepInfo)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + InitializeParallelDSM(pcxt); + info = shm_toc_allocate(pcxt->toc, sizeof(ParallelSleepInfo)); + info->sleep_time = sleep_time; + shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info); + LaunchParallelWorkers(pcxt); + + /* here's where we do the "real work" ... */ + DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) sleep_time)); + + WaitForParallelWorkersToFinish(pcxt); + DestroyParallelContext(pcxt); + + if (!already_in_parallel_mode) + ExitParallelMode(); + + PG_RETURN_VOID(); +} + +Datum +parallel_count(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + int32 nworkers = PG_GETARG_INT32(1); + bool already_in_parallel_mode = IsInParallelMode(); + ParallelContext *pcxt; + ParallelCountInfo *info; + Relation rel; + int64 result; + + if (nworkers < 0) + ereport(ERROR, + (errmsg("number of parallel workers must be non-negative"))); + + rel = relation_open(relid, AccessShareLock); + + if (!already_in_parallel_mode) + EnterParallelMode(); + + pcxt = CreateParallelContextForExtension("parallel_dummy", + "count_worker_main", + nworkers); + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelCountInfo)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + InitializeParallelDSM(pcxt); + info = shm_toc_allocate(pcxt->toc, sizeof(ParallelCountInfo)); + info->relid = relid; + SpinLockInit(&info->mutex); + info->lastblock = RelationGetNumberOfBlocks(rel); + info->currentblock = 0; + info->prefetchblock = 0; + info->ntuples = 0; + shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info); + LaunchParallelWorkers(pcxt); + + /* here's where we do the "real work" ... */ + count_helper(rel, info); + + WaitForParallelWorkersToFinish(pcxt); + + result = info->ntuples; + + DestroyParallelContext(pcxt); + + relation_close(rel, AccessShareLock); + + if (!already_in_parallel_mode) + ExitParallelMode(); + + PG_RETURN_INT64(result); +} + +void +sleep_worker_main(dsm_segment *seg, shm_toc *toc) +{ + ParallelSleepInfo *info; + + info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY); + Assert(info != NULL); + + /* here's where we do the "real work" ... */ + DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) info->sleep_time)); +} + +void +count_worker_main(dsm_segment *seg, shm_toc *toc) +{ + ParallelCountInfo *info; + Relation rel; + + info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY); + Assert(info != NULL); + + rel = relation_open(info->relid, AccessShareLock); + count_helper(rel, info); + relation_close(rel, AccessShareLock); +} + +static void +count_helper(Relation rel, ParallelCountInfo *info) +{ + int64 ntuples = 0; + int64 mytuples = 0; + Oid relid = info->relid; + Snapshot snapshot = GetActiveSnapshot(); + + for (;;) + { + BlockNumber blkno; + Buffer buffer; + Page page; + int lines; + OffsetNumber lineoff; + ItemId lpp; + bool all_visible; + bool done = false; +#ifdef USE_PREFETCH + BlockNumber prefetch_blkno = InvalidBlockNumber; + uint32 prefetch_count = 0; +#endif + + CHECK_FOR_INTERRUPTS(); + + SpinLockAcquire(&info->mutex); + if (info->currentblock >= info->lastblock) + done = true; + else + { +#ifdef USE_PREFETCH + BlockNumber max_prefetch; + + max_prefetch = info->lastblock - info->prefetchblock; + if (max_prefetch > 0 && + info->prefetchblock - info->currentblock < prefetch_distance) + { + prefetch_blkno = info->prefetchblock; + prefetch_count = Min(prefetch_increment, max_prefetch); + info->prefetchblock += prefetch_count; + } +#endif + blkno = info->currentblock++; + } + info->ntuples += ntuples; + SpinLockRelease(&info->mutex); + + mytuples += ntuples; + if (done) + break; + +#ifdef USE_PREFETCH + while (prefetch_count > 0) + { + PrefetchBuffer(rel, MAIN_FORKNUM, prefetch_blkno); + ++prefetch_blkno; + --prefetch_count; + } +#endif + + buffer = ReadBuffer(rel, blkno); + LockBuffer(buffer, BUFFER_LOCK_SHARE); + page = BufferGetPage(buffer); + lines = PageGetMaxOffsetNumber(page); + ntuples = 0; + + all_visible = PageIsAllVisible(page) && !snapshot->takenDuringRecovery; + + for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(page, lineoff); + lineoff <= lines; + lineoff++, lpp++) + { + HeapTupleData loctup; + + if (!ItemIdIsNormal(lpp)) + continue; + if (all_visible) + { + ++ntuples; + continue; + } + + loctup.t_tableOid = relid; + loctup.t_data = (HeapTupleHeader) PageGetItem(page, lpp); + loctup.t_len = ItemIdGetLength(lpp); + + if (HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer)) + ++ntuples; + } + + UnlockReleaseBuffer(buffer); + } + + elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples", MyProcPid, mytuples); +} diff --git a/contrib/parallel_dummy/parallel_dummy.control b/contrib/parallel_dummy/parallel_dummy.control new file mode 100644 index 0000000000..90bae3f6e0 --- /dev/null +++ b/contrib/parallel_dummy/parallel_dummy.control @@ -0,0 +1,4 @@ +comment = 'Dummy parallel code' +default_version = '1.0' +module_pathname = '$libdir/parallel_dummy' +relocatable = true |