diff options
author | Etsuro Fujita | 2022-04-06 06:45:00 +0000 |
---|---|---|
committer | Etsuro Fujita | 2022-04-06 06:45:00 +0000 |
commit | c2bb02bc2e858ba345b8b33f1f3a54628f719d93 (patch) | |
tree | fe742ab96982b0a74ed615d0c1e4c0982715a6e6 /contrib | |
parent | 376dc437de40bd17e99a37f72f88627a16d7f200 (diff) |
Allow asynchronous execution in more cases.
In commit 27e1f1456, create_append_plan() only allowed the subplan
created from a given subpath to be executed asynchronously when it was
an async-capable ForeignPath. To extend coverage, this patch handles
cases when the given subpath includes some other Path types as well that
can be omitted in the plan processing, such as a ProjectionPath directly
atop an async-capable ForeignPath, allowing asynchronous execution in
partitioned-scan/partitioned-join queries with non-Var tlist expressions
and more UNION queries.
Andrey Lepikhov and Etsuro Fujita, reviewed by Alexander Pyhalov and
Zhihong Yu.
Discussion: https://postgr.es/m/659c37a8-3e71-0ff2-394c-f04428c76f08%40postgrespro.ru
Diffstat (limited to 'contrib')
-rw-r--r-- | contrib/postgres_fdw/expected/postgres_fdw.out | 170 | ||||
-rw-r--r-- | contrib/postgres_fdw/sql/postgres_fdw.sql | 41 |
2 files changed, 211 insertions, 0 deletions
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 11e9b4e8cc..30e95f585f 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -10222,6 +10222,31 @@ SELECT * FROM result_tbl ORDER BY a; (2 rows) DELETE FROM result_tbl; +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505; + QUERY PLAN +--------------------------------------------------------------------------------- + Insert on public.result_tbl + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, ('AAA'::text || async_pt_1.c) + Filter: (async_pt_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, ('AAA'::text || async_pt_2.c) + Filter: (async_pt_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 +(10 rows) + +INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505; +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+-----+--------- + 1505 | 505 | AAA0505 + 2505 | 505 | AAA0505 +(2 rows) + +DELETE FROM result_tbl; -- Check case where multiple partitions use the same connection CREATE TABLE base_tbl3 (a int, b int, c text); CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000) @@ -10359,6 +10384,69 @@ SELECT * FROM join_tbl ORDER BY a1; (30 rows) DELETE FROM join_tbl; +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Insert on public.join_tbl + -> Append + -> Async Foreign Scan + Output: t1_1.a, t1_1.b, ('AAA'::text || t1_1.c), t2_1.a, t2_1.b, ('AAA'::text || t2_1.c) + Relations: (public.async_p1 t1_1) INNER JOIN (public.async_p1 t2_1) + Remote SQL: SELECT r5.a, r5.b, r5.c, r8.a, r8.b, r8.c FROM (public.base_tbl1 r5 INNER JOIN public.base_tbl1 r8 ON (((r5.a = r8.a)) AND ((r5.b = r8.b)) AND (((r5.b % 100) = 0)))) + -> Async Foreign Scan + Output: t1_2.a, t1_2.b, ('AAA'::text || t1_2.c), t2_2.a, t2_2.b, ('AAA'::text || t2_2.c) + Relations: (public.async_p2 t1_2) INNER JOIN (public.async_p2 t2_2) + Remote SQL: SELECT r6.a, r6.b, r6.c, r9.a, r9.b, r9.c FROM (public.base_tbl2 r6 INNER JOIN public.base_tbl2 r9 ON (((r6.a = r9.a)) AND ((r6.b = r9.b)) AND (((r6.b % 100) = 0)))) + -> Hash Join + Output: t1_3.a, t1_3.b, ('AAA'::text || t1_3.c), t2_3.a, t2_3.b, ('AAA'::text || t2_3.c) + Hash Cond: ((t2_3.a = t1_3.a) AND (t2_3.b = t1_3.b)) + -> Seq Scan on public.async_p3 t2_3 + Output: t2_3.a, t2_3.b, t2_3.c + -> Hash + Output: t1_3.a, t1_3.b, t1_3.c + -> Seq Scan on public.async_p3 t1_3 + Output: t1_3.a, t1_3.b, t1_3.c + Filter: ((t1_3.b % 100) = 0) +(20 rows) + +INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; +SELECT * FROM join_tbl ORDER BY a1; + a1 | b1 | c1 | a2 | b2 | c2 +------+-----+---------+------+-----+--------- + 1000 | 0 | AAA0000 | 1000 | 0 | AAA0000 + 1100 | 100 | AAA0100 | 1100 | 100 | AAA0100 + 1200 | 200 | AAA0200 | 1200 | 200 | AAA0200 + 1300 | 300 | AAA0300 | 1300 | 300 | AAA0300 + 1400 | 400 | AAA0400 | 1400 | 400 | AAA0400 + 1500 | 500 | AAA0500 | 1500 | 500 | AAA0500 + 1600 | 600 | AAA0600 | 1600 | 600 | AAA0600 + 1700 | 700 | AAA0700 | 1700 | 700 | AAA0700 + 1800 | 800 | AAA0800 | 1800 | 800 | AAA0800 + 1900 | 900 | AAA0900 | 1900 | 900 | AAA0900 + 2000 | 0 | AAA0000 | 2000 | 0 | AAA0000 + 2100 | 100 | AAA0100 | 2100 | 100 | AAA0100 + 2200 | 200 | AAA0200 | 2200 | 200 | AAA0200 + 2300 | 300 | AAA0300 | 2300 | 300 | AAA0300 + 2400 | 400 | AAA0400 | 2400 | 400 | AAA0400 + 2500 | 500 | AAA0500 | 2500 | 500 | AAA0500 + 2600 | 600 | AAA0600 | 2600 | 600 | AAA0600 + 2700 | 700 | AAA0700 | 2700 | 700 | AAA0700 + 2800 | 800 | AAA0800 | 2800 | 800 | AAA0800 + 2900 | 900 | AAA0900 | 2900 | 900 | AAA0900 + 3000 | 0 | AAA0000 | 3000 | 0 | AAA0000 + 3100 | 100 | AAA0100 | 3100 | 100 | AAA0100 + 3200 | 200 | AAA0200 | 3200 | 200 | AAA0200 + 3300 | 300 | AAA0300 | 3300 | 300 | AAA0300 + 3400 | 400 | AAA0400 | 3400 | 400 | AAA0400 + 3500 | 500 | AAA0500 | 3500 | 500 | AAA0500 + 3600 | 600 | AAA0600 | 3600 | 600 | AAA0600 + 3700 | 700 | AAA0700 | 3700 | 700 | AAA0700 + 3800 | 800 | AAA0800 | 3800 | 800 | AAA0800 + 3900 | 900 | AAA0900 | 3900 | 900 | AAA0900 +(30 rows) + +DELETE FROM join_tbl; RESET enable_partitionwise_join; -- Test rescan of an async Append node with do_exec_prune=false SET enable_hashjoin TO false; @@ -10536,6 +10624,88 @@ DROP TABLE local_tbl; DROP INDEX base_tbl1_idx; DROP INDEX base_tbl2_idx; DROP INDEX async_p3_idx; +-- UNION queries +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl +(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10) +UNION +(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10); + QUERY PLAN +----------------------------------------------------------------------------------------------------------------- + Insert on public.result_tbl + -> HashAggregate + Output: async_p1.a, async_p1.b, (('AAA'::text || async_p1.c)) + Group Key: async_p1.a, async_p1.b, (('AAA'::text || async_p1.c)) + -> Append + -> Async Foreign Scan on public.async_p1 + Output: async_p1.a, async_p1.b, ('AAA'::text || async_p1.c) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY a ASC NULLS LAST LIMIT 10::bigint + -> Async Foreign Scan on public.async_p2 + Output: async_p2.a, async_p2.b, ('AAA'::text || async_p2.c) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((b < 10)) +(11 rows) + +INSERT INTO result_tbl +(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10) +UNION +(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10); +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+----+--------- + 1000 | 0 | AAA0000 + 1005 | 5 | AAA0005 + 1010 | 10 | AAA0010 + 1015 | 15 | AAA0015 + 1020 | 20 | AAA0020 + 1025 | 25 | AAA0025 + 1030 | 30 | AAA0030 + 1035 | 35 | AAA0035 + 1040 | 40 | AAA0040 + 1045 | 45 | AAA0045 + 2000 | 0 | AAA0000 + 2005 | 5 | AAA0005 +(12 rows) + +DELETE FROM result_tbl; +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl +(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10) +UNION ALL +(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10); + QUERY PLAN +----------------------------------------------------------------------------------------------------------- + Insert on public.result_tbl + -> Append + -> Async Foreign Scan on public.async_p1 + Output: async_p1.a, async_p1.b, ('AAA'::text || async_p1.c) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY a ASC NULLS LAST LIMIT 10::bigint + -> Async Foreign Scan on public.async_p2 + Output: async_p2.a, async_p2.b, ('AAA'::text || async_p2.c) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((b < 10)) +(8 rows) + +INSERT INTO result_tbl +(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10) +UNION ALL +(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10); +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+----+--------- + 1000 | 0 | AAA0000 + 1005 | 5 | AAA0005 + 1010 | 10 | AAA0010 + 1015 | 15 | AAA0015 + 1020 | 20 | AAA0020 + 1025 | 25 | AAA0025 + 1030 | 30 | AAA0030 + 1035 | 35 | AAA0035 + 1040 | 40 | AAA0040 + 1045 | 45 | AAA0045 + 2000 | 0 | AAA0000 + 2005 | 5 | AAA0005 +(12 rows) + +DELETE FROM result_tbl; -- Test that pending requests are processed properly SET enable_mergejoin TO false; SET enable_hashjoin TO false; diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 6b5de89e14..ea35e61eb8 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3245,6 +3245,13 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; SELECT * FROM result_tbl ORDER BY a; DELETE FROM result_tbl; +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505; +INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505; + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + -- Check case where multiple partitions use the same connection CREATE TABLE base_tbl3 (a int, b int, c text); CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000) @@ -3286,6 +3293,13 @@ INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AN SELECT * FROM join_tbl ORDER BY a1; DELETE FROM join_tbl; +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; +INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; + +SELECT * FROM join_tbl ORDER BY a1; +DELETE FROM join_tbl; + RESET enable_partitionwise_join; -- Test rescan of an async Append node with do_exec_prune=false @@ -3357,6 +3371,33 @@ DROP INDEX base_tbl1_idx; DROP INDEX base_tbl2_idx; DROP INDEX async_p3_idx; +-- UNION queries +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl +(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10) +UNION +(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10); +INSERT INTO result_tbl +(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10) +UNION +(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10); + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl +(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10) +UNION ALL +(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10); +INSERT INTO result_tbl +(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10) +UNION ALL +(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10); + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + -- Test that pending requests are processed properly SET enable_mergejoin TO false; SET enable_hashjoin TO false; |