From ee069b0d3c9b15ad8d4b2ab0bbd73f1392ed02c3 Mon Sep 17 00:00:00 2001 From: Haibo Yan Date: Thu, 23 Apr 2026 11:14:36 -0700 Subject: [PATCH v1 1/5] tableam/heapam: add buffered-insert lifecycle API for heap Add an optional Table AM buffered-insert lifecycle API with begin/put/flush/end callbacks and tableam wrappers. Implement the API for heap only, including internal buffering, heap_multi_insert()-based flushes, optional per-tuple flush callbacks, and explicit end-of-session cleanup. Add a small Table AM-level test module for lifecycle validation. No caller adoption is included in this patch. --- src/backend/access/heap/heapam.c | 321 +++++++++++++++++- src/backend/access/heap/heapam_handler.c | 7 +- src/backend/access/table/tableamapi.c | 12 + src/include/access/heapam.h | 12 + src/include/access/tableam.h | 159 +++++++++ src/test/modules/Makefile | 1 + src/test/modules/meson.build | 1 + .../modules/test_buffered_insert/Makefile | 23 ++ .../expected/test_buffered_insert.out | 108 ++++++ .../modules/test_buffered_insert/meson.build | 33 ++ .../sql/test_buffered_insert.sql | 41 +++ .../test_buffered_insert--1.0.sql | 30 ++ .../test_buffered_insert.c | 233 +++++++++++++ .../test_buffered_insert.control | 4 + src/tools/pgindent/typedefs.list | 4 + 15 files changed, 970 insertions(+), 19 deletions(-) create mode 100644 src/test/modules/test_buffered_insert/Makefile create mode 100644 src/test/modules/test_buffered_insert/expected/test_buffered_insert.out create mode 100644 src/test/modules/test_buffered_insert/meson.build create mode 100644 src/test/modules/test_buffered_insert/sql/test_buffered_insert.sql create mode 100644 src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql create mode 100644 src/test/modules/test_buffered_insert/test_buffered_insert.c create mode 100644 src/test/modules/test_buffered_insert/test_buffered_insert.control diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index abfd8e8970a..ae994dd202d 
100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -53,6 +53,7 @@ #include "utils/datum.h" #include "utils/injection_point.h" #include "utils/inval.h" +#include "utils/memutils.h" #include "utils/spccache.h" #include "utils/syscache.h" @@ -1982,6 +1983,263 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate) } +/* ------------------------------------------------------------------------ + * Heap buffered-insert lifecycle implementation. + * ------------------------------------------------------------------------ + */ + +/* + * Private state for the heap buffered-insert session. + * + * Memory-context hierarchy (all destroyed by MemoryContextDelete(state_ctx)): + * + * state_ctx ("HeapBufferedInsert") -- long-lived, owns stable state + * ├── batch_ctx ("HeapBufInsBatch") -- per-batch HeapTuples + * └── mi_ctx ("HeapBufInsFlush") -- per-flush transient allocations + * └── heap_multi_insert scratch + toasted tuple copies + * + * state_ctx owns: this struct, the HeapTuple pointer array, bistate, + * scratch_slot (callback-only). + * batch_ctx owns: HeapTuples produced by ExecCopySlotHeapTuple() during + * put(). It is reset (not deleted) after each flush, which bulk-frees all + * per-batch allocations. + * mi_ctx owns: heap_multi_insert's transient allocations (including any + * toasted-tuple copies produced by heap_prepare_insert). It is reset after + * each flush once the flush callback (if any) has finished reading the + * post-prepare tuples. 
+ */ +typedef struct HeapBufferedInsertState +{ + TableBufferedInsertStateData base; + CommandId cid; + int options; + BulkInsertState bistate; + TableBufferedInsertFlushCb flush_cb; + void *flush_ctx; + HeapTuple *tuples; /* buffered HeapTuples (in batch_ctx) */ + int ntuples; + int max_tuples; + Size buffered_bytes; /* sum of tuples[i]->t_len */ + TupleTableSlot *scratch_slot; /* callback-only; HeapTuple-backed */ + MemoryContext batch_ctx; /* per-batch HeapTuples; reset after flush */ + MemoryContext mi_ctx; /* reset after each heap_multi_insert */ + MemoryContext state_ctx; /* long-lived; owns stable state */ +} HeapBufferedInsertState; + +#define HEAP_BUFFERED_INSERT_MAX_TUPLES 1000 +#define HEAP_BUFFERED_INSERT_MAX_BYTES 65535 + +static void heap_buffered_insert_do_flush(HeapBufferedInsertState *hstate); +static void heap_buffered_insert_finish_bulkinsert(HeapBufferedInsertState *hstate); + +/* heap_multi_insert core, takes HeapTuples directly (defined near heap_multi_insert) */ +static void heap_multi_insert_raw(Relation relation, HeapTuple *heaptuples, + int ntuples, CommandId cid, uint32 options, + BulkInsertState bistate); + +TableBufferedInsertState +heap_buffered_insert_begin(Relation rel, CommandId cid, int options, + TableBufferedInsertFlushCb flush_cb, + void *flush_ctx) +{ + HeapBufferedInsertState *hstate; + MemoryContext state_ctx; + MemoryContext old_ctx; + + state_ctx = AllocSetContextCreate(CurrentMemoryContext, + "HeapBufferedInsert", + ALLOCSET_DEFAULT_SIZES); + old_ctx = MemoryContextSwitchTo(state_ctx); + + hstate = palloc0(sizeof(HeapBufferedInsertState)); + hstate->base.rel = rel; + hstate->cid = cid; + hstate->options = options; + hstate->flush_cb = flush_cb; + hstate->flush_ctx = flush_ctx; + hstate->state_ctx = state_ctx; + + hstate->max_tuples = HEAP_BUFFERED_INSERT_MAX_TUPLES; + hstate->tuples = palloc_array(HeapTuple, hstate->max_tuples); + hstate->ntuples = 0; + hstate->buffered_bytes = 0; + + if (options & 
TABLE_INSERT_BAS_BULKWRITE) + hstate->bistate = GetBulkInsertState(); + else + hstate->bistate = NULL; + + hstate->batch_ctx = AllocSetContextCreate(state_ctx, + "HeapBufInsBatch", + ALLOCSET_DEFAULT_SIZES); + + hstate->mi_ctx = AllocSetContextCreate(state_ctx, + "HeapBufInsFlush", + ALLOCSET_DEFAULT_SIZES); + + /* + * Allocate the scratch slot only when a flush callback is present. + * No-callback callers (CTAS, CMV, RMV) pay zero slot overhead. + */ + if (flush_cb != NULL) + hstate->scratch_slot = MakeSingleTupleTableSlot(RelationGetDescr(rel), + &TTSOpsHeapTuple); + else + hstate->scratch_slot = NULL; + + MemoryContextSwitchTo(old_ctx); + + return &hstate->base; +} + +void +heap_buffered_insert_put(TableBufferedInsertState state, TupleTableSlot *slot) +{ + HeapBufferedInsertState *hstate = (HeapBufferedInsertState *) state; + MemoryContext old_ctx; + HeapTuple htup; + + /* Auto-flush if tuple count is at capacity. */ + if (hstate->ntuples >= hstate->max_tuples) + heap_buffered_insert_do_flush(hstate); + + /* + * Produce a self-contained HeapTuple in batch_ctx. ExecCopySlotHeapTuple + * invokes the slot-type's copy_heap_tuple method, which for every + * built-in slot type allocates a new HeapTuple in CurrentMemoryContext. + * This is the *only* per-row materialization -- the flush path consumes + * the HeapTuple directly via heap_multi_insert_raw() with no further + * slot conversion or copy. + */ + old_ctx = MemoryContextSwitchTo(hstate->batch_ctx); + htup = ExecCopySlotHeapTuple(slot); + MemoryContextSwitchTo(old_ctx); + + hstate->tuples[hstate->ntuples++] = htup; + hstate->buffered_bytes += htup->t_len; + + /* Byte-threshold flush: exact tracking from the HeapTuple t_len. 
*/ + if (hstate->buffered_bytes > HEAP_BUFFERED_INSERT_MAX_BYTES) + heap_buffered_insert_do_flush(hstate); +} + +void +heap_buffered_insert_flush(TableBufferedInsertState state) +{ + HeapBufferedInsertState *hstate = (HeapBufferedInsertState *) state; + + if (hstate->ntuples > 0) + heap_buffered_insert_do_flush(hstate); +} + +void +heap_buffered_insert_end(TableBufferedInsertState state) +{ + HeapBufferedInsertState *hstate = (HeapBufferedInsertState *) state; + + /* Flush any remaining tuples */ + if (hstate->ntuples > 0) + heap_buffered_insert_do_flush(hstate); + + /* Clean up the scratch slot used for flush callback */ + if (hstate->scratch_slot != NULL) + ExecDropSingleTupleTableSlot(hstate->scratch_slot); + + /* Perform finish-bulk-insert cleanup (subsumes table_finish_bulk_insert) */ + heap_buffered_insert_finish_bulkinsert(hstate); + + /* + * Release all memory owned by the state, including batch_ctx and mi_ctx + * (both are children of state_ctx). + */ + MemoryContextDelete(hstate->state_ctx); +} + +/* + * Internal: flush all buffered HeapTuples via heap_multi_insert_raw, then + * invoke the flush callback (if any) once per written tuple. Resets both + * per-batch and per-flush contexts so no payload survives across cycles. + * + * Callback-path isolation: the scratch_slot and tuple-to-slot conversion + * only execute when flush_cb is non-NULL, so no-callback callers (CTAS, + * CMV, RMV) pay zero callback overhead per tuple. + */ +static void +heap_buffered_insert_do_flush(HeapBufferedInsertState *hstate) +{ + MemoryContext old_ctx; + int ntuples = hstate->ntuples; + + Assert(ntuples > 0); + + /* + * Switch to the per-flush memory context. heap_multi_insert_raw's + * transient allocations (including any toasted-tuple copies from + * heap_prepare_insert) land here and are bulk-freed after the callback. 
+ */ + old_ctx = MemoryContextSwitchTo(hstate->mi_ctx); + + heap_multi_insert_raw(hstate->base.rel, + hstate->tuples, + ntuples, + hstate->cid, + hstate->options, + hstate->bistate); + + MemoryContextSwitchTo(old_ctx); + + /* + * Invoke the flush callback once per flushed tuple, in insertion order. + * After heap_multi_insert_raw, each tuples[i]->t_self holds the stored + * TID, and tuples[i] points to the post-prepare (possibly toasted) tuple + * in mi_ctx (if toasted) or the original in batch_ctx (if not). Both + * contexts are still alive here. + */ + if (hstate->flush_cb != NULL) + { + for (int i = 0; i < ntuples; i++) + { + ExecStoreHeapTuple(hstate->tuples[i], hstate->scratch_slot, false); + hstate->scratch_slot->tts_tid = hstate->tuples[i]->t_self; + hstate->scratch_slot->tts_tableOid = hstate->tuples[i]->t_tableOid; + hstate->flush_cb(hstate->flush_ctx, hstate->scratch_slot); + ExecClearTuple(hstate->scratch_slot); + } + } + + /* Reset both contexts now that the callback is done. */ + MemoryContextReset(hstate->mi_ctx); + MemoryContextReset(hstate->batch_ctx); + + hstate->ntuples = 0; + hstate->buffered_bytes = 0; +} + +/* + * Perform heap-specific finish-bulk-insert cleanup. + * + * This is the buffered-insert equivalent of what callers of the non-buffered + * path achieve by calling FreeBulkInsertState() + table_finish_bulk_insert() + * at teardown. end() must subsume both. + * + * For the current in-tree heap AM, the cleanup consists of: + * + * 1. Release the BulkInsertState (buffer pin + bulk-write access strategy). + * + * 2. Any action that heap's finish_bulk_insert AM callback would perform. + * Heap does not currently register that callback (the slot in + * heapam_methods is NULL), so no additional action is required. 
+ */ +static void +heap_buffered_insert_finish_bulkinsert(HeapBufferedInsertState *hstate) +{ + if (hstate->bistate != NULL) + FreeBulkInsertState(hstate->bistate); + + /* Heap does not currently register a finish_bulk_insert AM callback. */ +} + + /* * heap_insert - insert tuple into a heap * @@ -2268,22 +2526,24 @@ heap_multi_insert_pages(HeapTuple *heaptuples, int done, int ntuples, Size saveF } /* - * heap_multi_insert - insert multiple tuples into a heap + * heap_multi_insert_raw - core multi-insert for a HeapTuple array * - * This is like heap_insert(), but inserts multiple tuples in one operation. - * That's faster than calling heap_insert() in a loop, because when multiple - * tuples can be inserted on a single page, we can write just a single WAL - * record covering all of them, and only need to lock/unlock the page once. + * Takes an array of pre-formed HeapTuples, runs heap_prepare_insert on each + * (toast + header setup), and inserts them into heap pages. The heaptuples + * array is updated in-place: after return, each entry points to the prepared + * (possibly toasted) tuple with t_self set to the stored TID. + * + * This is the shared core for heap_multi_insert (slot-based callers) and + * heap_buffered_insert_do_flush (HeapTuple-based callers). * * Note: this leaks memory into the current memory context. You can create a * temporary context before calling this, if that's a problem. 
*/ -void -heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, - CommandId cid, uint32 options, BulkInsertState bistate) +static void +heap_multi_insert_raw(Relation relation, HeapTuple *heaptuples, int ntuples, + CommandId cid, uint32 options, BulkInsertState bistate) { TransactionId xid = GetCurrentTransactionId(); - HeapTuple *heaptuples; int i; int ndone; PGAlignedBlock scratch; @@ -2306,16 +2566,11 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, saveFreeSpace = RelationGetTargetPageFreeSpace(relation, HEAP_DEFAULT_FILLFACTOR); - /* Toast and set header data in all the slots */ - heaptuples = palloc(ntuples * sizeof(HeapTuple)); + /* Toast and set header data in all the tuples */ for (i = 0; i < ntuples; i++) { - HeapTuple tuple; - - tuple = ExecFetchSlotHeapTuple(slots[i], true, NULL); - slots[i]->tts_tableOid = RelationGetRelid(relation); - tuple->t_tableOid = slots[i]->tts_tableOid; - heaptuples[i] = heap_prepare_insert(relation, tuple, xid, cid, + heaptuples[i]->t_tableOid = RelationGetRelid(relation); + heaptuples[i] = heap_prepare_insert(relation, heaptuples[i], xid, cid, options); } @@ -2639,11 +2894,41 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, CacheInvalidateHeapTuple(relation, heaptuples[i], NULL); } + pgstat_count_heap_insert(relation, ntuples); +} + +/* + * heap_multi_insert - insert multiple tuples into a heap + * + * This is like heap_insert(), but inserts multiple tuples in one operation. + * That's faster than calling heap_insert() in a loop, because when multiple + * tuples can be inserted on a single page, we can write just a single WAL + * record covering all of them, and only need to lock/unlock the page once. + * + * Note: this leaks memory into the current memory context. You can create a + * temporary context before calling this, if that's a problem. 
+ */ +void +heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, + CommandId cid, uint32 options, BulkInsertState bistate) +{ + HeapTuple *heaptuples; + int i; + + heaptuples = palloc(ntuples * sizeof(HeapTuple)); + for (i = 0; i < ntuples; i++) + { + heaptuples[i] = ExecFetchSlotHeapTuple(slots[i], true, NULL); + slots[i]->tts_tableOid = RelationGetRelid(relation); + } + + heap_multi_insert_raw(relation, heaptuples, ntuples, cid, options, bistate); + /* copy t_self fields back to the caller's slots */ for (i = 0; i < ntuples; i++) slots[i]->tts_tid = heaptuples[i]->t_self; - pgstat_count_heap_insert(relation, ntuples); + pfree(heaptuples); } /* diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 20d3b46e062..65226b07a5c 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2697,7 +2697,12 @@ static const TableAmRoutine heapam_methods = { .scan_bitmap_next_tuple = heapam_scan_bitmap_next_tuple, .scan_sample_next_block = heapam_scan_sample_next_block, - .scan_sample_next_tuple = heapam_scan_sample_next_tuple + .scan_sample_next_tuple = heapam_scan_sample_next_tuple, + + .buffered_insert_begin = heap_buffered_insert_begin, + .buffered_insert_put = heap_buffered_insert_put, + .buffered_insert_flush = heap_buffered_insert_flush, + .buffered_insert_end = heap_buffered_insert_end, }; diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c index 5450a27faeb..a0ff123fa83 100644 --- a/src/backend/access/table/tableamapi.c +++ b/src/backend/access/table/tableamapi.c @@ -93,6 +93,18 @@ GetTableAmRoutine(Oid amhandler) Assert(routine->scan_sample_next_block != NULL); Assert(routine->scan_sample_next_tuple != NULL); + /* + * Buffered-insert callbacks: either all four are NULL (AM does not + * support buffered inserts), or all four are non-NULL. No partially + * populated groups are allowed. 
+ */ + Assert((routine->buffered_insert_begin == NULL) == + (routine->buffered_insert_put == NULL)); + Assert((routine->buffered_insert_begin == NULL) == + (routine->buffered_insert_flush == NULL)); + Assert((routine->buffered_insert_begin == NULL) == + (routine->buffered_insert_end == NULL)); + return routine; } diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 5176478c295..d8b34e37627 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -379,6 +379,18 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, CommandId cid, uint32 options, BulkInsertState bistate); + +/* heap buffered-insert lifecycle */ +extern TableBufferedInsertState heap_buffered_insert_begin(Relation rel, + CommandId cid, + int options, + TableBufferedInsertFlushCb flush_cb, + void *flush_ctx); +extern void heap_buffered_insert_put(TableBufferedInsertState state, + TupleTableSlot *slot); +extern void heap_buffered_insert_flush(TableBufferedInsertState state); +extern void heap_buffered_insert_end(TableBufferedInsertState state); + extern TM_Result heap_delete(Relation relation, const ItemPointerData *tid, CommandId cid, uint32 options, Snapshot crosscheck, bool wait, TM_FailureData *tmfd); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index f2c36696bca..d5a1875d93c 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -284,6 +284,7 @@ typedef struct TM_IndexDeleteOp #define TABLE_INSERT_SKIP_FSM 0x0002 #define TABLE_INSERT_FROZEN 0x0004 #define TABLE_INSERT_NO_LOGICAL 0x0008 +#define TABLE_INSERT_BAS_BULKWRITE 0x0020 /* "options" flag bits for table_tuple_delete */ #define TABLE_DELETE_CHANGING_PARTITION (1 << 0) @@ -307,6 +308,40 @@ typedef void (*IndexBuildCallback) (Relation index, bool tupleIsAlive, void *state); +/* + * State for a buffered-insert session. 
AMs embed this as the first field of + * their private state struct. The base struct is minimal: only the target + * relation is exposed, which is needed for AM dispatch via the inline wrapper + * functions. + * + * AMs that do not support buffered inserts leave the buffered_insert_begin + * callback NULL; callers detect this via table_buffered_insert_begin() + * returning NULL. + */ +typedef struct TableBufferedInsertStateData +{ + Relation rel; /* target relation -- needed for AM dispatch */ +} TableBufferedInsertStateData; + +typedef TableBufferedInsertStateData *TableBufferedInsertState; + +/* + * Callback invoked once per flushed tuple, in insertion order, after the AM + * writes a batch of buffered tuples to storage. + * + * The slot is an AM-owned scratch object used solely as a handoff vehicle. + * It contains the stored tuple with TID (or AM-equivalent locator) set. + * It is valid only for the duration of this callback invocation; the AM may + * reuse the same scratch slot across successive invocations within a single + * flush. If the caller needs data beyond the callback, it must copy within + * the callback body. + * + * The AM must not assume the callback is cheap, side-effect-free, or + * non-throwing. + */ +typedef void (*TableBufferedInsertFlushCb)(void *context, + TupleTableSlot *slot); + /* * API struct for a table AM. Note this must be allocated in a * server-lifetime manner, typically as a static const struct, which then gets @@ -614,6 +649,64 @@ typedef struct TableAmRoutine void (*finish_bulk_insert) (Relation rel, uint32 options); + /* ------------------------------------------------------------------------ + * Buffered-insert lifecycle callbacks. + * + * Optional optimization for batch inserts. All four callbacks must be + * either NULL together (AM does not support buffered inserts) or non-NULL + * together (validated at AM registration time). No partially populated + * groups are allowed. 
+ * ------------------------------------------------------------------------ + */ + + /* + * Begin a buffered-insert session. Returns an opaque state handle whose + * first bytes are a TableBufferedInsertStateData with rel set. The AM + * stores cid, options, flush_cb, and flush_ctx in its private extension + * of the state struct. + * + * flush_cb may be NULL, in which case the AM skips per-tuple notification + * after flushing. + * + * Optional callback -- NULL means the AM does not support buffered inserts. + */ + TableBufferedInsertState (*buffered_insert_begin)( + Relation rel, + CommandId cid, + int options, + TableBufferedInsertFlushCb flush_cb, + void *flush_ctx); + + /* + * Submit one tuple for buffered insertion. The AM reads the tuple data + * from the slot and captures it internally before returning. The caller + * retains ownership of the slot and may reuse it immediately. + * + * The AM may auto-flush during this call if its internal buffer is full; + * the caller must be prepared for the flush callback to fire during put(). + */ + void (*buffered_insert_put)( + TableBufferedInsertState state, + TupleTableSlot *slot); + + /* + * Force-flush all buffered tuples to storage. Invokes the flush callback + * (if non-NULL) once per flushed tuple, in insertion order, using an + * AM-owned scratch slot. After return, the internal buffer is empty. + */ + void (*buffered_insert_flush)( + TableBufferedInsertState state); + + /* + * Flush remaining buffered tuples, perform finish-bulk-insert cleanup + * (e.g. freeing heap's BulkInsertState), and release all resources owned by + * the state. The state pointer is invalid after this call. Callers must + * not separately call table_finish_bulk_insert() -- end() subsumes it. + */ + void (*buffered_insert_end)( + TableBufferedInsertState state); + + /* ------------------------------------------------------------------------ * DDL related functionality. 
* ------------------------------------------------------------------------ @@ -1666,6 +1759,72 @@ table_finish_bulk_insert(Relation rel, uint32 options) } +/* ---------------------------------------------------------------------------- + * Buffered-insert lifecycle functions. + * ---------------------------------------------------------------------------- + */ + +/* + * Begin a buffered-insert session for the given relation. + * + * Returns NULL if the relation's AM does not support buffered inserts, in + * which case the caller should fall back to the single-row insert path. + * + * flush_cb may be NULL when no per-tuple post-insert work is needed (e.g. + * CTAS/CMV/RMV). When non-NULL, it is invoked once per flushed tuple in + * insertion order, using an AM-owned scratch slot valid only for the duration + * of each callback invocation. + */ +static inline TableBufferedInsertState +table_buffered_insert_begin(Relation rel, CommandId cid, int options, + TableBufferedInsertFlushCb flush_cb, + void *flush_ctx) +{ + if (rel->rd_tableam->buffered_insert_begin == NULL) + return NULL; + return rel->rd_tableam->buffered_insert_begin(rel, cid, options, + flush_cb, flush_ctx); +} + +/* + * Submit one tuple for buffered insertion. The AM reads from the slot and + * captures the data internally; the caller retains slot ownership and may + * reuse it immediately after this call returns. + * + * The AM may auto-flush during put() if its buffer is full; the flush + * callback (if any) may fire during this call. + */ +static inline void +table_buffered_insert_put(TableBufferedInsertState state, + TupleTableSlot *slot) +{ + state->rel->rd_tableam->buffered_insert_put(state, slot); +} + +/* + * Force-flush all buffered tuples to storage. The flush callback (if + * non-NULL) fires once per flushed tuple in insertion order. After return + * the buffer is empty. 
+ */ +static inline void +table_buffered_insert_flush(TableBufferedInsertState state) +{ + state->rel->rd_tableam->buffered_insert_flush(state); +} + +/* + * Flush remaining buffered tuples, perform finish-bulk-insert cleanup + * (e.g. freeing heap's BulkInsertState), and release all resources. The + * state pointer is invalid after this call. Do not separately call + * table_finish_bulk_insert() -- end() subsumes it. + */ +static inline void +table_buffered_insert_end(TableBufferedInsertState state) +{ + state->rel->rd_tableam->buffered_insert_end(state); +} + + /* ------------------------------------------------------------------------ * DDL related functionality. * ------------------------------------------------------------------------ diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index 0a74ab5c86f..fa964043d4f 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -20,6 +20,7 @@ SUBDIRS = \ test_binaryheap \ test_bitmapset \ test_bloomfilter \ + test_buffered_insert \ test_cloexec \ test_checksums \ test_copy_callbacks \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index 4bca42bb370..aae892699b9 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -20,6 +20,7 @@ subdir('test_autovacuum') subdir('test_binaryheap') subdir('test_bitmapset') subdir('test_bloomfilter') +subdir('test_buffered_insert') subdir('test_cloexec') subdir('test_checksums') subdir('test_copy_callbacks') diff --git a/src/test/modules/test_buffered_insert/Makefile b/src/test/modules/test_buffered_insert/Makefile new file mode 100644 index 00000000000..5c4926a4c3a --- /dev/null +++ b/src/test/modules/test_buffered_insert/Makefile @@ -0,0 +1,23 @@ +# src/test/modules/test_buffered_insert/Makefile + +MODULE_big = test_buffered_insert +OBJS = \ + $(WIN32RES) \ + test_buffered_insert.o +PGFILEDESC = "test_buffered_insert - test Table AM buffered-insert lifecycle" + +EXTENSION = test_buffered_insert +DATA = 
test_buffered_insert--1.0.sql + +REGRESS = test_buffered_insert + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_buffered_insert +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_buffered_insert/expected/test_buffered_insert.out b/src/test/modules/test_buffered_insert/expected/test_buffered_insert.out new file mode 100644 index 00000000000..79163952f6f --- /dev/null +++ b/src/test/modules/test_buffered_insert/expected/test_buffered_insert.out @@ -0,0 +1,108 @@ +CREATE EXTENSION test_buffered_insert; +-- Target table: single integer column. +CREATE TABLE buffered_insert_test (a INT); +-- Basic test: insert 5 rows via buffered-insert with NULL flush callback. +SELECT test_buffered_insert_basic('buffered_insert_test'::regclass, 5); + test_buffered_insert_basic +---------------------------- + 5 +(1 row) + +SELECT count(*) FROM buffered_insert_test; + count +------- + 5 +(1 row) + +SELECT a FROM buffered_insert_test ORDER BY a; + a +--- + 1 + 2 + 3 + 4 + 5 +(5 rows) + +TRUNCATE buffered_insert_test; +-- Test with flush callback: insert 5 rows, verify callback count matches. +SELECT test_buffered_insert_with_callback('buffered_insert_test'::regclass, 5); + test_buffered_insert_with_callback +------------------------------------ + 5 +(1 row) + +SELECT count(*) FROM buffered_insert_test; + count +------- + 5 +(1 row) + +SELECT a FROM buffered_insert_test ORDER BY a; + a +--- + 1 + 2 + 3 + 4 + 5 +(5 rows) + +TRUNCATE buffered_insert_test; +-- Trigger auto-flush: insert more rows than HEAP_BUFFERED_INSERT_MAX_TUPLES +-- (1000) to verify auto-flush during put() works correctly. 
+SELECT test_buffered_insert_basic('buffered_insert_test'::regclass, 1500); + test_buffered_insert_basic +---------------------------- + 1500 +(1 row) + +SELECT count(*) FROM buffered_insert_test; + count +------- + 1500 +(1 row) + +-- Spot-check first and last values. +SELECT min(a), max(a) FROM buffered_insert_test; + min | max +-----+------ + 1 | 1500 +(1 row) + +TRUNCATE buffered_insert_test; +-- Verify flush callback fires for all rows including auto-flushed batches. +SELECT test_buffered_insert_with_callback('buffered_insert_test'::regclass, 1500); + test_buffered_insert_with_callback +------------------------------------ + 1500 +(1 row) + +SELECT count(*) FROM buffered_insert_test; + count +------- + 1500 +(1 row) + +TRUNCATE buffered_insert_test; +-- Test explicit flush() mid-session: flush after first 5 rows, insert 5 more, +-- then end(). Callback count must equal total rows. +SELECT test_buffered_insert_flush_mid('buffered_insert_test'::regclass, 10); + test_buffered_insert_flush_mid +-------------------------------- + 10 +(1 row) + +SELECT count(*) FROM buffered_insert_test; + count +------- + 10 +(1 row) + +SELECT min(a), max(a) FROM buffered_insert_test; + min | max +-----+----- + 1 | 10 +(1 row) + +DROP TABLE buffered_insert_test; diff --git a/src/test/modules/test_buffered_insert/meson.build b/src/test/modules/test_buffered_insert/meson.build new file mode 100644 index 00000000000..d738ccb1a84 --- /dev/null +++ b/src/test/modules/test_buffered_insert/meson.build @@ -0,0 +1,33 @@ +# Copyright (c) 2022-2026, PostgreSQL Global Development Group + +test_buffered_insert_sources = files( + 'test_buffered_insert.c', +) + +if host_system == 'windows' + test_buffered_insert_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'test_buffered_insert', + '--FILEDESC', 'test_buffered_insert - test Table AM buffered-insert lifecycle',]) +endif + +test_buffered_insert = shared_module('test_buffered_insert', + test_buffered_insert_sources, + kwargs: 
pg_test_mod_args, +) +test_install_libs += test_buffered_insert + +test_install_data += files( + 'test_buffered_insert.control', + 'test_buffered_insert--1.0.sql', +) + +tests += { + 'name': 'test_buffered_insert', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'test_buffered_insert', + ], + }, +} diff --git a/src/test/modules/test_buffered_insert/sql/test_buffered_insert.sql b/src/test/modules/test_buffered_insert/sql/test_buffered_insert.sql new file mode 100644 index 00000000000..23ac09cc9bc --- /dev/null +++ b/src/test/modules/test_buffered_insert/sql/test_buffered_insert.sql @@ -0,0 +1,41 @@ +CREATE EXTENSION test_buffered_insert; + +-- Target table: single integer column. +CREATE TABLE buffered_insert_test (a INT); + +-- Basic test: insert 5 rows via buffered-insert with NULL flush callback. +SELECT test_buffered_insert_basic('buffered_insert_test'::regclass, 5); +SELECT count(*) FROM buffered_insert_test; +SELECT a FROM buffered_insert_test ORDER BY a; + +TRUNCATE buffered_insert_test; + +-- Test with flush callback: insert 5 rows, verify callback count matches. +SELECT test_buffered_insert_with_callback('buffered_insert_test'::regclass, 5); +SELECT count(*) FROM buffered_insert_test; +SELECT a FROM buffered_insert_test ORDER BY a; + +TRUNCATE buffered_insert_test; + +-- Trigger auto-flush: insert more rows than HEAP_BUFFERED_INSERT_MAX_TUPLES +-- (1000) to verify auto-flush during put() works correctly. +SELECT test_buffered_insert_basic('buffered_insert_test'::regclass, 1500); +SELECT count(*) FROM buffered_insert_test; +-- Spot-check first and last values. +SELECT min(a), max(a) FROM buffered_insert_test; + +TRUNCATE buffered_insert_test; + +-- Verify flush callback fires for all rows including auto-flushed batches. 
+SELECT test_buffered_insert_with_callback('buffered_insert_test'::regclass, 1500); +SELECT count(*) FROM buffered_insert_test; + +TRUNCATE buffered_insert_test; + +-- Test explicit flush() mid-session: flush after first 5 rows, insert 5 more, +-- then end(). Callback count must equal total rows. +SELECT test_buffered_insert_flush_mid('buffered_insert_test'::regclass, 10); +SELECT count(*) FROM buffered_insert_test; +SELECT min(a), max(a) FROM buffered_insert_test; + +DROP TABLE buffered_insert_test; diff --git a/src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql b/src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql new file mode 100644 index 00000000000..26aa0635240 --- /dev/null +++ b/src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql @@ -0,0 +1,30 @@ +/* src/test/modules/test_buffered_insert/test_buffered_insert--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_buffered_insert" to load this file. \quit + +-- +-- Insert rows through the buffered-insert lifecycle with a NULL flush +-- callback (the CTAS/CMV/RMV pattern). Returns the number of rows put(). +-- +CREATE FUNCTION test_buffered_insert_basic(pg_catalog.regclass, pg_catalog.int4) + RETURNS pg_catalog.int4 + AS 'MODULE_PATHNAME' LANGUAGE C; + +-- +-- Insert rows through the buffered-insert lifecycle with a flush callback +-- that counts invocations. Returns the total number of flush-callback +-- invocations (should equal the number of rows inserted). +-- +CREATE FUNCTION test_buffered_insert_with_callback(pg_catalog.regclass, pg_catalog.int4) + RETURNS pg_catalog.int4 + AS 'MODULE_PATHNAME' LANGUAGE C; + +-- +-- Exercise explicit flush() mid-session: inserts half the rows, calls +-- flush(), inserts the other half, then calls end(). Returns the total +-- number of flush-callback invocations (should equal nrows). 
+--
+CREATE FUNCTION test_buffered_insert_flush_mid(pg_catalog.regclass, pg_catalog.int4)
+	RETURNS pg_catalog.int4
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_buffered_insert/test_buffered_insert.c b/src/test/modules/test_buffered_insert/test_buffered_insert.c
new file mode 100644
index 00000000000..fbda7b1c70b
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/test_buffered_insert.c
@@ -0,0 +1,233 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_buffered_insert.c
+ *		Minimal validation for the Table AM buffered-insert lifecycle API.
+ *
+ * Exercises begin/put/flush/end directly at the Table AM layer on a
+ * real heap relation, without going through any higher-level caller
+ * (CTAS, COPY, etc.).  This keeps the test within Patch 0001 scope.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/test/modules/test_buffered_insert/test_buffered_insert.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/table.h"
+#include "access/tableam.h"
+#include "access/xact.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_type.h"
+#include "executor/tuptable.h"
+#include "fmgr.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+
+PG_MODULE_MAGIC;
+
+/*
+ * test_buffered_insert_basic(regclass, int4)
+ *
+ * Opens the given relation, inserts nrows tuples through the buffered-insert
+ * API with a NULL flush callback, and returns the number of rows put().
+ * The tuples inserted have the form (i) for a single-integer-column table.
+ */
+PG_FUNCTION_INFO_V1(test_buffered_insert_basic);
+Datum
+test_buffered_insert_basic(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	int32		nrows = PG_GETARG_INT32(1);
+	Relation	rel;
+	TableBufferedInsertState state;
+	TupleTableSlot *slot;
+	TupleDesc	tupdesc;
+	int			i;
+
+	rel = table_open(relid, RowExclusiveLock);
+	tupdesc = RelationGetDescr(rel);
+
+	/*
+	 * The loop below fills in exactly one int4 Datum per tuple.  Reject any
+	 * other row shape up front: with zero columns the store to tts_values[0]
+	 * would be out of bounds, with more than one column the remaining
+	 * attributes would be left uninitialized, and with a non-int4 column the
+	 * Datum would be misinterpreted when the tuple is formed.
+	 */
+	if (tupdesc->natts != 1 ||
+		TupleDescAttr(tupdesc, 0)->atttypid != INT4OID)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("relation \"%s\" must have exactly one integer column",
+						RelationGetRelationName(rel))));
+
+	/* NULL callback and context: nothing is reported back per tuple. */
+	state = table_buffered_insert_begin(rel,
+										GetCurrentCommandId(true),
+										TABLE_INSERT_BAS_BULKWRITE,
+										NULL, NULL);
+
+	if (state == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("AM does not support buffered inserts")));
+
+	slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
+
+	/* Insert the values 1..nrows, reusing one virtual slot for every row. */
+	for (i = 0; i < nrows; i++)
+	{
+		ExecClearTuple(slot);
+		slot->tts_values[0] = Int32GetDatum(i + 1);
+		slot->tts_isnull[0] = false;
+		ExecStoreVirtualTuple(slot);
+
+		table_buffered_insert_put(state, slot);
+	}
+
+	table_buffered_insert_end(state);
+
+	ExecDropSingleTupleTableSlot(slot);
+	table_close(rel, RowExclusiveLock);
+
+	/*
+	 * Report the number of put() calls actually made.  This equals nrows for
+	 * any non-negative request and is 0 when nrows is negative.
+	 */
+	PG_RETURN_INT32(i);
+}
+
+/*
+ * Flush callback context: counts flush-callback invocations.
+ */
+typedef struct FlushCbContext
+{
+	int			count;
+} FlushCbContext;
+
+/*
+ * Flush callback used by the tests below.  Raises an error if the slot it is
+ * handed does not carry a valid TID, otherwise bumps the invocation counter
+ * in the FlushCbContext passed as "context".
+ */
+static void
+test_flush_callback(void *context, TupleTableSlot *slot)
+{
+	FlushCbContext *ctx = (FlushCbContext *) context;
+
+	if (!ItemPointerIsValid(&slot->tts_tid))
+		ereport(ERROR,
+				(errcode(ERRCODE_INTERNAL_ERROR),
+				 errmsg("flush callback received slot with invalid TID")));
+
+	ctx->count++;
+}
+
+/*
+ * test_buffered_insert_with_callback(regclass, int4)
+ *
+ * Same as basic, but passes a flush callback that counts invocations and
+ * validates TIDs.  Returns the total flush-callback invocation count.
+ */
+PG_FUNCTION_INFO_V1(test_buffered_insert_with_callback);
+Datum
+test_buffered_insert_with_callback(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	int32		nrows = PG_GETARG_INT32(1);
+	Relation	rel;
+	TableBufferedInsertState state;
+	TupleTableSlot *slot;
+	TupleDesc	tupdesc;
+	FlushCbContext ctx;
+	int			i;
+
+	ctx.count = 0;
+
+	rel = table_open(relid, RowExclusiveLock);
+	tupdesc = RelationGetDescr(rel);
+
+	/*
+	 * The loop below fills in exactly one int4 Datum per tuple.  Reject any
+	 * other row shape up front: with zero columns the store to tts_values[0]
+	 * would be out of bounds, with more than one column the remaining
+	 * attributes would be left uninitialized, and with a non-int4 column the
+	 * Datum would be misinterpreted when the tuple is formed.
+	 */
+	if (tupdesc->natts != 1 ||
+		TupleDescAttr(tupdesc, 0)->atttypid != INT4OID)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("relation \"%s\" must have exactly one integer column",
+						RelationGetRelationName(rel))));
+
+	/* ctx lives on this stack frame, which outlives the insert session. */
+	state = table_buffered_insert_begin(rel,
+										GetCurrentCommandId(true),
+										TABLE_INSERT_BAS_BULKWRITE,
+										test_flush_callback,
+										&ctx);
+
+	if (state == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("AM does not support buffered inserts")));
+
+	slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
+
+	/* Insert the values 1..nrows, reusing one virtual slot for every row. */
+	for (i = 0; i < nrows; i++)
+	{
+		ExecClearTuple(slot);
+		slot->tts_values[0] = Int32GetDatum(i + 1);
+		slot->tts_isnull[0] = false;
+		ExecStoreVirtualTuple(slot);
+
+		table_buffered_insert_put(state, slot);
+	}
+
+	table_buffered_insert_end(state);
+
+	ExecDropSingleTupleTableSlot(slot);
+	table_close(rel, RowExclusiveLock);
+
+	/* The caller compares this against the number of rows it asked for. */
+	PG_RETURN_INT32(ctx.count);
+}
+
+/*
+ * test_buffered_insert_flush_mid(regclass, int4)
+ *
+ * Exercises explicit flush() mid-session: inserts half the rows, calls
+ * flush(), inserts the other half, then calls end().  Returns the total
+ * flush-callback count, which should equal nrows.
+ */
+PG_FUNCTION_INFO_V1(test_buffered_insert_flush_mid);
+Datum
+test_buffered_insert_flush_mid(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	int32		nrows = PG_GETARG_INT32(1);
+	Relation	rel;
+	TableBufferedInsertState state;
+	TupleTableSlot *slot;
+	TupleDesc	tupdesc;
+	FlushCbContext ctx;
+	int			half = nrows / 2;
+	int			i;
+
+	ctx.count = 0;
+
+	rel = table_open(relid, RowExclusiveLock);
+	tupdesc = RelationGetDescr(rel);
+
+	/*
+	 * The loops below fill in exactly one int4 Datum per tuple.  Reject any
+	 * other row shape up front: with zero columns the store to tts_values[0]
+	 * would be out of bounds, with more than one column the remaining
+	 * attributes would be left uninitialized, and with a non-int4 column the
+	 * Datum would be misinterpreted when the tuple is formed.
+	 */
+	if (tupdesc->natts != 1 ||
+		TupleDescAttr(tupdesc, 0)->atttypid != INT4OID)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("relation \"%s\" must have exactly one integer column",
+						RelationGetRelationName(rel))));
+
+	/* ctx lives on this stack frame, which outlives the insert session. */
+	state = table_buffered_insert_begin(rel,
+										GetCurrentCommandId(true),
+										TABLE_INSERT_BAS_BULKWRITE,
+										test_flush_callback,
+										&ctx);
+
+	if (state == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("AM does not support buffered inserts")));
+
+	slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
+
+	/* First half: values 1..half */
+	for (i = 0; i < half; i++)
+	{
+		ExecClearTuple(slot);
+		slot->tts_values[0] = Int32GetDatum(i + 1);
+		slot->tts_isnull[0] = false;
+		ExecStoreVirtualTuple(slot);
+
+		table_buffered_insert_put(state, slot);
+	}
+
+	/* Explicit flush mid-session; the session stays open for more put()s */
+	table_buffered_insert_flush(state);
+
+	/* Second half: values half+1..nrows (one row more when nrows is odd) */
+	for (i = half; i < nrows; i++)
+	{
+		ExecClearTuple(slot);
+		slot->tts_values[0] = Int32GetDatum(i + 1);
+		slot->tts_isnull[0] = false;
+		ExecStoreVirtualTuple(slot);
+
+		table_buffered_insert_put(state, slot);
+	}
+
+	/* end() flushes the remaining tuples */
+	table_buffered_insert_end(state);
+
+	ExecDropSingleTupleTableSlot(slot);
+	table_close(rel, RowExclusiveLock);
+
+	/* Callback invocations from both the explicit flush and end(). */
+	PG_RETURN_INT32(ctx.count);
+}
diff --git a/src/test/modules/test_buffered_insert/test_buffered_insert.control b/src/test/modules/test_buffered_insert/test_buffered_insert.control
new file mode 100644
index 00000000000..3221c765820
--- /dev/null
+++ b/src/test/modules/test_buffered_insert/test_buffered_insert.control
@@ -0,0 +1,4 @@
+comment = 'Test code for Table AM buffered-insert lifecycle'
+default_version = '1.0'
+module_pathname = '$libdir/test_buffered_insert'
+relocatable = true
diff --git
a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9f1dd55213d..257aee1e684 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1254,6 +1254,7 @@
 HeadlineWordEntry
+HeapBufferedInsertState
 HeapCheckContext
 HeapCheckReadStreamData
 HeapPageFreeze
 HeapScanDesc
 HeapScanDescData
@@ -3123,6 +3124,9 @@ T_Action
 T_WorkerStatus
 TableAmRoutine
 TableAttachInfo
+TableBufferedInsertFlushCb
+TableBufferedInsertState
+TableBufferedInsertStateData
 TableDataInfo
 TableFunc
 TableFuncRoutine
-- 
2.52.0