public inbox for [email protected]
help / color / mirror / Atom feedFrom: Bharath Rupireddy <[email protected]>
To: Jingtang Zhang <[email protected]>
Cc: [email protected]
Cc: PostgreSQL-development <[email protected]>
Subject: Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
Date: Wed, 30 Oct 2024 10:51:23 -0700
Message-ID: <CALj2ACUVE8CYvYrudem4bR7W3sNRE-akC4B996K65_7C6xTBJQ@mail.gmail.com> (raw)
In-Reply-To: <CAPsk3_AuOsLcOrgTX4-QDP0Qv+AHBuuH3fTK6MwVacH6gvR1Nw@mail.gmail.com>
References: <CALj2ACXdrOmB6Na9amHWZHKvRT3Z0nwTRsCwoMT-npOBtmXLXg@mail.gmail.com>
<[email protected]>
<CALj2ACX5UMWVFdrRNUE0KDrg54WV1cumBXwcETXhrPc1ibKAQA@mail.gmail.com>
<CAAWbhmj5Pio3nOUakObzLGCSS9dwFfgsNVDhwTGzXNwZc00uCQ@mail.gmail.com>
<CALj2ACVHC=c6eC9SRxhcTUrnXvNDNkEBgedi2WkVJYRb=0sWYw@mail.gmail.com>
<CALj2ACVE2h=LnFnpr3rh+6SZzdwzW5EZOYG2Z0t=p28Fn75eag@mail.gmail.com>
<CALj2ACWT0Rz8oybWBm5W4CeS0DvFkwaw-pEvGArhDLyPbZnW_g@mail.gmail.com>
<CALj2ACWxO3HPtpYZb765LZk-uKVuAvZPO1HDeZ8=mzMgVPgaww@mail.gmail.com>
<CALj2ACXJA4QQ_6zAHez0Uy-9t-ebmpox2y1QBja+mF4QP+h8WQ@mail.gmail.com>
<CAD21AoD97mhzF8cqsd2v1jg9z8xfvAJrPx6Wvi+Ev0Hmu96LJA@mail.gmail.com>
<CALj2ACUcv5pZoB0=gRrz54M9+YT9JCmo6FYyo5WUS6wnS+em=A@mail.gmail.com>
<CALj2ACWm77YofBMs9x3Zmp3ctNAhcS4TvPVuXKdfwCr22FqOHg@mail.gmail.com>
<[email protected]>
<CALj2ACWqVzhxDuWNTWAH-LuADvsyX0r-wpwgeJ+Q1FnAKjY5Yw@mail.gmail.com>
<[email protected]>
<CALj2ACU70HZm+0QRJdkGA5RdJUo4zPYnV2hzkiV-wH5QS2PAEQ@mail.gmail.com>
<[email protected]>
<CALj2ACVMV=gMROte2=0LBFnSCRvzL4D9WK6oQ9ZHr4Qj2S8xWA@mail.gmail.com>
<[email protected]>
<CALj2ACX9vVYHYkX8e6w058EuAs8JL5EsnzadTxGhpiE_Ep_ByA@mail.gmail.com>
<[email protected]>
<CALj2ACWTrx1zxWvq8Uj2rEwCsDgQHeJ53WdvzZUw3kW+_VPG6A@mail.gmail.com>
<CALj2ACUz5+_YNEa4ZY-XG960_oXefM50MjD71VgSCAVDkF3bzQ@mail.gmail.com>
<[email protected]>
<CALj2ACX90L5Mb5Vv=jsvhOdZ8BVsfpZf-CdCGhtm2N+bGUCSjg@mail.gmail.com>
<CALj2ACXnCvcYvaz4aDH_ezUBJanz_Boi8W=76fOwrxiwSnUFOQ@mail.gmail.com>
<[email protected]>
<[email protected]>
<CALj2ACUBwVu_1LOWF07Acv+dWx5NO7uTqAPMNwqi5WpJRw69_Q@mail.gmail.com>
<[email protected]>
<CALj2ACWUUTFrTF0L_xdj64i9xsjf_+4nt_HPEBiM17cxEk4CWg@mail.gmail.com>
<CAPsk3_AuOsLcOrgTX4-QDP0Qv+AHBuuH3fTK6MwVacH6gvR1Nw@mail.gmail.com>
Hi,
Thanks for looking into this.
On Mon, Oct 28, 2024 at 8:18 PM Jingtang Zhang <[email protected]> wrote:
>
> Just found that the initialization
> seems redundant since we have used palloc0?
>
> > + istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState));
> > + istate->bistate = NULL;
> > + istate->mistate = NULL;
Changed it to palloc() and explicit initializations of the members.
With this, only TupleTableSlot's array in HeapMultiInsertState uses
palloc0(), the rest all use explicit initializations.
> Oh, another comments for v24-0001 patch: we are in heam AM now, should we use
> something like HEAP_INSERT_BAS_BULKWRITE instead of using table AM option,
> just like other heap AM options do?
>
> > + if ((state->options & TABLE_INSERT_BAS_BULKWRITE) != 0)
> > + istate->bistate = GetBulkInsertState();
Defined HEAP_INSERT_BAS_BULKWRITE and used that in heapam.c similar to
INSERT_SKIP_FSM, INSERT_FROZEN, NO_LOGICAL.
> Little question about v24 0002 patch: would it be better to move the
> implementation of TableModifyIsMultiInsertsSupported to somewhere for table AM
> level? Seems it is a common function for future use, not a specific one for
> matview.
It's more tailored for CREATE TABLE AS and CREATE/REFRESH MATERIALIZED
VIEW in the sense that no triggers, foreign table and partitioned
table possible here. INSERT INTO SELECT and Logical Replication Apply
will have a lot more conditions (e.g. RETURNING clause, triggers etc.)
and they will need to be handled differently. So, I left
TableModifyIsMultiInsertsSupported as-is in a common place in
matview.c.
Please find the attached v25 patch set.
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
Attachments:
[application/octet-stream] v25-0001-Introduce-new-table-AM-for-multi-inserts.patch (15.8K, 2-v25-0001-Introduce-new-table-AM-for-multi-inserts.patch)
download | inline diff:
From c8277e6f5e9a72baebe993b2241d34a7d427473d Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 30 Oct 2024 17:13:20 +0000
Subject: [PATCH v25 1/3] Introduce new table AM for multi-inserts
Until now, it's the COPY ... FROM command using multi inserts
(i.e. buffer some tuples and inserts them to table at once).
Basic idea of multi-inserts is that less WAL and reduced buffer
locking. Multi-inserts is faster than calling heap_insert() in a
loop, because when multiple tuples can be inserted on a single
page, we can write just a single WAL record covering all of them,
and only need to lock/unlock the page once.
Various other commands can benefit from this multi-inserts logic
[Reusable].
Also, there's a need to have these multi-inserts AMs (Access
Methods) as scan-like API [Usability]. With this, various table
AMs can define their own buffering and flushing strategy
[Flexibility] based on the way they store the data in the
underlying storage (e.g. columnar).
Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
src/backend/access/heap/heapam.c | 211 ++++++++++++++++++++++-
src/backend/access/heap/heapam_handler.c | 6 +
src/backend/access/table/tableamapi.c | 5 +
src/include/access/heapam.h | 39 +++++
src/include/access/tableam.h | 81 +++++++++
src/tools/pgindent/typedefs.list | 3 +
6 files changed, 344 insertions(+), 1 deletion(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1748eafa10..69b21cf12c 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -50,6 +50,7 @@
#include "storage/procarray.h"
#include "utils/datum.h"
#include "utils/inval.h"
+#include "utils/memutils.h"
#include "utils/spccache.h"
@@ -102,7 +103,7 @@ static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
bool *copy);
-
+static void heap_modify_insert_end(TableModifyState *state);
/*
* Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -2603,6 +2604,214 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
pgstat_count_heap_insert(relation, ntuples);
}
+/*
+ * Initialize heap modify state.
+ */
+TableModifyState *
+heap_modify_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx)
+{
+ TableModifyState *state;
+ MemoryContext context;
+ MemoryContext oldcontext;
+
+ context = AllocSetContextCreate(TopTransactionContext,
+ "heap_modify memory context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(context);
+ state = palloc(sizeof(TableModifyState));
+ state->rel = rel;
+ state->cid = cid;
+ state->options = options;
+ state->mem_ctx = context;
+ state->buffer_flush_cb = buffer_flush_cb;
+ state->buffer_flush_ctx = buffer_flush_ctx;
+ state->data = NULL; /* To be set lazily */
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state,
+ TupleTableSlot *slot)
+{
+ TupleTableSlot *dstslot;
+ HeapInsertState *istate;
+ HeapMultiInsertState *mistate;
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(state->mem_ctx);
+
+ /* First time through, initialize heap insert state */
+ if (state->data == NULL)
+ {
+ istate = (HeapInsertState *) palloc(sizeof(HeapInsertState));
+ istate->bistate = NULL;
+ istate->mistate = NULL;
+ state->data = istate;
+ mistate =
+ (HeapMultiInsertState *) palloc(sizeof(HeapMultiInsertState));
+ mistate->slots =
+ (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+ mistate->cur_slots = 0;
+ istate->mistate = mistate;
+
+ /*
+ * heap_multi_insert() can leak memory. So switch to this memory
+ * context before every heap_multi_insert() call and reset when
+ * finished.
+ */
+ mistate->mem_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "heap_multi_insert memory context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ if ((state->options & HEAP_INSERT_BAS_BULKWRITE) != 0)
+ istate->bistate = GetBulkInsertState();
+ }
+
+ istate = (HeapInsertState *) state->data;
+ Assert(istate->mistate != NULL);
+ mistate = istate->mistate;
+ dstslot = mistate->slots[mistate->cur_slots];
+
+ if (dstslot == NULL)
+ {
+ /*
+ * We use virtual tuple slots buffered slots for leveraging the
+ * optimization it provides to minimize physical data copying. The
+ * virtual slot gets materialized when we copy (via below
+ * ExecCopySlot) the tuples from the source slot which can be of any
+ * type. This way, it is ensured that the tuple storage doesn't depend
+ * on external memory, because all the datums that aren't passed by
+ * value are copied into the slot's memory context.
+ */
+ dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+ &TTSOpsVirtual);
+
+ mistate->slots[mistate->cur_slots] = dstslot;
+ }
+
+ Assert(TTS_IS_VIRTUAL(dstslot));
+
+ /*
+ * Note that the copy clears the previous destination slot contents, so no
+ * need to explicitly ExecClearTuple() here.
+ */
+ ExecCopySlot(dstslot, slot);
+
+ mistate->cur_slots++;
+
+ if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS)
+ heap_modify_buffer_flush(state);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+void
+heap_modify_buffer_flush(TableModifyState *state)
+{
+ HeapInsertState *istate;
+ HeapMultiInsertState *mistate;
+ MemoryContext oldcontext;
+
+ /* Quick exit if we haven't inserted anything yet */
+ if (state->data == NULL)
+ return;
+
+ istate = (HeapInsertState *) state->data;
+ Assert(istate->mistate != NULL);
+ mistate = istate->mistate;
+
+ /* Quick exit if we have flushed already */
+ if (mistate->cur_slots == 0)
+ return;
+
+ /*
+ * heap_multi_insert() can leak memory, so switch to short-lived memory
+ * context before calling it.
+ */
+ oldcontext = MemoryContextSwitchTo(mistate->mem_ctx);
+ heap_multi_insert(state->rel,
+ mistate->slots,
+ mistate->cur_slots,
+ state->cid,
+ state->options,
+ istate->bistate);
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(mistate->mem_ctx);
+
+ /*
+ * Invoke caller-supplied buffer flush callback after inserting rows from
+ * the buffers to heap.
+ */
+ if (state->buffer_flush_cb != NULL)
+ {
+ for (int i = 0; i < mistate->cur_slots; i++)
+ {
+ state->buffer_flush_cb(state->buffer_flush_ctx,
+ mistate->slots[i]);
+ }
+ }
+
+ mistate->cur_slots = 0;
+}
+
+/*
+ * Heap insert specific function used for performing work at the end like
+ * flushing remaining buffered tuples, cleaning up the insert state and tuple
+ * table slots used for buffered tuples etc.
+ */
+static void
+heap_modify_insert_end(TableModifyState *state)
+{
+ HeapInsertState *istate;
+
+ /* Quick exit if we haven't inserted anything yet */
+ if (state->data == NULL)
+ return;
+
+ istate = (HeapInsertState *) state->data;
+
+ if (istate->mistate != NULL)
+ {
+ HeapMultiInsertState *mistate = istate->mistate;
+
+ heap_modify_buffer_flush(state);
+
+ Assert(mistate->cur_slots == 0);
+
+ for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+ MemoryContextDelete(mistate->mem_ctx);
+ }
+
+ if (istate->bistate != NULL)
+ FreeBulkInsertState(istate->bistate);
+}
+
+/*
+ * Clean heap modify state.
+ */
+void
+heap_modify_end(TableModifyState *state)
+{
+ heap_modify_insert_end(state);
+ MemoryContextDelete(state->mem_ctx);
+}
+
/*
* simple_heap_insert - insert a tuple
*
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index a8d95e0f1c..d2ef6b4b78 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2644,6 +2644,12 @@ static const TableAmRoutine heapam_methods = {
.tuple_insert_speculative = heapam_tuple_insert_speculative,
.tuple_complete_speculative = heapam_tuple_complete_speculative,
.multi_insert = heap_multi_insert,
+
+ .tuple_modify_begin = heap_modify_begin,
+ .tuple_modify_buffer_insert = heap_modify_buffer_insert,
+ .tuple_modify_buffer_flush = heap_modify_buffer_flush,
+ .tuple_modify_end = heap_modify_end,
+
.tuple_delete = heapam_tuple_delete,
.tuple_update = heapam_tuple_update,
.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index e9b598256f..772f29b1b5 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -97,6 +97,11 @@ GetTableAmRoutine(Oid amhandler)
Assert(routine->scan_sample_next_block != NULL);
Assert(routine->scan_sample_next_tuple != NULL);
+ Assert(routine->tuple_modify_begin != NULL);
+ Assert(routine->tuple_modify_buffer_insert != NULL);
+ Assert(routine->tuple_modify_buffer_flush != NULL);
+ Assert(routine->tuple_modify_end != NULL);
+
return routine;
}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 96cf82f97b..a9722ce947 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -37,6 +37,7 @@
#define HEAP_INSERT_FROZEN TABLE_INSERT_FROZEN
#define HEAP_INSERT_NO_LOGICAL TABLE_INSERT_NO_LOGICAL
#define HEAP_INSERT_SPECULATIVE 0x0010
+#define HEAP_INSERT_BAS_BULKWRITE TABLE_INSERT_BAS_BULKWRITE
/* "options" flag bits for heap_page_prune_and_freeze */
#define HEAP_PAGE_PRUNE_MARK_UNUSED_NOW (1 << 0)
@@ -272,6 +273,33 @@ typedef enum
PRUNE_VACUUM_CLEANUP, /* VACUUM 2nd heap pass */
} PruneReason;
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS 1000
+
+typedef struct HeapMultiInsertState
+{
+ /* Array of buffered slots */
+ TupleTableSlot **slots;
+
+ /* Number of buffered slots currently held */
+ int cur_slots;
+
+ /* Memory context for dealing with multi inserts */
+ MemoryContext mem_ctx;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+ struct BulkInsertStateData *bistate;
+ HeapMultiInsertState *mistate;
+} HeapInsertState;
+
+
/* ----------------
* function prototypes for heap access method
*
@@ -322,6 +350,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
int ntuples, CommandId cid, int options,
BulkInsertState bistate);
+
+extern TableModifyState *heap_modify_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx);
+extern void heap_modify_buffer_insert(TableModifyState *state,
+ TupleTableSlot *slot);
+extern void heap_modify_buffer_flush(TableModifyState *state);
+extern void heap_modify_end(TableModifyState *state);
+
extern TM_Result heap_delete(Relation relation, ItemPointer tid,
CommandId cid, Snapshot crosscheck, bool wait,
struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index adb478a93c..57b71eef38 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -254,11 +254,42 @@ typedef struct TM_IndexDeleteOp
TM_IndexStatus *status;
} TM_IndexDeleteOp;
+struct TableModifyState;
+
+/* Callback invoked upon flushing each buffered tuple */
+typedef void (*TableModifyBufferFlushCb) (void *context,
+ TupleTableSlot *slot);
+
+/* Holds table modify state */
+typedef struct TableModifyState
+{
+ /* These fields are used for inserts for now */
+
+ Relation rel; /* Relation to insert to */
+ CommandId cid; /* Command ID for insert */
+ int options; /* TABLE_INSERT options */
+
+ /* Memory context for dealing with modify state variables */
+ MemoryContext mem_ctx;
+
+ /* Flush callback and its context used for multi inserts */
+ TableModifyBufferFlushCb buffer_flush_cb;
+ void *buffer_flush_ctx;
+
+ /* Table AM specific data */
+ void *data;
+} TableModifyState;
+
/* "options" flag bits for table_tuple_insert */
/* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
#define TABLE_INSERT_SKIP_FSM 0x0002
#define TABLE_INSERT_FROZEN 0x0004
#define TABLE_INSERT_NO_LOGICAL 0x0008
+/*
+ * Use BAS_BULKWRITE buffer access strategy. 0x0010 is for
+ * HEAP_INSERT_SPECULATIVE.
+ */
+#define TABLE_INSERT_BAS_BULKWRITE 0x0020
/* flag bits for table_tuple_lock */
/* Follow tuples whose update is in progress if lock modes don't conflict */
@@ -577,6 +608,21 @@ typedef struct TableAmRoutine
void (*finish_bulk_insert) (Relation rel, int options);
+ /* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+ TableModifyState *(*tuple_modify_begin) (Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx);
+ void (*tuple_modify_buffer_insert) (TableModifyState *state,
+ TupleTableSlot *slot);
+ void (*tuple_modify_buffer_flush) (TableModifyState *state);
+ void (*tuple_modify_end) (TableModifyState *state);
+
+
/* ------------------------------------------------------------------------
* DDL related functionality.
* ------------------------------------------------------------------------
@@ -1608,6 +1654,41 @@ table_finish_bulk_insert(Relation rel, int options)
rel->rd_tableam->finish_bulk_insert(rel, options);
}
+/* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+static inline TableModifyState *
+table_modify_begin(Relation rel,
+ CommandId cid,
+ int options,
+ TableModifyBufferFlushCb buffer_flush_cb,
+ void *buffer_flush_ctx)
+{
+ return rel->rd_tableam->tuple_modify_begin(rel,
+ cid,
+ options,
+ buffer_flush_cb,
+ buffer_flush_ctx);
+}
+
+static inline void
+table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
+{
+ state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot);
+}
+
+static inline void
+table_modify_buffer_flush(TableModifyState *state)
+{
+ state->rel->rd_tableam->tuple_modify_buffer_flush(state);
+}
+
+static inline void
+table_modify_end(TableModifyState *state)
+{
+ state->rel->rd_tableam->tuple_modify_end(state);
+}
/* ------------------------------------------------------------------------
* DDL related functionality.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 171a7dd5d2..e7ddf29c16 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1147,6 +1147,8 @@ HeadlineJsonState
HeadlineParsedText
HeadlineWordEntry
HeapCheckContext
+HeapInsertState
+HeapMultiInsertState
HeapPageFreeze
HeapScanDesc
HeapTuple
@@ -2873,6 +2875,7 @@ TableFuncScanState
TableFuncType
TableInfo
TableLikeClause
+TableModifyState
TableSampleClause
TableScanDesc
TableScanDescData
--
2.40.1
[application/octet-stream] v25-0002-Optimize-CTAS-CMV-RMV-with-new-multi-inserts-tab.patch (11.1K, 3-v25-0002-Optimize-CTAS-CMV-RMV-with-new-multi-inserts-tab.patch)
download | inline diff:
From 5a16c618c4c0875544eb866bee74b68650737fbd Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 30 Oct 2024 17:29:55 +0000
Subject: [PATCH v25 2/3] Optimize CTAS/CMV/RMV with new multi-inserts table AM
This commit optimizes the following commands for heap AM using new
multi-inserts table AM added by commit <<CHANGE_ME>>:
- CREATE TABLE AS
- CREATE MATERIALIZED VIEW
- REFRESH MATERIALIZED VIEW
Testing shows that performance of CTAS, CMV, RMV is improved by
<<TO_FILL>> respectively on <<TO_FILL>> system.
Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
src/backend/commands/createas.c | 60 ++++++++++++++----
src/backend/commands/matview.c | 106 +++++++++++++++++++++++++++++---
src/include/commands/matview.h | 3 +
3 files changed, 147 insertions(+), 22 deletions(-)
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 5c92e48a56..55fd439468 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -38,6 +38,7 @@
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/queryjumble.h"
+#include "optimizer/optimizer.h"
#include "parser/analyze.h"
#include "rewrite/rewriteHandler.h"
#include "tcop/tcopprot.h"
@@ -56,6 +57,12 @@ typedef struct
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
+
+ /* Table modify state. NULL if multi-inserts isn't supported. */
+ TableModifyState *mstate;
+
+ /* True if SELECT query contains volatile functions */
+ bool volatile_funcs;
} DR_intorel;
/* utility functions for CTAS definition creation */
@@ -321,6 +328,10 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
plan = pg_plan_query(query, pstate->p_sourcetext,
CURSOR_OPT_PARALLEL_OK, params);
+ /* Check if the SELECT query has any volatile functions */
+ ((DR_intorel *) dest)->volatile_funcs =
+ contain_volatile_functions_after_planning((Expr *) query);
+
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only
@@ -556,16 +567,32 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
myState->rel = intoRelationDesc;
myState->reladdr = intoRelationAddr;
myState->output_cid = GetCurrentCommandId(true);
- myState->ti_options = TABLE_INSERT_SKIP_FSM;
+ myState->ti_options = TABLE_INSERT_SKIP_FSM |
+ TABLE_INSERT_BAS_BULKWRITE;
+ myState->mstate = NULL;
+ myState->bistate = NULL;
/*
* If WITH NO DATA is specified, there is no need to set up the state for
- * bulk inserts as there are no tuples to insert.
+ * multi or bulk inserts as there are no tuples to insert.
*/
if (!into->skipData)
- myState->bistate = GetBulkInsertState();
- else
- myState->bistate = NULL;
+ {
+ if (TableModifyIsMultiInsertsSupported(myState->rel,
+ myState->volatile_funcs))
+ {
+ myState->mstate = table_modify_begin(myState->rel,
+ myState->output_cid,
+ myState->ti_options,
+ NULL, /* Multi-insert buffer
+ * flush callback */
+ NULL); /* Multi-insert buffer
+ * flush callback
+ * context */
+ }
+ else
+ myState->bistate = GetBulkInsertState();
+ }
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -593,11 +620,15 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
* would not be cheap either. This also doesn't allow accessing per-AM
* data (say a tuple's xmin), but since we don't do that here...
*/
- table_tuple_insert(myState->rel,
- slot,
- myState->output_cid,
- myState->ti_options,
- myState->bistate);
+
+ if (myState->mstate != NULL)
+ table_modify_buffer_insert(myState->mstate, slot);
+ else
+ table_tuple_insert(myState->rel,
+ slot,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
}
/* We know this is a newly created relation, so there are no indexes */
@@ -616,8 +647,13 @@ intorel_shutdown(DestReceiver *self)
if (!into->skipData)
{
- FreeBulkInsertState(myState->bistate);
- table_finish_bulk_insert(myState->rel, myState->ti_options);
+ if (myState->mstate != NULL)
+ table_modify_end(myState->mstate);
+ else
+ {
+ FreeBulkInsertState(myState->bistate);
+ table_finish_bulk_insert(myState->rel, myState->ti_options);
+ }
}
/* close rel, but keep lock until commit */
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 010097873d..fa495ec533 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -30,7 +30,9 @@
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "executor/spi.h"
+#include "foreign/fdwapi.h"
#include "miscadmin.h"
+#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/lmgr.h"
@@ -51,6 +53,12 @@ typedef struct
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
+
+ /* Table modify state. NULL if multi-inserts isn't supported. */
+ TableModifyState *mstate;
+
+ /* True if SELECT query contains volatile functions */
+ bool volatile_funcs;
} DR_transientrel;
static int matview_maintenance_depth = 0;
@@ -428,6 +436,12 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
/* Plan the query which will generate data for the refresh. */
plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
+ /*
+ * Check if the stored MATERIALIZED VIEW query has any volatile functions.
+ */
+ ((DR_transientrel *) dest)->volatile_funcs =
+ contain_volatile_functions_after_planning((Expr *) query);
+
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries. (This could only matter if
@@ -492,8 +506,26 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
*/
myState->transientrel = transientrel;
myState->output_cid = GetCurrentCommandId(true);
- myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
- myState->bistate = GetBulkInsertState();
+ myState->ti_options = TABLE_INSERT_SKIP_FSM |
+ TABLE_INSERT_FROZEN |
+ TABLE_INSERT_BAS_BULKWRITE;
+ myState->bistate = NULL;
+ myState->mstate = NULL;
+
+ /* Set up the state for multi or bulk inserts */
+ if (TableModifyIsMultiInsertsSupported(myState->transientrel,
+ myState->volatile_funcs))
+ {
+ myState->mstate = table_modify_begin(myState->transientrel,
+ myState->output_cid,
+ myState->ti_options,
+ NULL, /* Multi-insert buffer
+ * flush callback */
+ NULL); /* Multi-insert buffer
+ * flush callback context */
+ }
+ else
+ myState->bistate = GetBulkInsertState();
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -519,11 +551,14 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
* tuple's xmin), but since we don't do that here...
*/
- table_tuple_insert(myState->transientrel,
- slot,
- myState->output_cid,
- myState->ti_options,
- myState->bistate);
+ if (myState->mstate != NULL)
+ table_modify_buffer_insert(myState->mstate, slot);
+ else
+ table_tuple_insert(myState->transientrel,
+ slot,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
/* We know this is a newly created relation, so there are no indexes */
@@ -538,9 +573,13 @@ transientrel_shutdown(DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
- FreeBulkInsertState(myState->bistate);
-
- table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+ if (myState->mstate != NULL)
+ table_modify_end(myState->mstate);
+ else
+ {
+ FreeBulkInsertState(myState->bistate);
+ table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+ }
/* close transientrel, but keep lock until commit */
table_close(myState->transientrel, NoLock);
@@ -984,3 +1023,50 @@ CloseMatViewIncrementalMaintenance(void)
matview_maintenance_depth--;
Assert(matview_maintenance_depth >= 0);
}
+
+/*
+ * Check if multi-inserts is supported.
+ *
+ * It's generally more efficient to prepare a bunch of tuples for insertion,
+ * and insert them in one multi-inserts call, than call
+ * table_tuple_insert() separately for every tuple. However, there are a
+ * number of reasons why we might not be able to do this. In general, can't
+ * support multi-inserts in the following cases:
+ *
+ * When there are any BEFORE/INSTEAD OF triggers on the table or any volatile
+ * functions/expressions in the SELECT query. Such triggers or volatile
+ * expressions might query the table we're inserting into and act differently
+ * if the tuples that have already been processed and prepared for insertion
+ * are not there.
+ *
+ * When inserting into partitioned table. For partitioned tables, we may still
+ * be able to perform multi-inserts. However, the possibility of this depends
+ * on which types of triggers exist on the partition. We must disable
+ * multi-inserts if the partition is a foreign table that can't use batching or
+ * it has any before row insert or insert instead triggers (same as we checked
+ * above for the parent table). We really can't know all these unless we start
+ * inserting tuples into the respective partitions. We can have an intermediate
+ * insert state to show the intent to do multi-inserts and later determine if
+ * we can use multi-inserts for the partition being inserted into.
+ *
+ * When inserting into foreign table. For foreign tables, we may still be able
+ * to do multi-inserts if the FDW supports batching.
+ */
+bool
+TableModifyIsMultiInsertsSupported(Relation rel, bool volatile_funcs)
+{
+ if (volatile_funcs)
+ return false;
+
+ /*
+ * For CREATE TABLE AS, CREATE MATERIALIZED VIEW, REFRESH MATERIALIZED
+ * VIEW, we really can't have triggers or can't create table as
+ * partitioned or foreign. So, we will assert.
+ */
+ Assert(rel->trigdesc == NULL);
+ Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
+ Assert(rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE);
+
+ /* Can support multi-inserts */
+ return true;
+}
diff --git a/src/include/commands/matview.h b/src/include/commands/matview.h
index c8811e8fc7..28abd7b89b 100644
--- a/src/include/commands/matview.h
+++ b/src/include/commands/matview.h
@@ -33,4 +33,7 @@ extern DestReceiver *CreateTransientRelDestReceiver(Oid transientoid);
extern bool MatViewIncrementalMaintenanceIsEnabled(void);
+extern bool TableModifyIsMultiInsertsSupported(Relation rel,
+ bool volatile_funcs);
+
#endif /* MATVIEW_H */
--
2.40.1
[application/octet-stream] v25-0003-Use-new-multi-inserts-table-AM-for-COPY-.-FROM.patch (14.3K, 4-v25-0003-Use-new-multi-inserts-table-AM-for-COPY-.-FROM.patch)
download | inline diff:
From 16d488cff14c0ce9ace648a1b99e507edb184e74 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 30 Oct 2024 17:30:41 +0000
Subject: [PATCH v25 3/3] Use new multi-inserts table AM for COPY ... FROM
This commit uses the new multi-inserts table AM added by commit
<<CHANGE_ME>> for COPY ... FROM command.
Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
src/backend/commands/copyfrom.c | 254 +++++++++++++++--------
src/include/commands/copyfrom_internal.h | 4 +-
src/tools/pgindent/typedefs.list | 1 +
3 files changed, 171 insertions(+), 88 deletions(-)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 07cbd5d22b..18fb609cbe 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -74,14 +74,27 @@
*/
#define MAX_PARTITION_BUFFERS 32
+/* Context for multi-inserts buffer flush callback */
+typedef struct MultiInsertBufferFlushCtx
+{
+ CopyFromState cstate;
+ ResultRelInfo *resultRelInfo;
+ EState *estate;
+} MultiInsertBufferFlushCtx;
+
/* Stores multi-insert data related to a single relation in CopyFrom. */
typedef struct CopyMultiInsertBuffer
{
- TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+ TableModifyState *mstate; /* Table insert state; NULL if foreign table */
+ TupleTableSlot **slots; /* Array to store tuples */
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
- BulkInsertState bistate; /* BulkInsertState for this rel if plain
- * table; NULL if foreign table */
+ TupleTableSlot *mislot; /* Slot used for multi-inserts */
+ MultiInsertBufferFlushCtx *mibufferctx; /* Multi-inserts buffer flush
+ * callback context */
int nused; /* number of 'slots' containing tuples */
+ int currslotno; /* Current buffered slot number that's being
+ * flushed; Used to get correct cur_lineno for
+ * errors while in flush callback. */
uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
* stream */
} CopyMultiInsertBuffer;
@@ -216,19 +229,96 @@ CopyLimitPrintoutLength(const char *str)
return res;
}
+/*
+ * Implements for multi-inserts buffer flush callback
+ * i.e. TableModifyEndCallback.
+ *
+ * NB: Caller must take care of opening and closing the indices.
+ */
+static void
+MultiInsertBufferFlushCb(void *context, TupleTableSlot *slot)
+{
+ MultiInsertBufferFlushCtx *mibufferctx = (MultiInsertBufferFlushCtx *) context;
+ CopyFromState cstate = mibufferctx->cstate;
+ ResultRelInfo *resultRelInfo = mibufferctx->resultRelInfo;
+ EState *estate = mibufferctx->estate;
+ CopyMultiInsertBuffer *buffer = resultRelInfo->ri_CopyMultiInsertBuffer;
+
+ /*
+ * If there are any indexes, update them for all the inserted tuples, and
+ * run AFTER ROW INSERT triggers.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ List *recheckIndexes;
+
+ cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+ recheckIndexes =
+ ExecInsertIndexTuples(resultRelInfo,
+ slot, estate, false,
+ false, NULL, NIL, false);
+
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slot, recheckIndexes,
+ cstate->transition_capture);
+
+ list_free(recheckIndexes);
+ }
+
+ /*
+ * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
+ * anyway.
+ */
+ else if (resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ {
+ cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slot, NIL,
+ cstate->transition_capture);
+ }
+
+ Assert(buffer->currslotno <= buffer->nused);
+}
+
/*
* Allocate memory and initialize a new CopyMultiInsertBuffer for this
* ResultRelInfo.
*/
static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+ CopyFromState cstate, EState *estate)
{
CopyMultiInsertBuffer *buffer;
buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
- memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+ if (rri->ri_FdwRoutine == NULL)
+ {
+ buffer->mibufferctx =
+ (MultiInsertBufferFlushCtx *) palloc(sizeof(MultiInsertBufferFlushCtx));
+ buffer->mibufferctx->cstate = cstate;
+ buffer->mibufferctx->resultRelInfo = rri;
+ buffer->mibufferctx->estate = estate;
+
+ buffer->mstate = table_modify_begin(rri->ri_RelationDesc,
+ miinfo->mycid,
+ miinfo->ti_options,
+ MultiInsertBufferFlushCb,
+ buffer->mibufferctx);
+
+ buffer->slots = NULL;
+ }
+ else
+ {
+ buffer->mstate = NULL;
+ buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+ }
+
+ buffer->mislot = NULL;
buffer->resultRelInfo = rri;
- buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
buffer->nused = 0;
return buffer;
@@ -239,11 +329,12 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
*/
static inline void
CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
- ResultRelInfo *rri)
+ ResultRelInfo *rri, CopyFromState cstate,
+ EState *estate)
{
CopyMultiInsertBuffer *buffer;
- buffer = CopyMultiInsertBufferInit(rri);
+ buffer = CopyMultiInsertBufferInit(miinfo, rri, cstate, estate);
/* Setup back-link so we can easily find this buffer again */
rri->ri_CopyMultiInsertBuffer = buffer;
@@ -276,7 +367,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
* tuples their way for the first time.
*/
if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
- CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+ CopyMultiInsertInfoSetupBuffer(miinfo, rri, cstate, estate);
}
/*
@@ -320,8 +411,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
int batch_size = resultRelInfo->ri_BatchSize;
int sent = 0;
- Assert(buffer->bistate == NULL);
-
/* Ensure that the FDW supports batching and it's enabled */
Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
Assert(batch_size > 1);
@@ -393,13 +482,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
}
else
{
- CommandId mycid = miinfo->mycid;
- int ti_options = miinfo->ti_options;
bool line_buf_valid = cstate->line_buf_valid;
uint64 save_cur_lineno = cstate->cur_lineno;
- MemoryContext oldcontext;
-
- Assert(buffer->bistate != NULL);
/*
* Print error context information correctly, if one of the operations
@@ -407,56 +491,18 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
*/
cstate->line_buf_valid = false;
- /*
- * table_multi_insert may leak memory, so switch to short-lived memory
- * context before calling it.
- */
- oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- table_multi_insert(resultRelInfo->ri_RelationDesc,
- slots,
- nused,
- mycid,
- ti_options,
- buffer->bistate);
- MemoryContextSwitchTo(oldcontext);
+ Assert(buffer->currslotno <= buffer->nused);
+ buffer->currslotno = 0;
- for (i = 0; i < nused; i++)
- {
- /*
- * If there are any indexes, update them for all the inserted
- * tuples, and run AFTER ROW INSERT triggers.
- */
- if (resultRelInfo->ri_NumIndices > 0)
- {
- List *recheckIndexes;
-
- cstate->cur_lineno = buffer->linenos[i];
- recheckIndexes =
- ExecInsertIndexTuples(resultRelInfo,
- buffer->slots[i], estate, false,
- false, NULL, NIL, false);
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], recheckIndexes,
- cstate->transition_capture);
- list_free(recheckIndexes);
- }
+ table_modify_buffer_flush(buffer->mstate);
- /*
- * There's no indexes, but see if we need to run AFTER ROW INSERT
- * triggers anyway.
- */
- else if (resultRelInfo->ri_TrigDesc != NULL &&
- (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
- resultRelInfo->ri_TrigDesc->trig_insert_new_table))
- {
- cstate->cur_lineno = buffer->linenos[i];
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], NIL,
- cstate->transition_capture);
- }
+ Assert(buffer->currslotno <= buffer->nused);
+ buffer->currslotno = 0;
- ExecClearTuple(slots[i]);
- }
+ /*
+ * Indexes are updated and AFTER ROW INSERT triggers (if any) are run
+ * in the flush callback CopyModifyBufferFlushCallback.
+ */
/* Update the row counter and progress of the COPY command */
*processed += nused;
@@ -492,19 +538,18 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
if (resultRelInfo->ri_FdwRoutine == NULL)
{
- Assert(buffer->bistate != NULL);
- FreeBulkInsertState(buffer->bistate);
+ table_modify_end(buffer->mstate);
+ ExecDropSingleTupleTableSlot(buffer->mislot);
+ pfree(buffer->mibufferctx);
}
else
- Assert(buffer->bistate == NULL);
-
- /* Since we only create slots on demand, just drop the non-null ones. */
- for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
- ExecDropSingleTupleTableSlot(buffer->slots[i]);
+ {
+ /* Since we only create slots on demand, just drop the non-null ones. */
+ for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(buffer->slots[i]);
- if (resultRelInfo->ri_FdwRoutine == NULL)
- table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
- miinfo->ti_options);
+ pfree(buffer->slots);
+ }
pfree(buffer);
}
@@ -598,15 +643,36 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
{
CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
int nused;
+ TupleTableSlot *slot;
Assert(buffer != NULL);
Assert(buffer->nused < MAX_BUFFERED_TUPLES);
nused = buffer->nused;
- if (buffer->slots[nused] == NULL)
- buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
- return buffer->slots[nused];
+ if (rri->ri_FdwRoutine == NULL)
+ {
+ if (buffer->mislot == NULL)
+ {
+ buffer->mislot = MakeTupleTableSlot(RelationGetDescr(rri->ri_RelationDesc),
+ &TTSOpsVirtual);
+ }
+
+ /* Caller must clear the slot */
+ slot = buffer->mislot;
+ }
+ else
+ {
+ if (buffer->slots[nused] == NULL)
+ {
+ slot = table_slot_create(rri->ri_RelationDesc, NULL);
+ buffer->slots[nused] = slot;
+ }
+ else
+ slot = buffer->slots[nused];
+ }
+
+ return slot;
}
/*
@@ -620,7 +686,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
Assert(buffer != NULL);
- Assert(slot == buffer->slots[buffer->nused]);
+
+#ifdef USE_ASSERT_CHECKING
+ if (rri->ri_FdwRoutine != NULL)
+ Assert(slot == buffer->slots[buffer->nused]);
+#endif
/* Store the line number so we can properly report any errors later */
buffer->linenos[buffer->nused] = lineno;
@@ -628,6 +698,22 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
/* Record this slot as being used */
buffer->nused++;
+ if (rri->ri_FdwRoutine == NULL)
+ {
+ Assert(slot == buffer->mislot);
+ buffer->currslotno = 0;
+
+ table_modify_buffer_insert(buffer->mstate, slot);
+ }
+ else
+ {
+ /*
+ * The slot previously might point into the per-tuple context. For
+ * batching it needs to be longer lived.
+ */
+ ExecMaterializeSlot(slot);
+ }
+
/* Update how many tuples are stored and their size */
miinfo->bufferedTuples++;
miinfo->bufferedBytes += tuplen;
@@ -841,7 +927,7 @@ CopyFrom(CopyFromState cstate)
/*
* It's generally more efficient to prepare a bunch of tuples for
* insertion, and insert them in one
- * table_multi_insert()/ExecForeignBatchInsert() call, than call
+ * table_modify_buffer_insert()/ExecForeignBatchInsert() call, than call
* table_tuple_insert()/ExecForeignInsert() separately for every tuple.
* However, there are a number of reasons why we might not be able to do
* this. These are explained below.
@@ -925,7 +1011,8 @@ CopyFrom(CopyFromState cstate)
insertMethod = CIM_MULTI;
CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
- estate, mycid, ti_options);
+ estate, mycid,
+ ti_options | TABLE_INSERT_BAS_BULKWRITE);
}
/*
@@ -1094,7 +1181,8 @@ CopyFrom(CopyFromState cstate)
{
if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
- resultRelInfo);
+ resultRelInfo, cstate,
+ estate);
}
else if (insertMethod == CIM_MULTI_CONDITIONAL &&
!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
@@ -1224,12 +1312,6 @@ CopyFrom(CopyFromState cstate)
/* Store the slot in the multi-insert buffer, when enabled. */
if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
{
- /*
- * The slot previously might point into the per-tuple
- * context. For batching it needs to be longer lived.
- */
- ExecMaterializeSlot(myslot);
-
/* Add this tuple to the tuple buffer */
CopyMultiInsertInfoStore(&multiInsertInfo,
resultRelInfo, myslot,
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..14addbc6f6 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -46,9 +46,9 @@ typedef enum EolType
typedef enum CopyInsertMethod
{
CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */
- CIM_MULTI, /* always use table_multi_insert or
+ CIM_MULTI, /* always use table_modify_buffer_insert or
* ExecForeignBatchInsert */
- CIM_MULTI_CONDITIONAL, /* use table_multi_insert or
+ CIM_MULTI_CONDITIONAL, /* use table_modify_buffer_insert or
* ExecForeignBatchInsert only if valid */
} CopyInsertMethod;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e7ddf29c16..bf21e43ce1 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1664,6 +1664,7 @@ MonotonicFunction
MorphOpaque
MsgType
MultiAssignRef
+MultiInsertBufferFlushCtx
MultiSortSupport
MultiSortSupportData
MultiXactId
--
2.40.1
view thread (8+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected], [email protected], [email protected]
Subject: Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
In-Reply-To: <CALj2ACUVE8CYvYrudem4bR7W3sNRE-akC4B996K65_7C6xTBJQ@mail.gmail.com>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox