contrib/parallel_dummy parallel
authorRobert Haas <rhaas@postgresql.org>
Fri, 30 Jan 2015 13:39:22 +0000 (08:39 -0500)
committerRobert Haas <rhaas@postgresql.org>
Fri, 30 Jan 2015 15:19:57 +0000 (10:19 -0500)
contrib/parallel_dummy/Makefile [new file with mode: 0644]
contrib/parallel_dummy/parallel_dummy--1.0.sql [new file with mode: 0644]
contrib/parallel_dummy/parallel_dummy.c [new file with mode: 0644]
contrib/parallel_dummy/parallel_dummy.control [new file with mode: 0644]

diff --git a/contrib/parallel_dummy/Makefile b/contrib/parallel_dummy/Makefile
new file mode 100644 (file)
index 0000000..de00f50
--- /dev/null
@@ -0,0 +1,19 @@
+MODULE_big = parallel_dummy
+OBJS = parallel_dummy.o $(WIN32RES)
+PGFILEDESC = "parallel_dummy - dummy use of parallel infrastructure"
+
+EXTENSION = parallel_dummy
+DATA = parallel_dummy--1.0.sql
+
+REGRESS = parallel_dummy
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/parallel_dummy
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/parallel_dummy/parallel_dummy--1.0.sql b/contrib/parallel_dummy/parallel_dummy--1.0.sql
new file mode 100644 (file)
index 0000000..3c0ae7d
--- /dev/null
@@ -0,0 +1,12 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION parallel_dummy" to load this file. \quit
+
+CREATE FUNCTION parallel_sleep(sleep_time pg_catalog.int4,
+                                                         nworkers pg_catalog.int4)
+    RETURNS pg_catalog.void STRICT
+       AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION parallel_count(rel pg_catalog.regclass,
+                                                         nworkers pg_catalog.int4)
+    RETURNS pg_catalog.int8 STRICT
+       AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/parallel_dummy/parallel_dummy.c b/contrib/parallel_dummy/parallel_dummy.c
new file mode 100644 (file)
index 0000000..f92030e
--- /dev/null
@@ -0,0 +1,285 @@
+/*--------------------------------------------------------------------------
+ *
+ * parallel_dummy.c
+ *             Test harness code for parallel mode code.
+ *
+ * Copyright (C) 2013-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *             contrib/parallel_dummy/parallel_dummy.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/parallel.h"
+#include "access/xact.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+#include "utils/tqual.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(parallel_sleep);
+PG_FUNCTION_INFO_V1(parallel_count);
+
+#define                PARALLEL_DUMMY_KEY                      1
+
+typedef struct
+{
+       int32           sleep_time;
+} ParallelSleepInfo;
+
+typedef struct
+{
+       int32           relid;
+       slock_t         mutex;
+       BlockNumber     lastblock;
+       BlockNumber     currentblock;
+       BlockNumber prefetchblock;
+       int64           ntuples;
+} ParallelCountInfo;
+
+void           _PG_init(void);
+void           sleep_worker_main(dsm_segment *seg, shm_toc *toc);
+void           count_worker_main(dsm_segment *seg, shm_toc *toc);
+
+static void count_helper(Relation rel, ParallelCountInfo *info);
+
+int prefetch_distance;
+int prefetch_increment;
+
+void
+_PG_init()
+{
+       DefineCustomIntVariable("parallel_dummy.prefetch_distance",
+          "Sets the prefetch distance in blocks.",
+                 NULL, &prefetch_distance,
+                 0, 0, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL);
+       DefineCustomIntVariable("parallel_dummy.prefetch_increment",
+          "Sets the prefetch increment in blocks.",
+                 NULL, &prefetch_increment,
+                 8, 1, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL);
+}
+
+
+Datum
+parallel_sleep(PG_FUNCTION_ARGS)
+{
+       int32           sleep_time = PG_GETARG_INT32(0);
+       int32           nworkers = PG_GETARG_INT32(1);
+       bool            already_in_parallel_mode = IsInParallelMode();
+       ParallelContext *pcxt;
+       ParallelSleepInfo *info;
+
+       if (nworkers < 0)
+               ereport(ERROR,
+                               (errmsg("number of parallel workers must be non-negative")));
+
+       if (!already_in_parallel_mode)
+               EnterParallelMode();
+
+       pcxt = CreateParallelContextForExtension("parallel_dummy",
+                                                                                        "sleep_worker_main",
+                                                                                        nworkers);
+       shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelSleepInfo));
+       shm_toc_estimate_keys(&pcxt->estimator, 1);
+       InitializeParallelDSM(pcxt);
+       info = shm_toc_allocate(pcxt->toc, sizeof(ParallelSleepInfo));
+       info->sleep_time = sleep_time;
+       shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info);
+       LaunchParallelWorkers(pcxt);
+
+       /* here's where we do the "real work" ... */
+       DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) sleep_time));
+
+       WaitForParallelWorkersToFinish(pcxt);
+       DestroyParallelContext(pcxt);
+
+       if (!already_in_parallel_mode)
+               ExitParallelMode();
+
+       PG_RETURN_VOID();
+}
+
+Datum
+parallel_count(PG_FUNCTION_ARGS)
+{
+       Oid                     relid = PG_GETARG_OID(0);
+       int32           nworkers = PG_GETARG_INT32(1);
+       bool            already_in_parallel_mode = IsInParallelMode();
+       ParallelContext *pcxt;
+       ParallelCountInfo *info;
+       Relation        rel;
+       int64           result;
+
+       if (nworkers < 0)
+               ereport(ERROR,
+                               (errmsg("number of parallel workers must be non-negative")));
+
+       rel = relation_open(relid, AccessShareLock);
+
+       if (!already_in_parallel_mode)
+               EnterParallelMode();
+
+       pcxt = CreateParallelContextForExtension("parallel_dummy",
+                                                                                        "count_worker_main",
+                                                                                        nworkers);
+       shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelCountInfo));
+       shm_toc_estimate_keys(&pcxt->estimator, 1);
+       InitializeParallelDSM(pcxt);
+       info = shm_toc_allocate(pcxt->toc, sizeof(ParallelCountInfo));
+       info->relid = relid;
+       SpinLockInit(&info->mutex);
+       info->lastblock = RelationGetNumberOfBlocks(rel);
+       info->currentblock = 0;
+       info->prefetchblock = 0;
+       info->ntuples = 0;
+       shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info);
+       LaunchParallelWorkers(pcxt);
+
+       /* here's where we do the "real work" ... */
+       count_helper(rel, info);
+
+       WaitForParallelWorkersToFinish(pcxt);
+
+       result = info->ntuples;
+
+       DestroyParallelContext(pcxt);
+
+       relation_close(rel, AccessShareLock);
+
+       if (!already_in_parallel_mode)
+               ExitParallelMode();
+
+       PG_RETURN_INT64(result);
+}
+
+void
+sleep_worker_main(dsm_segment *seg, shm_toc *toc)
+{
+       ParallelSleepInfo *info;
+
+       info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY);
+       Assert(info != NULL);
+
+       /* here's where we do the "real work" ... */
+       DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) info->sleep_time));
+}
+
+void
+count_worker_main(dsm_segment *seg, shm_toc *toc)
+{
+       ParallelCountInfo *info;
+       Relation        rel;
+
+       info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY);
+       Assert(info != NULL);
+
+       rel = relation_open(info->relid, AccessShareLock);
+       count_helper(rel, info);
+       relation_close(rel, AccessShareLock);
+}
+
+static void
+count_helper(Relation rel, ParallelCountInfo *info)
+{
+       int64           ntuples = 0;
+       int64           mytuples = 0;
+       Oid                     relid = info->relid;
+       Snapshot        snapshot = GetActiveSnapshot();
+
+       for (;;)
+       {
+               BlockNumber blkno;
+               Buffer          buffer;
+               Page            page;
+               int                     lines;
+               OffsetNumber    lineoff;
+               ItemId          lpp;
+               bool            all_visible;
+               bool            done = false;
+#ifdef USE_PREFETCH
+               BlockNumber prefetch_blkno = InvalidBlockNumber;
+               uint32          prefetch_count = 0;
+#endif
+
+               CHECK_FOR_INTERRUPTS();
+
+               SpinLockAcquire(&info->mutex);
+               if (info->currentblock >= info->lastblock)
+                       done = true;
+               else
+               {
+#ifdef USE_PREFETCH
+                       BlockNumber     max_prefetch;
+
+                       max_prefetch = info->lastblock - info->prefetchblock;
+                       if (max_prefetch > 0 &&
+                               info->prefetchblock - info->currentblock < prefetch_distance)
+                       {
+                               prefetch_blkno = info->prefetchblock;
+                               prefetch_count = Min(prefetch_increment, max_prefetch);
+                               info->prefetchblock += prefetch_count;
+                       }
+#endif
+                       blkno = info->currentblock++;
+               }
+               info->ntuples += ntuples;
+               SpinLockRelease(&info->mutex);
+
+               mytuples += ntuples;
+               if (done)
+                       break;
+
+#ifdef USE_PREFETCH
+               while (prefetch_count > 0)
+               {
+                       PrefetchBuffer(rel, MAIN_FORKNUM, prefetch_blkno);
+                       ++prefetch_blkno;
+                       --prefetch_count;
+               }
+#endif
+
+               buffer = ReadBuffer(rel, blkno);
+               LockBuffer(buffer, BUFFER_LOCK_SHARE);
+               page = BufferGetPage(buffer);
+               lines = PageGetMaxOffsetNumber(page);
+               ntuples = 0;
+
+               all_visible = PageIsAllVisible(page) && !snapshot->takenDuringRecovery;
+
+               for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(page, lineoff);
+                        lineoff <= lines;
+                        lineoff++, lpp++)
+               {
+                       HeapTupleData   loctup;
+
+                       if (!ItemIdIsNormal(lpp))
+                               continue;
+                       if (all_visible)
+                       {
+                               ++ntuples;
+                               continue;
+                       }
+
+                       loctup.t_tableOid = relid;
+                       loctup.t_data = (HeapTupleHeader) PageGetItem(page, lpp);
+                       loctup.t_len = ItemIdGetLength(lpp);
+
+                       if (HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer))
+                               ++ntuples;
+               }
+
+               UnlockReleaseBuffer(buffer);
+       }
+
+       elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples", MyProcPid, mytuples);
+}
diff --git a/contrib/parallel_dummy/parallel_dummy.control b/contrib/parallel_dummy/parallel_dummy.control
new file mode 100644 (file)
index 0000000..90bae3f
--- /dev/null
@@ -0,0 +1,4 @@
+comment = 'Dummy parallel code'
+default_version = '1.0'
+module_pathname = '$libdir/parallel_dummy'
+relocatable = true