Fix EXPLAIN ANALYZE for parallel HashAgg plans
author     David Rowley <drowley@postgresql.org>
           Fri, 19 Jun 2020 05:24:27 +0000 (17:24 +1200)
committer  David Rowley <drowley@postgresql.org>
           Fri, 19 Jun 2020 05:24:27 +0000 (17:24 +1200)
Since 1f39bce02, HashAgg nodes have had the ability to spill to disk when
memory consumption exceeds work_mem. That commit added new properties to
EXPLAIN ANALYZE to show the maximum memory usage and disk usage; however,
it didn't quite go as far as showing that information for parallel
workers.  Since workers may have experienced something very different from
the main process, we should show this information per worker, as is done
in Sort.
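
To make the new output concrete, a text-format plan for a spilling
parallel aggregate would look roughly like the following sketch (the
table name, settings, and all figures are illustrative only, not taken
from a real run):

    SET work_mem = '64kB';
    EXPLAIN (ANALYZE, COSTS OFF)
        SELECT g, count(*) FROM spill_test GROUP BY g;

        ...
        ->  Partial HashAggregate (actual rows=... loops=3)
              Group Key: g
              Peak Memory Usage: 577 kB  Disk Usage: 2528 kB  HashAgg Batches: 21
              Worker 0:  Peak Memory Usage: 561 kB  Disk Usage: 2464 kB  HashAgg Batches: 17
              Worker 1:  Peak Memory Usage: 553 kB  Disk Usage: 2432 kB  HashAgg Batches: 17
        ...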

Reviewed-by: Justin Pryzby
Reviewed-by: Jeff Davis
Discussion: https://postgr.es/m/CAApHDvpEKbfZa18mM1TD7qV6PG+w97pwCWq5tVD0dX7e11gRJw@mail.gmail.com
Backpatch-through: 13, where the hashagg spilling code was added.

src/backend/commands/explain.c
src/backend/executor/execParallel.c
src/backend/executor/nodeAgg.c
src/include/executor/nodeAgg.h
src/include/nodes/execnodes.h

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 9092b4b3094432a4acc66e613a01ebe0a3b8511e..67bdcb2b27852a7defbbcd3c1020a5c09884804e 100644
@@ -3051,29 +3051,111 @@ show_hashagg_info(AggState *aggstate, ExplainState *es)
    Agg        *agg = (Agg *) aggstate->ss.ps.plan;
    int64       memPeakKb = (aggstate->hash_mem_peak + 1023) / 1024;
 
-   Assert(IsA(aggstate, AggState));
-
    if (agg->aggstrategy != AGG_HASHED &&
        agg->aggstrategy != AGG_MIXED)
        return;
 
-   if (es->costs && aggstate->hash_planned_partitions > 0)
+   if (es->format != EXPLAIN_FORMAT_TEXT)
    {
-       ExplainPropertyInteger("Planned Partitions", NULL,
-                              aggstate->hash_planned_partitions, es);
+
+       if (es->costs && aggstate->hash_planned_partitions > 0)
+       {
+           ExplainPropertyInteger("Planned Partitions", NULL,
+                                  aggstate->hash_planned_partitions, es);
+       }
+
+       if (!es->analyze)
+           return;
+
+       /* EXPLAIN ANALYZE */
+       ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es);
+       if (aggstate->hash_batches_used > 0)
+       {
+           ExplainPropertyInteger("Disk Usage", "kB",
+                                  aggstate->hash_disk_used, es);
+           ExplainPropertyInteger("HashAgg Batches", NULL,
+                                  aggstate->hash_batches_used, es);
+       }
    }
+   else
+   {
+       bool        gotone = false;
 
-   if (!es->analyze)
-       return;
+       if (es->costs && aggstate->hash_planned_partitions > 0)
+       {
+           ExplainIndentText(es);
+           appendStringInfo(es->str, "Planned Partitions: %d",
+                            aggstate->hash_planned_partitions);
+           gotone = true;
+       }
+
+       if (!es->analyze)
+       {
+           if (gotone)
+               appendStringInfoChar(es->str, '\n');
+           return;
+       }
+
+       if (!gotone)
+           ExplainIndentText(es);
+       else
+           appendStringInfoString(es->str, "  ");
+
+       appendStringInfo(es->str, "Peak Memory Usage: " INT64_FORMAT " kB",
+                        memPeakKb);
 
-   /* EXPLAIN ANALYZE */
-   ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es);
-   if (aggstate->hash_batches_used > 0)
+       if (aggstate->hash_batches_used > 0)
+           appendStringInfo(es->str, "  Disk Usage: " UINT64_FORMAT " kB  HashAgg Batches: %d",
+                            aggstate->hash_disk_used,
+                            aggstate->hash_batches_used);
+       appendStringInfoChar(es->str, '\n');
+   }
+
+   /* Display stats for each parallel worker */
+   if (es->analyze && aggstate->shared_info != NULL)
    {
-       ExplainPropertyInteger("Disk Usage", "kB",
-                              aggstate->hash_disk_used, es);
-       ExplainPropertyInteger("HashAgg Batches", NULL,
-                              aggstate->hash_batches_used, es);
+       for (int n = 0; n < aggstate->shared_info->num_workers; n++)
+       {
+           AggregateInstrumentation *sinstrument;
+           uint64      hash_disk_used;
+           int         hash_batches_used;
+
+           sinstrument = &aggstate->shared_info->sinstrument[n];
+           hash_disk_used = sinstrument->hash_disk_used;
+           hash_batches_used = sinstrument->hash_batches_used;
+           memPeakKb = (sinstrument->hash_mem_peak + 1023) / 1024;
+
+           if (es->workers_state)
+               ExplainOpenWorker(n, es);
+
+           if (es->format == EXPLAIN_FORMAT_TEXT)
+           {
+               ExplainIndentText(es);
+
+               appendStringInfo(es->str, "Peak Memory Usage: " INT64_FORMAT " kB",
+                                memPeakKb);
+
+               if (hash_batches_used > 0)
+                   appendStringInfo(es->str, "  Disk Usage: " UINT64_FORMAT " kB  HashAgg Batches: %d",
+                                    hash_disk_used, hash_batches_used);
+               appendStringInfoChar(es->str, '\n');
+           }
+           else
+           {
+               ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb,
+                                      es);
+               if (hash_batches_used > 0)
+               {
+                   ExplainPropertyInteger("Disk Usage", "kB", hash_disk_used,
+                                          es);
+                   ExplainPropertyInteger("HashAgg Batches", NULL,
+                                          hash_batches_used, es);
+               }
+           }
+
+           if (es->workers_state)
+               ExplainCloseWorker(n, es);
+       }
    }
 }
 
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 41cb41481df63d8ee35c4262425ec2c0fe857371..382e78fb7fed23b872cf1919b25b645dec9c03a6 100644
@@ -25,6 +25,7 @@
 
 #include "executor/execParallel.h"
 #include "executor/executor.h"
+#include "executor/nodeAgg.h"
 #include "executor/nodeAppend.h"
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
@@ -288,7 +289,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
            /* even when not parallel-aware, for EXPLAIN ANALYZE */
            ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
            break;
-
+       case T_AggState:
+           /* even when not parallel-aware, for EXPLAIN ANALYZE */
+           ExecAggEstimate((AggState *) planstate, e->pcxt);
+           break;
        default:
            break;
    }
@@ -505,7 +509,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
            /* even when not parallel-aware, for EXPLAIN ANALYZE */
            ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
            break;
-
+       case T_AggState:
+           /* even when not parallel-aware, for EXPLAIN ANALYZE */
+           ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
+           break;
        default:
            break;
    }
@@ -1048,6 +1055,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
        case T_HashState:
            ExecHashRetrieveInstrumentation((HashState *) planstate);
            break;
+       case T_AggState:
+           ExecAggRetrieveInstrumentation((AggState *) planstate);
+           break;
        default:
            break;
    }
@@ -1336,7 +1346,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
            ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
                                                pwcxt);
            break;
-
+       case T_AggState:
+           /* even when not parallel-aware, for EXPLAIN ANALYZE */
+           ExecAggInitializeWorker((AggState *) planstate, pwcxt);
+           break;
        default:
            break;
    }
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 331acee28141202b8a6aa620861347a4f3405422..a20554ae65a678dc917c2f43bdf5367a5d9fc7b3 100644
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/parallel.h"
 #include "catalog/objectaccess.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_proc.h"
@@ -4483,6 +4484,22 @@ ExecEndAgg(AggState *node)
    int         numGroupingSets = Max(node->maxsets, 1);
    int         setno;
 
+   /*
+    * When ending a parallel worker, copy the statistics gathered by the
+    * worker back into shared memory so that it can be picked up by the main
+    * process to report in EXPLAIN ANALYZE.
+    */
+   if (node->shared_info && IsParallelWorker())
+   {
+       AggregateInstrumentation *si;
+
+       Assert(ParallelWorkerNumber < node->shared_info->num_workers);
+       si = &node->shared_info->sinstrument[ParallelWorkerNumber];
+       si->hash_batches_used = node->hash_batches_used;
+       si->hash_disk_used = node->hash_disk_used;
+       si->hash_mem_peak = node->hash_mem_peak;
+   }
+
    /* Make sure we have closed any open tuplesorts */
 
    if (node->sort_in)
@@ -4854,3 +4871,90 @@ aggregate_dummy(PG_FUNCTION_ARGS)
         fcinfo->flinfo->fn_oid);
    return (Datum) 0;           /* keep compiler quiet */
 }
+
+/* ----------------------------------------------------------------
+ *                     Parallel Query Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *     ExecAggEstimate
+ *
+ *     Estimate space required to propagate aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggEstimate(AggState *node, ParallelContext *pcxt)
+{
+   Size        size;
+
+   /* don't need this if not instrumenting or no workers */
+   if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+       return;
+
+   size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
+   size = add_size(size, offsetof(SharedAggInfo, sinstrument));
+   shm_toc_estimate_chunk(&pcxt->estimator, size);
+   shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecAggInitializeDSM
+ *
+ *     Initialize DSM space for aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
+{
+   Size        size;
+
+   /* don't need this if not instrumenting or no workers */
+   if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+       return;
+
+   size = offsetof(SharedAggInfo, sinstrument)
+       + pcxt->nworkers * sizeof(AggregateInstrumentation);
+   node->shared_info = shm_toc_allocate(pcxt->toc, size);
+   /* ensure any unfilled slots will contain zeroes */
+   memset(node->shared_info, 0, size);
+   node->shared_info->num_workers = pcxt->nworkers;
+   shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
+                  node->shared_info);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecAggInitializeWorker
+ *
+ *     Attach worker to DSM space for aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
+{
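+   /* noError lookup: leaves shared_info NULL when not instrumenting */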
+   node->shared_info =
+       shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecAggRetrieveInstrumentation
+ *
+ *     Transfer aggregate statistics from DSM to private memory.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggRetrieveInstrumentation(AggState *node)
+{
+   Size        size;
+   SharedAggInfo *si;
+
+   if (node->shared_info == NULL)
+       return;
+
+   size = offsetof(SharedAggInfo, sinstrument)
+       + node->shared_info->num_workers * sizeof(AggregateInstrumentation);
+   si = palloc(size);
+   memcpy(si, node->shared_info, size);
+   node->shared_info = si;
+}
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index 92c2337fd3ac69a23a6086b50df027c4e6ddbc34..bb0805abe091a31ae4594cf0dbbcf46bb759b24d 100644
@@ -14,6 +14,7 @@
 #ifndef NODEAGG_H
 #define NODEAGG_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 
@@ -323,4 +324,10 @@ extern void hash_agg_set_limits(double hashentrysize, uint64 input_groups,
                                int used_bits, Size *mem_limit,
                                uint64 *ngroups_limit, int *num_partitions);
 
+/* parallel instrumentation support */
+extern void ExecAggEstimate(AggState *node, ParallelContext *pcxt);
+extern void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt);
+extern void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt);
+extern void ExecAggRetrieveInstrumentation(AggState *node);
+
 #endif                         /* NODEAGG_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 98e0072b8ad2ee6dc47c833a93ce697763decc12..f5dfa32d55c48331c6e4c1e070192238c5f7b15f 100644
@@ -2101,6 +2101,27 @@ typedef struct GroupState
    bool        grp_done;       /* indicates completion of Group scan */
 } GroupState;
 
+/* ---------------------
+ * per-worker aggregate information
+ * ---------------------
+ */
+typedef struct AggregateInstrumentation
+{
+   Size        hash_mem_peak;  /* peak hash table memory usage */
+   uint64      hash_disk_used; /* kB of disk space used */
+   int         hash_batches_used;  /* batches used during entire execution */
+} AggregateInstrumentation;
+
+/* ----------------
+ *  Shared memory container for per-worker aggregate information
+ * ----------------
+ */
+typedef struct SharedAggInfo
+{
+   int         num_workers;
+   AggregateInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER];
+} SharedAggInfo;
+
 /* ---------------------
  * AggState information
  *
@@ -2190,6 +2211,7 @@ typedef struct AggState
    AggStatePerGroup *all_pergroups;    /* array of first ->pergroups, then
                                         * ->hash_pergroup */
    ProjectionInfo *combinedproj;   /* projection machinery */
+   SharedAggInfo *shared_info; /* one entry per worker */
 } AggState;
 
 /* ----------------