summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/parallel_dummy/Makefile19
-rw-r--r--contrib/parallel_dummy/parallel_dummy--1.0.sql12
-rw-r--r--contrib/parallel_dummy/parallel_dummy.c285
-rw-r--r--contrib/parallel_dummy/parallel_dummy.control4
4 files changed, 320 insertions, 0 deletions
diff --git a/contrib/parallel_dummy/Makefile b/contrib/parallel_dummy/Makefile
new file mode 100644
index 0000000000..de00f50381
--- /dev/null
+++ b/contrib/parallel_dummy/Makefile
@@ -0,0 +1,19 @@
+MODULE_big = parallel_dummy
+OBJS = parallel_dummy.o $(WIN32RES)
+PGFILEDESC = "parallel_dummy - dummy use of parallel infrastructure"
+
+EXTENSION = parallel_dummy
+DATA = parallel_dummy--1.0.sql
+
+REGRESS = parallel_dummy
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/parallel_dummy
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/parallel_dummy/parallel_dummy--1.0.sql b/contrib/parallel_dummy/parallel_dummy--1.0.sql
new file mode 100644
index 0000000000..3c0ae7d5d0
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy--1.0.sql
@@ -0,0 +1,12 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION parallel_dummy" to load this file. \quit
+
+CREATE FUNCTION parallel_sleep(sleep_time pg_catalog.int4,
+ nworkers pg_catalog.int4)
+ RETURNS pg_catalog.void STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION parallel_count(rel pg_catalog.regclass,
+ nworkers pg_catalog.int4)
+ RETURNS pg_catalog.int8 STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/parallel_dummy/parallel_dummy.c b/contrib/parallel_dummy/parallel_dummy.c
new file mode 100644
index 0000000000..f92030e066
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy.c
@@ -0,0 +1,285 @@
+/*--------------------------------------------------------------------------
+ *
+ * parallel_dummy.c
+ * Test harness code for parallel mode code.
+ *
+ * Copyright (C) 2013-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/parallel_dummy/parallel_dummy.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/parallel.h"
+#include "access/xact.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+#include "utils/tqual.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(parallel_sleep);
+PG_FUNCTION_INFO_V1(parallel_count);
+
+#define PARALLEL_DUMMY_KEY 1
+
+typedef struct
+{
+ int32 sleep_time;
+} ParallelSleepInfo;
+
+typedef struct
+{
+ int32 relid;
+ slock_t mutex;
+ BlockNumber lastblock;
+ BlockNumber currentblock;
+ BlockNumber prefetchblock;
+ int64 ntuples;
+} ParallelCountInfo;
+
+void _PG_init(void);
+void sleep_worker_main(dsm_segment *seg, shm_toc *toc);
+void count_worker_main(dsm_segment *seg, shm_toc *toc);
+
+static void count_helper(Relation rel, ParallelCountInfo *info);
+
+int prefetch_distance;
+int prefetch_increment;
+
+void
+_PG_init()
+{
+ DefineCustomIntVariable("parallel_dummy.prefetch_distance",
+ "Sets the prefetch distance in blocks.",
+ NULL, &prefetch_distance,
+ 0, 0, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL);
+ DefineCustomIntVariable("parallel_dummy.prefetch_increment",
+ "Sets the prefetch increment in blocks.",
+ NULL, &prefetch_increment,
+ 8, 1, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL);
+}
+
+
+Datum
+parallel_sleep(PG_FUNCTION_ARGS)
+{
+ int32 sleep_time = PG_GETARG_INT32(0);
+ int32 nworkers = PG_GETARG_INT32(1);
+ bool already_in_parallel_mode = IsInParallelMode();
+ ParallelContext *pcxt;
+ ParallelSleepInfo *info;
+
+ if (nworkers < 0)
+ ereport(ERROR,
+ (errmsg("number of parallel workers must be non-negative")));
+
+ if (!already_in_parallel_mode)
+ EnterParallelMode();
+
+ pcxt = CreateParallelContextForExtension("parallel_dummy",
+ "sleep_worker_main",
+ nworkers);
+ shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelSleepInfo));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ InitializeParallelDSM(pcxt);
+ info = shm_toc_allocate(pcxt->toc, sizeof(ParallelSleepInfo));
+ info->sleep_time = sleep_time;
+ shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info);
+ LaunchParallelWorkers(pcxt);
+
+ /* here's where we do the "real work" ... */
+ DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) sleep_time));
+
+ WaitForParallelWorkersToFinish(pcxt);
+ DestroyParallelContext(pcxt);
+
+ if (!already_in_parallel_mode)
+ ExitParallelMode();
+
+ PG_RETURN_VOID();
+}
+
+Datum
+parallel_count(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ int32 nworkers = PG_GETARG_INT32(1);
+ bool already_in_parallel_mode = IsInParallelMode();
+ ParallelContext *pcxt;
+ ParallelCountInfo *info;
+ Relation rel;
+ int64 result;
+
+ if (nworkers < 0)
+ ereport(ERROR,
+ (errmsg("number of parallel workers must be non-negative")));
+
+ rel = relation_open(relid, AccessShareLock);
+
+ if (!already_in_parallel_mode)
+ EnterParallelMode();
+
+ pcxt = CreateParallelContextForExtension("parallel_dummy",
+ "count_worker_main",
+ nworkers);
+ shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelCountInfo));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ InitializeParallelDSM(pcxt);
+ info = shm_toc_allocate(pcxt->toc, sizeof(ParallelCountInfo));
+ info->relid = relid;
+ SpinLockInit(&info->mutex);
+ info->lastblock = RelationGetNumberOfBlocks(rel);
+ info->currentblock = 0;
+ info->prefetchblock = 0;
+ info->ntuples = 0;
+ shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info);
+ LaunchParallelWorkers(pcxt);
+
+ /* here's where we do the "real work" ... */
+ count_helper(rel, info);
+
+ WaitForParallelWorkersToFinish(pcxt);
+
+ result = info->ntuples;
+
+ DestroyParallelContext(pcxt);
+
+ relation_close(rel, AccessShareLock);
+
+ if (!already_in_parallel_mode)
+ ExitParallelMode();
+
+ PG_RETURN_INT64(result);
+}
+
+void
+sleep_worker_main(dsm_segment *seg, shm_toc *toc)
+{
+ ParallelSleepInfo *info;
+
+ info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY);
+ Assert(info != NULL);
+
+ /* here's where we do the "real work" ... */
+ DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) info->sleep_time));
+}
+
+void
+count_worker_main(dsm_segment *seg, shm_toc *toc)
+{
+ ParallelCountInfo *info;
+ Relation rel;
+
+ info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY);
+ Assert(info != NULL);
+
+ rel = relation_open(info->relid, AccessShareLock);
+ count_helper(rel, info);
+ relation_close(rel, AccessShareLock);
+}
+
+static void
+count_helper(Relation rel, ParallelCountInfo *info)
+{
+ int64 ntuples = 0;
+ int64 mytuples = 0;
+ Oid relid = info->relid;
+ Snapshot snapshot = GetActiveSnapshot();
+
+ for (;;)
+ {
+ BlockNumber blkno;
+ Buffer buffer;
+ Page page;
+ int lines;
+ OffsetNumber lineoff;
+ ItemId lpp;
+ bool all_visible;
+ bool done = false;
+#ifdef USE_PREFETCH
+ BlockNumber prefetch_blkno = InvalidBlockNumber;
+ uint32 prefetch_count = 0;
+#endif
+
+ CHECK_FOR_INTERRUPTS();
+
+ SpinLockAcquire(&info->mutex);
+ if (info->currentblock >= info->lastblock)
+ done = true;
+ else
+ {
+#ifdef USE_PREFETCH
+ BlockNumber max_prefetch;
+
+ max_prefetch = info->lastblock - info->prefetchblock;
+ if (max_prefetch > 0 &&
+ info->prefetchblock - info->currentblock < prefetch_distance)
+ {
+ prefetch_blkno = info->prefetchblock;
+ prefetch_count = Min(prefetch_increment, max_prefetch);
+ info->prefetchblock += prefetch_count;
+ }
+#endif
+ blkno = info->currentblock++;
+ }
+ info->ntuples += ntuples;
+ SpinLockRelease(&info->mutex);
+
+ mytuples += ntuples;
+ if (done)
+ break;
+
+#ifdef USE_PREFETCH
+ while (prefetch_count > 0)
+ {
+ PrefetchBuffer(rel, MAIN_FORKNUM, prefetch_blkno);
+ ++prefetch_blkno;
+ --prefetch_count;
+ }
+#endif
+
+ buffer = ReadBuffer(rel, blkno);
+ LockBuffer(buffer, BUFFER_LOCK_SHARE);
+ page = BufferGetPage(buffer);
+ lines = PageGetMaxOffsetNumber(page);
+ ntuples = 0;
+
+ all_visible = PageIsAllVisible(page) && !snapshot->takenDuringRecovery;
+
+ for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(page, lineoff);
+ lineoff <= lines;
+ lineoff++, lpp++)
+ {
+ HeapTupleData loctup;
+
+ if (!ItemIdIsNormal(lpp))
+ continue;
+ if (all_visible)
+ {
+ ++ntuples;
+ continue;
+ }
+
+ loctup.t_tableOid = relid;
+ loctup.t_data = (HeapTupleHeader) PageGetItem(page, lpp);
+ loctup.t_len = ItemIdGetLength(lpp);
+
+ if (HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer))
+ ++ntuples;
+ }
+
+ UnlockReleaseBuffer(buffer);
+ }
+
+ elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples", MyProcPid, mytuples);
+}
diff --git a/contrib/parallel_dummy/parallel_dummy.control b/contrib/parallel_dummy/parallel_dummy.control
new file mode 100644
index 0000000000..90bae3f6e0
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy.control
@@ -0,0 +1,4 @@
+comment = 'Dummy parallel code'
+default_version = '1.0'
+module_pathname = '$libdir/parallel_dummy'
+relocatable = true