public inbox for [email protected]
8+ messages / 4 participants
* Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
@ 2024-10-29 03:18 Jingtang Zhang <[email protected]>
0 siblings, 1 reply; 8+ messages in thread
From: Jingtang Zhang @ 2024-10-29 03:18 UTC (permalink / raw)
To: Bharath Rupireddy <[email protected]>; +Cc: [email protected]; pgsql-hackers
Hi~
Sorry for multiple comments in separate mail. Just found that the initialization
seems redundant since we have used palloc0?
> + istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState));
> + istate->bistate = NULL;
> + istate->mistate = NULL;
---
Regards, Jingtang
* Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
@ 2024-10-30 17:51 Bharath Rupireddy <[email protected]>
parent: Jingtang Zhang <[email protected]>
0 siblings, 1 reply; 8+ messages in thread
From: Bharath Rupireddy @ 2024-10-30 17:51 UTC (permalink / raw)
To: Jingtang Zhang <[email protected]>; +Cc: [email protected]; pgsql-hackers
Hi,
Thanks for looking into this.
On Mon, Oct 28, 2024 at 8:18 PM Jingtang Zhang <[email protected]> wrote:
>
> Just found that the initialization
> seems redundant since we have used palloc0?
>
> > + istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState));
> > + istate->bistate = NULL;
> > + istate->mistate = NULL;
Changed it to palloc() with explicit initialization of the members.
With this, only the TupleTableSlot array in HeapMultiInsertState uses
palloc0(); everything else is initialized explicitly.
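The trade-off discussed above can be illustrated outside PostgreSQL with the C standard library. This is a minimal sketch, not code from the patch: calloc() and malloc() stand in for palloc0() and palloc(), and InsertState is a simplified, hypothetical stand-in for HeapInsertState. It assumes, as holds on the platforms PostgreSQL supports, that an all-zero-bits pointer is a null pointer.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical, simplified stand-in for the patch's HeapInsertState. */
typedef struct InsertState
{
	void	   *bistate;
	void	   *mistate;
} InsertState;

/*
 * Zeroing allocation, analogous to palloc0(): the pointer members already
 * read as NULL, so assigning NULL afterwards would be redundant.
 */
static InsertState *
insert_state_alloc_zeroed(void)
{
	return calloc(1, sizeof(InsertState));
}

/*
 * Plain allocation, analogous to palloc(): the memory is uninitialized, so
 * every member must be set explicitly before it is read.
 */
static InsertState *
insert_state_alloc_explicit(void)
{
	InsertState *s = malloc(sizeof(InsertState));

	if (s != NULL)
	{
		s->bistate = NULL;
		s->mistate = NULL;
	}
	return s;
}
```

Either variant yields the same initial state; the v25 choice simply makes each member's starting value visible at the allocation site.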
> Oh, another comment for v24-0001 patch: we are in heap AM now, should we use
> something like HEAP_INSERT_BAS_BULKWRITE instead of using table AM option,
> just like other heap AM options do?
>
> > + if ((state->options & TABLE_INSERT_BAS_BULKWRITE) != 0)
> > + istate->bistate = GetBulkInsertState();
Defined HEAP_INSERT_BAS_BULKWRITE and used it in heapam.c, similar to
HEAP_INSERT_SKIP_FSM, HEAP_INSERT_FROZEN and HEAP_INSERT_NO_LOGICAL.
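As a rough illustration of how the alias behaves, the sketch below copies the flag values from the v25 patch (tableam.h and heapam.h) into a standalone file. The helper wants_bulkwrite_strategy() is hypothetical and exists only for this example; in the patch the test is written inline in heap_modify_buffer_insert().

```c
#include <assert.h>
#include <stdbool.h>

/* Flag values as they appear in the v25 patch. */
#define TABLE_INSERT_SKIP_FSM		0x0002
#define TABLE_INSERT_FROZEN			0x0004
#define TABLE_INSERT_NO_LOGICAL		0x0008
#define TABLE_INSERT_BAS_BULKWRITE	0x0020

/* 0x0010 is reserved for the heap-only speculative insert flag. */
#define HEAP_INSERT_SPECULATIVE		0x0010
#define HEAP_INSERT_BAS_BULKWRITE	TABLE_INSERT_BAS_BULKWRITE

/* Hypothetical helper: does the caller ask for the bulk-write strategy? */
static bool
wants_bulkwrite_strategy(int options)
{
	return (options & HEAP_INSERT_BAS_BULKWRITE) != 0;
}
```

Because the heap-level name is only an alias, callers can keep passing TABLE_INSERT_* options while heapam.c consistently tests HEAP_INSERT_* names.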
> Little question about v24 0002 patch: would it be better to move the
> implementation of TableModifyIsMultiInsertsSupported to somewhere for table AM
> level? Seems it is a common function for future use, not a specific one for
> matview.
It's more tailored for CREATE TABLE AS and CREATE/REFRESH MATERIALIZED
VIEW, in the sense that triggers, foreign tables and partitioned
tables are not possible here. INSERT INTO SELECT and Logical Replication
Apply will have a lot more conditions (e.g. RETURNING clause, triggers
etc.) and they will need to be handled differently. So, I left
TableModifyIsMultiInsertsSupported as-is in matview.c, a place common
to these commands.
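The shape of that check can be sketched in standalone C. This is only a model under stated assumptions: TargetInfo and ctas_multi_insert_supported() are hypothetical names, and the three booleans stand in for rel->trigdesc and the relkind tests that the patch asserts on.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical summary of the target relation's relevant properties. */
typedef struct TargetInfo
{
	bool		has_triggers;
	bool		is_partitioned;
	bool		is_foreign;
} TargetInfo;

/*
 * Model of the CTAS/matview check: a volatile function in the query is the
 * only run-time reason to fall back to single inserts. The other conditions
 * cannot occur for these commands, so they are asserted rather than tested.
 */
static bool
ctas_multi_insert_supported(const TargetInfo *target, bool volatile_funcs)
{
	if (volatile_funcs)
		return false;

	assert(!target->has_triggers);
	assert(!target->is_partitioned);
	assert(!target->is_foreign);

	return true;
}
```

A caller such as INSERT INTO SELECT could not rely on those assertions and would have to turn each of them into a real test, which is why a single shared function would not fit every command.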
Please find the attached v25 patch set.
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
Attachments:
[application/octet-stream] v25-0001-Introduce-new-table-AM-for-multi-inserts.patch (15.8K, 2-v25-0001-Introduce-new-table-AM-for-multi-inserts.patch)
From c8277e6f5e9a72baebe993b2241d34a7d427473d Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 30 Oct 2024 17:13:20 +0000
Subject: [PATCH v25 1/3] Introduce new table AM for multi-inserts
Until now, only the COPY ... FROM command has used multi-inserts
(i.e. buffering some tuples and inserting them into the table at once).
The basic idea of multi-inserts is less WAL and reduced buffer
locking. Multi-inserts are faster than calling heap_insert() in a
loop, because when multiple tuples can be inserted on a single
page, we can write just a single WAL record covering all of them,
and only need to lock/unlock the page once.
Various other commands can benefit from this multi-inserts logic
[Reusable].
Also, there's a need to expose these multi-insert AM (Access
Method) routines as a scan-like API [Usability]. With this, various
table AMs can define their own buffering and flushing strategy
[Flexibility] based on the way they store the data in the
underlying storage (e.g. columnar).
Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
src/backend/access/heap/heapam.c | 211 ++++++++++++++++++++++-
src/backend/access/heap/heapam_handler.c | 6 +
src/backend/access/table/tableamapi.c | 5 +
src/include/access/heapam.h | 39 +++++
src/include/access/tableam.h | 81 +++++++++
src/tools/pgindent/typedefs.list | 3 +
6 files changed, 344 insertions(+), 1 deletion(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1748eafa10..69b21cf12c 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -50,6 +50,7 @@
#include "storage/procarray.h"
#include "utils/datum.h"
#include "utils/inval.h"
+#include "utils/memutils.h"
#include "utils/spccache.h"
@@ -102,7 +103,7 @@ static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
bool *copy);
-
+static void heap_modify_insert_end(TableModifyState *state);
/*
* Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -2603,6 +2604,214 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
pgstat_count_heap_insert(relation, ntuples);
}
+/*
+ * Initialize heap modify state.
+ */
+TableModifyState *
+heap_modify_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx)
+{
+ TableModifyState *state;
+ MemoryContext context;
+ MemoryContext oldcontext;
+
+ context = AllocSetContextCreate(TopTransactionContext,
+ "heap_modify memory context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(context);
+ state = palloc(sizeof(TableModifyState));
+ state->rel = rel;
+ state->cid = cid;
+ state->options = options;
+ state->mem_ctx = context;
+ state->buffer_flush_cb = buffer_flush_cb;
+ state->buffer_flush_ctx = buffer_flush_ctx;
+ state->data = NULL; /* To be set lazily */
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state,
+ TupleTableSlot *slot)
+{
+ TupleTableSlot *dstslot;
+ HeapInsertState *istate;
+ HeapMultiInsertState *mistate;
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(state->mem_ctx);
+
+ /* First time through, initialize heap insert state */
+ if (state->data == NULL)
+ {
+ istate = (HeapInsertState *) palloc(sizeof(HeapInsertState));
+ istate->bistate = NULL;
+ istate->mistate = NULL;
+ state->data = istate;
+ mistate =
+ (HeapMultiInsertState *) palloc(sizeof(HeapMultiInsertState));
+ mistate->slots =
+ (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+ mistate->cur_slots = 0;
+ istate->mistate = mistate;
+
+ /*
+ * heap_multi_insert() can leak memory. So switch to this memory
+ * context before every heap_multi_insert() call and reset when
+ * finished.
+ */
+ mistate->mem_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "heap_multi_insert memory context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ if ((state->options & HEAP_INSERT_BAS_BULKWRITE) != 0)
+ istate->bistate = GetBulkInsertState();
+ }
+
+ istate = (HeapInsertState *) state->data;
+ Assert(istate->mistate != NULL);
+ mistate = istate->mistate;
+ dstslot = mistate->slots[mistate->cur_slots];
+
+ if (dstslot == NULL)
+ {
+ /*
+ * We use virtual tuple slots as the buffered slots to leverage the
+ * optimization they provide in minimizing physical data copying. The
+ * virtual slot gets materialized when we copy (via ExecCopySlot
+ * below) the tuples from the source slot, which can be of any
+ * type. This way, it is ensured that the tuple storage doesn't depend
+ * on external memory, because all the datums that aren't passed by
+ * value are copied into the slot's memory context.
+ */
+ dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+ &TTSOpsVirtual);
+
+ mistate->slots[mistate->cur_slots] = dstslot;
+ }
+
+ Assert(TTS_IS_VIRTUAL(dstslot));
+
+ /*
+ * Note that the copy clears the previous destination slot contents, so no
+ * need to explicitly ExecClearTuple() here.
+ */
+ ExecCopySlot(dstslot, slot);
+
+ mistate->cur_slots++;
+
+ if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS)
+ heap_modify_buffer_flush(state);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+void
+heap_modify_buffer_flush(TableModifyState *state)
+{
+ HeapInsertState *istate;
+ HeapMultiInsertState *mistate;
+ MemoryContext oldcontext;
+
+ /* Quick exit if we haven't inserted anything yet */
+ if (state->data == NULL)
+ return;
+
+ istate = (HeapInsertState *) state->data;
+ Assert(istate->mistate != NULL);
+ mistate = istate->mistate;
+
+ /* Quick exit if we have flushed already */
+ if (mistate->cur_slots == 0)
+ return;
+
+ /*
+ * heap_multi_insert() can leak memory, so switch to short-lived memory
+ * context before calling it.
+ */
+ oldcontext = MemoryContextSwitchTo(mistate->mem_ctx);
+ heap_multi_insert(state->rel,
+ mistate->slots,
+ mistate->cur_slots,
+ state->cid,
+ state->options,
+ istate->bistate);
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(mistate->mem_ctx);
+
+ /*
+ * Invoke caller-supplied buffer flush callback after inserting rows from
+ * the buffers to heap.
+ */
+ if (state->buffer_flush_cb != NULL)
+ {
+ for (int i = 0; i < mistate->cur_slots; i++)
+ {
+ state->buffer_flush_cb(state->buffer_flush_ctx,
+ mistate->slots[i]);
+ }
+ }
+
+ mistate->cur_slots = 0;
+}
+
+/*
+ * Heap insert specific function used for performing work at the end like
+ * flushing remaining buffered tuples, cleaning up the insert state and tuple
+ * table slots used for buffered tuples etc.
+ */
+static void
+heap_modify_insert_end(TableModifyState *state)
+{
+ HeapInsertState *istate;
+
+ /* Quick exit if we haven't inserted anything yet */
+ if (state->data == NULL)
+ return;
+
+ istate = (HeapInsertState *) state->data;
+
+ if (istate->mistate != NULL)
+ {
+ HeapMultiInsertState *mistate = istate->mistate;
+
+ heap_modify_buffer_flush(state);
+
+ Assert(mistate->cur_slots == 0);
+
+ for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+ MemoryContextDelete(mistate->mem_ctx);
+ }
+
+ if (istate->bistate != NULL)
+ FreeBulkInsertState(istate->bistate);
+}
+
+/*
+ * Clean up heap modify state.
+ */
+void
+heap_modify_end(TableModifyState *state)
+{
+ heap_modify_insert_end(state);
+ MemoryContextDelete(state->mem_ctx);
+}
+
/*
* simple_heap_insert - insert a tuple
*
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index a8d95e0f1c..d2ef6b4b78 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2644,6 +2644,12 @@ static const TableAmRoutine heapam_methods = {
.tuple_insert_speculative = heapam_tuple_insert_speculative,
.tuple_complete_speculative = heapam_tuple_complete_speculative,
.multi_insert = heap_multi_insert,
+
+ .tuple_modify_begin = heap_modify_begin,
+ .tuple_modify_buffer_insert = heap_modify_buffer_insert,
+ .tuple_modify_buffer_flush = heap_modify_buffer_flush,
+ .tuple_modify_end = heap_modify_end,
+
.tuple_delete = heapam_tuple_delete,
.tuple_update = heapam_tuple_update,
.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index e9b598256f..772f29b1b5 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -97,6 +97,11 @@ GetTableAmRoutine(Oid amhandler)
Assert(routine->scan_sample_next_block != NULL);
Assert(routine->scan_sample_next_tuple != NULL);
+ Assert(routine->tuple_modify_begin != NULL);
+ Assert(routine->tuple_modify_buffer_insert != NULL);
+ Assert(routine->tuple_modify_buffer_flush != NULL);
+ Assert(routine->tuple_modify_end != NULL);
+
return routine;
}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 96cf82f97b..a9722ce947 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -37,6 +37,7 @@
#define HEAP_INSERT_FROZEN TABLE_INSERT_FROZEN
#define HEAP_INSERT_NO_LOGICAL TABLE_INSERT_NO_LOGICAL
#define HEAP_INSERT_SPECULATIVE 0x0010
+#define HEAP_INSERT_BAS_BULKWRITE TABLE_INSERT_BAS_BULKWRITE
/* "options" flag bits for heap_page_prune_and_freeze */
#define HEAP_PAGE_PRUNE_MARK_UNUSED_NOW (1 << 0)
@@ -272,6 +273,33 @@ typedef enum
PRUNE_VACUUM_CLEANUP, /* VACUUM 2nd heap pass */
} PruneReason;
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS 1000
+
+typedef struct HeapMultiInsertState
+{
+ /* Array of buffered slots */
+ TupleTableSlot **slots;
+
+ /* Number of buffered slots currently held */
+ int cur_slots;
+
+ /* Memory context for dealing with multi inserts */
+ MemoryContext mem_ctx;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+ struct BulkInsertStateData *bistate;
+ HeapMultiInsertState *mistate;
+} HeapInsertState;
+
+
/* ----------------
* function prototypes for heap access method
*
@@ -322,6 +350,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
int ntuples, CommandId cid, int options,
BulkInsertState bistate);
+
+extern TableModifyState *heap_modify_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx);
+extern void heap_modify_buffer_insert(TableModifyState *state,
+ TupleTableSlot *slot);
+extern void heap_modify_buffer_flush(TableModifyState *state);
+extern void heap_modify_end(TableModifyState *state);
+
extern TM_Result heap_delete(Relation relation, ItemPointer tid,
CommandId cid, Snapshot crosscheck, bool wait,
struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index adb478a93c..57b71eef38 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -254,11 +254,42 @@ typedef struct TM_IndexDeleteOp
TM_IndexStatus *status;
} TM_IndexDeleteOp;
+struct TableModifyState;
+
+/* Callback invoked upon flushing each buffered tuple */
+typedef void (*TableModifyBufferFlushCb) (void *context,
+ TupleTableSlot *slot);
+
+/* Holds table modify state */
+typedef struct TableModifyState
+{
+ /* These fields are used for inserts for now */
+
+ Relation rel; /* Relation to insert to */
+ CommandId cid; /* Command ID for insert */
+ int options; /* TABLE_INSERT options */
+
+ /* Memory context for dealing with modify state variables */
+ MemoryContext mem_ctx;
+
+ /* Flush callback and its context used for multi inserts */
+ TableModifyBufferFlushCb buffer_flush_cb;
+ void *buffer_flush_ctx;
+
+ /* Table AM specific data */
+ void *data;
+} TableModifyState;
+
/* "options" flag bits for table_tuple_insert */
/* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
#define TABLE_INSERT_SKIP_FSM 0x0002
#define TABLE_INSERT_FROZEN 0x0004
#define TABLE_INSERT_NO_LOGICAL 0x0008
+/*
+ * Use BAS_BULKWRITE buffer access strategy. 0x0010 is for
+ * HEAP_INSERT_SPECULATIVE.
+ */
+#define TABLE_INSERT_BAS_BULKWRITE 0x0020
/* flag bits for table_tuple_lock */
/* Follow tuples whose update is in progress if lock modes don't conflict */
@@ -577,6 +608,21 @@ typedef struct TableAmRoutine
void (*finish_bulk_insert) (Relation rel, int options);
+ /* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+ TableModifyState *(*tuple_modify_begin) (Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx);
+ void (*tuple_modify_buffer_insert) (TableModifyState *state,
+ TupleTableSlot *slot);
+ void (*tuple_modify_buffer_flush) (TableModifyState *state);
+ void (*tuple_modify_end) (TableModifyState *state);
+
+
/* ------------------------------------------------------------------------
* DDL related functionality.
* ------------------------------------------------------------------------
@@ -1608,6 +1654,41 @@ table_finish_bulk_insert(Relation rel, int options)
rel->rd_tableam->finish_bulk_insert(rel, options);
}
+/* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+static inline TableModifyState *
+table_modify_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx)
+{
+ return rel->rd_tableam->tuple_modify_begin(rel,
+ cid,
+ options,
+ buffer_flush_cb,
+ buffer_flush_ctx);
+}
+
+static inline void
+table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
+{
+ state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot);
+}
+
+static inline void
+table_modify_buffer_flush(TableModifyState *state)
+{
+ state->rel->rd_tableam->tuple_modify_buffer_flush(state);
+}
+
+static inline void
+table_modify_end(TableModifyState *state)
+{
+ state->rel->rd_tableam->tuple_modify_end(state);
+}
/* ------------------------------------------------------------------------
* DDL related functionality.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 171a7dd5d2..e7ddf29c16 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1147,6 +1147,8 @@ HeadlineJsonState
HeadlineParsedText
HeadlineWordEntry
HeapCheckContext
+HeapInsertState
+HeapMultiInsertState
HeapPageFreeze
HeapScanDesc
HeapTuple
@@ -2873,6 +2875,7 @@ TableFuncScanState
TableFuncType
TableInfo
TableLikeClause
+TableModifyState
TableSampleClause
TableScanDesc
TableScanDescData
--
2.40.1
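The begin / buffer-insert / flush / end lifecycle introduced by patch 0001 can be modelled in a few lines of standalone C. This is a sketch under loud assumptions, not the patch's code: ModifyState, modify_begin() and friends are hypothetical names, plain ints stand in for tuple slots, the buffer holds 4 items instead of HEAP_MAX_BUFFERED_SLOTS (1000), and the actual batch write (heap_multi_insert() in the patch) is reduced to a counter.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_BUFFERED 4			/* the patch uses 1000 */

/* Callback invoked once per buffered item after a batch is written. */
typedef void (*FlushCb) (void *context, int item);

typedef struct ModifyState
{
	int			buffered[MAX_BUFFERED];
	int			cur;			/* number of items currently buffered */
	int			nflushes;		/* number of batches written so far */
	FlushCb		flush_cb;
	void	   *flush_ctx;
} ModifyState;

static void
modify_begin(ModifyState *s, FlushCb cb, void *ctx)
{
	s->cur = 0;
	s->nflushes = 0;
	s->flush_cb = cb;
	s->flush_ctx = ctx;
}

static void
modify_buffer_flush(ModifyState *s)
{
	/* Quick exit if there is nothing to write */
	if (s->cur == 0)
		return;

	/* A real table AM would write the whole batch at this point. */
	s->nflushes++;

	/* Then report each written item to the caller, in order. */
	if (s->flush_cb != NULL)
	{
		for (int i = 0; i < s->cur; i++)
			s->flush_cb(s->flush_ctx, s->buffered[i]);
	}

	s->cur = 0;
}

static void
modify_buffer_insert(ModifyState *s, int item)
{
	s->buffered[s->cur++] = item;

	/* Flush as soon as the buffer is full */
	if (s->cur >= MAX_BUFFERED)
		modify_buffer_flush(s);
}

static void
modify_end(ModifyState *s)
{
	/* Write out whatever is still buffered */
	modify_buffer_flush(s);
}

/* Example callback: accumulates the flushed items into a running sum. */
static void
sum_cb(void *context, int item)
{
	*(long *) context += item;
}
```

Inserting ten items through this model triggers two automatic flushes of four items each, and modify_end() writes the remaining two. The per-item callback plays the role that index maintenance and AFTER ROW triggers play for COPY in patch 0003.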
[application/octet-stream] v25-0002-Optimize-CTAS-CMV-RMV-with-new-multi-inserts-tab.patch (11.1K, 3-v25-0002-Optimize-CTAS-CMV-RMV-with-new-multi-inserts-tab.patch)
From 5a16c618c4c0875544eb866bee74b68650737fbd Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 30 Oct 2024 17:29:55 +0000
Subject: [PATCH v25 2/3] Optimize CTAS/CMV/RMV with new multi-inserts table AM
This commit optimizes the following commands for heap AM using new
multi-inserts table AM added by commit <<CHANGE_ME>>:
- CREATE TABLE AS
- CREATE MATERIALIZED VIEW
- REFRESH MATERIALIZED VIEW
Testing shows that performance of CTAS, CMV, RMV is improved by
<<TO_FILL>> respectively on <<TO_FILL>> system.
Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
src/backend/commands/createas.c | 60 ++++++++++++++----
src/backend/commands/matview.c | 106 +++++++++++++++++++++++++++++---
src/include/commands/matview.h | 3 +
3 files changed, 147 insertions(+), 22 deletions(-)
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 5c92e48a56..55fd439468 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -38,6 +38,7 @@
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/queryjumble.h"
+#include "optimizer/optimizer.h"
#include "parser/analyze.h"
#include "rewrite/rewriteHandler.h"
#include "tcop/tcopprot.h"
@@ -56,6 +57,12 @@ typedef struct
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
+
+ /* Table modify state. NULL if multi-inserts isn't supported. */
+ TableModifyState *mstate;
+
+ /* True if SELECT query contains volatile functions */
+ bool volatile_funcs;
} DR_intorel;
/* utility functions for CTAS definition creation */
@@ -321,6 +328,10 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
plan = pg_plan_query(query, pstate->p_sourcetext,
CURSOR_OPT_PARALLEL_OK, params);
+ /* Check if the SELECT query has any volatile functions */
+ ((DR_intorel *) dest)->volatile_funcs =
+ contain_volatile_functions_after_planning((Expr *) query);
+
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only
@@ -556,16 +567,32 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
myState->rel = intoRelationDesc;
myState->reladdr = intoRelationAddr;
myState->output_cid = GetCurrentCommandId(true);
- myState->ti_options = TABLE_INSERT_SKIP_FSM;
+ myState->ti_options = TABLE_INSERT_SKIP_FSM |
+ TABLE_INSERT_BAS_BULKWRITE;
+ myState->mstate = NULL;
+ myState->bistate = NULL;
/*
* If WITH NO DATA is specified, there is no need to set up the state for
- * bulk inserts as there are no tuples to insert.
+ * multi or bulk inserts as there are no tuples to insert.
*/
if (!into->skipData)
- myState->bistate = GetBulkInsertState();
- else
- myState->bistate = NULL;
+ {
+ if (TableModifyIsMultiInsertsSupported(myState->rel,
+ myState->volatile_funcs))
+ {
+ myState->mstate = table_modify_begin(myState->rel,
+ myState->output_cid,
+ myState->ti_options,
+ NULL, /* Multi-insert buffer
+ * flush callback */
+ NULL); /* Multi-insert buffer
+ * flush callback
+ * context */
+ }
+ else
+ myState->bistate = GetBulkInsertState();
+ }
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -593,11 +620,15 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
* would not be cheap either. This also doesn't allow accessing per-AM
* data (say a tuple's xmin), but since we don't do that here...
*/
- table_tuple_insert(myState->rel,
- slot,
- myState->output_cid,
- myState->ti_options,
- myState->bistate);
+
+ if (myState->mstate != NULL)
+ table_modify_buffer_insert(myState->mstate, slot);
+ else
+ table_tuple_insert(myState->rel,
+ slot,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
}
/* We know this is a newly created relation, so there are no indexes */
@@ -616,8 +647,13 @@ intorel_shutdown(DestReceiver *self)
if (!into->skipData)
{
- FreeBulkInsertState(myState->bistate);
- table_finish_bulk_insert(myState->rel, myState->ti_options);
+ if (myState->mstate != NULL)
+ table_modify_end(myState->mstate);
+ else
+ {
+ FreeBulkInsertState(myState->bistate);
+ table_finish_bulk_insert(myState->rel, myState->ti_options);
+ }
}
/* close rel, but keep lock until commit */
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 010097873d..fa495ec533 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -30,7 +30,9 @@
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "executor/spi.h"
+#include "foreign/fdwapi.h"
#include "miscadmin.h"
+#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/lmgr.h"
@@ -51,6 +53,12 @@ typedef struct
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
+
+ /* Table modify state. NULL if multi-inserts isn't supported. */
+ TableModifyState *mstate;
+
+ /* True if SELECT query contains volatile functions */
+ bool volatile_funcs;
} DR_transientrel;
static int matview_maintenance_depth = 0;
@@ -428,6 +436,12 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
/* Plan the query which will generate data for the refresh. */
plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
+ /*
+ * Check if the stored MATERIALIZED VIEW query has any volatile functions.
+ */
+ ((DR_transientrel *) dest)->volatile_funcs =
+ contain_volatile_functions_after_planning((Expr *) query);
+
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only matter if
@@ -492,8 +506,26 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
*/
myState->transientrel = transientrel;
myState->output_cid = GetCurrentCommandId(true);
- myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
- myState->bistate = GetBulkInsertState();
+ myState->ti_options = TABLE_INSERT_SKIP_FSM |
+ TABLE_INSERT_FROZEN |
+ TABLE_INSERT_BAS_BULKWRITE;
+ myState->bistate = NULL;
+ myState->mstate = NULL;
+
+ /* Set up the state for multi or bulk inserts */
+ if (TableModifyIsMultiInsertsSupported(myState->transientrel,
+ myState->volatile_funcs))
+ {
+ myState->mstate = table_modify_begin(myState->transientrel,
+ myState->output_cid,
+ myState->ti_options,
+ NULL, /* Multi-insert buffer
+ * flush callback */
+ NULL); /* Multi-insert buffer
+ * flush callback context */
+ }
+ else
+ myState->bistate = GetBulkInsertState();
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -519,11 +551,14 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
* tuple's xmin), but since we don't do that here...
*/
- table_tuple_insert(myState->transientrel,
- slot,
- myState->output_cid,
- myState->ti_options,
- myState->bistate);
+ if (myState->mstate != NULL)
+ table_modify_buffer_insert(myState->mstate, slot);
+ else
+ table_tuple_insert(myState->transientrel,
+ slot,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
/* We know this is a newly created relation, so there are no indexes */
@@ -538,9 +573,13 @@ transientrel_shutdown(DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
- FreeBulkInsertState(myState->bistate);
-
- table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+ if (myState->mstate != NULL)
+ table_modify_end(myState->mstate);
+ else
+ {
+ FreeBulkInsertState(myState->bistate);
+ table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+ }
/* close transientrel, but keep lock until commit */
table_close(myState->transientrel, NoLock);
@@ -984,3 +1023,50 @@ CloseMatViewIncrementalMaintenance(void)
matview_maintenance_depth--;
Assert(matview_maintenance_depth >= 0);
}
+
+/*
+ * Check if multi-inserts is supported.
+ *
+ * It's generally more efficient to prepare a bunch of tuples for insertion,
+ * and insert them in one multi-inserts call, than call
+ * table_tuple_insert() separately for every tuple. However, there are a
+ * number of reasons why we might not be able to do this. In general, we can't
+ * support multi-inserts in the following cases:
+ *
+ * When there are any BEFORE/INSTEAD OF triggers on the table or any volatile
+ * functions/expressions in the SELECT query. Such triggers or volatile
+ * expressions might query the table we're inserting into and act differently
+ * if the tuples that have already been processed and prepared for insertion
+ * are not there.
+ *
+ * When inserting into partitioned table. For partitioned tables, we may still
+ * be able to perform multi-inserts. However, the possibility of this depends
+ * on which types of triggers exist on the partition. We must disable
+ * multi-inserts if the partition is a foreign table that can't use batching or
+ * it has any before row insert or insert instead triggers (same as we checked
+ * above for the parent table). We really can't know all these unless we start
+ * inserting tuples into the respective partitions. We can have an intermediate
+ * insert state to show the intent to do multi-inserts and later determine if
+ * we can use multi-inserts for the partition being inserted into.
+ *
+ * When inserting into foreign table. For foreign tables, we may still be able
+ * to do multi-inserts if the FDW supports batching.
+ */
+bool
+TableModifyIsMultiInsertsSupported(Relation rel, bool volatile_funcs)
+{
+ if (volatile_funcs)
+ return false;
+
+ /*
+ * For CREATE TABLE AS, CREATE MATERIALIZED VIEW, REFRESH MATERIALIZED
+ * VIEW, the target relation can't have triggers and can't be a
+ * partitioned or foreign table. So, we just assert.
+ */
+ Assert(rel->trigdesc == NULL);
+ Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
+ Assert(rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE);
+
+ /* Can support multi-inserts */
+ return true;
+}
diff --git a/src/include/commands/matview.h b/src/include/commands/matview.h
index c8811e8fc7..28abd7b89b 100644
--- a/src/include/commands/matview.h
+++ b/src/include/commands/matview.h
@@ -33,4 +33,7 @@ extern DestReceiver *CreateTransientRelDestReceiver(Oid transientoid);
extern bool MatViewIncrementalMaintenanceIsEnabled(void);
+extern bool TableModifyIsMultiInsertsSupported(Relation rel,
+ bool volatile_funcs);
+
#endif /* MATVIEW_H */
--
2.40.1
[application/octet-stream] v25-0003-Use-new-multi-inserts-table-AM-for-COPY-.-FROM.patch (14.3K, 4-v25-0003-Use-new-multi-inserts-table-AM-for-COPY-.-FROM.patch)
From 16d488cff14c0ce9ace648a1b99e507edb184e74 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 30 Oct 2024 17:30:41 +0000
Subject: [PATCH v25 3/3] Use new multi-inserts table AM for COPY ... FROM
This commit uses the new multi-inserts table AM added by commit
<<CHANGE_ME>> for COPY ... FROM command.
Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
src/backend/commands/copyfrom.c | 254 +++++++++++++++--------
src/include/commands/copyfrom_internal.h | 4 +-
src/tools/pgindent/typedefs.list | 1 +
3 files changed, 171 insertions(+), 88 deletions(-)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 07cbd5d22b..18fb609cbe 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -74,14 +74,27 @@
*/
#define MAX_PARTITION_BUFFERS 32
+/* Context for multi-inserts buffer flush callback */
+typedef struct MultiInsertBufferFlushCtx
+{
+ CopyFromState cstate;
+ ResultRelInfo *resultRelInfo;
+ EState *estate;
+} MultiInsertBufferFlushCtx;
+
/* Stores multi-insert data related to a single relation in CopyFrom. */
typedef struct CopyMultiInsertBuffer
{
- TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+ TableModifyState *mstate; /* Table insert state; NULL if foreign table */
+ TupleTableSlot **slots; /* Array to store tuples */
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
- BulkInsertState bistate; /* BulkInsertState for this rel if plain
- * table; NULL if foreign table */
+ TupleTableSlot *mislot; /* Slot used for multi-inserts */
+ MultiInsertBufferFlushCtx *mibufferctx; /* Multi-inserts buffer flush
+ * callback context */
int nused; /* number of 'slots' containing tuples */
+ int currslotno; /* Current buffered slot number that's being
+ * flushed; Used to get correct cur_lineno for
+ * errors while in flush callback. */
uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
* stream */
} CopyMultiInsertBuffer;
@@ -216,19 +229,96 @@ CopyLimitPrintoutLength(const char *str)
return res;
}
+/*
+ * Implements the multi-inserts buffer flush callback,
+ * i.e. TableModifyEndCallback.
+ *
+ * NB: Caller must take care of opening and closing the indices.
+ */
+static void
+MultiInsertBufferFlushCb(void *context, TupleTableSlot *slot)
+{
+ MultiInsertBufferFlushCtx *mibufferctx = (MultiInsertBufferFlushCtx *) context;
+ CopyFromState cstate = mibufferctx->cstate;
+ ResultRelInfo *resultRelInfo = mibufferctx->resultRelInfo;
+ EState *estate = mibufferctx->estate;
+ CopyMultiInsertBuffer *buffer = resultRelInfo->ri_CopyMultiInsertBuffer;
+
+ /*
+ * If there are any indexes, update them for all the inserted tuples, and
+ * run AFTER ROW INSERT triggers.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ List *recheckIndexes;
+
+ cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+ recheckIndexes =
+ ExecInsertIndexTuples(resultRelInfo,
+ slot, estate, false,
+ false, NULL, NIL, false);
+
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slot, recheckIndexes,
+ cstate->transition_capture);
+
+ list_free(recheckIndexes);
+ }
+
+ /*
+ * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
+ * anyway.
+ */
+ else if (resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ {
+ cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slot, NIL,
+ cstate->transition_capture);
+ }
+
+ Assert(buffer->currslotno <= buffer->nused);
+}
+
/*
* Allocate memory and initialize a new CopyMultiInsertBuffer for this
* ResultRelInfo.
*/
static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+ CopyFromState cstate, EState *estate)
{
CopyMultiInsertBuffer *buffer;
buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
- memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+ if (rri->ri_FdwRoutine == NULL)
+ {
+ buffer->mibufferctx =
+ (MultiInsertBufferFlushCtx *) palloc(sizeof(MultiInsertBufferFlushCtx));
+ buffer->mibufferctx->cstate = cstate;
+ buffer->mibufferctx->resultRelInfo = rri;
+ buffer->mibufferctx->estate = estate;
+
+ buffer->mstate = table_modify_begin(rri->ri_RelationDesc,
+ miinfo->mycid,
+ miinfo->ti_options,
+ MultiInsertBufferFlushCb,
+ buffer->mibufferctx);
+
+ buffer->slots = NULL;
+ }
+ else
+ {
+ buffer->mstate = NULL;
+ buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+ }
+
+ buffer->mislot = NULL;
buffer->resultRelInfo = rri;
- buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
buffer->nused = 0;
return buffer;
@@ -239,11 +329,12 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
*/
static inline void
CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
- ResultRelInfo *rri)
+ ResultRelInfo *rri, CopyFromState cstate,
+ EState *estate)
{
CopyMultiInsertBuffer *buffer;
- buffer = CopyMultiInsertBufferInit(rri);
+ buffer = CopyMultiInsertBufferInit(miinfo, rri, cstate, estate);
/* Setup back-link so we can easily find this buffer again */
rri->ri_CopyMultiInsertBuffer = buffer;
@@ -276,7 +367,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
* tuples their way for the first time.
*/
if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
- CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+ CopyMultiInsertInfoSetupBuffer(miinfo, rri, cstate, estate);
}
/*
@@ -320,8 +411,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
int batch_size = resultRelInfo->ri_BatchSize;
int sent = 0;
- Assert(buffer->bistate == NULL);
-
/* Ensure that the FDW supports batching and it's enabled */
Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
Assert(batch_size > 1);
@@ -393,13 +482,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
}
else
{
- CommandId mycid = miinfo->mycid;
- int ti_options = miinfo->ti_options;
bool line_buf_valid = cstate->line_buf_valid;
uint64 save_cur_lineno = cstate->cur_lineno;
- MemoryContext oldcontext;
-
- Assert(buffer->bistate != NULL);
/*
* Print error context information correctly, if one of the operations
@@ -407,56 +491,18 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
*/
cstate->line_buf_valid = false;
- /*
- * table_multi_insert may leak memory, so switch to short-lived memory
- * context before calling it.
- */
- oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- table_multi_insert(resultRelInfo->ri_RelationDesc,
- slots,
- nused,
- mycid,
- ti_options,
- buffer->bistate);
- MemoryContextSwitchTo(oldcontext);
+ Assert(buffer->currslotno <= buffer->nused);
+ buffer->currslotno = 0;
- for (i = 0; i < nused; i++)
- {
- /*
- * If there are any indexes, update them for all the inserted
- * tuples, and run AFTER ROW INSERT triggers.
- */
- if (resultRelInfo->ri_NumIndices > 0)
- {
- List *recheckIndexes;
-
- cstate->cur_lineno = buffer->linenos[i];
- recheckIndexes =
- ExecInsertIndexTuples(resultRelInfo,
- buffer->slots[i], estate, false,
- false, NULL, NIL, false);
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], recheckIndexes,
- cstate->transition_capture);
- list_free(recheckIndexes);
- }
+ table_modify_buffer_flush(buffer->mstate);
- /*
- * There's no indexes, but see if we need to run AFTER ROW INSERT
- * triggers anyway.
- */
- else if (resultRelInfo->ri_TrigDesc != NULL &&
- (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
- resultRelInfo->ri_TrigDesc->trig_insert_new_table))
- {
- cstate->cur_lineno = buffer->linenos[i];
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], NIL,
- cstate->transition_capture);
- }
+ Assert(buffer->currslotno <= buffer->nused);
+ buffer->currslotno = 0;
- ExecClearTuple(slots[i]);
- }
+ /*
+ * Indexes are updated and AFTER ROW INSERT triggers (if any) are run
+ * in the flush callback MultiInsertBufferFlushCb.
+ */
/* Update the row counter and progress of the COPY command */
*processed += nused;
@@ -492,19 +538,18 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
if (resultRelInfo->ri_FdwRoutine == NULL)
{
- Assert(buffer->bistate != NULL);
- FreeBulkInsertState(buffer->bistate);
+ table_modify_end(buffer->mstate);
+ ExecDropSingleTupleTableSlot(buffer->mislot);
+ pfree(buffer->mibufferctx);
}
else
- Assert(buffer->bistate == NULL);
-
- /* Since we only create slots on demand, just drop the non-null ones. */
- for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
- ExecDropSingleTupleTableSlot(buffer->slots[i]);
+ {
+ /* Since we only create slots on demand, just drop the non-null ones. */
+ for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(buffer->slots[i]);
- if (resultRelInfo->ri_FdwRoutine == NULL)
- table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
- miinfo->ti_options);
+ pfree(buffer->slots);
+ }
pfree(buffer);
}
@@ -598,15 +643,36 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
{
CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
int nused;
+ TupleTableSlot *slot;
Assert(buffer != NULL);
Assert(buffer->nused < MAX_BUFFERED_TUPLES);
nused = buffer->nused;
- if (buffer->slots[nused] == NULL)
- buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
- return buffer->slots[nused];
+ if (rri->ri_FdwRoutine == NULL)
+ {
+ if (buffer->mislot == NULL)
+ {
+ buffer->mislot = MakeTupleTableSlot(RelationGetDescr(rri->ri_RelationDesc),
+ &TTSOpsVirtual);
+ }
+
+ /* Caller must clear the slot */
+ slot = buffer->mislot;
+ }
+ else
+ {
+ if (buffer->slots[nused] == NULL)
+ {
+ slot = table_slot_create(rri->ri_RelationDesc, NULL);
+ buffer->slots[nused] = slot;
+ }
+ else
+ slot = buffer->slots[nused];
+ }
+
+ return slot;
}
/*
@@ -620,7 +686,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
Assert(buffer != NULL);
- Assert(slot == buffer->slots[buffer->nused]);
+
+#ifdef USE_ASSERT_CHECKING
+ if (rri->ri_FdwRoutine != NULL)
+ Assert(slot == buffer->slots[buffer->nused]);
+#endif
/* Store the line number so we can properly report any errors later */
buffer->linenos[buffer->nused] = lineno;
@@ -628,6 +698,22 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
/* Record this slot as being used */
buffer->nused++;
+ if (rri->ri_FdwRoutine == NULL)
+ {
+ Assert(slot == buffer->mislot);
+ buffer->currslotno = 0;
+
+ table_modify_buffer_insert(buffer->mstate, slot);
+ }
+ else
+ {
+ /*
+ * The slot previously might point into the per-tuple context. For
+ * batching it needs to be longer lived.
+ */
+ ExecMaterializeSlot(slot);
+ }
+
/* Update how many tuples are stored and their size */
miinfo->bufferedTuples++;
miinfo->bufferedBytes += tuplen;
@@ -841,7 +927,7 @@ CopyFrom(CopyFromState cstate)
/*
* It's generally more efficient to prepare a bunch of tuples for
* insertion, and insert them in one
- * table_multi_insert()/ExecForeignBatchInsert() call, than call
+ * table_modify_buffer_insert()/ExecForeignBatchInsert() call, than call
* table_tuple_insert()/ExecForeignInsert() separately for every tuple.
* However, there are a number of reasons why we might not be able to do
* this. These are explained below.
@@ -925,7 +1011,8 @@ CopyFrom(CopyFromState cstate)
insertMethod = CIM_MULTI;
CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
- estate, mycid, ti_options);
+ estate, mycid,
+ ti_options | TABLE_INSERT_BAS_BULKWRITE);
}
/*
@@ -1094,7 +1181,8 @@ CopyFrom(CopyFromState cstate)
{
if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
- resultRelInfo);
+ resultRelInfo, cstate,
+ estate);
}
else if (insertMethod == CIM_MULTI_CONDITIONAL &&
!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
@@ -1224,12 +1312,6 @@ CopyFrom(CopyFromState cstate)
/* Store the slot in the multi-insert buffer, when enabled. */
if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
{
- /*
- * The slot previously might point into the per-tuple
- * context. For batching it needs to be longer lived.
- */
- ExecMaterializeSlot(myslot);
-
/* Add this tuple to the tuple buffer */
CopyMultiInsertInfoStore(&multiInsertInfo,
resultRelInfo, myslot,
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..14addbc6f6 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -46,9 +46,9 @@ typedef enum EolType
typedef enum CopyInsertMethod
{
CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */
- CIM_MULTI, /* always use table_multi_insert or
+ CIM_MULTI, /* always use table_modify_buffer_insert or
* ExecForeignBatchInsert */
- CIM_MULTI_CONDITIONAL, /* use table_multi_insert or
+ CIM_MULTI_CONDITIONAL, /* use table_modify_buffer_insert or
* ExecForeignBatchInsert only if valid */
} CopyInsertMethod;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e7ddf29c16..bf21e43ce1 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1664,6 +1664,7 @@ MonotonicFunction
MorphOpaque
MsgType
MultiAssignRef
+MultiInsertBufferFlushCtx
MultiSortSupport
MultiSortSupportData
MultiXactId
--
2.40.1
* Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
@ 2024-10-31 04:17 Jingtang Zhang <[email protected]>
parent: Bharath Rupireddy <[email protected]>
0 siblings, 1 reply; 8+ messages in thread
From: Jingtang Zhang @ 2024-10-31 04:17 UTC (permalink / raw)
To: Bharath Rupireddy <[email protected]>; +Cc: [email protected]; pgsql-hackers
Hi~
I ran some performance tests over the past few days and have some findings.
Looking through the archive, I found a discussion from months ago about which type
of TupleTableSlot to use for buffering tuples. A single-column materialized view was
used for the evaluation, and in the end the virtual slot was chosen.
However, when I test with a 32-column materialized view, I see a regression.
Test case:
-- prepare
create table test as
select
i as id0,
i + 1 as id1,
i + 2 as id2,
i + 3 as id3,
i + 4 as id4,
i + 5 as id5,
i + 6 as id6,
i + 7 as id7,
i + 8 as id8,
i + 9 as id9,
i + 10 as id10,
i + 11 as id11,
i + 12 as id12,
i + 13 as id13,
i + 14 as id14,
i + 15 as id15,
i + 0.01 as f0,
i + 0.1 as f1,
i + 0.2 as f2,
i + 0.3 as f3,
i + 0.4 as f4,
i + 0.5 as f5,
i + 0.6 as f6,
i + 0.7 as f7,
i + 0.8 as f8,
i + 0.9 as f9,
i + 1.01 as f10,
i + 1.1 as f11,
i + 1.2 as f12,
i + 1.3 as f13,
i + 1.4 as f14,
i + 1.5 as f15,
i + 1.6 as f16
from generate_series(1,5000000) i;
-- run
create materialized view m1 as select * from test;
HEAD:
Time: 13615.542 ms (00:13.616)
Time: 13545.706 ms (00:13.546)
Time: 13578.475 ms (00:13.578)
Patched
Time: 20112.734 ms (00:20.113)
Time: 19996.957 ms (00:19.997)
Time: 19936.871 ms (00:19.937)
A quick perf profile suggests the overhead comes from virtual tuple materialization.
HEAD:
12.29% postgres [.] pg_checksum_block
6.33% postgres [.] GetPrivateRefCountEntry
5.40% postgres [.] pg_comp_crc32c_sse42
4.54% [kernel] [k] copy_user_enhanced_fast_string
2.69% postgres [.] BufferIsValid
1.52% postgres [.] XLogRecordAssemble
Patched:
11.75% postgres [.] tts_virtual_materialize
8.87% postgres [.] pg_checksum_block
8.17% postgres [.] slot_deform_heap_tuple
8.09% postgres [.] heap_compute_data_size
6.17% postgres [.] fill_val
3.81% postgres [.] heap_fill_tuple
3.37% postgres [.] tts_virtual_copyslot
2.62% [kernel] [k] copy_user_enhanced_fast_string
I'm not sure whether this applies generally.
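A minimal sketch of why such a cost could grow with the column count, assuming (not verified here) that materializing a virtual slot has to visit every by-reference column once to size the buffer and once to copy it. ToyAttr and toy_materialize are hypothetical names for illustration only, not PostgreSQL APIs:

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Toy stand-in for one by-reference column value; NOT PostgreSQL's Datum. */
typedef struct ToyAttr
{
    const void *data;
    size_t      len;
} ToyAttr;

/*
 * Copy all attributes into one freshly allocated buffer, walking the
 * columns twice: a sizing pass and a copy pass. Returns the number of
 * per-attribute visits so the caller can see how the work scales with
 * the number of columns. The caller must free(*out).
 */
static size_t
toy_materialize(const ToyAttr *attrs, size_t natts, char **out, size_t *outlen)
{
    size_t      total = 0;
    size_t      visits = 0;
    size_t      off = 0;
    char       *buf;

    /* Pass 1: compute the total size needed. */
    for (size_t i = 0; i < natts; i++)
    {
        total += attrs[i].len;
        visits++;
    }

    buf = malloc(total > 0 ? total : 1);
    assert(buf != NULL);

    /* Pass 2: copy each attribute into the owned buffer. */
    for (size_t i = 0; i < natts; i++)
    {
        memcpy(buf + off, attrs[i].data, attrs[i].len);
        off += attrs[i].len;
        visits++;
    }

    *out = buf;
    *outlen = total;
    return visits;
}
```

In this toy model a 32-column tuple costs 64 attribute visits per copy where a single-column tuple costs 2, which would be consistent with per-column functions dominating the patched profile.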
--
Regards, Jingtang
* Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
@ 2025-03-09 11:27 Daniil Davydov <[email protected]>
parent: Jingtang Zhang <[email protected]>
0 siblings, 1 reply; 8+ messages in thread
From: Daniil Davydov @ 2025-03-09 11:27 UTC (permalink / raw)
To: Jingtang Zhang <[email protected]>; +Cc: Bharath Rupireddy <[email protected]>; [email protected]; pgsql-hackers
Hi,
A few days ago I came up with the idea of applying the multi-insert
optimization wherever possible. I prepared a rough patch,
and it showed a large performance gain (up to 4x for INSERT
... INTO ... in the best case).
Then I was very happy to find this thread. You did a great job, and I
want to help bring this work to completion.
On Thu, Oct 31, 2024 at 11:17 AM Jingtang Zhang <[email protected]> wrote:
> I ran some performance tests over the past few days and have some findings.
> HEAD:
> 12.29% postgres [.] pg_checksum_block
> 6.33% postgres [.] GetPrivateRefCountEntry
> 5.40% postgres [.] pg_comp_crc32c_sse42
> 4.54% [kernel] [k] copy_user_enhanced_fast_string
> 2.69% postgres [.] BufferIsValid
> 1.52% postgres [.] XLogRecordAssemble
>
> Patched:
> 11.75% postgres [.] tts_virtual_materialize
> 8.87% postgres [.] pg_checksum_block
> 8.17% postgres [.] slot_deform_heap_tuple
> 8.09% postgres [.] heap_compute_data_size
> 6.17% postgres [.] fill_val
> 3.81% postgres [.] heap_fill_tuple
> 3.37% postgres [.] tts_virtual_copyslot
> 2.62% [kernel] [k] copy_user_enhanced_fast_string
I applied the v25 patches on the master branch and made some measurements
to find the bottleneck in this case. The 'time' utility
showed that without the patch this query runs 1.5 times slower. I
also made a few flamegraphs for this test. Most of the time is spent
in these two functions: tts_virtual_copyslot and heap_form_tuple.
All tests were run in a virtual machine with these CPU characteristics:
Architecture: x86_64
CPU(s): 2
On-line CPU(s) list: 0,1
Virtualization features:
Virtualization: AMD-V
Hypervisor vendor: KVM
Virtualization type: full
Caches (sum of all):
L1d: 128 KiB (2 instances)
L1i: 128 KiB (2 instances)
L2: 1 MiB (2 instances)
L3: 32 MiB (2 instances)
NUMA:
NUMA node(s): 1
NUMA node0 CPU(s): 0,1
In my implementation, I used Tuplestore functionality to store tuples.
To avoid spending so much time in the functions mentioned above,
I combined it with the current implementation (v25 patches) and got a
10% increase in performance (for the test above). I also set up the v22
patches to compare performance (with/without tuplestore) for INSERT ... INTO ...
queries (with -j 4 -c 10 parameters for pgbench), and there was also an
increase in TPS (about 3-4%).
I attach a patch that adds Tuplestore to v25. What do you think about this idea?
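For reference, the buffering scheme being modified can be modeled roughly as below: tuples are buffered until a limit is reached, written in one batch, and then handed one by one to a flush callback (which COPY uses for index updates and AFTER ROW triggers). This is a simplified sketch, not code from either patch; ToyModifyState, toy_begin, toy_insert, toy_flush and TOY_MAX_BUFFERED are hypothetical stand-ins, and plain ints take the place of tuples:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical limit; the patch uses HEAP_MAX_BUFFERED_SLOTS instead. */
#define TOY_MAX_BUFFERED 4

typedef void (*ToyFlushCb) (void *ctx, int tuple);

typedef struct ToyModifyState
{
    int         buffered[TOY_MAX_BUFFERED]; /* stand-in for buffered tuples */
    int         nused;          /* number of tuples currently buffered */
    int         nbatches;       /* number of batch writes performed */
    ToyFlushCb  flush_cb;       /* per-tuple callback; may be NULL */
    void       *flush_ctx;      /* opaque context passed to the callback */
} ToyModifyState;

static void
toy_begin(ToyModifyState *st, ToyFlushCb cb, void *ctx)
{
    st->nused = 0;
    st->nbatches = 0;
    st->flush_cb = cb;
    st->flush_ctx = ctx;
}

static void
toy_flush(ToyModifyState *st)
{
    /* Quick exit if nothing is buffered. */
    if (st->nused == 0)
        return;

    /* Stands in for a single batched write of all buffered tuples. */
    st->nbatches++;

    /* After the batch write, run the callback once per flushed tuple. */
    if (st->flush_cb != NULL)
    {
        for (int i = 0; i < st->nused; i++)
            st->flush_cb(st->flush_ctx, st->buffered[i]);
    }

    st->nused = 0;
}

static void
toy_insert(ToyModifyState *st, int tuple)
{
    assert(st->nused < TOY_MAX_BUFFERED);
    st->buffered[st->nused++] = tuple;

    /* Flush automatically once the buffer is full. */
    if (st->nused >= TOY_MAX_BUFFERED)
        toy_flush(st);
}

/* Example callback: accumulates the sum of all flushed tuples. */
static void
toy_sum_cb(void *ctx, int tuple)
{
    *(long *) ctx += tuple;
}
```

The tuplestore proposal changes only how the buffered tuples are held between insert and flush; the batch-then-callback flow stays the same.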
--
Best regards,
Daniil Davydov
Attachments:
[text/x-patch] 0001-Replace-holding-tuples-in-virtual-slots-with-tuplest.patch (5.9K, 2-0001-Replace-holding-tuples-in-virtual-slots-with-tuplest.patch)
From a59cfcbb05bb07c94a4c0ad6531baa5e531629ae Mon Sep 17 00:00:00 2001
From: Daniil Davidov <[email protected]>
Date: Sun, 9 Mar 2025 16:37:44 +0700
Subject: [PATCH] Replace holding tuples in virtual slots with tuplestorage
During performance testing, it was found that the current
implementation spends a lot of its time in two functions:
tts_virtual_copyslot and heap_fill_tuple. These calls stem from
the fact that tuples are stored in virtual TupleTableSlots, so I propose replacing this
logic with Tuplestore functionality.
Discussion: https://www.postgresql.org/message-id/9F9326B4-8AD9-4858-B1C1-559FC64E6E93%40gmail.com
---
src/backend/access/heap/heapam.c | 67 +++++++++++++++-----------------
src/include/access/heapam.h | 9 ++++-
2 files changed, 38 insertions(+), 38 deletions(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index acdce1a4b4..276480213a 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2665,7 +2665,6 @@ void
heap_modify_buffer_insert(TableModifyState *state,
TupleTableSlot *slot)
{
- TupleTableSlot *dstslot;
HeapInsertState *istate;
HeapMultiInsertState *mistate;
MemoryContext oldcontext;
@@ -2682,8 +2681,10 @@ heap_modify_buffer_insert(TableModifyState *state,
mistate =
(HeapMultiInsertState *) palloc(sizeof(HeapMultiInsertState));
mistate->slots =
- (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
- mistate->cur_slots = 0;
+ (TupleTableSlot **) palloc0(sizeof(void *) * HEAP_MAX_BUFFERED_SLOTS);
+ mistate->tstore = tuplestore_begin_heap(false, false, work_mem);
+ mistate->nused = 0;
+
istate->mistate = mistate;
/*
@@ -2702,36 +2703,11 @@ heap_modify_buffer_insert(TableModifyState *state,
istate = (HeapInsertState *) state->data;
Assert(istate->mistate != NULL);
mistate = istate->mistate;
- dstslot = mistate->slots[mistate->cur_slots];
-
- if (dstslot == NULL)
- {
- /*
- * We use virtual tuple slots buffered slots for leveraging the
- * optimization it provides to minimize physical data copying. The
- * virtual slot gets materialized when we copy (via below
- * ExecCopySlot) the tuples from the source slot which can be of any
- * type. This way, it is ensured that the tuple storage doesn't depend
- * on external memory, because all the datums that aren't passed by
- * value are copied into the slot's memory context.
- */
- dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
- &TTSOpsVirtual);
-
- mistate->slots[mistate->cur_slots] = dstslot;
- }
-
- Assert(TTS_IS_VIRTUAL(dstslot));
-
- /*
- * Note that the copy clears the previous destination slot contents, so no
- * need to explicitly ExecClearTuple() here.
- */
- ExecCopySlot(dstslot, slot);
- mistate->cur_slots++;
+ tuplestore_puttupleslot(mistate->tstore, slot);
+ mistate->nused += 1;
- if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS)
+ if (mistate->nused >= HEAP_MAX_BUFFERED_SLOTS)
heap_modify_buffer_flush(state);
MemoryContextSwitchTo(oldcontext);
@@ -2746,19 +2722,35 @@ heap_modify_buffer_flush(TableModifyState *state)
HeapInsertState *istate;
HeapMultiInsertState *mistate;
MemoryContext oldcontext;
+ TupleDesc tupdesc;
/* Quick exit if we haven't inserted anything yet */
if (state->data == NULL)
return;
+ tupdesc = RelationGetDescr(state->rel);
istate = (HeapInsertState *) state->data;
Assert(istate->mistate != NULL);
mistate = istate->mistate;
/* Quick exit if we have flushed already */
- if (mistate->cur_slots == 0)
+ if (mistate->nused == 0)
return;
+ for (int i = 0; i < mistate->nused; i++)
+ {
+ bool ok;
+
+ if (istate->mistate->slots[i] == NULL)
+ {
+ istate->mistate->slots[i] =
+ MakeSingleTupleTableSlot(tupdesc, &TTSOpsMinimalTuple);
+ }
+ ok = tuplestore_gettupleslot(mistate->tstore, true, false,
+ istate->mistate->slots[i]);
+ Assert(ok);
+ }
+
/*
* heap_multi_insert() can leak memory, so switch to short-lived memory
* context before calling it.
@@ -2766,7 +2758,7 @@ heap_modify_buffer_flush(TableModifyState *state)
oldcontext = MemoryContextSwitchTo(mistate->mem_ctx);
heap_multi_insert(state->rel,
mistate->slots,
- mistate->cur_slots,
+ mistate->nused,
state->cid,
state->options,
istate->bistate);
@@ -2779,14 +2771,15 @@ heap_modify_buffer_flush(TableModifyState *state)
*/
if (state->buffer_flush_cb != NULL)
{
- for (int i = 0; i < mistate->cur_slots; i++)
+ for (int i = 0; i < mistate->nused; i++)
{
state->buffer_flush_cb(state->buffer_flush_ctx,
mistate->slots[i]);
}
}
- mistate->cur_slots = 0;
+ tuplestore_clear(mistate->tstore);
+ mistate->nused = 0;
}
/*
@@ -2811,11 +2804,13 @@ heap_modify_insert_end(TableModifyState *state)
heap_modify_buffer_flush(state);
- Assert(mistate->cur_slots == 0);
+ Assert(mistate->nused== 0);
for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
ExecDropSingleTupleTableSlot(mistate->slots[i]);
+ tuplestore_end(mistate->tstore);
+
MemoryContextDelete(mistate->mem_ctx);
}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index fdbbf9b8e8..5d8e672059 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -27,8 +27,10 @@
#include "storage/lockdefs.h"
#include "storage/read_stream.h"
#include "storage/shm_toc.h"
+#include "tcop/dest.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
+#include "utils/tuplestore.h"
/* "options" flag bits for heap_insert */
@@ -285,8 +287,11 @@ typedef struct HeapMultiInsertState
/* Array of buffered slots */
TupleTableSlot **slots;
- /* Number of buffered slots currently held */
- int cur_slots;
+ /* Holds the tuple set */
+ Tuplestorestate *tstore;
+
+ /* Number of buffered tuples currently held */
+ int nused;
/* Memory context for dealing with multi inserts */
MemoryContext mem_ctx;
--
2.43.0
* Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
@ 2025-03-17 04:49 Daniil Davydov <[email protected]>
parent: Daniil Davydov <[email protected]>
0 siblings, 1 reply; 8+ messages in thread
From: Daniil Davydov @ 2025-03-17 04:49 UTC (permalink / raw)
To: Jingtang Zhang <[email protected]>; +Cc: Bharath Rupireddy <[email protected]>; [email protected]; pgsql-hackers
Hi,
Recently I took more careful performance measurements. I
compared three branches with each other: HEAD, Patched, and Patched
with tuplestore.
Here are the results:
1)
Test case: the matview creation test from Jingtang Zhang's email.
10 measurements for each branch.
Results in wall-clock execution time:
HEAD
30.532 +- 0.59 seconds elapsed
Patched
20.454 +- 0.114 seconds elapsed
Patched with tuplestore
19.653 +- 0.111 seconds elapsed
2)
-- init.sql
drop table test_insert;
vacuum;
checkpoint;
create table test_insert(i int, f float);
-- iowrite.sql
insert into test_insert select g, (g % 100) / 100.0 from
generate_series(1, 1000000) as g;
Test case:
pgbench -f iowrite.sql -n -j 4 -c 10 -T 40
5 measurements for each branch.
Results in TPS:
HEAD
1.025 +- 0.009
Patched
2.923 +- 0.032
Patched with tuplestore
2.987 +- 0.011
P.S.
I cannot find a commitfest entry for this patch. Should we add it there?
--
Best regards,
Daniil Davydov
* Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
@ 2025-04-06 13:54 Jingtang Zhang <[email protected]>
parent: Daniil Davydov <[email protected]>
0 siblings, 1 reply; 8+ messages in thread
From: Jingtang Zhang @ 2025-04-06 13:54 UTC (permalink / raw)
To: Daniil Davydov <[email protected]>; +Cc: [email protected]; pgsql-hackers; Bharath Rupireddy <[email protected]>
Hi, Daniil Davydov.
Thanks for sharing your patch!
It has been quite a while since I last looked at the patch. I've tested it again
and still see a regression with the patched version when the table has many columns.
It is entirely CPU-bound in tts_virtual_copyslot.
Unpatched version:
1 col:
Time: 8909.714 ms (00:08.910)
Time: 8803.579 ms (00:08.804)
Time: 8600.415 ms (00:08.600)
32 cols:
Time: 12911.699 ms (00:12.912)
Time: 13543.491 ms (00:13.543)
Time: 13325.368 ms (00:13.325)
Patched version:
1 col:
Time: 3532.841 ms (00:03.533)
Time: 3598.223 ms (00:03.598)
Time: 3515.858 ms (00:03.516)
32 cols:
Time: 35647.724 ms (00:35.648)
Time: 35596.233 ms (00:35.596)
Time: 35669.106 ms (00:35.669)
I've tested your patch with tuplestore and found that the regression no longer
occurs, but I haven't looked deeply into it.
Patched version (with tuplestore):
1 col:
Time: 3500.502 ms (00:03.501)
Time: 3486.886 ms (00:03.487)
Time: 3514.233 ms (00:03.514)
32 cols:
Time: 10375.391 ms (00:10.375)
Time: 10248.256 ms (00:10.248)
Time: 10248.289 ms (00:10.248)
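As a quick check on the 32-column numbers above, the following sketch averages the three runs of each variant. mean_ms and the array names are hypothetical; the inputs are the timings quoted in this message:

```c
#include <assert.h>
#include <stddef.h>

/* 32-column timings in milliseconds, copied from the runs listed above. */
static const double unpatched_32[] = {12911.699, 13543.491, 13325.368};
static const double patched_32[] = {35647.724, 35596.233, 35669.106};
static const double tuplestore_32[] = {10375.391, 10248.256, 10248.289};

/* Arithmetic mean of n timing samples. */
static double
mean_ms(const double *runs, size_t n)
{
    double      sum = 0.0;

    assert(n > 0);
    for (size_t i = 0; i < n; i++)
        sum += runs[i];
    return sum / (double) n;
}
```

The means come out to roughly 13260 ms (unpatched), 35638 ms (patched) and 10291 ms (patched with tuplestore). For 32 columns the patched version is therefore about 2.7x slower than unpatched, while the tuplestore variant is about 1.3x faster than unpatched.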
It seems to be a good idea if there is no other issue with your patch.
--
Regards,
Jingtang
* Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
@ 2025-04-07 06:26 Daniil Davydov <[email protected]>
parent: Jingtang Zhang <[email protected]>
0 siblings, 1 reply; 8+ messages in thread
From: Daniil Davydov @ 2025-04-07 06:26 UTC (permalink / raw)
To: Jingtang Zhang <[email protected]>; +Cc: [email protected]; pgsql-hackers; Bharath Rupireddy <[email protected]>
Hi,
On Sun, Apr 6, 2025 at 8:55 PM Jingtang Zhang <[email protected]> wrote:
>
> It has been quite a while since I last looked at the patch. I've tested it again
> and still see a regression with the patched version when the table has many columns.
> It is entirely CPU-bound in tts_virtual_copyslot.
>
> Unpatched version:
> 1 col:
> Time: 8909.714 ms (00:08.910)
> Time: 8803.579 ms (00:08.804)
> Time: 8600.415 ms (00:08.600)
> 32 cols:
> Time: 12911.699 ms (00:12.912)
> Time: 13543.491 ms (00:13.543)
> Time: 13325.368 ms (00:13.325)
>
> Patched version:
> 1 col:
> Time: 3532.841 ms (00:03.533)
> Time: 3598.223 ms (00:03.598)
> Time: 3515.858 ms (00:03.516)
> 32 cols:
> Time: 35647.724 ms (00:35.648)
> Time: 35596.233 ms (00:35.596)
> Time: 35669.106 ms (00:35.669)
>
Hm, maybe I didn't choose the best way to measure performance. Can you
please share how you do it?
> I've tested your patch with tuplestore and found that the regression no longer
> occurs, but I haven't looked deeply into it.
>
> Patched version (with tuplestore):
> 1 col:
> Time: 3500.502 ms (00:03.501)
> Time: 3486.886 ms (00:03.487)
> Time: 3514.233 ms (00:03.514)
> 32 cols:
> Time: 10375.391 ms (00:10.375)
> Time: 10248.256 ms (00:10.248)
> Time: 10248.289 ms (00:10.248)
>
> It seems to be a good idea if there is no other issue with your patch.
As far as I understand, using multi-inserts for queries like
"INSERT INTO ... SELECT FROM" is no longer discussed here, because
in such cases we have to take volatile functions and ROW triggers
into account.
I've been thinking about this for a while and made a patch as an
experiment. The principles that the patch works on are listed below.
1)
Since performance decreases for single INSERTs (when they go through the
multi-insert mechanism), I designed this feature as a table option. Thus,
users who know they will perform a lot of inserts on the table
can specify "WITH (append_optimized=true)".
2)
The presence of volatile functions is checked while the subtree
for a ModifyTable node is being built. I'm not that
familiar with the query plan construction mechanism, but it seems to
me that this way we can track any occurrence of volatile functions.
Of course, most volatile functions don't prevent us from using multi-inserts,
but checking each such function would take a very long time,
so the mere presence of a volatile function is enough for us to
abandon multi-inserts.
3)
Default expressions of the target table are also checked for volatile
functions. The same rules apply to them as in (2). As an exception, I
allowed the use of SERIAL in the column data type, since this is a
fairly common use case.
4)
If the target table contains any ROW triggers, we don't use multi insert.
5)
The patch also contains a regression test. This is a "sandbox" where you
can do some experiments with append-optimized tables.
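Points 1-4 amount to a simple eligibility check, sketched below. The struct and function names are hypothetical and only summarize the rules as described in this message; the attached patch implements them differently, inside PostgreSQL itself:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical summary of the conditions in points 1-4; not from the patch. */
typedef struct ToyInsertCheck
{
    bool        append_optimized;       /* WITH (append_optimized=true) set */
    bool        plan_has_volatile;      /* volatile function in the
                                         * ModifyTable subtree */
    bool        default_has_volatile;   /* volatile function in a column
                                         * default, other than SERIAL */
    bool        has_row_triggers;       /* any ROW trigger on the target */
} ToyInsertCheck;

/* Returns true only if every rule allows multi-inserts. */
static bool
toy_can_use_multi_insert(const ToyInsertCheck *c)
{
    /* (1) The feature is opt-in per table. */
    if (!c->append_optimized)
        return false;

    /* (2) and (3): any volatile function rules out multi-inserts. */
    if (c->plan_has_volatile || c->default_has_volatile)
        return false;

    /* (4) ROW triggers rule out multi-inserts. */
    if (c->has_row_triggers)
        return false;

    return true;
}
```

The check is deliberately conservative: a single disqualifying condition is enough to fall back to ordinary single-row inserts.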
I hope that the patch (based on the 'master' branch at
2c7bd2ba507e273f2d7fe1b2f6d30775ed4f3c09) will be useful for this
thread.
--
Best regards,
Daniil Davydov
Attachments:
[text/x-patch] v1-0001-Meet-append-optimized-tables.patch (42.0K, 2-v1-0001-Meet-append-optimized-tables.patch)
From 224378c11d270aabe28bdd32efacd37ed1984bd1 Mon Sep 17 00:00:00 2001
From: Daniil Davidov <[email protected]>
Date: Mon, 7 Apr 2025 12:55:50 +0700
Subject: [PATCH v1] Meet append optimized tables
---
src/backend/access/common/reloptions.c | 11 +
src/backend/access/heap/heapam.c | 205 ++++++++++++++++++
src/backend/access/heap/heapam_handler.c | 5 +
src/backend/access/table/tableamapi.c | 5 +
src/backend/commands/explain.c | 5 +-
src/backend/executor/execExpr.c | 17 +-
src/backend/executor/execProcnode.c | 9 +
src/backend/executor/nodeModifyTable.c | 194 ++++++++++++++++-
src/backend/optimizer/plan/createplan.c | 1 +
src/backend/optimizer/util/clauses.c | 28 ++-
src/include/access/heapam.h | 41 ++++
src/include/access/tableam.h | 84 +++++++
src/include/nodes/execnodes.h | 6 +
src/include/nodes/plannodes.h | 2 +
src/include/optimizer/optimizer.h | 3 +
src/include/utils/rel.h | 10 +
.../regress/expected/append_optimized.out | 161 ++++++++++++++
src/test/regress/parallel_schedule | 2 +
src/test/regress/sql/append_optimized.sql | 105 +++++++++
19 files changed, 879 insertions(+), 15 deletions(-)
create mode 100644 src/test/regress/expected/append_optimized.out
create mode 100644 src/test/regress/sql/append_optimized.sql
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 46c1dce222d..9652cf4179b 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -166,6 +166,15 @@ static relopt_bool boolRelOpts[] =
},
true
},
+ {
+ {
+ "append_optimized",
+ "Enables batched insertion whenever possible",
+ RELOPT_KIND_HEAP,
+ AccessExclusiveLock
+ },
+ false
+ },
/* list terminator */
{{NULL}}
};
@@ -1905,6 +1914,8 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
offsetof(StdRdOptions, vacuum_index_cleanup)},
{"vacuum_truncate", RELOPT_TYPE_BOOL,
offsetof(StdRdOptions, vacuum_truncate), offsetof(StdRdOptions, vacuum_truncate_set)},
+ {"append_optimized", RELOPT_TYPE_BOOL,
+ offsetof(StdRdOptions, append_optimized)},
{"vacuum_max_eager_freeze_failure_rate", RELOPT_TYPE_REAL,
offsetof(StdRdOptions, vacuum_max_eager_freeze_failure_rate)}
};
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index ed2e3021799..415eef4c35d 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -51,6 +51,7 @@
#include "utils/datum.h"
#include "utils/injection_point.h"
#include "utils/inval.h"
+#include "utils/memutils.h"
#include "utils/spccache.h"
#include "utils/syscache.h"
@@ -106,6 +107,7 @@ static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
bool *copy);
+static void heap_modify_insert_end(TableModifyState *state);
/*
* Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -2674,6 +2676,209 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
pgstat_count_heap_insert(relation, ntuples);
}
+/*
+ * Initialize heap modify state.
+ */
+TableModifyState *
+heap_modify_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx)
+{
+ TableModifyState *state;
+ MemoryContext context;
+ MemoryContext oldcontext;
+
+ Assert(RelationIsAppendOptimized(rel));
+ context = AllocSetContextCreate(TopTransactionContext,
+ "heap_modify memory context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(context);
+ state = palloc(sizeof(TableModifyState));
+ state->rel = rel;
+ state->cid = cid;
+ state->options = options;
+ state->mem_ctx = context;
+ state->buffer_flush_cb = buffer_flush_cb;
+ state->buffer_flush_ctx = buffer_flush_ctx;
+ state->data = NULL; /* To be set lazily */
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state,
+ TupleTableSlot *slot)
+{
+ HeapInsertState *istate;
+ HeapMultiInsertState *mistate;
+ MemoryContext oldcontext;
+
+ Assert(RelationIsAppendOptimized(state->rel));
+ oldcontext = MemoryContextSwitchTo(state->mem_ctx);
+
+ /* First time through, initialize heap insert state */
+ if (state->data == NULL)
+ {
+ istate = (HeapInsertState *) palloc(sizeof(HeapInsertState));
+ istate->bistate = NULL;
+ istate->mistate = NULL;
+ state->data = istate;
+ mistate =
+ (HeapMultiInsertState *) palloc(sizeof(HeapMultiInsertState));
+ mistate->slots =
+ (TupleTableSlot **) palloc0(sizeof(void *) * HEAP_MAX_BUFFERED_SLOTS);
+ mistate->tstore = tuplestore_begin_heap(false, false, work_mem);
+ mistate->nused = 0;
+ istate->mistate = mistate;
+
+ /*
+ * heap_multi_insert() can leak memory. So switch to this memory
+ * context before every heap_multi_insert() call and reset when
+ * finished.
+ */
+ mistate->mem_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "heap_multi_insert memory context",
+ ALLOCSET_DEFAULT_SIZES);
+ istate->bistate = GetBulkInsertState();
+ }
+
+ istate = (HeapInsertState *) state->data;
+ Assert(istate->mistate != NULL);
+ mistate = istate->mistate;
+
+ tuplestore_puttupleslot(mistate->tstore, slot);
+ mistate->nused += 1;
+
+ if (mistate->nused >= HEAP_MAX_BUFFERED_SLOTS)
+ heap_modify_buffer_flush(state);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+void
+heap_modify_buffer_flush(TableModifyState *state)
+{
+ HeapInsertState *istate;
+ HeapMultiInsertState *mistate;
+ MemoryContext oldcontext;
+ TupleDesc tupdesc;
+
+ Assert(RelationIsAppendOptimized(state->rel));
+
+ /* Quick exit if we haven't inserted anything yet */
+ if (state->data == NULL)
+ return;
+
+ tupdesc = RelationGetDescr(state->rel);
+ istate = (HeapInsertState *) state->data;
+ Assert(istate->mistate != NULL);
+ mistate = istate->mistate;
+
+ /* Quick exit if we have flushed already */
+ if (mistate->nused == 0)
+ return;
+
+ for (int i = 0; i < mistate->nused; i++)
+ {
+ bool ok;
+
+ if (istate->mistate->slots[i] == NULL)
+ {
+ istate->mistate->slots[i] =
+ MakeSingleTupleTableSlot(tupdesc, &TTSOpsMinimalTuple);
+ }
+ ok = tuplestore_gettupleslot(mistate->tstore, true, false,
+ istate->mistate->slots[i]);
+ Assert(ok);
+ }
+
+ /*
+ * heap_multi_insert() can leak memory, so switch to short-lived memory
+ * context before calling it.
+ */
+ oldcontext = MemoryContextSwitchTo(mistate->mem_ctx);
+ heap_multi_insert(state->rel,
+ mistate->slots,
+ mistate->nused,
+ state->cid,
+ state->options,
+ istate->bistate);
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(mistate->mem_ctx);
+
+ /*
+ * Invoke caller-supplied buffer flush callback after inserting rows from
+ * the buffers to heap.
+ */
+ if (state->buffer_flush_cb != NULL)
+ {
+ for (int i = 0; i < mistate->nused; i++)
+ {
+ state->buffer_flush_cb(state->buffer_flush_ctx,
+ mistate->slots[i]);
+ }
+ }
+
+ tuplestore_clear(mistate->tstore);
+ mistate->nused = 0;
+}
+
+/*
+ * Heap-insert-specific function that performs end-of-insert work: flushing any
+ * remaining buffered tuples and cleaning up the insert state and the tuple
+ * table slots used for buffered tuples.
+ */
+static void
+heap_modify_insert_end(TableModifyState *state)
+{
+ HeapInsertState *istate;
+
+ /* Quick exit if we haven't inserted anything yet */
+ if (state->data == NULL)
+ return;
+
+ istate = (HeapInsertState *) state->data;
+
+ if (istate->mistate != NULL)
+ {
+ HeapMultiInsertState *mistate = istate->mistate;
+
+ heap_modify_buffer_flush(state);
+
+ Assert(mistate->nused == 0);
+
+ for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+ tuplestore_end(mistate->tstore);
+ MemoryContextDelete(mistate->mem_ctx);
+ }
+
+ if (istate->bistate != NULL)
+ FreeBulkInsertState(istate->bistate);
+}
+
+/*
+ * Clean up heap modify state.
+ */
+void
+heap_modify_end(TableModifyState *state)
+{
+ heap_modify_insert_end(state);
+ MemoryContextDelete(state->mem_ctx);
+}
+
/*
* simple_heap_insert - insert a tuple
*
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index ac082fefa77..56880165ed0 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2643,6 +2643,11 @@ static const TableAmRoutine heapam_methods = {
.tuple_update = heapam_tuple_update,
.tuple_lock = heapam_tuple_lock,
+ .tuple_modify_begin = heap_modify_begin,
+ .tuple_modify_buffer_insert = heap_modify_buffer_insert,
+ .tuple_modify_buffer_flush = heap_modify_buffer_flush,
+ .tuple_modify_end = heap_modify_end,
+
.tuple_fetch_row_version = heapam_fetch_row_version,
.tuple_get_latest_tid = heap_get_latest_tid,
.tuple_tid_valid = heapam_tuple_tid_valid,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index 476663b66aa..ae30c5a21a8 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -94,6 +94,11 @@ GetTableAmRoutine(Oid amhandler)
Assert(routine->scan_sample_next_block != NULL);
Assert(routine->scan_sample_next_tuple != NULL);
+ Assert(routine->tuple_modify_begin != NULL);
+ Assert(routine->tuple_modify_buffer_insert != NULL);
+ Assert(routine->tuple_modify_buffer_flush != NULL);
+ Assert(routine->tuple_modify_end != NULL);
+
return routine;
}
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index ef8aa489af8..31ce1fa7acb 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1399,7 +1399,10 @@ ExplainNode(PlanState *planstate, List *ancestors,
switch (((ModifyTable *) plan)->operation)
{
case CMD_INSERT:
- pname = operation = "Insert";
+ if (((ModifyTable *) plan)->canUseBatching)
+ pname = operation = "MultiInsert";
+ else
+ pname = operation = "Insert";
break;
case CMD_UPDATE:
pname = operation = "Update";
diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c
index f1569879b52..f2d3a236fbc 100644
--- a/src/backend/executor/execExpr.c
+++ b/src/backend/executor/execExpr.c
@@ -103,7 +103,11 @@ static void ExecInitJsonCoercion(ExprState *state, JsonReturning *returning,
ErrorSaveContext *escontext, bool omit_quotes,
bool exists_coerce,
Datum *resv, bool *resnull);
-
+/*
+ * Whenever we find a volatile function while processing an expression, we
+ * must set this flag so that higher-level code can handle it appropriately.
+ */
+static bool volatile_func_flag = false;
/*
* ExecInitExpr: prepare an expression tree for execution
@@ -264,6 +268,9 @@ ExecInitQual(List *qual, PlanState *parent)
scratch.resvalue = &state->resvalue;
scratch.resnull = &state->resnull;
+ /* Reset flag indicating the presence of volatile functions in qual */
+ volatile_func_flag = false;
+
foreach_ptr(Expr, node, qual)
{
/* first evaluate expression */
@@ -276,6 +283,10 @@ ExecInitQual(List *qual, PlanState *parent)
state->steps_len - 1);
}
+ /* Possibly update information about batch-insert-capability */
+ if (parent && !parent->has_volatile)
+ parent->has_volatile = volatile_func_flag;
+
/* adjust jump targets */
foreach_int(jump, adjust_jumps)
{
@@ -1193,6 +1204,10 @@ ExecInitExprRec(Expr *node, ExprState *state,
{
FuncExpr *func = (FuncExpr *) node;
+ /* Higher level code will handle it */
+ if (func_volatile(func->funcid))
+ volatile_func_flag = true;
+
ExecInitFunc(&scratch, node,
func->args, func->funcid, func->inputcollid,
state);
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index f5f9cfbeead..2383ef7ea4b 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -416,6 +416,15 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
result->instrument = InstrAlloc(1, estate->es_instrument,
result->async_capable);
+ /* Check whether any node below has volatile functions */
+ if ((outerPlanState(result) != NULL &&
+ outerPlanState(result)->has_volatile) ||
+ (innerPlanState(result) != NULL &&
+ innerPlanState(result)->has_volatile))
+ {
+ result->has_volatile = true;
+ }
+
return result;
}
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 309e27f8b5f..bbaf91bcbac 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -55,6 +55,7 @@
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
+#include "catalog/pg_proc.h"
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
@@ -67,6 +68,8 @@
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/datum.h"
+#include "utils/fmgroids.h"
+#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
@@ -130,6 +133,18 @@ typedef struct UpdateContext
LockTupleMode lockmode;
} UpdateContext;
+typedef struct InsertModifyBufferFlushContext
+{
+ ResultRelInfo *resultRelInfo;
+ EState *estate;
+ ModifyTableState *mtstate;
+} InsertModifyBufferFlushContext;
+
+static InsertModifyBufferFlushContext *insert_modify_buffer_flush_context = NULL;
+static TableModifyState *table_modify_state = NULL;
+
+static void InsertModifyBufferFlushCallback(void *context,
+ TupleTableSlot *slot);
static void ExecBatchInsert(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo,
@@ -174,6 +189,8 @@ static TupleTableSlot *ExecMergeNotMatched(ModifyTableContext *context,
ResultRelInfo *resultRelInfo,
bool canSetTag);
+static bool ContainVolatileFunctionsChecker(Oid func_id, void *context);
+static bool IsMultiInsertCapable(ModifyTableState *mtstate);
/*
* Verify that the tuples to be produced by INSERT match the
@@ -806,6 +823,31 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo,
return ExecProject(newProj);
}
+static void
+InsertModifyBufferFlushCallback(void *context, TupleTableSlot *slot)
+{
+ InsertModifyBufferFlushContext *ctx = (InsertModifyBufferFlushContext *) context;
+ ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+ EState *estate = ctx->estate;
+
+ /* Caller must take care of opening and closing the indices */
+
+ /*
+ * If there are any indexes, update them for the inserted tuple. AFTER ROW
+ * INSERT triggers are not run here; batching is disabled when they exist.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ List *recheckIndexes;
+
+ recheckIndexes =
+ ExecInsertIndexTuples(resultRelInfo,
+ slot, estate, false,
+ false, NULL, NIL, false);
+ list_free(recheckIndexes);
+ }
+}
+
/* ----------------------------------------------------------------
* ExecInsert
*
@@ -1209,17 +1251,22 @@ ExecInsert(ModifyTableContext *context,
}
else
{
- /* insert the tuple normally */
- table_tuple_insert(resultRelationDesc, slot,
- estate->es_output_cid,
- 0, NULL);
-
- /* insert index entries for tuple */
- if (resultRelInfo->ri_NumIndices > 0)
- recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
- slot, estate, false,
- false, NULL, NIL,
- false);
+ if (table_modify_state != NULL)
+ table_modify_buffer_insert(table_modify_state, slot);
+ else
+ {
+ /* insert the tuple normally */
+ table_tuple_insert(resultRelationDesc, slot,
+ estate->es_output_cid,
+ 0, NULL);
+
+ /* insert index entries for tuple */
+ if (resultRelInfo->ri_NumIndices > 0)
+ recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
+ slot, estate, false,
+ false, NULL, NIL,
+ false);
+ }
}
}
@@ -4586,6 +4633,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
mtstate->mt_mergeActionLists = mergeActionLists;
mtstate->mt_mergeJoinConditions = mergeJoinConditions;
+ /*
+ * Previous ModifyTable node execution (if any) should have released
+ * these resources.
+ */
+ Assert(insert_modify_buffer_flush_context == NULL &&
+ table_modify_state == NULL);
+
/*----------
* Resolve the target relation. This is the same as:
*
@@ -4999,6 +5053,8 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
*
* We only do this for INSERT, so that for UPDATE/DELETE the batch size
* remains set to 0.
+ *
+ * Also determine whether we can use batching for this INSERT command.
*/
if (operation == CMD_INSERT)
{
@@ -5016,6 +5072,27 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
}
else
resultRelInfo->ri_BatchSize = 1;
+
+ if (IsMultiInsertCapable(mtstate))
+ {
+ insert_modify_buffer_flush_context =
+ (InsertModifyBufferFlushContext *) palloc0(sizeof(InsertModifyBufferFlushContext));
+ insert_modify_buffer_flush_context->resultRelInfo = resultRelInfo;
+ insert_modify_buffer_flush_context->estate = estate;
+ insert_modify_buffer_flush_context->mtstate = mtstate;
+
+ Assert(estate->es_output_cid != InvalidCommandId);
+
+ table_modify_state =
+ table_modify_begin(resultRelInfo->ri_RelationDesc,
+ estate->es_output_cid,
+ 0,
+ InsertModifyBufferFlushCallback,
+ insert_modify_buffer_flush_context);
+
+ /* For more accurate EXPLAIN output */
+ node->canUseBatching = true;
+ }
}
/*
@@ -5034,6 +5111,90 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
return mtstate;
}
+/*
+ * Returns true if a batch insert can be performed on the table within the
+ * current query. We impose the following rules:
+ * 1) Batching is supported only for ordinary tables without ROW triggers
+ * and with the append_optimized option set.
+ * 2) Batching is not supported for queries containing a RETURNING clause.
+ * 3) Batching is not supported for queries containing any volatile
+ * functions in the plan tree.
+ * 4) Batching is supported only for tables that have no volatile default
+ * expressions, other than nextval.
+ */
+static bool
+IsMultiInsertCapable(ModifyTableState *mtstate)
+{
+ ResultRelInfo *relinfo = mtstate->resultRelInfo;
+ TupleDesc tdesc = RelationGetDescr(relinfo->ri_RelationDesc);
+ bool has_row_triggers;
+
+ Assert(mtstate->operation == CMD_INSERT);
+
+ has_row_triggers =
+ (relinfo->ri_TrigDesc != NULL &&
+ (relinfo->ri_TrigDesc->trig_insert_after_row ||
+ relinfo->ri_TrigDesc->trig_insert_before_row ||
+ relinfo->ri_TrigDesc->trig_insert_instead_row));
+
+ /* Check conditions (1) and (2). */
+ if (!RelationIsAppendOptimized(relinfo->ri_RelationDesc) ||
+ relinfo->ri_projectReturning ||
+ has_row_triggers)
+ {
+ return false;
+ }
+
+ /* Check conditions (3) and (4). */
+
+ /*
+ * By default, this variable is calculated at the end of ExecInitNode
+ * processing, but we need it now.
+ */
+ if ((outerPlanState(mtstate) != NULL &&
+ outerPlanState(mtstate)->has_volatile) ||
+ (innerPlanState(mtstate) != NULL &&
+ innerPlanState(mtstate)->has_volatile))
+ {
+ mtstate->ps.has_volatile = true;
+ return false;
+ }
+
+ for (AttrNumber i = 0; i < tdesc->natts; i++)
+ {
+ Node *defexpr;
+ if (!TupleDescAttr(tdesc, i)->atthasdef)
+ continue;
+
+ defexpr = TupleDescGetDefault(tdesc, i + 1);
+ if (contain_volatile_functions_extended(defexpr,
+ ContainVolatileFunctionsChecker))
+ {
+ return false;
+ }
+ }
+
+ /* All conditions are met - we can perform batch insert on table. */
+ return true;
+}
+
+/*
+ * Support function for IsMultiInsertCapable.
+ *
+ * To decide whether we can use batching, we iterate over all default expressions
+ * of the target table and check whether they contain any volatile functions.
+ *
+ * However, not all volatile functions are dangerous for batching, so we can
+ * allow some of them to appear in default expressions. For now, we allow only
+ * nextval (so that batching is not ruled out when the target table has a
+ * SERIAL field).
+ */
+static bool ContainVolatileFunctionsChecker(Oid func_id, void *context)
+{
+ return (func_volatile(func_id) == PROVOLATILE_VOLATILE &&
+ func_id != F_NEXTVAL);
+}
+
/* ----------------------------------------------------------------
* ExecEndModifyTable
*
@@ -5047,6 +5208,17 @@ ExecEndModifyTable(ModifyTableState *node)
{
int i;
+ if (table_modify_state != NULL)
+ {
+ Assert(node->operation == CMD_INSERT);
+
+ table_modify_end(table_modify_state);
+ table_modify_state = NULL;
+
+ pfree(insert_modify_buffer_flush_context);
+ insert_modify_buffer_flush_context = NULL;
+ }
+
/*
* Allow any FDWs to shut down
*/
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index a8f22a8c154..7bf13de1e93 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -7133,6 +7133,7 @@ make_modifytable(PlannerInfo *root, Plan *subplan,
node->operation = operation;
node->canSetTag = canSetTag;
+ node->canUseBatching = false;
node->nominalRelation = nominalRelation;
node->rootRelation = rootRelation;
node->partColsUpdated = partColsUpdated;
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 26a3e050086..91ee85e9157 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -157,6 +157,14 @@ static Node *substitute_actual_srf_parameters_mutator(Node *node,
substitute_actual_srf_parameters_context *context);
static bool pull_paramids_walker(Node *node, Bitmapset **context);
+/*
+ * Allow the caller to supply a specific checker for "contain_volatile_functions".
+ * It is not normally used, but, for example, append-optimized tables need to
+ * ignore some volatile functions when checking default expressions.
+ */
+
+static bool contain_volatile_functions_checker(Oid func_id, void *context);
+static check_function_callback checker = contain_volatile_functions_checker;
/*****************************************************************************
* Aggregate-function clause manipulation
@@ -541,6 +549,23 @@ contain_volatile_functions(Node *clause)
return contain_volatile_functions_walker(clause, NULL);
}
+/*
+ * Same as above, but lets the caller specify a user-defined check_function_callback.
+ */
+bool
+contain_volatile_functions_extended(Node *clause,
+ check_function_callback ud_checker)
+{
+ bool res;
+ check_function_callback prev_checker = checker;
+
+ checker = ud_checker;
+ res = contain_volatile_functions_walker(clause, NULL);
+ checker = prev_checker;
+
+ return res;
+}
+
static bool
contain_volatile_functions_checker(Oid func_id, void *context)
{
@@ -553,8 +578,7 @@ contain_volatile_functions_walker(Node *node, void *context)
if (node == NULL)
return false;
/* Check for volatile functions in node itself */
- if (check_functions_in_node(node, contain_volatile_functions_checker,
- context))
+ if (check_functions_in_node(node, checker, context))
return true;
if (IsA(node, NextValueExpr))
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index e48fe434cd3..96b9e925e66 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -30,6 +30,7 @@
#include "storage/shm_toc.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
+#include "utils/tuplestore.h"
/* "options" flag bits for heap_insert */
@@ -270,6 +271,35 @@ typedef enum
PRUNE_VACUUM_CLEANUP, /* VACUUM 2nd heap pass */
} PruneReason;
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS 1000
+
+typedef struct HeapMultiInsertState
+{
+ /* Array of buffered slots */
+ TupleTableSlot **slots;
+
+ /* Holds the tuple set */
+ Tuplestorestate *tstore;
+
+ /* Number of buffered tuples currently held */
+ int nused;
+
+ /* Memory context for dealing with multi inserts */
+ MemoryContext mem_ctx;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+ struct BulkInsertStateData *bistate;
+ HeapMultiInsertState *mistate;
+} HeapInsertState;
+
/* ----------------
* function prototypes for heap access method
*
@@ -320,6 +350,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
int ntuples, CommandId cid, int options,
BulkInsertState bistate);
+
+extern TableModifyState *heap_modify_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx);
+extern void heap_modify_buffer_insert(TableModifyState *state,
+ TupleTableSlot *slot);
+extern void heap_modify_buffer_flush(TableModifyState *state);
+extern void heap_modify_end(TableModifyState *state);
+
extern TM_Result heap_delete(Relation relation, ItemPointer tid,
CommandId cid, Snapshot crosscheck, bool wait,
struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 8713e12cbfb..3942463b715 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -248,12 +248,44 @@ typedef struct TM_IndexDeleteOp
TM_IndexStatus *status;
} TM_IndexDeleteOp;
+struct TableModifyState;
+
+/* Callback invoked upon flushing each buffered tuple */
+typedef void (*TableModifyBufferFlushCb) (void *context,
+ TupleTableSlot *slot);
+
+/* Holds table modify state */
+typedef struct TableModifyState
+{
+ /* These fields are used for inserts for now */
+
+ Relation rel; /* Relation to insert to */
+ CommandId cid; /* Command ID for insert */
+ int options; /* TABLE_INSERT options */
+
+ /* Memory context for dealing with modify state variables */
+ MemoryContext mem_ctx;
+
+ /* Flush callback and its context used for multi inserts */
+ TableModifyBufferFlushCb buffer_flush_cb;
+ void *buffer_flush_ctx;
+
+ /* Table AM specific data */
+ void *data;
+} TableModifyState;
+
/* "options" flag bits for table_tuple_insert */
/* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
#define TABLE_INSERT_SKIP_FSM 0x0002
#define TABLE_INSERT_FROZEN 0x0004
#define TABLE_INSERT_NO_LOGICAL 0x0008
+/*
+ * Use BAS_BULKWRITE buffer access strategy. 0x0010 is for
+ * HEAP_INSERT_SPECULATIVE.
+ */
+#define TABLE_INSERT_BAS_BULKWRITE 0x0020
+
/* flag bits for table_tuple_lock */
/* Follow tuples whose update is in progress if lock modes don't conflict */
#define TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS (1 << 0)
@@ -571,6 +603,21 @@ typedef struct TableAmRoutine
void (*finish_bulk_insert) (Relation rel, int options);
+ /* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+ TableModifyState *(*tuple_modify_begin) (Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx);
+ void (*tuple_modify_buffer_insert) (TableModifyState *state,
+ TupleTableSlot *slot);
+ void (*tuple_modify_buffer_flush) (TableModifyState *state);
+ void (*tuple_modify_end) (TableModifyState *state);
+
+
/* ------------------------------------------------------------------------
* DDL related functionality.
* ------------------------------------------------------------------------
@@ -1560,6 +1607,43 @@ table_finish_bulk_insert(Relation rel, int options)
}
+/* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+static inline TableModifyState *
+table_modify_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx)
+{
+ return rel->rd_tableam->tuple_modify_begin(rel,
+ cid,
+ options,
+ buffer_flush_cb,
+ buffer_flush_ctx);
+}
+
+static inline void
+table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
+{
+ state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot);
+}
+
+static inline void
+table_modify_buffer_flush(TableModifyState *state)
+{
+ state->rel->rd_tableam->tuple_modify_buffer_flush(state);
+}
+
+static inline void
+table_modify_end(TableModifyState *state)
+{
+ state->rel->rd_tableam->tuple_modify_end(state);
+}
+
+
/* ------------------------------------------------------------------------
* DDL related functionality.
* ------------------------------------------------------------------------
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 5b6cadb5a6c..cbd798187eb 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1200,6 +1200,12 @@ typedef struct PlanState
bool async_capable; /* true if node is async-capable */
+ /*
+ * True if the qual of the current node, or of any node lower down the plan
+ * tree, contains at least one volatile function.
+ */
+ bool has_volatile;
+
/*
* Scanslot's descriptor if known. This is a bit of a hack, but otherwise
* it's hard for expression compilation to optimize based on the
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 658d76225e4..3a38040d991 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -292,6 +292,8 @@ typedef struct ModifyTable
CmdType operation;
/* do we set the command tag/es_processed? */
bool canSetTag;
+ /* do we use batching during INSERT? */
+ bool canUseBatching;
/* Parent RT index for use of EXPLAIN */
Index nominalRelation;
/* Root RT index, if partitioned/inherited */
diff --git a/src/include/optimizer/optimizer.h b/src/include/optimizer/optimizer.h
index 546828b54bd..9bda34d21bc 100644
--- a/src/include/optimizer/optimizer.h
+++ b/src/include/optimizer/optimizer.h
@@ -22,6 +22,7 @@
#ifndef OPTIMIZER_H
#define OPTIMIZER_H
+#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h"
/*
@@ -142,6 +143,8 @@ extern Expr *canonicalize_qual(Expr *qual, bool is_check);
extern bool contain_mutable_functions(Node *clause);
extern bool contain_mutable_functions_after_planning(Expr *expr);
extern bool contain_volatile_functions(Node *clause);
+extern bool contain_volatile_functions_extended(Node *clause,
+ check_function_callback ud_checker);
extern bool contain_volatile_functions_after_planning(Expr *expr);
extern bool contain_volatile_functions_not_nextval(Node *clause);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index b552359915f..e548954d81d 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -348,6 +348,7 @@ typedef struct StdRdOptions
StdRdOptIndexCleanup vacuum_index_cleanup; /* controls index vacuuming */
bool vacuum_truncate; /* enables vacuum to truncate a relation */
bool vacuum_truncate_set; /* whether vacuum_truncate is set */
+ bool append_optimized; /* use optimized insertion algorithm */
/*
* Fraction of pages in a relation that vacuum can eagerly scan and fail
@@ -367,6 +368,15 @@ typedef struct StdRdOptions
((relation)->rd_options ? \
((StdRdOptions *) (relation)->rd_options)->toast_tuple_target : (defaulttarg))
+/*
+ * RelationIsAppendOptimized
+ * Check whether relation can use batching for insertion
+ */
+#define RelationIsAppendOptimized(relation) \
+ (AssertMacro(RelationIsValid(relation)), \
+ (relation)->rd_options ? \
+ ((StdRdOptions *) (relation)->rd_options)->append_optimized : false)
+
/*
* RelationGetFillFactor
* Returns the relation's fillfactor. Note multiple eval of argument!
diff --git a/src/test/regress/expected/append_optimized.out b/src/test/regress/expected/append_optimized.out
new file mode 100644
index 00000000000..57b45a20e61
--- /dev/null
+++ b/src/test/regress/expected/append_optimized.out
@@ -0,0 +1,161 @@
+-- Not all INSERT queries are suitable for using batching. All conditions are
+-- listed in nodeModifyTable.c
+-- In this test we want to check whether append_optimized table correcly
+-- determines when to use batching.
+CREATE TABLE optimized_tbl (
+ int_data INT DEFAULT random()
+) WITH (append_optimized=true);
+CREATE TABLE rows_source (int_data INT);
+INSERT INTO rows_source SELECT generate_series(1, 10);
+-- Must not use batching here, because optimized_tbl has volatile function
+-- whithin default expression.
+EXPLAIN INSERT INTO optimized_tbl
+ SELECT int_data FROM rows_source;
+ QUERY PLAN
+---------------------------------------------------------------------
+ Insert on optimized_tbl (cost=0.00..35.50 rows=0 width=0)
+ -> Seq Scan on rows_source (cost=0.00..35.50 rows=2550 width=4)
+(2 rows)
+
+-- Now default expression not prevent us from using batching.
+ALTER TABLE optimized_tbl ALTER COLUMN int_data SET DEFAULT 0;
+EXPLAIN INSERT INTO optimized_tbl
+ SELECT int_data FROM rows_source;
+ QUERY PLAN
+---------------------------------------------------------------------
+ MultiInsert on optimized_tbl (cost=0.00..35.50 rows=0 width=0)
+ -> Seq Scan on rows_source (cost=0.00..35.50 rows=2550 width=4)
+(2 rows)
+
+-- Must not use batching here, because WHERE clause contains volatile function.
+EXPLAIN INSERT INTO optimized_tbl
+ SELECT int_data FROM rows_source
+ WHERE int_data > random();
+ QUERY PLAN
+--------------------------------------------------------------------
+ Insert on optimized_tbl (cost=0.00..54.63 rows=0 width=0)
+ -> Seq Scan on rows_source (cost=0.00..54.63 rows=850 width=4)
+ Filter: ((int_data)::double precision > random())
+(3 rows)
+
+-- Now WHERE clause not prevent us from using batching.
+EXPLAIN INSERT INTO optimized_tbl
+ SELECT int_data FROM rows_source
+ WHERE int_data > 2;
+ QUERY PLAN
+--------------------------------------------------------------------
+ MultiInsert on optimized_tbl (cost=0.00..41.88 rows=0 width=0)
+ -> Seq Scan on rows_source (cost=0.00..41.88 rows=850 width=4)
+ Filter: (int_data > 2)
+(3 rows)
+
+-- Create ROW trigger on optimized_tbl.
+CREATE OR REPLACE FUNCTION my_trigger_function()
+RETURNS TRIGGER AS $$
+BEGIN
+ NEW.int_data := NEW.int_data * 10;
+ RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+CREATE TRIGGER my_row_trigger
+BEFORE INSERT ON optimized_tbl
+FOR EACH ROW
+EXECUTE FUNCTION my_trigger_function();
+-- Must not use batching here, because optimized_tbl has ROW trigger.
+EXPLAIN INSERT INTO optimized_tbl
+ SELECT int_data FROM rows_source;
+ QUERY PLAN
+---------------------------------------------------------------------
+ Insert on optimized_tbl (cost=0.00..35.50 rows=0 width=0)
+ -> Seq Scan on rows_source (cost=0.00..35.50 rows=2550 width=4)
+(2 rows)
+
+DROP TRIGGER my_row_trigger ON optimized_tbl;
+DROP FUNCTION my_trigger_function();
+-- Must not use batching here, because RETURNING clause is specified.
+EXPLAIN INSERT INTO optimized_tbl VALUES (100) RETURNING int_data;
+ QUERY PLAN
+-----------------------------------------------------------
+ Insert on optimized_tbl (cost=0.00..0.01 rows=1 width=4)
+ -> Result (cost=0.00..0.01 rows=1 width=4)
+(2 rows)
+
+-- Now RETURNING not prevent us from using batching.
+EXPLAIN INSERT INTO optimized_tbl VALUES (100);
+ QUERY PLAN
+----------------------------------------------------------------
+ MultiInsert on optimized_tbl (cost=0.00..0.01 rows=0 width=0)
+ -> Result (cost=0.00..0.01 rows=1 width=4)
+(2 rows)
+
+TRUNCATE optimized_tbl;
+CREATE INDEX idx_test_int_data ON optimized_tbl (int_data);
+-- Fill source table with more data, so there will be several buffers flushs
+-- during INSERT opration.
+INSERT INTO rows_source SELECT generate_series(11, 10000);
+-- It is OK to use batching.
+EXPLAIN INSERT INTO optimized_tbl
+ SELECT int_data FROM rows_source;
+ QUERY PLAN
+-----------------------------------------------------------------------
+ MultiInsert on optimized_tbl (cost=0.00..159.75 rows=0 width=0)
+ -> Seq Scan on rows_source (cost=0.00..159.75 rows=11475 width=4)
+(2 rows)
+
+INSERT INTO optimized_tbl
+SELECT int_data FROM rows_source;
+-- Check whether both index and table contains all inserted rows.
+SELECT COUNT(*) FROM optimized_tbl;
+ count
+-------
+ 10000
+(1 row)
+
+ANALYZE optimized_tbl;
+SELECT c.relname, c.reltuples
+FROM pg_class c
+JOIN pg_index i ON c.oid = i.indexrelid
+WHERE i.indrelid = 'optimized_tbl'::regclass;
+ relname | reltuples
+-------------------+-----------
+ idx_test_int_data | 10000
+(1 row)
+
+-- We allow to use SERIAL field in append_optimized table. Check whether such
+-- fields behave correctly.
+CREATE TABLE test_serial(
+ id SERIAL,
+ int_data INT
+) WITH (append_optimized=true);
+CREATE TABLE small_source(int_data INT);
+INSERT INTO small_source SELECT generate_series(1, 10);
+EXPLAIN INSERT INTO test_serial(int_data)
+ SELECT int_data FROM small_source;
+ QUERY PLAN
+----------------------------------------------------------------------
+ MultiInsert on test_serial (cost=0.00..48.25 rows=0 width=0)
+ -> Seq Scan on small_source (cost=0.00..48.25 rows=2550 width=8)
+(2 rows)
+
+INSERT INTO test_serial(int_data)
+SELECT int_data FROM small_source;
+SELECT * FROM test_serial;
+ id | int_data
+----+----------
+ 1 | 1
+ 2 | 2
+ 3 | 3
+ 4 | 4
+ 5 | 5
+ 6 | 6
+ 7 | 7
+ 8 | 8
+ 9 | 9
+ 10 | 10
+(10 rows)
+
+-- Cleanup
+DROP TABLE optimized_tbl;
+DROP TABLE rows_source;
+DROP TABLE test_serial;
+DROP TABLE small_source;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 0a35f2f8f6a..0cda71a358d 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -136,3 +136,5 @@ test: fast_default
# run tablespace test at the end because it drops the tablespace created during
# setup that other tests may use.
test: tablespace
+
+test: append_optimized
diff --git a/src/test/regress/sql/append_optimized.sql b/src/test/regress/sql/append_optimized.sql
new file mode 100644
index 00000000000..ce3ffab2d52
--- /dev/null
+++ b/src/test/regress/sql/append_optimized.sql
@@ -0,0 +1,105 @@
+-- Not all INSERT queries are suitable for using batching. All conditions are
+-- listed in nodeModifyTable.c
+-- In this test we want to check whether append_optimized table correcly
+-- determines when to use batching.
+
+CREATE TABLE optimized_tbl (
+ int_data INT DEFAULT random()
+) WITH (append_optimized=true);
+
+CREATE TABLE rows_source (int_data INT);
+INSERT INTO rows_source SELECT generate_series(1, 10);
+
+-- Must not use batching here, because optimized_tbl has volatile function
+-- whithin default expression.
+EXPLAIN INSERT INTO optimized_tbl
+ SELECT int_data FROM rows_source;
+
+-- Now default expression not prevent us from using batching.
+ALTER TABLE optimized_tbl ALTER COLUMN int_data SET DEFAULT 0;
+EXPLAIN INSERT INTO optimized_tbl
+ SELECT int_data FROM rows_source;
+
+-- Must not use batching here, because WHERE clause contains volatile function.
+EXPLAIN INSERT INTO optimized_tbl
+ SELECT int_data FROM rows_source
+ WHERE int_data > random();
+
+-- Now WHERE clause not prevent us from using batching.
+EXPLAIN INSERT INTO optimized_tbl
+ SELECT int_data FROM rows_source
+ WHERE int_data > 2;
+
+-- Create ROW trigger on optimized_tbl.
+CREATE OR REPLACE FUNCTION my_trigger_function()
+RETURNS TRIGGER AS $$
+BEGIN
+ NEW.int_data := NEW.int_data * 10;
+ RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER my_row_trigger
+BEFORE INSERT ON optimized_tbl
+FOR EACH ROW
+EXECUTE FUNCTION my_trigger_function();
+
+-- Must not use batching here, because optimized_tbl has ROW trigger.
+EXPLAIN INSERT INTO optimized_tbl
+ SELECT int_data FROM rows_source;
+
+DROP TRIGGER my_row_trigger ON optimized_tbl;
+DROP FUNCTION my_trigger_function();
+
+-- Must not use batching here, because RETURNING clause is specified.
+EXPLAIN INSERT INTO optimized_tbl VALUES (100) RETURNING int_data;
+
+-- Now RETURNING not prevent us from using batching.
+EXPLAIN INSERT INTO optimized_tbl VALUES (100);
+
+TRUNCATE optimized_tbl;
+CREATE INDEX idx_test_int_data ON optimized_tbl (int_data);
+
+-- Fill source table with more data, so there will be several buffers flushs
+-- during INSERT opration.
+INSERT INTO rows_source SELECT generate_series(11, 10000);
+
+-- It is OK to use batching.
+EXPLAIN INSERT INTO optimized_tbl
+ SELECT int_data FROM rows_source;
+
+INSERT INTO optimized_tbl
+SELECT int_data FROM rows_source;
+
+-- Check whether both index and table contains all inserted rows.
+SELECT COUNT(*) FROM optimized_tbl;
+ANALYZE optimized_tbl;
+
+SELECT c.relname, c.reltuples
+FROM pg_class c
+JOIN pg_index i ON c.oid = i.indexrelid
+WHERE i.indrelid = 'optimized_tbl'::regclass;
+
+-- We allow to use SERIAL field in append_optimized table. Check whether such
+-- fields behave correctly.
+CREATE TABLE test_serial(
+ id SERIAL,
+ int_data INT
+) WITH (append_optimized=true);
+
+CREATE TABLE small_source(int_data INT);
+INSERT INTO small_source SELECT generate_series(1, 10);
+
+EXPLAIN INSERT INTO test_serial(int_data)
+ SELECT int_data FROM small_source;
+
+INSERT INTO test_serial(int_data)
+SELECT int_data FROM small_source;
+
+SELECT * FROM test_serial;
+
+-- Cleanup
+DROP TABLE optimized_tbl;
+DROP TABLE rows_source;
+DROP TABLE test_serial;
+DROP TABLE small_source;
--
2.43.0
* Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
@ 2026-04-28 04:27 Haibo Yan <[email protected]>
parent: Daniil Davydov <[email protected]>
0 siblings, 0 replies; 8+ messages in thread
From: Haibo Yan @ 2026-04-28 04:27 UTC (permalink / raw)
To: Daniil Davydov <[email protected]>; +Cc: Jingtang Zhang <[email protected]>; [email protected]; pgsql-hackers; Bharath Rupireddy <[email protected]>
On Sun, Apr 26, 2026 at 2:56 PM Daniil Davydov <[email protected]> wrote:
> Hi,
>
> On Sun, Apr 6, 2025 at 8:55 PM Jingtang Zhang <[email protected]> wrote:
> >
> > It was quite a while since I last looked at the patch. I've tested it again,
> > and still get regression on patched version where a table has many columns.
> > And it is totally CPU-bounded on tts_virtual_copyslot.
> >
> > Unpatched version:
> > 1 col:
> > Time: 8909.714 ms (00:08.910)
> > Time: 8803.579 ms (00:08.804)
> > Time: 8600.415 ms (00:08.600)
> > 32 cols:
> > Time: 12911.699 ms (00:12.912)
> > Time: 13543.491 ms (00:13.543)
> > Time: 13325.368 ms (00:13.325)
> >
> > Patched version:
> > 1 col:
> > Time: 3532.841 ms (00:03.533)
> > Time: 3598.223 ms (00:03.598)
> > Time: 3515.858 ms (00:03.516)
> > 32 cols:
> > Time: 35647.724 ms (00:35.648)
> > Time: 35596.233 ms (00:35.596)
> > Time: 35669.106 ms (00:35.669)
> >
>
> Hm, maybe I didn't choose the best way to measure performance. Can you
> please share how you do it?
>
> > I've tested your patch with tuplestore and found the regression does not
> > exist anymore, but I haven't look deep inside it.
> >
> > Patched version (with tuplestore):
> > 1 col:
> > Time: 3500.502 ms (00:03.501)
> > Time: 3486.886 ms (00:03.487)
> > Time: 3514.233 ms (00:03.514)
> > 32 cols:
> > Time: 10375.391 ms (00:10.375)
> > Time: 10248.256 ms (00:10.248)
> > Time: 10248.289 ms (00:10.248)
> >
> > It seems to be a good idea if there is no other issue with your patch.
>
> As far as I understand, the use of multi inserts for queries like
> "INSERT INTO ... SELECT FROM" is not discussed here anymore due to the
> fact that in such cases we will have to take into account the volatile
> functions and ROW triggers.
> I've been thinking about this for a while and made a patch as an
> experiment. The principles that the patch works on are listed below.
> 1)
> Since performance decreases for single INSERTs (within a multi inserts
> mechanism), I designed this feature as an option for the table. Thus,
> if the user knows that he will perform a lot of inserts on the table,
> he can specify "WITH (append_optimized=true)".
> 2)
> The availability of volatile functions is monitored during the
> construction of a subtree for a ModifyTable node. I'm not that
> familiar with the query plan construction mechanism, but it seems to
> me that this way we can track any occurrence of volatile functions.
> Of course, most volatile functions don't stop us from using multi
> inserts, but checking each such function would take a very long time,
> so the very fact of having a volatile function is enough for us to
> abandon multi-inserts.
> 3)
> Default expressions of the target table are also checked for volatile
> functions. The same rules apply to them as in (2). As an exception, I
> allowed the use of SERIAL in the column data type, since this is a
> fairly common use case.
> 4)
> If the target table contains any ROW triggers, we don't use multi insert.
> 5)
> Patch also contains a regression test. This is a "sandbox" where you
> can do some experiments with append-optimized tables.
>
> I hope that patch (targeted on 'master' branch,
> 2c7bd2ba507e273f2d7fe1b2f6d30775ed4f3c09) will be useful for this
> thread.
>
> --
> Best regards,
> Daniil Davydov
>
Hi all,
I picked this work up again and implemented the full 5-patch series.
The series is structured as follows:
- 0001 adds the buffered-insert lifecycle API in tableam/heapam and
provides the heap implementation.
- 0002 adopts the API for CTAS.
- 0003 adopts the API for CREATE MATERIALIZED VIEW and REFRESH
MATERIALIZED VIEW.
- 0004 adopts the API for COPY FROM.
- 0005 adopts the API for a restricted first step of INSERT INTO ... SELECT.
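As a rough picture of the lifecycle that 0001 introduces, the following standalone toy model mimics begin/put/flush/end with a count threshold, a byte threshold, and an optional per-tuple flush callback. All toy_* names are invented for illustration, plain ints stand in for tuples, and nothing here is PostgreSQL code; the two thresholds simply copy the constants used in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

#define TOY_MAX_TUPLES 1000     /* same value as HEAP_BUFFERED_INSERT_MAX_TUPLES */
#define TOY_MAX_BYTES  65535    /* same value as HEAP_BUFFERED_INSERT_MAX_BYTES */

/* Hypothetical per-tuple notification, called after a batch is "written". */
typedef void (*toy_flush_cb) (void *ctx, int value);

typedef struct ToyBufferedInsertState
{
    int          buf[TOY_MAX_TUPLES];   /* buffered "tuples" */
    int          ntuples;
    size_t       buffered_bytes;
    int          nflushes;              /* number of non-empty flushes so far */
    toy_flush_cb flush_cb;              /* may be NULL */
    void        *flush_ctx;
} ToyBufferedInsertState;

static ToyBufferedInsertState *
toy_begin(toy_flush_cb cb, void *ctx)
{
    ToyBufferedInsertState *s = calloc(1, sizeof(*s));

    if (s == NULL)
        abort();
    s->flush_cb = cb;
    s->flush_ctx = ctx;
    return s;
}

static void
toy_flush(ToyBufferedInsertState *s)
{
    if (s->ntuples == 0)
        return;

    /* A real AM would write the whole batch to storage at this point. */

    /* Notify the caller once per tuple, in insertion order. */
    if (s->flush_cb != NULL)
        for (int i = 0; i < s->ntuples; i++)
            s->flush_cb(s->flush_ctx, s->buf[i]);

    s->ntuples = 0;
    s->buffered_bytes = 0;
    s->nflushes++;
}

static void
toy_put(ToyBufferedInsertState *s, int value, size_t len)
{
    /* Count threshold: make room before buffering the new tuple. */
    if (s->ntuples >= TOY_MAX_TUPLES)
        toy_flush(s);

    assert(s->ntuples < TOY_MAX_TUPLES);
    s->buf[s->ntuples++] = value;
    s->buffered_bytes += len;

    /* Byte threshold: flush right after buffering if the batch got too big. */
    if (s->buffered_bytes > TOY_MAX_BYTES)
        toy_flush(s);
}

/* Flushes whatever is left, frees the state, returns the flush count. */
static int
toy_end(ToyBufferedInsertState *s)
{
    int nflushes;

    toy_flush(s);
    nflushes = s->nflushes;
    free(s);
    return nflushes;
}

/* Example callback: counts how many tuples have been reported as flushed. */
static void
toy_count_cb(void *ctx, int value)
{
    (void) value;
    (*(long *) ctx)++;
}
```

With 2500 puts of small tuples, the callback fires for 2000 tuples from inside put() and for the remaining 500 from end(), which is why callers have to tolerate the callback running during put().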
I also reran performance testing locally on my machine:
- Hardware: MacBook Pro M4, 36GB RAM
- shared_buffers: 128MB
I compared the unpatched baseline against the current patched series for
CTAS, CMV, RMV, and INSERT INTO ... SELECT.
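Table 1: Median (ms)
Workload          | 10K Before | 10K After | 10K Improv | 100K Before | 100K After | 100K Improv | 1M Before | 1M After | 1M Improv
------------------+------------+-----------+------------+-------------+------------+-------------+-----------+----------+----------
CTAS              |       1.60 |      1.17 |     +26.9% |        9.19 |       5.70 |      +38.0% |    105.61 |    73.28 |    +30.6%
CMV               |       2.11 |      1.77 |     +16.1% |       10.28 |       6.20 |      +39.7% |    110.10 |    79.64 |    +27.7%
RMV               |       1.62 |      1.19 |     +26.5% |        9.91 |       5.43 |      +45.2% |    106.04 |    69.57 |    +34.4%
INSERT ... SELECT |       1.72 |      0.89 |     +48.3% |       15.39 |       7.46 |      +51.5% |    228.24 |    84.66 |    +62.9%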
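Table 2: Average (ms)
Workload          | 10K Before | 10K After | 10K Improv | 100K Before | 100K After | 100K Improv | 1M Before | 1M After | 1M Improv
------------------+------------+-----------+------------+-------------+------------+-------------+-----------+----------+----------
CTAS              |       1.65 |      1.26 |     +24.1% |        9.37 |       5.82 |      +37.9% |    104.81 |    74.31 |    +29.1%
CMV               |       2.34 |      1.82 |     +22.2% |       10.32 |       6.25 |      +39.4% |    110.32 |    80.50 |    +27.0%
RMV               |       1.92 |      1.21 |     +36.9% |        9.86 |       5.49 |      +44.3% |    106.79 |    69.53 |    +34.9%
INSERT ... SELECT |       1.69 |      0.90 |     +46.8% |       15.45 |       7.39 |      +52.2% |    210.62 |    85.19 |    +59.6%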
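The Improv columns are consistent with the usual relative-speedup formula, (before - after) / before. A minimal sketch, assuming that is how they were derived (the function name is made up for illustration, and entries computed from unrounded timings can differ from this in the last digit):

```c
#include <assert.h>

/*
 * Relative improvement in percent. Positive means "after" is faster than
 * "before". Assumes both timings use the same unit.
 */
static double
improvement_pct(double before_ms, double after_ms)
{
    assert(before_ms > 0.0);
    return (before_ms - after_ms) / before_ms * 100.0;
}
```

For example, the CTAS 10K median pair (1.60, 1.17) gives roughly 26.9%, and the INSERT ... SELECT 1M median pair (228.24, 84.66) gives roughly 62.9%.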
These numbers look good enough to me to continue the discussion with the
current design and implementation.
One point worth calling out explicitly: the INSERT INTO ... SELECT support
in patch 5 is intentionally limited, as described above. It is not intended
to claim broad executor coverage yet.
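To make the kind of restriction concrete, here is a toy predicate built only from the conditions raised earlier in this thread: no volatile functions in the source plan or in column defaults, no ROW triggers on the target, and no RETURNING clause. It is a sketch under those assumptions; the struct and function names are hypothetical, and the actual checks in patch 5 may differ.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical summary of one INSERT INTO ... SELECT statement. */
typedef struct ToyInsertSelectInfo
{
    bool plan_has_volatile_func;    /* volatile function in the SELECT subtree */
    bool default_has_volatile_func; /* volatile function in a column default */
    bool target_has_row_trigger;    /* any ROW-level trigger on the target */
    bool has_returning;             /* RETURNING clause present */
} ToyInsertSelectInfo;

/*
 * Returns true only when none of the disqualifying conditions holds.
 * Any single one is enough to fall back to row-at-a-time inserts.
 */
static bool
toy_can_use_buffered_insert(const ToyInsertSelectInfo *info)
{
    assert(info != NULL);

    if (info->plan_has_volatile_func || info->default_has_volatile_func)
        return false;
    if (info->target_has_row_trigger)
        return false;
    if (info->has_returning)
        return false;
    return true;
}
```

The predicate is deliberately conservative: it rejects on the mere presence of a volatile function instead of trying to prove that a particular function is harmless.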
One further improvement still seems possible: making the heap
implementation cache raw HeapTuple bytes directly instead of maintaining
buffered slot arrays. I looked at that direction, but did not include it in
this series because it felt like a larger scope change than what I wanted
for v1.
At this point, I think the current series is in reasonable shape and I’d
really appreciate review on both the API shape and the caller adoptions.
Thanks,
Haibo
Attachments:
[application/octet-stream] v1-0001-tableam-heapam-add-buffered-insert-lifecycle-API-.patch (41.2K, 3-v1-0001-tableam-heapam-add-buffered-insert-lifecycle-API-.patch)
From ee069b0d3c9b15ad8d4b2ab0bbd73f1392ed02c3 Mon Sep 17 00:00:00 2001
From: Haibo Yan <[email protected]>
Date: Thu, 23 Apr 2026 11:14:36 -0700
Subject: [PATCH v1 1/5] tableam/heapam: add buffered-insert lifecycle API for
heap
Add an optional Table AM buffered-insert lifecycle API with
begin/put/flush/end callbacks and tableam wrappers.
Implement the API for heap only, including internal buffering,
heap_multi_insert()-based flushes, optional per-tuple flush callbacks,
and explicit end-of-session cleanup.
Add a small Table AM-level test module for lifecycle validation.
No caller adoption is included in this patch.
---
src/backend/access/heap/heapam.c | 321 +++++++++++++++++-
src/backend/access/heap/heapam_handler.c | 7 +-
src/backend/access/table/tableamapi.c | 12 +
src/include/access/heapam.h | 12 +
src/include/access/tableam.h | 159 +++++++++
src/test/modules/Makefile | 1 +
src/test/modules/meson.build | 1 +
.../modules/test_buffered_insert/Makefile | 23 ++
.../expected/test_buffered_insert.out | 108 ++++++
.../modules/test_buffered_insert/meson.build | 33 ++
.../sql/test_buffered_insert.sql | 41 +++
.../test_buffered_insert--1.0.sql | 30 ++
.../test_buffered_insert.c | 233 +++++++++++++
.../test_buffered_insert.control | 4 +
src/tools/pgindent/typedefs.list | 4 +
15 files changed, 970 insertions(+), 19 deletions(-)
create mode 100644 src/test/modules/test_buffered_insert/Makefile
create mode 100644 src/test/modules/test_buffered_insert/expected/test_buffered_insert.out
create mode 100644 src/test/modules/test_buffered_insert/meson.build
create mode 100644 src/test/modules/test_buffered_insert/sql/test_buffered_insert.sql
create mode 100644 src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql
create mode 100644 src/test/modules/test_buffered_insert/test_buffered_insert.c
create mode 100644 src/test/modules/test_buffered_insert/test_buffered_insert.control
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index abfd8e8970a..ae994dd202d 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -53,6 +53,7 @@
#include "utils/datum.h"
#include "utils/injection_point.h"
#include "utils/inval.h"
+#include "utils/memutils.h"
#include "utils/spccache.h"
#include "utils/syscache.h"
@@ -1982,6 +1983,263 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
}
+/* ------------------------------------------------------------------------
+ * Heap buffered-insert lifecycle implementation.
+ * ------------------------------------------------------------------------
+ */
+
+/*
+ * Private state for the heap buffered-insert session.
+ *
+ * Memory-context hierarchy (all destroyed by MemoryContextDelete(state_ctx)):
+ *
+ * state_ctx ("HeapBufferedInsert") -- long-lived, owns stable state
+ * ├── batch_ctx ("HeapBufInsBatch") -- per-batch HeapTuples
+ * └── mi_ctx ("HeapBufInsFlush") -- per-flush transient allocations
+ * └── heap_multi_insert scratch + toasted tuple copies
+ *
+ * state_ctx owns: this struct, the HeapTuple pointer array, bistate,
+ * scratch_slot (callback-only).
+ * batch_ctx owns: HeapTuples produced by ExecCopySlotHeapTuple() during
+ * put(). It is reset (not deleted) after each flush, which bulk-frees all
+ * per-batch allocations.
+ * mi_ctx owns: heap_multi_insert's transient allocations (including any
+ * toasted-tuple copies produced by heap_prepare_insert). It is reset after
+ * each flush once the flush callback (if any) has finished reading the
+ * post-prepare tuples.
+ */
+typedef struct HeapBufferedInsertState
+{
+ TableBufferedInsertStateData base;
+ CommandId cid;
+ int options;
+ BulkInsertState bistate;
+ TableBufferedInsertFlushCb flush_cb;
+ void *flush_ctx;
+ HeapTuple *tuples; /* buffered HeapTuples (in batch_ctx) */
+ int ntuples;
+ int max_tuples;
+ Size buffered_bytes; /* sum of tuples[i]->t_len */
+ TupleTableSlot *scratch_slot; /* callback-only; HeapTuple-backed */
+ MemoryContext batch_ctx; /* per-batch HeapTuples; reset after flush */
+ MemoryContext mi_ctx; /* reset after each heap_multi_insert */
+ MemoryContext state_ctx; /* long-lived; owns stable state */
+} HeapBufferedInsertState;
+
+#define HEAP_BUFFERED_INSERT_MAX_TUPLES 1000
+#define HEAP_BUFFERED_INSERT_MAX_BYTES 65535
+
+static void heap_buffered_insert_do_flush(HeapBufferedInsertState *hstate);
+static void heap_buffered_insert_finish_bulkinsert(HeapBufferedInsertState *hstate);
+
+/* heap_multi_insert core, takes HeapTuples directly (defined near heap_multi_insert) */
+static void heap_multi_insert_raw(Relation relation, HeapTuple *heaptuples,
+ int ntuples, CommandId cid, uint32 options,
+ BulkInsertState bistate);
+
+TableBufferedInsertState
+heap_buffered_insert_begin(Relation rel, CommandId cid, int options,
+ TableBufferedInsertFlushCb flush_cb,
+ void *flush_ctx)
+{
+ HeapBufferedInsertState *hstate;
+ MemoryContext state_ctx;
+ MemoryContext old_ctx;
+
+ state_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "HeapBufferedInsert",
+ ALLOCSET_DEFAULT_SIZES);
+ old_ctx = MemoryContextSwitchTo(state_ctx);
+
+ hstate = palloc0(sizeof(HeapBufferedInsertState));
+ hstate->base.rel = rel;
+ hstate->cid = cid;
+ hstate->options = options;
+ hstate->flush_cb = flush_cb;
+ hstate->flush_ctx = flush_ctx;
+ hstate->state_ctx = state_ctx;
+
+ hstate->max_tuples = HEAP_BUFFERED_INSERT_MAX_TUPLES;
+ hstate->tuples = palloc_array(HeapTuple, hstate->max_tuples);
+ hstate->ntuples = 0;
+ hstate->buffered_bytes = 0;
+
+ if (options & TABLE_INSERT_BAS_BULKWRITE)
+ hstate->bistate = GetBulkInsertState();
+ else
+ hstate->bistate = NULL;
+
+ hstate->batch_ctx = AllocSetContextCreate(state_ctx,
+ "HeapBufInsBatch",
+ ALLOCSET_DEFAULT_SIZES);
+
+ hstate->mi_ctx = AllocSetContextCreate(state_ctx,
+ "HeapBufInsFlush",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /*
+ * Allocate the scratch slot only when a flush callback is present.
+ * No-callback callers (CTAS, CMV, RMV) pay zero slot overhead.
+ */
+ if (flush_cb != NULL)
+ hstate->scratch_slot = MakeSingleTupleTableSlot(RelationGetDescr(rel),
+ &TTSOpsHeapTuple);
+ else
+ hstate->scratch_slot = NULL;
+
+ MemoryContextSwitchTo(old_ctx);
+
+ return &hstate->base;
+}
+
+void
+heap_buffered_insert_put(TableBufferedInsertState state, TupleTableSlot *slot)
+{
+ HeapBufferedInsertState *hstate = (HeapBufferedInsertState *) state;
+ MemoryContext old_ctx;
+ HeapTuple htup;
+
+ /* Auto-flush if tuple count is at capacity. */
+ if (hstate->ntuples >= hstate->max_tuples)
+ heap_buffered_insert_do_flush(hstate);
+
+ /*
+ * Produce a self-contained HeapTuple in batch_ctx. ExecCopySlotHeapTuple
+ * invokes the slot-type's copy_heap_tuple method, which for every
+ * built-in slot type allocates a new HeapTuple in CurrentMemoryContext.
+ * This is the *only* per-row materialization -- the flush path consumes
+ * the HeapTuple directly via heap_multi_insert_raw() with no further
+ * slot conversion or copy.
+ */
+ old_ctx = MemoryContextSwitchTo(hstate->batch_ctx);
+ htup = ExecCopySlotHeapTuple(slot);
+ MemoryContextSwitchTo(old_ctx);
+
+ hstate->tuples[hstate->ntuples++] = htup;
+ hstate->buffered_bytes += htup->t_len;
+
+ /* Byte-threshold flush: exact tracking from the HeapTuple t_len. */
+ if (hstate->buffered_bytes > HEAP_BUFFERED_INSERT_MAX_BYTES)
+ heap_buffered_insert_do_flush(hstate);
+}
+
+void
+heap_buffered_insert_flush(TableBufferedInsertState state)
+{
+ HeapBufferedInsertState *hstate = (HeapBufferedInsertState *) state;
+
+ if (hstate->ntuples > 0)
+ heap_buffered_insert_do_flush(hstate);
+}
+
+void
+heap_buffered_insert_end(TableBufferedInsertState state)
+{
+ HeapBufferedInsertState *hstate = (HeapBufferedInsertState *) state;
+
+ /* Flush any remaining tuples */
+ if (hstate->ntuples > 0)
+ heap_buffered_insert_do_flush(hstate);
+
+ /* Clean up the scratch slot used for flush callback */
+ if (hstate->scratch_slot != NULL)
+ ExecDropSingleTupleTableSlot(hstate->scratch_slot);
+
+ /* Perform finish-bulk-insert cleanup (subsumes table_finish_bulk_insert) */
+ heap_buffered_insert_finish_bulkinsert(hstate);
+
+ /*
+ * Release all memory owned by the state, including batch_ctx and mi_ctx
+ * (both are children of state_ctx).
+ */
+ MemoryContextDelete(hstate->state_ctx);
+}
+
+/*
+ * Internal: flush all buffered HeapTuples via heap_multi_insert_raw, then
+ * invoke the flush callback (if any) once per written tuple. Resets both
+ * per-batch and per-flush contexts so no payload survives across cycles.
+ *
+ * Callback-path isolation: the scratch_slot and tuple-to-slot conversion
+ * only execute when flush_cb is non-NULL, so no-callback callers (CTAS,
+ * CMV, RMV) pay zero callback overhead per tuple.
+ */
+static void
+heap_buffered_insert_do_flush(HeapBufferedInsertState *hstate)
+{
+ MemoryContext old_ctx;
+ int ntuples = hstate->ntuples;
+
+ Assert(ntuples > 0);
+
+ /*
+ * Switch to the per-flush memory context. heap_multi_insert_raw's
+ * transient allocations (including any toasted-tuple copies from
+ * heap_prepare_insert) land here and are bulk-freed after the callback.
+ */
+ old_ctx = MemoryContextSwitchTo(hstate->mi_ctx);
+
+ heap_multi_insert_raw(hstate->base.rel,
+ hstate->tuples,
+ ntuples,
+ hstate->cid,
+ hstate->options,
+ hstate->bistate);
+
+ MemoryContextSwitchTo(old_ctx);
+
+ /*
+ * Invoke the flush callback once per flushed tuple, in insertion order.
+ * After heap_multi_insert_raw, each tuples[i]->t_self holds the stored
+ * TID, and tuples[i] points to the post-prepare (possibly toasted) tuple
+ * in mi_ctx (if toasted) or the original in batch_ctx (if not). Both
+ * contexts are still alive here.
+ */
+ if (hstate->flush_cb != NULL)
+ {
+ for (int i = 0; i < ntuples; i++)
+ {
+ ExecStoreHeapTuple(hstate->tuples[i], hstate->scratch_slot, false);
+ hstate->scratch_slot->tts_tid = hstate->tuples[i]->t_self;
+ hstate->scratch_slot->tts_tableOid = hstate->tuples[i]->t_tableOid;
+ hstate->flush_cb(hstate->flush_ctx, hstate->scratch_slot);
+ ExecClearTuple(hstate->scratch_slot);
+ }
+ }
+
+ /* Reset both contexts now that the callback is done. */
+ MemoryContextReset(hstate->mi_ctx);
+ MemoryContextReset(hstate->batch_ctx);
+
+ hstate->ntuples = 0;
+ hstate->buffered_bytes = 0;
+}
+
+/*
+ * Perform heap-specific finish-bulk-insert cleanup.
+ *
+ * This is the buffered-insert equivalent of what callers of the non-buffered
+ * path achieve by calling FreeBulkInsertState() + table_finish_bulk_insert()
+ * at teardown. end() must subsume both.
+ *
+ * For the current in-tree heap AM, the cleanup consists of:
+ *
+ * 1. Release the BulkInsertState (buffer pin + bulk-write access strategy).
+ *
+ * 2. Any action that heap's finish_bulk_insert AM callback would perform.
+ * Heap does not currently register that callback (the slot in
+ * heapam_methods is NULL), so no additional action is required.
+ */
+static void
+heap_buffered_insert_finish_bulkinsert(HeapBufferedInsertState *hstate)
+{
+ if (hstate->bistate != NULL)
+ FreeBulkInsertState(hstate->bistate);
+
+ /* Heap does not currently register a finish_bulk_insert AM callback. */
+}
+
+
/*
* heap_insert - insert tuple into a heap
*
@@ -2268,22 +2526,24 @@ heap_multi_insert_pages(HeapTuple *heaptuples, int done, int ntuples, Size saveF
}
/*
- * heap_multi_insert - insert multiple tuples into a heap
+ * heap_multi_insert_raw - core multi-insert for a HeapTuple array
*
- * This is like heap_insert(), but inserts multiple tuples in one operation.
- * That's faster than calling heap_insert() in a loop, because when multiple
- * tuples can be inserted on a single page, we can write just a single WAL
- * record covering all of them, and only need to lock/unlock the page once.
+ * Takes an array of pre-formed HeapTuples, runs heap_prepare_insert on each
+ * (toast + header setup), and inserts them into heap pages. The heaptuples
+ * array is updated in-place: after return, each entry points to the prepared
+ * (possibly toasted) tuple with t_self set to the stored TID.
+ *
+ * This is the shared core for heap_multi_insert (slot-based callers) and
+ * heap_buffered_insert_do_flush (HeapTuple-based callers).
*
* Note: this leaks memory into the current memory context. You can create a
* temporary context before calling this, if that's a problem.
*/
-void
-heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
- CommandId cid, uint32 options, BulkInsertState bistate)
+static void
+heap_multi_insert_raw(Relation relation, HeapTuple *heaptuples, int ntuples,
+ CommandId cid, uint32 options, BulkInsertState bistate)
{
TransactionId xid = GetCurrentTransactionId();
- HeapTuple *heaptuples;
int i;
int ndone;
PGAlignedBlock scratch;
@@ -2306,16 +2566,11 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
HEAP_DEFAULT_FILLFACTOR);
- /* Toast and set header data in all the slots */
- heaptuples = palloc(ntuples * sizeof(HeapTuple));
+ /* Toast and set header data in all the tuples */
for (i = 0; i < ntuples; i++)
{
- HeapTuple tuple;
-
- tuple = ExecFetchSlotHeapTuple(slots[i], true, NULL);
- slots[i]->tts_tableOid = RelationGetRelid(relation);
- tuple->t_tableOid = slots[i]->tts_tableOid;
- heaptuples[i] = heap_prepare_insert(relation, tuple, xid, cid,
+ heaptuples[i]->t_tableOid = RelationGetRelid(relation);
+ heaptuples[i] = heap_prepare_insert(relation, heaptuples[i], xid, cid,
options);
}
@@ -2639,11 +2894,41 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
}
+ pgstat_count_heap_insert(relation, ntuples);
+}
+
+/*
+ * heap_multi_insert - insert multiple tuples into a heap
+ *
+ * This is like heap_insert(), but inserts multiple tuples in one operation.
+ * That's faster than calling heap_insert() in a loop, because when multiple
+ * tuples can be inserted on a single page, we can write just a single WAL
+ * record covering all of them, and only need to lock/unlock the page once.
+ *
+ * Note: this leaks memory into the current memory context. You can create a
+ * temporary context before calling this, if that's a problem.
+ */
+void
+heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
+ CommandId cid, uint32 options, BulkInsertState bistate)
+{
+ HeapTuple *heaptuples;
+ int i;
+
+ heaptuples = palloc(ntuples * sizeof(HeapTuple));
+ for (i = 0; i < ntuples; i++)
+ {
+ heaptuples[i] = ExecFetchSlotHeapTuple(slots[i], true, NULL);
+ slots[i]->tts_tableOid = RelationGetRelid(relation);
+ }
+
+ heap_multi_insert_raw(relation, heaptuples, ntuples, cid, options, bistate);
+
/* copy t_self fields back to the caller's slots */
for (i = 0; i < ntuples; i++)
slots[i]->tts_tid = heaptuples[i]->t_self;
- pgstat_count_heap_insert(relation, ntuples);
+ pfree(heaptuples);
}
/*
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 20d3b46e062..65226b07a5c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2697,7 +2697,12 @@ static const TableAmRoutine heapam_methods = {
.scan_bitmap_next_tuple = heapam_scan_bitmap_next_tuple,
.scan_sample_next_block = heapam_scan_sample_next_block,
- .scan_sample_next_tuple = heapam_scan_sample_next_tuple
+ .scan_sample_next_tuple = heapam_scan_sample_next_tuple,
+
+ .buffered_insert_begin = heap_buffered_insert_begin,
+ .buffered_insert_put = heap_buffered_insert_put,
+ .buffered_insert_flush = heap_buffered_insert_flush,
+ .buffered_insert_end = heap_buffered_insert_end,
};
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index 5450a27faeb..a0ff123fa83 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -93,6 +93,18 @@ GetTableAmRoutine(Oid amhandler)
Assert(routine->scan_sample_next_block != NULL);
Assert(routine->scan_sample_next_tuple != NULL);
+ /*
+ * Buffered-insert callbacks: either all four are NULL (AM does not
+ * support buffered inserts), or all four are non-NULL. No partially
+ * populated groups are allowed.
+ */
+ Assert((routine->buffered_insert_begin == NULL) ==
+ (routine->buffered_insert_put == NULL));
+ Assert((routine->buffered_insert_begin == NULL) ==
+ (routine->buffered_insert_flush == NULL));
+ Assert((routine->buffered_insert_begin == NULL) ==
+ (routine->buffered_insert_end == NULL));
+
return routine;
}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 5176478c295..d8b34e37627 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -379,6 +379,18 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
extern void heap_multi_insert(Relation relation, TupleTableSlot **slots,
int ntuples, CommandId cid, uint32 options,
BulkInsertState bistate);
+
+/* heap buffered-insert lifecycle */
+extern TableBufferedInsertState heap_buffered_insert_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableBufferedInsertFlushCb flush_cb,
+ void *flush_ctx);
+extern void heap_buffered_insert_put(TableBufferedInsertState state,
+ TupleTableSlot *slot);
+extern void heap_buffered_insert_flush(TableBufferedInsertState state);
+extern void heap_buffered_insert_end(TableBufferedInsertState state);
+
extern TM_Result heap_delete(Relation relation, const ItemPointerData *tid,
CommandId cid, uint32 options, Snapshot crosscheck,
bool wait, TM_FailureData *tmfd);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index f2c36696bca..d5a1875d93c 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -284,6 +284,7 @@ typedef struct TM_IndexDeleteOp
#define TABLE_INSERT_SKIP_FSM 0x0002
#define TABLE_INSERT_FROZEN 0x0004
#define TABLE_INSERT_NO_LOGICAL 0x0008
+#define TABLE_INSERT_BAS_BULKWRITE 0x0020
/* "options" flag bits for table_tuple_delete */
#define TABLE_DELETE_CHANGING_PARTITION (1 << 0)
@@ -307,6 +308,40 @@ typedef void (*IndexBuildCallback) (Relation index,
bool tupleIsAlive,
void *state);
+/*
+ * State for a buffered-insert session. AMs embed this as the first field of
+ * their private state struct. The base struct is minimal: only the target
+ * relation is exposed, which is needed for AM dispatch via the inline wrapper
+ * functions.
+ *
+ * AMs that do not support buffered inserts leave the buffered_insert_begin
+ * callback NULL; callers detect this via table_buffered_insert_begin()
+ * returning NULL.
+ */
+typedef struct TableBufferedInsertStateData
+{
+ Relation rel; /* target relation -- needed for AM dispatch */
+} TableBufferedInsertStateData;
+
+typedef TableBufferedInsertStateData *TableBufferedInsertState;
+
+/*
+ * Callback invoked once per flushed tuple, in insertion order, after the AM
+ * writes a batch of buffered tuples to storage.
+ *
+ * The slot is an AM-owned scratch object used solely as a handoff vehicle.
+ * It contains the stored tuple with TID (or AM-equivalent locator) set.
+ * It is valid only for the duration of this callback invocation; the AM may
+ * reuse the same scratch slot across successive invocations within a single
+ * flush. If the caller needs data beyond the callback, it must copy within
+ * the callback body.
+ *
+ * The AM must not assume the callback is cheap, side-effect-free, or
+ * non-throwing.
+ */
+typedef void (*TableBufferedInsertFlushCb)(void *context,
+ TupleTableSlot *slot);
+
/*
* API struct for a table AM. Note this must be allocated in a
* server-lifetime manner, typically as a static const struct, which then gets
@@ -614,6 +649,64 @@ typedef struct TableAmRoutine
void (*finish_bulk_insert) (Relation rel, uint32 options);
+ /* ------------------------------------------------------------------------
+ * Buffered-insert lifecycle callbacks.
+ *
+ * Optional optimization for batch inserts. All four callbacks must be
+ * either NULL together (AM does not support buffered inserts) or non-NULL
+ * together (validated at AM registration time). No partially populated
+ * groups are allowed.
+ * ------------------------------------------------------------------------
+ */
+
+ /*
+ * Begin a buffered-insert session. Returns an opaque state handle whose
+ * first bytes are a TableBufferedInsertStateData with rel set. The AM
+ * stores cid, options, flush_cb, and flush_ctx in its private extension
+ * of the state struct.
+ *
+ * flush_cb may be NULL, in which case the AM skips per-tuple notification
+ * after flushing.
+ *
+ * Optional callback -- NULL means the AM does not support buffered inserts.
+ */
+ TableBufferedInsertState (*buffered_insert_begin)(
+ Relation rel,
+ CommandId cid,
+ int options,
+ TableBufferedInsertFlushCb flush_cb,
+ void *flush_ctx);
+
+ /*
+ * Submit one tuple for buffered insertion. The AM reads the tuple data
+ * from the slot and captures it internally before returning. The caller
+ * retains ownership of the slot and may reuse it immediately.
+ *
+ * The AM may auto-flush during this call if its internal buffer is full;
+ * the caller must be prepared for the flush callback to fire during put().
+ */
+ void (*buffered_insert_put)(
+ TableBufferedInsertState state,
+ TupleTableSlot *slot);
+
+ /*
+ * Force-flush all buffered tuples to storage. Invokes the flush callback
+ * (if non-NULL) once per flushed tuple, in insertion order, using an
+ * AM-owned scratch slot. After return, the internal buffer is empty.
+ */
+ void (*buffered_insert_flush)(
+ TableBufferedInsertState state);
+
+ /*
+ * Flush remaining buffered tuples, perform finish-bulk-insert cleanup
+ * (e.g. FSM update for heap), and release all resources owned by the
+ * state. The state pointer is invalid after this call. Callers must
+ * not separately call table_finish_bulk_insert() -- end() subsumes it.
+ */
+ void (*buffered_insert_end)(
+ TableBufferedInsertState state);
+
+
/* ------------------------------------------------------------------------
* DDL related functionality.
* ------------------------------------------------------------------------
@@ -1666,6 +1759,72 @@ table_finish_bulk_insert(Relation rel, uint32 options)
}
+/* ----------------------------------------------------------------------------
+ * Buffered-insert lifecycle functions.
+ * ----------------------------------------------------------------------------
+ */
+
+/*
+ * Begin a buffered-insert session for the given relation.
+ *
+ * Returns NULL if the relation's AM does not support buffered inserts, in
+ * which case the caller should fall back to the single-row insert path.
+ *
+ * flush_cb may be NULL when no per-tuple post-insert work is needed (e.g.
+ * CTAS/CMV/RMV). When non-NULL, it is invoked once per flushed tuple in
+ * insertion order, using an AM-owned scratch slot valid only for the duration
+ * of each callback invocation.
+ */
+static inline TableBufferedInsertState
+table_buffered_insert_begin(Relation rel, CommandId cid, int options,
+ TableBufferedInsertFlushCb flush_cb,
+ void *flush_ctx)
+{
+ if (rel->rd_tableam->buffered_insert_begin == NULL)
+ return NULL;
+ return rel->rd_tableam->buffered_insert_begin(rel, cid, options,
+ flush_cb, flush_ctx);
+}
+
+/*
+ * Submit one tuple for buffered insertion. The AM reads from the slot and
+ * captures the data internally; the caller retains slot ownership and may
+ * reuse it immediately after this call returns.
+ *
+ * The AM may auto-flush during put() if its buffer is full; the flush
+ * callback (if any) may fire during this call.
+ */
+static inline void
+table_buffered_insert_put(TableBufferedInsertState state,
+ TupleTableSlot *slot)
+{
+ state->rel->rd_tableam->buffered_insert_put(state, slot);
+}
+
+/*
+ * Force-flush all buffered tuples to storage. The flush callback (if
+ * non-NULL) fires once per flushed tuple in insertion order. After return
+ * the buffer is empty.
+ */
+static inline void
+table_buffered_insert_flush(TableBufferedInsertState state)
+{
+ state->rel->rd_tableam->buffered_insert_flush(state);
+}
+
+/*
+ * Flush remaining buffered tuples, perform finish-bulk-insert cleanup
+ * (e.g. FSM update for heap), and release all resources. The state pointer
+ * is invalid after this call. Do not separately call
+ * table_finish_bulk_insert() -- end() subsumes it.
+ */
+static inline void
+table_buffered_insert_end(TableBufferedInsertState state)
+{
+ state->rel->rd_tableam->buffered_insert_end(state);
+}
+
+
/* ------------------------------------------------------------------------
* DDL related functionality.
* ------------------------------------------------------------------------
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 0a74ab5c86f..fa964043d4f 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -20,6 +20,7 @@ SUBDIRS = \
test_binaryheap \
test_bitmapset \
test_bloomfilter \
+ test_buffered_insert \
test_cloexec \
test_checksums \
test_copy_callbacks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 4bca42bb370..aae892699b9 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -20,6 +20,7 @@ subdir('test_autovacuum')
subdir('test_binaryheap')
subdir('test_bitmapset')
subdir('test_bloomfilter')
+subdir('test_buffered_insert')
subdir('test_cloexec')
subdir('test_checksums')
subdir('test_copy_callbacks')
diff --git a/src/test/modules/test_buffered_insert/Makefile b/src/test/modules/test_buffered_insert/Makefile
new file mode 100644
index 00000000000..5c4926a4c3a
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_buffered_insert/Makefile
+
+MODULE_big = test_buffered_insert
+OBJS = \
+ $(WIN32RES) \
+ test_buffered_insert.o
+PGFILEDESC = "test_buffered_insert - test Table AM buffered-insert lifecycle"
+
+EXTENSION = test_buffered_insert
+DATA = test_buffered_insert--1.0.sql
+
+REGRESS = test_buffered_insert
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_buffered_insert
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_buffered_insert/expected/test_buffered_insert.out b/src/test/modules/test_buffered_insert/expected/test_buffered_insert.out
new file mode 100644
index 00000000000..79163952f6f
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/expected/test_buffered_insert.out
@@ -0,0 +1,108 @@
+CREATE EXTENSION test_buffered_insert;
+-- Target table: single integer column.
+CREATE TABLE buffered_insert_test (a INT);
+-- Basic test: insert 5 rows via buffered-insert with NULL flush callback.
+SELECT test_buffered_insert_basic('buffered_insert_test'::regclass, 5);
+ test_buffered_insert_basic
+----------------------------
+ 5
+(1 row)
+
+SELECT count(*) FROM buffered_insert_test;
+ count
+-------
+ 5
+(1 row)
+
+SELECT a FROM buffered_insert_test ORDER BY a;
+ a
+---
+ 1
+ 2
+ 3
+ 4
+ 5
+(5 rows)
+
+TRUNCATE buffered_insert_test;
+-- Test with flush callback: insert 5 rows, verify callback count matches.
+SELECT test_buffered_insert_with_callback('buffered_insert_test'::regclass, 5);
+ test_buffered_insert_with_callback
+------------------------------------
+ 5
+(1 row)
+
+SELECT count(*) FROM buffered_insert_test;
+ count
+-------
+ 5
+(1 row)
+
+SELECT a FROM buffered_insert_test ORDER BY a;
+ a
+---
+ 1
+ 2
+ 3
+ 4
+ 5
+(5 rows)
+
+TRUNCATE buffered_insert_test;
+-- Trigger auto-flush: insert more rows than HEAP_BUFFERED_INSERT_MAX_SLOTS
+-- (1000) to verify auto-flush during put() works correctly.
+SELECT test_buffered_insert_basic('buffered_insert_test'::regclass, 1500);
+ test_buffered_insert_basic
+----------------------------
+ 1500
+(1 row)
+
+SELECT count(*) FROM buffered_insert_test;
+ count
+-------
+ 1500
+(1 row)
+
+-- Spot-check first and last values.
+SELECT min(a), max(a) FROM buffered_insert_test;
+ min | max
+-----+------
+ 1 | 1500
+(1 row)
+
+TRUNCATE buffered_insert_test;
+-- Verify flush callback fires for all rows including auto-flushed batches.
+SELECT test_buffered_insert_with_callback('buffered_insert_test'::regclass, 1500);
+ test_buffered_insert_with_callback
+------------------------------------
+ 1500
+(1 row)
+
+SELECT count(*) FROM buffered_insert_test;
+ count
+-------
+ 1500
+(1 row)
+
+TRUNCATE buffered_insert_test;
+-- Test explicit flush() mid-session: flush after first 5 rows, insert 5 more,
+-- then end(). Callback count must equal total rows.
+SELECT test_buffered_insert_flush_mid('buffered_insert_test'::regclass, 10);
+ test_buffered_insert_flush_mid
+--------------------------------
+ 10
+(1 row)
+
+SELECT count(*) FROM buffered_insert_test;
+ count
+-------
+ 10
+(1 row)
+
+SELECT min(a), max(a) FROM buffered_insert_test;
+ min | max
+-----+-----
+ 1 | 10
+(1 row)
+
+DROP TABLE buffered_insert_test;
diff --git a/src/test/modules/test_buffered_insert/meson.build b/src/test/modules/test_buffered_insert/meson.build
new file mode 100644
index 00000000000..d738ccb1a84
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2022-2026, PostgreSQL Global Development Group
+
+test_buffered_insert_sources = files(
+ 'test_buffered_insert.c',
+)
+
+if host_system == 'windows'
+ test_buffered_insert_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'test_buffered_insert',
+ '--FILEDESC', 'test_buffered_insert - test Table AM buffered-insert lifecycle',])
+endif
+
+test_buffered_insert = shared_module('test_buffered_insert',
+ test_buffered_insert_sources,
+ kwargs: pg_test_mod_args,
+)
+test_install_libs += test_buffered_insert
+
+test_install_data += files(
+ 'test_buffered_insert.control',
+ 'test_buffered_insert--1.0.sql',
+)
+
+tests += {
+ 'name': 'test_buffered_insert',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'regress': {
+ 'sql': [
+ 'test_buffered_insert',
+ ],
+ },
+}
diff --git a/src/test/modules/test_buffered_insert/sql/test_buffered_insert.sql b/src/test/modules/test_buffered_insert/sql/test_buffered_insert.sql
new file mode 100644
index 00000000000..23ac09cc9bc
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/sql/test_buffered_insert.sql
@@ -0,0 +1,41 @@
+CREATE EXTENSION test_buffered_insert;
+
+-- Target table: single integer column.
+CREATE TABLE buffered_insert_test (a INT);
+
+-- Basic test: insert 5 rows via buffered-insert with NULL flush callback.
+SELECT test_buffered_insert_basic('buffered_insert_test'::regclass, 5);
+SELECT count(*) FROM buffered_insert_test;
+SELECT a FROM buffered_insert_test ORDER BY a;
+
+TRUNCATE buffered_insert_test;
+
+-- Test with flush callback: insert 5 rows, verify callback count matches.
+SELECT test_buffered_insert_with_callback('buffered_insert_test'::regclass, 5);
+SELECT count(*) FROM buffered_insert_test;
+SELECT a FROM buffered_insert_test ORDER BY a;
+
+TRUNCATE buffered_insert_test;
+
+-- Trigger auto-flush: insert more rows than HEAP_BUFFERED_INSERT_MAX_SLOTS
+-- (1000) to verify auto-flush during put() works correctly.
+SELECT test_buffered_insert_basic('buffered_insert_test'::regclass, 1500);
+SELECT count(*) FROM buffered_insert_test;
+-- Spot-check first and last values.
+SELECT min(a), max(a) FROM buffered_insert_test;
+
+TRUNCATE buffered_insert_test;
+
+-- Verify flush callback fires for all rows including auto-flushed batches.
+SELECT test_buffered_insert_with_callback('buffered_insert_test'::regclass, 1500);
+SELECT count(*) FROM buffered_insert_test;
+
+TRUNCATE buffered_insert_test;
+
+-- Test explicit flush() mid-session: flush after first 5 rows, insert 5 more,
+-- then end(). Callback count must equal total rows.
+SELECT test_buffered_insert_flush_mid('buffered_insert_test'::regclass, 10);
+SELECT count(*) FROM buffered_insert_test;
+SELECT min(a), max(a) FROM buffered_insert_test;
+
+DROP TABLE buffered_insert_test;
diff --git a/src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql b/src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql
new file mode 100644
index 00000000000..26aa0635240
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql
@@ -0,0 +1,30 @@
+/* src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_buffered_insert" to load this file. \quit
+
+--
+-- Insert rows through the buffered-insert lifecycle with a NULL flush
+-- callback (the CTAS/CMV/RMV pattern). Returns the number of rows put().
+--
+CREATE FUNCTION test_buffered_insert_basic(pg_catalog.regclass, pg_catalog.int4)
+ RETURNS pg_catalog.int4
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+--
+-- Insert rows through the buffered-insert lifecycle with a flush callback
+-- that counts invocations. Returns the total number of flush-callback
+-- invocations (should equal the number of rows inserted).
+--
+CREATE FUNCTION test_buffered_insert_with_callback(pg_catalog.regclass, pg_catalog.int4)
+ RETURNS pg_catalog.int4
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+--
+-- Exercise explicit flush() mid-session: inserts half the rows, calls
+-- flush(), inserts the other half, then calls end(). Returns the total
+-- number of flush-callback invocations (should equal nrows).
+--
+CREATE FUNCTION test_buffered_insert_flush_mid(pg_catalog.regclass, pg_catalog.int4)
+ RETURNS pg_catalog.int4
+ AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_buffered_insert/test_buffered_insert.c b/src/test/modules/test_buffered_insert/test_buffered_insert.c
new file mode 100644
index 00000000000..fbda7b1c70b
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/test_buffered_insert.c
@@ -0,0 +1,233 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_buffered_insert.c
+ * Minimal validation for the Table AM buffered-insert lifecycle API.
+ *
+ * Exercises begin/put/flush/end directly at the Table AM layer on a
+ * real heap relation, without going through any higher-level caller
+ * (CTAS, COPY, etc.). This keeps the test within Patch 0001 scope.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_buffered_insert/test_buffered_insert.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/tableam.h"
+#include "access/table.h"
+#include "access/xact.h"
+#include "catalog/namespace.h"
+#include "executor/tuptable.h"
+#include "fmgr.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+
+PG_MODULE_MAGIC;
+
+/*
+ * test_buffered_insert_basic(regclass, int4)
+ *
+ * Opens the given relation, inserts nrows tuples through the buffered-insert
+ * API with a NULL flush callback, and returns the number of rows put().
+ * The tuples inserted have the form (i) for a single-integer-column table.
+ */
+PG_FUNCTION_INFO_V1(test_buffered_insert_basic);
+Datum
+test_buffered_insert_basic(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ int32 nrows = PG_GETARG_INT32(1);
+ Relation rel;
+ TableBufferedInsertState state;
+ TupleTableSlot *slot;
+ TupleDesc tupdesc;
+ int i;
+
+ rel = table_open(relid, RowExclusiveLock);
+ tupdesc = RelationGetDescr(rel);
+
+ state = table_buffered_insert_begin(rel,
+ GetCurrentCommandId(true),
+ TABLE_INSERT_BAS_BULKWRITE,
+ NULL, NULL);
+
+ if (state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("AM does not support buffered inserts")));
+
+ slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
+
+ for (i = 0; i < nrows; i++)
+ {
+ ExecClearTuple(slot);
+ slot->tts_values[0] = Int32GetDatum(i + 1);
+ slot->tts_isnull[0] = false;
+ ExecStoreVirtualTuple(slot);
+
+ table_buffered_insert_put(state, slot);
+ }
+
+ table_buffered_insert_end(state);
+
+ ExecDropSingleTupleTableSlot(slot);
+ table_close(rel, RowExclusiveLock);
+
+ PG_RETURN_INT32(nrows);
+}
+
+/*
+ * Flush callback context: holds the invocation count. The callback itself
+ * also verifies that each slot carries a valid TID.
+ */
+typedef struct FlushCbContext
+{
+ int count;
+} FlushCbContext;
+
+static void
+test_flush_callback(void *context, TupleTableSlot *slot)
+{
+ FlushCbContext *ctx = (FlushCbContext *) context;
+
+ if (!ItemPointerIsValid(&slot->tts_tid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("flush callback received slot with invalid TID")));
+
+ ctx->count++;
+}
+
+/*
+ * test_buffered_insert_with_callback(regclass, int4)
+ *
+ * Same as basic, but passes a flush callback that counts invocations and
+ * validates TIDs. Returns the total flush-callback invocation count.
+ */
+PG_FUNCTION_INFO_V1(test_buffered_insert_with_callback);
+Datum
+test_buffered_insert_with_callback(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ int32 nrows = PG_GETARG_INT32(1);
+ Relation rel;
+ TableBufferedInsertState state;
+ TupleTableSlot *slot;
+ TupleDesc tupdesc;
+ FlushCbContext ctx;
+ int i;
+
+ ctx.count = 0;
+
+ rel = table_open(relid, RowExclusiveLock);
+ tupdesc = RelationGetDescr(rel);
+
+ state = table_buffered_insert_begin(rel,
+ GetCurrentCommandId(true),
+ TABLE_INSERT_BAS_BULKWRITE,
+ test_flush_callback,
+ &ctx);
+
+ if (state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("AM does not support buffered inserts")));
+
+ slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
+
+ for (i = 0; i < nrows; i++)
+ {
+ ExecClearTuple(slot);
+ slot->tts_values[0] = Int32GetDatum(i + 1);
+ slot->tts_isnull[0] = false;
+ ExecStoreVirtualTuple(slot);
+
+ table_buffered_insert_put(state, slot);
+ }
+
+ table_buffered_insert_end(state);
+
+ ExecDropSingleTupleTableSlot(slot);
+ table_close(rel, RowExclusiveLock);
+
+ PG_RETURN_INT32(ctx.count);
+}
+
+/*
+ * test_buffered_insert_flush_mid(regclass, int4)
+ *
+ * Exercises explicit flush() mid-session: inserts half the rows, calls
+ * flush(), inserts the other half, then calls end(). Returns the total
+ * flush-callback count, which should equal nrows.
+ */
+PG_FUNCTION_INFO_V1(test_buffered_insert_flush_mid);
+Datum
+test_buffered_insert_flush_mid(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ int32 nrows = PG_GETARG_INT32(1);
+ Relation rel;
+ TableBufferedInsertState state;
+ TupleTableSlot *slot;
+ TupleDesc tupdesc;
+ FlushCbContext ctx;
+ int half = nrows / 2;
+ int i;
+
+ ctx.count = 0;
+
+ rel = table_open(relid, RowExclusiveLock);
+ tupdesc = RelationGetDescr(rel);
+
+ state = table_buffered_insert_begin(rel,
+ GetCurrentCommandId(true),
+ TABLE_INSERT_BAS_BULKWRITE,
+ test_flush_callback,
+ &ctx);
+
+ if (state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("AM does not support buffered inserts")));
+
+ slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
+
+ /* First half */
+ for (i = 0; i < half; i++)
+ {
+ ExecClearTuple(slot);
+ slot->tts_values[0] = Int32GetDatum(i + 1);
+ slot->tts_isnull[0] = false;
+ ExecStoreVirtualTuple(slot);
+
+ table_buffered_insert_put(state, slot);
+ }
+
+ /* Explicit flush mid-session */
+ table_buffered_insert_flush(state);
+
+ /* Second half */
+ for (i = half; i < nrows; i++)
+ {
+ ExecClearTuple(slot);
+ slot->tts_values[0] = Int32GetDatum(i + 1);
+ slot->tts_isnull[0] = false;
+ ExecStoreVirtualTuple(slot);
+
+ table_buffered_insert_put(state, slot);
+ }
+
+ /* end() flushes the remaining tuples */
+ table_buffered_insert_end(state);
+
+ ExecDropSingleTupleTableSlot(slot);
+ table_close(rel, RowExclusiveLock);
+
+ PG_RETURN_INT32(ctx.count);
+}
diff --git a/src/test/modules/test_buffered_insert/test_buffered_insert.control b/src/test/modules/test_buffered_insert/test_buffered_insert.control
new file mode 100644
index 00000000000..3221c765820
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/test_buffered_insert.control
@@ -0,0 +1,4 @@
+comment = 'Test code for Table AM buffered-insert lifecycle'
+default_version = '1.0'
+module_pathname = '$libdir/test_buffered_insert'
+relocatable = true
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9f1dd55213d..257aee1e684 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1255,6 +1255,7 @@ HeadlineWordEntry
HeapCheckContext
HeapCheckReadStreamData
HeapPageFreeze
+HeapBufferedInsertState
HeapScanDesc
HeapScanDescData
HeapTuple
@@ -3123,6 +3124,9 @@ T_Action
T_WorkerStatus
TableAmRoutine
TableAttachInfo
+TableBufferedInsertFlushCb
+TableBufferedInsertState
+TableBufferedInsertStateData
TableDataInfo
TableFunc
TableFuncRoutine
--
2.52.0
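For readers following the begin/put/flush/end contract documented in the tableam.h hunk of the patch above, here is a minimal standalone sketch of that lifecycle. It is a toy model and not the patch's heap implementation: every name in it (ToyInsertState, toy_begin, toy_put, toy_flush, toy_end, toy_run, TOY_MAX_SLOTS) is hypothetical, tuples are plain ints, and the "locator" is a simple counter standing in for a TID. It only illustrates the documented rules: put() may auto-flush when the buffer fills, the flush callback fires once per tuple in insertion order, a NULL callback is skipped, and end() flushes whatever remains before releasing the state.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical capacity, far smaller than a real AM would choose. */
#define TOY_MAX_SLOTS 4

/* Per-tuple notification: the value plus a stand-in locator (not a real TID). */
typedef void (*ToyFlushCb) (void *context, int value, int locator);

typedef struct ToyInsertState
{
	int			buf[TOY_MAX_SLOTS];	/* buffered "tuples" */
	int			nused;				/* number of buffered entries */
	int			next_locator;		/* next locator to hand out */
	ToyFlushCb	flush_cb;			/* may be NULL */
	void	   *flush_ctx;
} ToyInsertState;

static ToyInsertState *
toy_begin(ToyFlushCb flush_cb, void *flush_ctx)
{
	ToyInsertState *s = calloc(1, sizeof(ToyInsertState));

	assert(s != NULL);
	s->flush_cb = flush_cb;
	s->flush_ctx = flush_ctx;
	return s;
}

/* "Write" all buffered values, then notify once per value, in order. */
static void
toy_flush(ToyInsertState *s)
{
	for (int i = 0; i < s->nused; i++)
	{
		int			locator = s->next_locator++;

		if (s->flush_cb != NULL)
			s->flush_cb(s->flush_ctx, s->buf[i], locator);
	}
	s->nused = 0;
}

/* Copy the value into the buffer; auto-flush once the buffer is full. */
static void
toy_put(ToyInsertState *s, int value)
{
	s->buf[s->nused++] = value;
	if (s->nused == TOY_MAX_SLOTS)
		toy_flush(s);
}

/* Flush the remainder and release the state; s is invalid afterwards. */
static void
toy_end(ToyInsertState *s)
{
	toy_flush(s);
	free(s);
}

typedef struct ToyFlushCtx
{
	int			count;		/* callback invocations so far */
	int			ok;			/* cleared if ordering is violated */
	int			last_value;
} ToyFlushCtx;

/* Counts invocations and checks that values and locators arrive in order. */
static void
toy_check_cb(void *context, int value, int locator)
{
	ToyFlushCtx *ctx = context;

	if (locator != ctx->count ||
		(ctx->count > 0 && value <= ctx->last_value))
		ctx->ok = 0;
	ctx->last_value = value;
	ctx->count++;
}

/* Insert 1..nrows, optionally flushing at the midpoint; -1 on bad order. */
static int
toy_run(int nrows, int flush_mid)
{
	ToyFlushCtx ctx = {0, 1, 0};
	ToyInsertState *s = toy_begin(toy_check_cb, &ctx);

	for (int i = 0; i < nrows; i++)
	{
		if (flush_mid && i == nrows / 2)
			toy_flush(s);
		toy_put(s, i + 1);
	}
	toy_end(s);
	return ctx.ok ? ctx.count : -1;
}
```

In this model, toy_run(10, 1) follows the same shape as test_buffered_insert_flush_mid in the test module: the callback count at the end equals the number of rows put, regardless of how many auto-flushes or explicit flushes happened in between.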
[application/octet-stream] v1-0005-executor-adopt-buffered-insert-API-for-restricted.patch (24.3K, 4-v1-0005-executor-adopt-buffered-insert-API-for-restricted.patch)
From f2a38d918a0cb3f4279c75f399c82eedb37c909d Mon Sep 17 00:00:00 2001
From: Haibo Yan <[email protected]>
Date: Sun, 26 Apr 2026 22:24:20 -0700
Subject: [PATCH v1 5/5] executor: adopt buffered-insert API for restricted
INSERT ... SELECT
Add a restricted first-step buffered-insert adoption for
`INSERT INTO ... SELECT` in the executor.
This patch integrates the existing buffered-insert lifecycle API into the
plain non-partitioned heap `CMD_INSERT` path in `ExecInsert()` /
`ExecModifyTable()`, using a local flush callback for post-insert work.
The buffered path is intentionally narrow. It falls back to the existing
single-row path for cases outside the restricted scope, including:
- ON CONFLICT
- RETURNING
- partitioned targets
- BEFORE ROW / INSTEAD OF triggers
- FDW targets
- MERGE / cross-partition UPDATE insert side
- volatile target-side default expressions
The flush callback performs the minimal post-insert executor work needed
for this first step:
- index maintenance via `ExecInsertIndexTuples()`
- AFTER ROW trigger firing via `ExecARInsertTriggers()`
- `es_processed` accounting
Add focused regression coverage for:
- basic bulk INSERT ... SELECT
- indexed targets
- AFTER ROW trigger behavior
- index + trigger combination
- fallback cases (ON CONFLICT, RETURNING, BEFORE ROW trigger,
partitioned target, volatile target defaults)
- zero-row insert
---
src/backend/executor/nodeModifyTable.c | 164 +++++++++++
src/include/nodes/execnodes.h | 5 +
src/test/regress/expected/insert_buffered.out | 271 ++++++++++++++++++
src/test/regress/parallel_schedule | 2 +-
src/test/regress/sql/insert_buffered.sql | 209 ++++++++++++++
5 files changed, 650 insertions(+), 1 deletion(-)
create mode 100644 src/test/regress/expected/insert_buffered.out
create mode 100644 src/test/regress/sql/insert_buffered.sql
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 4cb057ca4f9..06af0637407 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -83,6 +83,85 @@ typedef struct MTTargetRelLookup
int relationIndex; /* rel's index in resultRelInfo[] array */
} MTTargetRelLookup;
+/*
+ * Flush callback context for buffered INSERT INTO ... SELECT.
+ *
+ * These are the parameters required by the three callback operations
+ * (ExecInsertIndexTuples, ExecARInsertTriggers, es_processed++):
+ *
+ * - estate, resultRelInfo: required by ExecInsertIndexTuples/ExecARInsertTriggers.
+ * - transition_capture: 5th parameter of ExecARInsertTriggers; the non-buffered
+ * path passes mtstate->mt_transition_capture for CMD_INSERT. NULL would
+ * silently break statement-level triggers with REFERENCING NEW TABLE.
+ * - canSetTag: the non-buffered path gates es_processed++ on this after the
+ * insert; the buffered path returns NULL from ExecInsert() before reaching
+ * that gate, so the callback replicates it.
+ */
+typedef struct ExecBufferedInsertFlushState
+{
+ EState *estate;
+ ResultRelInfo *resultRelInfo;
+ TransitionCaptureState *transition_capture;
+ bool canSetTag;
+} ExecBufferedInsertFlushState;
+
+/*
+ * Flush callback for buffered INSERT INTO ... SELECT.
+ *
+ * Called once per flushed tuple after heap_multi_insert() completes a batch.
+ * Performs index maintenance, AFTER ROW trigger firing, and tuple counting.
+ */
+static void
+ExecBufferedInsertFlushCb(void *context, TupleTableSlot *slot)
+{
+ ExecBufferedInsertFlushState *ctx = (ExecBufferedInsertFlushState *) context;
+ ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+ EState *estate = ctx->estate;
+ List *recheckIndexes = NIL;
+
+ if (resultRelInfo->ri_NumIndices > 0)
+ recheckIndexes = ExecInsertIndexTuples(resultRelInfo, estate, 0,
+ slot, NIL, NULL);
+
+ ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+ ctx->transition_capture);
+
+ list_free(recheckIndexes);
+
+ if (ctx->canSetTag)
+ (estate->es_processed)++;
+}
+
+/*
+ * Check whether a relation has volatile default expressions.
+ *
+ * Conservative target-side restriction: if any column default contains a
+ * volatile function (excluding nextval), the buffered-insert path is not used.
+ * This mirrors COPY FROM's volatile_defexprs check.
+ */
+static bool
+ExecRelHasVolatileDefaults(Relation rel)
+{
+ TupleConstr *constr = RelationGetDescr(rel)->constr;
+
+ if (constr == NULL || constr->num_defval == 0)
+ return false;
+
+ for (int i = 0; i < constr->num_defval; i++)
+ {
+ Node *expr;
+
+ if (constr->defval[i].adbin == NULL)
+ continue;
+
+ expr = stringToNode(constr->defval[i].adbin);
+
+ if (contain_volatile_functions_not_nextval(expr))
+ return true;
+ }
+ return false;
+}
+
/*
* Context struct for a ModifyTable operation, containing basic execution
* state and some output variables populated by ExecUpdateAct() and
@@ -1269,6 +1348,49 @@ ExecInsert(ModifyTableContext *context,
}
else
{
+ /*
+ * Buffered-insert path: lazily open the session on first call,
+ * then submit tuples via put() instead of single-row insert.
+ * Post-insert work (indexes, triggers) fires in the flush callback.
+ */
+ if (mtstate->mt_buffered_insert_eligible &&
+ mtstate->mt_bi_state == NULL)
+ {
+ ExecBufferedInsertFlushState *flush_ctx;
+
+ flush_ctx = palloc(sizeof(ExecBufferedInsertFlushState));
+ flush_ctx->estate = estate;
+ flush_ctx->resultRelInfo = resultRelInfo;
+ flush_ctx->transition_capture = mtstate->mt_transition_capture;
+ flush_ctx->canSetTag = canSetTag;
+ mtstate->mt_bi_flush_ctx = flush_ctx;
+
+ mtstate->mt_bi_state =
+ table_buffered_insert_begin(resultRelationDesc,
+ estate->es_output_cid,
+ TABLE_INSERT_BAS_BULKWRITE,
+ ExecBufferedInsertFlushCb,
+ flush_ctx);
+ if (mtstate->mt_bi_state == NULL)
+ mtstate->mt_buffered_insert_eligible = false;
+ }
+
+ if (mtstate->mt_bi_state != NULL)
+ {
+ /*
+ * Pre-insert validation already ran in the enclosing
+ * else-branch, above the ON CONFLICT test; specifically:
+ * tts_tableOid init, ExecComputeStoredGenerated,
+ * ExecWithCheckOptions (RLS), ExecConstraints,
+ * ExecPartitionCheck.
+ * This inner else-branch (no ON CONFLICT) is reached only
+ * after all of those. Submit the validated tuple to the
+ * AM buffer; post-insert work fires in the flush callback.
+ */
+ table_buffered_insert_put(mtstate->mt_bi_state, slot);
+ return NULL;
+ }
+
/* insert the tuple normally */
table_tuple_insert(resultRelationDesc, slot,
estate->es_output_cid,
@@ -5027,6 +5149,15 @@ ExecModifyTable(PlanState *pstate)
if (estate->es_insert_pending_result_relations != NIL)
ExecPendingInserts(estate);
+ /*
+ * Flush and clean up buffered-insert session if active.
+ */
+ if (node->mt_bi_state != NULL)
+ {
+ table_buffered_insert_end(node->mt_bi_state);
+ node->mt_bi_state = NULL;
+ }
+
/*
* We're done, but fire AFTER STATEMENT triggers before exiting.
*/
@@ -5773,6 +5904,30 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
resultRelInfo->ri_BatchSize = 1;
}
+ /*
+ * Mark the restricted buffered-insert path eligible for CMD_INSERT with no
+ * ON CONFLICT, RETURNING, partitioned or FDW target, BEFORE ROW/INSTEAD OF
+ * trigger, or volatile default. AM support is resolved at first ExecInsert().
+ */
+ if (operation == CMD_INSERT)
+ {
+ ModifyTable *mtnode = (ModifyTable *) mtstate->ps.plan;
+
+ resultRelInfo = mtstate->resultRelInfo;
+ mtstate->mt_buffered_insert_eligible =
+ (mtnode->onConflictAction == ONCONFLICT_NONE &&
+ resultRelInfo->ri_projectReturning == NULL &&
+ resultRelInfo->ri_RelationDesc->rd_rel->relkind !=
+ RELKIND_PARTITIONED_TABLE &&
+ !(resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_before_row) &&
+ !(resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_instead_row) &&
+ resultRelInfo->ri_FdwRoutine == NULL &&
+ mtstate->operation == CMD_INSERT &&
+ !ExecRelHasVolatileDefaults(resultRelInfo->ri_RelationDesc));
+ }
+
/*
* Lastly, if this is not the primary (canSetTag) ModifyTable node, add it
* to estate->es_auxmodifytables so that it will be run to completion by
@@ -5802,6 +5957,15 @@ ExecEndModifyTable(ModifyTableState *node)
{
int i;
+ /*
+ * Defensive: end the buffered-insert session if ExecModifyTable() did not.
+ */
+ if (node->mt_bi_state != NULL)
+ {
+ table_buffered_insert_end(node->mt_bi_state);
+ node->mt_bi_state = NULL;
+ }
+
/*
* Allow any FDWs to shut down
*/
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 13359180d25..7790bb0ba38 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1508,6 +1508,11 @@ typedef struct ModifyTableState
List *mt_updateColnosLists;
List *mt_mergeActionLists;
List *mt_mergeJoinConditions;
+
+ /* Buffered-insert state for restricted INSERT INTO ... SELECT */
+ bool mt_buffered_insert_eligible;
+ struct TableBufferedInsertStateData *mt_bi_state;
+ void *mt_bi_flush_ctx; /* private to nodeModifyTable.c */
} ModifyTableState;
/* ----------------
diff --git a/src/test/regress/expected/insert_buffered.out b/src/test/regress/expected/insert_buffered.out
new file mode 100644
index 00000000000..3ee43ae8b04
--- /dev/null
+++ b/src/test/regress/expected/insert_buffered.out
@@ -0,0 +1,271 @@
+--
+-- Tests for buffered-insert adoption in INSERT INTO ... SELECT (Patch 0005).
+-- Restricted first step: non-partitioned heap target, no ON CONFLICT,
+-- no RETURNING, no BEFORE ROW triggers.
+--
+-- ============================================================
+-- T1: Basic bulk insert (exercises multiple auto-flush cycles)
+-- ============================================================
+CREATE TABLE bi_target_basic (id int, val text);
+INSERT INTO bi_target_basic
+SELECT g, 'row-' || g FROM generate_series(1, 2000) g;
+SELECT count(*) FROM bi_target_basic;
+ count
+-------
+ 2000
+(1 row)
+
+SELECT min(id), max(id) FROM bi_target_basic;
+ min | max
+-----+------
+ 1 | 2000
+(1 row)
+
+DROP TABLE bi_target_basic;
+-- ============================================================
+-- T2: Indexed target
+-- ============================================================
+CREATE TABLE bi_target_idx (id int, val text);
+CREATE INDEX bi_target_idx_id ON bi_target_idx (id);
+INSERT INTO bi_target_idx
+SELECT g, 'row-' || g FROM generate_series(1, 500) g;
+SELECT count(*) FROM bi_target_idx;
+ count
+-------
+ 500
+(1 row)
+
+-- Verify index is usable and correct
+SET enable_seqscan = off;
+SELECT count(*) FROM bi_target_idx WHERE id BETWEEN 1 AND 500;
+ count
+-------
+ 500
+(1 row)
+
+RESET enable_seqscan;
+DROP TABLE bi_target_idx;
+-- ============================================================
+-- T3: AFTER ROW trigger
+-- ============================================================
+CREATE TABLE bi_target_trig (id int, val text);
+CREATE TABLE bi_audit (id int, val text, logged_at timestamp DEFAULT now());
+CREATE FUNCTION bi_audit_fn() RETURNS trigger
+LANGUAGE plpgsql AS $$
+BEGIN
+ INSERT INTO bi_audit (id, val) VALUES (NEW.id, NEW.val);
+ RETURN NEW;
+END;
+$$;
+CREATE TRIGGER bi_target_trig_after
+ AFTER INSERT ON bi_target_trig
+ FOR EACH ROW EXECUTE FUNCTION bi_audit_fn();
+INSERT INTO bi_target_trig
+SELECT g, 'row-' || g FROM generate_series(1, 50) g;
+SELECT count(*) FROM bi_target_trig;
+ count
+-------
+ 50
+(1 row)
+
+SELECT count(*) FROM bi_audit;
+ count
+-------
+ 50
+(1 row)
+
+-- Verify insertion order is preserved
+SELECT bool_and(t.id = a.id) AS order_preserved
+FROM (SELECT id, row_number() OVER (ORDER BY ctid) AS rn FROM bi_target_trig) t
+JOIN (SELECT id, row_number() OVER (ORDER BY ctid) AS rn FROM bi_audit) a
+ON t.rn = a.rn;
+ order_preserved
+-----------------
+ t
+(1 row)
+
+DROP TABLE bi_target_trig CASCADE;
+DROP TABLE bi_audit;
+DROP FUNCTION bi_audit_fn;
+-- ============================================================
+-- T4: Index + AFTER ROW trigger combined
+-- ============================================================
+CREATE TABLE bi_target_combo (id int, val text);
+CREATE INDEX bi_target_combo_id ON bi_target_combo (id);
+CREATE TABLE bi_audit_combo (id int, val text);
+CREATE FUNCTION bi_audit_combo_fn() RETURNS trigger
+LANGUAGE plpgsql AS $$
+BEGIN
+ INSERT INTO bi_audit_combo (id, val) VALUES (NEW.id, NEW.val);
+ RETURN NEW;
+END;
+$$;
+CREATE TRIGGER bi_target_combo_after
+ AFTER INSERT ON bi_target_combo
+ FOR EACH ROW EXECUTE FUNCTION bi_audit_combo_fn();
+INSERT INTO bi_target_combo
+SELECT g, 'row-' || g FROM generate_series(1, 100) g;
+SELECT count(*) FROM bi_target_combo;
+ count
+-------
+ 100
+(1 row)
+
+SELECT count(*) FROM bi_audit_combo;
+ count
+-------
+ 100
+(1 row)
+
+-- Verify index correctness
+SET enable_seqscan = off;
+SELECT count(*) FROM bi_target_combo WHERE id BETWEEN 1 AND 100;
+ count
+-------
+ 100
+(1 row)
+
+RESET enable_seqscan;
+DROP TABLE bi_target_combo CASCADE;
+DROP TABLE bi_audit_combo;
+DROP FUNCTION bi_audit_combo_fn;
+-- ============================================================
+-- T5: ON CONFLICT fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_conflict (id int PRIMARY KEY, val text);
+INSERT INTO bi_target_conflict VALUES (1, 'existing');
+INSERT INTO bi_target_conflict
+SELECT g, 'row-' || g FROM generate_series(1, 10) g
+ON CONFLICT (id) DO NOTHING;
+SELECT count(*) FROM bi_target_conflict;
+ count
+-------
+ 10
+(1 row)
+
+SELECT val FROM bi_target_conflict WHERE id = 1;
+ val
+----------
+ existing
+(1 row)
+
+DROP TABLE bi_target_conflict;
+-- ============================================================
+-- T6: RETURNING fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_ret (id int, val text);
+INSERT INTO bi_target_ret
+SELECT g, 'row-' || g FROM generate_series(1, 3) g
+RETURNING id, val;
+ id | val
+----+-------
+ 1 | row-1
+ 2 | row-2
+ 3 | row-3
+(3 rows)
+
+SELECT count(*) FROM bi_target_ret;
+ count
+-------
+ 3
+(1 row)
+
+DROP TABLE bi_target_ret;
+-- ============================================================
+-- T7: BEFORE ROW trigger fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_br (id int, val text);
+CREATE FUNCTION bi_br_fn() RETURNS trigger
+LANGUAGE plpgsql AS $$
+BEGIN
+ NEW.val := NEW.val || '-modified';
+ RETURN NEW;
+END;
+$$;
+CREATE TRIGGER bi_target_br_before
+ BEFORE INSERT ON bi_target_br
+ FOR EACH ROW EXECUTE FUNCTION bi_br_fn();
+INSERT INTO bi_target_br
+SELECT g, 'row-' || g FROM generate_series(1, 5) g;
+SELECT count(*) FROM bi_target_br;
+ count
+-------
+ 5
+(1 row)
+
+SELECT val FROM bi_target_br WHERE id = 1;
+ val
+----------------
+ row-1-modified
+(1 row)
+
+DROP TABLE bi_target_br;
+DROP FUNCTION bi_br_fn;
+-- ============================================================
+-- T8: Partitioned target fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_part (id int, val text) PARTITION BY RANGE (id);
+CREATE TABLE bi_target_part_1 PARTITION OF bi_target_part FOR VALUES FROM (1) TO (501);
+CREATE TABLE bi_target_part_2 PARTITION OF bi_target_part FOR VALUES FROM (501) TO (1001);
+INSERT INTO bi_target_part
+SELECT g, 'row-' || g FROM generate_series(1, 1000) g;
+SELECT count(*) FROM bi_target_part;
+ count
+-------
+ 1000
+(1 row)
+
+SELECT count(*) FROM bi_target_part_1;
+ count
+-------
+ 500
+(1 row)
+
+SELECT count(*) FROM bi_target_part_2;
+ count
+-------
+ 500
+(1 row)
+
+DROP TABLE bi_target_part;
+-- ============================================================
+-- T9: Volatile target-default fallback
+-- Expected to fall back to non-buffered path under E8.
+-- Test validates correctness; path selection is not observable
+-- from SQL output.
+-- ============================================================
+CREATE TABLE bi_target_volatile (
+ id int,
+ val text,
+ rand_val double precision DEFAULT random()
+);
+INSERT INTO bi_target_volatile (id, val)
+SELECT g, 'row-' || g FROM generate_series(1, 5) g;
+SELECT count(*) FROM bi_target_volatile;
+ count
+-------
+ 5
+(1 row)
+
+-- Verify the volatile default was evaluated (all values should be distinct)
+SELECT count(DISTINCT rand_val) = count(*) AS all_distinct
+FROM bi_target_volatile;
+ all_distinct
+--------------
+ t
+(1 row)
+
+DROP TABLE bi_target_volatile;
+-- ============================================================
+-- T10: Zero-row insert
+-- ============================================================
+CREATE TABLE bi_target_zero (id int, val text);
+INSERT INTO bi_target_zero
+SELECT g, 'row-' || g FROM generate_series(1, 100) g WHERE false;
+SELECT count(*) FROM bi_target_zero;
+ count
+-------
+ 0
+(1 row)
+
+DROP TABLE bi_target_zero;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 5d4f910155e..ae63c3dd73c 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -36,7 +36,7 @@ test: geometry horology tstypes regex type_sanity opr_sanity misc_sanity comment
# execute two copy tests in parallel, to check that copy itself
# is concurrent safe.
# ----------
-test: copy copyselect copydml copyencoding insert insert_conflict
+test: copy copyselect copydml copyencoding insert insert_conflict insert_buffered
# ----------
# More groups of parallel tests
diff --git a/src/test/regress/sql/insert_buffered.sql b/src/test/regress/sql/insert_buffered.sql
new file mode 100644
index 00000000000..2ca6f3525f4
--- /dev/null
+++ b/src/test/regress/sql/insert_buffered.sql
@@ -0,0 +1,209 @@
+--
+-- Tests for buffered-insert adoption in INSERT INTO ... SELECT (Patch 0005).
+-- Restricted first step: non-partitioned heap target, no ON CONFLICT,
+-- no RETURNING, no BEFORE ROW triggers.
+--
+
+-- ============================================================
+-- T1: Basic bulk insert (exercises multiple auto-flush cycles)
+-- ============================================================
+CREATE TABLE bi_target_basic (id int, val text);
+
+INSERT INTO bi_target_basic
+SELECT g, 'row-' || g FROM generate_series(1, 2000) g;
+
+SELECT count(*) FROM bi_target_basic;
+SELECT min(id), max(id) FROM bi_target_basic;
+
+DROP TABLE bi_target_basic;
+
+-- ============================================================
+-- T2: Indexed target
+-- ============================================================
+CREATE TABLE bi_target_idx (id int, val text);
+CREATE INDEX bi_target_idx_id ON bi_target_idx (id);
+
+INSERT INTO bi_target_idx
+SELECT g, 'row-' || g FROM generate_series(1, 500) g;
+
+SELECT count(*) FROM bi_target_idx;
+
+-- Verify index is usable and correct
+SET enable_seqscan = off;
+SELECT count(*) FROM bi_target_idx WHERE id BETWEEN 1 AND 500;
+RESET enable_seqscan;
+
+DROP TABLE bi_target_idx;
+
+-- ============================================================
+-- T3: AFTER ROW trigger
+-- ============================================================
+CREATE TABLE bi_target_trig (id int, val text);
+CREATE TABLE bi_audit (id int, val text, logged_at timestamp DEFAULT now());
+
+CREATE FUNCTION bi_audit_fn() RETURNS trigger
+LANGUAGE plpgsql AS $$
+BEGIN
+ INSERT INTO bi_audit (id, val) VALUES (NEW.id, NEW.val);
+ RETURN NEW;
+END;
+$$;
+
+CREATE TRIGGER bi_target_trig_after
+ AFTER INSERT ON bi_target_trig
+ FOR EACH ROW EXECUTE FUNCTION bi_audit_fn();
+
+INSERT INTO bi_target_trig
+SELECT g, 'row-' || g FROM generate_series(1, 50) g;
+
+SELECT count(*) FROM bi_target_trig;
+SELECT count(*) FROM bi_audit;
+
+-- Verify insertion order is preserved
+SELECT bool_and(t.id = a.id) AS order_preserved
+FROM (SELECT id, row_number() OVER (ORDER BY ctid) AS rn FROM bi_target_trig) t
+JOIN (SELECT id, row_number() OVER (ORDER BY ctid) AS rn FROM bi_audit) a
+ON t.rn = a.rn;
+
+DROP TABLE bi_target_trig CASCADE;
+DROP TABLE bi_audit;
+DROP FUNCTION bi_audit_fn;
+
+-- ============================================================
+-- T4: Index + AFTER ROW trigger combined
+-- ============================================================
+CREATE TABLE bi_target_combo (id int, val text);
+CREATE INDEX bi_target_combo_id ON bi_target_combo (id);
+CREATE TABLE bi_audit_combo (id int, val text);
+
+CREATE FUNCTION bi_audit_combo_fn() RETURNS trigger
+LANGUAGE plpgsql AS $$
+BEGIN
+ INSERT INTO bi_audit_combo (id, val) VALUES (NEW.id, NEW.val);
+ RETURN NEW;
+END;
+$$;
+
+CREATE TRIGGER bi_target_combo_after
+ AFTER INSERT ON bi_target_combo
+ FOR EACH ROW EXECUTE FUNCTION bi_audit_combo_fn();
+
+INSERT INTO bi_target_combo
+SELECT g, 'row-' || g FROM generate_series(1, 100) g;
+
+SELECT count(*) FROM bi_target_combo;
+SELECT count(*) FROM bi_audit_combo;
+
+-- Verify index correctness
+SET enable_seqscan = off;
+SELECT count(*) FROM bi_target_combo WHERE id BETWEEN 1 AND 100;
+RESET enable_seqscan;
+
+DROP TABLE bi_target_combo CASCADE;
+DROP TABLE bi_audit_combo;
+DROP FUNCTION bi_audit_combo_fn;
+
+-- ============================================================
+-- T5: ON CONFLICT fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_conflict (id int PRIMARY KEY, val text);
+
+INSERT INTO bi_target_conflict VALUES (1, 'existing');
+
+INSERT INTO bi_target_conflict
+SELECT g, 'row-' || g FROM generate_series(1, 10) g
+ON CONFLICT (id) DO NOTHING;
+
+SELECT count(*) FROM bi_target_conflict;
+SELECT val FROM bi_target_conflict WHERE id = 1;
+
+DROP TABLE bi_target_conflict;
+
+-- ============================================================
+-- T6: RETURNING fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_ret (id int, val text);
+
+INSERT INTO bi_target_ret
+SELECT g, 'row-' || g FROM generate_series(1, 3) g
+RETURNING id, val;
+
+SELECT count(*) FROM bi_target_ret;
+
+DROP TABLE bi_target_ret;
+
+-- ============================================================
+-- T7: BEFORE ROW trigger fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_br (id int, val text);
+
+CREATE FUNCTION bi_br_fn() RETURNS trigger
+LANGUAGE plpgsql AS $$
+BEGIN
+ NEW.val := NEW.val || '-modified';
+ RETURN NEW;
+END;
+$$;
+
+CREATE TRIGGER bi_target_br_before
+ BEFORE INSERT ON bi_target_br
+ FOR EACH ROW EXECUTE FUNCTION bi_br_fn();
+
+INSERT INTO bi_target_br
+SELECT g, 'row-' || g FROM generate_series(1, 5) g;
+
+SELECT count(*) FROM bi_target_br;
+SELECT val FROM bi_target_br WHERE id = 1;
+
+DROP TABLE bi_target_br;
+DROP FUNCTION bi_br_fn;
+
+-- ============================================================
+-- T8: Partitioned target fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_part (id int, val text) PARTITION BY RANGE (id);
+CREATE TABLE bi_target_part_1 PARTITION OF bi_target_part FOR VALUES FROM (1) TO (501);
+CREATE TABLE bi_target_part_2 PARTITION OF bi_target_part FOR VALUES FROM (501) TO (1001);
+
+INSERT INTO bi_target_part
+SELECT g, 'row-' || g FROM generate_series(1, 1000) g;
+
+SELECT count(*) FROM bi_target_part;
+SELECT count(*) FROM bi_target_part_1;
+SELECT count(*) FROM bi_target_part_2;
+
+DROP TABLE bi_target_part;
+
+-- ============================================================
+-- T9: Volatile target-default fallback
+-- Expected to fall back to non-buffered path under E8.
+-- Test validates correctness; path selection is not observable
+-- from SQL output.
+-- ============================================================
+CREATE TABLE bi_target_volatile (
+ id int,
+ val text,
+ rand_val double precision DEFAULT random()
+);
+
+INSERT INTO bi_target_volatile (id, val)
+SELECT g, 'row-' || g FROM generate_series(1, 5) g;
+
+SELECT count(*) FROM bi_target_volatile;
+-- Verify the volatile default was evaluated (all values should be distinct)
+SELECT count(DISTINCT rand_val) = count(*) AS all_distinct
+FROM bi_target_volatile;
+
+DROP TABLE bi_target_volatile;
+
+-- ============================================================
+-- T10: Zero-row insert
+-- ============================================================
+CREATE TABLE bi_target_zero (id int, val text);
+
+INSERT INTO bi_target_zero
+SELECT g, 'row-' || g FROM generate_series(1, 100) g WHERE false;
+
+SELECT count(*) FROM bi_target_zero;
+
+DROP TABLE bi_target_zero;
--
2.52.0
[application/octet-stream] v1-0002-createas-use-buffered-insert-API-for-CTAS.patch (11.6K, 5-v1-0002-createas-use-buffered-insert-API-for-CTAS.patch)
From d6b6cb19c326d79693c2e6238cfbe96f5b0fa0e0 Mon Sep 17 00:00:00 2001
From: Haibo Yan <[email protected]>
Date: Thu, 23 Apr 2026 12:20:59 -0700
Subject: [PATCH v1 2/5] createas: use buffered-insert API for CTAS
Adopt the buffered-insert lifecycle API in the CTAS dest receiver.
CTAS uses table_buffered_insert_begin()/put()/end() with a NULL flush
callback and falls back to the existing single-row path when needed.
Retain the volatile-function check and EXECUTE-path fallback
conservatively for the initial patch series.
Add focused CTAS regression tests.
---
src/backend/commands/createas.c | 112 ++++++++++++++++++----
src/test/regress/expected/select_into.out | 98 +++++++++++++++++++
src/test/regress/sql/select_into.sql | 44 +++++++++
3 files changed, 234 insertions(+), 20 deletions(-)
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 6dbb831ca89..4ba25c9e336 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -40,6 +40,7 @@
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/queryjumble.h"
+#include "optimizer/optimizer.h"
#include "parser/analyze.h"
#include "rewrite/rewriteHandler.h"
#include "tcop/tcopprot.h"
@@ -57,7 +58,9 @@ typedef struct
ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */
CommandId output_cid; /* cmin to insert in output tuples */
uint32 ti_options; /* table_tuple_insert performance options */
- BulkInsertState bistate; /* bulk insert state */
+ BulkInsertState bistate; /* bulk insert state (fallback path only) */
+ TableBufferedInsertState buffered_state; /* buffered-insert state, or NULL */
+ bool use_buffered_insert; /* true if buffered path is eligible */
} DR_intorel;
/* utility functions for CTAS definition creation */
@@ -260,7 +263,15 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
Assert(!is_matview); /* excluded by syntax */
ExecuteQuery(pstate, estmt, into, params, dest, qc);
- /* get object address that intorel_startup saved for us */
+ /*
+ * Get the object address that intorel_startup saved for us.
+ *
+ * Note: use_buffered_insert stays false (its palloc0 default) for the
+ * EXECUTE path. We conservatively skip the buffered-insert
+ * optimization here because the prepared statement's plan is not
+ * available for volatile-function inspection at this point. Relaxing
+ * this is outside the scope of Patch 0002.
+ */
address = ((DR_intorel *) dest)->reladdr;
return address;
@@ -323,6 +334,24 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
plan = pg_plan_query(query, pstate->p_sourcetext,
CURSOR_OPT_PARALLEL_OK, params, NULL);
+ /*
+ * Conservative implementation choice: disable the buffered-insert
+ * path if the planned target list contains volatile functions.
+ *
+ * The buffered-insert API contract does not require this check -- the
+ * CTAS target table is created within this statement and cannot be
+ * referenced by the source query. This guard is retained for the
+ * initial patch series as a caller-local conservatism and can be
+ * relaxed after validation without any API change.
+ */
+ {
+ DR_intorel *myState = (DR_intorel *) dest;
+
+ myState->use_buffered_insert =
+ !contain_volatile_functions_after_planning(
+ (Expr *) plan->planTree->targetlist);
+ }
+
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only
@@ -564,10 +593,45 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
* If WITH NO DATA is specified, there is no need to set up the state for
* bulk inserts as there are no tuples to insert.
*/
- if (!into->skipData)
- myState->bistate = GetBulkInsertState();
- else
+ if (into->skipData)
+ {
myState->bistate = NULL;
+ myState->buffered_state = NULL;
+ }
+ else if (myState->use_buffered_insert)
+ {
+ /*
+ * Try the buffered-insert path. Pass NULL flush callback -- CTAS
+ * has no indexes, triggers, or per-tuple post-insert work.
+ */
+ myState->ti_options |= TABLE_INSERT_BAS_BULKWRITE;
+ myState->buffered_state =
+ table_buffered_insert_begin(intoRelationDesc,
+ myState->output_cid,
+ myState->ti_options,
+ NULL, NULL);
+
+ if (myState->buffered_state != NULL)
+ {
+ /* Buffered path active; bistate is managed inside the AM */
+ myState->bistate = NULL;
+ }
+ else
+ {
+ /* AM does not support buffered inserts; fall back */
+ myState->bistate = GetBulkInsertState();
+ }
+ }
+ else
+ {
+ /*
+ * Fall back to the single-row path. This is reached when the
+ * volatile-function conservative guard fired, or for CTAS via
+ * EXECUTE (where use_buffered_insert stays false by default).
+ */
+ myState->buffered_state = NULL;
+ myState->bistate = GetBulkInsertState();
+ }
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -587,19 +651,18 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
/* Nothing to insert if WITH NO DATA is specified. */
if (!myState->into->skipData)
{
- /*
- * Note that the input slot might not be of the type of the target
- * relation. That's supported by table_tuple_insert(), but slightly
- * less efficient than inserting with the right slot - but the
- * alternative would be to copy into a slot of the right type, which
- * would not be cheap either. This also doesn't allow accessing per-AM
- * data (say a tuple's xmin), but since we don't do that here...
- */
- table_tuple_insert(myState->rel,
- slot,
- myState->output_cid,
- myState->ti_options,
- myState->bistate);
+ if (myState->buffered_state != NULL)
+ {
+ table_buffered_insert_put(myState->buffered_state, slot);
+ }
+ else
+ {
+ table_tuple_insert(myState->rel,
+ slot,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
+ }
}
/* We know this is a newly created relation, so there are no indexes */
@@ -618,8 +681,17 @@ intorel_shutdown(DestReceiver *self)
if (!into->skipData)
{
- FreeBulkInsertState(myState->bistate);
- table_finish_bulk_insert(myState->rel, myState->ti_options);
+ if (myState->buffered_state != NULL)
+ {
+ /* end() flushes remaining tuples and subsumes finish_bulk_insert */
+ table_buffered_insert_end(myState->buffered_state);
+ myState->buffered_state = NULL;
+ }
+ else
+ {
+ FreeBulkInsertState(myState->bistate);
+ table_finish_bulk_insert(myState->rel, myState->ti_options);
+ }
}
/* close rel, but keep lock until commit */
diff --git a/src/test/regress/expected/select_into.out b/src/test/regress/expected/select_into.out
index d04ca2b1bf7..36cc783fda5 100644
--- a/src/test/regress/expected/select_into.out
+++ b/src/test/regress/expected/select_into.out
@@ -220,3 +220,101 @@ NOTICE: relation "ctas_ine_tbl" already exists, skipping
(0 rows)
DROP TABLE ctas_ine_tbl;
+--
+-- Tests for CTAS with buffered-insert path.
+--
+-- These tests verify correctness of CTAS results under both the buffered
+-- path (no volatile functions) and the fallback single-row path (volatile
+-- functions present, or EXECUTE). Path selection is not directly observable
+-- in SQL output; these tests validate that results are correct regardless.
+--
+-- Buffered path: small row count, verify contents.
+CREATE TABLE ctas_buffered_1 AS SELECT g AS a, g * 10 AS b FROM generate_series(1, 5) g;
+SELECT count(*) FROM ctas_buffered_1;
+ count
+-------
+ 5
+(1 row)
+
+SELECT * FROM ctas_buffered_1 ORDER BY a;
+ a | b
+---+----
+ 1 | 10
+ 2 | 20
+ 3 | 30
+ 4 | 40
+ 5 | 50
+(5 rows)
+
+DROP TABLE ctas_buffered_1;
+-- Buffered path: enough rows to trigger auto-flush (>1000 slot threshold).
+CREATE TABLE ctas_buffered_2 AS SELECT g AS a FROM generate_series(1, 2500) g;
+SELECT count(*) FROM ctas_buffered_2;
+ count
+-------
+ 2500
+(1 row)
+
+SELECT min(a), max(a) FROM ctas_buffered_2;
+ min | max
+-----+------
+ 1 | 2500
+(1 row)
+
+DROP TABLE ctas_buffered_2;
+-- Buffered path: wide tuples to exercise byte-threshold flushing.
+CREATE TABLE ctas_buffered_wide AS
+ SELECT g AS id,
+ repeat('x', 200) AS col1,
+ repeat('y', 200) AS col2,
+ repeat('z', 200) AS col3
+ FROM generate_series(1, 100) g;
+SELECT count(*) FROM ctas_buffered_wide;
+ count
+-------
+ 100
+(1 row)
+
+SELECT id, length(col1), length(col2), length(col3) FROM ctas_buffered_wide WHERE id IN (1, 50, 100) ORDER BY id;
+ id | length | length | length
+-----+--------+--------+--------
+ 1 | 200 | 200 | 200
+ 50 | 200 | 200 | 200
+ 100 | 200 | 200 | 200
+(3 rows)
+
+DROP TABLE ctas_buffered_wide;
+-- Fallback path: random() triggers the conservative volatile-function guard.
+-- Result must still be correct through the single-row insertion path.
+CREATE TABLE ctas_volatile AS SELECT g AS a, random() AS r FROM generate_series(1, 10) g;
+SELECT count(*) FROM ctas_volatile;
+ count
+-------
+ 10
+(1 row)
+
+SELECT a FROM ctas_volatile ORDER BY a;
+ a
+----
+ 1
+ 2
+ 3
+ 4
+ 5
+ 6
+ 7
+ 8
+ 9
+ 10
+(10 rows)
+
+DROP TABLE ctas_volatile;
+-- WITH NO DATA: no insertion path exercised; verify unchanged behavior.
+CREATE TABLE ctas_nodata AS SELECT 1 AS a WITH NO DATA;
+SELECT count(*) FROM ctas_nodata;
+ count
+-------
+ 0
+(1 row)
+
+DROP TABLE ctas_nodata;
diff --git a/src/test/regress/sql/select_into.sql b/src/test/regress/sql/select_into.sql
index f71e3940e0a..8147113cee3 100644
--- a/src/test/regress/sql/select_into.sql
+++ b/src/test/regress/sql/select_into.sql
@@ -136,3 +136,47 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
CREATE TABLE IF NOT EXISTS ctas_ine_tbl AS EXECUTE ctas_ine_query; -- ok
DROP TABLE ctas_ine_tbl;
+
+--
+-- Tests for CTAS with buffered-insert path.
+--
+-- These tests verify correctness of CTAS results under both the buffered
+-- path (no volatile functions) and the fallback single-row path (volatile
+-- functions present, or EXECUTE). Path selection is not directly observable
+-- in SQL output; these tests validate that results are correct regardless.
+--
+
+-- Buffered path: small row count, verify contents.
+CREATE TABLE ctas_buffered_1 AS SELECT g AS a, g * 10 AS b FROM generate_series(1, 5) g;
+SELECT count(*) FROM ctas_buffered_1;
+SELECT * FROM ctas_buffered_1 ORDER BY a;
+DROP TABLE ctas_buffered_1;
+
+-- Buffered path: enough rows to trigger auto-flush (>1000 slot threshold).
+CREATE TABLE ctas_buffered_2 AS SELECT g AS a FROM generate_series(1, 2500) g;
+SELECT count(*) FROM ctas_buffered_2;
+SELECT min(a), max(a) FROM ctas_buffered_2;
+DROP TABLE ctas_buffered_2;
+
+-- Buffered path: wide tuples to exercise byte-threshold flushing.
+CREATE TABLE ctas_buffered_wide AS
+ SELECT g AS id,
+ repeat('x', 200) AS col1,
+ repeat('y', 200) AS col2,
+ repeat('z', 200) AS col3
+ FROM generate_series(1, 100) g;
+SELECT count(*) FROM ctas_buffered_wide;
+SELECT id, length(col1), length(col2), length(col3) FROM ctas_buffered_wide WHERE id IN (1, 50, 100) ORDER BY id;
+DROP TABLE ctas_buffered_wide;
+
+-- Fallback path: random() triggers the conservative volatile-function guard.
+-- Result must still be correct through the single-row insertion path.
+CREATE TABLE ctas_volatile AS SELECT g AS a, random() AS r FROM generate_series(1, 10) g;
+SELECT count(*) FROM ctas_volatile;
+SELECT a FROM ctas_volatile ORDER BY a;
+DROP TABLE ctas_volatile;
+
+-- WITH NO DATA: no insertion path exercised; verify unchanged behavior.
+CREATE TABLE ctas_nodata AS SELECT 1 AS a WITH NO DATA;
+SELECT count(*) FROM ctas_nodata;
+DROP TABLE ctas_nodata;
--
2.52.0
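
The receiver changes above follow a begin/put/end lifecycle: begin() may return NULL, in which case the caller keeps using the single-row path; put() buffers a tuple and flushes automatically at a threshold; end() flushes whatever remains. The standalone mock below sketches that control flow only. Every name in it is hypothetical, it contains no PostgreSQL code, and the 1000-slot threshold is an assumption taken from the ">1000 slot threshold" test comment rather than from the AM implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Assumed slot threshold; taken from the test comment, not from the AM. */
#define SKETCH_MAX_BUFFERED_SLOTS 1000

/* Hypothetical mock state; this is not the patch's state struct. */
typedef struct SketchBufferedState
{
	int			nbuffered;		/* tuples currently held in the buffer */
	int			nflushes;		/* non-empty flushes performed so far */
	long		ninserted;		/* tuples written to the mock table */
} SketchBufferedState;

/* begin(): NULL means "no buffered support", so the caller falls back. */
static SketchBufferedState *
sketch_begin(bool am_supports_buffering)
{
	if (!am_supports_buffering)
		return NULL;
	return calloc(1, sizeof(SketchBufferedState));
}

/* Write out all buffered tuples; an empty buffer is not counted. */
static void
sketch_flush(SketchBufferedState *state)
{
	if (state->nbuffered == 0)
		return;
	state->ninserted += state->nbuffered;
	state->nbuffered = 0;
	state->nflushes++;
}

/* put(): buffer one tuple and auto-flush when the buffer is full. */
static void
sketch_put(SketchBufferedState *state)
{
	assert(state != NULL);
	state->nbuffered++;
	if (state->nbuffered >= SKETCH_MAX_BUFFERED_SLOTS)
		sketch_flush(state);
}

/* end(): flush the remainder, report totals, release the state. */
static long
sketch_end(SketchBufferedState *state, int *nflushes)
{
	long		total;

	sketch_flush(state);
	total = state->ninserted;
	*nflushes = state->nflushes;
	free(state);
	return total;
}

/*
 * Receiver-style driver: use the buffered path when it is eligible and
 * begin() succeeds, otherwise count single-row inserts instead.
 * Returns the number of rows inserted.
 */
long
sketch_fill(int nrows, bool eligible, bool am_supports_buffering,
			int *nflushes)
{
	SketchBufferedState *state;
	long		single_row_inserts = 0;

	state = eligible ? sketch_begin(am_supports_buffering) : NULL;
	*nflushes = 0;

	for (int i = 0; i < nrows; i++)
	{
		if (state != NULL)
			sketch_put(state);
		else
			single_row_inserts++;
	}

	if (state != NULL)
		return sketch_end(state, nflushes);
	return single_row_inserts;
}
```

With these assumptions, the 2500-row ctas_buffered_2 case would see two automatic flushes plus one final flush from end(), while the volatile-function and EXECUTE cases never create a buffered state at all. The row count is the same either way, which is what the regression tests check.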
[application/octet-stream] v1-0003-matview-use-buffered-insert-API-for-CMV-and-RMV.patch (12.0K, 6-v1-0003-matview-use-buffered-insert-API-for-CMV-and-RMV.patch)
From 0b810edcdc5e80ab4ec7d77a8d6687fdedd73b32 Mon Sep 17 00:00:00 2001
From: Haibo Yan <[email protected]>
Date: Thu, 23 Apr 2026 12:43:31 -0700
Subject: [PATCH v1 3/5] matview: use buffered-insert API for CMV and RMV
Adopt the buffered-insert lifecycle API in the transientrel datafill
path used by CREATE MATERIALIZED VIEW and REFRESH MATERIALIZED VIEW.
The path uses table_buffered_insert_begin()/put()/end() with a NULL
flush callback and falls back to the existing single-row path when
needed.
Retain the volatile-function check conservatively for the initial patch
series. Concurrent refresh is unchanged.
Add focused CMV/RMV regression tests.
---
src/backend/commands/matview.c | 99 +++++++++++++----
src/test/regress/expected/matview.out | 152 ++++++++++++++++++++++++++
src/test/regress/sql/matview.sql | 59 ++++++++++
3 files changed, 291 insertions(+), 19 deletions(-)
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index f7d8007f796..7de4acbdfb1 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -31,6 +31,7 @@
#include "executor/executor.h"
#include "executor/spi.h"
#include "miscadmin.h"
+#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/lmgr.h"
@@ -50,7 +51,9 @@ typedef struct
Relation transientrel; /* relation to write to */
CommandId output_cid; /* cmin to insert in output tuples */
uint32 ti_options; /* table_tuple_insert performance options */
- BulkInsertState bistate; /* bulk insert state */
+ BulkInsertState bistate; /* bulk insert state (fallback path only) */
+ TableBufferedInsertState buffered_state; /* buffered-insert state, or NULL */
+ bool use_buffered_insert; /* true if buffered path is eligible */
} DR_transientrel;
static int matview_maintenance_depth = 0;
@@ -427,6 +430,24 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
/* Plan the query which will generate data for the refresh. */
plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL, NULL);
+ /*
+ * Conservative implementation choice: disable the buffered-insert path
+ * if the planned target list contains volatile functions.
+ *
+ * The buffered-insert API contract does not require this check -- the
+ * matview's defining query was parsed at creation time and cannot
+ * reference the transient target table. This guard is retained for the
+ * initial patch series as a caller-local conservatism and can be relaxed
+ * after validation without any API change.
+ */
+ {
+ DR_transientrel *myState = (DR_transientrel *) dest;
+
+ myState->use_buffered_insert =
+ !contain_volatile_functions_after_planning(
+ (Expr *) plan->planTree->targetlist);
+ }
+
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only matter if
@@ -492,7 +513,40 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
myState->transientrel = transientrel;
myState->output_cid = GetCurrentCommandId(true);
myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
- myState->bistate = GetBulkInsertState();
+
+ if (myState->use_buffered_insert)
+ {
+ /*
+ * Try the buffered-insert path. Pass NULL flush callback -- the
+ * transient table has no indexes, triggers, or per-tuple post-insert
+ * work during the datafill phase.
+ */
+ myState->ti_options |= TABLE_INSERT_BAS_BULKWRITE;
+ myState->buffered_state =
+ table_buffered_insert_begin(transientrel,
+ myState->output_cid,
+ myState->ti_options,
+ NULL, NULL);
+
+ if (myState->buffered_state != NULL)
+ {
+ myState->bistate = NULL;
+ }
+ else
+ {
+ /* AM does not support buffered inserts; fall back */
+ myState->bistate = GetBulkInsertState();
+ }
+ }
+ else
+ {
+ /*
+ * Buffered insertion not selected for this datafill. Currently this
+ * is reached when the conservative volatile-function guard fires.
+ */
+ myState->buffered_state = NULL;
+ myState->bistate = GetBulkInsertState();
+ }
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -509,20 +563,19 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
- /*
- * Note that the input slot might not be of the type of the target
- * relation. That's supported by table_tuple_insert(), but slightly less
- * efficient than inserting with the right slot - but the alternative
- * would be to copy into a slot of the right type, which would not be
- * cheap either. This also doesn't allow accessing per-AM data (say a
- * tuple's xmin), but since we don't do that here...
- */
-
- table_tuple_insert(myState->transientrel,
- slot,
- myState->output_cid,
- myState->ti_options,
- myState->bistate);
+ /* Both paths accept the caller-provided slot directly. */
+ if (myState->buffered_state != NULL)
+ {
+ table_buffered_insert_put(myState->buffered_state, slot);
+ }
+ else
+ {
+ table_tuple_insert(myState->transientrel,
+ slot,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
+ }
/* We know this is a newly created relation, so there are no indexes */
@@ -537,9 +590,17 @@ transientrel_shutdown(DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
- FreeBulkInsertState(myState->bistate);
-
- table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+ if (myState->buffered_state != NULL)
+ {
+ /* end() flushes remaining tuples and subsumes finish_bulk_insert */
+ table_buffered_insert_end(myState->buffered_state);
+ myState->buffered_state = NULL;
+ }
+ else
+ {
+ FreeBulkInsertState(myState->bistate);
+ table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+ }
/* close transientrel, but keep lock until commit */
table_close(myState->transientrel, NoLock);
diff --git a/src/test/regress/expected/matview.out b/src/test/regress/expected/matview.out
index 0355720dfc6..330ea4989f2 100644
--- a/src/test/regress/expected/matview.out
+++ b/src/test/regress/expected/matview.out
@@ -699,3 +699,155 @@ NOTICE: relation "matview_ine_tab" already exists, skipping
(0 rows)
DROP MATERIALIZED VIEW matview_ine_tab;
+--
+-- Tests for CMV/RMV with buffered-insert path.
+--
+-- These tests verify correctness under both the buffered path (no volatile
+-- functions) and the fallback single-row path (volatile functions present).
+-- Path selection is not directly observable in SQL output.
+--
+-- CMV: basic correctness
+CREATE MATERIALIZED VIEW mv_buffered_1 AS
+ SELECT g AS a, g * 10 AS b FROM generate_series(1, 5) g;
+SELECT count(*) FROM mv_buffered_1;
+ count
+-------
+ 5
+(1 row)
+
+SELECT * FROM mv_buffered_1 ORDER BY a;
+ a | b
+---+----
+ 1 | 10
+ 2 | 20
+ 3 | 30
+ 4 | 40
+ 5 | 50
+(5 rows)
+
+-- RMV: refresh repopulates correctly
+REFRESH MATERIALIZED VIEW mv_buffered_1;
+SELECT count(*) FROM mv_buffered_1;
+ count
+-------
+ 5
+(1 row)
+
+SELECT * FROM mv_buffered_1 ORDER BY a;
+ a | b
+---+----
+ 1 | 10
+ 2 | 20
+ 3 | 30
+ 4 | 40
+ 5 | 50
+(5 rows)
+
+DROP MATERIALIZED VIEW mv_buffered_1;
+-- CMV + RMV: bulk case to exercise auto-flush (>1000 rows)
+CREATE MATERIALIZED VIEW mv_buffered_bulk AS
+ SELECT g AS a FROM generate_series(1, 2500) g;
+SELECT count(*) FROM mv_buffered_bulk;
+ count
+-------
+ 2500
+(1 row)
+
+SELECT min(a), max(a) FROM mv_buffered_bulk;
+ min | max
+-----+------
+ 1 | 2500
+(1 row)
+
+REFRESH MATERIALIZED VIEW mv_buffered_bulk;
+SELECT count(*) FROM mv_buffered_bulk;
+ count
+-------
+ 2500
+(1 row)
+
+SELECT min(a), max(a) FROM mv_buffered_bulk;
+ min | max
+-----+------
+ 1 | 2500
+(1 row)
+
+DROP MATERIALIZED VIEW mv_buffered_bulk;
+-- Wide tuples: verify no regression
+CREATE MATERIALIZED VIEW mv_buffered_wide AS
+ SELECT g AS id,
+ repeat('x', 200) AS col1,
+ repeat('y', 200) AS col2,
+ repeat('z', 200) AS col3
+ FROM generate_series(1, 100) g;
+SELECT count(*) FROM mv_buffered_wide;
+ count
+-------
+ 100
+(1 row)
+
+SELECT id, length(col1), length(col2), length(col3)
+ FROM mv_buffered_wide WHERE id IN (1, 50, 100) ORDER BY id;
+ id | length | length | length
+-----+--------+--------+--------
+ 1 | 200 | 200 | 200
+ 50 | 200 | 200 | 200
+ 100 | 200 | 200 | 200
+(3 rows)
+
+DROP MATERIALIZED VIEW mv_buffered_wide;
+-- Volatile-function fallback: random() triggers conservative guard.
+-- Result must still be correct through the single-row path.
+CREATE MATERIALIZED VIEW mv_volatile AS
+ SELECT g AS a, random() AS r FROM generate_series(1, 10) g;
+SELECT count(*) FROM mv_volatile;
+ count
+-------
+ 10
+(1 row)
+
+SELECT a FROM mv_volatile ORDER BY a;
+ a
+----
+ 1
+ 2
+ 3
+ 4
+ 5
+ 6
+ 7
+ 8
+ 9
+ 10
+(10 rows)
+
+REFRESH MATERIALIZED VIEW mv_volatile;
+SELECT count(*) FROM mv_volatile;
+ count
+-------
+ 10
+(1 row)
+
+SELECT a FROM mv_volatile ORDER BY a;
+ a
+----
+ 1
+ 2
+ 3
+ 4
+ 5
+ 6
+ 7
+ 8
+ 9
+ 10
+(10 rows)
+
+DROP MATERIALIZED VIEW mv_volatile;
+-- WITH NO DATA: unchanged behavior
+CREATE MATERIALIZED VIEW mv_nodata AS SELECT 1 AS a WITH NO DATA;
+SELECT count(*) FROM mv_nodata; -- error: not populated
+ERROR: materialized view "mv_nodata" has not been populated
+HINT: Use the REFRESH MATERIALIZED VIEW command.
+REFRESH MATERIALIZED VIEW mv_nodata WITH NO DATA;
+DROP MATERIALIZED VIEW mv_nodata;
diff --git a/src/test/regress/sql/matview.sql b/src/test/regress/sql/matview.sql
index 934426b9ae8..43ceee50e90 100644
--- a/src/test/regress/sql/matview.sql
+++ b/src/test/regress/sql/matview.sql
@@ -318,3 +318,62 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF, BUFFERS OFF)
CREATE MATERIALIZED VIEW IF NOT EXISTS matview_ine_tab AS
SELECT 1 / 0 WITH NO DATA; -- ok
DROP MATERIALIZED VIEW matview_ine_tab;
+
+--
+-- Tests for CMV/RMV with buffered-insert path.
+--
+-- These tests verify correctness under both the buffered path (no volatile
+-- functions) and the fallback single-row path (volatile functions present).
+-- Path selection is not directly observable in SQL output.
+--
+
+-- CMV: basic correctness
+CREATE MATERIALIZED VIEW mv_buffered_1 AS
+ SELECT g AS a, g * 10 AS b FROM generate_series(1, 5) g;
+SELECT count(*) FROM mv_buffered_1;
+SELECT * FROM mv_buffered_1 ORDER BY a;
+
+-- RMV: refresh repopulates correctly
+REFRESH MATERIALIZED VIEW mv_buffered_1;
+SELECT count(*) FROM mv_buffered_1;
+SELECT * FROM mv_buffered_1 ORDER BY a;
+DROP MATERIALIZED VIEW mv_buffered_1;
+
+-- CMV + RMV: bulk case to exercise auto-flush (>1000 rows)
+CREATE MATERIALIZED VIEW mv_buffered_bulk AS
+ SELECT g AS a FROM generate_series(1, 2500) g;
+SELECT count(*) FROM mv_buffered_bulk;
+SELECT min(a), max(a) FROM mv_buffered_bulk;
+REFRESH MATERIALIZED VIEW mv_buffered_bulk;
+SELECT count(*) FROM mv_buffered_bulk;
+SELECT min(a), max(a) FROM mv_buffered_bulk;
+DROP MATERIALIZED VIEW mv_buffered_bulk;
+
+-- Wide tuples: verify no regression
+CREATE MATERIALIZED VIEW mv_buffered_wide AS
+ SELECT g AS id,
+ repeat('x', 200) AS col1,
+ repeat('y', 200) AS col2,
+ repeat('z', 200) AS col3
+ FROM generate_series(1, 100) g;
+SELECT count(*) FROM mv_buffered_wide;
+SELECT id, length(col1), length(col2), length(col3)
+ FROM mv_buffered_wide WHERE id IN (1, 50, 100) ORDER BY id;
+DROP MATERIALIZED VIEW mv_buffered_wide;
+
+-- Volatile-function fallback: random() triggers conservative guard.
+-- Result must still be correct through the single-row path.
+CREATE MATERIALIZED VIEW mv_volatile AS
+ SELECT g AS a, random() AS r FROM generate_series(1, 10) g;
+SELECT count(*) FROM mv_volatile;
+SELECT a FROM mv_volatile ORDER BY a;
+REFRESH MATERIALIZED VIEW mv_volatile;
+SELECT count(*) FROM mv_volatile;
+SELECT a FROM mv_volatile ORDER BY a;
+DROP MATERIALIZED VIEW mv_volatile;
+
+-- WITH NO DATA: unchanged behavior
+CREATE MATERIALIZED VIEW mv_nodata AS SELECT 1 AS a WITH NO DATA;
+SELECT count(*) FROM mv_nodata; -- error: not populated
+REFRESH MATERIALIZED VIEW mv_nodata WITH NO DATA;
+DROP MATERIALIZED VIEW mv_nodata;
--
2.52.0
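The transientrel_shutdown() hunk above pairs each startup choice with its own cleanup: the buffered path is closed by end(), which also flushes, while the fallback path frees the bulk-insert state and finishes the bulk insert separately. A toy model of that pairing follows. It is only a sketch: every toy_ name is hypothetical, the booleans stand in for the real state pointers, and nothing here is the patch's code.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for DR_transientrel; not PostgreSQL code. */
typedef struct ToyReceiver
{
    bool has_buffered_state;   /* models buffered_state != NULL */
    bool has_bulk_state;       /* models bistate != NULL */
    int  pending_rows;         /* rows held by the buffered path */
    int  written_rows;         /* rows that reached the table */
    bool bulk_insert_finished; /* models the finish-bulk-insert step */
} ToyReceiver;

static void
toy_receiver_startup(ToyReceiver *r, bool use_buffered)
{
    /* Exactly one of the two insert states is active. */
    r->has_buffered_state = use_buffered;
    r->has_bulk_state = !use_buffered;
    r->pending_rows = 0;
    r->written_rows = 0;
    r->bulk_insert_finished = false;
}

static void
toy_receiver_receive(ToyReceiver *r)
{
    if (r->has_buffered_state)
        r->pending_rows++;      /* buffered: written later, in a batch */
    else
        r->written_rows++;      /* fallback: written row by row */
}

static void
toy_receiver_shutdown(ToyReceiver *r)
{
    if (r->has_buffered_state)
    {
        /* end() flushes what is left and covers the finish step */
        r->written_rows += r->pending_rows;
        r->pending_rows = 0;
        r->has_buffered_state = false;
        r->bulk_insert_finished = true;
    }
    else
    {
        /* fallback: release bulk state, then finish explicitly */
        assert(r->has_bulk_state);
        r->has_bulk_state = false;
        r->bulk_insert_finished = true;
    }
}

/* Run one startup/receive/shutdown cycle and report rows written. */
static int
toy_materialize(int nrows, bool use_buffered)
{
    ToyReceiver r;

    toy_receiver_startup(&r, use_buffered);
    for (int i = 0; i < nrows; i++)
        toy_receiver_receive(&r);
    toy_receiver_shutdown(&r);

    assert(r.bulk_insert_finished);
    assert(!r.has_buffered_state && !r.has_bulk_state);
    return r.written_rows;
}
```

Calling toy_materialize(2500, true) and toy_materialize(10, false) mirrors the bulk and volatile-function cases in the matview tests: both paths should report every row written, which is all the SQL-level tests can observe.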
[application/octet-stream] v1-0004-copy-adopt-buffered-insert-API-for-COPY-FROM.patch (18.4K, 7-v1-0004-copy-adopt-buffered-insert-API-for-COPY-FROM.patch)
From 696e8bc0b5606385444d6f89397aa3e3b6801d43 Mon Sep 17 00:00:00 2001
From: Haibo Yan <[email protected]>
Date: Thu, 23 Apr 2026 13:42:46 -0700
Subject: [PATCH v1 4/5] copy: adopt buffered-insert API for COPY FROM

Adopt the buffered-insert lifecycle API in COPY FROM for non-FDW heap
targets, moving buffering ownership from COPY's local multi-insert
infrastructure into the table AM.

COPY's existing eligibility policy remains unchanged. The non-FDW
buffered path now uses table_buffered_insert_begin()/put()/flush()/end()
with a COPY-local flush callback for per-tuple post-insert work,
including index updates and AFTER ROW INSERT triggers. FDW batching is
unchanged.

Add focused COPY regression coverage for basic correctness, bulk input,
indexed targets, AFTER ROW trigger behavior, and partition routing.
---
src/backend/commands/copyfrom.c | 185 +++++++++++++++++++++++++++--
src/test/regress/expected/copy.out | 166 ++++++++++++++++++++++++++
src/test/regress/sql/copy.sql | 102 ++++++++++++++++
3 files changed, 440 insertions(+), 13 deletions(-)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 64ac3063c61..f733bd2d296 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -76,6 +76,8 @@
*/
#define MAX_PARTITION_BUFFERS 32
+typedef struct CopyBufferedFlushState CopyBufferedFlushState;
+
/* Stores multi-insert data related to a single relation in CopyFrom. */
typedef struct CopyMultiInsertBuffer
{
@@ -83,6 +85,9 @@ typedef struct CopyMultiInsertBuffer
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
BulkInsertState bistate; /* BulkInsertState for this rel if plain
* table; NULL if foreign table */
+ TableBufferedInsertState buffered_state; /* AM-owned buffered-insert
+ * state, or NULL */
+ CopyBufferedFlushState *flush_ctx; /* flush callback context, or NULL */
int nused; /* number of 'slots' containing tuples */
uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
* stream */
@@ -102,8 +107,24 @@ typedef struct CopyMultiInsertInfo
EState *estate; /* Executor state used for COPY */
CommandId mycid; /* Command Id used for COPY */
uint32 ti_options; /* table insert options */
+ int64 *processed; /* pointer to CopyFrom's row counter */
} CopyMultiInsertInfo;
+/*
+ * Context for the buffered-insert flush callback used by COPY. Carries the
+ * state needed to perform per-tuple post-insert work (index updates, AFTER
+ * ROW INSERT triggers, error context, progress tracking).
+ */
+typedef struct CopyBufferedFlushState
+{
+ CopyFromState cstate;
+ EState *estate;
+ ResultRelInfo *resultRelInfo;
+ int64 *processed; /* pointer to CopyFrom's processed counter */
+} CopyBufferedFlushState;
+
+static void CopyBufferedFlushCallback(void *context, TupleTableSlot *slot);
+
/* non-export function prototypes */
static void ClosePipeFromProgram(CopyFromState cstate);
@@ -362,15 +383,57 @@ CopyLimitPrintoutLength(const char *str)
* ResultRelInfo.
*/
static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
{
CopyMultiInsertBuffer *buffer;
buffer = palloc_object(CopyMultiInsertBuffer);
memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
buffer->resultRelInfo = rri;
- buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
buffer->nused = 0;
+ buffer->buffered_state = NULL;
+ buffer->flush_ctx = NULL;
+
+ if (rri->ri_FdwRoutine == NULL)
+ {
+ /*
+ * Non-FDW table: try the AM-owned buffered-insert path. The flush
+ * callback handles index updates and AFTER ROW INSERT triggers.
+ */
+ CopyBufferedFlushState *flush_ctx;
+
+ flush_ctx = palloc_object(CopyBufferedFlushState);
+ flush_ctx->cstate = miinfo->cstate;
+ flush_ctx->estate = miinfo->estate;
+ flush_ctx->resultRelInfo = rri;
+ flush_ctx->processed = miinfo->processed;
+
+ buffer->buffered_state =
+ table_buffered_insert_begin(rri->ri_RelationDesc,
+ miinfo->mycid,
+ miinfo->ti_options |
+ TABLE_INSERT_BAS_BULKWRITE,
+ CopyBufferedFlushCallback,
+ flush_ctx);
+
+ if (buffer->buffered_state != NULL)
+ {
+ /* AM-owned buffering active; no COPY-side bistate needed */
+ buffer->bistate = NULL;
+ buffer->flush_ctx = flush_ctx;
+ }
+ else
+ {
+ /* AM does not support buffered inserts; fall back */
+ pfree(flush_ctx);
+ buffer->bistate = GetBulkInsertState();
+ }
+ }
+ else
+ {
+ /* FDW table: no buffered-insert, no bistate */
+ buffer->bistate = NULL;
+ }
return buffer;
}
@@ -384,7 +447,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
{
CopyMultiInsertBuffer *buffer;
- buffer = CopyMultiInsertBufferInit(rri);
+ buffer = CopyMultiInsertBufferInit(miinfo, rri);
/* Setup back-link so we can easily find this buffer again */
rri->ri_CopyMultiInsertBuffer = buffer;
@@ -401,7 +464,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
static void
CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
CopyFromState cstate, EState *estate, CommandId mycid,
- uint32 ti_options)
+ uint32 ti_options, int64 *processed)
{
miinfo->multiInsertBuffers = NIL;
miinfo->bufferedTuples = 0;
@@ -410,6 +473,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
miinfo->estate = estate;
miinfo->mycid = mycid;
miinfo->ti_options = ti_options;
+ miinfo->processed = processed;
/*
* Only setup the buffer when not dealing with a partitioned table.
@@ -532,6 +596,32 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
/* reset relname_only */
cstate->relname_only = false;
}
+ else if (buffer->buffered_state != NULL)
+ {
+ bool line_buf_valid = cstate->line_buf_valid;
+ uint64 save_cur_lineno = cstate->cur_lineno;
+
+ /*
+ * AM-owned buffered path: tuples were submitted via put() and are
+ * inside the AM already. Flush triggers the batch write and invokes
+ * the flush callback once per written tuple for index updates,
+ * trigger firing, and progress tracking.
+ */
+ cstate->line_buf_valid = false;
+
+ table_buffered_insert_flush(buffer->buffered_state);
+
+ /* Clear slots that were used for staging before put() */
+ for (i = 0; i < nused; i++)
+ {
+ if (slots[i] != NULL)
+ ExecClearTuple(slots[i]);
+ }
+
+ /* reset cur_lineno and line_buf_valid to what they were */
+ cstate->line_buf_valid = line_buf_valid;
+ cstate->cur_lineno = save_cur_lineno;
+ }
else
{
CommandId mycid = miinfo->mycid;
@@ -631,10 +721,20 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
/* Remove back-link to ourself */
resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
- if (resultRelInfo->ri_FdwRoutine == NULL)
+ if (buffer->buffered_state != NULL)
+ {
+ /* end() flushes remaining tuples and subsumes finish_bulk_insert */
+ table_buffered_insert_end(buffer->buffered_state);
+ buffer->buffered_state = NULL;
+ if (buffer->flush_ctx != NULL)
+ pfree(buffer->flush_ctx);
+ }
+ else if (resultRelInfo->ri_FdwRoutine == NULL)
{
Assert(buffer->bistate != NULL);
FreeBulkInsertState(buffer->bistate);
+ table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
+ miinfo->ti_options);
}
else
Assert(buffer->bistate == NULL);
@@ -643,10 +743,6 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
ExecDropSingleTupleTableSlot(buffer->slots[i]);
- if (resultRelInfo->ri_FdwRoutine == NULL)
- table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
- miinfo->ti_options);
-
pfree(buffer);
}
@@ -761,12 +857,26 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
Assert(buffer != NULL);
- Assert(slot == buffer->slots[buffer->nused]);
- /* Store the line number so we can properly report any errors later */
+ /* Store the line number for error context during flush */
buffer->linenos[buffer->nused] = lineno;
- /* Record this slot as being used */
+ if (buffer->buffered_state != NULL)
+ {
+ /*
+ * AM-owned buffered path: submit the tuple directly to the AM.
+ * The AM captures the data internally; the caller retains slot
+ * ownership and may reuse it. The AM may auto-flush during put(),
+ * which fires the flush callback for already-buffered tuples.
+ */
+ table_buffered_insert_put(buffer->buffered_state, slot);
+ }
+ else
+ {
+ /* Legacy path: slot was already placed into buffer->slots by caller */
+ Assert(slot == buffer->slots[buffer->nused]);
+ }
+
buffer->nused++;
/* Update how many tuples are stored and their size */
@@ -774,6 +884,55 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
miinfo->bufferedBytes += tuplen;
}
+/*
+ * Flush callback for the AM-owned buffered-insert path.
+ *
+ * Invoked once per flushed tuple, in insertion order. The slot is an
+ * AM-owned scratch object with TID set; it is valid only for the duration
+ * of this callback.
+ */
+static void
+CopyBufferedFlushCallback(void *context, TupleTableSlot *slot)
+{
+ CopyBufferedFlushState *ctx = (CopyBufferedFlushState *) context;
+ ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+ EState *estate = ctx->estate;
+
+ /*
+ * cstate->cur_lineno is not restored per-tuple here. During auto-flush
+ * (triggered inside put()), it reflects the line being stored, which is
+ * the best available context. During explicit flush, the caller has
+ * already set line_buf_valid = false, so only the relation name appears
+ * in error context — matching the legacy multi-insert path.
+ */
+
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ List *recheckIndexes;
+
+ recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
+ estate, 0, slot,
+ NIL, NULL);
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slot, recheckIndexes,
+ ctx->cstate->transition_capture);
+ list_free(recheckIndexes);
+ }
+ else if (resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ {
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slot, NIL,
+ ctx->cstate->transition_capture);
+ }
+
+ /* Update row counter and progress */
+ (*ctx->processed)++;
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+ *ctx->processed);
+}
+
/*
* Copy FROM file to relation.
*/
@@ -1073,7 +1232,7 @@ CopyFrom(CopyFromState cstate)
insertMethod = CIM_MULTI;
CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
- estate, mycid, ti_options);
+ estate, mycid, ti_options, &processed);
}
/*
diff --git a/src/test/regress/expected/copy.out b/src/test/regress/expected/copy.out
index 1714faab39c..ec8e781c5ce 100644
--- a/src/test/regress/expected/copy.out
+++ b/src/test/regress/expected/copy.out
@@ -594,3 +594,169 @@ id val
5 15
6 16
DROP TABLE PP;
+--
+-- Tests for COPY FROM with buffered-insert path.
+--
+-- These validate correctness of COPY FROM under the AM-owned buffered path.
+-- Path selection is not directly observable in SQL output.
+--
+-- Basic COPY: small row count
+CREATE TABLE copy_buffered_basic (a int, b text);
+COPY copy_buffered_basic FROM stdin;
+SELECT count(*) FROM copy_buffered_basic;
+ count
+-------
+ 3
+(1 row)
+
+SELECT * FROM copy_buffered_basic ORDER BY a;
+ a | b
+---+-------
+ 1 | hello
+ 2 | world
+ 3 | foo
+(3 rows)
+
+DROP TABLE copy_buffered_basic;
+-- Bulk COPY: exercise buffered path with enough rows for auto-flush.
+-- Generate data in a source table, COPY TO a file, then COPY FROM that file.
+CREATE TABLE copy_bulk_src (a int, b text);
+INSERT INTO copy_bulk_src SELECT g, 'row-' || g FROM generate_series(1, 2500) g;
+CREATE TABLE copy_bulk_dst (a int, b text);
+COPY copy_bulk_src TO '/tmp/copy_buffered_bulk_test.csv' CSV;
+COPY copy_bulk_dst FROM '/tmp/copy_buffered_bulk_test.csv' CSV;
+SELECT count(*) FROM copy_bulk_dst;
+ count
+-------
+ 2500
+(1 row)
+
+SELECT min(a), max(a) FROM copy_bulk_dst;
+ min | max
+-----+------
+ 1 | 2500
+(1 row)
+
+-- Verify data integrity
+SELECT count(*) FROM copy_bulk_dst d JOIN copy_bulk_src s ON d.a = s.a AND d.b = s.b;
+ count
+-------
+ 2500
+(1 row)
+
+DROP TABLE copy_bulk_src;
+DROP TABLE copy_bulk_dst;
+-- COPY into indexed table: verify index maintenance via flush callback
+CREATE TABLE copy_indexed (a int PRIMARY KEY, b text);
+COPY copy_indexed FROM stdin;
+SELECT count(*) FROM copy_indexed;
+ count
+-------
+ 5
+(1 row)
+
+SELECT * FROM copy_indexed ORDER BY a;
+ a | b
+---+---------
+ 1 | alpha
+ 2 | beta
+ 3 | gamma
+ 4 | delta
+ 5 | epsilon
+(5 rows)
+
+-- Verify index is usable
+SET enable_seqscan = off;
+SELECT a FROM copy_indexed WHERE a = 3;
+ a
+---
+ 3
+(1 row)
+
+RESET enable_seqscan;
+DROP TABLE copy_indexed;
+-- COPY with AFTER ROW INSERT trigger: verify trigger fires via flush callback
+CREATE TABLE copy_trigger_tgt (a int, b text);
+CREATE TABLE copy_trigger_log (a int, b text);
+CREATE FUNCTION copy_trigger_fn() RETURNS trigger AS $$
+BEGIN
+ INSERT INTO copy_trigger_log(a, b) VALUES (NEW.a, NEW.b);
+ RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+CREATE TRIGGER copy_after_ins
+ AFTER INSERT ON copy_trigger_tgt
+ FOR EACH ROW EXECUTE FUNCTION copy_trigger_fn();
+COPY copy_trigger_tgt FROM stdin;
+SELECT count(*) FROM copy_trigger_tgt;
+ count
+-------
+ 3
+(1 row)
+
+SELECT a, b FROM copy_trigger_tgt ORDER BY a;
+ a | b
+----+--------
+ 10 | first
+ 20 | second
+ 30 | third
+(3 rows)
+
+-- Verify trigger fired for each row
+SELECT count(*) FROM copy_trigger_log;
+ count
+-------
+ 3
+(1 row)
+
+SELECT a, b FROM copy_trigger_log ORDER BY a;
+ a | b
+----+--------
+ 10 | first
+ 20 | second
+ 30 | third
+(3 rows)
+
+DROP TABLE copy_trigger_tgt CASCADE;
+DROP TABLE copy_trigger_log;
+DROP FUNCTION copy_trigger_fn;
+-- Partitioned COPY: verify correctness under partition routing with buffered path
+CREATE TABLE copy_part_routed (
+ id int,
+ val text
+) PARTITION BY RANGE (id);
+CREATE TABLE copy_part_routed_p1 PARTITION OF copy_part_routed
+ FOR VALUES FROM (1) TO (100);
+CREATE TABLE copy_part_routed_p2 PARTITION OF copy_part_routed
+ FOR VALUES FROM (100) TO (200);
+COPY copy_part_routed FROM stdin;
+-- Total row count
+SELECT count(*) FROM copy_part_routed;
+ count
+-------
+ 6
+(1 row)
+
+-- Per-partition row counts
+SELECT tableoid::regclass, count(*)
+FROM copy_part_routed
+GROUP BY tableoid ORDER BY tableoid::regclass::name;
+ tableoid | count
+---------------------+-------
+ copy_part_routed_p1 | 3
+ copy_part_routed_p2 | 3
+(2 rows)
+
+-- Content correctness
+SELECT * FROM copy_part_routed ORDER BY id;
+ id | val
+-----+---------
+ 1 | alpha
+ 50 | bravo
+ 99 | charlie
+ 100 | delta
+ 150 | echo
+ 199 | foxtrot
+(6 rows)
+
+DROP TABLE copy_part_routed;
diff --git a/src/test/regress/sql/copy.sql b/src/test/regress/sql/copy.sql
index eaad290b257..e4a10e8cf95 100644
--- a/src/test/regress/sql/copy.sql
+++ b/src/test/regress/sql/copy.sql
@@ -535,3 +535,105 @@ CREATE TABLE pp_510 PARTITION OF pp_2 FOR VALUES FROM (5) TO (10);
INSERT INTO pp SELECT g, 10 + g FROM generate_series(1,6) g;
COPY pp TO stdout(header);
DROP TABLE PP;
+
+--
+-- Tests for COPY FROM with buffered-insert path.
+--
+-- These validate correctness of COPY FROM under the AM-owned buffered path.
+-- Path selection is not directly observable in SQL output.
+--
+
+-- Basic COPY: small row count
+CREATE TABLE copy_buffered_basic (a int, b text);
+COPY copy_buffered_basic FROM stdin;
+1 hello
+2 world
+3 foo
+\.
+SELECT count(*) FROM copy_buffered_basic;
+SELECT * FROM copy_buffered_basic ORDER BY a;
+DROP TABLE copy_buffered_basic;
+
+-- Bulk COPY: exercise buffered path with enough rows for auto-flush.
+-- Generate data in a source table, COPY TO a file, then COPY FROM that file.
+CREATE TABLE copy_bulk_src (a int, b text);
+INSERT INTO copy_bulk_src SELECT g, 'row-' || g FROM generate_series(1, 2500) g;
+CREATE TABLE copy_bulk_dst (a int, b text);
+COPY copy_bulk_src TO '/tmp/copy_buffered_bulk_test.csv' CSV;
+COPY copy_bulk_dst FROM '/tmp/copy_buffered_bulk_test.csv' CSV;
+SELECT count(*) FROM copy_bulk_dst;
+SELECT min(a), max(a) FROM copy_bulk_dst;
+-- Verify data integrity
+SELECT count(*) FROM copy_bulk_dst d JOIN copy_bulk_src s ON d.a = s.a AND d.b = s.b;
+DROP TABLE copy_bulk_src;
+DROP TABLE copy_bulk_dst;
+
+-- COPY into indexed table: verify index maintenance via flush callback
+CREATE TABLE copy_indexed (a int PRIMARY KEY, b text);
+COPY copy_indexed FROM stdin;
+1 alpha
+2 beta
+3 gamma
+4 delta
+5 epsilon
+\.
+SELECT count(*) FROM copy_indexed;
+SELECT * FROM copy_indexed ORDER BY a;
+-- Verify index is usable
+SET enable_seqscan = off;
+SELECT a FROM copy_indexed WHERE a = 3;
+RESET enable_seqscan;
+DROP TABLE copy_indexed;
+
+-- COPY with AFTER ROW INSERT trigger: verify trigger fires via flush callback
+CREATE TABLE copy_trigger_tgt (a int, b text);
+CREATE TABLE copy_trigger_log (a int, b text);
+CREATE FUNCTION copy_trigger_fn() RETURNS trigger AS $$
+BEGIN
+ INSERT INTO copy_trigger_log(a, b) VALUES (NEW.a, NEW.b);
+ RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+CREATE TRIGGER copy_after_ins
+ AFTER INSERT ON copy_trigger_tgt
+ FOR EACH ROW EXECUTE FUNCTION copy_trigger_fn();
+COPY copy_trigger_tgt FROM stdin;
+10 first
+20 second
+30 third
+\.
+SELECT count(*) FROM copy_trigger_tgt;
+SELECT a, b FROM copy_trigger_tgt ORDER BY a;
+-- Verify trigger fired for each row
+SELECT count(*) FROM copy_trigger_log;
+SELECT a, b FROM copy_trigger_log ORDER BY a;
+DROP TABLE copy_trigger_tgt CASCADE;
+DROP TABLE copy_trigger_log;
+DROP FUNCTION copy_trigger_fn;
+
+-- Partitioned COPY: verify correctness under partition routing with buffered path
+CREATE TABLE copy_part_routed (
+ id int,
+ val text
+) PARTITION BY RANGE (id);
+CREATE TABLE copy_part_routed_p1 PARTITION OF copy_part_routed
+ FOR VALUES FROM (1) TO (100);
+CREATE TABLE copy_part_routed_p2 PARTITION OF copy_part_routed
+ FOR VALUES FROM (100) TO (200);
+COPY copy_part_routed FROM stdin;
+1 alpha
+50 bravo
+99 charlie
+100 delta
+150 echo
+199 foxtrot
+\.
+-- Total row count
+SELECT count(*) FROM copy_part_routed;
+-- Per-partition row counts
+SELECT tableoid::regclass, count(*)
+FROM copy_part_routed
+GROUP BY tableoid ORDER BY tableoid::regclass::name;
+-- Content correctness
+SELECT * FROM copy_part_routed ORDER BY id;
+DROP TABLE copy_part_routed;
--
2.52.0
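The begin()/put()/flush()/end() lifecycle that this patch adopts can be modeled in standalone C. What follows is a toy sketch under stated assumptions, not the table AM implementation: every toy_ name is hypothetical, rows are plain ints rather than tuple slots, the batch size of 4 is arbitrary, and auto-flush is assumed to fire when the buffer becomes full. It shows the two properties the COPY callback relies on: the callback runs once per row in insertion order, and end() flushes whatever is still buffered.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

#define TOY_BATCH_SIZE 4        /* arbitrary; chosen to force auto-flush */

typedef void (*toy_flush_cb) (void *context, int row);

/* Hypothetical buffered-insert state; not the PostgreSQL struct. */
typedef struct ToyBufferedState
{
    int          rows[TOY_BATCH_SIZE];
    int          nused;
    toy_flush_cb callback;
    void        *context;
} ToyBufferedState;

/* Callback context: a caller-owned counter plus an ordering check. */
typedef struct ToyFlushContext
{
    long *processed;
    int   last_row;
    bool  in_order;
} ToyFlushContext;

static ToyBufferedState *
toy_buffered_begin(toy_flush_cb callback, void *context)
{
    ToyBufferedState *state = malloc(sizeof(ToyBufferedState));

    if (state == NULL)
        return NULL;            /* caller would fall back to another path */
    state->nused = 0;
    state->callback = callback;
    state->context = context;
    return state;
}

static void
toy_buffered_flush(ToyBufferedState *state)
{
    /* "Write" the batch, then report each row in insertion order. */
    for (int i = 0; i < state->nused; i++)
    {
        if (state->callback != NULL)
            state->callback(state->context, state->rows[i]);
    }
    state->nused = 0;
}

static void
toy_buffered_put(ToyBufferedState *state, int row)
{
    assert(state->nused < TOY_BATCH_SIZE);
    state->rows[state->nused++] = row;      /* copy; caller keeps its row */
    if (state->nused == TOY_BATCH_SIZE)
        toy_buffered_flush(state);          /* auto-flush inside put() */
}

static void
toy_buffered_end(ToyBufferedState *state)
{
    toy_buffered_flush(state);              /* flush the remainder */
    free(state);
}

static void
toy_count_row(void *context, int row)
{
    ToyFlushContext *ctx = context;

    if (row != ctx->last_row + 1)
        ctx->in_order = false;
    ctx->last_row = row;
    (*ctx->processed)++;        /* bump the caller's counter via pointer */
}

/* Load rows 1..nrows; return rows processed, or -1 on failure. */
static long
toy_copy_rows(int nrows)
{
    long              processed = 0;
    ToyFlushContext   ctx = {&processed, 0, true};
    ToyBufferedState *state = toy_buffered_begin(toy_count_row, &ctx);

    if (state == NULL)
        return -1;
    for (int i = 1; i <= nrows; i++)
        toy_buffered_put(state, i);
    toy_buffered_end(state);
    return ctx.in_order ? processed : -1;
}
```

With a batch size of 4, toy_copy_rows(10) goes through two auto-flushes inside put() and one final flush from end(). Counting rows through a pointer held in the callback context is the same shape as passing &processed into CopyMultiInsertInfoInit() in the patch, so the counter advances only when rows are actually flushed.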
end of thread (newest message: 2026-04-28 04:27 UTC)
Thread overview: 8+ messages
2024-10-29 03:18 Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM Jingtang Zhang <[email protected]>
2024-10-30 17:51 ` Bharath Rupireddy <[email protected]>
2024-10-31 04:17 ` Jingtang Zhang <[email protected]>
2025-03-09 11:27 ` Daniil Davydov <[email protected]>
2025-03-17 04:49 ` Daniil Davydov <[email protected]>
2025-04-06 13:54 ` Jingtang Zhang <[email protected]>
2025-04-07 06:26 ` Daniil Davydov <[email protected]>
2026-04-28 04:27 ` Haibo Yan <[email protected]>