From 696e8bc0b5606385444d6f89397aa3e3b6801d43 Mon Sep 17 00:00:00 2001 From: Haibo Yan Date: Thu, 23 Apr 2026 13:42:46 -0700 Subject: [PATCH v1 4/5] copy: adopt buffered-insert API for COPY FROM Adopt the buffered-insert lifecycle API in COPY FROM for non-FDW heap targets, moving buffering ownership from COPY's local multi-insert infrastructure into the table AM. COPY's existing eligibility policy remains unchanged. The non-FDW buffered path now uses table_buffered_insert_begin()/put()/flush()/end() with a COPY-local flush callback for per-tuple post-insert work, including index updates and AFTER ROW INSERT triggers. FDW batching is unchanged. Add focused COPY regression coverage for basic correctness, bulk input, indexed targets, AFTER ROW trigger behavior, and partition routing. --- src/backend/commands/copyfrom.c | 185 +++++++++++++++++++++++++++-- src/test/regress/expected/copy.out | 166 ++++++++++++++++++++++++++ src/test/regress/sql/copy.sql | 102 ++++++++++++++++ 3 files changed, 440 insertions(+), 13 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 64ac3063c61..f733bd2d296 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -76,6 +76,8 @@ */ #define MAX_PARTITION_BUFFERS 32 +typedef struct CopyBufferedFlushState CopyBufferedFlushState; + /* Stores multi-insert data related to a single relation in CopyFrom.
*/ typedef struct CopyMultiInsertBuffer { @@ -83,6 +85,9 @@ typedef struct CopyMultiInsertBuffer ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ BulkInsertState bistate; /* BulkInsertState for this rel if plain * table; NULL if foreign table */ + TableBufferedInsertState buffered_state; /* AM-owned buffered-insert + * state, or NULL */ + CopyBufferedFlushState *flush_ctx; /* flush callback context, or NULL */ int nused; /* number of 'slots' containing tuples */ uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy * stream */ @@ -102,8 +107,24 @@ typedef struct CopyMultiInsertInfo EState *estate; /* Executor state used for COPY */ CommandId mycid; /* Command Id used for COPY */ uint32 ti_options; /* table insert options */ + int64 *processed; /* pointer to CopyFrom's row counter */ } CopyMultiInsertInfo; +/* + * Context for the buffered-insert flush callback used by COPY. Carries the + * state needed to perform per-tuple post-insert work (index updates, AFTER + * ROW INSERT triggers, error context, progress tracking). + */ +typedef struct CopyBufferedFlushState +{ + CopyFromState cstate; + EState *estate; + ResultRelInfo *resultRelInfo; + int64 *processed; /* pointer to CopyFrom's processed counter */ +} CopyBufferedFlushState; + +static void CopyBufferedFlushCallback(void *context, TupleTableSlot *slot); + /* non-export function prototypes */ static void ClosePipeFromProgram(CopyFromState cstate); @@ -362,15 +383,57 @@ CopyLimitPrintoutLength(const char *str) * ResultRelInfo. */ static CopyMultiInsertBuffer * -CopyMultiInsertBufferInit(ResultRelInfo *rri) +CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri) { CopyMultiInsertBuffer *buffer; buffer = palloc_object(CopyMultiInsertBuffer); memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); buffer->resultRelInfo = rri; - buffer->bistate = (rri->ri_FdwRoutine == NULL) ? 
GetBulkInsertState() : NULL; buffer->nused = 0; + buffer->buffered_state = NULL; + buffer->flush_ctx = NULL; + + if (rri->ri_FdwRoutine == NULL) + { + /* + * Non-FDW table: try the AM-owned buffered-insert path. The flush + * callback handles index updates and AFTER ROW INSERT triggers. + */ + CopyBufferedFlushState *flush_ctx; + + flush_ctx = palloc_object(CopyBufferedFlushState); + flush_ctx->cstate = miinfo->cstate; + flush_ctx->estate = miinfo->estate; + flush_ctx->resultRelInfo = rri; + flush_ctx->processed = miinfo->processed; + + buffer->buffered_state = + table_buffered_insert_begin(rri->ri_RelationDesc, + miinfo->mycid, + miinfo->ti_options | + TABLE_INSERT_BAS_BULKWRITE, + CopyBufferedFlushCallback, + flush_ctx); + + if (buffer->buffered_state != NULL) + { + /* AM-owned buffering active; no COPY-side bistate needed */ + buffer->bistate = NULL; + buffer->flush_ctx = flush_ctx; + } + else + { + /* AM does not support buffered inserts; fall back */ + pfree(flush_ctx); + buffer->bistate = GetBulkInsertState(); + } + } + else + { + /* FDW table: no buffered-insert, no bistate */ + buffer->bistate = NULL; + } return buffer; } @@ -384,7 +447,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer; - buffer = CopyMultiInsertBufferInit(rri); + buffer = CopyMultiInsertBufferInit(miinfo, rri); /* Setup back-link so we can easily find this buffer again */ rri->ri_CopyMultiInsertBuffer = buffer; @@ -401,7 +464,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, static void CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyFromState cstate, EState *estate, CommandId mycid, - uint32 ti_options) + uint32 ti_options, int64 *processed) { miinfo->multiInsertBuffers = NIL; miinfo->bufferedTuples = 0; @@ -410,6 +473,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, miinfo->estate = estate; miinfo->mycid = mycid; miinfo->ti_options = ti_options; + miinfo->processed = 
processed; /* * Only setup the buffer when not dealing with a partitioned table. @@ -532,6 +596,32 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, /* reset relname_only */ cstate->relname_only = false; } + else if (buffer->buffered_state != NULL) + { + bool line_buf_valid = cstate->line_buf_valid; + uint64 save_cur_lineno = cstate->cur_lineno; + + /* + * AM-owned buffered path: tuples were submitted via put() and are + * inside the AM already. Flush triggers the batch write and invokes + * the flush callback once per written tuple for index updates, + * trigger firing, and progress tracking. + */ + cstate->line_buf_valid = false; + + table_buffered_insert_flush(buffer->buffered_state); + + /* Clear slots that were used for staging before put() */ + for (i = 0; i < nused; i++) + { + if (slots[i] != NULL) + ExecClearTuple(slots[i]); + } + + /* reset cur_lineno and line_buf_valid to what they were */ + cstate->line_buf_valid = line_buf_valid; + cstate->cur_lineno = save_cur_lineno; + } else { CommandId mycid = miinfo->mycid; @@ -631,10 +721,20 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, /* Remove back-link to ourself */ resultRelInfo->ri_CopyMultiInsertBuffer = NULL; - if (resultRelInfo->ri_FdwRoutine == NULL) + if (buffer->buffered_state != NULL) + { + /* end() flushes remaining tuples and subsumes finish_bulk_insert */ + table_buffered_insert_end(buffer->buffered_state); + buffer->buffered_state = NULL; + if (buffer->flush_ctx != NULL) + pfree(buffer->flush_ctx); + } + else if (resultRelInfo->ri_FdwRoutine == NULL) { Assert(buffer->bistate != NULL); FreeBulkInsertState(buffer->bistate); + table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, + miinfo->ti_options); } else Assert(buffer->bistate == NULL); @@ -643,10 +743,6 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) ExecDropSingleTupleTableSlot(buffer->slots[i]); - if (resultRelInfo->ri_FdwRoutine == 
NULL) - table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, - miinfo->ti_options); - pfree(buffer); } @@ -761,12 +857,26 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; Assert(buffer != NULL); - Assert(slot == buffer->slots[buffer->nused]); - /* Store the line number so we can properly report any errors later */ + /* Store the line number for error context during flush */ buffer->linenos[buffer->nused] = lineno; - /* Record this slot as being used */ + if (buffer->buffered_state != NULL) + { + /* + * AM-owned buffered path: submit the tuple directly to the AM. + * The AM captures the data internally; the caller retains slot + * ownership and may reuse it. The AM may auto-flush during put(), + * which fires the flush callback for already-buffered tuples. + */ + table_buffered_insert_put(buffer->buffered_state, slot); + } + else + { + /* Legacy path: slot was already placed into buffer->slots by caller */ + Assert(slot == buffer->slots[buffer->nused]); + } + buffer->nused++; /* Update how many tuples are stored and their size */ @@ -774,6 +884,55 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, miinfo->bufferedBytes += tuplen; } +/* + * Flush callback for the AM-owned buffered-insert path. + * + * Invoked once per flushed tuple, in insertion order. The slot is an + * AM-owned scratch object with TID set; it is valid only for the duration + * of this callback. + */ +static void +CopyBufferedFlushCallback(void *context, TupleTableSlot *slot) +{ + CopyBufferedFlushState *ctx = (CopyBufferedFlushState *) context; + ResultRelInfo *resultRelInfo = ctx->resultRelInfo; + EState *estate = ctx->estate; + + /* + * cstate->cur_lineno is not restored per-tuple here. During auto-flush + * (triggered inside put()), it reflects the line being stored, which is + * the best available context. 
During explicit flush, the caller has + * already set line_buf_valid = false, so only the relation name appears + * in error context — matching the legacy multi-insert path. + */ + + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + recheckIndexes = ExecInsertIndexTuples(resultRelInfo, + estate, 0, slot, + NIL, NULL); + ExecARInsertTriggers(estate, resultRelInfo, + slot, recheckIndexes, + ctx->cstate->transition_capture); + list_free(recheckIndexes); + } + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + ExecARInsertTriggers(estate, resultRelInfo, + slot, NIL, + ctx->cstate->transition_capture); + } + + /* Update row counter and progress */ + (*ctx->processed)++; + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, + *ctx->processed); +} + /* * Copy FROM file to relation. */ @@ -1073,7 +1232,7 @@ CopyFrom(CopyFromState cstate) insertMethod = CIM_MULTI; CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate, - estate, mycid, ti_options); + estate, mycid, ti_options, &processed); } /* diff --git a/src/test/regress/expected/copy.out b/src/test/regress/expected/copy.out index 1714faab39c..ec8e781c5ce 100644 --- a/src/test/regress/expected/copy.out +++ b/src/test/regress/expected/copy.out @@ -594,3 +594,169 @@ id val 5 15 6 16 DROP TABLE PP; +-- +-- Tests for COPY FROM with buffered-insert path. +-- +-- These validate correctness of COPY FROM under the AM-owned buffered path. +-- Path selection is not directly observable in SQL output. 
+-- +-- Basic COPY: small row count +CREATE TABLE copy_buffered_basic (a int, b text); +COPY copy_buffered_basic FROM stdin; +SELECT count(*) FROM copy_buffered_basic; + count +------- + 3 +(1 row) + +SELECT * FROM copy_buffered_basic ORDER BY a; + a | b +---+------- + 1 | hello + 2 | world + 3 | foo +(3 rows) + +DROP TABLE copy_buffered_basic; +-- Bulk COPY: exercise buffered path with enough rows for auto-flush. +-- Generate data in a source table, COPY TO a file, then COPY FROM that file. +CREATE TABLE copy_bulk_src (a int, b text); +INSERT INTO copy_bulk_src SELECT g, 'row-' || g FROM generate_series(1, 2500) g; +CREATE TABLE copy_bulk_dst (a int, b text); +COPY copy_bulk_src TO '/tmp/copy_buffered_bulk_test.csv' CSV; +COPY copy_bulk_dst FROM '/tmp/copy_buffered_bulk_test.csv' CSV; +SELECT count(*) FROM copy_bulk_dst; + count +------- + 2500 +(1 row) + +SELECT min(a), max(a) FROM copy_bulk_dst; + min | max +-----+------ + 1 | 2500 +(1 row) + +-- Verify data integrity +SELECT count(*) FROM copy_bulk_dst d JOIN copy_bulk_src s ON d.a = s.a AND d.b = s.b; + count +------- + 2500 +(1 row) + +DROP TABLE copy_bulk_src; +DROP TABLE copy_bulk_dst; +-- COPY into indexed table: verify index maintenance via flush callback +CREATE TABLE copy_indexed (a int PRIMARY KEY, b text); +COPY copy_indexed FROM stdin; +SELECT count(*) FROM copy_indexed; + count +------- + 5 +(1 row) + +SELECT * FROM copy_indexed ORDER BY a; + a | b +---+--------- + 1 | alpha + 2 | beta + 3 | gamma + 4 | delta + 5 | epsilon +(5 rows) + +-- Verify index is usable +SET enable_seqscan = off; +SELECT a FROM copy_indexed WHERE a = 3; + a +--- + 3 +(1 row) + +RESET enable_seqscan; +DROP TABLE copy_indexed; +-- COPY with AFTER ROW INSERT trigger: verify trigger fires via flush callback +CREATE TABLE copy_trigger_tgt (a int, b text); +CREATE TABLE copy_trigger_log (a int, b text); +CREATE FUNCTION copy_trigger_fn() RETURNS trigger AS $$ +BEGIN + INSERT INTO copy_trigger_log(a, b) VALUES (NEW.a, NEW.b); + 
RETURN NEW; +END; +$$ LANGUAGE plpgsql; +CREATE TRIGGER copy_after_ins + AFTER INSERT ON copy_trigger_tgt + FOR EACH ROW EXECUTE FUNCTION copy_trigger_fn(); +COPY copy_trigger_tgt FROM stdin; +SELECT count(*) FROM copy_trigger_tgt; + count +------- + 3 +(1 row) + +SELECT a, b FROM copy_trigger_tgt ORDER BY a; + a | b +----+-------- + 10 | first + 20 | second + 30 | third +(3 rows) + +-- Verify trigger fired for each row +SELECT count(*) FROM copy_trigger_log; + count +------- + 3 +(1 row) + +SELECT a, b FROM copy_trigger_log ORDER BY a; + a | b +----+-------- + 10 | first + 20 | second + 30 | third +(3 rows) + +DROP TABLE copy_trigger_tgt CASCADE; +DROP TABLE copy_trigger_log; +DROP FUNCTION copy_trigger_fn; +-- Partitioned COPY: verify correctness under partition routing with buffered path +CREATE TABLE copy_part_routed ( + id int, + val text +) PARTITION BY RANGE (id); +CREATE TABLE copy_part_routed_p1 PARTITION OF copy_part_routed + FOR VALUES FROM (1) TO (100); +CREATE TABLE copy_part_routed_p2 PARTITION OF copy_part_routed + FOR VALUES FROM (100) TO (200); +COPY copy_part_routed FROM stdin; +-- Total row count +SELECT count(*) FROM copy_part_routed; + count +------- + 6 +(1 row) + +-- Per-partition row counts +SELECT tableoid::regclass, count(*) +FROM copy_part_routed +GROUP BY tableoid ORDER BY tableoid::regclass::name; + tableoid | count +---------------------+------- + copy_part_routed_p1 | 3 + copy_part_routed_p2 | 3 +(2 rows) + +-- Content correctness +SELECT * FROM copy_part_routed ORDER BY id; + id | val +-----+--------- + 1 | alpha + 50 | bravo + 99 | charlie + 100 | delta + 150 | echo + 199 | foxtrot +(6 rows) + +DROP TABLE copy_part_routed; diff --git a/src/test/regress/sql/copy.sql b/src/test/regress/sql/copy.sql index eaad290b257..e4a10e8cf95 100644 --- a/src/test/regress/sql/copy.sql +++ b/src/test/regress/sql/copy.sql @@ -535,3 +535,105 @@ CREATE TABLE pp_510 PARTITION OF pp_2 FOR VALUES FROM (5) TO (10); INSERT INTO pp SELECT g, 10 + g FROM 
generate_series(1,6) g; COPY pp TO stdout(header); DROP TABLE PP; + +-- +-- Tests for COPY FROM with buffered-insert path. +-- +-- These validate correctness of COPY FROM under the AM-owned buffered path. +-- Path selection is not directly observable in SQL output. +-- + +-- Basic COPY: small row count +CREATE TABLE copy_buffered_basic (a int, b text); +COPY copy_buffered_basic FROM stdin; +1 hello +2 world +3 foo +\. +SELECT count(*) FROM copy_buffered_basic; +SELECT * FROM copy_buffered_basic ORDER BY a; +DROP TABLE copy_buffered_basic; + +-- Bulk COPY: exercise buffered path with enough rows for auto-flush. +-- Generate data in a source table, COPY TO a file, then COPY FROM that file. +CREATE TABLE copy_bulk_src (a int, b text); +INSERT INTO copy_bulk_src SELECT g, 'row-' || g FROM generate_series(1, 2500) g; +CREATE TABLE copy_bulk_dst (a int, b text); +COPY copy_bulk_src TO '/tmp/copy_buffered_bulk_test.csv' CSV; +COPY copy_bulk_dst FROM '/tmp/copy_buffered_bulk_test.csv' CSV; +SELECT count(*) FROM copy_bulk_dst; +SELECT min(a), max(a) FROM copy_bulk_dst; +-- Verify data integrity +SELECT count(*) FROM copy_bulk_dst d JOIN copy_bulk_src s ON d.a = s.a AND d.b = s.b; +DROP TABLE copy_bulk_src; +DROP TABLE copy_bulk_dst; + +-- COPY into indexed table: verify index maintenance via flush callback +CREATE TABLE copy_indexed (a int PRIMARY KEY, b text); +COPY copy_indexed FROM stdin; +1 alpha +2 beta +3 gamma +4 delta +5 epsilon +\. 
+SELECT count(*) FROM copy_indexed; +SELECT * FROM copy_indexed ORDER BY a; +-- Verify index is usable +SET enable_seqscan = off; +SELECT a FROM copy_indexed WHERE a = 3; +RESET enable_seqscan; +DROP TABLE copy_indexed; + +-- COPY with AFTER ROW INSERT trigger: verify trigger fires via flush callback +CREATE TABLE copy_trigger_tgt (a int, b text); +CREATE TABLE copy_trigger_log (a int, b text); +CREATE FUNCTION copy_trigger_fn() RETURNS trigger AS $$ +BEGIN + INSERT INTO copy_trigger_log(a, b) VALUES (NEW.a, NEW.b); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; +CREATE TRIGGER copy_after_ins + AFTER INSERT ON copy_trigger_tgt + FOR EACH ROW EXECUTE FUNCTION copy_trigger_fn(); +COPY copy_trigger_tgt FROM stdin; +10 first +20 second +30 third +\. +SELECT count(*) FROM copy_trigger_tgt; +SELECT a, b FROM copy_trigger_tgt ORDER BY a; +-- Verify trigger fired for each row +SELECT count(*) FROM copy_trigger_log; +SELECT a, b FROM copy_trigger_log ORDER BY a; +DROP TABLE copy_trigger_tgt CASCADE; +DROP TABLE copy_trigger_log; +DROP FUNCTION copy_trigger_fn; + +-- Partitioned COPY: verify correctness under partition routing with buffered path +CREATE TABLE copy_part_routed ( + id int, + val text +) PARTITION BY RANGE (id); +CREATE TABLE copy_part_routed_p1 PARTITION OF copy_part_routed + FOR VALUES FROM (1) TO (100); +CREATE TABLE copy_part_routed_p2 PARTITION OF copy_part_routed + FOR VALUES FROM (100) TO (200); +COPY copy_part_routed FROM stdin; +1 alpha +50 bravo +99 charlie +100 delta +150 echo +199 foxtrot +\. +-- Total row count +SELECT count(*) FROM copy_part_routed; +-- Per-partition row counts +SELECT tableoid::regclass, count(*) +FROM copy_part_routed +GROUP BY tableoid ORDER BY tableoid::regclass::name; +-- Content correctness +SELECT * FROM copy_part_routed ORDER BY id; +DROP TABLE copy_part_routed; -- 2.52.0