Re: New Table Access Methods for Multi and Single Inserts (10+ messages / 4 participants)
* Re: New Table Access Methods for Multi and Single Inserts
@ 2023-06-03 22:38 Andres Freund
  0 siblings, 2 replies; 10+ messages in thread
From: Andres Freund @ 2023-06-03 22:38 UTC
To: Bharath Rupireddy; +Cc: Dilip Kumar; Luc Vlaming; Justin Pryzby; pgsql-hackers; Paul Guo; Jeff Davis; Michael Paquier; Matthias van de Meent

Hi,

This patch was referenced in a discussion at pgcon, so I thought I'd give it a
look, even though Bharath said that he won't have time to drive it forward...

On 2021-04-19 10:21:36 +0530, Bharath Rupireddy wrote:
> diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
> index bd5faf0c1f..655de8e6b7 100644
> --- a/src/backend/access/heap/heapam_handler.c
> +++ b/src/backend/access/heap/heapam_handler.c
> @@ -2558,6 +2558,11 @@ static const TableAmRoutine heapam_methods = {
>      .tuple_insert_speculative = heapam_tuple_insert_speculative,
>      .tuple_complete_speculative = heapam_tuple_complete_speculative,
>      .multi_insert = heap_multi_insert,
> +    .tuple_insert_begin = heap_insert_begin,
> +    .tuple_insert_v2 = heap_insert_v2,
> +    .multi_insert_v2 = heap_multi_insert_v2,
> +    .multi_insert_flush = heap_multi_insert_flush,
> +    .tuple_insert_end = heap_insert_end,
>      .tuple_delete = heapam_tuple_delete,
>      .tuple_update = heapam_tuple_update,
>      .tuple_lock = heapam_tuple_lock,

I don't think we should have multiple callbacks for the insertion APIs in
tableam.h. I think it'd be good to continue supporting the old table_*()
functions, but supporting multiple insert APIs in each AM doesn't make much
sense to me.

> +/*
> + * GetTupleSize - Compute the tuple size given a table slot.
> + *
> + * For heap tuple, buffer tuple and minimal tuple slot types return the actual
> + * tuple size that exists. For virtual tuple, the size is calculated as the
> + * slot does not have the tuple size. If the computed size exceeds the given
> + * maxsize for the virtual tuple, this function exits, not investing time in
> + * further unnecessary calculation.
> + *
> + * Important Notes:
> + * 1) Size calculation code for virtual slots is being used from
> + * tts_virtual_materialize(), hence ensure to have the same changes or fixes
> + * here and also there.
> + * 2) Currently, GetTupleSize() handles the existing heap, buffer, minimal and
> + * virtual slots. Ensure to add related code in case any new slot type is
> + * introduced.
> + */
> +inline Size
> +GetTupleSize(TupleTableSlot *slot, Size maxsize)
> +{
> +    Size sz = 0;
> +    HeapTuple tuple = NULL;
> +
> +    if (TTS_IS_HEAPTUPLE(slot))
> +        tuple = ((HeapTupleTableSlot *) slot)->tuple;
> +    else if(TTS_IS_BUFFERTUPLE(slot))
> +        tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
> +    else if(TTS_IS_MINIMALTUPLE(slot))
> +        tuple = ((MinimalTupleTableSlot *) slot)->tuple;
> +    else if(TTS_IS_VIRTUAL(slot))

I think this embeds too much knowledge of the set of slot types in core
code. I don't see why it's needed either?

> diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
> index 414b6b4d57..2a1470a7b6 100644
> --- a/src/include/access/tableam.h
> +++ b/src/include/access/tableam.h
> @@ -229,6 +229,32 @@ typedef struct TM_IndexDeleteOp
>      TM_IndexStatus *status;
>  } TM_IndexDeleteOp;
>
> +/* Holds table insert state. */
> +typedef struct TableInsertState

I suspect we should design it to be usable for updates and deletes in the
future, and thus name it TableModifyState.

> +{
> +    Relation rel;
> +    /* Bulk insert state if requested, otherwise NULL. */
> +    struct BulkInsertStateData *bistate;
> +    CommandId cid;

Hm - I'm not sure it's a good idea to force the cid to be the same for all
inserts done via one TableInsertState.

> +    int options;
> +    /* Below members are only used for multi inserts. */
> +    /* Array of buffered slots. */
> +    TupleTableSlot **mi_slots;
> +    /* Number of slots that are currently buffered. */
> +    int32 mi_cur_slots;
> +    /*
> +     * Access method specific information such as parameters that are needed
> +     * for buffering and flushing decisions can go here.
> +     */
> +    void *mistate;

I think we should instead have a generic TableModifyState, which each AM then
embeds into an AM specific AM state. Forcing two very related structs to be
allocated separately doesn't seem wise in this case.

> @@ -1430,6 +1473,50 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
>                                    cid, options, bistate);
>  }
>
> +static inline TableInsertState*
> +table_insert_begin(Relation rel, CommandId cid, int options,
> +                   bool alloc_bistate, bool is_multi)

Why have alloc_bistate and options?

> +static inline void
> +table_insert_end(TableInsertState *state)
> +{
> +    /* Deallocate bulk insert state here, since it's AM independent. */
> +    if (state->bistate)
> +        FreeBulkInsertState(state->bistate);
> +
> +    state->rel->rd_tableam->tuple_insert_end(state);
> +}

Seems like the order in here should be swapped?

Greetings,

Andres Freund
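The embedding suggested above (a generic TableModifyState that each AM embeds in its own state, so that both live in a single allocation) could look roughly like the sketch below. This is a standalone illustration, not code from the patch: the fields of TableModifyState, the HeapModifyState struct, the heap_modify_* functions, and the use of calloc/free in place of palloc/pfree are all assumptions made for the example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical generic, AM-independent part of the modify state. */
typedef struct TableModifyState
{
    int rel_id;     /* stand-in for the target Relation */
    int options;    /* stand-in for the insert option flags */
} TableModifyState;

/*
 * Hypothetical heap-specific state. The generic part is the FIRST member,
 * so a pointer to the whole struct and a pointer to "base" share the same
 * address and can be converted into each other.
 */
typedef struct HeapModifyState
{
    TableModifyState base;  /* must stay first */
    int     cur_slots;      /* number of buffered tuples */
    size_t  cur_size;       /* bytes buffered so far */
} HeapModifyState;

/* One allocation covers both the generic and the AM-specific state. */
static TableModifyState *
heap_modify_begin(int rel_id, int options)
{
    HeapModifyState *hstate = calloc(1, sizeof(HeapModifyState));

    if (hstate == NULL)
        return NULL;
    hstate->base.rel_id = rel_id;
    hstate->base.options = options;
    return &hstate->base;
}

/* The AM recovers its own state from the generic pointer by a cast. */
static void
heap_modify_buffer(TableModifyState *state, size_t tuple_size)
{
    HeapModifyState *hstate = (HeapModifyState *) state;

    hstate->cur_slots++;
    hstate->cur_size += tuple_size;
}

/* Freeing the generic pointer releases the single combined allocation. */
static void
heap_modify_end(TableModifyState *state)
{
    free(state);
}
```

Callers only ever hold a TableModifyState pointer, while the AM sees its full state without a second allocation or a void pointer hop.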
* Re: New Table Access Methods for Multi and Single Inserts
@ 2023-06-05 02:30 Bharath Rupireddy
  parent: Andres Freund
  1 sibling, 0 replies; 10+ messages in thread
From: Bharath Rupireddy @ 2023-06-05 02:30 UTC
To: Andres Freund; +Cc: Dilip Kumar; Luc Vlaming; Justin Pryzby; pgsql-hackers; Paul Guo; Jeff Davis; Michael Paquier; Matthias van de Meent

On Sun, Jun 4, 2023 at 4:08 AM Andres Freund wrote:
>
> Hi,
>
> This patch was referenced in a discussion at pgcon, so I thought I'd give it a
> look, even though Bharath said that he won't have time to drive it forward...

Thanks. I'm glad to know that the feature was discussed at PGCon. If
there's interest, I'm happy to spend time on it again. I'll look into
the review comments and respond soon.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
* Re: New Table Access Methods for Multi and Single Inserts
@ 2023-08-01 16:30 Bharath Rupireddy
  parent: Andres Freund
  1 sibling, 1 reply; 10+ messages in thread
From: Bharath Rupireddy @ 2023-08-01 16:30 UTC
To: Andres Freund; +Cc: Dilip Kumar; Luc Vlaming; Justin Pryzby; pgsql-hackers; Paul Guo; Jeff Davis; Michael Paquier; Matthias van de Meent

On Sun, Jun 4, 2023 at 4:08 AM Andres Freund wrote:
>
> Hi,
>
> This patch was referenced in a discussion at pgcon, so I thought I'd give it a
> look, even though Bharath said that he won't have time to drive it forward...

Thanks. Finally, I started to spend time on this. Just curious - may
I know the discussion in/for which this patch is referenced? What was
the motive? Is it captured somewhere?

> On 2021-04-19 10:21:36 +0530, Bharath Rupireddy wrote:
> > +    .tuple_insert_begin = heap_insert_begin,
> > +    .tuple_insert_v2 = heap_insert_v2,
> > +    .multi_insert_v2 = heap_multi_insert_v2,
> > +    .multi_insert_flush = heap_multi_insert_flush,
> > +    .tuple_insert_end = heap_insert_end,
>
> I don't think we should have multiple callbacks for the insertion APIs in
> tableam.h. I think it'd be good to continue supporting the old table_*()
> functions, but supporting multiple insert APIs in each AM doesn't make much
> sense to me.

I named these new functions XXX_v2 for compatibility reasons, because
external modules may well be using the existing table_tuple_insert and
table_multi_insert functions. If we were to change the existing insert
table AM callbacks, all external modules using them would have to change
their code. Is that okay?

> > +/*
> > + * GetTupleSize - Compute the tuple size given a table slot.
> > +inline Size
>
> I think this embeds too much knowledge of the set of slot types in core
> code. I don't see why it's needed either?

The heapam multi-insert implementation needs to know the tuple size
from the slot to decide whether or not to flush the tuples from the
buffers. At the time, I couldn't find a direct way to get the tuple
size from the slot, so I added that helper function. With a better
understanding now, I think we can rely on the memory allocated for
TupleTableSlot's tts_mcxt. While this works for the materialized slots
passed in to the insert functions, for non-materialized slots the
flushing decision can be based solely on the number of tuples stored in
the buffers. Another way is to add a get_tuple_size callback to
TupleTableSlotOps and let the tuple slot providers give us the tuple
size.

> > diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
> > index 414b6b4d57..2a1470a7b6 100644
> > --- a/src/include/access/tableam.h
> > +++ b/src/include/access/tableam.h
> > @@ -229,6 +229,32 @@ typedef struct TM_IndexDeleteOp
> >      TM_IndexStatus *status;
> >  } TM_IndexDeleteOp;
> >
> > +/* Holds table insert state. */
> > +typedef struct TableInsertState
>
> I suspect we should design it to be usable for updates and deletes in the
> future, and thus name it TableModifyState.

There are different parameters that insert/update/delete would want to
pass across in the state. So, having Table{Insert/Update/Delete}State
may be a better idea than having the unneeded variables lying around or
having a union and state_type as INSERT/UPDATE/DELETE, no? Do you have
a different thought here?

> I think we should instead have a generic TableModifyState, which each AM then
> embeds into an AM specific AM state. Forcing two very related structs to be
> allocated separately doesn't seem wise in this case.

The v7 patches have largely changed the way these options and
parameters are passed, please have a look.
> > +{
> > +    Relation rel;
> > +    /* Bulk insert state if requested, otherwise NULL. */
> > +    struct BulkInsertStateData *bistate;
> > +    CommandId cid;
>
> Hm - I'm not sure it's a good idea to force the cid to be the same for all
> inserts done via one TableInsertState.

If required, someone can always pass a new CID before every
tuple_insert_v2/tuple_multi_insert_v2 call via TableInsertState. Isn't
that sufficient?

> > @@ -1430,6 +1473,50 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
> >                                    cid, options, bistate);
> >  }
> >
> > +static inline TableInsertState*
> > +table_insert_begin(Relation rel, CommandId cid, int options,
> > +                   bool alloc_bistate, bool is_multi)
>
> Why have alloc_bistate and options?

"alloc_bistate" is for the caller to specify if they need a bulk insert
state or not. "options" is for the caller to specify if they need
table_tuple_insert performance options such as TABLE_INSERT_SKIP_FSM,
TABLE_INSERT_FROZEN, TABLE_INSERT_NO_LOGICAL. The v7 patches have
changed the way these options and parameters are passed, please have a
look.

> > +static inline void
> > +table_insert_end(TableInsertState *state)
> > +{
> > +    /* Deallocate bulk insert state here, since it's AM independent. */
> > +    if (state->bistate)
> > +        FreeBulkInsertState(state->bistate);
> > +
> > +    state->rel->rd_tableam->tuple_insert_end(state);
> > +}
>
> Seems like the order in here should be swapped?

Right. It looks like BulkInsertState is specific to heapam, so it really
doesn't have to be in the table_XXX functions; hence I moved it all the
way down to the heap_insert_XXX functions.

I'm attaching the v7 patch set with the above review comments
addressed. My initial idea behind these new insert APIs was the ability
to re-use the multi insert code in COPY for CTAS and REFRESH
MATERIALIZED VIEW. I'm open to more thoughts here.
The v7 patches have largely changed the way state structure (heapam specific things are moved all the way down to heapam.c) is defined, the parameters are passed, and simplified the multi insert logic a lot. 0001 - introduces new single and multi insert table AM and heapam implementation of the new AM. 0002 - optimizes CREATE TABLE AS to use the new multi inserts table AM making it faster by 2.13X or 53%. 0003 - optimizes REFRESH MATERIALIZED VIEW to use the new multi inserts table AM making it faster by 1.52X or 34%. 0004 - uses the new multi inserts table AM for COPY FROM - I'm yet to spend time on this, I'll share the patch when ready. Thoughts? -- Bharath Rupireddy PostgreSQL Contributors Team RDS Open Source Databases Amazon Web Services: https://aws.amazon.com Attachments: [application/octet-stream] v7-0001-New-table-AMs-for-single-and-multi-inserts.patch (13.7K, 2-v7-0001-New-table-AMs-for-single-and-multi-inserts.patch) download | inline diff: From 6803736e5695ab0ef06d263e9ba260db02d3b80c Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Tue, 1 Aug 2023 09:38:47 +0000 Subject: [PATCH v7] New table AMs for single and multi inserts --- src/backend/access/heap/heapam.c | 180 +++++++++++++++++++++++ src/backend/access/heap/heapam_handler.c | 6 + src/include/access/heapam.h | 45 ++++++ src/include/access/tableam.h | 107 ++++++++++++++ 4 files changed, 338 insertions(+) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 7ed72abe59..ba4347026a 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -68,6 +68,7 @@ #include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/spccache.h" @@ -75,6 +76,7 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); +static void 
heap_multi_insert_flush(TableInsertState *state); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, HeapTuple newtup, HeapTuple old_key_tuple, @@ -2443,6 +2445,184 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, ntuples); } +/* + * Initialize state required for an insert a single tuple or multiple tuples + * into a heap. + */ +TableInsertState * +heap_insert_begin(Relation rel, CommandId cid, int table_am_flags, + int table_insert_flags) +{ + TableInsertState *tistate; + + tistate = (TableInsertState *) palloc0(sizeof(TableInsertState)); + tistate->rel = rel; + tistate->cid = cid; + tistate->table_am_flags = table_am_flags; + tistate->table_insert_flags = table_insert_flags; + + if ((table_am_flags & TABLEAM_USE_MULTI_INSERTS) != 0 || + (table_am_flags & TABLEAM_USE_BULKWRITE_BUFFER_ACCESS_STRATEGY)) + { + tistate->table_am_data = + (HeapInsertState *) palloc0(sizeof(HeapInsertState)); + } + + if ((table_am_flags & TABLEAM_USE_MULTI_INSERTS) != 0) + { + ((HeapInsertState *) tistate->table_am_data)->mistate = + (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState)); + + ((HeapInsertState *) tistate->table_am_data)->mistate->slots = + palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS); + + ((HeapInsertState *) tistate->table_am_data)->mistate->context = + AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert_v2 memory context", + ALLOCSET_DEFAULT_SIZES); + } + + if ((table_am_flags & TABLEAM_USE_BULKWRITE_BUFFER_ACCESS_STRATEGY) != 0) + ((HeapInsertState *) tistate->table_am_data)->bistate = GetBulkInsertState(); + + return tistate; +} + +/* + * Insert a single tuple into a heap. 
+ */ +void +heap_insert_v2(TableInsertState *state, TupleTableSlot *slot) +{ + bool shouldFree = true; + HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + BulkInsertState bistate = NULL; + + /* Update tuple with table oid */ + slot->tts_tableOid = RelationGetRelid(state->rel); + tuple->t_tableOid = slot->tts_tableOid; + + if (state->table_am_data != NULL && + ((HeapInsertState *) state->table_am_data)->bistate != NULL) + { + bistate = ((HeapInsertState *) state->table_am_data)->bistate; + } + + /* Perform insertion, and copy the resulting ItemPointer */ + heap_insert(state->rel, tuple, state->cid, state->table_insert_flags, + bistate); + ItemPointerCopy(&tuple->t_self, &slot->tts_tid); + + if (shouldFree) + pfree(tuple); +} + +/* + * Store passed-in tuple into in-memory buffered slots. When full, insert + * multiple tuples from the buffers into heap. + */ +void +heap_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot) +{ + TupleTableSlot *dstslot; + HeapMultiInsertState *mistate; + + Assert(state->table_am_data != NULL && + ((HeapInsertState *) state->table_am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->table_am_data)->mistate; + dstslot = mistate->slots[mistate->cur_slots]; + + if (dstslot == NULL) + { + dstslot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = dstslot; + } + + ExecClearTuple(dstslot); + ExecCopySlot(dstslot, slot); + mistate->cur_slots++; + + /* + * When passed-in slot is already materialized, memory allocated in slot's + * memory context is a close approximation for us to track the required + * space for the tuple in slot. + * + * For non-materialized slots, the flushing decision happens solely on the + * number of tuples stored in the buffer. 
+ */ + if (TTS_SHOULDFREE(slot)) + mistate->cur_size += MemoryContextMemAllocated(slot->tts_mcxt, false); + + if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS || + mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES) + heap_multi_insert_flush(state); +} + +/* + * Clean up state used to insert a single or multiple tuples into a heap. + */ +void +heap_insert_end(TableInsertState *state) +{ + if (state->table_am_data != NULL && + ((HeapInsertState *) state->table_am_data)->mistate != NULL) + { + HeapMultiInsertState *mistate = + ((HeapInsertState *) state->table_am_data)->mistate; + + /* Insert remaining tuples from multi-insert buffers */ + if (mistate->cur_slots > 0 || mistate->cur_size > 0) + heap_multi_insert_flush(state); + + MemoryContextDelete(mistate->context); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + + pfree(mistate); + ((HeapInsertState *) state->table_am_data)->mistate = NULL; + } + + if (state->table_am_data != NULL && + ((HeapInsertState *) state->table_am_data)->bistate != NULL) + { + FreeBulkInsertState(((HeapInsertState *) state->table_am_data)->bistate); + } + + pfree(state->table_am_data); + state->table_am_data = NULL; + pfree(state); +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. 
+ */ +static void +heap_multi_insert_flush(TableInsertState *state) +{ + HeapMultiInsertState *mistate; + BulkInsertState bistate = NULL; + MemoryContext oldcontext; + + mistate = ((HeapInsertState *) state->table_am_data)->mistate; + + if (state->table_am_data != NULL && + ((HeapInsertState *) state->table_am_data)->bistate != NULL) + { + bistate = ((HeapInsertState *) state->table_am_data)->bistate; + } + + oldcontext = MemoryContextSwitchTo(mistate->context); + heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots, + state->cid, state->table_insert_flags, bistate); + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(mistate->context); + + mistate->cur_slots = 0; + mistate->cur_size = 0; +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 5a17112c91..6f144d88dd 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2568,6 +2568,12 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + + .tuple_insert_begin = heap_insert_begin, + .tuple_insert_v2 = heap_insert_v2, + .tuple_multi_insert_v2 = heap_multi_insert_v2, + .tuple_insert_end = heap_insert_end, + .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index faf5026519..a1ea26cbd6 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -191,6 +191,40 @@ typedef struct HeapPageFreeze } HeapPageFreeze; +/* + * Maximum number of slots that multi-insert buffers can hold. + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. 
For instance, increasing this can cause + * quadratic growth in memory requirements during copies into partitioned + * tables with a large number of partitions. + */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +/* Maximum size of all tuples that multi-insert buffers can hold */ +#define HEAP_MAX_BUFFERED_BYTES 65535 + +typedef struct HeapMultiInsertState +{ + /* Memory context to use for flushing multi-insert buffers */ + MemoryContext context; + + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Number of slots that multi-insert buffers currently hold */ + int cur_slots; + + /* Size of all tuples that multi-insert buffers currently hold */ + Size cur_size; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + /* ---------------- * function prototypes for heap access method * @@ -241,6 +275,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); + +extern TableInsertState* heap_insert_begin(Relation rel, + CommandId cid, + int table_am_flags, + int table_insert_flags); +extern void heap_insert_v2(TableInsertState *state, + TupleTableSlot *slot); +extern void heap_multi_insert_v2(TableInsertState *state, + TupleTableSlot *slot); +extern void heap_insert_end(TableInsertState *state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 230bc39cc0..5ea3eeee8a 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -247,6 +247,35 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +/* Use multi (buffer multiple tuples and insert them at once) inserts */ 
+#define TABLEAM_USE_MULTI_INSERTS 0x000001 + +/* Use BAS_BULKWRITE buffer access strategy */ +#define TABLEAM_USE_BULKWRITE_BUFFER_ACCESS_STRATEGY 0x000002 + + +/* Holds table insert state. */ +typedef struct TableInsertState +{ + /* Table AM-agnostic data starts here */ + Relation rel; /* Target relation */ + + /* + * Command ID for this insertion. If required, change this for each pass of + * insert functions. + */ + CommandId cid; + + /* Table AM options (TABLEAM_XXX macros) */ + int table_am_flags; + + /* table_tuple_insert performance options (TABLE_INSERT_XXX macros) */ + int table_insert_flags; + + /* Table AM specific data starts here */ + void *table_am_data; +} TableInsertState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 @@ -522,6 +551,19 @@ typedef struct TableAmRoutine void (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate); + TableInsertState *(*tuple_insert_begin) (Relation rel, + CommandId cid, + int table_am_flags, + int table_insert_flags); + + void (*tuple_insert_v2) (TableInsertState *state, + TupleTableSlot *slot); + + void (*tuple_multi_insert_v2) (TableInsertState *state, + TupleTableSlot *slot); + + void (*tuple_insert_end) (TableInsertState *state); + /* see table_tuple_delete() for reference about parameters */ TM_Result (*tuple_delete) (Relation rel, ItemPointer tid, @@ -1456,6 +1498,71 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, cid, options, bistate); } +static inline TableInsertState * +table_insert_begin(Relation rel, CommandId cid, int table_am_flags, + int table_insert_flags) +{ + /* XXX: Really it doesn't have to be an optional callback */ + if (rel->rd_tableam && rel->rd_tableam->tuple_insert_begin) + { + return rel->rd_tableam->tuple_insert_begin(rel, cid, table_am_flags, + table_insert_flags); + } + else + 
ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_insert_begin access method is not implemented for relation \"%s\"", + RelationGetRelationName(rel))); +} + +static inline void +table_tuple_insert_v2(TableInsertState *state, TupleTableSlot *slot) +{ + /* XXX: Really it doesn't have to be an optional callback */ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_insert_v2) + { + state->rel->rd_tableam->tuple_insert_v2(state, slot); + } + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_tuple_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline void +table_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot) +{ + /* XXX: Really it doesn't have to be an optional callback */ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_v2) + { + state->rel->rd_tableam->tuple_multi_insert_v2(state, slot); + } + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline void +table_insert_end(TableInsertState *state) +{ + /* XXX: Really it doesn't have to be an optional callback */ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_insert_end) + { + state->rel->rd_tableam->tuple_insert_end(state); + } + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_insert_end access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + /* * Delete a tuple.
* -- 2.34.1 [application/octet-stream] v7-0002-Optimize-CTAS-with-multi-inserts.patch (2.8K, 3-v7-0002-Optimize-CTAS-with-multi-inserts.patch) download | inline diff: From d999b3e11272f44b8cbe75fb4d5d4c518ab43324 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Tue, 1 Aug 2023 09:37:22 +0000 Subject: [PATCH v7] Optimize CTAS with multi inserts --- src/backend/commands/createas.c | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index e91920ca14..ac30906288 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -58,9 +58,7 @@ typedef struct /* These fields are filled by intorel_startup: */ Relation rel; /* relation to write to */ ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_intorel; /* utility functions for CTAS definition creation */ @@ -557,17 +555,24 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) */ myState->rel = intoRelationDesc; myState->reladdr = intoRelationAddr; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM; /* * If WITH NO DATA is specified, there is no need to set up the state for * bulk inserts as there are no tuples to insert. 
*/ if (!into->skipData) - myState->bistate = GetBulkInsertState(); + { + int table_am_flags = TABLEAM_USE_MULTI_INSERTS | + TABLEAM_USE_BULKWRITE_BUFFER_ACCESS_STRATEGY; + int table_insert_flags = TABLE_INSERT_SKIP_FSM; + + myState->ti_state = table_insert_begin(intoRelationDesc, + GetCurrentCommandId(true), + table_am_flags, + table_insert_flags); + } else - myState->bistate = NULL; + myState->ti_state = NULL; /* * Valid smgr_targblock implies something already wrote to the relation. @@ -595,11 +600,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) * would not be cheap either. This also doesn't allow accessing per-AM * data (say a tuple's xmin), but since we don't do that here... */ - table_tuple_insert(myState->rel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); } /* We know this is a newly created relation, so there are no indexes */ @@ -617,10 +618,7 @@ intorel_shutdown(DestReceiver *self) IntoClause *into = myState->into; if (!into->skipData) - { - FreeBulkInsertState(myState->bistate); - table_finish_bulk_insert(myState->rel, myState->ti_options); - } + table_insert_end(myState->ti_state); /* close rel, but keep lock until commit */ table_close(myState->rel, NoLock); -- 2.34.1 [application/octet-stream] v7-0003-Optimize-RMV-with-multi-inserts.patch (3.1K, 4-v7-0003-Optimize-RMV-with-multi-inserts.patch) download | inline diff: From ceb8f43687dd0baf86fac3a096d98134e06bcec8 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Tue, 1 Aug 2023 09:50:20 +0000 Subject: [PATCH v7] Optimize RMV with multi inserts --- src/backend/commands/matview.c | 36 +++++++++++++--------------------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index ac2e74fa3f..c7ab2d1f44 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -52,10 +52,7 @@ typedef struct 
DestReceiver pub; /* publicly-known function pointers */ Oid transientoid; /* OID of new heap into which to store */ /* These fields are filled by transientrel_startup: */ - Relation transientrel; /* relation to write to */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_transientrel; static int matview_maintenance_depth = 0; @@ -454,16 +451,18 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) { DR_transientrel *myState = (DR_transientrel *) self; Relation transientrel; + int table_am_flags = TABLEAM_USE_MULTI_INSERTS | + TABLEAM_USE_BULKWRITE_BUFFER_ACCESS_STRATEGY; + int table_insert_flags = TABLE_INSERT_SKIP_FSM | + TABLE_INSERT_FROZEN; transientrel = table_open(myState->transientoid, NoLock); - /* - * Fill private fields of myState for use by later routines - */ - myState->transientrel = transientrel; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN; - myState->bistate = GetBulkInsertState(); + /* Fill private fields of myState for use by later routines */ + myState->ti_state = table_insert_begin(transientrel, + GetCurrentCommandId(true), + table_am_flags, + table_insert_flags); /* * Valid smgr_targblock implies something already wrote to the relation. @@ -488,12 +487,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self) * cheap either. This also doesn't allow accessing per-AM data (say a * tuple's xmin), but since we don't do that here... 
*/ - - table_tuple_insert(myState->transientrel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); /* We know this is a newly created relation, so there are no indexes */ @@ -507,14 +501,12 @@ static void transientrel_shutdown(DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; + Relation transientrel = myState->ti_state->rel; - FreeBulkInsertState(myState->bistate); - - table_finish_bulk_insert(myState->transientrel, myState->ti_options); + table_insert_end(myState->ti_state); /* close transientrel, but keep lock until commit */ - table_close(myState->transientrel, NoLock); - myState->transientrel = NULL; + table_close(transientrel, NoLock); } /* -- 2.34.1 ^ permalink raw reply [nested|flat] 10+ messages in thread
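The buffering rule that heap_multi_insert_v2 follows in the 0001 patch (flush as soon as either the slot count or the accumulated size reaches its limit, then flush whatever remains at the end) can be modelled in isolation as below. This is a simplified sketch, not the patch's code: MultiInsertBuffer and the buffer_* functions are hypothetical names, a counter stands in for the real heap_multi_insert() call, and the end-of-insert check looks only at the slot count. The two limits mirror the values of HEAP_MAX_BUFFERED_SLOTS and HEAP_MAX_BUFFERED_BYTES from the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Same values as HEAP_MAX_BUFFERED_SLOTS / HEAP_MAX_BUFFERED_BYTES. */
#define MAX_BUFFERED_SLOTS 1000
#define MAX_BUFFERED_BYTES 65535

/* Hypothetical stand-in for the multi-insert state. */
typedef struct MultiInsertBuffer
{
    int     cur_slots;      /* tuples currently buffered */
    size_t  cur_size;       /* bytes currently buffered */
    int     flush_count;    /* how many flushes have happened */
    long    tuples_written; /* tuples handed to the (simulated) heap */
} MultiInsertBuffer;

/* Simulated flush: record the batch, then reset the buffer counters. */
static void
buffer_flush(MultiInsertBuffer *buf)
{
    buf->tuples_written += buf->cur_slots;
    buf->flush_count++;
    buf->cur_slots = 0;
    buf->cur_size = 0;
}

/* Buffer one tuple and flush once either limit is reached. */
static void
buffer_add(MultiInsertBuffer *buf, size_t tuple_size)
{
    buf->cur_slots++;
    buf->cur_size += tuple_size;

    if (buf->cur_slots >= MAX_BUFFERED_SLOTS ||
        buf->cur_size >= MAX_BUFFERED_BYTES)
        buffer_flush(buf);
}

/* At the end of the operation, write out any leftover tuples. */
static void
buffer_end(MultiInsertBuffer *buf)
{
    if (buf->cur_slots > 0)
        buffer_flush(buf);
}
```

With small tuples the slot limit triggers the flush; with large tuples the byte limit triggers it first, which is why the patch tracks both counters.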
* Re: New Table Access Methods for Multi and Single Inserts
@ 2023-08-01 17:02 Jacob Champion <[email protected]>
  parent: Bharath Rupireddy <[email protected]>
From: Jacob Champion @ 2023-08-01 17:02 UTC
To: Bharath Rupireddy <[email protected]>
Cc: Andres Freund <[email protected]>; Dilip Kumar <[email protected]>;
    Luc Vlaming <[email protected]>; Justin Pryzby <[email protected]>;
    pgsql-hackers; Paul Guo <[email protected]>; Jeff Davis <[email protected]>;
    Michael Paquier <[email protected]>; Matthias van de Meent <[email protected]>

On Tue, Aug 1, 2023 at 9:31 AM Bharath Rupireddy <[email protected]> wrote:
> Thanks. Finally, I started to spend time on this. Just curious - may
> I know the discussion in/for which this patch is referenced? What was
> the motive? Is it captured somewhere?

It may not have been the only place, but we at least touched on it
during the unconference:

https://wiki.postgresql.org/wiki/PgCon_2023_Developer_Unconference#Table_AMs

We discussed two related-but-separate ideas:
1) bulk/batch operations and
2) maintenance of TAM state across multiple related operations.

--Jacob
* Re: New Table Access Methods for Multi and Single Inserts
@ 2024-01-17 17:27 Bharath Rupireddy <[email protected]>
  parent: Jacob Champion <[email protected]>
From: Bharath Rupireddy @ 2024-01-17 17:27 UTC
To: Jacob Champion <[email protected]>
Cc: Andres Freund <[email protected]>; Dilip Kumar <[email protected]>;
    Luc Vlaming <[email protected]>; Justin Pryzby <[email protected]>;
    pgsql-hackers; Paul Guo <[email protected]>; Jeff Davis <[email protected]>;
    Michael Paquier <[email protected]>; Matthias van de Meent <[email protected]>

On Tue, Aug 1, 2023 at 10:32 PM Jacob Champion <[email protected]> wrote:
>
> On Tue, Aug 1, 2023 at 9:31 AM Bharath Rupireddy
> <[email protected]> wrote:
> > Thanks. Finally, I started to spend time on this. Just curious - may
> > I know the discussion in/for which this patch is referenced? What was
> > the motive? Is it captured somewhere?
>
> It may not have been the only place, but we at least touched on it
> during the unconference:
>
> https://wiki.postgresql.org/wiki/PgCon_2023_Developer_Unconference#Table_AMs
>
> We discussed two related-but-separate ideas:
> 1) bulk/batch operations and
> 2) maintenance of TAM state across multiple related operations.

Thank you. I'm attaching v8 patch-set here which includes use of new
insert TAMs for COPY FROM. With this, postgres not only will have the
new TAM for inserts, but they also can make the following commands
faster - CREATE TABLE AS, SELECT INTO, CREATE MATERIALIZED VIEW,
REFRESH MATERIALIZED VIEW and COPY FROM. I'll perform some testing in
the coming days and post the results here, until then I appreciate any
feedback on the patches.

I've also added this proposal to CF -
https://commitfest.postgresql.org/47/4777/.
-- Bharath Rupireddy PostgreSQL Contributors Team RDS Open Source Databases Amazon Web Services: https://aws.amazon.com Attachments: [application/x-patch] v8-0001-New-TAMs-for-inserts.patch (16.2K, 2-v8-0001-New-TAMs-for-inserts.patch) download | inline diff: From cbdf2935be360017c0d62479e879630d4fec8766 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Wed, 17 Jan 2024 16:44:19 +0000 Subject: [PATCH v8] New TAMs for inserts --- src/backend/access/heap/heapam.c | 224 +++++++++++++++++++++++ src/backend/access/heap/heapam_handler.c | 9 + src/include/access/heapam.h | 49 +++++ src/include/access/tableam.h | 143 +++++++++++++++ 4 files changed, 425 insertions(+) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 707460a536..7df305380e 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -68,6 +68,7 @@ #include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/spccache.h" @@ -2446,6 +2447,229 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, ntuples); } +/* + * Initialize state required for an insert a single tuple or multiple tuples + * into a heap. 
+ */ +TableInsertState * +heap_insert_begin(Relation rel, CommandId cid, int am_flags, int insert_flags) +{ + TableInsertState *tistate; + + tistate = palloc0(sizeof(TableInsertState)); + tistate->rel = rel; + tistate->cid = cid; + tistate->am_flags = am_flags; + tistate->insert_flags = insert_flags; + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0 || + (am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY)) + tistate->am_data = palloc0(sizeof(HeapInsertState)); + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0) + { + HeapMultiInsertState *mistate; + + mistate = palloc0(sizeof(HeapMultiInsertState)); + mistate->slots = palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS); + + mistate->context = AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert_v2 memory context", + ALLOCSET_DEFAULT_SIZES); + + ((HeapInsertState *) tistate->am_data)->mistate = mistate; + } + + if ((am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY) != 0) + ((HeapInsertState *) tistate->am_data)->bistate = GetBulkInsertState(); + + return tistate; +} + +/* + * Insert a single tuple into a heap. + */ +void +heap_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + bool shouldFree = true; + HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + BulkInsertState bistate = NULL; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate == NULL); + + /* Update tuple with table oid */ + slot->tts_tableOid = RelationGetRelid(state->rel); + tuple->t_tableOid = slot->tts_tableOid; + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + bistate = ((HeapInsertState *) state->am_data)->bistate; + + /* Perform insertion, and copy the resulting ItemPointer */ + heap_insert(state->rel, tuple, state->cid, state->insert_flags, + bistate); + ItemPointerCopy(&tuple->t_self, &slot->tts_tid); + + if (shouldFree) + pfree(tuple); +} + +/* + * Create/return next free slot from multi-insert buffered slots array. 
+ */ +TupleTableSlot * +heap_multi_insert_next_free_slot(TableInsertState * state) +{ + TupleTableSlot *slot; + HeapMultiInsertState *mistate; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->am_data)->mistate; + slot = mistate->slots[mistate->cur_slots]; + + if (slot == NULL) + { + slot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = slot; + } + else + ExecClearTuple(slot); + + return slot; +} + +/* + * Store passed-in tuple into in-memory buffered slots. When full, insert + * multiple tuples from the buffers into heap. + */ +void +heap_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + TupleTableSlot *dstslot; + HeapMultiInsertState *mistate; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->am_data)->mistate; + dstslot = mistate->slots[mistate->cur_slots]; + + if (dstslot == NULL) + { + dstslot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = dstslot; + } + + /* + * Caller may have got the slot using heap_multi_insert_next_free_slot, + * filled it and passed. So, skip copying in such a case. + */ + if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0) + { + ExecClearTuple(dstslot); + ExecCopySlot(dstslot, slot); + } + else + Assert(dstslot == slot); + + mistate->cur_slots++; + + /* + * When passed-in slot is already materialized, memory allocated in slot's + * memory context is a close approximation for us to track the required + * space for the tuple in slot. + * + * For non-materialized slots, the flushing decision happens solely on the + * number of tuples stored in the buffer. 
+ */ + if (TTS_SHOULDFREE(slot)) + mistate->cur_size += MemoryContextMemAllocated(slot->tts_mcxt, false); + + if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0 && + (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS || + mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)) + heap_multi_insert_flush(state); +} + +/* + * Return pointer to multi-insert buffered slots array and number of currently + * occupied slots. + */ +TupleTableSlot ** +heap_multi_insert_slots(TableInsertState * state, int *num_slots) +{ + HeapMultiInsertState *mistate; + + mistate = ((HeapInsertState *) state->am_data)->mistate; + *num_slots = mistate->cur_slots; + + return mistate->slots; +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. + */ +void +heap_multi_insert_flush(TableInsertState * state) +{ + HeapMultiInsertState *mistate; + BulkInsertState bistate = NULL; + MemoryContext oldcontext; + + mistate = ((HeapInsertState *) state->am_data)->mistate; + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + bistate = ((HeapInsertState *) state->am_data)->bistate; + + oldcontext = MemoryContextSwitchTo(mistate->context); + heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots, + state->cid, state->insert_flags, bistate); + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(mistate->context); + + mistate->cur_slots = 0; + mistate->cur_size = 0; +} + +/* + * Clean up state used to insert a single or multiple tuples into a heap. 
+ */ +void +heap_insert_end(TableInsertState * state) +{ + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL) + { + HeapMultiInsertState *mistate = + ((HeapInsertState *) state->am_data)->mistate; + + /* Insert remaining tuples from multi-insert buffers */ + if (mistate->cur_slots > 0 || mistate->cur_size > 0) + heap_multi_insert_flush(state); + + MemoryContextDelete(mistate->context); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + + pfree(mistate); + ((HeapInsertState *) state->am_data)->mistate = NULL; + } + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + FreeBulkInsertState(((HeapInsertState *) state->am_data)->bistate); + + pfree(state->am_data); + state->am_data = NULL; + pfree(state); +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index d15a02b2be..795177812d 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2564,6 +2564,15 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + + .tuple_insert_begin = heap_insert_begin, + .tuple_insert_v2 = heap_insert_v2, + .tuple_multi_insert_next_free_slot = heap_multi_insert_next_free_slot, + .tuple_multi_insert_v2 = heap_multi_insert_v2, + .tuple_multi_insert_slots = heap_multi_insert_slots, + .tuple_multi_insert_flush = heap_multi_insert_flush, + .tuple_insert_end = heap_insert_end, + .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 932ec0d6f2..46dba5245c 100644 --- a/src/include/access/heapam.h +++ 
b/src/include/access/heapam.h @@ -225,6 +225,40 @@ htsv_get_valid_status(int status) return (HTSV_Result) status; } +/* + * Maximum number of slots that multi-insert buffers can hold. + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. For instance, increasing this can cause + * quadratic growth in memory requirements during copies into partitioned + * tables with a large number of partitions. + */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +/* Maximum size of all tuples that multi-insert buffers can hold */ +#define HEAP_MAX_BUFFERED_BYTES 65535 + +typedef struct HeapMultiInsertState +{ + /* Memory context to use for flushing multi-insert buffers */ + MemoryContext context; + + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Number of slots that multi-insert buffers currently hold */ + int cur_slots; + + /* Size of all tuples that multi-insert buffers currently hold */ + Size cur_size; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + /* ---------------- * function prototypes for heap access method * @@ -275,6 +309,21 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); + +extern TableInsertState * heap_insert_begin(Relation rel, + CommandId cid, + int am_flags, + int insert_flags); +extern void heap_insert_v2(TableInsertState * state, + TupleTableSlot *slot); +extern TupleTableSlot *heap_multi_insert_next_free_slot(TableInsertState * state); +extern void heap_multi_insert_v2(TableInsertState * state, + TupleTableSlot *slot); +extern TupleTableSlot **heap_multi_insert_slots(TableInsertState * state, + int *num_slots); +extern void heap_multi_insert_flush(TableInsertState * state); +extern void 
heap_insert_end(TableInsertState * state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 5f8474871d..8fcaf6fe5a 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -247,6 +247,43 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */ +#define TABLEAM_MULTI_INSERTS 0x000001 + +/* Use BAS_BULKWRITE buffer access strategy */ +#define TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY 0x000002 + +/* + * Skip flushing buffered tuples automatically. Responsibility lies with the + * caller to flush the buffered tuples. + */ +#define TABLEAM_SKIP_MULTI_INSERTS_FLUSH 0x000004 + + +/* Holds table insert state. */ +typedef struct TableInsertState +{ + /* Table AM-agnostic data starts here */ + + Relation rel; /* Target relation */ + + /* + * Command ID for this insertion. If required, change this for each pass + * of insert functions. 
+ */ + CommandId cid; + + /* Table AM options (TABLEAM_XXX macros) */ + int am_flags; + + /* table_tuple_insert performance options (TABLE_INSERT_XXX macros) */ + int insert_flags; + + /* Table AM specific data starts here */ + + void *am_data; +} TableInsertState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 @@ -522,6 +559,20 @@ typedef struct TableAmRoutine void (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate); + TableInsertState *(*tuple_insert_begin) (Relation rel, + CommandId cid, + int am_flags, + int insert_flags); + void (*tuple_insert_v2) (TableInsertState * state, + TupleTableSlot *slot); + void (*tuple_multi_insert_v2) (TableInsertState * state, + TupleTableSlot *slot); + TupleTableSlot *(*tuple_multi_insert_next_free_slot) (TableInsertState * state); + TupleTableSlot **(*tuple_multi_insert_slots) (TableInsertState * state, + int *num_slots); + void (*tuple_multi_insert_flush) (TableInsertState * state); + void (*tuple_insert_end) (TableInsertState * state); + /* see table_tuple_delete() for reference about parameters */ TM_Result (*tuple_delete) (Relation rel, ItemPointer tid, @@ -1456,6 +1507,98 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, cid, options, bistate); } +static inline TableInsertState * +table_insert_begin(Relation rel, CommandId cid, int am_flags, + int insert_flags) +{ + if (rel->rd_tableam && rel->rd_tableam->tuple_insert_begin) + return rel->rd_tableam->tuple_insert_begin(rel, cid, am_flags, + insert_flags); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_insert_begin access method is not implemented for relation \"%s\"", + RelationGetRelationName(rel))); +} + +static inline void +table_tuple_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && 
+ state->rel->rd_tableam->tuple_insert_v2) + state->rel->rd_tableam->tuple_insert_v2(state, slot); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_tuple_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline void +table_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_v2) + state->rel->rd_tableam->tuple_multi_insert_v2(state, slot); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline TupleTableSlot * +table_multi_insert_next_free_slot(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_next_free_slot) + return state->rel->rd_tableam->tuple_multi_insert_next_free_slot(state); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_next_free_slot access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline TupleTableSlot ** +table_multi_insert_slots(TableInsertState * state, int *num_slots) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_slots) + return state->rel->rd_tableam->tuple_multi_insert_slots(state, num_slots); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_slots access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline void +table_multi_insert_flush(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_flush) + state->rel->rd_tableam->tuple_multi_insert_flush(state); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_flush 
access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline void +table_insert_end(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_insert_end) + state->rel->rd_tableam->tuple_insert_end(state); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_insert_end access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + /* * Delete a tuple. * -- 2.34.1 [application/x-patch] v8-0002-Optimize-CTAS-with-multi-inserts.patch (2.7K, 3-v8-0002-Optimize-CTAS-with-multi-inserts.patch) download | inline diff: From 4835495e675bb178ecb67d84e6b00de15751ce8b Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Wed, 17 Jan 2024 15:23:38 +0000 Subject: [PATCH v8] Optimize CTAS with multi inserts --- src/backend/commands/createas.c | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 16a2fe65e6..3a02ea9578 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -58,9 +58,7 @@ typedef struct /* These fields are filled by intorel_startup: */ Relation rel; /* relation to write to */ ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_intorel; /* utility functions for CTAS definition creation */ @@ -557,17 +555,19 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) */ myState->rel = intoRelationDesc; myState->reladdr = intoRelationAddr; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM; /* * If WITH NO DATA is specified, there is no need to set up 
the state for * bulk inserts as there are no tuples to insert. */ if (!into->skipData) - myState->bistate = GetBulkInsertState(); + myState->ti_state = table_insert_begin(intoRelationDesc, + GetCurrentCommandId(true), + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY, + TABLE_INSERT_SKIP_FSM); else - myState->bistate = NULL; + myState->ti_state = NULL; /* * Valid smgr_targblock implies something already wrote to the relation. @@ -595,11 +595,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) * would not be cheap either. This also doesn't allow accessing per-AM * data (say a tuple's xmin), but since we don't do that here... */ - table_tuple_insert(myState->rel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); } /* We know this is a newly created relation, so there are no indexes */ @@ -617,10 +613,7 @@ intorel_shutdown(DestReceiver *self) IntoClause *into = myState->into; if (!into->skipData) - { - FreeBulkInsertState(myState->bistate); - table_finish_bulk_insert(myState->rel, myState->ti_options); - } + table_insert_end(myState->ti_state); /* close rel, but keep lock until commit */ table_close(myState->rel, NoLock); -- 2.34.1 [application/x-patch] v8-0003-Optimize-RMV-with-multi-inserts.patch (2.9K, 4-v8-0003-Optimize-RMV-with-multi-inserts.patch) download | inline diff: From d5fd779aa51c624662eefee8349f2d3f6517c3c5 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Wed, 17 Jan 2024 15:27:37 +0000 Subject: [PATCH v8] Optimize RMV with multi inserts --- src/backend/commands/matview.c | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 1dcfbe879b..f84c79f5f0 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -52,10 +52,7 @@ typedef struct DestReceiver pub; /* publicly-known function 
pointers */ Oid transientoid; /* OID of new heap into which to store */ /* These fields are filled by transientrel_startup: */ - Relation transientrel; /* relation to write to */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_transientrel; static int matview_maintenance_depth = 0; @@ -457,13 +454,13 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) transientrel = table_open(myState->transientoid, NoLock); - /* - * Fill private fields of myState for use by later routines - */ - myState->transientrel = transientrel; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN; - myState->bistate = GetBulkInsertState(); + /* Fill private fields of myState for use by later routines */ + myState->ti_state = table_insert_begin(transientrel, + GetCurrentCommandId(true), + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY, + TABLE_INSERT_SKIP_FSM | + TABLE_INSERT_FROZEN); /* * Valid smgr_targblock implies something already wrote to the relation. @@ -488,12 +485,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self) * cheap either. This also doesn't allow accessing per-AM data (say a * tuple's xmin), but since we don't do that here... 
*/ - - table_tuple_insert(myState->transientrel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); /* We know this is a newly created relation, so there are no indexes */ @@ -507,14 +499,12 @@ static void transientrel_shutdown(DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; + Relation transientrel = myState->ti_state->rel; - FreeBulkInsertState(myState->bistate); - - table_finish_bulk_insert(myState->transientrel, myState->ti_options); + table_insert_end(myState->ti_state); /* close transientrel, but keep lock until commit */ - table_close(myState->transientrel, NoLock); - myState->transientrel = NULL; + table_close(transientrel, NoLock); } /* -- 2.34.1 [application/x-patch] v8-0004-Use-new-multi-insert-TAM-for-COPY-FROM.patch (6.3K, 5-v8-0004-Use-new-multi-insert-TAM-for-COPY-FROM.patch) download | inline diff: From 24062422b0f213f188bad844b2191923ff258807 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Wed, 17 Jan 2024 16:49:52 +0000 Subject: [PATCH v8] Use new multi insert TAM for COPY FROM --- src/backend/commands/copyfrom.c | 92 ++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 4058b08134..a6c703a99e 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -77,10 +77,9 @@ /* Stores multi-insert data related to a single relation in CopyFrom. 
*/ typedef struct CopyMultiInsertBuffer { - TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ + TableInsertState *ti_state; /* Table insert state; NULL if foreign table */ + TupleTableSlot **slots; /* Array to store tuples */ ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ - BulkInsertState bistate; /* BulkInsertState for this rel if plain - * table; NULL if foreign table */ int nused; /* number of 'slots' containing tuples */ uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy * stream */ @@ -223,14 +222,31 @@ limit_printout_length(const char *str) * ResultRelInfo. */ static CopyMultiInsertBuffer * -CopyMultiInsertBufferInit(ResultRelInfo *rri) +CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri) { CopyMultiInsertBuffer *buffer; buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); - memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + + if (rri->ri_FdwRoutine == NULL) + { + int num_slots; + + buffer->ti_state = table_insert_begin(rri->ri_RelationDesc, + miinfo->mycid, + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY | + TABLEAM_SKIP_MULTI_INSERTS_FLUSH, + miinfo->ti_options); + buffer->slots = table_multi_insert_slots(buffer->ti_state, &num_slots); + } + else + { + buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + buffer->ti_state = NULL; + } + buffer->resultRelInfo = rri; - buffer->bistate = (rri->ri_FdwRoutine == NULL) ? 
GetBulkInsertState() : NULL; buffer->nused = 0; return buffer; @@ -245,7 +261,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer; - buffer = CopyMultiInsertBufferInit(rri); + buffer = CopyMultiInsertBufferInit(miinfo, rri); /* Setup back-link so we can easily find this buffer again */ rri->ri_CopyMultiInsertBuffer = buffer; @@ -322,8 +338,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, int batch_size = resultRelInfo->ri_BatchSize; int sent = 0; - Assert(buffer->bistate == NULL); - /* Ensure that the FDW supports batching and it's enabled */ Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert); Assert(batch_size > 1); @@ -395,13 +409,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, } else { - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; bool line_buf_valid = cstate->line_buf_valid; uint64 save_cur_lineno = cstate->cur_lineno; - MemoryContext oldcontext; - - Assert(buffer->bistate != NULL); /* * Print error context information correctly, if one of the operations @@ -409,18 +418,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, */ cstate->line_buf_valid = false; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. 
- */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); + table_multi_insert_flush(buffer->ti_state); for (i = 0; i < nused; i++) { @@ -435,7 +433,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, cstate->cur_lineno = buffer->linenos[i]; recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, + slots[i], estate, false, false, NULL, NIL, false); ExecARInsertTriggers(estate, resultRelInfo, slots[i], recheckIndexes, @@ -493,20 +491,15 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, resultRelInfo->ri_CopyMultiInsertBuffer = NULL; if (resultRelInfo->ri_FdwRoutine == NULL) - { - Assert(buffer->bistate != NULL); - FreeBulkInsertState(buffer->bistate); - } + table_insert_end(buffer->ti_state); else - Assert(buffer->bistate == NULL); - - /* Since we only create slots on demand, just drop the non-null ones. */ - for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) - ExecDropSingleTupleTableSlot(buffer->slots[i]); + { + /* Since we only create slots on demand, just drop the non-null ones. 
*/ + for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(buffer->slots[i]); - if (resultRelInfo->ri_FdwRoutine == NULL) - table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, - miinfo->ti_options); + pfree(buffer->slots); + } pfree(buffer); } @@ -593,13 +586,25 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; int nused = buffer->nused; + TupleTableSlot *slot; Assert(buffer != NULL); Assert(nused < MAX_BUFFERED_TUPLES); - if (buffer->slots[nused] == NULL) - buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); - return buffer->slots[nused]; + if (rri->ri_FdwRoutine == NULL) + slot = table_multi_insert_next_free_slot(buffer->ti_state); + else + { + if (buffer->slots[nused] == NULL) + { + slot = table_slot_create(rri->ri_RelationDesc, NULL); + buffer->slots[nused] = slot; + } + else + slot = buffer->slots[nused]; + } + + return slot; } /* @@ -615,6 +620,9 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, Assert(buffer != NULL); Assert(slot == buffer->slots[buffer->nused]); + if (rri->ri_FdwRoutine == NULL) + table_multi_insert_v2(buffer->ti_state, slot); + /* Store the line number so we can properly report any errors later */ buffer->linenos[buffer->nused] = lineno; -- 2.34.1 ^ permalink raw reply [nested|flat] 10+ messages in thread
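For readers following the thread, the flush policy of heap_multi_insert_v2 and heap_insert_end in v8-0001 can be modelled in isolation. The following is only a rough sketch under stated assumptions, not code from the patch: the Toy* names, the nflushes and ninserted counters, and the tuple_size argument are hypothetical. tuple_size stands in for the value the patch takes from MemoryContextMemAllocated() when TTS_SHOULDFREE(slot) is true, and 0 models a non-materialized slot. Only the two limits and the flush conditions are taken from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Limits as defined in heapam.h by the v8-0001 patch. */
#define HEAP_MAX_BUFFERED_SLOTS 1000
#define HEAP_MAX_BUFFERED_BYTES 65535

/*
 * Hypothetical stand-in for HeapMultiInsertState: it keeps only counters,
 * no slots and no memory context.
 */
typedef struct ToyMultiInsertState
{
	int		cur_slots;			/* tuples currently buffered */
	size_t	cur_size;			/* bytes currently buffered */
	bool	skip_auto_flush;	/* models TABLEAM_SKIP_MULTI_INSERTS_FLUSH */
	int		nflushes;			/* flushes so far (not in the patch) */
	long	ninserted;			/* tuples flushed so far (not in the patch) */
} ToyMultiInsertState;

/* Models heap_multi_insert_flush: hand over the buffer, reset the counters. */
static void
toy_flush(ToyMultiInsertState *s)
{
	s->ninserted += s->cur_slots;
	s->nflushes++;
	s->cur_slots = 0;
	s->cur_size = 0;
}

/* Models heap_multi_insert_v2: buffer one tuple, flush when a limit is hit. */
static void
toy_multi_insert(ToyMultiInsertState *s, size_t tuple_size)
{
	s->cur_slots++;
	s->cur_size += tuple_size;

	if (!s->skip_auto_flush &&
		(s->cur_slots >= HEAP_MAX_BUFFERED_SLOTS ||
		 s->cur_size >= HEAP_MAX_BUFFERED_BYTES))
		toy_flush(s);
}

/* Models heap_insert_end: flush whatever is still buffered. */
static void
toy_end(ToyMultiInsertState *s)
{
	if (s->cur_slots > 0 || s->cur_size > 0)
		toy_flush(s);
}

/* Hypothetical driver: insert ntuples tuples of equal size, then end. */
static ToyMultiInsertState
toy_run(long ntuples, size_t tuple_size, bool skip_auto_flush)
{
	ToyMultiInsertState s = {0};

	s.skip_auto_flush = skip_auto_flush;
	for (long i = 0; i < ntuples; i++)
		toy_multi_insert(&s, tuple_size);
	toy_end(&s);

	return s;
}
```

Under this model, toy_run(2500, 0, false) flushes three times: at 1000 and 2000 buffered tuples, and once more at the end for the remaining 500. With skip_auto_flush set, as the COPY FROM patch does, nothing is flushed until the caller asks for it, so the caller has to stay within HEAP_MAX_BUFFERED_SLOTS itself.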
* Re: New Table Access Methods for Multi and Single Inserts
@ 2024-01-29 07:27 Bharath Rupireddy <[email protected]>
  parent: Bharath Rupireddy <[email protected]>
From: Bharath Rupireddy @ 2024-01-29 07:27 UTC
To: pgsql-hackers
Cc: Andres Freund <[email protected]>; Dilip Kumar <[email protected]>;
    Luc Vlaming <[email protected]>; Justin Pryzby <[email protected]>;
    Jeff Davis <[email protected]>; Michael Paquier <[email protected]>;
    Matthias van de Meent <[email protected]>

On Wed, Jan 17, 2024 at 10:57 PM Bharath Rupireddy <[email protected]> wrote:
>
> Thank you. I'm attaching v8 patch-set here which includes use of new
> insert TAMs for COPY FROM. With this, postgres not only will have the
> new TAM for inserts, but they also can make the following commands
> faster - CREATE TABLE AS, SELECT INTO, CREATE MATERIALIZED VIEW,
> REFRESH MATERIALIZED VIEW and COPY FROM. I'll perform some testing in
> the coming days and post the results here, until then I appreciate any
> feedback on the patches.
>
> I've also added this proposal to CF -
> https://commitfest.postgresql.org/47/4777/.

Some of the tests related to Incremental Sort added by the recent commit
0452b461bc4 in aggregates.sql fail when the multi inserts feature is used
for CTAS (as done in the 0002 patch); see [1]. I'm not sure whether that is
because of the reduction in CTAS execution time. Creating table 'btg'
(added by commit 0452b461bc4) with CREATE TABLE AS takes 25.3 msec with
single inserts and 17.7 msec with multi inserts. That makes multi inserts
about 1.43 times faster than single inserts, i.e. they take about 30.04%
less time.

There are a couple of ways to make these tests pick Incremental Sort as
expected:
1) CLUSTER btg USING abc; or
2) increase the number of rows in table btg from 10K to 100K.

FWIW, if I reduce the number of rows in the table from 10K to 1K,
Incremental Sort won't get picked on HEAD with CTAS using single inserts.
Hence, I chose option (2) to fix the issue. Please find the attached v9 patch set. [1] -- Engage incremental sort explain (COSTS OFF) SELECT x,y FROM btg GROUP BY x,y,z,w; - QUERY PLAN -------------------------------------------------- + QUERY PLAN +------------------------------ Group Group Key: x, y, z, w - -> Incremental Sort + -> Sort Sort Key: x, y, z, w - Presorted Key: x, y - -> Index Scan using btg_x_y_idx on btg -(6 rows) + -> Seq Scan on btg +(5 rows) -- Bharath Rupireddy PostgreSQL Contributors Team RDS Open Source Databases Amazon Web Services: https://aws.amazon.com Attachments: [application/x-patch] v9-0001-New-TAMs-for-inserts.patch (16.2K, 2-v9-0001-New-TAMs-for-inserts.patch) download | inline diff: From a84107e498ffddc56ef4fbb207d6ba6e82717901 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Mon, 29 Jan 2024 05:39:55 +0000 Subject: [PATCH v9 1/4] New TAMs for inserts --- src/backend/access/heap/heapam.c | 224 +++++++++++++++++++++++ src/backend/access/heap/heapam_handler.c | 9 + src/include/access/heapam.h | 49 +++++ src/include/access/tableam.h | 143 +++++++++++++++ 4 files changed, 425 insertions(+) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 707460a536..7df305380e 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -68,6 +68,7 @@ #include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/spccache.h" @@ -2446,6 +2447,229 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, ntuples); } +/* + * Initialize state required for an insert a single tuple or multiple tuples + * into a heap. 
+ */ +TableInsertState * +heap_insert_begin(Relation rel, CommandId cid, int am_flags, int insert_flags) +{ + TableInsertState *tistate; + + tistate = palloc0(sizeof(TableInsertState)); + tistate->rel = rel; + tistate->cid = cid; + tistate->am_flags = am_flags; + tistate->insert_flags = insert_flags; + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0 || + (am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY)) + tistate->am_data = palloc0(sizeof(HeapInsertState)); + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0) + { + HeapMultiInsertState *mistate; + + mistate = palloc0(sizeof(HeapMultiInsertState)); + mistate->slots = palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS); + + mistate->context = AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert_v2 memory context", + ALLOCSET_DEFAULT_SIZES); + + ((HeapInsertState *) tistate->am_data)->mistate = mistate; + } + + if ((am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY) != 0) + ((HeapInsertState *) tistate->am_data)->bistate = GetBulkInsertState(); + + return tistate; +} + +/* + * Insert a single tuple into a heap. + */ +void +heap_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + bool shouldFree = true; + HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + BulkInsertState bistate = NULL; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate == NULL); + + /* Update tuple with table oid */ + slot->tts_tableOid = RelationGetRelid(state->rel); + tuple->t_tableOid = slot->tts_tableOid; + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + bistate = ((HeapInsertState *) state->am_data)->bistate; + + /* Perform insertion, and copy the resulting ItemPointer */ + heap_insert(state->rel, tuple, state->cid, state->insert_flags, + bistate); + ItemPointerCopy(&tuple->t_self, &slot->tts_tid); + + if (shouldFree) + pfree(tuple); +} + +/* + * Create/return next free slot from multi-insert buffered slots array. 
+ */ +TupleTableSlot * +heap_multi_insert_next_free_slot(TableInsertState * state) +{ + TupleTableSlot *slot; + HeapMultiInsertState *mistate; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->am_data)->mistate; + slot = mistate->slots[mistate->cur_slots]; + + if (slot == NULL) + { + slot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = slot; + } + else + ExecClearTuple(slot); + + return slot; +} + +/* + * Store passed-in tuple into in-memory buffered slots. When full, insert + * multiple tuples from the buffers into heap. + */ +void +heap_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + TupleTableSlot *dstslot; + HeapMultiInsertState *mistate; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->am_data)->mistate; + dstslot = mistate->slots[mistate->cur_slots]; + + if (dstslot == NULL) + { + dstslot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = dstslot; + } + + /* + * Caller may have got the slot using heap_multi_insert_next_free_slot, + * filled it and passed. So, skip copying in such a case. + */ + if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0) + { + ExecClearTuple(dstslot); + ExecCopySlot(dstslot, slot); + } + else + Assert(dstslot == slot); + + mistate->cur_slots++; + + /* + * When passed-in slot is already materialized, memory allocated in slot's + * memory context is a close approximation for us to track the required + * space for the tuple in slot. + * + * For non-materialized slots, the flushing decision happens solely on the + * number of tuples stored in the buffer. 
+ */ + if (TTS_SHOULDFREE(slot)) + mistate->cur_size += MemoryContextMemAllocated(slot->tts_mcxt, false); + + if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0 && + (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS || + mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)) + heap_multi_insert_flush(state); +} + +/* + * Return pointer to multi-insert buffered slots array and number of currently + * occupied slots. + */ +TupleTableSlot ** +heap_multi_insert_slots(TableInsertState * state, int *num_slots) +{ + HeapMultiInsertState *mistate; + + mistate = ((HeapInsertState *) state->am_data)->mistate; + *num_slots = mistate->cur_slots; + + return mistate->slots; +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. + */ +void +heap_multi_insert_flush(TableInsertState * state) +{ + HeapMultiInsertState *mistate; + BulkInsertState bistate = NULL; + MemoryContext oldcontext; + + mistate = ((HeapInsertState *) state->am_data)->mistate; + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + bistate = ((HeapInsertState *) state->am_data)->bistate; + + oldcontext = MemoryContextSwitchTo(mistate->context); + heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots, + state->cid, state->insert_flags, bistate); + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(mistate->context); + + mistate->cur_slots = 0; + mistate->cur_size = 0; +} + +/* + * Clean up state used to insert a single or multiple tuples into a heap. 
+ */ +void +heap_insert_end(TableInsertState * state) +{ + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL) + { + HeapMultiInsertState *mistate = + ((HeapInsertState *) state->am_data)->mistate; + + /* Insert remaining tuples from multi-insert buffers */ + if (mistate->cur_slots > 0 || mistate->cur_size > 0) + heap_multi_insert_flush(state); + + MemoryContextDelete(mistate->context); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + + pfree(mistate); + ((HeapInsertState *) state->am_data)->mistate = NULL; + } + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + FreeBulkInsertState(((HeapInsertState *) state->am_data)->bistate); + + pfree(state->am_data); + state->am_data = NULL; + pfree(state); +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index d15a02b2be..795177812d 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2564,6 +2564,15 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + + .tuple_insert_begin = heap_insert_begin, + .tuple_insert_v2 = heap_insert_v2, + .tuple_multi_insert_next_free_slot = heap_multi_insert_next_free_slot, + .tuple_multi_insert_v2 = heap_multi_insert_v2, + .tuple_multi_insert_slots = heap_multi_insert_slots, + .tuple_multi_insert_flush = heap_multi_insert_flush, + .tuple_insert_end = heap_insert_end, + .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 4b133f6859..053be18110 100644 --- a/src/include/access/heapam.h +++ 
b/src/include/access/heapam.h @@ -225,6 +225,40 @@ htsv_get_valid_status(int status) return (HTSV_Result) status; } +/* + * Maximum number of slots that multi-insert buffers can hold. + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. For instance, increasing this can cause + * quadratic growth in memory requirements during copies into partitioned + * tables with a large number of partitions. + */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +/* Maximum size of all tuples that multi-insert buffers can hold */ +#define HEAP_MAX_BUFFERED_BYTES 65535 + +typedef struct HeapMultiInsertState +{ + /* Memory context to use for flushing multi-insert buffers */ + MemoryContext context; + + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Number of slots that multi-insert buffers currently hold */ + int cur_slots; + + /* Size of all tuples that multi-insert buffers currently hold */ + Size cur_size; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + /* ---------------- * function prototypes for heap access method * @@ -275,6 +309,21 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); + +extern TableInsertState * heap_insert_begin(Relation rel, + CommandId cid, + int am_flags, + int insert_flags); +extern void heap_insert_v2(TableInsertState * state, + TupleTableSlot *slot); +extern TupleTableSlot *heap_multi_insert_next_free_slot(TableInsertState * state); +extern void heap_multi_insert_v2(TableInsertState * state, + TupleTableSlot *slot); +extern TupleTableSlot **heap_multi_insert_slots(TableInsertState * state, + int *num_slots); +extern void heap_multi_insert_flush(TableInsertState * state); +extern void 
heap_insert_end(TableInsertState * state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 5f8474871d..8fcaf6fe5a 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -247,6 +247,43 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */ +#define TABLEAM_MULTI_INSERTS 0x000001 + +/* Use BAS_BULKWRITE buffer access strategy */ +#define TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY 0x000002 + +/* + * Skip flushing buffered tuples automatically. Responsibility lies with the + * caller to flush the buffered tuples. + */ +#define TABLEAM_SKIP_MULTI_INSERTS_FLUSH 0x000004 + + +/* Holds table insert state. */ +typedef struct TableInsertState +{ + /* Table AM-agnostic data starts here */ + + Relation rel; /* Target relation */ + + /* + * Command ID for this insertion. If required, change this for each pass + * of insert functions. 
+ */ + CommandId cid; + + /* Table AM options (TABLEAM_XXX macros) */ + int am_flags; + + /* table_tuple_insert performance options (TABLE_INSERT_XXX macros) */ + int insert_flags; + + /* Table AM specific data starts here */ + + void *am_data; +} TableInsertState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 @@ -522,6 +559,20 @@ typedef struct TableAmRoutine void (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate); + TableInsertState *(*tuple_insert_begin) (Relation rel, + CommandId cid, + int am_flags, + int insert_flags); + void (*tuple_insert_v2) (TableInsertState * state, + TupleTableSlot *slot); + void (*tuple_multi_insert_v2) (TableInsertState * state, + TupleTableSlot *slot); + TupleTableSlot *(*tuple_multi_insert_next_free_slot) (TableInsertState * state); + TupleTableSlot **(*tuple_multi_insert_slots) (TableInsertState * state, + int *num_slots); + void (*tuple_multi_insert_flush) (TableInsertState * state); + void (*tuple_insert_end) (TableInsertState * state); + /* see table_tuple_delete() for reference about parameters */ TM_Result (*tuple_delete) (Relation rel, ItemPointer tid, @@ -1456,6 +1507,98 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, cid, options, bistate); } +static inline TableInsertState * +table_insert_begin(Relation rel, CommandId cid, int am_flags, + int insert_flags) +{ + if (rel->rd_tableam && rel->rd_tableam->tuple_insert_begin) + return rel->rd_tableam->tuple_insert_begin(rel, cid, am_flags, + insert_flags); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_insert_begin access method is not implemented for relation \"%s\"", + RelationGetRelationName(rel))); +} + +static inline void +table_tuple_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && 
+ state->rel->rd_tableam->tuple_insert_v2) + state->rel->rd_tableam->tuple_insert_v2(state, slot); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_tuple_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline void +table_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_v2) + state->rel->rd_tableam->tuple_multi_insert_v2(state, slot); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline TupleTableSlot * +table_multi_insert_next_free_slot(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_next_free_slot) + return state->rel->rd_tableam->tuple_multi_insert_next_free_slot(state); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_next_free_slot access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline TupleTableSlot ** +table_multi_insert_slots(TableInsertState * state, int *num_slots) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_slots) + return state->rel->rd_tableam->tuple_multi_insert_slots(state, num_slots); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_slots access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline void +table_multi_insert_flush(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_flush) + state->rel->rd_tableam->tuple_multi_insert_flush(state); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_flush 
access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline void +table_insert_end(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_insert_end) + state->rel->rd_tableam->tuple_insert_end(state); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_insert_end access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + /* * Delete a tuple. * -- 2.34.1 [application/x-patch] v9-0003-Optimize-RMV-with-multi-inserts.patch (2.9K, 3-v9-0003-Optimize-RMV-with-multi-inserts.patch) download | inline diff: From 1c3eea3d0ac69f590ca641d0efaeaa0585a7a850 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Mon, 29 Jan 2024 05:58:33 +0000 Subject: [PATCH v9 3/4] Optimize RMV with multi inserts --- src/backend/commands/matview.c | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 1dcfbe879b..f84c79f5f0 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -52,10 +52,7 @@ typedef struct DestReceiver pub; /* publicly-known function pointers */ Oid transientoid; /* OID of new heap into which to store */ /* These fields are filled by transientrel_startup: */ - Relation transientrel; /* relation to write to */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_transientrel; static int matview_maintenance_depth = 0; @@ -457,13 +454,13 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) transientrel = table_open(myState->transientoid, NoLock); - /* - * Fill private fields of myState for use by later routines - */ - myState->transientrel = 
transientrel; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN; - myState->bistate = GetBulkInsertState(); + /* Fill private fields of myState for use by later routines */ + myState->ti_state = table_insert_begin(transientrel, + GetCurrentCommandId(true), + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY, + TABLE_INSERT_SKIP_FSM | + TABLE_INSERT_FROZEN); /* * Valid smgr_targblock implies something already wrote to the relation. @@ -488,12 +485,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self) * cheap either. This also doesn't allow accessing per-AM data (say a * tuple's xmin), but since we don't do that here... */ - - table_tuple_insert(myState->transientrel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); /* We know this is a newly created relation, so there are no indexes */ @@ -507,14 +499,12 @@ static void transientrel_shutdown(DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; + Relation transientrel = myState->ti_state->rel; - FreeBulkInsertState(myState->bistate); - - table_finish_bulk_insert(myState->transientrel, myState->ti_options); + table_insert_end(myState->ti_state); /* close transientrel, but keep lock until commit */ - table_close(myState->transientrel, NoLock); - myState->transientrel = NULL; + table_close(transientrel, NoLock); } /* -- 2.34.1 [application/x-patch] v9-0002-Optimize-CTAS-with-multi-inserts.patch (3.8K, 4-v9-0002-Optimize-CTAS-with-multi-inserts.patch) download | inline diff: From 891047c4b20aab2c6d25187b45b775ee9d71fb48 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Mon, 29 Jan 2024 05:57:59 +0000 Subject: [PATCH v9 2/4] Optimize CTAS with multi inserts --- src/backend/commands/createas.c | 25 +++++++++--------------- src/test/regress/expected/aggregates.out | 2 +- src/test/regress/sql/aggregates.sql 
| 2 +- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 16a2fe65e6..3a02ea9578 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -58,9 +58,7 @@ typedef struct /* These fields are filled by intorel_startup: */ Relation rel; /* relation to write to */ ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_intorel; /* utility functions for CTAS definition creation */ @@ -557,17 +555,19 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) */ myState->rel = intoRelationDesc; myState->reladdr = intoRelationAddr; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM; /* * If WITH NO DATA is specified, there is no need to set up the state for * bulk inserts as there are no tuples to insert. */ if (!into->skipData) - myState->bistate = GetBulkInsertState(); + myState->ti_state = table_insert_begin(intoRelationDesc, + GetCurrentCommandId(true), + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY, + TABLE_INSERT_SKIP_FSM); else - myState->bistate = NULL; + myState->ti_state = NULL; /* * Valid smgr_targblock implies something already wrote to the relation. @@ -595,11 +595,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) * would not be cheap either. This also doesn't allow accessing per-AM * data (say a tuple's xmin), but since we don't do that here... 
*/ - table_tuple_insert(myState->rel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); } /* We know this is a newly created relation, so there are no indexes */ @@ -617,10 +613,7 @@ intorel_shutdown(DestReceiver *self) IntoClause *into = myState->into; if (!into->skipData) - { - FreeBulkInsertState(myState->bistate); - table_finish_bulk_insert(myState->rel, myState->ti_options); - } + table_insert_end(myState->ti_state); /* close rel, but keep lock until commit */ table_close(myState->rel, NoLock); diff --git a/src/test/regress/expected/aggregates.out b/src/test/regress/expected/aggregates.out index 7a73c19314..2889fd315d 100644 --- a/src/test/regress/expected/aggregates.out +++ b/src/test/regress/expected/aggregates.out @@ -2734,7 +2734,7 @@ CREATE TABLE btg AS SELECT i % 100 AS y, 'abc' || i % 10 AS z, i AS w -FROM generate_series(1,10000) AS i; +FROM generate_series(1,100000) AS i; CREATE INDEX btg_x_y_idx ON btg(x,y); ANALYZE btg; -- GROUP BY optimization by reorder columns by frequency diff --git a/src/test/regress/sql/aggregates.sql b/src/test/regress/sql/aggregates.sql index 916dbf908f..99f890bb85 100644 --- a/src/test/regress/sql/aggregates.sql +++ b/src/test/regress/sql/aggregates.sql @@ -1187,7 +1187,7 @@ CREATE TABLE btg AS SELECT i % 100 AS y, 'abc' || i % 10 AS z, i AS w -FROM generate_series(1,10000) AS i; +FROM generate_series(1,100000) AS i; CREATE INDEX btg_x_y_idx ON btg(x,y); ANALYZE btg; -- 2.34.1 [application/x-patch] v9-0004-Use-new-multi-insert-TAM-for-COPY-FROM.patch (6.3K, 5-v9-0004-Use-new-multi-insert-TAM-for-COPY-FROM.patch) download | inline diff: From 538c515617a320007d8b76fb48efd75242641428 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Mon, 29 Jan 2024 05:59:12 +0000 Subject: [PATCH v9 4/4] Use new multi insert TAM for COPY FROM --- src/backend/commands/copyfrom.c | 92 ++++++++++++++++++--------------- 1 file changed, 50 
insertions(+), 42 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 1fe70b9133..8abf33aa97 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -77,10 +77,9 @@ /* Stores multi-insert data related to a single relation in CopyFrom. */ typedef struct CopyMultiInsertBuffer { - TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ + TableInsertState *ti_state; /* Table insert state; NULL if foreign table */ + TupleTableSlot **slots; /* Array to store tuples */ ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ - BulkInsertState bistate; /* BulkInsertState for this rel if plain - * table; NULL if foreign table */ int nused; /* number of 'slots' containing tuples */ uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy * stream */ @@ -223,14 +222,31 @@ limit_printout_length(const char *str) * ResultRelInfo. */ static CopyMultiInsertBuffer * -CopyMultiInsertBufferInit(ResultRelInfo *rri) +CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri) { CopyMultiInsertBuffer *buffer; buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); - memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + + if (rri->ri_FdwRoutine == NULL) + { + int num_slots; + + buffer->ti_state = table_insert_begin(rri->ri_RelationDesc, + miinfo->mycid, + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY | + TABLEAM_SKIP_MULTI_INSERTS_FLUSH, + miinfo->ti_options); + buffer->slots = table_multi_insert_slots(buffer->ti_state, &num_slots); + } + else + { + buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + buffer->ti_state = NULL; + } + buffer->resultRelInfo = rri; - buffer->bistate = (rri->ri_FdwRoutine == NULL) ? 
GetBulkInsertState() : NULL; buffer->nused = 0; return buffer; @@ -245,7 +261,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer; - buffer = CopyMultiInsertBufferInit(rri); + buffer = CopyMultiInsertBufferInit(miinfo, rri); /* Setup back-link so we can easily find this buffer again */ rri->ri_CopyMultiInsertBuffer = buffer; @@ -322,8 +338,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, int batch_size = resultRelInfo->ri_BatchSize; int sent = 0; - Assert(buffer->bistate == NULL); - /* Ensure that the FDW supports batching and it's enabled */ Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert); Assert(batch_size > 1); @@ -395,13 +409,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, } else { - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; bool line_buf_valid = cstate->line_buf_valid; uint64 save_cur_lineno = cstate->cur_lineno; - MemoryContext oldcontext; - - Assert(buffer->bistate != NULL); /* * Print error context information correctly, if one of the operations @@ -409,18 +418,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, */ cstate->line_buf_valid = false; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. 
- */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); + table_multi_insert_flush(buffer->ti_state); for (i = 0; i < nused; i++) { @@ -435,7 +433,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, cstate->cur_lineno = buffer->linenos[i]; recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, + slots[i], estate, false, false, NULL, NIL, false); ExecARInsertTriggers(estate, resultRelInfo, slots[i], recheckIndexes, @@ -493,20 +491,15 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, resultRelInfo->ri_CopyMultiInsertBuffer = NULL; if (resultRelInfo->ri_FdwRoutine == NULL) - { - Assert(buffer->bistate != NULL); - FreeBulkInsertState(buffer->bistate); - } + table_insert_end(buffer->ti_state); else - Assert(buffer->bistate == NULL); - - /* Since we only create slots on demand, just drop the non-null ones. */ - for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) - ExecDropSingleTupleTableSlot(buffer->slots[i]); + { + /* Since we only create slots on demand, just drop the non-null ones. 
*/ + for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(buffer->slots[i]); - if (resultRelInfo->ri_FdwRoutine == NULL) - table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, - miinfo->ti_options); + pfree(buffer->slots); + } pfree(buffer); } @@ -593,13 +586,25 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; int nused = buffer->nused; + TupleTableSlot *slot; Assert(buffer != NULL); Assert(nused < MAX_BUFFERED_TUPLES); - if (buffer->slots[nused] == NULL) - buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); - return buffer->slots[nused]; + if (rri->ri_FdwRoutine == NULL) + slot = table_multi_insert_next_free_slot(buffer->ti_state); + else + { + if (buffer->slots[nused] == NULL) + { + slot = table_slot_create(rri->ri_RelationDesc, NULL); + buffer->slots[nused] = slot; + } + else + slot = buffer->slots[nused]; + } + + return slot; } /* @@ -615,6 +620,9 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, Assert(buffer != NULL); Assert(slot == buffer->slots[buffer->nused]); + if (rri->ri_FdwRoutine == NULL) + table_multi_insert_v2(buffer->ti_state, slot); + /* Store the line number so we can properly report any errors later */ buffer->linenos[buffer->nused] = lineno; -- 2.34.1 ^ permalink raw reply [nested|flat] 10+ messages in thread
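The v9 patches above share one lifecycle: begin, buffer tuples, flush when a slot-count or byte threshold is reached (or leave flushing to the caller, as COPY FROM does with TABLEAM_SKIP_MULTI_INSERTS_FLUSH), and flush whatever remains at end. Below is a minimal standalone sketch of that control flow only, under stated assumptions. It is not the patch's code: ToyInsertState, the TOY_* constants and the toy_* functions are hypothetical, the limits are deliberately tiny, and tuples are modelled by their size alone.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical limits, far smaller than the patch's HEAP_MAX_BUFFERED_* values. */
#define TOY_MAX_BUFFERED_SLOTS 4
#define TOY_MAX_BUFFERED_BYTES 64

/* Hypothetical flag modelled on the idea behind TABLEAM_SKIP_MULTI_INSERTS_FLUSH. */
#define TOY_SKIP_AUTO_FLUSH 0x01

/* Hypothetical insert state: counters only, no real tuple storage. */
typedef struct ToyInsertState
{
    int     flags;      /* TOY_* flags */
    int     cur_slots;  /* tuples currently buffered */
    size_t  cur_size;   /* bytes currently buffered */
    int     nflushes;   /* number of non-empty flushes so far */
    int     ninserted;  /* tuples written out by flushes */
} ToyInsertState;

static ToyInsertState *
toy_insert_begin(int flags)
{
    ToyInsertState *state = calloc(1, sizeof(ToyInsertState));

    if (state == NULL)
        abort();
    state->flags = flags;
    return state;
}

/* Write out everything buffered and reset the counters. */
static void
toy_multi_insert_flush(ToyInsertState *state)
{
    if (state->cur_slots == 0)
        return;

    state->ninserted += state->cur_slots;
    state->nflushes++;
    state->cur_slots = 0;
    state->cur_size = 0;
}

/* Buffer one tuple; flush automatically unless the caller opted out. */
static void
toy_multi_insert(ToyInsertState *state, size_t tuple_size)
{
    state->cur_slots++;
    state->cur_size += tuple_size;

    if ((state->flags & TOY_SKIP_AUTO_FLUSH) == 0 &&
        (state->cur_slots >= TOY_MAX_BUFFERED_SLOTS ||
         state->cur_size >= TOY_MAX_BUFFERED_BYTES))
        toy_multi_insert_flush(state);
}

/* Flush the remainder, release the state, return the total written. */
static int
toy_insert_end(ToyInsertState *state)
{
    int total;

    toy_multi_insert_flush(state);
    total = state->ninserted;
    free(state);
    return total;
}
```

With automatic flushing, whichever threshold is reached first triggers the write. With the skip flag set, nothing is written until the caller flushes or ends the insert, which in this toy model means the caller is also responsible for bounding how much it buffers.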
* Re: New Table Access Methods for Multi and Single Inserts @ 2024-01-29 11:46 Bharath Rupireddy <[email protected]> parent: Bharath Rupireddy <[email protected]> 0 siblings, 1 reply; 10+ messages in thread From: Bharath Rupireddy @ 2024-01-29 11:46 UTC (permalink / raw) To: pgsql-hackers; +Cc: Andres Freund <[email protected]>; Dilip Kumar <[email protected]>; Luc Vlaming <[email protected]>; Justin Pryzby <[email protected]>; Jeff Davis <[email protected]>; Michael Paquier <[email protected]>; Matthias van de Meent <[email protected]> On Mon, Jan 29, 2024 at 12:57 PM Bharath Rupireddy <[email protected]> wrote: > > On Wed, Jan 17, 2024 at 10:57 PM Bharath Rupireddy > <[email protected]> wrote: > > > > Thank you. I'm attaching v8 patch-set here which includes use of new > > insert TAMs for COPY FROM. With this, postgres not only will have the > > new TAM for inserts, but they also can make the following commands > > faster - CREATE TABLE AS, SELECT INTO, CREATE MATERIALIZED VIEW, > > REFRESH MATERIALIZED VIEW and COPY FROM. I'll perform some testing in > > the coming days and post the results here, until then I appreciate any > > feedback on the patches. > > > > I've also added this proposal to CF - > > https://commitfest.postgresql.org/47/4777/. > > Some of the tests related to Incremental Sort added by a recent commit > 0452b461bc4 in aggregates.sql are failing when the multi inserts > feature is used for CTAS (like done in 0002 patch). I'm not so sure if > it's because of the reduction in the CTAS execution times. Execution > time for table 'btg' created with CREATE TABLE AS added by commit > 0452b461bc4 with single inserts is 25.3 msec, with multi inserts is > 17.7 msec. This means that the multi inserts are about 1.43 times or > 30.04% faster than the single inserts. Couple of ways to make these > tests pick Incremental Sort as expected - 1) CLUSTER btg USING abc; or > 2) increase the number of rows in table btg to 100K from 10K. 
FWIW, if > I reduce the number of rows in the table from 10K to 1K, the > Incremental Sort won't get picked on HEAD with CTAS using single > inserts. Hence, I chose option (2) to fix the issue. > > Please find the attached v9 patch set. > > [1] > -- Engage incremental sort > explain (COSTS OFF) SELECT x,y FROM btg GROUP BY x,y,z,w; > - QUERY PLAN > -------------------------------------------------- > + QUERY PLAN > +------------------------------ > Group > Group Key: x, y, z, w > - -> Incremental Sort > + -> Sort > Sort Key: x, y, z, w > - Presorted Key: x, y > - -> Index Scan using btg_x_y_idx on btg > -(6 rows) > + -> Seq Scan on btg > +(5 rows) CF bot machine with Windows isn't happy with the compilation [1], so fixed those warnings and attached v10 patch set. [1] [07:35:25.458] [632/2212] Compiling C object src/backend/postgres_lib.a.p/commands_copyfrom.c.obj [07:35:25.458] c:\cirrus\src\include\access\tableam.h(1574) : warning C4715: 'table_multi_insert_slots': not all control paths return a value [07:35:25.458] c:\cirrus\src\include\access\tableam.h(1522) : warning C4715: 'table_insert_begin': not all control paths return a value [07:35:25.680] c:\cirrus\src\include\access\tableam.h(1561) : warning C4715: 'table_multi_insert_next_free_slot': not all control paths return a value [07:35:25.680] [633/2212] Compiling C object src/backend/postgres_lib.a.p/commands_createas.c.obj [07:35:25.680] c:\cirrus\src\include\access\tableam.h(1522) : warning C4715: 'table_insert_begin': not all control paths return a value [07:35:26.310] [646/2212] Compiling C object src/backend/postgres_lib.a.p/commands_matview.c.obj [07:35:26.310] c:\cirrus\src\include\access\tableam.h(1522) : warning C4715: 'table_insert_begin': not all control paths return a value -- Bharath Rupireddy PostgreSQL Contributors Team RDS Open Source Databases Amazon Web Services: https://aws.amazon.com Attachments: [application/octet-stream] v10-0001-New-TAMs-for-inserts.patch (15.9K, 
2-v10-0001-New-TAMs-for-inserts.patch) download | inline diff: From 3c892cf5c2df949efac1ec5dc8fc390b868fe400 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Mon, 29 Jan 2024 10:59:41 +0000 Subject: [PATCH v10 1/4] New TAMs for inserts --- src/backend/access/heap/heapam.c | 224 +++++++++++++++++++++++ src/backend/access/heap/heapam_handler.c | 9 + src/include/access/heapam.h | 49 +++++ src/include/access/tableam.h | 138 ++++++++++++++ 4 files changed, 420 insertions(+) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 707460a536..7df305380e 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -68,6 +68,7 @@ #include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/spccache.h" @@ -2446,6 +2447,229 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, ntuples); } +/* + * Initialize state required for an insert a single tuple or multiple tuples + * into a heap. 
+ */ +TableInsertState * +heap_insert_begin(Relation rel, CommandId cid, int am_flags, int insert_flags) +{ + TableInsertState *tistate; + + tistate = palloc0(sizeof(TableInsertState)); + tistate->rel = rel; + tistate->cid = cid; + tistate->am_flags = am_flags; + tistate->insert_flags = insert_flags; + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0 || + (am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY)) + tistate->am_data = palloc0(sizeof(HeapInsertState)); + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0) + { + HeapMultiInsertState *mistate; + + mistate = palloc0(sizeof(HeapMultiInsertState)); + mistate->slots = palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS); + + mistate->context = AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert_v2 memory context", + ALLOCSET_DEFAULT_SIZES); + + ((HeapInsertState *) tistate->am_data)->mistate = mistate; + } + + if ((am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY) != 0) + ((HeapInsertState *) tistate->am_data)->bistate = GetBulkInsertState(); + + return tistate; +} + +/* + * Insert a single tuple into a heap. + */ +void +heap_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + bool shouldFree = true; + HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + BulkInsertState bistate = NULL; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate == NULL); + + /* Update tuple with table oid */ + slot->tts_tableOid = RelationGetRelid(state->rel); + tuple->t_tableOid = slot->tts_tableOid; + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + bistate = ((HeapInsertState *) state->am_data)->bistate; + + /* Perform insertion, and copy the resulting ItemPointer */ + heap_insert(state->rel, tuple, state->cid, state->insert_flags, + bistate); + ItemPointerCopy(&tuple->t_self, &slot->tts_tid); + + if (shouldFree) + pfree(tuple); +} + +/* + * Create/return next free slot from multi-insert buffered slots array. 
+ */ +TupleTableSlot * +heap_multi_insert_next_free_slot(TableInsertState * state) +{ + TupleTableSlot *slot; + HeapMultiInsertState *mistate; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->am_data)->mistate; + slot = mistate->slots[mistate->cur_slots]; + + if (slot == NULL) + { + slot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = slot; + } + else + ExecClearTuple(slot); + + return slot; +} + +/* + * Store passed-in tuple into in-memory buffered slots. When full, insert + * multiple tuples from the buffers into heap. + */ +void +heap_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + TupleTableSlot *dstslot; + HeapMultiInsertState *mistate; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->am_data)->mistate; + dstslot = mistate->slots[mistate->cur_slots]; + + if (dstslot == NULL) + { + dstslot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = dstslot; + } + + /* + * Caller may have got the slot using heap_multi_insert_next_free_slot, + * filled it and passed. So, skip copying in such a case. + */ + if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0) + { + ExecClearTuple(dstslot); + ExecCopySlot(dstslot, slot); + } + else + Assert(dstslot == slot); + + mistate->cur_slots++; + + /* + * When passed-in slot is already materialized, memory allocated in slot's + * memory context is a close approximation for us to track the required + * space for the tuple in slot. + * + * For non-materialized slots, the flushing decision happens solely on the + * number of tuples stored in the buffer. 
+ */ + if (TTS_SHOULDFREE(slot)) + mistate->cur_size += MemoryContextMemAllocated(slot->tts_mcxt, false); + + if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0 && + (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS || + mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)) + heap_multi_insert_flush(state); +} + +/* + * Return pointer to multi-insert buffered slots array and number of currently + * occupied slots. + */ +TupleTableSlot ** +heap_multi_insert_slots(TableInsertState * state, int *num_slots) +{ + HeapMultiInsertState *mistate; + + mistate = ((HeapInsertState *) state->am_data)->mistate; + *num_slots = mistate->cur_slots; + + return mistate->slots; +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. + */ +void +heap_multi_insert_flush(TableInsertState * state) +{ + HeapMultiInsertState *mistate; + BulkInsertState bistate = NULL; + MemoryContext oldcontext; + + mistate = ((HeapInsertState *) state->am_data)->mistate; + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + bistate = ((HeapInsertState *) state->am_data)->bistate; + + oldcontext = MemoryContextSwitchTo(mistate->context); + heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots, + state->cid, state->insert_flags, bistate); + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(mistate->context); + + mistate->cur_slots = 0; + mistate->cur_size = 0; +} + +/* + * Clean up state used to insert a single or multiple tuples into a heap. 
+ */ +void +heap_insert_end(TableInsertState * state) +{ + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL) + { + HeapMultiInsertState *mistate = + ((HeapInsertState *) state->am_data)->mistate; + + /* Insert remaining tuples from multi-insert buffers */ + if (mistate->cur_slots > 0 || mistate->cur_size > 0) + heap_multi_insert_flush(state); + + MemoryContextDelete(mistate->context); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + + pfree(mistate); + ((HeapInsertState *) state->am_data)->mistate = NULL; + } + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + FreeBulkInsertState(((HeapInsertState *) state->am_data)->bistate); + + pfree(state->am_data); + state->am_data = NULL; + pfree(state); +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index d15a02b2be..795177812d 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2564,6 +2564,15 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + + .tuple_insert_begin = heap_insert_begin, + .tuple_insert_v2 = heap_insert_v2, + .tuple_multi_insert_next_free_slot = heap_multi_insert_next_free_slot, + .tuple_multi_insert_v2 = heap_multi_insert_v2, + .tuple_multi_insert_slots = heap_multi_insert_slots, + .tuple_multi_insert_flush = heap_multi_insert_flush, + .tuple_insert_end = heap_insert_end, + .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 4b133f6859..053be18110 100644 --- a/src/include/access/heapam.h +++ 
b/src/include/access/heapam.h @@ -225,6 +225,40 @@ htsv_get_valid_status(int status) return (HTSV_Result) status; } +/* + * Maximum number of slots that multi-insert buffers can hold. + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. For instance, increasing this can cause + * quadratic growth in memory requirements during copies into partitioned + * tables with a large number of partitions. + */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +/* Maximum size of all tuples that multi-insert buffers can hold */ +#define HEAP_MAX_BUFFERED_BYTES 65535 + +typedef struct HeapMultiInsertState +{ + /* Memory context to use for flushing multi-insert buffers */ + MemoryContext context; + + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Number of slots that multi-insert buffers currently hold */ + int cur_slots; + + /* Size of all tuples that multi-insert buffers currently hold */ + Size cur_size; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + /* ---------------- * function prototypes for heap access method * @@ -275,6 +309,21 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); + +extern TableInsertState * heap_insert_begin(Relation rel, + CommandId cid, + int am_flags, + int insert_flags); +extern void heap_insert_v2(TableInsertState * state, + TupleTableSlot *slot); +extern TupleTableSlot *heap_multi_insert_next_free_slot(TableInsertState * state); +extern void heap_multi_insert_v2(TableInsertState * state, + TupleTableSlot *slot); +extern TupleTableSlot **heap_multi_insert_slots(TableInsertState * state, + int *num_slots); +extern void heap_multi_insert_flush(TableInsertState * state); +extern void 
heap_insert_end(TableInsertState * state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 5f8474871d..834de15b9b 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -247,6 +247,43 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */ +#define TABLEAM_MULTI_INSERTS 0x000001 + +/* Use BAS_BULKWRITE buffer access strategy */ +#define TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY 0x000002 + +/* + * Skip flushing buffered tuples automatically. Responsibility lies with the + * caller to flush the buffered tuples. + */ +#define TABLEAM_SKIP_MULTI_INSERTS_FLUSH 0x000004 + + +/* Holds table insert state. */ +typedef struct TableInsertState +{ + /* Table AM-agnostic data starts here */ + + Relation rel; /* Target relation */ + + /* + * Command ID for this insertion. If required, change this for each pass + * of insert functions. 
+ */ + CommandId cid; + + /* Table AM options (TABLEAM_XXX macros) */ + int am_flags; + + /* table_tuple_insert performance options (TABLE_INSERT_XXX macros) */ + int insert_flags; + + /* Table AM specific data starts here */ + + void *am_data; +} TableInsertState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 @@ -522,6 +559,20 @@ typedef struct TableAmRoutine void (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate); + TableInsertState *(*tuple_insert_begin) (Relation rel, + CommandId cid, + int am_flags, + int insert_flags); + void (*tuple_insert_v2) (TableInsertState * state, + TupleTableSlot *slot); + void (*tuple_multi_insert_v2) (TableInsertState * state, + TupleTableSlot *slot); + TupleTableSlot *(*tuple_multi_insert_next_free_slot) (TableInsertState * state); + TupleTableSlot **(*tuple_multi_insert_slots) (TableInsertState * state, + int *num_slots); + void (*tuple_multi_insert_flush) (TableInsertState * state); + void (*tuple_insert_end) (TableInsertState * state); + /* see table_tuple_delete() for reference about parameters */ TM_Result (*tuple_delete) (Relation rel, ItemPointer tid, @@ -1456,6 +1507,93 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, cid, options, bistate); } +static inline TableInsertState * +table_insert_begin(Relation rel, CommandId cid, int am_flags, + int insert_flags) +{ + if (rel->rd_tableam && rel->rd_tableam->tuple_insert_begin) + return rel->rd_tableam->tuple_insert_begin(rel, cid, am_flags, + insert_flags); + else + { + elog(ERROR, "table_insert_begin access method is not implemented for relation \"%s\"", + RelationGetRelationName(rel)); + return NULL; /* keep compiler quiet */ + } +} + +static inline void +table_tuple_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && + 
state->rel->rd_tableam->tuple_insert_v2) + state->rel->rd_tableam->tuple_insert_v2(state, slot); + else + elog(ERROR, "table_tuple_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); +} + +static inline void +table_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_v2) + state->rel->rd_tableam->tuple_multi_insert_v2(state, slot); + else + elog(ERROR, "table_multi_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); +} + +static inline TupleTableSlot * +table_multi_insert_next_free_slot(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_next_free_slot) + return state->rel->rd_tableam->tuple_multi_insert_next_free_slot(state); + else + { + elog(ERROR, "table_multi_insert_next_free_slot access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); + return NULL; /* keep compiler quiet */ + } +} + +static inline TupleTableSlot ** +table_multi_insert_slots(TableInsertState * state, int *num_slots) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_slots) + return state->rel->rd_tableam->tuple_multi_insert_slots(state, num_slots); + else + { + elog(ERROR, "table_multi_insert_slots access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); + return NULL; /* keep compiler quiet */ + } +} + +static inline void +table_multi_insert_flush(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_flush) + state->rel->rd_tableam->tuple_multi_insert_flush(state); + else + elog(ERROR, "table_multi_insert_flush access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); +} + +static inline void +table_insert_end(TableInsertState * state) +{ + if 
(state->rel->rd_tableam && + state->rel->rd_tableam->tuple_insert_end) + state->rel->rd_tableam->tuple_insert_end(state); + else + elog(ERROR, "table_insert_end access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); +} + /* * Delete a tuple. * -- 2.34.1 [application/octet-stream] v10-0002-Optimize-CTAS-with-multi-inserts.patch (3.8K, 3-v10-0002-Optimize-CTAS-with-multi-inserts.patch) download | inline diff: From ab21bc9db0b6a033db3d6d00f72c5b1abf445240 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Mon, 29 Jan 2024 11:01:56 +0000 Subject: [PATCH v10 2/4] Optimize CTAS with multi inserts --- src/backend/commands/createas.c | 25 +++++++++--------------- src/test/regress/expected/aggregates.out | 2 +- src/test/regress/sql/aggregates.sql | 2 +- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 16a2fe65e6..3a02ea9578 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -58,9 +58,7 @@ typedef struct /* These fields are filled by intorel_startup: */ Relation rel; /* relation to write to */ ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_intorel; /* utility functions for CTAS definition creation */ @@ -557,17 +555,19 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) */ myState->rel = intoRelationDesc; myState->reladdr = intoRelationAddr; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM; /* * If WITH NO DATA is specified, there is no need to set up the state for * bulk inserts as there are no tuples to insert. 
*/ if (!into->skipData) - myState->bistate = GetBulkInsertState(); + myState->ti_state = table_insert_begin(intoRelationDesc, + GetCurrentCommandId(true), + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY, + TABLE_INSERT_SKIP_FSM); else - myState->bistate = NULL; + myState->ti_state = NULL; /* * Valid smgr_targblock implies something already wrote to the relation. @@ -595,11 +595,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) * would not be cheap either. This also doesn't allow accessing per-AM * data (say a tuple's xmin), but since we don't do that here... */ - table_tuple_insert(myState->rel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); } /* We know this is a newly created relation, so there are no indexes */ @@ -617,10 +613,7 @@ intorel_shutdown(DestReceiver *self) IntoClause *into = myState->into; if (!into->skipData) - { - FreeBulkInsertState(myState->bistate); - table_finish_bulk_insert(myState->rel, myState->ti_options); - } + table_insert_end(myState->ti_state); /* close rel, but keep lock until commit */ table_close(myState->rel, NoLock); diff --git a/src/test/regress/expected/aggregates.out b/src/test/regress/expected/aggregates.out index 7a73c19314..2889fd315d 100644 --- a/src/test/regress/expected/aggregates.out +++ b/src/test/regress/expected/aggregates.out @@ -2734,7 +2734,7 @@ CREATE TABLE btg AS SELECT i % 100 AS y, 'abc' || i % 10 AS z, i AS w -FROM generate_series(1,10000) AS i; +FROM generate_series(1,100000) AS i; CREATE INDEX btg_x_y_idx ON btg(x,y); ANALYZE btg; -- GROUP BY optimization by reorder columns by frequency diff --git a/src/test/regress/sql/aggregates.sql b/src/test/regress/sql/aggregates.sql index 916dbf908f..99f890bb85 100644 --- a/src/test/regress/sql/aggregates.sql +++ b/src/test/regress/sql/aggregates.sql @@ -1187,7 +1187,7 @@ CREATE TABLE btg AS SELECT i % 100 AS y, 'abc' || i % 10 AS z, i AS w -FROM 
generate_series(1,10000) AS i; +FROM generate_series(1,100000) AS i; CREATE INDEX btg_x_y_idx ON btg(x,y); ANALYZE btg; -- 2.34.1 [application/octet-stream] v10-0003-Optimize-RMV-with-multi-inserts.patch (2.9K, 4-v10-0003-Optimize-RMV-with-multi-inserts.patch) download | inline diff: From 621aa97a1708ba178f5ebf6aca869788a2cf1b56 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Mon, 29 Jan 2024 11:02:19 +0000 Subject: [PATCH v10 3/4] Optimize RMV with multi inserts --- src/backend/commands/matview.c | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 1dcfbe879b..f84c79f5f0 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -52,10 +52,7 @@ typedef struct DestReceiver pub; /* publicly-known function pointers */ Oid transientoid; /* OID of new heap into which to store */ /* These fields are filled by transientrel_startup: */ - Relation transientrel; /* relation to write to */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_transientrel; static int matview_maintenance_depth = 0; @@ -457,13 +454,13 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) transientrel = table_open(myState->transientoid, NoLock); - /* - * Fill private fields of myState for use by later routines - */ - myState->transientrel = transientrel; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN; - myState->bistate = GetBulkInsertState(); + /* Fill private fields of myState for use by later routines */ + myState->ti_state = table_insert_begin(transientrel, + GetCurrentCommandId(true), + TABLEAM_MULTI_INSERTS | + 
TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY, + TABLE_INSERT_SKIP_FSM | + TABLE_INSERT_FROZEN); /* * Valid smgr_targblock implies something already wrote to the relation. @@ -488,12 +485,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self) * cheap either. This also doesn't allow accessing per-AM data (say a * tuple's xmin), but since we don't do that here... */ - - table_tuple_insert(myState->transientrel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); /* We know this is a newly created relation, so there are no indexes */ @@ -507,14 +499,12 @@ static void transientrel_shutdown(DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; + Relation transientrel = myState->ti_state->rel; - FreeBulkInsertState(myState->bistate); - - table_finish_bulk_insert(myState->transientrel, myState->ti_options); + table_insert_end(myState->ti_state); /* close transientrel, but keep lock until commit */ - table_close(myState->transientrel, NoLock); - myState->transientrel = NULL; + table_close(transientrel, NoLock); } /* -- 2.34.1 [application/octet-stream] v10-0004-Use-new-multi-insert-TAM-for-COPY-FROM.patch (6.3K, 5-v10-0004-Use-new-multi-insert-TAM-for-COPY-FROM.patch) download | inline diff: From 169131f28e09c41b0b100f953b54dd16b2e3185a Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Mon, 29 Jan 2024 11:02:37 +0000 Subject: [PATCH v10 4/4] Use new multi insert TAM for COPY FROM --- src/backend/commands/copyfrom.c | 92 ++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 1fe70b9133..8abf33aa97 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -77,10 +77,9 @@ /* Stores multi-insert data related to a single relation in CopyFrom. 
*/ typedef struct CopyMultiInsertBuffer { - TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ + TableInsertState *ti_state; /* Table insert state; NULL if foreign table */ + TupleTableSlot **slots; /* Array to store tuples */ ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ - BulkInsertState bistate; /* BulkInsertState for this rel if plain - * table; NULL if foreign table */ int nused; /* number of 'slots' containing tuples */ uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy * stream */ @@ -223,14 +222,31 @@ limit_printout_length(const char *str) * ResultRelInfo. */ static CopyMultiInsertBuffer * -CopyMultiInsertBufferInit(ResultRelInfo *rri) +CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri) { CopyMultiInsertBuffer *buffer; buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); - memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + + if (rri->ri_FdwRoutine == NULL) + { + int num_slots; + + buffer->ti_state = table_insert_begin(rri->ri_RelationDesc, + miinfo->mycid, + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY | + TABLEAM_SKIP_MULTI_INSERTS_FLUSH, + miinfo->ti_options); + buffer->slots = table_multi_insert_slots(buffer->ti_state, &num_slots); + } + else + { + buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + buffer->ti_state = NULL; + } + buffer->resultRelInfo = rri; - buffer->bistate = (rri->ri_FdwRoutine == NULL) ? 
GetBulkInsertState() : NULL; buffer->nused = 0; return buffer; @@ -245,7 +261,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer; - buffer = CopyMultiInsertBufferInit(rri); + buffer = CopyMultiInsertBufferInit(miinfo, rri); /* Setup back-link so we can easily find this buffer again */ rri->ri_CopyMultiInsertBuffer = buffer; @@ -322,8 +338,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, int batch_size = resultRelInfo->ri_BatchSize; int sent = 0; - Assert(buffer->bistate == NULL); - /* Ensure that the FDW supports batching and it's enabled */ Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert); Assert(batch_size > 1); @@ -395,13 +409,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, } else { - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; bool line_buf_valid = cstate->line_buf_valid; uint64 save_cur_lineno = cstate->cur_lineno; - MemoryContext oldcontext; - - Assert(buffer->bistate != NULL); /* * Print error context information correctly, if one of the operations @@ -409,18 +418,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, */ cstate->line_buf_valid = false; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. 
- */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); + table_multi_insert_flush(buffer->ti_state); for (i = 0; i < nused; i++) { @@ -435,7 +433,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, cstate->cur_lineno = buffer->linenos[i]; recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, + slots[i], estate, false, false, NULL, NIL, false); ExecARInsertTriggers(estate, resultRelInfo, slots[i], recheckIndexes, @@ -493,20 +491,15 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, resultRelInfo->ri_CopyMultiInsertBuffer = NULL; if (resultRelInfo->ri_FdwRoutine == NULL) - { - Assert(buffer->bistate != NULL); - FreeBulkInsertState(buffer->bistate); - } + table_insert_end(buffer->ti_state); else - Assert(buffer->bistate == NULL); - - /* Since we only create slots on demand, just drop the non-null ones. */ - for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) - ExecDropSingleTupleTableSlot(buffer->slots[i]); + { + /* Since we only create slots on demand, just drop the non-null ones. 
*/ + for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(buffer->slots[i]); - if (resultRelInfo->ri_FdwRoutine == NULL) - table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, - miinfo->ti_options); + pfree(buffer->slots); + } pfree(buffer); } @@ -593,13 +586,25 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; int nused = buffer->nused; + TupleTableSlot *slot; Assert(buffer != NULL); Assert(nused < MAX_BUFFERED_TUPLES); - if (buffer->slots[nused] == NULL) - buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); - return buffer->slots[nused]; + if (rri->ri_FdwRoutine == NULL) + slot = table_multi_insert_next_free_slot(buffer->ti_state); + else + { + if (buffer->slots[nused] == NULL) + { + slot = table_slot_create(rri->ri_RelationDesc, NULL); + buffer->slots[nused] = slot; + } + else + slot = buffer->slots[nused]; + } + + return slot; } /* @@ -615,6 +620,9 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, Assert(buffer != NULL); Assert(slot == buffer->slots[buffer->nused]); + if (rri->ri_FdwRoutine == NULL) + table_multi_insert_v2(buffer->ti_state, slot); + /* Store the line number so we can properly report any errors later */ buffer->linenos[buffer->nused] = lineno; -- 2.34.1 ^ permalink raw reply [nested|flat] 10+ messages in thread
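For readers following the thread, the buffer-and-flush lifecycle that the v10 0001 patch builds around heap_insert_begin, heap_multi_insert_v2, heap_multi_insert_flush and heap_insert_end can be sketched outside PostgreSQL roughly as below. This is a minimal standalone model under stated assumptions, not the patch's code: InsertState, insert_begin, multi_insert, multi_insert_flush, insert_end and demo_flush_count are hypothetical names, the limits are tiny stand-ins for HEAP_MAX_BUFFERED_SLOTS (1000) and HEAP_MAX_BUFFERED_BYTES (65535), and a "tuple" is reduced to its size in bytes. The model also counts every tuple's size, whereas the patch only adds to cur_size for materialized slots.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical limits, far smaller than the patch's 1000 slots / 65535 bytes. */
#define MAX_BUFFERED_SLOTS 4
#define MAX_BUFFERED_BYTES 64

/* Hypothetical stand-in for TableInsertState plus HeapMultiInsertState. */
typedef struct InsertState
{
    size_t sizes[MAX_BUFFERED_SLOTS]; /* stand-in for the buffered slots */
    int    cur_slots;                 /* slots currently occupied */
    size_t cur_size;                  /* bytes currently buffered */
    int    nflushes;                  /* batch writes performed so far */
    int    ninserted;                 /* tuples handed to the "table" */
} InsertState;

/* Allocate zeroed state, as the patch does with palloc0(). */
static InsertState *
insert_begin(void)
{
    InsertState *state = calloc(1, sizeof(InsertState));

    assert(state != NULL);
    return state;
}

/* Write out everything buffered so far and reset the counters. */
static void
multi_insert_flush(InsertState *state)
{
    if (state->cur_slots == 0)
        return;                 /* nothing buffered, nothing to write */

    state->ninserted += state->cur_slots;
    state->nflushes++;
    state->cur_slots = 0;
    state->cur_size = 0;
}

/* Buffer one tuple; flush once either limit is reached. */
static void
multi_insert(InsertState *state, size_t tuple_size)
{
    state->sizes[state->cur_slots++] = tuple_size;
    state->cur_size += tuple_size;

    if (state->cur_slots >= MAX_BUFFERED_SLOTS ||
        state->cur_size >= MAX_BUFFERED_BYTES)
        multi_insert_flush(state);
}

/* Flush any remainder, free the state, and report the flush count. */
static int
insert_end(InsertState *state)
{
    int nflushes;

    multi_insert_flush(state);
    nflushes = state->nflushes;
    free(state);
    return nflushes;
}

/* Drive the whole lifecycle for ntuples tuples of equal size. */
int
demo_flush_count(int ntuples, size_t tuple_size)
{
    InsertState *state = insert_begin();

    for (int i = 0; i < ntuples; i++)
        multi_insert(state, tuple_size);

    return insert_end(state);
}
```

With these toy limits, demo_flush_count(10, 8) flushes three times (after the 4th and 8th tuples on the slot limit, then once more at the end for the remaining two), while demo_flush_count(3, 40) flushes twice because the byte limit trips first. Callers that set TABLEAM_SKIP_MULTI_INSERTS_FLUSH in the patch, such as the COPY FROM changes in 0004, would skip the automatic flush inside multi_insert and call the flush themselves; the sketch does not model that flag.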
* Re: New Table Access Methods for Multi and Single Inserts @ 2024-03-02 06:32 Bharath Rupireddy <[email protected]> parent: Bharath Rupireddy <[email protected]> 0 siblings, 1 reply; 10+ messages in thread From: Bharath Rupireddy @ 2024-03-02 06:32 UTC (permalink / raw) To: pgsql-hackers; +Cc: Andres Freund <[email protected]>; Dilip Kumar <[email protected]>; Luc Vlaming <[email protected]>; Justin Pryzby <[email protected]>; Jeff Davis <[email protected]>; Michael Paquier <[email protected]>; Matthias van de Meent <[email protected]> On Mon, Jan 29, 2024 at 5:16 PM Bharath Rupireddy <[email protected]> wrote: > > > Please find the attached v9 patch set. I've had to rebase the patches due to commit 874d817, please find the attached v11 patch set. -- Bharath Rupireddy PostgreSQL Contributors Team RDS Open Source Databases Amazon Web Services: https://aws.amazon.com Attachments: [application/x-patch] v11-0001-New-TAMs-for-inserts.patch (15.9K, 2-v11-0001-New-TAMs-for-inserts.patch) download | inline diff: From c338f541e01850fa4bb423e09acce618be9e21ba Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Fri, 1 Mar 2024 14:23:07 +0000 Subject: [PATCH v11 1/4] New TAMs for inserts --- src/backend/access/heap/heapam.c | 224 +++++++++++++++++++++++ src/backend/access/heap/heapam_handler.c | 9 + src/include/access/heapam.h | 49 +++++ src/include/access/tableam.h | 138 ++++++++++++++ 4 files changed, 420 insertions(+) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 707460a536..7df305380e 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -68,6 +68,7 @@ #include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/spccache.h" @@ -2446,6 +2447,229 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, 
ntuples); } +/* + * Initialize state required for an insert a single tuple or multiple tuples + * into a heap. + */ +TableInsertState * +heap_insert_begin(Relation rel, CommandId cid, int am_flags, int insert_flags) +{ + TableInsertState *tistate; + + tistate = palloc0(sizeof(TableInsertState)); + tistate->rel = rel; + tistate->cid = cid; + tistate->am_flags = am_flags; + tistate->insert_flags = insert_flags; + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0 || + (am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY)) + tistate->am_data = palloc0(sizeof(HeapInsertState)); + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0) + { + HeapMultiInsertState *mistate; + + mistate = palloc0(sizeof(HeapMultiInsertState)); + mistate->slots = palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS); + + mistate->context = AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert_v2 memory context", + ALLOCSET_DEFAULT_SIZES); + + ((HeapInsertState *) tistate->am_data)->mistate = mistate; + } + + if ((am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY) != 0) + ((HeapInsertState *) tistate->am_data)->bistate = GetBulkInsertState(); + + return tistate; +} + +/* + * Insert a single tuple into a heap. 
+ */ +void +heap_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + bool shouldFree = true; + HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + BulkInsertState bistate = NULL; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate == NULL); + + /* Update tuple with table oid */ + slot->tts_tableOid = RelationGetRelid(state->rel); + tuple->t_tableOid = slot->tts_tableOid; + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + bistate = ((HeapInsertState *) state->am_data)->bistate; + + /* Perform insertion, and copy the resulting ItemPointer */ + heap_insert(state->rel, tuple, state->cid, state->insert_flags, + bistate); + ItemPointerCopy(&tuple->t_self, &slot->tts_tid); + + if (shouldFree) + pfree(tuple); +} + +/* + * Create/return next free slot from multi-insert buffered slots array. + */ +TupleTableSlot * +heap_multi_insert_next_free_slot(TableInsertState * state) +{ + TupleTableSlot *slot; + HeapMultiInsertState *mistate; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->am_data)->mistate; + slot = mistate->slots[mistate->cur_slots]; + + if (slot == NULL) + { + slot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = slot; + } + else + ExecClearTuple(slot); + + return slot; +} + +/* + * Store passed-in tuple into in-memory buffered slots. When full, insert + * multiple tuples from the buffers into heap. 
+ */ +void +heap_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + TupleTableSlot *dstslot; + HeapMultiInsertState *mistate; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->am_data)->mistate; + dstslot = mistate->slots[mistate->cur_slots]; + + if (dstslot == NULL) + { + dstslot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = dstslot; + } + + /* + * Caller may have got the slot using heap_multi_insert_next_free_slot, + * filled it and passed. So, skip copying in such a case. + */ + if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0) + { + ExecClearTuple(dstslot); + ExecCopySlot(dstslot, slot); + } + else + Assert(dstslot == slot); + + mistate->cur_slots++; + + /* + * When passed-in slot is already materialized, memory allocated in slot's + * memory context is a close approximation for us to track the required + * space for the tuple in slot. + * + * For non-materialized slots, the flushing decision happens solely on the + * number of tuples stored in the buffer. + */ + if (TTS_SHOULDFREE(slot)) + mistate->cur_size += MemoryContextMemAllocated(slot->tts_mcxt, false); + + if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0 && + (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS || + mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)) + heap_multi_insert_flush(state); +} + +/* + * Return pointer to multi-insert buffered slots array and number of currently + * occupied slots. + */ +TupleTableSlot ** +heap_multi_insert_slots(TableInsertState * state, int *num_slots) +{ + HeapMultiInsertState *mistate; + + mistate = ((HeapInsertState *) state->am_data)->mistate; + *num_slots = mistate->cur_slots; + + return mistate->slots; +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. 
+ */ +void +heap_multi_insert_flush(TableInsertState * state) +{ + HeapMultiInsertState *mistate; + BulkInsertState bistate = NULL; + MemoryContext oldcontext; + + mistate = ((HeapInsertState *) state->am_data)->mistate; + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + bistate = ((HeapInsertState *) state->am_data)->bistate; + + oldcontext = MemoryContextSwitchTo(mistate->context); + heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots, + state->cid, state->insert_flags, bistate); + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(mistate->context); + + mistate->cur_slots = 0; + mistate->cur_size = 0; +} + +/* + * Clean up state used to insert a single or multiple tuples into a heap. + */ +void +heap_insert_end(TableInsertState * state) +{ + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL) + { + HeapMultiInsertState *mistate = + ((HeapInsertState *) state->am_data)->mistate; + + /* Insert remaining tuples from multi-insert buffers */ + if (mistate->cur_slots > 0 || mistate->cur_size > 0) + heap_multi_insert_flush(state); + + MemoryContextDelete(mistate->context); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + + pfree(mistate); + ((HeapInsertState *) state->am_data)->mistate = NULL; + } + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + FreeBulkInsertState(((HeapInsertState *) state->am_data)->bistate); + + pfree(state->am_data); + state->am_data = NULL; + pfree(state); +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 680a50bf8b..84793f324e 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2562,6 +2562,15 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = 
heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + + .tuple_insert_begin = heap_insert_begin, + .tuple_insert_v2 = heap_insert_v2, + .tuple_multi_insert_next_free_slot = heap_multi_insert_next_free_slot, + .tuple_multi_insert_v2 = heap_multi_insert_v2, + .tuple_multi_insert_slots = heap_multi_insert_slots, + .tuple_multi_insert_flush = heap_multi_insert_flush, + .tuple_insert_end = heap_insert_end, + .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 4b133f6859..053be18110 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -225,6 +225,40 @@ htsv_get_valid_status(int status) return (HTSV_Result) status; } +/* + * Maximum number of slots that multi-insert buffers can hold. + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. For instance, increasing this can cause + * quadratic growth in memory requirements during copies into partitioned + * tables with a large number of partitions. 
+ */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +/* Maximum size of all tuples that multi-insert buffers can hold */ +#define HEAP_MAX_BUFFERED_BYTES 65535 + +typedef struct HeapMultiInsertState +{ + /* Memory context to use for flushing multi-insert buffers */ + MemoryContext context; + + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Number of slots that multi-insert buffers currently hold */ + int cur_slots; + + /* Size of all tuples that multi-insert buffers currently hold */ + Size cur_size; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + /* ---------------- * function prototypes for heap access method * @@ -275,6 +309,21 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); + +extern TableInsertState * heap_insert_begin(Relation rel, + CommandId cid, + int am_flags, + int insert_flags); +extern void heap_insert_v2(TableInsertState * state, + TupleTableSlot *slot); +extern TupleTableSlot *heap_multi_insert_next_free_slot(TableInsertState * state); +extern void heap_multi_insert_v2(TableInsertState * state, + TupleTableSlot *slot); +extern TupleTableSlot **heap_multi_insert_slots(TableInsertState * state, + int *num_slots); +extern void heap_multi_insert_flush(TableInsertState * state); +extern void heap_insert_end(TableInsertState * state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 5f8474871d..834de15b9b 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -247,6 +247,43 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +/* 
Use multi inserts, i.e. buffer multiple tuples and insert them at once */ +#define TABLEAM_MULTI_INSERTS 0x000001 + +/* Use BAS_BULKWRITE buffer access strategy */ +#define TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY 0x000002 + +/* + * Skip flushing buffered tuples automatically. Responsibility lies with the + * caller to flush the buffered tuples. + */ +#define TABLEAM_SKIP_MULTI_INSERTS_FLUSH 0x000004 + + +/* Holds table insert state. */ +typedef struct TableInsertState +{ + /* Table AM-agnostic data starts here */ + + Relation rel; /* Target relation */ + + /* + * Command ID for this insertion. If required, change this for each pass + * of insert functions. + */ + CommandId cid; + + /* Table AM options (TABLEAM_XXX macros) */ + int am_flags; + + /* table_tuple_insert performance options (TABLE_INSERT_XXX macros) */ + int insert_flags; + + /* Table AM specific data starts here */ + + void *am_data; +} TableInsertState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 @@ -522,6 +559,20 @@ typedef struct TableAmRoutine void (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate); + TableInsertState *(*tuple_insert_begin) (Relation rel, + CommandId cid, + int am_flags, + int insert_flags); + void (*tuple_insert_v2) (TableInsertState * state, + TupleTableSlot *slot); + void (*tuple_multi_insert_v2) (TableInsertState * state, + TupleTableSlot *slot); + TupleTableSlot *(*tuple_multi_insert_next_free_slot) (TableInsertState * state); + TupleTableSlot **(*tuple_multi_insert_slots) (TableInsertState * state, + int *num_slots); + void (*tuple_multi_insert_flush) (TableInsertState * state); + void (*tuple_insert_end) (TableInsertState * state); + /* see table_tuple_delete() for reference about parameters */ TM_Result (*tuple_delete) (Relation rel, ItemPointer tid, @@ -1456,6 +1507,93 @@ 
table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, cid, options, bistate); } +static inline TableInsertState * +table_insert_begin(Relation rel, CommandId cid, int am_flags, + int insert_flags) +{ + if (rel->rd_tableam && rel->rd_tableam->tuple_insert_begin) + return rel->rd_tableam->tuple_insert_begin(rel, cid, am_flags, + insert_flags); + else + { + elog(ERROR, "table_insert_begin access method is not implemented for relation \"%s\"", + RelationGetRelationName(rel)); + return NULL; /* keep compiler quiet */ + } +} + +static inline void +table_tuple_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_insert_v2) + state->rel->rd_tableam->tuple_insert_v2(state, slot); + else + elog(ERROR, "table_tuple_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); +} + +static inline void +table_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_v2) + state->rel->rd_tableam->tuple_multi_insert_v2(state, slot); + else + elog(ERROR, "table_multi_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); +} + +static inline TupleTableSlot * +table_multi_insert_next_free_slot(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_next_free_slot) + return state->rel->rd_tableam->tuple_multi_insert_next_free_slot(state); + else + { + elog(ERROR, "table_multi_insert_next_free_slot access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); + return NULL; /* keep compiler quiet */ + } +} + +static inline TupleTableSlot ** +table_multi_insert_slots(TableInsertState * state, int *num_slots) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_slots) + return 
state->rel->rd_tableam->tuple_multi_insert_slots(state, num_slots); + else + { + elog(ERROR, "table_multi_insert_slots access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); + return NULL; /* keep compiler quiet */ + } +} + +static inline void +table_multi_insert_flush(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_flush) + state->rel->rd_tableam->tuple_multi_insert_flush(state); + else + elog(ERROR, "table_multi_insert_flush access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); +} + +static inline void +table_insert_end(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_insert_end) + state->rel->rd_tableam->tuple_insert_end(state); + else + elog(ERROR, "table_insert_end access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); +} + /* * Delete a tuple. * -- 2.34.1 [application/x-patch] v11-0002-Optimize-CTAS-with-multi-inserts.patch (2.7K, 3-v11-0002-Optimize-CTAS-with-multi-inserts.patch) download | inline diff: From 3ee91fcf1ba848ee07993af51fba3b6a8a2714b5 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Fri, 1 Mar 2024 14:27:08 +0000 Subject: [PATCH v11 2/4] Optimize CTAS with multi inserts --- src/backend/commands/createas.c | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 16a2fe65e6..3a02ea9578 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -58,9 +58,7 @@ typedef struct /* These fields are filled by intorel_startup: */ Relation rel; /* relation to write to */ ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState 
bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_intorel; /* utility functions for CTAS definition creation */ @@ -557,17 +555,19 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) */ myState->rel = intoRelationDesc; myState->reladdr = intoRelationAddr; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM; /* * If WITH NO DATA is specified, there is no need to set up the state for * bulk inserts as there are no tuples to insert. */ if (!into->skipData) - myState->bistate = GetBulkInsertState(); + myState->ti_state = table_insert_begin(intoRelationDesc, + GetCurrentCommandId(true), + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY, + TABLE_INSERT_SKIP_FSM); else - myState->bistate = NULL; + myState->ti_state = NULL; /* * Valid smgr_targblock implies something already wrote to the relation. @@ -595,11 +595,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) * would not be cheap either. This also doesn't allow accessing per-AM * data (say a tuple's xmin), but since we don't do that here... 
*/ - table_tuple_insert(myState->rel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); } /* We know this is a newly created relation, so there are no indexes */ @@ -617,10 +613,7 @@ intorel_shutdown(DestReceiver *self) IntoClause *into = myState->into; if (!into->skipData) - { - FreeBulkInsertState(myState->bistate); - table_finish_bulk_insert(myState->rel, myState->ti_options); - } + table_insert_end(myState->ti_state); /* close rel, but keep lock until commit */ table_close(myState->rel, NoLock); -- 2.34.1 [application/x-patch] v11-0003-Optimize-RMV-with-multi-inserts.patch (2.9K, 4-v11-0003-Optimize-RMV-with-multi-inserts.patch) download | inline diff: From 623cbb495fb58a07de5de2884c5cd19059abbb9b Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Fri, 1 Mar 2024 14:27:40 +0000 Subject: [PATCH v11 3/4] Optimize RMV with multi inserts --- src/backend/commands/matview.c | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 59920ced83..6a8834e84b 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -52,10 +52,7 @@ typedef struct DestReceiver pub; /* publicly-known function pointers */ Oid transientoid; /* OID of new heap into which to store */ /* These fields are filled by transientrel_startup: */ - Relation transientrel; /* relation to write to */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_transientrel; static int matview_maintenance_depth = 0; @@ -457,13 +454,13 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) transientrel = table_open(myState->transientoid, NoLock); - /* - * Fill private fields of 
myState for use by later routines - */ - myState->transientrel = transientrel; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN; - myState->bistate = GetBulkInsertState(); + /* Fill private fields of myState for use by later routines */ + myState->ti_state = table_insert_begin(transientrel, + GetCurrentCommandId(true), + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY, + TABLE_INSERT_SKIP_FSM | + TABLE_INSERT_FROZEN); /* * Valid smgr_targblock implies something already wrote to the relation. @@ -488,12 +485,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self) * cheap either. This also doesn't allow accessing per-AM data (say a * tuple's xmin), but since we don't do that here... */ - - table_tuple_insert(myState->transientrel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); /* We know this is a newly created relation, so there are no indexes */ @@ -507,14 +499,12 @@ static void transientrel_shutdown(DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; + Relation transientrel = myState->ti_state->rel; - FreeBulkInsertState(myState->bistate); - - table_finish_bulk_insert(myState->transientrel, myState->ti_options); + table_insert_end(myState->ti_state); /* close transientrel, but keep lock until commit */ - table_close(myState->transientrel, NoLock); - myState->transientrel = NULL; + table_close(transientrel, NoLock); } /* -- 2.34.1 [application/x-patch] v11-0004-Use-new-multi-insert-TAM-for-COPY-FROM.patch (6.3K, 5-v11-0004-Use-new-multi-insert-TAM-for-COPY-FROM.patch) download | inline diff: From aa0965b2887cd07f5fe3373dcc2509659a9b2fa2 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Fri, 1 Mar 2024 14:28:58 +0000 Subject: [PATCH v11 4/4] Use new multi insert TAM for COPY FROM --- src/backend/commands/copyfrom.c | 92 
++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index c3bc897028..a18cf718cb 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -77,10 +77,9 @@ /* Stores multi-insert data related to a single relation in CopyFrom. */ typedef struct CopyMultiInsertBuffer { - TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ + TableInsertState *ti_state; /* Table insert state; NULL if foreign table */ + TupleTableSlot **slots; /* Array to store tuples */ ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ - BulkInsertState bistate; /* BulkInsertState for this rel if plain - * table; NULL if foreign table */ int nused; /* number of 'slots' containing tuples */ uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy * stream */ @@ -223,14 +222,31 @@ limit_printout_length(const char *str) * ResultRelInfo. */ static CopyMultiInsertBuffer * -CopyMultiInsertBufferInit(ResultRelInfo *rri) +CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri) { CopyMultiInsertBuffer *buffer; buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); - memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + + if (rri->ri_FdwRoutine == NULL) + { + int num_slots; + + buffer->ti_state = table_insert_begin(rri->ri_RelationDesc, + miinfo->mycid, + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY | + TABLEAM_SKIP_MULTI_INSERTS_FLUSH, + miinfo->ti_options); + buffer->slots = table_multi_insert_slots(buffer->ti_state, &num_slots); + } + else + { + buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + buffer->ti_state = NULL; + } + buffer->resultRelInfo = rri; - buffer->bistate = (rri->ri_FdwRoutine == NULL) ? 
GetBulkInsertState() : NULL; buffer->nused = 0; return buffer; @@ -245,7 +261,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer; - buffer = CopyMultiInsertBufferInit(rri); + buffer = CopyMultiInsertBufferInit(miinfo, rri); /* Setup back-link so we can easily find this buffer again */ rri->ri_CopyMultiInsertBuffer = buffer; @@ -322,8 +338,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, int batch_size = resultRelInfo->ri_BatchSize; int sent = 0; - Assert(buffer->bistate == NULL); - /* Ensure that the FDW supports batching and it's enabled */ Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert); Assert(batch_size > 1); @@ -395,13 +409,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, } else { - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; bool line_buf_valid = cstate->line_buf_valid; uint64 save_cur_lineno = cstate->cur_lineno; - MemoryContext oldcontext; - - Assert(buffer->bistate != NULL); /* * Print error context information correctly, if one of the operations @@ -409,18 +418,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, */ cstate->line_buf_valid = false; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. 
- */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); + table_multi_insert_flush(buffer->ti_state); for (i = 0; i < nused; i++) { @@ -435,7 +433,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, cstate->cur_lineno = buffer->linenos[i]; recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, + slots[i], estate, false, false, NULL, NIL, false); ExecARInsertTriggers(estate, resultRelInfo, slots[i], recheckIndexes, @@ -493,20 +491,15 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, resultRelInfo->ri_CopyMultiInsertBuffer = NULL; if (resultRelInfo->ri_FdwRoutine == NULL) - { - Assert(buffer->bistate != NULL); - FreeBulkInsertState(buffer->bistate); - } + table_insert_end(buffer->ti_state); else - Assert(buffer->bistate == NULL); - - /* Since we only create slots on demand, just drop the non-null ones. */ - for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) - ExecDropSingleTupleTableSlot(buffer->slots[i]); + { + /* Since we only create slots on demand, just drop the non-null ones. 
*/ + for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(buffer->slots[i]); - if (resultRelInfo->ri_FdwRoutine == NULL) - table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, - miinfo->ti_options); + pfree(buffer->slots); + } pfree(buffer); } @@ -593,13 +586,25 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; int nused = buffer->nused; + TupleTableSlot *slot; Assert(buffer != NULL); Assert(nused < MAX_BUFFERED_TUPLES); - if (buffer->slots[nused] == NULL) - buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); - return buffer->slots[nused]; + if (rri->ri_FdwRoutine == NULL) + slot = table_multi_insert_next_free_slot(buffer->ti_state); + else + { + if (buffer->slots[nused] == NULL) + { + slot = table_slot_create(rri->ri_RelationDesc, NULL); + buffer->slots[nused] = slot; + } + else + slot = buffer->slots[nused]; + } + + return slot; } /* @@ -615,6 +620,9 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, Assert(buffer != NULL); Assert(slot == buffer->slots[buffer->nused]); + if (rri->ri_FdwRoutine == NULL) + table_multi_insert_v2(buffer->ti_state, slot); + /* Store the line number so we can properly report any errors later */ buffer->linenos[buffer->nused] = lineno; -- 2.34.1
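For readers following the thread, the lifecycle that the v11 patches introduce (begin, buffer each tuple, flush when a slot-count or byte threshold is reached, then end with a final flush) can be modelled outside PostgreSQL. The block below is only a toy sketch under stated assumptions: ToyInsertState, the toy_* functions, and the small limits are hypothetical names invented for illustration, plain C strings stand in for tuple slots, and none of it is part of the patch or of the PostgreSQL API.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical limits, kept tiny for the demo. The patch itself defines
 * HEAP_MAX_BUFFERED_SLOTS as 1000 and HEAP_MAX_BUFFERED_BYTES as 65535.
 */
#define TOY_MAX_BUFFERED_SLOTS 4
#define TOY_MAX_BUFFERED_BYTES 64

/* Toy stand-in for TableInsertState plus HeapMultiInsertState. */
typedef struct ToyInsertState
{
	const char *slots[TOY_MAX_BUFFERED_SLOTS];	/* buffered "tuples" */
	int			cur_slots;		/* number of occupied slots */
	size_t		cur_size;		/* total bytes currently buffered */
	int			skip_auto_flush;	/* models TABLEAM_SKIP_MULTI_INSERTS_FLUSH */
	int			nflushes;		/* how many times the buffer was flushed */
	int			nwritten;		/* how many tuples reached the "table" */
} ToyInsertState;

/* Models table_insert_begin(): allocate and zero the insert state. */
static ToyInsertState *
toy_insert_begin(int skip_auto_flush)
{
	ToyInsertState *state = calloc(1, sizeof(ToyInsertState));

	if (state == NULL)
		abort();
	state->skip_auto_flush = skip_auto_flush;
	return state;
}

/* Models table_multi_insert_flush(): write out the buffer and reset it. */
static void
toy_multi_insert_flush(ToyInsertState *state)
{
	state->nwritten += state->cur_slots;
	state->nflushes++;
	state->cur_slots = 0;
	state->cur_size = 0;
}

/* Models table_multi_insert_v2(): buffer one tuple, flush when full. */
static void
toy_multi_insert(ToyInsertState *state, const char *tuple)
{
	/* With auto-flush skipped, the caller must flush before overflow. */
	assert(state->cur_slots < TOY_MAX_BUFFERED_SLOTS);

	state->slots[state->cur_slots++] = tuple;
	state->cur_size += strlen(tuple);

	if (!state->skip_auto_flush &&
		(state->cur_slots >= TOY_MAX_BUFFERED_SLOTS ||
		 state->cur_size >= TOY_MAX_BUFFERED_BYTES))
		toy_multi_insert_flush(state);
}

/* Models table_insert_end(): flush leftovers, report counters, free state. */
static void
toy_insert_end(ToyInsertState *state, int *nflushes, int *nwritten)
{
	if (state->cur_slots > 0)
		toy_multi_insert_flush(state);

	if (nflushes != NULL)
		*nflushes = state->nflushes;
	if (nwritten != NULL)
		*nwritten = state->nwritten;
	free(state);
}
```

In this model a CTAS-like caller would pass skip_auto_flush = 0 and simply call toy_multi_insert() per row, while a COPY-like caller would pass 1 and call toy_multi_insert_flush() itself, which roughly mirrors how patches 0002/0003 and 0004 use the new flags.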
* Re: New Table Access Methods for Multi and Single Inserts @ 2024-03-08 10:36 Bharath Rupireddy <[email protected]> parent: Bharath Rupireddy <[email protected]> 0 siblings, 1 reply; 10+ messages in thread From: Bharath Rupireddy @ 2024-03-08 10:36 UTC (permalink / raw) To: pgsql-hackers; +Cc: Andres Freund <[email protected]>; Dilip Kumar <[email protected]>; Luc Vlaming <[email protected]>; Justin Pryzby <[email protected]>; Jeff Davis <[email protected]>; Michael Paquier <[email protected]>; Matthias van de Meent <[email protected]> On Sat, Mar 2, 2024 at 12:02 PM Bharath Rupireddy <[email protected]> wrote: > > On Mon, Jan 29, 2024 at 5:16 PM Bharath Rupireddy > <[email protected]> wrote: > > > > > Please find the attached v9 patch set. > > I've had to rebase the patches due to commit 874d817, please find the > attached v11 patch set. Rebase needed. Please see the v12 patch set. -- Bharath Rupireddy PostgreSQL Contributors Team RDS Open Source Databases Amazon Web Services: https://aws.amazon.com Attachments: [application/x-patch] v12-0001-New-TAMs-for-inserts.patch (15.9K, 2-v12-0001-New-TAMs-for-inserts.patch) download | inline diff: From 8a3552e65e62afc40db99fbd7bf4f98990d45390 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Fri, 8 Mar 2024 10:11:17 +0000 Subject: [PATCH v12 1/4] New TAMs for inserts --- src/backend/access/heap/heapam.c | 224 +++++++++++++++++++++++ src/backend/access/heap/heapam_handler.c | 9 + src/include/access/heapam.h | 49 +++++ src/include/access/tableam.h | 138 ++++++++++++++ 4 files changed, 420 insertions(+) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 34bc60f625..497940d74a 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -64,6 +64,7 @@ #include "storage/standby.h" #include "utils/datum.h" #include "utils/inval.h" +#include "utils/memutils.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/spccache.h" 
@@ -2442,6 +2443,229 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, ntuples); } +/* + * Initialize state required for inserting a single tuple or multiple tuples + * into a heap. + */ +TableInsertState * +heap_insert_begin(Relation rel, CommandId cid, int am_flags, int insert_flags) +{ + TableInsertState *tistate; + + tistate = palloc0(sizeof(TableInsertState)); + tistate->rel = rel; + tistate->cid = cid; + tistate->am_flags = am_flags; + tistate->insert_flags = insert_flags; + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0 || + (am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY)) + tistate->am_data = palloc0(sizeof(HeapInsertState)); + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0) + { + HeapMultiInsertState *mistate; + + mistate = palloc0(sizeof(HeapMultiInsertState)); + mistate->slots = palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS); + + mistate->context = AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert_v2 memory context", + ALLOCSET_DEFAULT_SIZES); + + ((HeapInsertState *) tistate->am_data)->mistate = mistate; + } + + if ((am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY) != 0) + ((HeapInsertState *) tistate->am_data)->bistate = GetBulkInsertState(); + + return tistate; +} + +/* + * Insert a single tuple into a heap.
+ */
+void
+heap_insert_v2(TableInsertState * state, TupleTableSlot *slot)
+{
+    bool shouldFree = true;
+    HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+    BulkInsertState bistate = NULL;
+
+    Assert(state->am_data != NULL &&
+           ((HeapInsertState *) state->am_data)->mistate == NULL);
+
+    /* Update tuple with table oid */
+    slot->tts_tableOid = RelationGetRelid(state->rel);
+    tuple->t_tableOid = slot->tts_tableOid;
+
+    if (state->am_data != NULL &&
+        ((HeapInsertState *) state->am_data)->bistate != NULL)
+        bistate = ((HeapInsertState *) state->am_data)->bistate;
+
+    /* Perform insertion, and copy the resulting ItemPointer */
+    heap_insert(state->rel, tuple, state->cid, state->insert_flags,
+                bistate);
+    ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+    if (shouldFree)
+        pfree(tuple);
+}
+
+/*
+ * Create/return next free slot from multi-insert buffered slots array.
+ */
+TupleTableSlot *
+heap_multi_insert_next_free_slot(TableInsertState * state)
+{
+    TupleTableSlot *slot;
+    HeapMultiInsertState *mistate;
+
+    Assert(state->am_data != NULL &&
+           ((HeapInsertState *) state->am_data)->mistate != NULL);
+
+    mistate = ((HeapInsertState *) state->am_data)->mistate;
+    slot = mistate->slots[mistate->cur_slots];
+
+    if (slot == NULL)
+    {
+        slot = table_slot_create(state->rel, NULL);
+        mistate->slots[mistate->cur_slots] = slot;
+    }
+    else
+        ExecClearTuple(slot);
+
+    return slot;
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot)
+{
+    TupleTableSlot *dstslot;
+    HeapMultiInsertState *mistate;
+
+    Assert(state->am_data != NULL &&
+           ((HeapInsertState *) state->am_data)->mistate != NULL);
+
+    mistate = ((HeapInsertState *) state->am_data)->mistate;
+    dstslot = mistate->slots[mistate->cur_slots];
+
+    if (dstslot == NULL)
+    {
+        dstslot = table_slot_create(state->rel, NULL);
+        mistate->slots[mistate->cur_slots] = dstslot;
+    }
+
+    /*
+     * Caller may have got the slot using heap_multi_insert_next_free_slot,
+     * filled it and passed. So, skip copying in such a case.
+     */
+    if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0)
+    {
+        ExecClearTuple(dstslot);
+        ExecCopySlot(dstslot, slot);
+    }
+    else
+        Assert(dstslot == slot);
+
+    mistate->cur_slots++;
+
+    /*
+     * When passed-in slot is already materialized, memory allocated in slot's
+     * memory context is a close approximation for us to track the required
+     * space for the tuple in slot.
+     *
+     * For non-materialized slots, the flushing decision happens solely on the
+     * number of tuples stored in the buffer.
+     */
+    if (TTS_SHOULDFREE(slot))
+        mistate->cur_size += MemoryContextMemAllocated(slot->tts_mcxt, false);
+
+    if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0 &&
+        (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS ||
+         mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES))
+        heap_multi_insert_flush(state);
+}
+
+/*
+ * Return pointer to multi-insert buffered slots array and number of currently
+ * occupied slots.
+ */
+TupleTableSlot **
+heap_multi_insert_slots(TableInsertState * state, int *num_slots)
+{
+    HeapMultiInsertState *mistate;
+
+    mistate = ((HeapInsertState *) state->am_data)->mistate;
+    *num_slots = mistate->cur_slots;
+
+    return mistate->slots;
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+void
+heap_multi_insert_flush(TableInsertState * state)
+{
+    HeapMultiInsertState *mistate;
+    BulkInsertState bistate = NULL;
+    MemoryContext oldcontext;
+
+    mistate = ((HeapInsertState *) state->am_data)->mistate;
+
+    if (state->am_data != NULL &&
+        ((HeapInsertState *) state->am_data)->bistate != NULL)
+        bistate = ((HeapInsertState *) state->am_data)->bistate;
+
+    oldcontext = MemoryContextSwitchTo(mistate->context);
+    heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots,
+                      state->cid, state->insert_flags, bistate);
+    MemoryContextSwitchTo(oldcontext);
+    MemoryContextReset(mistate->context);
+
+    mistate->cur_slots = 0;
+    mistate->cur_size = 0;
+}
+
+/*
+ * Clean up state used to insert a single or multiple tuples into a heap.
+ */
+void
+heap_insert_end(TableInsertState * state)
+{
+    if (state->am_data != NULL &&
+        ((HeapInsertState *) state->am_data)->mistate != NULL)
+    {
+        HeapMultiInsertState *mistate =
+            ((HeapInsertState *) state->am_data)->mistate;
+
+        /* Insert remaining tuples from multi-insert buffers */
+        if (mistate->cur_slots > 0 || mistate->cur_size > 0)
+            heap_multi_insert_flush(state);
+
+        MemoryContextDelete(mistate->context);
+
+        for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+            ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+        pfree(mistate);
+        ((HeapInsertState *) state->am_data)->mistate = NULL;
+    }
+
+    if (state->am_data != NULL &&
+        ((HeapInsertState *) state->am_data)->bistate != NULL)
+        FreeBulkInsertState(((HeapInsertState *) state->am_data)->bistate);
+
+    pfree(state->am_data);
+    state->am_data = NULL;
+    pfree(state);
+}
+
 /*
  * simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 680a50bf8b..84793f324e 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2562,6 +2562,15 @@ static const TableAmRoutine heapam_methods = {
     .tuple_insert_speculative = heapam_tuple_insert_speculative,
     .tuple_complete_speculative = heapam_tuple_complete_speculative,
     .multi_insert = heap_multi_insert,
+
+    .tuple_insert_begin = heap_insert_begin,
+    .tuple_insert_v2 = heap_insert_v2,
+    .tuple_multi_insert_next_free_slot = heap_multi_insert_next_free_slot,
+    .tuple_multi_insert_v2 = heap_multi_insert_v2,
+    .tuple_multi_insert_slots = heap_multi_insert_slots,
+    .tuple_multi_insert_flush = heap_multi_insert_flush,
+    .tuple_insert_end = heap_insert_end,
+
+
     .tuple_delete = heapam_tuple_delete,
     .tuple_update = heapam_tuple_update,
     .tuple_lock = heapam_tuple_lock,
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4b133f6859..053be18110 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -225,6 +225,40 @@ htsv_get_valid_status(int status)
     return (HTSV_Result) status;
 }

+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer. For instance, increasing this can cause
+ * quadratic growth in memory requirements during copies into partitioned
+ * tables with a large number of partitions.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS 1000
+
+/* Maximum size of all tuples that multi-insert buffers can hold */
+#define HEAP_MAX_BUFFERED_BYTES 65535
+
+typedef struct HeapMultiInsertState
+{
+    /* Memory context to use for flushing multi-insert buffers */
+    MemoryContext context;
+
+    /* Array of buffered slots */
+    TupleTableSlot **slots;
+
+    /* Number of slots that multi-insert buffers currently hold */
+    int cur_slots;
+
+    /* Size of all tuples that multi-insert buffers currently hold */
+    Size cur_size;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+    struct BulkInsertStateData *bistate;
+    HeapMultiInsertState *mistate;
+} HeapInsertState;
+
 /* ----------------
  * function prototypes for heap access method
  *
@@ -275,6 +309,21 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
                               int ntuples, CommandId cid, int options,
                               BulkInsertState bistate);
+
+extern TableInsertState * heap_insert_begin(Relation rel,
+                                            CommandId cid,
+                                            int am_flags,
+                                            int insert_flags);
+extern void heap_insert_v2(TableInsertState * state,
+                           TupleTableSlot *slot);
+extern TupleTableSlot *heap_multi_insert_next_free_slot(TableInsertState * state);
+extern void heap_multi_insert_v2(TableInsertState * state,
+                                 TupleTableSlot *slot);
+extern TupleTableSlot **heap_multi_insert_slots(TableInsertState * state,
+                                                int *num_slots);
+extern void heap_multi_insert_flush(TableInsertState * state);
+extern void heap_insert_end(TableInsertState * state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
                              CommandId cid, Snapshot crosscheck, bool wait,
                              struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 5f8474871d..834de15b9b 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -247,6 +247,43 @@ typedef struct TM_IndexDeleteOp
     TM_IndexStatus *status;
 } TM_IndexDeleteOp;

+/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */
+#define TABLEAM_MULTI_INSERTS 0x000001
+
+/* Use BAS_BULKWRITE buffer access strategy */
+#define TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY 0x000002
+
+/*
+ * Skip flushing buffered tuples automatically. Responsibility lies with the
+ * caller to flush the buffered tuples.
+ */
+#define TABLEAM_SKIP_MULTI_INSERTS_FLUSH 0x000004
+
+
+/* Holds table insert state. */
+typedef struct TableInsertState
+{
+    /* Table AM-agnostic data starts here */
+
+    Relation rel; /* Target relation */
+
+    /*
+     * Command ID for this insertion. If required, change this for each pass
+     * of insert functions.
+     */
+    CommandId cid;
+
+    /* Table AM options (TABLEAM_XXX macros) */
+    int am_flags;
+
+    /* table_tuple_insert performance options (TABLE_INSERT_XXX macros) */
+    int insert_flags;
+
+    /* Table AM specific data starts here */
+
+    void *am_data;
+} TableInsertState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM 0x0002
@@ -522,6 +559,20 @@ typedef struct TableAmRoutine
     void (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
                           CommandId cid, int options, struct BulkInsertStateData *bistate);

+    TableInsertState *(*tuple_insert_begin) (Relation rel,
+                                             CommandId cid,
+                                             int am_flags,
+                                             int insert_flags);
+    void (*tuple_insert_v2) (TableInsertState * state,
+                             TupleTableSlot *slot);
+    void (*tuple_multi_insert_v2) (TableInsertState * state,
+                                   TupleTableSlot *slot);
+    TupleTableSlot *(*tuple_multi_insert_next_free_slot) (TableInsertState * state);
+    TupleTableSlot **(*tuple_multi_insert_slots) (TableInsertState * state,
+                                                  int *num_slots);
+    void (*tuple_multi_insert_flush) (TableInsertState * state);
+    void (*tuple_insert_end) (TableInsertState * state);
+
     /* see table_tuple_delete() for reference about parameters */
     TM_Result (*tuple_delete) (Relation rel,
                                ItemPointer tid,
@@ -1456,6 +1507,93 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
                                   cid, options, bistate);
 }

+static inline TableInsertState *
+table_insert_begin(Relation rel, CommandId cid, int am_flags,
+                   int insert_flags)
+{
+    if (rel->rd_tableam && rel->rd_tableam->tuple_insert_begin)
+        return rel->rd_tableam->tuple_insert_begin(rel, cid, am_flags,
+                                                   insert_flags);
+    else
+    {
+        elog(ERROR, "table_insert_begin access method is not implemented for relation \"%s\"",
+             RelationGetRelationName(rel));
+        return NULL; /* keep compiler quiet */
+    }
+}
+
+static inline void
+table_tuple_insert_v2(TableInsertState * state, TupleTableSlot *slot)
+{
+    if (state->rel->rd_tableam &&
+        state->rel->rd_tableam->tuple_insert_v2)
+        state->rel->rd_tableam->tuple_insert_v2(state, slot);
+    else
+        elog(ERROR, "table_tuple_insert_v2 access method is not implemented for relation \"%s\"",
+             RelationGetRelationName(state->rel));
+}
+
+static inline void
+table_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot)
+{
+    if (state->rel->rd_tableam &&
+        state->rel->rd_tableam->tuple_multi_insert_v2)
+        state->rel->rd_tableam->tuple_multi_insert_v2(state, slot);
+    else
+        elog(ERROR, "table_multi_insert_v2 access method is not implemented for relation \"%s\"",
+             RelationGetRelationName(state->rel));
+}
+
+static inline TupleTableSlot *
+table_multi_insert_next_free_slot(TableInsertState * state)
+{
+    if (state->rel->rd_tableam &&
+        state->rel->rd_tableam->tuple_multi_insert_next_free_slot)
+        return state->rel->rd_tableam->tuple_multi_insert_next_free_slot(state);
+    else
+    {
+        elog(ERROR, "table_multi_insert_next_free_slot access method is not implemented for relation \"%s\"",
+             RelationGetRelationName(state->rel));
+        return NULL; /* keep compiler quiet */
+    }
+}
+
+static inline TupleTableSlot **
+table_multi_insert_slots(TableInsertState * state, int *num_slots)
+{
+    if (state->rel->rd_tableam &&
+        state->rel->rd_tableam->tuple_multi_insert_slots)
+        return state->rel->rd_tableam->tuple_multi_insert_slots(state, num_slots);
+    else
+    {
+        elog(ERROR, "table_multi_insert_slots access method is not implemented for relation \"%s\"",
+             RelationGetRelationName(state->rel));
+        return NULL; /* keep compiler quiet */
+    }
+}
+
+static inline void
+table_multi_insert_flush(TableInsertState * state)
+{
+    if (state->rel->rd_tableam &&
+        state->rel->rd_tableam->tuple_multi_insert_flush)
+        state->rel->rd_tableam->tuple_multi_insert_flush(state);
+    else
+        elog(ERROR, "table_multi_insert_flush access method is not implemented for relation \"%s\"",
+             RelationGetRelationName(state->rel));
+}
+
+static inline void
+table_insert_end(TableInsertState * state)
+{
+    if (state->rel->rd_tableam &&
+        state->rel->rd_tableam->tuple_insert_end)
+        state->rel->rd_tableam->tuple_insert_end(state);
+    else
+        elog(ERROR, "table_insert_end access method is not implemented for relation \"%s\"",
+             RelationGetRelationName(state->rel));
+}
+
 /*
  * Delete a tuple.
  *
--
2.34.1

[application/x-patch] v12-0002-Optimize-CTAS-with-multi-inserts.patch (2.7K, 3-v12-0002-Optimize-CTAS-with-multi-inserts.patch)

From fd891115178bc33df87844417e35a724b359af96 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Fri, 8 Mar 2024 10:11:41 +0000
Subject: [PATCH v12 2/4] Optimize CTAS with multi inserts

---
 src/backend/commands/createas.c | 25 +++++++++----------------
 1 file changed, 9 insertions(+), 16 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 62050f4dc5..7a4415c62f 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -53,9 +53,7 @@ typedef struct
     /* These fields are filled by intorel_startup: */
     Relation rel; /* relation to write to */
     ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */
-    CommandId output_cid; /* cmin to insert in output tuples */
-    int ti_options; /* table_tuple_insert performance options */
-    BulkInsertState bistate; /* bulk insert state */
+    TableInsertState *ti_state; /* table insert state */
 } DR_intorel;

 /* utility functions for CTAS definition creation */
@@ -552,17 +550,19 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
      */
     myState->rel = intoRelationDesc;
     myState->reladdr = intoRelationAddr;
-    myState->output_cid = GetCurrentCommandId(true);
-    myState->ti_options = TABLE_INSERT_SKIP_FSM;

     /*
      * If WITH NO DATA is specified, there is no need to set up the state for
      * bulk inserts as there are no tuples to insert.
      */
     if (!into->skipData)
-        myState->bistate = GetBulkInsertState();
+        myState->ti_state = table_insert_begin(intoRelationDesc,
+                                               GetCurrentCommandId(true),
+                                               TABLEAM_MULTI_INSERTS |
+                                               TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY,
+                                               TABLE_INSERT_SKIP_FSM);
     else
-        myState->bistate = NULL;
+        myState->ti_state = NULL;

     /*
      * Valid smgr_targblock implies something already wrote to the relation.
@@ -590,11 +590,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
      * would not be cheap either. This also doesn't allow accessing per-AM
      * data (say a tuple's xmin), but since we don't do that here...
      */
-    table_tuple_insert(myState->rel,
-                       slot,
-                       myState->output_cid,
-                       myState->ti_options,
-                       myState->bistate);
+    table_multi_insert_v2(myState->ti_state, slot);

     /* We know this is a newly created relation, so there are no indexes */

@@ -612,10 +608,7 @@ intorel_shutdown(DestReceiver *self)
     IntoClause *into = myState->into;

     if (!into->skipData)
-    {
-        FreeBulkInsertState(myState->bistate);
-        table_finish_bulk_insert(myState->rel, myState->ti_options);
-    }
+        table_insert_end(myState->ti_state);

     /* close rel, but keep lock until commit */
     table_close(myState->rel, NoLock);
--
2.34.1

[application/x-patch] v12-0003-Optimize-RMV-with-multi-inserts.patch (2.9K, 4-v12-0003-Optimize-RMV-with-multi-inserts.patch)

From 44caa58dc21e8e4634d214c074a88986b2311b41 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Fri, 8 Mar 2024 10:12:02 +0000
Subject: [PATCH v12 3/4] Optimize RMV with multi inserts

---
 src/backend/commands/matview.c | 34 ++++++++++++----------------------
 1 file changed, 12 insertions(+), 22 deletions(-)

diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 03373462f0..889a9a21f8 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -47,10 +47,7 @@ typedef struct
     DestReceiver pub; /* publicly-known function pointers */
     Oid transientoid; /* OID of new heap into which to store */
     /* These fields are filled by transientrel_startup: */
-    Relation transientrel; /* relation to write to */
-    CommandId output_cid; /* cmin to insert in output tuples */
-    int ti_options; /* table_tuple_insert performance options */
-    BulkInsertState bistate; /* bulk insert state */
+    TableInsertState *ti_state; /* table insert state */
 } DR_transientrel;

 static int matview_maintenance_depth = 0;
@@ -453,13 +450,13 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)

     transientrel = table_open(myState->transientoid, NoLock);

-    /*
-     * Fill private fields of myState for use by later routines
-     */
-    myState->transientrel = transientrel;
-    myState->output_cid = GetCurrentCommandId(true);
-    myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-    myState->bistate = GetBulkInsertState();
+    /* Fill private fields of myState for use by later routines */
+    myState->ti_state = table_insert_begin(transientrel,
+                                           GetCurrentCommandId(true),
+                                           TABLEAM_MULTI_INSERTS |
+                                           TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY,
+                                           TABLE_INSERT_SKIP_FSM |
+                                           TABLE_INSERT_FROZEN);

     /*
      * Valid smgr_targblock implies something already wrote to the relation.
@@ -484,12 +481,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
      * cheap either. This also doesn't allow accessing per-AM data (say a
      * tuple's xmin), but since we don't do that here...
      */
-
-    table_tuple_insert(myState->transientrel,
-                       slot,
-                       myState->output_cid,
-                       myState->ti_options,
-                       myState->bistate);
+    table_multi_insert_v2(myState->ti_state, slot);

     /* We know this is a newly created relation, so there are no indexes */

@@ -503,14 +495,12 @@ static void
 transientrel_shutdown(DestReceiver *self)
 {
     DR_transientrel *myState = (DR_transientrel *) self;
+    Relation transientrel = myState->ti_state->rel;

-    FreeBulkInsertState(myState->bistate);
-
-    table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+    table_insert_end(myState->ti_state);

     /* close transientrel, but keep lock until commit */
-    table_close(myState->transientrel, NoLock);
-    myState->transientrel = NULL;
+    table_close(transientrel, NoLock);
 }

 /*
--
2.34.1

[application/x-patch] v12-0004-Use-new-multi-insert-TAM-for-COPY-FROM.patch (6.3K, 5-v12-0004-Use-new-multi-insert-TAM-for-COPY-FROM.patch)

From d53ee9b1b31b0e68858e673a618905d7bfdcf4de Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Fri, 8 Mar 2024 10:12:32 +0000
Subject: [PATCH v12 4/4] Use new multi insert TAM for COPY FROM

---
 src/backend/commands/copyfrom.c | 92 ++++++++++++++++++---------------
 1 file changed, 50 insertions(+), 42 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 8908a440e1..c2a81d4df1 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -74,10 +74,9 @@
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-    TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+    TableInsertState *ti_state; /* Table insert state; NULL if foreign table */
+    TupleTableSlot **slots; /* Array to store tuples */
     ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
-    BulkInsertState bistate; /* BulkInsertState for this rel if plain
-                              * table; NULL if foreign table */
     int nused; /* number of 'slots' containing tuples */
     uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
                                           * stream */
@@ -220,14 +219,31 @@ limit_printout_length(const char *str)
  * ResultRelInfo.
  */
 static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
 {
     CopyMultiInsertBuffer *buffer;

     buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-    memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+    if (rri->ri_FdwRoutine == NULL)
+    {
+        int num_slots;
+
+        buffer->ti_state = table_insert_begin(rri->ri_RelationDesc,
+                                              miinfo->mycid,
+                                              TABLEAM_MULTI_INSERTS |
+                                              TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY |
+                                              TABLEAM_SKIP_MULTI_INSERTS_FLUSH,
+                                              miinfo->ti_options);
+        buffer->slots = table_multi_insert_slots(buffer->ti_state, &num_slots);
+    }
+    else
+    {
+        buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+        buffer->ti_state = NULL;
+    }
+
     buffer->resultRelInfo = rri;
-    buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
     buffer->nused = 0;

     return buffer;
@@ -242,7 +258,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
 {
     CopyMultiInsertBuffer *buffer;

-    buffer = CopyMultiInsertBufferInit(rri);
+    buffer = CopyMultiInsertBufferInit(miinfo, rri);

     /* Setup back-link so we can easily find this buffer again */
     rri->ri_CopyMultiInsertBuffer = buffer;
@@ -319,8 +335,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
         int batch_size = resultRelInfo->ri_BatchSize;
         int sent = 0;

-        Assert(buffer->bistate == NULL);
-
         /* Ensure that the FDW supports batching and it's enabled */
         Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
         Assert(batch_size > 1);
@@ -392,13 +406,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
     }
     else
     {
-        CommandId mycid = miinfo->mycid;
-        int ti_options = miinfo->ti_options;
         bool line_buf_valid = cstate->line_buf_valid;
         uint64 save_cur_lineno = cstate->cur_lineno;
-        MemoryContext oldcontext;
-
-        Assert(buffer->bistate != NULL);

         /*
          * Print error context information correctly, if one of the operations
@@ -406,18 +415,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
          */
         cstate->line_buf_valid = false;

-        /*
-         * table_multi_insert may leak memory, so switch to short-lived memory
-         * context before calling it.
-         */
-        oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-        table_multi_insert(resultRelInfo->ri_RelationDesc,
-                           slots,
-                           nused,
-                           mycid,
-                           ti_options,
-                           buffer->bistate);
-        MemoryContextSwitchTo(oldcontext);
+        table_multi_insert_flush(buffer->ti_state);

         for (i = 0; i < nused; i++)
         {
@@ -432,7 +430,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
                 cstate->cur_lineno = buffer->linenos[i];
                 recheckIndexes =
                     ExecInsertIndexTuples(resultRelInfo,
-                                          buffer->slots[i], estate, false,
+                                          slots[i], estate, false,
                                           false, NULL, NIL, false);
                 ExecARInsertTriggers(estate, resultRelInfo,
                                      slots[i], recheckIndexes,
@@ -490,20 +488,15 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
     resultRelInfo->ri_CopyMultiInsertBuffer = NULL;

     if (resultRelInfo->ri_FdwRoutine == NULL)
-    {
-        Assert(buffer->bistate != NULL);
-        FreeBulkInsertState(buffer->bistate);
-    }
+        table_insert_end(buffer->ti_state);
     else
-        Assert(buffer->bistate == NULL);
-
-    /* Since we only create slots on demand, just drop the non-null ones. */
-    for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-        ExecDropSingleTupleTableSlot(buffer->slots[i]);
+    {
+        /* Since we only create slots on demand, just drop the non-null ones. */
+        for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+            ExecDropSingleTupleTableSlot(buffer->slots[i]);

-    if (resultRelInfo->ri_FdwRoutine == NULL)
-        table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
-                                 miinfo->ti_options);
+        pfree(buffer->slots);
+    }

     pfree(buffer);
 }
@@ -590,13 +583,25 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
 {
     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     int nused = buffer->nused;
+    TupleTableSlot *slot;

     Assert(buffer != NULL);
     Assert(nused < MAX_BUFFERED_TUPLES);

-    if (buffer->slots[nused] == NULL)
-        buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-    return buffer->slots[nused];
+    if (rri->ri_FdwRoutine == NULL)
+        slot = table_multi_insert_next_free_slot(buffer->ti_state);
+    else
+    {
+        if (buffer->slots[nused] == NULL)
+        {
+            slot = table_slot_create(rri->ri_RelationDesc, NULL);
+            buffer->slots[nused] = slot;
+        }
+        else
+            slot = buffer->slots[nused];
+    }
+
+    return slot;
 }

 /*
@@ -612,6 +617,9 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     Assert(buffer != NULL);
     Assert(slot == buffer->slots[buffer->nused]);

+    if (rri->ri_FdwRoutine == NULL)
+        table_multi_insert_v2(buffer->ti_state, slot);
+
     /* Store the line number so we can properly report any errors later */
     buffer->linenos[buffer->nused] = lineno;

--
2.34.1
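
For readers skimming the patches, the flush policy of heap_multi_insert_v2 can be pictured with a small standalone model. This is only a sketch under stated assumptions, not the patch's code: ToyInsertState, toy_begin, toy_insert, toy_flush and toy_end are hypothetical names, tuples are reduced to byte counts, and the real heap_multi_insert call is replaced by counters. The two limits mirror HEAP_MAX_BUFFERED_SLOTS and HEAP_MAX_BUFFERED_BYTES, and TOY_SKIP_FLUSH stands in for TABLEAM_SKIP_MULTI_INSERTS_FLUSH. Unlike the patch, which only adds to cur_size for materialized slots, the toy always counts bytes.

```c
#include <assert.h>
#include <stddef.h>

/* Limits mirroring the values the patch adds to heapam.h. */
#define TOY_MAX_BUFFERED_SLOTS 1000
#define TOY_MAX_BUFFERED_BYTES 65535

/* Hypothetical stand-in for TABLEAM_SKIP_MULTI_INSERTS_FLUSH. */
#define TOY_SKIP_FLUSH 0x4

typedef struct ToyInsertState
{
    int     flags;      /* TOY_SKIP_FLUSH or 0 */
    int     cur_slots;  /* tuples currently buffered */
    size_t  cur_size;   /* bytes currently buffered */
    int     nflushes;   /* number of flushes performed so far */
    long    ninserted;  /* tuples handed to the imaginary heap */
} ToyInsertState;

/* Start an insert "session" with an empty buffer. */
void
toy_begin(ToyInsertState *state, int flags)
{
    state->flags = flags;
    state->cur_slots = 0;
    state->cur_size = 0;
    state->nflushes = 0;
    state->ninserted = 0;
}

/* Write out everything buffered and reset both counters. */
void
toy_flush(ToyInsertState *state)
{
    state->ninserted += state->cur_slots;
    state->nflushes++;
    state->cur_slots = 0;
    state->cur_size = 0;
}

/*
 * Buffer one tuple. Unless the caller asked to control flushing itself,
 * flush as soon as either the slot limit or the byte limit is reached.
 */
void
toy_insert(ToyInsertState *state, size_t tuple_bytes)
{
    assert(state->cur_slots < TOY_MAX_BUFFERED_SLOTS);

    state->cur_slots++;
    state->cur_size += tuple_bytes;

    if ((state->flags & TOY_SKIP_FLUSH) == 0 &&
        (state->cur_slots >= TOY_MAX_BUFFERED_SLOTS ||
         state->cur_size >= TOY_MAX_BUFFERED_BYTES))
        toy_flush(state);
}

/* Flush whatever is left before the state goes away. */
void
toy_end(ToyInsertState *state)
{
    if (state->cur_slots > 0)
        toy_flush(state);
}
```

In this model a CTAS-like caller would call toy_begin once, toy_insert per tuple and toy_end at shutdown, while a COPY-like caller would pass TOY_SKIP_FLUSH and call toy_flush itself, which is the split the v12 patches make between createas.c/matview.c and copyfrom.c.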
* Re: New Table Access Methods for Multi and Single Inserts
@ 2024-03-19 05:09 Masahiko Sawada <[email protected]>
  parent: Bharath Rupireddy <[email protected]>
  0 siblings, 0 replies; 10+ messages in thread
From: Masahiko Sawada @ 2024-03-19 05:09 UTC
To: Bharath Rupireddy <[email protected]>
Cc: pgsql-hackers; Andres Freund <[email protected]>; Dilip Kumar <[email protected]>; Luc Vlaming <[email protected]>; Justin Pryzby <[email protected]>; Jeff Davis <[email protected]>; Michael Paquier <[email protected]>; Matthias van de Meent <[email protected]>

Hi,

On Fri, Mar 8, 2024 at 7:37 PM Bharath Rupireddy <[email protected]> wrote:
>
> On Sat, Mar 2, 2024 at 12:02 PM Bharath Rupireddy
> <[email protected]> wrote:
> >
> > On Mon, Jan 29, 2024 at 5:16 PM Bharath Rupireddy
> > <[email protected]> wrote:
> > >
> > > > Please find the attached v9 patch set.
> >
> > I've had to rebase the patches due to commit 874d817, please find the
> > attached v11 patch set.
>
> Rebase needed. Please see the v12 patch set.
>

I've not reviewed the patches in depth yet, but I ran performance tests
for CREATE MATERIALIZED VIEW. The test scenario is:

-- setup
create unlogged table test (c int);
insert into test select generate_series(1, 10000000);

-- run
create materialized view test_mv as select * from test;

Here are the results:

* HEAD
3775.221 ms
3744.039 ms
3723.228 ms

* v12 patch
6289.972 ms
5880.674 ms
7663.509 ms

I can see a performance regression, and the perf report shows that most
of the CPU time is spent extending the ResourceOwner's array while
copying the buffer-heap tuple:

- 52.26% 0.18% postgres postgres [.] intorel_receive
     52.08% intorel_receive
        table_multi_insert_v2 (inlined)
      - heap_multi_insert_v2
         - 51.53% ExecCopySlot (inlined)
              tts_buffer_heap_copyslot
              tts_buffer_heap_store_tuple (inlined)
            - IncrBufferRefCount
               - ResourceOwnerEnlarge
                    ResourceOwnerAddToHash (inlined)

Is there any reason why we copy a buffer-heap tuple to another
buffer-heap tuple? As a result, we increment the buffer refcount and
register it with the ResourceOwner for every tuple. I guess that the
destination tuple slot is not necessarily a buffer-heap slot, and we
could use VirtualTupleTableSlot instead. It would in turn require
copying a heap tuple. I might be missing something, but it improved the
performance at least in my environment. The change I made was:

-       dstslot = table_slot_create(state->rel, NULL);
+       //dstslot = table_slot_create(state->rel, NULL);
+       dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+                                    &TTSOpsVirtual);
+

And the execution times are:
1588.984 ms
1591.618 ms
1582.519 ms

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
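
To put the three sets of timings side by side, the means and ratios can be worked out with a few lines of C. This is a minimal sketch: the numbers are copied from the message above, while mean_ms and slowdown are hypothetical helper names introduced only for this illustration. Three runs per configuration is a small sample, so the ratios are indicative at best.

```c
#include <assert.h>
#include <stddef.h>

/* Timings in milliseconds, copied from the message above. */
const double head_ms[3]    = {3775.221, 3744.039, 3723.228};
const double v12_ms[3]     = {6289.972, 5880.674, 7663.509};
const double virtual_ms[3] = {1588.984, 1591.618, 1582.519};

/* Arithmetic mean of nruns timings. */
double
mean_ms(const double *runs, size_t nruns)
{
    double sum = 0.0;

    assert(nruns > 0);
    for (size_t i = 0; i < nruns; i++)
        sum += runs[i];
    return sum / (double) nruns;
}

/*
 * Ratio of mean runtimes. A value above 1.0 means "runs" is slower than
 * "baseline"; a value below 1.0 means it is faster.
 */
double
slowdown(const double *runs, const double *baseline, size_t nruns)
{
    return mean_ms(runs, nruns) / mean_ms(baseline, nruns);
}
```

With these inputs the means come out to roughly 3747 ms for HEAD, 6611 ms for v12 and 1588 ms for v12 with the virtual-slot change, so v12 takes about 1.76 times as long as HEAD, while the virtual-slot variant takes about 0.42 times as long.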
Thread overview: 10+ messages
2023-06-03 22:38 Re: New Table Access Methods for Multi and Single Inserts Andres Freund <[email protected]>
2023-06-05 02:30 ` Bharath Rupireddy <[email protected]>
2023-08-01 16:30 ` Bharath Rupireddy <[email protected]>
2023-08-01 17:02 ` Jacob Champion <[email protected]>
2024-01-17 17:27 ` Bharath Rupireddy <[email protected]>
2024-01-29 07:27 ` Bharath Rupireddy <[email protected]>
2024-01-29 11:46 ` Bharath Rupireddy <[email protected]>
2024-03-02 06:32 ` Bharath Rupireddy <[email protected]>
2024-03-08 10:36 ` Bharath Rupireddy <[email protected]>
2024-03-19 05:09 ` Masahiko Sawada <[email protected]>