Allow parallel DISTINCT
author David Rowley <drowley@postgresql.org>
Sun, 22 Aug 2021 11:31:16 +0000 (23:31 +1200)
committer David Rowley <drowley@postgresql.org>
Sun, 22 Aug 2021 11:31:16 +0000 (23:31 +1200)
We've supported parallel aggregation since e06a38965.  At the time, we
didn't quite get around to also adding parallel DISTINCT. So, let's do
that now.

This is implemented by introducing a two-phase DISTINCT.  Phase 1 is
performed on parallel workers, where rows are made distinct either by
hashing or by sort/unique.  The results from the parallel workers are
combined and the final distinct phase is performed serially to get rid of
any duplicate rows that appear due to combining rows for each of the
parallel workers.

Author: David Rowley
Reviewed-by: Zhihong Yu
Discussion: https://postgr.es/m/CAApHDvrjRxVKwQN0he79xS+9wyotFXL=RmoWqGGO2N45Farpgw@mail.gmail.com

src/backend/optimizer/README
src/backend/optimizer/plan/planner.c
src/include/nodes/pathnodes.h
src/test/regress/expected/select_distinct.out
src/test/regress/sql/select_distinct.sql

index 2339347c24109817c48135319ac8f68f2dceb73c..41c120e0cdf6f9d2de24f2adb6b1c284eda2e2e0 100644 (file)
@@ -1015,6 +1015,7 @@ UPPERREL_SETOP        result of UNION/INTERSECT/EXCEPT, if any
 UPPERREL_PARTIAL_GROUP_AGG result of partial grouping/aggregation, if any
 UPPERREL_GROUP_AGG result of grouping/aggregation, if any
 UPPERREL_WINDOW        result of window functions, if any
+UPPERREL_PARTIAL_DISTINCT  result of partial "SELECT DISTINCT", if any
 UPPERREL_DISTINCT  result of "SELECT DISTINCT", if any
 UPPERREL_ORDERED   result of ORDER BY, if any
 UPPERREL_FINAL     result of any remaining top-level actions
index 2cd691191c94149ef6d4881f44e680897c83f829..1e42d75465eceb04ce74bfcc6dd60c0cdccbe1fd 100644 (file)
@@ -189,6 +189,12 @@ static void create_one_window_path(PlannerInfo *root,
                                   List *activeWindows);
 static RelOptInfo *create_distinct_paths(PlannerInfo *root,
                                         RelOptInfo *input_rel);
+static void create_partial_distinct_paths(PlannerInfo *root,
+                                         RelOptInfo *input_rel,
+                                         RelOptInfo *final_distinct_rel);
+static RelOptInfo *create_final_distinct_paths(PlannerInfo *root,
+                                              RelOptInfo *input_rel,
+                                              RelOptInfo *distinct_rel);
 static RelOptInfo *create_ordered_paths(PlannerInfo *root,
                                        RelOptInfo *input_rel,
                                        PathTarget *target,
@@ -1570,6 +1576,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
         */
        root->upper_targets[UPPERREL_FINAL] = final_target;
        root->upper_targets[UPPERREL_ORDERED] = final_target;
+       root->upper_targets[UPPERREL_PARTIAL_DISTINCT] = sort_input_target;
        root->upper_targets[UPPERREL_DISTINCT] = sort_input_target;
        root->upper_targets[UPPERREL_WINDOW] = sort_input_target;
        root->upper_targets[UPPERREL_GROUP_AGG] = grouping_target;
@@ -4227,16 +4234,9 @@ create_one_window_path(PlannerInfo *root,
  * Sort/Unique won't project anything.
  */
 static RelOptInfo *
-create_distinct_paths(PlannerInfo *root,
-                     RelOptInfo *input_rel)
+create_distinct_paths(PlannerInfo *root, RelOptInfo *input_rel)
 {
-   Query      *parse = root->parse;
-   Path       *cheapest_input_path = input_rel->cheapest_total_path;
    RelOptInfo *distinct_rel;
-   double      numDistinctRows;
-   bool        allow_hash;
-   Path       *path;
-   ListCell   *lc;
 
    /* For now, do all work in the (DISTINCT, NULL) upperrel */
    distinct_rel = fetch_upper_rel(root, UPPERREL_DISTINCT, NULL);
@@ -4258,6 +4258,184 @@ create_distinct_paths(PlannerInfo *root,
    distinct_rel->useridiscurrent = input_rel->useridiscurrent;
    distinct_rel->fdwroutine = input_rel->fdwroutine;
 
+   /* build distinct paths based on input_rel's pathlist */
+   create_final_distinct_paths(root, input_rel, distinct_rel);
+
+   /* now build distinct paths based on input_rel's partial_pathlist */
+   create_partial_distinct_paths(root, input_rel, distinct_rel);
+
+   /* Give a helpful error if we failed to create any paths */
+   if (distinct_rel->pathlist == NIL)
+       ereport(ERROR,
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("could not implement DISTINCT"),
+                errdetail("Some of the datatypes only support hashing, while others only support sorting.")));
+
+   /*
+    * If there is an FDW that's responsible for all baserels of the query,
+    * let it consider adding ForeignPaths.
+    */
+   if (distinct_rel->fdwroutine &&
+       distinct_rel->fdwroutine->GetForeignUpperPaths)
+       distinct_rel->fdwroutine->GetForeignUpperPaths(root,
+                                                      UPPERREL_DISTINCT,
+                                                      input_rel,
+                                                      distinct_rel,
+                                                      NULL);
+
+   /* Let extensions possibly add some more paths */
+   if (create_upper_paths_hook)
+       (*create_upper_paths_hook) (root, UPPERREL_DISTINCT, input_rel,
+                                   distinct_rel, NULL);
+
+   /* Now choose the best path(s) */
+   set_cheapest(distinct_rel);
+
+   return distinct_rel;
+}
+
+/*
+ * create_partial_distinct_paths
+ *
+ * Process 'input_rel' partial paths and add unique/aggregate paths to the
+ * UPPERREL_PARTIAL_DISTINCT rel.  For paths created, add Gather/GatherMerge
+ * paths on top and add a final unique/aggregate path to remove any
+ * duplicates produced from combining rows from parallel workers.
+ */
+static void
+create_partial_distinct_paths(PlannerInfo *root, RelOptInfo *input_rel,
+                             RelOptInfo *final_distinct_rel)
+{
+   RelOptInfo *partial_distinct_rel;
+   Query      *parse;
+   List       *distinctExprs;
+   double      numDistinctRows;
+   Path       *cheapest_partial_path;
+   ListCell   *lc;
+
+   /* nothing to do when there are no partial paths in the input rel */
+   if (!input_rel->consider_parallel || input_rel->partial_pathlist == NIL)
+       return;
+
+   parse = root->parse;
+
+   /* can't do parallel DISTINCT ON */
+   if (parse->hasDistinctOn)
+       return;
+
+   partial_distinct_rel = fetch_upper_rel(root, UPPERREL_PARTIAL_DISTINCT,
+                                          NULL);
+   partial_distinct_rel->reltarget = root->upper_targets[UPPERREL_PARTIAL_DISTINCT];
+   partial_distinct_rel->consider_parallel = input_rel->consider_parallel;
+
+   /*
+    * If input_rel belongs to a single FDW, so does the partial_distinct_rel.
+    */
+   partial_distinct_rel->serverid = input_rel->serverid;
+   partial_distinct_rel->userid = input_rel->userid;
+   partial_distinct_rel->useridiscurrent = input_rel->useridiscurrent;
+   partial_distinct_rel->fdwroutine = input_rel->fdwroutine;
+
+   cheapest_partial_path = linitial(input_rel->partial_pathlist);
+
+   distinctExprs = get_sortgrouplist_exprs(parse->distinctClause,
+                                           parse->targetList);
+
+   /* estimate how many distinct rows we'll get from each worker */
+   numDistinctRows = estimate_num_groups(root, distinctExprs,
+                                         cheapest_partial_path->rows,
+                                         NULL, NULL);
+
+   /* first try adding unique paths atop of sorted paths */
+   if (grouping_is_sortable(parse->distinctClause))
+   {
+       foreach(lc, input_rel->partial_pathlist)
+       {
+           Path       *path = (Path *) lfirst(lc);
+
+           if (pathkeys_contained_in(root->distinct_pathkeys, path->pathkeys))
+           {
+               add_partial_path(partial_distinct_rel, (Path *)
+                                create_upper_unique_path(root,
+                                                         partial_distinct_rel,
+                                                         path,
+                                                         list_length(root->distinct_pathkeys),
+                                                         numDistinctRows));
+           }
+       }
+   }
+
+   /*
+    * Now try hash aggregate paths, if enabled and hashing is possible. Since
+    * we're not on the hook to ensure we do our best to create at least one
+    * path here, we treat enable_hashagg as a hard off-switch rather than the
+    * slightly softer variant in create_final_distinct_paths.
+    */
+   if (enable_hashagg && grouping_is_hashable(parse->distinctClause))
+   {
+       add_partial_path(partial_distinct_rel, (Path *)
+                        create_agg_path(root,
+                                        partial_distinct_rel,
+                                        cheapest_partial_path,
+                                        cheapest_partial_path->pathtarget,
+                                        AGG_HASHED,
+                                        AGGSPLIT_SIMPLE,
+                                        parse->distinctClause,
+                                        NIL,
+                                        NULL,
+                                        numDistinctRows));
+   }
+
+   /*
+    * If there is an FDW that's responsible for all baserels of the query,
+    * let it consider adding ForeignPaths.
+    */
+   if (partial_distinct_rel->fdwroutine &&
+       partial_distinct_rel->fdwroutine->GetForeignUpperPaths)
+       partial_distinct_rel->fdwroutine->GetForeignUpperPaths(root,
+                                                              UPPERREL_PARTIAL_DISTINCT,
+                                                              input_rel,
+                                                              partial_distinct_rel,
+                                                              NULL);
+
+   /* Let extensions possibly add some more partial paths */
+   if (create_upper_paths_hook)
+       (*create_upper_paths_hook) (root, UPPERREL_PARTIAL_DISTINCT,
+                                   input_rel, partial_distinct_rel, NULL);
+
+   if (partial_distinct_rel->partial_pathlist != NIL)
+   {
+       generate_gather_paths(root, partial_distinct_rel, true);
+       set_cheapest(partial_distinct_rel);
+
+       /*
+        * Finally, create paths to distinctify the final result.  This step
+        * is needed to remove any duplicates due to combining rows from
+        * parallel workers.
+        */
+       create_final_distinct_paths(root, partial_distinct_rel,
+                                   final_distinct_rel);
+   }
+}
+
+/*
+ * create_final_distinct_paths
+ *     Create distinct paths in 'distinct_rel' based on 'input_rel' pathlist
+ *
+ * input_rel: contains the source-data paths
+ * distinct_rel: destination relation for storing created paths
+ */
+static RelOptInfo *
+create_final_distinct_paths(PlannerInfo *root, RelOptInfo *input_rel,
+                           RelOptInfo *distinct_rel)
+{
+   Query      *parse = root->parse;
+   Path       *cheapest_input_path = input_rel->cheapest_total_path;
+   double      numDistinctRows;
+   bool        allow_hash;
+   Path       *path;
+   ListCell   *lc;
+
    /* Estimate number of distinct rows there will be */
    if (parse->groupClause || parse->groupingSets || parse->hasAggs ||
        root->hasHavingQual)
@@ -4384,31 +4562,6 @@ create_distinct_paths(PlannerInfo *root,
                                 numDistinctRows));
    }
 
-   /* Give a helpful error if we failed to find any implementation */
-   if (distinct_rel->pathlist == NIL)
-       ereport(ERROR,
-               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                errmsg("could not implement DISTINCT"),
-                errdetail("Some of the datatypes only support hashing, while others only support sorting.")));
-
-   /*
-    * If there is an FDW that's responsible for all baserels of the query,
-    * let it consider adding ForeignPaths.
-    */
-   if (distinct_rel->fdwroutine &&
-       distinct_rel->fdwroutine->GetForeignUpperPaths)
-       distinct_rel->fdwroutine->GetForeignUpperPaths(root, UPPERREL_DISTINCT,
-                                                      input_rel, distinct_rel,
-                                                      NULL);
-
-   /* Let extensions possibly add some more paths */
-   if (create_upper_paths_hook)
-       (*create_upper_paths_hook) (root, UPPERREL_DISTINCT,
-                                   input_rel, distinct_rel, NULL);
-
-   /* Now choose the best path(s) */
-   set_cheapest(distinct_rel);
-
    return distinct_rel;
 }
 
index 6e068f2c8b387a28932fab0aa45e2a5d45991c91..1abe233db2bb90633fe279edb4f5973ed524f7d8 100644 (file)
@@ -71,6 +71,7 @@ typedef enum UpperRelationKind
                                 * any */
    UPPERREL_GROUP_AGG,         /* result of grouping/aggregation, if any */
    UPPERREL_WINDOW,            /* result of window functions, if any */
+   UPPERREL_PARTIAL_DISTINCT,  /* result of partial "SELECT DISTINCT", if any */
    UPPERREL_DISTINCT,          /* result of "SELECT DISTINCT", if any */
    UPPERREL_ORDERED,           /* result of ORDER BY, if any */
    UPPERREL_FINAL              /* result of any remaining top-level actions */
index 11c6f50fbfa8c340fcfc5e2e7b815baf442393f8..0c8e10f88a5733f4215a09167fc09681b6c18bda 100644 (file)
@@ -210,6 +210,73 @@ DROP TABLE distinct_hash_1;
 DROP TABLE distinct_hash_2;
 DROP TABLE distinct_group_1;
 DROP TABLE distinct_group_2;
+-- Test parallel DISTINCT
+SET parallel_tuple_cost=0;
+SET parallel_setup_cost=0;
+SET min_parallel_table_scan_size=0;
+-- Ensure we get a parallel plan
+EXPLAIN (costs off)
+SELECT DISTINCT four FROM tenk1;
+                     QUERY PLAN                     
+----------------------------------------------------
+ Unique
+   ->  Sort
+         Sort Key: four
+         ->  Gather
+               Workers Planned: 2
+               ->  HashAggregate
+                     Group Key: four
+                     ->  Parallel Seq Scan on tenk1
+(8 rows)
+
+-- Ensure the parallel plan produces the correct results
+SELECT DISTINCT four FROM tenk1;
+ four 
+------
+    0
+    1
+    2
+    3
+(4 rows)
+
+CREATE OR REPLACE FUNCTION distinct_func(a INT) RETURNS INT AS $$
+  BEGIN
+    RETURN a;
+  END;
+$$ LANGUAGE plpgsql PARALLEL UNSAFE;
+-- Ensure we don't do parallel distinct with a parallel unsafe function
+EXPLAIN (COSTS OFF)
+SELECT DISTINCT distinct_func(1) FROM tenk1;
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Unique
+   ->  Sort
+         Sort Key: (distinct_func(1))
+         ->  Index Only Scan using tenk1_hundred on tenk1
+(4 rows)
+
+-- make the function parallel safe
+CREATE OR REPLACE FUNCTION distinct_func(a INT) RETURNS INT AS $$
+  BEGIN
+    RETURN a;
+  END;
+$$ LANGUAGE plpgsql PARALLEL SAFE;
+-- Ensure we do parallel distinct now that the function is parallel safe
+EXPLAIN (COSTS OFF)
+SELECT DISTINCT distinct_func(1) FROM tenk1;
+                  QUERY PLAN                  
+----------------------------------------------
+ Unique
+   ->  Sort
+         Sort Key: (distinct_func(1))
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+RESET min_parallel_table_scan_size;
+RESET parallel_setup_cost;
+RESET parallel_tuple_cost;
 --
 -- Also, some tests of IS DISTINCT FROM, which doesn't quite deserve its
 -- very own regression file.
index 33102744ebf7ab1196c3724e2ef70dbec6c4a82d..e00582e46c14da3c029c97fc61277327cb3a379b 100644 (file)
@@ -107,6 +107,43 @@ DROP TABLE distinct_hash_2;
 DROP TABLE distinct_group_1;
 DROP TABLE distinct_group_2;
 
+-- Test parallel DISTINCT
+SET parallel_tuple_cost=0;
+SET parallel_setup_cost=0;
+SET min_parallel_table_scan_size=0;
+
+-- Ensure we get a parallel plan
+EXPLAIN (costs off)
+SELECT DISTINCT four FROM tenk1;
+
+-- Ensure the parallel plan produces the correct results
+SELECT DISTINCT four FROM tenk1;
+
+CREATE OR REPLACE FUNCTION distinct_func(a INT) RETURNS INT AS $$
+  BEGIN
+    RETURN a;
+  END;
+$$ LANGUAGE plpgsql PARALLEL UNSAFE;
+
+-- Ensure we don't do parallel distinct with a parallel unsafe function
+EXPLAIN (COSTS OFF)
+SELECT DISTINCT distinct_func(1) FROM tenk1;
+
+-- make the function parallel safe
+CREATE OR REPLACE FUNCTION distinct_func(a INT) RETURNS INT AS $$
+  BEGIN
+    RETURN a;
+  END;
+$$ LANGUAGE plpgsql PARALLEL SAFE;
+
+-- Ensure we do parallel distinct now that the function is parallel safe
+EXPLAIN (COSTS OFF)
+SELECT DISTINCT distinct_func(1) FROM tenk1;
+
+RESET min_parallel_table_scan_size;
+RESET parallel_setup_cost;
+RESET parallel_tuple_cost;
+
 --
 -- Also, some tests of IS DISTINCT FROM, which doesn't quite deserve its
 -- very own regression file.