From c8277e6f5e9a72baebe993b2241d34a7d427473d Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Wed, 30 Oct 2024 17:13:20 +0000 Subject: [PATCH v25 1/3] Introduce new table AM for multi-inserts Until now, it has been the COPY ... FROM command that uses multi-inserts (i.e. buffering some tuples and inserting them into the table at once). The basic idea of multi-inserts is to generate less WAL and reduce buffer locking. Multi-inserts are faster than calling heap_insert() in a loop, because when multiple tuples can be inserted on a single page, we can write just a single WAL record covering all of them, and only need to lock/unlock the page once. Various other commands can benefit from this multi-insert logic [Reusable]. Also, there's a need to expose these multi-insert AMs (Access Methods) as a scan-like API [Usability]. With this, various table AMs can define their own buffering and flushing strategy [Flexibility] based on the way they store the data in the underlying storage (e.g. columnar). Author: Bharath Rupireddy Reviewed-by: Jeff Davis Discussion: https://www.postgresql.org/message-id/20200924024128.kyk3r5g7dnu3fxxx@alap3.anarazel.de Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com --- src/backend/access/heap/heapam.c | 211 ++++++++++++++++++++++- src/backend/access/heap/heapam_handler.c | 6 + src/backend/access/table/tableamapi.c | 5 + src/include/access/heapam.h | 39 +++++ src/include/access/tableam.h | 81 +++++++++ src/tools/pgindent/typedefs.list | 3 + 6 files changed, 344 insertions(+), 1 deletion(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 1748eafa10..69b21cf12c 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -50,6 +50,7 @@ #include "storage/procarray.h" #include "utils/datum.h" #include "utils/inval.h" +#include 
"utils/memutils.h" #include "utils/spccache.h" @@ -102,7 +103,7 @@ static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate); static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup); static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required, bool *copy); - +static void heap_modify_insert_end(TableModifyState *state); /* * Each tuple lock mode has a corresponding heavyweight lock, and one or two @@ -2603,6 +2604,214 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, ntuples); } +/* + * Initialize heap modify state. + */ +TableModifyState * +heap_modify_begin(Relation rel, + CommandId cid, + int options, + TableModifyBufferFlushCb buffer_flush_cb, + void *buffer_flush_ctx) +{ + TableModifyState *state; + MemoryContext context; + MemoryContext oldcontext; + + context = AllocSetContextCreate(TopTransactionContext, + "heap_modify memory context", + ALLOCSET_DEFAULT_SIZES); + + oldcontext = MemoryContextSwitchTo(context); + state = palloc(sizeof(TableModifyState)); + state->rel = rel; + state->cid = cid; + state->options = options; + state->mem_ctx = context; + state->buffer_flush_cb = buffer_flush_cb; + state->buffer_flush_ctx = buffer_flush_ctx; + state->data = NULL; /* To be set lazily */ + MemoryContextSwitchTo(oldcontext); + + return state; +} + +/* + * Store passed-in tuple into in-memory buffered slots. When full, insert + * multiple tuples from the buffers into heap. 
+ */ +void +heap_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot) +{ + TupleTableSlot *dstslot; + HeapInsertState *istate; + HeapMultiInsertState *mistate; + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(state->mem_ctx); + + /* First time through, initialize heap insert state */ + if (state->data == NULL) + { + istate = (HeapInsertState *) palloc(sizeof(HeapInsertState)); + istate->bistate = NULL; + istate->mistate = NULL; + state->data = istate; + mistate = + (HeapMultiInsertState *) palloc(sizeof(HeapMultiInsertState)); + mistate->slots = + (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS); + mistate->cur_slots = 0; + istate->mistate = mistate; + + /* + * heap_multi_insert() can leak memory. So switch to this memory + * context before every heap_multi_insert() call and reset when + * finished. + */ + mistate->mem_ctx = AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert memory context", + ALLOCSET_DEFAULT_SIZES); + + if ((state->options & HEAP_INSERT_BAS_BULKWRITE) != 0) + istate->bistate = GetBulkInsertState(); + } + + istate = (HeapInsertState *) state->data; + Assert(istate->mistate != NULL); + mistate = istate->mistate; + dstslot = mistate->slots[mistate->cur_slots]; + + if (dstslot == NULL) + { + /* + * We use virtual tuple slots as the buffered slots to leverage the + * optimization they provide in minimizing physical data copying. The + * virtual slot gets materialized when we copy (via ExecCopySlot + * below) the tuple from the source slot, which can be of any + * type. This way, it is ensured that the tuple storage doesn't depend + * on external memory, because all the datums that aren't passed by + * value are copied into the slot's memory context. 
+ */ + dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel), + &TTSOpsVirtual); + + mistate->slots[mistate->cur_slots] = dstslot; + } + + Assert(TTS_IS_VIRTUAL(dstslot)); + + /* + * Note that the copy clears the previous destination slot contents, so no + * need to explicitly ExecClearTuple() here. + */ + ExecCopySlot(dstslot, slot); + + mistate->cur_slots++; + + if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS) + heap_modify_buffer_flush(state); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. + */ +void +heap_modify_buffer_flush(TableModifyState *state) +{ + HeapInsertState *istate; + HeapMultiInsertState *mistate; + MemoryContext oldcontext; + + /* Quick exit if we haven't inserted anything yet */ + if (state->data == NULL) + return; + + istate = (HeapInsertState *) state->data; + Assert(istate->mistate != NULL); + mistate = istate->mistate; + + /* Quick exit if we have flushed already */ + if (mistate->cur_slots == 0) + return; + + /* + * heap_multi_insert() can leak memory, so switch to short-lived memory + * context before calling it. + */ + oldcontext = MemoryContextSwitchTo(mistate->mem_ctx); + heap_multi_insert(state->rel, + mistate->slots, + mistate->cur_slots, + state->cid, + state->options, + istate->bistate); + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(mistate->mem_ctx); + + /* + * Invoke caller-supplied buffer flush callback after inserting rows from + * the buffers to heap. + */ + if (state->buffer_flush_cb != NULL) + { + for (int i = 0; i < mistate->cur_slots; i++) + { + state->buffer_flush_cb(state->buffer_flush_ctx, + mistate->slots[i]); + } + } + + mistate->cur_slots = 0; +} + +/* + * Heap insert specific function used for performing work at the end like + * flushing remaining buffered tuples, cleaning up the insert state and tuple + * table slots used for buffered tuples etc. 
+ */ +static void +heap_modify_insert_end(TableModifyState *state) +{ + HeapInsertState *istate; + + /* Quick exit if we haven't inserted anything yet */ + if (state->data == NULL) + return; + + istate = (HeapInsertState *) state->data; + + if (istate->mistate != NULL) + { + HeapMultiInsertState *mistate = istate->mistate; + + heap_modify_buffer_flush(state); + + Assert(mistate->cur_slots == 0); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + + MemoryContextDelete(mistate->mem_ctx); + } + + if (istate->bistate != NULL) + FreeBulkInsertState(istate->bistate); +} + +/* + * Clean heap modify state. + */ +void +heap_modify_end(TableModifyState *state) +{ + heap_modify_insert_end(state); + MemoryContextDelete(state->mem_ctx); +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index a8d95e0f1c..d2ef6b4b78 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2644,6 +2644,12 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + + .tuple_modify_begin = heap_modify_begin, + .tuple_modify_buffer_insert = heap_modify_buffer_insert, + .tuple_modify_buffer_flush = heap_modify_buffer_flush, + .tuple_modify_end = heap_modify_end, + .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c index e9b598256f..772f29b1b5 100644 --- a/src/backend/access/table/tableamapi.c +++ b/src/backend/access/table/tableamapi.c @@ -97,6 +97,11 @@ GetTableAmRoutine(Oid amhandler) Assert(routine->scan_sample_next_block != NULL); Assert(routine->scan_sample_next_tuple != NULL); 
+ Assert(routine->tuple_modify_begin != NULL); + Assert(routine->tuple_modify_buffer_insert != NULL); + Assert(routine->tuple_modify_buffer_flush != NULL); + Assert(routine->tuple_modify_end != NULL); + return routine; } diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 96cf82f97b..a9722ce947 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -37,6 +37,7 @@ #define HEAP_INSERT_FROZEN TABLE_INSERT_FROZEN #define HEAP_INSERT_NO_LOGICAL TABLE_INSERT_NO_LOGICAL #define HEAP_INSERT_SPECULATIVE 0x0010 +#define HEAP_INSERT_BAS_BULKWRITE TABLE_INSERT_BAS_BULKWRITE /* "options" flag bits for heap_page_prune_and_freeze */ #define HEAP_PAGE_PRUNE_MARK_UNUSED_NOW (1 << 0) @@ -272,6 +273,33 @@ typedef enum PRUNE_VACUUM_CLEANUP, /* VACUUM 2nd heap pass */ } PruneReason; +/* + * Maximum number of slots that multi-insert buffers can hold. + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. 
+ */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +typedef struct HeapMultiInsertState +{ + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Number of buffered slots currently held */ + int cur_slots; + + /* Memory context for dealing with multi inserts */ + MemoryContext mem_ctx; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + + /* ---------------- * function prototypes for heap access method * @@ -322,6 +350,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); + +extern TableModifyState *heap_modify_begin(Relation rel, + CommandId cid, + int options, + TableModifyBufferFlushCb buffer_flush_cb, + void *buffer_flush_ctx); +extern void heap_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot); +extern void heap_modify_buffer_flush(TableModifyState *state); +extern void heap_modify_end(TableModifyState *state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index adb478a93c..57b71eef38 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -254,11 +254,42 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +struct TableModifyState; + +/* Callback invoked upon flushing each buffered tuple */ +typedef void (*TableModifyBufferFlushCb) (void *context, + TupleTableSlot *slot); + +/* Holds table modify state */ +typedef struct TableModifyState +{ + /* These fields are used for inserts for now */ + + Relation rel; /* Relation to insert to */ + CommandId cid; /* Command ID for insert */ + int options; /* TABLE_INSERT options */ + + /* 
Memory context for dealing with modify state variables */ + MemoryContext mem_ctx; + + /* Flush callback and its context used for multi inserts */ + TableModifyBufferFlushCb buffer_flush_cb; + void *buffer_flush_ctx; + + /* Table AM specific data */ + void *data; +} TableModifyState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 #define TABLE_INSERT_FROZEN 0x0004 #define TABLE_INSERT_NO_LOGICAL 0x0008 +/* + * Use BAS_BULKWRITE buffer access strategy. 0x0010 is for + * HEAP_INSERT_SPECULATIVE. + */ +#define TABLE_INSERT_BAS_BULKWRITE 0x0020 /* flag bits for table_tuple_lock */ /* Follow tuples whose update is in progress if lock modes don't conflict */ @@ -577,6 +608,21 @@ typedef struct TableAmRoutine void (*finish_bulk_insert) (Relation rel, int options); + /* ------------------------------------------------------------------------ + * Table Modify related functions. + * ------------------------------------------------------------------------ + */ + TableModifyState *(*tuple_modify_begin) (Relation rel, + CommandId cid, + int options, + TableModifyBufferFlushCb buffer_flush_cb, + void *buffer_flush_ctx); + void (*tuple_modify_buffer_insert) (TableModifyState *state, + TupleTableSlot *slot); + void (*tuple_modify_buffer_flush) (TableModifyState *state); + void (*tuple_modify_end) (TableModifyState *state); + + /* ------------------------------------------------------------------------ * DDL related functionality. * ------------------------------------------------------------------------ @@ -1608,6 +1654,41 @@ table_finish_bulk_insert(Relation rel, int options) rel->rd_tableam->finish_bulk_insert(rel, options); } +/* ------------------------------------------------------------------------ + * Table Modify related functions. 
+ * ------------------------------------------------------------------------ + */ +static inline TableModifyState * +table_modify_begin(Relation rel, + CommandId cid, + int options, + TableModifyBufferFlushCb buffer_flush_cb, + void *buffer_flush_ctx) +{ + return rel->rd_tableam->tuple_modify_begin(rel, + cid, + options, + buffer_flush_cb, + buffer_flush_ctx); +} + +static inline void +table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot) +{ + state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot); +} + +static inline void +table_modify_buffer_flush(TableModifyState *state) +{ + state->rel->rd_tableam->tuple_modify_buffer_flush(state); +} + +static inline void +table_modify_end(TableModifyState *state) +{ + state->rel->rd_tableam->tuple_modify_end(state); +} /* ------------------------------------------------------------------------ * DDL related functionality. diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 171a7dd5d2..e7ddf29c16 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1147,6 +1147,8 @@ HeadlineJsonState HeadlineParsedText HeadlineWordEntry HeapCheckContext +HeapInsertState +HeapMultiInsertState HeapPageFreeze HeapScanDesc HeapTuple @@ -2873,6 +2875,7 @@ TableFuncScanState TableFuncType TableInfo TableLikeClause +TableModifyState TableSampleClause TableScanDesc TableScanDescData -- 2.40.1