| author    | Andres Freund                            | 2017-12-21 07:39:21 +0000 |
|-----------|------------------------------------------|---------------------------|
| committer | Andres Freund                            | 2017-12-21 08:43:41 +0000 |
| commit    | 1804284042e659e7d16904e7bbb0ad546394b6a3 | (patch)                   |
| tree      | d1980f7d94cb31aed8880007e5a41ede08d0cb84 | /src/test                 |
| parent    | f94eec490b2671399c102b89c9fa0311aea3a39f | (diff)                    |
Add parallel-aware hash joins.
Introduce parallel-aware hash joins that appear in EXPLAIN plans as Parallel
Hash Join with Parallel Hash. While hash joins could already appear in
parallel queries, they were previously always parallel-oblivious and had a
partial subplan only on the outer side, meaning that the work of the inner
subplan was duplicated in every worker.
After this commit, the planner will consider using a partial subplan on the
inner side too, using the Parallel Hash node to divide the work over the
available CPU cores and combine its results in shared memory. If the join
needs to be split into multiple batches in order to respect work_mem, then
workers process different batches as much as possible and then work together
on the remaining batches.
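For example, with the feature enabled, the new plan shape looks like this (a minimal sketch; `simple` is the 20000-row test table created earlier in join.sql, outside this patch, and the worker count depends on configuration):

    -- enable_parallel_hash is the GUC added by this commit (default on).
    set enable_parallel_hash = on;
    explain (costs off)
      select count(*) from simple r join simple s using (id);
    -- Plan shape, as recorded in the expected output below: both input
    -- scans are partial, and Parallel Hash builds one shared hash table.
    --  Finalize Aggregate
    --    ->  Gather
    --          Workers Planned: 2
    --          ->  Partial Aggregate
    --                ->  Parallel Hash Join
    --                      Hash Cond: (r.id = s.id)
    --                      ->  Parallel Seq Scan on simple r
    --                      ->  Parallel Hash
    --                            ->  Parallel Seq Scan on simple s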
The advantages of a parallel-aware hash join over a parallel-oblivious hash
join used in a parallel query are that it:
* avoids wasting memory on duplicated hash tables
* avoids wasting disk space on duplicated batch files
* divides the work of building the hash table over the CPUs
One disadvantage is that there is some communication between the participating
CPUs which might outweigh the benefits of parallelism in the case of small
hash tables. This is avoided by the planner's existing reluctance to supply
partial plans for small scans, but it may be necessary to estimate
synchronization costs in future if that situation changes. Another is that
outer batch 0 must be written to disk if multiple batches are required.
A potential future advantage of parallel-aware hash joins is that right and
full outer joins could be supported, since there is a single set of matched
bits for each hashtable, but that is not yet implemented.
A new GUC enable_parallel_hash is defined to control the feature, defaulting
to on.
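The regression tests below compare the two strategies by flipping the GUC inside a transaction; a condensed sketch of that pattern (settings taken from the tests):

    begin;
    savepoint settings;
    set local max_parallel_workers_per_gather = 2;
    set local work_mem = '4MB';
    -- parallel-oblivious: only the outer side is partial, and every worker
    -- builds its own copy of the hash table
    set local enable_parallel_hash = off;
    explain (costs off) select count(*) from simple r join simple s using (id);
    rollback to settings;
    -- parallel-aware: both sides are partial, and the workers cooperatively
    -- build one shared hash table
    set local enable_parallel_hash = on;
    explain (costs off) select count(*) from simple r join simple s using (id);
    rollback;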
Author: Thomas Munro
Reviewed-By: Andres Freund, Robert Haas
Tested-By: Rafia Sabih, Prabhat Sahu
Discussion:
https://postgr.es/m/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com
https://postgr.es/m/CAEepm=37HKyJ4U6XOLi=JgfSHM3o6B-GaeO-6hkOmneTDkH+Uw@mail.gmail.com
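The tests below lean on a `hash_join_batches(query)` helper that runs EXPLAIN ANALYZE and reports the planned (`original`) and runtime (`final`) batch counts of the query's Hash node. The helper is defined earlier in join.sql and is not part of this hunk; what follows is a sketch of how such a helper can be written, reconstructed as an assumption from the way the tests call it:

    -- Assumed helper: recursively locate the first Hash node in a JSON plan
    -- tree.  (A Parallel Hash also reports its "Node Type" as "Hash"; the
    -- parallel-aware flag is a separate field.)
    create function find_hash(node json) returns json language plpgsql as
    $$
    declare
      child json;
      found json;
    begin
      if node->>'Node Type' = 'Hash' then
        return node;
      end if;
      for child in select json_array_elements(node->'Plans')
      loop
        found := find_hash(child);
        if found is not null then
          return found;
        end if;
      end loop;
      return null;
    end;
    $$;

    -- Assumed helper: run the query under EXPLAIN ANALYZE and pull the
    -- planned and final batch counts out of the JSON output.
    create function hash_join_batches(query text)
      returns table (original int, final int) language plpgsql as
    $$
    declare
      whole_plan json;
      hash_node json;
    begin
      for whole_plan in execute 'explain (analyze, format ''json'') ' || query
      loop
        hash_node := find_hash(json_extract_path(whole_plan, '0', 'Plan'));
        original := hash_node->>'Original Hash Batches';
        final := hash_node->>'Hash Batches';
        return next;
      end loop;
    end;
    $$;

The tests compare the two counts: `original > 1` means the planner expected multiple batches, and `final > original` means the executor had to add batches at runtime.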
Diffstat (limited to 'src/test')
| -rw-r--r-- | src/test/regress/expected/join.out     | 304 |
| -rw-r--r-- | src/test/regress/expected/sysviews.out |   3 |
| -rw-r--r-- | src/test/regress/sql/join.sql          | 148 |

3 files changed, 452 insertions, 3 deletions
diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out
index 9d3abf0ed0..a7cfdf1f44 100644
--- a/src/test/regress/expected/join.out
+++ b/src/test/regress/expected/join.out
@@ -5884,6 +5884,9 @@ insert into extremely_skewed
 update pg_class
 set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
 where relname = 'extremely_skewed';
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
 -- The "optimal" case: the hash table fits in memory; we plan for 1
 -- batch, we stick to that number, and peak memory usage stays within
 -- our work_mem budget
@@ -5924,6 +5927,7 @@ rollback to settings;
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 set local work_mem = '4MB';
+set local enable_parallel_hash = off;
 explain (costs off)
   select count(*) from simple r join simple s using (id);
                          QUERY PLAN
@@ -5956,6 +5960,43 @@ $$);
 (1 row)
 
 rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
 -- The "good" case: batches required, but we plan the right number; we
 -- plan for some number of batches, and we stick to that number, and
 -- peak memory usage says within our work_mem budget
@@ -5996,6 +6037,7 @@ rollback to settings;
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 set local work_mem = '128kB';
+set local enable_parallel_hash = off;
 explain (costs off)
   select count(*) from simple r join simple s using (id);
                          QUERY PLAN
@@ -6028,6 +6070,43 @@ $$);
 (1 row)
 
 rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+rollback to settings;
 -- The "bad" case: during execution we need to increase number of
 -- batches; in this case we plan for 1 batch, and increase at least a
 -- couple of times, and peak memory usage stays within our work_mem
@@ -6069,6 +6148,7 @@ rollback to settings;
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 set local work_mem = '128kB';
+set local enable_parallel_hash = off;
 explain (costs off)
   select count(*) from simple r join bigger_than_it_looks s using (id);
                               QUERY PLAN
@@ -6101,6 +6181,43 @@ $$);
 (1 row)
 
 rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+                                QUERY PLAN                                 
+---------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 1
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on bigger_than_it_looks s
+(9 rows)
+
+select count(*) from simple r join bigger_than_it_looks s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
 -- The "ugly" case: increasing the number of batches during execution
 -- doesn't help, so stop trying to fit in work_mem and hope for the
 -- best; in this case we plan for 1 batch, increases just once and
@@ -6142,6 +6259,7 @@ rollback to settings;
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 set local work_mem = '128kB';
+set local enable_parallel_hash = off;
 explain (costs off)
   select count(*) from simple r join extremely_skewed s using (id);
                             QUERY PLAN
@@ -6172,6 +6290,42 @@ $$);
 (1 row)
 
 rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                              QUERY PLAN                               
+-----------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 1
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on extremely_skewed s
+(9 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     4
+(1 row)
+
+rollback to settings;
 -- A couple of other hash join tests unrelated to work_mem management.
 -- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
 savepoint settings;
@@ -6192,10 +6346,11 @@ rollback to settings;
 -- that we can check that instrumentation comes back correctly.
 create table foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
 alter table foo set (parallel_workers = 0);
-create table bar as select generate_series(1, 5000) as id, 'xxxxx'::text as t;
+create table bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
 alter table bar set (parallel_workers = 2);
 -- multi-batch with rescan, parallel-oblivious
 savepoint settings;
+set enable_parallel_hash = off;
 set parallel_leader_participation = off;
 set min_parallel_table_scan_size = 0;
 set parallel_setup_cost = 0;
@@ -6246,6 +6401,7 @@ $$);
 rollback to settings;
 -- single-batch with rescan, parallel-oblivious
 savepoint settings;
+set enable_parallel_hash = off;
 set parallel_leader_participation = off;
 set min_parallel_table_scan_size = 0;
 set parallel_setup_cost = 0;
@@ -6294,6 +6450,108 @@ $$);
 (1 row)
 
 rollback to settings;
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+explain (costs off)
+  select count(*) from foo
+    left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+    on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+                                QUERY PLAN                                
+--------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((foo.id < (b1.id + 1)) AND (foo.id > (b1.id - 1)))
+         ->  Seq Scan on foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on bar b1
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on bar b2
+(11 rows)
+
+select count(*) from foo
+  left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+  on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from foo
+    left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+    on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+explain (costs off)
+  select count(*) from foo
+    left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+    on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+                                QUERY PLAN                                
+--------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((foo.id < (b1.id + 1)) AND (foo.id > (b1.id - 1)))
+         ->  Seq Scan on foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on bar b1
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on bar b2
+(11 rows)
+
+select count(*) from foo
+  left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+  on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from foo
+    left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+    on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ f
+(1 row)
+
+rollback to settings;
 -- A full outer join where every record is matched.
 -- non-parallel
 savepoint settings;
@@ -6384,4 +6642,48 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 (1 row)
 
 rollback to settings;
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+explain (costs off)
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+                           QUERY PLAN                            
+----------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Left Join
+                     Hash Cond: (wide.id = wide_1.id)
+                     ->  Parallel Seq Scan on wide
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on wide wide_1
+(9 rows)
+
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+ length 
+--------
+ 320000
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
 rollback;
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 2b738aae7c..c9c8f51e1c 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -82,11 +82,12 @@ select name, setting from pg_settings where name like 'enable%';
  enable_mergejoin           | on
  enable_nestloop            | on
  enable_parallel_append     | on
+ enable_parallel_hash       | on
  enable_partition_wise_join | off
  enable_seqscan             | on
  enable_sort                | on
  enable_tidscan             | on
-(14 rows)
+(15 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
diff --git a/src/test/regress/sql/join.sql b/src/test/regress/sql/join.sql
index 0e933e00d5..a6a452f960 100644
--- a/src/test/regress/sql/join.sql
+++ b/src/test/regress/sql/join.sql
@@ -2028,6 +2028,10 @@ update pg_class
 set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
 where relname = 'extremely_skewed';
 
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
+
 -- The "optimal" case: the hash table fits in memory; we plan for 1
 -- batch, we stick to that number, and peak memory usage stays within
 -- our work_mem budget
@@ -2050,6 +2054,22 @@ rollback to settings;
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 set local work_mem = '4MB';
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local enable_parallel_hash = on;
 explain (costs off)
   select count(*) from simple r join simple s using (id);
 select count(*) from simple r join simple s using (id);
@@ -2082,6 +2102,22 @@ rollback to settings;
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 set local work_mem = '128kB';
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local enable_parallel_hash = on;
 explain (costs off)
   select count(*) from simple r join simple s using (id);
 select count(*) from simple r join simple s using (id);
@@ -2115,6 +2151,22 @@ rollback to settings;
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 set local work_mem = '128kB';
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+select count(*) from simple r join bigger_than_it_looks s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local enable_parallel_hash = on;
 explain (costs off)
   select count(*) from simple r join bigger_than_it_looks s using (id);
 select count(*) from simple r join bigger_than_it_looks s using (id);
@@ -2148,6 +2200,21 @@ rollback to settings;
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
 set local work_mem = '128kB';
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local enable_parallel_hash = on;
 explain (costs off)
   select count(*) from simple r join extremely_skewed s using (id);
 select count(*) from simple r join extremely_skewed s using (id);
@@ -2175,11 +2242,12 @@ rollback to settings;
 
 create table foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
 alter table foo set (parallel_workers = 0);
-create table bar as select generate_series(1, 5000) as id, 'xxxxx'::text as t;
+create table bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
 alter table bar set (parallel_workers = 2);
 
 -- multi-batch with rescan, parallel-oblivious
 savepoint settings;
+set enable_parallel_hash = off;
 set parallel_leader_participation = off;
 set min_parallel_table_scan_size = 0;
 set parallel_setup_cost = 0;
@@ -2206,6 +2274,61 @@ rollback to settings;
 
 -- single-batch with rescan, parallel-oblivious
 savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+explain (costs off)
+  select count(*) from foo
+    left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+    on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+select count(*) from foo
+  left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+  on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from foo
+    left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+    on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+explain (costs off)
+  select count(*) from foo
+    left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+    on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+select count(*) from foo
+  left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+  on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from foo
+    left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
+    on foo.id < ss.id + 1 and foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
 set parallel_leader_participation = off;
 set min_parallel_table_scan_size = 0;
 set parallel_setup_cost = 0;
@@ -2266,4 +2389,27 @@ explain (costs off)
   select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
 rollback to settings;
 
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+explain (costs off)
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+rollback to settings;
+
 rollback;
