From: Bharath Rupireddy <[email protected]>
To: Jingtang Zhang <[email protected]>
Cc: [email protected]
Cc: PostgreSQL-development <[email protected]>
Subject: Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
Date: Wed, 30 Oct 2024 10:51:23 -0700
Message-ID: <CALj2ACUVE8CYvYrudem4bR7W3sNRE-akC4B996K65_7C6xTBJQ@mail.gmail.com>
In-Reply-To: <CAPsk3_AuOsLcOrgTX4-QDP0Qv+AHBuuH3fTK6MwVacH6gvR1Nw@mail.gmail.com>
References: <CALj2ACXdrOmB6Na9amHWZHKvRT3Z0nwTRsCwoMT-npOBtmXLXg@mail.gmail.com>
	<[email protected]>
	<CALj2ACX5UMWVFdrRNUE0KDrg54WV1cumBXwcETXhrPc1ibKAQA@mail.gmail.com>
	<CAAWbhmj5Pio3nOUakObzLGCSS9dwFfgsNVDhwTGzXNwZc00uCQ@mail.gmail.com>
	<CALj2ACVHC=c6eC9SRxhcTUrnXvNDNkEBgedi2WkVJYRb=0sWYw@mail.gmail.com>
	<CALj2ACVE2h=LnFnpr3rh+6SZzdwzW5EZOYG2Z0t=p28Fn75eag@mail.gmail.com>
	<CALj2ACWT0Rz8oybWBm5W4CeS0DvFkwaw-pEvGArhDLyPbZnW_g@mail.gmail.com>
	<CALj2ACWxO3HPtpYZb765LZk-uKVuAvZPO1HDeZ8=mzMgVPgaww@mail.gmail.com>
	<CALj2ACXJA4QQ_6zAHez0Uy-9t-ebmpox2y1QBja+mF4QP+h8WQ@mail.gmail.com>
	<CAD21AoD97mhzF8cqsd2v1jg9z8xfvAJrPx6Wvi+Ev0Hmu96LJA@mail.gmail.com>
	<CALj2ACUcv5pZoB0=gRrz54M9+YT9JCmo6FYyo5WUS6wnS+em=A@mail.gmail.com>
	<CALj2ACWm77YofBMs9x3Zmp3ctNAhcS4TvPVuXKdfwCr22FqOHg@mail.gmail.com>
	<[email protected]>
	<CALj2ACWqVzhxDuWNTWAH-LuADvsyX0r-wpwgeJ+Q1FnAKjY5Yw@mail.gmail.com>
	<[email protected]>
	<CALj2ACU70HZm+0QRJdkGA5RdJUo4zPYnV2hzkiV-wH5QS2PAEQ@mail.gmail.com>
	<[email protected]>
	<CALj2ACVMV=gMROte2=0LBFnSCRvzL4D9WK6oQ9ZHr4Qj2S8xWA@mail.gmail.com>
	<[email protected]>
	<CALj2ACX9vVYHYkX8e6w058EuAs8JL5EsnzadTxGhpiE_Ep_ByA@mail.gmail.com>
	<[email protected]>
	<CALj2ACWTrx1zxWvq8Uj2rEwCsDgQHeJ53WdvzZUw3kW+_VPG6A@mail.gmail.com>
	<CALj2ACUz5+_YNEa4ZY-XG960_oXefM50MjD71VgSCAVDkF3bzQ@mail.gmail.com>
	<[email protected]>
	<CALj2ACX90L5Mb5Vv=jsvhOdZ8BVsfpZf-CdCGhtm2N+bGUCSjg@mail.gmail.com>
	<CALj2ACXnCvcYvaz4aDH_ezUBJanz_Boi8W=76fOwrxiwSnUFOQ@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<CALj2ACUBwVu_1LOWF07Acv+dWx5NO7uTqAPMNwqi5WpJRw69_Q@mail.gmail.com>
	<[email protected]>
	<CALj2ACWUUTFrTF0L_xdj64i9xsjf_+4nt_HPEBiM17cxEk4CWg@mail.gmail.com>
	<CAPsk3_AuOsLcOrgTX4-QDP0Qv+AHBuuH3fTK6MwVacH6gvR1Nw@mail.gmail.com>

Hi,

Thanks for looking into this.

On Mon, Oct 28, 2024 at 8:18 PM Jingtang Zhang <[email protected]> wrote:
>
> Just found that the initialization
> seems redundant since we have used palloc0?
>
> > +        istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState));
> > +        istate->bistate = NULL;
> > +        istate->mistate = NULL;

Changed it to palloc() with explicit initialization of the members.
With this, only the TupleTableSlot array in HeapMultiInsertState uses
palloc0(); everything else is initialized explicitly.
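
To illustrate the trade-off discussed above, here is a minimal standalone
sketch. It is not PostgreSQL code: demo_alloc() and demo_alloc0() are
hypothetical stand-ins for palloc() and palloc0() built on malloc() and
calloc(), and DemoInsertState is a hypothetical two-member struct mirroring
HeapInsertState. The non-zeroing allocator poisons the memory only to make
a forgotten initialization visible in this demo.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for HeapInsertState; not the real struct. */
typedef struct DemoInsertState
{
    void       *bistate;
    void       *mistate;
} DemoInsertState;

/* Stand-in for palloc0(): the memory comes back zero-filled. */
void *
demo_alloc0(size_t size)
{
    void       *p = calloc(1, size);

    assert(p != NULL);
    return p;
}

/*
 * Stand-in for palloc(): the contents are unspecified. We poison them here
 * (demo only) so that a member left uninitialized would not look like NULL.
 */
void *
demo_alloc(size_t size)
{
    void       *p = malloc(size);

    assert(p != NULL);
    memset(p, 0x7F, size);
    return p;
}

/* Non-zeroing allocation: every member must be set explicitly. */
DemoInsertState *
demo_make_insert_state(void)
{
    DemoInsertState *istate = demo_alloc(sizeof(DemoInsertState));

    istate->bistate = NULL;
    istate->mistate = NULL;
    return istate;
}

/*
 * Zeroing allocation for the slot array: "slot not created yet" is
 * represented by NULL, so starting from all-zero memory is convenient.
 * (This relies on all-zero bytes being a NULL pointer, which holds on
 * mainstream platforms.)
 */
void **
demo_make_slot_array(size_t nslots)
{
    return demo_alloc0(sizeof(void *) * nslots);
}
```

Calling demo_make_insert_state() yields a struct whose two members are NULL
only because they were assigned; pairing a zeroing allocator with the same
assignments would be the redundancy pointed out in the review.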

> Oh, another comment for the v24-0001 patch: we are in heap AM now, should we use
> something like HEAP_INSERT_BAS_BULKWRITE instead of using table AM option,
> just like other heap AM options do?
>
> > +             if ((state->options & TABLE_INSERT_BAS_BULKWRITE) != 0)
> > +                     istate->bistate = GetBulkInsertState();

Defined HEAP_INSERT_BAS_BULKWRITE and used it in heapam.c, similar to
HEAP_INSERT_SKIP_FSM, HEAP_INSERT_FROZEN and HEAP_INSERT_NO_LOGICAL.
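
The aliasing pattern can be sketched in isolation as follows. The flag
values are the ones visible in the tableam.h and heapam.h hunks of the
attached v25-0001 patch; demo_wants_bulkwrite() is a hypothetical helper
that only exists for this illustration.

```c
#include <assert.h>
#include <stdbool.h>

/* Table-AM-level option bits (values as shown in the v25-0001 diff). */
#define TABLE_INSERT_SKIP_FSM       0x0002
#define TABLE_INSERT_FROZEN         0x0004
#define TABLE_INSERT_NO_LOGICAL     0x0008
#define TABLE_INSERT_BAS_BULKWRITE  0x0020

/* Heap-level names: aliases of the table-AM bits, plus one heap-only bit. */
#define HEAP_INSERT_SKIP_FSM        TABLE_INSERT_SKIP_FSM
#define HEAP_INSERT_FROZEN          TABLE_INSERT_FROZEN
#define HEAP_INSERT_NO_LOGICAL      TABLE_INSERT_NO_LOGICAL
#define HEAP_INSERT_SPECULATIVE     0x0010
#define HEAP_INSERT_BAS_BULKWRITE   TABLE_INSERT_BAS_BULKWRITE

/*
 * Hypothetical helper: heap code tests the heap-level name, while callers
 * keep passing the table-AM-level bit. Both refer to the same value.
 */
bool
demo_wants_bulkwrite(int options)
{
    return (options & HEAP_INSERT_BAS_BULKWRITE) != 0;
}
```

Because the heap-level macro expands to the table-AM-level one, a caller
that sets TABLE_INSERT_BAS_BULKWRITE is recognized by heap code testing
HEAP_INSERT_BAS_BULKWRITE, and 0x0020 does not collide with the heap-only
0x0010 bit.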

> Little question about v24 0002 patch: would it be better to move the
> implementation of TableModifyIsMultiInsertsSupported to somewhere for table AM
> level? Seems it is a common function for future use, not a specific one for
> matview.

It's tailored to CREATE TABLE AS and CREATE/REFRESH MATERIALIZED
VIEW, in the sense that triggers, foreign tables and partitioned
tables are not possible here. INSERT INTO SELECT and Logical
Replication Apply will have a lot more conditions (e.g. RETURNING
clause, triggers etc.) and will need to be handled differently. So, I
left TableModifyIsMultiInsertsSupported as-is in matview.c, where both
createas.c and matview.c can use it.
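
As a rough standalone model of how a CTAS/matview-style receiver could use
such a check, consider the sketch below. It is not the patch's code: the
Demo* types and demo_* functions are hypothetical, rows are only counted
rather than stored, and the buffer size is 3 instead of the patch's 1000 so
the flushing behaviour is easy to follow.

```c
#include <assert.h>
#include <stdbool.h>

#define DEMO_MAX_BUFFERED 3     /* tiny on purpose; hypothetical value */

/* Hypothetical stand-in for the properties checked on the target relation. */
typedef struct DemoRel
{
    bool        has_triggers;
    bool        is_partitioned;
    bool        is_foreign;
} DemoRel;

/* Hypothetical receiver state, loosely modelled on a DestReceiver. */
typedef struct DemoReceiver
{
    bool        use_multi;      /* decided once at startup */
    int         nbuffered;      /* rows waiting in the buffer */
    int         nflushes;       /* number of multi-insert batches */
    int         nsingle;        /* rows inserted one at a time */
    int         nrows_written;  /* total rows that reached the table */
} DemoReceiver;

/* Volatile functions force the per-row path; the rest is only asserted. */
bool
demo_multi_inserts_supported(const DemoRel *rel, bool volatile_funcs)
{
    if (volatile_funcs)
        return false;

    assert(!rel->has_triggers);
    assert(!rel->is_partitioned);
    assert(!rel->is_foreign);
    return true;
}

void
demo_startup(DemoReceiver *r, const DemoRel *rel, bool volatile_funcs)
{
    r->use_multi = demo_multi_inserts_supported(rel, volatile_funcs);
    r->nbuffered = 0;
    r->nflushes = 0;
    r->nsingle = 0;
    r->nrows_written = 0;
}

/* Write out whatever is buffered as one batch; no-op if nothing is buffered. */
void
demo_flush(DemoReceiver *r)
{
    if (r->nbuffered == 0)
        return;
    r->nrows_written += r->nbuffered;
    r->nflushes++;
    r->nbuffered = 0;
}

/* Route one row: buffer it (flushing when full) or insert it directly. */
void
demo_receive(DemoReceiver *r)
{
    if (r->use_multi)
    {
        r->nbuffered++;
        if (r->nbuffered >= DEMO_MAX_BUFFERED)
            demo_flush(r);
    }
    else
    {
        r->nsingle++;
        r->nrows_written++;
    }
}

/* At the end, any partially filled buffer still has to be written out. */
void
demo_shutdown(DemoReceiver *r)
{
    if (r->use_multi)
        demo_flush(r);
}
```

The decision is made once at startup and then only consulted per row, which
is the shape of the startup/receive/shutdown changes in the attached
v25-0002 patch. Conditions such as RETURNING or per-partition triggers would
need a richer check than this model has.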

Please find the attached v25 patch set.

-- 
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com


Attachments:

  [application/octet-stream] v25-0001-Introduce-new-table-AM-for-multi-inserts.patch (15.8K, 2-v25-0001-Introduce-new-table-AM-for-multi-inserts.patch)
From c8277e6f5e9a72baebe993b2241d34a7d427473d Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 30 Oct 2024 17:13:20 +0000
Subject: [PATCH v25 1/3] Introduce new table AM for multi-inserts

Until now, only the COPY ... FROM command has used multi-inserts
(i.e. buffering some tuples and inserting them into the table at
once). The basic idea of multi-inserts is less WAL and reduced
buffer locking. Multi-inserts are faster than calling heap_insert()
in a loop because, when multiple tuples can be inserted on a single
page, we can write just a single WAL record covering all of them,
and only need to lock/unlock the page once.

Various other commands can benefit from this multi-inserts logic
[Reusable].

Also, there's a need to expose multi-inserts through the table AM
(Access Method) as a scan-like API [Usability]. With this, each
table AM can define its own buffering and flushing strategy
[Flexibility] based on the way it stores data in the underlying
storage (e.g. columnar).

Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
 src/backend/access/heap/heapam.c         | 211 ++++++++++++++++++++++-
 src/backend/access/heap/heapam_handler.c |   6 +
 src/backend/access/table/tableamapi.c    |   5 +
 src/include/access/heapam.h              |  39 +++++
 src/include/access/tableam.h             |  81 +++++++++
 src/tools/pgindent/typedefs.list         |   3 +
 6 files changed, 344 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1748eafa10..69b21cf12c 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -50,6 +50,7 @@
 #include "storage/procarray.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
+#include "utils/memutils.h"
 #include "utils/spccache.h"
 
 
@@ -102,7 +103,7 @@ static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 										bool *copy);
-
+static void heap_modify_insert_end(TableModifyState *state);
 
 /*
  * Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -2603,6 +2604,214 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * Initialize heap modify state.
+ */
+TableModifyState *
+heap_modify_begin(Relation rel,
+				  CommandId cid,
+				  int options,
+				  TableModifyBufferFlushCb buffer_flush_cb,
+				  void *buffer_flush_ctx)
+{
+	TableModifyState *state;
+	MemoryContext context;
+	MemoryContext oldcontext;
+
+	context = AllocSetContextCreate(TopTransactionContext,
+									"heap_modify memory context",
+									ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(context);
+	state = palloc(sizeof(TableModifyState));
+	state->rel = rel;
+	state->cid = cid;
+	state->options = options;
+	state->mem_ctx = context;
+	state->buffer_flush_cb = buffer_flush_cb;
+	state->buffer_flush_ctx = buffer_flush_ctx;
+	state->data = NULL;			/* To be set lazily */
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state,
+						  TupleTableSlot *slot)
+{
+	TupleTableSlot *dstslot;
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(state->mem_ctx);
+
+	/* First time through, initialize heap insert state */
+	if (state->data == NULL)
+	{
+		istate = (HeapInsertState *) palloc(sizeof(HeapInsertState));
+		istate->bistate = NULL;
+		istate->mistate = NULL;
+		state->data = istate;
+		mistate =
+			(HeapMultiInsertState *) palloc(sizeof(HeapMultiInsertState));
+		mistate->slots =
+			(TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+		mistate->cur_slots = 0;
+		istate->mistate = mistate;
+
+		/*
+		 * heap_multi_insert() can leak memory. So switch to this memory
+		 * context before every heap_multi_insert() call and reset when
+		 * finished.
+		 */
+		mistate->mem_ctx = AllocSetContextCreate(CurrentMemoryContext,
+												 "heap_multi_insert memory context",
+												 ALLOCSET_DEFAULT_SIZES);
+
+		if ((state->options & HEAP_INSERT_BAS_BULKWRITE) != 0)
+			istate->bistate = GetBulkInsertState();
+	}
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+	dstslot = mistate->slots[mistate->cur_slots];
+
+	if (dstslot == NULL)
+	{
+		/*
+		 * We use virtual tuple slots for the buffered slots to leverage the
+		 * optimization they provide to minimize physical data copying. The
+		 * virtual slot gets materialized when we copy (via below
+		 * ExecCopySlot) the tuples from the source slot which can be of any
+		 * type. This way, it is ensured that the tuple storage doesn't depend
+		 * on external memory, because all the datums that aren't passed by
+		 * value are copied into the slot's memory context.
+		 */
+		dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+									 &TTSOpsVirtual);
+
+		mistate->slots[mistate->cur_slots] = dstslot;
+	}
+
+	Assert(TTS_IS_VIRTUAL(dstslot));
+
+	/*
+	 * Note that the copy clears the previous destination slot contents, so no
+	 * need to explicitly ExecClearTuple() here.
+	 */
+	ExecCopySlot(dstslot, slot);
+
+	mistate->cur_slots++;
+
+	if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS)
+		heap_modify_buffer_flush(state);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+void
+heap_modify_buffer_flush(TableModifyState *state)
+{
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+
+	/* Quick exit if we have flushed already */
+	if (mistate->cur_slots == 0)
+		return;
+
+	/*
+	 * heap_multi_insert() can leak memory, so switch to short-lived memory
+	 * context before calling it.
+	 */
+	oldcontext = MemoryContextSwitchTo(mistate->mem_ctx);
+	heap_multi_insert(state->rel,
+					  mistate->slots,
+					  mistate->cur_slots,
+					  state->cid,
+					  state->options,
+					  istate->bistate);
+	MemoryContextSwitchTo(oldcontext);
+	MemoryContextReset(mistate->mem_ctx);
+
+	/*
+	 * Invoke caller-supplied buffer flush callback after inserting rows from
+	 * the buffers to heap.
+	 */
+	if (state->buffer_flush_cb != NULL)
+	{
+		for (int i = 0; i < mistate->cur_slots; i++)
+		{
+			state->buffer_flush_cb(state->buffer_flush_ctx,
+								   mistate->slots[i]);
+		}
+	}
+
+	mistate->cur_slots = 0;
+}
+
+/*
+ * Heap insert specific function used for performing work at the end like
+ * flushing remaining buffered tuples, cleaning up the insert state and tuple
+ * table slots used for buffered tuples etc.
+ */
+static void
+heap_modify_insert_end(TableModifyState *state)
+{
+	HeapInsertState *istate;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+
+	if (istate->mistate != NULL)
+	{
+		HeapMultiInsertState *mistate = istate->mistate;
+
+		heap_modify_buffer_flush(state);
+
+		Assert(mistate->cur_slots == 0);
+
+		for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+		MemoryContextDelete(mistate->mem_ctx);
+	}
+
+	if (istate->bistate != NULL)
+		FreeBulkInsertState(istate->bistate);
+}
+
+/*
+ * Clean heap modify state.
+ */
+void
+heap_modify_end(TableModifyState *state)
+{
+	heap_modify_insert_end(state);
+	MemoryContextDelete(state->mem_ctx);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index a8d95e0f1c..d2ef6b4b78 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2644,6 +2644,12 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+
+	.tuple_modify_begin = heap_modify_begin,
+	.tuple_modify_buffer_insert = heap_modify_buffer_insert,
+	.tuple_modify_buffer_flush = heap_modify_buffer_flush,
+	.tuple_modify_end = heap_modify_end,
+
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index e9b598256f..772f29b1b5 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -97,6 +97,11 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->scan_sample_next_block != NULL);
 	Assert(routine->scan_sample_next_tuple != NULL);
 
+	Assert(routine->tuple_modify_begin != NULL);
+	Assert(routine->tuple_modify_buffer_insert != NULL);
+	Assert(routine->tuple_modify_buffer_flush != NULL);
+	Assert(routine->tuple_modify_end != NULL);
+
 	return routine;
 }
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 96cf82f97b..a9722ce947 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -37,6 +37,7 @@
 #define HEAP_INSERT_FROZEN		TABLE_INSERT_FROZEN
 #define HEAP_INSERT_NO_LOGICAL	TABLE_INSERT_NO_LOGICAL
 #define HEAP_INSERT_SPECULATIVE 0x0010
+#define HEAP_INSERT_BAS_BULKWRITE	TABLE_INSERT_BAS_BULKWRITE
 
 /* "options" flag bits for heap_page_prune_and_freeze */
 #define HEAP_PAGE_PRUNE_MARK_UNUSED_NOW		(1 << 0)
@@ -272,6 +273,33 @@ typedef enum
 	PRUNE_VACUUM_CLEANUP,		/* VACUUM 2nd heap pass */
 } PruneReason;
 
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS		1000
+
+typedef struct HeapMultiInsertState
+{
+	/* Array of buffered slots */
+	TupleTableSlot **slots;
+
+	/* Number of buffered slots currently held */
+	int			cur_slots;
+
+	/* Memory context for dealing with multi inserts */
+	MemoryContext mem_ctx;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+	struct BulkInsertStateData *bistate;
+	HeapMultiInsertState *mistate;
+} HeapInsertState;
+
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -322,6 +350,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableModifyState *heap_modify_begin(Relation rel,
+										   CommandId cid,
+										   int options,
+										   TableModifyBufferFlushCb buffer_flush_cb,
+										   void *buffer_flush_ctx);
+extern void heap_modify_buffer_insert(TableModifyState *state,
+									  TupleTableSlot *slot);
+extern void heap_modify_buffer_flush(TableModifyState *state);
+extern void heap_modify_end(TableModifyState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index adb478a93c..57b71eef38 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -254,11 +254,42 @@ typedef struct TM_IndexDeleteOp
 	TM_IndexStatus *status;
 } TM_IndexDeleteOp;
 
+struct TableModifyState;
+
+/* Callback invoked upon flushing each buffered tuple */
+typedef void (*TableModifyBufferFlushCb) (void *context,
+										  TupleTableSlot *slot);
+
+/* Holds table modify state */
+typedef struct TableModifyState
+{
+	/* These fields are used for inserts for now */
+
+	Relation	rel;			/* Relation to insert to */
+	CommandId	cid;			/* Command ID for insert */
+	int			options;		/* TABLE_INSERT options */
+
+	/* Memory context for dealing with modify state variables */
+	MemoryContext mem_ctx;
+
+	/* Flush callback and its context used for multi inserts */
+	TableModifyBufferFlushCb buffer_flush_cb;
+	void	   *buffer_flush_ctx;
+
+	/* Table AM specific data */
+	void	   *data;
+} TableModifyState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
 #define TABLE_INSERT_FROZEN			0x0004
 #define TABLE_INSERT_NO_LOGICAL		0x0008
+/*
+ * Use BAS_BULKWRITE buffer access strategy. 0x0010 is for
+ * HEAP_INSERT_SPECULATIVE.
+ */
+#define TABLE_INSERT_BAS_BULKWRITE	0x0020
 
 /* flag bits for table_tuple_lock */
 /* Follow tuples whose update is in progress if lock modes don't conflict  */
@@ -577,6 +608,21 @@ typedef struct TableAmRoutine
 	void		(*finish_bulk_insert) (Relation rel, int options);
 
 
+	/* ------------------------------------------------------------------------
+	 * Table Modify related functions.
+	 * ------------------------------------------------------------------------
+	 */
+	TableModifyState *(*tuple_modify_begin) (Relation rel,
+											 CommandId cid,
+											 int options,
+											 TableModifyBufferFlushCb buffer_flush_cb,
+											 void *buffer_flush_ctx);
+	void		(*tuple_modify_buffer_insert) (TableModifyState *state,
+											   TupleTableSlot *slot);
+	void		(*tuple_modify_buffer_flush) (TableModifyState *state);
+	void		(*tuple_modify_end) (TableModifyState *state);
+
+
 	/* ------------------------------------------------------------------------
 	 * DDL related functionality.
 	 * ------------------------------------------------------------------------
@@ -1608,6 +1654,41 @@ table_finish_bulk_insert(Relation rel, int options)
 		rel->rd_tableam->finish_bulk_insert(rel, options);
 }
 
+/* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+static inline TableModifyState *
+table_modify_begin(Relation rel,
+				   CommandId cid,
+				   int options,
+				   TableModifyBufferFlushCb buffer_flush_cb,
+				   void *buffer_flush_ctx)
+{
+	return rel->rd_tableam->tuple_modify_begin(rel,
+											   cid,
+											   options,
+											   buffer_flush_cb,
+											   buffer_flush_ctx);
+}
+
+static inline void
+table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
+{
+	state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot);
+}
+
+static inline void
+table_modify_buffer_flush(TableModifyState *state)
+{
+	state->rel->rd_tableam->tuple_modify_buffer_flush(state);
+}
+
+static inline void
+table_modify_end(TableModifyState *state)
+{
+	state->rel->rd_tableam->tuple_modify_end(state);
+}
 
 /* ------------------------------------------------------------------------
  * DDL related functionality.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 171a7dd5d2..e7ddf29c16 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1147,6 +1147,8 @@ HeadlineJsonState
 HeadlineParsedText
 HeadlineWordEntry
 HeapCheckContext
+HeapInsertState
+HeapMultiInsertState
 HeapPageFreeze
 HeapScanDesc
 HeapTuple
@@ -2873,6 +2875,7 @@ TableFuncScanState
 TableFuncType
 TableInfo
 TableLikeClause
+TableModifyState
 TableSampleClause
 TableScanDesc
 TableScanDescData
-- 
2.40.1



  [application/octet-stream] v25-0002-Optimize-CTAS-CMV-RMV-with-new-multi-inserts-tab.patch (11.1K, 3-v25-0002-Optimize-CTAS-CMV-RMV-with-new-multi-inserts-tab.patch)
From 5a16c618c4c0875544eb866bee74b68650737fbd Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 30 Oct 2024 17:29:55 +0000
Subject: [PATCH v25 2/3] Optimize CTAS/CMV/RMV with new multi-inserts table AM

This commit optimizes the following commands for heap AM using new
multi-inserts table AM added by commit <<CHANGE_ME>>:
- CREATE TABLE AS
- CREATE MATERIALIZED VIEW
- REFRESH MATERIALIZED VIEW

Testing shows that performance of CTAS, CMV, RMV is improved by
<<TO_FILL>> respectively on <<TO_FILL>> system.

Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
 src/backend/commands/createas.c |  60 ++++++++++++++----
 src/backend/commands/matview.c  | 106 +++++++++++++++++++++++++++++---
 src/include/commands/matview.h  |   3 +
 3 files changed, 147 insertions(+), 22 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 5c92e48a56..55fd439468 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -38,6 +38,7 @@
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
 #include "nodes/queryjumble.h"
+#include "optimizer/optimizer.h"
 #include "parser/analyze.h"
 #include "rewrite/rewriteHandler.h"
 #include "tcop/tcopprot.h"
@@ -56,6 +57,12 @@ typedef struct
 	CommandId	output_cid;		/* cmin to insert in output tuples */
 	int			ti_options;		/* table_tuple_insert performance options */
 	BulkInsertState bistate;	/* bulk insert state */
+
+	/* Table modify state. NULL if multi-inserts isn't supported. */
+	TableModifyState *mstate;
+
+	/* True if SELECT query contains volatile functions */
+	bool		volatile_funcs;
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -321,6 +328,10 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
 		plan = pg_plan_query(query, pstate->p_sourcetext,
 							 CURSOR_OPT_PARALLEL_OK, params);
 
+		/* Check if the SELECT query has any volatile functions */
+		((DR_intorel *) dest)->volatile_funcs =
+			contain_volatile_functions_after_planning((Expr *) query);
+
 		/*
 		 * Use a snapshot with an updated command ID to ensure this query sees
 		 * results of any previously executed queries.  (This could only
@@ -556,16 +567,32 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	myState->rel = intoRelationDesc;
 	myState->reladdr = intoRelationAddr;
 	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
+	myState->ti_options = TABLE_INSERT_SKIP_FSM |
+		TABLE_INSERT_BAS_BULKWRITE;
+	myState->mstate = NULL;
+	myState->bistate = NULL;
 
 	/*
 	 * If WITH NO DATA is specified, there is no need to set up the state for
-	 * bulk inserts as there are no tuples to insert.
+	 * multi or bulk inserts as there are no tuples to insert.
 	 */
 	if (!into->skipData)
-		myState->bistate = GetBulkInsertState();
-	else
-		myState->bistate = NULL;
+	{
+		if (TableModifyIsMultiInsertsSupported(myState->rel,
+											   myState->volatile_funcs))
+		{
+			myState->mstate = table_modify_begin(myState->rel,
+												 myState->output_cid,
+												 myState->ti_options,
+												 NULL,	/* Multi-insert buffer
+														 * flush callback */
+												 NULL); /* Multi-insert buffer
+														 * flush callback
+														 * context */
+		}
+		else
+			myState->bistate = GetBulkInsertState();
+	}
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -593,11 +620,15 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 		 * would not be cheap either. This also doesn't allow accessing per-AM
 		 * data (say a tuple's xmin), but since we don't do that here...
 		 */
-		table_tuple_insert(myState->rel,
-						   slot,
-						   myState->output_cid,
-						   myState->ti_options,
-						   myState->bistate);
+
+		if (myState->mstate != NULL)
+			table_modify_buffer_insert(myState->mstate, slot);
+		else
+			table_tuple_insert(myState->rel,
+							   slot,
+							   myState->output_cid,
+							   myState->ti_options,
+							   myState->bistate);
 	}
 
 	/* We know this is a newly created relation, so there are no indexes */
@@ -616,8 +647,13 @@ intorel_shutdown(DestReceiver *self)
 
 	if (!into->skipData)
 	{
-		FreeBulkInsertState(myState->bistate);
-		table_finish_bulk_insert(myState->rel, myState->ti_options);
+		if (myState->mstate != NULL)
+			table_modify_end(myState->mstate);
+		else
+		{
+			FreeBulkInsertState(myState->bistate);
+			table_finish_bulk_insert(myState->rel, myState->ti_options);
+		}
 	}
 
 	/* close rel, but keep lock until commit */
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 010097873d..fa495ec533 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -30,7 +30,9 @@
 #include "commands/tablespace.h"
 #include "executor/executor.h"
 #include "executor/spi.h"
+#include "foreign/fdwapi.h"
 #include "miscadmin.h"
+#include "optimizer/optimizer.h"
 #include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/lmgr.h"
@@ -51,6 +53,12 @@ typedef struct
 	CommandId	output_cid;		/* cmin to insert in output tuples */
 	int			ti_options;		/* table_tuple_insert performance options */
 	BulkInsertState bistate;	/* bulk insert state */
+
+	/* Table modify state. NULL if multi-inserts isn't supported. */
+	TableModifyState *mstate;
+
+	/* True if SELECT query contains volatile functions */
+	bool		volatile_funcs;
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -428,6 +436,12 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
 	/* Plan the query which will generate data for the refresh. */
 	plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
 
+	/*
+	 * Check if the stored MATERIALIZED VIEW query has any volatile functions.
+	 */
+	((DR_transientrel *) dest)->volatile_funcs =
+		contain_volatile_functions_after_planning((Expr *) query);
+
 	/*
 	 * Use a snapshot with an updated command ID to ensure this query sees
 	 * results of any previously executed queries.  (This could only matter if
@@ -492,8 +506,26 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 */
 	myState->transientrel = transientrel;
 	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-	myState->bistate = GetBulkInsertState();
+	myState->ti_options = TABLE_INSERT_SKIP_FSM |
+		TABLE_INSERT_FROZEN |
+		TABLE_INSERT_BAS_BULKWRITE;
+	myState->bistate = NULL;
+	myState->mstate = NULL;
+
+	/* Set up the state for multi or bulk inserts */
+	if (TableModifyIsMultiInsertsSupported(myState->transientrel,
+										   myState->volatile_funcs))
+	{
+		myState->mstate = table_modify_begin(myState->transientrel,
+											 myState->output_cid,
+											 myState->ti_options,
+											 NULL,	/* Multi-insert buffer
+													 * flush callback */
+											 NULL); /* Multi-insert buffer
+													 * flush callback context */
+	}
+	else
+		myState->bistate = GetBulkInsertState();
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -519,11 +551,14 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * tuple's xmin), but since we don't do that here...
 	 */
 
-	table_tuple_insert(myState->transientrel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate);
+	if (myState->mstate != NULL)
+		table_modify_buffer_insert(myState->mstate, slot);
+	else
+		table_tuple_insert(myState->transientrel,
+						   slot,
+						   myState->output_cid,
+						   myState->ti_options,
+						   myState->bistate);
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -538,9 +573,13 @@ transientrel_shutdown(DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
 
-	FreeBulkInsertState(myState->bistate);
-
-	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	if (myState->mstate != NULL)
+		table_modify_end(myState->mstate);
+	else
+	{
+		FreeBulkInsertState(myState->bistate);
+		table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	}
 
 	/* close transientrel, but keep lock until commit */
 	table_close(myState->transientrel, NoLock);
@@ -984,3 +1023,50 @@ CloseMatViewIncrementalMaintenance(void)
 	matview_maintenance_depth--;
 	Assert(matview_maintenance_depth >= 0);
 }
+
+/*
+ * Check if multi-inserts is supported.
+ *
+ * It's generally more efficient to prepare a bunch of tuples for insertion,
+ * and insert them in one multi-inserts call, than call
+ * table_tuple_insert() separately for every tuple. However, there are a
+ * number of reasons why we might not be able to do this. In general, we
+ * can't support multi-inserts in the following cases:
+ *
+ * When there are any BEFORE/INSTEAD OF triggers on the table or any volatile
+ * functions/expressions in the SELECT query. Such triggers or volatile
+ * expressions might query the table we're inserting into and act differently
+ * if the tuples that have already been processed and prepared for insertion
+ * are not there.
+ *
+ * When inserting into partitioned table. For partitioned tables, we may still
+ * be able to perform multi-inserts. However, the possibility of this depends
+ * on which types of triggers exist on the partition. We must disable
+ * multi-inserts if the partition is a foreign table that can't use batching or
+ * it has any before row insert or insert instead triggers (same as we checked
+ * above for the parent table). We really can't know all these unless we start
+ * inserting tuples into the respective partitions. We can have an intermediate
+ * insert state to show the intent to do multi-inserts and later determine if
+ * we can use multi-inserts for the partition being inserted into.
+ *
+ * When inserting into foreign table. For foreign tables, we may still be able
+ * to do multi-inserts if the FDW supports batching.
+ */
+bool
+TableModifyIsMultiInsertsSupported(Relation rel, bool volatile_funcs)
+{
+	if (volatile_funcs)
+		return false;
+
+	/*
+	 * For CREATE TABLE AS, CREATE MATERIALIZED VIEW, REFRESH MATERIALIZED
+	 * VIEW, we really can't have triggers or can't create table as
+	 * partitioned or foreign. So, we will assert.
+	 */
+	Assert(rel->trigdesc == NULL);
+	Assert(rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE);
+	Assert(rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE);
+
+	/* Can support multi-inserts */
+	return true;
+}
diff --git a/src/include/commands/matview.h b/src/include/commands/matview.h
index c8811e8fc7..28abd7b89b 100644
--- a/src/include/commands/matview.h
+++ b/src/include/commands/matview.h
@@ -33,4 +33,7 @@ extern DestReceiver *CreateTransientRelDestReceiver(Oid transientoid);
 
 extern bool MatViewIncrementalMaintenanceIsEnabled(void);
 
+extern bool TableModifyIsMultiInsertsSupported(Relation rel,
+											   bool volatile_funcs);
+
 #endif							/* MATVIEW_H */
-- 
2.40.1



  [application/octet-stream] v25-0003-Use-new-multi-inserts-table-AM-for-COPY-.-FROM.patch (14.3K, 4-v25-0003-Use-new-multi-inserts-table-AM-for-COPY-.-FROM.patch)
From 16d488cff14c0ce9ace648a1b99e507edb184e74 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 30 Oct 2024 17:30:41 +0000
Subject: [PATCH v25 3/3] Use new multi-inserts table AM for COPY ... FROM

This commit uses the new multi-inserts table AM added by commit
<<CHANGE_ME>> for the COPY ... FROM command.

Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
 src/backend/commands/copyfrom.c          | 254 +++++++++++++++--------
 src/include/commands/copyfrom_internal.h |   4 +-
 src/tools/pgindent/typedefs.list         |   1 +
 3 files changed, 171 insertions(+), 88 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 07cbd5d22b..18fb609cbe 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -74,14 +74,27 @@
  */
 #define MAX_PARTITION_BUFFERS	32
 
+/* Context for multi-inserts buffer flush callback */
+typedef struct MultiInsertBufferFlushCtx
+{
+	CopyFromState cstate;
+	ResultRelInfo *resultRelInfo;
+	EState	   *estate;
+} MultiInsertBufferFlushCtx;
+
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	TableModifyState *mstate;	/* Table insert state; NULL if foreign table */
+	TupleTableSlot **slots;		/* Array to store tuples */
 	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel if plain
-								 * table; NULL if foreign table */
+	TupleTableSlot *mislot;		/* Slot used for multi-inserts */
+	MultiInsertBufferFlushCtx *mibufferctx; /* Multi-inserts buffer flush
+											 * callback context */
 	int			nused;			/* number of 'slots' containing tuples */
+	int			currslotno;		/* Current buffered slot number that's being
+								 * flushed; used to get the correct cur_lineno
+								 * for errors while in the flush callback. */
 	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
 												 * stream */
 } CopyMultiInsertBuffer;
@@ -216,19 +229,96 @@ CopyLimitPrintoutLength(const char *str)
 	return res;
 }
 
+/*
+ * Implements the multi-inserts buffer flush callback,
+ * i.e. TableModifyEndCallback.
+ *
+ * NB: Caller must take care of opening and closing the indices.
+ */
+static void
+MultiInsertBufferFlushCb(void *context, TupleTableSlot *slot)
+{
+	MultiInsertBufferFlushCtx *mibufferctx = (MultiInsertBufferFlushCtx *) context;
+	CopyFromState cstate = mibufferctx->cstate;
+	ResultRelInfo *resultRelInfo = mibufferctx->resultRelInfo;
+	EState	   *estate = mibufferctx->estate;
+	CopyMultiInsertBuffer *buffer = resultRelInfo->ri_CopyMultiInsertBuffer;
+
+	/*
+	 * If there are any indexes, update them for all the inserted tuples, and
+	 * run AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		List	   *recheckIndexes;
+
+		cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+		recheckIndexes =
+			ExecInsertIndexTuples(resultRelInfo,
+								  slot, estate, false,
+								  false, NULL, NIL, false);
+
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, recheckIndexes,
+							 cstate->transition_capture);
+
+		list_free(recheckIndexes);
+	}
+
+	/*
+	 * There are no indexes, but see if we need to run AFTER ROW INSERT
+	 * triggers anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, NIL,
+							 cstate->transition_capture);
+	}
+
+	Assert(buffer->currslotno <= buffer->nused);
+}
+
 /*
  * Allocate memory and initialize a new CopyMultiInsertBuffer for this
  * ResultRelInfo.
  */
 static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						  CopyFromState cstate, EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		buffer->mibufferctx =
+			(MultiInsertBufferFlushCtx *) palloc(sizeof(MultiInsertBufferFlushCtx));
+		buffer->mibufferctx->cstate = cstate;
+		buffer->mibufferctx->resultRelInfo = rri;
+		buffer->mibufferctx->estate = estate;
+
+		buffer->mstate = table_modify_begin(rri->ri_RelationDesc,
+											miinfo->mycid,
+											miinfo->ti_options,
+											MultiInsertBufferFlushCb,
+											buffer->mibufferctx);
+
+		buffer->slots = NULL;
+	}
+	else
+	{
+		buffer->mstate = NULL;
+		buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+	}
+
+	buffer->mislot = NULL;
 	buffer->resultRelInfo = rri;
-	buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
 	buffer->nused = 0;
 
 	return buffer;
@@ -239,11 +329,12 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
  */
 static inline void
 CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
+							   ResultRelInfo *rri, CopyFromState cstate,
+							   EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
-	buffer = CopyMultiInsertBufferInit(rri);
+	buffer = CopyMultiInsertBufferInit(miinfo, rri, cstate, estate);
 
 	/* Setup back-link so we can easily find this buffer again */
 	rri->ri_CopyMultiInsertBuffer = buffer;
@@ -276,7 +367,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	 * tuples their way for the first time.
 	 */
 	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+		CopyMultiInsertInfoSetupBuffer(miinfo, rri, cstate, estate);
 }
 
 /*
@@ -320,8 +411,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		int			batch_size = resultRelInfo->ri_BatchSize;
 		int			sent = 0;
 
-		Assert(buffer->bistate == NULL);
-
 		/* Ensure that the FDW supports batching and it's enabled */
 		Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
 		Assert(batch_size > 1);
@@ -393,13 +482,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	}
 	else
 	{
-		CommandId	mycid = miinfo->mycid;
-		int			ti_options = miinfo->ti_options;
 		bool		line_buf_valid = cstate->line_buf_valid;
 		uint64		save_cur_lineno = cstate->cur_lineno;
-		MemoryContext oldcontext;
-
-		Assert(buffer->bistate != NULL);
 
 		/*
 		 * Print error context information correctly, if one of the operations
@@ -407,56 +491,18 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		 */
 		cstate->line_buf_valid = false;
 
-		/*
-		 * table_multi_insert may leak memory, so switch to short-lived memory
-		 * context before calling it.
-		 */
-		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-		table_multi_insert(resultRelInfo->ri_RelationDesc,
-						   slots,
-						   nused,
-						   mycid,
-						   ti_options,
-						   buffer->bistate);
-		MemoryContextSwitchTo(oldcontext);
+		Assert(buffer->currslotno <= buffer->nused);
+		buffer->currslotno = 0;
 
-		for (i = 0; i < nused; i++)
-		{
-			/*
-			 * If there are any indexes, update them for all the inserted
-			 * tuples, and run AFTER ROW INSERT triggers.
-			 */
-			if (resultRelInfo->ri_NumIndices > 0)
-			{
-				List	   *recheckIndexes;
-
-				cstate->cur_lineno = buffer->linenos[i];
-				recheckIndexes =
-					ExecInsertIndexTuples(resultRelInfo,
-										  buffer->slots[i], estate, false,
-										  false, NULL, NIL, false);
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], recheckIndexes,
-									 cstate->transition_capture);
-				list_free(recheckIndexes);
-			}
+		table_modify_buffer_flush(buffer->mstate);
 
-			/*
-			 * There's no indexes, but see if we need to run AFTER ROW INSERT
-			 * triggers anyway.
-			 */
-			else if (resultRelInfo->ri_TrigDesc != NULL &&
-					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-			{
-				cstate->cur_lineno = buffer->linenos[i];
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], NIL,
-									 cstate->transition_capture);
-			}
+		Assert(buffer->currslotno <= buffer->nused);
+		buffer->currslotno = 0;
 
-			ExecClearTuple(slots[i]);
-		}
+		/*
+		 * Indexes are updated and AFTER ROW INSERT triggers (if any) are run
+		 * in the flush callback MultiInsertBufferFlushCb.
+		 */
 
 		/* Update the row counter and progress of the COPY command */
 		*processed += nused;
@@ -492,19 +538,18 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 
 	if (resultRelInfo->ri_FdwRoutine == NULL)
 	{
-		Assert(buffer->bistate != NULL);
-		FreeBulkInsertState(buffer->bistate);
+		table_modify_end(buffer->mstate);
+		ExecDropSingleTupleTableSlot(buffer->mislot);
+		pfree(buffer->mibufferctx);
 	}
 	else
-		Assert(buffer->bistate == NULL);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	{
+		/* Since we only create slots on demand, just drop the non-null ones. */
+		for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-	if (resultRelInfo->ri_FdwRoutine == NULL)
-		table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
-								 miinfo->ti_options);
+		pfree(buffer->slots);
+	}
 
 	pfree(buffer);
 }
@@ -598,15 +643,36 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
 {
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 	int			nused;
+	TupleTableSlot *slot;
 
 	Assert(buffer != NULL);
 	Assert(buffer->nused < MAX_BUFFERED_TUPLES);
 
 	nused = buffer->nused;
 
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		if (buffer->mislot == NULL)
+		{
+			buffer->mislot = MakeTupleTableSlot(RelationGetDescr(rri->ri_RelationDesc),
+												&TTSOpsVirtual);
+		}
+
+		/* Caller must clear the slot */
+		slot = buffer->mislot;
+	}
+	else
+	{
+		if (buffer->slots[nused] == NULL)
+		{
+			slot = table_slot_create(rri->ri_RelationDesc, NULL);
+			buffer->slots[nused] = slot;
+		}
+		else
+			slot = buffer->slots[nused];
+	}
+
+	return slot;
 }
 
 /*
@@ -620,7 +686,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 
 	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
+
+#ifdef USE_ASSERT_CHECKING
+	if (rri->ri_FdwRoutine != NULL)
+		Assert(slot == buffer->slots[buffer->nused]);
+#endif
 
 	/* Store the line number so we can properly report any errors later */
 	buffer->linenos[buffer->nused] = lineno;
@@ -628,6 +698,22 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	/* Record this slot as being used */
 	buffer->nused++;
 
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		Assert(slot == buffer->mislot);
+		buffer->currslotno = 0;
+
+		table_modify_buffer_insert(buffer->mstate, slot);
+	}
+	else
+	{
+		/*
+		 * The slot previously might point into the per-tuple context. For
+		 * batching it needs to be longer lived.
+		 */
+		ExecMaterializeSlot(slot);
+	}
+
 	/* Update how many tuples are stored and their size */
 	miinfo->bufferedTuples++;
 	miinfo->bufferedBytes += tuplen;
@@ -841,7 +927,7 @@ CopyFrom(CopyFromState cstate)
 	/*
 	 * It's generally more efficient to prepare a bunch of tuples for
 	 * insertion, and insert them in one
-	 * table_multi_insert()/ExecForeignBatchInsert() call, than call
+	 * table_modify_buffer_insert()/ExecForeignBatchInsert() call, than call
 	 * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
 	 * However, there are a number of reasons why we might not be able to do
 	 * this.  These are explained below.
@@ -925,7 +1011,8 @@ CopyFrom(CopyFromState cstate)
 			insertMethod = CIM_MULTI;
 
 		CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
-								estate, mycid, ti_options);
+								estate, mycid,
+								ti_options | TABLE_INSERT_BAS_BULKWRITE);
 	}
 
 	/*
@@ -1094,7 +1181,8 @@ CopyFrom(CopyFromState cstate)
 				{
 					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
 						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
-													   resultRelInfo);
+													   resultRelInfo, cstate,
+													   estate);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
 						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
@@ -1224,12 +1312,6 @@ CopyFrom(CopyFromState cstate)
 				/* Store the slot in the multi-insert buffer, when enabled. */
 				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
 				{
-					/*
-					 * The slot previously might point into the per-tuple
-					 * context. For batching it needs to be longer lived.
-					 */
-					ExecMaterializeSlot(myslot);
-
 					/* Add this tuple to the tuple buffer */
 					CopyMultiInsertInfoStore(&multiInsertInfo,
 											 resultRelInfo, myslot,
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..14addbc6f6 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -46,9 +46,9 @@ typedef enum EolType
 typedef enum CopyInsertMethod
 {
 	CIM_SINGLE,					/* use table_tuple_insert or ExecForeignInsert */
-	CIM_MULTI,					/* always use table_multi_insert or
+	CIM_MULTI,					/* always use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert */
-	CIM_MULTI_CONDITIONAL,		/* use table_multi_insert or
+	CIM_MULTI_CONDITIONAL,		/* use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert only if valid */
 } CopyInsertMethod;
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e7ddf29c16..bf21e43ce1 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1664,6 +1664,7 @@ MonotonicFunction
 MorphOpaque
 MsgType
 MultiAssignRef
+MultiInsertBufferFlushCtx
 MultiSortSupport
 MultiSortSupportData
 MultiXactId
-- 
2.40.1
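
To make the control flow of the 0003 patch easier to follow, here is a minimal standalone sketch of the "buffer tuples, then run a per-tuple callback at flush time" pattern it relies on. It is an illustration under stated assumptions, not the patch's code: every Demo*/demo_* name is hypothetical, tuples are plain ints, and nothing here is the table_modify_* API. The part that mirrors the patch is the caller-side bookkeeping: currslotno is reset before a flush, and the callback advances it to pick the matching line number for error reporting.

```c
#include <assert.h>
#include <stdlib.h>

#define DEMO_MAX_BUFFERED 8		/* hypothetical buffer capacity */

/* Hypothetical per-tuple callback, called once per tuple during a flush. */
typedef void (*DemoFlushCb) (void *context, int tuple);

/* Hypothetical stand-in for the AM-owned insert state. */
typedef struct DemoModifyState
{
	int			tuples[DEMO_MAX_BUFFERED];
	int			nbuffered;
	DemoFlushCb cb;
	void	   *cb_context;
} DemoModifyState;

/* Caller-side bookkeeping, similar in spirit to CopyMultiInsertBuffer. */
typedef struct DemoCopyBuffer
{
	unsigned long linenos[DEMO_MAX_BUFFERED];
	int			nused;
	int			currslotno;
	unsigned long cur_lineno;	/* what an error context would report */
} DemoCopyBuffer;

static DemoModifyState *
demo_modify_begin(DemoFlushCb cb, void *cb_context)
{
	DemoModifyState *state = calloc(1, sizeof(DemoModifyState));

	if (state == NULL)
		abort();
	state->cb = cb;
	state->cb_context = cb_context;
	return state;
}

static void
demo_buffer_insert(DemoModifyState *state, int tuple)
{
	assert(state->nbuffered < DEMO_MAX_BUFFERED);
	state->tuples[state->nbuffered++] = tuple;
}

static void
demo_buffer_flush(DemoModifyState *state)
{
	/* A real AM would write every buffered tuple to the table first. */
	for (int i = 0; i < state->nbuffered; i++)
	{
		if (state->cb != NULL)
			state->cb(state->cb_context, state->tuples[i]);
	}
	state->nbuffered = 0;
}

/* Per-tuple work: here we only track which input line is being handled. */
static void
demo_flush_cb(void *context, int tuple)
{
	DemoCopyBuffer *buffer = context;

	(void) tuple;				/* index/trigger work would use the tuple */
	buffer->cur_lineno = buffer->linenos[buffer->currslotno++];
	assert(buffer->currslotno <= buffer->nused);
}

/* Buffer one input line, remembering its line number. */
static void
demo_store(DemoModifyState *state, DemoCopyBuffer *buffer,
		   int tuple, unsigned long lineno)
{
	buffer->linenos[buffer->nused++] = lineno;
	demo_buffer_insert(state, tuple);
}

/* Flush and reset the caller-side counters; returns tuples flushed. */
static int
demo_flush(DemoModifyState *state, DemoCopyBuffer *buffer)
{
	int			nflushed = buffer->nused;

	buffer->currslotno = 0;
	demo_buffer_flush(state);
	buffer->nused = 0;
	buffer->currslotno = 0;
	return nflushed;
}
```

A caller would create the state with demo_modify_begin(demo_flush_cb, &buffer), call demo_store() once per input line, call demo_flush() when the buffer is full or the input ends, and free() the state afterwards. Keeping the line-number array on the caller side is what lets the callback report the right line even though the tuples themselves are owned by the insert state.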


