</listitem>
</varlistentry>
+ <varlistentry id="guc-enable-parallel-hash" xreflabel="enable_parallel_hash">
+ <term><varname>enable_parallel_hash</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>enable_parallel_hash</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Enables or disables the query planner's use of hash-join plan
+ types with parallel hash. Has no effect if hash-join plans are not
+ also enabled. The default is <literal>on</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-enable-partition-wise-join" xreflabel="enable_partition_wise_join">
<term><varname>enable_partition_wise_join</varname> (<type>boolean</type>)
<indexterm>
<entry>Waiting in an extension.</entry>
</row>
<row>
- <entry morerows="17"><literal>IPC</literal></entry>
+ <entry morerows="32"><literal>IPC</literal></entry>
<entry><literal>BgWorkerShutdown</literal></entry>
<entry>Waiting for background worker to shut down.</entry>
</row>
<entry><literal>ExecuteGather</literal></entry>
<entry>Waiting for activity from child process when executing <literal>Gather</literal> node.</entry>
</row>
+ <row>
+ <entry><literal>Hash/Batch/Allocating</literal></entry>
+ <entry>Waiting for an elected Parallel Hash participant to allocate a hash table.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/Batch/Electing</literal></entry>
+ <entry>Electing a Parallel Hash participant to allocate a hash table.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/Batch/Loading</literal></entry>
+ <entry>Waiting for other Parallel Hash participants to finish loading a hash table.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/Build/Allocating</literal></entry>
+ <entry>Waiting for an elected Parallel Hash participant to allocate the initial hash table.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/Build/Electing</literal></entry>
+ <entry>Electing a Parallel Hash participant to allocate the initial hash table.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/Build/HashingInner</literal></entry>
+ <entry>Waiting for other Parallel Hash participants to finish hashing the inner relation.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/Build/HashingOuter</literal></entry>
+ <entry>Waiting for other Parallel Hash participants to finish partitioning the outer relation.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/GrowBatches/Allocating</literal></entry>
+ <entry>Waiting for an elected Parallel Hash participant to allocate more batches.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/GrowBatches/Deciding</literal></entry>
+ <entry>Electing a Parallel Hash participant to decide on future batch growth.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/GrowBatches/Electing</literal></entry>
+ <entry>Electing a Parallel Hash participant to allocate more batches.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/GrowBatches/Finishing</literal></entry>
+ <entry>Waiting for an elected Parallel Hash participant to decide on future batch growth.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/GrowBatches/Repartitioning</literal></entry>
+ <entry>Waiting for other Parallel Hash participants to finish repartitioning.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/GrowBuckets/Allocating</literal></entry>
+ <entry>Waiting for an elected Parallel Hash participant to finish allocating more buckets.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/GrowBuckets/Electing</literal></entry>
+ <entry>Electing a Parallel Hash participant to allocate more buckets.</entry>
+ </row>
+ <row>
+ <entry><literal>Hash/GrowBuckets/Reinserting</literal></entry>
+ <entry>Waiting for other Parallel Hash participants to finish inserting tuples into new buckets.</entry>
+ </row>
<row>
<entry><literal>LogicalSyncData</literal></entry>
<entry>Waiting for logical replication remote server to send data for initial table synchronization.</entry>
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeHash.h"
+#include "executor/nodeHashjoin.h"
#include "executor/nodeIndexscan.h"
#include "executor/nodeIndexonlyscan.h"
#include "executor/nodeSeqscan.h"
ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
e->pcxt);
break;
+ case T_HashJoinState:
+ if (planstate->plan->parallel_aware)
+ ExecHashJoinEstimate((HashJoinState *) planstate,
+ e->pcxt);
+ break;
case T_HashState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecHashEstimate((HashState *) planstate, e->pcxt);
ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
d->pcxt);
break;
+ case T_HashJoinState:
+ if (planstate->plan->parallel_aware)
+ ExecHashJoinInitializeDSM((HashJoinState *) planstate,
+ d->pcxt);
+ break;
case T_HashState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
pcxt);
break;
+ case T_HashJoinState:
+ if (planstate->plan->parallel_aware)
+ ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
+ pcxt);
+ break;
case T_HashState:
case T_SortState:
/* these nodes have DSM state, but no reinitialization is required */
ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
pwcxt);
break;
+ case T_HashJoinState:
+ if (planstate->plan->parallel_aware)
+ ExecHashJoinInitializeWorker((HashJoinState *) planstate,
+ pwcxt);
+ break;
case T_HashState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecHashInitializeWorker((HashState *) planstate, pwcxt);
case T_HashState:
ExecShutdownHash((HashState *) node);
break;
+ case T_HashJoinState:
+ ExecShutdownHashJoin((HashJoinState *) node);
+ break;
default:
break;
}
* IDENTIFICATION
* src/backend/executor/nodeHash.c
*
+ * See note on parallelism in nodeHashjoin.c.
+ *
*-------------------------------------------------------------------------
*/
/*
#include <limits.h>
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "catalog/pg_statistic.h"
#include "commands/tablespace.h"
#include "executor/execdebug.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
+#include "pgstat.h"
+#include "port/atomics.h"
#include "utils/dynahash.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
+static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable);
+static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable);
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
int mcvsToUse);
static void ExecHashSkewTableInsert(HashJoinTable hashtable,
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
static void *dense_alloc(HashJoinTable hashtable, Size size);
+static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable,
+ size_t size,
+ dsa_pointer *shared);
+static void MultiExecPrivateHash(HashState *node);
+static void MultiExecParallelHash(HashState *node);
+static inline HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable table,
+ int bucketno);
+static inline HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table,
+ HashJoinTuple tuple);
+static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head,
+ HashJoinTuple tuple,
+ dsa_pointer tuple_shared);
+static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch);
+static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable);
+static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable);
+static void ExecParallelHashRepartitionRest(HashJoinTable hashtable);
+static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable table,
+ dsa_pointer *shared);
+static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
+ int batchno,
+ size_t size);
+static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
+static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
+
/* ----------------------------------------------------------------
* ExecHash
*/
Node *
MultiExecHash(HashState *node)
+{
+ /* must provide our own instrumentation support */
+ if (node->ps.instrument)
+ InstrStartNode(node->ps.instrument);
+
+ if (node->parallel_state != NULL)
+ MultiExecParallelHash(node);
+ else
+ MultiExecPrivateHash(node);
+
+ /* must provide our own instrumentation support */
+ if (node->ps.instrument)
+ InstrStopNode(node->ps.instrument, node->hashtable->partialTuples);
+
+ /*
+ * We do not return the hash table directly because it's not a subtype of
+ * Node, and so would violate the MultiExecProcNode API. Instead, our
+ * parent Hashjoin node is expected to know how to fish it out of our node
+ * state. Ugly but not really worth cleaning up, since Hashjoin knows
+ * quite a bit more about Hash besides that.
+ */
+ return NULL;
+}
+
+/* ----------------------------------------------------------------
+ * MultiExecPrivateHash
+ *
+ * parallel-oblivious version, building a backend-private
+ * hash table and (if necessary) batch files.
+ * ----------------------------------------------------------------
+ */
+static void
+MultiExecPrivateHash(HashState *node)
{
PlanState *outerNode;
List *hashkeys;
ExprContext *econtext;
uint32 hashvalue;
- /* must provide our own instrumentation support */
- if (node->ps.instrument)
- InstrStartNode(node->ps.instrument);
-
/*
* get state info from node
*/
if (hashtable->spaceUsed > hashtable->spacePeak)
hashtable->spacePeak = hashtable->spaceUsed;
- /* must provide our own instrumentation support */
- if (node->ps.instrument)
- InstrStopNode(node->ps.instrument, hashtable->totalTuples);
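+ /*
+ * In the parallel-oblivious case this backend hashed every tuple itself,
+ * so its partial tuple count is simply the total.
+ */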
+ hashtable->partialTuples = hashtable->totalTuples;
+}
+
+/* ----------------------------------------------------------------
+ * MultiExecParallelHash
+ *
+ * parallel-aware version, building a shared hash table and
+ * (if necessary) batch files using the combined effort of
+ * a set of co-operating backends.
+ * ----------------------------------------------------------------
+ */
+static void
+MultiExecParallelHash(HashState *node)
+{
+ ParallelHashJoinState *pstate;
+ PlanState *outerNode;
+ List *hashkeys;
+ HashJoinTable hashtable;
+ TupleTableSlot *slot;
+ ExprContext *econtext;
+ uint32 hashvalue;
+ Barrier *build_barrier;
+ int i;
/*
- * We do not return the hash table directly because it's not a subtype of
- * Node, and so would violate the MultiExecProcNode API. Instead, our
- * parent Hashjoin node is expected to know how to fish it out of our node
- * state. Ugly but not really worth cleaning up, since Hashjoin knows
- * quite a bit more about Hash besides that.
+ * get state info from node
*/
- return NULL;
+ outerNode = outerPlanState(node);
+ hashtable = node->hashtable;
+
+ /*
+ * set expression context
+ */
+ hashkeys = node->hashkeys;
+ econtext = node->ps.ps_ExprContext;
+
+ /*
+ * Synchronize the parallel hash table build. At this stage we know that
+ * the shared hash table has been or is being set up by
+ * ExecHashTableCreate(), but we don't know if our peers have returned
+ * from there or are here in MultiExecParallelHash(), and if so how far
+ * through they are. To find out, we check the build_barrier phase and
+ * then jump to the right step in the build algorithm.
+ */
+ pstate = hashtable->parallel_state;
+ build_barrier = &pstate->build_barrier;
+ Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING);
+ switch (BarrierPhase(build_barrier))
+ {
+ case PHJ_BUILD_ALLOCATING:
+
+ /*
+ * Either I just allocated the initial hash table in
+ * ExecHashTableCreate(), or someone else is doing that. Either
+ * way, wait for everyone to arrive here so we can proceed.
+ */
+ BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING);
+ /* Fall through. */
+
+ case PHJ_BUILD_HASHING_INNER:
+
+ /*
+ * It's time to begin hashing, or if we just arrived here then
+ * hashing is already underway, so join in that effort. While
+ * hashing we have to be prepared to help increase the number of
+ * batches or buckets at any time, and if we arrived here when
+ * that was already underway we'll have to help complete that work
+ * immediately so that it's safe to access batches and buckets
+ * below.
+ */
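+ /*
+ * Explanatory note: the grow barriers are reused for every growth
+ * cycle, so the PHJ_GROW_BATCHES_PHASE()/PHJ_GROW_BUCKETS_PHASE()
+ * macros map the barrier's ever-increasing phase number onto the
+ * repeating sub-phases of the growth algorithm.
+ */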
+ if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
+ PHJ_GROW_BATCHES_ELECTING)
+ ExecParallelHashIncreaseNumBatches(hashtable);
+ if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
+ PHJ_GROW_BUCKETS_ELECTING)
+ ExecParallelHashIncreaseNumBuckets(hashtable);
+ ExecParallelHashEnsureBatchAccessors(hashtable);
+ ExecParallelHashTableSetCurrentBatch(hashtable, 0);
+ for (;;)
+ {
+ slot = ExecProcNode(outerNode);
+ if (TupIsNull(slot))
+ break;
+ econtext->ecxt_innertuple = slot;
+ if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
+ false, hashtable->keepNulls,
+ &hashvalue))
+ ExecParallelHashTableInsert(hashtable, slot, hashvalue);
+ hashtable->partialTuples++;
+ }
+ BarrierDetach(&pstate->grow_buckets_barrier);
+ BarrierDetach(&pstate->grow_batches_barrier);
+
+ /*
+ * Make sure that any tuples we wrote to disk are visible to
+ * others before anyone tries to load them.
+ */
+ for (i = 0; i < hashtable->nbatch; ++i)
+ sts_end_write(hashtable->batches[i].inner_tuples);
+
+ /*
+ * Update shared counters. We need an accurate total tuple count
+ * to control the empty table optimization.
+ */
+ ExecParallelHashMergeCounters(hashtable);
+
+ /*
+ * Wait for everyone to finish building and flushing files and
+ * counters.
+ */
+ if (BarrierArriveAndWait(build_barrier,
+ WAIT_EVENT_HASH_BUILD_HASHING_INNER))
+ {
+ /*
+ * Elect one backend to disable any further growth. Batches
+ * are now fixed. While building them we made sure they'd fit
+ * in our memory budget when we load them back in later (or we
+ * tried to do that and gave up because we detected extreme
+ * skew).
+ */
+ pstate->growth = PHJ_GROWTH_DISABLED;
+ }
+ }
+
+ /*
+ * We're not yet attached to a batch. We all agree on the dimensions and
+ * number of inner tuples (for the empty table optimization).
+ */
+ hashtable->curbatch = -1;
+ hashtable->nbuckets = pstate->nbuckets;
+ hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
+ hashtable->totalTuples = pstate->total_tuples;
+ ExecParallelHashEnsureBatchAccessors(hashtable);
+
+ /*
+ * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
+ * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't
+ * there already).
+ */
+ Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
+ BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
}
/* ----------------------------------------------------------------
* ----------------------------------------------------------------
*/
HashJoinTable
-ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
+ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
{
+ Hash *node;
HashJoinTable hashtable;
Plan *outerNode;
+ size_t space_allowed;
int nbuckets;
int nbatch;
+ double rows;
int num_skew_mcvs;
int log2_nbuckets;
int nkeys;
* "outer" subtree of this node, but the inner relation of the hashjoin).
* Compute the appropriate size of the hash table.
*/
+ node = (Hash *) state->ps.plan;
outerNode = outerPlan(node);
- ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width,
+ /*
+ * If this is a shared hash table with a partial plan, then we can't use
+ * outerNode->plan_rows to estimate its size. We need an estimate of the
+ * total number of rows across all copies of the partial plan.
+ */
+ rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
+
+ ExecChooseHashTableSize(rows, outerNode->plan_width,
OidIsValid(node->skewTable),
+ state->parallel_state != NULL,
+ state->parallel_state != NULL ?
+ state->parallel_state->nparticipants - 1 : 0,
+ &space_allowed,
&nbuckets, &nbatch, &num_skew_mcvs);
/* nbuckets must be a power of 2 */
hashtable->nbuckets_optimal = nbuckets;
hashtable->log2_nbuckets = log2_nbuckets;
hashtable->log2_nbuckets_optimal = log2_nbuckets;
- hashtable->buckets = NULL;
+ hashtable->buckets.unshared = NULL;
hashtable->keepNulls = keepNulls;
hashtable->skewEnabled = false;
hashtable->skewBucket = NULL;
hashtable->nbatch_outstart = nbatch;
hashtable->growEnabled = true;
hashtable->totalTuples = 0;
+ hashtable->partialTuples = 0;
hashtable->skewTuples = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
hashtable->spaceUsed = 0;
hashtable->spacePeak = 0;
- hashtable->spaceAllowed = work_mem * 1024L;
+ hashtable->spaceAllowed = space_allowed;
hashtable->spaceUsedSkew = 0;
hashtable->spaceAllowedSkew =
hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
hashtable->chunks = NULL;
+ hashtable->current_chunk = NULL;
+ hashtable->parallel_state = state->parallel_state;
+ hashtable->area = state->ps.state->es_query_dsa;
+ hashtable->batches = NULL;
#ifdef HJDEBUG
printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
- if (nbatch > 1)
+ if (nbatch > 1 && hashtable->parallel_state == NULL)
{
/*
- * allocate and initialize the file arrays in hashCxt
+ * allocate and initialize the file arrays in hashCxt (not needed for
+ * the parallel case, which uses shared tuplestores instead of raw files)
*/
hashtable->innerBatchFile = (BufFile **)
palloc0(nbatch * sizeof(BufFile *));
PrepareTempTablespaces();
}
- /*
- * Prepare context for the first-scan space allocations; allocate the
- * hashbucket array therein, and set each bucket "empty".
- */
- MemoryContextSwitchTo(hashtable->batchCxt);
+ MemoryContextSwitchTo(oldcxt);
- hashtable->buckets = (HashJoinTuple *)
- palloc0(nbuckets * sizeof(HashJoinTuple));
+ if (hashtable->parallel_state)
+ {
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ Barrier *build_barrier;
- /*
- * Set up for skew optimization, if possible and there's a need for more
- * than one batch. (In a one-batch join, there's no point in it.)
- */
- if (nbatch > 1)
- ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
+ /*
+ * Attach to the build barrier. The corresponding detach operation is
+ * in ExecHashTableDetach. Note that we won't attach to the
+ * batch_barrier for batch 0 yet. We'll attach later and start it out
+ * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
+ * and then loaded while hashing (the standard hybrid hash join
+ * algorithm), and we'll coordinate that using build_barrier.
+ */
+ build_barrier = &pstate->build_barrier;
+ BarrierAttach(build_barrier);
- MemoryContextSwitchTo(oldcxt);
+ /*
+ * So far we have no idea whether there are any other participants,
+ * and if so, what phase they are working on. The only thing we care
+ * about at this point is whether someone has already created the
+ * SharedHashJoinBatch objects and the hash table for batch 0. One
+ * backend will be elected to do that now if necessary.
+ */
+ if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
+ BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECTING))
+ {
+ pstate->nbatch = nbatch;
+ pstate->space_allowed = space_allowed;
+ pstate->growth = PHJ_GROWTH_OK;
+
+ /* Set up the shared state for coordinating batches. */
+ ExecParallelHashJoinSetUpBatches(hashtable, nbatch);
+
+ /*
+ * Allocate batch 0's hash table up front so we can load it
+ * directly while hashing.
+ */
+ pstate->nbuckets = nbuckets;
+ ExecParallelHashTableAlloc(hashtable, 0);
+ }
+
+ /*
+ * The next Parallel Hash synchronization point is in
+ * MultiExecParallelHash(), which will progress it all the way to
+ * PHJ_BUILD_DONE. The caller must not return control from this
+ * executor node between now and then.
+ */
+ }
+ else
+ {
+ /*
+ * Prepare context for the first-scan space allocations; allocate the
+ * hashbucket array therein, and set each bucket "empty".
+ */
+ MemoryContextSwitchTo(hashtable->batchCxt);
+
+ hashtable->buckets.unshared = (HashJoinTuple *)
+ palloc0(nbuckets * sizeof(HashJoinTuple));
+
+ /*
+ * Set up for skew optimization, if possible and there's a need for
+ * more than one batch. (In a one-batch join, there's no point in
+ * it.)
+ */
+ if (nbatch > 1)
+ ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
+
+ MemoryContextSwitchTo(oldcxt);
+ }
return hashtable;
}
void
ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
+ bool try_combined_work_mem,
+ int parallel_workers,
+ size_t *space_allowed,
int *numbuckets,
int *numbatches,
int *num_skew_mcvs)
*/
hash_table_bytes = work_mem * 1024L;
+ /*
+ * Parallel Hash tries to use the combined work_mem of all workers to
+ * avoid the need to batch. If that won't work, it falls back to work_mem
+ * per worker and tries to process batches in parallel.
+ */
+ if (try_combined_work_mem)
+ hash_table_bytes += hash_table_bytes * parallel_workers;
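+ /*
+ * Illustrative arithmetic (numbers not from this code): with
+ * work_mem = 4MB and parallel_workers = 3, the combined budget is
+ * 4MB + 3 * 4MB = 16MB before batching is considered.
+ */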
+
+ *space_allowed = hash_table_bytes;
+
/*
* If skew optimization is possible, estimate the number of skew buckets
* that will fit in the memory allowed, and decrement the assumed space
* Note that both nbuckets and nbatch must be powers of 2 to make
* ExecHashGetBucketAndBatch fast.
*/
- max_pointers = (work_mem * 1024L) / sizeof(HashJoinTuple);
+ max_pointers = *space_allowed / sizeof(HashJoinTuple);
max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
/* If max_pointers isn't a power of 2, must round it down to one */
mppow2 = 1L << my_log2(max_pointers);
int minbatch;
long bucket_size;
+ /*
+ * If Parallel Hash with combined work_mem would still need multiple
+ * batches, we'll have to fall back to the regular work_mem budget.
+ */
+ if (try_combined_work_mem)
+ {
+ ExecChooseHashTableSize(ntuples, tupwidth, useskew,
+ false, parallel_workers,
+ space_allowed,
+ numbuckets,
+ numbatches,
+ num_skew_mcvs);
+ return;
+ }
+
/*
* Estimate the number of buckets we'll want to have when work_mem is
* entirely full. Each bucket will contain a bucket pointer plus
/*
* Make sure all the temp files are closed. We skip batch 0, since it
* can't have any temp files (and the arrays might not even exist if
- * nbatch is only 1).
+ * nbatch is only 1). Parallel hash joins don't use these files.
*/
- for (i = 1; i < hashtable->nbatch; i++)
+ if (hashtable->innerBatchFile != NULL)
{
- if (hashtable->innerBatchFile[i])
- BufFileClose(hashtable->innerBatchFile[i]);
- if (hashtable->outerBatchFile[i])
- BufFileClose(hashtable->outerBatchFile[i]);
+ for (i = 1; i < hashtable->nbatch; i++)
+ {
+ if (hashtable->innerBatchFile[i])
+ BufFileClose(hashtable->innerBatchFile[i]);
+ if (hashtable->outerBatchFile[i])
+ BufFileClose(hashtable->outerBatchFile[i]);
+ }
}
/* Release working memory (batchCxt is a child, so it goes away too) */
hashtable->nbuckets = hashtable->nbuckets_optimal;
hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
- hashtable->buckets = repalloc(hashtable->buckets,
- sizeof(HashJoinTuple) * hashtable->nbuckets);
+ hashtable->buckets.unshared =
+ repalloc(hashtable->buckets.unshared,
+ sizeof(HashJoinTuple) * hashtable->nbuckets);
}
/*
* buckets now and not have to keep track which tuples in the buckets have
* already been processed. We will free the old chunks as we go.
*/
- memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets);
+ memset(hashtable->buckets.unshared, 0,
+ sizeof(HashJoinTuple) * hashtable->nbuckets);
oldchunks = hashtable->chunks;
hashtable->chunks = NULL;
/* so, let's scan through the old chunks, and all tuples in each chunk */
while (oldchunks != NULL)
{
- HashMemoryChunk nextchunk = oldchunks->next;
+ HashMemoryChunk nextchunk = oldchunks->next.unshared;
/* position within the buffer (up to oldchunks->used) */
size_t idx = 0;
memcpy(copyTuple, hashTuple, hashTupleSize);
/* and add it back to the appropriate bucket */
- copyTuple->next = hashtable->buckets[bucketno];
- hashtable->buckets[bucketno] = copyTuple;
+ copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
+ hashtable->buckets.unshared[bucketno] = copyTuple;
}
else
{
}
}
+/*
+ * ExecParallelHashIncreaseNumBatches
+ * Every participant attached to grow_batches_barrier must run this
+ * function when it observes growth == PHJ_GROWTH_NEED_MORE_BATCHES.
+ */
+static void
+ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ int i;
+
+ Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
+
+ /*
+ * It's unlikely, but we need to be prepared for new participants to show
+ * up while we're in the middle of this operation, so we need to switch
+ * on the barrier phase here.
+ */
+ switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)))
+ {
+ case PHJ_GROW_BATCHES_ELECTING:
+
+ /*
+ * Elect one participant to prepare to grow the number of batches.
+ * This involves reallocating or resetting the buckets of batch 0
+ * in preparation for all participants to begin repartitioning the
+ * tuples.
+ */
+ if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
+ WAIT_EVENT_HASH_GROW_BATCHES_ELECTING))
+ {
+ dsa_pointer_atomic *buckets;
+ ParallelHashJoinBatch *old_batch0;
+ int new_nbatch;
+ int i;
+
+ /* Move the old batch out of the way. */
+ old_batch0 = hashtable->batches[0].shared;
+ pstate->old_batches = pstate->batches;
+ pstate->old_nbatch = hashtable->nbatch;
+ pstate->batches = InvalidDsaPointer;
+
+ /* Free this backend's old accessors. */
+ ExecParallelHashCloseBatchAccessors(hashtable);
+
+ /* Figure out how many batches to use. */
+ if (hashtable->nbatch == 1)
+ {
+ /*
+ * We are going from single-batch to multi-batch. We need
+ * to switch from one large combined memory budget to the
+ * regular work_mem budget.
+ */
+ pstate->space_allowed = work_mem * 1024L;
+
+ /*
+ * The combined work_mem of all participants wasn't
+ * enough. Therefore one batch per participant would be
+ * approximately equivalent and would probably also be
+ * insufficient. So try two batches per participant,
+ * rounded up to a power of two.
+ */
+ new_nbatch = 1 << my_log2(pstate->nparticipants * 2);
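+ /*
+ * For example (illustrative): with 3 participants this requests
+ * 1 << my_log2(6) = 8 batches.
+ */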
+ }
+ else
+ {
+ /*
+ * We were already multi-batched. Try doubling the number
+ * of batches.
+ */
+ new_nbatch = hashtable->nbatch * 2;
+ }
+
+ /* Allocate new larger generation of batches. */
+ Assert(hashtable->nbatch == pstate->nbatch);
+ ExecParallelHashJoinSetUpBatches(hashtable, new_nbatch);
+ Assert(hashtable->nbatch == pstate->nbatch);
+
+ /* Replace or recycle batch 0's bucket array. */
+ if (pstate->old_nbatch == 1)
+ {
+ double dtuples;
+ double dbuckets;
+ int new_nbuckets;
+
+ /*
+ * We probably also need a smaller bucket array. How many
+ * tuples do we expect per batch, assuming we have only
+ * half of them so far? Normally we don't need to change
+ * the bucket array's size, because the size of each batch
+ * stays the same as we add more batches, but in this
+ * special case we move from a large batch to many smaller
+ * batches and it would be wasteful to keep the large
+ * array.
+ */
+ dtuples = (old_batch0->ntuples * 2.0) / new_nbatch;
+ dbuckets = ceil(dtuples / NTUP_PER_BUCKET);
+ dbuckets = Min(dbuckets,
+ MaxAllocSize / sizeof(dsa_pointer_atomic));
+ new_nbuckets = (int) dbuckets;
+ new_nbuckets = Max(new_nbuckets, 1024);
+ new_nbuckets = 1 << my_log2(new_nbuckets);
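+ /*
+ * Illustrative numbers: if batch 0 currently holds 600k tuples
+ * and new_nbatch is 8, we expect about 1.2M / 8 = 150k tuples
+ * per batch, and size the new bucket array accordingly (rounded
+ * up to a power of two, at least 1024 buckets).
+ */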
+ dsa_free(hashtable->area, old_batch0->buckets);
+ hashtable->batches[0].shared->buckets =
+ dsa_allocate(hashtable->area,
+ sizeof(dsa_pointer_atomic) * new_nbuckets);
+ buckets = (dsa_pointer_atomic *)
+ dsa_get_address(hashtable->area,
+ hashtable->batches[0].shared->buckets);
+ for (i = 0; i < new_nbuckets; ++i)
+ dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
+ pstate->nbuckets = new_nbuckets;
+ }
+ else
+ {
+ /* Recycle the existing bucket array. */
+ hashtable->batches[0].shared->buckets = old_batch0->buckets;
+ buckets = (dsa_pointer_atomic *)
+ dsa_get_address(hashtable->area, old_batch0->buckets);
+ for (i = 0; i < hashtable->nbuckets; ++i)
+ dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer);
+ }
+
+ /* Move all chunks to the work queue for parallel processing. */
+ pstate->chunk_work_queue = old_batch0->chunks;
+
+ /* Disable further growth temporarily while we're growing. */
+ pstate->growth = PHJ_GROWTH_DISABLED;
+ }
+ else
+ {
+ /* All other participants just flush their tuples to disk. */
+ ExecParallelHashCloseBatchAccessors(hashtable);
+ }
+ /* Fall through. */
+
+ case PHJ_GROW_BATCHES_ALLOCATING:
+ /* Wait for the above to be finished. */
+ BarrierArriveAndWait(&pstate->grow_batches_barrier,
+ WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING);
+ /* Fall through. */
+
+ case PHJ_GROW_BATCHES_REPARTITIONING:
+ /* Make sure that we have the current dimensions and buckets. */
+ ExecParallelHashEnsureBatchAccessors(hashtable);
+ ExecParallelHashTableSetCurrentBatch(hashtable, 0);
+ /* Then partition, flush counters. */
+ ExecParallelHashRepartitionFirst(hashtable);
+ ExecParallelHashRepartitionRest(hashtable);
+ ExecParallelHashMergeCounters(hashtable);
+ /* Wait for the above to be finished. */
+ BarrierArriveAndWait(&pstate->grow_batches_barrier,
+ WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING);
+ /* Fall through. */
+
+ case PHJ_GROW_BATCHES_DECIDING:
+
+ /*
+ * Elect one participant to clean up and decide whether further
+ * repartitioning is needed, or should be disabled because it's
+ * not helping.
+ */
+ if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
+ WAIT_EVENT_HASH_GROW_BATCHES_DECIDING))
+ {
+ bool space_exhausted = false;
+ bool extreme_skew_detected = false;
+
+ /* Make sure that we have the current dimensions and buckets. */
+ ExecParallelHashEnsureBatchAccessors(hashtable);
+ ExecParallelHashTableSetCurrentBatch(hashtable, 0);
+
+ /* Are any of the new generation of batches exhausted? */
+ for (i = 0; i < hashtable->nbatch; ++i)
+ {
+ ParallelHashJoinBatch *batch = hashtable->batches[i].shared;
+
+ if (batch->space_exhausted ||
+ batch->estimated_size > pstate->space_allowed)
+ {
+ int parent;
+
+ space_exhausted = true;
+
+ /*
+ * Did this batch receive ALL of the tuples from its
+ * parent batch? That would indicate that further
+ * repartitioning isn't going to help (the hash values
+ * are probably all the same).
+ */
+ parent = i % pstate->old_nbatch;
+ if (batch->ntuples == hashtable->batches[parent].shared->old_ntuples)
+ extreme_skew_detected = true;
+ }
+ }
+
+ /* Don't keep growing if it's not helping or we'd overflow. */
+ if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2)
+ pstate->growth = PHJ_GROWTH_DISABLED;
+ else if (space_exhausted)
+ pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
+ else
+ pstate->growth = PHJ_GROWTH_OK;
+
+ /* Free the old batches in shared memory. */
+ dsa_free(hashtable->area, pstate->old_batches);
+ pstate->old_batches = InvalidDsaPointer;
+ }
+ /* Fall through. */
+
+ case PHJ_GROW_BATCHES_FINISHING:
+ /* Wait for the above to complete. */
+ BarrierArriveAndWait(&pstate->grow_batches_barrier,
+ WAIT_EVENT_HASH_GROW_BATCHES_FINISHING);
+ }
+}
+
+/*
+ * Repartition the tuples currently loaded into memory for inner batch 0
+ * because the number of batches has been increased. Some tuples are retained
+ * in memory and some are written out to a later batch.
+ */
+static void
+ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
+{
+ dsa_pointer chunk_shared;
+ HashMemoryChunk chunk;
+
+ Assert(hashtable->nbatch == hashtable->parallel_state->nbatch);
+
+ while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared)))
+ {
+ size_t idx = 0;
+
+ /* Repartition all tuples in this chunk. */
+ while (idx < chunk->used)
+ {
+ HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx);
+ MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+ HashJoinTuple copyTuple;
+ dsa_pointer shared;
+ int bucketno;
+ int batchno;
+
+ ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+ &bucketno, &batchno);
+
+ Assert(batchno < hashtable->nbatch);
+ if (batchno == 0)
+ {
+ /* It still belongs in batch 0. Copy to a new chunk. */
+ copyTuple =
+ ExecParallelHashTupleAlloc(hashtable,
+ HJTUPLE_OVERHEAD + tuple->t_len,
+ &shared);
+ copyTuple->hashvalue = hashTuple->hashvalue;
+ memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len);
+ ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
+ copyTuple, shared);
+ }
+ else
+ {
+ size_t tuple_size =
+ MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
+
+ /* It belongs in a later batch. */
+ hashtable->batches[batchno].estimated_size += tuple_size;
+ sts_puttuple(hashtable->batches[batchno].inner_tuples,
+ &hashTuple->hashvalue, tuple);
+ }
+
+ /* Count this tuple. */
+ ++hashtable->batches[0].old_ntuples;
+ ++hashtable->batches[batchno].ntuples;
+
+ idx += MAXALIGN(HJTUPLE_OVERHEAD +
+ HJTUPLE_MINTUPLE(hashTuple)->t_len);
+ }
+
+ /* Free this chunk. */
+ dsa_free(hashtable->area, chunk_shared);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+}
+
+/*
+ * Help repartition inner batches 1..n.
+ */
+static void
+ExecParallelHashRepartitionRest(HashJoinTable hashtable)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ int old_nbatch = pstate->old_nbatch;
+ SharedTuplestoreAccessor **old_inner_tuples;
+ ParallelHashJoinBatch *old_batches;
+ int i;
+
+ /* Get our hands on the previous generation of batches. */
+ old_batches = (ParallelHashJoinBatch *)
+ dsa_get_address(hashtable->area, pstate->old_batches);
+ old_inner_tuples = palloc0(sizeof(SharedTuplestoreAccessor *) * old_nbatch);
+ for (i = 1; i < old_nbatch; ++i)
+ {
+ ParallelHashJoinBatch *shared =
+ NthParallelHashJoinBatch(old_batches, i);
+
+ old_inner_tuples[i] = sts_attach(ParallelHashJoinBatchInner(shared),
+ ParallelWorkerNumber + 1,
+ &pstate->fileset);
+ }
+
+ /* Join in the effort to repartition them. */
+ for (i = 1; i < old_nbatch; ++i)
+ {
+ MinimalTuple tuple;
+ uint32 hashvalue;
+
+ /* Scan one partition from the previous generation. */
+ sts_begin_parallel_scan(old_inner_tuples[i]);
+ while ((tuple = sts_parallel_scan_next(old_inner_tuples[i], &hashvalue)))
+ {
+ size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
+ int bucketno;
+ int batchno;
+
+ /* Decide which partition it goes to in the new generation. */
+ ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
+ &batchno);
+
+ hashtable->batches[batchno].estimated_size += tuple_size;
+ ++hashtable->batches[batchno].ntuples;
+ ++hashtable->batches[i].old_ntuples;
+
+ /* Store the tuple in its new batch. */
+ sts_puttuple(hashtable->batches[batchno].inner_tuples,
+ &hashvalue, tuple);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+ sts_end_parallel_scan(old_inner_tuples[i]);
+ }
+
+ pfree(old_inner_tuples);
+}
+
+/*
+ * Transfer the backend-local per-batch counters to the shared totals.
+ */
+static void
+ExecParallelHashMergeCounters(HashJoinTable hashtable)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ int i;
+
+ LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
+ pstate->total_tuples = 0;
+ for (i = 0; i < hashtable->nbatch; ++i)
+ {
+ ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i];
+
+ batch->shared->size += batch->size;
+ batch->shared->estimated_size += batch->estimated_size;
+ batch->shared->ntuples += batch->ntuples;
+ batch->shared->old_ntuples += batch->old_ntuples;
+ batch->size = 0;
+ batch->estimated_size = 0;
+ batch->ntuples = 0;
+ batch->old_ntuples = 0;
+ pstate->total_tuples += batch->shared->ntuples;
+ }
+ LWLockRelease(&pstate->lock);
+}
+
/*
* ExecHashIncreaseNumBuckets
* increase the original number of buckets in order to reduce
* ExecHashIncreaseNumBatches, but without all the copying into new
* chunks)
*/
- hashtable->buckets =
- (HashJoinTuple *) repalloc(hashtable->buckets,
+ hashtable->buckets.unshared =
+ (HashJoinTuple *) repalloc(hashtable->buckets.unshared,
hashtable->nbuckets * sizeof(HashJoinTuple));
- memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinTuple));
+ memset(hashtable->buckets.unshared, 0,
+ hashtable->nbuckets * sizeof(HashJoinTuple));
/* scan through all tuples in all chunks to rebuild the hash table */
- for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next)
+ for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared)
{
/* process all tuples stored in this chunk */
size_t idx = 0;
&bucketno, &batchno);
/* add the tuple to the proper bucket */
- hashTuple->next = hashtable->buckets[bucketno];
- hashtable->buckets[bucketno] = hashTuple;
+ hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
+ hashtable->buckets.unshared[bucketno] = hashTuple;
/* advance index past the tuple */
idx += MAXALIGN(HJTUPLE_OVERHEAD +
}
}
+static void
+ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ int i;
+ HashMemoryChunk chunk;
+ dsa_pointer chunk_s;
-/*
- * ExecHashTableInsert
- * insert a tuple into the hash table depending on the hash value
- * it may just go to a temp file for later batches
- *
- * Note: the passed TupleTableSlot may contain a regular, minimal, or virtual
+ Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
+
+ /*
+ * It's unlikely, but we need to be prepared for new participants to show
+ * up while we're in the middle of this operation, so we need to switch
+ * on the barrier phase here.
+ */
+ switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier)))
+ {
+ case PHJ_GROW_BUCKETS_ELECTING:
+ /* Elect one participant to prepare to increase nbuckets. */
+ if (BarrierArriveAndWait(&pstate->grow_buckets_barrier,
+ WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING))
+ {
+ size_t size;
+ dsa_pointer_atomic *buckets;
+
+ /* Double the size of the bucket array. */
+ pstate->nbuckets *= 2;
+ size = pstate->nbuckets * sizeof(dsa_pointer_atomic);
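+ /* The old bucket array was size / 2, so account only for the growth. */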
+ hashtable->batches[0].shared->size += size / 2;
+ dsa_free(hashtable->area, hashtable->batches[0].shared->buckets);
+ hashtable->batches[0].shared->buckets =
+ dsa_allocate(hashtable->area, size);
+ buckets = (dsa_pointer_atomic *)
+ dsa_get_address(hashtable->area,
+ hashtable->batches[0].shared->buckets);
+ for (i = 0; i < pstate->nbuckets; ++i)
+ dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
+
+ /* Put the chunk list onto the work queue. */
+ pstate->chunk_work_queue = hashtable->batches[0].shared->chunks;
+
+ /* Clear the flag. */
+ pstate->growth = PHJ_GROWTH_OK;
+ }
+ /* Fall through. */
+
+ case PHJ_GROW_BUCKETS_ALLOCATING:
+ /* Wait for the above to complete. */
+ BarrierArriveAndWait(&pstate->grow_buckets_barrier,
+ WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING);
+ /* Fall through. */
+
+ case PHJ_GROW_BUCKETS_REINSERTING:
+ /* Reinsert all tuples into the hash table. */
+ ExecParallelHashEnsureBatchAccessors(hashtable);
+ ExecParallelHashTableSetCurrentBatch(hashtable, 0);
+ while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s)))
+ {
+ size_t idx = 0;
+
+ while (idx < chunk->used)
+ {
+ HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx);
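+ /* This tuple's shared address: chunk base + chunk header + offset. */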
+ dsa_pointer shared = chunk_s + HASH_CHUNK_HEADER_SIZE + idx;
+ int bucketno;
+ int batchno;
+
+ ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+ &bucketno, &batchno);
+ Assert(batchno == 0);
+
+ /* add the tuple to the proper bucket */
+ ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
+ hashTuple, shared);
+
+ /* advance index past the tuple */
+ idx += MAXALIGN(HJTUPLE_OVERHEAD +
+ HJTUPLE_MINTUPLE(hashTuple)->t_len);
+ }
+
+ /* allow this loop to be cancellable */
+ CHECK_FOR_INTERRUPTS();
+ }
+ BarrierArriveAndWait(&pstate->grow_buckets_barrier,
+ WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING);
+ }
+}
+
+/*
+ * ExecHashTableInsert
+ * insert a tuple into the hash table depending on the hash value
+ * it may just go to a temp file for later batches
+ *
+ * Note: the passed TupleTableSlot may contain a regular, minimal, or virtual
* tuple; the minimal case in particular is certain to happen while reloading
* tuples from batch files. We could save some cycles in the regular-tuple
* case by not forcing the slot contents into minimal form; not clear if it's
HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
/* Push it onto the front of the bucket's list */
- hashTuple->next = hashtable->buckets[bucketno];
- hashtable->buckets[bucketno] = hashTuple;
+ hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
+ hashtable->buckets.unshared[bucketno] = hashTuple;
/*
* Increase the (optimal) number of buckets if we just exceeded the
}
}
+/*
+ * ExecParallelHashTableInsert
+ * insert a tuple into a shared hash table or shared batch tuplestore
+ */
+void
+ExecParallelHashTableInsert(HashJoinTable hashtable,
+ TupleTableSlot *slot,
+ uint32 hashvalue)
+{
+ MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
+ dsa_pointer shared;
+ int bucketno;
+ int batchno;
+
+retry:
+ ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
+
+ if (batchno == 0)
+ {
+ HashJoinTuple hashTuple;
+
+ /* Try to load it into memory. */
+ Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
+ PHJ_BUILD_HASHING_INNER);
+ hashTuple = ExecParallelHashTupleAlloc(hashtable,
+ HJTUPLE_OVERHEAD + tuple->t_len,
+ &shared);
+ if (hashTuple == NULL)
+ goto retry;
+
+ /* Store the hash value in the HashJoinTuple header. */
+ hashTuple->hashvalue = hashvalue;
+ memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
+
+ /* Push it onto the front of the bucket's list */
+ ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
+ hashTuple, shared);
+ }
+ else
+ {
+ size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
+
+ Assert(batchno > 0);
+
+ /* Try to preallocate space in the batch if necessary. */
+ if (hashtable->batches[batchno].preallocated < tuple_size)
+ {
+ if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
+ goto retry;
+ }
+
+ Assert(hashtable->batches[batchno].preallocated >= tuple_size);
+ hashtable->batches[batchno].preallocated -= tuple_size;
+ sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue,
+ tuple);
+ }
+ ++hashtable->batches[batchno].ntuples;
+}
+
+/*
+ * Insert a tuple into the current hash table. Unlike
+ * ExecParallelHashTableInsert, this version is not prepared to send the tuple
+ * to other batches or to run out of memory, and should only be called with
+ * tuples that belong in the current batch once growth has been disabled.
+ */
+void
+ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
+ TupleTableSlot *slot,
+ uint32 hashvalue)
+{
+ MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
+ HashJoinTuple hashTuple;
+ dsa_pointer shared;
+ int batchno;
+ int bucketno;
+
+ ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
+ Assert(batchno == hashtable->curbatch);
+ hashTuple = ExecParallelHashTupleAlloc(hashtable,
+ HJTUPLE_OVERHEAD + tuple->t_len,
+ &shared);
+ hashTuple->hashvalue = hashvalue;
+ memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
+ HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
+ ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
+ hashTuple, shared);
+}
+
/*
* ExecHashGetHashValue
* Compute the hash value for a tuple
* otherwise scan the standard hashtable bucket.
*/
if (hashTuple != NULL)
- hashTuple = hashTuple->next;
+ hashTuple = hashTuple->next.unshared;
else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
else
- hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
+ hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
+
+ while (hashTuple != NULL)
+ {
+ if (hashTuple->hashvalue == hashvalue)
+ {
+ TupleTableSlot *inntuple;
+
+ /* insert hashtable's tuple into exec slot so ExecQual sees it */
+ inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot,
+ false); /* do not pfree */
+ econtext->ecxt_innertuple = inntuple;
+
+ /* reset temp memory each time to avoid leaks from qual expr */
+ ResetExprContext(econtext);
+
+ if (ExecQual(hjclauses, econtext))
+ {
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
+ }
+ }
+
+ hashTuple = hashTuple->next.unshared;
+ }
+
+ /*
+ * no match
+ */
+ return false;
+}
+
+/*
+ * ExecParallelScanHashBucket
+ * scan a hash bucket for matches to the current outer tuple
+ *
+ * The current outer tuple must be stored in econtext->ecxt_outertuple.
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashBucket(HashJoinState *hjstate,
+ ExprContext *econtext)
+{
+ ExprState *hjclauses = hjstate->hashclauses;
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+ uint32 hashvalue = hjstate->hj_CurHashValue;
+
+ /*
+ * hj_CurTuple is the address of the tuple last returned from the current
+ * bucket, or NULL if it's time to start scanning a new bucket.
+ */
+ if (hashTuple != NULL)
+ hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+ else
+ hashTuple = ExecParallelHashFirstTuple(hashtable,
+ hjstate->hj_CurBucketNo);
while (hashTuple != NULL)
{
}
}
- hashTuple = hashTuple->next;
+ hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
}
/*
* bucket.
*/
if (hashTuple != NULL)
- hashTuple = hashTuple->next;
+ hashTuple = hashTuple->next.unshared;
else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
{
- hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
+ hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
hjstate->hj_CurBucketNo++;
}
else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
return true;
}
- hashTuple = hashTuple->next;
+ hashTuple = hashTuple->next.unshared;
}
/* allow this loop to be cancellable */
oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
/* Reallocate and reinitialize the hash bucket headers. */
- hashtable->buckets = (HashJoinTuple *)
+ hashtable->buckets.unshared = (HashJoinTuple *)
palloc0(nbuckets * sizeof(HashJoinTuple));
hashtable->spaceUsed = 0;
/* Reset all flags in the main table ... */
for (i = 0; i < hashtable->nbuckets; i++)
{
- for (tuple = hashtable->buckets[i]; tuple != NULL; tuple = tuple->next)
+ for (tuple = hashtable->buckets.unshared[i]; tuple != NULL;
+ tuple = tuple->next.unshared)
HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
}
int j = hashtable->skewBucketNums[i];
HashSkewBucket *skewBucket = hashtable->skewBucket[j];
- for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next)
+ for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared)
HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
}
}
HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
/* Push it onto the front of the skew bucket's list */
- hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples;
+ hashTuple->next.unshared = hashtable->skewBucket[bucketNumber]->tuples;
hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
+ Assert(hashTuple != hashTuple->next.unshared);
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
hashTuple = bucket->tuples;
while (hashTuple != NULL)
{
- HashJoinTuple nextHashTuple = hashTuple->next;
+ HashJoinTuple nextHashTuple = hashTuple->next.unshared;
MinimalTuple tuple;
Size tupleSize;
memcpy(copyTuple, hashTuple, tupleSize);
pfree(hashTuple);
- copyTuple->next = hashtable->buckets[bucketno];
- hashtable->buckets[bucketno] = copyTuple;
+ copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
+ hashtable->buckets.unshared[bucketno] = copyTuple;
/* We have reduced skew space, but overall space doesn't change */
hashtable->spaceUsedSkew -= tupleSize;
if (hashtable->chunks != NULL)
{
newChunk->next = hashtable->chunks->next;
- hashtable->chunks->next = newChunk;
+ hashtable->chunks->next.unshared = newChunk;
}
else
{
- newChunk->next = hashtable->chunks;
+ newChunk->next.unshared = hashtable->chunks;
hashtable->chunks = newChunk;
}
newChunk->used = size;
newChunk->ntuples = 1;
- newChunk->next = hashtable->chunks;
+ newChunk->next.unshared = hashtable->chunks;
hashtable->chunks = newChunk;
return newChunk->data;
/* return pointer to the start of the tuple memory */
return ptr;
}
+
+/*
+ * Allocate space for a tuple in shared dense storage. This is equivalent to
+ * dense_alloc but for Parallel Hash using shared memory.
+ *
+ * While loading a tuple into shared memory, we might run out of memory and
+ * decide to repartition, or determine that the load factor is too high and
+ * decide to expand the bucket array, or discover that another participant has
+ * commanded us to help do that. Return NULL if number of buckets or batches
+ * has changed, indicating that the caller must retry (considering the
+ * possibility that the tuple no longer belongs in the same batch).
+ */
+static HashJoinTuple
+ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
+ dsa_pointer *shared)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ dsa_pointer chunk_shared;
+ HashMemoryChunk chunk;
+ Size chunk_size;
+ HashJoinTuple result;
+ int curbatch = hashtable->curbatch;
+
+ size = MAXALIGN(size);
+
+ /*
+ * Fast path: if there is enough space in this backend's current chunk,
+ * then we can allocate without any locking.
+ */
+ chunk = hashtable->current_chunk;
+ if (chunk != NULL &&
+ size < HASH_CHUNK_THRESHOLD &&
+ chunk->maxlen - chunk->used >= size)
+ {
+ chunk_shared = hashtable->current_chunk_shared;
+ Assert(chunk == dsa_get_address(hashtable->area, chunk_shared));
+ *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE + chunk->used;
+ result = (HashJoinTuple) (chunk->data + chunk->used);
+ chunk->used += size;
+
+ Assert(chunk->used <= chunk->maxlen);
+ Assert(result == dsa_get_address(hashtable->area, *shared));
+
+ return result;
+ }
+
+ /* Slow path: try to allocate a new chunk. */
+ LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
+
+ /*
+ * Check if we need to help increase the number of buckets or batches.
+ */
+ if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES ||
+ pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
+ {
+ ParallelHashGrowth growth = pstate->growth;
+
+ hashtable->current_chunk = NULL;
+ LWLockRelease(&pstate->lock);
+
+ /* Another participant has commanded us to help grow. */
+ if (growth == PHJ_GROWTH_NEED_MORE_BATCHES)
+ ExecParallelHashIncreaseNumBatches(hashtable);
+ else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
+ ExecParallelHashIncreaseNumBuckets(hashtable);
+
+ /* The caller must retry. */
+ return NULL;
+ }
+
+ /* Oversized tuples get their own chunk. */
+ if (size > HASH_CHUNK_THRESHOLD)
+ chunk_size = size + HASH_CHUNK_HEADER_SIZE;
+ else
+ chunk_size = HASH_CHUNK_SIZE;
+
+ /* Check if it's time to grow batches or buckets. */
+ if (pstate->growth != PHJ_GROWTH_DISABLED)
+ {
+ Assert(curbatch == 0);
+ Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
+
+ /*
+ * Check if our space limit would be exceeded. To avoid choking on
+ * very large tuples or a very low work_mem setting, we'll always allow
+ * each backend to allocate at least one chunk.
+ */
+ if (hashtable->batches[0].at_least_one_chunk &&
+ hashtable->batches[0].shared->size +
+ chunk_size > pstate->space_allowed)
+ {
+ pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
+ hashtable->batches[0].shared->space_exhausted = true;
+ LWLockRelease(&pstate->lock);
+
+ return NULL;
+ }
+
+ /* Check if our load factor limit would be exceeded. */
+ if (hashtable->nbatch == 1)
+ {
+ hashtable->batches[0].shared->ntuples += hashtable->batches[0].ntuples;
+ hashtable->batches[0].ntuples = 0;
+ if (hashtable->batches[0].shared->ntuples + 1 >
+ hashtable->nbuckets * NTUP_PER_BUCKET &&
+ hashtable->nbuckets < (INT_MAX / 2))
+ {
+ pstate->growth = PHJ_GROWTH_NEED_MORE_BUCKETS;
+ LWLockRelease(&pstate->lock);
+
+ return NULL;
+ }
+ }
+ }
+
+ /* We are cleared to allocate a new chunk. */
+ chunk_shared = dsa_allocate(hashtable->area, chunk_size);
+ hashtable->batches[curbatch].shared->size += chunk_size;
+ hashtable->batches[curbatch].at_least_one_chunk = true;
+
+ /* Set up the chunk. */
+ chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared);
+ *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE;
+ chunk->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE;
+ chunk->used = size;
+
+ /*
+ * Push it onto the list of chunks, so that it can be found if we need to
+ * increase the number of buckets or batches (batch 0 only) and later for
+ * freeing the memory (all batches).
+ */
+ chunk->next.shared = hashtable->batches[curbatch].shared->chunks;
+ hashtable->batches[curbatch].shared->chunks = chunk_shared;
+
+ if (size <= HASH_CHUNK_THRESHOLD)
+ {
+ /*
+ * Make this the current chunk so that we can use the fast path to
+ * fill the rest of it up in future calls.
+ */
+ hashtable->current_chunk = chunk;
+ hashtable->current_chunk_shared = chunk_shared;
+ }
+ LWLockRelease(&pstate->lock);
+
+ Assert(chunk->data == dsa_get_address(hashtable->area, *shared));
+ result = (HashJoinTuple) chunk->data;
+
+ return result;
+}
+
+/*
+ * One backend needs to set up the shared batch state including tuplestores.
+ * Other backends will ensure they have correctly configured accessors by
+ * calling ExecParallelHashEnsureBatchAccessors().
+ */
+static void
+ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ ParallelHashJoinBatch *batches;
+ MemoryContext oldcxt;
+ int i;
+
+ Assert(hashtable->batches == NULL);
+
+ /* Allocate space. */
+ pstate->batches =
+ dsa_allocate0(hashtable->area,
+ EstimateParallelHashJoinBatch(hashtable) * nbatch);
+ pstate->nbatch = nbatch;
+ batches = dsa_get_address(hashtable->area, pstate->batches);
+
+ /* Use hash join memory context. */
+ oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+
+ /* Allocate this backend's accessor array. */
+ hashtable->nbatch = nbatch;
+ hashtable->batches = (ParallelHashJoinBatchAccessor *)
+ palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch);
+
+ /* Set up the shared state, tuplestores and backend-local accessors. */
+ for (i = 0; i < hashtable->nbatch; ++i)
+ {
+ ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
+ ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i);
+ char name[MAXPGPATH];
+
+ /*
+ * All members of shared were zero-initialized. We just need to set
+ * up the Barrier.
+ */
+ BarrierInit(&shared->batch_barrier, 0);
+ if (i == 0)
+ {
+ /* Batch 0 doesn't need to be loaded. */
+ BarrierAttach(&shared->batch_barrier);
+ while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING)
+ BarrierArriveAndWait(&shared->batch_barrier, 0);
+ BarrierDetach(&shared->batch_barrier);
+ }
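+ /*
+ * (Explanatory note: only this backend is attached to batch 0's
+ * barrier at this point, so the loop above simply advances its
+ * phase to PHJ_BATCH_PROBING, marking batch 0 as already loaded
+ * for later probing.)
+ */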
+
+ /* Initialize accessor state. All members were zero-initialized. */
+ accessor->shared = shared;
+
+ /* Initialize the shared tuplestores. */
+ snprintf(name, sizeof(name), "i%dof%d", i, hashtable->nbatch);
+ accessor->inner_tuples =
+ sts_initialize(ParallelHashJoinBatchInner(shared),
+ pstate->nparticipants,
+ ParallelWorkerNumber + 1,
+ sizeof(uint32),
+ SHARED_TUPLESTORE_SINGLE_PASS,
+ &pstate->fileset,
+ name);
+ snprintf(name, sizeof(name), "o%dof%d", i, hashtable->nbatch);
+ accessor->outer_tuples =
+ sts_initialize(ParallelHashJoinBatchOuter(shared,
+ pstate->nparticipants),
+ pstate->nparticipants,
+ ParallelWorkerNumber + 1,
+ sizeof(uint32),
+ SHARED_TUPLESTORE_SINGLE_PASS,
+ &pstate->fileset,
+ name);
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * Free the current set of ParallelHashJoinBatchAccessor objects.
+ */
+static void
+ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
+{
+ int i;
+
+ for (i = 0; i < hashtable->nbatch; ++i)
+ {
+ /* Make sure no files are left open. */
+ sts_end_write(hashtable->batches[i].inner_tuples);
+ sts_end_write(hashtable->batches[i].outer_tuples);
+ sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
+ }
+ pfree(hashtable->batches);
+ hashtable->batches = NULL;
+}
+
+/*
+ * Make sure this backend has up-to-date accessors for the current set of
+ * batches.
+ */
+static void
+ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ ParallelHashJoinBatch *batches;
+ MemoryContext oldcxt;
+ int i;
+
+ if (hashtable->batches != NULL)
+ {
+ if (hashtable->nbatch == pstate->nbatch)
+ return;
+ ExecParallelHashCloseBatchAccessors(hashtable);
+ }
+
+ /*
+ * It's possible for a backend to start up very late so that the whole
+ * join is finished and the shm state for tracking batches has already
+ * been freed by ExecHashTableDetach(). In that case we'll just leave
+ * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives
+ * up early.
+ */
+ if (!DsaPointerIsValid(pstate->batches))
+ return;
+
+ /* Use hash join memory context. */
+ oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+
+ /* Allocate this backend's accessor array. */
+ hashtable->nbatch = pstate->nbatch;
+ hashtable->batches = (ParallelHashJoinBatchAccessor *)
+ palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch);
+
+ /* Find the base of the pseudo-array of ParallelHashJoinBatch objects. */
+ batches = (ParallelHashJoinBatch *)
+ dsa_get_address(hashtable->area, pstate->batches);
+
+ /* Set up the accessor array and attach to the tuplestores. */
+ for (i = 0; i < hashtable->nbatch; ++i)
+ {
+ ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
+ ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i);
+
+ accessor->shared = shared;
+ accessor->preallocated = 0;
+ accessor->done = false;
+ accessor->inner_tuples =
+ sts_attach(ParallelHashJoinBatchInner(shared),
+ ParallelWorkerNumber + 1,
+ &pstate->fileset);
+ accessor->outer_tuples =
+ sts_attach(ParallelHashJoinBatchOuter(shared,
+ pstate->nparticipants),
+ ParallelWorkerNumber + 1,
+ &pstate->fileset);
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * Allocate an empty shared memory hash table for a given batch.
+ */
+void
+ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
+{
+ ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared;
+ dsa_pointer_atomic *buckets;
+ int nbuckets = hashtable->parallel_state->nbuckets;
+ int i;
+
+ batch->buckets =
+ dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets);
+ buckets = (dsa_pointer_atomic *)
+ dsa_get_address(hashtable->area, batch->buckets);
+ for (i = 0; i < nbuckets; ++i)
+ dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
+}
+
+/*
+ * If we are currently attached to a shared hash join batch, detach. If we
+ * are last to detach, clean up.
+ */
+void
+ExecHashTableDetachBatch(HashJoinTable hashtable)
+{
+ if (hashtable->parallel_state != NULL &&
+ hashtable->curbatch >= 0)
+ {
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+
+ /* Make sure any temporary files are closed. */
+ sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+ /* Detach from the batch we were last working on. */
+ if (BarrierArriveAndDetach(&batch->batch_barrier))
+ {
+ /*
+ * Technically we shouldn't access the barrier because we're no
+ * longer attached, but since there is no way it's moving after
+ * this point it seems safe to make the following assertion.
+ */
+ Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
+
+ /* Free shared chunks and buckets. */
+ while (DsaPointerIsValid(batch->chunks))
+ {
+ HashMemoryChunk chunk =
+ dsa_get_address(hashtable->area, batch->chunks);
+ dsa_pointer next = chunk->next.shared;
+
+ dsa_free(hashtable->area, batch->chunks);
+ batch->chunks = next;
+ }
+ if (DsaPointerIsValid(batch->buckets))
+ {
+ dsa_free(hashtable->area, batch->buckets);
+ batch->buckets = InvalidDsaPointer;
+ }
+ }
+ ExecParallelHashUpdateSpacePeak(hashtable, curbatch);
+ /* Remember that we are not attached to a batch. */
+ hashtable->curbatch = -1;
+ }
+}
+
+/*
+ * Detach from all shared resources. If we are last to detach, clean up.
+ */
+void
+ExecHashTableDetach(HashJoinTable hashtable)
+{
+ if (hashtable->parallel_state)
+ {
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ int i;
+
+ /* Make sure any temporary files are closed. */
+ if (hashtable->batches)
+ {
+ for (i = 0; i < hashtable->nbatch; ++i)
+ {
+ sts_end_write(hashtable->batches[i].inner_tuples);
+ sts_end_write(hashtable->batches[i].outer_tuples);
+ sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
+ }
+ }
+
+ /* If we're last to detach, clean up shared memory. */
+ if (BarrierDetach(&pstate->build_barrier))
+ {
+ if (DsaPointerIsValid(pstate->batches))
+ {
+ dsa_free(hashtable->area, pstate->batches);
+ pstate->batches = InvalidDsaPointer;
+ }
+ }
+
+ hashtable->parallel_state = NULL;
+ }
+}
+
+/*
+ * Get the first tuple in a given bucket identified by number.
+ */
+static inline HashJoinTuple
+ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
+{
+ HashJoinTuple tuple;
+ dsa_pointer p;
+
+ Assert(hashtable->parallel_state);
+ p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);
+ tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p);
+
+ return tuple;
+}
+
+/*
+ * Get the next tuple in the same bucket as 'tuple'.
+ */
+static inline HashJoinTuple
+ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
+{
+ HashJoinTuple next;
+
+ Assert(hashtable->parallel_state);
+ next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared);
+
+ return next;
+}
+
+/*
+ * Insert a tuple at the front of a chain of tuples in DSA memory atomically.
+ */
+static inline void
+ExecParallelHashPushTuple(dsa_pointer_atomic *head,
+ HashJoinTuple tuple,
+ dsa_pointer tuple_shared)
+{
+ for (;;)
+ {
+ tuple->next.shared = dsa_pointer_atomic_read(head);
+ if (dsa_pointer_atomic_compare_exchange(head,
+ &tuple->next.shared,
+ tuple_shared))
+ break;
+ }
+}
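+
+/*
+ * Editorial sketch, not part of this patch: the retry loop above is the
+ * standard lock-free "push onto the head of a list" pattern.  A roughly
+ * equivalent, self-contained analogue using C11 atomics instead of
+ * dsa_pointer_atomic (all names below are illustrative only):
+ *
+ *     #include <stdatomic.h>
+ *
+ *     struct node { struct node *next; };
+ *
+ *     static void
+ *     push(_Atomic(struct node *) *head, struct node *n)
+ *     {
+ *         n->next = atomic_load(head);
+ *         while (!atomic_compare_exchange_weak(head, &n->next, n))
+ *             ;   // on failure, n->next is refreshed with the current head
+ *     }
+ *
+ * The exchange succeeds only if no other participant pushed a node since the
+ * head was read, so concurrent insertions are never lost.
+ */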
+
+/*
+ * Prepare to work on a given batch.
+ */
+void
+ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
+{
+ Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer);
+
+ hashtable->curbatch = batchno;
+ hashtable->buckets.shared = (dsa_pointer_atomic *)
+ dsa_get_address(hashtable->area,
+ hashtable->batches[batchno].shared->buckets);
+ hashtable->nbuckets = hashtable->parallel_state->nbuckets;
+ hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
+ hashtable->current_chunk = NULL;
+ hashtable->current_chunk_shared = InvalidDsaPointer;
+ hashtable->batches[batchno].at_least_one_chunk = false;
+}
+
+/*
+ * Take the next available chunk from the queue of chunks being worked on in
+ * parallel. Return NULL if there are none left. Otherwise return a pointer
+ * to the chunk, and set *shared to the DSA pointer to the chunk.
+ */
+static HashMemoryChunk
+ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ HashMemoryChunk chunk;
+
+ LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
+ if (DsaPointerIsValid(pstate->chunk_work_queue))
+ {
+ *shared = pstate->chunk_work_queue;
+ chunk = (HashMemoryChunk)
+ dsa_get_address(hashtable->area, *shared);
+ pstate->chunk_work_queue = chunk->next.shared;
+ }
+ else
+ chunk = NULL;
+ LWLockRelease(&pstate->lock);
+
+ return chunk;
+}
+
+/*
+ * Increase the space preallocated in this backend for a given inner batch by
+ * at least a given amount. This allows us to track whether a given batch
+ * would fit in memory when loaded back in. Also increase the number of
+ * batches or buckets if required.
+ *
+ * This maintains a running estimation of how much space will be taken when we
+ * load the batch back into memory by simulating the way chunks will be handed
+ * out to workers. It's not perfectly accurate because the tuples will be
+ * packed into memory chunks differently by ExecParallelHashTupleAlloc(), but
+ * it should be pretty close. It tends to overestimate by a fraction of a
+ * chunk per worker since all workers gang up to preallocate during hashing,
+ * but workers tend to reload batches alone if there are enough to go around,
+ * leaving fewer partially filled chunks. This effect is bounded by
+ * nparticipants.
+ *
+ * Return false if the number of batches or buckets has changed, and the
+ * caller should reconsider which batch a given tuple now belongs in and call
+ * again.
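+ *
+ * For example (editorial illustration, not from the patch): with the 32kB
+ * HASH_CHUNK_SIZE and four participants, each participant may leave behind
+ * at most about one partially filled chunk, so the estimate can exceed the
+ * eventual in-memory size by up to roughly 4 * 32kB = 128kB, independently
+ * of how large the batch is.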
+ */
+static bool
+ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno];
+ size_t want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE);
+
+ Assert(batchno > 0);
+ Assert(batchno < hashtable->nbatch);
+
+ LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
+
+ /* Has another participant commanded us to help grow? */
+ if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES ||
+ pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
+ {
+ ParallelHashGrowth growth = pstate->growth;
+
+ LWLockRelease(&pstate->lock);
+ if (growth == PHJ_GROWTH_NEED_MORE_BATCHES)
+ ExecParallelHashIncreaseNumBatches(hashtable);
+ else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
+ ExecParallelHashIncreaseNumBuckets(hashtable);
+
+ return false;
+ }
+
+ if (pstate->growth != PHJ_GROWTH_DISABLED &&
+ batch->at_least_one_chunk &&
+ (batch->shared->estimated_size + size > pstate->space_allowed))
+ {
+ /*
+ * We have determined that this batch would exceed the space budget if
+ * loaded into memory. Command all participants to help repartition.
+ */
+ batch->shared->space_exhausted = true;
+ pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
+ LWLockRelease(&pstate->lock);
+
+ return false;
+ }
+
+ batch->at_least_one_chunk = true;
+ batch->shared->estimated_size += want + HASH_CHUNK_HEADER_SIZE;
+ batch->preallocated = want;
+ LWLockRelease(&pstate->lock);
+
+ return true;
+}
+
+/*
+ * Update this backend's copy of hashtable->spacePeak to account for a given
+ * batch. This is called at the end of hashing for batch 0, and then for each
+ * batch when it is done or discovered to be already done. The result is used
+ * for EXPLAIN output.
+ */
+void
+ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno)
+{
+ size_t size;
+
+ size = hashtable->batches[batchno].shared->size;
+ size += sizeof(dsa_pointer_atomic) * hashtable->nbuckets;
+ hashtable->spacePeak = Max(hashtable->spacePeak, size);
+}
* IDENTIFICATION
* src/backend/executor/nodeHashjoin.c
*
+ * PARALLELISM
+ *
+ * Hash joins can participate in parallel query execution in several ways. A
+ * parallel-oblivious hash join is one where the node is unaware that it is
+ * part of a parallel plan. In this case, a copy of the inner plan is used to
+ * build a copy of the hash table in every backend, and the outer plan could
+ * either be built from a partial or complete path, so that the results of the
+ * hash join are correspondingly either partial or complete. A parallel-aware
+ * hash join is one that behaves differently, coordinating work between
+ * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel
+ * Hash Join always appears with a Parallel Hash node.
+ *
+ * Parallel-aware hash joins use the same per-backend state machine to track
+ * progress through the hash join algorithm as parallel-oblivious hash joins.
+ * In a parallel-aware hash join, there is also a shared state machine that
+ * co-operating backends use to synchronize their local state machines and
+ * program counters. The shared state machine is managed with a Barrier IPC
+ * primitive. When all attached participants arrive at a barrier, the phase
+ * advances and all waiting participants are released.
+ *
+ * When a participant begins working on a parallel hash join, it must first
+ * figure out how much progress has already been made, because participants
+ * don't wait for each other to begin. For this reason there are switch
+ * statements at key points in the code where we have to synchronize our local
+ * state machine with the phase, and then jump to the correct part of the
+ * algorithm so that we can get started.
+ *
+ * One barrier called build_barrier is used to coordinate the hashing phases.
+ * The phase is represented by an integer which begins at zero and increments
+ * one by one, but in the code it is referred to by symbolic names as follows:
+ *
+ * PHJ_BUILD_ELECTING -- initial state
+ * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0
+ * PHJ_BUILD_HASHING_INNER -- all hash the inner rel
+ * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer
+ * PHJ_BUILD_DONE -- building done, probing can begin
+ *
+ * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
+ * be used repeatedly as required to coordinate expansions in the number of
+ * batches or buckets. Their phases are as follows:
+ *
+ * PHJ_GROW_BATCHES_ELECTING -- initial state
+ * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches
+ * PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
+ * PHJ_GROW_BATCHES_FINISHING -- one cleans up, detects skew
+ *
+ * PHJ_GROW_BUCKETS_ELECTING -- initial state
+ * PHJ_GROW_BUCKETS_ALLOCATING -- one allocates new buckets
+ * PHJ_GROW_BUCKETS_REINSERTING -- all insert tuples
+ *
+ * If the planner got the number of batches and buckets right, those won't be
+ * necessary, but on the other hand we might finish up needing to expand the
+ * buckets or batches multiple times while hashing the inner relation to stay
+ * within our memory budget and load factor target. For that reason it's a
+ * separate pair of barriers using circular phases.
+ *
+ * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
+ * because we need to divide the outer relation into batches up front in order
+ * to be able to process batches entirely independently. In contrast, the
+ * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
+ * batches whenever it encounters them while scanning and probing, which it
+ * can do because it processes batches in serial order.
+ *
+ * Once PHJ_BUILD_DONE is reached, backends then split up and process
+ * different batches, or gang up and work together on probing batches if there
+ * aren't enough to go around. For each batch there is a separate barrier
+ * with the following phases:
+ *
+ * PHJ_BATCH_ELECTING -- initial state
+ * PHJ_BATCH_ALLOCATING -- one allocates buckets
+ * PHJ_BATCH_LOADING -- all load the hash table from disk
+ * PHJ_BATCH_PROBING -- all probe
+ * PHJ_BATCH_DONE -- end
+ *
+ * Batch 0 is a special case, because it starts out in phase
+ * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
+ * PHJ_BUILD_HASHING_INNER so we can skip loading.
+ *
+ * Initially we try to plan for a single-batch hash join using the combined
+ * work_mem of all participants to create a large shared hash table. If that
+ * turns out either at planning or execution time to be impossible then we
+ * fall back to regular work_mem sized hash tables.
+ *
+ * To avoid deadlocks, we never wait for any barrier unless it is known that
+ * all other backends attached to it are actively executing the node or have
+ * already arrived. Practically, that means that we never return a tuple
+ * while attached to a barrier, unless the barrier has reached its final
+ * state. In the slightly special case of the per-batch barrier, we return
+ * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
+ * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
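+ *
+ * As a rough editorial sketch (not code from this patch), the
+ * "synchronize with the shared phase" pattern described above looks like:
+ *
+ *     switch (BarrierAttach(barrier))
+ *     {
+ *         case PHJ_BUILD_ELECTING:
+ *             if (BarrierArriveAndWait(barrier, ...))
+ *                 ... the one elected participant performs the setup ...
+ *             // fall through
+ *         case PHJ_BUILD_ALLOCATING:
+ *             BarrierArriveAndWait(barrier, ...);   // wait for setup
+ *             // fall through
+ *         case PHJ_BUILD_HASHING_INNER:
+ *             ... help hash the inner relation ...
+ *             break;
+ *         ...
+ *     }
+ *
+ * so a participant that attaches late jumps straight to whatever work is
+ * still in progress and skips phases that have already completed.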
+ *
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "utils/memutils.h"
+#include "utils/sharedtuplestore.h"
/*
static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
HashJoinState *hjstate,
uint32 *hashvalue);
+static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
+ HashJoinState *hjstate,
+ uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
BufFile *file,
uint32 *hashvalue,
TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
+static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
+static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);
/* ----------------------------------------------------------------
- * ExecHashJoin
+ * ExecHashJoinImpl
*
- * This function implements the Hybrid Hashjoin algorithm.
+ * This function implements the Hybrid Hashjoin algorithm. It is marked
+ * with an always-inline attribute so that ExecHashJoin() and
+ * ExecParallelHashJoin() can inline it. Compilers that respect the
+ * attribute should create versions specialized for parallel == true and
+ * parallel == false with unnecessary branches removed.
*
* Note: the relation we build hash table on is the "inner"
* the other one is "outer".
* ----------------------------------------------------------------
*/
-static TupleTableSlot * /* return: a tuple or NULL */
-ExecHashJoin(PlanState *pstate)
+pg_attribute_always_inline
+static inline TupleTableSlot *
+ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
HashJoinState *node = castNode(HashJoinState, pstate);
PlanState *outerNode;
TupleTableSlot *outerTupleSlot;
uint32 hashvalue;
int batchno;
+ ParallelHashJoinState *parallel_state;
/*
* get information from HashJoin node
outerNode = outerPlanState(node);
hashtable = node->hj_HashTable;
econtext = node->js.ps.ps_ExprContext;
+ parallel_state = hashNode->parallel_state;
/*
* Reset per-tuple memory context to free any expression evaluation
/* no chance to not build the hash table */
node->hj_FirstOuterTupleSlot = NULL;
}
+ else if (parallel)
+ {
+ /*
+ * The empty-outer optimization is not implemented for
+ * shared hash tables, because no one participant can
+ * determine that there are no outer tuples, and it's not
+ * yet clear that it's worth the synchronization overhead
+ * of reaching consensus to figure that out. So we have
+ * to build the hash table.
+ */
+ node->hj_FirstOuterTupleSlot = NULL;
+ }
else if (HJ_FILL_OUTER(node) ||
(outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
!node->hj_OuterNotEmpty))
node->hj_FirstOuterTupleSlot = NULL;
/*
- * create the hash table
+ * Create the hash table. If using Parallel Hash, then
+ * whoever gets here first will create the hash table and any
+ * later arrivals will merely attach to it.
*/
- hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan,
+ hashtable = ExecHashTableCreate(hashNode,
node->hj_HashOperators,
HJ_FILL_INNER(node));
node->hj_HashTable = hashtable;
/*
- * execute the Hash node, to build the hash table
+ * Execute the Hash node, to build the hash table. If using
+ * Parallel Hash, then we'll try to help hashing unless we
+ * arrived too late.
*/
hashNode->hashtable = hashtable;
(void) MultiExecProcNode((PlanState *) hashNode);
*/
node->hj_OuterNotEmpty = false;
- node->hj_JoinState = HJ_NEED_NEW_OUTER;
+ if (parallel)
+ {
+ Barrier *build_barrier;
+
+ build_barrier = &parallel_state->build_barrier;
+ Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
+ BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+ if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
+ {
+ /*
+ * If multi-batch, we need to hash the outer relation
+ * up front.
+ */
+ if (hashtable->nbatch > 1)
+ ExecParallelHashJoinPartitionOuter(node);
+ BarrierArriveAndWait(build_barrier,
+ WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
+ }
+ Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+
+ /* Each backend should now select a batch to work on. */
+ hashtable->curbatch = -1;
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+
+ continue;
+ }
+ else
+ node->hj_JoinState = HJ_NEED_NEW_OUTER;
/* FALL THRU */
/*
* We don't have an outer tuple, try to get the next one
*/
- outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode,
- node,
- &hashvalue);
+ if (parallel)
+ outerTupleSlot =
+ ExecParallelHashJoinOuterGetTuple(outerNode, node,
+ &hashvalue);
+ else
+ outerTupleSlot =
+ ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
+
if (TupIsNull(outerTupleSlot))
{
/* end of batch, or maybe whole join */
* Need to postpone this outer tuple to a later batch.
* Save it in the corresponding outer-batch file.
*/
+ Assert(parallel_state == NULL);
Assert(batchno > hashtable->curbatch);
ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
hashvalue,
&hashtable->outerBatchFile[batchno]);
+
/* Loop around, staying in HJ_NEED_NEW_OUTER state */
continue;
}
/*
* Scan the selected hash bucket for matches to current outer
*/
- if (!ExecScanHashBucket(node, econtext))
+ if (parallel)
{
- /* out of matches; check for possible outer-join fill */
- node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
- continue;
+ if (!ExecParallelScanHashBucket(node, econtext))
+ {
+ /* out of matches; check for possible outer-join fill */
+ node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
+ continue;
+ }
+ }
+ else
+ {
+ if (!ExecScanHashBucket(node, econtext))
+ {
+ /* out of matches; check for possible outer-join fill */
+ node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
+ continue;
+ }
}
/*
/*
* Try to advance to next batch. Done if there are no more.
*/
- if (!ExecHashJoinNewBatch(node))
- return NULL; /* end of join */
+ if (parallel)
+ {
+ if (!ExecParallelHashJoinNewBatch(node))
+ return NULL; /* end of parallel-aware join */
+ }
+ else
+ {
+ if (!ExecHashJoinNewBatch(node))
+ return NULL; /* end of parallel-oblivious join */
+ }
node->hj_JoinState = HJ_NEED_NEW_OUTER;
break;
}
}
+/* ----------------------------------------------------------------
+ * ExecHashJoin
+ *
+ * Parallel-oblivious version.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot * /* return: a tuple or NULL */
+ExecHashJoin(PlanState *pstate)
+{
+ /*
+ * On sufficiently smart compilers this should be inlined with the
+ * parallel-aware branches removed.
+ */
+ return ExecHashJoinImpl(pstate, false);
+}
+
+/* ----------------------------------------------------------------
+ * ExecParallelHashJoin
+ *
+ * Parallel-aware version.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot * /* return: a tuple or NULL */
+ExecParallelHashJoin(PlanState *pstate)
+{
+ /*
+ * On sufficiently smart compilers this should be inlined with the
+ * parallel-oblivious branches removed.
+ */
+ return ExecHashJoinImpl(pstate, true);
+}
+
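+/*
+ * Editorial illustration, not part of this patch: ExecHashJoinImpl() relies
+ * on the compiler folding away branches on a compile-time-constant argument
+ * once the function is inlined.  A minimal, self-contained example of the
+ * same specialization pattern (GCC/Clang attribute syntax shown):
+ *
+ *     static inline __attribute__((always_inline)) int
+ *     step(int x, bool parallel)
+ *     {
+ *         if (parallel)
+ *             return x * 2;       // parallel-only branch
+ *         return x + 1;           // serial-only branch
+ *     }
+ *
+ *     int step_serial(int x)   { return step(x, false); }   // keeps only x + 1
+ *     int step_parallel(int x) { return step(x, true); }    // keeps only x * 2
+ *
+ * After inlining, each wrapper retains only the branch relevant to it, which
+ * is what pg_attribute_always_inline achieves for ExecHashJoin() and
+ * ExecParallelHashJoin() above.
+ */
+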
/* ----------------------------------------------------------------
* ExecInitHashJoin
*
hjstate = makeNode(HashJoinState);
hjstate->js.ps.plan = (Plan *) node;
hjstate->js.ps.state = estate;
+
+ /*
+ * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
+ * where this function may be replaced with a parallel version, if we
+ * managed to launch a parallel query.
+ */
hjstate->js.ps.ExecProcNode = ExecHashJoin;
/*
/*
* ExecHashJoinOuterGetTuple
*
- * get the next outer tuple for hashjoin: either by
- * executing the outer plan node in the first pass, or from
- * the temp files for the hashjoin batches.
+ * get the next outer tuple for a parallel-oblivious hashjoin: either by
+ * executing the outer plan node in the first pass, or from the temp
+ * files for the hashjoin batches.
*
* Returns a null slot if no more outer tuples (within the current batch).
*
return NULL;
}
+/*
+ * ExecHashJoinOuterGetTuple variant for the parallel case.
+ */
+static TupleTableSlot *
+ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
+ HashJoinState *hjstate,
+ uint32 *hashvalue)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int curbatch = hashtable->curbatch;
+ TupleTableSlot *slot;
+
+ /*
+ * In the Parallel Hash case we only run the outer plan directly for
+ * single-batch hash joins. Otherwise we have to go to batch files, even
+ * for batch 0.
+ */
+ if (curbatch == 0 && hashtable->nbatch == 1)
+ {
+ slot = ExecProcNode(outerNode);
+
+ while (!TupIsNull(slot))
+ {
+ ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+
+ econtext->ecxt_outertuple = slot;
+ if (ExecHashGetHashValue(hashtable, econtext,
+ hjstate->hj_OuterHashKeys,
+ true, /* outer tuple */
+ HJ_FILL_OUTER(hjstate),
+ hashvalue))
+ return slot;
+
+ /*
+ * That tuple couldn't match because of a NULL, so discard it and
+ * continue with the next one.
+ */
+ slot = ExecProcNode(outerNode);
+ }
+ }
+ else if (curbatch < hashtable->nbatch)
+ {
+ MinimalTuple tuple;
+
+ tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
+ hashvalue);
+ if (tuple != NULL)
+ {
+ slot = ExecStoreMinimalTuple(tuple,
+ hjstate->hj_OuterTupleSlot,
+ false);
+ return slot;
+ }
+ else
+ ExecClearTuple(hjstate->hj_OuterTupleSlot);
+ }
+
+ /* End of this batch */
+ return NULL;
+}
+
/*
* ExecHashJoinNewBatch
* switch to a new hashjoin batch
return true;
}
+/*
+ * Choose a batch to work on, and attach to it. Returns true if successful,
+ * false if there are no more batches.
+ */
+static bool
+ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
+{
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ int start_batchno;
+ int batchno;
+
+ /*
+ * If we started up so late that the batch tracking array has been freed
+ * already by ExecHashTableDetach(), then we are finished. See also
+ * ExecParallelHashEnsureBatchAccessors().
+ */
+ if (hashtable->batches == NULL)
+ return false;
+
+ /*
+ * If we were already attached to a batch, remember not to bother checking
+ * it again, and detach from it (possibly freeing the hash table if we are
+ * last to detach).
+ */
+ if (hashtable->curbatch >= 0)
+ {
+ hashtable->batches[hashtable->curbatch].done = true;
+ ExecHashTableDetachBatch(hashtable);
+ }
+
+ /*
+ * Search for a batch that isn't done. We use an atomic counter to start
+ * our search at a different batch in every participant when there are
+ * more batches than participants.
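+ * (For example, with three participants and eight batches the counter hands
+ * out starting positions 0, 1 and 2; each participant then scans forward
+ * from its start, wrapping around modulo the number of batches.)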
+ */
+ batchno = start_batchno =
+ pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
+ hashtable->nbatch;
+ do
+ {
+ uint32 hashvalue;
+ MinimalTuple tuple;
+ TupleTableSlot *slot;
+
+ if (!hashtable->batches[batchno].done)
+ {
+ SharedTuplestoreAccessor *inner_tuples;
+ Barrier *batch_barrier =
+ &hashtable->batches[batchno].shared->batch_barrier;
+
+ switch (BarrierAttach(batch_barrier))
+ {
+ case PHJ_BATCH_ELECTING:
+
+ /* One backend allocates the hash table. */
+ if (BarrierArriveAndWait(batch_barrier,
+ WAIT_EVENT_HASH_BATCH_ELECTING))
+ ExecParallelHashTableAlloc(hashtable, batchno);
+ /* Fall through. */
+
+ case PHJ_BATCH_ALLOCATING:
+ /* Wait for allocation to complete. */
+ BarrierArriveAndWait(batch_barrier,
+ WAIT_EVENT_HASH_BATCH_ALLOCATING);
+ /* Fall through. */
+
+ case PHJ_BATCH_LOADING:
+ /* Start (or join in) loading tuples. */
+ ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+ inner_tuples = hashtable->batches[batchno].inner_tuples;
+ sts_begin_parallel_scan(inner_tuples);
+ while ((tuple = sts_parallel_scan_next(inner_tuples,
+ &hashvalue)))
+ {
+ slot = ExecStoreMinimalTuple(tuple,
+ hjstate->hj_HashTupleSlot,
+ false);
+ ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
+ hashvalue);
+ }
+ sts_end_parallel_scan(inner_tuples);
+ BarrierArriveAndWait(batch_barrier,
+ WAIT_EVENT_HASH_BATCH_LOADING);
+ /* Fall through. */
+
+ case PHJ_BATCH_PROBING:
+
+ /*
+ * This batch is ready to probe. Return control to
+ * caller. We stay attached to batch_barrier so that the
+ * hash table stays alive until everyone's finished
+ * probing it, but no participant is allowed to wait at
+ * this barrier again (or else a deadlock could occur).
+ * All attached participants must eventually call
+ * BarrierArriveAndDetach() so that the final phase
+ * PHJ_BATCH_DONE can be reached.
+ */
+ ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+ sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
+ return true;
+
+ case PHJ_BATCH_DONE:
+
+ /*
+ * Already done. Detach and go around again (if any
+ * remain).
+ */
+ BarrierDetach(batch_barrier);
+
+ /*
+ * We didn't work on this batch, but we need to observe
+ * its size for EXPLAIN.
+ */
+ ExecParallelHashUpdateSpacePeak(hashtable, batchno);
+ hashtable->batches[batchno].done = true;
+ hashtable->curbatch = -1;
+ break;
+
+ default:
+ elog(ERROR, "unexpected batch phase %d",
+ BarrierPhase(batch_barrier));
+ }
+ }
+ batchno = (batchno + 1) % hashtable->nbatch;
+ } while (batchno != start_batchno);
+
+ return false;
+}
+
/*
* ExecHashJoinSaveTuple
* save a tuple to a batch file.
if (node->js.ps.lefttree->chgParam == NULL)
ExecReScan(node->js.ps.lefttree);
}
+
+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+ if (node->hj_HashTable)
+ {
+ /*
+ * Detach from shared state before DSM memory goes away. This makes
+ * sure that we don't have any pointers into DSM memory by the time
+ * ExecEndHashJoin runs.
+ */
+ ExecHashTableDetachBatch(node->hj_HashTable);
+ ExecHashTableDetach(node->hj_HashTable);
+ }
+}
+
+static void
+ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
+{
+ PlanState *outerState = outerPlanState(hjstate);
+ ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ TupleTableSlot *slot;
+ uint32 hashvalue;
+ int i;
+
+ Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
+
+ /* Execute outer plan, writing all tuples to shared tuplestores. */
+ for (;;)
+ {
+ slot = ExecProcNode(outerState);
+ if (TupIsNull(slot))
+ break;
+ econtext->ecxt_outertuple = slot;
+ if (ExecHashGetHashValue(hashtable, econtext,
+ hjstate->hj_OuterHashKeys,
+ true, /* outer tuple */
+ false, /* outer join, currently unsupported */
+ &hashvalue))
+ {
+ int batchno;
+ int bucketno;
+
+ ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
+ &batchno);
+ sts_puttuple(hashtable->batches[batchno].outer_tuples,
+ &hashvalue, ExecFetchSlotMinimalTuple(slot));
+ }
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /* Make sure all outer partitions are readable by any backend. */
+ for (i = 0; i < hashtable->nbatch; ++i)
+ sts_end_write(hashtable->batches[i].outer_tuples);
+}
+
+void
+ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
+{
+ shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+void
+ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
+{
+ int plan_node_id = state->js.ps.plan->plan_node_id;
+ HashState *hashNode;
+ ParallelHashJoinState *pstate;
+
+ /*
+ * Disable shared hash table mode if we failed to create a real DSM
+ * segment, because that means that we don't have a DSA area to work with.
+ */
+ if (pcxt->seg == NULL)
+ return;
+
+ ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
+
+ /*
+ * Set up the state needed to coordinate access to the shared hash
+ * table(s), using the plan node ID as the toc key.
+ */
+ pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
+ shm_toc_insert(pcxt->toc, plan_node_id, pstate);
+
+ /*
+ * Set up the shared hash join state with no batches initially.
+ * ExecHashTableCreate() will prepare at least one later and set nbatch
+ * and space_allowed.
+ */
+ pstate->nbatch = 0;
+ pstate->space_allowed = 0;
+ pstate->batches = InvalidDsaPointer;
+ pstate->old_batches = InvalidDsaPointer;
+ pstate->nbuckets = 0;
+ pstate->growth = PHJ_GROWTH_OK;
+ pstate->chunk_work_queue = InvalidDsaPointer;
+ pg_atomic_init_u32(&pstate->distributor, 0);
+ pstate->nparticipants = pcxt->nworkers + 1;
+ pstate->total_tuples = 0;
+ LWLockInitialize(&pstate->lock,
+ LWTRANCHE_PARALLEL_HASH_JOIN);
+ BarrierInit(&pstate->build_barrier, 0);
+ BarrierInit(&pstate->grow_batches_barrier, 0);
+ BarrierInit(&pstate->grow_buckets_barrier, 0);
+
+ /* Set up the space we'll use for shared temporary files. */
+ SharedFileSetInit(&pstate->fileset, pcxt->seg);
+
+ /* Initialize the shared state in the hash node. */
+ hashNode = (HashState *) innerPlanState(state);
+ hashNode->parallel_state = pstate;
+}
+
+/* ----------------------------------------------------------------
+ * ExecHashJoinReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
+{
+ int plan_node_id = state->js.ps.plan->plan_node_id;
+ ParallelHashJoinState *pstate =
+ shm_toc_lookup(cxt->toc, plan_node_id, false);
+
+ /*
+ * It would be possible to reuse the shared hash table in single-batch
+ * cases by resetting and then fast-forwarding build_barrier to
+ * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
+ * currently shared hash tables are already freed by now (by the last
+ * participant to detach from the batch). We could consider keeping it
+ * around for single-batch joins. We'd also need to adjust
+ * finalize_plan() so that it doesn't record a dummy dependency for
+ * Parallel Hash nodes, preventing the rescan optimization. For now we
+ * don't try.
+ */
+
+ /* Detach, freeing any remaining shared memory. */
+ if (state->hj_HashTable != NULL)
+ {
+ ExecHashTableDetachBatch(state->hj_HashTable);
+ ExecHashTableDetach(state->hj_HashTable);
+ }
+
+ /* Clear any shared batch files. */
+ SharedFileSetDeleteAll(&pstate->fileset);
+
+ /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
+ BarrierInit(&pstate->build_barrier, 0);
+}
+
+void
+ExecHashJoinInitializeWorker(HashJoinState *state,
+ ParallelWorkerContext *pwcxt)
+{
+ HashState *hashNode;
+ int plan_node_id = state->js.ps.plan->plan_node_id;
+ ParallelHashJoinState *pstate =
+ shm_toc_lookup(pwcxt->toc, plan_node_id, false);
+
+ /* Attach to the space for shared temporary files. */
+ SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
+
+ /* Attach to the shared state in the hash node. */
+ hashNode = (HashState *) innerPlanState(state);
+ hashNode->parallel_state = pstate;
+
+ ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
+}
COPY_SCALAR_FIELD(skewTable);
COPY_SCALAR_FIELD(skewColumn);
COPY_SCALAR_FIELD(skewInherit);
+ COPY_SCALAR_FIELD(rows_total);
return newnode;
}
WRITE_OID_FIELD(skewTable);
WRITE_INT_FIELD(skewColumn);
WRITE_BOOL_FIELD(skewInherit);
+ WRITE_FLOAT_FIELD(rows_total, "%.0f");
}
static void
READ_OID_FIELD(skewTable);
READ_INT_FIELD(skewColumn);
READ_BOOL_FIELD(skewInherit);
+ READ_FLOAT_FIELD(rows_total);
READ_DONE();
}
bool enable_gathermerge = true;
bool enable_partition_wise_join = false;
bool enable_parallel_append = true;
+bool enable_parallel_hash = true;
typedef struct
{
JoinType jointype,
List *hashclauses,
Path *outer_path, Path *inner_path,
- JoinPathExtraData *extra)
+ JoinPathExtraData *extra,
+ bool parallel_hash)
{
Cost startup_cost = 0;
Cost run_cost = 0;
double outer_path_rows = outer_path->rows;
double inner_path_rows = inner_path->rows;
+ double inner_path_rows_total = inner_path_rows;
int num_hashclauses = list_length(hashclauses);
int numbuckets;
int numbatches;
int num_skew_mcvs;
+ size_t space_allowed; /* unused */
/* cost of source data */
startup_cost += outer_path->startup_cost;
* inner_path_rows;
run_cost += cpu_operator_cost * num_hashclauses * outer_path_rows;
+ /*
+ * If this is a parallel hash build, then the value we have for
+ * inner_rows_total currently refers only to the rows returned by each
+ * participant. For shared hash table size estimation, we need the total
+ * number, so we need to undo the division.
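+ *
+ * (Editorial example, not from the patch: with two workers plus a
+ * participating leader the divisor is roughly 2.4, so per-participant
+ * estimates of 10000 inner rows become about 24000 rows in total for
+ * sizing the shared hash table.)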
+ */
+ if (parallel_hash)
+ inner_path_rows_total *= get_parallel_divisor(inner_path);
+
/*
* Get hash table size that executor would use for inner relation.
*
* XXX at some point it might be interesting to try to account for skew
* optimization in the cost estimate, but for now, we don't.
*/
- ExecChooseHashTableSize(inner_path_rows,
+ ExecChooseHashTableSize(inner_path_rows_total,
inner_path->pathtarget->width,
true, /* useskew */
+ parallel_hash, /* try_combined_work_mem */
+ outer_path->parallel_workers,
+ &space_allowed,
&numbuckets,
&numbatches,
&num_skew_mcvs);
workspace->run_cost = run_cost;
workspace->numbuckets = numbuckets;
workspace->numbatches = numbatches;
+ workspace->inner_rows_total = inner_path_rows_total;
}
/*
Path *inner_path = path->jpath.innerjoinpath;
double outer_path_rows = outer_path->rows;
double inner_path_rows = inner_path->rows;
+ double inner_path_rows_total = workspace->inner_rows_total;
List *hashclauses = path->path_hashclauses;
Cost startup_cost = workspace->startup_cost;
Cost run_cost = workspace->run_cost;
/* mark the path with estimated # of batches */
path->num_batches = numbatches;
+ /* store the total number of tuples (sum of partial row estimates) */
+ path->inner_rows_total = inner_path_rows_total;
+
/* and compute the number of "virtual" buckets in the whole join */
virtualbuckets = (double) numbuckets * (double) numbatches;
* never have any output pathkeys, per comments in create_hashjoin_path.
*/
initial_cost_hashjoin(root, &workspace, jointype, hashclauses,
- outer_path, inner_path, extra);
+ outer_path, inner_path, extra, false);
if (add_path_precheck(joinrel,
workspace.startup_cost, workspace.total_cost,
extra,
outer_path,
inner_path,
+ false, /* parallel_hash */
extra->restrictlist,
required_outer,
hashclauses));
* try_partial_hashjoin_path
* Consider a partial hashjoin join path; if it appears useful, push it into
* the joinrel's partial_pathlist via add_partial_path().
+ * The outer side is partial. If parallel_hash is true, then the inner path
+ * must be partial and will be run in parallel to create one or more shared
+ * hash tables; otherwise the inner path must be complete and a copy of it
+ * is run in every process to create separate identical private hash tables.
*/
static void
try_partial_hashjoin_path(PlannerInfo *root,
Path *inner_path,
List *hashclauses,
JoinType jointype,
- JoinPathExtraData *extra)
+ JoinPathExtraData *extra,
+ bool parallel_hash)
{
JoinCostWorkspace workspace;
* cost. Bail out right away if it looks terrible.
*/
initial_cost_hashjoin(root, &workspace, jointype, hashclauses,
- outer_path, inner_path, extra);
+ outer_path, inner_path, extra, parallel_hash);
if (!add_partial_path_precheck(joinrel, workspace.total_cost, NIL))
return;
extra,
outer_path,
inner_path,
+ parallel_hash,
extra->restrictlist,
NULL,
hashclauses));
* able to properly guarantee uniqueness. Similarly, we can't handle
* JOIN_FULL and JOIN_RIGHT, because they can produce false null
* extended rows. Also, the resulting path must not be parameterized.
+ * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
+ * Hash, since in that case we're back to a single hash table with a
+ * single set of match bits for each batch, but that will require
+ * figuring out a deadlock-free way to wait for the probe to finish.
*/
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
bms_is_empty(joinrel->lateral_relids))
{
Path *cheapest_partial_outer;
+ Path *cheapest_partial_inner = NULL;
Path *cheapest_safe_inner = NULL;
cheapest_partial_outer =
(Path *) linitial(outerrel->partial_pathlist);
+ /*
+ * Can we use a partial inner plan too, so that we can build a
+ * shared hash table in parallel?
+ */
+ if (innerrel->partial_pathlist != NIL && enable_parallel_hash)
+ {
+ cheapest_partial_inner =
+ (Path *) linitial(innerrel->partial_pathlist);
+ try_partial_hashjoin_path(root, joinrel,
+ cheapest_partial_outer,
+ cheapest_partial_inner,
+ hashclauses, jointype, extra,
+ true /* parallel_hash */ );
+ }
+
/*
* Normally, given that the joinrel is parallel-safe, the cheapest
* total inner path will also be parallel-safe, but if not, we'll
try_partial_hashjoin_path(root, joinrel,
cheapest_partial_outer,
cheapest_safe_inner,
- hashclauses, jointype, extra);
+ hashclauses, jointype, extra,
+ false /* parallel_hash */ );
}
}
}
copy_plan_costsize(&hash_plan->plan, inner_plan);
hash_plan->plan.startup_cost = hash_plan->plan.total_cost;
+ /*
+ * If parallel-aware, the executor will also need an estimate of the total
+ * number of rows expected from all participants so that it can size the
+ * shared hash table.
+ */
+ if (best_path->jpath.path.parallel_aware)
+ {
+ hash_plan->plan.parallel_aware = true;
+ hash_plan->rows_total = best_path->inner_rows_total;
+ }
+
join_plan = make_hashjoin(tlist,
joinclauses,
otherclauses,
* 'extra' contains various information about the join
* 'outer_path' is the cheapest outer path
* 'inner_path' is the cheapest inner path
+ * 'parallel_hash' to select Parallel Hash of inner path (shared hash table)
* 'restrict_clauses' are the RestrictInfo nodes to apply at the join
* 'required_outer' is the set of required outer rels
* 'hashclauses' are the RestrictInfo nodes to use as hash clauses
JoinPathExtraData *extra,
Path *outer_path,
Path *inner_path,
+ bool parallel_hash,
List *restrict_clauses,
Relids required_outer,
List *hashclauses)
extra->sjinfo,
required_outer,
&restrict_clauses);
- pathnode->jpath.path.parallel_aware = false;
+ pathnode->jpath.path.parallel_aware =
+ joinrel->consider_parallel && parallel_hash;
pathnode->jpath.path.parallel_safe = joinrel->consider_parallel &&
outer_path->parallel_safe && inner_path->parallel_safe;
/* This is a foolish way to estimate parallel_workers, but for now... */
case WAIT_EVENT_EXECUTE_GATHER:
event_name = "ExecuteGather";
break;
+ case WAIT_EVENT_HASH_BATCH_ALLOCATING:
+ event_name = "Hash/Batch/Allocating";
+ break;
+ case WAIT_EVENT_HASH_BATCH_ELECTING:
+ event_name = "Hash/Batch/Electing";
+ break;
+ case WAIT_EVENT_HASH_BATCH_LOADING:
+ event_name = "Hash/Batch/Loading";
+ break;
+ case WAIT_EVENT_HASH_BUILD_ALLOCATING:
+ event_name = "Hash/Build/Allocating";
+ break;
+ case WAIT_EVENT_HASH_BUILD_ELECTING:
+ event_name = "Hash/Build/Electing";
+ break;
+ case WAIT_EVENT_HASH_BUILD_HASHING_INNER:
+ event_name = "Hash/Build/HashingInner";
+ break;
+ case WAIT_EVENT_HASH_BUILD_HASHING_OUTER:
+ event_name = "Hash/Build/HashingOuter";
+ break;
+ case WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING:
+ event_name = "Hash/GrowBatches/Allocating";
+ break;
+ case WAIT_EVENT_HASH_GROW_BATCHES_DECIDING:
+ event_name = "Hash/GrowBatches/Deciding";
+ break;
+ case WAIT_EVENT_HASH_GROW_BATCHES_ELECTING:
+ event_name = "Hash/GrowBatches/Electing";
+ break;
+ case WAIT_EVENT_HASH_GROW_BATCHES_FINISHING:
+ event_name = "Hash/GrowBatches/Finishing";
+ break;
+ case WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING:
+ event_name = "Hash/GrowBatches/Repartitioning";
+ break;
+ case WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING:
+ event_name = "Hash/GrowBuckets/Allocating";
+ break;
+ case WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING:
+ event_name = "Hash/GrowBuckets/Electing";
+ break;
+ case WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING:
+ event_name = "Hash/GrowBuckets/Reinserting";
+ break;
case WAIT_EVENT_LOGICAL_SYNC_DATA:
event_name = "LogicalSyncData";
break;
true,
NULL, NULL, NULL
},
-
+ {
+ {"enable_parallel_hash", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables the planner's user of parallel hash plans."),
+ NULL
+ },
+ &enable_parallel_hash,
+ true,
+ NULL, NULL, NULL
+ },
{
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
gettext_noop("Enables genetic query optimization."),
#enable_sort = on
#enable_tidscan = on
#enable_partition_wise_join = off
+#enable_parallel_hash = on
# - Planner Cost Constants -
#define HASHJOIN_H
#include "nodes/execnodes.h"
+#include "port/atomics.h"
+#include "storage/barrier.h"
#include "storage/buffile.h"
+#include "storage/lwlock.h"
/* ----------------------------------------------------------------
* hash-join hash table structures
typedef struct HashJoinTupleData
{
- struct HashJoinTupleData *next; /* link to next tuple in same bucket */
+ /* link to next tuple in same bucket */
+ union
+ {
+ struct HashJoinTupleData *unshared;
+ dsa_pointer shared;
+ } next;
uint32 hashvalue; /* tuple's hash code */
/* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
} HashJoinTupleData;
size_t maxlen; /* size of the buffer holding the tuples */
size_t used; /* number of buffer bytes already used */
- struct HashMemoryChunkData *next; /* pointer to the next chunk (linked
- * list) */
+ /* pointer to the next chunk (linked list) */
+ union
+ {
+ struct HashMemoryChunkData *unshared;
+ dsa_pointer shared;
+ } next;
char data[FLEXIBLE_ARRAY_MEMBER]; /* buffer allocated at the end */
} HashMemoryChunkData;
typedef struct HashMemoryChunkData *HashMemoryChunk;
#define HASH_CHUNK_SIZE (32 * 1024L)
+#define HASH_CHUNK_HEADER_SIZE (offsetof(HashMemoryChunkData, data))
#define HASH_CHUNK_THRESHOLD (HASH_CHUNK_SIZE / 4)
+/*
+ * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch
+ * object in shared memory to coordinate access to it. Since they are
+ * followed by variable-sized objects, they are arranged in contiguous memory
+ * but not accessed directly as an array.
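+ *
+ * The resulting layout in the DSA area, repeated once per batch and strided
+ * by EstimateParallelHashJoinBatch(), is:
+ *
+ *   [ParallelHashJoinBatch][inner SharedTuplestore][outer SharedTuplestore]
+ *
+ * (editorial illustration of the accessor macros defined below).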
+ */
+typedef struct ParallelHashJoinBatch
+{
+ dsa_pointer buckets; /* array of hash table buckets */
+ Barrier batch_barrier; /* synchronization for joining this batch */
+
+ dsa_pointer chunks; /* chunks of tuples loaded */
+ size_t size; /* size of buckets + chunks in memory */
+ size_t estimated_size; /* size of buckets + chunks while writing */
+ size_t ntuples; /* number of tuples loaded */
+ size_t old_ntuples; /* number of tuples before repartitioning */
+ bool space_exhausted;
+
+ /*
+ * Variable-sized SharedTuplestore objects follow this struct in memory.
+ * See the accessor macros below.
+ */
+} ParallelHashJoinBatch;
+
+/* Accessor for inner batch tuplestore following a ParallelHashJoinBatch. */
+#define ParallelHashJoinBatchInner(batch) \
+ ((SharedTuplestore *) \
+ ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch))))
+
+/* Accessor for outer batch tuplestore following a ParallelHashJoinBatch. */
+#define ParallelHashJoinBatchOuter(batch, nparticipants) \
+ ((SharedTuplestore *) \
+ ((char *) ParallelHashJoinBatchInner(batch) + \
+ MAXALIGN(sts_estimate(nparticipants))))
+
+/* Total size of a ParallelHashJoinBatch and tuplestores. */
+#define EstimateParallelHashJoinBatch(hashtable) \
+ (MAXALIGN(sizeof(ParallelHashJoinBatch)) + \
+ MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2)
+
+/* Accessor for the nth ParallelHashJoinBatch given the base. */
+#define NthParallelHashJoinBatch(base, n) \
+ ((ParallelHashJoinBatch *) \
+ ((char *) (base) + \
+ EstimateParallelHashJoinBatch(hashtable) * (n)))
+
+/*
+ * Each backend requires a small amount of per-batch state to interact with
+ * each ParallelHashJoinBatch.
+ */
+typedef struct ParallelHashJoinBatchAccessor
+{
+ ParallelHashJoinBatch *shared; /* pointer to shared state */
+
+ /* Per-backend partial counters to reduce contention. */
+ size_t preallocated; /* pre-allocated space for this backend */
+ size_t ntuples; /* number of tuples */
+ size_t size; /* size of partition in memory */
+ size_t estimated_size; /* size of partition on disk */
+ size_t old_ntuples; /* how many tuples before repartitioning? */
+ bool at_least_one_chunk; /* has this backend allocated a chunk? */
+
+ bool done; /* flag to remember that a batch is done */
+ SharedTuplestoreAccessor *inner_tuples;
+ SharedTuplestoreAccessor *outer_tuples;
+} ParallelHashJoinBatchAccessor;
+
+/*
+ * While hashing the inner relation, any participant might determine that it's
+ * time to increase the number of buckets to reduce the load factor or batches
+ * to reduce the memory size. This is indicated by setting the growth flag to
+ * these values.
+ */
+typedef enum ParallelHashGrowth
+{
+ /* The current dimensions are sufficient. */
+ PHJ_GROWTH_OK,
+ /* The load factor is too high, so we need to add buckets. */
+ PHJ_GROWTH_NEED_MORE_BUCKETS,
+ /* The memory budget would be exhausted, so we need to repartition. */
+ PHJ_GROWTH_NEED_MORE_BATCHES,
+ /* Repartitioning didn't help last time, so don't try to do that again. */
+ PHJ_GROWTH_DISABLED
+} ParallelHashGrowth;
+
+/*
+ * The shared state used to coordinate a Parallel Hash Join. This is stored
+ * in the DSM segment.
+ */
+typedef struct ParallelHashJoinState
+{
+ dsa_pointer batches; /* array of ParallelHashJoinBatch */
+ dsa_pointer old_batches; /* previous generation during repartition */
+ int nbatch; /* number of batches now */
+ int old_nbatch; /* previous number of batches */
+ int nbuckets; /* number of buckets */
+ ParallelHashGrowth growth; /* control batch/bucket growth */
+ dsa_pointer chunk_work_queue; /* chunk work queue */
+ int nparticipants;
+ size_t space_allowed;
+ size_t total_tuples; /* total number of inner tuples */
+ LWLock lock; /* lock protecting the above */
+
+ Barrier build_barrier; /* synchronization for the build phases */
+ Barrier grow_batches_barrier;
+ Barrier grow_buckets_barrier;
+ pg_atomic_uint32 distributor; /* counter for load balancing */
+
+ SharedFileSet fileset; /* space for shared temporary files */
+} ParallelHashJoinState;
+
+/* The phases for building batches, used by build_barrier. */
+#define PHJ_BUILD_ELECTING 0
+#define PHJ_BUILD_ALLOCATING 1
+#define PHJ_BUILD_HASHING_INNER 2
+#define PHJ_BUILD_HASHING_OUTER 3
+#define PHJ_BUILD_DONE 4
+
+/* The phases for probing each batch, used by batch_barrier. */
+#define PHJ_BATCH_ELECTING 0
+#define PHJ_BATCH_ALLOCATING 1
+#define PHJ_BATCH_LOADING 2
+#define PHJ_BATCH_PROBING 3
+#define PHJ_BATCH_DONE 4
+
+/* The phases of batch growth while hashing, for grow_batches_barrier. */
+#define PHJ_GROW_BATCHES_ELECTING 0
+#define PHJ_GROW_BATCHES_ALLOCATING 1
+#define PHJ_GROW_BATCHES_REPARTITIONING 2
+#define PHJ_GROW_BATCHES_DECIDING 3
+#define PHJ_GROW_BATCHES_FINISHING 4
+#define PHJ_GROW_BATCHES_PHASE(n) ((n) % 5) /* circular phases */
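+/* (Editorial example: during a second round of batch growth BarrierPhase()
+ * might return 7, and PHJ_GROW_BATCHES_PHASE(7) == 2, i.e. REPARTITIONING.) */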
+
+/* The phases of bucket growth while hashing, for grow_buckets_barrier. */
+#define PHJ_GROW_BUCKETS_ELECTING 0
+#define PHJ_GROW_BUCKETS_ALLOCATING 1
+#define PHJ_GROW_BUCKETS_REINSERTING 2
+#define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3) /* circular phases */
+
typedef struct HashJoinTableData
{
int nbuckets; /* # buckets in the in-memory hash table */
int log2_nbuckets_optimal; /* log2(nbuckets_optimal) */
/* buckets[i] is head of list of tuples in i'th in-memory bucket */
- struct HashJoinTupleData **buckets;
- /* buckets array is per-batch storage, as are all the tuples */
+ union
+ {
+ /* unshared array is per-batch storage, as are all the tuples */
+ struct HashJoinTupleData **unshared;
+ /* shared array is per-query DSA area, as are all the tuples */
+ dsa_pointer_atomic *shared;
+ } buckets;
bool keepNulls; /* true to store unmatchable NULL tuples */
bool growEnabled; /* flag to shut off nbatch increases */
double totalTuples; /* # tuples obtained from inner plan */
+ double partialTuples; /* # tuples obtained from inner plan by me */
double skewTuples; /* # tuples inserted into skew tuples */
/*
/* used for dense allocation of tuples (into linked chunks) */
HashMemoryChunk chunks; /* one list for the whole batch */
+
+ /* Shared and private state for Parallel Hash. */
+ HashMemoryChunk current_chunk; /* this backend's current chunk */
+ dsa_area *area; /* DSA area to allocate memory from */
+ ParallelHashJoinState *parallel_state;
+ ParallelHashJoinBatchAccessor *batches;
+ dsa_pointer current_chunk_shared;
} HashJoinTableData;
#endif /* HASHJOIN_H */
#include "access/parallel.h"
#include "nodes/execnodes.h"
+struct SharedHashJoinBatch;
+
extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags);
extern Node *MultiExecHash(HashState *node);
extern void ExecEndHash(HashState *node);
extern void ExecReScanHash(HashState *node);
-extern HashJoinTable ExecHashTableCreate(Hash *node, List *hashOperators,
+extern HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators,
bool keepNulls);
+extern void ExecParallelHashTableAlloc(HashJoinTable hashtable,
+ int batchno);
extern void ExecHashTableDestroy(HashJoinTable hashtable);
+extern void ExecHashTableDetach(HashJoinTable hashtable);
+extern void ExecHashTableDetachBatch(HashJoinTable hashtable);
+extern void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable,
+ int batchno);
+extern void ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno);
+
extern void ExecHashTableInsert(HashJoinTable hashtable,
TupleTableSlot *slot,
uint32 hashvalue);
+extern void ExecParallelHashTableInsert(HashJoinTable hashtable,
+ TupleTableSlot *slot,
+ uint32 hashvalue);
+extern void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
+ TupleTableSlot *slot,
+ uint32 hashvalue);
extern bool ExecHashGetHashValue(HashJoinTable hashtable,
ExprContext *econtext,
List *hashkeys,
int *bucketno,
int *batchno);
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
+extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
ExprContext *econtext);
extern void ExecHashTableReset(HashJoinTable hashtable);
extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
+ bool try_combined_work_mem,
+ int parallel_workers,
+ size_t *space_allowed,
int *numbuckets,
int *numbatches,
int *num_skew_mcvs);
extern void ExecHashRetrieveInstrumentation(HashState *node);
extern void ExecShutdownHash(HashState *node);
extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
- HashJoinTable hashtable);
+ HashJoinTable hashtable);
#endif /* NODEHASH_H */
extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate, int eflags);
extern void ExecEndHashJoin(HashJoinState *node);
extern void ExecReScanHashJoin(HashJoinState *node);
+extern void ExecShutdownHashJoin(HashJoinState *node);
+extern void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt);
+extern void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt);
+extern void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt);
+extern void ExecHashJoinInitializeWorker(HashJoinState *state,
+ ParallelWorkerContext *pwcxt);
extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
BufFile **fileptr);
#include "utils/hsearch.h"
#include "utils/queryenvironment.h"
#include "utils/reltrigger.h"
+#include "utils/sharedtuplestore.h"
#include "utils/sortsupport.h"
#include "utils/tuplestore.h"
#include "utils/tuplesort.h"
struct ExprContext;
struct ExprEvalStep; /* avoid including execExpr.h everywhere */
+struct ParallelHashJoinState;
+
typedef Datum (*ExprStateEvalFunc) (struct ExprState *expression,
struct ExprContext *econtext,
bool *isNull);
SharedHashInfo *shared_info; /* one entry per worker */
HashInstrumentation *hinstrument; /* this worker's entry */
+
+ /* Parallel hash state. */
+ struct ParallelHashJoinState *parallel_state;
} HashState;
/* ----------------
AttrNumber skewColumn; /* outer join key's column #, or zero */
bool skewInherit; /* is outer join rel an inheritance tree? */
/* all other info is in the parent HashJoin node */
+ double rows_total; /* estimate total rows if parallel_aware */
} Hash;
/* ----------------
JoinPath jpath;
List *path_hashclauses; /* join clauses used for hashing */
int num_batches; /* number of batches expected */
+ double inner_rows_total; /* total inner rows expected */
} HashPath;
/*
/* private for cost_hashjoin code */
int numbuckets;
int numbatches;
+ double inner_rows_total;
} JoinCostWorkspace;
#endif /* RELATION_H */
extern bool enable_gathermerge;
extern bool enable_partition_wise_join;
extern bool enable_parallel_append;
+extern bool enable_parallel_hash;
extern int constraint_exclusion;
extern double clamp_row_est(double nrows);
JoinType jointype,
List *hashclauses,
Path *outer_path, Path *inner_path,
- JoinPathExtraData *extra);
+ JoinPathExtraData *extra,
+ bool parallel_hash);
extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path,
JoinCostWorkspace *workspace,
JoinPathExtraData *extra);
JoinPathExtraData *extra,
Path *outer_path,
Path *inner_path,
+ bool parallel_hash,
List *restrict_clauses,
Relids required_outer,
List *hashclauses);
WAIT_EVENT_BGWORKER_STARTUP,
WAIT_EVENT_BTREE_PAGE,
WAIT_EVENT_EXECUTE_GATHER,
+ WAIT_EVENT_HASH_BATCH_ALLOCATING,
+ WAIT_EVENT_HASH_BATCH_ELECTING,
+ WAIT_EVENT_HASH_BATCH_LOADING,
+ WAIT_EVENT_HASH_BUILD_ALLOCATING,
+ WAIT_EVENT_HASH_BUILD_ELECTING,
+ WAIT_EVENT_HASH_BUILD_HASHING_INNER,
+ WAIT_EVENT_HASH_BUILD_HASHING_OUTER,
+ WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING,
+ WAIT_EVENT_HASH_GROW_BATCHES_DECIDING,
+ WAIT_EVENT_HASH_GROW_BATCHES_ELECTING,
+ WAIT_EVENT_HASH_GROW_BATCHES_FINISHING,
+ WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING,
+ WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING,
+ WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING,
+ WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING,
WAIT_EVENT_LOGICAL_SYNC_DATA,
WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE,
WAIT_EVENT_MQ_INTERNAL,
LWTRANCHE_BUFFER_MAPPING,
LWTRANCHE_LOCK_MANAGER,
LWTRANCHE_PREDICATE_LOCK_MANAGER,
+ LWTRANCHE_PARALLEL_HASH_JOIN,
LWTRANCHE_PARALLEL_QUERY_DSA,
LWTRANCHE_SESSION_DSA,
LWTRANCHE_SESSION_RECORD_TABLE,
update pg_class
set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
where relname = 'extremely_skewed';
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
-- The "optimal" case: the hash table fits in memory; we plan for 1
-- batch, we stick to that number, and peak memory usage stays within
-- our work_mem budget
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '4MB';
+set local enable_parallel_hash = off;
explain (costs off)
select count(*) from simple r join simple s using (id);
QUERY PLAN
f | f
(1 row)
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local enable_parallel_hash = on;
+explain (costs off)
+ select count(*) from simple r join simple s using (id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Join
+ Hash Cond: (r.id = s.id)
+ -> Parallel Seq Scan on simple r
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+ from hash_join_batches(
+$$
+ select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches
+----------------------+-------------------
+ f | f
+(1 row)
+
rollback to settings;
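-- (Illustrative aside, not part of the expected output: hash_join_batches() is
-- defined earlier in this test file and is assumed to pull the planned and
-- final batch counts out of EXPLAIN ANALYZE.  The same figures can be checked
-- by hand, since the Parallel Hash node's ANALYZE output reports the batch
-- count, and the original count as well when it has grown, e.g.:)
explain (analyze, costs off, timing off, summary off)
  select count(*) from simple r join simple s using (id);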
-- The "good" case: batches required, but we plan the right number; we
-- plan for some number of batches, and we stick to that number, and
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '128kB';
+set local enable_parallel_hash = off;
explain (costs off)
select count(*) from simple r join simple s using (id);
QUERY PLAN
t | f
(1 row)
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local enable_parallel_hash = on;
+explain (costs off)
+ select count(*) from simple r join simple s using (id);
+ QUERY PLAN
+-------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Join
+ Hash Cond: (r.id = s.id)
+ -> Parallel Seq Scan on simple r
+ -> Parallel Hash
+ -> Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+ from hash_join_batches(
+$$
+ select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches
+----------------------+-------------------
+ t | f
+(1 row)
+
rollback to settings;
-- The "bad" case: during execution we need to increase number of
-- batches; in this case we plan for 1 batch, and increase at least a
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '128kB';
+set local enable_parallel_hash = off;
explain (costs off)
select count(*) from simple r join bigger_than_it_looks s using (id);
QUERY PLAN
f | t
(1 row)
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local enable_parallel_hash = on;
+explain (costs off)
+ select count(*) from simple r join bigger_than_it_looks s using (id);
+ QUERY PLAN
+---------------------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 1
+ -> Partial Aggregate
+ -> Parallel Hash Join
+ Hash Cond: (r.id = s.id)
+ -> Parallel Seq Scan on simple r
+ -> Parallel Hash
+ -> Parallel Seq Scan on bigger_than_it_looks s
+(9 rows)
+
+select count(*) from simple r join bigger_than_it_looks s using (id);
+ count
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+ from hash_join_batches(
+$$
+ select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+ initially_multibatch | increased_batches
+----------------------+-------------------
+ f | t
+(1 row)
+
rollback to settings;
-- The "ugly" case: increasing the number of batches during execution
-- doesn't help, so stop trying to fit in work_mem and hope for the
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '128kB';
+set local enable_parallel_hash = off;
explain (costs off)
select count(*) from simple r join extremely_skewed s using (id);
QUERY PLAN
1 | 2
(1 row)
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local enable_parallel_hash = on;
+explain (costs off)
+ select count(*) from simple r join extremely_skewed s using (id);
+ QUERY PLAN
+-----------------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 1
+ -> Partial Aggregate
+ -> Parallel Hash Join
+ Hash Cond: (r.id = s.id)
+ -> Parallel Seq Scan on simple r
+ -> Parallel Hash
+ -> Parallel Seq Scan on extremely_skewed s
+(9 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+ select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final
+----------+-------
+ 1 | 4
+(1 row)
+
rollback to settings;
-- A couple of other hash join tests unrelated to work_mem management.
-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
-- that we can check that instrumentation comes back correctly.
create table foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
alter table foo set (parallel_workers = 0);
-create table bar as select generate_series(1, 5000) as id, 'xxxxx'::text as t;
+create table bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
alter table bar set (parallel_workers = 2);
-- multi-batch with rescan, parallel-oblivious
savepoint settings;
+set enable_parallel_hash = off;
set parallel_leader_participation = off;
set min_parallel_table_scan_size = 0;
set parallel_setup_cost = 0;
rollback to settings;
-- single-batch with rescan, parallel-oblivious
savepoint settings;
+set enable_parallel_hash = off;
set parallel_leader_participation = off;
set min_parallel_table_scan_size = 0;
set parallel_setup_cost = 0;
f
(1 row)
+rollback to settings;
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+explain (costs off)
+ select count(*) from foo
+ left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+ on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+ QUERY PLAN
+--------------------------------------------------------------------------
+ Aggregate
+ -> Nested Loop Left Join
+ Join Filter: ((foo.id < (b1.id + 1)) AND (foo.id > (b1.id - 1)))
+ -> Seq Scan on foo
+ -> Gather
+ Workers Planned: 2
+ -> Parallel Hash Join
+ Hash Cond: (b1.id = b2.id)
+ -> Parallel Seq Scan on bar b1
+ -> Parallel Hash
+ -> Parallel Seq Scan on bar b2
+(11 rows)
+
+select count(*) from foo
+ left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+ on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+ count
+-------
+ 3
+(1 row)
+
+select final > 1 as multibatch
+ from hash_join_batches(
+$$
+ select count(*) from foo
+ left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+ on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+$$);
+ multibatch
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+explain (costs off)
+ select count(*) from foo
+ left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+ on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+ QUERY PLAN
+--------------------------------------------------------------------------
+ Aggregate
+ -> Nested Loop Left Join
+ Join Filter: ((foo.id < (b1.id + 1)) AND (foo.id > (b1.id - 1)))
+ -> Seq Scan on foo
+ -> Gather
+ Workers Planned: 2
+ -> Parallel Hash Join
+ Hash Cond: (b1.id = b2.id)
+ -> Parallel Seq Scan on bar b1
+ -> Parallel Hash
+ -> Parallel Seq Scan on bar b2
+(11 rows)
+
+select count(*) from foo
+ left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+ on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+ count
+-------
+ 3
+(1 row)
+
+select final > 1 as multibatch
+ from hash_join_batches(
+$$
+ select count(*) from foo
+ left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+ on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+$$);
+ multibatch
+------------
+ f
+(1 row)
+
rollback to settings;
-- A full outer join where every record is matched.
-- non-parallel
40000
(1 row)
+rollback to settings;
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+explain (costs off)
+ select length(max(s.t))
+ from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+ QUERY PLAN
+----------------------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 2
+ -> Partial Aggregate
+ -> Parallel Hash Left Join
+ Hash Cond: (wide.id = wide_1.id)
+ -> Parallel Seq Scan on wide
+ -> Parallel Hash
+ -> Parallel Seq Scan on wide wide_1
+(9 rows)
+
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+ length
+--------
+ 320000
+(1 row)
+
+select final > 1 as multibatch
+ from hash_join_batches(
+$$
+ select length(max(s.t))
+ from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+ multibatch
+------------
+ t
+(1 row)
+
rollback to settings;
rollback;
enable_mergejoin | on
enable_nestloop | on
enable_parallel_append | on
+ enable_parallel_hash | on
enable_partition_wise_join | off
enable_seqscan | on
enable_sort | on
enable_tidscan | on
-(14 rows)
+(15 rows)
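-- (Illustrative aside: this listing presumably comes from a query over
-- pg_settings; a sketch of such a query, not necessarily the exact statement
-- this regression test uses, would be:)
select name, setting from pg_settings where name like 'enable%';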
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail
set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
where relname = 'extremely_skewed';
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
+
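-- (Illustrative aside, not part of the test: each row of "wide" carries a
-- 320000-character t value, which is what forces the oversized-tuple code
-- paths exercised further down; a quick sanity check could be:)
select id, length(t) from wide;
-- which should report 320000 for both rows.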
-- The "optimal" case: the hash table fits in memory; we plan for 1
-- batch, we stick to that number, and peak memory usage stays within
-- our work_mem budget
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '4MB';
+set local enable_parallel_hash = off;
+explain (costs off)
+ select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+ from hash_join_batches(
+$$
+ select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local enable_parallel_hash = on;
explain (costs off)
select count(*) from simple r join simple s using (id);
select count(*) from simple r join simple s using (id);
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '128kB';
+set local enable_parallel_hash = off;
+explain (costs off)
+ select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+ from hash_join_batches(
+$$
+ select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local enable_parallel_hash = on;
explain (costs off)
select count(*) from simple r join simple s using (id);
select count(*) from simple r join simple s using (id);
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '128kB';
+set local enable_parallel_hash = off;
+explain (costs off)
+ select count(*) from simple r join bigger_than_it_looks s using (id);
+select count(*) from simple r join bigger_than_it_looks s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+ from hash_join_batches(
+$$
+ select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local enable_parallel_hash = on;
explain (costs off)
select count(*) from simple r join bigger_than_it_looks s using (id);
select count(*) from simple r join bigger_than_it_looks s using (id);
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '128kB';
+set local enable_parallel_hash = off;
+explain (costs off)
+ select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+ select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local enable_parallel_hash = on;
explain (costs off)
select count(*) from simple r join extremely_skewed s using (id);
select count(*) from simple r join extremely_skewed s using (id);
create table foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
alter table foo set (parallel_workers = 0);
-create table bar as select generate_series(1, 5000) as id, 'xxxxx'::text as t;
+create table bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
alter table bar set (parallel_workers = 2);
-- multi-batch with rescan, parallel-oblivious
savepoint settings;
+set enable_parallel_hash = off;
set parallel_leader_participation = off;
set min_parallel_table_scan_size = 0;
set parallel_setup_cost = 0;
-- single-batch with rescan, parallel-oblivious
savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+explain (costs off)
+ select count(*) from foo
+ left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+ on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+select count(*) from foo
+ left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+ on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+select final > 1 as multibatch
+ from hash_join_batches(
+$$
+ select count(*) from foo
+ left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+ on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+explain (costs off)
+ select count(*) from foo
+ left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+ on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+select count(*) from foo
+ left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+ on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+select final > 1 as multibatch
+ from hash_join_batches(
+$$
+ select count(*) from foo
+ left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+ on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
set parallel_leader_participation = off;
set min_parallel_table_scan_size = 0;
set parallel_setup_cost = 0;
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+explain (costs off)
+ select length(max(s.t))
+ from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select final > 1 as multibatch
+ from hash_join_batches(
+$$
+ select length(max(s.t))
+ from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+rollback to settings;
+
rollback;
ParallelCompletionPtr
ParallelContext
ParallelExecutorInfo
+ParallelHashGrowth
+ParallelHashJoinBatch
+ParallelHashJoinBatchAccessor
+ParallelHashJoinState
ParallelHeapScanDesc
ParallelIndexScanDesc
ParallelSlot