From: Haibo Yan <[email protected]>
To: Daniil Davydov <[email protected]>
Cc: Jingtang Zhang <[email protected]>
Cc: [email protected]
Cc: PostgreSQL-development <[email protected]>
Cc: Bharath Rupireddy <[email protected]>
Subject: Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
Date: Mon, 27 Apr 2026 21:27:16 -0700
Message-ID: <CABXr29ExTDUoWZsiA0w+mkrjkJWKhPdiseM7vHJgHY-Mwq9PNg@mail.gmail.com>
In-Reply-To: <CAJDiXgiD4G0rBwWk6=0TTy9AYHi4LmPL6AS5VpMLf8OEbAxEhw@mail.gmail.com>
References: <CALj2ACXdrOmB6Na9amHWZHKvRT3Z0nwTRsCwoMT-npOBtmXLXg@mail.gmail.com>
<[email protected]>
<CALj2ACX5UMWVFdrRNUE0KDrg54WV1cumBXwcETXhrPc1ibKAQA@mail.gmail.com>
<CAAWbhmj5Pio3nOUakObzLGCSS9dwFfgsNVDhwTGzXNwZc00uCQ@mail.gmail.com>
<CALj2ACVHC=c6eC9SRxhcTUrnXvNDNkEBgedi2WkVJYRb=0sWYw@mail.gmail.com>
<CALj2ACVE2h=LnFnpr3rh+6SZzdwzW5EZOYG2Z0t=p28Fn75eag@mail.gmail.com>
<CALj2ACWT0Rz8oybWBm5W4CeS0DvFkwaw-pEvGArhDLyPbZnW_g@mail.gmail.com>
<CALj2ACWxO3HPtpYZb765LZk-uKVuAvZPO1HDeZ8=mzMgVPgaww@mail.gmail.com>
<CALj2ACXJA4QQ_6zAHez0Uy-9t-ebmpox2y1QBja+mF4QP+h8WQ@mail.gmail.com>
<CAD21AoD97mhzF8cqsd2v1jg9z8xfvAJrPx6Wvi+Ev0Hmu96LJA@mail.gmail.com>
<CALj2ACUcv5pZoB0=gRrz54M9+YT9JCmo6FYyo5WUS6wnS+em=A@mail.gmail.com>
<CALj2ACWm77YofBMs9x3Zmp3ctNAhcS4TvPVuXKdfwCr22FqOHg@mail.gmail.com>
<[email protected]>
<CALj2ACWqVzhxDuWNTWAH-LuADvsyX0r-wpwgeJ+Q1FnAKjY5Yw@mail.gmail.com>
<[email protected]>
<CALj2ACU70HZm+0QRJdkGA5RdJUo4zPYnV2hzkiV-wH5QS2PAEQ@mail.gmail.com>
<[email protected]>
<CALj2ACVMV=gMROte2=0LBFnSCRvzL4D9WK6oQ9ZHr4Qj2S8xWA@mail.gmail.com>
<[email protected]>
<CALj2ACX9vVYHYkX8e6w058EuAs8JL5EsnzadTxGhpiE_Ep_ByA@mail.gmail.com>
<[email protected]>
<CALj2ACWTrx1zxWvq8Uj2rEwCsDgQHeJ53WdvzZUw3kW+_VPG6A@mail.gmail.com>
<CALj2ACUz5+_YNEa4ZY-XG960_oXefM50MjD71VgSCAVDkF3bzQ@mail.gmail.com>
<[email protected]>
<CALj2ACX90L5Mb5Vv=jsvhOdZ8BVsfpZf-CdCGhtm2N+bGUCSjg@mail.gmail.com>
<CALj2ACXnCvcYvaz4aDH_ezUBJanz_Boi8W=76fOwrxiwSnUFOQ@mail.gmail.com>
<[email protected]>
<[email protected]>
<CALj2ACUBwVu_1LOWF07Acv+dWx5NO7uTqAPMNwqi5WpJRw69_Q@mail.gmail.com>
<[email protected]>
<CALj2ACWUUTFrTF0L_xdj64i9xsjf_+4nt_HPEBiM17cxEk4CWg@mail.gmail.com>
<CAPsk3_AuOsLcOrgTX4-QDP0Qv+AHBuuH3fTK6MwVacH6gvR1Nw@mail.gmail.com>
<CALj2ACUVE8CYvYrudem4bR7W3sNRE-akC4B996K65_7C6xTBJQ@mail.gmail.com>
<[email protected]>
<CAJDiXggcx+v7eKruvvBK-mpyf3Y3e8vgBJhcZwhkm4p6907edw@mail.gmail.com>
<CAJDiXgjTe+kYXh7mT9iYTfyVh7takX9UsuVDi8mpYN3p8009yQ@mail.gmail.com>
<[email protected]>
<CAJDiXgiD4G0rBwWk6=0TTy9AYHi4LmPL6AS5VpMLf8OEbAxEhw@mail.gmail.com>
On Sun, Apr 26, 2026 at 2:56 PM Daniil Davydov <[email protected]> wrote:
> Hi,
>
> On Sun, Apr 6, 2025 at 8:55 PM Jingtang Zhang <[email protected]> wrote:
> >
> > It was quite a while since I last looked at the patch. I've tested it again,
> > and still get regression on patched version where a table has many columns.
> > And it is totally CPU-bounded on tts_virtual_copyslot.
> >
> > Unpatched version:
> > 1 col:
> > Time: 8909.714 ms (00:08.910)
> > Time: 8803.579 ms (00:08.804)
> > Time: 8600.415 ms (00:08.600)
> > 32 cols:
> > Time: 12911.699 ms (00:12.912)
> > Time: 13543.491 ms (00:13.543)
> > Time: 13325.368 ms (00:13.325)
> >
> > Patched version:
> > 1 col:
> > Time: 3532.841 ms (00:03.533)
> > Time: 3598.223 ms (00:03.598)
> > Time: 3515.858 ms (00:03.516)
> > 32 cols:
> > Time: 35647.724 ms (00:35.648)
> > Time: 35596.233 ms (00:35.596)
> > Time: 35669.106 ms (00:35.669)
> >
>
> Hm, maybe I didn't choose the best way to measure performance. Can you
> please share how you do it?
>
> > I've tested your patch with tuplestore and found the regression does not exist
> > anymore, but I haven't look deep inside it.
> >
> > Patched version (with tuplestore):
> > 1 col:
> > Time: 3500.502 ms (00:03.501)
> > Time: 3486.886 ms (00:03.487)
> > Time: 3514.233 ms (00:03.514)
> > 32 cols:
> > Time: 10375.391 ms (00:10.375)
> > Time: 10248.256 ms (00:10.248)
> > Time: 10248.289 ms (00:10.248)
> >
> > It seems to be a good idea if there is no other issue with your patch.
>
> As far as I understand, the use of multi inserts for queries like
> "INSERT INTO ... SELECT FROM" is not discussed here anymore due to the
> fact that in such cases we will have to take into account the volatile
> functions and ROW triggers.
> I've been thinking about this for a while and made a patch as an
> experiment. The principles that the patch works on are listed below.
> 1)
> Since performance decreases for single INSERTs (within a multi inserts
> mechanism), I designed this feature as an option for the table. Thus,
> if the user knows that he will perform a lot of inserts on the table,
> he can specify "WITH (append_optimized=true)".
> 2)
> The availability of volatile functions is monitored during the
> construction of a subtree for a ModifyTable node. I'm not that
> familiar with the query plan construction mechanism, but it seems to
> me that this way we can track any occurrence of volatile functions.
> Of course, most volatile functions don't stop us from using multi
> inserts, but checking each such function would take a very long time,
> so the very fact of having a volatile function is enough for us to
> abandon multi-inserts.
> 3)
> Default expressions of the target table are also checked for volatile
> functions. The same rules apply to them as in (2). As an exception, I
> allowed the use of SERIAL in the column data type, since this is a
> fairly common use case.
> 4)
> If the target table contains any ROW triggers, we don't use multi insert.
> 5)
> Patch also contains a regression test. This is a "sandbox" where you
> can do some experiments with append-optimized tables.
>
> I hope that patch (targeted on 'master' branch,
> 2c7bd2ba507e273f2d7fe1b2f6d30775ed4f3c09) will be useful for this
> thread.
>
> --
> Best regards,
> Daniil Davydov
>
Hi all,
I picked this work up again and implemented the full 5-patch series.
The series is structured as follows:
- 0001 adds the buffered-insert lifecycle API in tableam/heapam and
provides the heap implementation.
- 0002 adopts the API for CTAS.
- 0003 adopts the API for CREATE MATERIALIZED VIEW and REFRESH
MATERIALIZED VIEW.
- 0004 adopts the API for COPY FROM.
- 0005 adopts the API for a restricted first step of INSERT INTO … SELECT.
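For readers new to the thread, the begin/put/flush/end lifecycle that 0001 introduces can be pictured with a small standalone model. This is only a sketch, not code from the patch: the toy_* names, the ToyBufferedInsert struct, and the count_cb callback are made up for illustration, and tuples are reduced to their lengths. The two limits mirror HEAP_BUFFERED_INSERT_MAX_TUPLES and HEAP_BUFFERED_INSERT_MAX_BYTES from 0001.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Same values as the limits in 0001; everything else here is hypothetical. */
#define TOY_MAX_TUPLES 1000
#define TOY_MAX_BYTES  65535

/* Hypothetical per-tuple callback: receives only the tuple length. */
typedef void (*toy_flush_cb)(void *ctx, size_t tuple_len);

typedef struct ToyBufferedInsert
{
    size_t       lens[TOY_MAX_TUPLES]; /* stands in for the HeapTuple array */
    int          ntuples;
    size_t       buffered_bytes;
    int          nflushes;             /* number of batches "written" */
    toy_flush_cb flush_cb;             /* may be NULL */
    void        *flush_ctx;
} ToyBufferedInsert;

/* Example callback: counts how many tuples it has been shown. */
static void
count_cb(void *ctx, size_t tuple_len)
{
    (void) tuple_len;
    (*(int *) ctx)++;
}

static ToyBufferedInsert *
toy_begin(toy_flush_cb cb, void *ctx)
{
    ToyBufferedInsert *s = calloc(1, sizeof(*s));

    if (s == NULL)
        abort();
    s->flush_cb = cb;
    s->flush_ctx = ctx;
    return s;
}

/* Write the batch (a no-op here), notify per tuple, then empty the buffer. */
static void
toy_flush(ToyBufferedInsert *s)
{
    if (s->ntuples == 0)
        return;
    s->nflushes++;
    if (s->flush_cb != NULL)
    {
        for (int i = 0; i < s->ntuples; i++)
            s->flush_cb(s->flush_ctx, s->lens[i]);
    }
    s->ntuples = 0;
    s->buffered_bytes = 0;
}

/* Buffer one tuple; flush first if the array is full, after if too many bytes. */
static void
toy_put(ToyBufferedInsert *s, size_t tuple_len)
{
    if (s->ntuples >= TOY_MAX_TUPLES)
        toy_flush(s);
    s->lens[s->ntuples++] = tuple_len;
    s->buffered_bytes += tuple_len;
    if (s->buffered_bytes > TOY_MAX_BYTES)
        toy_flush(s);
}

/* Flush what is left, free the state, and report how many batches were written. */
static int
toy_end(ToyBufferedInsert *s)
{
    int nflushes;

    toy_flush(s);
    nflushes = s->nflushes;
    free(s);
    return nflushes;
}
```

With 2500 puts of 10-byte tuples, this model flushes twice on the tuple-count limit during put and once more in toy_end, and the callback sees each tuple exactly once, in insertion order.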
I also reran performance testing locally on my machine:
- Hardware: MacBook Pro M4, 36GB RAM
- shared_buffers: 128MB
I compared the unpatched baseline against the current patched series for
CTAS, CMV, RMV, and INSERT INTO ... SELECT.
Table 1 — Median (ms)
Workload         10K Before  10K After  10K Improv  100K Before  100K After  100K Improv  1M Before  1M After  1M Improv
CTAS                   1.60       1.17      +26.9%         9.19        5.70       +38.0%     105.61     73.28     +30.6%
CMV                    2.11       1.77      +16.1%        10.28        6.20       +39.7%     110.10     79.64     +27.7%
RMV                    1.62       1.19      +26.5%         9.91        5.43       +45.2%     106.04     69.57     +34.4%
INSERT … SELECT        1.72       0.89      +48.3%        15.39        7.46       +51.5%     228.24     84.66     +62.9%
Table 2 — Average (ms)
Workload         10K Before  10K After  10K Improv  100K Before  100K After  100K Improv  1M Before  1M After  1M Improv
CTAS                   1.65       1.26      +24.1%         9.37        5.82       +37.9%     104.81     74.31     +29.1%
CMV                    2.34       1.82      +22.2%        10.32        6.25       +39.4%     110.32     80.50     +27.0%
RMV                    1.92       1.21      +36.9%         9.86        5.49       +44.3%     106.79     69.53     +34.9%
INSERT … SELECT        1.69       0.90      +46.8%        15.45        7.39       +52.2%     210.62     85.19     +59.6%
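The Improv columns are consistent with the relative reduction in runtime, (before - after) / before. A minimal helper, with the hypothetical name improvement_pct, makes that arithmetic explicit. This reading of the column is an assumption, and recomputing from the rounded values shown in the tables can differ from the printed percentages by up to about half a point, presumably because those were derived from unrounded timings.

```c
#include <assert.h>

/*
 * Relative runtime reduction in percent: positive means the patched run was
 * faster. Hypothetical helper for illustration; not part of the patch series.
 */
static double
improvement_pct(double before_ms, double after_ms)
{
    assert(before_ms > 0.0);
    return (before_ms - after_ms) / before_ms * 100.0;
}
```

For example, improvement_pct(1.60, 1.17) evaluates to about 26.9, matching the CTAS 10K median entry.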
These numbers look good enough to me to continue the discussion with the
current design and implementation.
One point worth calling out explicitly: the INSERT INTO ... SELECT support
in patch 5 is intentionally limited, as described above. It is not intended
to claim broad executor coverage yet.
One further improvement still seems possible: making the heap
implementation cache raw HeapTuple bytes directly instead of maintaining
buffered slot arrays. I looked at that direction, but did not include it in
this series because it felt like a larger scope change than what I wanted
for v1.
At this point, I think the current series is in reasonable shape and I’d
really appreciate review on both the API shape and the caller adoptions.
Thanks,
Haibo
Attachments:
[application/octet-stream] v1-0001-tableam-heapam-add-buffered-insert-lifecycle-API-.patch (41.2K, 3-v1-0001-tableam-heapam-add-buffered-insert-lifecycle-API-.patch)
From ee069b0d3c9b15ad8d4b2ab0bbd73f1392ed02c3 Mon Sep 17 00:00:00 2001
From: Haibo Yan <[email protected]>
Date: Thu, 23 Apr 2026 11:14:36 -0700
Subject: [PATCH v1 1/5] tableam/heapam: add buffered-insert lifecycle API for
heap
Add an optional Table AM buffered-insert lifecycle API with
begin/put/flush/end callbacks and tableam wrappers.
Implement the API for heap only, including internal buffering,
heap_multi_insert()-based flushes, optional per-tuple flush callbacks,
and explicit end-of-session cleanup.
Add a small Table AM-level test module for lifecycle validation.
No caller adoption is included in this patch.
---
src/backend/access/heap/heapam.c | 321 +++++++++++++++++-
src/backend/access/heap/heapam_handler.c | 7 +-
src/backend/access/table/tableamapi.c | 12 +
src/include/access/heapam.h | 12 +
src/include/access/tableam.h | 159 +++++++++
src/test/modules/Makefile | 1 +
src/test/modules/meson.build | 1 +
.../modules/test_buffered_insert/Makefile | 23 ++
.../expected/test_buffered_insert.out | 108 ++++++
.../modules/test_buffered_insert/meson.build | 33 ++
.../sql/test_buffered_insert.sql | 41 +++
.../test_buffered_insert--1.0.sql | 30 ++
.../test_buffered_insert.c | 233 +++++++++++++
.../test_buffered_insert.control | 4 +
src/tools/pgindent/typedefs.list | 4 +
15 files changed, 970 insertions(+), 19 deletions(-)
create mode 100644 src/test/modules/test_buffered_insert/Makefile
create mode 100644 src/test/modules/test_buffered_insert/expected/test_buffered_insert.out
create mode 100644 src/test/modules/test_buffered_insert/meson.build
create mode 100644 src/test/modules/test_buffered_insert/sql/test_buffered_insert.sql
create mode 100644 src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql
create mode 100644 src/test/modules/test_buffered_insert/test_buffered_insert.c
create mode 100644 src/test/modules/test_buffered_insert/test_buffered_insert.control
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index abfd8e8970a..ae994dd202d 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -53,6 +53,7 @@
#include "utils/datum.h"
#include "utils/injection_point.h"
#include "utils/inval.h"
+#include "utils/memutils.h"
#include "utils/spccache.h"
#include "utils/syscache.h"
@@ -1982,6 +1983,263 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
}
+/* ------------------------------------------------------------------------
+ * Heap buffered-insert lifecycle implementation.
+ * ------------------------------------------------------------------------
+ */
+
+/*
+ * Private state for the heap buffered-insert session.
+ *
+ * Memory-context hierarchy (all destroyed by MemoryContextDelete(state_ctx)):
+ *
+ * state_ctx ("HeapBufferedInsert") -- long-lived, owns stable state
+ * ├── batch_ctx ("HeapBufInsBatch") -- per-batch HeapTuples
+ * └── mi_ctx ("HeapBufInsFlush") -- per-flush transient allocations
+ * └── heap_multi_insert scratch + toasted tuple copies
+ *
+ * state_ctx owns: this struct, the HeapTuple pointer array, bistate,
+ * scratch_slot (callback-only).
+ * batch_ctx owns: HeapTuples produced by ExecCopySlotHeapTuple() during
+ * put(). It is reset (not deleted) after each flush, which bulk-frees all
+ * per-batch allocations.
+ * mi_ctx owns: heap_multi_insert's transient allocations (including any
+ * toasted-tuple copies produced by heap_prepare_insert). It is reset after
+ * each flush once the flush callback (if any) has finished reading the
+ * post-prepare tuples.
+ */
+typedef struct HeapBufferedInsertState
+{
+ TableBufferedInsertStateData base;
+ CommandId cid;
+ int options;
+ BulkInsertState bistate;
+ TableBufferedInsertFlushCb flush_cb;
+ void *flush_ctx;
+ HeapTuple *tuples; /* buffered HeapTuples (in batch_ctx) */
+ int ntuples;
+ int max_tuples;
+ Size buffered_bytes; /* sum of tuples[i]->t_len */
+ TupleTableSlot *scratch_slot; /* callback-only; HeapTuple-backed */
+ MemoryContext batch_ctx; /* per-batch HeapTuples; reset after flush */
+ MemoryContext mi_ctx; /* reset after each heap_multi_insert */
+ MemoryContext state_ctx; /* long-lived; owns stable state */
+} HeapBufferedInsertState;
+
+#define HEAP_BUFFERED_INSERT_MAX_TUPLES 1000
+#define HEAP_BUFFERED_INSERT_MAX_BYTES 65535
+
+static void heap_buffered_insert_do_flush(HeapBufferedInsertState *hstate);
+static void heap_buffered_insert_finish_bulkinsert(HeapBufferedInsertState *hstate);
+
+/* heap_multi_insert core, takes HeapTuples directly (defined near heap_multi_insert) */
+static void heap_multi_insert_raw(Relation relation, HeapTuple *heaptuples,
+ int ntuples, CommandId cid, uint32 options,
+ BulkInsertState bistate);
+
+TableBufferedInsertState
+heap_buffered_insert_begin(Relation rel, CommandId cid, int options,
+ TableBufferedInsertFlushCb flush_cb,
+ void *flush_ctx)
+{
+ HeapBufferedInsertState *hstate;
+ MemoryContext state_ctx;
+ MemoryContext old_ctx;
+
+ state_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "HeapBufferedInsert",
+ ALLOCSET_DEFAULT_SIZES);
+ old_ctx = MemoryContextSwitchTo(state_ctx);
+
+ hstate = palloc0(sizeof(HeapBufferedInsertState));
+ hstate->base.rel = rel;
+ hstate->cid = cid;
+ hstate->options = options;
+ hstate->flush_cb = flush_cb;
+ hstate->flush_ctx = flush_ctx;
+ hstate->state_ctx = state_ctx;
+
+ hstate->max_tuples = HEAP_BUFFERED_INSERT_MAX_TUPLES;
+ hstate->tuples = palloc_array(HeapTuple, hstate->max_tuples);
+ hstate->ntuples = 0;
+ hstate->buffered_bytes = 0;
+
+ if (options & TABLE_INSERT_BAS_BULKWRITE)
+ hstate->bistate = GetBulkInsertState();
+ else
+ hstate->bistate = NULL;
+
+ hstate->batch_ctx = AllocSetContextCreate(state_ctx,
+ "HeapBufInsBatch",
+ ALLOCSET_DEFAULT_SIZES);
+
+ hstate->mi_ctx = AllocSetContextCreate(state_ctx,
+ "HeapBufInsFlush",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /*
+ * Allocate the scratch slot only when a flush callback is present.
+ * No-callback callers (CTAS, CMV, RMV) pay zero slot overhead.
+ */
+ if (flush_cb != NULL)
+ hstate->scratch_slot = MakeSingleTupleTableSlot(RelationGetDescr(rel),
+ &TTSOpsHeapTuple);
+ else
+ hstate->scratch_slot = NULL;
+
+ MemoryContextSwitchTo(old_ctx);
+
+ return &hstate->base;
+}
+
+void
+heap_buffered_insert_put(TableBufferedInsertState state, TupleTableSlot *slot)
+{
+ HeapBufferedInsertState *hstate = (HeapBufferedInsertState *) state;
+ MemoryContext old_ctx;
+ HeapTuple htup;
+
+ /* Auto-flush if tuple count is at capacity. */
+ if (hstate->ntuples >= hstate->max_tuples)
+ heap_buffered_insert_do_flush(hstate);
+
+ /*
+ * Produce a self-contained HeapTuple in batch_ctx. ExecCopySlotHeapTuple
+ * invokes the slot-type's copy_heap_tuple method, which for every
+ * built-in slot type allocates a new HeapTuple in CurrentMemoryContext.
+ * This is the *only* per-row materialization -- the flush path consumes
+ * the HeapTuple directly via heap_multi_insert_raw() with no further
+ * slot conversion or copy.
+ */
+ old_ctx = MemoryContextSwitchTo(hstate->batch_ctx);
+ htup = ExecCopySlotHeapTuple(slot);
+ MemoryContextSwitchTo(old_ctx);
+
+ hstate->tuples[hstate->ntuples++] = htup;
+ hstate->buffered_bytes += htup->t_len;
+
+ /* Byte-threshold flush: exact tracking from the HeapTuple t_len. */
+ if (hstate->buffered_bytes > HEAP_BUFFERED_INSERT_MAX_BYTES)
+ heap_buffered_insert_do_flush(hstate);
+}
+
+void
+heap_buffered_insert_flush(TableBufferedInsertState state)
+{
+ HeapBufferedInsertState *hstate = (HeapBufferedInsertState *) state;
+
+ if (hstate->ntuples > 0)
+ heap_buffered_insert_do_flush(hstate);
+}
+
+void
+heap_buffered_insert_end(TableBufferedInsertState state)
+{
+ HeapBufferedInsertState *hstate = (HeapBufferedInsertState *) state;
+
+ /* Flush any remaining tuples */
+ if (hstate->ntuples > 0)
+ heap_buffered_insert_do_flush(hstate);
+
+ /* Clean up the scratch slot used for flush callback */
+ if (hstate->scratch_slot != NULL)
+ ExecDropSingleTupleTableSlot(hstate->scratch_slot);
+
+ /* Perform finish-bulk-insert cleanup (subsumes table_finish_bulk_insert) */
+ heap_buffered_insert_finish_bulkinsert(hstate);
+
+ /*
+ * Release all memory owned by the state, including batch_ctx and mi_ctx
+ * (both are children of state_ctx).
+ */
+ MemoryContextDelete(hstate->state_ctx);
+}
+
+/*
+ * Internal: flush all buffered HeapTuples via heap_multi_insert_raw, then
+ * invoke the flush callback (if any) once per written tuple. Resets both
+ * per-batch and per-flush contexts so no payload survives across cycles.
+ *
+ * Callback-path isolation: the scratch_slot and tuple-to-slot conversion
+ * only execute when flush_cb is non-NULL, so no-callback callers (CTAS,
+ * CMV, RMV) pay zero callback overhead per tuple.
+ */
+static void
+heap_buffered_insert_do_flush(HeapBufferedInsertState *hstate)
+{
+ MemoryContext old_ctx;
+ int ntuples = hstate->ntuples;
+
+ Assert(ntuples > 0);
+
+ /*
+ * Switch to the per-flush memory context. heap_multi_insert_raw's
+ * transient allocations (including any toasted-tuple copies from
+ * heap_prepare_insert) land here and are bulk-freed after the callback.
+ */
+ old_ctx = MemoryContextSwitchTo(hstate->mi_ctx);
+
+ heap_multi_insert_raw(hstate->base.rel,
+ hstate->tuples,
+ ntuples,
+ hstate->cid,
+ hstate->options,
+ hstate->bistate);
+
+ MemoryContextSwitchTo(old_ctx);
+
+ /*
+ * Invoke the flush callback once per flushed tuple, in insertion order.
+ * After heap_multi_insert_raw, each tuples[i]->t_self holds the stored
+ * TID, and tuples[i] points to the post-prepare (possibly toasted) tuple
+ * in mi_ctx (if toasted) or the original in batch_ctx (if not). Both
+ * contexts are still alive here.
+ */
+ if (hstate->flush_cb != NULL)
+ {
+ for (int i = 0; i < ntuples; i++)
+ {
+ ExecStoreHeapTuple(hstate->tuples[i], hstate->scratch_slot, false);
+ hstate->scratch_slot->tts_tid = hstate->tuples[i]->t_self;
+ hstate->scratch_slot->tts_tableOid = hstate->tuples[i]->t_tableOid;
+ hstate->flush_cb(hstate->flush_ctx, hstate->scratch_slot);
+ ExecClearTuple(hstate->scratch_slot);
+ }
+ }
+
+ /* Reset both contexts now that the callback is done. */
+ MemoryContextReset(hstate->mi_ctx);
+ MemoryContextReset(hstate->batch_ctx);
+
+ hstate->ntuples = 0;
+ hstate->buffered_bytes = 0;
+}
+
+/*
+ * Perform heap-specific finish-bulk-insert cleanup.
+ *
+ * This is the buffered-insert equivalent of what callers of the non-buffered
+ * path achieve by calling FreeBulkInsertState() + table_finish_bulk_insert()
+ * at teardown. end() must subsume both.
+ *
+ * For the current in-tree heap AM, the cleanup consists of:
+ *
+ * 1. Release the BulkInsertState (buffer pin + bulk-write access strategy).
+ *
+ * 2. Any action that heap's finish_bulk_insert AM callback would perform.
+ * Heap does not currently register that callback (the slot in
+ * heapam_methods is NULL), so no additional action is required.
+ */
+static void
+heap_buffered_insert_finish_bulkinsert(HeapBufferedInsertState *hstate)
+{
+ if (hstate->bistate != NULL)
+ FreeBulkInsertState(hstate->bistate);
+
+ /* Heap does not currently register a finish_bulk_insert AM callback. */
+}
+
+
/*
* heap_insert - insert tuple into a heap
*
@@ -2268,22 +2526,24 @@ heap_multi_insert_pages(HeapTuple *heaptuples, int done, int ntuples, Size saveF
}
/*
- * heap_multi_insert - insert multiple tuples into a heap
+ * heap_multi_insert_raw - core multi-insert for a HeapTuple array
*
- * This is like heap_insert(), but inserts multiple tuples in one operation.
- * That's faster than calling heap_insert() in a loop, because when multiple
- * tuples can be inserted on a single page, we can write just a single WAL
- * record covering all of them, and only need to lock/unlock the page once.
+ * Takes an array of pre-formed HeapTuples, runs heap_prepare_insert on each
+ * (toast + header setup), and inserts them into heap pages. The heaptuples
+ * array is updated in-place: after return, each entry points to the prepared
+ * (possibly toasted) tuple with t_self set to the stored TID.
+ *
+ * This is the shared core for heap_multi_insert (slot-based callers) and
+ * heap_buffered_insert_do_flush (HeapTuple-based callers).
*
* Note: this leaks memory into the current memory context. You can create a
* temporary context before calling this, if that's a problem.
*/
-void
-heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
- CommandId cid, uint32 options, BulkInsertState bistate)
+static void
+heap_multi_insert_raw(Relation relation, HeapTuple *heaptuples, int ntuples,
+ CommandId cid, uint32 options, BulkInsertState bistate)
{
TransactionId xid = GetCurrentTransactionId();
- HeapTuple *heaptuples;
int i;
int ndone;
PGAlignedBlock scratch;
@@ -2306,16 +2566,11 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
HEAP_DEFAULT_FILLFACTOR);
- /* Toast and set header data in all the slots */
- heaptuples = palloc(ntuples * sizeof(HeapTuple));
+ /* Toast and set header data in all the tuples */
for (i = 0; i < ntuples; i++)
{
- HeapTuple tuple;
-
- tuple = ExecFetchSlotHeapTuple(slots[i], true, NULL);
- slots[i]->tts_tableOid = RelationGetRelid(relation);
- tuple->t_tableOid = slots[i]->tts_tableOid;
- heaptuples[i] = heap_prepare_insert(relation, tuple, xid, cid,
+ heaptuples[i]->t_tableOid = RelationGetRelid(relation);
+ heaptuples[i] = heap_prepare_insert(relation, heaptuples[i], xid, cid,
options);
}
@@ -2639,11 +2894,41 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
}
+ pgstat_count_heap_insert(relation, ntuples);
+}
+
+/*
+ * heap_multi_insert - insert multiple tuples into a heap
+ *
+ * This is like heap_insert(), but inserts multiple tuples in one operation.
+ * That's faster than calling heap_insert() in a loop, because when multiple
+ * tuples can be inserted on a single page, we can write just a single WAL
+ * record covering all of them, and only need to lock/unlock the page once.
+ *
+ * Note: this leaks memory into the current memory context. You can create a
+ * temporary context before calling this, if that's a problem.
+ */
+void
+heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
+ CommandId cid, uint32 options, BulkInsertState bistate)
+{
+ HeapTuple *heaptuples;
+ int i;
+
+ heaptuples = palloc(ntuples * sizeof(HeapTuple));
+ for (i = 0; i < ntuples; i++)
+ {
+ heaptuples[i] = ExecFetchSlotHeapTuple(slots[i], true, NULL);
+ slots[i]->tts_tableOid = RelationGetRelid(relation);
+ }
+
+ heap_multi_insert_raw(relation, heaptuples, ntuples, cid, options, bistate);
+
/* copy t_self fields back to the caller's slots */
for (i = 0; i < ntuples; i++)
slots[i]->tts_tid = heaptuples[i]->t_self;
- pgstat_count_heap_insert(relation, ntuples);
+ pfree(heaptuples);
}
/*
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 20d3b46e062..65226b07a5c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2697,7 +2697,12 @@ static const TableAmRoutine heapam_methods = {
.scan_bitmap_next_tuple = heapam_scan_bitmap_next_tuple,
.scan_sample_next_block = heapam_scan_sample_next_block,
- .scan_sample_next_tuple = heapam_scan_sample_next_tuple
+ .scan_sample_next_tuple = heapam_scan_sample_next_tuple,
+
+ .buffered_insert_begin = heap_buffered_insert_begin,
+ .buffered_insert_put = heap_buffered_insert_put,
+ .buffered_insert_flush = heap_buffered_insert_flush,
+ .buffered_insert_end = heap_buffered_insert_end,
};
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index 5450a27faeb..a0ff123fa83 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -93,6 +93,18 @@ GetTableAmRoutine(Oid amhandler)
Assert(routine->scan_sample_next_block != NULL);
Assert(routine->scan_sample_next_tuple != NULL);
+ /*
+ * Buffered-insert callbacks: either all four are NULL (AM does not
+ * support buffered inserts), or all four are non-NULL. No partially
+ * populated groups are allowed.
+ */
+ Assert((routine->buffered_insert_begin == NULL) ==
+ (routine->buffered_insert_put == NULL));
+ Assert((routine->buffered_insert_begin == NULL) ==
+ (routine->buffered_insert_flush == NULL));
+ Assert((routine->buffered_insert_begin == NULL) ==
+ (routine->buffered_insert_end == NULL));
+
return routine;
}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 5176478c295..d8b34e37627 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -379,6 +379,18 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
extern void heap_multi_insert(Relation relation, TupleTableSlot **slots,
int ntuples, CommandId cid, uint32 options,
BulkInsertState bistate);
+
+/* heap buffered-insert lifecycle */
+extern TableBufferedInsertState heap_buffered_insert_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableBufferedInsertFlushCb flush_cb,
+ void *flush_ctx);
+extern void heap_buffered_insert_put(TableBufferedInsertState state,
+ TupleTableSlot *slot);
+extern void heap_buffered_insert_flush(TableBufferedInsertState state);
+extern void heap_buffered_insert_end(TableBufferedInsertState state);
+
extern TM_Result heap_delete(Relation relation, const ItemPointerData *tid,
CommandId cid, uint32 options, Snapshot crosscheck,
bool wait, TM_FailureData *tmfd);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index f2c36696bca..d5a1875d93c 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -284,6 +284,7 @@ typedef struct TM_IndexDeleteOp
#define TABLE_INSERT_SKIP_FSM 0x0002
#define TABLE_INSERT_FROZEN 0x0004
#define TABLE_INSERT_NO_LOGICAL 0x0008
+#define TABLE_INSERT_BAS_BULKWRITE 0x0020
/* "options" flag bits for table_tuple_delete */
#define TABLE_DELETE_CHANGING_PARTITION (1 << 0)
@@ -307,6 +308,40 @@ typedef void (*IndexBuildCallback) (Relation index,
bool tupleIsAlive,
void *state);
+/*
+ * State for a buffered-insert session. AMs embed this as the first field of
+ * their private state struct. The base struct is minimal: only the target
+ * relation is exposed, which is needed for AM dispatch via the inline wrapper
+ * functions.
+ *
+ * AMs that do not support buffered inserts leave the buffered_insert_begin
+ * callback NULL; callers detect this via table_buffered_insert_begin()
+ * returning NULL.
+ */
+typedef struct TableBufferedInsertStateData
+{
+ Relation rel; /* target relation -- needed for AM dispatch */
+} TableBufferedInsertStateData;
+
+typedef TableBufferedInsertStateData *TableBufferedInsertState;
+
+/*
+ * Callback invoked once per flushed tuple, in insertion order, after the AM
+ * writes a batch of buffered tuples to storage.
+ *
+ * The slot is an AM-owned scratch object used solely as a handoff vehicle.
+ * It contains the stored tuple with TID (or AM-equivalent locator) set.
+ * It is valid only for the duration of this callback invocation; the AM may
+ * reuse the same scratch slot across successive invocations within a single
+ * flush. If the caller needs data beyond the callback, it must copy within
+ * the callback body.
+ *
+ * The AM must not assume the callback is cheap, side-effect-free, or
+ * non-throwing.
+ */
+typedef void (*TableBufferedInsertFlushCb)(void *context,
+ TupleTableSlot *slot);
+
/*
* API struct for a table AM. Note this must be allocated in a
* server-lifetime manner, typically as a static const struct, which then gets
@@ -614,6 +649,64 @@ typedef struct TableAmRoutine
void (*finish_bulk_insert) (Relation rel, uint32 options);
+ /* ------------------------------------------------------------------------
+ * Buffered-insert lifecycle callbacks.
+ *
+ * Optional optimization for batch inserts. All four callbacks must be
+ * either NULL together (AM does not support buffered inserts) or non-NULL
+ * together (validated at AM registration time). No partially populated
+ * groups are allowed.
+ * ------------------------------------------------------------------------
+ */
+
+ /*
+ * Begin a buffered-insert session. Returns an opaque state handle whose
+ * first bytes are a TableBufferedInsertStateData with rel set. The AM
+ * stores cid, options, flush_cb, and flush_ctx in its private extension
+ * of the state struct.
+ *
+ * flush_cb may be NULL, in which case the AM skips per-tuple notification
+ * after flushing.
+ *
+ * Optional callback -- NULL means the AM does not support buffered inserts.
+ */
+ TableBufferedInsertState (*buffered_insert_begin)(
+ Relation rel,
+ CommandId cid,
+ int options,
+ TableBufferedInsertFlushCb flush_cb,
+ void *flush_ctx);
+
+ /*
+ * Submit one tuple for buffered insertion. The AM reads the tuple data
+ * from the slot and captures it internally before returning. The caller
+ * retains ownership of the slot and may reuse it immediately.
+ *
+ * The AM may auto-flush during this call if its internal buffer is full;
+ * the caller must be prepared for the flush callback to fire during put().
+ */
+ void (*buffered_insert_put)(
+ TableBufferedInsertState state,
+ TupleTableSlot *slot);
+
+ /*
+ * Force-flush all buffered tuples to storage. Invokes the flush callback
+ * (if non-NULL) once per flushed tuple, in insertion order, using an
+ * AM-owned scratch slot. After return, the internal buffer is empty.
+ */
+ void (*buffered_insert_flush)(
+ TableBufferedInsertState state);
+
+ /*
+ * Flush remaining buffered tuples, perform finish-bulk-insert cleanup
+ * (e.g. bistate release for heap), and release all resources owned by the
+ * state. The state pointer is invalid after this call. Callers must
+ * not separately call table_finish_bulk_insert() -- end() subsumes it.
+ */
+ void (*buffered_insert_end)(
+ TableBufferedInsertState state);
+
+
/* ------------------------------------------------------------------------
* DDL related functionality.
* ------------------------------------------------------------------------
@@ -1666,6 +1759,72 @@ table_finish_bulk_insert(Relation rel, uint32 options)
}
+/* ----------------------------------------------------------------------------
+ * Buffered-insert lifecycle functions.
+ * ----------------------------------------------------------------------------
+ */
+
+/*
+ * Begin a buffered-insert session for the given relation.
+ *
+ * Returns NULL if the relation's AM does not support buffered inserts, in
+ * which case the caller should fall back to the single-row insert path.
+ *
+ * flush_cb may be NULL when no per-tuple post-insert work is needed (e.g.
+ * CTAS/CMV/RMV). When non-NULL, it is invoked once per flushed tuple in
+ * insertion order, using an AM-owned scratch slot valid only for the duration
+ * of each callback invocation.
+ */
+static inline TableBufferedInsertState
+table_buffered_insert_begin(Relation rel, CommandId cid, int options,
+ TableBufferedInsertFlushCb flush_cb,
+ void *flush_ctx)
+{
+ if (rel->rd_tableam->buffered_insert_begin == NULL)
+ return NULL;
+ return rel->rd_tableam->buffered_insert_begin(rel, cid, options,
+ flush_cb, flush_ctx);
+}
+
+/*
+ * Submit one tuple for buffered insertion. The AM reads from the slot and
+ * captures the data internally; the caller retains slot ownership and may
+ * reuse it immediately after this call returns.
+ *
+ * The AM may auto-flush during put() if its buffer is full; the flush
+ * callback (if any) may fire during this call.
+ */
+static inline void
+table_buffered_insert_put(TableBufferedInsertState state,
+ TupleTableSlot *slot)
+{
+ state->rel->rd_tableam->buffered_insert_put(state, slot);
+}
+
+/*
+ * Force-flush all buffered tuples to storage. The flush callback (if
+ * non-NULL) fires once per flushed tuple in insertion order. After return
+ * the buffer is empty.
+ */
+static inline void
+table_buffered_insert_flush(TableBufferedInsertState state)
+{
+ state->rel->rd_tableam->buffered_insert_flush(state);
+}
+
+/*
+ * Flush remaining buffered tuples, perform finish-bulk-insert cleanup
+ * (e.g. FSM update for heap), and release all resources. The state pointer
+ * is invalid after this call. Do not separately call
+ * table_finish_bulk_insert() -- end() subsumes it.
+ */
+static inline void
+table_buffered_insert_end(TableBufferedInsertState state)
+{
+ state->rel->rd_tableam->buffered_insert_end(state);
+}
+
+
/* ------------------------------------------------------------------------
* DDL related functionality.
* ------------------------------------------------------------------------
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 0a74ab5c86f..fa964043d4f 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -20,6 +20,7 @@ SUBDIRS = \
test_binaryheap \
test_bitmapset \
test_bloomfilter \
+ test_buffered_insert \
test_cloexec \
test_checksums \
test_copy_callbacks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 4bca42bb370..aae892699b9 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -20,6 +20,7 @@ subdir('test_autovacuum')
subdir('test_binaryheap')
subdir('test_bitmapset')
subdir('test_bloomfilter')
+subdir('test_buffered_insert')
subdir('test_cloexec')
subdir('test_checksums')
subdir('test_copy_callbacks')
diff --git a/src/test/modules/test_buffered_insert/Makefile b/src/test/modules/test_buffered_insert/Makefile
new file mode 100644
index 00000000000..5c4926a4c3a
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_buffered_insert/Makefile
+
+MODULE_big = test_buffered_insert
+OBJS = \
+ $(WIN32RES) \
+ test_buffered_insert.o
+PGFILEDESC = "test_buffered_insert - test Table AM buffered-insert lifecycle"
+
+EXTENSION = test_buffered_insert
+DATA = test_buffered_insert--1.0.sql
+
+REGRESS = test_buffered_insert
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_buffered_insert
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_buffered_insert/expected/test_buffered_insert.out b/src/test/modules/test_buffered_insert/expected/test_buffered_insert.out
new file mode 100644
index 00000000000..79163952f6f
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/expected/test_buffered_insert.out
@@ -0,0 +1,108 @@
+CREATE EXTENSION test_buffered_insert;
+-- Target table: single integer column.
+CREATE TABLE buffered_insert_test (a INT);
+-- Basic test: insert 5 rows via buffered-insert with NULL flush callback.
+SELECT test_buffered_insert_basic('buffered_insert_test'::regclass, 5);
+ test_buffered_insert_basic
+----------------------------
+ 5
+(1 row)
+
+SELECT count(*) FROM buffered_insert_test;
+ count
+-------
+ 5
+(1 row)
+
+SELECT a FROM buffered_insert_test ORDER BY a;
+ a
+---
+ 1
+ 2
+ 3
+ 4
+ 5
+(5 rows)
+
+TRUNCATE buffered_insert_test;
+-- Test with flush callback: insert 5 rows, verify callback count matches.
+SELECT test_buffered_insert_with_callback('buffered_insert_test'::regclass, 5);
+ test_buffered_insert_with_callback
+------------------------------------
+ 5
+(1 row)
+
+SELECT count(*) FROM buffered_insert_test;
+ count
+-------
+ 5
+(1 row)
+
+SELECT a FROM buffered_insert_test ORDER BY a;
+ a
+---
+ 1
+ 2
+ 3
+ 4
+ 5
+(5 rows)
+
+TRUNCATE buffered_insert_test;
+-- Trigger auto-flush: insert more rows than HEAP_BUFFERED_INSERT_MAX_SLOTS
+-- (1000) to verify auto-flush during put() works correctly.
+SELECT test_buffered_insert_basic('buffered_insert_test'::regclass, 1500);
+ test_buffered_insert_basic
+----------------------------
+ 1500
+(1 row)
+
+SELECT count(*) FROM buffered_insert_test;
+ count
+-------
+ 1500
+(1 row)
+
+-- Spot-check first and last values.
+SELECT min(a), max(a) FROM buffered_insert_test;
+ min | max
+-----+------
+ 1 | 1500
+(1 row)
+
+TRUNCATE buffered_insert_test;
+-- Verify flush callback fires for all rows including auto-flushed batches.
+SELECT test_buffered_insert_with_callback('buffered_insert_test'::regclass, 1500);
+ test_buffered_insert_with_callback
+------------------------------------
+ 1500
+(1 row)
+
+SELECT count(*) FROM buffered_insert_test;
+ count
+-------
+ 1500
+(1 row)
+
+TRUNCATE buffered_insert_test;
+-- Test explicit flush() mid-session: flush after first 5 rows, insert 5 more,
+-- then end(). Callback count must equal total rows.
+SELECT test_buffered_insert_flush_mid('buffered_insert_test'::regclass, 10);
+ test_buffered_insert_flush_mid
+--------------------------------
+ 10
+(1 row)
+
+SELECT count(*) FROM buffered_insert_test;
+ count
+-------
+ 10
+(1 row)
+
+SELECT min(a), max(a) FROM buffered_insert_test;
+ min | max
+-----+-----
+ 1 | 10
+(1 row)
+
+DROP TABLE buffered_insert_test;
diff --git a/src/test/modules/test_buffered_insert/meson.build b/src/test/modules/test_buffered_insert/meson.build
new file mode 100644
index 00000000000..d738ccb1a84
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2022-2026, PostgreSQL Global Development Group
+
+test_buffered_insert_sources = files(
+ 'test_buffered_insert.c',
+)
+
+if host_system == 'windows'
+ test_buffered_insert_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'test_buffered_insert',
+ '--FILEDESC', 'test_buffered_insert - test Table AM buffered-insert lifecycle',])
+endif
+
+test_buffered_insert = shared_module('test_buffered_insert',
+ test_buffered_insert_sources,
+ kwargs: pg_test_mod_args,
+)
+test_install_libs += test_buffered_insert
+
+test_install_data += files(
+ 'test_buffered_insert.control',
+ 'test_buffered_insert--1.0.sql',
+)
+
+tests += {
+ 'name': 'test_buffered_insert',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'regress': {
+ 'sql': [
+ 'test_buffered_insert',
+ ],
+ },
+}
diff --git a/src/test/modules/test_buffered_insert/sql/test_buffered_insert.sql b/src/test/modules/test_buffered_insert/sql/test_buffered_insert.sql
new file mode 100644
index 00000000000..23ac09cc9bc
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/sql/test_buffered_insert.sql
@@ -0,0 +1,41 @@
+CREATE EXTENSION test_buffered_insert;
+
+-- Target table: single integer column.
+CREATE TABLE buffered_insert_test (a INT);
+
+-- Basic test: insert 5 rows via buffered-insert with NULL flush callback.
+SELECT test_buffered_insert_basic('buffered_insert_test'::regclass, 5);
+SELECT count(*) FROM buffered_insert_test;
+SELECT a FROM buffered_insert_test ORDER BY a;
+
+TRUNCATE buffered_insert_test;
+
+-- Test with flush callback: insert 5 rows, verify callback count matches.
+SELECT test_buffered_insert_with_callback('buffered_insert_test'::regclass, 5);
+SELECT count(*) FROM buffered_insert_test;
+SELECT a FROM buffered_insert_test ORDER BY a;
+
+TRUNCATE buffered_insert_test;
+
+-- Trigger auto-flush: insert more rows than HEAP_BUFFERED_INSERT_MAX_SLOTS
+-- (1000) to verify auto-flush during put() works correctly.
+SELECT test_buffered_insert_basic('buffered_insert_test'::regclass, 1500);
+SELECT count(*) FROM buffered_insert_test;
+-- Spot-check first and last values.
+SELECT min(a), max(a) FROM buffered_insert_test;
+
+TRUNCATE buffered_insert_test;
+
+-- Verify flush callback fires for all rows including auto-flushed batches.
+SELECT test_buffered_insert_with_callback('buffered_insert_test'::regclass, 1500);
+SELECT count(*) FROM buffered_insert_test;
+
+TRUNCATE buffered_insert_test;
+
+-- Test explicit flush() mid-session: flush after first 5 rows, insert 5 more,
+-- then end(). Callback count must equal total rows.
+SELECT test_buffered_insert_flush_mid('buffered_insert_test'::regclass, 10);
+SELECT count(*) FROM buffered_insert_test;
+SELECT min(a), max(a) FROM buffered_insert_test;
+
+DROP TABLE buffered_insert_test;
diff --git a/src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql b/src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql
new file mode 100644
index 00000000000..26aa0635240
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql
@@ -0,0 +1,30 @@
+/* src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_buffered_insert" to load this file. \quit
+
+--
+-- Insert rows through the buffered-insert lifecycle with a NULL flush
+-- callback (the CTAS/CMV/RMV pattern). Returns the number of rows put().
+--
+CREATE FUNCTION test_buffered_insert_basic(pg_catalog.regclass, pg_catalog.int4)
+ RETURNS pg_catalog.int4
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+--
+-- Insert rows through the buffered-insert lifecycle with a flush callback
+-- that counts invocations. Returns the total number of flush-callback
+-- invocations (should equal the number of rows inserted).
+--
+CREATE FUNCTION test_buffered_insert_with_callback(pg_catalog.regclass, pg_catalog.int4)
+ RETURNS pg_catalog.int4
+ AS 'MODULE_PATHNAME' LANGUAGE C;
+
+--
+-- Exercise explicit flush() mid-session: inserts half the rows, calls
+-- flush(), inserts the other half, then calls end(). Returns the total
+-- number of flush-callback invocations (should equal nrows).
+--
+CREATE FUNCTION test_buffered_insert_flush_mid(pg_catalog.regclass, pg_catalog.int4)
+ RETURNS pg_catalog.int4
+ AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_buffered_insert/test_buffered_insert.c b/src/test/modules/test_buffered_insert/test_buffered_insert.c
new file mode 100644
index 00000000000..fbda7b1c70b
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/test_buffered_insert.c
@@ -0,0 +1,233 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_buffered_insert.c
+ * Minimal validation for the Table AM buffered-insert lifecycle API.
+ *
+ * Exercises begin/put/flush/end directly at the Table AM layer on a
+ * real heap relation, without going through any higher-level caller
+ * (CTAS, COPY, etc.). This keeps the test within Patch 0001 scope.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_buffered_insert/test_buffered_insert.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/table.h"
+#include "access/tableam.h"
+#include "access/xact.h"
+#include "catalog/namespace.h"
+#include "executor/tuptable.h"
+#include "fmgr.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+
+PG_MODULE_MAGIC;
+
+/*
+ * test_buffered_insert_basic(regclass, int4)
+ *
+ * Opens the given relation, inserts nrows tuples through the buffered-insert
+ * API with a NULL flush callback, and returns the number of rows put().
+ * The tuples inserted have the form (i) for a single-integer-column table.
+ */
+PG_FUNCTION_INFO_V1(test_buffered_insert_basic);
+Datum
+test_buffered_insert_basic(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ int32 nrows = PG_GETARG_INT32(1);
+ Relation rel;
+ TableBufferedInsertState state;
+ TupleTableSlot *slot;
+ TupleDesc tupdesc;
+ int i;
+
+ rel = table_open(relid, RowExclusiveLock);
+ tupdesc = RelationGetDescr(rel);
+
+ state = table_buffered_insert_begin(rel,
+ GetCurrentCommandId(true),
+ TABLE_INSERT_BAS_BULKWRITE,
+ NULL, NULL);
+
+ if (state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("AM does not support buffered inserts")));
+
+ slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
+
+ for (i = 0; i < nrows; i++)
+ {
+ ExecClearTuple(slot);
+ slot->tts_values[0] = Int32GetDatum(i + 1);
+ slot->tts_isnull[0] = false;
+ ExecStoreVirtualTuple(slot);
+
+ table_buffered_insert_put(state, slot);
+ }
+
+ table_buffered_insert_end(state);
+
+ ExecDropSingleTupleTableSlot(slot);
+ table_close(rel, RowExclusiveLock);
+
+ PG_RETURN_INT32(nrows);
+}
+
+/*
+ * Flush callback context: counts invocations and verifies each slot has
+ * a valid TID.
+ */
+typedef struct FlushCbContext
+{
+ int count;
+} FlushCbContext;
+
+static void
+test_flush_callback(void *context, TupleTableSlot *slot)
+{
+ FlushCbContext *ctx = (FlushCbContext *) context;
+
+ if (!ItemPointerIsValid(&slot->tts_tid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("flush callback received slot with invalid TID")));
+
+ ctx->count++;
+}
+
+/*
+ * test_buffered_insert_with_callback(regclass, int4)
+ *
+ * Same as basic, but passes a flush callback that counts invocations and
+ * validates TIDs. Returns the total flush-callback invocation count.
+ */
+PG_FUNCTION_INFO_V1(test_buffered_insert_with_callback);
+Datum
+test_buffered_insert_with_callback(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ int32 nrows = PG_GETARG_INT32(1);
+ Relation rel;
+ TableBufferedInsertState state;
+ TupleTableSlot *slot;
+ TupleDesc tupdesc;
+ FlushCbContext ctx;
+ int i;
+
+ ctx.count = 0;
+
+ rel = table_open(relid, RowExclusiveLock);
+ tupdesc = RelationGetDescr(rel);
+
+ state = table_buffered_insert_begin(rel,
+ GetCurrentCommandId(true),
+ TABLE_INSERT_BAS_BULKWRITE,
+ test_flush_callback,
+ &ctx);
+
+ if (state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("AM does not support buffered inserts")));
+
+ slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
+
+ for (i = 0; i < nrows; i++)
+ {
+ ExecClearTuple(slot);
+ slot->tts_values[0] = Int32GetDatum(i + 1);
+ slot->tts_isnull[0] = false;
+ ExecStoreVirtualTuple(slot);
+
+ table_buffered_insert_put(state, slot);
+ }
+
+ table_buffered_insert_end(state);
+
+ ExecDropSingleTupleTableSlot(slot);
+ table_close(rel, RowExclusiveLock);
+
+ PG_RETURN_INT32(ctx.count);
+}
+
+/*
+ * test_buffered_insert_flush_mid(regclass, int4)
+ *
+ * Exercises explicit flush() mid-session: inserts half the rows, calls
+ * flush(), inserts the other half, then calls end(). Returns the total
+ * flush-callback count, which should equal nrows.
+ */
+PG_FUNCTION_INFO_V1(test_buffered_insert_flush_mid);
+Datum
+test_buffered_insert_flush_mid(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ int32 nrows = PG_GETARG_INT32(1);
+ Relation rel;
+ TableBufferedInsertState state;
+ TupleTableSlot *slot;
+ TupleDesc tupdesc;
+ FlushCbContext ctx;
+ int half = nrows / 2;
+ int i;
+
+ ctx.count = 0;
+
+ rel = table_open(relid, RowExclusiveLock);
+ tupdesc = RelationGetDescr(rel);
+
+ state = table_buffered_insert_begin(rel,
+ GetCurrentCommandId(true),
+ TABLE_INSERT_BAS_BULKWRITE,
+ test_flush_callback,
+ &ctx);
+
+ if (state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("AM does not support buffered inserts")));
+
+ slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
+
+ /* First half */
+ for (i = 0; i < half; i++)
+ {
+ ExecClearTuple(slot);
+ slot->tts_values[0] = Int32GetDatum(i + 1);
+ slot->tts_isnull[0] = false;
+ ExecStoreVirtualTuple(slot);
+
+ table_buffered_insert_put(state, slot);
+ }
+
+ /* Explicit flush mid-session */
+ table_buffered_insert_flush(state);
+
+ /* Second half */
+ for (i = half; i < nrows; i++)
+ {
+ ExecClearTuple(slot);
+ slot->tts_values[0] = Int32GetDatum(i + 1);
+ slot->tts_isnull[0] = false;
+ ExecStoreVirtualTuple(slot);
+
+ table_buffered_insert_put(state, slot);
+ }
+
+ /* end() flushes the remaining tuples */
+ table_buffered_insert_end(state);
+
+ ExecDropSingleTupleTableSlot(slot);
+ table_close(rel, RowExclusiveLock);
+
+ PG_RETURN_INT32(ctx.count);
+}
diff --git a/src/test/modules/test_buffered_insert/test_buffered_insert.control b/src/test/modules/test_buffered_insert/test_buffered_insert.control
new file mode 100644
index 00000000000..3221c765820
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/test_buffered_insert.control
@@ -0,0 +1,4 @@
+comment = 'Test code for Table AM buffered-insert lifecycle'
+default_version = '1.0'
+module_pathname = '$libdir/test_buffered_insert'
+relocatable = true
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9f1dd55213d..257aee1e684 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1255,6 +1255,7 @@ HeadlineWordEntry
HeapCheckContext
HeapCheckReadStreamData
HeapPageFreeze
+HeapBufferedInsertState
HeapScanDesc
HeapScanDescData
HeapTuple
@@ -3123,6 +3124,9 @@ T_Action
T_WorkerStatus
TableAmRoutine
TableAttachInfo
+TableBufferedInsertFlushCb
+TableBufferedInsertState
+TableBufferedInsertStateData
TableDataInfo
TableFunc
TableFuncRoutine
--
2.52.0
[application/octet-stream] v1-0005-executor-adopt-buffered-insert-API-for-restricted.patch (24.3K, 4-v1-0005-executor-adopt-buffered-insert-API-for-restricted.patch)
From f2a38d918a0cb3f4279c75f399c82eedb37c909d Mon Sep 17 00:00:00 2001
From: Haibo Yan <[email protected]>
Date: Sun, 26 Apr 2026 22:24:20 -0700
Subject: [PATCH v1 5/5] executor: adopt buffered-insert API for restricted
INSERT ... SELECT

Add a restricted first-step buffered-insert adoption for
`INSERT INTO ... SELECT` in the executor.

This patch integrates the existing buffered-insert lifecycle API into the
plain non-partitioned heap `CMD_INSERT` path in `ExecInsert()` /
`ExecModifyTable()`, using a local flush callback for post-insert work.

The buffered path is intentionally narrow. It falls back to the existing
single-row path for cases outside the restricted scope, including:
- ON CONFLICT
- RETURNING
- partitioned targets
- BEFORE ROW / INSTEAD OF triggers
- FDW targets
- MERGE / cross-partition UPDATE insert side
- volatile target-side default expressions

The flush callback performs the minimal post-insert executor work needed
for this first step:
- index maintenance via `ExecInsertIndexTuples()`
- AFTER ROW trigger firing via `ExecARInsertTriggers()`
- `es_processed` accounting

Add focused regression coverage for:
- basic bulk INSERT ... SELECT
- indexed targets
- AFTER ROW trigger behavior
- index + trigger combination
- fallback cases (ON CONFLICT, RETURNING, BEFORE ROW trigger,
partitioned target, volatile target defaults)
- zero-row insert
---
src/backend/executor/nodeModifyTable.c | 164 +++++++++++
src/include/nodes/execnodes.h | 5 +
src/test/regress/expected/insert_buffered.out | 271 ++++++++++++++++++
src/test/regress/parallel_schedule | 2 +-
src/test/regress/sql/insert_buffered.sql | 209 ++++++++++++++
5 files changed, 650 insertions(+), 1 deletion(-)
create mode 100644 src/test/regress/expected/insert_buffered.out
create mode 100644 src/test/regress/sql/insert_buffered.sql
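
Reader's note (not part of the patch): the begin/put/flush/end contract that
the executor relies on here can be illustrated with a standalone toy model.
This is only a sketch under stated assumptions, not PostgreSQL code. Every
name in it (ToySession, toy_begin, toy_put, toy_flush, toy_end, ToyCounter,
toy_run, TOY_MAX_SLOTS) is hypothetical, plain ints stand in for tuples, and
the choice to auto-flush right after the buffer fills is arbitrary. It shows
only that the callback fires once per value, in insertion order, whether the
flush was automatic, explicit, or triggered by end().

```c
/*
 * Toy model of a buffered-insert session.  NOT PostgreSQL code: all names
 * are hypothetical and exist only to illustrate the lifecycle contract.
 */
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

#define TOY_MAX_SLOTS 4			/* stand-in for the AM's buffer capacity */

typedef void (*ToyFlushCb) (void *context, int value);

typedef struct ToySession
{
	int			buf[TOY_MAX_SLOTS]; /* values captured by put() */
	int			nused;			/* number of values currently buffered */
	int			nstored;		/* values written to "storage" so far */
	ToyFlushCb	flush_cb;		/* may be NULL */
	void	   *flush_ctx;
} ToySession;

static ToySession *
toy_begin(ToyFlushCb flush_cb, void *flush_ctx)
{
	ToySession *s = calloc(1, sizeof(ToySession));

	if (s == NULL)
		abort();
	s->flush_cb = flush_cb;
	s->flush_ctx = flush_ctx;
	return s;
}

/* Store the whole batch first, then run the callback once per value. */
static void
toy_flush(ToySession *s)
{
	s->nstored += s->nused;
	if (s->flush_cb != NULL)
		for (int i = 0; i < s->nused; i++)
			s->flush_cb(s->flush_ctx, s->buf[i]);
	s->nused = 0;
}

/* Copy the value into the buffer; auto-flush as soon as the buffer is full. */
static void
toy_put(ToySession *s, int value)
{
	assert(s->nused < TOY_MAX_SLOTS);
	s->buf[s->nused++] = value;
	if (s->nused == TOY_MAX_SLOTS)
		toy_flush(s);
}

/* Flush what is left, release the session, and report the stored total. */
static int
toy_end(ToySession *s)
{
	int			total;

	toy_flush(s);
	total = s->nstored;
	free(s);
	return total;
}

/* Callback context: counts invocations and checks ascending order. */
typedef struct ToyCounter
{
	int			count;
	int			last;
	int			in_order;
} ToyCounter;

static void
toy_count_cb(void *context, int value)
{
	ToyCounter *c = context;

	if (value <= c->last)
		c->in_order = 0;
	c->last = value;
	c->count++;
}

/* Put 1..nrows with one explicit flush halfway through, then end(). */
static int
toy_run(int nrows, ToyCounter *c)
{
	ToySession *s = toy_begin(toy_count_cb, c);

	for (int i = 0; i < nrows; i++)
	{
		if (i == nrows / 2)
			toy_flush(s);
		toy_put(s, i + 1);
	}
	return toy_end(s);
}
```

With a ToyCounter initialised to {0, 0, 1}, toy_run(10, &c) mixes automatic
flushes, one explicit flush and the final flush in end(). That is the same
shape as test_buffered_insert_flush_mid in the test module, but with a much
smaller buffer.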
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 4cb057ca4f9..06af0637407 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -83,6 +83,85 @@ typedef struct MTTargetRelLookup
int relationIndex; /* rel's index in resultRelInfo[] array */
} MTTargetRelLookup;
+/*
+ * Flush callback context for buffered INSERT INTO ... SELECT.
+ *
+ * These are the parameters required by the three callback operations
+ * (ExecInsertIndexTuples, ExecARInsertTriggers, es_processed++):
+ *
+ * - estate, resultRelInfo: required by ExecInsertIndexTuples/ExecARInsertTriggers.
+ * - transition_capture: 5th parameter of ExecARInsertTriggers; the non-buffered
+ * path passes mtstate->mt_transition_capture for CMD_INSERT. NULL would
+ * silently break statement-level triggers with REFERENCING NEW TABLE.
+ * - canSetTag: the non-buffered path gates es_processed++ on this after the
+ * insert; the buffered return-NULL bypasses that gate, so the callback
+ * replicates it.
+ */
+typedef struct ExecBufferedInsertFlushState
+{
+ EState *estate;
+ ResultRelInfo *resultRelInfo;
+ TransitionCaptureState *transition_capture;
+ bool canSetTag;
+} ExecBufferedInsertFlushState;
+
+/*
+ * Flush callback for buffered INSERT INTO ... SELECT.
+ *
+ * Called once per flushed tuple after heap_multi_insert() completes a batch.
+ * Performs index maintenance, AFTER ROW trigger firing, and tuple counting.
+ */
+static void
+ExecBufferedInsertFlushCb(void *context, TupleTableSlot *slot)
+{
+ ExecBufferedInsertFlushState *ctx = (ExecBufferedInsertFlushState *) context;
+ ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+ EState *estate = ctx->estate;
+ List *recheckIndexes = NIL;
+
+ if (resultRelInfo->ri_NumIndices > 0)
+ recheckIndexes = ExecInsertIndexTuples(resultRelInfo, estate, 0,
+ slot, NIL, NULL);
+
+ ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+ ctx->transition_capture);
+
+ list_free(recheckIndexes);
+
+ if (ctx->canSetTag)
+ (estate->es_processed)++;
+}
+
+/*
+ * Check whether a relation has volatile default expressions.
+ *
+ * Conservative target-side restriction: if any column default contains a
+ * volatile function (excluding nextval), the buffered-insert path is not used.
+ * This mirrors COPY FROM's volatile_defexprs check.
+ */
+static bool
+ExecRelHasVolatileDefaults(Relation rel)
+{
+ TupleConstr *constr = RelationGetDescr(rel)->constr;
+
+ if (constr == NULL || constr->num_defval == 0)
+ return false;
+
+ for (int i = 0; i < constr->num_defval; i++)
+ {
+ Node *expr;
+
+ if (constr->defval[i].adbin == NULL)
+ continue;
+
+ expr = stringToNode(constr->defval[i].adbin);
+
+ if (contain_volatile_functions_not_nextval(expr))
+ return true;
+ }
+ return false;
+}
+
/*
* Context struct for a ModifyTable operation, containing basic execution
* state and some output variables populated by ExecUpdateAct() and
@@ -1269,6 +1348,49 @@ ExecInsert(ModifyTableContext *context,
}
else
{
+ /*
+ * Buffered-insert path: lazily open the session on first call,
+ * then submit tuples via put() instead of single-row insert.
+ * Post-insert work (indexes, triggers) fires in the flush callback.
+ */
+ if (mtstate->mt_buffered_insert_eligible &&
+ mtstate->mt_bi_state == NULL)
+ {
+ ExecBufferedInsertFlushState *flush_ctx;
+
+ flush_ctx = palloc(sizeof(ExecBufferedInsertFlushState));
+ flush_ctx->estate = estate;
+ flush_ctx->resultRelInfo = resultRelInfo;
+ flush_ctx->transition_capture = mtstate->mt_transition_capture;
+ flush_ctx->canSetTag = canSetTag;
+ mtstate->mt_bi_flush_ctx = flush_ctx;
+
+ mtstate->mt_bi_state =
+ table_buffered_insert_begin(resultRelationDesc,
+ estate->es_output_cid,
+ TABLE_INSERT_BAS_BULKWRITE,
+ ExecBufferedInsertFlushCb,
+ flush_ctx);
+ if (mtstate->mt_bi_state == NULL)
+ mtstate->mt_buffered_insert_eligible = false;
+ }
+
+ if (mtstate->mt_bi_state != NULL)
+ {
+ /*
+ * Pre-insert validation already ran in this else-branch
+ * above the ON CONFLICT test — specifically:
+ * tts_tableOid init, ExecComputeStoredGenerated,
+ * ExecWithCheckOptions (RLS), ExecConstraints,
+ * ExecPartitionCheck.
+ * This inner else-branch (no ON CONFLICT) is reached only
+ * after all of those. Submit the validated tuple to the
+ * AM buffer; post-insert work fires in the flush callback.
+ */
+ table_buffered_insert_put(mtstate->mt_bi_state, slot);
+ return NULL;
+ }
+
/* insert the tuple normally */
table_tuple_insert(resultRelationDesc, slot,
estate->es_output_cid,
@@ -5027,6 +5149,15 @@ ExecModifyTable(PlanState *pstate)
if (estate->es_insert_pending_result_relations != NIL)
ExecPendingInserts(estate);
+ /*
+ * Flush and clean up buffered-insert session if active.
+ */
+ if (node->mt_bi_state != NULL)
+ {
+ table_buffered_insert_end(node->mt_bi_state);
+ node->mt_bi_state = NULL;
+ }
+
/*
* We're done, but fire AFTER STATEMENT triggers before exiting.
*/
@@ -5773,6 +5904,30 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
resultRelInfo->ri_BatchSize = 1;
}
+ /*
+ * For CMD_INSERT, decide whether the restricted buffered-insert path is
+ * eligible: no ON CONFLICT, RETURNING, partitioned target, BEFORE ROW or
+ * INSTEAD OF triggers, FDW target, or volatile column defaults.
+ * AM support is resolved lazily at first ExecInsert() call.
+ */
+ if (operation == CMD_INSERT)
+ {
+ ModifyTable *mtnode = (ModifyTable *) mtstate->ps.plan;
+
+ resultRelInfo = mtstate->resultRelInfo;
+ mtstate->mt_buffered_insert_eligible =
+ (mtnode->onConflictAction == ONCONFLICT_NONE &&
+ resultRelInfo->ri_projectReturning == NULL &&
+ resultRelInfo->ri_RelationDesc->rd_rel->relkind !=
+ RELKIND_PARTITIONED_TABLE &&
+ !(resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_before_row) &&
+ !(resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_instead_row) &&
+ resultRelInfo->ri_FdwRoutine == NULL &&
+ mtstate->operation == CMD_INSERT &&
+ !ExecRelHasVolatileDefaults(resultRelInfo->ri_RelationDesc));
+ }
+
/*
* Lastly, if this is not the primary (canSetTag) ModifyTable node, add it
* to estate->es_auxmodifytables so that it will be run to completion by
@@ -5802,6 +5957,15 @@ ExecEndModifyTable(ModifyTableState *node)
{
int i;
+ /*
+ * Defensive: end the buffered-insert session here if ExecModifyTable()
+ * did not reach its own table_buffered_insert_end() call.
+ */
+ if (node->mt_bi_state != NULL)
+ {
+ table_buffered_insert_end(node->mt_bi_state);
+ node->mt_bi_state = NULL;
+ }
+
/*
* Allow any FDWs to shut down
*/
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 13359180d25..7790bb0ba38 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1508,6 +1508,11 @@ typedef struct ModifyTableState
List *mt_updateColnosLists;
List *mt_mergeActionLists;
List *mt_mergeJoinConditions;
+
+ /* Buffered-insert state for restricted INSERT INTO ... SELECT */
+ bool mt_buffered_insert_eligible;
+ struct TableBufferedInsertStateData *mt_bi_state;
+ void *mt_bi_flush_ctx; /* private to nodeModifyTable.c */
} ModifyTableState;
/* ----------------
diff --git a/src/test/regress/expected/insert_buffered.out b/src/test/regress/expected/insert_buffered.out
new file mode 100644
index 00000000000..3ee43ae8b04
--- /dev/null
+++ b/src/test/regress/expected/insert_buffered.out
@@ -0,0 +1,271 @@
+--
+-- Tests for buffered-insert adoption in INSERT INTO ... SELECT (Patch 0005).
+-- Restricted first step: non-partitioned heap target, no ON CONFLICT,
+-- no RETURNING, no BEFORE ROW triggers.
+--
+-- ============================================================
+-- T1: Basic bulk insert (exercises multiple auto-flush cycles)
+-- ============================================================
+CREATE TABLE bi_target_basic (id int, val text);
+INSERT INTO bi_target_basic
+SELECT g, 'row-' || g FROM generate_series(1, 2000) g;
+SELECT count(*) FROM bi_target_basic;
+ count
+-------
+ 2000
+(1 row)
+
+SELECT min(id), max(id) FROM bi_target_basic;
+ min | max
+-----+------
+ 1 | 2000
+(1 row)
+
+DROP TABLE bi_target_basic;
+-- ============================================================
+-- T2: Indexed target
+-- ============================================================
+CREATE TABLE bi_target_idx (id int, val text);
+CREATE INDEX bi_target_idx_id ON bi_target_idx (id);
+INSERT INTO bi_target_idx
+SELECT g, 'row-' || g FROM generate_series(1, 500) g;
+SELECT count(*) FROM bi_target_idx;
+ count
+-------
+ 500
+(1 row)
+
+-- Verify index is usable and correct
+SET enable_seqscan = off;
+SELECT count(*) FROM bi_target_idx WHERE id BETWEEN 1 AND 500;
+ count
+-------
+ 500
+(1 row)
+
+RESET enable_seqscan;
+DROP TABLE bi_target_idx;
+-- ============================================================
+-- T3: AFTER ROW trigger
+-- ============================================================
+CREATE TABLE bi_target_trig (id int, val text);
+CREATE TABLE bi_audit (id int, val text, logged_at timestamp DEFAULT now());
+CREATE FUNCTION bi_audit_fn() RETURNS trigger
+LANGUAGE plpgsql AS $$
+BEGIN
+ INSERT INTO bi_audit (id, val) VALUES (NEW.id, NEW.val);
+ RETURN NEW;
+END;
+$$;
+CREATE TRIGGER bi_target_trig_after
+ AFTER INSERT ON bi_target_trig
+ FOR EACH ROW EXECUTE FUNCTION bi_audit_fn();
+INSERT INTO bi_target_trig
+SELECT g, 'row-' || g FROM generate_series(1, 50) g;
+SELECT count(*) FROM bi_target_trig;
+ count
+-------
+ 50
+(1 row)
+
+SELECT count(*) FROM bi_audit;
+ count
+-------
+ 50
+(1 row)
+
+-- Verify insertion order is preserved
+SELECT bool_and(t.id = a.id) AS order_preserved
+FROM (SELECT id, row_number() OVER (ORDER BY ctid) AS rn FROM bi_target_trig) t
+JOIN (SELECT id, row_number() OVER (ORDER BY ctid) AS rn FROM bi_audit) a
+ON t.rn = a.rn;
+ order_preserved
+-----------------
+ t
+(1 row)
+
+DROP TABLE bi_target_trig CASCADE;
+DROP TABLE bi_audit;
+DROP FUNCTION bi_audit_fn;
+-- ============================================================
+-- T4: Index + AFTER ROW trigger combined
+-- ============================================================
+CREATE TABLE bi_target_combo (id int, val text);
+CREATE INDEX bi_target_combo_id ON bi_target_combo (id);
+CREATE TABLE bi_audit_combo (id int, val text);
+CREATE FUNCTION bi_audit_combo_fn() RETURNS trigger
+LANGUAGE plpgsql AS $$
+BEGIN
+ INSERT INTO bi_audit_combo (id, val) VALUES (NEW.id, NEW.val);
+ RETURN NEW;
+END;
+$$;
+CREATE TRIGGER bi_target_combo_after
+ AFTER INSERT ON bi_target_combo
+ FOR EACH ROW EXECUTE FUNCTION bi_audit_combo_fn();
+INSERT INTO bi_target_combo
+SELECT g, 'row-' || g FROM generate_series(1, 100) g;
+SELECT count(*) FROM bi_target_combo;
+ count
+-------
+ 100
+(1 row)
+
+SELECT count(*) FROM bi_audit_combo;
+ count
+-------
+ 100
+(1 row)
+
+-- Verify index correctness
+SET enable_seqscan = off;
+SELECT count(*) FROM bi_target_combo WHERE id BETWEEN 1 AND 100;
+ count
+-------
+ 100
+(1 row)
+
+RESET enable_seqscan;
+DROP TABLE bi_target_combo CASCADE;
+DROP TABLE bi_audit_combo;
+DROP FUNCTION bi_audit_combo_fn;
+-- ============================================================
+-- T5: ON CONFLICT fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_conflict (id int PRIMARY KEY, val text);
+INSERT INTO bi_target_conflict VALUES (1, 'existing');
+INSERT INTO bi_target_conflict
+SELECT g, 'row-' || g FROM generate_series(1, 10) g
+ON CONFLICT (id) DO NOTHING;
+SELECT count(*) FROM bi_target_conflict;
+ count
+-------
+ 10
+(1 row)
+
+SELECT val FROM bi_target_conflict WHERE id = 1;
+ val
+----------
+ existing
+(1 row)
+
+DROP TABLE bi_target_conflict;
+-- ============================================================
+-- T6: RETURNING fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_ret (id int, val text);
+INSERT INTO bi_target_ret
+SELECT g, 'row-' || g FROM generate_series(1, 3) g
+RETURNING id, val;
+ id | val
+----+-------
+ 1 | row-1
+ 2 | row-2
+ 3 | row-3
+(3 rows)
+
+SELECT count(*) FROM bi_target_ret;
+ count
+-------
+ 3
+(1 row)
+
+DROP TABLE bi_target_ret;
+-- ============================================================
+-- T7: BEFORE ROW trigger fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_br (id int, val text);
+CREATE FUNCTION bi_br_fn() RETURNS trigger
+LANGUAGE plpgsql AS $$
+BEGIN
+ NEW.val := NEW.val || '-modified';
+ RETURN NEW;
+END;
+$$;
+CREATE TRIGGER bi_target_br_before
+ BEFORE INSERT ON bi_target_br
+ FOR EACH ROW EXECUTE FUNCTION bi_br_fn();
+INSERT INTO bi_target_br
+SELECT g, 'row-' || g FROM generate_series(1, 5) g;
+SELECT count(*) FROM bi_target_br;
+ count
+-------
+ 5
+(1 row)
+
+SELECT val FROM bi_target_br WHERE id = 1;
+ val
+----------------
+ row-1-modified
+(1 row)
+
+DROP TABLE bi_target_br;
+DROP FUNCTION bi_br_fn;
+-- ============================================================
+-- T8: Partitioned target fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_part (id int, val text) PARTITION BY RANGE (id);
+CREATE TABLE bi_target_part_1 PARTITION OF bi_target_part FOR VALUES FROM (1) TO (501);
+CREATE TABLE bi_target_part_2 PARTITION OF bi_target_part FOR VALUES FROM (501) TO (1001);
+INSERT INTO bi_target_part
+SELECT g, 'row-' || g FROM generate_series(1, 1000) g;
+SELECT count(*) FROM bi_target_part;
+ count
+-------
+ 1000
+(1 row)
+
+SELECT count(*) FROM bi_target_part_1;
+ count
+-------
+ 500
+(1 row)
+
+SELECT count(*) FROM bi_target_part_2;
+ count
+-------
+ 500
+(1 row)
+
+DROP TABLE bi_target_part;
+-- ============================================================
+-- T9: Volatile target-default fallback
+-- Expected to fall back to non-buffered path under E8.
+-- Test validates correctness; path selection is not observable
+-- from SQL output.
+-- ============================================================
+CREATE TABLE bi_target_volatile (
+ id int,
+ val text,
+ rand_val double precision DEFAULT random()
+);
+INSERT INTO bi_target_volatile (id, val)
+SELECT g, 'row-' || g FROM generate_series(1, 5) g;
+SELECT count(*) FROM bi_target_volatile;
+ count
+-------
+ 5
+(1 row)
+
+-- Verify the volatile default was evaluated (all values should be distinct)
+SELECT count(DISTINCT rand_val) = count(*) AS all_distinct
+FROM bi_target_volatile;
+ all_distinct
+--------------
+ t
+(1 row)
+
+DROP TABLE bi_target_volatile;
+-- ============================================================
+-- T10: Zero-row insert
+-- ============================================================
+CREATE TABLE bi_target_zero (id int, val text);
+INSERT INTO bi_target_zero
+SELECT g, 'row-' || g FROM generate_series(1, 100) g WHERE false;
+SELECT count(*) FROM bi_target_zero;
+ count
+-------
+ 0
+(1 row)
+
+DROP TABLE bi_target_zero;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 5d4f910155e..ae63c3dd73c 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -36,7 +36,7 @@ test: geometry horology tstypes regex type_sanity opr_sanity misc_sanity comment
# execute two copy tests in parallel, to check that copy itself
# is concurrent safe.
# ----------
-test: copy copyselect copydml copyencoding insert insert_conflict
+test: copy copyselect copydml copyencoding insert insert_conflict insert_buffered
# ----------
# More groups of parallel tests
diff --git a/src/test/regress/sql/insert_buffered.sql b/src/test/regress/sql/insert_buffered.sql
new file mode 100644
index 00000000000..2ca6f3525f4
--- /dev/null
+++ b/src/test/regress/sql/insert_buffered.sql
@@ -0,0 +1,209 @@
+--
+-- Tests for buffered-insert adoption in INSERT INTO ... SELECT (Patch 0005).
+-- Restricted first step: non-partitioned heap target, no ON CONFLICT,
+-- no RETURNING, no BEFORE ROW triggers.
+--
+
+-- ============================================================
+-- T1: Basic bulk insert (exercises multiple auto-flush cycles)
+-- ============================================================
+CREATE TABLE bi_target_basic (id int, val text);
+
+INSERT INTO bi_target_basic
+SELECT g, 'row-' || g FROM generate_series(1, 2000) g;
+
+SELECT count(*) FROM bi_target_basic;
+SELECT min(id), max(id) FROM bi_target_basic;
+
+DROP TABLE bi_target_basic;
+
+-- ============================================================
+-- T2: Indexed target
+-- ============================================================
+CREATE TABLE bi_target_idx (id int, val text);
+CREATE INDEX bi_target_idx_id ON bi_target_idx (id);
+
+INSERT INTO bi_target_idx
+SELECT g, 'row-' || g FROM generate_series(1, 500) g;
+
+SELECT count(*) FROM bi_target_idx;
+
+-- Verify index is usable and correct
+SET enable_seqscan = off;
+SELECT count(*) FROM bi_target_idx WHERE id BETWEEN 1 AND 500;
+RESET enable_seqscan;
+
+DROP TABLE bi_target_idx;
+
+-- ============================================================
+-- T3: AFTER ROW trigger
+-- ============================================================
+CREATE TABLE bi_target_trig (id int, val text);
+CREATE TABLE bi_audit (id int, val text, logged_at timestamp DEFAULT now());
+
+CREATE FUNCTION bi_audit_fn() RETURNS trigger
+LANGUAGE plpgsql AS $$
+BEGIN
+ INSERT INTO bi_audit (id, val) VALUES (NEW.id, NEW.val);
+ RETURN NEW;
+END;
+$$;
+
+CREATE TRIGGER bi_target_trig_after
+ AFTER INSERT ON bi_target_trig
+ FOR EACH ROW EXECUTE FUNCTION bi_audit_fn();
+
+INSERT INTO bi_target_trig
+SELECT g, 'row-' || g FROM generate_series(1, 50) g;
+
+SELECT count(*) FROM bi_target_trig;
+SELECT count(*) FROM bi_audit;
+
+-- Verify insertion order is preserved
+SELECT bool_and(t.id = a.id) AS order_preserved
+FROM (SELECT id, row_number() OVER (ORDER BY ctid) AS rn FROM bi_target_trig) t
+JOIN (SELECT id, row_number() OVER (ORDER BY ctid) AS rn FROM bi_audit) a
+ON t.rn = a.rn;
+
+DROP TABLE bi_target_trig CASCADE;
+DROP TABLE bi_audit;
+DROP FUNCTION bi_audit_fn;
+
+-- ============================================================
+-- T4: Index + AFTER ROW trigger combined
+-- ============================================================
+CREATE TABLE bi_target_combo (id int, val text);
+CREATE INDEX bi_target_combo_id ON bi_target_combo (id);
+CREATE TABLE bi_audit_combo (id int, val text);
+
+CREATE FUNCTION bi_audit_combo_fn() RETURNS trigger
+LANGUAGE plpgsql AS $$
+BEGIN
+ INSERT INTO bi_audit_combo (id, val) VALUES (NEW.id, NEW.val);
+ RETURN NEW;
+END;
+$$;
+
+CREATE TRIGGER bi_target_combo_after
+ AFTER INSERT ON bi_target_combo
+ FOR EACH ROW EXECUTE FUNCTION bi_audit_combo_fn();
+
+INSERT INTO bi_target_combo
+SELECT g, 'row-' || g FROM generate_series(1, 100) g;
+
+SELECT count(*) FROM bi_target_combo;
+SELECT count(*) FROM bi_audit_combo;
+
+-- Verify index correctness
+SET enable_seqscan = off;
+SELECT count(*) FROM bi_target_combo WHERE id BETWEEN 1 AND 100;
+RESET enable_seqscan;
+
+DROP TABLE bi_target_combo CASCADE;
+DROP TABLE bi_audit_combo;
+DROP FUNCTION bi_audit_combo_fn;
+
+-- ============================================================
+-- T5: ON CONFLICT fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_conflict (id int PRIMARY KEY, val text);
+
+INSERT INTO bi_target_conflict VALUES (1, 'existing');
+
+INSERT INTO bi_target_conflict
+SELECT g, 'row-' || g FROM generate_series(1, 10) g
+ON CONFLICT (id) DO NOTHING;
+
+SELECT count(*) FROM bi_target_conflict;
+SELECT val FROM bi_target_conflict WHERE id = 1;
+
+DROP TABLE bi_target_conflict;
+
+-- ============================================================
+-- T6: RETURNING fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_ret (id int, val text);
+
+INSERT INTO bi_target_ret
+SELECT g, 'row-' || g FROM generate_series(1, 3) g
+RETURNING id, val;
+
+SELECT count(*) FROM bi_target_ret;
+
+DROP TABLE bi_target_ret;
+
+-- ============================================================
+-- T7: BEFORE ROW trigger fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_br (id int, val text);
+
+CREATE FUNCTION bi_br_fn() RETURNS trigger
+LANGUAGE plpgsql AS $$
+BEGIN
+ NEW.val := NEW.val || '-modified';
+ RETURN NEW;
+END;
+$$;
+
+CREATE TRIGGER bi_target_br_before
+ BEFORE INSERT ON bi_target_br
+ FOR EACH ROW EXECUTE FUNCTION bi_br_fn();
+
+INSERT INTO bi_target_br
+SELECT g, 'row-' || g FROM generate_series(1, 5) g;
+
+SELECT count(*) FROM bi_target_br;
+SELECT val FROM bi_target_br WHERE id = 1;
+
+DROP TABLE bi_target_br;
+DROP FUNCTION bi_br_fn;
+
+-- ============================================================
+-- T8: Partitioned target fallback (uses non-buffered path)
+-- ============================================================
+CREATE TABLE bi_target_part (id int, val text) PARTITION BY RANGE (id);
+CREATE TABLE bi_target_part_1 PARTITION OF bi_target_part FOR VALUES FROM (1) TO (501);
+CREATE TABLE bi_target_part_2 PARTITION OF bi_target_part FOR VALUES FROM (501) TO (1001);
+
+INSERT INTO bi_target_part
+SELECT g, 'row-' || g FROM generate_series(1, 1000) g;
+
+SELECT count(*) FROM bi_target_part;
+SELECT count(*) FROM bi_target_part_1;
+SELECT count(*) FROM bi_target_part_2;
+
+DROP TABLE bi_target_part;
+
+-- ============================================================
+-- T9: Volatile target-default fallback
+-- Expected to fall back to non-buffered path under E8.
+-- Test validates correctness; path selection is not observable
+-- from SQL output.
+-- ============================================================
+CREATE TABLE bi_target_volatile (
+ id int,
+ val text,
+ rand_val double precision DEFAULT random()
+);
+
+INSERT INTO bi_target_volatile (id, val)
+SELECT g, 'row-' || g FROM generate_series(1, 5) g;
+
+SELECT count(*) FROM bi_target_volatile;
+-- Verify the volatile default was evaluated (all values should be distinct)
+SELECT count(DISTINCT rand_val) = count(*) AS all_distinct
+FROM bi_target_volatile;
+
+DROP TABLE bi_target_volatile;
+
+-- ============================================================
+-- T10: Zero-row insert
+-- ============================================================
+CREATE TABLE bi_target_zero (id int, val text);
+
+INSERT INTO bi_target_zero
+SELECT g, 'row-' || g FROM generate_series(1, 100) g WHERE false;
+
+SELECT count(*) FROM bi_target_zero;
+
+DROP TABLE bi_target_zero;
--
2.52.0
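The fallback cases T5 through T9 in the test file above amount to an eligibility predicate on the INSERT ... SELECT target. The following is only a sketch of that predicate in C. The struct InsertTargetInfo and the helper buffered_insert_eligible() are hypothetical names for illustration; the patch's actual check is not shown in this excerpt and may be structured differently.

```c
#include <stdbool.h>

/*
 * Hypothetical summary of an INSERT ... SELECT target.  This is not a
 * PostgreSQL struct; the fields mirror the fallback cases T5..T9.
 */
typedef struct InsertTargetInfo
{
    bool is_partitioned;          /* T8: partitioned target */
    bool has_on_conflict;         /* T5: ON CONFLICT clause */
    bool has_returning;           /* T6: RETURNING clause */
    bool has_before_row_trigger;  /* T7: BEFORE ROW INSERT trigger */
    bool has_volatile_default;    /* T9: volatile column default */
} InsertTargetInfo;

/*
 * Return true only when none of the fallback conditions apply.
 * Indexes and AFTER ROW triggers (T2..T4) do not appear here because
 * the tests treat them as compatible with the buffered path.
 */
bool
buffered_insert_eligible(const InsertTargetInfo *target)
{
    return !target->is_partitioned &&
           !target->has_on_conflict &&
           !target->has_returning &&
           !target->has_before_row_trigger &&
           !target->has_volatile_default;
}
```

Any single condition is enough to select the non-buffered path, which is why each of T5 through T9 exercises exactly one of them.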
[application/octet-stream] v1-0002-createas-use-buffered-insert-API-for-CTAS.patch (11.6K, 5-v1-0002-createas-use-buffered-insert-API-for-CTAS.patch)
From d6b6cb19c326d79693c2e6238cfbe96f5b0fa0e0 Mon Sep 17 00:00:00 2001
From: Haibo Yan <[email protected]>
Date: Thu, 23 Apr 2026 12:20:59 -0700
Subject: [PATCH v1 2/5] createas: use buffered-insert API for CTAS
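Adopt the buffered-insert lifecycle API in the CTAS dest receiver.
CTAS calls table_buffered_insert_begin()/put()/end() with a NULL flush
callback, because the newly created table has no indexes or triggers.
It falls back to the existing table_tuple_insert() path when the AM
does not support buffered inserts, when the planned target list
contains volatile functions, or when CTAS runs via EXECUTE.

The volatile-function check and the EXECUTE-path fallback are retained
conservatively for the initial patch series.

Add focused CTAS regression tests.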
---
src/backend/commands/createas.c | 112 ++++++++++++++++++----
src/test/regress/expected/select_into.out | 98 +++++++++++++++++++
src/test/regress/sql/select_into.sql | 44 +++++++++
3 files changed, 234 insertions(+), 20 deletions(-)
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 6dbb831ca89..4ba25c9e336 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -40,6 +40,7 @@
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/queryjumble.h"
+#include "optimizer/optimizer.h"
#include "parser/analyze.h"
#include "rewrite/rewriteHandler.h"
#include "tcop/tcopprot.h"
@@ -57,7 +58,9 @@ typedef struct
ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */
CommandId output_cid; /* cmin to insert in output tuples */
uint32 ti_options; /* table_tuple_insert performance options */
- BulkInsertState bistate; /* bulk insert state */
+ BulkInsertState bistate; /* bulk insert state (fallback path only) */
+ TableBufferedInsertState buffered_state; /* buffered-insert state, or NULL */
+ bool use_buffered_insert; /* true if buffered path is eligible */
} DR_intorel;
/* utility functions for CTAS definition creation */
@@ -260,7 +263,15 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
Assert(!is_matview); /* excluded by syntax */
ExecuteQuery(pstate, estmt, into, params, dest, qc);
- /* get object address that intorel_startup saved for us */
+ /*
+ * Get the object address that intorel_startup saved for us.
+ *
+ * Note: use_buffered_insert stays false (its palloc0 default) for the
+ * EXECUTE path. We conservatively skip the buffered-insert
+ * optimization here because the prepared statement's plan is not
+ * available for volatile-function inspection at this point. Relaxing
+ * this is outside Patch 0002 scope.
+ */
address = ((DR_intorel *) dest)->reladdr;
return address;
@@ -323,6 +334,24 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
plan = pg_plan_query(query, pstate->p_sourcetext,
CURSOR_OPT_PARALLEL_OK, params, NULL);
+ /*
+ * Conservative implementation choice: disable the buffered-insert
+ * path if the planned target list contains volatile functions.
+ *
+ * The buffered-insert API contract does not require this check -- the
+ * CTAS target table is created within this statement and cannot be
+ * referenced by the source query. This guard is retained for the
+ * initial patch series as a caller-local conservatism and can be
+ * relaxed after validation without any API change.
+ */
+ {
+ DR_intorel *myState = (DR_intorel *) dest;
+
+ myState->use_buffered_insert =
+ !contain_volatile_functions_after_planning(
+ (Expr *) plan->planTree->targetlist);
+ }
+
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only
@@ -564,10 +593,45 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
* If WITH NO DATA is specified, there is no need to set up the state for
* bulk inserts as there are no tuples to insert.
*/
- if (!into->skipData)
- myState->bistate = GetBulkInsertState();
- else
+ if (into->skipData)
+ {
myState->bistate = NULL;
+ myState->buffered_state = NULL;
+ }
+ else if (myState->use_buffered_insert)
+ {
+ /*
+ * Try the buffered-insert path. Pass NULL flush callback -- CTAS
+ * has no indexes, triggers, or per-tuple post-insert work.
+ */
+ myState->ti_options |= TABLE_INSERT_BAS_BULKWRITE;
+ myState->buffered_state =
+ table_buffered_insert_begin(intoRelationDesc,
+ myState->output_cid,
+ myState->ti_options,
+ NULL, NULL);
+
+ if (myState->buffered_state != NULL)
+ {
+ /* Buffered path active; bistate is managed inside the AM */
+ myState->bistate = NULL;
+ }
+ else
+ {
+ /* AM does not support buffered inserts; fall back */
+ myState->bistate = GetBulkInsertState();
+ }
+ }
+ else
+ {
+ /*
+ * Fall back to the single-row path. This is reached when the
+ * volatile-function conservative guard fired, or for CTAS via
+ * EXECUTE (where use_buffered_insert stays false by default).
+ */
+ myState->buffered_state = NULL;
+ myState->bistate = GetBulkInsertState();
+ }
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -587,19 +651,18 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
/* Nothing to insert if WITH NO DATA is specified. */
if (!myState->into->skipData)
{
- /*
- * Note that the input slot might not be of the type of the target
- * relation. That's supported by table_tuple_insert(), but slightly
- * less efficient than inserting with the right slot - but the
- * alternative would be to copy into a slot of the right type, which
- * would not be cheap either. This also doesn't allow accessing per-AM
- * data (say a tuple's xmin), but since we don't do that here...
- */
- table_tuple_insert(myState->rel,
- slot,
- myState->output_cid,
- myState->ti_options,
- myState->bistate);
+ if (myState->buffered_state != NULL)
+ {
+ table_buffered_insert_put(myState->buffered_state, slot);
+ }
+ else
+ {
+ table_tuple_insert(myState->rel,
+ slot,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
+ }
}
/* We know this is a newly created relation, so there are no indexes */
@@ -618,8 +681,17 @@ intorel_shutdown(DestReceiver *self)
if (!into->skipData)
{
- FreeBulkInsertState(myState->bistate);
- table_finish_bulk_insert(myState->rel, myState->ti_options);
+ if (myState->buffered_state != NULL)
+ {
+ /* end() flushes remaining tuples and subsumes finish_bulk_insert */
+ table_buffered_insert_end(myState->buffered_state);
+ myState->buffered_state = NULL;
+ }
+ else
+ {
+ FreeBulkInsertState(myState->bistate);
+ table_finish_bulk_insert(myState->rel, myState->ti_options);
+ }
}
/* close rel, but keep lock until commit */
diff --git a/src/test/regress/expected/select_into.out b/src/test/regress/expected/select_into.out
index d04ca2b1bf7..36cc783fda5 100644
--- a/src/test/regress/expected/select_into.out
+++ b/src/test/regress/expected/select_into.out
@@ -220,3 +220,101 @@ NOTICE: relation "ctas_ine_tbl" already exists, skipping
(0 rows)
DROP TABLE ctas_ine_tbl;
+--
+-- Tests for CTAS with buffered-insert path.
+--
+-- These tests verify correctness of CTAS results under both the buffered
+-- path (no volatile functions) and the fallback single-row path (volatile
+-- functions present, or EXECUTE). Path selection is not directly observable
+-- in SQL output; these tests validate that results are correct regardless.
+--
+-- Buffered path: small row count, verify contents.
+CREATE TABLE ctas_buffered_1 AS SELECT g AS a, g * 10 AS b FROM generate_series(1, 5) g;
+SELECT count(*) FROM ctas_buffered_1;
+ count
+-------
+ 5
+(1 row)
+
+SELECT * FROM ctas_buffered_1 ORDER BY a;
+ a | b
+---+----
+ 1 | 10
+ 2 | 20
+ 3 | 30
+ 4 | 40
+ 5 | 50
+(5 rows)
+
+DROP TABLE ctas_buffered_1;
+-- Buffered path: enough rows to trigger auto-flush (>1000 slot threshold).
+CREATE TABLE ctas_buffered_2 AS SELECT g AS a FROM generate_series(1, 2500) g;
+SELECT count(*) FROM ctas_buffered_2;
+ count
+-------
+ 2500
+(1 row)
+
+SELECT min(a), max(a) FROM ctas_buffered_2;
+ min | max
+-----+------
+ 1 | 2500
+(1 row)
+
+DROP TABLE ctas_buffered_2;
+-- Buffered path: wide tuples to exercise byte-threshold flushing.
+CREATE TABLE ctas_buffered_wide AS
+ SELECT g AS id,
+ repeat('x', 200) AS col1,
+ repeat('y', 200) AS col2,
+ repeat('z', 200) AS col3
+ FROM generate_series(1, 100) g;
+SELECT count(*) FROM ctas_buffered_wide;
+ count
+-------
+ 100
+(1 row)
+
+SELECT id, length(col1), length(col2), length(col3) FROM ctas_buffered_wide WHERE id IN (1, 50, 100) ORDER BY id;
+ id | length | length | length
+-----+--------+--------+--------
+ 1 | 200 | 200 | 200
+ 50 | 200 | 200 | 200
+ 100 | 200 | 200 | 200
+(3 rows)
+
+DROP TABLE ctas_buffered_wide;
+-- Fallback path: random() triggers the conservative volatile-function guard.
+-- Result must still be correct through the single-row insertion path.
+CREATE TABLE ctas_volatile AS SELECT g AS a, random() AS r FROM generate_series(1, 10) g;
+SELECT count(*) FROM ctas_volatile;
+ count
+-------
+ 10
+(1 row)
+
+SELECT a FROM ctas_volatile ORDER BY a;
+ a
+----
+ 1
+ 2
+ 3
+ 4
+ 5
+ 6
+ 7
+ 8
+ 9
+ 10
+(10 rows)
+
+DROP TABLE ctas_volatile;
+-- WITH NO DATA: no insertion path exercised; verify unchanged behavior.
+CREATE TABLE ctas_nodata AS SELECT 1 AS a WITH NO DATA;
+SELECT count(*) FROM ctas_nodata;
+ count
+-------
+ 0
+(1 row)
+
+DROP TABLE ctas_nodata;
diff --git a/src/test/regress/sql/select_into.sql b/src/test/regress/sql/select_into.sql
index f71e3940e0a..8147113cee3 100644
--- a/src/test/regress/sql/select_into.sql
+++ b/src/test/regress/sql/select_into.sql
@@ -136,3 +136,47 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
CREATE TABLE IF NOT EXISTS ctas_ine_tbl AS EXECUTE ctas_ine_query; -- ok
DROP TABLE ctas_ine_tbl;
+
+--
+-- Tests for CTAS with buffered-insert path.
+--
+-- These tests verify correctness of CTAS results under both the buffered
+-- path (no volatile functions) and the fallback single-row path (volatile
+-- functions present, or EXECUTE). Path selection is not directly observable
+-- in SQL output; these tests validate that results are correct regardless.
+--
+
+-- Buffered path: small row count, verify contents.
+CREATE TABLE ctas_buffered_1 AS SELECT g AS a, g * 10 AS b FROM generate_series(1, 5) g;
+SELECT count(*) FROM ctas_buffered_1;
+SELECT * FROM ctas_buffered_1 ORDER BY a;
+DROP TABLE ctas_buffered_1;
+
+-- Buffered path: enough rows to trigger auto-flush (>1000 slot threshold).
+CREATE TABLE ctas_buffered_2 AS SELECT g AS a FROM generate_series(1, 2500) g;
+SELECT count(*) FROM ctas_buffered_2;
+SELECT min(a), max(a) FROM ctas_buffered_2;
+DROP TABLE ctas_buffered_2;
+
+-- Buffered path: wide tuples to exercise byte-threshold flushing.
+CREATE TABLE ctas_buffered_wide AS
+ SELECT g AS id,
+ repeat('x', 200) AS col1,
+ repeat('y', 200) AS col2,
+ repeat('z', 200) AS col3
+ FROM generate_series(1, 100) g;
+SELECT count(*) FROM ctas_buffered_wide;
+SELECT id, length(col1), length(col2), length(col3) FROM ctas_buffered_wide WHERE id IN (1, 50, 100) ORDER BY id;
+DROP TABLE ctas_buffered_wide;
+
+-- Fallback path: random() triggers the conservative volatile-function guard.
+-- Result must still be correct through the single-row insertion path.
+CREATE TABLE ctas_volatile AS SELECT g AS a, random() AS r FROM generate_series(1, 10) g;
+SELECT count(*) FROM ctas_volatile;
+SELECT a FROM ctas_volatile ORDER BY a;
+DROP TABLE ctas_volatile;
+
+-- WITH NO DATA: no insertion path exercised; verify unchanged behavior.
+CREATE TABLE ctas_nodata AS SELECT 1 AS a WITH NO DATA;
+SELECT count(*) FROM ctas_nodata;
+DROP TABLE ctas_nodata;
--
2.52.0
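The dest-receiver changes in the CTAS patch above follow a begin/put/end pattern with a NULL-state fallback. The sketch below models that control flow in standalone C so it can be read without the PostgreSQL tree. Every Toy* type and toy_* function is a hypothetical stand-in, not the patch's API, and the 1000-slot threshold is an assumption taken from the regression-test comment.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Assumed flush threshold; the regression tests mention a 1000-slot limit. */
#define TOY_MAX_BUFFERED_SLOTS 1000

/* Toy stand-in for the AM-owned buffered-insert state (hypothetical). */
typedef struct ToyBufferedState
{
    int  nbuffered;   /* tuples currently held in the buffer */
    long nwritten;    /* tuples already flushed to the "table" */
} ToyBufferedState;

/* Toy stand-in for the dest receiver (compare DR_intorel). */
typedef struct ToyReceiver
{
    ToyBufferedState *buffered_state;  /* NULL selects the single-row path */
    long rows_stored;                  /* rows that reached the "table" */
} ToyReceiver;

/* begin(): returns NULL when the AM has no buffered-insert support. */
static ToyBufferedState *
toy_buffered_begin(bool am_supports_buffering)
{
    if (!am_supports_buffering)
        return NULL;
    /* An allocation failure also yields NULL, i.e. the fallback path. */
    return calloc(1, sizeof(ToyBufferedState));
}

/* Move everything in the buffer to the "table". */
static void
toy_buffered_flush(ToyBufferedState *state)
{
    state->nwritten += state->nbuffered;
    state->nbuffered = 0;
}

/* put(): buffer one tuple and auto-flush at the threshold. */
static void
toy_buffered_put(ToyBufferedState *state)
{
    assert(state != NULL);
    state->nbuffered++;
    if (state->nbuffered >= TOY_MAX_BUFFERED_SLOTS)
        toy_buffered_flush(state);
}

/* end(): flush the remainder, release the state, report the total. */
static long
toy_buffered_end(ToyBufferedState *state)
{
    long total;

    toy_buffered_flush(state);
    total = state->nwritten;
    free(state);
    return total;
}

static void
toy_receiver_startup(ToyReceiver *r, bool eligible, bool am_supports_buffering)
{
    r->rows_stored = 0;
    r->buffered_state =
        eligible ? toy_buffered_begin(am_supports_buffering) : NULL;
}

static void
toy_receiver_receive(ToyReceiver *r)
{
    if (r->buffered_state != NULL)
        toy_buffered_put(r->buffered_state);
    else
        r->rows_stored++;       /* stand-in for a single-row insert */
}

static void
toy_receiver_shutdown(ToyReceiver *r)
{
    if (r->buffered_state != NULL)
    {
        r->rows_stored += toy_buffered_end(r->buffered_state);
        r->buffered_state = NULL;
    }
}

/* Drive one startup/receive/shutdown cycle and return the stored row count. */
long
toy_run_receiver(bool eligible, bool am_supports_buffering, long nrows)
{
    ToyReceiver r;
    long i;

    toy_receiver_startup(&r, eligible, am_supports_buffering);
    for (i = 0; i < nrows; i++)
        toy_receiver_receive(&r);
    toy_receiver_shutdown(&r);
    return r.rows_stored;
}
```

The point of the shape is that the receive and shutdown steps test only whether the buffered state is NULL, so the eligibility decision is made once at startup and every path stores the same rows.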
[application/octet-stream] v1-0003-matview-use-buffered-insert-API-for-CMV-and-RMV.patch (12.0K, 6-v1-0003-matview-use-buffered-insert-API-for-CMV-and-RMV.patch)
From 0b810edcdc5e80ab4ec7d77a8d6687fdedd73b32 Mon Sep 17 00:00:00 2001
From: Haibo Yan <[email protected]>
Date: Thu, 23 Apr 2026 12:43:31 -0700
Subject: [PATCH v1 3/5] matview: use buffered-insert API for CMV and RMV
Adopt the buffered-insert lifecycle API in the transientrel datafill
path used by CREATE MATERIALIZED VIEW and REFRESH MATERIALIZED VIEW.
The path calls table_buffered_insert_begin()/put()/end() with a NULL
flush callback and falls back to the existing table_tuple_insert() path
when the AM does not support buffered inserts or when the planned
target list contains volatile functions.

The volatile-function check is retained conservatively for the initial
patch series. Concurrent refresh is unchanged.

Add focused CMV/RMV regression tests.
---
src/backend/commands/matview.c | 99 +++++++++++++----
src/test/regress/expected/matview.out | 152 ++++++++++++++++++++++++++
src/test/regress/sql/matview.sql | 59 ++++++++++
3 files changed, 291 insertions(+), 19 deletions(-)
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index f7d8007f796..7de4acbdfb1 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -31,6 +31,7 @@
#include "executor/executor.h"
#include "executor/spi.h"
#include "miscadmin.h"
+#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/lmgr.h"
@@ -50,7 +51,9 @@ typedef struct
Relation transientrel; /* relation to write to */
CommandId output_cid; /* cmin to insert in output tuples */
uint32 ti_options; /* table_tuple_insert performance options */
- BulkInsertState bistate; /* bulk insert state */
+ BulkInsertState bistate; /* bulk insert state (fallback path only) */
+ TableBufferedInsertState buffered_state; /* buffered-insert state, or NULL */
+ bool use_buffered_insert; /* true if buffered path is eligible */
} DR_transientrel;
static int matview_maintenance_depth = 0;
@@ -427,6 +430,24 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
/* Plan the query which will generate data for the refresh. */
plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL, NULL);
+ /*
+ * Conservative implementation choice: disable the buffered-insert path
+ * if the planned target list contains volatile functions.
+ *
+ * The buffered-insert API contract does not require this check -- the
+ * matview's defining query was parsed at creation time and cannot
+ * reference the transient target table. This guard is retained for the
+ * initial patch series as a caller-local conservatism and can be relaxed
+ * after validation without any API change.
+ */
+ {
+ DR_transientrel *myState = (DR_transientrel *) dest;
+
+ myState->use_buffered_insert =
+ !contain_volatile_functions_after_planning(
+ (Expr *) plan->planTree->targetlist);
+ }
+
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only matter if
@@ -492,7 +513,40 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
myState->transientrel = transientrel;
myState->output_cid = GetCurrentCommandId(true);
myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
- myState->bistate = GetBulkInsertState();
+
+ if (myState->use_buffered_insert)
+ {
+ /*
+ * Try the buffered-insert path. Pass NULL flush callback -- the
+ * transient table has no indexes, triggers, or per-tuple post-insert
+ * work during the datafill phase.
+ */
+ myState->ti_options |= TABLE_INSERT_BAS_BULKWRITE;
+ myState->buffered_state =
+ table_buffered_insert_begin(transientrel,
+ myState->output_cid,
+ myState->ti_options,
+ NULL, NULL);
+
+ if (myState->buffered_state != NULL)
+ {
+ myState->bistate = NULL;
+ }
+ else
+ {
+ /* AM does not support buffered inserts; fall back */
+ myState->bistate = GetBulkInsertState();
+ }
+ }
+ else
+ {
+ /*
+ * Buffered insertion not selected for this datafill. Currently this
+ * is reached when the conservative volatile-function guard fires.
+ */
+ myState->buffered_state = NULL;
+ myState->bistate = GetBulkInsertState();
+ }
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -509,20 +563,19 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
- /*
- * Note that the input slot might not be of the type of the target
- * relation. That's supported by table_tuple_insert(), but slightly less
- * efficient than inserting with the right slot - but the alternative
- * would be to copy into a slot of the right type, which would not be
- * cheap either. This also doesn't allow accessing per-AM data (say a
- * tuple's xmin), but since we don't do that here...
- */
-
- table_tuple_insert(myState->transientrel,
- slot,
- myState->output_cid,
- myState->ti_options,
- myState->bistate);
+ /* Both paths accept the caller-provided slot directly. */
+ if (myState->buffered_state != NULL)
+ {
+ table_buffered_insert_put(myState->buffered_state, slot);
+ }
+ else
+ {
+ table_tuple_insert(myState->transientrel,
+ slot,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
+ }
/* We know this is a newly created relation, so there are no indexes */
@@ -537,9 +590,17 @@ transientrel_shutdown(DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
- FreeBulkInsertState(myState->bistate);
-
- table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+ if (myState->buffered_state != NULL)
+ {
+ /* end() flushes remaining tuples and subsumes finish_bulk_insert */
+ table_buffered_insert_end(myState->buffered_state);
+ myState->buffered_state = NULL;
+ }
+ else
+ {
+ FreeBulkInsertState(myState->bistate);
+ table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+ }
/* close transientrel, but keep lock until commit */
table_close(myState->transientrel, NoLock);
diff --git a/src/test/regress/expected/matview.out b/src/test/regress/expected/matview.out
index 0355720dfc6..330ea4989f2 100644
--- a/src/test/regress/expected/matview.out
+++ b/src/test/regress/expected/matview.out
@@ -699,3 +699,155 @@ NOTICE: relation "matview_ine_tab" already exists, skipping
(0 rows)
DROP MATERIALIZED VIEW matview_ine_tab;
+--
+-- Tests for CMV/RMV with buffered-insert path.
+--
+-- These tests verify correctness under both the buffered path (no volatile
+-- functions) and the fallback single-row path (volatile functions present).
+-- Path selection is not directly observable in SQL output.
+--
+-- CMV: basic correctness
+CREATE MATERIALIZED VIEW mv_buffered_1 AS
+ SELECT g AS a, g * 10 AS b FROM generate_series(1, 5) g;
+SELECT count(*) FROM mv_buffered_1;
+ count
+-------
+ 5
+(1 row)
+
+SELECT * FROM mv_buffered_1 ORDER BY a;
+ a | b
+---+----
+ 1 | 10
+ 2 | 20
+ 3 | 30
+ 4 | 40
+ 5 | 50
+(5 rows)
+
+-- RMV: refresh repopulates correctly
+REFRESH MATERIALIZED VIEW mv_buffered_1;
+SELECT count(*) FROM mv_buffered_1;
+ count
+-------
+ 5
+(1 row)
+
+SELECT * FROM mv_buffered_1 ORDER BY a;
+ a | b
+---+----
+ 1 | 10
+ 2 | 20
+ 3 | 30
+ 4 | 40
+ 5 | 50
+(5 rows)
+
+DROP MATERIALIZED VIEW mv_buffered_1;
+-- CMV + RMV: bulk case to exercise auto-flush (>1000 rows)
+CREATE MATERIALIZED VIEW mv_buffered_bulk AS
+ SELECT g AS a FROM generate_series(1, 2500) g;
+SELECT count(*) FROM mv_buffered_bulk;
+ count
+-------
+ 2500
+(1 row)
+
+SELECT min(a), max(a) FROM mv_buffered_bulk;
+ min | max
+-----+------
+ 1 | 2500
+(1 row)
+
+REFRESH MATERIALIZED VIEW mv_buffered_bulk;
+SELECT count(*) FROM mv_buffered_bulk;
+ count
+-------
+ 2500
+(1 row)
+
+SELECT min(a), max(a) FROM mv_buffered_bulk;
+ min | max
+-----+------
+ 1 | 2500
+(1 row)
+
+DROP MATERIALIZED VIEW mv_buffered_bulk;
+-- Wide tuples: verify no regression
+CREATE MATERIALIZED VIEW mv_buffered_wide AS
+ SELECT g AS id,
+ repeat('x', 200) AS col1,
+ repeat('y', 200) AS col2,
+ repeat('z', 200) AS col3
+ FROM generate_series(1, 100) g;
+SELECT count(*) FROM mv_buffered_wide;
+ count
+-------
+ 100
+(1 row)
+
+SELECT id, length(col1), length(col2), length(col3)
+ FROM mv_buffered_wide WHERE id IN (1, 50, 100) ORDER BY id;
+ id | length | length | length
+-----+--------+--------+--------
+ 1 | 200 | 200 | 200
+ 50 | 200 | 200 | 200
+ 100 | 200 | 200 | 200
+(3 rows)
+
+DROP MATERIALIZED VIEW mv_buffered_wide;
+-- Volatile-function fallback: random() triggers conservative guard.
+-- Result must still be correct through the single-row path.
+CREATE MATERIALIZED VIEW mv_volatile AS
+ SELECT g AS a, random() AS r FROM generate_series(1, 10) g;
+SELECT count(*) FROM mv_volatile;
+ count
+-------
+ 10
+(1 row)
+
+SELECT a FROM mv_volatile ORDER BY a;
+ a
+----
+ 1
+ 2
+ 3
+ 4
+ 5
+ 6
+ 7
+ 8
+ 9
+ 10
+(10 rows)
+
+REFRESH MATERIALIZED VIEW mv_volatile;
+SELECT count(*) FROM mv_volatile;
+ count
+-------
+ 10
+(1 row)
+
+SELECT a FROM mv_volatile ORDER BY a;
+ a
+----
+ 1
+ 2
+ 3
+ 4
+ 5
+ 6
+ 7
+ 8
+ 9
+ 10
+(10 rows)
+
+DROP MATERIALIZED VIEW mv_volatile;
+-- WITH NO DATA: unchanged behavior
+CREATE MATERIALIZED VIEW mv_nodata AS SELECT 1 AS a WITH NO DATA;
+SELECT count(*) FROM mv_nodata; -- error: not populated
+ERROR: materialized view "mv_nodata" has not been populated
+HINT: Use the REFRESH MATERIALIZED VIEW command.
+REFRESH MATERIALIZED VIEW mv_nodata WITH NO DATA;
+DROP MATERIALIZED VIEW mv_nodata;
diff --git a/src/test/regress/sql/matview.sql b/src/test/regress/sql/matview.sql
index 934426b9ae8..43ceee50e90 100644
--- a/src/test/regress/sql/matview.sql
+++ b/src/test/regress/sql/matview.sql
@@ -318,3 +318,62 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF, BUFFERS OFF)
CREATE MATERIALIZED VIEW IF NOT EXISTS matview_ine_tab AS
SELECT 1 / 0 WITH NO DATA; -- ok
DROP MATERIALIZED VIEW matview_ine_tab;
+
+--
+-- Tests for CMV/RMV with buffered-insert path.
+--
+-- These tests verify correctness under both the buffered path (no volatile
+-- functions) and the fallback single-row path (volatile functions present).
+-- Path selection is not directly observable in SQL output.
+--
+
+-- CMV: basic correctness
+CREATE MATERIALIZED VIEW mv_buffered_1 AS
+ SELECT g AS a, g * 10 AS b FROM generate_series(1, 5) g;
+SELECT count(*) FROM mv_buffered_1;
+SELECT * FROM mv_buffered_1 ORDER BY a;
+
+-- RMV: refresh repopulates correctly
+REFRESH MATERIALIZED VIEW mv_buffered_1;
+SELECT count(*) FROM mv_buffered_1;
+SELECT * FROM mv_buffered_1 ORDER BY a;
+DROP MATERIALIZED VIEW mv_buffered_1;
+
+-- CMV + RMV: bulk case to exercise auto-flush (>1000 rows)
+CREATE MATERIALIZED VIEW mv_buffered_bulk AS
+ SELECT g AS a FROM generate_series(1, 2500) g;
+SELECT count(*) FROM mv_buffered_bulk;
+SELECT min(a), max(a) FROM mv_buffered_bulk;
+REFRESH MATERIALIZED VIEW mv_buffered_bulk;
+SELECT count(*) FROM mv_buffered_bulk;
+SELECT min(a), max(a) FROM mv_buffered_bulk;
+DROP MATERIALIZED VIEW mv_buffered_bulk;
+
+-- Wide tuples: verify no regression
+CREATE MATERIALIZED VIEW mv_buffered_wide AS
+ SELECT g AS id,
+ repeat('x', 200) AS col1,
+ repeat('y', 200) AS col2,
+ repeat('z', 200) AS col3
+ FROM generate_series(1, 100) g;
+SELECT count(*) FROM mv_buffered_wide;
+SELECT id, length(col1), length(col2), length(col3)
+ FROM mv_buffered_wide WHERE id IN (1, 50, 100) ORDER BY id;
+DROP MATERIALIZED VIEW mv_buffered_wide;
+
+-- Volatile-function fallback: random() triggers conservative guard.
+-- Result must still be correct through the single-row path.
+CREATE MATERIALIZED VIEW mv_volatile AS
+ SELECT g AS a, random() AS r FROM generate_series(1, 10) g;
+SELECT count(*) FROM mv_volatile;
+SELECT a FROM mv_volatile ORDER BY a;
+REFRESH MATERIALIZED VIEW mv_volatile;
+SELECT count(*) FROM mv_volatile;
+SELECT a FROM mv_volatile ORDER BY a;
+DROP MATERIALIZED VIEW mv_volatile;
+
+-- WITH NO DATA: unchanged behavior
+CREATE MATERIALIZED VIEW mv_nodata AS SELECT 1 AS a WITH NO DATA;
+SELECT count(*) FROM mv_nodata; -- error: not populated
+REFRESH MATERIALIZED VIEW mv_nodata WITH NO DATA;
+DROP MATERIALIZED VIEW mv_nodata;
--
2.52.0
[application/octet-stream] v1-0004-copy-adopt-buffered-insert-API-for-COPY-FROM.patch (18.4K, 7-v1-0004-copy-adopt-buffered-insert-API-for-COPY-FROM.patch)
From 696e8bc0b5606385444d6f89397aa3e3b6801d43 Mon Sep 17 00:00:00 2001
From: Haibo Yan <[email protected]>
Date: Thu, 23 Apr 2026 13:42:46 -0700
Subject: [PATCH v1 4/5] copy: adopt buffered-insert API for COPY FROM
Adopt the buffered-insert lifecycle API in COPY FROM for non-FDW heap
targets, moving buffering ownership from COPY's local multi-insert
infrastructure into the table AM.
COPY's existing eligibility policy remains unchanged. The non-FDW
buffered path now uses table_buffered_insert_begin()/put()/flush()/end()
with a COPY-local flush callback for per-tuple post-insert work,
including index updates and AFTER ROW INSERT triggers. FDW batching is
unchanged.
Add focused COPY regression coverage for basic correctness, bulk input,
indexed targets, AFTER ROW trigger behavior, and partition routing.
---
src/backend/commands/copyfrom.c | 185 +++++++++++++++++++++++++++--
src/test/regress/expected/copy.out | 166 ++++++++++++++++++++++++++
src/test/regress/sql/copy.sql | 102 ++++++++++++++++
3 files changed, 440 insertions(+), 13 deletions(-)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 64ac3063c61..f733bd2d296 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -76,6 +76,8 @@
*/
#define MAX_PARTITION_BUFFERS 32
+typedef struct CopyBufferedFlushState CopyBufferedFlushState;
+
/* Stores multi-insert data related to a single relation in CopyFrom. */
typedef struct CopyMultiInsertBuffer
{
@@ -83,6 +85,9 @@ typedef struct CopyMultiInsertBuffer
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
BulkInsertState bistate; /* BulkInsertState for this rel if plain
* table; NULL if foreign table */
+ TableBufferedInsertState buffered_state; /* AM-owned buffered-insert
+ * state, or NULL */
+ CopyBufferedFlushState *flush_ctx; /* flush callback context, or NULL */
int nused; /* number of 'slots' containing tuples */
uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
* stream */
@@ -102,8 +107,24 @@ typedef struct CopyMultiInsertInfo
EState *estate; /* Executor state used for COPY */
CommandId mycid; /* Command Id used for COPY */
uint32 ti_options; /* table insert options */
+ int64 *processed; /* pointer to CopyFrom's row counter */
} CopyMultiInsertInfo;
+/*
+ * Context for the buffered-insert flush callback used by COPY. Carries the
+ * state needed to perform per-tuple post-insert work (index updates, AFTER
+ * ROW INSERT triggers, error context, progress tracking).
+ */
+typedef struct CopyBufferedFlushState
+{
+ CopyFromState cstate;
+ EState *estate;
+ ResultRelInfo *resultRelInfo;
+ int64 *processed; /* pointer to CopyFrom's processed counter */
+} CopyBufferedFlushState;
+
+static void CopyBufferedFlushCallback(void *context, TupleTableSlot *slot);
+
/* non-export function prototypes */
static void ClosePipeFromProgram(CopyFromState cstate);
@@ -362,15 +383,57 @@ CopyLimitPrintoutLength(const char *str)
* ResultRelInfo.
*/
static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
{
CopyMultiInsertBuffer *buffer;
buffer = palloc_object(CopyMultiInsertBuffer);
memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
buffer->resultRelInfo = rri;
- buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
buffer->nused = 0;
+ buffer->buffered_state = NULL;
+ buffer->flush_ctx = NULL;
+
+ if (rri->ri_FdwRoutine == NULL)
+ {
+ /*
+ * Non-FDW table: try the AM-owned buffered-insert path. The flush
+ * callback handles index updates and AFTER ROW INSERT triggers.
+ */
+ CopyBufferedFlushState *flush_ctx;
+
+ flush_ctx = palloc_object(CopyBufferedFlushState);
+ flush_ctx->cstate = miinfo->cstate;
+ flush_ctx->estate = miinfo->estate;
+ flush_ctx->resultRelInfo = rri;
+ flush_ctx->processed = miinfo->processed;
+
+ buffer->buffered_state =
+ table_buffered_insert_begin(rri->ri_RelationDesc,
+ miinfo->mycid,
+ miinfo->ti_options |
+ TABLE_INSERT_BAS_BULKWRITE,
+ CopyBufferedFlushCallback,
+ flush_ctx);
+
+ if (buffer->buffered_state != NULL)
+ {
+ /* AM-owned buffering active; no COPY-side bistate needed */
+ buffer->bistate = NULL;
+ buffer->flush_ctx = flush_ctx;
+ }
+ else
+ {
+ /* AM does not support buffered inserts; fall back */
+ pfree(flush_ctx);
+ buffer->bistate = GetBulkInsertState();
+ }
+ }
+ else
+ {
+ /* FDW table: no buffered-insert, no bistate */
+ buffer->bistate = NULL;
+ }
return buffer;
}
@@ -384,7 +447,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
{
CopyMultiInsertBuffer *buffer;
- buffer = CopyMultiInsertBufferInit(rri);
+ buffer = CopyMultiInsertBufferInit(miinfo, rri);
/* Setup back-link so we can easily find this buffer again */
rri->ri_CopyMultiInsertBuffer = buffer;
@@ -401,7 +464,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
static void
CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
CopyFromState cstate, EState *estate, CommandId mycid,
- uint32 ti_options)
+ uint32 ti_options, int64 *processed)
{
miinfo->multiInsertBuffers = NIL;
miinfo->bufferedTuples = 0;
@@ -410,6 +473,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
miinfo->estate = estate;
miinfo->mycid = mycid;
miinfo->ti_options = ti_options;
+ miinfo->processed = processed;
/*
* Only setup the buffer when not dealing with a partitioned table.
@@ -532,6 +596,32 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
/* reset relname_only */
cstate->relname_only = false;
}
+ else if (buffer->buffered_state != NULL)
+ {
+ bool line_buf_valid = cstate->line_buf_valid;
+ uint64 save_cur_lineno = cstate->cur_lineno;
+
+ /*
+ * AM-owned buffered path: tuples were submitted via put() and are
+ * inside the AM already. Flush triggers the batch write and invokes
+ * the flush callback once per written tuple for index updates,
+ * trigger firing, and progress tracking.
+ */
+ cstate->line_buf_valid = false;
+
+ table_buffered_insert_flush(buffer->buffered_state);
+
+ /* Clear slots that were used for staging before put() */
+ for (i = 0; i < nused; i++)
+ {
+ if (slots[i] != NULL)
+ ExecClearTuple(slots[i]);
+ }
+
+ /* reset cur_lineno and line_buf_valid to what they were */
+ cstate->line_buf_valid = line_buf_valid;
+ cstate->cur_lineno = save_cur_lineno;
+ }
else
{
CommandId mycid = miinfo->mycid;
@@ -631,10 +721,20 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
/* Remove back-link to ourself */
resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
- if (resultRelInfo->ri_FdwRoutine == NULL)
+ if (buffer->buffered_state != NULL)
+ {
+ /* end() flushes remaining tuples and subsumes finish_bulk_insert */
+ table_buffered_insert_end(buffer->buffered_state);
+ buffer->buffered_state = NULL;
+ if (buffer->flush_ctx != NULL)
+ pfree(buffer->flush_ctx);
+ }
+ else if (resultRelInfo->ri_FdwRoutine == NULL)
{
Assert(buffer->bistate != NULL);
FreeBulkInsertState(buffer->bistate);
+ table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
+ miinfo->ti_options);
}
else
Assert(buffer->bistate == NULL);
@@ -643,10 +743,6 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
ExecDropSingleTupleTableSlot(buffer->slots[i]);
- if (resultRelInfo->ri_FdwRoutine == NULL)
- table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
- miinfo->ti_options);
-
pfree(buffer);
}
@@ -761,12 +857,26 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
Assert(buffer != NULL);
- Assert(slot == buffer->slots[buffer->nused]);
- /* Store the line number so we can properly report any errors later */
+ /* Store the line number for error context during flush */
buffer->linenos[buffer->nused] = lineno;
- /* Record this slot as being used */
+ if (buffer->buffered_state != NULL)
+ {
+ /*
+ * AM-owned buffered path: submit the tuple directly to the AM.
+ * The AM captures the data internally; the caller retains slot
+ * ownership and may reuse it. The AM may auto-flush during put(),
+ * which fires the flush callback for already-buffered tuples.
+ */
+ table_buffered_insert_put(buffer->buffered_state, slot);
+ }
+ else
+ {
+ /* Legacy path: slot was already placed into buffer->slots by caller */
+ Assert(slot == buffer->slots[buffer->nused]);
+ }
+
buffer->nused++;
/* Update how many tuples are stored and their size */
@@ -774,6 +884,55 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
miinfo->bufferedBytes += tuplen;
}
+/*
+ * Flush callback for the AM-owned buffered-insert path.
+ *
+ * Invoked once per flushed tuple, in insertion order. The slot is an
+ * AM-owned scratch object with TID set; it is valid only for the duration
+ * of this callback.
+ */
+static void
+CopyBufferedFlushCallback(void *context, TupleTableSlot *slot)
+{
+ CopyBufferedFlushState *ctx = (CopyBufferedFlushState *) context;
+ ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+ EState *estate = ctx->estate;
+
+ /*
+ * cstate->cur_lineno is not restored per-tuple here. During auto-flush
+ * (triggered inside put()), it reflects the line being stored, which is
+ * the best available context. During explicit flush, the caller has
+ * already set line_buf_valid = false, so only the relation name appears
+ * in error context — matching the legacy multi-insert path.
+ */
+
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ List *recheckIndexes;
+
+ recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
+ estate, 0, slot,
+ NIL, NULL);
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slot, recheckIndexes,
+ ctx->cstate->transition_capture);
+ list_free(recheckIndexes);
+ }
+ else if (resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ {
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slot, NIL,
+ ctx->cstate->transition_capture);
+ }
+
+ /* Update row counter and progress */
+ (*ctx->processed)++;
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+ *ctx->processed);
+}
+
/*
* Copy FROM file to relation.
*/
@@ -1073,7 +1232,7 @@ CopyFrom(CopyFromState cstate)
insertMethod = CIM_MULTI;
CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
- estate, mycid, ti_options);
+ estate, mycid, ti_options, &processed);
}
/*
diff --git a/src/test/regress/expected/copy.out b/src/test/regress/expected/copy.out
index 1714faab39c..ec8e781c5ce 100644
--- a/src/test/regress/expected/copy.out
+++ b/src/test/regress/expected/copy.out
@@ -594,3 +594,169 @@ id val
5 15
6 16
DROP TABLE PP;
+--
+-- Tests for COPY FROM with buffered-insert path.
+--
+-- These validate correctness of COPY FROM under the AM-owned buffered path.
+-- Path selection is not directly observable in SQL output.
+--
+-- Basic COPY: small row count
+CREATE TABLE copy_buffered_basic (a int, b text);
+COPY copy_buffered_basic FROM stdin;
+SELECT count(*) FROM copy_buffered_basic;
+ count
+-------
+ 3
+(1 row)
+
+SELECT * FROM copy_buffered_basic ORDER BY a;
+ a | b
+---+-------
+ 1 | hello
+ 2 | world
+ 3 | foo
+(3 rows)
+
+DROP TABLE copy_buffered_basic;
+-- Bulk COPY: exercise buffered path with enough rows for auto-flush.
+-- Generate data in a source table, COPY TO a file, then COPY FROM that file.
+CREATE TABLE copy_bulk_src (a int, b text);
+INSERT INTO copy_bulk_src SELECT g, 'row-' || g FROM generate_series(1, 2500) g;
+CREATE TABLE copy_bulk_dst (a int, b text);
+COPY copy_bulk_src TO '/tmp/copy_buffered_bulk_test.csv' CSV;
+COPY copy_bulk_dst FROM '/tmp/copy_buffered_bulk_test.csv' CSV;
+SELECT count(*) FROM copy_bulk_dst;
+ count
+-------
+ 2500
+(1 row)
+
+SELECT min(a), max(a) FROM copy_bulk_dst;
+ min | max
+-----+------
+ 1 | 2500
+(1 row)
+
+-- Verify data integrity
+SELECT count(*) FROM copy_bulk_dst d JOIN copy_bulk_src s ON d.a = s.a AND d.b = s.b;
+ count
+-------
+ 2500
+(1 row)
+
+DROP TABLE copy_bulk_src;
+DROP TABLE copy_bulk_dst;
+-- COPY into indexed table: verify index maintenance via flush callback
+CREATE TABLE copy_indexed (a int PRIMARY KEY, b text);
+COPY copy_indexed FROM stdin;
+SELECT count(*) FROM copy_indexed;
+ count
+-------
+ 5
+(1 row)
+
+SELECT * FROM copy_indexed ORDER BY a;
+ a | b
+---+---------
+ 1 | alpha
+ 2 | beta
+ 3 | gamma
+ 4 | delta
+ 5 | epsilon
+(5 rows)
+
+-- Verify index is usable
+SET enable_seqscan = off;
+SELECT a FROM copy_indexed WHERE a = 3;
+ a
+---
+ 3
+(1 row)
+
+RESET enable_seqscan;
+DROP TABLE copy_indexed;
+-- COPY with AFTER ROW INSERT trigger: verify trigger fires via flush callback
+CREATE TABLE copy_trigger_tgt (a int, b text);
+CREATE TABLE copy_trigger_log (a int, b text);
+CREATE FUNCTION copy_trigger_fn() RETURNS trigger AS $$
+BEGIN
+ INSERT INTO copy_trigger_log(a, b) VALUES (NEW.a, NEW.b);
+ RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+CREATE TRIGGER copy_after_ins
+ AFTER INSERT ON copy_trigger_tgt
+ FOR EACH ROW EXECUTE FUNCTION copy_trigger_fn();
+COPY copy_trigger_tgt FROM stdin;
+SELECT count(*) FROM copy_trigger_tgt;
+ count
+-------
+ 3
+(1 row)
+
+SELECT a, b FROM copy_trigger_tgt ORDER BY a;
+ a | b
+----+--------
+ 10 | first
+ 20 | second
+ 30 | third
+(3 rows)
+
+-- Verify trigger fired for each row
+SELECT count(*) FROM copy_trigger_log;
+ count
+-------
+ 3
+(1 row)
+
+SELECT a, b FROM copy_trigger_log ORDER BY a;
+ a | b
+----+--------
+ 10 | first
+ 20 | second
+ 30 | third
+(3 rows)
+
+DROP TABLE copy_trigger_tgt CASCADE;
+DROP TABLE copy_trigger_log;
+DROP FUNCTION copy_trigger_fn;
+-- Partitioned COPY: verify correctness under partition routing with buffered path
+CREATE TABLE copy_part_routed (
+ id int,
+ val text
+) PARTITION BY RANGE (id);
+CREATE TABLE copy_part_routed_p1 PARTITION OF copy_part_routed
+ FOR VALUES FROM (1) TO (100);
+CREATE TABLE copy_part_routed_p2 PARTITION OF copy_part_routed
+ FOR VALUES FROM (100) TO (200);
+COPY copy_part_routed FROM stdin;
+-- Total row count
+SELECT count(*) FROM copy_part_routed;
+ count
+-------
+ 6
+(1 row)
+
+-- Per-partition row counts
+SELECT tableoid::regclass, count(*)
+FROM copy_part_routed
+GROUP BY tableoid ORDER BY tableoid::regclass::name;
+ tableoid | count
+---------------------+-------
+ copy_part_routed_p1 | 3
+ copy_part_routed_p2 | 3
+(2 rows)
+
+-- Content correctness
+SELECT * FROM copy_part_routed ORDER BY id;
+ id | val
+-----+---------
+ 1 | alpha
+ 50 | bravo
+ 99 | charlie
+ 100 | delta
+ 150 | echo
+ 199 | foxtrot
+(6 rows)
+
+DROP TABLE copy_part_routed;
diff --git a/src/test/regress/sql/copy.sql b/src/test/regress/sql/copy.sql
index eaad290b257..e4a10e8cf95 100644
--- a/src/test/regress/sql/copy.sql
+++ b/src/test/regress/sql/copy.sql
@@ -535,3 +535,105 @@ CREATE TABLE pp_510 PARTITION OF pp_2 FOR VALUES FROM (5) TO (10);
INSERT INTO pp SELECT g, 10 + g FROM generate_series(1,6) g;
COPY pp TO stdout(header);
DROP TABLE PP;
+
+--
+-- Tests for COPY FROM with buffered-insert path.
+--
+-- These validate correctness of COPY FROM under the AM-owned buffered path.
+-- Path selection is not directly observable in SQL output.
+--
+
+-- Basic COPY: small row count
+CREATE TABLE copy_buffered_basic (a int, b text);
+COPY copy_buffered_basic FROM stdin;
+1 hello
+2 world
+3 foo
+\.
+SELECT count(*) FROM copy_buffered_basic;
+SELECT * FROM copy_buffered_basic ORDER BY a;
+DROP TABLE copy_buffered_basic;
+
+-- Bulk COPY: exercise buffered path with enough rows for auto-flush.
+-- Generate data in a source table, COPY TO a file, then COPY FROM that file.
+CREATE TABLE copy_bulk_src (a int, b text);
+INSERT INTO copy_bulk_src SELECT g, 'row-' || g FROM generate_series(1, 2500) g;
+CREATE TABLE copy_bulk_dst (a int, b text);
+COPY copy_bulk_src TO '/tmp/copy_buffered_bulk_test.csv' CSV;
+COPY copy_bulk_dst FROM '/tmp/copy_buffered_bulk_test.csv' CSV;
+SELECT count(*) FROM copy_bulk_dst;
+SELECT min(a), max(a) FROM copy_bulk_dst;
+-- Verify data integrity
+SELECT count(*) FROM copy_bulk_dst d JOIN copy_bulk_src s ON d.a = s.a AND d.b = s.b;
+DROP TABLE copy_bulk_src;
+DROP TABLE copy_bulk_dst;
+
+-- COPY into indexed table: verify index maintenance via flush callback
+CREATE TABLE copy_indexed (a int PRIMARY KEY, b text);
+COPY copy_indexed FROM stdin;
+1 alpha
+2 beta
+3 gamma
+4 delta
+5 epsilon
+\.
+SELECT count(*) FROM copy_indexed;
+SELECT * FROM copy_indexed ORDER BY a;
+-- Verify index is usable
+SET enable_seqscan = off;
+SELECT a FROM copy_indexed WHERE a = 3;
+RESET enable_seqscan;
+DROP TABLE copy_indexed;
+
+-- COPY with AFTER ROW INSERT trigger: verify trigger fires via flush callback
+CREATE TABLE copy_trigger_tgt (a int, b text);
+CREATE TABLE copy_trigger_log (a int, b text);
+CREATE FUNCTION copy_trigger_fn() RETURNS trigger AS $$
+BEGIN
+ INSERT INTO copy_trigger_log(a, b) VALUES (NEW.a, NEW.b);
+ RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+CREATE TRIGGER copy_after_ins
+ AFTER INSERT ON copy_trigger_tgt
+ FOR EACH ROW EXECUTE FUNCTION copy_trigger_fn();
+COPY copy_trigger_tgt FROM stdin;
+10 first
+20 second
+30 third
+\.
+SELECT count(*) FROM copy_trigger_tgt;
+SELECT a, b FROM copy_trigger_tgt ORDER BY a;
+-- Verify trigger fired for each row
+SELECT count(*) FROM copy_trigger_log;
+SELECT a, b FROM copy_trigger_log ORDER BY a;
+DROP TABLE copy_trigger_tgt CASCADE;
+DROP TABLE copy_trigger_log;
+DROP FUNCTION copy_trigger_fn;
+
+-- Partitioned COPY: verify correctness under partition routing with buffered path
+CREATE TABLE copy_part_routed (
+ id int,
+ val text
+) PARTITION BY RANGE (id);
+CREATE TABLE copy_part_routed_p1 PARTITION OF copy_part_routed
+ FOR VALUES FROM (1) TO (100);
+CREATE TABLE copy_part_routed_p2 PARTITION OF copy_part_routed
+ FOR VALUES FROM (100) TO (200);
+COPY copy_part_routed FROM stdin;
+1 alpha
+50 bravo
+99 charlie
+100 delta
+150 echo
+199 foxtrot
+\.
+-- Total row count
+SELECT count(*) FROM copy_part_routed;
+-- Per-partition row counts
+SELECT tableoid::regclass, count(*)
+FROM copy_part_routed
+GROUP BY tableoid ORDER BY tableoid::regclass::name;
+-- Content correctness
+SELECT * FROM copy_part_routed ORDER BY id;
+DROP TABLE copy_part_routed;
--
2.52.0
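
For readers following the lifecycle described in the 0004 commit message, the begin()/put()/flush()/end() flow with a per-tuple flush callback can be modeled in a few lines of standalone C. This is only a toy sketch under stated assumptions: every name in it (ToyBufferedState, toy_buffered_*, TOY_BATCH_SIZE, ToyCopyContext) is hypothetical, rows are plain ints, and the batch size of 4 is arbitrary. It is not the patch's implementation and not the table AM API.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical toy model; none of these names are PostgreSQL APIs. */
#define TOY_BATCH_SIZE 4			/* assumed auto-flush threshold */

typedef void (*toy_flush_cb) (void *context, int row);

typedef struct ToyBufferedState
{
	int			rows[TOY_BATCH_SIZE];	/* rows copied in by put() */
	int			nused;			/* rows currently buffered */
	toy_flush_cb callback;		/* per-row post-insert hook */
	void	   *context;		/* opaque caller state */
} ToyBufferedState;

/* begin(): returns NULL on failure so the caller can fall back. */
static ToyBufferedState *
toy_buffered_begin(toy_flush_cb callback, void *context)
{
	ToyBufferedState *state = calloc(1, sizeof(ToyBufferedState));

	if (state == NULL)
		return NULL;
	state->callback = callback;
	state->context = context;
	return state;
}

/* flush(): "write" the batch, then report each row in insertion order. */
static void
toy_buffered_flush(ToyBufferedState *state)
{
	for (int i = 0; i < state->nused; i++)
		state->callback(state->context, state->rows[i]);
	state->nused = 0;
}

/* put(): copy the row in; auto-flush once the buffer is full. */
static void
toy_buffered_put(ToyBufferedState *state, int row)
{
	assert(state->nused < TOY_BATCH_SIZE);
	state->rows[state->nused++] = row;
	if (state->nused == TOY_BATCH_SIZE)
		toy_buffered_flush(state);
}

/* end(): flush whatever is left, then release the state. */
static void
toy_buffered_end(ToyBufferedState *state)
{
	toy_buffered_flush(state);
	free(state);
}

/* Caller-side context, loosely analogous to a flush-callback context. */
typedef struct ToyCopyContext
{
	long		processed;		/* rows seen by the callback */
	int			last_row;		/* most recent row value seen */
	int			in_order;		/* stays 1 while rows arrive ascending */
} ToyCopyContext;

static void
toy_copy_callback(void *context, int row)
{
	ToyCopyContext *ctx = context;

	if (row <= ctx->last_row)
		ctx->in_order = 0;
	ctx->last_row = row;
	ctx->processed++;
}

/* Feed rows 1..nrows through the lifecycle and return the final context. */
static ToyCopyContext
toy_copy_rows(int nrows)
{
	ToyCopyContext ctx = {0, 0, 1};
	ToyBufferedState *state = toy_buffered_begin(toy_copy_callback, &ctx);

	if (state == NULL)
		return ctx;				/* a real caller would use a fallback path */
	for (int row = 1; row <= nrows; row++)
		toy_buffered_put(state, row);
	toy_buffered_end(state);
	return ctx;
}
```

In this model, toy_copy_rows(10) runs the callback during two auto-flushes inside put() and once more from end(), so the counter is only complete after end() returns. Counting rows inside the callback rather than at put() time keeps the total in step with what has actually been flushed, which is the same place the patch updates CopyFrom's processed counter.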