public inbox for [email protected]  
help / color / mirror / Atom feed
From: Bharath Rupireddy <[email protected]>
To: Jeff Davis <[email protected]>
Cc: Masahiko Sawada <[email protected]>
Cc: PostgreSQL-development <[email protected]>
Cc: Andres Freund <[email protected]>
Cc: Dilip Kumar <[email protected]>
Cc: Luc Vlaming <[email protected]>
Cc: Justin Pryzby <[email protected]>
Cc: Michael Paquier <[email protected]>
Cc: Matthias van de Meent <[email protected]>
Cc: Alexander Korotkov <[email protected]>
Subject: Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
Date: Wed, 15 May 2024 12:56:17 +0530
Message-ID: <CALj2ACUz5+_YNEa4ZY-XG960_oXefM50MjD71VgSCAVDkF3bzQ@mail.gmail.com> (raw)
In-Reply-To: <CALj2ACWTrx1zxWvq8Uj2rEwCsDgQHeJ53WdvzZUw3kW+_VPG6A@mail.gmail.com>
References: <CALj2ACXdrOmB6Na9amHWZHKvRT3Z0nwTRsCwoMT-npOBtmXLXg@mail.gmail.com>
	<[email protected]>
	<CALj2ACX5UMWVFdrRNUE0KDrg54WV1cumBXwcETXhrPc1ibKAQA@mail.gmail.com>
	<CAAWbhmj5Pio3nOUakObzLGCSS9dwFfgsNVDhwTGzXNwZc00uCQ@mail.gmail.com>
	<CALj2ACVHC=c6eC9SRxhcTUrnXvNDNkEBgedi2WkVJYRb=0sWYw@mail.gmail.com>
	<CALj2ACVE2h=LnFnpr3rh+6SZzdwzW5EZOYG2Z0t=p28Fn75eag@mail.gmail.com>
	<CALj2ACWT0Rz8oybWBm5W4CeS0DvFkwaw-pEvGArhDLyPbZnW_g@mail.gmail.com>
	<CALj2ACWxO3HPtpYZb765LZk-uKVuAvZPO1HDeZ8=mzMgVPgaww@mail.gmail.com>
	<CALj2ACXJA4QQ_6zAHez0Uy-9t-ebmpox2y1QBja+mF4QP+h8WQ@mail.gmail.com>
	<CAD21AoD97mhzF8cqsd2v1jg9z8xfvAJrPx6Wvi+Ev0Hmu96LJA@mail.gmail.com>
	<CALj2ACUcv5pZoB0=gRrz54M9+YT9JCmo6FYyo5WUS6wnS+em=A@mail.gmail.com>
	<CALj2ACWm77YofBMs9x3Zmp3ctNAhcS4TvPVuXKdfwCr22FqOHg@mail.gmail.com>
	<[email protected]>
	<CALj2ACWqVzhxDuWNTWAH-LuADvsyX0r-wpwgeJ+Q1FnAKjY5Yw@mail.gmail.com>
	<[email protected]>
	<CALj2ACU70HZm+0QRJdkGA5RdJUo4zPYnV2hzkiV-wH5QS2PAEQ@mail.gmail.com>
	<[email protected]>
	<CALj2ACVMV=gMROte2=0LBFnSCRvzL4D9WK6oQ9ZHr4Qj2S8xWA@mail.gmail.com>
	<[email protected]>
	<CALj2ACX9vVYHYkX8e6w058EuAs8JL5EsnzadTxGhpiE_Ep_ByA@mail.gmail.com>
	<[email protected]>
	<CALj2ACWTrx1zxWvq8Uj2rEwCsDgQHeJ53WdvzZUw3kW+_VPG6A@mail.gmail.com>

On Mon, Apr 29, 2024 at 11:36 AM Bharath Rupireddy
<[email protected]> wrote:
>
> Please see the attached v20 patch set.

It looks like with the use of the new multi insert table access method
(TAM) for COPY (v20-0005), pgbench regressed about 35% [1]. The reason
is that the memory-based flushing decision the new TAM takes [2]
differs from that of what the COPY does today with table_multi_insert.
The COPY with table_multi_insert, maintains exact size of the tuples
in CopyFromState after it does the line parsing. For instance, the
tuple size of a table with two integer columns is 8 (4+4) bytes here.
The new TAM relies on the memory occupied by the slot's memory context
which holds the actual tuple as a good approximation for the tuple
size. But, this memory context size also includes a tuple header, so
the size here is not just 8 (4+4) bytes but more. Because of this, the
buffers get flushed sooner than that of the existing COPY with
table_multi_insert AM causing regression in pgbench which uses COPY
extensively. The new TAM aren't designed to be able to receive tuple
sizes from the callers, even if we do that, the API doesn't look
generic.

Here are couple of ideas to get away with this:

1. Try to get the actual tuple sizes excluding header sizes for each
column in the new TAM.
2. Try not to use the new TAM for COPY in which case the
table_multi_insert stays forever.
3. Try passing a flag to tell the new TAM that the caller does the
flushing and no need for an internal flushing.

I haven't explored the idea (1) in depth yet. If we find a way to do
so, it looks to me that we are going backwards since we need to strip
off headers for each column of a row for all of the rows. I suspect
this would cost a bit more and may not solve the regression.

With an eventual goal to get rid of table_multi_insert, (3) may not be
a better choice.

(3) seems reasonable to implement and reduce the regression. I did so
in the attached v21 patches. A new flag TM_SKIP_INTERNAL_BUFFER_FLUSH
is introduced in v21 patch, when specified, no internal flushing is
done, the caller has to flush the buffered tuples using
table_modify_buffer_flush(). Check the test results [3] HEAD 2.948 s,
PATCHED 2.946 s.

v21 also adds code to maintain tuple size for virtual tuple slots.
This helps make better memory-based flushing decisions in the new TAM.

Thoughts?

[1]
HEAD:
done in 2.84 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 1.99 s, vacuum 0.21 s, primary keys 0.62 s).
done in 2.78 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 1.88 s, vacuum 0.21 s, primary keys 0.69 s).
done in 2.97 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.07 s, vacuum 0.21 s, primary keys 0.69 s).
done in 2.86 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 1.96 s, vacuum 0.21 s, primary keys 0.69 s).
done in 2.90 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.05 s, vacuum 0.21 s, primary keys 0.64 s).
done in 2.83 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 1.96 s, vacuum 0.21 s, primary keys 0.66 s).
done in 2.80 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 1.95 s, vacuum 0.20 s, primary keys 0.63 s).
done in 2.79 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 1.89 s, vacuum 0.21 s, primary keys 0.69 s).
done in 3.75 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.17 s, vacuum 0.32 s, primary keys 1.25 s).
done in 3.86 s (drop tables 0.00 s, create tables 0.08 s, client-side
generate 2.97 s, vacuum 0.21 s, primary keys 0.59 s).

AVG done in 2.948 s

v20 PATCHED:
done in 3.94 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 3.12 s, vacuum 0.19 s, primary keys 0.62 s).
done in 4.04 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 3.22 s, vacuum 0.20 s, primary keys 0.61 s).
done in 3.98 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 3.16 s, vacuum 0.20 s, primary keys 0.61 s).
done in 4.04 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 3.16 s, vacuum 0.20 s, primary keys 0.67 s).
done in 3.98 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 3.16 s, vacuum 0.20 s, primary keys 0.61 s).
done in 4.00 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 3.17 s, vacuum 0.20 s, primary keys 0.63 s).
done in 4.43 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 3.24 s, vacuum 0.21 s, primary keys 0.98 s).
done in 4.16 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 3.36 s, vacuum 0.20 s, primary keys 0.59 s).
done in 3.62 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.83 s, vacuum 0.20 s, primary keys 0.58 s).
done in 3.67 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.84 s, vacuum 0.21 s, primary keys 0.61 s).

AVG done in 3.986 s

[2]
+    /*
+     * Memory allocated for the whole tuple is in slot's memory context, so
+     * use it keep track of the total space occupied by all buffered tuples.
+     */
+    if (TTS_SHOULDFREE(slot))
+        mistate->cur_size += MemoryContextMemAllocated(slot->tts_mcxt, false);
+
+    if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS ||
+        mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)
+        heap_modify_buffer_flush(state);

[3]
v21 PATCHED:
done in 2.92 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.12 s, vacuum 0.21 s, primary keys 0.59 s).
done in 2.89 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.07 s, vacuum 0.21 s, primary keys 0.61 s).
done in 2.89 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.05 s, vacuum 0.21 s, primary keys 0.62 s).
done in 2.90 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.07 s, vacuum 0.21 s, primary keys 0.62 s).
done in 2.80 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.00 s, vacuum 0.21 s, primary keys 0.59 s).
done in 2.84 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.04 s, vacuum 0.20 s, primary keys 0.60 s).
done in 2.84 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.03 s, vacuum 0.20 s, primary keys 0.59 s).
done in 2.85 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.04 s, vacuum 0.20 s, primary keys 0.60 s).
done in 3.48 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.44 s, vacuum 0.23 s, primary keys 0.80 s).
done in 3.05 s (drop tables 0.00 s, create tables 0.01 s, client-side
generate 2.28 s, vacuum 0.21 s, primary keys 0.55 s).

AVG done in 2.946 s

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com


Attachments:

  [application/octet-stream] v21-0001-Introduce-new-Table-Access-Methods-for-single-an.patch (22.0K, 2-v21-0001-Introduce-new-Table-Access-Methods-for-single-an.patch)
  download | inline diff:
From 536616aaedab560084e69a3c389fd2bc2e1d489c Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 15 May 2024 05:42:38 +0000
Subject: [PATCH v21 1/5] Introduce new Table Access Methods for single and
 multi inserts

---
 src/backend/access/heap/heapam.c         | 206 ++++++++++++++++++++++-
 src/backend/access/heap/heapam_handler.c |   6 +
 src/backend/access/table/tableam.c       |  97 +++++++++++
 src/backend/access/table/tableamapi.c    |  10 ++
 src/backend/executor/execTuples.c        |   4 +
 src/include/access/heapam.h              |  44 +++++
 src/include/access/tableam.h             | 151 +++++++++++++++++
 src/include/executor/tuptable.h          |   6 +
 src/tools/pgindent/typedefs.list         |   3 +
 9 files changed, 526 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 4be0dee4de..b14b3379aa 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -64,6 +64,7 @@
 #include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -112,7 +113,7 @@ static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 										bool *copy);
-
+static void heap_modify_insert_end_callback(TableModifyState *state);
 
 /*
  * Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -2608,6 +2609,209 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * Initialize heap modify state.
+ */
+TableModifyState *
+heap_modify_begin(Relation rel, int modify_flags,
+				  CommandId cid, int options,
+				  TableModifyBufferFlushCallback modify_buffer_flush_callback,
+				  void *modify_buffer_flush_context)
+{
+	TableModifyState *state;
+	MemoryContext context;
+	MemoryContext oldcontext;
+
+	context = AllocSetContextCreate(TopTransactionContext,
+									"heap_modify memory context",
+									ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(context);
+	state = palloc0(sizeof(TableModifyState));
+	state->rel = rel;
+	state->modify_flags = modify_flags;
+	state->mem_cxt = context;
+	state->cid = cid;
+	state->options = options;
+	state->modify_buffer_flush_callback = modify_buffer_flush_callback;
+	state->modify_buffer_flush_context = modify_buffer_flush_context;
+	state->modify_end_callback = NULL;	/* To be installed lazily */
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state,
+						  TupleTableSlot *slot)
+{
+	TupleTableSlot *dstslot;
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(state->mem_cxt);
+
+	/* First time through, initialize heap insert state */
+	if (state->data == NULL)
+	{
+		istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState));
+		istate->bistate = NULL;
+		istate->mistate = NULL;
+		state->data = istate;
+
+		if ((state->modify_flags & TM_FLAG_MULTI_INSERTS) != 0)
+		{
+			mistate = (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState));
+			mistate->slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+			istate->mistate = mistate;
+			mistate->mem_cxt = AllocSetContextCreate(CurrentMemoryContext,
+													 "heap_multi_insert memory context",
+													 ALLOCSET_DEFAULT_SIZES);
+		}
+
+		if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0)
+			istate->bistate = GetBulkInsertState();
+
+		state->modify_end_callback = heap_modify_insert_end_callback;
+	}
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+
+	dstslot = mistate->slots[mistate->cur_slots];
+	if (dstslot == NULL)
+	{
+		/*
+		 * We use virtual tuple slots buffered slots for leveraging the
+		 * optimization it provides to minimize physical data copying. The
+		 * virtual slot gets materialized when we copy (via below
+		 * ExecCopySlot) the tuples from the source slot which can be of any
+		 * type. This way, it is ensured that the tuple storage doesn't depend
+		 * on external memory, because all the datums that aren't passed by
+		 * value are copied into the slot's memory context.
+		 */
+		dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+									 &TTSOpsVirtual);
+		mistate->slots[mistate->cur_slots] = dstslot;
+	}
+
+	Assert(TTS_IS_VIRTUAL(dstslot));
+
+	/*
+	 * Note that the copy clears the previous destination slot contents, so
+	 * there's no need of explicit ExecClearTuple here.
+	 */
+	ExecCopySlot(dstslot, slot);
+
+	mistate->cur_slots++;
+	mistate->cur_size += ((VirtualTupleTableSlot *) dstslot)->sz;
+
+	if ((state->modify_flags & TM_SKIP_INTERNAL_BUFFER_FLUSH) == 0 &&
+		(mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS ||
+		 mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES))
+		heap_modify_buffer_flush(state);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+void
+heap_modify_buffer_flush(TableModifyState *state)
+{
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+
+	/* Quick exit if we have flushed already */
+	if (mistate->cur_slots == 0)
+		return;
+
+	/*
+	 * heap_multi_insert may leak memory, so switch to short-lived memory
+	 * context before calling it.
+	 */
+	oldcontext = MemoryContextSwitchTo(mistate->mem_cxt);
+
+	heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots,
+					  state->cid, state->options, istate->bistate);
+
+	MemoryContextSwitchTo(oldcontext);
+	MemoryContextReset(mistate->mem_cxt);
+
+	if (state->modify_buffer_flush_callback != NULL)
+	{
+		for (int i = 0; i < mistate->cur_slots; i++)
+			state->modify_buffer_flush_callback(state->modify_buffer_flush_context,
+												mistate->slots[i]);
+	}
+
+	mistate->cur_slots = 0;
+	mistate->cur_size = 0;
+}
+
+/*
+ * Heap insert specific callback used for performing work at the end like
+ * flushing buffered tuples if any, cleaning up the insert state and buffered
+ * slots.
+ */
+static void
+heap_modify_insert_end_callback(TableModifyState *state)
+{
+	HeapInsertState *istate;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+
+	if (istate->mistate != NULL)
+	{
+		HeapMultiInsertState *mistate = istate->mistate;
+
+		heap_modify_buffer_flush(state);
+
+		Assert(mistate->cur_slots == 0 &&
+			   mistate->cur_size == 0);
+
+		for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+		MemoryContextDelete(mistate->mem_cxt);
+	}
+
+	if (istate->bistate != NULL)
+		FreeBulkInsertState(istate->bistate);
+}
+
+/*
+ * Clean heap modify state.
+ */
+void
+heap_modify_end(TableModifyState *state)
+{
+	if (state->modify_end_callback != NULL)
+		state->modify_end_callback(state);
+
+	MemoryContextDelete(state->mem_cxt);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 6f8b1b7929..eda0c73a16 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2615,6 +2615,12 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+
+	.tuple_modify_begin = heap_modify_begin,
+	.tuple_modify_buffer_insert = heap_modify_buffer_insert,
+	.tuple_modify_buffer_flush = heap_modify_buffer_flush,
+	.tuple_modify_end = heap_modify_end,
+
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index e57a0b7ea3..35a3e43c59 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -21,6 +21,7 @@
 
 #include <math.h>
 
+#include "access/heapam.h"		/* just for BulkInsertState */
 #include "access/syncscan.h"
 #include "access/tableam.h"
 #include "access/xact.h"
@@ -29,6 +30,7 @@
 #include "storage/bufmgr.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
+#include "utils/memutils.h"
 
 /*
  * Constants to control the behavior of block allocation to parallel workers
@@ -48,6 +50,7 @@
 char	   *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
 bool		synchronize_seqscans = true;
 
+static void default_table_modify_insert_end_callback(TableModifyState *state);
 
 /* ----------------------------------------------------------------------------
  * Slot functions.
@@ -756,3 +759,97 @@ table_block_relation_estimate_size(Relation rel, int32 *attr_widths,
 	else
 		*allvisfrac = (double) relallvisible / curpages;
 }
+
+/*
+ * Initialize default table modify state.
+ */
+TableModifyState *
+default_table_modify_begin(Relation rel, int modify_flags,
+						   CommandId cid, int options,
+						   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+						   void *modify_buffer_flush_context)
+{
+	TableModifyState *state;
+	MemoryContext context;
+	MemoryContext oldcontext;
+
+	context = AllocSetContextCreate(CurrentMemoryContext,
+									"default_table_modify memory context",
+									ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(context);
+	state = palloc0(sizeof(TableModifyState));
+	state->rel = rel;
+	state->modify_flags = modify_flags;
+	state->mem_cxt = context;
+	state->cid = cid;
+	state->options = options;
+	state->modify_buffer_flush_callback = modify_buffer_flush_callback;
+	state->modify_buffer_flush_context = modify_buffer_flush_context;
+	state->modify_end_callback = NULL;	/* To be installed lazily */
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+/*
+ * Default table modify implementation for inserts.
+ */
+void
+default_table_modify_buffer_insert(TableModifyState *state,
+								   TupleTableSlot *slot)
+{
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(state->mem_cxt);
+
+	/* First time through, initialize default table modify state */
+	if (state->data == NULL)
+	{
+		if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0)
+			state->data = (BulkInsertState) GetBulkInsertState();
+
+		state->modify_end_callback = default_table_modify_insert_end_callback;
+	}
+
+	/* Fallback to table AM single insert routine */
+	table_tuple_insert(state->rel,
+					   slot,
+					   state->cid,
+					   state->options,
+					   (BulkInsertState) state->data);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Default table modify implementation for flush.
+ */
+void
+default_table_modify_buffer_flush(TableModifyState *state)
+{
+	/* no-op */
+}
+
+/*
+ * Default table modify insert specific callback used for performing work at
+ * the end like cleaning up the bulk insert state.
+ */
+static void
+default_table_modify_insert_end_callback(TableModifyState *state)
+{
+	if (state->data != NULL)
+		FreeBulkInsertState((BulkInsertState) state->data);
+}
+
+/*
+ * Clean default table modify state.
+ */
+void
+default_table_modify_end(TableModifyState *state)
+{
+	if (state->modify_end_callback != NULL)
+		state->modify_end_callback(state);
+
+	MemoryContextDelete(state->mem_cxt);
+}
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index ce637a5a5d..96ac951af6 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -97,6 +97,16 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->scan_sample_next_block != NULL);
 	Assert(routine->scan_sample_next_tuple != NULL);
 
+	/* optional, but either all of them are defined or none. */
+	Assert((routine->tuple_modify_begin == NULL &&
+			routine->tuple_modify_buffer_insert == NULL &&
+			routine->tuple_modify_buffer_flush == NULL &&
+			routine->tuple_modify_end == NULL) ||
+		   (routine->tuple_modify_begin != NULL &&
+			routine->tuple_modify_buffer_insert != NULL &&
+			routine->tuple_modify_buffer_flush != NULL &&
+			routine->tuple_modify_end != NULL));
+
 	return routine;
 }
 
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 00dc339615..e02228858a 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -97,6 +97,7 @@ const TupleTableSlotOps TTSOpsBufferHeapTuple;
 static void
 tts_virtual_init(TupleTableSlot *slot)
 {
+	((VirtualTupleTableSlot *) slot)->sz = 0;
 }
 
 static void
@@ -113,6 +114,7 @@ tts_virtual_clear(TupleTableSlot *slot)
 
 		pfree(vslot->data);
 		vslot->data = NULL;
+		vslot->sz = 0;
 
 		slot->tts_flags &= ~TTS_FLAG_SHOULDFREE;
 	}
@@ -212,6 +214,8 @@ tts_virtual_materialize(TupleTableSlot *slot)
 		}
 	}
 
+	vslot->sz = sz;
+
 	/* all data is byval */
 	if (sz == 0)
 		return;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index c47a5045ce..c10ebbb5ea 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -271,6 +271,38 @@ typedef enum
 	PRUNE_VACUUM_CLEANUP,		/* VACUUM 2nd heap pass */
 } PruneReason;
 
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS		1000
+
+/* Maximum size of all tuples that multi-insert buffers can hold */
+#define HEAP_MAX_BUFFERED_BYTES		65535
+
+typedef struct HeapMultiInsertState
+{
+	/* Array of buffered slots */
+	TupleTableSlot **slots;
+
+	/* Number of buffered slots currently held */
+	int			cur_slots;
+
+	/* Approximate size of all tuples currently held in buffered slots */
+	Size		cur_size;
+
+	MemoryContext mem_cxt;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+	struct BulkInsertStateData *bistate;
+	HeapMultiInsertState *mistate;
+} HeapInsertState;
+
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -321,6 +353,18 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableModifyState *heap_modify_begin(Relation rel,
+										   int modify_flags,
+										   CommandId cid,
+										   int options,
+										   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+										   void *modify_buffer_flush_context);
+extern void heap_modify_buffer_insert(TableModifyState *state,
+									  TupleTableSlot *slot);
+extern void heap_modify_buffer_flush(TableModifyState *state);
+extern void heap_modify_end(TableModifyState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 8e583b45cd..d9b2f4e03a 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -255,6 +255,48 @@ typedef struct TM_IndexDeleteOp
 	TM_IndexStatus *status;
 } TM_IndexDeleteOp;
 
+/* Table modify flags */
+
+/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */
+#define TM_FLAG_MULTI_INSERTS	0x000001
+
+/* Use BAS_BULKWRITE buffer access strategy */
+#define TM_FLAG_BAS_BULKWRITE	0x000002
+
+/*
+ * Skip internal flush of buffered tuples. Caller needs to flush via
+ * table_modify_buffer_flush().
+ */
+#define TM_SKIP_INTERNAL_BUFFER_FLUSH  0x000004
+
+struct TableModifyState;
+
+/* Callback invoked for each tuple that gets flushed to disk from buffer */
+typedef void (*TableModifyBufferFlushCallback) (void *context,
+												TupleTableSlot *slot);
+
+/* Table AM specific callback that gets called in table_modify_end() */
+typedef void (*TableModifyEndCallback) (struct TableModifyState *state);
+
+/* Holds table modify state */
+typedef struct TableModifyState
+{
+	Relation	rel;
+	int			modify_flags;
+	MemoryContext mem_cxt;
+	CommandId	cid;
+	int			options;
+
+	/* Flush callback and its context */
+	TableModifyBufferFlushCallback modify_buffer_flush_callback;
+	void	   *modify_buffer_flush_context;
+
+	/* Table AM specific data */
+	void	   *data;
+
+	TableModifyEndCallback modify_end_callback;
+} TableModifyState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -578,6 +620,21 @@ typedef struct TableAmRoutine
 	void		(*finish_bulk_insert) (Relation rel, int options);
 
 
+	/* ------------------------------------------------------------------------
+	 * Table Modify related functions.
+	 * ------------------------------------------------------------------------
+	 */
+	TableModifyState *(*tuple_modify_begin) (Relation rel,
+											 int modify_flags,
+											 CommandId cid,
+											 int options,
+											 TableModifyBufferFlushCallback modify_buffer_flush_callback,
+											 void *modify_buffer_flush_context);
+	void		(*tuple_modify_buffer_insert) (TableModifyState *state,
+											   TupleTableSlot *slot);
+	void		(*tuple_modify_buffer_flush) (TableModifyState *state);
+	void		(*tuple_modify_end) (TableModifyState *state);
+
 	/* ------------------------------------------------------------------------
 	 * DDL related functionality.
 	 * ------------------------------------------------------------------------
@@ -1609,6 +1666,100 @@ table_finish_bulk_insert(Relation rel, int options)
 		rel->rd_tableam->finish_bulk_insert(rel, options);
 }
 
+/* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+extern TableModifyState *default_table_modify_begin(Relation rel, int modify_flags,
+													CommandId cid, int options,
+													TableModifyBufferFlushCallback modify_buffer_flush_callback,
+													void *modify_buffer_flush_context);
+extern void default_table_modify_buffer_insert(TableModifyState *state,
+											   TupleTableSlot *slot);
+extern void default_table_modify_buffer_flush(TableModifyState *state);
+extern void default_table_modify_end(TableModifyState *state);
+
+static inline TableModifyState *
+table_modify_begin(Relation rel, int modify_flags, CommandId cid, int options,
+				   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+				   void *modify_buffer_flush_context)
+{
+	if (rel->rd_tableam &&
+		rel->rd_tableam->tuple_modify_begin != NULL)
+	{
+		return rel->rd_tableam->tuple_modify_begin(rel, modify_flags,
+												   cid, options,
+												   modify_buffer_flush_callback,
+												   modify_buffer_flush_context);
+	}
+	else if (rel->rd_tableam &&
+			 rel->rd_tableam->tuple_modify_begin == NULL)
+	{
+		/* Fallback to a default implementation */
+		return default_table_modify_begin(rel, modify_flags,
+										  cid, options,
+										  modify_buffer_flush_callback,
+										  modify_buffer_flush_context);
+	}
+	else
+		Assert(false);
+
+	return NULL;
+}
+
+static inline void
+table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_buffer_insert != NULL)
+	{
+		state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot);
+	}
+	else if (state->rel->rd_tableam &&
+			 state->rel->rd_tableam->tuple_modify_buffer_insert == NULL)
+	{
+		/* Fallback to a default implementation */
+		default_table_modify_buffer_insert(state, slot);
+	}
+	else
+		Assert(false);
+}
+
+static inline void
+table_modify_buffer_flush(TableModifyState *state)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_buffer_flush != NULL)
+	{
+		state->rel->rd_tableam->tuple_modify_buffer_flush(state);
+	}
+	else if (state->rel->rd_tableam &&
+			 state->rel->rd_tableam->tuple_modify_buffer_flush == NULL)
+	{
+		/* Fallback to a default implementation */
+		default_table_modify_buffer_flush(state);
+	}
+	else
+		Assert(false);
+}
+
+static inline void
+table_modify_end(TableModifyState *state)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_modify_end != NULL)
+	{
+		state->rel->rd_tableam->tuple_modify_end(state);
+	}
+	else if (state->rel->rd_tableam &&
+			 state->rel->rd_tableam->tuple_modify_end == NULL)
+	{
+		/* Fallback to a default implementation */
+		default_table_modify_end(state);
+	}
+	else
+		Assert(false);
+}
 
 /* ------------------------------------------------------------------------
  * DDL related functionality.
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index b82655e7e5..6940921078 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -247,6 +247,12 @@ typedef struct VirtualTupleTableSlot
 
 	TupleTableSlot base;
 
+	/*
+	 * Total size of all attributes that this virtual slot holds. Computed and
+	 * set during slot materialization.
+	 */
+	Size		sz;
+
 	char	   *data;			/* data for materialized slots */
 } VirtualTupleTableSlot;
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 2b83c340fb..6bfec4476b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1140,6 +1140,8 @@ HeadlineJsonState
 HeadlineParsedText
 HeadlineWordEntry
 HeapCheckContext
+HeapInsertState
+HeapMultiInsertState
 HeapPageFreeze
 HeapScanDesc
 HeapTuple
@@ -2860,6 +2862,7 @@ TableFuncScanState
 TableFuncType
 TableInfo
 TableLikeClause
+TableModifyState
 TableSampleClause
 TableScanDesc
 TableScanDescData
-- 
2.34.1



  [application/octet-stream] v21-0002-Optimize-CTAS-CMV-RMV-and-TABLE-REWRITES-with-mu.patch (7.0K, 3-v21-0002-Optimize-CTAS-CMV-RMV-and-TABLE-REWRITES-with-mu.patch)
  download | inline diff:
From bc554ec451461cc2dfbdefff04b171d1b2ce2fdc Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 15 May 2024 05:42:58 +0000
Subject: [PATCH v21 2/5] Optimize CTAS, CMV, RMV and TABLE REWRITES with multi
 inserts

---
 src/backend/commands/createas.c  | 27 +++++++++++----------------
 src/backend/commands/matview.c   | 26 +++++++++++---------------
 src/backend/commands/tablecmds.c | 31 +++++++++++--------------------
 3 files changed, 33 insertions(+), 51 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 62050f4dc5..2d6fffbf07 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -53,9 +53,7 @@ typedef struct
 	/* These fields are filled by intorel_startup: */
 	Relation	rel;			/* relation to write to */
 	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableModifyState *mstate;	/* table insert state */
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -552,17 +550,21 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 */
 	myState->rel = intoRelationDesc;
 	myState->reladdr = intoRelationAddr;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
 
 	/*
 	 * If WITH NO DATA is specified, there is no need to set up the state for
 	 * bulk inserts as there are no tuples to insert.
 	 */
 	if (!into->skipData)
-		myState->bistate = GetBulkInsertState();
+		myState->mstate = table_modify_begin(intoRelationDesc,
+											 TM_FLAG_MULTI_INSERTS |
+											 TM_FLAG_BAS_BULKWRITE,
+											 GetCurrentCommandId(true),
+											 TABLE_INSERT_SKIP_FSM,
+											 NULL,
+											 NULL);
 	else
-		myState->bistate = NULL;
+		myState->mstate = NULL;
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -590,11 +592,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 		 * would not be cheap either. This also doesn't allow accessing per-AM
 		 * data (say a tuple's xmin), but since we don't do that here...
 		 */
-		table_tuple_insert(myState->rel,
-						   slot,
-						   myState->output_cid,
-						   myState->ti_options,
-						   myState->bistate);
+		table_modify_buffer_insert(myState->mstate, slot);
 	}
 
 	/* We know this is a newly created relation, so there are no indexes */
@@ -612,10 +610,7 @@ intorel_shutdown(DestReceiver *self)
 	IntoClause *into = myState->into;
 
 	if (!into->skipData)
-	{
-		FreeBulkInsertState(myState->bistate);
-		table_finish_bulk_insert(myState->rel, myState->ti_options);
-	}
+		table_modify_end(myState->mstate);
 
 	/* close rel, but keep lock until commit */
 	table_close(myState->rel, NoLock);
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 6d09b75556..bb97e2fa5f 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -48,9 +48,7 @@ typedef struct
 	Oid			transientoid;	/* OID of new heap into which to store */
 	/* These fields are filled by transientrel_startup: */
 	Relation	transientrel;	/* relation to write to */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableModifyState *mstate;	/* table insert state */
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -458,9 +456,14 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 * Fill private fields of myState for use by later routines
 	 */
 	myState->transientrel = transientrel;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-	myState->bistate = GetBulkInsertState();
+	myState->mstate = table_modify_begin(transientrel,
+										 TM_FLAG_MULTI_INSERTS |
+										 TM_FLAG_BAS_BULKWRITE,
+										 GetCurrentCommandId(true),
+										 TABLE_INSERT_SKIP_FSM |
+										 TABLE_INSERT_FROZEN,
+										 NULL,
+										 NULL);
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -485,12 +488,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * cheap either. This also doesn't allow accessing per-AM data (say a
 	 * tuple's xmin), but since we don't do that here...
 	 */
-
-	table_tuple_insert(myState->transientrel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate);
+	table_modify_buffer_insert(myState->mstate, slot);
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -505,9 +503,7 @@ transientrel_shutdown(DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
 
-	FreeBulkInsertState(myState->bistate);
-
-	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	table_modify_end(myState->mstate);
 
 	/* close transientrel, but keep lock until commit */
 	table_close(myState->transientrel, NoLock);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 79c9c03183..bf6449e957 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -5973,10 +5973,8 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	int			i;
 	ListCell   *l;
 	EState	   *estate;
-	CommandId	mycid;
-	BulkInsertState bistate;
-	int			ti_options;
 	ExprState  *partqualstate = NULL;
+	TableModifyState *mstate = NULL;
 
 	/*
 	 * Open the relation(s).  We have surely already locked the existing
@@ -5995,18 +5993,15 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	 * Prepare a BulkInsertState and options for table_tuple_insert.  The FSM
 	 * is empty, so don't bother using it.
 	 */
-	if (newrel)
+	if (newrel && mstate == NULL)
 	{
-		mycid = GetCurrentCommandId(true);
-		bistate = GetBulkInsertState();
-		ti_options = TABLE_INSERT_SKIP_FSM;
-	}
-	else
-	{
-		/* keep compiler quiet about using these uninitialized */
-		mycid = 0;
-		bistate = NULL;
-		ti_options = 0;
+		mstate = table_modify_begin(newrel,
+									TM_FLAG_MULTI_INSERTS |
+									TM_FLAG_BAS_BULKWRITE,
+									GetCurrentCommandId(true),
+									TABLE_INSERT_SKIP_FSM,
+									NULL,
+									NULL);
 	}
 
 	/*
@@ -6304,8 +6299,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 
 			/* Write the tuple out to the new relation */
 			if (newrel)
-				table_tuple_insert(newrel, insertslot, mycid,
-								   ti_options, bistate);
+				table_modify_buffer_insert(mstate, insertslot);
 
 			ResetExprContext(econtext);
 
@@ -6326,10 +6320,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	table_close(oldrel, NoLock);
 	if (newrel)
 	{
-		FreeBulkInsertState(bistate);
-
-		table_finish_bulk_insert(newrel, ti_options);
-
+		table_modify_end(mstate);
 		table_close(newrel, NoLock);
 	}
 }
-- 
2.34.1



  [application/octet-stream] v21-0003-Optimize-INSERT-INTO-.-SELECT-with-multi-inserts.patch (9.2K, 4-v21-0003-Optimize-INSERT-INTO-.-SELECT-with-multi-inserts.patch)
  download | inline diff:
From 80196eed5a352770f7a1893ffa22d9801569bf16 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 15 May 2024 05:43:25 +0000
Subject: [PATCH v21 3/5] Optimize INSERT INTO ... SELECT with multi inserts

---
 src/backend/executor/nodeModifyTable.c | 170 ++++++++++++++++++++++---
 src/tools/pgindent/typedefs.list       |   1 +
 2 files changed, 153 insertions(+), 18 deletions(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index cee60d3659..cd044c9dee 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -114,6 +114,18 @@ typedef struct UpdateContext
 	LockTupleMode lockmode;
 } UpdateContext;
 
+typedef struct InsertModifyBufferFlushContext
+{
+	ResultRelInfo *resultRelInfo;
+	EState	   *estate;
+	ModifyTableState *mtstate;
+} InsertModifyBufferFlushContext;
+
+static InsertModifyBufferFlushContext *insert_modify_buffer_flush_context = NULL;
+static TableModifyState *table_modify_state = NULL;
+
+static void InsertModifyBufferFlushCallback(void *context,
+											TupleTableSlot *slot);
 
 static void ExecBatchInsert(ModifyTableState *mtstate,
 							ResultRelInfo *resultRelInfo,
@@ -726,6 +738,55 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo,
 	return ExecProject(newProj);
 }
 
+static void
+InsertModifyBufferFlushCallback(void *context, TupleTableSlot *slot)
+{
+	InsertModifyBufferFlushContext *ctx = (InsertModifyBufferFlushContext *) context;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	ModifyTableState *mtstate = ctx->mtstate;
+
+	/* Quick exit if no indexes or no triggers */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Caller must take care of opening and closing the indices */
+
+	/*
+	 * If there are any indexes, update them for all the inserted tuples, and
+	 * run AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		List	   *recheckIndexes;
+
+		recheckIndexes =
+			ExecInsertIndexTuples(resultRelInfo,
+								  slot, estate, false,
+								  false, NULL, NIL, false);
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, recheckIndexes,
+							 mtstate->mt_transition_capture);
+		list_free(recheckIndexes);
+	}
+
+	/*
+	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
+	 * anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, NIL,
+							 mtstate->mt_transition_capture);
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecInsert
  *
@@ -751,7 +812,8 @@ ExecInsert(ModifyTableContext *context,
 		   TupleTableSlot *slot,
 		   bool canSetTag,
 		   TupleTableSlot **inserted_tuple,
-		   ResultRelInfo **insert_destrel)
+		   ResultRelInfo **insert_destrel,
+		   bool canMultiInsert)
 {
 	ModifyTableState *mtstate = context->mtstate;
 	EState	   *estate = context->estate;
@@ -764,6 +826,7 @@ ExecInsert(ModifyTableContext *context,
 	OnConflictAction onconflict = node->onConflictAction;
 	PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
 	MemoryContext oldContext;
+	bool		ar_insert_triggers_executed = false;
 
 	/*
 	 * If the input result relation is a partitioned table, find the leaf
@@ -1126,17 +1189,53 @@ ExecInsert(ModifyTableContext *context,
 		}
 		else
 		{
-			/* insert the tuple normally */
-			table_tuple_insert(resultRelationDesc, slot,
-							   estate->es_output_cid,
-							   0, NULL);
+			if (canMultiInsert &&
+				proute == NULL &&
+				resultRelInfo->ri_WithCheckOptions == NIL &&
+				resultRelInfo->ri_projectReturning == NULL)
+			{
+				if (insert_modify_buffer_flush_context == NULL)
+				{
+					insert_modify_buffer_flush_context =
+						(InsertModifyBufferFlushContext *) palloc0(sizeof(InsertModifyBufferFlushContext));
+					insert_modify_buffer_flush_context->resultRelInfo = resultRelInfo;
+					insert_modify_buffer_flush_context->estate = estate;
+					insert_modify_buffer_flush_context->mtstate = mtstate;
+				}
 
-			/* insert index entries for tuple */
-			if (resultRelInfo->ri_NumIndices > 0)
-				recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
-													   slot, estate, false,
-													   false, NULL, NIL,
-													   false);
+				if (table_modify_state == NULL)
+				{
+					table_modify_state = table_modify_begin(resultRelInfo->ri_RelationDesc,
+															TM_FLAG_MULTI_INSERTS,
+															estate->es_output_cid,
+															0,
+															InsertModifyBufferFlushCallback,
+															insert_modify_buffer_flush_context);
+				}
+
+				table_modify_buffer_insert(table_modify_state, slot);
+				ar_insert_triggers_executed = true;
+			}
+			else
+			{
+				/* insert the tuple normally */
+				table_tuple_insert(resultRelationDesc, slot,
+								   estate->es_output_cid,
+								   0, NULL);
+
+				/* insert index entries for tuple */
+				if (resultRelInfo->ri_NumIndices > 0)
+					recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
+														   slot, estate, false,
+														   false, NULL, NIL,
+														   false);
+
+				ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+									 mtstate->mt_transition_capture);
+
+				list_free(recheckIndexes);
+				ar_insert_triggers_executed = true;
+			}
 		}
 	}
 
@@ -1170,10 +1269,12 @@ ExecInsert(ModifyTableContext *context,
 	}
 
 	/* AFTER ROW INSERT Triggers */
-	ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
-						 ar_insert_trig_tcs);
-
-	list_free(recheckIndexes);
+	if (!ar_insert_triggers_executed)
+	{
+		ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+							 ar_insert_trig_tcs);
+		list_free(recheckIndexes);
+	}
 
 	/*
 	 * Check any WITH CHECK OPTION constraints from parent views.  We are
@@ -1869,7 +1970,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
 	/* Tuple routing starts from the root table. */
 	context->cpUpdateReturningSlot =
 		ExecInsert(context, mtstate->rootResultRelInfo, slot, canSetTag,
-				   inserted_tuple, insert_destrel);
+				   inserted_tuple, insert_destrel, false);
 
 	/*
 	 * Reset the transition state that may possibly have been written by
@@ -3364,7 +3465,7 @@ ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 				mtstate->mt_merge_action = action;
 
 				rslot = ExecInsert(context, mtstate->rootResultRelInfo,
-								   newslot, canSetTag, NULL, NULL);
+								   newslot, canSetTag, NULL, NULL, false);
 				mtstate->mt_merge_inserted += 1;
 				break;
 			case CMD_NOTHING:
@@ -3749,6 +3850,10 @@ ExecModifyTable(PlanState *pstate)
 	HeapTupleData oldtupdata;
 	HeapTuple	oldtuple;
 	ItemPointer tupleid;
+	bool		canMultiInsert = false;
+
+	table_modify_state = NULL;
+	insert_modify_buffer_flush_context = NULL;
 
 	CHECK_FOR_INTERRUPTS();
 
@@ -3844,6 +3949,10 @@ ExecModifyTable(PlanState *pstate)
 		if (TupIsNull(context.planSlot))
 			break;
 
+		if (operation == CMD_INSERT &&
+			nodeTag(subplanstate) == T_SeqScanState)
+			canMultiInsert = true;
+
 		/*
 		 * When there are multiple result relations, each tuple contains a
 		 * junk column that gives the OID of the rel from which it came.
@@ -4057,7 +4166,7 @@ ExecModifyTable(PlanState *pstate)
 					ExecInitInsertProjection(node, resultRelInfo);
 				slot = ExecGetInsertNewTuple(resultRelInfo, context.planSlot);
 				slot = ExecInsert(&context, resultRelInfo, slot,
-								  node->canSetTag, NULL, NULL);
+								  node->canSetTag, NULL, NULL, canMultiInsert);
 				break;
 
 			case CMD_UPDATE:
@@ -4116,6 +4225,17 @@ ExecModifyTable(PlanState *pstate)
 			return slot;
 	}
 
+	if (table_modify_state != NULL)
+	{
+		Assert(operation == CMD_INSERT);
+
+		table_modify_end(table_modify_state);
+		table_modify_state = NULL;
+
+		pfree(insert_modify_buffer_flush_context);
+		insert_modify_buffer_flush_context = NULL;
+	}
+
 	/*
 	 * Insert remaining tuples for batch insert.
 	 */
@@ -4228,6 +4348,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 	mtstate->mt_merge_updated = 0;
 	mtstate->mt_merge_deleted = 0;
 
+	table_modify_state = NULL;
+	insert_modify_buffer_flush_context = NULL;
+
 	/*----------
 	 * Resolve the target relation. This is the same as:
 	 *
@@ -4681,6 +4804,17 @@ ExecEndModifyTable(ModifyTableState *node)
 {
 	int			i;
 
+	if (table_modify_state != NULL)
+	{
+		Assert(node->operation == CMD_INSERT);
+
+		table_modify_end(table_modify_state);
+		table_modify_state = NULL;
+
+		pfree(insert_modify_buffer_flush_context);
+		insert_modify_buffer_flush_context = NULL;
+	}
+
 	/*
 	 * Allow any FDWs to shut down
 	 */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 6bfec4476b..5e3e900cb8 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1179,6 +1179,7 @@ ImportForeignSchema_function
 ImportQual
 InProgressEnt
 InProgressIO
+InsertModifyBufferFlushContext
 IncludeWal
 InclusionOpaque
 IncrementVarSublevelsUp_context
-- 
2.34.1



  [application/octet-stream] v21-0004-Optimize-Logical-Replication-apply-with-multi-in.patch (19.7K, 5-v21-0004-Optimize-Logical-Replication-apply-with-multi-in.patch)
  download | inline diff:
From 3ebe28045217778bb145ff12c91e593b940e4818 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 15 May 2024 05:43:48 +0000
Subject: [PATCH v21 4/5] Optimize Logical Replication apply with multi inserts

---
 src/backend/executor/execReplication.c   |  39 +++
 src/backend/replication/logical/proto.c  |  24 ++
 src/backend/replication/logical/worker.c | 351 ++++++++++++++++++++++-
 src/include/executor/executor.h          |   4 +
 src/include/replication/logicalproto.h   |   2 +
 src/tools/pgindent/typedefs.list         |   2 +
 6 files changed, 409 insertions(+), 13 deletions(-)

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index d0a89cd577..fae1375537 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -544,6 +544,45 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 	}
 }
 
+void
+ExecRelationMultiInsert(TableModifyState *MultiInsertState,
+						ResultRelInfo *resultRelInfo,
+						EState *estate, TupleTableSlot *slot)
+{
+	bool		skip_tuple = false;
+	Relation	rel = resultRelInfo->ri_RelationDesc;
+
+	/* For now we support only tables. */
+	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
+
+	CheckCmdReplicaIdentity(rel, CMD_INSERT);
+
+	/* BEFORE ROW INSERT Triggers */
+	if (resultRelInfo->ri_TrigDesc &&
+		resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+	{
+		if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
+			skip_tuple = true;	/* "do nothing" */
+	}
+
+	if (!skip_tuple)
+	{
+		/* Compute stored generated columns */
+		if (rel->rd_att->constr &&
+			rel->rd_att->constr->has_generated_stored)
+			ExecComputeStoredGenerated(resultRelInfo, estate, slot,
+									   CMD_INSERT);
+
+		/* Check the constraints of the tuple */
+		if (rel->rd_att->constr)
+			ExecConstraints(resultRelInfo, slot, estate);
+		if (rel->rd_rel->relispartition)
+			ExecPartitionCheck(resultRelInfo, slot, estate, true);
+
+		table_modify_buffer_insert(MultiInsertState, slot);
+	}
+}
+
 /*
  * Find the searchslot tuple and update it with data in the slot,
  * update the indexes, and execute any constraints and per-row triggers.
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 95c09c9516..46d38aebd2 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -427,6 +427,30 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	logicalrep_write_tuple(out, rel, newslot, binary, columns);
 }
 
+LogicalRepRelId
+logicalrep_read_relid(StringInfo in)
+{
+	LogicalRepRelId relid;
+
+	/* read the relation id */
+	relid = pq_getmsgint(in, 4);
+
+	return relid;
+}
+
+void
+logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup)
+{
+	char		action;
+
+	action = pq_getmsgbyte(in);
+	if (action != 'N')
+		elog(ERROR, "expected new tuple but got %d",
+			 action);
+
+	logicalrep_read_tuple(in, newtup);
+}
+
 /*
  * Read INSERT from stream.
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b5a80fe3e8..d62772f590 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -148,7 +148,6 @@
 #include <unistd.h>
 
 #include "access/table.h"
-#include "access/tableam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/indexing.h"
@@ -416,6 +415,30 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
 													 ParallelApplyWorkerInfo **winfo);
 
+typedef enum LRMultiInsertReturnStatus
+{
+	LR_MULTI_INSERT_NONE = 0,
+	LR_MULTI_INSERT_REL_SKIPPED,
+	LR_MULTI_INSERT_DISALLOWED,
+	LR_MULTI_INSERT_DONE,
+} LRMultiInsertReturnStatus;
+
+static TableModifyState *MultiInsertState = NULL;
+static LogicalRepRelMapEntry *LastRel = NULL;
+static LogicalRepRelId LastMultiInsertRelId = InvalidOid;
+static ApplyExecutionData *LastEData = NULL;
+static TupleTableSlot *LastRemoteSlot = NULL;
+
+typedef struct LRModifyBufferFlushContext
+{
+	ResultRelInfo *resultRelInfo;
+	EState	   *estate;
+} LRModifyBufferFlushContext;
+
+static LRModifyBufferFlushContext *modify_buffer_flush_context = NULL;
+static void LRModifyBufferFlushCallback(void *context, TupleTableSlot *slot);
+static void FinishMultiInserts(void);
+
 /*
  * Form the origin name for the subscription.
  *
@@ -1017,6 +1040,8 @@ apply_handle_commit(StringInfo s)
 {
 	LogicalRepCommitData commit_data;
 
+	FinishMultiInserts();
+
 	logicalrep_read_commit(s, &commit_data);
 
 	if (commit_data.commit_lsn != remote_final_lsn)
@@ -1043,6 +1068,8 @@ apply_handle_begin_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData begin_data;
 
+	FinishMultiInserts();
+
 	/* Tablesync should never receive prepare. */
 	if (am_tablesync_worker())
 		ereport(ERROR,
@@ -1109,6 +1136,8 @@ apply_handle_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData prepare_data;
 
+	FinishMultiInserts();
+
 	logicalrep_read_prepare(s, &prepare_data);
 
 	if (prepare_data.prepare_lsn != remote_final_lsn)
@@ -1171,6 +1200,8 @@ apply_handle_commit_prepared(StringInfo s)
 	LogicalRepCommitPreparedTxnData prepare_data;
 	char		gid[GIDSIZE];
 
+	FinishMultiInserts();
+
 	logicalrep_read_commit_prepared(s, &prepare_data);
 	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
 
@@ -1220,6 +1251,8 @@ apply_handle_rollback_prepared(StringInfo s)
 	LogicalRepRollbackPreparedTxnData rollback_data;
 	char		gid[GIDSIZE];
 
+	FinishMultiInserts();
+
 	logicalrep_read_rollback_prepared(s, &rollback_data);
 	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
 
@@ -1277,6 +1310,8 @@ apply_handle_stream_prepare(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1304,6 +1339,8 @@ apply_handle_stream_prepare(StringInfo s)
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
 								   prepare_data.xid, prepare_data.prepare_lsn);
 
+			FinishMultiInserts();
+
 			/* Mark the transaction as prepared. */
 			apply_handle_prepare_internal(&prepare_data);
 
@@ -1407,6 +1444,8 @@ apply_handle_stream_prepare(StringInfo s)
 static void
 apply_handle_origin(StringInfo s)
 {
+	FinishMultiInserts();
+
 	/*
 	 * ORIGIN message can only come inside streaming transaction or inside
 	 * remote transaction and before any actual writes.
@@ -1473,6 +1512,8 @@ apply_handle_stream_start(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1628,6 +1669,8 @@ apply_handle_stream_stop(StringInfo s)
 	ParallelApplyWorkerInfo *winfo;
 	TransApplyAction apply_action;
 
+	FinishMultiInserts();
+
 	if (!in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1821,6 +1864,8 @@ apply_handle_stream_abort(StringInfo s)
 	StringInfoData original_msg = *s;
 	bool		toplevel_xact;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2138,6 +2183,8 @@ apply_handle_stream_commit(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2159,6 +2206,8 @@ apply_handle_stream_commit(StringInfo s)
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
 								   commit_data.commit_lsn);
 
+			FinishMultiInserts();
+
 			apply_handle_commit_internal(&commit_data);
 
 			/* Unlink the files with serialized changes and subxact info. */
@@ -2302,6 +2351,8 @@ apply_handle_relation(StringInfo s)
 {
 	LogicalRepRelation *rel;
 
+	FinishMultiInserts();
+
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
 		return;
 
@@ -2325,6 +2376,8 @@ apply_handle_type(StringInfo s)
 {
 	LogicalRepTyp typ;
 
+	FinishMultiInserts();
+
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
 		return;
 
@@ -2363,16 +2416,126 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
 						RelationGetRelationName(rel))));
 }
 
-/*
- * Handle INSERT message.
- */
+static void
+FinishMultiInserts(void)
+{
+	LogicalRepMsgType saved_command;
+
+	if (MultiInsertState == NULL)
+		return;
+
+	Assert(OidIsValid(LastMultiInsertRelId));
+	Assert(LastEData != NULL);
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = LastRel;
+
+	/* Set current command for error callback */
+	saved_command = apply_error_callback_arg.command;
+	apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT;
+
+	ExecDropSingleTupleTableSlot(LastRemoteSlot);
+	LastRemoteSlot = NULL;
+
+	table_modify_end(MultiInsertState);
+	MultiInsertState = NULL;
+	LastMultiInsertRelId = InvalidOid;
+
+	pfree(modify_buffer_flush_context);
+	modify_buffer_flush_context = NULL;
+
+	ExecCloseIndices(LastEData->targetRelInfo);
+
+	finish_edata(LastEData);
+	LastEData = NULL;
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	/* Reset the current command */
+	apply_error_callback_arg.command = saved_command;
+
+	logicalrep_rel_close(LastRel, NoLock);
+	LastRel = NULL;
+
+	end_replication_step();
+}
 
 static void
-apply_handle_insert(StringInfo s)
+LRModifyBufferFlushCallback(void *context, TupleTableSlot *slot)
+{
+	LRModifyBufferFlushContext *ctx = (LRModifyBufferFlushContext *) context;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	LogicalRepMsgType saved_command;
+
+	/* Quick exit if no indexes or no triggers */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = LastRel;
+
+	/* Set current command for error callback */
+	saved_command = apply_error_callback_arg.command;
+	apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT;
+
+	/* Caller must take care of opening and closing the indices */
+
+	/*
+	 * If there are any indexes, update them for all the inserted tuples, and
+	 * run AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		List	   *recheckIndexes;
+
+		recheckIndexes =
+			ExecInsertIndexTuples(resultRelInfo,
+								  slot, estate, false,
+								  false, NULL, NIL, false);
+
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, recheckIndexes,
+							 NULL);
+
+		list_free(recheckIndexes);
+	}
+
+	/*
+	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
+	 * anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, NIL,
+							 NULL);
+	}
+
+	/*
+	 * XXX we should in theory pass a TransitionCaptureState object to the
+	 * above to capture transition tuples, but after statement triggers don't
+	 * actually get fired by replication yet anyway
+	 */
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	/* Reset the current command */
+	apply_error_callback_arg.command = saved_command;
+}
+
+static LRMultiInsertReturnStatus
+do_multi_inserts(StringInfo s, LogicalRepRelId *relid)
 {
 	LogicalRepRelMapEntry *rel;
 	LogicalRepTupleData newtup;
-	LogicalRepRelId relid;
 	UserContext ucxt;
 	ApplyExecutionData *edata;
 	EState	   *estate;
@@ -2380,17 +2543,143 @@ apply_handle_insert(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	if (MultiInsertState == NULL)
+		begin_replication_step();
+
+	*relid = logicalrep_read_relid(s);
+
+	if (MultiInsertState != NULL &&
+		(LastMultiInsertRelId != InvalidOid &&
+		 *relid != InvalidOid &&
+		 LastMultiInsertRelId != *relid))
+		FinishMultiInserts();
+
+	if (MultiInsertState == NULL)
+		rel = logicalrep_rel_open(*relid, RowExclusiveLock);
+	else
+		rel = LastRel;
+
+	if (!should_apply_changes_for_rel(rel))
+	{
+		Assert(MultiInsertState == NULL);
+
+		/*
+		 * The relation can't become interesting in the middle of the
+		 * transaction so it's safe to unlock it.
+		 */
+		logicalrep_rel_close(rel, RowExclusiveLock);
+		end_replication_step();
+		return LR_MULTI_INSERT_REL_SKIPPED;
+	}
+
+	/* For a partitioned table, let's not do multi inserts. */
+	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		Assert(MultiInsertState == NULL);
+
+		/*
+		 * The relation can't become interesting in the middle of the
+		 * transaction so it's safe to unlock it.
+		 */
+		logicalrep_rel_close(rel, RowExclusiveLock);
+		end_replication_step();
+		return LR_MULTI_INSERT_DISALLOWED;
+	}
+
 	/*
-	 * Quick return if we are skipping data modification changes or handling
-	 * streamed transactions.
+	 * Make sure that any user-supplied code runs as the table owner, unless
+	 * the user has opted out of that behavior.
 	 */
-	if (is_skipping_changes() ||
-		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
-		return;
+	run_as_owner = MySubscription->runasowner;
+	if (!run_as_owner)
+		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = rel;
+
+	if (MultiInsertState == NULL)
+	{
+		oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+		/* Initialize the executor state. */
+		LastEData = edata = create_edata_for_relation(rel);
+		estate = edata->estate;
+
+		LastRemoteSlot = remoteslot = MakeTupleTableSlot(RelationGetDescr(rel->localrel),
+														 &TTSOpsVirtual);
+
+		modify_buffer_flush_context = (LRModifyBufferFlushContext *) palloc(sizeof(LRModifyBufferFlushContext));
+		modify_buffer_flush_context->resultRelInfo = edata->targetRelInfo;
+		modify_buffer_flush_context->estate = estate;
+
+		MultiInsertState = table_modify_begin(edata->targetRelInfo->ri_RelationDesc,
+											  TM_FLAG_MULTI_INSERTS |
+											  TM_FLAG_BAS_BULKWRITE,
+											  GetCurrentCommandId(true),
+											  0,
+											  LRModifyBufferFlushCallback,
+											  modify_buffer_flush_context);
+		LastRel = rel;
+		LastMultiInsertRelId = *relid;
+
+		/* We must open indexes here. */
+		ExecOpenIndices(edata->targetRelInfo, false);
+
+		MemoryContextSwitchTo(oldctx);
+	}
+	else
+	{
+		CommandId	cid;
+
+		edata = LastEData;
+		estate = edata->estate;
+		ResetExprContext(GetPerTupleExprContext(estate));
+		ExecClearTuple(LastRemoteSlot);
+		remoteslot = LastRemoteSlot;
+		cid = GetCurrentCommandId(true);
+		MultiInsertState->cid = cid;
+		estate->es_output_cid = cid;
+	}
+
+	/* Process and store remote tuple in the slot */
+	logicalrep_read_insert_v2(s, &newtup);
+	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+	slot_store_data(remoteslot, rel, &newtup);
+	slot_fill_defaults(rel, estate, remoteslot);
+	MemoryContextSwitchTo(oldctx);
+
+	TargetPrivilegesCheck(edata->targetRelInfo->ri_RelationDesc, ACL_INSERT);
+	ExecRelationMultiInsert(MultiInsertState, edata->targetRelInfo, estate, remoteslot);
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	if (!run_as_owner)
+		RestoreUserContext(&ucxt);
+
+	Assert(MultiInsertState != NULL);
+
+	CommandCounterIncrement();
+
+	return LR_MULTI_INSERT_DONE;
+}
+
+static bool
+do_single_inserts(StringInfo s, LogicalRepRelId relid)
+{
+	LogicalRepRelMapEntry *rel;
+	LogicalRepTupleData newtup;
+	UserContext ucxt;
+	ApplyExecutionData *edata;
+	EState	   *estate;
+	TupleTableSlot *remoteslot;
+	MemoryContext oldctx;
+	bool		run_as_owner;
+
+	Assert(relid != InvalidOid);
 
 	begin_replication_step();
 
-	relid = logicalrep_read_insert(s, &newtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
 	if (!should_apply_changes_for_rel(rel))
 	{
@@ -2400,7 +2689,7 @@ apply_handle_insert(StringInfo s)
 		 */
 		logicalrep_rel_close(rel, RowExclusiveLock);
 		end_replication_step();
-		return;
+		return false;
 	}
 
 	/*
@@ -2422,6 +2711,7 @@ apply_handle_insert(StringInfo s)
 										&TTSOpsVirtual);
 
 	/* Process and store remote tuple in the slot */
+	logicalrep_read_insert_v2(s, &newtup);
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 	slot_store_data(remoteslot, rel, &newtup);
 	slot_fill_defaults(rel, estate, remoteslot);
@@ -2446,6 +2736,35 @@ apply_handle_insert(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	end_replication_step();
+
+	return true;
+}
+
+/*
+ * Handle INSERT message.
+ */
+static void
+apply_handle_insert(StringInfo s)
+{
+	LRMultiInsertReturnStatus mi_status;
+	LogicalRepRelId relid;
+
+	/*
+	 * Quick return if we are skipping data modification changes or handling
+	 * streamed transactions.
+	 */
+	if (is_skipping_changes() ||
+		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+		return;
+
+	mi_status = do_multi_inserts(s, &relid);
+	if (mi_status == LR_MULTI_INSERT_REL_SKIPPED ||
+		mi_status == LR_MULTI_INSERT_DONE)
+		return;
+
+	do_single_inserts(s, relid);
+
+	return;
 }
 
 /*
@@ -2532,6 +2851,8 @@ apply_handle_update(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
@@ -2713,6 +3034,8 @@ apply_handle_delete(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
@@ -3154,6 +3477,8 @@ apply_handle_truncate(StringInfo s)
 	ListCell   *lc;
 	LOCKMODE	lockmode = AccessExclusiveLock;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 9770752ea3..8f10ea977b 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -14,6 +14,7 @@
 #ifndef EXECUTOR_H
 #define EXECUTOR_H
 
+#include "access/tableam.h"
 #include "executor/execdesc.h"
 #include "fmgr.h"
 #include "nodes/lockoptions.h"
@@ -656,6 +657,9 @@ extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 									 EState *estate, TupleTableSlot *slot);
+extern void ExecRelationMultiInsert(TableModifyState *MultiInsertState,
+									ResultRelInfo *resultRelInfo,
+									EState *estate, TupleTableSlot *slot);
 extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 									 EState *estate, EPQState *epqstate,
 									 TupleTableSlot *searchslot, TupleTableSlot *slot);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index c409638a2e..3f3a7f0a31 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -226,6 +226,8 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *newslot,
 									bool binary, Bitmapset *columns);
+extern LogicalRepRelId logicalrep_read_relid(StringInfo in);
+extern void logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 5e3e900cb8..8463343325 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1468,6 +1468,8 @@ LPTHREAD_START_ROUTINE
 LPTSTR
 LPVOID
 LPWSTR
+LRModifyBufferFlushContext
+LRMultiInsertReturnStatus
 LSEG
 LUID
 LVRelState
-- 
2.34.1



  [application/octet-stream] v21-0005-Use-new-multi-insert-Table-AM-for-COPY-FROM.patch (13.7K, 6-v21-0005-Use-new-multi-insert-Table-AM-for-COPY-FROM.patch)
  download | inline diff:
From 568a0361b5b25c5a4012f21ec2eacff0e49e95a5 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Wed, 15 May 2024 05:56:33 +0000
Subject: [PATCH v21 5/5] Use new multi insert Table AM for COPY FROM

---
 src/backend/commands/copyfrom.c          | 236 +++++++++++++++--------
 src/include/commands/copyfrom_internal.h |   4 +-
 src/tools/pgindent/typedefs.list         |   1 +
 3 files changed, 160 insertions(+), 81 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index ce4d62e707..bf56dd23f7 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -71,14 +71,25 @@
 /* Trim the list of buffers back down to this number after flushing */
 #define MAX_PARTITION_BUFFERS	32
 
+typedef struct CopyModifyBufferFlushContext
+{
+	CopyFromState cstate;
+	ResultRelInfo *resultRelInfo;
+	EState	   *estate;
+} CopyModifyBufferFlushContext;
+
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	TableModifyState *mstate;	/* Table insert state; NULL if foreign table */
+	TupleTableSlot **slots;		/* Array to store tuples */
 	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel if plain
-								 * table; NULL if foreign table */
+	TupleTableSlot *multislot;
+	CopyModifyBufferFlushContext *modify_buffer_flush_context;
 	int			nused;			/* number of 'slots' containing tuples */
+	int			currslotno;		/* Current buffered slot number that's being
+								 * flushed; Used to get correct cur_lineno for
+								 * errors while in flush callback. */
 	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
 												 * stream */
 } CopyMultiInsertBuffer;
@@ -99,6 +110,7 @@ typedef struct CopyMultiInsertInfo
 	int			ti_options;		/* table insert options */
 } CopyMultiInsertInfo;
 
+static void CopyModifyBufferFlushCallback(void *context, TupleTableSlot *slot);
 
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
@@ -218,14 +230,39 @@ CopyLimitPrintoutLength(const char *str)
  * ResultRelInfo.
  */
 static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						  CopyFromState cstate, EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		buffer->modify_buffer_flush_context = (CopyModifyBufferFlushContext *) palloc(sizeof(CopyModifyBufferFlushContext));
+		buffer->modify_buffer_flush_context->cstate = cstate;
+		buffer->modify_buffer_flush_context->resultRelInfo = rri;
+		buffer->modify_buffer_flush_context->estate = estate;
+
+		buffer->mstate = table_modify_begin(rri->ri_RelationDesc,
+											TM_FLAG_MULTI_INSERTS |
+											TM_FLAG_BAS_BULKWRITE |
+											TM_SKIP_INTERNAL_BUFFER_FLUSH,
+											miinfo->mycid,
+											miinfo->ti_options,
+											CopyModifyBufferFlushCallback,
+											buffer->modify_buffer_flush_context);
+		buffer->slots = NULL;
+		buffer->multislot = NULL;
+	}
+	else
+	{
+		buffer->mstate = NULL;
+		buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+		buffer->multislot = NULL;
+	}
+
 	buffer->resultRelInfo = rri;
-	buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
 	buffer->nused = 0;
 
 	return buffer;
@@ -236,11 +273,12 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
  */
 static inline void
 CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
+							   ResultRelInfo *rri, CopyFromState cstate,
+							   EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
-	buffer = CopyMultiInsertBufferInit(rri);
+	buffer = CopyMultiInsertBufferInit(miinfo, rri, cstate, estate);
 
 	/* Setup back-link so we can easily find this buffer again */
 	rri->ri_CopyMultiInsertBuffer = buffer;
@@ -273,7 +311,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	 * tuples their way for the first time.
 	 */
 	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+		CopyMultiInsertInfoSetupBuffer(miinfo, rri, cstate, estate);
 }
 
 /*
@@ -317,8 +355,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		int			batch_size = resultRelInfo->ri_BatchSize;
 		int			sent = 0;
 
-		Assert(buffer->bistate == NULL);
-
 		/* Ensure that the FDW supports batching and it's enabled */
 		Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
 		Assert(batch_size > 1);
@@ -390,13 +426,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	}
 	else
 	{
-		CommandId	mycid = miinfo->mycid;
-		int			ti_options = miinfo->ti_options;
 		bool		line_buf_valid = cstate->line_buf_valid;
 		uint64		save_cur_lineno = cstate->cur_lineno;
-		MemoryContext oldcontext;
-
-		Assert(buffer->bistate != NULL);
 
 		/*
 		 * Print error context information correctly, if one of the operations
@@ -404,56 +435,18 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		 */
 		cstate->line_buf_valid = false;
 
-		/*
-		 * table_multi_insert may leak memory, so switch to short-lived memory
-		 * context before calling it.
-		 */
-		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-		table_multi_insert(resultRelInfo->ri_RelationDesc,
-						   slots,
-						   nused,
-						   mycid,
-						   ti_options,
-						   buffer->bistate);
-		MemoryContextSwitchTo(oldcontext);
+		Assert(buffer->currslotno <= buffer->nused);
+		buffer->currslotno = 0;
 
-		for (i = 0; i < nused; i++)
-		{
-			/*
-			 * If there are any indexes, update them for all the inserted
-			 * tuples, and run AFTER ROW INSERT triggers.
-			 */
-			if (resultRelInfo->ri_NumIndices > 0)
-			{
-				List	   *recheckIndexes;
-
-				cstate->cur_lineno = buffer->linenos[i];
-				recheckIndexes =
-					ExecInsertIndexTuples(resultRelInfo,
-										  buffer->slots[i], estate, false,
-										  false, NULL, NIL, false);
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], recheckIndexes,
-									 cstate->transition_capture);
-				list_free(recheckIndexes);
-			}
+		table_modify_buffer_flush(buffer->mstate);
 
-			/*
-			 * There's no indexes, but see if we need to run AFTER ROW INSERT
-			 * triggers anyway.
-			 */
-			else if (resultRelInfo->ri_TrigDesc != NULL &&
-					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-			{
-				cstate->cur_lineno = buffer->linenos[i];
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], NIL,
-									 cstate->transition_capture);
-			}
+		Assert(buffer->currslotno <= buffer->nused);
+		buffer->currslotno = 0;
 
-			ExecClearTuple(slots[i]);
-		}
+		/*
+		 * Indexes are updated and AFTER ROW INSERT triggers (if any) are run
+		 * in the flush callback CopyModifyBufferFlushCallback.
+		 */
 
 		/* Update the row counter and progress of the COPY command */
 		*processed += nused;
@@ -469,6 +462,60 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	buffer->nused = 0;
 }
 
+static void
+CopyModifyBufferFlushCallback(void *context, TupleTableSlot *slot)
+{
+	CopyModifyBufferFlushContext *ctx = (CopyModifyBufferFlushContext *) context;
+	CopyFromState cstate = ctx->cstate;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	CopyMultiInsertBuffer *buffer = resultRelInfo->ri_CopyMultiInsertBuffer;
+
+	/* Quick exit if no indexes or no triggers */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Caller must take care of opening and closing the indices */
+
+	/*
+	 * If there are any indexes, update them for all the inserted tuples, and
+	 * run AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		List	   *recheckIndexes;
+
+		cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+		recheckIndexes =
+			ExecInsertIndexTuples(resultRelInfo,
+								  slot, estate, false,
+								  false, NULL, NIL, false);
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, recheckIndexes,
+							 cstate->transition_capture);
+		list_free(recheckIndexes);
+	}
+
+	/*
+	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
+	 * anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, NIL,
+							 cstate->transition_capture);
+	}
+
+	Assert(buffer->currslotno <= buffer->nused);
+}
+
 /*
  * Drop used slots and free member for this buffer.
  *
@@ -489,19 +536,18 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 
 	if (resultRelInfo->ri_FdwRoutine == NULL)
 	{
-		Assert(buffer->bistate != NULL);
-		FreeBulkInsertState(buffer->bistate);
+		table_modify_end(buffer->mstate);
+		ExecDropSingleTupleTableSlot(buffer->multislot);
+		pfree(buffer->modify_buffer_flush_context);
 	}
 	else
-		Assert(buffer->bistate == NULL);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	{
+		/* Since we only create slots on demand, just drop the non-null ones. */
+		for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-	if (resultRelInfo->ri_FdwRoutine == NULL)
-		table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
-								 miinfo->ti_options);
+		pfree(buffer->slots);
+	}
 
 	pfree(buffer);
 }
@@ -588,13 +634,32 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
 {
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 	int			nused = buffer->nused;
+	TupleTableSlot *slot;
 
 	Assert(buffer != NULL);
 	Assert(nused < MAX_BUFFERED_TUPLES);
 
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		if (buffer->multislot == NULL)
+			buffer->multislot = MakeTupleTableSlot(RelationGetDescr(rri->ri_RelationDesc),
+												   &TTSOpsVirtual);
+
+		/* Caller must clear the slot */
+		slot = buffer->multislot;
+	}
+	else
+	{
+		if (buffer->slots[nused] == NULL)
+		{
+			slot = table_slot_create(rri->ri_RelationDesc, NULL);
+			buffer->slots[nused] = slot;
+		}
+		else
+			slot = buffer->slots[nused];
+	}
+
+	return slot;
 }
 
 /*
@@ -608,7 +673,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 
 	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
+
+#ifdef USE_ASSERT_CHECKING
+	if (rri->ri_FdwRoutine != NULL)
+		Assert(slot == buffer->slots[buffer->nused]);
+#endif
 
 	/* Store the line number so we can properly report any errors later */
 	buffer->linenos[buffer->nused] = lineno;
@@ -616,6 +685,14 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	/* Record this slot as being used */
 	buffer->nused++;
 
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		Assert(slot == buffer->multislot);
+		buffer->currslotno = 0;
+
+		table_modify_buffer_insert(buffer->mstate, slot);
+	}
+
 	/* Update how many tuples are stored and their size */
 	miinfo->bufferedTuples++;
 	miinfo->bufferedBytes += tuplen;
@@ -830,7 +907,7 @@ CopyFrom(CopyFromState cstate)
 	/*
 	 * It's generally more efficient to prepare a bunch of tuples for
 	 * insertion, and insert them in one
-	 * table_multi_insert()/ExecForeignBatchInsert() call, than call
+	 * table_modify_buffer_insert()/ExecForeignBatchInsert() call, than call
 	 * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
 	 * However, there are a number of reasons why we might not be able to do
 	 * this.  These are explained below.
@@ -1080,7 +1157,8 @@ CopyFrom(CopyFromState cstate)
 				{
 					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
 						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
-													   resultRelInfo);
+													   resultRelInfo, cstate,
+													   estate);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
 						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..14addbc6f6 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -46,9 +46,9 @@ typedef enum EolType
 typedef enum CopyInsertMethod
 {
 	CIM_SINGLE,					/* use table_tuple_insert or ExecForeignInsert */
-	CIM_MULTI,					/* always use table_multi_insert or
+	CIM_MULTI,					/* always use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert */
-	CIM_MULTI_CONDITIONAL,		/* use table_multi_insert or
+	CIM_MULTI_CONDITIONAL,		/* use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert only if valid */
 } CopyInsertMethod;
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 8463343325..745019153d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -493,6 +493,7 @@ CopyHeaderChoice
 CopyInsertMethod
 CopyLogVerbosityChoice
 CopyMethod
+CopyModifyBufferFlushContext
 CopyMultiInsertBuffer
 CopyMultiInsertInfo
 CopyOnErrorChoice
-- 
2.34.1



view thread (30+ messages)  latest in thread

reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]
  Subject: Re: Introduce new multi insert Table AM and improve performance of various SQL commands with it for Heap AM
  In-Reply-To: <CALj2ACUz5+_YNEa4ZY-XG960_oXefM50MjD71VgSCAVDkF3bzQ@mail.gmail.com>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox