From fd64f007092f4a71d61aa1f3347390da05c2460c Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Sat, 26 Oct 2024 12:37:14 +0000 Subject: [PATCH v24 3/4] Use new multi-inserts table AM for COPY ... FROM This commit uses the new multi-inserts table AM added by commit <> for COPY ... FROM command. Author: Bharath Rupireddy Reviewed-by: Jeff Davis Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com --- src/backend/commands/copyfrom.c | 254 +++++++++++++++-------- src/include/commands/copyfrom_internal.h | 4 +- src/tools/pgindent/typedefs.list | 1 + 3 files changed, 171 insertions(+), 88 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 07cbd5d22b..18fb609cbe 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -74,14 +74,27 @@ */ #define MAX_PARTITION_BUFFERS 32 +/* Context for multi-inserts buffer flush callback */ +typedef struct MultiInsertBufferFlushCtx +{ + CopyFromState cstate; + ResultRelInfo *resultRelInfo; + EState *estate; +} MultiInsertBufferFlushCtx; + /* Stores multi-insert data related to a single relation in CopyFrom. 
*/ typedef struct CopyMultiInsertBuffer { - TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ + TableModifyState *mstate; /* Table insert state; NULL if foreign table */ + TupleTableSlot **slots; /* Array to store tuples */ ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ - BulkInsertState bistate; /* BulkInsertState for this rel if plain - * table; NULL if foreign table */ + TupleTableSlot *mislot; /* Slot used for multi-inserts */ + MultiInsertBufferFlushCtx *mibufferctx; /* Multi-inserts buffer flush + * callback context */ int nused; /* number of 'slots' containing tuples */ + int currslotno; /* Current buffered slot number that's being + * flushed; Used to get correct cur_lineno for + * errors while in flush callback. */ uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy * stream */ } CopyMultiInsertBuffer; @@ -216,19 +229,96 @@ CopyLimitPrintoutLength(const char *str) return res; } +/* + * Implementation of the multi-inserts buffer flush callback, + * i.e. TableModifyEndCallback. + * + * NB: Caller must take care of opening and closing the indices. + */ +static void +MultiInsertBufferFlushCb(void *context, TupleTableSlot *slot) +{ + MultiInsertBufferFlushCtx *mibufferctx = (MultiInsertBufferFlushCtx *) context; + CopyFromState cstate = mibufferctx->cstate; + ResultRelInfo *resultRelInfo = mibufferctx->resultRelInfo; + EState *estate = mibufferctx->estate; + CopyMultiInsertBuffer *buffer = resultRelInfo->ri_CopyMultiInsertBuffer; + + /* + * If there are any indexes, update them for all the inserted tuples, and + * run AFTER ROW INSERT triggers. 
+ */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + cstate->cur_lineno = buffer->linenos[buffer->currslotno++]; + recheckIndexes = + ExecInsertIndexTuples(resultRelInfo, + slot, estate, false, + false, NULL, NIL, false); + + ExecARInsertTriggers(estate, resultRelInfo, + slot, recheckIndexes, + cstate->transition_capture); + + list_free(recheckIndexes); + } + + /* + * There are no indexes, but see if we need to run AFTER ROW INSERT triggers + * anyway. + */ + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + cstate->cur_lineno = buffer->linenos[buffer->currslotno++]; + + ExecARInsertTriggers(estate, resultRelInfo, + slot, NIL, + cstate->transition_capture); + } + + Assert(buffer->currslotno <= buffer->nused); +} + /* * Allocate memory and initialize a new CopyMultiInsertBuffer for this * ResultRelInfo. */ static CopyMultiInsertBuffer * -CopyMultiInsertBufferInit(ResultRelInfo *rri) +CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, + CopyFromState cstate, EState *estate) { CopyMultiInsertBuffer *buffer; buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); - memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + + if (rri->ri_FdwRoutine == NULL) + { + buffer->mibufferctx = + (MultiInsertBufferFlushCtx *) palloc(sizeof(MultiInsertBufferFlushCtx)); + buffer->mibufferctx->cstate = cstate; + buffer->mibufferctx->resultRelInfo = rri; + buffer->mibufferctx->estate = estate; + + buffer->mstate = table_modify_begin(rri->ri_RelationDesc, + miinfo->mycid, + miinfo->ti_options, + MultiInsertBufferFlushCb, + buffer->mibufferctx); + + buffer->slots = NULL; + } + else + { + buffer->mstate = NULL; + buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + } + + buffer->mislot = NULL; buffer->resultRelInfo = rri; - buffer->bistate = (rri->ri_FdwRoutine == NULL) ? 
GetBulkInsertState() : NULL; buffer->nused = 0; return buffer; @@ -239,11 +329,12 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri) */ static inline void CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, - ResultRelInfo *rri) + ResultRelInfo *rri, CopyFromState cstate, + EState *estate) { CopyMultiInsertBuffer *buffer; - buffer = CopyMultiInsertBufferInit(rri); + buffer = CopyMultiInsertBufferInit(miinfo, rri, cstate, estate); /* Setup back-link so we can easily find this buffer again */ rri->ri_CopyMultiInsertBuffer = buffer; @@ -276,7 +367,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, * tuples their way for the first time. */ if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) - CopyMultiInsertInfoSetupBuffer(miinfo, rri); + CopyMultiInsertInfoSetupBuffer(miinfo, rri, cstate, estate); } /* @@ -320,8 +411,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, int batch_size = resultRelInfo->ri_BatchSize; int sent = 0; - Assert(buffer->bistate == NULL); - /* Ensure that the FDW supports batching and it's enabled */ Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert); Assert(batch_size > 1); @@ -393,13 +482,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, } else { - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; bool line_buf_valid = cstate->line_buf_valid; uint64 save_cur_lineno = cstate->cur_lineno; - MemoryContext oldcontext; - - Assert(buffer->bistate != NULL); /* * Print error context information correctly, if one of the operations @@ -407,56 +491,18 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, */ cstate->line_buf_valid = false; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. 
- */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); + Assert(buffer->currslotno <= buffer->nused); + buffer->currslotno = 0; - for (i = 0; i < nused; i++) - { - /* - * If there are any indexes, update them for all the inserted - * tuples, and run AFTER ROW INSERT triggers. - */ - if (resultRelInfo->ri_NumIndices > 0) - { - List *recheckIndexes; - - cstate->cur_lineno = buffer->linenos[i]; - recheckIndexes = - ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, - false, NULL, NIL, false); - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], recheckIndexes, - cstate->transition_capture); - list_free(recheckIndexes); - } + table_modify_buffer_flush(buffer->mstate); - /* - * There's no indexes, but see if we need to run AFTER ROW INSERT - * triggers anyway. - */ - else if (resultRelInfo->ri_TrigDesc != NULL && - (resultRelInfo->ri_TrigDesc->trig_insert_after_row || - resultRelInfo->ri_TrigDesc->trig_insert_new_table)) - { - cstate->cur_lineno = buffer->linenos[i]; - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], NIL, - cstate->transition_capture); - } + Assert(buffer->currslotno <= buffer->nused); + buffer->currslotno = 0; - ExecClearTuple(slots[i]); - } + /* + * Indexes are updated and AFTER ROW INSERT triggers (if any) are run + * in the flush callback MultiInsertBufferFlushCb. 
+ */ /* Update the row counter and progress of the COPY command */ *processed += nused; @@ -492,19 +538,18 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, if (resultRelInfo->ri_FdwRoutine == NULL) { - Assert(buffer->bistate != NULL); - FreeBulkInsertState(buffer->bistate); + table_modify_end(buffer->mstate); + ExecDropSingleTupleTableSlot(buffer->mislot); + pfree(buffer->mibufferctx); } else - Assert(buffer->bistate == NULL); - - /* Since we only create slots on demand, just drop the non-null ones. */ - for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) - ExecDropSingleTupleTableSlot(buffer->slots[i]); + { + /* Since we only create slots on demand, just drop the non-null ones. */ + for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(buffer->slots[i]); - if (resultRelInfo->ri_FdwRoutine == NULL) - table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, - miinfo->ti_options); + pfree(buffer->slots); + } pfree(buffer); } @@ -598,15 +643,36 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; int nused; + TupleTableSlot *slot; Assert(buffer != NULL); Assert(buffer->nused < MAX_BUFFERED_TUPLES); nused = buffer->nused; - if (buffer->slots[nused] == NULL) - buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); - return buffer->slots[nused]; + if (rri->ri_FdwRoutine == NULL) + { + if (buffer->mislot == NULL) + { + buffer->mislot = MakeTupleTableSlot(RelationGetDescr(rri->ri_RelationDesc), + &TTSOpsVirtual); + } + + /* Caller must clear the slot */ + slot = buffer->mislot; + } + else + { + if (buffer->slots[nused] == NULL) + { + slot = table_slot_create(rri->ri_RelationDesc, NULL); + buffer->slots[nused] = slot; + } + else + slot = buffer->slots[nused]; + } + + return slot; } /* @@ -620,7 +686,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyMultiInsertBuffer *buffer = 
rri->ri_CopyMultiInsertBuffer; Assert(buffer != NULL); - Assert(slot == buffer->slots[buffer->nused]); + +#ifdef USE_ASSERT_CHECKING + if (rri->ri_FdwRoutine != NULL) + Assert(slot == buffer->slots[buffer->nused]); +#endif /* Store the line number so we can properly report any errors later */ buffer->linenos[buffer->nused] = lineno; @@ -628,6 +698,22 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, /* Record this slot as being used */ buffer->nused++; + if (rri->ri_FdwRoutine == NULL) + { + Assert(slot == buffer->mislot); + buffer->currslotno = 0; + + table_modify_buffer_insert(buffer->mstate, slot); + } + else + { + /* + * The slot previously might point into the per-tuple context. For + * batching it needs to be longer lived. + */ + ExecMaterializeSlot(slot); + } + /* Update how many tuples are stored and their size */ miinfo->bufferedTuples++; miinfo->bufferedBytes += tuplen; @@ -841,7 +927,7 @@ CopyFrom(CopyFromState cstate) /* * It's generally more efficient to prepare a bunch of tuples for * insertion, and insert them in one - * table_multi_insert()/ExecForeignBatchInsert() call, than call + * table_modify_buffer_insert()/ExecForeignBatchInsert() call, than call * table_tuple_insert()/ExecForeignInsert() separately for every tuple. * However, there are a number of reasons why we might not be able to do * this. These are explained below. 
@@ -925,7 +1011,8 @@ CopyFrom(CopyFromState cstate) insertMethod = CIM_MULTI; CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate, - estate, mycid, ti_options); + estate, mycid, + ti_options | TABLE_INSERT_BAS_BULKWRITE); } /* @@ -1094,7 +1181,8 @@ CopyFrom(CopyFromState cstate) { if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL) CopyMultiInsertInfoSetupBuffer(&multiInsertInfo, - resultRelInfo); + resultRelInfo, cstate, + estate); } else if (insertMethod == CIM_MULTI_CONDITIONAL && !CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) @@ -1224,12 +1312,6 @@ CopyFrom(CopyFromState cstate) /* Store the slot in the multi-insert buffer, when enabled. */ if (insertMethod == CIM_MULTI || leafpart_use_multi_insert) { - /* - * The slot previously might point into the per-tuple - * context. For batching it needs to be longer lived. - */ - ExecMaterializeSlot(myslot); - /* Add this tuple to the tuple buffer */ CopyMultiInsertInfoStore(&multiInsertInfo, resultRelInfo, myslot, diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index cad52fcc78..14addbc6f6 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -46,9 +46,9 @@ typedef enum EolType typedef enum CopyInsertMethod { CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */ - CIM_MULTI, /* always use table_multi_insert or + CIM_MULTI, /* always use table_modify_buffer_insert or * ExecForeignBatchInsert */ - CIM_MULTI_CONDITIONAL, /* use table_multi_insert or + CIM_MULTI_CONDITIONAL, /* use table_modify_buffer_insert or * ExecForeignBatchInsert only if valid */ } CopyInsertMethod; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e7ddf29c16..bf21e43ce1 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1664,6 +1664,7 @@ MonotonicFunction MorphOpaque MsgType MultiAssignRef +MultiInsertBufferFlushCtx MultiSortSupport MultiSortSupportData 
MultiXactId -- 2.40.1