From d6b6cb19c326d79693c2e6238cfbe96f5b0fa0e0 Mon Sep 17 00:00:00 2001 From: Haibo Yan Date: Thu, 23 Apr 2026 12:20:59 -0700 Subject: [PATCH v1 2/5] createas: use buffered-insert API for CTAS Adopt the buffered-insert lifecycle API in the CTAS dest receiver. CTAS uses table_buffered_insert_begin()/put()/end() with a NULL flush callback and falls back to the existing single-row path when needed. Retain the volatile-function check and EXECUTE-path fallback conservatively for the initial patch series. Add focused CTAS regression tests. --- src/backend/commands/createas.c | 112 ++++++++++++++++++---- src/test/regress/expected/select_into.out | 98 +++++++++++++++++++ src/test/regress/sql/select_into.sql | 44 +++++++++ 3 files changed, 234 insertions(+), 20 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 6dbb831ca89..4ba25c9e336 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -40,6 +40,7 @@ #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "nodes/queryjumble.h" +#include "optimizer/optimizer.h" #include "parser/analyze.h" #include "rewrite/rewriteHandler.h" #include "tcop/tcopprot.h" @@ -57,7 +58,9 @@ typedef struct ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */ CommandId output_cid; /* cmin to insert in output tuples */ uint32 ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + BulkInsertState bistate; /* bulk insert state (fallback path only) */ + TableBufferedInsertState buffered_state; /* buffered-insert state, or NULL */ + bool use_buffered_insert; /* true if buffered path is eligible */ } DR_intorel; /* utility functions for CTAS definition creation */ @@ -260,7 +263,15 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, Assert(!is_matview); /* excluded by syntax */ ExecuteQuery(pstate, estmt, into, params, dest, qc); - /* get object address that intorel_startup saved for 
us */ + /* + * get object address that intorel_startup saved for us. + * + * Note: use_buffered_insert stays false (its palloc0 default) for the + * EXECUTE path. We conservatively skip the buffered-insert + * optimization here because the prepared statement's plan is not + * available for volatile-function inspection at this point. Relaxing + * this is outside the scope of this patch. + */ address = ((DR_intorel *) dest)->reladdr; return address; @@ -323,6 +334,24 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, plan = pg_plan_query(query, pstate->p_sourcetext, CURSOR_OPT_PARALLEL_OK, params, NULL); + /* + * Conservative implementation choice: disable the buffered-insert + * path if the planned target list contains volatile functions. + * + * The buffered-insert API contract does not require this check -- the + * CTAS target table is created within this statement and cannot be + * referenced by the source query. This guard is retained for the + * initial patch series as a caller-local conservatism and can be + * relaxed after validation without any API change. + */ + { + DR_intorel *myState = (DR_intorel *) dest; + + myState->use_buffered_insert = + !contain_volatile_functions_after_planning( + (Expr *) plan->planTree->targetlist); + } + /* * Use a snapshot with an updated command ID to ensure this query sees * results of any previously executed queries. (This could only @@ -564,10 +593,45 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) * If WITH NO DATA is specified, there is no need to set up the state for * bulk inserts as there are no tuples to insert. */ - if (!into->skipData) - myState->bistate = GetBulkInsertState(); - else + if (into->skipData) + { myState->bistate = NULL; + myState->buffered_state = NULL; + } + else if (myState->use_buffered_insert) + { + /* + * Try the buffered-insert path. Pass NULL flush callback -- CTAS + * has no indexes, triggers, or per-tuple post-insert work. 
+ */ + myState->ti_options |= TABLE_INSERT_BAS_BULKWRITE; + myState->buffered_state = + table_buffered_insert_begin(intoRelationDesc, + myState->output_cid, + myState->ti_options, + NULL, NULL); + + if (myState->buffered_state != NULL) + { + /* Buffered path active; bistate is managed inside the AM */ + myState->bistate = NULL; + } + else + { + /* AM does not support buffered inserts; fall back */ + myState->bistate = GetBulkInsertState(); + } + } + else + { + /* + * Fallback to single-row path. This is reached when the + * volatile-function conservative guard fired, or for CTAS via + * EXECUTE (where use_buffered_insert stays false by default). + */ + myState->buffered_state = NULL; + myState->bistate = GetBulkInsertState(); + } /* * Valid smgr_targblock implies something already wrote to the relation. @@ -587,19 +651,18 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) /* Nothing to insert if WITH NO DATA is specified. */ if (!myState->into->skipData) { - /* - * Note that the input slot might not be of the type of the target - * relation. That's supported by table_tuple_insert(), but slightly - * less efficient than inserting with the right slot - but the - * alternative would be to copy into a slot of the right type, which - * would not be cheap either. This also doesn't allow accessing per-AM - * data (say a tuple's xmin), but since we don't do that here... 
- */ - table_tuple_insert(myState->rel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + if (myState->buffered_state != NULL) + { + table_buffered_insert_put(myState->buffered_state, slot); + } + else + { + table_tuple_insert(myState->rel, + slot, + myState->output_cid, + myState->ti_options, + myState->bistate); + } } /* We know this is a newly created relation, so there are no indexes */ @@ -618,8 +681,17 @@ intorel_shutdown(DestReceiver *self) if (!into->skipData) { - FreeBulkInsertState(myState->bistate); - table_finish_bulk_insert(myState->rel, myState->ti_options); + if (myState->buffered_state != NULL) + { + /* end() flushes remaining tuples and subsumes finish_bulk_insert */ + table_buffered_insert_end(myState->buffered_state); + myState->buffered_state = NULL; + } + else + { + FreeBulkInsertState(myState->bistate); + table_finish_bulk_insert(myState->rel, myState->ti_options); + } } /* close rel, but keep lock until commit */ diff --git a/src/test/regress/expected/select_into.out b/src/test/regress/expected/select_into.out index d04ca2b1bf7..36cc783fda5 100644 --- a/src/test/regress/expected/select_into.out +++ b/src/test/regress/expected/select_into.out @@ -220,3 +220,101 @@ NOTICE: relation "ctas_ine_tbl" already exists, skipping (0 rows) DROP TABLE ctas_ine_tbl; +-- +-- Tests for CTAS with buffered-insert path. +-- +-- These tests verify correctness of CTAS results under both the buffered +-- path (no volatile functions) and the fallback single-row path (volatile +-- functions present, or EXECUTE). Path selection is not directly observable +-- in SQL output; these tests validate that results are correct regardless. +-- +-- Buffered path: small row count, verify contents. 
+CREATE TABLE ctas_buffered_1 AS SELECT g AS a, g * 10 AS b FROM generate_series(1, 5) g; +SELECT count(*) FROM ctas_buffered_1; + count +------- + 5 +(1 row) + +SELECT * FROM ctas_buffered_1 ORDER BY a; + a | b +---+---- + 1 | 10 + 2 | 20 + 3 | 30 + 4 | 40 + 5 | 50 +(5 rows) + +DROP TABLE ctas_buffered_1; +-- Buffered path: enough rows to trigger auto-flush (>1000 slot threshold). +CREATE TABLE ctas_buffered_2 AS SELECT g AS a FROM generate_series(1, 2500) g; +SELECT count(*) FROM ctas_buffered_2; + count +------- + 2500 +(1 row) + +SELECT min(a), max(a) FROM ctas_buffered_2; + min | max +-----+------ + 1 | 2500 +(1 row) + +DROP TABLE ctas_buffered_2; +-- Buffered path: wide tuples to exercise byte-threshold flushing. +CREATE TABLE ctas_buffered_wide AS + SELECT g AS id, + repeat('x', 200) AS col1, + repeat('y', 200) AS col2, + repeat('z', 200) AS col3 + FROM generate_series(1, 100) g; +SELECT count(*) FROM ctas_buffered_wide; + count +------- + 100 +(1 row) + +SELECT id, length(col1), length(col2), length(col3) FROM ctas_buffered_wide WHERE id IN (1, 50, 100) ORDER BY id; + id | length | length | length +-----+--------+--------+-------- + 1 | 200 | 200 | 200 + 50 | 200 | 200 | 200 + 100 | 200 | 200 | 200 +(3 rows) + +DROP TABLE ctas_buffered_wide; +-- Fallback path: random() triggers the conservative volatile-function guard. +-- Result must still be correct through the single-row insertion path. +CREATE TABLE ctas_volatile AS SELECT g AS a, random() AS r FROM generate_series(1, 10) g; +SELECT count(*) FROM ctas_volatile; + count +------- + 10 +(1 row) + +SELECT a FROM ctas_volatile ORDER BY a; + a +---- + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 +(10 rows) + +DROP TABLE ctas_volatile; +-- WITH NO DATA: no insertion path exercised; verify unchanged behavior. 
+CREATE TABLE ctas_nodata AS SELECT 1 AS a WITH NO DATA; +SELECT count(*) FROM ctas_nodata; + count +------- + 0 +(1 row) + +DROP TABLE ctas_nodata; diff --git a/src/test/regress/sql/select_into.sql b/src/test/regress/sql/select_into.sql index f71e3940e0a..8147113cee3 100644 --- a/src/test/regress/sql/select_into.sql +++ b/src/test/regress/sql/select_into.sql @@ -136,3 +136,47 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) CREATE TABLE IF NOT EXISTS ctas_ine_tbl AS EXECUTE ctas_ine_query; -- ok DROP TABLE ctas_ine_tbl; + +-- +-- Tests for CTAS with buffered-insert path. +-- +-- These tests verify correctness of CTAS results under both the buffered +-- path (no volatile functions) and the fallback single-row path (volatile +-- functions present, or EXECUTE). Path selection is not directly observable +-- in SQL output; these tests validate that results are correct regardless. +-- + +-- Buffered path: small row count, verify contents. +CREATE TABLE ctas_buffered_1 AS SELECT g AS a, g * 10 AS b FROM generate_series(1, 5) g; +SELECT count(*) FROM ctas_buffered_1; +SELECT * FROM ctas_buffered_1 ORDER BY a; +DROP TABLE ctas_buffered_1; + +-- Buffered path: enough rows to trigger auto-flush (>1000 slot threshold). +CREATE TABLE ctas_buffered_2 AS SELECT g AS a FROM generate_series(1, 2500) g; +SELECT count(*) FROM ctas_buffered_2; +SELECT min(a), max(a) FROM ctas_buffered_2; +DROP TABLE ctas_buffered_2; + +-- Buffered path: wide tuples to exercise byte-threshold flushing. +CREATE TABLE ctas_buffered_wide AS + SELECT g AS id, + repeat('x', 200) AS col1, + repeat('y', 200) AS col2, + repeat('z', 200) AS col3 + FROM generate_series(1, 100) g; +SELECT count(*) FROM ctas_buffered_wide; +SELECT id, length(col1), length(col2), length(col3) FROM ctas_buffered_wide WHERE id IN (1, 50, 100) ORDER BY id; +DROP TABLE ctas_buffered_wide; + +-- Fallback path: random() triggers the conservative volatile-function guard. 
+-- Result must still be correct through the single-row insertion path. +CREATE TABLE ctas_volatile AS SELECT g AS a, random() AS r FROM generate_series(1, 10) g; +SELECT count(*) FROM ctas_volatile; +SELECT a FROM ctas_volatile ORDER BY a; +DROP TABLE ctas_volatile; + +-- WITH NO DATA: no insertion path exercised; verify unchanged behavior. +CREATE TABLE ctas_nodata AS SELECT 1 AS a WITH NO DATA; +SELECT count(*) FROM ctas_nodata; +DROP TABLE ctas_nodata; -- 2.52.0