public inbox for [email protected]
help / color / mirror / Atom feedFrom: Alexander Pyhalov <[email protected]>
To: Matheus Alcantara <[email protected]>
Cc: Alena Rybakina <[email protected]>
Cc: Pgsql Hackers <[email protected]>
Subject: Re: Asynchronous MergeAppend
Date: Tue, 18 Nov 2025 10:14:23 +0300
Message-ID: <[email protected]> (raw)
In-Reply-To: <CAFY6G8d3Yvxa_kRQA24BsJhwqfmSCv1ujiv_7b6g5isf-ZTs_Q@mail.gmail.com>
References: <[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<CAFY6G8d3Yvxa_kRQA24BsJhwqfmSCv1ujiv_7b6g5isf-ZTs_Q@mail.gmail.com>
Hi.
Matheus Alcantara писал(а) 2025-11-18 00:09:
> On Sat Nov 15, 2025 at 7:57 AM -03, Alexander Pyhalov wrote:
>>> Here are some comments on my first look at the patches. I still don't
>>> have too much experience with the executor code but I hope that I can
>>> help with something.
>>>
>>> v5-0001-mark_async_capable-subpath-should-match-subplan.patch
>>>
>>> I don't have to much comments on this, perhaps we could have a commit
>>> message explaining the reason behind the change.
>>
>> I've expanded commit message. The issue is that mark_async_capable()
>> relies
>> on the fact that plan node type corresponds to path type. This is not
>> true when
>> (Merge)Append decides to insert Sort node.
>>
> Your explanation about why this change is needed that you've include on
> your first email sounds more clear IMHO. I would suggest the following
> for a commit message:
> mark_async_capable() believes that path corresponds to plan. This
> is
> not true when create_[merge_]append_plan() inserts sort node. In
> this case mark_async_capable() can treat Sort plan node as some
> other and crash. Fix this by handling the Sort node separately.
>
> This is needed to make MergeAppend node async-capable that it will
> be implemented in a next commit.
>
> What do you think?
>
Seems to be OK.
> I was reading the patch changes again and I have a minor point:
>
> SubqueryScan *scan_plan = (SubqueryScan
> *) plan;
>
> /*
> - * If the generated plan node includes
> a gating Result node,
> - * we can't execute it asynchronously.
> + * If the generated plan node includes
> a gating Result node or
> + * a Sort node, we can't execute it
> asynchronously.
> */
> - if (IsA(plan, Result))
> + if (IsA(plan, Result) || IsA(plan,
> Sort))
>
> Shouldn't we cast the plan to SubqueryScan* after the IsA(...) check? I
> know this code has been before your changes but type casting before a
> IsA() check sounds a bit strange to me. Also perhaps we could add an
> Assert(IsA(plan, SubqueryScan)) after the IsA(...) check and before the
> type casting just for sanity?
Yes, checking for node not to be A and then using it as B seems to be
strange. But casting to another type and checking if node is of a
particular type before using seems to be rather common. It doesn't do
any harm if we don't actually refer to SubqueryScan fields.
Updated the first patch.
--
Best regards,
Alexander Pyhalov,
Postgres Professional
Attachments:
[text/x-diff] v7-0001-mark_async_capable-subpath-should-match-subplan.patch (2.4K, 2-v7-0001-mark_async_capable-subpath-should-match-subplan.patch)
download | inline diff:
From 10d872ce10bc8c7f6a430fe70367714b526fab4d Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <[email protected]>
Date: Sat, 15 Nov 2025 10:16:25 +0300
Subject: [PATCH 1/2] mark_async_capable(): subpath should match subplan
mark_async_capable() believes that path corresponds to plan. This is
not true when create_[merge_]append_plan() inserts sort node. In
this case mark_async_capable() can treat Sort plan node as some
other and crash. Fix this by handling the Sort node separately.
This is needed to make MergeAppend node async-capable that will
be implemented in a next commit.
---
src/backend/optimizer/plan/createplan.c | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 8af091ba647..5cd7fa7b897 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1139,10 +1139,12 @@ mark_async_capable_plan(Plan *plan, Path *path)
SubqueryScan *scan_plan = (SubqueryScan *) plan;
/*
- * If the generated plan node includes a gating Result node,
- * we can't execute it asynchronously.
+ * Check that plan is really a SubqueryScan before using it.
+ * It can be not true, if the generated plan node includes a
+ * gating Result node or a Sort node. In such case we can't
+ * execute it asynchronously.
*/
- if (IsA(plan, Result))
+ if (!IsA(scan_plan, SubqueryScan))
return false;
/*
@@ -1160,10 +1162,10 @@ mark_async_capable_plan(Plan *plan, Path *path)
FdwRoutine *fdwroutine = path->parent->fdwroutine;
/*
- * If the generated plan node includes a gating Result node,
- * we can't execute it asynchronously.
+ * If the generated plan node includes a gating Result node or
+ * a Sort node, we can't execute it asynchronously.
*/
- if (IsA(plan, Result))
+ if (IsA(plan, Result) || IsA(plan, Sort))
return false;
Assert(fdwroutine != NULL);
@@ -1176,9 +1178,9 @@ mark_async_capable_plan(Plan *plan, Path *path)
/*
* If the generated plan node includes a Result node for the
- * projection, we can't execute it asynchronously.
+ * projection or a Sort node, we can't execute it asynchronously.
*/
- if (IsA(plan, Result))
+ if (IsA(plan, Result) || IsA(plan, Sort))
return false;
/*
--
2.43.0
[text/x-diff] v7-0002-MergeAppend-should-support-Async-Foreign-Scan-subpla.patch (49.8K, 3-v7-0002-MergeAppend-should-support-Async-Foreign-Scan-subpla.patch)
download | inline diff:
From 76a212d1ba66cb727a95a4f6cba28786795b6d11 Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <[email protected]>
Date: Sat, 15 Nov 2025 10:23:47 +0300
Subject: [PATCH 2/2] MergeAppend should support Async Foreign Scan subplans
---
.../postgres_fdw/expected/postgres_fdw.out | 288 +++++++++++
contrib/postgres_fdw/postgres_fdw.c | 10 +-
contrib/postgres_fdw/sql/postgres_fdw.sql | 87 ++++
doc/src/sgml/config.sgml | 14 +
src/backend/executor/execAsync.c | 4 +
src/backend/executor/nodeAppend.c | 21 +-
src/backend/executor/nodeMergeAppend.c | 458 +++++++++++++++++-
src/backend/optimizer/path/costsize.c | 1 +
src/backend/optimizer/plan/createplan.c | 9 +
src/backend/utils/misc/guc_parameters.dat | 8 +
src/backend/utils/misc/postgresql.conf.sample | 1 +
src/include/executor/nodeMergeAppend.h | 1 +
src/include/nodes/execnodes.h | 57 +++
src/include/optimizer/cost.h | 1 +
src/test/regress/expected/sysviews.out | 3 +-
15 files changed, 938 insertions(+), 25 deletions(-)
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index cd28126049d..5aa563c95ed 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -11556,6 +11556,46 @@ SELECT * FROM result_tbl ORDER BY a;
(2 rows)
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------------
+ Merge Append
+ Sort Key: async_pt.b, async_pt.a
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(8 rows)
+
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+ a | b | c
+------+-----+------
+ 1000 | 0 | 0000
+ 2000 | 0 | 0000
+ 1100 | 100 | 0100
+ 2100 | 100 | 0100
+ 1200 | 200 | 0200
+ 2200 | 200 | 0200
+ 1300 | 300 | 0300
+ 2300 | 300 | 0300
+ 1400 | 400 | 0400
+ 2400 | 400 | 0400
+ 1500 | 500 | 0500
+ 2500 | 500 | 0500
+ 1600 | 600 | 0600
+ 2600 | 600 | 0600
+ 1700 | 700 | 0700
+ 2700 | 700 | 0700
+ 1800 | 800 | 0800
+ 2800 | 800 | 0800
+ 1900 | 900 | 0900
+ 2900 | 900 | 0900
+(20 rows)
+
-- Test error handling, if accessing one of the foreign partitions errors out
CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
SERVER loopback OPTIONS (table_name 'non_existent_table');
@@ -11604,6 +11644,76 @@ COPY async_pt TO stdout; --error
ERROR: cannot copy from foreign table "async_p1"
DETAIL: Partition "async_p1" is a foreign table in partitioned table "async_pt"
HINT: Try the COPY (SELECT ...) TO variant.
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------
+ Merge Append
+ Sort Key: async_pt.b, async_pt.a
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (async_pt_3.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl3 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(14 rows)
+
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
+-- Test async Merge Append rescan
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT
+ ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10)
+FROM generate_series(1, 3) g(i);
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------------------------------------
+ Function Scan on pg_catalog.generate_series g
+ Output: ARRAY(SubPlan array_1)
+ Function Call: generate_series(1, 3)
+ SubPlan array_1
+ -> Limit
+ Output: f.i
+ -> Sort
+ Output: f.i
+ Sort Key: f.i
+ -> Subquery Scan on f
+ Output: f.i
+ -> Merge Append
+ Sort Key: async_pt.b
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: (async_pt_1.b + g.i), async_pt_1.b
+ Remote SQL: SELECT b FROM public.base_tbl1 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: (async_pt_2.b + g.i), async_pt_2.b
+ Remote SQL: SELECT b FROM public.base_tbl2 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p3 async_pt_3
+ Output: (async_pt_3.b + g.i), async_pt_3.b
+ Remote SQL: SELECT b FROM public.base_tbl3 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST
+(22 rows)
+
+SELECT
+ ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10)
+FROM generate_series(1, 3) g(i);
+ array
+---------------------------
+ {1,1,1,6,6,6,11,11,11,16}
+ {2,2,2,7,7,7,12,12,12,17}
+ {3,3,3,8,8,8,13,13,13,18}
+(3 rows)
+
DROP FOREIGN TABLE async_p3;
DROP TABLE base_tbl3;
-- Check case where the partitioned table has local/remote partitions
@@ -11639,6 +11749,37 @@ SELECT * FROM result_tbl ORDER BY a;
(3 rows)
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------
+ Merge Append
+ Sort Key: async_pt.b, async_pt.a
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Sort
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Sort Key: async_pt_3.b, async_pt_3.a
+ -> Seq Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (async_pt_3.b === 505)
+(16 rows)
+
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
-- partitionwise joins
SET enable_partitionwise_join TO true;
CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
@@ -12421,6 +12562,153 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
DROP FOREIGN TABLE foreign_tbl CASCADE;
NOTICE: drop cascades to foreign table foreign_tbl2
DROP TABLE base_tbl;
+-- Test async Merge Append
+CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base1 (i int, j int, k text);
+CREATE TABLE base2 (i int, j int, k text);
+CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base1');
+CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base2');
+CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base3 (i int, j int, k text);
+CREATE TABLE base4 (i int, j int, k text);
+CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base3');
+CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base4');
+INSERT INTO distr1
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+INSERT INTO distr2
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
+ANALYZE distr1_p1;
+ANALYZE distr1_p2;
+ANALYZE distr2_p1;
+ANALYZE distr2_p2;
+SET enable_partitionwise_join TO ON;
+-- Test joins with async Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+ QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+ Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
+ -> Merge Append
+ Sort Key: distr1.i
+ -> Async Foreign Scan
+ Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
+ Relations: (public.distr1_p1 distr1_1) INNER JOIN (public.distr2_p1 distr2_1)
+ Remote SQL: SELECT r3.i, r3.j, r3.k, r5.i, r5.j, r5.k FROM (public.base1 r3 INNER JOIN public.base3 r5 ON (((r3.i = r5.i)) AND ((r5.j > 90)) AND ((r5.k ~~ 'data%')))) ORDER BY r3.i ASC NULLS LAST
+ -> Async Foreign Scan
+ Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
+ Relations: (public.distr1_p2 distr1_2) INNER JOIN (public.distr2_p2 distr2_2)
+ Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base2 r4 INNER JOIN public.base4 r6 ON (((r4.i = r6.i)) AND ((r6.j > 90)) AND ((r6.k ~~ 'data%')))) ORDER BY r4.i ASC NULLS LAST
+(12 rows)
+
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+ i | j | k | i | j | k
+----+-----+---------+----+-----+---------
+ 10 | 100 | data_10 | 10 | 100 | data_10
+ 11 | 110 | data_11 | 11 | 110 | data_11
+ 12 | 120 | data_12 | 12 | 120 | data_12
+ 13 | 130 | data_13 | 13 | 130 | data_13
+ 14 | 140 | data_14 | 14 | 140 | data_14
+ 15 | 150 | data_15 | 15 | 150 | data_15
+ 16 | 160 | data_16 | 16 | 160 | data_16
+ 17 | 170 | data_17 | 17 | 170 | data_17
+ 18 | 180 | data_18 | 18 | 180 | data_18
+ 19 | 190 | data_19 | 19 | 190 | data_19
+(10 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+ QUERY PLAN
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+ Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
+ -> Merge Append
+ Sort Key: distr1.i
+ -> Async Foreign Scan
+ Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
+ Relations: (public.distr1_p1 distr1_1) LEFT JOIN (public.distr2_p1 distr2_1)
+ Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base1 r4 LEFT JOIN public.base3 r6 ON (((r4.i = r6.i)) AND ((r6.k ~~ 'data%')))) WHERE ((r4.i > 90)) ORDER BY r4.i ASC NULLS LAST
+ -> Async Foreign Scan
+ Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
+ Relations: (public.distr1_p2 distr1_2) LEFT JOIN (public.distr2_p2 distr2_2)
+ Remote SQL: SELECT r5.i, r5.j, r5.k, r7.i, r7.j, r7.k FROM (public.base2 r5 LEFT JOIN public.base4 r7 ON (((r5.i = r7.i)) AND ((r7.k ~~ 'data%')))) WHERE ((r5.i > 90)) ORDER BY r5.i ASC NULLS LAST
+(12 rows)
+
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+ i | j | k | i | j | k
+-----+------+----------+-----+------+----------
+ 91 | 910 | data_91 | 91 | 910 | data_91
+ 92 | 920 | data_92 | 92 | 920 | data_92
+ 93 | 930 | data_93 | 93 | 930 | data_93
+ 94 | 940 | data_94 | 94 | 940 | data_94
+ 95 | 950 | data_95 | 95 | 950 | data_95
+ 96 | 960 | data_96 | 96 | 960 | data_96
+ 97 | 970 | data_97 | 97 | 970 | data_97
+ 98 | 980 | data_98 | 98 | 980 | data_98
+ 99 | 990 | data_99 | 99 | 990 | data_99
+ 100 | 1000 | data_100 | 100 | 1000 | data_100
+ 101 | 1010 | data_101 | | |
+ 102 | 1020 | data_102 | | |
+ 103 | 1030 | data_103 | | |
+ 104 | 1040 | data_104 | | |
+ 105 | 1050 | data_105 | | |
+ 106 | 1060 | data_106 | | |
+ 107 | 1070 | data_107 | | |
+ 108 | 1080 | data_108 | | |
+ 109 | 1090 | data_109 | | |
+ 110 | 1100 | data_110 | | |
+(20 rows)
+
+-- Test pruning with async Merge Append
+DELETE FROM distr2;
+INSERT INTO distr2
+SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+DEALLOCATE ALL;
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+ SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
+ ORDER BY i,j
+ LIMIT 10;
+EXPLAIN (VERBOSE, COSTS OFF)
+ EXECUTE async_pt_query(1, 1);
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+ Output: distr2.i, distr2.j, distr2.k
+ -> Merge Append
+ Sort Key: distr2.i, distr2.j
+ Subplans Removed: 1
+ -> Async Foreign Scan on public.distr2_p1 distr2_1
+ Output: distr2_1.i, distr2_1.j, distr2_1.k
+ Remote SQL: SELECT i, j, k FROM public.base3 WHERE ((i = ANY (ARRAY[$1::integer, $2::integer]))) ORDER BY i ASC NULLS LAST, j ASC NULLS LAST
+(8 rows)
+
+EXECUTE async_pt_query(1, 1);
+ i | j | k
+---+-----+---------
+ 1 | 10 | data_1
+ 1 | 110 | data_11
+ 1 | 210 | data_21
+ 1 | 310 | data_31
+ 1 | 410 | data_41
+ 1 | 510 | data_51
+ 1 | 610 | data_61
+ 1 | 710 | data_71
+ 1 | 810 | data_81
+ 1 | 910 | data_91
+(10 rows)
+
+RESET plan_cache_mode;
+RESET enable_partitionwise_join;
+DROP TABLE distr1, distr2, base1, base2, base3, base4;
ALTER SERVER loopback OPTIONS (DROP async_capable);
ALTER SERVER loopback2 OPTIONS (DROP async_capable);
-- ===================================================================
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 06b52c65300..2105c9c90b9 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -7213,12 +7213,16 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
ForeignScanState *node = (ForeignScanState *) areq->requestee;
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
- AppendState *requestor = (AppendState *) areq->requestor;
- WaitEventSet *set = requestor->as_eventset;
+ PlanState *requestor = areq->requestor;
+ WaitEventSet *set;
+ Bitmapset *needrequest;
/* This should not be called unless callback_pending */
Assert(areq->callback_pending);
+ set = GetAppendEventSet(requestor);
+ needrequest = GetNeedRequest(requestor);
+
/*
* If process_pending_request() has been invoked on the given request
* before we get here, we might have some tuples already; in which case
@@ -7256,7 +7260,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
* below, because we might otherwise end up with no configured events
* other than the postmaster death event.
*/
- if (!bms_is_empty(requestor->as_needrequest))
+ if (!bms_is_empty(needrequest))
return;
if (GetNumRegisteredWaitEvents(set) > 1)
return;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 9a8f9e28135..aa388cb027f 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -3921,6 +3921,11 @@ INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+
-- Test error handling, if accessing one of the foreign partitions errors out
CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
SERVER loopback OPTIONS (table_name 'non_existent_table');
@@ -3944,6 +3949,20 @@ DELETE FROM result_tbl;
-- Test COPY TO when foreign table is partition
COPY async_pt TO stdout; --error
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+
+-- Test async Merge Append rescan
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT
+ ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10)
+FROM generate_series(1, 3) g(i);
+SELECT
+ ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10)
+FROM generate_series(1, 3) g(i);
+
DROP FOREIGN TABLE async_p3;
DROP TABLE base_tbl3;
@@ -3959,6 +3978,11 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+
-- partitionwise joins
SET enable_partitionwise_join TO true;
@@ -4197,6 +4221,69 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
DROP FOREIGN TABLE foreign_tbl CASCADE;
DROP TABLE base_tbl;
+-- Test async Merge Append
+CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base1 (i int, j int, k text);
+CREATE TABLE base2 (i int, j int, k text);
+CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base1');
+CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base2');
+
+CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base3 (i int, j int, k text);
+CREATE TABLE base4 (i int, j int, k text);
+CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base3');
+CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base4');
+
+INSERT INTO distr1
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+
+INSERT INTO distr2
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
+
+ANALYZE distr1_p1;
+ANALYZE distr1_p2;
+ANALYZE distr2_p1;
+ANALYZE distr2_p2;
+
+SET enable_partitionwise_join TO ON;
+
+-- Test joins with async Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+
+-- Test pruning with async Merge Append
+DELETE FROM distr2;
+INSERT INTO distr2
+SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+
+DEALLOCATE ALL;
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+ SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
+ ORDER BY i,j
+ LIMIT 10;
+EXPLAIN (VERBOSE, COSTS OFF)
+ EXECUTE async_pt_query(1, 1);
+EXECUTE async_pt_query(1, 1);
+RESET plan_cache_mode;
+
+RESET enable_partitionwise_join;
+
+DROP TABLE distr1, distr2, base1, base2, base3, base4;
+
ALTER SERVER loopback OPTIONS (DROP async_capable);
ALTER SERVER loopback2 OPTIONS (DROP async_capable);
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index df1c3eaaa58..1d46de13918 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5455,6 +5455,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
</listitem>
</varlistentry>
+ <varlistentry id="guc-enable-async-merge-append" xreflabel="enable_async_merge_append">
+ <term><varname>enable_async_merge_append</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>enable_async_merge_append</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Enables or disables the query planner's use of async-aware
+ merge append plan types. The default is <literal>on</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-enable-bitmapscan" xreflabel="enable_bitmapscan">
<term><varname>enable_bitmapscan</varname> (<type>boolean</type>)
<indexterm>
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index 5d3cabe73e3..6dc19ebc374 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -17,6 +17,7 @@
#include "executor/execAsync.h"
#include "executor/executor.h"
#include "executor/nodeAppend.h"
+#include "executor/nodeMergeAppend.h"
#include "executor/nodeForeignscan.h"
/*
@@ -121,6 +122,9 @@ ExecAsyncResponse(AsyncRequest *areq)
case T_AppendState:
ExecAsyncAppendResponse(areq);
break;
+ case T_MergeAppendState:
+ ExecAsyncMergeAppendResponse(areq);
+ break;
default:
/* If the node doesn't support async, caller messed up. */
elog(ERROR, "unrecognized node type: %d",
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index a11b36c7176..c89e2d2787f 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -1187,10 +1187,7 @@ ExecAsyncAppendResponse(AsyncRequest *areq)
static void
classify_matching_subplans(AppendState *node)
{
- Bitmapset *valid_asyncplans;
-
Assert(node->as_valid_subplans_identified);
- Assert(node->as_valid_asyncplans == NULL);
/* Nothing to do if there are no valid subplans. */
if (bms_is_empty(node->as_valid_subplans))
@@ -1200,21 +1197,7 @@ classify_matching_subplans(AppendState *node)
return;
}
- /* Nothing to do if there are no valid async subplans. */
- if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
- {
+ /* No valid async subplans identified. */
+ if (!classify_matching_subplans_common(&node->as_valid_subplans, node->as_asyncplans, &node->as_valid_asyncplans))
node->as_nasyncremain = 0;
- return;
- }
-
- /* Get valid async subplans. */
- valid_asyncplans = bms_intersect(node->as_asyncplans,
- node->as_valid_subplans);
-
- /* Adjust the valid subplans to contain sync subplans only. */
- node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
- valid_asyncplans);
-
- /* Save valid async subplans. */
- node->as_valid_asyncplans = valid_asyncplans;
}
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 405e8f94285..7db41fbf40f 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -39,10 +39,15 @@
#include "postgres.h"
#include "executor/executor.h"
+#include "executor/execAsync.h"
#include "executor/execPartition.h"
#include "executor/nodeMergeAppend.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
+#include "storage/latch.h"
+#include "utils/wait_event.h"
+
+#define EVENT_BUFFER_SIZE 16
/*
* We have one slot for each item in the heap array. We use SlotNumber
@@ -54,6 +59,12 @@ typedef int32 SlotNumber;
static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
static int heap_compare_slots(Datum a, Datum b, void *arg);
+static void classify_matching_subplans(MergeAppendState *node);
+static void ExecMergeAppendAsyncBegin(MergeAppendState *node);
+static void ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan);
+static bool ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan);
+static void ExecMergeAppendAsyncEventWait(MergeAppendState *node);
+
/* ----------------------------------------------------------------
* ExecInitMergeAppend
@@ -71,6 +82,8 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
int nplans;
int i,
j;
+ Bitmapset *asyncplans;
+ int nasyncplans;
/* check for unsupported flags */
Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@@ -106,7 +119,10 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
* later calls to ExecFindMatchingSubPlans.
*/
if (!prunestate->do_exec_prune && nplans > 0)
+ {
mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
+ mergestate->ms_valid_subplans_identified = true;
+ }
}
else
{
@@ -119,6 +135,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
Assert(nplans > 0);
mergestate->ms_valid_subplans = validsubplans =
bms_add_range(NULL, 0, nplans - 1);
+ mergestate->ms_valid_subplans_identified = true;
mergestate->ms_prune_state = NULL;
}
@@ -135,11 +152,25 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
* the results into the mergeplanstates array.
*/
j = 0;
+ asyncplans = NULL;
+ nasyncplans = 0;
+
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
{
Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
+ /*
+ * Record async subplans. When executing EvalPlanQual, we treat them
+ * as sync ones; don't do this when initializing an EvalPlanQual plan
+ * tree.
+ */
+ if (initNode->async_capable && estate->es_epq_active == NULL)
+ {
+ asyncplans = bms_add_member(asyncplans, j);
+ nasyncplans++;
+ }
+
mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
}
@@ -170,6 +201,45 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
*/
mergestate->ps.ps_ProjInfo = NULL;
+ /* Initialize async state */
+ mergestate->ms_asyncplans = asyncplans;
+ mergestate->ms_nasyncplans = nasyncplans;
+ mergestate->ms_asyncrequests = NULL;
+ mergestate->ms_asyncresults = NULL;
+ mergestate->ms_has_asyncresults = NULL;
+ mergestate->ms_asyncremain = NULL;
+ mergestate->ms_needrequest = NULL;
+ mergestate->ms_eventset = NULL;
+ mergestate->ms_valid_asyncplans = NULL;
+
+ if (nasyncplans > 0)
+ {
+ mergestate->ms_asyncrequests = (AsyncRequest **)
+ palloc0(nplans * sizeof(AsyncRequest *));
+
+ i = -1;
+ while ((i = bms_next_member(asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq;
+
+ areq = palloc(sizeof(AsyncRequest));
+ areq->requestor = (PlanState *) mergestate;
+ areq->requestee = mergeplanstates[i];
+ areq->request_index = i;
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+
+ mergestate->ms_asyncrequests[i] = areq;
+ }
+
+ mergestate->ms_asyncresults = (TupleTableSlot **)
+ palloc0(nplans * sizeof(TupleTableSlot *));
+
+ if (mergestate->ms_valid_subplans_identified)
+ classify_matching_subplans(mergestate);
+ }
+
/*
* initialize sort-key information
*/
@@ -231,9 +301,17 @@ ExecMergeAppend(PlanState *pstate)
* run-time pruning is disabled then the valid subplans will always be
* set to all subplans.
*/
- if (node->ms_valid_subplans == NULL)
+ if (!node->ms_valid_subplans_identified)
+ {
node->ms_valid_subplans =
ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL);
+ node->ms_valid_subplans_identified = true;
+ classify_matching_subplans(node);
+ }
+
+ /* If there are any async subplans, begin executing them. */
+ if (node->ms_nasyncplans > 0)
+ ExecMergeAppendAsyncBegin(node);
/*
* First time through: pull the first tuple from each valid subplan,
@@ -246,6 +324,16 @@ ExecMergeAppend(PlanState *pstate)
if (!TupIsNull(node->ms_slots[i]))
binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
}
+
+ /* Look at valid async subplans */
+ i = -1;
+ while ((i = bms_next_member(node->ms_valid_asyncplans, i)) >= 0)
+ {
+ ExecMergeAppendAsyncGetNext(node, i);
+ if (!TupIsNull(node->ms_slots[i]))
+ binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
+ }
+
binaryheap_build(node->ms_heap);
node->ms_initialized = true;
}
@@ -260,7 +348,13 @@ ExecMergeAppend(PlanState *pstate)
* to not pull tuples until necessary.)
*/
i = DatumGetInt32(binaryheap_first(node->ms_heap));
- node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+ if (bms_is_member(i, node->ms_asyncplans))
+ ExecMergeAppendAsyncGetNext(node, i);
+ else
+ {
+ Assert(bms_is_member(i, node->ms_valid_subplans));
+ node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+ }
if (!TupIsNull(node->ms_slots[i]))
binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
else
@@ -276,6 +370,8 @@ ExecMergeAppend(PlanState *pstate)
{
i = DatumGetInt32(binaryheap_first(node->ms_heap));
result = node->ms_slots[i];
+ /* For async plan record that we can get the next tuple */
+ node->ms_has_asyncresults = bms_del_member(node->ms_has_asyncresults, i);
}
return result;
@@ -355,6 +451,7 @@ void
ExecReScanMergeAppend(MergeAppendState *node)
{
int i;
+ int nasyncplans = node->ms_nasyncplans;
/*
* If any PARAM_EXEC Params used in pruning expressions have changed, then
@@ -365,8 +462,11 @@ ExecReScanMergeAppend(MergeAppendState *node)
bms_overlap(node->ps.chgParam,
node->ms_prune_state->execparamids))
{
+ node->ms_valid_subplans_identified = false;
bms_free(node->ms_valid_subplans);
node->ms_valid_subplans = NULL;
+ bms_free(node->ms_valid_asyncplans);
+ node->ms_valid_asyncplans = NULL;
}
for (i = 0; i < node->ms_nplans; i++)
@@ -387,6 +487,360 @@ ExecReScanMergeAppend(MergeAppendState *node)
if (subnode->chgParam == NULL)
ExecReScan(subnode);
}
+
+ /* Reset async state */
+ if (nasyncplans > 0)
+ {
+ i = -1;
+ while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->ms_asyncrequests[i];
+
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+ }
+
+ bms_free(node->ms_asyncremain);
+ node->ms_asyncremain = NULL;
+ bms_free(node->ms_needrequest);
+ node->ms_needrequest = NULL;
+ bms_free(node->ms_has_asyncresults);
+ node->ms_has_asyncresults = NULL;
+ }
binaryheap_reset(node->ms_heap);
node->ms_initialized = false;
}
+
+/* ----------------------------------------------------------------
+ * classify_matching_subplans
+ *
+ * Classify the node's ms_valid_subplans into sync ones and
+ * async ones, adjust it to contain sync ones only, and save
+ * async ones in the node's ms_valid_asyncplans.
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(MergeAppendState *node)
+{
+ Assert(node->ms_valid_subplans_identified);
+
+ /* Nothing to do if there are no valid subplans. */
+ if (bms_is_empty(node->ms_valid_subplans))
+ {
+ node->ms_asyncremain = NULL;
+ return;
+ }
+
+ /* No valid async subplans identified. */
+ if (!classify_matching_subplans_common(&node->ms_valid_subplans, node->ms_asyncplans, &node->ms_valid_asyncplans))
+ node->ms_asyncremain = NULL;
+}
+
+/* ----------------------------------------------------------------
+ * ExecMergeAppendAsyncBegin
+ *
+ * Begin executing designed async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncBegin(MergeAppendState *node)
+{
+ int i;
+
+ /* Backward scan is not supported by async-aware MergeAppends. */
+ Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+ /* We should never be called when there are no subplans */
+ Assert(node->ms_nplans > 0);
+
+ /* We should never be called when there are no async subplans. */
+ Assert(node->ms_nasyncplans > 0);
+
+ /* ExecMergeAppend() identifies valid subplans */
+ Assert(node->ms_valid_subplans_identified);
+
+ /* Initialize state variables. */
+ node->ms_asyncremain = bms_copy(node->ms_valid_asyncplans);
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (bms_is_empty(node->ms_asyncremain))
+ return;
+
+ /* Make a request for each of the valid async subplans. */
+ i = -1;
+ while ((i = bms_next_member(node->ms_valid_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->ms_asyncrequests[i];
+
+ Assert(areq->request_index == i);
+ Assert(!areq->callback_pending);
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecMergeAppendAsyncGetNext
+ *
+ * Get the next tuple from specified asynchronous subplan.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan)
+{
+ node->ms_slots[mplan] = NULL;
+
+ /* Request a tuple asynchronously. */
+ if (ExecMergeAppendAsyncRequest(node, mplan))
+ return;
+
+ /*
+ * node->ms_asyncremain can be NULL if we have fetched tuples, but haven't
+ * returned them yet. In this case ExecMergeAppendAsyncRequest() above
+ * just returns tuples without performing a request.
+ */
+ while (bms_is_member(mplan, node->ms_asyncremain))
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /* Wait or poll for async events. */
+ ExecMergeAppendAsyncEventWait(node);
+
+ /* Request a tuple asynchronously. */
+ if (ExecMergeAppendAsyncRequest(node, mplan))
+ return;
+
+ /*
+ * Waiting until there's no async requests pending or we got some
+ * tuples from our request
+ */
+ }
+
+ /* No tuples */
+ return;
+}
+
+/* ----------------------------------------------------------------
+ * ExecMergeAppendAsyncRequest
+ *
+ * Request a tuple asynchronously.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan)
+{
+ Bitmapset *needrequest;
+ int i;
+
+ /*
+ * If we've already fetched necessary data, just return it
+ */
+ if (bms_is_member(mplan, node->ms_has_asyncresults))
+ {
+ node->ms_slots[mplan] = node->ms_asyncresults[mplan];
+ return true;
+ }
+
+ /*
+ * Get a list of members which can process request and don't have data
+ * ready.
+ */
+ needrequest = NULL;
+ i = -1;
+ while ((i = bms_next_member(node->ms_needrequest, i)) >= 0)
+ {
+ if (!bms_is_member(i, node->ms_has_asyncresults))
+ needrequest = bms_add_member(needrequest, i);
+ }
+
+ /*
+ * If there's no members, which still need request, no need to send it.
+ */
+ if (bms_is_empty(needrequest))
+ return false;
+
+ /* Clear ms_needrequest flag, as we are going to send requests now */
+ node->ms_needrequest = bms_del_members(node->ms_needrequest, needrequest);
+
+ /* Make a new request for each of the async subplans that need it. */
+ i = -1;
+ while ((i = bms_next_member(needrequest, i)) >= 0)
+ {
+ AsyncRequest *areq = node->ms_asyncrequests[i];
+
+ /*
+ * We've just checked that subplan doesn't already have some fetched
+ * data
+ */
+ Assert(!bms_is_member(i, node->ms_has_asyncresults));
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+ bms_free(needrequest);
+
+ /* Return needed asynchronously-generated results if any. */
+ if (bms_is_member(mplan, node->ms_has_asyncresults))
+ {
+ node->ms_slots[mplan] = node->ms_asyncresults[mplan];
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncMergeAppendResponse
+ *
+ * Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncMergeAppendResponse(AsyncRequest *areq)
+{
+ MergeAppendState *node = (MergeAppendState *) areq->requestor;
+ TupleTableSlot *slot = areq->result;
+
+ /* The result should be a TupleTableSlot or NULL. */
+ Assert(slot == NULL || IsA(slot, TupleTableSlot));
+ Assert(!bms_is_member(areq->request_index, node->ms_has_asyncresults));
+
+ node->ms_asyncresults[areq->request_index] = NULL;
+ /* Nothing to do if the request is pending. */
+ if (!areq->request_complete)
+ {
+ /* The request would have been pending for a callback. */
+ Assert(areq->callback_pending);
+ return;
+ }
+
+ /* If the result is NULL or an empty slot, there's nothing more to do. */
+ if (TupIsNull(slot))
+ {
+ /* The ending subplan wouldn't have been pending for a callback. */
+ Assert(!areq->callback_pending);
+ node->ms_asyncremain = bms_del_member(node->ms_asyncremain, areq->request_index);
+ return;
+ }
+
+ node->ms_has_asyncresults = bms_add_member(node->ms_has_asyncresults, areq->request_index);
+ /* Save result so we can return it. */
+ node->ms_asyncresults[areq->request_index] = slot;
+
+ /*
+ * Mark the subplan that returned a result as ready for a new request. We
+ * don't launch another one here immediately because it might complete.
+ */
+ node->ms_needrequest = bms_add_member(node->ms_needrequest,
+ areq->request_index);
+}
+
+/* ----------------------------------------------------------------
+ * ExecMergeAppendAsyncEventWait
+ *
+ * Wait or poll for file descriptor events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncEventWait(MergeAppendState *node)
+{
+ int nevents = node->ms_nasyncplans + 2; /* one for PM death and
+ * one for latch */
+ WaitEvent occurred_event[EVENT_BUFFER_SIZE];
+ int noccurred;
+ int i;
+
+ /* We should never be called when there are no valid async subplans. */
+ Assert(bms_num_members(node->ms_asyncremain) > 0);
+
+ node->ms_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
+ AddWaitEventToSet(node->ms_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+ NULL, NULL);
+
+ /* Give each waiting subplan a chance to add an event. */
+ i = -1;
+ while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->ms_asyncrequests[i];
+
+ if (areq->callback_pending)
+ ExecAsyncConfigureWait(areq);
+ }
+
+ /*
+ * No need for further processing if none of the subplans configured any
+ * events.
+ */
+ if (GetNumRegisteredWaitEvents(node->ms_eventset) == 1)
+ {
+ FreeWaitEventSet(node->ms_eventset);
+ node->ms_eventset = NULL;
+ return;
+ }
+
+ /*
+ * Add the process latch to the set, so that we wake up to process the
+ * standard interrupts with CHECK_FOR_INTERRUPTS().
+ *
+ * NOTE: For historical reasons, it's important that this is added to the
+ * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely,
+ * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
+ * any other events are in the set. That's a poor design, it's
+ * questionable for postgres_fdw to be doing that in the first place, but
+ * we cannot change it now. The pattern has possibly been copied to other
+ * extensions too.
+ */
+ AddWaitEventToSet(node->ms_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
+ MyLatch, NULL);
+
+ /* Return at most EVENT_BUFFER_SIZE events in one call. */
+ if (nevents > EVENT_BUFFER_SIZE)
+ nevents = EVENT_BUFFER_SIZE;
+
+ /*
+ * Wait until at least one event occurs.
+ */
+ noccurred = WaitEventSetWait(node->ms_eventset, -1 /* no timeout */ , occurred_event,
+ nevents, WAIT_EVENT_APPEND_READY);
+ FreeWaitEventSet(node->ms_eventset);
+ node->ms_eventset = NULL;
+ if (noccurred == 0)
+ return;
+
+ /* Deliver notifications. */
+ for (i = 0; i < noccurred; i++)
+ {
+ WaitEvent *w = &occurred_event[i];
+
+ /*
+ * Each waiting subplan should have registered its wait event with
+ * user_data pointing back to its AsyncRequest.
+ */
+ if ((w->events & WL_SOCKET_READABLE) != 0)
+ {
+ AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+ if (areq->callback_pending)
+ {
+ /*
+ * Mark it as no longer needing a callback. We must do this
+ * before dispatching the callback in case the callback resets
+ * the flag.
+ */
+ areq->callback_pending = false;
+
+ /* Do the actual work. */
+ ExecAsyncNotify(areq);
+ }
+ }
+
+ /* Handle standard interrupts */
+ if ((w->events & WL_LATCH_SET) != 0)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+ }
+}
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 8335cf5b5c5..cfbe512a26e 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -163,6 +163,7 @@ bool enable_parallel_hash = true;
bool enable_partition_pruning = true;
bool enable_presorted_aggregate = true;
bool enable_async_append = true;
+bool enable_async_merge_append = true;
typedef struct
{
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 5cd7fa7b897..8a023aac33b 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1466,6 +1466,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
List *subplans = NIL;
ListCell *subpaths;
RelOptInfo *rel = best_path->path.parent;
+ bool consider_async = false;
/*
* We don't have the actual creation of the MergeAppend node split out
@@ -1480,6 +1481,10 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
plan->righttree = NULL;
node->apprelids = rel->relids;
+ consider_async = (enable_async_merge_append &&
+ !best_path->path.parallel_safe &&
+ list_length(best_path->subpaths) > 1);
+
/*
* Compute sort column info, and adjust MergeAppend's tlist as needed.
* Because we pass adjust_tlist_in_place = true, we may ignore the
@@ -1580,6 +1585,10 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
subplan = sort_plan;
}
+ /* If needed, check to see if subplan can be executed asynchronously */
+ if (consider_async)
+ mark_async_capable_plan(subplan, subpath);
+
subplans = lappend(subplans, subplan);
}
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 1128167c025..2a670bd8db1 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -805,6 +805,14 @@
boot_val => 'true',
},
+{ name => 'enable_async_merge_append', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD',
+ short_desc => 'Enables the planner\'s use of async merge append plans.',
+ flags => 'GUC_EXPLAIN',
+ variable => 'enable_async_merge_append',
+ boot_val => 'true',
+},
+
+
{ name => 'enable_bitmapscan', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD',
short_desc => 'Enables the planner\'s use of bitmap-scan plans.',
flags => 'GUC_EXPLAIN',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index f503d36b92e..ea014e006f2 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -405,6 +405,7 @@
# - Planner Method Configuration -
#enable_async_append = on
+#enable_async_merge_append = on
#enable_bitmapscan = on
#enable_gathermerge = on
#enable_hashagg = on
diff --git a/src/include/executor/nodeMergeAppend.h b/src/include/executor/nodeMergeAppend.h
index 4eb05dc30d6..e3fdb26ece6 100644
--- a/src/include/executor/nodeMergeAppend.h
+++ b/src/include/executor/nodeMergeAppend.h
@@ -19,5 +19,6 @@
extern MergeAppendState *ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags);
extern void ExecEndMergeAppend(MergeAppendState *node);
extern void ExecReScanMergeAppend(MergeAppendState *node);
+extern void ExecAsyncMergeAppendResponse(AsyncRequest *areq);
#endif /* NODEMERGEAPPEND_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 18ae8f0d4bb..2bef54550a3 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1545,10 +1545,67 @@ typedef struct MergeAppendState
TupleTableSlot **ms_slots; /* array of length ms_nplans */
struct binaryheap *ms_heap; /* binary heap of slot indices */
bool ms_initialized; /* are subplans started? */
+ Bitmapset *ms_asyncplans; /* asynchronous plans indexes */
+ int ms_nasyncplans; /* # of asynchronous plans */
+ AsyncRequest **ms_asyncrequests; /* array of AsyncRequests */
+ TupleTableSlot **ms_asyncresults; /* unreturned results of async plans */
+ Bitmapset *ms_has_asyncresults; /* plans which have async results */
+ Bitmapset *ms_asyncremain; /* remaining asynchronous plans */
+ Bitmapset *ms_needrequest; /* asynchronous plans needing a new request */
+ struct WaitEventSet *ms_eventset; /* WaitEventSet used to configure file
+ * descriptor wait events */
struct PartitionPruneState *ms_prune_state;
+ bool ms_valid_subplans_identified; /* is ms_valid_subplans valid? */
Bitmapset *ms_valid_subplans;
+ Bitmapset *ms_valid_asyncplans; /* valid asynchronous plans indexes */
} MergeAppendState;
+/* Getters for AppendState and MergeAppendState */
+static inline struct WaitEventSet *
+GetAppendEventSet(PlanState *ps)
+{
+ Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+ if (IsA(ps, AppendState))
+ return ((AppendState *) ps)->as_eventset;
+ else
+ return ((MergeAppendState *) ps)->ms_eventset;
+}
+
+static inline Bitmapset *
+GetNeedRequest(PlanState *ps)
+{
+ Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+ if (IsA(ps, AppendState))
+ return ((AppendState *) ps)->as_needrequest;
+ else
+ return ((MergeAppendState *) ps)->ms_needrequest;
+}
+
+/* Common part of classify_matching_subplans() for Append and MergeAppend */
+static inline bool
+classify_matching_subplans_common(Bitmapset **valid_subplans, Bitmapset *asyncplans, Bitmapset **valid_asyncplans)
+{
+ Assert(*valid_asyncplans == NULL);
+
+ /* Checked by classify_matching_subplans() */
+ Assert(!bms_is_empty(*valid_subplans));
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (!bms_overlap(*valid_subplans, asyncplans))
+ return false;
+
+ /* Get valid async subplans. */
+ *valid_asyncplans = bms_intersect(asyncplans,
+ *valid_subplans);
+
+ /* Adjust the valid subplans to contain sync subplans only. */
+ *valid_subplans = bms_del_members(*valid_subplans,
+ *valid_asyncplans);
+ return true;
+}
+
/* ----------------
* RecursiveUnionState information
*
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index b523bcda8f3..fee491b77ad 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -70,6 +70,7 @@ extern PGDLLIMPORT bool enable_parallel_hash;
extern PGDLLIMPORT bool enable_partition_pruning;
extern PGDLLIMPORT bool enable_presorted_aggregate;
extern PGDLLIMPORT bool enable_async_append;
+extern PGDLLIMPORT bool enable_async_merge_append;
extern PGDLLIMPORT int constraint_exclusion;
extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 3b37fafa65b..ae4fa42a438 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -149,6 +149,7 @@ select name, setting from pg_settings where name like 'enable%';
name | setting
--------------------------------+---------
enable_async_append | on
+ enable_async_merge_append | on
enable_bitmapscan | on
enable_distinct_reordering | on
enable_eager_aggregate | on
@@ -173,7 +174,7 @@ select name, setting from pg_settings where name like 'enable%';
enable_seqscan | on
enable_sort | on
enable_tidscan | on
-(25 rows)
+(26 rows)
-- There are always wait event descriptions for various types. InjectionPoint
-- may be present or absent, depending on history since last postmaster start.
--
2.43.0
view thread (33+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected], [email protected], [email protected]
Subject: Re: Asynchronous MergeAppend
In-Reply-To: <[email protected]>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox