From: Bharath Rupireddy <[email protected]>
To: Jeff Davis <[email protected]>
Cc: Masahiko Sawada <[email protected]>
Cc: PostgreSQL-development <[email protected]>
Cc: Andres Freund <[email protected]>
Cc: Dilip Kumar <[email protected]>
Cc: Luc Vlaming <[email protected]>
Cc: Justin Pryzby <[email protected]>
Cc: Michael Paquier <[email protected]>
Cc: Matthias van de Meent <[email protected]>
Cc: Alexander Korotkov <[email protected]>
Subject: Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
Date: Sat, 26 Oct 2024 06:30:50 -0700
Message-ID: <CALj2ACWUUTFrTF0L_xdj64i9xsjf_+4nt_HPEBiM17cxEk4CWg@mail.gmail.com> (raw)
In-Reply-To: <[email protected]>
References: <CALj2ACXdrOmB6Na9amHWZHKvRT3Z0nwTRsCwoMT-npOBtmXLXg@mail.gmail.com>
<[email protected]>
<CALj2ACX5UMWVFdrRNUE0KDrg54WV1cumBXwcETXhrPc1ibKAQA@mail.gmail.com>
<CAAWbhmj5Pio3nOUakObzLGCSS9dwFfgsNVDhwTGzXNwZc00uCQ@mail.gmail.com>
<CALj2ACVHC=c6eC9SRxhcTUrnXvNDNkEBgedi2WkVJYRb=0sWYw@mail.gmail.com>
<CALj2ACVE2h=LnFnpr3rh+6SZzdwzW5EZOYG2Z0t=p28Fn75eag@mail.gmail.com>
<CALj2ACWT0Rz8oybWBm5W4CeS0DvFkwaw-pEvGArhDLyPbZnW_g@mail.gmail.com>
<CALj2ACWxO3HPtpYZb765LZk-uKVuAvZPO1HDeZ8=mzMgVPgaww@mail.gmail.com>
<CALj2ACXJA4QQ_6zAHez0Uy-9t-ebmpox2y1QBja+mF4QP+h8WQ@mail.gmail.com>
<CAD21AoD97mhzF8cqsd2v1jg9z8xfvAJrPx6Wvi+Ev0Hmu96LJA@mail.gmail.com>
<CALj2ACUcv5pZoB0=gRrz54M9+YT9JCmo6FYyo5WUS6wnS+em=A@mail.gmail.com>
<CALj2ACWm77YofBMs9x3Zmp3ctNAhcS4TvPVuXKdfwCr22FqOHg@mail.gmail.com>
<[email protected]>
<CALj2ACWqVzhxDuWNTWAH-LuADvsyX0r-wpwgeJ+Q1FnAKjY5Yw@mail.gmail.com>
<[email protected]>
<CALj2ACU70HZm+0QRJdkGA5RdJUo4zPYnV2hzkiV-wH5QS2PAEQ@mail.gmail.com>
<[email protected]>
<CALj2ACVMV=gMROte2=0LBFnSCRvzL4D9WK6oQ9ZHr4Qj2S8xWA@mail.gmail.com>
<[email protected]>
<CALj2ACX9vVYHYkX8e6w058EuAs8JL5EsnzadTxGhpiE_Ep_ByA@mail.gmail.com>
<[email protected]>
<CALj2ACWTrx1zxWvq8Uj2rEwCsDgQHeJ53WdvzZUw3kW+_VPG6A@mail.gmail.com>
<CALj2ACUz5+_YNEa4ZY-XG960_oXefM50MjD71VgSCAVDkF3bzQ@mail.gmail.com>
<[email protected]>
<CALj2ACX90L5Mb5Vv=jsvhOdZ8BVsfpZf-CdCGhtm2N+bGUCSjg@mail.gmail.com>
<CALj2ACXnCvcYvaz4aDH_ezUBJanz_Boi8W=76fOwrxiwSnUFOQ@mail.gmail.com>
<[email protected]>
<[email protected]>
<CALj2ACUBwVu_1LOWF07Acv+dWx5NO7uTqAPMNwqi5WpJRw69_Q@mail.gmail.com>
<[email protected]>
Hi,
Thanks for looking into this.
On Thu, Aug 29, 2024 at 12:29 PM Jeff Davis <[email protected]> wrote:
>
> I believe we need the branching in the caller anyway:
>
> 1. If there is a BEFORE row trigger with a volatile function, the
> visibility rules[1] mean that the function should see changes from all
> the rows inserted so far this command, which won't work if they are
> still in the buffer.
>
> 2. Similarly, for an INSTEAD OF row trigger, the visibility rules say
> that the function should see all previous rows inserted.
>
> 3. If there are volatile functions in the target list or WHERE clause,
> the same visibility semantics apply.
>
> 4. If there's a "RETURNING ctid" clause, we need to either come up with
> a way to return the tuples after flushing, or we need to use the
> single-tuple path. (Similarly in the future when we support UPDATE ...
> RETURNING, as Matthias pointed out.)
>
> If we need two paths in each caller anyway, it seems cleaner to just
> wrap the check for tuple_modify_buffer_insert in
> table_modify_buffer_enabled().
>
> We could perhaps use a one path and then force a batch size of one or
> something, which is an alternative, but we have to be careful not to
> introduce a regression (and it still requires a solution for #4).
I chose to branch in the caller. For example, if the SELECT query of
REFRESH MATERIALIZED VIEW contains a volatile function, the caller takes
the table_tuple_insert() path; otherwise it takes the multi-insert path.
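The caller-side branching can be sketched in plain C as below. This is only an illustration under stated assumptions, not code from the patches: every demo_* name and DEMO_MAX_BUFFERED are hypothetical stand-ins (the patches use table_modify_begin(), table_modify_buffer_insert(), table_modify_end() and HEAP_MAX_BUFFERED_SLOTS), and "writing" a tuple is reduced to bumping counters.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical batch size; the patch uses HEAP_MAX_BUFFERED_SLOTS. */
#define DEMO_MAX_BUFFERED 4

/* Hypothetical stand-in for the caller's receiver state. */
typedef struct DemoInsertState
{
	bool		use_multi;		/* decided once, at startup */
	int			buffer[DEMO_MAX_BUFFERED];	/* tuples waiting for a flush */
	int			buffered;		/* number of tuples currently buffered */
	int			flushes;		/* batched writes performed */
	int			single_inserts; /* one-at-a-time writes performed */
	int			rows_written;	/* total tuples written either way */
} DemoInsertState;

/* Startup: pick the path. Volatile functions force the single-tuple path. */
static void
demo_begin(DemoInsertState *st, bool volatile_funcs)
{
	*st = (DemoInsertState) {0};
	st->use_multi = !volatile_funcs;
}

/* Write out whatever is buffered as one batch. */
static void
demo_flush(DemoInsertState *st)
{
	if (st->buffered == 0)
		return;					/* nothing to do */
	st->rows_written += st->buffered;
	st->flushes++;
	st->buffered = 0;
}

/* Per-tuple receive: buffer the tuple, or insert it right away. */
static void
demo_receive(DemoInsertState *st, int tuple)
{
	if (st->use_multi)
	{
		assert(st->buffered < DEMO_MAX_BUFFERED);
		st->buffer[st->buffered++] = tuple;
		if (st->buffered >= DEMO_MAX_BUFFERED)
			demo_flush(st);
	}
	else
	{
		/* Single-tuple path: each row is visible before the next one. */
		st->single_inserts++;
		st->rows_written++;
	}
}

/* Shutdown: flush the tail of the buffer on the multi-insert path. */
static void
demo_end(DemoInsertState *st)
{
	if (st->use_multi)
		demo_flush(st);
}

/* Feed ntuples through the receiver and return the final counters. */
static DemoInsertState
demo_run(bool volatile_funcs, int ntuples)
{
	DemoInsertState st;

	demo_begin(&st, volatile_funcs);
	for (int i = 0; i < ntuples; i++)
		demo_receive(&st, i);
	demo_end(&st);
	return st;
}
```

With 10 tuples and a batch size of 4, demo_run(false, 10) performs three flushes (4 + 4 + 2 tuples), while demo_run(true, 10) performs ten single inserts and no flush. The path is chosen once at startup, so the per-tuple code only tests a flag.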
I am posting the new v24 patch set, organized as follows: 0001
introduces the new table AM, 0002 optimizes CTAS, CMV and RMV, and 0003
uses the new table AM for COPY ... FROM. For now, I have dropped the
INSERT INTO ... SELECT and Logical Replication Apply patches; the idea
is to take the basic pieces forward first.
I reworked structure names, members and function names, reworded
comments, and addressed review comments in the v24 patches. Please have a
look.
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
Attachments:
[application/octet-stream] v24-0002-Optimize-CTAS-CMV-RMV-with-new-multi-inserts-tab.patch (11.1K, 2-v24-0002-Optimize-CTAS-CMV-RMV-with-new-multi-inserts-tab.patch)
From 096df7a758b0b7cf00b99be9e4ffadf15ceef535 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Sat, 26 Oct 2024 12:35:51 +0000
Subject: [PATCH v24 2/4] Optimize CTAS/CMV/RMV with new multi-inserts table AM
This commit optimizes the following commands for heap AM using the new
multi-inserts table AM added by commit <<CHANGE_ME>>:
- CREATE TABLE AS
- CREATE MATERIALIZED VIEW
- REFRESH MATERIALIZED VIEW
Testing shows that performance of CTAS, CMV, RMV is improved by
<<TO_FILL>> respectively on <<TO_FILL>> system.
Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
src/backend/commands/createas.c | 60 ++++++++++++++----
src/backend/commands/matview.c | 106 +++++++++++++++++++++++++++++---
src/include/commands/matview.h | 3 +
3 files changed, 147 insertions(+), 22 deletions(-)
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 68ec122dbf..0affadf404 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -38,6 +38,7 @@
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
+#include "optimizer/optimizer.h"
#include "rewrite/rewriteHandler.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
@@ -56,6 +57,12 @@ typedef struct
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
+
+ /* Table modify state. NULL if multi-inserts isn't supported. */
+ TableModifyState *mstate;
+
+ /* True if SELECT query contains volatile functions */
+ bool volatile_funcs;
} DR_intorel;
/* utility functions for CTAS definition creation */
@@ -313,6 +320,10 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
plan = pg_plan_query(query, pstate->p_sourcetext,
CURSOR_OPT_PARALLEL_OK, params);
+ /* Check if the SELECT query has any volatile functions */
+ ((DR_intorel *) dest)->volatile_funcs =
+ contain_volatile_functions_after_planning((Expr *) query);
+
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only
@@ -548,16 +559,32 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
myState->rel = intoRelationDesc;
myState->reladdr = intoRelationAddr;
myState->output_cid = GetCurrentCommandId(true);
- myState->ti_options = TABLE_INSERT_SKIP_FSM;
+ myState->ti_options = TABLE_INSERT_SKIP_FSM |
+ TABLE_INSERT_BAS_BULKWRITE;
+ myState->mstate = NULL;
+ myState->bistate = NULL;
/*
* If WITH NO DATA is specified, there is no need to set up the state for
- * bulk inserts as there are no tuples to insert.
+ * multi or bulk inserts as there are no tuples to insert.
*/
if (!into->skipData)
- myState->bistate = GetBulkInsertState();
- else
- myState->bistate = NULL;
+ {
+ if (TableModifyIsMultiInsertsSupported(myState->rel,
+ myState->volatile_funcs))
+ {
+ myState->mstate = table_modify_begin(myState->rel,
+ myState->output_cid,
+ myState->ti_options,
+ NULL, /* Multi-insert buffer
+ * flush callback */
+ NULL); /* Multi-insert buffer
+ * flush callback
+ * context */
+ }
+ else
+ myState->bistate = GetBulkInsertState();
+ }
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -585,11 +612,15 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
* would not be cheap either. This also doesn't allow accessing per-AM
* data (say a tuple's xmin), but since we don't do that here...
*/
- table_tuple_insert(myState->rel,
- slot,
- myState->output_cid,
- myState->ti_options,
- myState->bistate);
+
+ if (myState->mstate != NULL)
+ table_modify_buffer_insert(myState->mstate, slot);
+ else
+ table_tuple_insert(myState->rel,
+ slot,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
}
/* We know this is a newly created relation, so there are no indexes */
@@ -608,8 +639,13 @@ intorel_shutdown(DestReceiver *self)
if (!into->skipData)
{
- FreeBulkInsertState(myState->bistate);
- table_finish_bulk_insert(myState->rel, myState->ti_options);
+ if (myState->mstate != NULL)
+ table_modify_end(myState->mstate);
+ else
+ {
+ FreeBulkInsertState(myState->bistate);
+ table_finish_bulk_insert(myState->rel, myState->ti_options);
+ }
}
/* close rel, but keep lock until commit */
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 010097873d..fa495ec533 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -30,7 +30,9 @@
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "executor/spi.h"
+#include "foreign/fdwapi.h"
#include "miscadmin.h"
+#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/lmgr.h"
@@ -51,6 +53,12 @@ typedef struct
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
+
+ /* Table modify state. NULL if multi-inserts isn't supported. */
+ TableModifyState *mstate;
+
+ /* True if SELECT query contains volatile functions */
+ bool volatile_funcs;
} DR_transientrel;
static int matview_maintenance_depth = 0;
@@ -428,6 +436,12 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
/* Plan the query which will generate data for the refresh. */
plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
+ /*
+ * Check if the stored MATERIALIZED VIEW query has any volatile functions.
+ */
+ ((DR_transientrel *) dest)->volatile_funcs =
+ contain_volatile_functions_after_planning((Expr *) query);
+
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only matter if
@@ -492,8 +506,26 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
*/
myState->transientrel = transientrel;
myState->output_cid = GetCurrentCommandId(true);
- myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
- myState->bistate = GetBulkInsertState();
+ myState->ti_options = TABLE_INSERT_SKIP_FSM |
+ TABLE_INSERT_FROZEN |
+ TABLE_INSERT_BAS_BULKWRITE;
+ myState->bistate = NULL;
+ myState->mstate = NULL;
+
+ /* Set up the state for multi or bulk inserts */
+ if (TableModifyIsMultiInsertsSupported(myState->transientrel,
+ myState->volatile_funcs))
+ {
+ myState->mstate = table_modify_begin(myState->transientrel,
+ myState->output_cid,
+ myState->ti_options,
+ NULL, /* Multi-insert buffer
+ * flush callback */
+ NULL); /* Multi-insert buffer
+ * flush callback context */
+ }
+ else
+ myState->bistate = GetBulkInsertState();
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -519,11 +551,14 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
* tuple's xmin), but since we don't do that here...
*/
- table_tuple_insert(myState->transientrel,
- slot,
- myState->output_cid,
- myState->ti_options,
- myState->bistate);
+ if (myState->mstate != NULL)
+ table_modify_buffer_insert(myState->mstate, slot);
+ else
+ table_tuple_insert(myState->transientrel,
+ slot,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
/* We know this is a newly created relation, so there are no indexes */
@@ -538,9 +573,13 @@ transientrel_shutdown(DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
- FreeBulkInsertState(myState->bistate);
-
- table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+ if (myState->mstate != NULL)
+ table_modify_end(myState->mstate);
+ else
+ {
+ FreeBulkInsertState(myState->bistate);
+ table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+ }
/* close transientrel, but keep lock until commit */
table_close(myState->transientrel, NoLock);
@@ -984,3 +1023,50 @@ CloseMatViewIncrementalMaintenance(void)
matview_maintenance_depth--;
Assert(matview_maintenance_depth >= 0);
}
+
+/*
+ * Check if multi-inserts is supported.
+ *
+ * It's generally more efficient to prepare a bunch of tuples for insertion,
+ * and insert them in one multi-inserts call, than call
+ * table_tuple_insert() separately for every tuple. However, there are a
+ * number of reasons why we might not be able to do this. In general, we
+ * can't support multi-inserts in the following cases:
+ *
+ * When there are any BEFORE/INSTEAD OF triggers on the table or any volatile
+ * functions/expressions in the SELECT query. Such triggers or volatile
+ * expressions might query the table we're inserting into and act differently
+ * if the tuples that have already been processed and prepared for insertion
+ * are not there.
+ *
+ * When inserting into a partitioned table. For partitioned tables, we may still
+ * be able to perform multi-inserts. However, the possibility of this depends
+ * on which types of triggers exist on the partition. We must disable
+ * multi-inserts if the partition is a foreign table that can't use batching or
+ * it has any before row insert or insert instead triggers (same as we checked
+ * above for the parent table). We really can't know all these unless we start
+ * inserting tuples into the respective partitions. We can have an intermediate
+ * insert state to show the intent to do multi-inserts and later determine if
+ * we can use multi-inserts for the partition being inserted into.
+ *
+ * When inserting into a foreign table. For foreign tables, we may still be able
+ * to do multi-inserts if the FDW supports batching.
+ */
+bool
+TableModifyIsMultiInsertsSupported(Relation rel, bool volatile_funcs)
+{
+ if (volatile_funcs)
+ return false;
+
+ /*
+ * For CREATE TABLE AS, CREATE MATERIALIZED VIEW, REFRESH MATERIALIZED
+ * VIEW, we really can't have triggers or can't create table as
+ * partitioned or foreign. So, we will assert.
+ */
+ Assert(rel->trigdesc == NULL);
+ Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
+ Assert(rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE);
+
+ /* Can support multi-inserts */
+ return true;
+}
diff --git a/src/include/commands/matview.h b/src/include/commands/matview.h
index c8811e8fc7..28abd7b89b 100644
--- a/src/include/commands/matview.h
+++ b/src/include/commands/matview.h
@@ -33,4 +33,7 @@ extern DestReceiver *CreateTransientRelDestReceiver(Oid transientoid);
extern bool MatViewIncrementalMaintenanceIsEnabled(void);
+extern bool TableModifyIsMultiInsertsSupported(Relation rel,
+ bool volatile_funcs);
+
#endif /* MATVIEW_H */
--
2.40.1
[application/octet-stream] v24-0001-Introduce-new-table-AM-for-multi-inserts.patch (15.4K, 3-v24-0001-Introduce-new-table-AM-for-multi-inserts.patch)
From af21a26a83b4efdb5a73d1ba00e03d5138295ded Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Sat, 26 Oct 2024 12:35:08 +0000
Subject: [PATCH v24 1/4] Introduce new table AM for multi-inserts
Until now, only the COPY ... FROM command has used multi-inserts
(i.e. buffering some tuples and inserting them into the table at once).
The basic idea of multi-inserts is less WAL and reduced buffer
locking. Multi-inserts are faster than calling heap_insert() in a
loop because, when multiple tuples can be inserted on a single
page, we can write just a single WAL record covering all of them
and only need to lock/unlock the page once.
Various other commands can benefit from this multi-inserts logic
[Reusable].
Also, there's a need to expose multi-inserts through the table AM
(Access Method) as a scan-like API [Usability]. With this, various table
AMs can define their own buffering and flushing strategy
[Flexibility] based on the way they store the data in the
underlying storage (e.g. columnar).
Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
src/backend/access/heap/heapam.c | 209 ++++++++++++++++++++++-
src/backend/access/heap/heapam_handler.c | 6 +
src/backend/access/table/tableamapi.c | 5 +
src/include/access/heapam.h | 38 +++++
src/include/access/tableam.h | 81 +++++++++
src/tools/pgindent/typedefs.list | 3 +
6 files changed, 341 insertions(+), 1 deletion(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 75ff9e7388..1426cd40c6 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -50,6 +50,7 @@
#include "storage/procarray.h"
#include "utils/datum.h"
#include "utils/inval.h"
+#include "utils/memutils.h"
#include "utils/spccache.h"
@@ -102,7 +103,7 @@ static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
bool *copy);
-
+static void heap_modify_insert_end(TableModifyState *state);
/*
* Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -2603,6 +2604,212 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
pgstat_count_heap_insert(relation, ntuples);
}
+/*
+ * Initialize heap modify state.
+ */
+TableModifyState *
+heap_modify_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx)
+{
+ TableModifyState *state;
+ MemoryContext context;
+ MemoryContext oldcontext;
+
+ context = AllocSetContextCreate(TopTransactionContext,
+ "heap_modify memory context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(context);
+ state = palloc0(sizeof(TableModifyState));
+ state->rel = rel;
+ state->mem_ctx = context;
+ state->cid = cid;
+ state->options = options;
+ state->buffer_flush_cb = buffer_flush_cb;
+ state->buffer_flush_ctx = buffer_flush_ctx;
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state,
+ TupleTableSlot *slot)
+{
+ TupleTableSlot *dstslot;
+ HeapInsertState *istate;
+ HeapMultiInsertState *mistate;
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(state->mem_ctx);
+
+ /* First time through, initialize heap insert state */
+ if (state->data == NULL)
+ {
+ istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState));
+ istate->bistate = NULL;
+ istate->mistate = NULL;
+ state->data = istate;
+ mistate =
+ (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState));
+ mistate->slots =
+ (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+ istate->mistate = mistate;
+
+ /*
+ * heap_multi_insert() can leak memory. So switch to this memory
+ * context before every heap_multi_insert() call and reset when
+ * finished.
+ */
+ mistate->mem_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "heap_multi_insert memory context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ if ((state->options & TABLE_INSERT_BAS_BULKWRITE) != 0)
+ istate->bistate = GetBulkInsertState();
+ }
+
+ istate = (HeapInsertState *) state->data;
+ Assert(istate->mistate != NULL);
+ mistate = istate->mistate;
+ dstslot = mistate->slots[mistate->cur_slots];
+
+ if (dstslot == NULL)
+ {
+ /*
+ * We use virtual tuple slots for the buffered slots to leverage the
+ * optimization they provide in minimizing physical data copying. The
+ * virtual slot gets materialized when we copy (via below
+ * ExecCopySlot) the tuples from the source slot which can be of any
+ * type. This way, it is ensured that the tuple storage doesn't depend
+ * on external memory, because all the datums that aren't passed by
+ * value are copied into the slot's memory context.
+ */
+ dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+ &TTSOpsVirtual);
+
+ mistate->slots[mistate->cur_slots] = dstslot;
+ }
+
+ Assert(TTS_IS_VIRTUAL(dstslot));
+
+ /*
+ * Note that the copy clears the previous destination slot contents, so no
+ * need to explicitly ExecClearTuple() here.
+ */
+ ExecCopySlot(dstslot, slot);
+
+ mistate->cur_slots++;
+
+ if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS)
+ heap_modify_buffer_flush(state);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+void
+heap_modify_buffer_flush(TableModifyState *state)
+{
+ HeapInsertState *istate;
+ HeapMultiInsertState *mistate;
+ MemoryContext oldcontext;
+
+ /* Quick exit if we haven't inserted anything yet */
+ if (state->data == NULL)
+ return;
+
+ istate = (HeapInsertState *) state->data;
+ Assert(istate->mistate != NULL);
+ mistate = istate->mistate;
+
+ /* Quick exit if we have flushed already */
+ if (mistate->cur_slots == 0)
+ return;
+
+ /*
+ * heap_multi_insert() can leak memory, so switch to short-lived memory
+ * context before calling it.
+ */
+ oldcontext = MemoryContextSwitchTo(mistate->mem_ctx);
+ heap_multi_insert(state->rel,
+ mistate->slots,
+ mistate->cur_slots,
+ state->cid,
+ state->options,
+ istate->bistate);
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(mistate->mem_ctx);
+
+ /*
+ * Invoke caller-supplied buffer flush callback after inserting rows from
+ * the buffers to heap.
+ */
+ if (state->buffer_flush_cb != NULL)
+ {
+ for (int i = 0; i < mistate->cur_slots; i++)
+ {
+ state->buffer_flush_cb(state->buffer_flush_ctx,
+ mistate->slots[i]);
+ }
+ }
+
+ mistate->cur_slots = 0;
+}
+
+/*
+ * Heap insert specific function used for performing work at the end like
+ * flushing remaining buffered tuples, cleaning up the insert state and tuple
+ * table slots used for buffered tuples etc.
+ */
+static void
+heap_modify_insert_end(TableModifyState *state)
+{
+ HeapInsertState *istate;
+
+ /* Quick exit if we haven't inserted anything yet */
+ if (state->data == NULL)
+ return;
+
+ istate = (HeapInsertState *) state->data;
+
+ if (istate->mistate != NULL)
+ {
+ HeapMultiInsertState *mistate = istate->mistate;
+
+ heap_modify_buffer_flush(state);
+
+ Assert(mistate->cur_slots == 0);
+
+ for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+ MemoryContextDelete(mistate->mem_ctx);
+ }
+
+ if (istate->bistate != NULL)
+ FreeBulkInsertState(istate->bistate);
+}
+
+/*
+ * Clean heap modify state.
+ */
+void
+heap_modify_end(TableModifyState *state)
+{
+ heap_modify_insert_end(state);
+ MemoryContextDelete(state->mem_ctx);
+}
+
/*
* simple_heap_insert - insert a tuple
*
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index a8d95e0f1c..d2ef6b4b78 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2644,6 +2644,12 @@ static const TableAmRoutine heapam_methods = {
.tuple_insert_speculative = heapam_tuple_insert_speculative,
.tuple_complete_speculative = heapam_tuple_complete_speculative,
.multi_insert = heap_multi_insert,
+
+ .tuple_modify_begin = heap_modify_begin,
+ .tuple_modify_buffer_insert = heap_modify_buffer_insert,
+ .tuple_modify_buffer_flush = heap_modify_buffer_flush,
+ .tuple_modify_end = heap_modify_end,
+
.tuple_delete = heapam_tuple_delete,
.tuple_update = heapam_tuple_update,
.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index e9b598256f..772f29b1b5 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -97,6 +97,11 @@ GetTableAmRoutine(Oid amhandler)
Assert(routine->scan_sample_next_block != NULL);
Assert(routine->scan_sample_next_tuple != NULL);
+ Assert(routine->tuple_modify_begin != NULL);
+ Assert(routine->tuple_modify_buffer_insert != NULL);
+ Assert(routine->tuple_modify_buffer_flush != NULL);
+ Assert(routine->tuple_modify_end != NULL);
+
return routine;
}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index b951466ced..b9404bb83d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -272,6 +272,33 @@ typedef enum
PRUNE_VACUUM_CLEANUP, /* VACUUM 2nd heap pass */
} PruneReason;
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS 1000
+
+typedef struct HeapMultiInsertState
+{
+ /* Array of buffered slots */
+ TupleTableSlot **slots;
+
+ /* Number of buffered slots currently held */
+ int cur_slots;
+
+ /* Memory context for dealing with multi inserts */
+ MemoryContext mem_ctx;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+ struct BulkInsertStateData *bistate;
+ HeapMultiInsertState *mistate;
+} HeapInsertState;
+
+
/* ----------------
* function prototypes for heap access method
*
@@ -322,6 +349,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
int ntuples, CommandId cid, int options,
BulkInsertState bistate);
+
+extern TableModifyState *heap_modify_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx);
+extern void heap_modify_buffer_insert(TableModifyState *state,
+ TupleTableSlot *slot);
+extern void heap_modify_buffer_flush(TableModifyState *state);
+extern void heap_modify_end(TableModifyState *state);
+
extern TM_Result heap_delete(Relation relation, ItemPointer tid,
CommandId cid, Snapshot crosscheck, bool wait,
struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index adb478a93c..57b71eef38 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -254,11 +254,42 @@ typedef struct TM_IndexDeleteOp
TM_IndexStatus *status;
} TM_IndexDeleteOp;
+struct TableModifyState;
+
+/* Callback invoked upon flushing each buffered tuple */
+typedef void (*TableModifyBufferFlushCb) (void *context,
+ TupleTableSlot *slot);
+
+/* Holds table modify state */
+typedef struct TableModifyState
+{
+ /* These fields are used for inserts for now */
+
+ Relation rel; /* Relation to insert to */
+ CommandId cid; /* Command ID for insert */
+ int options; /* TABLE_INSERT options */
+
+ /* Memory context for dealing with modify state variables */
+ MemoryContext mem_ctx;
+
+ /* Flush callback and its context used for multi inserts */
+ TableModifyBufferFlushCb buffer_flush_cb;
+ void *buffer_flush_ctx;
+
+ /* Table AM specific data */
+ void *data;
+} TableModifyState;
+
/* "options" flag bits for table_tuple_insert */
/* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
#define TABLE_INSERT_SKIP_FSM 0x0002
#define TABLE_INSERT_FROZEN 0x0004
#define TABLE_INSERT_NO_LOGICAL 0x0008
+/*
+ * Use BAS_BULKWRITE buffer access strategy. 0x0010 is for
+ * HEAP_INSERT_SPECULATIVE.
+ */
+#define TABLE_INSERT_BAS_BULKWRITE 0x0020
/* flag bits for table_tuple_lock */
/* Follow tuples whose update is in progress if lock modes don't conflict */
@@ -577,6 +608,21 @@ typedef struct TableAmRoutine
void (*finish_bulk_insert) (Relation rel, int options);
+ /* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+ TableModifyState *(*tuple_modify_begin) (Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx);
+ void (*tuple_modify_buffer_insert) (TableModifyState *state,
+ TupleTableSlot *slot);
+ void (*tuple_modify_buffer_flush) (TableModifyState *state);
+ void (*tuple_modify_end) (TableModifyState *state);
+
+
/* ------------------------------------------------------------------------
* DDL related functionality.
* ------------------------------------------------------------------------
@@ -1608,6 +1654,41 @@ table_finish_bulk_insert(Relation rel, int options)
rel->rd_tableam->finish_bulk_insert(rel, options);
}
+/* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+static inline TableModifyState *
+table_modify_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx)
+{
+ return rel->rd_tableam->tuple_modify_begin(rel,
+ cid,
+ options,
+ buffer_flush_cb,
+ buffer_flush_ctx);
+}
+
+static inline void
+table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
+{
+ state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot);
+}
+
+static inline void
+table_modify_buffer_flush(TableModifyState *state)
+{
+ state->rel->rd_tableam->tuple_modify_buffer_flush(state);
+}
+
+static inline void
+table_modify_end(TableModifyState *state)
+{
+ state->rel->rd_tableam->tuple_modify_end(state);
+}
/* ------------------------------------------------------------------------
* DDL related functionality.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 171a7dd5d2..e7ddf29c16 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1147,6 +1147,8 @@ HeadlineJsonState
HeadlineParsedText
HeadlineWordEntry
HeapCheckContext
+HeapInsertState
+HeapMultiInsertState
HeapPageFreeze
HeapScanDesc
HeapTuple
@@ -2873,6 +2875,7 @@ TableFuncScanState
TableFuncType
TableInfo
TableLikeClause
+TableModifyState
TableSampleClause
TableScanDesc
TableScanDescData
--
2.40.1
[application/octet-stream] v24-0003-Use-new-multi-inserts-table-AM-for-COPY-.-FROM.patch (14.3K, 4-v24-0003-Use-new-multi-inserts-table-AM-for-COPY-.-FROM.patch)
From fd64f007092f4a71d61aa1f3347390da05c2460c Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Sat, 26 Oct 2024 12:37:14 +0000
Subject: [PATCH v24 3/4] Use new multi-inserts table AM for COPY ... FROM
This commit uses the new multi-inserts table AM added by commit
<<CHANGE_ME>> for COPY ... FROM command.
Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
src/backend/commands/copyfrom.c | 254 +++++++++++++++--------
src/include/commands/copyfrom_internal.h | 4 +-
src/tools/pgindent/typedefs.list | 1 +
3 files changed, 171 insertions(+), 88 deletions(-)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 07cbd5d22b..18fb609cbe 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -74,14 +74,27 @@
*/
#define MAX_PARTITION_BUFFERS 32
+/* Context for multi-inserts buffer flush callback */
+typedef struct MultiInsertBufferFlushCtx
+{
+ CopyFromState cstate;
+ ResultRelInfo *resultRelInfo;
+ EState *estate;
+} MultiInsertBufferFlushCtx;
+
/* Stores multi-insert data related to a single relation in CopyFrom. */
typedef struct CopyMultiInsertBuffer
{
- TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+ TableModifyState *mstate; /* Table insert state; NULL if foreign table */
+ TupleTableSlot **slots; /* Array to store tuples */
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
- BulkInsertState bistate; /* BulkInsertState for this rel if plain
- * table; NULL if foreign table */
+ TupleTableSlot *mislot; /* Slot used for multi-inserts */
+ MultiInsertBufferFlushCtx *mibufferctx; /* Multi-inserts buffer flush
+ * callback context */
int nused; /* number of 'slots' containing tuples */
+ int currslotno; /* Current buffered slot number that's being
+ * flushed; used to get the correct cur_lineno for
+ * errors while in flush callback. */
uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
* stream */
} CopyMultiInsertBuffer;
@@ -216,19 +229,96 @@ CopyLimitPrintoutLength(const char *str)
return res;
}
+/*
+ * Implements the multi-inserts buffer flush callback
+ * i.e. TableModifyEndCallback.
+ *
+ * NB: Caller must take care of opening and closing the indices.
+ */
+static void
+MultiInsertBufferFlushCb(void *context, TupleTableSlot *slot)
+{
+ MultiInsertBufferFlushCtx *mibufferctx = (MultiInsertBufferFlushCtx *) context;
+ CopyFromState cstate = mibufferctx->cstate;
+ ResultRelInfo *resultRelInfo = mibufferctx->resultRelInfo;
+ EState *estate = mibufferctx->estate;
+ CopyMultiInsertBuffer *buffer = resultRelInfo->ri_CopyMultiInsertBuffer;
+
+ /*
+ * If there are any indexes, update them for all the inserted tuples, and
+ * run AFTER ROW INSERT triggers.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ List *recheckIndexes;
+
+ cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+ recheckIndexes =
+ ExecInsertIndexTuples(resultRelInfo,
+ slot, estate, false,
+ false, NULL, NIL, false);
+
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slot, recheckIndexes,
+ cstate->transition_capture);
+
+ list_free(recheckIndexes);
+ }
+
+ /*
+ * There are no indexes, but see if we need to run AFTER ROW INSERT triggers
+ * anyway.
+ */
+ else if (resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ {
+ cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slot, NIL,
+ cstate->transition_capture);
+ }
+
+ Assert(buffer->currslotno <= buffer->nused);
+}
+
/*
* Allocate memory and initialize a new CopyMultiInsertBuffer for this
* ResultRelInfo.
*/
static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+ CopyFromState cstate, EState *estate)
{
CopyMultiInsertBuffer *buffer;
buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
- memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+ if (rri->ri_FdwRoutine == NULL)
+ {
+ buffer->mibufferctx =
+ (MultiInsertBufferFlushCtx *) palloc(sizeof(MultiInsertBufferFlushCtx));
+ buffer->mibufferctx->cstate = cstate;
+ buffer->mibufferctx->resultRelInfo = rri;
+ buffer->mibufferctx->estate = estate;
+
+ buffer->mstate = table_modify_begin(rri->ri_RelationDesc,
+ miinfo->mycid,
+ miinfo->ti_options,
+ MultiInsertBufferFlushCb,
+ buffer->mibufferctx);
+
+ buffer->slots = NULL;
+ }
+ else
+ {
+ buffer->mstate = NULL;
+ buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+ }
+
+ buffer->mislot = NULL;
buffer->resultRelInfo = rri;
- buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
buffer->nused = 0;
return buffer;
@@ -239,11 +329,12 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
*/
static inline void
CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
- ResultRelInfo *rri)
+ ResultRelInfo *rri, CopyFromState cstate,
+ EState *estate)
{
CopyMultiInsertBuffer *buffer;
- buffer = CopyMultiInsertBufferInit(rri);
+ buffer = CopyMultiInsertBufferInit(miinfo, rri, cstate, estate);
/* Setup back-link so we can easily find this buffer again */
rri->ri_CopyMultiInsertBuffer = buffer;
@@ -276,7 +367,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
* tuples their way for the first time.
*/
if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
- CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+ CopyMultiInsertInfoSetupBuffer(miinfo, rri, cstate, estate);
}
/*
@@ -320,8 +411,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
int batch_size = resultRelInfo->ri_BatchSize;
int sent = 0;
- Assert(buffer->bistate == NULL);
-
/* Ensure that the FDW supports batching and it's enabled */
Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
Assert(batch_size > 1);
@@ -393,13 +482,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
}
else
{
- CommandId mycid = miinfo->mycid;
- int ti_options = miinfo->ti_options;
bool line_buf_valid = cstate->line_buf_valid;
uint64 save_cur_lineno = cstate->cur_lineno;
- MemoryContext oldcontext;
-
- Assert(buffer->bistate != NULL);
/*
* Print error context information correctly, if one of the operations
@@ -407,56 +491,18 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
*/
cstate->line_buf_valid = false;
- /*
- * table_multi_insert may leak memory, so switch to short-lived memory
- * context before calling it.
- */
- oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- table_multi_insert(resultRelInfo->ri_RelationDesc,
- slots,
- nused,
- mycid,
- ti_options,
- buffer->bistate);
- MemoryContextSwitchTo(oldcontext);
+ Assert(buffer->currslotno <= buffer->nused);
+ buffer->currslotno = 0;
- for (i = 0; i < nused; i++)
- {
- /*
- * If there are any indexes, update them for all the inserted
- * tuples, and run AFTER ROW INSERT triggers.
- */
- if (resultRelInfo->ri_NumIndices > 0)
- {
- List *recheckIndexes;
-
- cstate->cur_lineno = buffer->linenos[i];
- recheckIndexes =
- ExecInsertIndexTuples(resultRelInfo,
- buffer->slots[i], estate, false,
- false, NULL, NIL, false);
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], recheckIndexes,
- cstate->transition_capture);
- list_free(recheckIndexes);
- }
+ table_modify_buffer_flush(buffer->mstate);
- /*
- * There's no indexes, but see if we need to run AFTER ROW INSERT
- * triggers anyway.
- */
- else if (resultRelInfo->ri_TrigDesc != NULL &&
- (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
- resultRelInfo->ri_TrigDesc->trig_insert_new_table))
- {
- cstate->cur_lineno = buffer->linenos[i];
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], NIL,
- cstate->transition_capture);
- }
+ Assert(buffer->currslotno <= buffer->nused);
+ buffer->currslotno = 0;
- ExecClearTuple(slots[i]);
- }
+ /*
+ * Indexes are updated and AFTER ROW INSERT triggers (if any) are run
+ * in the flush callback MultiInsertBufferFlushCb.
+ */
/* Update the row counter and progress of the COPY command */
*processed += nused;
@@ -492,19 +538,18 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
if (resultRelInfo->ri_FdwRoutine == NULL)
{
- Assert(buffer->bistate != NULL);
- FreeBulkInsertState(buffer->bistate);
+ table_modify_end(buffer->mstate);
+ ExecDropSingleTupleTableSlot(buffer->mislot);
+ pfree(buffer->mibufferctx);
}
else
- Assert(buffer->bistate == NULL);
-
- /* Since we only create slots on demand, just drop the non-null ones. */
- for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
- ExecDropSingleTupleTableSlot(buffer->slots[i]);
+ {
+ /* Since we only create slots on demand, just drop the non-null ones. */
+ for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(buffer->slots[i]);
- if (resultRelInfo->ri_FdwRoutine == NULL)
- table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
- miinfo->ti_options);
+ pfree(buffer->slots);
+ }
pfree(buffer);
}
@@ -598,15 +643,36 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
{
CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
int nused;
+ TupleTableSlot *slot;
Assert(buffer != NULL);
Assert(buffer->nused < MAX_BUFFERED_TUPLES);
nused = buffer->nused;
- if (buffer->slots[nused] == NULL)
- buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
- return buffer->slots[nused];
+ if (rri->ri_FdwRoutine == NULL)
+ {
+ if (buffer->mislot == NULL)
+ {
+ buffer->mislot = MakeTupleTableSlot(RelationGetDescr(rri->ri_RelationDesc),
+ &TTSOpsVirtual);
+ }
+
+ /* Caller must clear the slot */
+ slot = buffer->mislot;
+ }
+ else
+ {
+ if (buffer->slots[nused] == NULL)
+ {
+ slot = table_slot_create(rri->ri_RelationDesc, NULL);
+ buffer->slots[nused] = slot;
+ }
+ else
+ slot = buffer->slots[nused];
+ }
+
+ return slot;
}
/*
@@ -620,7 +686,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
Assert(buffer != NULL);
- Assert(slot == buffer->slots[buffer->nused]);
+
+#ifdef USE_ASSERT_CHECKING
+ if (rri->ri_FdwRoutine != NULL)
+ Assert(slot == buffer->slots[buffer->nused]);
+#endif
/* Store the line number so we can properly report any errors later */
buffer->linenos[buffer->nused] = lineno;
@@ -628,6 +698,22 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
/* Record this slot as being used */
buffer->nused++;
+ if (rri->ri_FdwRoutine == NULL)
+ {
+ Assert(slot == buffer->mislot);
+ buffer->currslotno = 0;
+
+ table_modify_buffer_insert(buffer->mstate, slot);
+ }
+ else
+ {
+ /*
+ * The slot previously might point into the per-tuple context. For
+ * batching it needs to be longer lived.
+ */
+ ExecMaterializeSlot(slot);
+ }
+
/* Update how many tuples are stored and their size */
miinfo->bufferedTuples++;
miinfo->bufferedBytes += tuplen;
@@ -841,7 +927,7 @@ CopyFrom(CopyFromState cstate)
/*
* It's generally more efficient to prepare a bunch of tuples for
* insertion, and insert them in one
- * table_multi_insert()/ExecForeignBatchInsert() call, than call
+ * table_modify_buffer_insert()/ExecForeignBatchInsert() call, than call
* table_tuple_insert()/ExecForeignInsert() separately for every tuple.
* However, there are a number of reasons why we might not be able to do
* this. These are explained below.
@@ -925,7 +1011,8 @@ CopyFrom(CopyFromState cstate)
insertMethod = CIM_MULTI;
CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
- estate, mycid, ti_options);
+ estate, mycid,
+ ti_options | TABLE_INSERT_BAS_BULKWRITE);
}
/*
@@ -1094,7 +1181,8 @@ CopyFrom(CopyFromState cstate)
{
if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
- resultRelInfo);
+ resultRelInfo, cstate,
+ estate);
}
else if (insertMethod == CIM_MULTI_CONDITIONAL &&
!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
@@ -1224,12 +1312,6 @@ CopyFrom(CopyFromState cstate)
/* Store the slot in the multi-insert buffer, when enabled. */
if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
{
- /*
- * The slot previously might point into the per-tuple
- * context. For batching it needs to be longer lived.
- */
- ExecMaterializeSlot(myslot);
-
/* Add this tuple to the tuple buffer */
CopyMultiInsertInfoStore(&multiInsertInfo,
resultRelInfo, myslot,
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..14addbc6f6 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -46,9 +46,9 @@ typedef enum EolType
typedef enum CopyInsertMethod
{
CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */
- CIM_MULTI, /* always use table_multi_insert or
+ CIM_MULTI, /* always use table_modify_buffer_insert or
* ExecForeignBatchInsert */
- CIM_MULTI_CONDITIONAL, /* use table_multi_insert or
+ CIM_MULTI_CONDITIONAL, /* use table_modify_buffer_insert or
* ExecForeignBatchInsert only if valid */
} CopyInsertMethod;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e7ddf29c16..bf21e43ce1 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1664,6 +1664,7 @@ MonotonicFunction
MorphOpaque
MsgType
MultiAssignRef
+MultiInsertBufferFlushCtx
MultiSortSupport
MultiSortSupportData
MultiXactId
--
2.40.1
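
For readers following the line-number bookkeeping in 0003, here is a minimal
standalone sketch of the idea, not PostgreSQL code. It assumes a toy buffering
layer that calls a per-row callback at flush time, in insertion order, much as
MultiInsertBufferFlushCb is called for each slot. All Toy* names, copy_flush_cb
and run_demo are hypothetical and exist only for this illustration; the real
table_modify_* API is defined in the earlier patch of this series and may
behave differently.

```c
#include <assert.h>

#define MAX_BUFFERED 4

/* Hypothetical callback type: invoked once per row when the buffer flushes. */
typedef void (*toy_flush_cb) (void *context, int row);

/* Toy stand-in for the table AM's buffered insert state. */
typedef struct ToyModifyState
{
	int			rows[MAX_BUFFERED];
	int			nrows;
	toy_flush_cb cb;
	void	   *cb_context;
} ToyModifyState;

/* Toy stand-in for the line-number fields of CopyMultiInsertBuffer. */
typedef struct ToyCopyBuffer
{
	unsigned long linenos[MAX_BUFFERED];	/* input line of each buffered row */
	int			nused;			/* rows buffered so far */
	int			currslotno;		/* next row the callback will see */
	unsigned long cur_lineno;	/* what an error report would print */
	unsigned long seen[MAX_BUFFERED];	/* line numbers observed by callback */
	int			nseen;
} ToyCopyBuffer;

/* Buffer one row; this toy never flushes on its own. */
static void
toy_insert(ToyModifyState *state, int row)
{
	assert(state->nrows < MAX_BUFFERED);
	state->rows[state->nrows++] = row;
}

/* Flush: hand every buffered row to the callback, in insertion order. */
static void
toy_flush(ToyModifyState *state)
{
	for (int i = 0; i < state->nrows; i++)
		state->cb(state->cb_context, state->rows[i]);
	state->nrows = 0;
}

/* Mirrors the patch: pick up the line number of the row being flushed. */
static void
copy_flush_cb(void *context, int row)
{
	ToyCopyBuffer *buf = (ToyCopyBuffer *) context;

	(void) row;					/* a real callback would update indexes here */
	buf->cur_lineno = buf->linenos[buf->currslotno++];
	buf->seen[buf->nseen++] = buf->cur_lineno;
	assert(buf->currslotno <= buf->nused);
}

/*
 * Buffer three rows read from input lines 10, 11 and 13, flush them, and
 * return how many times the callback ran.  The caller passes a zeroed buffer.
 */
int
run_demo(ToyCopyBuffer *buf)
{
	ToyModifyState state = {.nrows = 0, .cb = copy_flush_cb, .cb_context = buf};
	unsigned long lines[] = {10, 11, 13};

	for (int i = 0; i < 3; i++)
	{
		buf->linenos[buf->nused++] = lines[i];
		toy_insert(&state, i);
	}

	buf->currslotno = 0;		/* start of flush, as in the patch */
	toy_flush(&state);
	buf->currslotno = 0;		/* ready for the next batch */
	buf->nused = 0;

	return buf->nseen;
}
```

Calling run_demo() on a zero-initialized ToyCopyBuffer runs the callback once
per buffered row. The point of the sketch is that the callback can only map a
row back to its input line if rows are flushed in the order they were buffered;
the index into linenos is implicit in the call order.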