From: Alexander Korotkov <[email protected]>
To: Matheus Alcantara <[email protected]>
Cc: Alexander Pyhalov <[email protected]>
Cc: Pgsql Hackers <[email protected]>
Subject: Re: Asynchronous MergeAppend
Date: Sun, 5 Apr 2026 05:24:48 +0300
Message-ID: <CAPpHfdsO8zYpDW==D6T5N0cJ+AzK7a_OyXJoYU1kFi=xZFTLuQ@mail.gmail.com>
In-Reply-To: <[email protected]>
References: <[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<CAFY6G8d3Yvxa_kRQA24BsJhwqfmSCv1ujiv_7b6g5isf-ZTs_Q@mail.gmail.com>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<CAPpHfdu+3Eud0CBpdFT+osWiT=e=zOQUtBsx8Z5okKrqhgVAJg@mail.gmail.com>
<[email protected]>
Hi!
On Mon, Mar 30, 2026 at 3:25 PM Matheus Alcantara
<[email protected]> wrote:
> On 29/03/26 22:20, Alexander Korotkov wrote:
> > Thank you for your work on this subject.
> > I have revised the patchset. I think it would be better if common
> > infrastructure goes first. Otherwise we commit async merge append and
> > immediately revise it. I also did some minor improvements.
> >
>
> I was thinking about this but did not manage to spend time on it.
> Thanks for reorganizing the patches; it looks better, and I think
> it makes more sense in this order.
>
> I also agree with the minor improvements.
I've done more work on the patchset.
Patch #1 now treats IncrementalSort as an exclusion alongside
Sort. The exclusion check is now at the top of the switch().
Patch #2 is split into 3 patches: common structures, common sync
append logic, and common async append logic.
The new structs are named AppendBase/AppendBaseState; the corresponding
fields are "ab" and "as".
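Why the postgres_fdw.c hunk in the attached patch can cast areq->requestor to AppendBaseState * for both node kinds: presumably the shared state is embedded as the first member ("as") of both AppendState and MergeAppendState. A minimal sketch of that C idiom follows; the field lists are hypothetical and heavily trimmed (the real shared struct presumably also starts with the PlanState header, omitted here), and requestor_nasyncplans() is an illustrative helper, not part of the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins; the real structs have many more fields. */
typedef struct AppendBaseState
{
    int         nplans;         /* number of child subplans */
    int         nasyncplans;    /* how many of them are async-capable */
    void       *eventset;       /* stand-in for WaitEventSet * */
} AppendBaseState;

typedef struct AppendState
{
    AppendBaseState as;         /* shared part, must be the first member */
    int         as_whichplan;   /* Append-only state */
} AppendState;

typedef struct MergeAppendState
{
    AppendBaseState as;         /* shared part, must be the first member */
    int         ms_initialized; /* MergeAppend-only state */
} MergeAppendState;

/*
 * Code that needs only the shared fields (like an FDW's async callbacks)
 * can accept either node type through a generic pointer.  C guarantees
 * that a pointer to a struct, suitably converted, points to its first
 * member, so this cast is well defined.
 */
static int
requestor_nasyncplans(void *requestor)
{
    AppendBaseState *base = (AppendBaseState *) requestor;

    assert(base->nasyncplans <= base->nplans);
    return base->nasyncplans;
}
```

The design choice is what lets a single FDW callback serve both requestor types without switching on the node tag; it only stays correct as long as the shared struct remains the first member of each node's state.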
Most importantly, I noticed that this patchset actually makes only the
initial heap filling asynchronous. The steady-state work after that is
still synchronous. Even though it used the async infrastructure, it fetched
tuples from the child subplans one by one: effectively synchronous, but
paying for the asynchronous infrastructure. I think that even with this
limitation the patchset is valuable, because the startup cost of child
foreign scans can be high. But this understanding allowed me to
significantly simplify the main patch, including:
1) After the initial heap filling, use ExecProcNode() to fetch from child plans.
2) Remove ms_has_asyncresults entirely. Async responses are stored directly
into ms_slots[] (the existing heap slot array), which serves as both
the merge state and the "result arrived" indicator via TupIsNull().
3) Remove needrequest usage from MergeAppend. Since MergeAppend only
fires initial requests (via ExecAppendBaseAsyncBegin()) and never
sends follow-up requests, needrequest tracking is unnecessary.
ExecMergeAppendAsyncRequest() was eliminated entirely.
4) ExecMergeAppendAsyncGetNext() is reduced to a simple wait loop.
5) The asyncresults allocation is reduced back to nasyncplans entries.
MergeAppend doesn't use it (it stores results in ms_slots), and Append
only needs nasyncplans entries for its stack.
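The simplified control flow in points 1) to 4) can be modelled with a small toy. Everything here is hypothetical: Child, Merge, fetch_next(), wait_for_one_event() and merge_all() are illustrative stand-ins, not the patch's functions; sorted int arrays play the role of child subplans, and "events" are simulated synchronously. It shows the two phases: all requests are fired first and responses land directly in the slot array (a NULL slot while a request is pending means "not arrived yet"), and afterwards only the heap-top child is read, with a plain synchronous fetch.

```c
#include <assert.h>
#include <stddef.h>

#define NCHILD 3                /* hypothetical fixed number of children */
typedef struct Child            /* toy "subplan": a sorted int array */
{
    const int  *vals;
    int         n;              /* number of values */
    int         pos;            /* index of the next value to return */
    int         pending;        /* async request still in flight? */
} Child;

typedef struct Merge
{
    Child       child[NCHILD];
    const int  *slots[NCHILD];  /* like ms_slots[]: NULL = no tuple */
    int         heap[NCHILD];   /* binary min-heap of child indexes */
    int         nheap;
} Merge;

/* Synchronous fetch, playing the role of ExecProcNode() on a child. */
static const int *
fetch_next(Merge *m, int i)
{
    Child      *c = &m->child[i];
    return c->pos < c->n ? &c->vals[c->pos++] : NULL;
}

/*
 * Stand-in for one event-set wait: complete one pending request (highest
 * index first, so arrival order differs from request order) and store the
 * result straight into the slot array, as a response callback would.
 */
static void
wait_for_one_event(Merge *m)
{
    for (int i = NCHILD - 1; i >= 0; i--)
    {
        if (m->child[i].pending)
        {
            m->child[i].pending = 0;
            m->slots[i] = fetch_next(m, i);
            return;
        }
    }
    assert(!"no pending request");
}

static void
sift_down(Merge *m, int k)
{
    for (;;)
    {
        int         l = 2 * k + 1, r = l + 1, s = k, t;
        if (l < m->nheap && *m->slots[m->heap[l]] < *m->slots[m->heap[s]])
            s = l;
        if (r < m->nheap && *m->slots[m->heap[r]] < *m->slots[m->heap[s]])
            s = r;
        if (s == k)
            return;
        t = m->heap[k];
        m->heap[k] = m->heap[s];
        m->heap[s] = t;
        k = s;
    }
}

/* Merge all children into out[] in sorted order; returns the count. */
static int
merge_all(Merge *m, int *out)
{
    int         nout = 0;
    /* Initial heap fill: fire every request first so they overlap... */
    for (int i = 0; i < NCHILD; i++)
        m->child[i].pending = 1;
    /* ...then wait until each child's first result has arrived. */
    for (int i = 0; i < NCHILD; i++)
    {
        while (m->child[i].pending)
            wait_for_one_event(m);
        if (m->slots[i] != NULL)    /* NULL now means "child is empty" */
            m->heap[m->nheap++] = i;
    }
    for (int k = m->nheap / 2 - 1; k >= 0; k--)
        sift_down(m, k);
    /* Steady state: only the heap-top child is read, synchronously. */
    while (m->nheap > 0)
    {
        int         top = m->heap[0];
        out[nout++] = *m->slots[top];
        m->slots[top] = fetch_next(m, top);
        if (m->slots[top] == NULL)  /* child exhausted: drop it */
            m->heap[0] = m->heap[--m->nheap];
        sift_down(m, 0);
    }
    return nout;
}
```

Once the heap is built, every step needs the next value from one particular child, so there is nothing left to overlap; that matches the reasoning in the message for switching the steady state to a plain synchronous fetch and dropping follow-up async requests.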
Additionally, I made the following changes:
1) Added a WAIT_EVENT_MERGE_APPEND_READY wait event instead of extending
WAIT_EVENT_APPEND_READY. That should be less confusing for monitoring
purposes.
2) More tests: error handling with broken partition, plan-time
partition pruning, and run-time partition pruning tests for async
MergeAppend.
I'm going to go through this patchset once more tomorrow and push
it on Monday if there are no objections.
------
Regards,
Alexander Korotkov
Supabase
Attachments:
[application/octet-stream] v17-0005-MergeAppend-should-support-Async-Foreign-Scan-su.patch (44.3K, 2-v17-0005-MergeAppend-should-support-Async-Foreign-Scan-su.patch)
From 7cebc0da9da80f973e985cbd16cbc4e32c40b5ce Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <[email protected]>
Date: Sun, 5 Apr 2026 04:47:10 +0300
Subject: [PATCH v17 5/5] MergeAppend should support Async Foreign Scan
subplans
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
This commit makes the MergeAppend node async-capable, similar to the existing
async support for Append nodes. When the planner chooses MergeAppend for
partitioned tables with foreign partitions, asynchronous execution is now
possible.
The primary benefit is during the initial heap fill: all async subplans are
kicked off concurrently, so their first tuples are fetched in parallel rather
than sequentially. In steady state, however, the heap merge algorithm needs
the next tuple from one specific subplan (the heap top), so execution at
that point is effectively synchronous — we block until that particular
subplan delivers its result.
A new GUC enable_async_merge_append controls this feature (default on).
A new wait event MERGE_APPEND_READY is added (separate from APPEND_READY)
so that monitoring tools can distinguish the two node types.
postgres_fdw is updated to work generically with both Append and
MergeAppend requestors by casting to the shared AppendBaseState type.
Discussion: https://postgr.es/m/59be194c5a409fb9fc9f2031581b8a44%40postgrespro.ru
Author: Alexander Pyhalov <[email protected]>
Reviewed-by: Matheus Alcantara <[email protected]>
Reviewed-by: Alena Rybakina <[email protected]>
---
.../postgres_fdw/expected/postgres_fdw.out | 353 ++++++++++++++++++
contrib/postgres_fdw/postgres_fdw.c | 6 +-
contrib/postgres_fdw/sql/postgres_fdw.sql | 105 ++++++
doc/src/sgml/config.sgml | 14 +
src/backend/executor/execAsync.c | 4 +
src/backend/executor/nodeMergeAppend.c | 168 +++++++++
src/backend/optimizer/path/costsize.c | 1 +
src/backend/optimizer/plan/createplan.c | 9 +
.../utils/activity/wait_event_names.txt | 1 +
src/backend/utils/misc/guc_parameters.dat | 8 +
src/backend/utils/misc/postgresql.conf.sample | 1 +
src/include/executor/nodeMergeAppend.h | 1 +
src/include/nodes/execnodes.h | 3 +
src/include/optimizer/cost.h | 1 +
src/test/regress/expected/sysviews.out | 3 +-
15 files changed, 674 insertions(+), 4 deletions(-)
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 0f5271d476e..e3f80e752f7 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -11575,12 +11575,56 @@ SELECT * FROM result_tbl ORDER BY a;
(2 rows)
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------------
+ Merge Append
+ Sort Key: async_pt.b, async_pt.a
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(8 rows)
+
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+ a | b | c
+------+-----+------
+ 1000 | 0 | 0000
+ 2000 | 0 | 0000
+ 1100 | 100 | 0100
+ 2100 | 100 | 0100
+ 1200 | 200 | 0200
+ 2200 | 200 | 0200
+ 1300 | 300 | 0300
+ 2300 | 300 | 0300
+ 1400 | 400 | 0400
+ 2400 | 400 | 0400
+ 1500 | 500 | 0500
+ 2500 | 500 | 0500
+ 1600 | 600 | 0600
+ 2600 | 600 | 0600
+ 1700 | 700 | 0700
+ 2700 | 700 | 0700
+ 1800 | 800 | 0800
+ 2800 | 800 | 0800
+ 1900 | 900 | 0900
+ 2900 | 900 | 0900
+(20 rows)
+
-- Test error handling, if accessing one of the foreign partitions errors out
CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
SERVER loopback OPTIONS (table_name 'non_existent_table');
SELECT * FROM async_pt;
ERROR: relation "public.non_existent_table" does not exist
CONTEXT: remote SQL command: SELECT a, b, c FROM public.non_existent_table
+-- Test error handling for async Merge Append
+SELECT * FROM async_pt ORDER BY b, a;
+ERROR: relation "public.non_existent_table" does not exist
+CONTEXT: remote SQL command: SELECT a, b, c FROM public.non_existent_table ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
DROP FOREIGN TABLE async_p_broken;
-- Check case where multiple partitions use the same connection
CREATE TABLE base_tbl3 (a int, b int, c text);
@@ -11623,6 +11667,76 @@ COPY async_pt TO stdout; --error
ERROR: cannot copy from foreign table "async_p1"
DETAIL: Partition "async_p1" is a foreign table in partitioned table "async_pt"
HINT: Try the COPY (SELECT ...) TO variant.
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------
+ Merge Append
+ Sort Key: async_pt.b, async_pt.a
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (async_pt_3.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl3 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(14 rows)
+
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
+-- Test async Merge Append rescan
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT
+ ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10)
+FROM generate_series(1, 3) g(i);
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------------------------------------
+ Function Scan on pg_catalog.generate_series g
+ Output: ARRAY(SubPlan array_1)
+ Function Call: generate_series(1, 3)
+ SubPlan array_1
+ -> Limit
+ Output: f.i
+ -> Sort
+ Output: f.i
+ Sort Key: f.i
+ -> Subquery Scan on f
+ Output: f.i
+ -> Merge Append
+ Sort Key: async_pt.b
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: (async_pt_1.b + g.i), async_pt_1.b
+ Remote SQL: SELECT b FROM public.base_tbl1 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: (async_pt_2.b + g.i), async_pt_2.b
+ Remote SQL: SELECT b FROM public.base_tbl2 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p3 async_pt_3
+ Output: (async_pt_3.b + g.i), async_pt_3.b
+ Remote SQL: SELECT b FROM public.base_tbl3 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST
+(22 rows)
+
+SELECT
+ ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10)
+FROM generate_series(1, 3) g(i);
+ array
+---------------------------
+ {1,1,1,6,6,6,11,11,11,16}
+ {2,2,2,7,7,7,12,12,12,17}
+ {3,3,3,8,8,8,13,13,13,18}
+(3 rows)
+
DROP FOREIGN TABLE async_p3;
DROP TABLE base_tbl3;
-- Check case where the partitioned table has local/remote partitions
@@ -11658,6 +11772,37 @@ SELECT * FROM result_tbl ORDER BY a;
(3 rows)
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------
+ Merge Append
+ Sort Key: async_pt.b, async_pt.a
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Sort
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Sort Key: async_pt_3.b, async_pt_3.a
+ -> Seq Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (async_pt_3.b === 505)
+(16 rows)
+
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
-- partitionwise joins
SET enable_partitionwise_join TO true;
CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
@@ -11853,6 +11998,21 @@ SELECT * FROM async_pt WHERE a < 2000;
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 2000))
(3 rows)
+-- Test interaction of async Merge Append with plan-time partition pruning
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 3000 ORDER BY b, a;
+ QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------
+ Merge Append
+ Sort Key: async_pt.b, async_pt.a
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(8 rows)
+
-- Test interaction of async execution with run-time partition pruning
SET plan_cache_mode TO force_generic_plan;
PREPARE async_pt_query (int, int) AS
@@ -11904,6 +12064,52 @@ SELECT * FROM result_tbl ORDER BY a;
(1 row)
DELETE FROM result_tbl;
+-- Test interaction of async Merge Append with run-time partition pruning
+PREPARE async_pt_merge_query (int, int) AS
+ SELECT * FROM async_pt WHERE a < $1 AND b === $2 ORDER BY b, a;
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_merge_query (3000, 505);
+ QUERY PLAN
+--------------------------------------------------------------------------------------------------------------------------------
+ Merge Append
+ Sort Key: async_pt.b, async_pt.a
+ Subplans Removed: 1
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === $2)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === $2)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < $1::integer)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(11 rows)
+
+EXECUTE async_pt_merge_query (3000, 505);
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+(2 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_merge_query (2000, 505);
+ QUERY PLAN
+--------------------------------------------------------------------------------------------------------------------------------
+ Merge Append
+ Sort Key: async_pt.b, async_pt.a
+ Subplans Removed: 2
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === $2)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(7 rows)
+
+EXECUTE async_pt_merge_query (2000, 505);
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+(1 row)
+
RESET plan_cache_mode;
CREATE TABLE local_tbl(a int, b int, c text);
INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar');
@@ -12440,6 +12646,153 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
DROP FOREIGN TABLE foreign_tbl CASCADE;
NOTICE: drop cascades to foreign table foreign_tbl2
DROP TABLE base_tbl;
+-- Test async Merge Append
+CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base1 (i int, j int, k text);
+CREATE TABLE base2 (i int, j int, k text);
+CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base1');
+CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base2');
+CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base3 (i int, j int, k text);
+CREATE TABLE base4 (i int, j int, k text);
+CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base3');
+CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base4');
+INSERT INTO distr1
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+INSERT INTO distr2
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
+ANALYZE distr1_p1;
+ANALYZE distr1_p2;
+ANALYZE distr2_p1;
+ANALYZE distr2_p2;
+SET enable_partitionwise_join TO ON;
+-- Test joins with async Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+ QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+ Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
+ -> Merge Append
+ Sort Key: distr1.i
+ -> Async Foreign Scan
+ Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
+ Relations: (public.distr1_p1 distr1_1) INNER JOIN (public.distr2_p1 distr2_1)
+ Remote SQL: SELECT r3.i, r3.j, r3.k, r5.i, r5.j, r5.k FROM (public.base1 r3 INNER JOIN public.base3 r5 ON (((r3.i = r5.i)) AND ((r5.j > 90)) AND ((r5.k ~~ 'data%')))) ORDER BY r3.i ASC NULLS LAST
+ -> Async Foreign Scan
+ Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
+ Relations: (public.distr1_p2 distr1_2) INNER JOIN (public.distr2_p2 distr2_2)
+ Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base2 r4 INNER JOIN public.base4 r6 ON (((r4.i = r6.i)) AND ((r6.j > 90)) AND ((r6.k ~~ 'data%')))) ORDER BY r4.i ASC NULLS LAST
+(12 rows)
+
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+ i | j | k | i | j | k
+----+-----+---------+----+-----+---------
+ 10 | 100 | data_10 | 10 | 100 | data_10
+ 11 | 110 | data_11 | 11 | 110 | data_11
+ 12 | 120 | data_12 | 12 | 120 | data_12
+ 13 | 130 | data_13 | 13 | 130 | data_13
+ 14 | 140 | data_14 | 14 | 140 | data_14
+ 15 | 150 | data_15 | 15 | 150 | data_15
+ 16 | 160 | data_16 | 16 | 160 | data_16
+ 17 | 170 | data_17 | 17 | 170 | data_17
+ 18 | 180 | data_18 | 18 | 180 | data_18
+ 19 | 190 | data_19 | 19 | 190 | data_19
+(10 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+ QUERY PLAN
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+ Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
+ -> Merge Append
+ Sort Key: distr1.i
+ -> Async Foreign Scan
+ Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
+ Relations: (public.distr1_p1 distr1_1) LEFT JOIN (public.distr2_p1 distr2_1)
+ Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base1 r4 LEFT JOIN public.base3 r6 ON (((r4.i = r6.i)) AND ((r6.k ~~ 'data%')))) WHERE ((r4.i > 90)) ORDER BY r4.i ASC NULLS LAST
+ -> Async Foreign Scan
+ Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
+ Relations: (public.distr1_p2 distr1_2) LEFT JOIN (public.distr2_p2 distr2_2)
+ Remote SQL: SELECT r5.i, r5.j, r5.k, r7.i, r7.j, r7.k FROM (public.base2 r5 LEFT JOIN public.base4 r7 ON (((r5.i = r7.i)) AND ((r7.k ~~ 'data%')))) WHERE ((r5.i > 90)) ORDER BY r5.i ASC NULLS LAST
+(12 rows)
+
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+ i | j | k | i | j | k
+-----+------+----------+-----+------+----------
+ 91 | 910 | data_91 | 91 | 910 | data_91
+ 92 | 920 | data_92 | 92 | 920 | data_92
+ 93 | 930 | data_93 | 93 | 930 | data_93
+ 94 | 940 | data_94 | 94 | 940 | data_94
+ 95 | 950 | data_95 | 95 | 950 | data_95
+ 96 | 960 | data_96 | 96 | 960 | data_96
+ 97 | 970 | data_97 | 97 | 970 | data_97
+ 98 | 980 | data_98 | 98 | 980 | data_98
+ 99 | 990 | data_99 | 99 | 990 | data_99
+ 100 | 1000 | data_100 | 100 | 1000 | data_100
+ 101 | 1010 | data_101 | | |
+ 102 | 1020 | data_102 | | |
+ 103 | 1030 | data_103 | | |
+ 104 | 1040 | data_104 | | |
+ 105 | 1050 | data_105 | | |
+ 106 | 1060 | data_106 | | |
+ 107 | 1070 | data_107 | | |
+ 108 | 1080 | data_108 | | |
+ 109 | 1090 | data_109 | | |
+ 110 | 1100 | data_110 | | |
+(20 rows)
+
+-- Test pruning with async Merge Append
+DELETE FROM distr2;
+INSERT INTO distr2
+SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+DEALLOCATE ALL;
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+ SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
+ ORDER BY i,j
+ LIMIT 10;
+EXPLAIN (VERBOSE, COSTS OFF)
+ EXECUTE async_pt_query(1, 1);
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+ Output: distr2.i, distr2.j, distr2.k
+ -> Merge Append
+ Sort Key: distr2.i, distr2.j
+ Subplans Removed: 1
+ -> Async Foreign Scan on public.distr2_p1 distr2_1
+ Output: distr2_1.i, distr2_1.j, distr2_1.k
+ Remote SQL: SELECT i, j, k FROM public.base3 WHERE ((i = ANY (ARRAY[$1::integer, $2::integer]))) ORDER BY i ASC NULLS LAST, j ASC NULLS LAST
+(8 rows)
+
+EXECUTE async_pt_query(1, 1);
+ i | j | k
+---+-----+---------
+ 1 | 10 | data_1
+ 1 | 110 | data_11
+ 1 | 210 | data_21
+ 1 | 310 | data_31
+ 1 | 410 | data_41
+ 1 | 510 | data_51
+ 1 | 610 | data_61
+ 1 | 710 | data_71
+ 1 | 810 | data_81
+ 1 | 910 | data_91
+(10 rows)
+
+RESET plan_cache_mode;
+RESET enable_partitionwise_join;
+DROP TABLE distr1, distr2, base1, base2, base3, base4;
ALTER SERVER loopback OPTIONS (DROP async_capable);
ALTER SERVER loopback2 OPTIONS (DROP async_capable);
-- ===================================================================
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index efc70a49a86..c8c793ae0e2 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -7214,8 +7214,8 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
ForeignScanState *node = (ForeignScanState *) areq->requestee;
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
- AppendState *requestor = (AppendState *) areq->requestor;
- WaitEventSet *set = requestor->as.eventset;
+ AppendBaseState *requestor = (AppendBaseState *) areq->requestor;
+ WaitEventSet *set = requestor->eventset;
/* This should not be called unless callback_pending */
Assert(areq->callback_pending);
@@ -7257,7 +7257,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
* below, because we might otherwise end up with no configured events
* other than the postmaster death event.
*/
- if (!bms_is_empty(requestor->as.needrequest))
+ if (!bms_is_empty(requestor->needrequest))
return;
if (GetNumRegisteredWaitEvents(set) > 1)
return;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 49ed797e8ef..01bd18ce9bb 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -3943,10 +3943,17 @@ INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+
-- Test error handling, if accessing one of the foreign partitions errors out
CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
SERVER loopback OPTIONS (table_name 'non_existent_table');
SELECT * FROM async_pt;
+-- Test error handling for async Merge Append
+SELECT * FROM async_pt ORDER BY b, a;
DROP FOREIGN TABLE async_p_broken;
-- Check case where multiple partitions use the same connection
@@ -3966,6 +3973,20 @@ DELETE FROM result_tbl;
-- Test COPY TO when foreign table is partition
COPY async_pt TO stdout; --error
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+
+-- Test async Merge Append rescan
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT
+ ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10)
+FROM generate_series(1, 3) g(i);
+SELECT
+ ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10)
+FROM generate_series(1, 3) g(i);
+
DROP FOREIGN TABLE async_p3;
DROP TABLE base_tbl3;
@@ -3981,6 +4002,11 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+
-- partitionwise joins
SET enable_partitionwise_join TO true;
@@ -4021,6 +4047,10 @@ SELECT * FROM async_pt WHERE a < 3000;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt WHERE a < 2000;
+-- Test interaction of async Merge Append with plan-time partition pruning
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 3000 ORDER BY b, a;
+
-- Test interaction of async execution with run-time partition pruning
SET plan_cache_mode TO force_generic_plan;
@@ -4041,6 +4071,18 @@ EXECUTE async_pt_query (2000, 505);
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
+-- Test interaction of async Merge Append with run-time partition pruning
+PREPARE async_pt_merge_query (int, int) AS
+ SELECT * FROM async_pt WHERE a < $1 AND b === $2 ORDER BY b, a;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_merge_query (3000, 505);
+EXECUTE async_pt_merge_query (3000, 505);
+
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_merge_query (2000, 505);
+EXECUTE async_pt_merge_query (2000, 505);
+
RESET plan_cache_mode;
CREATE TABLE local_tbl(a int, b int, c text);
@@ -4219,6 +4261,69 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
DROP FOREIGN TABLE foreign_tbl CASCADE;
DROP TABLE base_tbl;
+-- Test async Merge Append
+CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base1 (i int, j int, k text);
+CREATE TABLE base2 (i int, j int, k text);
+CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base1');
+CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base2');
+
+CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base3 (i int, j int, k text);
+CREATE TABLE base4 (i int, j int, k text);
+CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base3');
+CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base4');
+
+INSERT INTO distr1
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+
+INSERT INTO distr2
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
+
+ANALYZE distr1_p1;
+ANALYZE distr1_p2;
+ANALYZE distr2_p1;
+ANALYZE distr2_p2;
+
+SET enable_partitionwise_join TO ON;
+
+-- Test joins with async Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+
+-- Test pruning with async Merge Append
+DELETE FROM distr2;
+INSERT INTO distr2
+SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+
+DEALLOCATE ALL;
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+ SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
+ ORDER BY i,j
+ LIMIT 10;
+EXPLAIN (VERBOSE, COSTS OFF)
+ EXECUTE async_pt_query(1, 1);
+EXECUTE async_pt_query(1, 1);
+RESET plan_cache_mode;
+
+RESET enable_partitionwise_join;
+
+DROP TABLE distr1, distr2, base1, base2, base3, base4;
+
ALTER SERVER loopback OPTIONS (DROP async_capable);
ALTER SERVER loopback2 OPTIONS (DROP async_capable);
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 229f41353eb..dc20fe770df 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5556,6 +5556,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
</listitem>
</varlistentry>
+ <varlistentry id="guc-enable-async-merge-append" xreflabel="enable_async_merge_append">
+ <term><varname>enable_async_merge_append</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>enable_async_merge_append</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Enables or disables the query planner's use of async-aware
+ merge append plan types. The default is <literal>on</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-enable-bitmapscan" xreflabel="enable_bitmapscan">
<term><varname>enable_bitmapscan</varname> (<type>boolean</type>)
<indexterm>
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index cf7ddbb01f4..f839f5f255c 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -18,6 +18,7 @@
#include "executor/executor.h"
#include "executor/instrument.h"
#include "executor/nodeAppend.h"
+#include "executor/nodeMergeAppend.h"
#include "executor/nodeForeignscan.h"
/*
@@ -122,6 +123,9 @@ ExecAsyncResponse(AsyncRequest *areq)
case T_AppendState:
ExecAsyncAppendResponse(areq);
break;
+ case T_MergeAppendState:
+ ExecAsyncMergeAppendResponse(areq);
+ break;
default:
/* If the node doesn't support async, caller messed up. */
elog(ERROR, "unrecognized node type: %d",
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 591be1018d8..de8784dcdf7 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -24,6 +24,15 @@
* to a common sort key. The MergeAppend node merges these streams
* to produce output sorted the same way.
*
+ * MergeAppend supports async-capable subplans (e.g. foreign scans).
+ * Async execution pays off only during the initial heap fill, when
+ * requests to all async subplans are issued at once and their first
+ * tuples are fetched concurrently.  In steady state, however, the
+ * heap algorithm needs the next tuple from one specific subplan
+ * (the one at the top of the heap), so execution is effectively
+ * synchronous: we must block until that particular subplan delivers
+ * its next tuple.
+ *
* MergeAppend nodes don't make use of their left and right
* subtrees, rather they maintain a list of subplans so
* a typical MergeAppend node looks like this in the plan tree:
@@ -45,6 +54,7 @@
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "utils/sortsupport.h"
+#include "utils/wait_event.h"
/*
* We have one slot for each item in the heap array. We use SlotNumber
@@ -56,6 +66,11 @@ typedef int32 SlotNumber;
static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
static int heap_compare_slots(Datum a, Datum b, void *arg);
+static void classify_matching_subplans(MergeAppendState *node);
+static void ExecMergeAppendAsyncBegin(MergeAppendState *node);
+static void ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan);
+static void ExecMergeAppendAsyncEventWait(MergeAppendState *node);
+
/* ----------------------------------------------------------------
* ExecInitMergeAppend
@@ -87,10 +102,15 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
-1,
NULL);
+ if (mergestate->as.nasyncplans > 0 && mergestate->as.valid_subplans_identified)
+ classify_matching_subplans(mergestate);
+
mergestate->ms_slots = palloc0_array(TupleTableSlot *, mergestate->as.nplans);
mergestate->ms_heap = binaryheap_allocate(mergestate->as.nplans, heap_compare_slots,
mergestate);
+ mergestate->ms_asyncremain = NULL;
+
/*
* initialize sort-key information
*/
@@ -157,8 +177,13 @@ ExecMergeAppend(PlanState *pstate)
node->as.valid_subplans =
ExecFindMatchingSubPlans(node->as.prune_state, false, NULL);
node->as.valid_subplans_identified = true;
+ classify_matching_subplans(node);
}
+ /* If there are any async subplans, begin executing them. */
+ if (node->as.nasyncplans > 0)
+ ExecMergeAppendAsyncBegin(node);
+
/*
* First time through: pull the first tuple from each valid subplan,
* and set up the heap.
@@ -170,6 +195,16 @@ ExecMergeAppend(PlanState *pstate)
if (!TupIsNull(node->ms_slots[i]))
binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
}
+
+ /* Look at valid async subplans */
+ i = -1;
+ while ((i = bms_next_member(node->as.valid_asyncplans, i)) >= 0)
+ {
+ ExecMergeAppendAsyncGetNext(node, i);
+ if (!TupIsNull(node->ms_slots[i]))
+ binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
+ }
+
binaryheap_build(node->ms_heap);
node->ms_initialized = true;
}
@@ -264,8 +299,141 @@ ExecEndMergeAppend(MergeAppendState *node)
void
ExecReScanMergeAppend(MergeAppendState *node)
{
+ int nasyncplans = node->as.nasyncplans;
+
ExecReScanAppendBase(&node->as);
+ /* Reset MergeAppend-specific state */
+ if (nasyncplans > 0)
+ {
+ bms_free(node->ms_asyncremain);
+ node->ms_asyncremain = NULL;
+ }
binaryheap_reset(node->ms_heap);
node->ms_initialized = false;
}
+
+/* ----------------------------------------------------------------
+ * classify_matching_subplans
+ *
+ * Classify the node's as.valid_subplans into sync and async
+ * subplans, leave only the sync ones in as.valid_subplans, and
+ * save the async ones in as.valid_asyncplans.
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(MergeAppendState *node)
+{
+ Assert(node->as.valid_subplans_identified);
+
+ /* Nothing to do if there are no valid subplans. */
+ if (bms_is_empty(node->as.valid_subplans))
+ {
+ node->ms_asyncremain = NULL;
+ return;
+ }
+
+ /* No valid async subplans identified. */
+ if (!classify_matching_subplans_common(&node->as.valid_subplans,
+ node->as.asyncplans,
+ &node->as.valid_asyncplans))
+ node->ms_asyncremain = NULL;
+}
+
+/* ----------------------------------------------------------------
+ * ExecMergeAppendAsyncBegin
+ *
+ * Begin executing designated async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncBegin(MergeAppendState *node)
+{
+ /* ExecMergeAppend() identifies valid subplans */
+ Assert(node->as.valid_subplans_identified);
+
+ /* Initialize state variables. */
+ node->ms_asyncremain = bms_copy(node->as.valid_asyncplans);
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (bms_is_empty(node->ms_asyncremain))
+ return;
+
+ ExecAppendBaseAsyncBegin(&node->as);
+}
+
+/* ----------------------------------------------------------------
+ * ExecMergeAppendAsyncGetNext
+ *
+ * Get the next tuple from the specified asynchronous subplan.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan)
+{
+ /*
+ * All initial async requests were fired by ExecAppendBaseAsyncBegin.
+ * The result may already be cached from a prior event wait; if so,
+ * there is nothing to do.  Otherwise, wait until this specific subplan
+ * delivers a tuple or reports exhaustion.
+ */
+ while (TupIsNull(node->ms_slots[mplan]) &&
+ bms_is_member(mplan, node->ms_asyncremain))
+ {
+ CHECK_FOR_INTERRUPTS();
+ ExecMergeAppendAsyncEventWait(node);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncMergeAppendResponse
+ *
+ * Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncMergeAppendResponse(AsyncRequest *areq)
+{
+ MergeAppendState *node = (MergeAppendState *) areq->requestor;
+ TupleTableSlot *slot = areq->result;
+
+ /* The result should be a TupleTableSlot or NULL. */
+ Assert(slot == NULL || IsA(slot, TupleTableSlot));
+
+ /* Nothing to do if the request is pending. */
+ if (!areq->request_complete)
+ {
+ /* The request would have been pending for a callback. */
+ Assert(areq->callback_pending);
+ return;
+ }
+
+ /* If the result is NULL or an empty slot, the subplan is exhausted. */
+ if (TupIsNull(slot))
+ {
+ /* The ending subplan wouldn't have been pending for a callback. */
+ Assert(!areq->callback_pending);
+ node->ms_asyncremain = bms_del_member(node->ms_asyncremain,
+ areq->request_index);
+ return;
+ }
+
+ /* Save result directly into the merge slot array. */
+ node->ms_slots[areq->request_index] = slot;
+}
+
+/* ----------------------------------------------------------------
+ * ExecMergeAppendAsyncEventWait
+ *
+ * Wait or poll for file descriptor events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncEventWait(MergeAppendState *node)
+{
+ /* We should never be called when there are no valid async subplans. */
+ Assert(bms_num_members(node->ms_asyncremain) > 0);
+
+ ExecAppendBaseAsyncEventWait(&node->as, -1 /* no timeout */ ,
+ WAIT_EVENT_MERGE_APPEND_READY);
+}
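As a rough illustration of the set arithmetic behind classify_matching_subplans(), the sketch below uses a plain 64-bit mask in place of PostgreSQL's Bitmapset. It assumes that classify_matching_subplans_common() (whose body is not shown in this excerpt) follows the intersect-then-remove logic of Append's existing classify_matching_subplans(). The helper name classify_mask and the uint64_t representation are hypothetical simplifications, not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical stand-in for Bitmapset: bit i set means subplan i is a
 * member (limited to 64 subplans in this sketch).
 *
 * Moves every async-capable subplan out of *valid and into *valid_async,
 * leaving only sync subplans in *valid.  Returns false when no valid
 * async subplans remain, mirroring the "nothing to do" early exits.
 */
static bool
classify_mask(uint64_t *valid, uint64_t asyncplans, uint64_t *valid_async)
{
    *valid_async = *valid & asyncplans;   /* like bms_intersect() */
    *valid &= ~*valid_async;              /* like bms_del_members() */

    /* The sync and async sets must now be disjoint. */
    assert((*valid & *valid_async) == 0);
    return *valid_async != 0;
}
```

For example, with valid = 0x0F (subplans 0 to 3 survived pruning) and asyncplans = 0x0A, the call leaves valid = 0x05 (sync subplans 0 and 2) and valid_async = 0x0A (async subplans 1 and 3).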
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 1c575e56ff6..b6109e5b91e 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -164,6 +164,7 @@ bool enable_parallel_hash = true;
bool enable_partition_pruning = true;
bool enable_presorted_aggregate = true;
bool enable_async_append = true;
+bool enable_async_merge_append = true;
typedef struct
{
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 22bd93596ee..a727d063f76 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1451,6 +1451,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
List *subplans = NIL;
ListCell *subpaths;
RelOptInfo *rel = best_path->path.parent;
+ bool consider_async = false;
/*
* We don't have the actual creation of the MergeAppend node split out
@@ -1466,6 +1467,10 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
node->ab.apprelids = rel->relids;
node->ab.child_append_relid_sets = best_path->child_append_relid_sets;
+ consider_async = (enable_async_merge_append &&
+ !best_path->path.parallel_safe &&
+ list_length(best_path->subpaths) > 1);
+
/*
* Compute sort column info, and adjust MergeAppend's tlist as needed.
* Because we pass adjust_tlist_in_place = true, we may ignore the
@@ -1566,6 +1571,10 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
subplan = sort_plan;
}
+ /* If needed, check to see if subplan can be executed asynchronously */
+ if (consider_async)
+ mark_async_capable_plan(subplan, subpath);
+
subplans = lappend(subplans, subplan);
}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 6be80d2daad..98b8c8d177d 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -139,6 +139,7 @@ LOGICAL_APPLY_SEND_DATA "Waiting for a logical replication leader apply process
LOGICAL_PARALLEL_APPLY_STATE_CHANGE "Waiting for a logical replication parallel apply process to change state."
LOGICAL_SYNC_DATA "Waiting for a logical replication remote server to send data for initial table synchronization."
LOGICAL_SYNC_STATE_CHANGE "Waiting for a logical replication remote server to change state."
+MERGE_APPEND_READY "Waiting for subplan nodes of a <literal>MergeAppend</literal> plan node to be ready."
MESSAGE_QUEUE_INTERNAL "Waiting for another process to be attached to a shared message queue."
MESSAGE_QUEUE_PUT_MESSAGE "Waiting to write a protocol message to a shared message queue."
MESSAGE_QUEUE_RECEIVE "Waiting to receive bytes from a shared message queue."
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 0a862693fcd..9848964b024 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -861,6 +861,14 @@
boot_val => 'true',
},
+{ name => 'enable_async_merge_append', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD',
+ short_desc => 'Enables the planner\'s use of async merge append plans.',
+ flags => 'GUC_EXPLAIN',
+ variable => 'enable_async_merge_append',
+ boot_val => 'true',
+},
+
+
{ name => 'enable_bitmapscan', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD',
short_desc => 'Enables the planner\'s use of bitmap-scan plans.',
flags => 'GUC_EXPLAIN',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index cf15597385b..9b8de8581bc 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -413,6 +413,7 @@
# - Planner Method Configuration -
#enable_async_append = on
+#enable_async_merge_append = on
#enable_bitmapscan = on
#enable_gathermerge = on
#enable_hashagg = on
diff --git a/src/include/executor/nodeMergeAppend.h b/src/include/executor/nodeMergeAppend.h
index dfcf45099ba..2255cc68b21 100644
--- a/src/include/executor/nodeMergeAppend.h
+++ b/src/include/executor/nodeMergeAppend.h
@@ -19,5 +19,6 @@
extern MergeAppendState *ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags);
extern void ExecEndMergeAppend(MergeAppendState *node);
extern void ExecReScanMergeAppend(MergeAppendState *node);
+extern void ExecAsyncMergeAppendResponse(AsyncRequest *areq);
#endif /* NODEMERGEAPPEND_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 72b80a4a975..774f5c6d1dc 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1580,6 +1580,9 @@ typedef struct MergeAppendState
TupleTableSlot **ms_slots; /* array of length ms_nplans */
struct binaryheap *ms_heap; /* binary heap of slot indices */
bool ms_initialized; /* are subplans started? */
+
+ /* Merge-specific async tracking */
+ Bitmapset *ms_asyncremain; /* remaining asynchronous plans */
} MergeAppendState;
/* ----------------
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index f2fd5d31507..798af1fcd5c 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -70,6 +70,7 @@ extern PGDLLIMPORT bool enable_parallel_hash;
extern PGDLLIMPORT bool enable_partition_pruning;
extern PGDLLIMPORT bool enable_presorted_aggregate;
extern PGDLLIMPORT bool enable_async_append;
+extern PGDLLIMPORT bool enable_async_merge_append;
extern PGDLLIMPORT int constraint_exclusion;
extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 132b56a5864..422ca8b7d1f 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -156,6 +156,7 @@ select name, setting from pg_settings where name like 'enable%';
name | setting
--------------------------------+---------
enable_async_append | on
+ enable_async_merge_append | on
enable_bitmapscan | on
enable_distinct_reordering | on
enable_eager_aggregate | on
@@ -180,7 +181,7 @@ select name, setting from pg_settings where name like 'enable%';
enable_seqscan | on
enable_sort | on
enable_tidscan | on
-(25 rows)
+(26 rows)
-- There are always wait event descriptions for various types. InjectionPoint
-- may be present or absent, depending on history since last postmaster start.
--
2.39.5 (Apple Git-154)
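The nodeMergeAppend.c header comment in the patch above argues that only the initial heap fill benefits from async execution. A simplified, fully synchronous sketch of the same k-way heap merge over sorted int arrays makes the two phases visible. Source, merge_sources() and MAX_SOURCES are hypothetical names for this illustration only; the real node merges TupleTableSlots through lib/binaryheap.h and sort support, and this sketch performs no async I/O.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_SOURCES 16

/* Hypothetical input stream: a sorted array plus a read cursor. */
typedef struct Source
{
    const int  *vals;
    size_t      len;
    size_t      pos;
} Source;

/* Current (not yet emitted) value of source i. */
static int
cur(const Source *s, int i)
{
    return s[i].vals[s[i].pos];
}

/* Restore the min-heap property below position k of heap[0..n-1]. */
static void
sift_down(int *heap, int n, int k, const Source *s)
{
    for (;;)
    {
        int l = 2 * k + 1, r = l + 1, m = k;

        if (l < n && cur(s, heap[l]) < cur(s, heap[m]))
            m = l;
        if (r < n && cur(s, heap[r]) < cur(s, heap[m]))
            m = r;
        if (m == k)
            return;
        int tmp = heap[k];
        heap[k] = heap[m];
        heap[m] = tmp;
        k = m;
    }
}

/* Merge nsrc sorted sources into out[]; returns the number of values written. */
static size_t
merge_sources(Source *s, int nsrc, int *out)
{
    int     heap[MAX_SOURCES];
    int     n = 0;
    size_t  nout = 0;

    assert(nsrc <= MAX_SOURCES);

    /*
     * Initial fill: every source must produce its first value before the
     * heap can be built.  These fetches are independent of one another,
     * which is the phase the patch overlaps for async-capable subplans.
     */
    for (int i = 0; i < nsrc; i++)
        if (s[i].pos < s[i].len)
            heap[n++] = i;
    for (int k = n / 2 - 1; k >= 0; k--)
        sift_down(heap, n, k, s);       /* like binaryheap_build() */

    /*
     * Steady state: only the source at the heap top may advance, so each
     * step waits on that one source and there is nothing else to overlap.
     */
    while (n > 0)
    {
        int top = heap[0];

        out[nout++] = cur(s, top);
        if (++s[top].pos < s[top].len)
            sift_down(heap, n, 0, s);   /* like binaryheap_replace_first() */
        else
        {
            heap[0] = heap[--n];        /* like binaryheap_remove_first() */
            sift_down(heap, n, 0, s);
        }
    }
    return nout;
}
```

In this sketch the steady-state loop always reads from heap[0], which is why issuing requests to other sources in advance would not help there: their values are not needed until they reach the top of the heap.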
[application/octet-stream] v17-0003-Extract-common-Append-MergeAppend-executor-logic.patch (23.9K, 3-v17-0003-Extract-common-Append-MergeAppend-executor-logic.patch)
From efd1d59a83e144126ccf726fa0b157e7a3cfb138 Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <[email protected]>
Date: Sun, 5 Apr 2026 03:53:13 +0300
Subject: [PATCH v17 3/5] Extract common Append/MergeAppend executor logic into
execAppend.c
Extract shared non-async executor operations for Append and MergeAppend
nodes into a new execAppend.c file, reducing code duplication.
The extracted functions operate on the common AppendBaseState base type
introduced in the previous commit:
- ExecInitAppendBase(): shared subplan initialization, partition pruning
setup, and result tuple slot creation.
- ExecEndAppendBase(): shut down all subplan nodes.
- ExecReScanAppendBase(): propagate rescan to subplans and reset pruning.
Async subplan detection, setup, and execution remain in nodeAppend.c,
since MergeAppend does not yet support async. The tuple-fetching logic
also remains specific to each node type, preserving their distinct
execution semantics (sequential iteration for Append, binary heap merge
for MergeAppend).
Discussion: https://postgr.es/m/59be194c5a409fb9fc9f2031581b8a44%40postgrespro.ru
Author: Matheus Alcantara <[email protected]>
Co-authored-by: Alexander Korotkov <[email protected]>
Reviewed-by: Alexander Pyhalov <[email protected]>
Reviewed-by: Alena Rybakina <[email protected]>
---
src/backend/executor/Makefile | 1 +
src/backend/executor/execAppend.c | 208 ++++++++++++++++++++++++
src/backend/executor/meson.build | 1 +
src/backend/executor/nodeAppend.c | 216 ++++---------------------
src/backend/executor/nodeMergeAppend.c | 151 ++---------------
src/include/executor/execAppend.h | 26 +++
6 files changed, 282 insertions(+), 321 deletions(-)
create mode 100644 src/backend/executor/execAppend.c
create mode 100644 src/include/executor/execAppend.h
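The commit message above describes shared functions written against a common AppendBaseState that both node types embed. A minimal sketch of that C idiom follows, assuming simplified hypothetical structs (DemoBaseState, DemoAppendState, DemoMergeState) and helpers that are not the real PostgreSQL definitions: each node embeds the base as a member named `as`, and shared code receives `&node->as`, just as ExecInitAppendBase(&appendstate->as, ...) does in the patch.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical simplified base: fields shared by both node types. */
typedef struct DemoBaseState
{
    int     nplans;
    int    *plans;                /* stand-in for PlanState ** */
} DemoBaseState;

/* Each node embeds the base as a member named "as", as the patch does. */
typedef struct DemoAppendState
{
    DemoBaseState as;
    int     first_partial_plan;   /* Append-only field */
} DemoAppendState;

typedef struct DemoMergeState
{
    DemoBaseState as;
    int    *slots;                /* MergeAppend-only field, one per plan */
} DemoMergeState;

/* Shared initialization, written once against the base type. */
static void
demo_init_base(DemoBaseState *base, int nplans)
{
    assert(nplans > 0);
    base->nplans = nplans;
    base->plans = calloc(nplans, sizeof(int));
    assert(base->plans != NULL);
}

/* Shared shutdown, also written once. */
static void
demo_end_base(DemoBaseState *base)
{
    free(base->plans);
    base->plans = NULL;
    base->nplans = 0;
}

/* Node-specific init: common part first, then per-node extras. */
static void
demo_init_merge(DemoMergeState *ms, int nplans)
{
    demo_init_base(&ms->as, nplans);
    /* Sized from the base's nplans, like ms_slots in ExecInitMergeAppend. */
    ms->slots = calloc(ms->as.nplans, sizeof(int));
    assert(ms->slots != NULL);
}
```

Because the base is an ordinary embedded member, node-specific fields such as first_partial_plan or slots stay out of the shared code, which only ever sees a DemoBaseState pointer.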
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 11118d0ce02..2b12a1eb17e 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
execAmi.o \
+ execAppend.o \
execAsync.o \
execCurrent.o \
execExpr.o \
diff --git a/src/backend/executor/execAppend.c b/src/backend/executor/execAppend.c
new file mode 100644
index 00000000000..9599d10a952
--- /dev/null
+++ b/src/backend/executor/execAppend.c
@@ -0,0 +1,208 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAppend.c
+ * This code provides support functions for executing MergeAppend and
+ * Append nodes.
+ *
+ * Copyright (c) 2026, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execAppend.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "executor/execAppend.h"
+#include "executor/execPartition.h"
+#include "executor/executor.h"
+
+/* Initialize the common state of an AppendBase node and begin its subscans. */
+void
+ExecInitAppendBase(AppendBaseState *state,
+ AppendBase *node,
+ EState *estate,
+ int eflags,
+ int first_partial_plan,
+ int *first_valid_partial_plan)
+{
+ PlanState **appendplanstates;
+ const TupleTableSlotOps *appendops;
+ Bitmapset *validsubplans;
+ int nplans;
+ int firstvalid;
+ int i,
+ j;
+
+ /* If run-time partition pruning is enabled, then set that up now */
+ if (node->part_prune_index >= 0)
+ {
+ PartitionPruneState *prunestate;
+
+ /*
+ * Set up pruning data structure. This also initializes the set of
+ * subplans to initialize (validsubplans) by taking into account the
+ * result of performing initial pruning if any.
+ */
+ prunestate = ExecInitPartitionExecPruning(&state->ps,
+ list_length(node->subplans),
+ node->part_prune_index,
+ node->apprelids,
+ &validsubplans);
+ state->prune_state = prunestate;
+ nplans = bms_num_members(validsubplans);
+
+ /*
+ * When no run-time pruning is required and there's at least one
+ * subplan, we can fill valid_subplans immediately, preventing later
+ * calls to ExecFindMatchingSubPlans.
+ */
+ if (!prunestate->do_exec_prune && nplans > 0)
+ {
+ state->valid_subplans = bms_add_range(NULL, 0, nplans - 1);
+ state->valid_subplans_identified = true;
+ }
+ }
+ else
+ {
+ nplans = list_length(node->subplans);
+
+ /*
+ * When run-time partition pruning is not enabled we can just mark all
+ * subplans as valid; they must also all be initialized.
+ */
+ Assert(nplans > 0);
+ state->valid_subplans = validsubplans =
+ bms_add_range(NULL, 0, nplans - 1);
+ state->valid_subplans_identified = true;
+ state->prune_state = NULL;
+ }
+
+ appendplanstates = palloc0_array(PlanState *, nplans);
+
+ /*
+ * call ExecInitNode on each of the valid plans to be executed and save
+ * the results into the appendplanstates array.
+ *
+ * While at it, find out the first valid partial plan.
+ */
+ j = 0;
+ firstvalid = nplans;
+ i = -1;
+ while ((i = bms_next_member(validsubplans, i)) >= 0)
+ {
+ Plan *initNode = (Plan *) list_nth(node->subplans, i);
+
+ /*
+ * Record the lowest appendplans index which is a valid partial plan.
+ */
+ if (i >= first_partial_plan && j < firstvalid)
+ firstvalid = j;
+
+ appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
+ }
+
+ if (first_valid_partial_plan)
+ *first_valid_partial_plan = firstvalid;
+
+ state->plans = appendplanstates;
+ state->nplans = nplans;
+
+ /*
+ * Initialize the node's result tuple type and slot. If the child plans
+ * all produce the same fixed slot type, we can use that slot type;
+ * otherwise make a virtual slot. (Note that the result slot itself is
+ * used only to return a null tuple at end of execution; real tuples are
+ * returned to the caller in the children's own result slots. What we are
+ * doing here is allowing the parent plan node to optimize if this node
+ * will return only one kind of slot.)
+ */
+ appendops = ExecGetCommonSlotOps(appendplanstates, j);
+ if (appendops != NULL)
+ {
+ ExecInitResultTupleSlotTL(&state->ps, appendops);
+ }
+ else
+ {
+ ExecInitResultTupleSlotTL(&state->ps, &TTSOpsVirtual);
+ /* show that the output slot type is not fixed */
+ state->ps.resultopsset = true;
+ state->ps.resultopsfixed = false;
+ }
+
+ /* Initialize async state to safe defaults */
+ state->asyncplans = NULL;
+ state->nasyncplans = 0;
+ state->asyncrequests = NULL;
+ state->asyncresults = NULL;
+ state->needrequest = NULL;
+ state->eventset = NULL;
+ state->valid_asyncplans = NULL;
+
+ /*
+ * Miscellaneous initialization
+ */
+ state->ps.ps_ProjInfo = NULL;
+}
+
+/* Rescan the subplans of an AppendBase node, resetting pruning results if needed. */
+void
+ExecReScanAppendBase(AppendBaseState *node)
+{
+ int i;
+
+ /*
+ * If any PARAM_EXEC Params used in pruning expressions have changed, then
+ * we'd better unset the valid subplans so that they are reselected for
+ * the new parameter values.
+ */
+ if (node->prune_state &&
+ bms_overlap(node->ps.chgParam,
+ node->prune_state->execparamids))
+ {
+ node->valid_subplans_identified = false;
+ bms_free(node->valid_subplans);
+ node->valid_subplans = NULL;
+ bms_free(node->valid_asyncplans);
+ node->valid_asyncplans = NULL;
+ }
+
+ for (i = 0; i < node->nplans; i++)
+ {
+ PlanState *subnode = node->plans[i];
+
+ /*
+ * ExecReScan doesn't know about my subplans, so I have to do
+ * changed-parameter signaling myself.
+ */
+ if (node->ps.chgParam != NULL)
+ UpdateChangedParamSet(subnode, node->ps.chgParam);
+
+ /*
+ * If chgParam of subnode is not null then plan will be re-scanned by
+ * first ExecProcNode or by first ExecAsyncRequest.
+ */
+ if (subnode->chgParam == NULL)
+ ExecReScan(subnode);
+ }
+}
+
+/* Shuts down the subplans of an AppendBase node. */
+void
+ExecEndAppendBase(AppendBaseState *node)
+{
+ PlanState **subplans;
+ int nplans;
+ int i;
+
+ /*
+ * get information from the node
+ */
+ subplans = node->plans;
+ nplans = node->nplans;
+
+ /*
+ * shut down each of the subscans
+ */
+ for (i = 0; i < nplans; i++)
+ ExecEndNode(subplans[i]);
+}
diff --git a/src/backend/executor/meson.build b/src/backend/executor/meson.build
index dc45be0b2ce..c2f261ff22d 100644
--- a/src/backend/executor/meson.build
+++ b/src/backend/executor/meson.build
@@ -2,6 +2,7 @@
backend_sources += files(
'execAmi.c',
+ 'execAppend.c',
'execAsync.c',
'execCurrent.c',
'execExpr.c',
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 272bf52fc2d..f267ffe13fa 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -57,6 +57,7 @@
#include "postgres.h"
+#include "executor/execAppend.h"
#include "executor/execAsync.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
@@ -111,15 +112,10 @@ AppendState *
ExecInitAppend(Append *node, EState *estate, int eflags)
{
AppendState *appendstate = makeNode(AppendState);
- PlanState **appendplanstates;
- const TupleTableSlotOps *appendops;
- Bitmapset *validsubplans;
Bitmapset *asyncplans;
- int nplans;
int nasyncplans;
- int firstvalid;
- int i,
- j;
+ int nplans;
+ int i;
/* check for unsupported flags */
Assert(!(eflags & EXEC_FLAG_MARK));
@@ -136,124 +132,38 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->as_syncdone = false;
appendstate->as_begun = false;
- /* If run-time partition pruning is enabled, then set that up now */
- if (node->ab.part_prune_index >= 0)
- {
- PartitionPruneState *prunestate;
+ /* Initialize common fields */
+ ExecInitAppendBase(&appendstate->as,
+ &node->ab,
+ estate,
+ eflags,
+ node->first_partial_plan,
+ &appendstate->as_first_partial_plan);
- /*
- * Set up pruning data structure. This also initializes the set of
- * subplans to initialize (validsubplans) by taking into account the
- * result of performing initial pruning if any.
- */
- prunestate = ExecInitPartitionExecPruning(&appendstate->as.ps,
- list_length(node->ab.subplans),
- node->ab.part_prune_index,
- node->ab.apprelids,
- &validsubplans);
- appendstate->as.prune_state = prunestate;
- nplans = bms_num_members(validsubplans);
-
- /*
- * When no run-time pruning is required and there's at least one
- * subplan, we can fill as_valid_subplans immediately, preventing
- * later calls to ExecFindMatchingSubPlans.
- */
- if (!prunestate->do_exec_prune && nplans > 0)
- {
- appendstate->as.valid_subplans = bms_add_range(NULL, 0, nplans - 1);
- appendstate->as.valid_subplans_identified = true;
- }
- }
- else
- {
- nplans = list_length(node->ab.subplans);
-
- /*
- * When run-time partition pruning is not enabled we can just mark all
- * subplans as valid; they must also all be initialized.
- */
- Assert(nplans > 0);
- appendstate->as.valid_subplans = validsubplans =
- bms_add_range(NULL, 0, nplans - 1);
- appendstate->as.valid_subplans_identified = true;
- appendstate->as.prune_state = NULL;
- }
-
- appendplanstates = (PlanState **) palloc(nplans *
- sizeof(PlanState *));
+ nplans = appendstate->as.nplans;
/*
- * call ExecInitNode on each of the valid plans to be executed and save
- * the results into the appendplanstates array.
- *
- * While at it, find out the first valid partial plan.
+ * Detect async-capable subplans. When executing EvalPlanQual, we treat
+ * them as sync ones; don't do this when initializing an EvalPlanQual plan
+ * tree.
*/
- j = 0;
asyncplans = NULL;
nasyncplans = 0;
- firstvalid = nplans;
- i = -1;
- while ((i = bms_next_member(validsubplans, i)) >= 0)
+ for (i = 0; i < nplans; i++)
{
- Plan *initNode = (Plan *) list_nth(node->ab.subplans, i);
-
- /*
- * Record async subplans. When executing EvalPlanQual, we treat them
- * as sync ones; don't do this when initializing an EvalPlanQual plan
- * tree.
- */
- if (initNode->async_capable && estate->es_epq_active == NULL)
+ if (appendstate->as.plans[i]->plan->async_capable &&
+ estate->es_epq_active == NULL)
{
- asyncplans = bms_add_member(asyncplans, j);
+ asyncplans = bms_add_member(asyncplans, i);
nasyncplans++;
}
-
- /*
- * Record the lowest appendplans index which is a valid partial plan.
- */
- if (i >= node->first_partial_plan && j < firstvalid)
- firstvalid = j;
-
- appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
- }
-
- appendstate->as_first_partial_plan = firstvalid;
- appendstate->as.plans = appendplanstates;
- appendstate->as.nplans = nplans;
-
- /*
- * Initialize Append's result tuple type and slot. If the child plans all
- * produce the same fixed slot type, we can use that slot type; otherwise
- * make a virtual slot. (Note that the result slot itself is used only to
- * return a null tuple at end of execution; real tuples are returned to
- * the caller in the children's own result slots. What we are doing here
- * is allowing the parent plan node to optimize if the Append will return
- * only one kind of slot.)
- */
- appendops = ExecGetCommonSlotOps(appendplanstates, j);
- if (appendops != NULL)
- {
- ExecInitResultTupleSlotTL(&appendstate->as.ps, appendops);
- }
- else
- {
- ExecInitResultTupleSlotTL(&appendstate->as.ps, &TTSOpsVirtual);
- /* show that the output slot type is not fixed */
- appendstate->as.ps.resultopsset = true;
- appendstate->as.ps.resultopsfixed = false;
}
/* Initialize async state */
appendstate->as.asyncplans = asyncplans;
appendstate->as.nasyncplans = nasyncplans;
- appendstate->as.asyncrequests = NULL;
- appendstate->as.asyncresults = NULL;
appendstate->as_nasyncresults = 0;
appendstate->as_nasyncremain = 0;
- appendstate->as.needrequest = NULL;
- appendstate->as.eventset = NULL;
- appendstate->as.valid_asyncplans = NULL;
if (nasyncplans > 0)
{
@@ -267,7 +177,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
areq = palloc_object(AsyncRequest);
areq->requestor = (PlanState *) appendstate;
- areq->requestee = appendplanstates[i];
+ areq->requestee = appendstate->as.plans[i];
areq->request_index = i;
areq->callback_pending = false;
areq->request_complete = false;
@@ -283,12 +193,6 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
classify_matching_subplans(appendstate);
}
- /*
- * Miscellaneous initialization
- */
-
- appendstate->as.ps.ps_ProjInfo = NULL;
-
/* For parallel query, this will be overridden later. */
appendstate->choose_next_subplan = choose_next_subplan_locally;
@@ -402,67 +306,21 @@ ExecAppend(PlanState *pstate)
void
ExecEndAppend(AppendState *node)
{
- PlanState **appendplans;
- int nplans;
- int i;
-
- /*
- * get information from the node
- */
- appendplans = node->as.plans;
- nplans = node->as.nplans;
-
- /*
- * shut down each of the subscans
- */
- for (i = 0; i < nplans; i++)
- ExecEndNode(appendplans[i]);
+ ExecEndAppendBase(&node->as);
}
void
ExecReScanAppend(AppendState *node)
{
int nasyncplans = node->as.nasyncplans;
- int i;
-
- /*
- * If any PARAM_EXEC Params used in pruning expressions have changed, then
- * we'd better unset the valid subplans so that they are reselected for
- * the new parameter values.
- */
- if (node->as.prune_state &&
- bms_overlap(node->as.ps.chgParam,
- node->as.prune_state->execparamids))
- {
- node->as.valid_subplans_identified = false;
- bms_free(node->as.valid_subplans);
- node->as.valid_subplans = NULL;
- bms_free(node->as.valid_asyncplans);
- node->as.valid_asyncplans = NULL;
- }
-
- for (i = 0; i < node->as.nplans; i++)
- {
- PlanState *subnode = node->as.plans[i];
-
- /*
- * ExecReScan doesn't know about my subplans, so I have to do
- * changed-parameter signaling myself.
- */
- if (node->as.ps.chgParam != NULL)
- UpdateChangedParamSet(subnode, node->as.ps.chgParam);
- /*
- * If chgParam of subnode is not null then plan will be re-scanned by
- * first ExecProcNode or by first ExecAsyncRequest.
- */
- if (subnode->chgParam == NULL)
- ExecReScan(subnode);
- }
+ ExecReScanAppendBase(&node->as);
/* Reset async state */
if (nasyncplans > 0)
{
+ int i;
+
i = -1;
while ((i = bms_next_member(node->as.asyncplans, i)) >= 0)
{
@@ -878,17 +736,6 @@ mark_invalid_subplans_as_finished(AppendState *node)
static void
ExecAppendAsyncBegin(AppendState *node)
{
- int i;
-
- /* Backward scan is not supported by async-aware Appends. */
- Assert(ScanDirectionIsForward(node->as.ps.state->es_direction));
-
- /* We should never be called when there are no subplans */
- Assert(node->as.nplans > 0);
-
- /* We should never be called when there are no async subplans. */
- Assert(node->as.nasyncplans > 0);
-
/* If we've yet to determine the valid subplans then do so now. */
if (!node->as.valid_subplans_identified)
{
@@ -908,16 +755,19 @@ ExecAppendAsyncBegin(AppendState *node)
return;
/* Make a request for each of the valid async subplans. */
- i = -1;
- while ((i = bms_next_member(node->as.valid_asyncplans, i)) >= 0)
{
- AsyncRequest *areq = node->as.asyncrequests[i];
+ int i = -1;
- Assert(areq->request_index == i);
- Assert(!areq->callback_pending);
+ while ((i = bms_next_member(node->as.valid_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as.asyncrequests[i];
- /* Do the actual work. */
- ExecAsyncRequest(areq);
+ Assert(areq->request_index == i);
+ Assert(!areq->callback_pending);
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
}
}
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index d7d2de08147..6928152f16f 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -38,6 +38,7 @@
#include "postgres.h"
+#include "executor/execAppend.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "executor/nodeMergeAppend.h"
@@ -66,12 +67,7 @@ MergeAppendState *
ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
{
MergeAppendState *mergestate = makeNode(MergeAppendState);
- PlanState **mergeplanstates;
- const TupleTableSlotOps *mergeops;
- Bitmapset *validsubplans;
- int nplans;
- int i,
- j;
+ int i;
/* check for unsupported flags */
Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@@ -83,94 +79,18 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
mergestate->as.ps.state = estate;
mergestate->as.ps.ExecProcNode = ExecMergeAppend;
- /* If run-time partition pruning is enabled, then set that up now */
- if (node->ab.part_prune_index >= 0)
- {
- PartitionPruneState *prunestate;
-
- /*
- * Set up pruning data structure. This also initializes the set of
- * subplans to initialize (validsubplans) by taking into account the
- * result of performing initial pruning if any.
- */
- prunestate = ExecInitPartitionExecPruning(&mergestate->as.ps,
- list_length(node->ab.subplans),
- node->ab.part_prune_index,
- node->ab.apprelids,
- &validsubplans);
- mergestate->as.prune_state = prunestate;
- nplans = bms_num_members(validsubplans);
-
- /*
- * When no run-time pruning is required and there's at least one
- * subplan, we can fill ms_valid_subplans immediately, preventing
- * later calls to ExecFindMatchingSubPlans.
- */
- if (!prunestate->do_exec_prune && nplans > 0)
- mergestate->as.valid_subplans = bms_add_range(NULL, 0, nplans - 1);
- }
- else
- {
- nplans = list_length(node->ab.subplans);
+ /* Initialize common fields */
+ ExecInitAppendBase(&mergestate->as,
+ &node->ab,
+ estate,
+ eflags,
+ -1,
+ NULL);
- /*
- * When run-time partition pruning is not enabled we can just mark all
- * subplans as valid; they must also all be initialized.
- */
- Assert(nplans > 0);
- mergestate->as.valid_subplans = validsubplans =
- bms_add_range(NULL, 0, nplans - 1);
- mergestate->as.prune_state = NULL;
- }
-
- mergeplanstates = palloc_array(PlanState *, nplans);
- mergestate->as.plans = mergeplanstates;
- mergestate->as.nplans = nplans;
-
- mergestate->ms_slots = palloc0_array(TupleTableSlot *, nplans);
- mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
+ mergestate->ms_slots = palloc0_array(TupleTableSlot *, mergestate->as.nplans);
+ mergestate->ms_heap = binaryheap_allocate(mergestate->as.nplans, heap_compare_slots,
mergestate);
- /*
- * call ExecInitNode on each of the valid plans to be executed and save
- * the results into the mergeplanstates array.
- */
- j = 0;
- i = -1;
- while ((i = bms_next_member(validsubplans, i)) >= 0)
- {
- Plan *initNode = (Plan *) list_nth(node->ab.subplans, i);
-
- mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
- }
-
- /*
- * Initialize MergeAppend's result tuple type and slot. If the child
- * plans all produce the same fixed slot type, we can use that slot type;
- * otherwise make a virtual slot. (Note that the result slot itself is
- * used only to return a null tuple at end of execution; real tuples are
- * returned to the caller in the children's own result slots. What we are
- * doing here is allowing the parent plan node to optimize if the
- * MergeAppend will return only one kind of slot.)
- */
- mergeops = ExecGetCommonSlotOps(mergeplanstates, j);
- if (mergeops != NULL)
- {
- ExecInitResultTupleSlotTL(&mergestate->as.ps, mergeops);
- }
- else
- {
- ExecInitResultTupleSlotTL(&mergestate->as.ps, &TTSOpsVirtual);
- /* show that the output slot type is not fixed */
- mergestate->as.ps.resultopsset = true;
- mergestate->as.ps.resultopsfixed = false;
- }
-
- /*
- * Miscellaneous initialization
- */
- mergestate->as.ps.ps_ProjInfo = NULL;
-
/*
* initialize sort-key information
*/
@@ -335,59 +255,14 @@ heap_compare_slots(Datum a, Datum b, void *arg)
void
ExecEndMergeAppend(MergeAppendState *node)
{
- PlanState **mergeplans;
- int nplans;
- int i;
-
- /*
- * get information from the node
- */
- mergeplans = node->as.plans;
- nplans = node->as.nplans;
-
- /*
- * shut down each of the subscans
- */
- for (i = 0; i < nplans; i++)
- ExecEndNode(mergeplans[i]);
+ ExecEndAppendBase(&node->as);
}
void
ExecReScanMergeAppend(MergeAppendState *node)
{
- int i;
+ ExecReScanAppendBase(&node->as);
- /*
- * If any PARAM_EXEC Params used in pruning expressions have changed, then
- * we'd better unset the valid subplans so that they are reselected for
- * the new parameter values.
- */
- if (node->as.prune_state &&
- bms_overlap(node->as.ps.chgParam,
- node->as.prune_state->execparamids))
- {
- bms_free(node->as.valid_subplans);
- node->as.valid_subplans = NULL;
- }
-
- for (i = 0; i < node->as.nplans; i++)
- {
- PlanState *subnode = node->as.plans[i];
-
- /*
- * ExecReScan doesn't know about my subplans, so I have to do
- * changed-parameter signaling myself.
- */
- if (node->as.ps.chgParam != NULL)
- UpdateChangedParamSet(subnode, node->as.ps.chgParam);
-
- /*
- * If chgParam of subnode is not null then plan will be re-scanned by
- * first ExecProcNode.
- */
- if (subnode->chgParam == NULL)
- ExecReScan(subnode);
- }
binaryheap_reset(node->ms_heap);
node->ms_initialized = false;
}
diff --git a/src/include/executor/execAppend.h b/src/include/executor/execAppend.h
new file mode 100644
index 00000000000..a8f41bad921
--- /dev/null
+++ b/src/include/executor/execAppend.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ * execAppend.h
+ * Support functions for MergeAppend and Append nodes.
+ *
+ * Copyright (c) 2026, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/include/executor/execAppend.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef EXECAPPEND_H
+#define EXECAPPEND_H
+
+#include "nodes/execnodes.h"
+
+extern void ExecInitAppendBase(AppendBaseState *state,
+ AppendBase *node,
+ EState *estate,
+ int eflags,
+ int first_partial_plan,
+ int *first_valid_partial_plan);
+extern void ExecEndAppendBase(AppendBaseState *node);
+extern void ExecReScanAppendBase(AppendBaseState *node);
+
+#endif /* EXECAPPEND_H */
--
2.39.5 (Apple Git-154)
[application/octet-stream] v17-0004-Move-async-infrastructure-into-shared-AppendBase.patch (19.2K, 4-v17-0004-Move-async-infrastructure-into-shared-AppendBase.patch)
From 468291fa8541bdbb417158150270d1de891ace3b Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <[email protected]>
Date: Sun, 5 Apr 2026 04:42:32 +0300
Subject: [PATCH v17 4/5] Move async infrastructure into shared AppendBase
functions
Move all async-related code from nodeAppend.c into the shared
execAppend.c, preparing MergeAppend to support async foreign scan
subplans.
- ExecInitAppendBase() now detects async-capable subplans and allocates
async request/result state.
- ExecReScanAppendBase() now resets async request state.
- ExecAppendBaseAsyncBegin(): fire async requests (moved from
nodeAppend.c's ExecAppendAsyncBegin).
- ExecAppendBaseAsyncEventWait(): wait/poll for async events (moved
from nodeAppend.c's ExecAppendAsyncEventWait).
- classify_matching_subplans_common(): new helper to split valid
subplans into sync and async sets.
- MergeAppend now uses the valid_subplans_identified flag instead of
checking whether valid_subplans == NULL.
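The bitmap manipulation in classify_matching_subplans_common() can be hard to follow in diff form, so here is a minimal standalone sketch of the same split. It is an illustration only. It assumes at most 64 subplans and uses a plain uint64_t mask in place of PostgreSQL's Bitmapset API. The SubplanMask type and the split_valid_subplans() function are hypothetical names, not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical model: bit i of the mask stands for subplan i.  This replaces
 * PostgreSQL's Bitmapset purely for illustration (max 64 subplans).
 */
typedef uint64_t SubplanMask;

/*
 * Split *valid into sync and async sets, mirroring the logic of
 * classify_matching_subplans_common().  Returns false, leaving *valid
 * untouched, when none of the valid subplans is async-capable.
 */
static bool
split_valid_subplans(SubplanMask *valid, SubplanMask async_capable,
                     SubplanMask *valid_async)
{
    assert(*valid_async == 0);
    assert(*valid != 0);        /* empty case is handled by the caller */

    /* Analogue of !bms_overlap(): no valid async subplans. */
    if ((*valid & async_capable) == 0)
        return false;

    /* Analogue of bms_intersect(): valid async subplans. */
    *valid_async = async_capable & *valid;

    /* Analogue of bms_del_members(): keep only sync subplans. */
    *valid &= ~*valid_async;
    return true;
}
```

For example, suppose subplans 0-3 are valid (mask 0x0F) and subplans 1 and 3 are async-capable (mask 0x0A). The sketch moves 1 and 3 into the async set (0x0A) and leaves 0 and 2 in the sync set (0x05).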
Discussion: https://postgr.es/m/59be194c5a409fb9fc9f2031581b8a44%40postgrespro.ru
Author: Matheus Alcantara <[email protected]>
Co-authored-by: Alexander Korotkov <[email protected]>
Reviewed-by: Alexander Pyhalov <[email protected]>
Reviewed-by: Alena Rybakina <[email protected]>
---
src/backend/executor/execAppend.c | 232 ++++++++++++++++++++++++-
src/backend/executor/nodeAppend.c | 208 ++--------------------
src/backend/executor/nodeMergeAppend.c | 5 +-
src/include/executor/execAppend.h | 7 +
4 files changed, 250 insertions(+), 202 deletions(-)
diff --git a/src/backend/executor/execAppend.c b/src/backend/executor/execAppend.c
index 9599d10a952..d6bebabbd32 100644
--- a/src/backend/executor/execAppend.c
+++ b/src/backend/executor/execAppend.c
@@ -14,8 +14,14 @@
#include "postgres.h"
#include "executor/execAppend.h"
+#include "executor/execAsync.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
+#include "miscadmin.h"
+#include "storage/latch.h"
+#include "storage/waiteventset.h"
+
+#define EVENT_BUFFER_SIZE 16
/* Begin all of the subscans of an AppendBase node. */
void
@@ -29,7 +35,9 @@ ExecInitAppendBase(AppendBaseState *state,
PlanState **appendplanstates;
const TupleTableSlotOps *appendops;
Bitmapset *validsubplans;
+ Bitmapset *asyncplans;
int nplans;
+ int nasyncplans;
int firstvalid;
int i,
j;
@@ -87,12 +95,25 @@ ExecInitAppendBase(AppendBaseState *state,
* While at it, find out the first valid partial plan.
*/
j = 0;
+ asyncplans = NULL;
+ nasyncplans = 0;
firstvalid = nplans;
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
{
Plan *initNode = (Plan *) list_nth(node->subplans, i);
+ /*
+ * Record async subplans. When executing EvalPlanQual, we treat them
+ * as sync ones; don't do this when initializing an EvalPlanQual plan
+ * tree.
+ */
+ if (initNode->async_capable && estate->es_epq_active == NULL)
+ {
+ asyncplans = bms_add_member(asyncplans, j);
+ nasyncplans++;
+ }
+
/*
* Record the lowest appendplans index which is a valid partial plan.
*/
@@ -130,15 +151,38 @@ ExecInitAppendBase(AppendBaseState *state,
state->ps.resultopsfixed = false;
}
- /* Initialize async state to safe defaults */
- state->asyncplans = NULL;
- state->nasyncplans = 0;
+ /* Initialize async state */
+ state->asyncplans = asyncplans;
+ state->nasyncplans = nasyncplans;
state->asyncrequests = NULL;
state->asyncresults = NULL;
state->needrequest = NULL;
state->eventset = NULL;
state->valid_asyncplans = NULL;
+ if (nasyncplans > 0)
+ {
+ state->asyncrequests = palloc0_array(AsyncRequest *, nplans);
+
+ i = -1;
+ while ((i = bms_next_member(asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq;
+
+ areq = palloc_object(AsyncRequest);
+ areq->requestor = (PlanState *) state;
+ areq->requestee = appendplanstates[i];
+ areq->request_index = i;
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+
+ state->asyncrequests[i] = areq;
+ }
+
+ state->asyncresults = palloc0_array(TupleTableSlot *, nasyncplans);
+ }
+
/*
* Miscellaneous initialization
*/
@@ -149,6 +193,7 @@ void
ExecReScanAppendBase(AppendBaseState *node)
{
int i;
+ int nasyncplans = node->nasyncplans;
/*
* If any PARAM_EXEC Params used in pruning expressions have changed, then
@@ -184,6 +229,187 @@ ExecReScanAppendBase(AppendBaseState *node)
if (subnode->chgParam == NULL)
ExecReScan(subnode);
}
+
+ /* Reset async state */
+ if (nasyncplans > 0)
+ {
+ i = -1;
+ while ((i = bms_next_member(node->asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->asyncrequests[i];
+
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+ }
+
+ bms_free(node->needrequest);
+ node->needrequest = NULL;
+ }
+}
+
+/* Wait or poll for file descriptor events and fire callbacks. */
+void
+ExecAppendBaseAsyncEventWait(AppendBaseState *node, int timeout,
+ uint32 wait_event_info)
+{
+ int nevents = node->nasyncplans + 2; /* one for PM death and
+ * one for latch */
+ int noccurred;
+ int i;
+ WaitEvent occurred_event[EVENT_BUFFER_SIZE];
+
+ Assert(node->eventset == NULL);
+
+ node->eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
+ AddWaitEventToSet(node->eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+ NULL, NULL);
+
+ /* Give each waiting subplan a chance to add an event. */
+ i = -1;
+ while ((i = bms_next_member(node->asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->asyncrequests[i];
+
+ if (areq->callback_pending)
+ ExecAsyncConfigureWait(areq);
+ }
+
+ /*
+ * No need for further processing if none of the subplans configured any
+ * events.
+ */
+ if (GetNumRegisteredWaitEvents(node->eventset) == 1)
+ {
+ FreeWaitEventSet(node->eventset);
+ node->eventset = NULL;
+ return;
+ }
+
+ /*
+ * Add the process latch to the set, so that we wake up to process the
+ * standard interrupts with CHECK_FOR_INTERRUPTS().
+ *
+ * NOTE: For historical reasons, it's important that this is added to the
+ * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely,
+ * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
+ * any other events are in the set. That's a poor design, it's
+ * questionable for postgres_fdw to be doing that in the first place, but
+ * we cannot change it now. The pattern has possibly been copied to other
+ * extensions too.
+ */
+ AddWaitEventToSet(node->eventset, WL_LATCH_SET, PGINVALID_SOCKET,
+ MyLatch, NULL);
+
+ /* Return at most EVENT_BUFFER_SIZE events in one call. */
+ if (nevents > EVENT_BUFFER_SIZE)
+ nevents = EVENT_BUFFER_SIZE;
+
+ /*
+ * If the timeout is -1, wait until at least one event occurs. If the
+ * timeout is 0, poll for events, but do not wait at all.
+ */
+ noccurred = WaitEventSetWait(node->eventset, timeout, occurred_event,
+ nevents, wait_event_info);
+ FreeWaitEventSet(node->eventset);
+ node->eventset = NULL;
+ if (noccurred == 0)
+ return;
+
+ /* Deliver notifications. */
+ for (i = 0; i < noccurred; i++)
+ {
+ WaitEvent *w = &occurred_event[i];
+
+ /*
+ * Each waiting subplan should have registered its wait event with
+ * user_data pointing back to its AsyncRequest.
+ */
+ if ((w->events & WL_SOCKET_READABLE) != 0)
+ {
+ AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+ if (areq->callback_pending)
+ {
+ /*
+ * Mark it as no longer needing a callback. We must do this
+ * before dispatching the callback in case the callback resets
+ * the flag.
+ */
+ areq->callback_pending = false;
+
+ /* Do the actual work. */
+ ExecAsyncNotify(areq);
+ }
+ }
+
+ /* Handle standard interrupts */
+ if ((w->events & WL_LATCH_SET) != 0)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+ }
+}
+
+/* Begin executing async-capable subplans. */
+void
+ExecAppendBaseAsyncBegin(AppendBaseState *node)
+{
+ int i;
+
+ /* Backward scan is not supported by async-aware Appends. */
+ Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+ /* We should never be called when there are no subplans */
+ Assert(node->nplans > 0);
+
+ /* We should never be called when there are no async subplans. */
+ Assert(node->nasyncplans > 0);
+
+ /* Make a request for each of the valid async subplans. */
+ i = -1;
+ while ((i = bms_next_member(node->valid_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->asyncrequests[i];
+
+ Assert(areq->request_index == i);
+ Assert(!areq->callback_pending);
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+}
+
+/*
+ * classify_matching_subplans_common
+ * Common part of classify_matching_subplans() for Append and MergeAppend.
+ *
+ * Splits valid_subplans into sync and async sets. Returns false if there
+ * are no valid async subplans, true otherwise.
+ */
+bool
+classify_matching_subplans_common(Bitmapset **valid_subplans,
+ Bitmapset *asyncplans,
+ Bitmapset **valid_asyncplans)
+{
+ Assert(*valid_asyncplans == NULL);
+
+ /* Checked by classify_matching_subplans() */
+ Assert(!bms_is_empty(*valid_subplans));
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (!bms_overlap(*valid_subplans, asyncplans))
+ return false;
+
+ /* Get valid async subplans. */
+ *valid_asyncplans = bms_intersect(asyncplans,
+ *valid_subplans);
+
+ /* Adjust the valid subplans to contain sync subplans only. */
+ *valid_subplans = bms_del_members(*valid_subplans,
+ *valid_asyncplans);
+ return true;
}
/* Shuts down the subplans of an AppendBase node. */
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index f267ffe13fa..95ed4d86e20 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -84,7 +84,6 @@ struct ParallelAppendState
};
#define INVALID_SUBPLAN_INDEX -1
-#define EVENT_BUFFER_SIZE 16
static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
@@ -112,10 +111,6 @@ AppendState *
ExecInitAppend(Append *node, EState *estate, int eflags)
{
AppendState *appendstate = makeNode(AppendState);
- Bitmapset *asyncplans;
- int nasyncplans;
- int nplans;
- int i;
/* check for unsupported flags */
Assert(!(eflags & EXEC_FLAG_MARK));
@@ -140,59 +135,11 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
node->first_partial_plan,
&appendstate->as_first_partial_plan);
- nplans = appendstate->as.nplans;
+ if (appendstate->as.nasyncplans > 0 && appendstate->as.valid_subplans_identified)
+ classify_matching_subplans(appendstate);
- /*
- * Detect async-capable subplans. When executing EvalPlanQual, we treat
- * them as sync ones; don't do this when initializing an EvalPlanQual plan
- * tree.
- */
- asyncplans = NULL;
- nasyncplans = 0;
- for (i = 0; i < nplans; i++)
- {
- if (appendstate->as.plans[i]->plan->async_capable &&
- estate->es_epq_active == NULL)
- {
- asyncplans = bms_add_member(asyncplans, i);
- nasyncplans++;
- }
- }
-
- /* Initialize async state */
- appendstate->as.asyncplans = asyncplans;
- appendstate->as.nasyncplans = nasyncplans;
- appendstate->as_nasyncresults = 0;
appendstate->as_nasyncremain = 0;
- if (nasyncplans > 0)
- {
- appendstate->as.asyncrequests = (AsyncRequest **)
- palloc0(nplans * sizeof(AsyncRequest *));
-
- i = -1;
- while ((i = bms_next_member(asyncplans, i)) >= 0)
- {
- AsyncRequest *areq;
-
- areq = palloc_object(AsyncRequest);
- areq->requestor = (PlanState *) appendstate;
- areq->requestee = appendstate->as.plans[i];
- areq->request_index = i;
- areq->callback_pending = false;
- areq->request_complete = false;
- areq->result = NULL;
-
- appendstate->as.asyncrequests[i] = areq;
- }
-
- appendstate->as.asyncresults = (TupleTableSlot **)
- palloc0(nasyncplans * sizeof(TupleTableSlot *));
-
- if (appendstate->as.valid_subplans_identified)
- classify_matching_subplans(appendstate);
- }
-
/* For parallel query, this will be overridden later. */
appendstate->choose_next_subplan = choose_next_subplan_locally;
@@ -312,29 +259,16 @@ ExecEndAppend(AppendState *node)
void
ExecReScanAppend(AppendState *node)
{
+
int nasyncplans = node->as.nasyncplans;
ExecReScanAppendBase(&node->as);
- /* Reset async state */
+ /* Reset Append-specific state */
if (nasyncplans > 0)
{
- int i;
-
- i = -1;
- while ((i = bms_next_member(node->as.asyncplans, i)) >= 0)
- {
- AsyncRequest *areq = node->as.asyncrequests[i];
-
- areq->callback_pending = false;
- areq->request_complete = false;
- areq->result = NULL;
- }
-
node->as_nasyncresults = 0;
node->as_nasyncremain = 0;
- bms_free(node->as.needrequest);
- node->as.needrequest = NULL;
}
/* Let choose_next_subplan_* function handle setting the first subplan */
@@ -754,21 +688,7 @@ ExecAppendAsyncBegin(AppendState *node)
if (node->as_nasyncremain == 0)
return;
- /* Make a request for each of the valid async subplans. */
- {
- int i = -1;
-
- while ((i = bms_next_member(node->as.valid_asyncplans, i)) >= 0)
- {
- AsyncRequest *areq = node->as.asyncrequests[i];
-
- Assert(areq->request_index == i);
- Assert(!areq->callback_pending);
-
- /* Do the actual work. */
- ExecAsyncRequest(areq);
- }
- }
+ ExecAppendBaseAsyncBegin(&node->as);
}
/* ----------------------------------------------------------------
@@ -883,105 +803,12 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
static void
ExecAppendAsyncEventWait(AppendState *node)
{
- int nevents = node->as.nasyncplans + 2;
long timeout = node->as_syncdone ? -1 : 0;
- WaitEvent occurred_event[EVENT_BUFFER_SIZE];
- int noccurred;
- int i;
/* We should never be called when there are no valid async subplans. */
Assert(node->as_nasyncremain > 0);
- Assert(node->as.eventset == NULL);
- node->as.eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
- AddWaitEventToSet(node->as.eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
-
- /* Give each waiting subplan a chance to add an event. */
- i = -1;
- while ((i = bms_next_member(node->as.asyncplans, i)) >= 0)
- {
- AsyncRequest *areq = node->as.asyncrequests[i];
-
- if (areq->callback_pending)
- ExecAsyncConfigureWait(areq);
- }
-
- /*
- * No need for further processing if none of the subplans configured any
- * events.
- */
- if (GetNumRegisteredWaitEvents(node->as.eventset) == 1)
- {
- FreeWaitEventSet(node->as.eventset);
- node->as.eventset = NULL;
- return;
- }
-
- /*
- * Add the process latch to the set, so that we wake up to process the
- * standard interrupts with CHECK_FOR_INTERRUPTS().
- *
- * NOTE: For historical reasons, it's important that this is added to the
- * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely,
- * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
- * any other events are in the set. That's a poor design, it's
- * questionable for postgres_fdw to be doing that in the first place, but
- * we cannot change it now. The pattern has possibly been copied to other
- * extensions too.
- */
- AddWaitEventToSet(node->as.eventset, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
-
- /* Return at most EVENT_BUFFER_SIZE events in one call. */
- if (nevents > EVENT_BUFFER_SIZE)
- nevents = EVENT_BUFFER_SIZE;
-
- /*
- * If the timeout is -1, wait until at least one event occurs. If the
- * timeout is 0, poll for events, but do not wait at all.
- */
- noccurred = WaitEventSetWait(node->as.eventset, timeout, occurred_event,
- nevents, WAIT_EVENT_APPEND_READY);
- FreeWaitEventSet(node->as.eventset);
- node->as.eventset = NULL;
- if (noccurred == 0)
- return;
-
- /* Deliver notifications. */
- for (i = 0; i < noccurred; i++)
- {
- WaitEvent *w = &occurred_event[i];
-
- /*
- * Each waiting subplan should have registered its wait event with
- * user_data pointing back to its AsyncRequest.
- */
- if ((w->events & WL_SOCKET_READABLE) != 0)
- {
- AsyncRequest *areq = (AsyncRequest *) w->user_data;
-
- if (areq->callback_pending)
- {
- /*
- * Mark it as no longer needing a callback. We must do this
- * before dispatching the callback in case the callback resets
- * the flag.
- */
- areq->callback_pending = false;
-
- /* Do the actual work. */
- ExecAsyncNotify(areq);
- }
- }
-
- /* Handle standard interrupts */
- if ((w->events & WL_LATCH_SET) != 0)
- {
- ResetLatch(MyLatch);
- CHECK_FOR_INTERRUPTS();
- }
- }
+ ExecAppendBaseAsyncEventWait(&node->as, timeout, WAIT_EVENT_APPEND_READY);
}
/* ----------------------------------------------------------------
@@ -1039,10 +866,7 @@ ExecAsyncAppendResponse(AsyncRequest *areq)
static void
classify_matching_subplans(AppendState *node)
{
- Bitmapset *valid_asyncplans;
-
Assert(node->as.valid_subplans_identified);
- Assert(node->as.valid_asyncplans == NULL);
/* Nothing to do if there are no valid subplans. */
if (bms_is_empty(node->as.valid_subplans))
@@ -1052,21 +876,9 @@ classify_matching_subplans(AppendState *node)
return;
}
- /* Nothing to do if there are no valid async subplans. */
- if (!bms_overlap(node->as.valid_subplans, node->as.asyncplans))
- {
+ /* No valid async subplans identified. */
+ if (!classify_matching_subplans_common(&node->as.valid_subplans,
+ node->as.asyncplans,
+ &node->as.valid_asyncplans))
node->as_nasyncremain = 0;
- return;
- }
-
- /* Get valid async subplans. */
- valid_asyncplans = bms_intersect(node->as.asyncplans,
- node->as.valid_subplans);
-
- /* Adjust the valid subplans to contain sync subplans only. */
- node->as.valid_subplans = bms_del_members(node->as.valid_subplans,
- valid_asyncplans);
-
- /* Save valid async subplans. */
- node->as.valid_asyncplans = valid_asyncplans;
}
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 6928152f16f..591be1018d8 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -152,9 +152,12 @@ ExecMergeAppend(PlanState *pstate)
* run-time pruning is disabled then the valid subplans will always be
* set to all subplans.
*/
- if (node->as.valid_subplans == NULL)
+ if (!node->as.valid_subplans_identified)
+ {
node->as.valid_subplans =
ExecFindMatchingSubPlans(node->as.prune_state, false, NULL);
+ node->as.valid_subplans_identified = true;
+ }
/*
* First time through: pull the first tuple from each valid subplan,
diff --git a/src/include/executor/execAppend.h b/src/include/executor/execAppend.h
index a8f41bad921..7f53ad89213 100644
--- a/src/include/executor/execAppend.h
+++ b/src/include/executor/execAppend.h
@@ -22,5 +22,12 @@ extern void ExecInitAppendBase(AppendBaseState *state,
int *first_valid_partial_plan);
extern void ExecEndAppendBase(AppendBaseState *node);
extern void ExecReScanAppendBase(AppendBaseState *node);
+extern void ExecAppendBaseAsyncBegin(AppendBaseState *node);
+extern void ExecAppendBaseAsyncEventWait(AppendBaseState *node,
+ int timeout,
+ uint32 wait_event_info);
+extern bool classify_matching_subplans_common(Bitmapset **valid_subplans,
+ Bitmapset *asyncplans,
+ Bitmapset **valid_asyncplans);
#endif /* EXECAPPEND_H */
--
2.39.5 (Apple Git-154)
[application/octet-stream] v17-0001-mark_async_capable-subpath-should-match-subplan.patch (3.0K, 5-v17-0001-mark_async_capable-subpath-should-match-subplan.patch)
From ce0e032a14e8ff37791ddb86033bf77f18324aca Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <[email protected]>
Date: Sun, 5 Apr 2026 03:58:23 +0300
Subject: [PATCH v17 1/5] mark_async_capable(): subpath should match subplan
mark_async_capable_plan() assumes that the path corresponds to the plan. This
is not true when create_[merge_]append_plan() inserts an (Incremental)Sort
node. In this case, mark_async_capable_plan() can treat the Sort plan node as
some other node and crash. Fix this by explicitly treating (Incremental)Sort
nodes as not async-capable. Also, move this check to the top of the switch(),
since it was repeated in every case.
This is needed to make the MergeAppend node async-capable, which will be
implemented in the subsequent commits.
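The shape of this fix can be sketched in isolation: check the plan node once, above the switch on the path type. This is a hypothetical simplification, not the createplan.c code. PlanTag, PathTag, FdwModel, always_async() and plan_is_async_capable() are all illustrative names. The SubqueryScan and Projection cases, which recurse into the child in the real function, are omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, heavily simplified tags; not the real NodeTag values. */
typedef enum
{
    PLAN_RESULT,
    PLAN_SORT,
    PLAN_INCREMENTAL_SORT,
    PLAN_FOREIGN_SCAN
} PlanTag;

typedef enum
{
    PATH_FOREIGN,
    PATH_SUBQUERY_SCAN,
    PATH_PROJECTION
} PathTag;

/* Hypothetical stand-in for an FDW's IsForeignPathAsyncCapable hook. */
typedef struct FdwModel
{
    bool        (*is_async_capable) (void);
} FdwModel;

/* Stub FDW callback that always reports async capability. */
static bool
always_async(void)
{
    return true;
}

static bool
plan_is_async_capable(PlanTag plan, PathTag path, const FdwModel *fdw)
{
    /*
     * Checked once, before looking at the path: a gating Result or an
     * (Incremental)Sort added on top means the plan node no longer
     * corresponds to the path, so it cannot run asynchronously.
     */
    if (plan == PLAN_RESULT || plan == PLAN_SORT ||
        plan == PLAN_INCREMENTAL_SORT)
        return false;

    switch (path)
    {
        case PATH_FOREIGN:
            assert(fdw != NULL);
            return fdw->is_async_capable != NULL && fdw->is_async_capable();
        default:
            /* SubqueryScan/Projection handling is omitted in this sketch. */
            return false;
    }
}
```

In this sketch, the guard sits before the switch, so a Sort or Result plan node is rejected even when the path would otherwise qualify. No individual case has to remember to repeat the check.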
Discussion: https://postgr.es/m/59be194c5a409fb9fc9f2031581b8a44%40postgrespro.ru
Author: Alexander Pyhalov <[email protected]>
Reviewed-by: Matheus Alcantara <[email protected]>
Reviewed-by: Alena Rybakina <[email protected]>
---
src/backend/optimizer/plan/createplan.c | 31 +++++++------------------
1 file changed, 9 insertions(+), 22 deletions(-)
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 50b0e10308b..3266615a796 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1131,19 +1131,21 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
static bool
mark_async_capable_plan(Plan *plan, Path *path)
{
+ /*
+ * If the generated plan node includes a gating Result node,
+ * a Sort node, or an IncrementalSort node, we can't execute
+ * it asynchronously.
+ */
+ if (IsA(plan, Result) || IsA(plan, Sort) ||
+ IsA(plan, IncrementalSort))
+ return false;
+
switch (nodeTag(path))
{
case T_SubqueryScanPath:
{
SubqueryScan *scan_plan = (SubqueryScan *) plan;
- /*
- * If the generated plan node includes a gating Result node,
- * we can't execute it asynchronously.
- */
- if (IsA(plan, Result))
- return false;
-
/*
* If a SubqueryScan node atop of an async-capable plan node
* is deletable, consider it as async-capable.
@@ -1158,13 +1160,6 @@ mark_async_capable_plan(Plan *plan, Path *path)
{
FdwRoutine *fdwroutine = path->parent->fdwroutine;
- /*
- * If the generated plan node includes a gating Result node,
- * we can't execute it asynchronously.
- */
- if (IsA(plan, Result))
- return false;
-
Assert(fdwroutine != NULL);
if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
@@ -1172,14 +1167,6 @@ mark_async_capable_plan(Plan *plan, Path *path)
return false;
}
case T_ProjectionPath:
-
- /*
- * If the generated plan node includes a Result node for the
- * projection, we can't execute it asynchronously.
- */
- if (IsA(plan, Result))
- return false;
-
/*
* create_projection_plan() would have pulled up the subplan, so
* check the capability using the subpath.
--
2.39.5 (Apple Git-154)
[application/octet-stream] v17-0002-Introduce-AppendBase-AppendBaseState-base-types-.patch (64.9K, 6-v17-0002-Introduce-AppendBase-AppendBaseState-base-types-.patch)
From e34e5abb9ab8a3889d87a4bbe3225b7954b802d4 Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <[email protected]>
Date: Sun, 5 Apr 2026 04:00:15 +0300
Subject: [PATCH v17 2/5] Introduce AppendBase/AppendBaseState base types for
Append and MergeAppend
Introduce common base types AppendBase (plan node) and AppendBaseState
(executor state) to unify the fields shared between Append/MergeAppend and
AppendState/MergeAppendState.
AppendBase holds the subplan list, appendrel identifiers, and partition
pruning index. AppendBaseState holds the subplan state array, asynchronous
execution infrastructure, and partition pruning state.
Append and MergeAppend now embed AppendBase as their first field (ab),
while AppendState and MergeAppendState both embed AppendBaseState as
their first field (as). This follows the same C struct inheritance
pattern used by Scan/ScanState and Join/JoinState throughout the
codebase. The name "AppendBase" was chosen because the plain name "Append"
is already taken by the Append plan node itself.
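Here is a minimal sketch of the embedding pattern described above. It uses hypothetical, trimmed-down struct names, not the real execnodes.h definitions. Because each base struct is the first member of its derived struct, a pointer to the derived struct can be converted to a pointer to the base, or to the innermost PlanState-like header. Generic code can then handle either node through the common prefix.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for PlanState: the common header of all states. */
typedef struct PlanStateModel
{
    int         node_tag;
} PlanStateModel;

/* Hypothetical stand-in for AppendBaseState; header must come first. */
typedef struct AppendBaseStateModel
{
    PlanStateModel ps;
    int         nplans;
} AppendBaseStateModel;

/* Hypothetical stand-in for MergeAppendState; base embedded first. */
typedef struct MergeAppendStateModel
{
    AppendBaseStateModel as;
    int         heap_size;      /* MergeAppend-only field */
} MergeAppendStateModel;

/* A shared helper that only knows about the base struct. */
static int
base_nplans(const AppendBaseStateModel *base)
{
    return base->nplans;
}
```

The same layout rule is what makes casts such as `(PlanState *) state` in the patch's async request setup valid.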
Discussion: https://postgr.es/m/59be194c5a409fb9fc9f2031581b8a44%40postgrespro.ru
Author: Matheus Alcantara <[email protected]>
Co-authored-by: Alexander Korotkov <[email protected]>
Reviewed-by: Alexander Pyhalov <[email protected]>
Reviewed-by: Alena Rybakina <[email protected]>
---
contrib/pg_overexplain/pg_overexplain.c | 8 +-
contrib/pg_plan_advice/pgpa_scan.c | 4 +-
contrib/pg_plan_advice/pgpa_walker.c | 8 +-
contrib/postgres_fdw/postgres_fdw.c | 12 +-
src/backend/commands/explain.c | 26 +--
src/backend/executor/execAmi.c | 2 +-
src/backend/executor/execCurrent.c | 4 +-
src/backend/executor/execProcnode.c | 8 +-
src/backend/executor/nodeAppend.c | 284 ++++++++++++------------
src/backend/executor/nodeMergeAppend.c | 82 +++----
src/backend/nodes/nodeFuncs.c | 8 +-
src/backend/optimizer/plan/createplan.c | 46 ++--
src/backend/optimizer/plan/setrefs.c | 48 ++--
src/backend/optimizer/plan/subselect.c | 4 +-
src/backend/utils/adt/ruleutils.c | 8 +-
src/include/nodes/execnodes.h | 63 ++++--
src/include/nodes/plannodes.h | 66 +++---
src/tools/pgindent/typedefs.list | 2 +
18 files changed, 350 insertions(+), 333 deletions(-)
diff --git a/contrib/pg_overexplain/pg_overexplain.c b/contrib/pg_overexplain/pg_overexplain.c
index b4e90909289..267f01927a2 100644
--- a/contrib/pg_overexplain/pg_overexplain.c
+++ b/contrib/pg_overexplain/pg_overexplain.c
@@ -232,18 +232,18 @@ overexplain_per_node_hook(PlanState *planstate, List *ancestors,
break;
case T_Append:
overexplain_bitmapset("Append RTIs",
- ((Append *) plan)->apprelids,
+ ((Append *) plan)->ab.apprelids,
es);
overexplain_bitmapset_list("Child Append RTIs",
- ((Append *) plan)->child_append_relid_sets,
+ ((Append *) plan)->ab.child_append_relid_sets,
es);
break;
case T_MergeAppend:
overexplain_bitmapset("Append RTIs",
- ((MergeAppend *) plan)->apprelids,
+ ((MergeAppend *) plan)->ab.apprelids,
es);
overexplain_bitmapset_list("Child Append RTIs",
- ((MergeAppend *) plan)->child_append_relid_sets,
+ ((MergeAppend *) plan)->ab.child_append_relid_sets,
es);
break;
case T_Result:
diff --git a/contrib/pg_plan_advice/pgpa_scan.c b/contrib/pg_plan_advice/pgpa_scan.c
index 0467f9b12ba..768ab35e6b3 100644
--- a/contrib/pg_plan_advice/pgpa_scan.c
+++ b/contrib/pg_plan_advice/pgpa_scan.c
@@ -149,7 +149,7 @@ pgpa_build_scan(pgpa_plan_walker_context *walker, Plan *plan,
/* Be sure to account for pulled-up scans. */
child_append_relid_sets =
- ((Append *) plan)->child_append_relid_sets;
+ ((Append *) plan)->ab.child_append_relid_sets;
break;
case T_MergeAppend:
/* Same logic here as for Append, above. */
@@ -161,7 +161,7 @@ pgpa_build_scan(pgpa_plan_walker_context *walker, Plan *plan,
/* Be sure to account for pulled-up scans. */
child_append_relid_sets =
- ((MergeAppend *) plan)->child_append_relid_sets;
+ ((MergeAppend *) plan)->ab.child_append_relid_sets;
break;
default:
strategy = PGPA_SCAN_ORDINARY;
diff --git a/contrib/pg_plan_advice/pgpa_walker.c b/contrib/pg_plan_advice/pgpa_walker.c
index e32684d2075..f2a34a1cf76 100644
--- a/contrib/pg_plan_advice/pgpa_walker.c
+++ b/contrib/pg_plan_advice/pgpa_walker.c
@@ -440,14 +440,14 @@ pgpa_walk_recursively(pgpa_plan_walker_context *walker, Plan *plan,
{
Append *aplan = (Append *) plan;
- extraplans = aplan->appendplans;
+ extraplans = aplan->ab.subplans;
}
break;
case T_MergeAppend:
{
MergeAppend *maplan = (MergeAppend *) plan;
- extraplans = maplan->mergeplans;
+ extraplans = maplan->ab.subplans;
}
break;
case T_BitmapAnd:
@@ -570,9 +570,9 @@ pgpa_relids(Plan *plan)
else if (IsA(plan, ForeignScan))
return ((ForeignScan *) plan)->fs_relids;
else if (IsA(plan, Append))
- return ((Append *) plan)->apprelids;
+ return ((Append *) plan)->ab.apprelids;
else if (IsA(plan, MergeAppend))
- return ((MergeAppend *) plan)->apprelids;
+ return ((MergeAppend *) plan)->ab.apprelids;
return NULL;
}
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 41e47cc795b..efc70a49a86 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -2413,8 +2413,8 @@ find_modifytable_subplan(PlannerInfo *root,
{
Append *appendplan = (Append *) subplan;
- if (subplan_index < list_length(appendplan->appendplans))
- subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
+ if (subplan_index < list_length(appendplan->ab.subplans))
+ subplan = (Plan *) list_nth(appendplan->ab.subplans, subplan_index);
}
else if (IsA(subplan, Result) &&
outerPlan(subplan) != NULL &&
@@ -2422,8 +2422,8 @@ find_modifytable_subplan(PlannerInfo *root,
{
Append *appendplan = (Append *) outerPlan(subplan);
- if (subplan_index < list_length(appendplan->appendplans))
- subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
+ if (subplan_index < list_length(appendplan->ab.subplans))
+ subplan = (Plan *) list_nth(appendplan->ab.subplans, subplan_index);
}
/* Now, have we got a ForeignScan on the desired rel? */
@@ -7215,7 +7215,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
AppendState *requestor = (AppendState *) areq->requestor;
- WaitEventSet *set = requestor->as_eventset;
+ WaitEventSet *set = requestor->as.eventset;
/* This should not be called unless callback_pending */
Assert(areq->callback_pending);
@@ -7257,7 +7257,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
* below, because we might otherwise end up with no configured events
* other than the postmaster death event.
*/
- if (!bms_is_empty(requestor->as_needrequest))
+ if (!bms_is_empty(requestor->as.needrequest))
return;
if (GetNumRegisteredWaitEvents(set) > 1)
return;
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index e4b70166b0e..70a35d3ce34 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1228,11 +1228,11 @@ ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used)
break;
case T_Append:
*rels_used = bms_add_members(*rels_used,
- ((Append *) plan)->apprelids);
+ ((Append *) plan)->ab.apprelids);
break;
case T_MergeAppend:
*rels_used = bms_add_members(*rels_used,
- ((MergeAppend *) plan)->apprelids);
+ ((MergeAppend *) plan)->ab.apprelids);
break;
case T_Result:
*rels_used = bms_add_members(*rels_used,
@@ -1276,7 +1276,7 @@ plan_is_disabled(Plan *plan)
* includes any run-time pruned children. Ignoring those could give
* us the incorrect number of disabled nodes.
*/
- foreach(lc, aplan->appendplans)
+ foreach(lc, aplan->ab.subplans)
{
Plan *subplan = lfirst(lc);
@@ -1293,7 +1293,7 @@ plan_is_disabled(Plan *plan)
* includes any run-time pruned children. Ignoring those could give
* us the incorrect number of disabled nodes.
*/
- foreach(lc, maplan->mergeplans)
+ foreach(lc, maplan->ab.subplans)
{
Plan *subplan = lfirst(lc);
@@ -2340,13 +2340,13 @@ ExplainNode(PlanState *planstate, List *ancestors,
switch (nodeTag(plan))
{
case T_Append:
- ExplainMissingMembers(((AppendState *) planstate)->as_nplans,
- list_length(((Append *) plan)->appendplans),
+ ExplainMissingMembers(((AppendState *) planstate)->as.nplans,
+ list_length(((Append *) plan)->ab.subplans),
es);
break;
case T_MergeAppend:
- ExplainMissingMembers(((MergeAppendState *) planstate)->ms_nplans,
- list_length(((MergeAppend *) plan)->mergeplans),
+ ExplainMissingMembers(((MergeAppendState *) planstate)->as.nplans,
+ list_length(((MergeAppend *) plan)->ab.subplans),
es);
break;
default:
@@ -2390,13 +2390,13 @@ ExplainNode(PlanState *planstate, List *ancestors,
switch (nodeTag(plan))
{
case T_Append:
- ExplainMemberNodes(((AppendState *) planstate)->appendplans,
- ((AppendState *) planstate)->as_nplans,
+ ExplainMemberNodes(((AppendState *) planstate)->as.plans,
+ ((AppendState *) planstate)->as.nplans,
ancestors, es);
break;
case T_MergeAppend:
- ExplainMemberNodes(((MergeAppendState *) planstate)->mergeplans,
- ((MergeAppendState *) planstate)->ms_nplans,
+ ExplainMemberNodes(((MergeAppendState *) planstate)->as.plans,
+ ((MergeAppendState *) planstate)->as.nplans,
ancestors, es);
break;
case T_BitmapAnd:
@@ -2610,7 +2610,7 @@ static void
show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
ExplainState *es)
{
- MergeAppend *plan = (MergeAppend *) mstate->ps.plan;
+ MergeAppend *plan = (MergeAppend *) mstate->as.ps.plan;
show_sort_group_keys((PlanState *) mstate, "Sort Key",
plan->numCols, 0, plan->sortColIdx,
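Casts such as (PlanState *) mstate in show_merge_append_keys() keep working after this refactoring only under two conditions. The shared state struct must be the first member of MergeAppendState, and PlanState must be the first member of that shared struct. Below is a minimal sketch of this embedded-first-member idiom. It assumes heavily simplified struct layouts, inferred only from the as.ps, as.plans and as.nplans accesses in this diff. The field ms_extra and the helpers base_nplans() and as_planstate() are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, heavily simplified stand-ins; the real structs have many more fields. */
typedef struct PlanState
{
	int			node_id;		/* stands in for ps.plan, ps.state, ... */
} PlanState;

typedef struct AppendBaseState
{
	PlanState	ps;				/* first member, so the struct "is a" PlanState */
	PlanState **plans;			/* initialized child nodes */
	int			nplans;			/* length of plans[] */
} AppendBaseState;

typedef struct MergeAppendState
{
	AppendBaseState as;			/* first member, so the struct "is an" AppendBaseState */
	int			ms_extra;		/* hypothetical MergeAppend-only field */
} MergeAppendState;

/* Code that needs only the shared fields can accept the common base. */
static int
base_nplans(const AppendBaseState *as)
{
	return as->nplans;
}

/*
 * View a MergeAppendState as a PlanState, as (PlanState *) mstate does.
 * This is valid because a pointer to a struct, suitably converted, points
 * to its initial member, and both embedded members sit at offset 0.
 */
static PlanState *
as_planstate(MergeAppendState *ms)
{
	assert(offsetof(MergeAppendState, as) == 0);
	assert(offsetof(AppendBaseState, ps) == 0);
	return (PlanState *) ms;
}
```

For example, after setting ms.as.nplans = 3, base_nplans((AppendBaseState *) &ms) returns 3, and as_planstate(&ms) is the same address as &ms.as.ps.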
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 37fe03fdc37..2d8e621208f 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -538,7 +538,7 @@ ExecSupportsBackwardScan(Plan *node)
if (((Append *) node)->nasyncplans > 0)
return false;
- foreach(l, ((Append *) node)->appendplans)
+ foreach(l, ((Append *) node)->ab.subplans)
{
if (!ExecSupportsBackwardScan((Plan *) lfirst(l)))
return false;
diff --git a/src/backend/executor/execCurrent.c b/src/backend/executor/execCurrent.c
index 99f2b2d0c6f..37f5c7fd2c5 100644
--- a/src/backend/executor/execCurrent.c
+++ b/src/backend/executor/execCurrent.c
@@ -375,9 +375,9 @@ search_plan_tree(PlanState *node, Oid table_oid,
AppendState *astate = (AppendState *) node;
int i;
- for (i = 0; i < astate->as_nplans; i++)
+ for (i = 0; i < astate->as.nplans; i++)
{
- ScanState *elem = search_plan_tree(astate->appendplans[i],
+ ScanState *elem = search_plan_tree(astate->as.plans[i],
table_oid,
pending_rescan);
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index d35976925ae..99e8e8b6f8b 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -911,8 +911,8 @@ ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
AppendState *aState = (AppendState *) child_node;
int i;
- for (i = 0; i < aState->as_nplans; i++)
- ExecSetTupleBound(tuples_needed, aState->appendplans[i]);
+ for (i = 0; i < aState->as.nplans; i++)
+ ExecSetTupleBound(tuples_needed, aState->as.plans[i]);
}
else if (IsA(child_node, MergeAppendState))
{
@@ -924,8 +924,8 @@ ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
MergeAppendState *maState = (MergeAppendState *) child_node;
int i;
- for (i = 0; i < maState->ms_nplans; i++)
- ExecSetTupleBound(tuples_needed, maState->mergeplans[i]);
+ for (i = 0; i < maState->as.nplans; i++)
+ ExecSetTupleBound(tuples_needed, maState->as.plans[i]);
}
else if (IsA(child_node, ResultState))
{
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 85c85569b5e..272bf52fc2d 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -127,9 +127,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
/*
* create new AppendState for our append node
*/
- appendstate->ps.plan = (Plan *) node;
- appendstate->ps.state = estate;
- appendstate->ps.ExecProcNode = ExecAppend;
+ appendstate->as.ps.plan = (Plan *) node;
+ appendstate->as.ps.state = estate;
+ appendstate->as.ps.ExecProcNode = ExecAppend;
/* Let choose_next_subplan_* function handle setting the first subplan */
appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
@@ -137,7 +137,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->as_begun = false;
/* If run-time partition pruning is enabled, then set that up now */
- if (node->part_prune_index >= 0)
+ if (node->ab.part_prune_index >= 0)
{
PartitionPruneState *prunestate;
@@ -146,12 +146,12 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
* subplans to initialize (validsubplans) by taking into account the
* result of performing initial pruning if any.
*/
- prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
- list_length(node->appendplans),
- node->part_prune_index,
- node->apprelids,
+ prunestate = ExecInitPartitionExecPruning(&appendstate->as.ps,
+ list_length(node->ab.subplans),
+ node->ab.part_prune_index,
+ node->ab.apprelids,
&validsubplans);
- appendstate->as_prune_state = prunestate;
+ appendstate->as.prune_state = prunestate;
nplans = bms_num_members(validsubplans);
/*
@@ -161,23 +161,23 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
*/
if (!prunestate->do_exec_prune && nplans > 0)
{
- appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
- appendstate->as_valid_subplans_identified = true;
+ appendstate->as.valid_subplans = bms_add_range(NULL, 0, nplans - 1);
+ appendstate->as.valid_subplans_identified = true;
}
}
else
{
- nplans = list_length(node->appendplans);
+ nplans = list_length(node->ab.subplans);
/*
* When run-time partition pruning is not enabled we can just mark all
* subplans as valid; they must also all be initialized.
*/
Assert(nplans > 0);
- appendstate->as_valid_subplans = validsubplans =
+ appendstate->as.valid_subplans = validsubplans =
bms_add_range(NULL, 0, nplans - 1);
- appendstate->as_valid_subplans_identified = true;
- appendstate->as_prune_state = NULL;
+ appendstate->as.valid_subplans_identified = true;
+ appendstate->as.prune_state = NULL;
}
appendplanstates = (PlanState **) palloc(nplans *
@@ -196,7 +196,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
{
- Plan *initNode = (Plan *) list_nth(node->appendplans, i);
+ Plan *initNode = (Plan *) list_nth(node->ab.subplans, i);
/*
* Record async subplans. When executing EvalPlanQual, we treat them
@@ -219,8 +219,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
}
appendstate->as_first_partial_plan = firstvalid;
- appendstate->appendplans = appendplanstates;
- appendstate->as_nplans = nplans;
+ appendstate->as.plans = appendplanstates;
+ appendstate->as.nplans = nplans;
/*
* Initialize Append's result tuple type and slot. If the child plans all
@@ -234,30 +234,30 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendops = ExecGetCommonSlotOps(appendplanstates, j);
if (appendops != NULL)
{
- ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
+ ExecInitResultTupleSlotTL(&appendstate->as.ps, appendops);
}
else
{
- ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
+ ExecInitResultTupleSlotTL(&appendstate->as.ps, &TTSOpsVirtual);
/* show that the output slot type is not fixed */
- appendstate->ps.resultopsset = true;
- appendstate->ps.resultopsfixed = false;
+ appendstate->as.ps.resultopsset = true;
+ appendstate->as.ps.resultopsfixed = false;
}
/* Initialize async state */
- appendstate->as_asyncplans = asyncplans;
- appendstate->as_nasyncplans = nasyncplans;
- appendstate->as_asyncrequests = NULL;
- appendstate->as_asyncresults = NULL;
+ appendstate->as.asyncplans = asyncplans;
+ appendstate->as.nasyncplans = nasyncplans;
+ appendstate->as.asyncrequests = NULL;
+ appendstate->as.asyncresults = NULL;
appendstate->as_nasyncresults = 0;
appendstate->as_nasyncremain = 0;
- appendstate->as_needrequest = NULL;
- appendstate->as_eventset = NULL;
- appendstate->as_valid_asyncplans = NULL;
+ appendstate->as.needrequest = NULL;
+ appendstate->as.eventset = NULL;
+ appendstate->as.valid_asyncplans = NULL;
if (nasyncplans > 0)
{
- appendstate->as_asyncrequests = (AsyncRequest **)
+ appendstate->as.asyncrequests = (AsyncRequest **)
palloc0(nplans * sizeof(AsyncRequest *));
i = -1;
@@ -273,13 +273,13 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
areq->request_complete = false;
areq->result = NULL;
- appendstate->as_asyncrequests[i] = areq;
+ appendstate->as.asyncrequests[i] = areq;
}
- appendstate->as_asyncresults = (TupleTableSlot **)
+ appendstate->as.asyncresults = (TupleTableSlot **)
palloc0(nasyncplans * sizeof(TupleTableSlot *));
- if (appendstate->as_valid_subplans_identified)
+ if (appendstate->as.valid_subplans_identified)
classify_matching_subplans(appendstate);
}
@@ -287,7 +287,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
* Miscellaneous initialization
*/
- appendstate->ps.ps_ProjInfo = NULL;
+ appendstate->as.ps.ps_ProjInfo = NULL;
/* For parallel query, this will be overridden later. */
appendstate->choose_next_subplan = choose_next_subplan_locally;
@@ -317,11 +317,11 @@ ExecAppend(PlanState *pstate)
Assert(!node->as_syncdone);
/* Nothing to do if there are no subplans */
- if (node->as_nplans == 0)
- return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ if (node->as.nplans == 0)
+ return ExecClearTuple(node->as.ps.ps_ResultTupleSlot);
/* If there are any async subplans, begin executing them. */
- if (node->as_nasyncplans > 0)
+ if (node->as.nasyncplans > 0)
ExecAppendAsyncBegin(node);
/*
@@ -329,11 +329,11 @@ ExecAppend(PlanState *pstate)
* proceeding.
*/
if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
- return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ return ExecClearTuple(node->as.ps.ps_ResultTupleSlot);
Assert(node->as_syncdone ||
(node->as_whichplan >= 0 &&
- node->as_whichplan < node->as_nplans));
+ node->as_whichplan < node->as.nplans));
/* And we're initialized. */
node->as_begun = true;
@@ -348,19 +348,19 @@ ExecAppend(PlanState *pstate)
/*
* try to get a tuple from an async subplan if any
*/
- if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
+ if (node->as_syncdone || !bms_is_empty(node->as.needrequest))
{
if (ExecAppendAsyncGetNext(node, &result))
return result;
Assert(!node->as_syncdone);
- Assert(bms_is_empty(node->as_needrequest));
+ Assert(bms_is_empty(node->as.needrequest));
}
/*
* figure out which sync subplan we are currently processing
*/
- Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
- subnode = node->appendplans[node->as_whichplan];
+ Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as.nplans);
+ subnode = node->as.plans[node->as_whichplan];
/*
* get a tuple from the subplan
@@ -387,7 +387,7 @@ ExecAppend(PlanState *pstate)
/* choose new sync subplan; if no sync/async subplans, we're done */
if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
- return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ return ExecClearTuple(node->as.ps.ps_ResultTupleSlot);
}
}
@@ -409,8 +409,8 @@ ExecEndAppend(AppendState *node)
/*
* get information from the node
*/
- appendplans = node->appendplans;
- nplans = node->as_nplans;
+ appendplans = node->as.plans;
+ nplans = node->as.nplans;
/*
* shut down each of the subscans
@@ -422,7 +422,7 @@ ExecEndAppend(AppendState *node)
void
ExecReScanAppend(AppendState *node)
{
- int nasyncplans = node->as_nasyncplans;
+ int nasyncplans = node->as.nasyncplans;
int i;
/*
@@ -430,27 +430,27 @@ ExecReScanAppend(AppendState *node)
* we'd better unset the valid subplans so that they are reselected for
* the new parameter values.
*/
- if (node->as_prune_state &&
- bms_overlap(node->ps.chgParam,
- node->as_prune_state->execparamids))
+ if (node->as.prune_state &&
+ bms_overlap(node->as.ps.chgParam,
+ node->as.prune_state->execparamids))
{
- node->as_valid_subplans_identified = false;
- bms_free(node->as_valid_subplans);
- node->as_valid_subplans = NULL;
- bms_free(node->as_valid_asyncplans);
- node->as_valid_asyncplans = NULL;
+ node->as.valid_subplans_identified = false;
+ bms_free(node->as.valid_subplans);
+ node->as.valid_subplans = NULL;
+ bms_free(node->as.valid_asyncplans);
+ node->as.valid_asyncplans = NULL;
}
- for (i = 0; i < node->as_nplans; i++)
+ for (i = 0; i < node->as.nplans; i++)
{
- PlanState *subnode = node->appendplans[i];
+ PlanState *subnode = node->as.plans[i];
/*
* ExecReScan doesn't know about my subplans, so I have to do
* changed-parameter signaling myself.
*/
- if (node->ps.chgParam != NULL)
- UpdateChangedParamSet(subnode, node->ps.chgParam);
+ if (node->as.ps.chgParam != NULL)
+ UpdateChangedParamSet(subnode, node->as.ps.chgParam);
/*
* If chgParam of subnode is not null then plan will be re-scanned by
@@ -464,9 +464,9 @@ ExecReScanAppend(AppendState *node)
if (nasyncplans > 0)
{
i = -1;
- while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ while ((i = bms_next_member(node->as.asyncplans, i)) >= 0)
{
- AsyncRequest *areq = node->as_asyncrequests[i];
+ AsyncRequest *areq = node->as.asyncrequests[i];
areq->callback_pending = false;
areq->request_complete = false;
@@ -475,8 +475,8 @@ ExecReScanAppend(AppendState *node)
node->as_nasyncresults = 0;
node->as_nasyncremain = 0;
- bms_free(node->as_needrequest);
- node->as_needrequest = NULL;
+ bms_free(node->as.needrequest);
+ node->as.needrequest = NULL;
}
/* Let choose_next_subplan_* function handle setting the first subplan */
@@ -503,7 +503,7 @@ ExecAppendEstimate(AppendState *node,
{
node->pstate_len =
add_size(offsetof(ParallelAppendState, pa_finished),
- sizeof(bool) * node->as_nplans);
+ sizeof(bool) * node->as.nplans);
shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
@@ -525,7 +525,7 @@ ExecAppendInitializeDSM(AppendState *node,
pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
memset(pstate, 0, node->pstate_len);
LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
- shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
+ shm_toc_insert(pcxt->toc, node->as.ps.plan->plan_node_id, pstate);
node->as_pstate = pstate;
node->choose_next_subplan = choose_next_subplan_for_leader;
@@ -543,7 +543,7 @@ ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
ParallelAppendState *pstate = node->as_pstate;
pstate->pa_next_plan = 0;
- memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
+ memset(pstate->pa_finished, 0, sizeof(bool) * node->as.nplans);
}
/* ----------------------------------------------------------------
@@ -556,7 +556,7 @@ ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
void
ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
{
- node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
+ node->as_pstate = shm_toc_lookup(pwcxt->toc, node->as.ps.plan->plan_node_id, false);
node->choose_next_subplan = choose_next_subplan_for_worker;
}
@@ -574,7 +574,7 @@ choose_next_subplan_locally(AppendState *node)
int nextplan;
/* We should never be called when there are no subplans */
- Assert(node->as_nplans > 0);
+ Assert(node->as.nplans > 0);
/* Nothing to do if syncdone */
if (node->as_syncdone)
@@ -589,33 +589,33 @@ choose_next_subplan_locally(AppendState *node)
*/
if (whichplan == INVALID_SUBPLAN_INDEX)
{
- if (node->as_nasyncplans > 0)
+ if (node->as.nasyncplans > 0)
{
/* We'd have filled as_valid_subplans already */
- Assert(node->as_valid_subplans_identified);
+ Assert(node->as.valid_subplans_identified);
}
- else if (!node->as_valid_subplans_identified)
+ else if (!node->as.valid_subplans_identified)
{
- node->as_valid_subplans =
- ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
- node->as_valid_subplans_identified = true;
+ node->as.valid_subplans =
+ ExecFindMatchingSubPlans(node->as.prune_state, false, NULL);
+ node->as.valid_subplans_identified = true;
}
whichplan = -1;
}
/* Ensure whichplan is within the expected range */
- Assert(whichplan >= -1 && whichplan <= node->as_nplans);
+ Assert(whichplan >= -1 && whichplan <= node->as.nplans);
- if (ScanDirectionIsForward(node->ps.state->es_direction))
- nextplan = bms_next_member(node->as_valid_subplans, whichplan);
+ if (ScanDirectionIsForward(node->as.ps.state->es_direction))
+ nextplan = bms_next_member(node->as.valid_subplans, whichplan);
else
- nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
+ nextplan = bms_prev_member(node->as.valid_subplans, whichplan);
if (nextplan < 0)
{
/* Set as_syncdone if in async mode */
- if (node->as_nasyncplans > 0)
+ if (node->as.nasyncplans > 0)
node->as_syncdone = true;
return false;
}
@@ -639,10 +639,10 @@ choose_next_subplan_for_leader(AppendState *node)
ParallelAppendState *pstate = node->as_pstate;
/* Backward scan is not supported by parallel-aware plans */
- Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+ Assert(ScanDirectionIsForward(node->as.ps.state->es_direction));
/* We should never be called when there are no subplans */
- Assert(node->as_nplans > 0);
+ Assert(node->as.nplans > 0);
LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
@@ -654,18 +654,18 @@ choose_next_subplan_for_leader(AppendState *node)
else
{
/* Start with last subplan. */
- node->as_whichplan = node->as_nplans - 1;
+ node->as_whichplan = node->as.nplans - 1;
/*
* If we've yet to determine the valid subplans then do so now. If
* run-time pruning is disabled then the valid subplans will always be
* set to all subplans.
*/
- if (!node->as_valid_subplans_identified)
+ if (!node->as.valid_subplans_identified)
{
- node->as_valid_subplans =
- ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
- node->as_valid_subplans_identified = true;
+ node->as.valid_subplans =
+ ExecFindMatchingSubPlans(node->as.prune_state, false, NULL);
+ node->as.valid_subplans_identified = true;
/*
* Mark each invalid plan as finished to allow the loop below to
@@ -721,10 +721,10 @@ choose_next_subplan_for_worker(AppendState *node)
ParallelAppendState *pstate = node->as_pstate;
/* Backward scan is not supported by parallel-aware plans */
- Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+ Assert(ScanDirectionIsForward(node->as.ps.state->es_direction));
/* We should never be called when there are no subplans */
- Assert(node->as_nplans > 0);
+ Assert(node->as.nplans > 0);
LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
@@ -737,11 +737,11 @@ choose_next_subplan_for_worker(AppendState *node)
* run-time pruning is disabled then the valid subplans will always be set
* to all subplans.
*/
- else if (!node->as_valid_subplans_identified)
+ else if (!node->as.valid_subplans_identified)
{
- node->as_valid_subplans =
- ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
- node->as_valid_subplans_identified = true;
+ node->as.valid_subplans =
+ ExecFindMatchingSubPlans(node->as.prune_state, false, NULL);
+ node->as.valid_subplans_identified = true;
mark_invalid_subplans_as_finished(node);
}
@@ -761,7 +761,7 @@ choose_next_subplan_for_worker(AppendState *node)
{
int nextplan;
- nextplan = bms_next_member(node->as_valid_subplans,
+ nextplan = bms_next_member(node->as.valid_subplans,
pstate->pa_next_plan);
if (nextplan >= 0)
{
@@ -774,7 +774,7 @@ choose_next_subplan_for_worker(AppendState *node)
* Try looping back to the first valid partial plan, if there is
* one. If there isn't, arrange to bail out below.
*/
- nextplan = bms_next_member(node->as_valid_subplans,
+ nextplan = bms_next_member(node->as.valid_subplans,
node->as_first_partial_plan - 1);
pstate->pa_next_plan =
nextplan < 0 ? node->as_whichplan : nextplan;
@@ -799,7 +799,7 @@ choose_next_subplan_for_worker(AppendState *node)
/* Pick the plan we found, and advance pa_next_plan one more time. */
node->as_whichplan = pstate->pa_next_plan;
- pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
+ pstate->pa_next_plan = bms_next_member(node->as.valid_subplans,
pstate->pa_next_plan);
/*
@@ -808,7 +808,7 @@ choose_next_subplan_for_worker(AppendState *node)
*/
if (pstate->pa_next_plan < 0)
{
- int nextplan = bms_next_member(node->as_valid_subplans,
+ int nextplan = bms_next_member(node->as.valid_subplans,
node->as_first_partial_plan - 1);
if (nextplan >= 0)
@@ -850,16 +850,16 @@ mark_invalid_subplans_as_finished(AppendState *node)
Assert(node->as_pstate);
/* Shouldn't have been called when run-time pruning is not enabled */
- Assert(node->as_prune_state);
+ Assert(node->as.prune_state);
/* Nothing to do if all plans are valid */
- if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
+ if (bms_num_members(node->as.valid_subplans) == node->as.nplans)
return;
/* Mark all non-valid plans as finished */
- for (i = 0; i < node->as_nplans; i++)
+ for (i = 0; i < node->as.nplans; i++)
{
- if (!bms_is_member(i, node->as_valid_subplans))
+ if (!bms_is_member(i, node->as.valid_subplans))
node->as_pstate->pa_finished[i] = true;
}
}
@@ -881,27 +881,27 @@ ExecAppendAsyncBegin(AppendState *node)
int i;
/* Backward scan is not supported by async-aware Appends. */
- Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+ Assert(ScanDirectionIsForward(node->as.ps.state->es_direction));
/* We should never be called when there are no subplans */
- Assert(node->as_nplans > 0);
+ Assert(node->as.nplans > 0);
/* We should never be called when there are no async subplans. */
- Assert(node->as_nasyncplans > 0);
+ Assert(node->as.nasyncplans > 0);
/* If we've yet to determine the valid subplans then do so now. */
- if (!node->as_valid_subplans_identified)
+ if (!node->as.valid_subplans_identified)
{
- node->as_valid_subplans =
- ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
- node->as_valid_subplans_identified = true;
+ node->as.valid_subplans =
+ ExecFindMatchingSubPlans(node->as.prune_state, false, NULL);
+ node->as.valid_subplans_identified = true;
classify_matching_subplans(node);
}
/* Initialize state variables. */
- node->as_syncdone = bms_is_empty(node->as_valid_subplans);
- node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
+ node->as_syncdone = bms_is_empty(node->as.valid_subplans);
+ node->as_nasyncremain = bms_num_members(node->as.valid_asyncplans);
/* Nothing to do if there are no valid async subplans. */
if (node->as_nasyncremain == 0)
@@ -909,9 +909,9 @@ ExecAppendAsyncBegin(AppendState *node)
/* Make a request for each of the valid async subplans. */
i = -1;
- while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
+ while ((i = bms_next_member(node->as.valid_asyncplans, i)) >= 0)
{
- AsyncRequest *areq = node->as_asyncrequests[i];
+ AsyncRequest *areq = node->as.asyncrequests[i];
Assert(areq->request_index == i);
Assert(!areq->callback_pending);
@@ -963,7 +963,7 @@ ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
if (node->as_syncdone)
{
Assert(node->as_nasyncremain == 0);
- *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ *result = ExecClearTuple(node->as.ps.ps_ResultTupleSlot);
return true;
}
@@ -983,7 +983,7 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
int i;
/* Nothing to do if there are no async subplans needing a new request. */
- if (bms_is_empty(node->as_needrequest))
+ if (bms_is_empty(node->as.needrequest))
{
Assert(node->as_nasyncresults == 0);
return false;
@@ -996,17 +996,17 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
if (node->as_nasyncresults > 0)
{
--node->as_nasyncresults;
- *result = node->as_asyncresults[node->as_nasyncresults];
+ *result = node->as.asyncresults[node->as_nasyncresults];
return true;
}
/* Make a new request for each of the async subplans that need it. */
- needrequest = node->as_needrequest;
- node->as_needrequest = NULL;
+ needrequest = node->as.needrequest;
+ node->as.needrequest = NULL;
i = -1;
while ((i = bms_next_member(needrequest, i)) >= 0)
{
- AsyncRequest *areq = node->as_asyncrequests[i];
+ AsyncRequest *areq = node->as.asyncrequests[i];
/* Do the actual work. */
ExecAsyncRequest(areq);
@@ -1017,7 +1017,7 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
if (node->as_nasyncresults > 0)
{
--node->as_nasyncresults;
- *result = node->as_asyncresults[node->as_nasyncresults];
+ *result = node->as.asyncresults[node->as_nasyncresults];
return true;
}
@@ -1033,7 +1033,7 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
static void
ExecAppendAsyncEventWait(AppendState *node)
{
- int nevents = node->as_nasyncplans + 2;
+ int nevents = node->as.nasyncplans + 2;
long timeout = node->as_syncdone ? -1 : 0;
WaitEvent occurred_event[EVENT_BUFFER_SIZE];
int noccurred;
@@ -1042,16 +1042,16 @@ ExecAppendAsyncEventWait(AppendState *node)
/* We should never be called when there are no valid async subplans. */
Assert(node->as_nasyncremain > 0);
- Assert(node->as_eventset == NULL);
- node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
- AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+ Assert(node->as.eventset == NULL);
+ node->as.eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
+ AddWaitEventToSet(node->as.eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
/* Give each waiting subplan a chance to add an event. */
i = -1;
- while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ while ((i = bms_next_member(node->as.asyncplans, i)) >= 0)
{
- AsyncRequest *areq = node->as_asyncrequests[i];
+ AsyncRequest *areq = node->as.asyncrequests[i];
if (areq->callback_pending)
ExecAsyncConfigureWait(areq);
@@ -1061,10 +1061,10 @@ ExecAppendAsyncEventWait(AppendState *node)
* No need for further processing if none of the subplans configured any
* events.
*/
- if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
+ if (GetNumRegisteredWaitEvents(node->as.eventset) == 1)
{
- FreeWaitEventSet(node->as_eventset);
- node->as_eventset = NULL;
+ FreeWaitEventSet(node->as.eventset);
+ node->as.eventset = NULL;
return;
}
@@ -1080,7 +1080,7 @@ ExecAppendAsyncEventWait(AppendState *node)
* we cannot change it now. The pattern has possibly been copied to other
* extensions too.
*/
- AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
+ AddWaitEventToSet(node->as.eventset, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
/* Return at most EVENT_BUFFER_SIZE events in one call. */
@@ -1091,10 +1091,10 @@ ExecAppendAsyncEventWait(AppendState *node)
* If the timeout is -1, wait until at least one event occurs. If the
* timeout is 0, poll for events, but do not wait at all.
*/
- noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
+ noccurred = WaitEventSetWait(node->as.eventset, timeout, occurred_event,
nevents, WAIT_EVENT_APPEND_READY);
- FreeWaitEventSet(node->as_eventset);
- node->as_eventset = NULL;
+ FreeWaitEventSet(node->as.eventset);
+ node->as.eventset = NULL;
if (noccurred == 0)
return;
@@ -1167,14 +1167,14 @@ ExecAsyncAppendResponse(AsyncRequest *areq)
}
/* Save result so we can return it. */
- Assert(node->as_nasyncresults < node->as_nasyncplans);
- node->as_asyncresults[node->as_nasyncresults++] = slot;
+ Assert(node->as_nasyncresults < node->as.nasyncplans);
+ node->as.asyncresults[node->as_nasyncresults++] = slot;
/*
* Mark the subplan that returned a result as ready for a new request. We
* don't launch another one here immediately because it might complete.
*/
- node->as_needrequest = bms_add_member(node->as_needrequest,
+ node->as.needrequest = bms_add_member(node->as.needrequest,
areq->request_index);
}
@@ -1191,11 +1191,11 @@ classify_matching_subplans(AppendState *node)
{
Bitmapset *valid_asyncplans;
- Assert(node->as_valid_subplans_identified);
- Assert(node->as_valid_asyncplans == NULL);
+ Assert(node->as.valid_subplans_identified);
+ Assert(node->as.valid_asyncplans == NULL);
/* Nothing to do if there are no valid subplans. */
- if (bms_is_empty(node->as_valid_subplans))
+ if (bms_is_empty(node->as.valid_subplans))
{
node->as_syncdone = true;
node->as_nasyncremain = 0;
@@ -1203,20 +1203,20 @@ classify_matching_subplans(AppendState *node)
}
/* Nothing to do if there are no valid async subplans. */
- if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
+ if (!bms_overlap(node->as.valid_subplans, node->as.asyncplans))
{
node->as_nasyncremain = 0;
return;
}
/* Get valid async subplans. */
- valid_asyncplans = bms_intersect(node->as_asyncplans,
- node->as_valid_subplans);
+ valid_asyncplans = bms_intersect(node->as.asyncplans,
+ node->as.valid_subplans);
/* Adjust the valid subplans to contain sync subplans only. */
- node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
+ node->as.valid_subplans = bms_del_members(node->as.valid_subplans,
valid_asyncplans);
/* Save valid async subplans. */
- node->as_valid_asyncplans = valid_asyncplans;
+ node->as.valid_asyncplans = valid_asyncplans;
}
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 72eebd50bdf..d7d2de08147 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -79,12 +79,12 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
/*
* create new MergeAppendState for our node
*/
- mergestate->ps.plan = (Plan *) node;
- mergestate->ps.state = estate;
- mergestate->ps.ExecProcNode = ExecMergeAppend;
+ mergestate->as.ps.plan = (Plan *) node;
+ mergestate->as.ps.state = estate;
+ mergestate->as.ps.ExecProcNode = ExecMergeAppend;
/* If run-time partition pruning is enabled, then set that up now */
- if (node->part_prune_index >= 0)
+ if (node->ab.part_prune_index >= 0)
{
PartitionPruneState *prunestate;
@@ -93,12 +93,12 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
* subplans to initialize (validsubplans) by taking into account the
* result of performing initial pruning if any.
*/
- prunestate = ExecInitPartitionExecPruning(&mergestate->ps,
- list_length(node->mergeplans),
- node->part_prune_index,
- node->apprelids,
+ prunestate = ExecInitPartitionExecPruning(&mergestate->as.ps,
+ list_length(node->ab.subplans),
+ node->ab.part_prune_index,
+ node->ab.apprelids,
&validsubplans);
- mergestate->ms_prune_state = prunestate;
+ mergestate->as.prune_state = prunestate;
nplans = bms_num_members(validsubplans);
/*
@@ -107,25 +107,25 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
* later calls to ExecFindMatchingSubPlans.
*/
if (!prunestate->do_exec_prune && nplans > 0)
- mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
+ mergestate->as.valid_subplans = bms_add_range(NULL, 0, nplans - 1);
}
else
{
- nplans = list_length(node->mergeplans);
+ nplans = list_length(node->ab.subplans);
/*
* When run-time partition pruning is not enabled we can just mark all
* subplans as valid; they must also all be initialized.
*/
Assert(nplans > 0);
- mergestate->ms_valid_subplans = validsubplans =
+ mergestate->as.valid_subplans = validsubplans =
bms_add_range(NULL, 0, nplans - 1);
- mergestate->ms_prune_state = NULL;
+ mergestate->as.prune_state = NULL;
}
mergeplanstates = palloc_array(PlanState *, nplans);
- mergestate->mergeplans = mergeplanstates;
- mergestate->ms_nplans = nplans;
+ mergestate->as.plans = mergeplanstates;
+ mergestate->as.nplans = nplans;
mergestate->ms_slots = palloc0_array(TupleTableSlot *, nplans);
mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
@@ -139,7 +139,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
{
- Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
+ Plan *initNode = (Plan *) list_nth(node->ab.subplans, i);
mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
}
@@ -156,20 +156,20 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
mergeops = ExecGetCommonSlotOps(mergeplanstates, j);
if (mergeops != NULL)
{
- ExecInitResultTupleSlotTL(&mergestate->ps, mergeops);
+ ExecInitResultTupleSlotTL(&mergestate->as.ps, mergeops);
}
else
{
- ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
+ ExecInitResultTupleSlotTL(&mergestate->as.ps, &TTSOpsVirtual);
/* show that the output slot type is not fixed */
- mergestate->ps.resultopsset = true;
- mergestate->ps.resultopsfixed = false;
+ mergestate->as.ps.resultopsset = true;
+ mergestate->as.ps.resultopsfixed = false;
}
/*
* Miscellaneous initialization
*/
- mergestate->ps.ps_ProjInfo = NULL;
+ mergestate->as.ps.ps_ProjInfo = NULL;
/*
* initialize sort-key information
@@ -224,26 +224,26 @@ ExecMergeAppend(PlanState *pstate)
if (!node->ms_initialized)
{
/* Nothing to do if all subplans were pruned */
- if (node->ms_nplans == 0)
- return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ if (node->as.nplans == 0)
+ return ExecClearTuple(node->as.ps.ps_ResultTupleSlot);
/*
* If we've yet to determine the valid subplans then do so now. If
* run-time pruning is disabled then the valid subplans will always be
* set to all subplans.
*/
- if (node->ms_valid_subplans == NULL)
- node->ms_valid_subplans =
- ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL);
+ if (node->as.valid_subplans == NULL)
+ node->as.valid_subplans =
+ ExecFindMatchingSubPlans(node->as.prune_state, false, NULL);
/*
* First time through: pull the first tuple from each valid subplan,
* and set up the heap.
*/
i = -1;
- while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
+ while ((i = bms_next_member(node->as.valid_subplans, i)) >= 0)
{
- node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+ node->ms_slots[i] = ExecProcNode(node->as.plans[i]);
if (!TupIsNull(node->ms_slots[i]))
binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
}
@@ -261,7 +261,7 @@ ExecMergeAppend(PlanState *pstate)
* to not pull tuples until necessary.)
*/
i = DatumGetInt32(binaryheap_first(node->ms_heap));
- node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+ node->ms_slots[i] = ExecProcNode(node->as.plans[i]);
if (!TupIsNull(node->ms_slots[i]))
binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
else
@@ -271,7 +271,7 @@ ExecMergeAppend(PlanState *pstate)
if (binaryheap_empty(node->ms_heap))
{
/* All the subplans are exhausted, and so is the heap */
- result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ result = ExecClearTuple(node->as.ps.ps_ResultTupleSlot);
}
else
{
@@ -342,8 +342,8 @@ ExecEndMergeAppend(MergeAppendState *node)
/*
* get information from the node
*/
- mergeplans = node->mergeplans;
- nplans = node->ms_nplans;
+ mergeplans = node->as.plans;
+ nplans = node->as.nplans;
/*
* shut down each of the subscans
@@ -362,24 +362,24 @@ ExecReScanMergeAppend(MergeAppendState *node)
* we'd better unset the valid subplans so that they are reselected for
* the new parameter values.
*/
- if (node->ms_prune_state &&
- bms_overlap(node->ps.chgParam,
- node->ms_prune_state->execparamids))
+ if (node->as.prune_state &&
+ bms_overlap(node->as.ps.chgParam,
+ node->as.prune_state->execparamids))
{
- bms_free(node->ms_valid_subplans);
- node->ms_valid_subplans = NULL;
+ bms_free(node->as.valid_subplans);
+ node->as.valid_subplans = NULL;
}
- for (i = 0; i < node->ms_nplans; i++)
+ for (i = 0; i < node->as.nplans; i++)
{
- PlanState *subnode = node->mergeplans[i];
+ PlanState *subnode = node->as.plans[i];
/*
* ExecReScan doesn't know about my subplans, so I have to do
* changed-parameter signaling myself.
*/
- if (node->ps.chgParam != NULL)
- UpdateChangedParamSet(subnode, node->ps.chgParam);
+ if (node->as.ps.chgParam != NULL)
+ UpdateChangedParamSet(subnode, node->as.ps.chgParam);
/*
* If chgParam of subnode is not null then plan will be re-scanned by
diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c
index 6a850349cf7..3c75a84705b 100644
--- a/src/backend/nodes/nodeFuncs.c
+++ b/src/backend/nodes/nodeFuncs.c
@@ -4825,14 +4825,14 @@ planstate_tree_walker_impl(PlanState *planstate,
switch (nodeTag(plan))
{
case T_Append:
- if (planstate_walk_members(((AppendState *) planstate)->appendplans,
- ((AppendState *) planstate)->as_nplans,
+ if (planstate_walk_members(((AppendState *) planstate)->as.plans,
+ ((AppendState *) planstate)->as.nplans,
walker, context))
return true;
break;
case T_MergeAppend:
- if (planstate_walk_members(((MergeAppendState *) planstate)->mergeplans,
- ((MergeAppendState *) planstate)->ms_nplans,
+ if (planstate_walk_members(((MergeAppendState *) planstate)->as.plans,
+ ((MergeAppendState *) planstate)->as.nplans,
walker, context))
return true;
break;
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 3266615a796..22bd93596ee 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1246,12 +1246,12 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
* child plans, to make cross-checking the sort info easier.
*/
plan = makeNode(Append);
- plan->plan.targetlist = tlist;
- plan->plan.qual = NIL;
- plan->plan.lefttree = NULL;
- plan->plan.righttree = NULL;
- plan->apprelids = rel->relids;
- plan->child_append_relid_sets = best_path->child_append_relid_sets;
+ plan->ab.plan.targetlist = tlist;
+ plan->ab.plan.qual = NIL;
+ plan->ab.plan.lefttree = NULL;
+ plan->ab.plan.righttree = NULL;
+ plan->ab.apprelids = rel->relids;
+ plan->ab.child_append_relid_sets = best_path->child_append_relid_sets;
if (pathkeys != NIL)
{
@@ -1270,7 +1270,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
&nodeSortOperators,
&nodeCollations,
&nodeNullsFirst);
- tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist));
+ tlist_was_changed = (orig_tlist_length != list_length(plan->ab.plan.targetlist));
}
/* If appropriate, consider async append */
@@ -1380,7 +1380,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
/* Set below if we find quals that we can use to run-time prune */
- plan->part_prune_index = -1;
+ plan->ab.part_prune_index = -1;
/*
* If any quals exist, they may be useful to perform further partition
@@ -1405,16 +1405,16 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
if (prunequal != NIL)
- plan->part_prune_index = make_partition_pruneinfo(root, rel,
- best_path->subpaths,
- prunequal);
+ plan->ab.part_prune_index = make_partition_pruneinfo(root, rel,
+ best_path->subpaths,
+ prunequal);
}
- plan->appendplans = subplans;
+ plan->ab.subplans = subplans;
plan->nasyncplans = nasyncplans;
plan->first_partial_plan = best_path->first_partial_path;
- copy_generic_path_info(&plan->plan, (Path *) best_path);
+ copy_generic_path_info(&plan->ab.plan, (Path *) best_path);
/*
* If prepare_sort_from_pathkeys added sort columns, but we were told to
@@ -1423,9 +1423,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
*/
if (tlist_was_changed && (flags & (CP_EXACT_TLIST | CP_SMALL_TLIST)))
{
- tlist = list_copy_head(plan->plan.targetlist, orig_tlist_length);
+ tlist = list_copy_head(plan->ab.plan.targetlist, orig_tlist_length);
return inject_projection_plan((Plan *) plan, tlist,
- plan->plan.parallel_safe);
+ plan->ab.plan.parallel_safe);
}
else
return (Plan *) plan;
@@ -1443,7 +1443,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
int flags)
{
MergeAppend *node = makeNode(MergeAppend);
- Plan *plan = &node->plan;
+ Plan *plan = &node->ab.plan;
List *tlist = build_path_tlist(root, &best_path->path);
int orig_tlist_length = list_length(tlist);
bool tlist_was_changed;
@@ -1463,8 +1463,8 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
plan->qual = NIL;
plan->lefttree = NULL;
plan->righttree = NULL;
- node->apprelids = rel->relids;
- node->child_append_relid_sets = best_path->child_append_relid_sets;
+ node->ab.apprelids = rel->relids;
+ node->ab.child_append_relid_sets = best_path->child_append_relid_sets;
/*
* Compute sort column info, and adjust MergeAppend's tlist as needed.
@@ -1570,7 +1570,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
}
/* Set below if we find quals that we can use to run-time prune */
- node->part_prune_index = -1;
+ node->ab.part_prune_index = -1;
/*
* If any quals exist, they may be useful to perform further partition
@@ -1587,12 +1587,12 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
Assert(best_path->path.param_info == NULL);
if (prunequal != NIL)
- node->part_prune_index = make_partition_pruneinfo(root, rel,
- best_path->subpaths,
- prunequal);
+ node->ab.part_prune_index = make_partition_pruneinfo(root, rel,
+ best_path->subpaths,
+ prunequal);
}
- node->mergeplans = subplans;
+ node->ab.subplans = subplans;
/*
* If prepare_sort_from_pathkeys added sort columns, but we were told to
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index ff0e875f2a2..af7ceccb8ad 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -1881,10 +1881,10 @@ set_append_references(PlannerInfo *root,
* check quals. If it's got exactly one child plan, then it's not doing
* anything useful at all, and we can strip it out.
*/
- Assert(aplan->plan.qual == NIL);
+ Assert(aplan->ab.plan.qual == NIL);
/* First, we gotta recurse on the children */
- foreach(l, aplan->appendplans)
+ foreach(l, aplan->ab.subplans)
{
lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
}
@@ -1897,11 +1897,11 @@ set_append_references(PlannerInfo *root,
* plan may execute the non-parallel aware child multiple times. (If you
* change these rules, update create_append_path to match.)
*/
- if (list_length(aplan->appendplans) == 1)
+ if (list_length(aplan->ab.subplans) == 1)
{
- Plan *p = (Plan *) linitial(aplan->appendplans);
+ Plan *p = (Plan *) linitial(aplan->ab.subplans);
- if (p->parallel_aware == aplan->plan.parallel_aware)
+ if (p->parallel_aware == aplan->ab.plan.parallel_aware)
{
Plan *result;
@@ -1909,7 +1909,7 @@ set_append_references(PlannerInfo *root,
/* Remember that we removed an Append */
record_elided_node(root->glob, p->plan_node_id, T_Append,
- offset_relid_set(aplan->apprelids, rtoffset));
+ offset_relid_set(aplan->ab.apprelids, rtoffset));
return result;
}
@@ -1922,19 +1922,19 @@ set_append_references(PlannerInfo *root,
*/
set_dummy_tlist_references((Plan *) aplan, rtoffset);
- aplan->apprelids = offset_relid_set(aplan->apprelids, rtoffset);
+ aplan->ab.apprelids = offset_relid_set(aplan->ab.apprelids, rtoffset);
/*
* Add PartitionPruneInfo, if any, to PlannerGlobal and update the index.
* Also update the RT indexes present in it to add the offset.
*/
- if (aplan->part_prune_index >= 0)
- aplan->part_prune_index =
- register_partpruneinfo(root, aplan->part_prune_index, rtoffset);
+ if (aplan->ab.part_prune_index >= 0)
+ aplan->ab.part_prune_index =
+ register_partpruneinfo(root, aplan->ab.part_prune_index, rtoffset);
/* We don't need to recurse to lefttree or righttree ... */
- Assert(aplan->plan.lefttree == NULL);
- Assert(aplan->plan.righttree == NULL);
+ Assert(aplan->ab.plan.lefttree == NULL);
+ Assert(aplan->ab.plan.righttree == NULL);
return (Plan *) aplan;
}
@@ -1958,10 +1958,10 @@ set_mergeappend_references(PlannerInfo *root,
* or check quals. If it's got exactly one child plan, then it's not
* doing anything useful at all, and we can strip it out.
*/
- Assert(mplan->plan.qual == NIL);
+ Assert(mplan->ab.plan.qual == NIL);
/* First, we gotta recurse on the children */
- foreach(l, mplan->mergeplans)
+ foreach(l, mplan->ab.subplans)
{
lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
}
@@ -1975,11 +1975,11 @@ set_mergeappend_references(PlannerInfo *root,
* multiple times. (If you change these rules, update
* create_merge_append_path to match.)
*/
- if (list_length(mplan->mergeplans) == 1)
+ if (list_length(mplan->ab.subplans) == 1)
{
- Plan *p = (Plan *) linitial(mplan->mergeplans);
+ Plan *p = (Plan *) linitial(mplan->ab.subplans);
- if (p->parallel_aware == mplan->plan.parallel_aware)
+ if (p->parallel_aware == mplan->ab.plan.parallel_aware)
{
Plan *result;
@@ -1987,7 +1987,7 @@ set_mergeappend_references(PlannerInfo *root,
/* Remember that we removed a MergeAppend */
record_elided_node(root->glob, p->plan_node_id, T_MergeAppend,
- offset_relid_set(mplan->apprelids, rtoffset));
+ offset_relid_set(mplan->ab.apprelids, rtoffset));
return result;
}
@@ -2000,19 +2000,19 @@ set_mergeappend_references(PlannerInfo *root,
*/
set_dummy_tlist_references((Plan *) mplan, rtoffset);
- mplan->apprelids = offset_relid_set(mplan->apprelids, rtoffset);
+ mplan->ab.apprelids = offset_relid_set(mplan->ab.apprelids, rtoffset);
/*
* Add PartitionPruneInfo, if any, to PlannerGlobal and update the index.
* Also update the RT indexes present in it to add the offset.
*/
- if (mplan->part_prune_index >= 0)
- mplan->part_prune_index =
- register_partpruneinfo(root, mplan->part_prune_index, rtoffset);
+ if (mplan->ab.part_prune_index >= 0)
+ mplan->ab.part_prune_index =
+ register_partpruneinfo(root, mplan->ab.part_prune_index, rtoffset);
/* We don't need to recurse to lefttree or righttree ... */
- Assert(mplan->plan.lefttree == NULL);
- Assert(mplan->plan.righttree == NULL);
+ Assert(mplan->ab.plan.lefttree == NULL);
+ Assert(mplan->ab.plan.righttree == NULL);
return (Plan *) mplan;
}
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index ccec1eaa7fe..e21eb0f8725 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -2904,7 +2904,7 @@ finalize_plan(PlannerInfo *root, Plan *plan,
case T_Append:
{
- foreach(l, ((Append *) plan)->appendplans)
+ foreach(l, ((Append *) plan)->ab.subplans)
{
context.paramids =
bms_add_members(context.paramids,
@@ -2919,7 +2919,7 @@ finalize_plan(PlannerInfo *root, Plan *plan,
case T_MergeAppend:
{
- foreach(l, ((MergeAppend *) plan)->mergeplans)
+ foreach(l, ((MergeAppend *) plan)->ab.subplans)
{
context.paramids =
bms_add_members(context.paramids,
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index e5f2b6082ce..75e0fe4d472 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -5520,9 +5520,9 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan)
* natural choice.
*/
if (IsA(plan, Append))
- dpns->outer_plan = linitial(((Append *) plan)->appendplans);
+ dpns->outer_plan = linitial(((Append *) plan)->ab.subplans);
else if (IsA(plan, MergeAppend))
- dpns->outer_plan = linitial(((MergeAppend *) plan)->mergeplans);
+ dpns->outer_plan = linitial(((MergeAppend *) plan)->ab.subplans);
else
dpns->outer_plan = outerPlan(plan);
@@ -8498,10 +8498,10 @@ resolve_special_varno(Node *node, deparse_context *context,
if (IsA(dpns->plan, Append))
context->appendparents = bms_union(context->appendparents,
- ((Append *) dpns->plan)->apprelids);
+ ((Append *) dpns->plan)->ab.apprelids);
else if (IsA(dpns->plan, MergeAppend))
context->appendparents = bms_union(context->appendparents,
- ((MergeAppend *) dpns->plan)->apprelids);
+ ((MergeAppend *) dpns->plan)->ab.apprelids);
push_child_plan(dpns, dpns->outer_plan, &save_dpns);
resolve_special_varno((Node *) tle->expr, context,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 684e398f824..72b80a4a975 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1484,6 +1484,39 @@ typedef struct ModifyTableState
List *mt_mergeJoinConditions;
} ModifyTableState;
+/* ----------------
+ * AppendBaseState information
+ *
+ * Common base for AppendState and MergeAppendState.
+ * Contains fields shared by both node types: the array of subplan
+ * states, asynchronous execution infrastructure, and partition
+ * pruning state.
+ * ----------------
+ */
+typedef struct AppendBaseState
+{
+ pg_node_attr(abstract)
+
+ PlanState ps; /* its first field is NodeTag */
+ PlanState **plans; /* array of PlanStates for my inputs */
+ int nplans;
+
+ /* Asynchronous execution state */
+ Bitmapset *asyncplans; /* asynchronous plans indexes */
+ int nasyncplans; /* # of asynchronous plans */
+ AsyncRequest **asyncrequests; /* array of AsyncRequests */
+ TupleTableSlot **asyncresults; /* unreturned results of async plans */
+ Bitmapset *needrequest; /* asynchronous plans needing a new request */
+ struct WaitEventSet *eventset; /* WaitEventSet used to configure file
+ * descriptor wait events */
+
+ /* Partition pruning state */
+ struct PartitionPruneState *prune_state;
+ bool valid_subplans_identified; /* is valid_subplans valid? */
+ Bitmapset *valid_subplans;
+ Bitmapset *valid_asyncplans; /* valid asynchronous plans indexes */
+} AppendBaseState;
+
/* ----------------
* AppendState information
*
@@ -1505,30 +1538,21 @@ struct PartitionPruneState;
struct AppendState
{
- PlanState ps; /* its first field is NodeTag */
- PlanState **appendplans; /* array of PlanStates for my inputs */
- int as_nplans;
+ AppendBaseState as; /* its first field is NodeTag */
+
int as_whichplan;
bool as_begun; /* false means need to initialize */
- Bitmapset *as_asyncplans; /* asynchronous plans indexes */
- int as_nasyncplans; /* # of asynchronous plans */
- AsyncRequest **as_asyncrequests; /* array of AsyncRequests */
- TupleTableSlot **as_asyncresults; /* unreturned results of async plans */
- int as_nasyncresults; /* # of valid entries in as_asyncresults */
+ int as_nasyncresults; /* # of valid entries in asyncresults */
bool as_syncdone; /* true if all synchronous plans done in
* asynchronous mode, else false */
int as_nasyncremain; /* # of remaining asynchronous plans */
- Bitmapset *as_needrequest; /* asynchronous plans needing a new request */
- struct WaitEventSet *as_eventset; /* WaitEventSet used to configure file
- * descriptor wait events */
- int as_first_partial_plan; /* Index of 'appendplans' containing
+ int as_first_partial_plan; /* Index of 'as.plans' containing
* the first partial plan */
+
+ /* Parallel append specific */
ParallelAppendState *as_pstate; /* parallel coordination info */
Size pstate_len; /* size of parallel coordination info */
- struct PartitionPruneState *as_prune_state;
- bool as_valid_subplans_identified; /* is as_valid_subplans valid? */
- Bitmapset *as_valid_subplans;
- Bitmapset *as_valid_asyncplans; /* valid asynchronous plans indexes */
+
bool (*choose_next_subplan) (AppendState *);
};
@@ -1549,16 +1573,13 @@ struct AppendState
*/
typedef struct MergeAppendState
{
- PlanState ps; /* its first field is NodeTag */
- PlanState **mergeplans; /* array of PlanStates for my inputs */
- int ms_nplans;
+ AppendBaseState as; /* its first field is NodeTag */
+
int ms_nkeys;
SortSupport ms_sortkeys; /* array of length ms_nkeys */
TupleTableSlot **ms_slots; /* array of length ms_nplans */
struct binaryheap *ms_heap; /* binary heap of slot indices */
bool ms_initialized; /* are subplans started? */
- struct PartitionPruneState *ms_prune_state;
- Bitmapset *ms_valid_subplans;
} MergeAppendState;
/* ----------------
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index e2c00576d41..e61f76208b4 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -389,38 +389,48 @@ typedef struct ModifyTable
struct PartitionPruneInfo; /* forward reference to struct below */
/* ----------------
- * Append node -
- * Generate the concatenation of the results of sub-plans.
+ * AppendBase node -
+ * Common base for Append and MergeAppend plan nodes.
+ * Contains fields shared by both node types: the list of subplans,
+ * appendrel identifiers, and run-time partition pruning info.
* ----------------
*/
-typedef struct Append
+typedef struct AppendBase
{
- Plan plan;
+ pg_node_attr(abstract)
- /* RTIs of appendrel(s) formed by this node */
- Bitmapset *apprelids;
+ Plan plan; /* its first field is NodeTag */
+ Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */
+ List *child_append_relid_sets; /* sets of RTIs of appendrels
+ * consolidated into this node */
+ List *subplans; /* List of Plans (formerly
+ * appendplans/mergeplans) */
- /* sets of RTIs of appendrels consolidated into this node */
- List *child_append_relid_sets;
+ /*
+ * Index into PlannedStmt.partPruneInfos and parallel lists in EState:
+ * es_part_prune_states and es_part_prune_results. Set to -1 if no
+ * run-time pruning is used.
+ */
+ int part_prune_index;
+} AppendBase;
- /* plans to run */
- List *appendplans;
+/* ----------------
+ * Append node -
+ * Generate the concatenation of the results of sub-plans.
+ * ----------------
+ */
+typedef struct Append
+{
+ AppendBase ab; /* its first field is NodeTag */
/* # of asynchronous plans */
int nasyncplans;
/*
- * All 'appendplans' preceding this index are non-partial plans. All
- * 'appendplans' from this index onwards are partial plans.
+ * All 'subplans' preceding this index are non-partial plans. All
+ * 'subplans' from this index onwards are partial plans.
*/
int first_partial_plan;
-
- /*
- * Index into PlannedStmt.partPruneInfos and parallel lists in EState:
- * es_part_prune_states and es_part_prune_results. Set to -1 if no
- * run-time pruning is used.
- */
- int part_prune_index;
} Append;
/* ----------------
@@ -430,16 +440,7 @@ typedef struct Append
*/
typedef struct MergeAppend
{
- Plan plan;
-
- /* RTIs of appendrel(s) formed by this node */
- Bitmapset *apprelids;
-
- /* sets of RTIs of appendrels consolidated into this node */
- List *child_append_relid_sets;
-
- /* plans to run */
- List *mergeplans;
+ AppendBase ab; /* its first field is NodeTag */
/* these fields are just like the sort-key info in struct Sort: */
@@ -457,13 +458,6 @@ typedef struct MergeAppend
/* NULLS FIRST/LAST directions */
bool *nullsFirst pg_node_attr(array_size(numCols));
-
- /*
- * Index into PlannedStmt.partPruneInfos and parallel lists in EState:
- * es_part_prune_states and es_part_prune_results. Set to -1 if no
- * run-time pruning is used.
- */
- int part_prune_index;
} MergeAppend;
/* ----------------
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 8e9c06547d6..159083d4e66 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -126,6 +126,8 @@ AnlExprData
AnlIndexData
AnyArrayType
Append
+AppendBase
+AppendBaseState
AppendPath
AppendPathInput
AppendRelInfo
--
2.39.5 (Apple Git-154)
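The refactoring above depends on a standard C idiom: AppendBase and AppendBaseState are the first members of the derived node structs ("its first field is NodeTag"). A pointer to an AppendState or MergeAppendState can therefore be converted to a pointer to the shared base, which is what lets code such as the two planstate_walk_members() calls in nodeFuncs.c take the same `as.plans` / `as.nplans` shape. The following is a minimal sketch of that idiom. The types are simplified, hypothetical stand-ins rather than the real definitions from execnodes.h, and `count_subplans` is a hypothetical helper used only for illustration.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Simplified, hypothetical stand-ins for PostgreSQL executor node types;
 * not the real definitions from execnodes.h.
 */
typedef enum NodeTag
{
	T_SeqScanState,
	T_AppendState,
	T_MergeAppendState
} NodeTag;

typedef struct PlanState
{
	NodeTag		type;			/* first field is the tag */
} PlanState;

/* Common base; must be the first member of every derived state struct */
typedef struct AppendBaseState
{
	PlanState	ps;				/* its first field is NodeTag */
	PlanState **plans;			/* array of PlanStates for my inputs */
	int			nplans;
} AppendBaseState;

typedef struct AppendState
{
	AppendBaseState as;			/* base first, so casts to it are valid */
	int			as_whichplan;
} AppendState;

typedef struct MergeAppendState
{
	AppendBaseState as;			/* base first, so casts to it are valid */
	int			ms_nkeys;
} MergeAppendState;

/* The layout guarantee the idiom relies on (C11) */
_Static_assert(offsetof(AppendState, as) == 0, "base must be first");
_Static_assert(offsetof(MergeAppendState, as) == 0, "base must be first");

/*
 * Hypothetical shared helper: a single code path serves both node types
 * by reading the common fields through the base struct.
 */
static int
count_subplans(PlanState *node)
{
	assert(node->type == T_AppendState || node->type == T_MergeAppendState);
	return ((AppendBaseState *) node)->nplans;
}
```

C guarantees that a pointer to a structure, suitably converted, points to its initial member (and vice versa). That is why the cast chain AppendState * to PlanState * to AppendBaseState * is well defined here, and why the static assertions are a cheap guard if a field is ever inserted ahead of the base.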